Spark 3.3: Dataset writes for position deletes #7029
Conversation
@@ -29,7 +29,7 @@ private static Schema pathPosSchema(Schema rowSchema) {
     return new Schema(
         MetadataColumns.DELETE_FILE_PATH,
         MetadataColumns.DELETE_FILE_POS,
-        Types.NestedField.required(
+        Types.NestedField.optional(
This was necessary for the writer to allow writing position deletes with "row", while still being OK when "row" is null.
Currently, the writer code either uses a schema with a required "row" field, as here, or a schema without the "row" field (see the posPathSchema method just below). The variant with the required "row" field is not actually used, so changing it to optional should have no impact.
This is also more in line with the position-delete schema in the spec, where "row" is optional.
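To illustrate the required-vs-optional distinction, here is a minimal, self-contained sketch (the names and types are hypothetical, not the actual Iceberg API): a required "row" field rejects position deletes whose row values are null, while an optional one accepts them.

```java
import java.util.Objects;

// Hypothetical sketch, not the Iceberg API: models the difference between a
// required and an optional "row" field in the position-delete schema.
class PositionDeleteSchemaSketch {

  // A position delete entry: file path, position, and (possibly null) row values.
  record PositionDelete(String path, long pos, Object row) {}

  // With a required "row", a null row is rejected; with an optional "row",
  // entries that do not store the deleted row values are accepted.
  static boolean isValid(PositionDelete delete, boolean rowRequired) {
    Objects.requireNonNull(delete.path(), "path is always required");
    return !(rowRequired && delete.row() == null);
  }
}
```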
Update: it looks like a few GenericWriter code paths depend on this to throw an exception when null rows are passed in. This will thus be a behavior change, but a backward-compatible one.
I have some concerns about this change. The Delete Formats spec says the reason this column type should be required is to make sure the statistics of the deleted row values are accurate. I think the reason the statistics need to be accurate is that the manifest reader uses them to filter delete files.
So if this type is changed to optional, the statistics may become unreliable, which could cause delete manifest entries to be incorrectly filtered? This is just my understanding of the spec; I'm not sure.
Yea you are right, this is tricky. The spec says:

2147483544 | row | required struct<...> [1] | Deleted row values. Omit the column when not storing deleted rows.

When present in the delete file, row is required because all delete entries must include the row values.

So either the entire position delete file has 'row', or the entire file does not have 'row'. (Currently it seems Spark does not set 'row' at all, ref: https://github.com/apache/iceberg/blob/master/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java#L436)
When compacting delete files, I somehow need a way to know whether the original position delete files all have rows or not. I am not sure at the moment how to get this.
I updated the PR with a fix for this; the idea came from chatting with @RussellSpitzer offline. I made SparkWrite.DeleteWriter a fan-out that can redirect deletes to two files: one with 'row' as a required struct, and one with no 'row' at all. In most cases only one will be chosen. Thanks for the initial comment.
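A minimal sketch of the fan-out idea (illustrative names only, not the actual SparkWrite.DeleteWriter code): route each incoming delete to one of two buffers, so that each output file either stores 'row' for every entry or omits the column entirely, keeping both files spec-compliant.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a fan-out position-delete writer. Each delete is
// routed by whether it carries row values, so one output file can use a
// schema with a required "row" struct and the other a schema with no "row".
class FanoutPositionDeleteWriter {
  record PositionDelete(String path, long pos, Object row) {}

  final List<PositionDelete> withRowFile = new ArrayList<>();    // schema: path, pos, required row
  final List<PositionDelete> withoutRowFile = new ArrayList<>(); // schema: path, pos (no row column)

  void write(PositionDelete delete) {
    if (delete.row() != null) {
      withRowFile.add(delete);
    } else {
      withoutRowFile.add(delete);
    }
  }
}
```

In typical inputs all deletes take one branch, so usually only one of the two files is actually created.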
Rebase on updated version of #6924
I went through the change. Let me do a detailed review round with fresh eyes tomorrow.
Made suggested changes (refactored into a new class, SparkPositionDeletesRewrite, from SparkWrite). Note: the new classes drop some unused code from the previous path, like the reportMetrics method and the cleanupOnAbort flag that controls abort behavior. I assume we can revisit this when we implement the commit manager part; as of now it's not clear whether we need it or not.
This is getting close. I did a detailed round. Will check tests with fresh eyes tomorrow.
Almost there.
LGTM, I left a few minor comments. Feel free to merge whenever you are ready, @szehon-ho.
Nice work!
abstract class BaseFileRewriteCoordinator<F extends ContentFile<F>> {

  private static final Logger LOG = LoggerFactory.getLogger(FileRewriteCoordinator.class);
I think we are using the wrong class for logging. It should be BaseFileRewriteCoordinator.
Good catch, fixed
Preconditions.checkArgument(
    fileSetId != null, "position_deletes table can only be written by RewriteDeleteFiles");
Preconditions.checkArgument(
    writeConf.handleTimestampWithoutZone()
I think this part would be easier to read if we define fileSetId and handleTimestampWithoutZone as instance variables, similar to what we have in SparkWriteBuilder.
I think it's a bit harder to read if we define them in the constructor, like SparkWriteBuilder does, as that is a bit detached from this code. I rewrote it to define the variables at the beginning of the method.
partitions.addAll(tasks.stream().map(ContentScanTask::partition).collect(Collectors.toList()));
Preconditions.checkArgument(
    partitions.size() == 1,
    "All scan tasks of %s are expected to have the same partition",
Did we miss a trailing ", but got %s" at the end to include the partitions in the message?
Good catch, done
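The corrected message can be sketched like this (a self-contained stand-in for the Guava-style Preconditions.checkArgument; the class and method names are illustrative): the trailing "but got %s" surfaces the offending partitions in the error message.

```java
import java.util.Set;

// Illustrative stand-in for the precondition under discussion: when the scan
// tasks span more than one partition, the error message now reports which
// partitions were actually seen.
class PartitionCheckSketch {
  static boolean isSinglePartition(Set<String> partitions) {
    return partitions.size() == 1;
  }

  static void checkSamePartition(String fileSetId, Set<String> partitions) {
    if (!isSinglePartition(partitions)) {
      throw new IllegalArgumentException(
          String.format(
              "All scan tasks of %s are expected to have the same partition, but got %s",
              fileSetId, partitions));
    }
  }
}
```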
String fileSetId = writeConf.rewrittenFileSetId();

Preconditions.checkArgument(
    fileSetId != null, "position_deletes table can only be written by RewriteDeleteFiles");
I don't think there is a RewriteDeleteFiles. What about a more generic message, like "Can only write to %s via actions" with table.name()?
Yep, done
  resultMap.remove(id);
}

public Set<String> fetchSetIDs(Table table) {
I believe we renamed it to fetchSetIds instead of fetchSetIDs, so we have to keep the old method for now.
SparkFileWriterFactory writerFactoryWithRow =
    SparkFileWriterFactory.builderFor(table)
        .dataSchema(writeSchema)
I am not sure we need to set these dataXXX methods, since we are not writing any data here.
Done
private StructLike partition(String fileSetId, List<PositionDeletesScanTask> tasks) {
  StructLikeSet partitions = StructLikeSet.create(tasks.get(0).spec().partitionType());
  partitions.addAll(tasks.stream().map(ContentScanTask::partition).collect(Collectors.toList()));
nit: I think you can use forEach instead of a temp list:

tasks.stream().map(ContentScanTask::partition).forEach(partitions::add);

In any case, I like what you did here.
Done
Merged, thanks @aokolnychyi for the detailed review, and @zhongyujiang @amogh-jahagirdar for the initial reviews
This is the last prerequisite for implementing RewriteDeleteFiles.
It allows dataset writes to the position_deletes metadata table, on the condition that rewritten_file_set_id is set (i.e., the write comes from Iceberg's internal use).
Part of this PR, which is simple refactoring, has already been split out into #6924.