New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement Aggregators directly in the DirectRunner #735
Conversation
208c79b
to
3cdaef3
Compare
StepContext stepContext, | ||
CounterSet.AddCounterMutator addCounterMutator, | ||
WindowingStrategy<?, ?> windowingStrategy) { | ||
return simpleRunner(options, fn, sideInputReader, outputManager, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Format consistency?
Done. PTAL. |
@@ -82,7 +86,8 @@ public String getName() { | |||
} | |||
} | |||
|
|||
private final HashMap<String, AggregatorInfo<?, ?, ?>> accumulators = new HashMap<>(); | |||
private final ConcurrentHashMap<String, AggregatorInfo<?, ?, ?>> accumulators = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just ConcurrentMap
Otherwise LGTM |
Will wait for tests to turn green and then I'd like to submit. @kennknowles any comments? |
ec43e7b
to
d6cd936
Compare
LGTM when green. |
d6cd936
to
1e85f20
Compare
return new CounterAggregator<>(generateInternalAggregatorName(name), | ||
combiner, addCounterMutator); | ||
checkNotNull(combiner, "Combiner passed to createAggregator cannot be null"); | ||
return aggregatorFactory.createAggregator(name, combiner); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be aggregatorFactory.createAggregator(generateInternalAggregatorName(name), combiner);
to ensure we associate each aggregator with the current step context.
Previously, we relied on conversion to Counter rather than just running the specified CombineFn. This aligns the direct runner more closely with the model. This PR also parameterizes DoFnRunner on an AggregatorFactory to implement aggregators, allowing each runner to provide the appropriate implementation.
fdb7d71
to
4199b11
Compare
Updated to actually make sure that aggregators are properly scoped per-step. PTAL. |
checkNotNull(combiner, "Combiner passed to createAggregator cannot be null"); | ||
return aggregatorFactory.createAggregator(name, combiner); | ||
checkNotNull(combiner, "Combiner passed to createAggregatorForDoFn cannot be null"); | ||
boolean isSystemDoFn = fn.getClass().isAnnotationPresent(SystemDoFnInternal.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unused
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
Running |
* chore: Update gapic-generator-python to v1.11.4 PiperOrigin-RevId: 547897126 Source-Link: googleapis/googleapis@c09c75e Source-Link: https://github.com/googleapis/googleapis-gen/commit/45e0ec4343517cd0aa66b5ca64232a1802c2f945 Copy-Tag: eyJwIjoiLmdpdGh1Yi8uT3dsQm90LnlhbWwiLCJoIjoiNDVlMGVjNDM0MzUxN2NkMGFhNjZiNWNhNjQyMzJhMTgwMmMyZjk0NSJ9 * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --------- Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
Be sure to do all of the following to help us incorporate your contribution
quickly and easily:
[BEAM-<Jira issue #>] Description of pull request
mvn clean verify
. (Even better, enableTravis-CI on your fork and ensure the whole test matrix passes).
<Jira issue #>
in the title with the actual Jira issuenumber, if there is one.
Individual Contributor License Agreement.