From 7a3bfedf25cb74624ed898cec81e3bcadffb2373 Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Mon, 9 Aug 2021 15:58:51 -0700 Subject: [PATCH] Core: Add predicate pushdown for files metadata table (#2926) --- .../org/apache/iceberg/BaseMetadataTable.java | 20 ++ .../org/apache/iceberg/DataFilesTable.java | 25 +- .../org/apache/iceberg/PartitionsTable.java | 21 +- .../iceberg/TestMetadataTableScans.java | 259 ++++++++++++------ 4 files changed, 220 insertions(+), 105 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java index 81ecaefb599..0a764f79fd7 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java @@ -35,6 +35,7 @@ * deserialization. */ abstract class BaseMetadataTable implements Table, HasTableOperations, Serializable { + protected static final String PARTITION_FIELD_PREFIX = "partition."; private final PartitionSpec spec = PartitionSpec.unpartitioned(); private final SortOrder sortOrder = SortOrder.unsorted(); private final TableOperations ops; @@ -47,6 +48,25 @@ protected BaseMetadataTable(TableOperations ops, Table table, String name) { this.name = name; } + /** + * This method transforms the table's partition spec to a spec that is used to rewrite the user-provided filter + * expression against the given metadata table. + *

+ * The resulting partition spec maps $partitionPrefix.X fields to partition X using an identity partition transform. + * When this spec is used to project an expression for the given metadata table, the projection will remove + * predicates for non-partition fields (not in the spec) and will remove the "$partitionPrefix." prefix from fields. + * + * @param metadataTableSchema schema of the metadata table + * @param spec spec on which the metadata table schema is based + * @param partitionPrefix prefix to remove from each field in the partition spec + * @return a spec used to rewrite the metadata table filters to partition filters using an inclusive projection + */ + static PartitionSpec transformSpec(Schema metadataTableSchema, PartitionSpec spec, String partitionPrefix) { + PartitionSpec.Builder identitySpecBuilder = PartitionSpec.builderFor(metadataTableSchema); + spec.fields().forEach(pf -> identitySpecBuilder.identity(partitionPrefix + pf.name(), pf.name())); + return identitySpecBuilder.build(); + } + abstract MetadataTableType metadataTableType(); protected Table table() { diff --git a/core/src/main/java/org/apache/iceberg/DataFilesTable.java b/core/src/main/java/org/apache/iceberg/DataFilesTable.java index 145663ce4d9..d6b80ee6658 100644 --- a/core/src/main/java/org/apache/iceberg/DataFilesTable.java +++ b/core/src/main/java/org/apache/iceberg/DataFilesTable.java @@ -21,9 +21,12 @@ import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.ManifestEvaluator; +import org.apache.iceberg.expressions.Projections; import org.apache.iceberg.expressions.ResidualEvaluator; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Sets; 
import org.apache.iceberg.types.TypeUtil; @@ -109,7 +112,22 @@ protected CloseableIterable planFiles( Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter; ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter); - return CloseableIterable.transform(manifests, manifest -> + // use an inclusive projection to remove the partition name prefix and filter out any non-partition expressions + Expression partitionFilter = Projections + .inclusive( + transformSpec(fileSchema, table().spec(), PARTITION_FIELD_PREFIX), + caseSensitive) + .project(rowFilter); + + ManifestEvaluator manifestEval = ManifestEvaluator.forPartitionFilter( + partitionFilter, table().spec(), caseSensitive); + CloseableIterable filtered = CloseableIterable.filter(manifests, manifestEval::eval); + + // Data tasks produce the table schema, not the projection schema and projection is done by processing engines. + // This data task needs to use the table schema, which may not include a partition schema to avoid having an + // empty struct in the schema for unpartitioned tables. Some engines, like Spark, can't handle empty structs in + // all cases. 
+ return CloseableIterable.transform(filtered, manifest -> new ManifestReadTask(ops.io(), manifest, schema(), schemaString, specString, residuals)); } } @@ -138,5 +156,10 @@ public CloseableIterable rows() { public Iterable split(long splitSize) { return ImmutableList.of(this); // don't split } + + @VisibleForTesting + ManifestFile manifest() { + return manifest; + } } } diff --git a/core/src/main/java/org/apache/iceberg/PartitionsTable.java b/core/src/main/java/org/apache/iceberg/PartitionsTable.java index 0215dfd45c0..285cd0fbeed 100644 --- a/core/src/main/java/org/apache/iceberg/PartitionsTable.java +++ b/core/src/main/java/org/apache/iceberg/PartitionsTable.java @@ -37,7 +37,6 @@ public class PartitionsTable extends BaseMetadataTable { private final Schema schema; static final boolean PLAN_SCANS_WITH_WORKER_POOL = SystemProperties.getBoolean(SystemProperties.SCAN_THREAD_POOL_ENABLED, true); - private static final String PARTITION_FIELD_PREFIX = "partition."; PartitionsTable(TableOperations ops, Table table) { this(ops, table, table.name() + ".partitions"); @@ -112,7 +111,7 @@ static CloseableIterable planFiles(StaticTableScan scan) { // use an inclusive projection to remove the partition name prefix and filter out any non-partition expressions Expression partitionFilter = Projections - .inclusive(transformSpec(scan.schema(), table.spec()), caseSensitive) + .inclusive(transformSpec(scan.schema(), table.spec(), PARTITION_FIELD_PREFIX), caseSensitive) .project(scan.filter()); ManifestGroup manifestGroup = new ManifestGroup(table.io(), snapshot.dataManifests(), snapshot.deleteManifests()) @@ -132,24 +131,6 @@ static CloseableIterable planFiles(StaticTableScan scan) { return manifestGroup.planFiles(); } - /** - * This method transforms the table's partition spec to a spec that is used to rewrite the user-provided filter - * expression against the partitions table. - *

- * The resulting partition spec maps partition.X fields to partition X using an identity partition transform. When - * this spec is used to project an expression for the partitions table, the projection will remove predicates for - * non-partition fields (not in the spec) and will remove the "partition." prefix from fields. - * - * @param partitionTableSchema schema of the partition table - * @param spec spec on which the partition table schema is based - * @return a spec used to rewrite partition table filters to partition filters using an inclusive projection - */ - private static PartitionSpec transformSpec(Schema partitionTableSchema, PartitionSpec spec) { - PartitionSpec.Builder identitySpecBuilder = PartitionSpec.builderFor(partitionTableSchema); - spec.fields().forEach(pf -> identitySpecBuilder.identity(PARTITION_FIELD_PREFIX + pf.name(), pf.name())); - return identitySpecBuilder.build(); - } - private class PartitionsScan extends StaticTableScan { PartitionsScan(TableOperations ops, Table table) { super(ops, table, PartitionsTable.this.schema(), PartitionsTable.this.metadataTableType().name(), diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java index a6a411ec327..9e811f4969f 100644 --- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java +++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java @@ -26,6 +26,7 @@ import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Iterators; +import org.apache.iceberg.types.Conversions; import org.apache.iceberg.types.Types; import org.junit.Assert; import org.junit.Test; @@ -47,6 +48,21 @@ public TestMetadataTableScans(int formatVersion) { super(formatVersion); } + private void preparePartitionedTable() { + table.newFastAppend() + .appendFile(FILE_PARTITION_0) + .commit(); 
+ table.newFastAppend() + .appendFile(FILE_PARTITION_1) + .commit(); + table.newFastAppend() + .appendFile(FILE_PARTITION_2) + .commit(); + table.newFastAppend() + .appendFile(FILE_PARTITION_3) + .commit(); + } + @Test public void testManifestsTableAlwaysIgnoresResiduals() throws IOException { table.newFastAppend() @@ -164,18 +180,7 @@ public void testAllManifestsTableHonorsIgnoreResiduals() throws IOException { @Test public void testPartitionsTableScanNoFilter() { - table.newFastAppend() - .appendFile(FILE_PARTITION_0) - .commit(); - table.newFastAppend() - .appendFile(FILE_PARTITION_1) - .commit(); - table.newFastAppend() - .appendFile(FILE_PARTITION_2) - .commit(); - table.newFastAppend() - .appendFile(FILE_PARTITION_3) - .commit(); + preparePartitionedTable(); Table partitionsTable = new PartitionsTable(table.ops(), table); Types.StructType expected = new Schema( @@ -194,18 +199,7 @@ public void testPartitionsTableScanNoFilter() { @Test public void testPartitionsTableScanAndFilter() { - table.newFastAppend() - .appendFile(FILE_PARTITION_0) - .commit(); - table.newFastAppend() - .appendFile(FILE_PARTITION_1) - .commit(); - table.newFastAppend() - .appendFile(FILE_PARTITION_2) - .commit(); - table.newFastAppend() - .appendFile(FILE_PARTITION_3) - .commit(); + preparePartitionedTable(); Table partitionsTable = new PartitionsTable(table.ops(), table); @@ -220,18 +214,7 @@ public void testPartitionsTableScanAndFilter() { @Test public void testPartitionsTableScanLtFilter() { - table.newFastAppend() - .appendFile(FILE_PARTITION_0) - .commit(); - table.newFastAppend() - .appendFile(FILE_PARTITION_1) - .commit(); - table.newFastAppend() - .appendFile(FILE_PARTITION_2) - .commit(); - table.newFastAppend() - .appendFile(FILE_PARTITION_3) - .commit(); + preparePartitionedTable(); Table partitionsTable = new PartitionsTable(table.ops(), table); @@ -247,18 +230,7 @@ public void testPartitionsTableScanLtFilter() { @Test public void testPartitionsTableScanOrFilter() { - 
table.newFastAppend() - .appendFile(FILE_PARTITION_0) - .commit(); - table.newFastAppend() - .appendFile(FILE_PARTITION_1) - .commit(); - table.newFastAppend() - .appendFile(FILE_PARTITION_2) - .commit(); - table.newFastAppend() - .appendFile(FILE_PARTITION_3) - .commit(); + preparePartitionedTable(); Table partitionsTable = new PartitionsTable(table.ops(), table); @@ -274,20 +246,10 @@ public void testPartitionsTableScanOrFilter() { validateIncludesPartitionScan(tasksOr, 3); } + @Test public void testPartitionsScanNotFilter() { - table.newFastAppend() - .appendFile(FILE_PARTITION_0) - .commit(); - table.newFastAppend() - .appendFile(FILE_PARTITION_1) - .commit(); - table.newFastAppend() - .appendFile(FILE_PARTITION_2) - .commit(); - table.newFastAppend() - .appendFile(FILE_PARTITION_3) - .commit(); + preparePartitionedTable(); Table partitionsTable = new PartitionsTable(table.ops(), table); Expression not = Expressions.not(Expressions.lessThan("partition.data_bucket", 2)); @@ -300,18 +262,7 @@ public void testPartitionsScanNotFilter() { @Test public void testPartitionsTableScanInFilter() { - table.newFastAppend() - .appendFile(FILE_PARTITION_0) - .commit(); - table.newFastAppend() - .appendFile(FILE_PARTITION_1) - .commit(); - table.newFastAppend() - .appendFile(FILE_PARTITION_2) - .commit(); - table.newFastAppend() - .appendFile(FILE_PARTITION_3) - .commit(); + preparePartitionedTable(); Table partitionsTable = new PartitionsTable(table.ops(), table); @@ -325,18 +276,7 @@ public void testPartitionsTableScanInFilter() { @Test public void testPartitionsTableScanNotNullFilter() { - table.newFastAppend() - .appendFile(FILE_PARTITION_0) - .commit(); - table.newFastAppend() - .appendFile(FILE_PARTITION_1) - .commit(); - table.newFastAppend() - .appendFile(FILE_PARTITION_2) - .commit(); - table.newFastAppend() - .appendFile(FILE_PARTITION_3) - .commit(); + preparePartitionedTable(); Table partitionsTable = new PartitionsTable(table.ops(), table); @@ -350,6 +290,137 @@ 
public void testPartitionsTableScanNotNullFilter() { validateIncludesPartitionScan(tasksUnary, 3); } + @Test + public void testFilesTableScanNoFilter() { + preparePartitionedTable(); + + Table dataFilesTable = new DataFilesTable(table.ops(), table); + Types.StructType expected = new Schema( + required(102, "partition", Types.StructType.of( + optional(1000, "data_bucket", Types.IntegerType.get())), + "Partition data tuple, schema based on the partition spec")).asStruct(); + + TableScan scanNoFilter = dataFilesTable.newScan().select("partition.data_bucket"); + Assert.assertEquals(expected, scanNoFilter.schema().asStruct()); + CloseableIterable tasksAndEq = scanNoFilter.planFiles(); + + Assert.assertEquals(4, Iterables.size(tasksAndEq)); + validateFileScanTasks(tasksAndEq, 0); + validateFileScanTasks(tasksAndEq, 1); + validateFileScanTasks(tasksAndEq, 2); + validateFileScanTasks(tasksAndEq, 3); + } + + @Test + public void testFilesTableScanAndFilter() { + preparePartitionedTable(); + + Table dataFilesTable = new DataFilesTable(table.ops(), table); + + Expression andEquals = Expressions.and( + Expressions.equal("partition.data_bucket", 0), + Expressions.greaterThan("record_count", 0)); + TableScan scanAndEq = dataFilesTable.newScan().filter(andEquals); + CloseableIterable tasksAndEq = scanAndEq.planFiles(); + Assert.assertEquals(1, Iterables.size(tasksAndEq)); + validateFileScanTasks(tasksAndEq, 0); + } + + @Test + public void testFilesTableScanAndFilterWithPlanTasks() { + preparePartitionedTable(); + + Table dataFilesTable = new DataFilesTable(table.ops(), table); + + Expression andEquals = Expressions.and( + Expressions.equal("partition.data_bucket", 0), + Expressions.greaterThan("record_count", 0)); + TableScan scanAndEq = dataFilesTable.newScan().filter(andEquals); + CloseableIterable tasksAndEq = scanAndEq.planTasks(); + Assert.assertEquals(1, Iterables.size(tasksAndEq)); + validateCombinedScanTasks(tasksAndEq, 0); + } + + @Test + public void 
testFilesTableScanLtFilter() { + preparePartitionedTable(); + + Table dataFilesTable = new DataFilesTable(table.ops(), table); + + Expression lt = Expressions.lessThan("partition.data_bucket", 2); + TableScan scan = dataFilesTable.newScan().filter(lt); + CloseableIterable tasksLt = scan.planFiles(); + Assert.assertEquals(2, Iterables.size(tasksLt)); + validateFileScanTasks(tasksLt, 0); + validateFileScanTasks(tasksLt, 1); + } + + @Test + public void testFilesTableScanOrFilter() { + preparePartitionedTable(); + + Table dataFilesTable = new DataFilesTable(table.ops(), table); + + Expression or = Expressions.or( + Expressions.equal("partition.data_bucket", 2), + Expressions.greaterThan("record_count", 0)); + TableScan scan = dataFilesTable.newScan() + .filter(or); + CloseableIterable tasksOr = scan.planFiles(); + Assert.assertEquals(4, Iterables.size(tasksOr)); + validateFileScanTasks(tasksOr, 0); + validateFileScanTasks(tasksOr, 1); + validateFileScanTasks(tasksOr, 2); + validateFileScanTasks(tasksOr, 3); + } + + @Test + public void testFilesScanNotFilter() { + preparePartitionedTable(); + Table dataFilesTable = new DataFilesTable(table.ops(), table); + + Expression not = Expressions.not(Expressions.lessThan("partition.data_bucket", 2)); + TableScan scan = dataFilesTable.newScan() + .filter(not); + CloseableIterable tasksNot = scan.planFiles(); + Assert.assertEquals(2, Iterables.size(tasksNot)); + validateFileScanTasks(tasksNot, 2); + validateFileScanTasks(tasksNot, 3); + } + + @Test + public void testFilesTableScanInFilter() { + preparePartitionedTable(); + + Table dataFilesTable = new DataFilesTable(table.ops(), table); + + Expression set = Expressions.in("partition.data_bucket", 2, 3); + TableScan scan = dataFilesTable.newScan() + .filter(set); + CloseableIterable tasksNot = scan.planFiles(); + Assert.assertEquals(2, Iterables.size(tasksNot)); + + validateFileScanTasks(tasksNot, 2); + validateFileScanTasks(tasksNot, 3); + } + + @Test + public void 
testFilesTableScanNotNullFilter() { + preparePartitionedTable(); + + Table dataFilesTable = new DataFilesTable(table.ops(), table); + Expression unary = Expressions.notNull("partition.data_bucket"); + TableScan scan = dataFilesTable.newScan() + .filter(unary); + CloseableIterable tasksUnary = scan.planFiles(); + Assert.assertEquals(4, Iterables.size(tasksUnary)); + + validateFileScanTasks(tasksUnary, 0); + validateFileScanTasks(tasksUnary, 1); + validateFileScanTasks(tasksUnary, 2); + validateFileScanTasks(tasksUnary, 3); + } + @Test public void testDataFilesTableSelection() throws IOException { table.newFastAppend() @@ -391,4 +462,24 @@ private void validateIncludesPartitionScan(CloseableIterable tasks StreamSupport.stream(tasks.spliterator(), false).anyMatch( a -> a.file().partition().get(0, Object.class).equals(partValue))); } + + private void validateFileScanTasks(CloseableIterable fileScanTasks, int partValue) { + Assert.assertTrue("File scan tasks do not include correct file", + StreamSupport.stream(fileScanTasks.spliterator(), false).anyMatch(t -> { + ManifestFile mf = ((DataFilesTable.ManifestReadTask) t).manifest(); + return manifestHasPartition(mf, partValue); + })); + } + + private void validateCombinedScanTasks(CloseableIterable tasks, int partValue) { + StreamSupport.stream(tasks.spliterator(), false) + .flatMap(c -> c.files().stream().map(t -> ((DataFilesTable.ManifestReadTask) t).manifest())) + .anyMatch(m -> manifestHasPartition(m, partValue)); + } + + private boolean manifestHasPartition(ManifestFile mf, int partValue) { + int lower = Conversions.fromByteBuffer(Types.IntegerType.get(), mf.partitions().get(0).lowerBound()); + int upper = Conversions.fromByteBuffer(Types.IntegerType.get(), mf.partitions().get(0).upperBound()); + return (lower <= partValue) && (upper >= partValue); + } }