[FLINK-25486][Runtime/Coordination] Fix the bug that flink will lost state when zookeeper leader changes #18296
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
Automated Checks
Last check on commit 16eed82 (Fri Jan 07 10:50:10 UTC 2022)
Warnings:
Mention the bot in a comment to re-run the automated checks.
Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process.
Details
The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Bot commands
The @flinkbot bot supports the following commands:
CI fails with https://issues.apache.org/jira/browse/FLINK-25307
@flinkbot run azure
16eed82 to e12d974
dmvk
left a comment
Thanks for the PR @Myracle! I think we need to take a slightly different approach here. Loss of the JobMaster leadership (see JobMasterServiceLeadershipRunner) shouldn't lead to process termination. Instead, the JobMasterServiceLeadershipRunner should be able to re-participate in the new election and potentially get re-elected.
I guess the fix could be as simple as not completing the shutdownFuture in MiniDispatcher#jobReachedTerminalState for non-globally terminal states (already suggested by @tillrohrmann in the JIRA).
WDYT?
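A minimal sketch of that suggestion, for illustration only. It assumes simplified stand-ins: the `Status` enum and the `ShutdownGuardSketch` class are hypothetical and are not Flink's real `JobStatus` or `MiniDispatcher`. The point is only that the shutdown future stays incomplete for a state that is not globally terminal.

```java
import java.util.concurrent.CompletableFuture;

/** Sketch only: simplified stand-ins, not Flink's real types. */
final class ShutdownGuardSketch {

    /** Hypothetical, reduced mirror of a job status with a "globally terminal" flag. */
    enum Status {
        FINISHED(true),
        FAILED(true),
        CANCELED(true),
        SUSPENDED(false); // locally terminal only: a new leader may still recover the job

        private final boolean globallyTerminal;

        Status(boolean globallyTerminal) {
            this.globallyTerminal = globallyTerminal;
        }

        boolean isGloballyTerminalState() {
            return globallyTerminal;
        }
    }

    /** Completes the shutdown future only when the job can never run again. */
    static void jobReachedTerminalState(Status status, CompletableFuture<Status> shutDownFuture) {
        if (status.isGloballyTerminalState()) {
            shutDownFuture.complete(status);
        }
        // Otherwise do nothing, so the process stays alive for the next leader election.
    }

    public static void main(String[] args) {
        CompletableFuture<Status> afterLeadershipLoss = new CompletableFuture<>();
        jobReachedTerminalState(Status.SUSPENDED, afterLeadershipLoss);
        System.out.println("shutdown requested after SUSPENDED: " + afterLeadershipLoss.isDone());

        CompletableFuture<Status> afterFinish = new CompletableFuture<>();
        jobReachedTerminalState(Status.FINISHED, afterFinish);
        System.out.println("shutdown requested after FINISHED: " + afterFinish.isDone());
    }
}
```

A unit test for such a guard would assert that the future is not done after a suspension, rather than only asserting what happens on a globally terminal state.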
e12d974 to ac842b5
@dmvk Thanks for the review and suggestions. I agree with you that the process should not exit in this case. I have modified the code. Could you review again?
@dmvk Could you review again? Thank you.
dmvk
left a comment
Thanks for updating the PR. This is headed in a good direction; I've added a few more comments.
Even though I think this fixes the underlying issue, I would feel more comfortable if we could create a simple integration test for the actual scenario that has been reported. This would ensure that we don't introduce the same problem in the future by accident and that we've really fixed the problem.
Basically we'd test the following:
- Set up a MiniCluster with the JobDispatcherFactory and the corresponding job
- Wait for the checkpoint
- Revoke & Grant leadership
- Assert that we have recovered from checkpoint
- The test should fail without a fix (this currently doesn't hold for the newly introduced unit test, so this made me think that we need to put more effort here)
We already have a pretty similar test for the application mode, which you can use as inspiration (ApplicationDispatcherBootstrapITCase).
WDYT?
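The scenario listed above can be modelled in a few lines to show why the test has to fail without the fix. This is a toy model under stated assumptions, not a Flink integration test: `HaStore`, `JobProcess`, and `runScenario` are hypothetical names, and the flag `shutDownOnSuspension` stands in for the behaviour before the fix.

```java
import java.util.Optional;

/** Toy model of the reported scenario; none of these types exist in Flink. */
final class LeadershipRecoverySketch {

    /** Stand-in for the HA store that remembers the latest completed checkpoint id. */
    static final class HaStore {
        private Long latestCheckpoint; // null means that no checkpoint is stored

        void store(long checkpointId) {
            latestCheckpoint = checkpointId;
        }

        void cleanUp() {
            latestCheckpoint = null;
        }

        Optional<Long> latest() {
            return Optional.ofNullable(latestCheckpoint);
        }
    }

    /** Stand-in for the job-mode process reacting to leadership changes. */
    static final class JobProcess {
        private final HaStore store;
        private final boolean shutDownOnSuspension; // true models the behaviour before the fix

        JobProcess(HaStore store, boolean shutDownOnSuspension) {
            this.store = store;
            this.shutDownOnSuspension = shutDownOnSuspension;
        }

        void checkpoint(long checkpointId) {
            store.store(checkpointId);
        }

        /** Leadership revoked: the job is suspended, which is not globally terminal. */
        void revokeLeadership() {
            if (shutDownOnSuspension) {
                store.cleanUp(); // in this model, shutting down discards the HA data
            }
        }

        /** Leadership granted again: restore from whatever the HA store still holds. */
        Optional<Long> grantLeadership() {
            return store.latest();
        }
    }

    /** Runs checkpoint, revoke, grant and returns the checkpoint that was restored, if any. */
    static Optional<Long> runScenario(boolean shutDownOnSuspension) {
        JobProcess process = new JobProcess(new HaStore(), shutDownOnSuspension);
        process.checkpoint(1L);
        process.revokeLeadership();
        return process.grantLeadership();
    }

    public static void main(String[] args) {
        System.out.println("restored without fix: " + runScenario(true));
        System.out.println("restored with fix: " + runScenario(false));
    }
}
```

In the real test the same assertion, that the job restored from the checkpoint taken before the revoke, would be made against a MiniCluster instead of these stand-ins.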
 * signals job termination if the JobStatus is not globally terminal state.
 */
@Test
public void testNotTerminationWithoutGloballyTerminalState() throws Exception {
the test passes without the fix as well
I have modified the test to check that the shutDownFuture is not completed.
👍
        executionMode);
    shutDownFuture.complete(ApplicationStatus.fromJobStatus(jobStatus));
} else {
    log.warn(
Why do we need to print a warning here?
Always try to think about it from the user's perspective. If a user sees this log message, is it relevant to what's going on? Is it something they should investigate?
if (jobCancelled || executionMode == ClusterEntrypoint.ExecutionMode.DETACHED) {
    JobStatus jobStatus = archivedExecutionGraph.getState();
    if ((jobStatus != null && jobStatus.isGloballyTerminalState())
Can the job status be null here? My intuition would be that for the terminal state this should never be the case and would actually signal an underlying issue. Maybe adding a safeguard (Objects.requireNonNull) with a reasonable message could be a better fit?
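For illustration, a small sketch of what such a safeguard could look like. The `Status` enum, the method name `shouldShutDown`, and the message text are assumptions made for this example; only `Objects.requireNonNull(T, String)` is the real JDK call.

```java
import java.util.Objects;

/** Sketch only: the enum and method are hypothetical stand-ins. */
final class JobStatusGuardSketch {

    /** Reduced stand-in: FINISHED is treated as globally terminal, SUSPENDED is not. */
    enum Status {
        FINISHED,
        SUSPENDED
    }

    /** Fails fast with a descriptive message instead of silently tolerating a null status. */
    static boolean shouldShutDown(Status jobStatus) {
        Objects.requireNonNull(
                jobStatus, "JobStatus should not be null for a job that reached a terminal state.");
        return jobStatus == Status.FINISHED;
    }

    public static void main(String[] args) {
        System.out.println("FINISHED: " + shouldShutDown(Status.FINISHED));
        System.out.println("SUSPENDED: " + shouldShutDown(Status.SUSPENDED));
        try {
            shouldShutDown(null);
        } catch (NullPointerException e) {
            System.out.println("null status rejected: " + e.getMessage());
        }
    }
}
```

Compared with a `jobStatus != null &&` check, this turns an unexpected null into an immediate, explained failure rather than a silently skipped branch.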
What's the state of this PR? Can we resolve the open comments to merge it soon?
ac842b5 to 6ffad61
@tillrohrmann Sorry for the late reply. I have written most of the code and will finish it soon.
6ffad61 to 216076e
@flinkbot run azure
…state when zookeeper leader changes
216076e to f098fea
@flinkbot run azure
dmvk
left a comment
Thanks for the update @Myracle, I really like it! <3 I've added a few more suggestions for the test cases, to speed them up and to align them with the community code style (JUnit 5 for the new tests).
I think once we address these, the PR should be good to go! Once more, thanks for the contribution, this is a really great improvement.
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
 * signals job termination if the JobStatus is not globally terminal state.
 */
@Test
public void testNotTerminationWithoutGloballyTerminalState() throws Exception {
👍
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java
@dmvk Thanks for the valuable suggestions, which make the code cleaner. I have modified the code. Could you please review again?
dmvk
left a comment
LGTM 👍 Thanks for the update
:( compile step is failing,
477c124 to 226244d
@dmvk Fixed and the CI passed.
tillrohrmann
left a comment
Thanks for updating the PR @Myracle. Merging it now.
…state when zookeeper leader changes This closes apache#18296.
What is the purpose of the change
When high-availability.zookeeper.client.tolerate-suspended-connections is left at its default value of false, the appMaster fails over as soon as the ZooKeeper leader changes. In this case, the old appMaster cleans up all the HA data, so the new appMaster cannot recover from the latest checkpoint. This PR fixes that.
Brief change log
Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (no)
Documentation