Skip to content

Conversation

@aokolnychyi
Copy link
Contributor

This PR adds SparkPartitionReaderFactory that creates readers based on tasks in input partitions.

@github-actions github-actions bot added the spark label Dec 2, 2022
import org.slf4j.LoggerFactory;

class BatchDataReader extends BaseBatchReader<FileScanTask> {
class BatchDataReader extends BaseBatchReader<FileScanTask>
Copy link
Contributor Author

@aokolnychyi aokolnychyi Dec 2, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class was only used as PartitionReader in SparkScan, where we extended it, mixed PartitionReader and called the implementation as BatchReader. After adding a common reader factory, we may have multiple batch readers now. That's why BatchDataReader seemed like a more accurate name than BatchReader. As there were no other places that used this class, I decided to implement PartitionReader directly here.

Any feedback is welcome. See SparkScan below for old usage.

}

BatchDataReader(
ScanTaskGroup<FileScanTask> task,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most other readers have another order of parameters: table, taskGroup, expectedSchema, caseSensitive.

public Batch toBatch() {
return new SparkChangelogBatch(
spark, table, readConf, taskGroups(), expectedSchema, hashCode());
return new SparkBatch(sparkContext, table, readConf, taskGroups(), expectedSchema, hashCode());
Copy link
Contributor Author

@aokolnychyi aokolnychyi Dec 2, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you take a look at SparkChangelogBatch, it was identical to SparkBatch, except the reader factory. Since we are adding a common factory for all tasks, it seemed appropriate to always leverage one class.


SparkInputPartition partition = (SparkInputPartition) inputPartition;

if (partition.allTasksOfType(FileScanTask.class)) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@szehon-ho, this is where you would check the type of tasks and select your reader.

@aokolnychyi
Copy link
Contributor Author

return (ScanTaskGroup<T>) taskGroup;
}

public <T extends ScanTask> boolean allTasksOfType(Class<T> javaClass) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious question here, do we assume a single task type for every taskGroup or not?

Just wondering if we can optimize it to just check the type of taskGroup.

Copy link
Contributor Author

@aokolnychyi aokolnychyi Dec 2, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea is to be able to check whether all tasks conform to a particular known parent type. For instance, for changelog tasks we only check if all are tasks implement ChangelogScanTask. Then the reader itself may downcast it to a particular child type (e.g. ChangelogRowReader).

We can't check the type of taskGroup due to Java type erasure.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we allow a taskgroup to have tasks of different types? I didn't think so. But I don't think it costs us that much to just check all of them instead of just the first element.

Copy link
Member

@szehon-ho szehon-ho Dec 2, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. Yea my question was Russell's question, if there's one to one relation always (defined by TaskGroup). But I guess there's nothing in TaskGroup preventing different tasks that are subclass of T, although that would be strange.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true, ScanTaskGroup may have arbitrary tasks.


@Override
public boolean supportColumnarReads(InputPartition inputPartition) {
return batchSize > 1;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure how this relates to vectorized reads?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see this was in the old code, I guess we keep it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, I just copied what we had before. Our SparkBatch decides whether vectorized reads are supported and passes a batch size of > 1 if supported, 0 otherwise.

Copy link
Contributor

@flyrain flyrain Dec 2, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about adding methods .vectorized(boolean).batchSize(int)? It would be more code, but cleaner logically.

class SparkPartitionReaderFactory implements PartitionReaderFactory {
private final int batchSize;

SparkPartitionReaderFactory(int batchSize) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should just doc here that a batch size > 1 will create a ColumnarReader , just seems like a bit of a magic parameter here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about creating separate batch and non-batch reader factories? I just copied the existing code but having separate classes seems more natural than checking if batchSize is > 1.

Thoughts, @RussellSpitzer @szehon-ho?

Copy link
Member

@RussellSpitzer RussellSpitzer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This all makes sense to me. @szehon-ho is closer to the implementation here though so I'll defer to him on if this makes sense for his delete file reader implementation

Copy link
Member

@szehon-ho szehon-ho left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, I'll still have to see how my new table will fit (a bit behind), but refactor looks good to me, and we can revisit if needed

Copy link
Contributor

@flyrain flyrain left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks pretty good. Thanks @aokolnychyi!

@aokolnychyi
Copy link
Contributor Author

I've split the factory into two. I feel that eliminates the confusion pretty well.

@aokolnychyi aokolnychyi merged commit 7fd9ded into apache:master Dec 2, 2022
@aokolnychyi
Copy link
Contributor Author

Thanks for reviewing, @szehon-ho @RussellSpitzer @flyrain!

sunchao pushed a commit to sunchao/iceberg that referenced this pull request May 10, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants