[BEAM-8872] Support split at fraction for OffsetRangeTracker #11418
lukecwik
left a comment
I looked at the Python implementation as well and I think we can improve it so that trySplit returns a split even if nothing has been claimed and so that we can get rid of checkpointed.
We can split before any block has been successfully claimed by returning [from, to) as the residual and updating the current range to [from, from).
This makes sense in cases where we want to hand off all the work for the active element to someone else while this bundle finishes other processing.
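To make the suggestion above concrete, here is a minimal sketch of a checkpoint taken before any claim. `CheckpointSketch` is a hypothetical stand-in, not Beam's `OffsetRangeTracker`, and it assumes half-open ranges, so the split lands at `from` when nothing has been claimed and at `lastClaimed + 1` otherwise.

```java
// Hypothetical stand-in for a tracker over the half-open range [from, to).
// This is an illustration only, not the Beam implementation.
final class CheckpointSketch {
    private final long from;
    private long to;
    private Long lastClaimed = null; // null until the first successful claim

    CheckpointSketch(long from, long to) {
        this.from = from;
        this.to = to;
    }

    boolean tryClaim(long offset) {
        if (offset >= to) {
            return false;
        }
        lastClaimed = offset;
        return true;
    }

    /** Returns the residual as {start, end}, or null if nothing is left to hand off. */
    long[] checkpoint() {
        // Before any claim the whole range is handed off; afterwards only the unclaimed tail.
        long splitAt = (lastClaimed == null) ? from : lastClaimed + 1;
        if (splitAt >= to) {
            return null;
        }
        long[] residual = {splitAt, to};
        to = splitAt; // the primary shrinks to [from, splitAt)
        return residual;
    }

    long[] currentRange() {
        return new long[] {from, to};
    }
}
```

With a tracker over [100, 200) and no claims, `checkpoint()` returns {100, 200} and leaves the current range at [100, 100). A second `checkpoint()` returns null without needing a separate boolean flag, because the shrunken range already has nothing left to split.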
Allowing a split before the first claim makes sense to me. Python already allows that.
Why do we need checkpointed?
Shouldn't the range restriction change so that to becomes lastClaimed (or from if nothing has been claimed)?
Just return early since we know there is no more split after checkpointing.
Changing the state makes the code more complicated, though, since the bounds checking then varies based on whether you got 0.0 or 0.00001. Both are likely to produce the same final state, except that one also sets an additional boolean.
Run Java PreCommit
Run RAT PreCommit
OffsetRange checkpoint = tracker.trySplit(0).getResidual();
SplitResult res = tracker.trySplit(0);
assertEquals(new OffsetRange(100, 100), res.getPrimary());
assertEquals(new OffsetRange(100, 200), res.getResidual());
The primary and residual shouldn't have the same value. The primary should be an empty range like [100, 100).
In this test case, the expected primary is [100, 100) and the expected residual is [100, 200).
Thanks, that was my mistake. I read both lines as being [100, 200).
// residual is null when the entire restriction has been processed.
if (processContext.numClaimedBlocks > 0) {
residual = checkNotNull(processContext.takeCheckpointNow());
residual = processContext.takeCheckpointNow();
takeCheckpointNow should work regardless of whether numClaimedBlocks > 0.
Even if tryClaim is never called, the watermark may advance.
I guess the original assumption was that a checkpoint should only happen after at least one tryClaim call. Since we are changing that assumption, numClaimedBlocks can also be removed.
residual = checkNotNull(processContext.takeCheckpointNow());
residual = processContext.takeCheckpointNow();
processContext.tracker.checkDone();
} else {
The comments below will likely need updating
assertEquals(new OffsetRange(100, 161), tracker.currentRestriction());
assertEquals(new OffsetRange(161, 200), checkpoint);
assertNull(tracker.trySplit(0));
}
}
tracker.checkDone();
}
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import org.apache.beam.sdk.io.range.OffsetRange;
Can we add tests to verify tryClaim(0), tryClaim(0.1), tryClaim(1) on an empty range like [100, 100)?
Can we also add tests to verify the behavior of tryClaim(1) on range [100, 200)?
trySplit right?
Yes, you're right. Need more sleep.
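For the trySplit edge cases requested above, a rough way to reason about the expected results is a pure function over the range bounds. This is only a sketch: `SplitPointSketch.splitPoint` is a hypothetical helper, and it assumes the split position is `cur + max(1, (to - cur) * fraction)` with `cur = from - 1` before any claim attempt. It uses plain `long` arithmetic and does not guard against overflow near `Long.MAX_VALUE`.

```java
import java.util.OptionalLong;

// Hypothetical helper for reasoning about edge cases; not part of the Beam API.
final class SplitPointSketch {
    /** Returns the offset at which [from, to) would be split, or empty if no split is possible. */
    static OptionalLong splitPoint(long from, long to, Long lastAttempted, double fraction) {
        // Before any claim attempt, act as if the offset just before the range was attempted.
        long cur = (lastAttempted == null) ? from - 1 : lastAttempted;
        // Always move at least one offset past cur so the attempted offset stays in the primary.
        long step = Math.max(1L, (long) ((to - cur) * fraction));
        long split = cur + step;
        return (split < to) ? OptionalLong.empty().isPresent() ? OptionalLong.empty() : OptionalLong.of(split)
                            : OptionalLong.empty();
    }
}
```

Under that assumption, an empty range such as [100, 100) yields no split for fractions 0, 0.1, and 1, since the computed position is never below `to`. The same holds for a fraction of 1 on [100, 200), where the split position equals `to`.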
}
this.residualRestriction = residualRestriction;
this.futureOutputWatermark = futureOutputWatermark;
this.futureWatermarkEstimatorState = futureWatermarkEstimatorState;
I believe the comment below could be incorrect. If we get stop(), we shouldn't have a residual restriction.
Run Java PreCommit
It seems this PR broke the ValidatesRunner tests for most runners. Can you guys PTAL. For ref.
Running post commits here would have caught this (especially considering this is a new feature). The Flink ValidatesRunner test suite takes less than 10 minutes to complete. Perhaps we should move it to pre commit to be sure to catch these.
Fix in #11475
…11418) * [BEAM-8872] Support split at fraction for OffsetRangeTracker * Address comments * Update checkDone
The computation logic is copied from Python's OffsetRestrictionTracker.try_split().
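As a rough illustration of a split-at-fraction computation of this kind, the sketch below tracks a half-open range and splits the remainder after the last attempted offset. `FractionSplitSketch` and its methods are hypothetical names, the formula `cur + max(1, (to - cur) * fraction)` is an assumption about the ported logic rather than a copy of it, and overflow handling is left out for brevity.

```java
// Hypothetical, simplified tracker over [from, to); not the Beam OffsetRangeTracker.
final class FractionSplitSketch {
    private final long from;
    private long to;
    private Long lastAttempted = null; // null until the first claim attempt

    FractionSplitSketch(long from, long to) {
        this.from = from;
        this.to = to;
    }

    boolean tryClaim(long offset) {
        lastAttempted = offset;
        return offset < to;
    }

    /** Returns {primary, residual} as {start, end} pairs, or null when no split is possible. */
    long[][] trySplit(double fractionOfRemainder) {
        long cur = (lastAttempted == null) ? from - 1 : lastAttempted;
        // Keep at least the attempted offset in the primary, then take the requested fraction.
        long split = cur + Math.max(1L, (long) ((to - cur) * fractionOfRemainder));
        if (split >= to) {
            return null;
        }
        long[] residual = {split, to};
        to = split; // the primary becomes [from, split)
        return new long[][] {{from, to}, residual};
    }
}
```

For example, on [100, 200) after `tryClaim(110)`, `trySplit(0.5)` gives primary [100, 155) and residual [155, 200). A following `trySplit(0)` gives primary [100, 111) and residual [111, 155), and one more `trySplit(0)` returns null because nothing unclaimed remains in the primary.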
r: @lukecwik
cc: @youngoli