[BEAM-11481] emit output watermark on watermark hold change #13571
Can you please add a new DoFnOperatorTest for this?
try {
  processInputWatermark(false);
} catch (Exception ex) {
  // should not happen
can you please add this as a message for IllegalStateException?
Isn't IllegalStateException meant to be used for an illegal state, which is a state that should not happen? :)
I just love seeing "This should never happen" messages in logs when debugging ;)
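The request above amounts to replacing the bare "should not happen" comment with an exception that carries a descriptive message and the original cause. A minimal sketch, assuming a hypothetical `HoldAwareOperator` class, a hypothetical `failNext` switch for simulating a failure, and an illustrative message text; none of these come from the actual DoFnOperator code:

```java
// Hypothetical stand-in for the operator under review; not the real DoFnOperator.
class HoldAwareOperator {
  // Hypothetical switch used only to simulate a failure in this sketch.
  boolean failNext = false;

  // Stand-in for the method from the diff; it may throw a checked exception.
  void processInputWatermark(boolean advanceInputWatermark) throws Exception {
    if (failNext) {
      throw new Exception("simulated failure");
    }
  }

  // Wraps the checked exception so the log states what failed and keeps the cause.
  void onHoldChanged() {
    try {
      processInputWatermark(false);
    } catch (Exception ex) {
      throw new IllegalStateException(
          "Failed to emit output watermark after watermark hold change", ex);
    }
  }
}
```

With this shape, a failure shows up in the logs with both the context ("after watermark hold change") and the underlying stack trace, instead of a generic message.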
This makes total sense, we want to progress the output WM as fast as possible 👍 In the case of a splittable DoFn, does the input watermark progress at all?
My best guess is that impulse progresses the input WM from MIN to MAX and we let the watermark hold handle the progression... in that case, SDF probably never worked properly on the Flink runner 🤔
...src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
if (newWatermarkHold > currentWatermarkHold) {
  try {
    processInputWatermark(false);
This logic should be removed. All logic which deals with watermark emission should be handled through processWatermark.
I disagree - handling watermark emission in processWatermark only is what causes the issues.
@@ -739,9 +741,12 @@ public final void processWatermark1(Watermark mark) throws Exception {
    }

    currentInputWatermark = mark.getTimestamp();
We need the following to generalize watermark emission so that this method can be called from other places:
    if (mark.getTimestamp() > currentInputWatermark) {
      currentInputWatermark = mark.getTimestamp();
    }
The input watermark is not what this PR is concerned with. That logic did not change; we only need to be able to progress the output watermark when the watermark hold changes.
You would need this change in order to go through the regular watermark emission code without changing the latest seen input watermark.
I would like to generalize the existing code rather than add a code path for every exception, which is bound to be error-prone.
private void processInputWatermark(boolean advanceInputWatermark) throws Exception {
Please remove if we don't want to scatter the logic about watermark advancement.
I think that is what we must do. We could call processWatermark from processElement, but that call does work that is not necessary in processElement. That's why I simply wrapped what needs to be done in processInputWatermark (maybe we could find a better name to make it clearer).
doFnRunner.processElement(streamRecord.getValue());
checkInvokeFinishBundleByCount();
emitWatermarkIfHoldChanged(oldHold);
Is this required on every element? I'd rather trigger this only if we set / remove a hold.
This is a very cheap call, and it tests exactly whether the hold was set (actually, reset).
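To illustrate why the per-element check can be cheap, here is a minimal sketch of what such a method might look like. It assumes a hypothetical `HoldTrackingSketch` class with plain `long` timestamps, `Long.MAX_VALUE` meaning "no hold", and an `emitted` list standing in for downstream emission; the actual method in the PR may differ:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the operator state; not the real DoFnOperator.
class HoldTrackingSketch {
  long currentInputWatermark = Long.MIN_VALUE;
  long currentOutputWatermark = Long.MIN_VALUE;
  // Assumption: Long.MAX_VALUE means that no watermark hold is set.
  long watermarkHold = Long.MAX_VALUE;
  // Stand-in for emitting watermarks downstream.
  final List<Long> emitted = new ArrayList<>();

  // One comparison per element; work happens only when the hold moved forward.
  void emitWatermarkIfHoldChanged(long oldHold) {
    if (watermarkHold > oldHold) {
      // The output watermark is bounded by both the input watermark and the hold.
      long candidate = Math.min(currentInputWatermark, watermarkHold);
      if (candidate > currentOutputWatermark) {
        currentOutputWatermark = candidate;
        emitted.add(candidate);
      }
    }
  }
}
```

In the common case where the hold did not change, the call reduces to a single `long` comparison.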
Run Flink ValidatesRunner
Run Java Flink PortableValidatesRunner Streaming
Thanks for adding the test, LGTM 👍 I don't have any strong opinions about Max's concerns.
Maybe we could just call processWatermark1(currentInputWatermark) after each element / timer to make processWatermark1 the single code path for watermark advancement / emission, but not sure if that's doable for performance reasons.
I'd appreciate it if we could use processWatermark to contain all watermark logic, including emission of new output watermarks if watermark holds change. This avoids scattering the logic for watermarks. Also, I'm not sure it works correctly with the portability code otherwise.
In processWatermark, instead of advancing the input watermark immediately, the code has to first calculate the output watermark based on the latest min watermark hold, then emit a new output watermark before advancing the input watermark, which may result in another output watermark emission:
    def processWatermark(watermark):
      newOutputWatermark = calculateOutputWatermark()
      maybeEmitOutputWatermark(newOutputWatermark)
      if watermark > currentInputWatermark:
        currentInputWatermark = watermark
        timeServiceManager.advanceWatermark(watermark)
        processWatermark(watermark)
We can call this on every element, the call is cheap.
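The pseudocode above can be sketched in Java roughly as follows. This is an illustration under stated assumptions, not the operator's actual code: `SinglePathWatermarkSketch`, the `long` timestamps, `Long.MAX_VALUE` as "no hold", and the `emitted` list are all hypothetical, and the timer service call is omitted because it needs the Flink runtime.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a single code path for watermark handling.
class SinglePathWatermarkSketch {
  long currentInputWatermark = Long.MIN_VALUE;
  long currentOutputWatermark = Long.MIN_VALUE;
  // Assumption: Long.MAX_VALUE means that no watermark hold is set.
  long minWatermarkHold = Long.MAX_VALUE;
  // Stand-in for emitting watermarks downstream.
  final List<Long> emitted = new ArrayList<>();

  long calculateOutputWatermark() {
    return Math.min(currentInputWatermark, minWatermarkHold);
  }

  void maybeEmitOutputWatermark(long newOutputWatermark) {
    // Output watermarks must never go backwards.
    if (newOutputWatermark > currentOutputWatermark) {
      currentOutputWatermark = newOutputWatermark;
      emitted.add(newOutputWatermark);
    }
  }

  void processWatermark(long watermark) {
    // First account for hold changes since the last call.
    maybeEmitOutputWatermark(calculateOutputWatermark());
    if (watermark > currentInputWatermark) {
      currentInputWatermark = watermark;
      // A real operator would advance its timer service here, which may
      // fire timers and change holds; that step is left out of this sketch.
      processWatermark(watermark); // re-evaluate; recurses at most once
    }
  }
}
```

Calling `processWatermark(currentInputWatermark)` after an element leaves the input watermark untouched and only emits if a hold change allows the output watermark to advance, which is what makes it safe to reuse from `processElement`.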
@mxm I refactored the code a little, please have a look now.
Run Flink ValidatesRunner
Run Java Flink PortableValidatesRunner Streaming
Run Python_PVR_Flink PreCommit
Run Java PreCommit
Feel free to proceed however you like. No need to block this on me. After all, it's a good fix!
Run Java PreCommit
Fixes [BEAM-11481].