-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-10703] Add a step property for shardable states during Dataflow graph translation (Java) #12678
Conversation
R: @boyuanzz |
Please advice if there is a better way than simply matching the DoFn with GroupIntoBatchesFn. This should not be merged until the backend can recognize the new property. But would like to send it out anyway to collect early feedback. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, Siyuan! Overall it looks good, just left some comments.
@@ -1264,6 +1268,10 @@ private static void translateFn( | |||
// in streaming but does not work in batch | |||
if (context.getPipelineOptions().isStreaming() && isStateful) { | |||
stepContext.addInput(PropertyNames.USES_KEYED_STATE, "true"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is ALLOWS_SHARDABLE_STATE
supported by both streaming engine and windmill appliance? If it's only for streaming engine, is it safe to populate this field for both?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As a first step, it's only supported by streaming engine. But it's fine to set it in both cases since the field will be ignored in appliance. We will need to wait for the backend to recognize this property anyway, otherwise the pipeline will check fail.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update on this:
I added an experiment to gate the auto-sharding so this can be merged without waiting for the backend. It will also make the testing easier.
I also added a check for the experiment, "beam_fn_api". My intention was to disable the feature for unified worker but I guess this way we would disable auto-sharding for both unified worker and java worker using fn api - I remember that we are not going to support the latter so it seems fine to me. But let me know if my understanding is incorrect.
@@ -100,8 +99,7 @@ public long getBatchSize() { | |||
ParDo.of(new GroupIntoBatchesDoFn<>(batchSize, allowedLateness, keyCoder, valueCoder))); | |||
} | |||
|
|||
@VisibleForTesting | |||
static class GroupIntoBatchesDoFn<K, InputT> | |||
public static class GroupIntoBatchesDoFn<K, InputT> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we want to make GroupInToBatchesDoFn
as public
. Instead of adding property in translateFn
, we can do it at transform level, where you can check transform instanceof GroupIntoBatches
, for example: https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java#L752
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I did try matching the transform by transform instanceof GroupIntoBatches
but got error:
Inconvertible types; cannot cast 'org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory.ParDoSingle<InputT,OutputT>' to 'org.apache.beam.sdk.transforms.GroupIntoBatches'
Any ideas?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have you tried registerTransformTranslator
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assume that we would still need to access to GroupIntoBatchesDoFn
if we register a separate translator, to fill USER_FN for example: https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java#L1241.
Can the same transform go though multiple translators? Is that what you were suggesting?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking about we could check GroupIntoBatches
directly but it turns out that the translation happens after transform expansions, where we only can get ParDo(GroupIntoBatchesDoFn)
. One thing we can do to avoid exposing GroupIntoBatchesDoFn
is to make DoFnSignature
understand that this is a shardable stateful DoFn
, like what we do for RESTRICTION_ENCODING
. @lukecwik Do you have any suggestion here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There have been a few ways this has been done in the past:
- (easiest), record which objects need this property within the DataflowRunner and then lookup this information during translation (e.g. doesPCollectionRequireIndexedFormat)
- replace the public GroupIntoBatches transform with a Dataflow specific
primitive
that makes any additional information visible that is needed during translation (e.g. DataflowRunner.CombineGroupedValues)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! I applied the first solution recommended above and updated the PR accordingly - I added an override for streaming GroupIntoBatches which records the input PCollection, and also a transform matcher to pass the override validation. PTAL.
Run Java PreCommit |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
Is this ready to be merged? |
a181d4d
to
d05e7f6
Compare
To enable optimization for the transform
GroupIntoBatches
, add a new step property when translating stateful DoFns to inform the backend that the special transform allows shardable states.Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
R: @username
).[BEAM-XXX] Fixes bug in ApproximateQuantiles
, where you replaceBEAM-XXX
with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
Post-Commit Tests Status (on master branch)
Pre-Commit Tests Status (on master branch)
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.