Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core: pushdown data_file.content when filter manifests in entries table #10203

Merged
merged 8 commits into from
Jun 24, 2024
166 changes: 164 additions & 2 deletions core/src/main/java/org/apache/iceberg/BaseEntriesTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.expressions.Binder;
import org.apache.iceberg.expressions.BoundReference;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.ExpressionVisitors;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.expressions.ManifestEvaluator;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
Expand Down Expand Up @@ -68,6 +72,7 @@ static CloseableIterable<FileScanTask> planFiles(
Expression rowFilter = context.rowFilter();
boolean caseSensitive = context.caseSensitive();
boolean ignoreResiduals = context.ignoreResiduals();
Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;

LoadingCache<Integer, ManifestEvaluator> evalCache =
Caffeine.newBuilder()
Expand All @@ -77,14 +82,18 @@ static CloseableIterable<FileScanTask> planFiles(
PartitionSpec transformedSpec = BaseFilesTable.transformSpec(tableSchema, spec);
return ManifestEvaluator.forRowFilter(rowFilter, transformedSpec, caseSensitive);
});
ManifestContentEvaluator manifestContentEvaluator =
new ManifestContentEvaluator(filter, tableSchema.asStruct(), caseSensitive);

CloseableIterable<ManifestFile> filteredManifests =
CloseableIterable.filter(
manifests, manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));
manifests,
manifest ->
evalCache.get(manifest.partitionSpecId()).eval(manifest)
&& manifestContentEvaluator.eval(manifest));

String schemaString = SchemaParser.toJson(projectedSchema);
String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);

return CloseableIterable.transform(
Expand All @@ -94,6 +103,159 @@ static CloseableIterable<FileScanTask> planFiles(
table, manifest, projectedSchema, schemaString, specString, residuals));
}

/**
* Evaluates an {@link Expression} on a {@link ManifestFile} to test whether a given data or
* delete manifests shall be included in the scan
*/
private static class ManifestContentEvaluator {

private final Expression boundExpr;

private ManifestContentEvaluator(
Expression expr, Types.StructType structType, boolean caseSensitive) {
Expression rewritten = Expressions.rewriteNot(expr);
this.boundExpr = Binder.bind(structType, rewritten, caseSensitive);
}

private boolean eval(ManifestFile manifest) {
return new ManifestEvalVisitor().eval(manifest);
}

private class ManifestEvalVisitor extends ExpressionVisitors.BoundExpressionVisitor<Boolean> {

private int manifestContentId;

private static final boolean ROWS_MIGHT_MATCH = true;
private static final boolean ROWS_CANNOT_MATCH = false;

private boolean eval(ManifestFile manifestFile) {
this.manifestContentId = manifestFile.content().id();
return ExpressionVisitors.visitEvaluator(boundExpr, this);
}

@Override
public Boolean alwaysTrue() {
return ROWS_MIGHT_MATCH;
}

@Override
public Boolean alwaysFalse() {
return ROWS_CANNOT_MATCH;
}

@Override
public Boolean not(Boolean result) {
return !result;
}

@Override
public Boolean and(Boolean leftResult, Boolean rightResult) {
return leftResult && rightResult;
}

@Override
public Boolean or(Boolean leftResult, Boolean rightResult) {
return leftResult || rightResult;
}

@Override
public <T> Boolean isNull(BoundReference<T> ref) {
if (fileContent(ref)) {
return ROWS_CANNOT_MATCH; // date_file.content should not be null
} else {
return ROWS_MIGHT_MATCH;
}
}

@Override
public <T> Boolean notNull(BoundReference<T> ref) {
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean isNaN(BoundReference<T> ref) {
if (fileContent(ref)) {
return ROWS_CANNOT_MATCH; // date_file.content should not be nan
} else {
return ROWS_MIGHT_MATCH;
}
}

@Override
public <T> Boolean notNaN(BoundReference<T> ref) {
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
if (fileContent(ref)) {
Literal<Integer> intLit = lit.to(Types.IntegerType.get());
Integer fileContentId = intLit.value();
if (fileContentId == FileContent.DATA.id()) {
return manifestContentId == ManifestContent.DATA.id();
} else {
dramaticlly marked this conversation as resolved.
Show resolved Hide resolved
return manifestContentId == ManifestContent.DELETES.id();
}
}
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean notEq(BoundReference<T> ref, Literal<T> lit) {
return ROWS_MIGHT_MATCH;
dramaticlly marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {
if (fileContent(ref)) {
if (!literalSet.contains(manifestContentId)) {
dramaticlly marked this conversation as resolved.
Show resolved Hide resolved
return ROWS_CANNOT_MATCH;
}
}
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean notIn(BoundReference<T> ref, Set<T> literalSet) {
return ROWS_MIGHT_MATCH;
dramaticlly marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public <T> Boolean startsWith(BoundReference<T> ref, Literal<T> lit) {
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean notStartsWith(BoundReference<T> ref, Literal<T> lit) {
return ROWS_MIGHT_MATCH;
}

private <T> boolean fileContent(BoundReference<T> ref) {
return ref.fieldId() == DataFile.CONTENT.fieldId();
}
}
}

static class ManifestReadTask extends BaseFileScanTask implements DataTask {
private final Schema projection;
private final Schema fileProjection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ protected Set<String> expectedManifestListPaths(
.collect(Collectors.toSet());
}

protected Set<String> actualManifestPaths(TableScan tableScan) {
dramaticlly marked this conversation as resolved.
Show resolved Hide resolved
return StreamSupport.stream(tableScan.planFiles().spliterator(), false)
.map(t -> t.file().path().toString())
.collect(Collectors.toSet());
}

protected void validateTaskScanResiduals(TableScan scan, boolean ignoreResiduals)
throws IOException {
try (CloseableIterable<CombinedScanTask> tasks = scan.planTasks()) {
Expand Down
36 changes: 36 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,42 @@ public void testManifestEntriesTableWithDroppedPartition() throws IOException {
}
}

@TestTemplate
public void testEntriesTableDataFileContentEq() {
preparePartitionedTable();

Table entriesTable = new ManifestEntriesTable(table);

Expression dataOnly = Expressions.equal("data_file.content", 0);
TableScan entriesTableScan = entriesTable.newScan().filter(dataOnly);
Set<String> expected =
table.currentSnapshot().dataManifests(table.io()).stream()
.map(ManifestFile::path)
.collect(Collectors.toSet());

assertThat(actualManifestPaths(entriesTableScan))
.as("Expected manifest filter by data file content does not match")
.isEqualTo(expected);
}

@TestTemplate
public void testEntriesTableDataFileContentIn() {
preparePartitionedTable();

Table entriesTable = new ManifestEntriesTable(table);

Expression dataOnly = Expressions.in("data_file.content", 1, 2);
dramaticlly marked this conversation as resolved.
Show resolved Hide resolved
TableScan entriesTableScan = entriesTable.newScan().filter(dataOnly);
Set<String> expected =
table.currentSnapshot().deleteManifests(table.io()).stream()
.map(ManifestFile::path)
.collect(Collectors.toSet());

assertThat(actualManifestPaths(entriesTableScan))
.as("Expected manifest filter by data file content does not match")
.isEqualTo(expected);
}

@TestTemplate
public void testAllDataFilesTableHonorsIgnoreResiduals() throws IOException {
table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
Expand Down