From 1b413ec7da2cb9ec12cae6a804accbcbed3fd54b Mon Sep 17 00:00:00 2001
From: Bryan Keller
Date: Mon, 27 Jun 2022 20:50:11 -0700
Subject: [PATCH 1/3] Spark: fix regression from scan refactor

---
 .../iceberg/spark/source/SparkBatch.java | 19 +++++++++++--------
 .../iceberg/spark/source/SparkScan.java  | 14 +++++++++++---
 2 files changed, 22 insertions(+), 11 deletions(-)

diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
index ee4674f27bb3..ea18744165ff 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.spark.source;
 
 import java.util.List;
+import java.util.function.Supplier;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Schema;
@@ -43,17 +44,17 @@ class SparkBatch implements Batch {
   private final JavaSparkContext sparkContext;
   private final Table table;
   private final SparkReadConf readConf;
-  private final List<CombinedScanTask> tasks;
+  private final Supplier<List<CombinedScanTask>> taskSupplier;
   private final Schema expectedSchema;
   private final boolean caseSensitive;
   private final boolean localityEnabled;
 
   SparkBatch(JavaSparkContext sparkContext, Table table, SparkReadConf readConf,
-             List<CombinedScanTask> tasks, Schema expectedSchema) {
+             Supplier<List<CombinedScanTask>> taskSupplier, Schema expectedSchema) {
     this.sparkContext = sparkContext;
     this.table = table;
     this.readConf = readConf;
-    this.tasks = tasks;
+    this.taskSupplier = taskSupplier;
     this.expectedSchema = expectedSchema;
     this.caseSensitive = readConf.caseSensitive();
     this.localityEnabled = readConf.localityEnabled();
@@ -65,6 +66,7 @@ public InputPartition[] planInputPartitions() {
     Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
     String expectedSchemaString = SchemaParser.toJson(expectedSchema);
 
+    List<CombinedScanTask> tasks = taskSupplier.get();
     InputPartition[] readTasks = new InputPartition[tasks.size()];
 
     Tasks.range(readTasks.length)
@@ -83,16 +85,17 @@ public PartitionReaderFactory createReaderFactory() {
   }
 
   private int batchSize() {
-    if (parquetOnly() && parquetBatchReadsEnabled()) {
+    List<CombinedScanTask> tasks = taskSupplier.get();
+    if (parquetOnly(tasks) && parquetBatchReadsEnabled()) {
       return readConf.parquetBatchSize();
-    } else if (orcOnly() && orcBatchReadsEnabled()) {
+    } else if (orcOnly(tasks) && orcBatchReadsEnabled(tasks)) {
       return readConf.orcBatchSize();
     } else {
       return 0;
     }
   }
 
-  private boolean parquetOnly() {
+  private boolean parquetOnly(List<CombinedScanTask> tasks) {
     return tasks.stream().allMatch(task -> !task.isDataTask() && onlyFileFormat(task, FileFormat.PARQUET));
   }
 
@@ -102,11 +105,11 @@ private boolean parquetBatchReadsEnabled() {
         expectedSchema.columns().stream().allMatch(c -> c.type().isPrimitiveType()); // only primitives
   }
 
-  private boolean orcOnly() {
+  private boolean orcOnly(List<CombinedScanTask> tasks) {
     return tasks.stream().allMatch(task -> !task.isDataTask() && onlyFileFormat(task, FileFormat.ORC));
   }
 
-  private boolean orcBatchReadsEnabled() {
+  private boolean orcBatchReadsEnabled(List<CombinedScanTask> tasks) {
     return readConf.orcVectorizationEnabled() &&  // vectorization enabled
         tasks.stream().noneMatch(TableScanUtil::hasDeletes); // no delete files
   }
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
index 7d93ad66e1e8..e484735103fb 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
@@ -69,7 +69,9 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
   private final boolean readTimestampWithoutZone;
 
   // lazy variables
-  private StructType readSchema = null;
+  private StructType readSchema;
+  private Batch batch;
+  private MicroBatchStream microBatchStream;
 
   SparkScan(SparkSession spark, Table table, SparkReadConf readConf,
             Schema expectedSchema, List<Expression> filters) {
@@ -105,12 +107,18 @@ protected List<Expression> filterExpressions() {
 
   @Override
   public Batch toBatch() {
-    return new SparkBatch(sparkContext, table, readConf, tasks(), expectedSchema);
+    if (batch == null) {
+      batch = new SparkBatch(sparkContext, table, readConf, this::tasks, expectedSchema);
+    }
+    return batch;
   }
 
   @Override
   public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
-    return new SparkMicroBatchStream(sparkContext, table, readConf, expectedSchema, checkpointLocation);
+    if (microBatchStream == null) {
+      microBatchStream = new SparkMicroBatchStream(sparkContext, table, readConf, expectedSchema, checkpointLocation);
+    }
+    return microBatchStream;
   }
 
   @Override
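The fix above has two parts: SparkBatch now takes a Supplier of the combined scan tasks instead of an eagerly planned list, and SparkScan memoizes the Batch and MicroBatchStream it hands out, so task planning runs only when partitions are actually requested and the same Batch is reused across calls. A minimal, self-contained sketch of that lazy-supplier-plus-caching pattern follows; LazyBatch, DemoScan, and planTasks are illustrative names, not the Iceberg classes.

import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Toy model of the patch-1 shape: the batch holds a Supplier and only invokes it when
// partitions are planned; the scan caches the batch so repeated toBatch() calls reuse it.
class LazyBatch {
  private final Supplier<List<String>> taskSupplier;

  LazyBatch(Supplier<List<String>> taskSupplier) {
    this.taskSupplier = taskSupplier; // nothing is planned at construction time
  }

  int planInputPartitions() {
    return taskSupplier.get().size(); // planning is triggered on demand
  }
}

class DemoScan {
  private LazyBatch batch; // cached, mirroring the new SparkScan.toBatch()

  private List<String> planTasks() {
    System.out.println("planning tasks...");
    return Arrays.asList("task-1", "task-2");
  }

  LazyBatch toBatch() {
    if (batch == null) {
      batch = new LazyBatch(this::planTasks); // method reference, evaluated later
    }
    return batch;
  }

  public static void main(String[] args) {
    DemoScan scan = new DemoScan();
    LazyBatch b = scan.toBatch();                // no planning has happened yet
    System.out.println(b.planInputPartitions()); // prints "planning tasks..." then 2
  }
}

Whether the supplier caches its own result is up to the caller; in this sketch it does not, while in the actual classes the scan's tasks() implementation decides whether planned tasks are reused.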
From 7c22fa10f182eabab8e574dac2ea6f399b7517ae Mon Sep 17 00:00:00 2001
From: Bryan Keller
Date: Mon, 27 Jun 2022 22:21:25 -0700
Subject: [PATCH 2/3] Make SparkScan implement the Batch interface

---
 .../iceberg/spark/source/SparkBatch.java | 39 ++++++++++---------
 .../iceberg/spark/source/SparkScan.java  | 20 ++--------
 2 files changed, 24 insertions(+), 35 deletions(-)

diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
index ea18744165ff..a9a8f95691b0 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
@@ -20,7 +20,6 @@
 package org.apache.iceberg.spark.source;
 
 import java.util.List;
-import java.util.function.Supplier;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Schema;
@@ -35,26 +34,24 @@
 import org.apache.iceberg.util.ThreadPools;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.connector.read.Batch;
 import org.apache.spark.sql.connector.read.InputPartition;
 import org.apache.spark.sql.connector.read.PartitionReaderFactory;
 
-class SparkBatch implements Batch {
+abstract class SparkBatch implements Batch {
 
   private final JavaSparkContext sparkContext;
   private final Table table;
   private final SparkReadConf readConf;
-  private final Supplier<List<CombinedScanTask>> taskSupplier;
   private final Schema expectedSchema;
   private final boolean caseSensitive;
   private final boolean localityEnabled;
 
-  SparkBatch(JavaSparkContext sparkContext, Table table, SparkReadConf readConf,
-             Supplier<List<CombinedScanTask>> taskSupplier, Schema expectedSchema) {
-    this.sparkContext = sparkContext;
+  SparkBatch(SparkSession spark, Table table, SparkReadConf readConf, Schema expectedSchema) {
+    this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
     this.table = table;
     this.readConf = readConf;
-    this.taskSupplier = taskSupplier;
     this.expectedSchema = expectedSchema;
     this.caseSensitive = readConf.caseSensitive();
     this.localityEnabled = readConf.localityEnabled();
@@ -66,37 +63,41 @@ public InputPartition[] planInputPartitions() {
     Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
     String expectedSchemaString = SchemaParser.toJson(expectedSchema);
 
-    List<CombinedScanTask> tasks = taskSupplier.get();
-    InputPartition[] readTasks = new InputPartition[tasks.size()];
+    InputPartition[] readTasks = new InputPartition[tasks().size()];
 
     Tasks.range(readTasks.length)
         .stopOnFailure()
         .executeWith(localityEnabled ? ThreadPools.getWorkerPool() : null)
         .run(index -> readTasks[index] = new ReadTask(
-            tasks.get(index), tableBroadcast, expectedSchemaString,
+            tasks().get(index), tableBroadcast, expectedSchemaString,
             caseSensitive, localityEnabled));
 
     return readTasks;
   }
 
+  protected abstract List<CombinedScanTask> tasks();
+
+  protected JavaSparkContext sparkContext() {
+    return sparkContext;
+  }
+
   @Override
   public PartitionReaderFactory createReaderFactory() {
     return new ReaderFactory(batchSize());
   }
 
   private int batchSize() {
-    List<CombinedScanTask> tasks = taskSupplier.get();
-    if (parquetOnly(tasks) && parquetBatchReadsEnabled()) {
+    if (parquetOnly() && parquetBatchReadsEnabled()) {
       return readConf.parquetBatchSize();
-    } else if (orcOnly(tasks) && orcBatchReadsEnabled(tasks)) {
+    } else if (orcOnly() && orcBatchReadsEnabled()) {
       return readConf.orcBatchSize();
     } else {
       return 0;
     }
   }
 
-  private boolean parquetOnly(List<CombinedScanTask> tasks) {
-    return tasks.stream().allMatch(task -> !task.isDataTask() && onlyFileFormat(task, FileFormat.PARQUET));
+  private boolean parquetOnly() {
+    return tasks().stream().allMatch(task -> !task.isDataTask() && onlyFileFormat(task, FileFormat.PARQUET));
   }
 
   private boolean parquetBatchReadsEnabled() {
@@ -105,13 +106,13 @@ private boolean parquetBatchReadsEnabled() {
         expectedSchema.columns().stream().allMatch(c -> c.type().isPrimitiveType()); // only primitives
   }
 
-  private boolean orcOnly(List<CombinedScanTask> tasks) {
-    return tasks.stream().allMatch(task -> !task.isDataTask() && onlyFileFormat(task, FileFormat.ORC));
+  private boolean orcOnly() {
+    return tasks().stream().allMatch(task -> !task.isDataTask() && onlyFileFormat(task, FileFormat.ORC));
   }
 
-  private boolean orcBatchReadsEnabled(List<CombinedScanTask> tasks) {
+  private boolean orcBatchReadsEnabled() {
     return readConf.orcVectorizationEnabled() &&  // vectorization enabled
-        tasks.stream().noneMatch(TableScanUtil::hasDeletes); // no delete files
+        tasks().stream().noneMatch(TableScanUtil::hasDeletes); // no delete files
   }
 
   private boolean onlyFileFormat(CombinedScanTask task, FileFormat fileFormat) {
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
index e484735103fb..b9292541eaeb 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
@@ -40,7 +40,6 @@
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkUtil;
 import org.apache.iceberg.util.PropertyUtil;
-import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -57,10 +56,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-abstract class SparkScan implements Scan, SupportsReportStatistics {
+abstract class SparkScan extends SparkBatch implements Scan, SupportsReportStatistics {
   private static final Logger LOG = LoggerFactory.getLogger(SparkScan.class);
 
-  private final JavaSparkContext sparkContext;
   private final Table table;
   private final SparkReadConf readConf;
   private final boolean caseSensitive;
@@ -70,15 +68,13 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
 
   // lazy variables
   private StructType readSchema;
-  private Batch batch;
-  private MicroBatchStream microBatchStream;
 
   SparkScan(SparkSession spark, Table table, SparkReadConf readConf,
             Schema expectedSchema, List<Expression> filters) {
+    super(spark, table, readConf, expectedSchema);
     SparkSchemaUtil.validateMetadataColumnReferences(table.schema(), expectedSchema);
 
-    this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
     this.table = table;
     this.readConf = readConf;
     this.caseSensitive = readConf.caseSensitive();
@@ -103,22 +99,14 @@ protected List<Expression> filterExpressions() {
     return filterExpressions;
   }
 
-  protected abstract List<CombinedScanTask> tasks();
-
   @Override
   public Batch toBatch() {
-    if (batch == null) {
-      batch = new SparkBatch(sparkContext, table, readConf, this::tasks, expectedSchema);
-    }
-    return batch;
+    return this;
  }
 
   @Override
   public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
-    if (microBatchStream == null) {
-      microBatchStream = new SparkMicroBatchStream(sparkContext, table, readConf, expectedSchema, checkpointLocation);
-    }
-    return microBatchStream;
+    return new SparkMicroBatchStream(sparkContext(), table, readConf, expectedSchema, checkpointLocation);
   }
 
   @Override
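Patch 2 removes the Supplier indirection by collapsing the hierarchy: SparkBatch becomes an abstract base class that reaches its tasks through an abstract tasks() hook, and SparkScan extends it and simply returns this from toBatch(). A toy sketch of that shape follows, under the assumption that the subclass caches whatever it plans; BaseBatch and ConcreteScan are illustrative names, not the real classes.

import java.util.Arrays;
import java.util.List;

// Toy model of the patch-2 shape: batch logic lives in an abstract base class that pulls
// tasks through an abstract hook, and the scan subclass serves as its own Batch.
abstract class BaseBatch {
  protected abstract List<String> tasks(); // subclasses decide how (and when) tasks are planned

  int planInputPartitions() {
    return tasks().size(); // every use of tasks goes through the hook
  }
}

class ConcreteScan extends BaseBatch {
  private List<String> tasks; // lazily planned and cached by the subclass

  @Override
  protected List<String> tasks() {
    if (tasks == null) {
      System.out.println("planning tasks...");
      tasks = Arrays.asList("task-1", "task-2");
    }
    return tasks;
  }

  BaseBatch toBatch() {
    return this; // the scan is its own batch, so no state needs to be copied across objects
  }

  public static void main(String[] args) {
    ConcreteScan scan = new ConcreteScan();
    System.out.println(scan.toBatch().planInputPartitions()); // plans once, prints 2
  }
}

Folding Batch into the scan keeps the supplier plumbing out of the constructor and gives helpers like batchSize() direct access to the same lazily planned tasks the scan uses elsewhere.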
From f1d3304d7fce26939f03d5d9d3c5c067a00e7bc4 Mon Sep 17 00:00:00 2001
From: Bryan Keller
Date: Tue, 28 Jun 2022 09:10:44 -0700
Subject: [PATCH 3/3] Fix assertion for test

---
 .../apache/iceberg/spark/actions/TestExpireSnapshotsAction.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
index b278bd08a608..20b399e12580 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
@@ -1147,7 +1147,7 @@ public void testUseLocalIterator() {
       checkExpirationResults(1L, 0L, 0L, 1L, 2L, results);
 
       Assert.assertEquals("Expected total number of jobs with stream-results should match the expected number",
-          5L, jobsRunDuringStreamResults);
+          4L, jobsRunDuringStreamResults);
     });
   }
 }
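Patch 3 updates testUseLocalIterator because, with planning now deferred and the batch shared, one fewer Spark job appears to run while expire-snapshots streams its results, so the expected count drops from 5 to 4. For reference, a generic way to count the Spark jobs triggered by a block of code looks roughly like the helper below; JobCounter and countJobs are illustrative names, and the Iceberg test may derive jobsRunDuringStreamResults differently.

import java.util.concurrent.atomic.AtomicInteger;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.sql.SparkSession;

// Generic sketch for counting the Spark jobs submitted while an action runs.
// Listener events are delivered asynchronously, so a real test may need to wait
// for the listener bus to drain before reading the counter.
public class JobCounter {

  public static int countJobs(SparkSession spark, Runnable action) {
    AtomicInteger jobs = new AtomicInteger();
    SparkListener listener = new SparkListener() {
      @Override
      public void onJobStart(SparkListenerJobStart jobStart) {
        jobs.incrementAndGet(); // one increment per submitted Spark job
      }
    };

    spark.sparkContext().addSparkListener(listener);
    try {
      action.run();
    } finally {
      spark.sparkContext().removeSparkListener(listener);
    }
    return jobs.get();
  }

  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local[1]").appName("job-count").getOrCreate();
    int jobs = countJobs(spark, () -> spark.range(100).count()); // count() triggers at least one job
    System.out.println("jobs run: " + jobs);
    spark.stop();
  }
}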