-
Notifications
You must be signed in to change notification settings - Fork 13.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-22367][streaming] Reset syncSavepointId only if it is equal to… #17055
Conversation
… checkpoint id from event
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit 84e6a95 (Mon Aug 30 15:18:28 UTC 2021) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
// when: Receiving the abort notification of the previous checkpoint before the complete | ||
// notification of the savepoint terminate. | ||
harness.streamTask.notifyCheckpointAbortAsync(1); | ||
harness.streamTask.notifyCheckpointCompleteAsync(2); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How can this happen? You cannot complete a checkpoint that has not been triggered yet. The order of notifyCheckpointCompleteAsync
and triggerCheckpointOnBarrier
is wrong here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I took this order from the test terminateWithSavepointWithoutComplicationsShouldSucceedAndLeadJobToFinished
. Usually, it indeed happens in normal order, but sometimes I see that the abort happens before the complete.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I meant the order of notifyCheckpointCompleteAsync
and triggerCheckpointOnBarrier
.
The barrier must arrive first before we can say the checkpoint completed. It's not possible to receive a notifyCheckpointCompleteAsync
before we've seen a barrier for the checkpoint.
In your testcase you're first sending the notifyCheckpointCompleteAsync(2)
and after that you're sending a barrier: triggerCheckpointOnBarrier(...(2)...)
For the complete vs abort, you're right, I can see that happen.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
notifyCheckpointCompleteAsync
and notifyCheckpointAbortAsync
actually do nothing but add the task to mailbox. When triggerCheckpointOnBarrier
is called it first of all does its own job and only then it directly execute all waiting mails until the checkpoint is complete. So in fact the triggerCheckpointOnBarrier
is always first.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still this is based on the internal knowledge of the component. It is still true that those methods can not be called in that order in the production code. I find the current order both confusing and prone to changes in the implementation. I find it also distracting from the real scenario that's being tested here.
I might be wrong, but I truly believe I am not the only one that would be confused by this order.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do I understand correctly, that you are talking about order in the test? Do you mean that we should explicitly call triggerCheckpointOnBarrier
first? If so please, check the my last commit I a little rewrote the test
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I am talking about the order in the test.
Why wasn't it necessary in the previous version to call the triggerCheckpointOnBarrier
from within the executor?
Do we need to call the notifyCheckpointCompleteAsync
and notifyCheckpointAbortAsync` from the executor? I think those methods are called via RPC in the production code and are not executed from the executor thread...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I checked it myself. If we execute it from the test thread it gets stuck in the synchronous savepoint loop.
I think executing triggerCheckpointOnBarrier
from the MailboxExecutor
is fine here. I would not do it for the *Async
methods.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, I have removed the mailbox Executor for async calls. Initially, I added it that only for keeping the one style but it indeed didn't have any real function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think your investigation is valid. Have you checked if it is a problem for earlier versions that we support? E.g. 1.12.x?
AFAICT it should not be a problem for master. It should've been fixed there. One change I'd do is to reset it for any newer checkpoint than that of the savepoint. Just to stay on the safe side. WDYT?
It should not happen, as there should be no checkpoints triggered from the checkpoint coordinator after a sync savepoint (it should either stop the job or trigger a failover).
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
Outdated
Show resolved
Hide resolved
Co-authored-by: Dawid Wysakowicz <dwysakowicz@apache.org>
It is also valid for at least release-1.12 - #17104 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, let's wait for Azure
… checkpoint id from event This closes #17055
… checkpoint id from event
What is the purpose of the change
PR fixes the bug with resetting syncSavepointId at the wrong time which led to problem with terminateWithSavepointWithoutComplicationsShouldSucceedAndLeadJobToFinished
Brief change log
Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (yes / no)Documentation