-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Cherrypick snapshot feature #695
Conversation
@rdblue @aokolnychyi Please take a look. This is a first cut implementation with appends only. Initial implementation submission for feedback. |
core/src/main/java/org/apache/iceberg/CherryPickFromSnapshot.java
Outdated
Show resolved
Hide resolved
core/src/main/java/org/apache/iceberg/CherryPickFromSnapshot.java
Outdated
Show resolved
Hide resolved
core/src/main/java/org/apache/iceberg/CherryPickFromSnapshot.java
Outdated
Show resolved
Hide resolved
core/src/main/java/org/apache/iceberg/CherryPickFromSnapshot.java
Outdated
Show resolved
Hide resolved
core/src/main/java/org/apache/iceberg/CherryPickFromSnapshot.java
Outdated
Show resolved
Hide resolved
core/src/main/java/org/apache/iceberg/CherryPickFromSnapshot.java
Outdated
Show resolved
Hide resolved
* Cherry-picking should apply the exact set of changes that were done in the original commit. | ||
* - All added files should be added to the new version. | ||
* - Todo: If files were deleted, then those files must still exist in the data set. | ||
* - Does not support Overwrite operations currently. Overwrites are considered as conflicts. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are a few easy cases to support. First, appends can always be picked on top of any version. That's a good one to start with. Next, if the current snapshot is the picked commit's parent, then this can commit a "fast-forward" change and update just the current snapshot ID.
Overwrites get harder because we have to check history. For a dynamic partition overwrite, we need to make sure that no data has been added to any of the affected partitions since the commit. For expression overwrites, we need to similarly make sure that no new files match the commit's delete expression.
I think we would need extra metadata for overwrites and fast-forwards are a slightly different case, so I think it is a good idea to focus on just the append case in this PR.
core/src/main/java/org/apache/iceberg/CherryPickFromSnapshot.java
Outdated
Show resolved
Hide resolved
core/src/main/java/org/apache/iceberg/CherryPickFromSnapshot.java
Outdated
Show resolved
Hide resolved
"Snapshot provided for cherrypick didn't have wap.id set on it. " + | ||
"Only snapshots that were staged can be cherrypicked from."); | ||
ValidationException.check(!base.isWapIdPublished(Long.parseLong(wapId)), | ||
"Duplicate request to cherry pick wap id that was published already: %s", wapId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this check, but I think it should be whether the snapshot ID was published, not necessarily the WAP id.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is checking on the client provided wap.id
which could be different from the snapshot id. It's part of our wap workflow. How we use this in our wap workflow is: we are setting an incoming batch-id (an id for a new delta of data) as the wap.id
and use that to audit our snapshot. If the same batch (wap.id) was published previously we reject that snapshot. This way multiple writers trying to cherry-pick the same batch would be safeguarded as the second one trying to write would get this ValidationException
. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's a good idea to have the check, but shouldn't we use Iceberg's snapshot ID? Then we can check whether that snapshot is in the current snapshot's ancestors, or whether any ancestor was picked from that snapshot.
If we use WAP ID for this, then we have to rely on having a WAP ID, which might not be the case if we are picking a commit that was rolled back like in the example I gave in another comment: if a table has snapshots A -> B -> C and is rolled back to A, it should be possible to pick C on top of A.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's a good idea to have the check, but shouldn't we use Iceberg's snapshot ID? Then we can check whether that snapshot is in the current snapshot's ancestors, or whether any ancestor was picked from that snapshot.
I think we are talking about two different validations. viz.
- check if a snapshot is already in the existing chain of snapshots so that same snapshot isn't cherrypicked again later down the chain (this, I think is what you are referring to),
- check if the incoming snapshot has a
wap.id
that was already committed up the chain. Here thewap.id
is a duplicate id across two or more unique snapshots that those writers are trying to cherry-pick. (this is what I'm trying to check for)
The difference between 1 and 2 is that since snapshots are generated by iceberg api, two different writers that try to commit the same wap.id could get unique snapshot ids. Check 1 would still allow both snapshots to enter the snapshot chain. We are trying to use iceberg as the central mechanism to optimistically write and fail if there is duplication instead of trying to depend on some external locking mechanism. This is what check 2 is trying to achieve.
I agree that the rollback usecase is a problem when checking using wap.id. In general I think we can make wap.id
optional on the incoming snapshot. In validations, we can perform both checks 1 & 2. I can make wap.id optional and perform check 2 only if there is a wap.id
on the snapshot.
Check 2 is essential to our usecase as it allows writers to write concurrently and protects against duplicate wap.ids from entering the system. Do you think this is something we can can incorporate into this workflow? Or do you suggest we do this another way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, that makes sense. I was thinking that a WAP ID was unique to reach write but it makes sense that you may want to use it that way and that you would want to check WAP ID as well. Let's add both checks.
I think that this should always check that the snapshot itself was not already picked into the current table state. I'm not sure whether to always check that the WAP ID was not picked into the current table state as well. I don't think that it would cause problems if we included the check by default, though. So it's probably easier for everyone if both are on by default. Sound okay?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 .. if we need an override to skip the 2nd check I can open a separate issue later. But for now defaulting to both checks works just fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wanted to call out two things about the cherrypick, w.r.t to validation checks..
- if A->B->C is the current snapshot chain and we rolled-back to A. Then subsequently we cherrypicked C over A. If we perform check 1 ( to check if C exists in the existing chain) then C will fail this check as it's in the list of snapshots just that it's not an ancestor of A. This is why I haven't added that check yet. Did you mean here we should check instead if C is already an ancestor of A? maybe i misunderstood this check.
- When cherrypicking C , the cherrypick operation will create a new snapshot by applying C's appends and create a new snapshot D which is then set as the current_snapshot. So each cherrypick operation creates a new snapshot.
how does this sound to you?
core/src/main/java/org/apache/iceberg/CherryPickFromSnapshot.java
Outdated
Show resolved
Hide resolved
|
||
if (targetSnapshotId == null) { | ||
// if no target snapshot was configured then NOOP by returning current state | ||
return table.currentSnapshot(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logic seems to conflict with the validation above. If it's okay to not have a target snapshot ID and the result is a noop, then we don't need the check that there is an operation, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
if (isWapWorkflow) { | ||
set(SnapshotSummary.PUBLISHED_WAP_ID_PROP, wapId); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For #744, we also need to set the picked snapshot ID in the new snapshot metadata here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
switch (operation) { | ||
case CHERRYPICK: | ||
Snapshot cherryPickSnapshot = base.snapshot(this.targetSnapshotId); | ||
String wapId = stagedWapId(cherryPickSnapshot); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor: it may be nice to move this logic to a validation method to keep apply smaller: validateWapPublish(targetSnapshotId)
.
We will also need a second validation, validateNotAncestor(targetSnapshotId)
that validates the picked snapshot ID is not an ancestor and was not cherry-picked to create an ancestor. That should avoid double cherry-picks of non-WAP snapshots as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
…ce and target snapshot, added tests for ancestor checks
@rdblue I'v addressed pending comments and added the |
@prodeezy, thanks for the update! I did a deeper review on the commit operations and I found a few things to fix. I opened a PR against the PR branch to help out and get this in quicker: rominparekh#2 Can you review that? |
Minimize changes and fix commit logic.
Merged rominparekh#2 |
@prodeezy, looks like there are two main causes for test failures, from the ones I looked at. First, https://github.com/apache/incubator-iceberg/pull/695/files#diff-62b86887b9d53434d54feab812bef0abR122 is wrong. It should be this instead: ValidationException.check(
isCurrentAncestor(current, snapshotId),
"Cannot roll back to snapshot, not an ancestor of the current state: %s", snapshotId); The second problem is that this will now automatically fast-forward when possible instead of creating new cherry-pick snapshots. That means we have fewer than expected snapshots in some tests. Can you update the tests? |
@rdblue : I have updated the WAP workflows failing tests but there are two existing tests which seem to fail with the recent change on fast-forwarding snapshots. org.apache.iceberg.spark.source.TestIcebergSourceHadoopTables > testHistoryTable FAILED
org.apache.iceberg.exceptions.CommitFailedException at TestIcebergSourceHadoopTables.java:246
org.apache.iceberg.spark.source.TestIcebergSourceHadoopTables > testSnapshotsTable FAILED
org.apache.iceberg.exceptions.CommitFailedException at TestIcebergSourceHadoopTables.java:318 cc: @prodeezy |
Thanks @rominparekh! I'll take a look at those tests. I really appreciate you fixing the others! |
@rominparekh, @prodeezy, I was able to find out why those tests were failing. I opened a PR against your branch. Can you merge and update this PR? You may also want to review #774, which was one of the reasons why this happened. |
thanks a lot @rdblue for helping out! I will take a look today. |
Fix refresh logic in SnapshotProducer subclasses.
@rdblue : Thanks for picking it up. I believe we are in good place to merge this in? \cc : @aokolnychyi , @prodeezy |
I think we are ready. I'll let tests finish first. |
Merged! Thanks for all your work on this, @prodeezy & @rominparekh! |
@trivial: add SOURCE_SNAPSHOT_ID_PROP from Iceberg's PR apache#695
This is a working version of the cherry-pick feature to address Issue #629. This initial version only picks appends from the audit snapshot and creates an output snapshot which is published to the table.
While publishing, we add the
published.wap.id
to the committed snapshot summary. If another snapshot with the samewap.id
is Cherrypicked on again after this, we reject that.Added tests for WAP workflow to illustrate usage and resilience scenarios. Also added spark DS options to enable wap and set the wap id.
Example usage:
Co-authored-by: @rominparekh