Incremental processing implementation #315
Force-pushed from 730b85f to dcc4020
One blocking comment: can we rename startSnapshotId to exclusiveStartSnapshotId to be explicit?
 * @return a table scan which can read incremental data from {@param fromSnapshotId}
 * exclusive and up to {@toSnapshotId} inclusive
 */
TableScan newIncrementalScan(long fromSnapshotId, long toSnapshotId);
TODO: What about when the user runs an incremental scan for the first time (bootstrap)? What should fromSnapshotId, or even toSnapshotId, be?
For new tables, from will be the first snapshot ID. For existing tables, where the oldest snapshot has already expired, users will need to choose a starting snapshot. Usually, that would be after running a batch process to handle existing table data before the incremental process starts. So it would be the snapshot on which the batch process ran.
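To make that bootstrap choice concrete, here is a minimal sketch in plain Java. This is a toy model, not the Iceberg API; the class and method names are made up for illustration, and the snapshot history is just an oldest-first list of ids:

```java
import java.util.List;

public class BootstrapSnapshot {
    // For a new table, the incremental read can start from the first snapshot,
    // so every append is picked up. For an existing table whose oldest
    // snapshots have expired, the caller supplies the snapshot that a batch
    // backfill ran on, and the incremental process resumes after it.
    static long startingSnapshotId(List<Long> history, Long batchSnapshotId) {
        if (batchSnapshotId != null) {
            return batchSnapshotId;  // existing table: resume after the backfill
        }
        return history.get(0);       // new table: first snapshot ever committed
    }
}
```

For example, a brand-new table with history `[1, 2, 3]` would start from snapshot 1, while a pre-existing table whose backfill ran on snapshot 2 would start from 2.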
I think it is essential that we can start streaming from a large table and have multiple batches for this. @rdblue, can you elaborate on the batch process you mentioned?
I've also summarized my thoughts on requirements for Structured Streaming sources in this comment. Let me know if that makes sense to you, @rdsr @rdblue.
Splitting a snapshot into multiple batches will require some thinking. Maybe this can be done by streaming sources. For example, our implementation of Offset in Spark can store additional information. Storing consumed files in offset JSON files isn't an option, but maybe we can store a pointer to a list of consumed manifests or something (which can be an Avro or a Parquet file, for example).
That sounds reasonable to me, but I think it should also be possible to start consuming from a specific snapshot, since processing the rest of the table seems needlessly expensive in a lot of cases. We move jobs from batch to streaming and don't want to rewrite the history.
Absolutely agree, consuming from a given snapshot covers most use cases. At the same time, there are scenarios where Structured Streaming in Spark is used beyond pure streaming, for example. I just want to capture those use cases as well.
When using incremental scan for the first time (bootstrap), I think fromSnapshotId should be null, since the contract is that an incremental scan gives all the appends between fromSnapshotId exclusive and toSnapshotId inclusive. When fromSnapshotId is null, we will return all the appends up to toSnapshotId. In this way a user will not miss any data.
Thoughts?
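To illustrate that contract, here is a self-contained sketch in plain Java. It models the snapshot lineage as a toy parent-pointer map (child id to parent id) rather than the Iceberg API, and all names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class IncrementalRange {
    // Snapshot ids covered by an incremental scan: (fromSnapshotId, toSnapshotId].
    // A null fromSnapshotId models the bootstrap case: every snapshot up to and
    // including toSnapshotId is returned, so the first read misses no appends.
    static List<Long> snapshotsToRead(Map<Long, Long> parentOf, Long from, long to) {
        List<Long> ids = new ArrayList<>();
        Long current = to;
        while (current != null && !current.equals(from)) {
            ids.add(0, current);              // prepend, so the result is oldest-first
            current = parentOf.get(current);  // walk the parent pointers backwards
        }
        return ids;
    }
}
```

With lineage 1 ← 2 ← 3, `snapshotsToRead(parents, null, 3L)` yields `[1, 2, 3]` (bootstrap), while `snapshotsToRead(parents, 1L, 3L)` yields `[2, 3]`, i.e. (1, 3].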
How about using Table#newScan for the first time, and then calling TableScan#snapshot to find out up to which snapshot the first read covered?
}

protected CloseableIterable<FileScanTask> planFiles(TableOperations ops, Snapshot snapshot,
                                                    Expression rowFilter, boolean caseSensitive, boolean colStats,
nit: formatting of the params
Will it be possible to use this API for #179?
Hi @aokolnychyi, I haven't looked too closely into #179. I'll read that PR and get back to you.
@rdblue, any comments on this updated PR?
@rdsr, sorry for the delay! I didn't notice that this was updated or see your request. Sorry!
@rdsr, thanks for working on this! I really like how clean this version is. Definitely improving quickly!
This has been lagging for a while since I didn't get time to address the comments. I plan to take this up again in the coming weeks.
Thanks for working on this @rdsr. We have a similar use case, so LMK if you need help testing this.
Nice work @rdsr, are you also thinking of an API that's exposed under
core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java
    table().snapshot(newFromSnapshotId) != null, "fromSnapshotId: %s does not exist", newFromSnapshotId);
Preconditions.checkArgument(
    table().snapshot(newToSnapshotId) != null, "toSnapshotId: %s does not exist", newToSnapshotId);
return new IncrementalDataTableScan(
Should this ensure that newFromSnapshot is an ancestor of newToSnapshot?
This probably doesn't need to be done in this commit, but it would be a good follow-up to ensure that the range exists.
Since this is a refinement, it may also be a good idea to make this a subset of the existing selected range. That is, both newFromSnapshotId and newToSnapshotId must be in the existing range of fromSnapshotId to toSnapshotId.
Putting it another way, when I create a scan using appendsBetween(A, C).appendsBetween(B, D), what should the behavior be? I'd say that is concerning because D is outside the original range. Probably a good idea to fail instead.
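One way to express that fail-fast refinement rule is sketched below. This is a toy model, not the actual IncrementalDataTableScan code: the ancestry is an oldest-first list of snapshot ids, and the class and method names are made up:

```java
import java.util.List;

public class ScanRefinement {
    // Hypothetical validation for refining an incremental scan: the new
    // (newFrom, newTo] range must be a subset of the already selected
    // (oldFrom, oldTo] range, otherwise the refinement fails loudly.
    static void checkRefinement(List<Long> ancestry, long oldFrom, long oldTo,
                                long newFrom, long newTo) {
        int lo = ancestry.indexOf(oldFrom);   // -1 if the snapshot is unknown
        int hi = ancestry.indexOf(oldTo);
        int nlo = ancestry.indexOf(newFrom);
        int nhi = ancestry.indexOf(newTo);
        if (nlo < lo || nhi > hi || nlo > nhi) {
            throw new IllegalArgumentException(String.format(
                "(%s, %s] is not within the selected range (%s, %s]",
                newFrom, newTo, oldFrom, oldTo));
        }
    }
}
```

Under this check, `appendsBetween(A, C).appendsBetween(B, D)` would throw because D lies outside the original (A, C] range, while `appendsBetween(A, D).appendsBetween(B, C)` would be accepted as a genuine narrowing.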
Thank you, I'll file a ticket for the follow-up!
    manifestEntry.status() == ManifestEntry.Status.ADDED;

return planFiles(
    tableOps(), snapshot, filter(), isCaseSensitive(), colStats(), matchingManifests, matchingManifestEntries);
I think this is correct.
As a follow-up, we can change the logic slightly to only require one ManifestGroup. As long as we read each manifest that was added in a selected snapshot only once, and only select ADDED files with the right snapshot ID, we can do the planning in a single run.
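The single-pass selection rule can be sketched as follows. This is a simplified stand-in, not Iceberg's ManifestGroup: manifest entries are modeled as a small record with a file path, the committing snapshot id, and a status:

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class SinglePassPlanning {
    enum Status { ADDED, EXISTING, DELETED }

    // Toy model of a manifest entry: the file it tracks, the snapshot that
    // committed it, and its status within the manifest.
    record Entry(String file, long snapshotId, Status status) {}

    // One pass over all entries: keep only files that were ADDED by one of
    // the snapshots selected for the incremental scan. Each entry is read
    // exactly once, so no second planning run is needed.
    static List<String> addedFiles(List<Entry> entries, Set<Long> selectedSnapshots) {
        return entries.stream()
            .filter(e -> e.status() == Status.ADDED)
            .filter(e -> selectedSnapshots.contains(e.snapshotId()))
            .map(Entry::file)
            .collect(Collectors.toList());
    }
}
```

The two filters mirror the comment above: status must be ADDED, and the snapshot id must be in the selected range, so EXISTING entries carried forward from older snapshots are never double counted.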
I think this makes sense. I'll definitely tackle this in a follow-up. Thanks!
@rdsr, only minor things left! Thanks for all your work on this, it's really coming together nicely.
One of the commits in the PR was not applying cleanly. I had to squash all commits into a single commit and fix diff issues, so the PR is now a single commit. Sorry about that; it may make the new changes harder to review.
Force-pushed from 1352de4 to f8cf81c
String specString = PartitionSpecParser.toJson(spec);
ResidualEvaluator residuals = ResidualEvaluator.of(spec, dataFilter, caseSensitive);
return CloseableIterable.transform(entries, e -> new BaseFileScanTask(
    e.copy().file(), schemaString, specString, residuals));
Later, we will probably want to detect whether any stats column was projected and use file.copy() or file.copyWithoutStats() depending on the projection.
@@ -42,6 +42,16 @@ private SnapshotUtil() {
    return ancestorIds(table.currentSnapshot(), table::snapshot);
  }

  /**
   * @return List of snapshot ids in the range - (fromSnapshotId, toSnapshotId]
   * This method assumes that fromSnapshotId is an ancestor of toSnapshotId
   */
I don't see any place that checks whether the from snapshot is an ancestor of the to snapshot. That seems like a requirement for this to work correctly to me.
Yes. It will be in the follow-up
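The missing check can be sketched like this. It is a toy model over a parent-pointer map (child snapshot id to parent snapshot id), not the actual SnapshotUtil code, and the names are hypothetical:

```java
import java.util.Map;

public class AncestorCheck {
    // The validation promised for the follow-up: walk back from toId along
    // parent pointers until we either reach fromId (it is an ancestor) or
    // run out of history (it is not, e.g. it sits on a different branch).
    static boolean isAncestorOf(Map<Long, Long> parentOf, long fromId, long toId) {
        Long current = toId;
        while (current != null) {
            if (current == fromId) {
                return true;
            }
            current = parentOf.get(current);  // null once the root is passed
        }
        return false;
    }
}
```

With a lineage 1 ← 2 ← 3 and a side branch 1 ← 5, snapshot 1 is an ancestor of 3, but 2 is not an ancestor of 5, which is exactly the case where a naive range walk would silently fall back to the whole ancestry of the "to" snapshot.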
@rdsr, I'm merging this. Thanks for all your work on it! I think we do need to follow up pretty quickly with better validations for the snapshot IDs passed to appendsBetween and appendsAfter. Right now, I don't think there is anything that validates that there is a range of snapshots from the start to the end -- it looks like this would use the whole ancestry of the "to" snapshot if "from" isn't an ancestor. Let's clean that up with validations, but for now I've committed this since it is quite a large patch.
Fixes apache#765: This addresses the following follow-ups from apache#315:
1. Have validations on the snapshot id range
2. Improve tests for the same
WIP branch to get feedback on the overall approach.
The basic idea: if the user asks for an incremental scan between s1 and s2, we only scan manifests that belong in the range (s1, s2] (excluding s1 and including s2), and within those manifests we only look at manifest entries that belong in the range (s1, s2].
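That manifest-level pruning can be sketched with a toy model (plain Java, not Iceberg's manifest classes; the record and method names are made up). Each manifest records the snapshot that added it, so only manifests added in (s1, s2] need to be opened at all:

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class ManifestSelection {
    // Toy model of a manifest: its file path and the snapshot that added it.
    record Manifest(String path, long addedSnapshotId) {}

    // Prune to manifests added by a snapshot in the selected (s1, s2] range;
    // everything else can be skipped without being read.
    static List<Manifest> manifestsToScan(List<Manifest> all, Set<Long> rangeIds) {
        return all.stream()
            .filter(m -> rangeIds.contains(m.addedSnapshotId()))
            .collect(Collectors.toList());
    }
}
```

Entry-level filtering inside the surviving manifests then applies the same (s1, s2] test to each manifest entry's snapshot id, as described above.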