New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-10940] Lengthprefix any input coder for an ProcessBundleDescriptor. #13120
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
if (bagUserStateSpecs.size() > 0 || timerSpecs.size() > 0) { | ||
lengthPrefixKeyCoder(stage.getInputPCollection().getId(), components); | ||
} | ||
lengthPrefixAnyInputCoder(stage.getInputPCollection().getId(), components); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Worth mentioning this will break any existing non-stateful pipelines. We could add another exception here for SDFs, but it's probably better to have a consistent encoding which does not depend on DoFns using state or timers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would not expect it will break non-stateful pipelines since non-stateful transform doesn't look into this coder?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It does, e.g. for buffering data during checkpoints. However, there are usually also other changes in Beam which makes restoring state from previous Beam versions hard.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In that case, I would image that the buffering data should be encoded/decoded with grpc input coder, which has been lengthperfixed already. Would you like to elaborate more on this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or do we have some test cases that we can verify?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My bad, the length prefixing here only applies to the coder in the ProcessBundleDescriptor. Indeed, any buffering won't be affected by this because the elements have already been decoded when they are buffered.
For the test case, FlinkSavepointITCase should verify that.
private static void lengthPrefixKeyCoder( | ||
String inputColId, Components.Builder componentsBuilder) { | ||
RunnerApi.PCollection pcollection = componentsBuilder.getPcollectionsOrThrow(inputColId); | ||
RunnerApi.Coder kvCoder = componentsBuilder.getCodersOrThrow(pcollection.getCoderId()); | ||
Preconditions.checkState( | ||
ModelCoders.KV_CODER_URN.equals(kvCoder.getSpec().getUrn()), | ||
"Stateful executable stages must use a KV coder, but is: %s", | ||
kvCoder.getSpec().getUrn()); | ||
String keyCoderId = ModelCoders.getKvCoderComponents(kvCoder).keyCoderId(); | ||
// Retain the original coder, but wrap in LengthPrefixCoder | ||
String newKeyCoderId = | ||
LengthPrefixUnknownCoders.addLengthPrefixedCoder(keyCoderId, componentsBuilder, false); | ||
// Replace old key coder with LengthPrefixCoder<old_key_coder> | ||
kvCoder = kvCoder.toBuilder().setComponentCoderIds(0, newKeyCoderId).build(); | ||
componentsBuilder.putCoders(pcollection.getCoderId(), kvCoder); | ||
private static void lengthPrefixAnyInputCoder( | ||
String inputPCollectionId, Components.Builder componentsBuilder) { | ||
RunnerApi.PCollection pcollection = | ||
componentsBuilder.getPcollectionsOrThrow(inputPCollectionId); | ||
String newInputCoderId = | ||
LengthPrefixUnknownCoders.addLengthPrefixedCoder( | ||
pcollection.getCoderId(), componentsBuilder, false); | ||
componentsBuilder.putPcollections( | ||
inputPCollectionId, pcollection.toBuilder().setCoderId(newInputCoderId).build()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering, why is length-prefixing the key coder not necessary anymore? Wouldn't the SDK Harness be able to extract a non-length-prefixed key coder even though the input coder has been legth-prefixed? This would then cause a regression like in #9997 if the SDK Harness didn't use the NESTED contex, which it currently does because we had fix this a while ago:
key=self._key_coder.encode_nested(key))), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess this will still work because if we have a KV Coder we will always end up length-prefixing the key because the KV coder is a known coder and it will recurse into the key afterwards.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah that's right. This logic is also verified by the unit test.
.../src/test/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptorsTest.java
Outdated
Show resolved
Hide resolved
.../src/test/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptorsTest.java
Outdated
Show resolved
Hide resolved
.../src/test/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptorsTest.java
Outdated
Show resolved
Hide resolved
Run Python_PVR_Flink PreCommit |
Thanks for your help! I'll merge this PR once all tests pass. |
…put coder for an ProcessBundleDescriptor. [BEAM-10940] Lengthprefix any input coder for an ProcessBundleDescriptor.
We need to length-prefix the coder of the input pcollection for one executable stage especially when there is an SDF or a stateful DoFn, where:
In SDF, the input coder is for encoding DelayedBundleApplication
In one stateful DoFn, the input coder is for encoding the key for state request.
r: @lukecwik @mxm
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
R: @username
).[BEAM-XXX] Fixes bug in ApproximateQuantiles
, where you replaceBEAM-XXX
with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
Post-Commit Tests Status (on master branch)
Pre-Commit Tests Status (on master branch)
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.