New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-10760] Optimize state cleanup for global window in portable Flink runner #12733
Conversation
This change appears to work as expected, based on e2e test. I have not worked on the unit tests yet (couple will fail). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @tweise. Looks good to me. Let's add a test to ExecutableStageDoFnOperatorTest
, then this looks ready to be merged.
...rg/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
Outdated
Show resolved
Hide resolved
5659941
to
6e169f1
Compare
…nk runner Cleanup timers per key lead to potentially unbounded state/checkpoint size growth with a global window in streaming mode. Instead of timers, we can use the final watermark as the barrier to perform state cleanup.
085f8c5
to
a6f4a60
Compare
@mxm PTAL |
Unrelated error in
|
👍 |
Run Portable_Python PreCommit |
This PR fixes the issue for portable runner only, am I right? Can we do the same for non-portable runner? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point @je-ik. I've just drafted a general solution which I will follow up with shortly.
// Check if the final watermark was triggered to perform state cleanup for global window | ||
if (potentialOutputWatermark > BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis() | ||
&& currentOutputWatermark <= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) { | ||
cleanupGlobalWindowState(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logic should be moved to DoFnOperator#emitWatermark
. It can then also be simplified.
Followed up on this via #12759. |
Cleanup timers per key lead to potentially unbounded state/checkpoint size growth with a global window in streaming mode.
Instead of timers, we can use the final watermark as the barrier to perform state cleanup.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
R: @username
).[BEAM-XXX] Fixes bug in ApproximateQuantiles
, where you replaceBEAM-XXX
with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
Post-Commit Tests Status (on master branch)
Pre-Commit Tests Status (on master branch)
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.