
Spark: Adopt the new Scan Task APIs in Spark Readers#5248

Merged
aokolnychyi merged 14 commits into apache:master from flyrain:cdc-reader
Jul 26, 2022

Conversation

@flyrain (Contributor) commented Jul 11, 2022

This PR mainly adopts the changes in #5077. I'm also experimenting with the changelog reader to demonstrate how the new reader hierarchy works.
cc @aokolnychyi @rdblue @RussellSpitzer @stevenzwu @szehon-ho @karuppayya

import org.apache.spark.rdd.InputFileBlockHolder;
import org.apache.spark.sql.catalyst.InternalRow;

public class ChangelogRowReader {
Contributor Author

This class is only for demonstration. I will remove it before merging and file a separate PR for the changelog reader.

Comment on lines 90 to 100
Stream<ContentFile<?>> stream = Stream.of(contentScanTask.file());
if (contentScanTask.isFileScanTask()) {
  stream = Stream.concat(stream, contentScanTask.asFileScanTask().deletes().stream());
} else if (contentScanTask instanceof AddedRowsScanTask) {
  stream = Stream.concat(stream, ((AddedRowsScanTask) contentScanTask).deletes().stream());
} else if (contentScanTask instanceof DeletedDataFileScanTask) {
  stream = Stream.concat(stream, ((DeletedDataFileScanTask) contentScanTask).existingDeletes().stream());
} else if (contentScanTask instanceof DeletedRowsScanTask) {
  stream = Stream.concat(stream, ((DeletedRowsScanTask) contentScanTask).addedDeletes().stream());
  stream = Stream.concat(stream, ((DeletedRowsScanTask) contentScanTask).existingDeletes().stream());
}
@flyrain (Contributor, Author) commented Jul 11, 2022

I'm not happy with this down casting. To eliminate it, we could add a method to ContentScanTask that returns all related content files. For example:

  1. In FileScanTask, the method would return the data file as well as all delete files.
  2. In DeletedRowsScanTask, it would return the data file, the addedDeletes files, and the existingDeletes files.

The method name could be allContentFiles(), relatedContentFiles(), etc.
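For illustration, a rough sketch of that idea. The name relatedContentFiles() is hypothetical (one of the options above), not an existing Iceberg API:

// Hypothetical default on ContentScanTask: just the file this task scans.
default Stream<ContentFile<?>> relatedContentFiles() {
  return Stream.of(file());
}

// Hypothetical override on FileScanTask: the data file plus its delete files.
default Stream<ContentFile<?>> relatedContentFiles() {
  return Stream.concat(Stream.of(file()), deletes().stream());
}

// Hypothetical override on DeletedRowsScanTask: the data file plus added and existing deletes.
default Stream<ContentFile<?>> relatedContentFiles() {
  return Stream.concat(Stream.of(file()),
      Stream.concat(addedDeletes().stream(), existingDeletes().stream()));
}

With that, the instanceof chain above collapses to contentScanTask.relatedContentFiles().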

Contributor

Yeah, this is not good. I'd be up for exposing something like referencedDataFiles or dataFiles together with referencedDeleteFiles or deleteFiles to avoid ? in the public API.

Maybe we can even add those methods to ScanTask?

Contributor Author

I'm OK with these solutions. Adding them to ScanTask also makes sense.
For the inputFile map in BaseDataReader, we don't actually need to know whether a file is a DataFile or a DeleteFile. But to avoid ? in the public APIs, we would still need two methods, one for DataFile and another for DeleteFile. I'm wondering whether we could remove the type parameter from interface ContentFile<F> so that we could just use ContentFile in this case.
Here is another use case for unifying DataFile and DeleteFile: #4142 (review).
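To illustrate the point about the map, a minimal sketch assuming the wildcard type: the reader only needs path() and keyMetadata(), both declared on ContentFile itself (the helper below is hypothetical, not the merged code):

private Map<String, InputFile> inputFiles(Stream<ContentFile<?>> files) {
  return files.collect(Collectors.toMap(
      file -> file.path().toString(),
      file -> table.io().newInputFile(file.path().toString()),
      (left, right) -> left)); // the same delete file may be referenced by several tasks
}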

@aokolnychyi (Contributor)

Let me take a look now.

@szehon-ho (Member) left a comment

Took a preliminary look and made some comments.

return stream;
});

dataFileStream
Member

A little outside the scope of this change, but now that we are making a variable, how about:

Map<String, ByteBuffer> keyMetadata = dataFileStream.collect(Collectors.toMap(file -> file.path().toString(), file -> file.keyMetadata()));

* @param <T> is the Java class returned by this reader whose objects contain one or more rows.
*/
abstract class BaseDataReader<T> implements Closeable {
abstract class BaseDataReader<T, CST extends ContentScanTask<?>, G extends ScanTaskGroup<CST>>
Contributor

To be honest, I am not convinced we have to restrict this to ContentScanTask. Also, keep in mind that different changelog tasks can be packed into the same task group. That's why the reader should not be restricted to strictly one task type.

I'd consider using ScanTask and changing the hierarchy a bit. This class can become BaseReader. I also don't think we need the second type parameter; we can just work with ScanTaskGroup<ST>.

abstract class BaseReader<T, ST extends ScanTask> implements Closeable {

  ...

  BaseReader(Table table, ScanTaskGroup<ST> taskGroup) {
    this.table = table;
    this.tasks = taskGroup.tasks().iterator();
    this.inputFiles = inputFiles(taskGroup);
    this.currentIterator = CloseableIterator.empty();
  }

  ...
}

Then I'd consider adding BaseRowReader like this (also, no "data" in the name).

abstract class BaseRowReader<ST extends ScanTask> extends BaseReader<InternalRow, ST> {

  private final Schema tableSchema;
  private final Schema expectedSchema;
  private final String nameMapping;
  private final boolean caseSensitive;

  BaseRowReader(Table table, ScanTaskGroup<ST> taskGroup, Schema expectedSchema, boolean caseSensitive) {
    super(table, taskGroup);
    this.tableSchema = table.schema();
    this.expectedSchema = expectedSchema;
    this.nameMapping = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
    this.caseSensitive = caseSensitive;
  }

  protected Schema tableSchema() {
    return tableSchema;
  }

  protected Schema expectedSchema() {
    return expectedSchema;
  }

  protected CloseableIterable<InternalRow> newIterable(InputFile file, FileFormat format, long start, long length,
                                                       Expression residual, Schema projection,
                                                       Map<Integer, ?> idToConstant) {
    switch (format) {
      case PARQUET:
        return newParquetIterable(file, start, length, residual, projection, idToConstant);

      case AVRO:
        return newAvroIterable(file, start, length, projection, idToConstant);

      case ORC:
        return newOrcIterable(file, start, length, residual, projection, idToConstant);

      default:
        throw new UnsupportedOperationException("Cannot read unknown format: " + format);
    }
  }

  ...
}

Then the existing RowDataReader won't need to change a lot.

class RowDataReader extends BaseRowReader<FileScanTask> {
  ...
}

Finally, we will have ChangelogRowReader capable of reading all types of changelog tasks like this.

class ChangelogRowReader extends BaseRowReader<ChangelogScanTask> {
  ...
}

Contributor Author

I agree that the second type parameter isn't needed. Thanks for the suggestion.

If we want to use ScanTask in BaseReader/BaseDataReader, we need to either

  1. move some common logic to subclasses, e.g. getInputFile() and constantsMap(), which need methods from ContentScanTask, or
  2. add new methods to ScanTask, e.g. file().

(See the sketch below for option 1.)
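For instance, option 1 could look roughly like this: the base class declares a hook and each subclass enumerates the files its task type references (a sketch in the spirit of the referencedFiles call in the diff further down; only the FileScanTask case is shown):

// In BaseReader: each subclass knows which content files its task type touches.
protected abstract Stream<ContentFile<?>> referencedFiles(ST task);

// In RowDataReader, where ST = FileScanTask: the data file plus its delete files.
@Override
protected Stream<ContentFile<?>> referencedFiles(FileScanTask task) {
  return Stream.concat(Stream.of(task.file()), task.deletes().stream());
}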

@github-actions github-actions bot added the API label Jul 21, 2022
@github-actions github-actions bot removed the API label Jul 22, 2022
@aokolnychyi (Contributor) left a comment

This looks almost ready to go. I had a question about constructing an empty delete filter for batch reads instead of passing null, and a few nits.

protected StructLike asStructLike(InternalRow row) {
return asStructLike.wrap(row);
}
SparkDeleteFilter deleteFilter = new SparkDeleteFilter(filePath, task.deletes());
Contributor

I think this changes the previous behavior, where we would not construct a delete filter if the list of deletes is empty. Shall we still pass null when there are no deletes, just to avoid surprises? I am not sure there would be any performance degradation, but it seems safer to keep the old behavior.

Contributor Author

Agreed that it is safer not to change the behavior.
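That is, something along these lines at the call site shown above (a sketch of the agreed-upon guard):

// Keep the old behavior: only build a delete filter when the task has deletes.
SparkDeleteFilter deleteFilter =
    task.deletes().isEmpty() ? null : new SparkDeleteFilter(filePath, task.deletes());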

this.batchSize = batchSize;
}

protected CloseableIterable<ColumnarBatch> newBatchIterable(InputFile location, FileFormat format,
Contributor

nit: I know it was like this in the old implementation but what about renaming location -> file?

}
}

private CloseableIterable<ColumnarBatch> newParquetIterable(InputFile location, long start, long length,
Contributor

nit: Same here

}

abstract CloseableIterator<T> open(FileScanTask task);
abstract CloseableIterator<T> open(TaskT task);
Contributor

nit: In other classes (not very consistently), we usually put abstract methods immediately after the constructor so that it is obvious what children must implement. Since we are changing this line anyway and also adding one more abstract method, what about putting those two immediately after the constructor and making both either protected or package-private for consistency?
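Concretely, the suggested layout would be something like this (a sketch; the second abstract method is presumably referencedFiles from the diff below):

abstract class BaseReader<T, ST extends ScanTask> implements Closeable {

  BaseReader(Table table, ScanTaskGroup<ST> taskGroup) {
    ...
  }

  // both hooks immediately after the constructor, with the same visibility for consistency
  abstract CloseableIterator<T> open(ST task);

  abstract Stream<ContentFile<?>> referencedFiles(ST task);

  ...
}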

Stream<EncryptedInputFile> encryptedFiles = taskGroup.tasks().stream()
    .flatMap(this::referencedFiles)
    .map(file ->
        EncryptedFiles.encryptedInput(table.io().newInputFile(file.path().toString()), file.keyMetadata()));
Contributor

optional: You could define a helper method for constructing encrypted input files and fit this on one line:

Stream<EncryptedInputFile> encryptedFiles = taskGroup.tasks().stream()
    .flatMap(this::referencedFiles)
    .map(this::toEncryptedInputFile);

private EncryptedInputFile toEncryptedInputFile(ContentFile<?> file) {
  InputFile inputFile = table.io().newInputFile(file.path().toString());
  return EncryptedFiles.encryptedInput(inputFile, file.keyMetadata());
}

if (readSchema.findField(MetadataColumns.PARTITION_COLUMN_ID) != null) {
StructType partitionType = Partitioning.partitionType(table);
return PartitionUtil.constantsMap(task, partitionType, BaseDataReader::convertConstant);
Types.StructType partitionType = Partitioning.partitionType(table);
Contributor

Did we add Types. on purpose?

Contributor Author

No. Let me remove it.

protected class SparkDeleteFilter extends DeleteFilter<InternalRow> {
private final InternalRowWrapper asStructLike;

SparkDeleteFilter(String filePath, List<DeleteFile> deletes, Schema requestedSchema) {
Contributor

optional: You may be able to remove this constructor by using expectedSchema() in EqualityDeleteRowReader, which is now available via the base reader. It is a bit confusing that we compute the table schema using an instance var of the outer class yet accept a parameter for the expected schema, even though this class already has access to it.

Contributor Author

Makes sense. I was trying not to touch EqualityDeleteRowReader. Made the change.
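For reference, the slimmed-down constructor could look roughly like this, assuming DeleteFilter's (filePath, deletes, tableSchema, requestedSchema) super constructor:

SparkDeleteFilter(String filePath, List<DeleteFile> deletes) {
  // expectedSchema() now comes from the base reader, so the extra parameter is gone
  super(filePath, deletes, tableSchema(), expectedSchema());
  this.asStructLike = new InternalRowWrapper(SparkSchemaUtil.convert(requiredSchema()));
}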

.build();
}

private CloseableIterable<ColumnarBatch> newOrcIterable(InputFile location, long start, long length,
Contributor

nit: Same here

StructInternalRow row = new StructInternalRow(readSchema.asStruct());
CloseableIterable<InternalRow> asSparkRows = CloseableIterable.transform(
task.asDataTask().rows(), row::setStruct);
CloseableIterable<InternalRow> asSparkRows = CloseableIterable.transform(task.asDataTask().rows(), row::setStruct);
Contributor

nit: What about inlining to remove the useless temp var?

return CloseableIterable.transform(task.asDataTask().rows(), row::setStruct);

import static org.apache.iceberg.Files.localOutput;

public class TestSparkBaseDataReader {
public class TestBaseReader {
@aokolnychyi (Contributor) commented Jul 26, 2022

Is the rename required?

Contributor Author

I'm not sure, but I prefer this renaming since we changed the class name from BaseDataReader to BaseReader.

@aokolnychyi (Contributor) left a comment

LGTM pending tests.

@aokolnychyi aokolnychyi merged commit d7b1a87 into apache:master Jul 26, 2022
@aokolnychyi (Contributor)

Thanks for the change, @flyrain! Thanks for reviewing, @szehon-ho!

zhongyujiang pushed a commit to zhongyujiang/iceberg that referenced this pull request Apr 16, 2025