[BEAM-22] Clean up InProcess Read Evaluators #106
R: @bjchambers
UnboundedReadEvaluator<OutputT> evaluator =
    new UnboundedReadEvaluator<OutputT>(transform, evaluationContext, evaluatorQueue);
    new UnboundedReadEvaluator<OutputT>(
        transform, evaluationContext, evaluatorQueue, source);
Argument order should be the same between these methods (e.g., transform, source, context, ...).
Done.
LGTM. Will wait for green CI before commit.
b6ba797 to 71e95b7
@@ -117,12 +117,19 @@
private final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform;
private final InProcessEvaluationContext evaluationContext;
private boolean contentsRemaining;
/**
 * The source being read from by this {@link BoundedReadEvaluator}. This may not be the same
 * source as returned by {@link Bounded#getSource()} within {@link #transform} due to splitting.
Maybe shorten:
The source being read from by this {@link BoundedReadEvaluator}.
This may differ from the initial source derived from the {@link #transform} due to splitting.
(Documenting the path by which it is derived seems brittle to future changes.)
Done.
This permits use of sources that are not the initial source used in the transform. BoundedSource#splitIntoBundles and UnboundedSource#generateInitialSplits generate multiple source objects for the same transform in order to permit parallelism.
Use BoundedReader instead of Reader. contentsRemaining should be method-scoped not instance-scoped.
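For readers unfamiliar with the pattern, here is a minimal sketch of the two ideas in this description: the evaluator is handed the specific split it should read (rather than re-deriving the source from the transform), and `contentsRemaining` is a local variable of the read loop. Everything below is a hypothetical stand-in written for illustration. `RangeSource`, `RangeReader`, `ReadEvaluator`, `split`, and `readAll` are assumed names and are not the Beam SDK classes or the code in this PR; only the `start()` / `advance()` / `getCurrent()` reader shape is modeled on Beam's reader interface.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustration only: hypothetical stand-ins, not Beam SDK types. */
final class SplitReadSketch {

  /** Hypothetical bounded source over the integer range [start, end). */
  static final class RangeSource {
    final int start;
    final int end;

    RangeSource(int start, int end) {
      this.start = start;
      this.end = end;
    }

    /** Splits this range into at most {@code desired} contiguous, non-empty sub-sources. */
    List<RangeSource> split(int desired) {
      if (desired < 1) {
        throw new IllegalArgumentException("desired must be >= 1");
      }
      List<RangeSource> splits = new ArrayList<>();
      int size = end - start;
      int step = Math.max(1, (size + desired - 1) / desired); // ceiling division
      for (int lo = start; lo < end; lo += step) {
        splits.add(new RangeSource(lo, Math.min(end, lo + step)));
      }
      return splits;
    }
  }

  /** Hypothetical reader with a start/advance/getCurrent shape. */
  static final class RangeReader {
    private final RangeSource source;
    private int current;

    RangeReader(RangeSource source) {
      this.source = source;
    }

    boolean start() {
      current = source.start;
      return current < source.end;
    }

    boolean advance() {
      current++;
      return current < source.end;
    }

    int getCurrent() {
      return current;
    }
  }

  /** Evaluator that reads the source it was given, which may be one split of the initial source. */
  static final class ReadEvaluator {
    private final RangeSource source;

    ReadEvaluator(RangeSource source) {
      this.source = source;
    }

    List<Integer> finishBundle() {
      List<Integer> output = new ArrayList<>();
      RangeReader reader = new RangeReader(source);
      // Method-scoped: this flag is only meaningful while this read loop runs.
      boolean contentsRemaining = reader.start();
      while (contentsRemaining) {
        output.add(reader.getCurrent());
        contentsRemaining = reader.advance();
      }
      return output;
    }
  }

  /** Creates one evaluator per split and concatenates their outputs in split order. */
  static List<Integer> readAll(RangeSource initial, int desiredSplits) {
    List<Integer> all = new ArrayList<>();
    for (RangeSource split : initial.split(desiredSplits)) {
      all.addAll(new ReadEvaluator(split).finishBundle());
    }
    return all;
  }

  public static void main(String[] args) {
    // Reads [0, 10) through three splits: [0, 4), [4, 8), [8, 10).
    System.out.println(readAll(new RangeSource(0, 10), 3));
  }
}
```

In this sketch the splits are read sequentially; a runner could instead hand each split to a separate worker, which is the parallelism the description refers to.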
71e95b7 to a8f862d
These are a couple of minor improvements to BoundedReadEvaluator
and UnboundedReadEvaluator that enable splitting a source at
evaluation time, as well as minor code cleanup.