[FLINK-25941][streaming] Only emit committables with Long.MAX_VALUE as checkpoint id in batch mode #18784
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
Automated Checks
Last check on commit 5ad29aa (Tue Feb 15 15:44:08 UTC 2022)
Warnings:
Mention the bot in a comment to re-run the automated checks.
Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Bot commands
The @flinkbot bot supports the following commands:
@flinkbot run azure
Thank you for your contribution; it looks good to me. One small nit: the boolean could have a more descriptive name than just 'batch'.
...link/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java
After some offline discussion, I discovered that the current solution is not sufficient: if users run in streaming mode without enabling checkpointing, there is currently no way for a commit to be triggered.
@alpreu, can you take another look? I updated the PR with a solution that triggers a full commit for a pipeline in streaming mode if checkpointing is not enabled.
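The condition described above can be sketched as a single predicate. This is a minimal, hypothetical illustration, not code from the PR: the class `EndInputPolicy` and its method are invented names, and the parameters use descriptive boolean names in the spirit of the naming nit raised in review.

```java
// Hypothetical sketch (not Flink code): decides whether committables should be
// emitted on end of input, tagged with Long.MAX_VALUE as their checkpoint id.
public final class EndInputPolicy {

    private EndInputPolicy() {}

    /**
     * @param isBatchMode            true when the job runs in batch execution mode
     * @param isCheckpointingEnabled true when checkpointing is configured for the job
     * @return true if end of input must trigger a full commit
     */
    public static boolean shouldEmitOnEndInput(boolean isBatchMode, boolean isCheckpointingEnabled) {
        // Batch mode has no checkpoints, so end of input is the only commit point.
        // Streaming mode without checkpointing would otherwise never commit.
        // Streaming mode with checkpointing relies on the final checkpoint instead.
        return isBatchMode || !isCheckpointingEnabled;
    }

    public static void main(String[] args) {
        System.out.println(shouldEmitOnEndInput(true, false));  // true
        System.out.println(shouldEmitOnEndInput(false, true));  // false
        System.out.println(shouldEmitOnEndInput(false, false)); // true
    }
}
```

Keeping the decision in one place makes the two fallback cases (batch, and streaming without checkpointing) explicit instead of spreading them across the writer and committer operators.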
What is the purpose of the change
Before this commit, the SinkWriter and Committer operators emitted
committables on endInput. This was troublesome because the emission
was not part of any checkpoint, so the checkpointId was effectively
null/Long.MAX_VALUE. With the completion of FLIP-143, all jobs in
streaming mode take a final checkpoint when they transition to
finished, so we can rely on the normal checkpoint mechanism and only
need endInput for batch execution.
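One way to picture why a Long.MAX_VALUE checkpoint id means "not part of any checkpoint" is a buffer that groups committables by checkpoint id and commits everything up to a completed checkpoint. This is a hypothetical sketch: `CommittableBuffer`, `END_OF_INPUT`, and `commitUpTo` are invented names and do not reflect the API of Flink's committable manager classes; committables are plain strings for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch (not Flink's API): committables grouped by checkpoint id.
public final class CommittableBuffer {

    /** Assumed checkpoint id for committables emitted on end of input. */
    public static final long END_OF_INPUT = Long.MAX_VALUE;

    private final TreeMap<Long, List<String>> pending = new TreeMap<>();

    /** Records a committable under the checkpoint id it was emitted with. */
    public void add(long checkpointId, String committable) {
        pending.computeIfAbsent(checkpointId, id -> new ArrayList<>()).add(committable);
    }

    /** Removes and returns all committables whose checkpoint id is <= checkpointId. */
    public List<String> commitUpTo(long checkpointId) {
        NavigableMap<Long, List<String>> ready = pending.headMap(checkpointId, true);
        List<String> committed = new ArrayList<>();
        for (List<String> group : ready.values()) {
            committed.addAll(group);
        }
        ready.clear(); // headMap is a view, so this also removes the entries from pending
        return committed;
    }

    public static void main(String[] args) {
        CommittableBuffer buffer = new CommittableBuffer();
        buffer.add(1L, "a");
        buffer.add(2L, "b");
        buffer.add(END_OF_INPUT, "c");
        System.out.println(buffer.commitUpTo(2L));           // [a, b]
        System.out.println(buffer.commitUpTo(END_OF_INPUT)); // [c]
    }
}
```

In this model, committing up to any real checkpoint id never picks up the end-of-input entries; only an explicit "commit everything" call with Long.MAX_VALUE does. That is consistent with restricting the Long.MAX_VALUE tagging to batch execution, where no real checkpoint will ever arrive.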
Brief change log
Verifying this change
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)
Documentation