Merged
@@ -34,26 +34,24 @@
import org.apache.iceberg.util.ThreadPools;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.read.Batch;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;

-class SparkBatch implements Batch {
+abstract class SparkBatch implements Batch {

private final JavaSparkContext sparkContext;
private final Table table;
private final SparkReadConf readConf;
-private final List<CombinedScanTask> tasks;
private final Schema expectedSchema;
private final boolean caseSensitive;
private final boolean localityEnabled;

-SparkBatch(JavaSparkContext sparkContext, Table table, SparkReadConf readConf,
-List<CombinedScanTask> tasks, Schema expectedSchema) {
-this.sparkContext = sparkContext;
+SparkBatch(SparkSession spark, Table table, SparkReadConf readConf, Schema expectedSchema) {
+this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
this.table = table;
this.readConf = readConf;
-this.tasks = tasks;
this.expectedSchema = expectedSchema;
this.caseSensitive = readConf.caseSensitive();
this.localityEnabled = readConf.localityEnabled();
@@ -65,18 +63,24 @@ public InputPartition[] planInputPartitions() {
Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
String expectedSchemaString = SchemaParser.toJson(expectedSchema);

-InputPartition[] readTasks = new InputPartition[tasks.size()];
+InputPartition[] readTasks = new InputPartition[tasks().size()];
Contributor

Question: is there any concern with calling tasks() repeatedly like this?

Contributor

Yes, in some Batch implementations, like fileBatch, tasks are recomputed.

Contributor Author

The subclasses all lazily instantiate tasks only once AFAIK


Tasks.range(readTasks.length)
.stopOnFailure()
.executeWith(localityEnabled ? ThreadPools.getWorkerPool() : null)
.run(index -> readTasks[index] = new ReadTask(
-tasks.get(index), tableBroadcast, expectedSchemaString,
+tasks().get(index), tableBroadcast, expectedSchemaString,
Contributor
@singhpk234 singhpk234 Jun 28, 2022

[minor] tasks() is synchronized, and we might be accessing it here from a thread pool; we should remove synchronized from the function definition and use double-checked locking instead, like:

private volatile List<CombinedScanTask> tasks = null;

protected List<CombinedScanTask> tasks() {
  if (tasks == null) {
    synchronized (this) {
      if (tasks == null) {
        this.tasks = ...; // assign stuff to tasks
      }
    }
  }
  return tasks;
}

I think now we might also want to make SparkFilesScan#task() handle multi-threading.

Contributor Author

It looks like we weren't synchronizing before in 0.13, but I can add that check if you think it's needed

Contributor

I see. As per my understanding, I was thinking we would need it, since tasks() can be called from multiple threads (we are in a thread pool here). Though I see now that at L66 above we would already have called tasks(), which populates the tasks anyhow; and since the function itself is synchronized, each access is now sequential, even in the case where tasks is not null.

All in all, I don't have a strong reason for it; it's just something that caught my eye :)

Contributor Author

It looked like there are other cases where concurrent access would cause issues, so I decided to just revert it back to the way it was in 0.13 in principle. For example, in SparkBatchQueryScan both tasks() and files() behave similarly, so we'd need to update both if we want to support concurrency.
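The trade-off being discussed, a synchronized lazy tasks() accessor, can be sketched outside of Iceberg. Note that LazyScan, LazyScanDemo, and planTasks() below are hypothetical stand-ins, not the real Iceberg classes; only the memoization shape matches the thread above.

```java
import java.util.Arrays;
import java.util.List;

// Hedged sketch of a lazy, synchronized tasks() accessor. Planning runs
// once and later calls reuse the cached result, at the cost of
// serializing every call through the object's monitor.
abstract class LazyScan {
  private List<String> tasks = null; // cached after the first call

  protected synchronized List<String> tasks() {
    if (tasks == null) {
      tasks = planTasks();
    }
    return tasks;
  }

  protected abstract List<String> planTasks();
}

public class LazyScanDemo extends LazyScan {
  int planCalls = 0;

  @Override
  protected List<String> planTasks() {
    planCalls++; // count how many times planning actually runs
    return Arrays.asList("task-0", "task-1");
  }

  public static void main(String[] args) {
    LazyScanDemo scan = new LazyScanDemo();
    scan.tasks();
    scan.tasks(); // second call reuses the cached list
    System.out.println(scan.planCalls); // prints 1
  }
}
```

Either this synchronized form or the volatile double-checked variant from the earlier comment gives once-only planning; the simpler synchronized form just makes every access sequential.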

caseSensitive, localityEnabled));

return readTasks;
}

+protected abstract List<CombinedScanTask> tasks();

+protected JavaSparkContext sparkContext() {
+return sparkContext;
+}

@Override
public PartitionReaderFactory createReaderFactory() {
return new ReaderFactory(batchSize());
@@ -93,7 +97,7 @@ private int batchSize() {
}

private boolean parquetOnly() {
-return tasks.stream().allMatch(task -> !task.isDataTask() && onlyFileFormat(task, FileFormat.PARQUET));
+return tasks().stream().allMatch(task -> !task.isDataTask() && onlyFileFormat(task, FileFormat.PARQUET));
}

private boolean parquetBatchReadsEnabled() {
@@ -103,12 +107,12 @@ private boolean parquetBatchReadsEnabled() {
}

private boolean orcOnly() {
-return tasks.stream().allMatch(task -> !task.isDataTask() && onlyFileFormat(task, FileFormat.ORC));
+return tasks().stream().allMatch(task -> !task.isDataTask() && onlyFileFormat(task, FileFormat.ORC));
}

private boolean orcBatchReadsEnabled() {
return readConf.orcVectorizationEnabled() && // vectorization enabled
-tasks.stream().noneMatch(TableScanUtil::hasDeletes); // no delete files
+tasks().stream().noneMatch(TableScanUtil::hasDeletes); // no delete files
}

private boolean onlyFileFormat(CombinedScanTask task, FileFormat fileFormat) {
@@ -40,7 +40,6 @@
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkUtil;
import org.apache.iceberg.util.PropertyUtil;
-import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -57,10 +56,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-abstract class SparkScan implements Scan, SupportsReportStatistics {
+abstract class SparkScan extends SparkBatch implements Scan, SupportsReportStatistics {
private static final Logger LOG = LoggerFactory.getLogger(SparkScan.class);

-private final JavaSparkContext sparkContext;
private final Table table;
private final SparkReadConf readConf;
private final boolean caseSensitive;
@@ -69,14 +67,14 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
private final boolean readTimestampWithoutZone;

// lazy variables
-private StructType readSchema = null;
+private StructType readSchema;

SparkScan(SparkSession spark, Table table, SparkReadConf readConf,
Schema expectedSchema, List<Expression> filters) {
+super(spark, table, readConf, expectedSchema);

SparkSchemaUtil.validateMetadataColumnReferences(table.schema(), expectedSchema);

-this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
this.table = table;
this.readConf = readConf;
this.caseSensitive = readConf.caseSensitive();
@@ -101,16 +99,14 @@ protected List<Expression> filterExpressions() {
return filterExpressions;
}

-protected abstract List<CombinedScanTask> tasks();

@Override
public Batch toBatch() {
-return new SparkBatch(sparkContext, table, readConf, tasks(), expectedSchema);
+return this;

Contributor Author

My suspicion is that there is some check that the scan implements Batch, or some other equality check someplace, that is required for filter pushdown.
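The shape being discussed, a Scan that serves as its own Batch so that toBatch() can return this, can be sketched with stand-in interfaces. The Scan, Batch, and SelfBatchScan types below are simplified placeholders, not Spark's actual connector API.

```java
// Hedged sketch: minimal stand-ins for Spark's Scan/Batch interfaces,
// showing why returning `this` preserves object identity between the
// scan and its batch during planning.
interface Batch {}

interface Scan {
  Batch toBatch();
}

class SelfBatchScan implements Scan, Batch {
  @Override
  public Batch toBatch() {
    // One object plays both roles, so any identity or equality check
    // the planner does on the scan still holds for its batch.
    return this;
  }
}

public class SelfBatchDemo {
  public static void main(String[] args) {
    SelfBatchScan scan = new SelfBatchScan();
    System.out.println(scan.toBatch() == scan); // prints true
  }
}
```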

}

@Override
public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
-return new SparkMicroBatchStream(sparkContext, table, readConf, expectedSchema, checkpointLocation);
+return new SparkMicroBatchStream(sparkContext(), table, readConf, expectedSchema, checkpointLocation);
}

@Override
@@ -1147,7 +1147,7 @@ public void testUseLocalIterator() {
checkExpirationResults(1L, 0L, 0L, 1L, 2L, results);

Assert.assertEquals("Expected total number of jobs with stream-results should match the expected number",
-5L, jobsRunDuringStreamResults);
+4L, jobsRunDuringStreamResults);
Contributor Author

Spark is planning the scan more optimally by reusing BatchScanExecs, so there are only 4 tasks. However, if anyone has more in-depth knowledge of this assertion, it would be good to double-check that this is correct.


Could you explain what the 4 tasks are? I'm really confused, thanks a lot.

});
}
}