-
Notifications
You must be signed in to change notification settings - Fork 13.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-18063][checkpointing] Fix the race condition for aborting current checkpoint in CheckpointBarrierUnaligner #12460
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit c3f3aca (Wed Jun 03 09:42:03 UTC 2020) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
...ing-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
Show resolved
Hide resolved
...ing-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
Show resolved
Hide resolved
...ing-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
Outdated
Show resolved
Hide resolved
...ing-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
Outdated
Show resolved
Hide resolved
...ing-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
Outdated
Show resolved
Hide resolved
Thanks for review @pnowojski ! I appended two hotfix commits and a fixup commit for solving the issue of |
FYI: Actually I also planned to get ride of It is almost done on my local but brought some unexpected other failures, i am not sure whether have time to further focus on it. Let's see then. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks pretty good already. I guess I'd mostly like to see a guard in the mail that executes notifyCheckpoint
.
...ing-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
Show resolved
Hide resolved
...ing-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
Show resolved
Hide resolved
...ing-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
Outdated
Show resolved
Hide resolved
...ing-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
Outdated
Show resolved
Hide resolved
…d method in CheckpointBarrierHandler Simplify the implementations of CheckpointBarrierTracker and CheckpointBarrierUnaligner to reuse the parament default implementation. This closes apache#12460.
…atingCheckpointBarrierHandler#getAlignmentDurationNanos We should take the value from active handler instead of aligned handler, because aligned handler is only used for savepoint and in most cases the unaligned alignment duration should always be 0. This cloese apache#12460.
…d method in CheckpointBarrierHandler Simplify the implementations of CheckpointBarrierTracker and CheckpointBarrierUnaligner to reuse the parent default implementation. This closes apache#12460.
…atingCheckpointBarrierHandler#getAlignmentDurationNanos We should take the value from active handler instead of aligned handler, because aligned handler is only used for savepoint and in most cases the unaligned alignment duration should always be 0. This cloese apache#12460.
…point in CheckpointBarrierUnaligner There are three aborting scenarios which might encounter race condition: 1. CheckpointBarrierUnaligner#processCancellationBarrier 2. CheckpointBarrierUnaligner#processEndOfPartition 3. AlternatingCheckpointBarrierHandler#processBarrier They only consider the pending checkpoint triggered by #processBarrier from task thread to abort it. Actually the checkpoint might also be triggered by #notifyBarrierReceived from netty thread in race condition, so we should also handle properly to abort it. This closes apache#12460.
…point in CheckpointBarrierUnaligner There are three aborting scenarios which might encounter race condition: 1. CheckpointBarrierUnaligner#processCancellationBarrier 2. CheckpointBarrierUnaligner#processEndOfPartition 3. AlternatingCheckpointBarrierHandler#processBarrier They only consider the pending checkpoint triggered by #processBarrier from task thread to abort it. Actually the checkpoint might also be triggered by #notifyBarrierReceived from netty thread in race condition, so we should also handle properly to abort it. This closes apache#12460.
Thanks for the review @AHeise . I have arranged the commit messages for the final version. Could you double check whether there are any pending concerns now? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM with some smaller nits and suggestions.
...ing-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
Show resolved
Hide resolved
...ing-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
Outdated
Show resolved
Hide resolved
...java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnalignerTest.java
Outdated
Show resolved
Hide resolved
...java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnalignerTest.java
Outdated
Show resolved
Hide resolved
...java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnalignerTest.java
Outdated
Show resolved
Hide resolved
…point in CheckpointBarrierUnaligner There are three aborting scenarios which might encounter race condition: 1. CheckpointBarrierUnaligner#processCancellationBarrier 2. CheckpointBarrierUnaligner#processEndOfPartition 3. AlternatingCheckpointBarrierHandler#processBarrier They only consider the pending checkpoint triggered by #processBarrier from task thread to abort it. Actually the checkpoint might also be triggered by #notifyBarrierReceived from netty thread in race condition, so we should also handle properly to abort it. This closes apache#12460.
5729182
to
cb3e18c
Compare
…d method in CheckpointBarrierHandler Simplify the implementations of CheckpointBarrierTracker and CheckpointBarrierUnaligner to reuse the parent default implementation. This closes apache#12460.
…atingCheckpointBarrierHandler#getAlignmentDurationNanos We should take the value from active handler instead of aligned handler, because aligned handler is only used for savepoint and in most cases the unaligned alignment duration should always be 0. This cloese apache#12460.
…point in CheckpointBarrierUnaligner There are three aborting scenarios which might encounter race condition: 1. CheckpointBarrierUnaligner#processCancellationBarrier 2. CheckpointBarrierUnaligner#processEndOfPartition 3. AlternatingCheckpointBarrierHandler#processBarrier They only consider the pending checkpoint triggered by #processBarrier from task thread to abort it. Actually the checkpoint might also be triggered by #notifyBarrierReceived from netty thread in race condition, so we should also handle properly to abort it. This closes apache#12460.
…d method in CheckpointBarrierHandler Simplify the implementations of CheckpointBarrierTracker and CheckpointBarrierUnaligner to reuse the parent default implementation. This closes apache#12460.
…atingCheckpointBarrierHandler#getAlignmentDurationNanos We should take the value from active handler instead of aligned handler, because aligned handler is only used for savepoint and in most cases the unaligned alignment duration should always be 0. This cloese apache#12460.
…point in CheckpointBarrierUnaligner There are three aborting scenarios which might encounter race condition: 1. CheckpointBarrierUnaligner#processCancellationBarrier 2. CheckpointBarrierUnaligner#processEndOfPartition 3. AlternatingCheckpointBarrierHandler#processBarrier They only consider the pending checkpoint triggered by #processBarrier from task thread to abort it. Actually the checkpoint might also be triggered by #notifyBarrierReceived from netty thread in race condition, so we should also handle properly to abort it. This closes apache#12460.
…d method in CheckpointBarrierHandler Simplify the implementations of CheckpointBarrierTracker and CheckpointBarrierUnaligner to reuse the parent default implementation. This closes apache#12460.
…atingCheckpointBarrierHandler#getAlignmentDurationNanos We should take the value from active handler instead of aligned handler, because aligned handler is only used for savepoint and in most cases the unaligned alignment duration should always be 0. This cloese apache#12460.
…point in CheckpointBarrierUnaligner There are three aborting scenarios which might encounter race condition: 1. CheckpointBarrierUnaligner#processCancellationBarrier 2. CheckpointBarrierUnaligner#processEndOfPartition 3. AlternatingCheckpointBarrierHandler#processBarrier They only consider the pending checkpoint triggered by #processBarrier from task thread to abort it. Actually the checkpoint might also be triggered by #notifyBarrierReceived from netty thread in race condition, so we should also handle properly to abort it. This closes apache#12460.
…d method in CheckpointBarrierHandler Simplify the implementations of CheckpointBarrierTracker and CheckpointBarrierUnaligner to reuse the parent default implementation. This closes apache#12460.
…atingCheckpointBarrierHandler#getAlignmentDurationNanos We should take the value from active handler instead of aligned handler, because aligned handler is only used for savepoint and in most cases the unaligned alignment duration should always be 0. This cloese apache#12460.
…point in CheckpointBarrierUnaligner There are three aborting scenarios which might encounter race condition: 1. CheckpointBarrierUnaligner#processCancellationBarrier 2. CheckpointBarrierUnaligner#processEndOfPartition 3. AlternatingCheckpointBarrierHandler#processBarrier They only consider the pending checkpoint triggered by #processBarrier from task thread to abort it. Actually the checkpoint might also be triggered by #notifyBarrierReceived from netty thread in race condition, so we should also handle properly to abort it. This closes apache#12460.
…point in CheckpointBarrierUnaligner There are three aborting scenarios which might encounter race condition: 1. CheckpointBarrierUnaligner#processCancellationBarrier 2. CheckpointBarrierUnaligner#processEndOfPartition 3. AlternatingCheckpointBarrierHandler#processBarrier They only consider the pending checkpoint triggered by #processBarrier from task thread to abort it. Actually the checkpoint might also be triggered by #notifyBarrierReceived from netty thread in race condition, so we should also handle properly to abort it. This closes apache#12460.
…d method in CheckpointBarrierHandler Simplify the implementations of CheckpointBarrierTracker and CheckpointBarrierUnaligner to reuse the parent default implementation. This closes apache#12460.
…atingCheckpointBarrierHandler#getAlignmentDurationNanos We should take the value from active handler instead of aligned handler, because aligned handler is only used for savepoint and in most cases the unaligned alignment duration should always be 0. This cloese apache#12460.
…point in CheckpointBarrierUnaligner There are three aborting scenarios which might encounter race condition: 1. CheckpointBarrierUnaligner#processCancellationBarrier 2. CheckpointBarrierUnaligner#processEndOfPartition 3. AlternatingCheckpointBarrierHandler#processBarrier They only consider the pending checkpoint triggered by #processBarrier from task thread to abort it. Actually the checkpoint might also be triggered by #notifyBarrierReceived from netty thread in race condition, so we should also handle properly to abort it. This closes apache#12460.
…point in CheckpointBarrierUnaligner There are three aborting scenarios which might encounter race condition: 1. CheckpointBarrierUnaligner#processCancellationBarrier 2. CheckpointBarrierUnaligner#processEndOfPartition 3. AlternatingCheckpointBarrierHandler#processBarrier They only consider the pending checkpoint triggered by #processBarrier from task thread to abort it. Actually the checkpoint might also be triggered by #notifyBarrierReceived from netty thread in race condition, so we should also handle properly to abort it. This closes apache#12460.
…point in CheckpointBarrierUnaligner There are three aborting scenarios which might encounter race condition: 1. CheckpointBarrierUnaligner#processCancellationBarrier 2. CheckpointBarrierUnaligner#processEndOfPartition 3. AlternatingCheckpointBarrierHandler#processBarrier They only consider the pending checkpoint triggered by #processBarrier from task thread to abort it. Actually the checkpoint might also be triggered by #notifyBarrierReceived from netty thread in race condition, so we should also handle properly to abort it. This closes apache#12460.
…d method in CheckpointBarrierHandler Simplify the implementations of CheckpointBarrierTracker and CheckpointBarrierUnaligner to reuse the parent default implementation. This closes apache#12460.
…atingCheckpointBarrierHandler#getAlignmentDurationNanos We should take the value from active handler instead of aligned handler, because aligned handler is only used for savepoint and in most cases the unaligned alignment duration should always be 0. This cloese apache#12460.
…point in CheckpointBarrierUnaligner There are three aborting scenarios which might encounter race condition: 1. CheckpointBarrierUnaligner#processCancellationBarrier 2. CheckpointBarrierUnaligner#processEndOfPartition 3. AlternatingCheckpointBarrierHandler#processBarrier They only consider the pending checkpoint triggered by #processBarrier from task thread to abort it. Actually the checkpoint might also be triggered by #notifyBarrierReceived from netty thread in race condition, so we should also handle properly to abort it. This closes apache#12460.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some smaller nits.
...ing-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
Show resolved
Hide resolved
...ing-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
Show resolved
Hide resolved
...ing-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
Show resolved
Hide resolved
...ing-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
Show resolved
Hide resolved
…d method in CheckpointBarrierHandler Simplify the implementations of CheckpointBarrierTracker and CheckpointBarrierUnaligner to reuse the parent default implementation. This closes #12460.
…atingCheckpointBarrierHandler#getAlignmentDurationNanos We should take the value from active handler instead of aligned handler, because aligned handler is only used for savepoint and in most cases the unaligned alignment duration should always be 0. This cloese #12460.
…d method in CheckpointBarrierHandler Simplify the implementations of CheckpointBarrierTracker and CheckpointBarrierUnaligner to reuse the parent default implementation. This closes #12460.
…atingCheckpointBarrierHandler#getAlignmentDurationNanos We should take the value from active handler instead of aligned handler, because aligned handler is only used for savepoint and in most cases the unaligned alignment duration should always be 0. This cloese #12460.
…point in CheckpointBarrierUnaligner There are three aborting scenarios which might encounter race condition: 1. CheckpointBarrierUnaligner#processCancellationBarrier 2. CheckpointBarrierUnaligner#processEndOfPartition 3. AlternatingCheckpointBarrierHandler#processBarrier They only consider the pending checkpoint triggered by #processBarrier from task thread to abort it. Actually the checkpoint might also be triggered by #notifyBarrierReceived from netty thread in race condition, so we should also handle properly to abort it. This closes #12460.
…d method in CheckpointBarrierHandler Simplify the implementations of CheckpointBarrierTracker and CheckpointBarrierUnaligner to reuse the parent default implementation. This closes apache#12460.
…atingCheckpointBarrierHandler#getAlignmentDurationNanos We should take the value from active handler instead of aligned handler, because aligned handler is only used for savepoint and in most cases the unaligned alignment duration should always be 0. This cloese apache#12460.
…point in CheckpointBarrierUnaligner There are three aborting scenarios which might encounter race condition: 1. CheckpointBarrierUnaligner#processCancellationBarrier 2. CheckpointBarrierUnaligner#processEndOfPartition 3. AlternatingCheckpointBarrierHandler#processBarrier They only consider the pending checkpoint triggered by #processBarrier from task thread to abort it. Actually the checkpoint might also be triggered by #notifyBarrierReceived from netty thread in race condition, so we should also handle properly to abort it. This closes apache#12460.
What is the purpose of the change
There are three aborting scenarios which might encounter race condition:
They only consider the pending checkpoint triggered by #processBarrier from task thread to abort it. Actually the checkpoint might also be triggered by #notifyBarrierReceived from netty thread in race condition, so we should also handle properly to abort it.
Brief change log
AlternatingCheckpointBarrierHandler#processBarrier
CheckpointBarrierUnaligner#processEndOfPartition
to abort checkpoint properlyCheckpointBarrierUnaligner#processCancellationBarrier
to abort checkpoint properlyVerifying this change
CheckpointBarrierUnalignerTest#testProcessCancellationBarrierAfterNotifyBarrierReceived
CheckpointBarrierUnalignerTest#testProcessCancellationBarrierAfterProcessBarrier
CheckpointBarrierUnalignerTest#testProcessCancellationBarrierBeforeProcessAndReceiveBarrier
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (yes / no)Documentation