New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-5605] Add support for channel splitting to the gRPC read "source" and propagate "split" calls to the downstream receiver #10501
Conversation
…e" and propagate "split" calls to the downstream receiver This code mirrors the logic/implementation within https://github.com/apache/beam/blob/16757ef9a6da4d0ac218c6c4d6b19e2a49ccca45/sdks/python/apache_beam/runners/worker/bundle_processor.py#L206 To be able to propagate the split call to the downstream receiver, I collapsed all the harness FnDataReceiver types into two existing implementations and one new implementations. The previous hierarchy was: element counting receiver -> time counting receiver -> multiplexing receiver (possibly the original receiver) The current implementation combined the element counting, time counting and multiplexing into the MultiplexingMetricTrackingFnDataReceiver while for the singleton case into the MetricTrackingFnDataReceiver. To propagate splits, a SplittingMetricTrackingFnDataReceiver was created that extends the MetricTrackingFnDataReceiver. Note, like in Python, there is currently no support for splitting as in https://github.com/apache/beam/blob/c167d8ef99b21148bcab7c37538a6ef2f64864c7/sdks/python/apache_beam/runners/worker/operations.py#L133
d0eb6dd
to
d541573
Compare
Run Java PreCommit |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, looks good to me.
@@ -131,6 +137,11 @@ | |||
private final BeamFnDataClient beamFnDataClient; | |||
private final Coder<WindowedValue<OutputT>> coder; | |||
|
|||
private final Object splittingLock = new Object(); | |||
// 0-based count of the number of elements |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
0 based index of the current element being processed(?).
private final Object splittingLock = new Object(); | ||
// 0-based count of the number of elements | ||
private long index = -1; | ||
// 0-based count of the number of elements |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
0-based index of the first element to not process. (Or is this the last element to process?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
first element to not process.
totalBufferSize = stopIndex; | ||
} | ||
|
||
// In the case where we have yet to process an element, set the current element progress to 1. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be an else clause below?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Its logically the same where the else clause is the default based upon what we initialize.
|
||
// Compute the amount of "remaining" work that we know of. | ||
double remainder = totalBufferSize - index - currentElementProgress; | ||
// Compute the fraction of work that we should "keep". |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Compute the number of elements that we should "keep."
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
Run Java PreCommit |
1 similar comment
Run Java PreCommit |
This code mirrors the logic/implementation within Python.
To be able to propagate the split call to the downstream receiver, I collapsed all the harness
FnDataReceiver
types into two existing implementations and one new implementation.The previous hierarchy was:
The current implementation combined the element counting, time counting and multiplexing into the
MultiplexingMetricTrackingFnDataReceiver
while for the singleton case into theMetricTrackingFnDataReceiver
.To propagate splits, a
SplittingMetricTrackingFnDataReceiver
was created that extends the MetricTrackingFnDataReceiver. Note, like in Python, there is currently no support for splitting a multiplexed consumer.Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
R: @username
).[BEAM-XXX] Fixes bug in ApproximateQuantiles
, where you replaceBEAM-XXX
with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.See the Contributor Guide for more tips on how to make review process smoother.
Post-Commit Tests Status (on master branch)
Pre-Commit Tests Status (on master branch)
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.