-
Notifications
You must be signed in to change notification settings - Fork 13.8k
[FLINK-16383][task] Do not relay notifyCheckpointComplete to closed operators #12186
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit 26b98ea (Fri Oct 16 10:50:44 UTC 2020) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. DetailsThe Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
|
|
||
| for (StreamOperatorWrapper<?, ?> operatorWrapper : operatorChain.getAllOperators(true)) { | ||
| operatorWrapper.getStreamOperator().notifyCheckpointComplete(checkpointId); | ||
| if (!operatorWrapper.isClosed()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we shouldn't throw some exception?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It depends on what we are trying to achieve. Assume that we have a chain source -> map -> sink and source and map have been closed already. Wouldn't it be nice to still commit stuff in sink?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If one operator is closed in the chain, doesn't it mean all of them are closed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not with the wrapper. It closes them one after another from head to tail with mails. That's why the notify can sneak in in the first place.
Only when one operator is closed completed, the next operator in line is closed.
...src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
Show resolved
Hide resolved
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
Show resolved
Hide resolved
...src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
Show resolved
Hide resolved
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
Show resolved
Hide resolved
pnowojski
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM % test failure?
|
@flinkbot run azure |
…perators. Through StreamOperatorWrapper an operator may already be closed while the StreamTask is still running. Notification might be relayed in that time from the task to the closed operator causing issues on operators reacting on completed checkpoints, such as two phase commit sinks. This commit adds the information of the closing to the wrapper and avoids relaying notifications to closed operators. Also fixes a potential related issue in SubtaskCheckpointCoordinatorImpl#takeSnapshotSync.
What is the purpose of the change
Through StreamOperatorWrapper an operator may already be closed while the StreamTask is still running. Notification might be relayed in that time from the task to the closed operator causing issues on operators reacting on completed checkpoints, such as two phase commit sinks.
Brief change log
Verifying this change
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)Documentation