New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-1819] Key should be available in @OnTimer methods #11154
Conversation
# Conflicts: # runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
# Conflicts: # sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java # sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
retest this please |
retest this please |
R: @mxm |
retest this please |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome @rehmanmuradali. The changes look good to me. I'd just change the signature of fireTimerInternal
now that we have adapted the tests.
@@ -837,11 +837,15 @@ public void onProcessingTime(InternalTimer<ByteBuffer, TimerData> timer) { | |||
|
|||
// allow overriding this in ExecutableStageDoFnOperator to set the key context | |||
protected void fireTimerInternal(Object key, TimerData timerData) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we change this to ByteBuffer
now? Same in ExecutableStageDoFnOperator
where it is overridden.
@mxm done |
retest this please |
run dataflow validatesrunner |
run flink validatesrunner |
retest this please |
run dataflow validatesrunner |
run flink validatesrunner |
1 similar comment
run flink validatesrunner |
Run Java PreCommit |
run flink validatesrunner |
Run Java PreCommit |
@mxm do you see any obvious reason why the SDF test is failing on Flink, with just one element missing in the output? |
@reuvenlax The test became flaky recently with the latest SDF changes. It's a Flink Runner bug fixed here: #11533 |
Run Java PreCommit |
@mxm so unrelated to this PR then |
Yes. The tests should be passing now. |
Run Flink ValidatesRunner |
Good job @rehmanmuradali! Also thank you @reuvenlax for mentoring the PR! |
@@ -560,7 +560,6 @@ public Instant timestamp(DoFn<InputT, OutputT> doFn) { | |||
throw new IllegalStateException( | |||
String.format("Unknown URN %s", pTransform.getSpec().getUrn())); | |||
} | |||
this.onTimerContext = new OnTimerContext(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The intent of these context objects was to not create a new one when processing each element/timer and instead to reference a member variable as can be seen in:
beam/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
Line 1468 in 591de34
return pipelineOptions; |
beam/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
Line 1506 in 591de34
return currentElement.getValue(); |
It would have made more sense to have OnTimerContext just return currentTimer.getUserKey()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 Let's fix these in a follow-up.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And document the intent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 worth to create a JIRA to track it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mxm I am currently working on BEAM-1589, BEAM-8543. I will get back to it after that. The Jira is filed here: https://issues.apache.org/jira/browse/BEAM-9839
Starting with this PR merge the Spark ValidatesRunner tests are broken can you please take a look @rehmanmuradali This can be reproduced locally by running CC @ibzib who reported the issue first https://issues.apache.org/jira/browse/BEAM-9836 |
On it |
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
R: @username
).[BEAM-XXX] Fixes bug in ApproximateQuantiles
, where you replaceBEAM-XXX
with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
Post-Commit Tests Status (on master branch)
Pre-Commit Tests Status (on master branch)
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.