[FLINK-31588][checkpoint] Correct the unaligned checkpoint type after aligned barrier timeout to unaligned barrier on PipelinedSubpartition #22392
pnowojski
left a comment
Thanks for the fix proposal @1996fanrui , I've left a couple of comments.
  () -> {
      try {
-         operatorChain.alignedBarrierTimeout(checkpointId);
+         operatorChain.alignedBarrierTimeout(checkpointId, metrics);
Maybe instead of adding a dependency on CheckpointMetrics through the whole call stack down to the subpartition, can alignedBarrierTimeout return true or false depending on whether the barrier has timed out or not? 🤔
Hi @pnowojski , thanks for your review.
It's a great suggestion, updated.
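A minimal sketch of the suggestion in this thread, assuming simplified stand-in classes: `SketchSubpartition` and `SketchOperatorChain` are hypothetical names, not Flink's real types or signatures. Instead of passing a metrics object down the call stack, each level reports whether a timeout conversion happened, and the caller decides what to record.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only; not Flink's real classes.
final class SketchSubpartition {
    private boolean alignedBarrierPending;

    SketchSubpartition(boolean alignedBarrierPending) {
        this.alignedBarrierPending = alignedBarrierPending;
    }

    /** Returns true only if a still-queued aligned barrier was converted to unaligned. */
    boolean alignedBarrierTimeout(long checkpointId) {
        if (!alignedBarrierPending) {
            return false; // the barrier already left the queue, nothing to convert
        }
        alignedBarrierPending = false;
        return true;
    }
}

final class SketchOperatorChain {
    private final List<SketchSubpartition> subpartitions = new ArrayList<>();

    void add(SketchSubpartition subpartition) {
        subpartitions.add(subpartition);
    }

    /** Returns true if any subpartition switched to unaligned for this checkpoint. */
    boolean alignedBarrierTimeout(long checkpointId) {
        boolean timedOut = false;
        for (SketchSubpartition subpartition : subpartitions) {
            // Non-short-circuit OR so that every subpartition is still notified.
            timedOut |= subpartition.alignedBarrierTimeout(checkpointId);
        }
        return timedOut;
    }
}
```

With this shape the caller could write something like `boolean switched = chain.alignedBarrierTimeout(checkpointId);` and update its own metrics, so the lower layers need no knowledge of the metrics type.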
  () -> {
      try {
-         operatorChain.alignedBarrierTimeout(checkpointId);
+         operatorChain.alignedBarrierTimeout(checkpointId, metrics);
Can we safely pass the CheckpointMetricsBuilder to the timer thread? Maybe I'm misremembering something, but the ownership of and full responsibility for CheckpointMetricsBuilder seem to be passed from the SubtaskCheckpointCoordinatorImpl to the AsyncCheckpointRunnable, which uses it to build the metrics. AsyncCheckpointRunnable and the alignment timer are running in different threads, creating problems with both memory visibility AND race conditions?
Shouldn't this be set in the AsyncCheckpointRunnable thread via a code path similar to org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.SnapshotsFinalizeResult#bytesPersistedDuringAlignment?
> AsyncCheckpointRunnable and the alignment timer are running in different threads, creating problems with both memory visibility AND race conditions?
Before this PR, CheckpointMetricsBuilder#setUnalignedCheckpoint is only called on the task thread[1]. And IIUC, registerTimer should be executed by the task thread as well. AsyncCheckpointRunnable won't call CheckpointMetricsBuilder#setUnalignedCheckpoint; it just uses the builder to build the metrics. So the flag cannot be modified concurrently.
After detailed analysis, I think the change made for the first comment[2] should be reverted. It may lead to a wrong unaligned type due to the order of execution, for example:
1. registerTimer thread: aligned barrier timeout to unaligned
2. registerTimer thread: channelStateFuture.complete(inflightBuffers)
3. Channel state writer thread: write these buffers and complete the resultSubpartitionStateFuture
4. AsyncCheckpointRunnable thread: all states are written, and build metrics
5. registerTimer thread: call CheckpointMetricsBuilder#setUnalignedCheckpoint(true)
If inflightBuffers is empty or very small, steps 3 and 4 can finish before step 5, and then the unaligned type will be wrong.
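The ordering problem can be reproduced in a small, single-threaded model. This is only a sketch under stated assumptions: `OrderingSketch` and its fields are hypothetical names, and a callback registered on the future stands in for the writer thread and the AsyncCheckpointRunnable finishing immediately. It compares setting the flag after completing the future with setting it before.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of the ordering problem; none of these names are Flink APIs.
final class OrderingSketch {

    /** Returns the value of the unaligned flag as seen at the moment the metrics are "built". */
    static boolean observedFlag(boolean setFlagBeforeComplete) {
        AtomicBoolean unalignedCheckpoint = new AtomicBoolean(false);
        AtomicBoolean observed = new AtomicBoolean(false);
        CompletableFuture<List<String>> channelStateFuture = new CompletableFuture<>();

        // Stands in for "state written, metrics built". The callback is registered
        // before completion, so complete() runs it synchronously. That models the
        // case where those steps finish before the flag is updated.
        channelStateFuture.thenAccept(buffers -> observed.set(unalignedCheckpoint.get()));

        if (setFlagBeforeComplete) {
            unalignedCheckpoint.set(true);                          // flag first
            channelStateFuture.complete(Collections.emptyList());   // then hand over the buffers
        } else {
            channelStateFuture.complete(Collections.emptyList());   // buffers handed over first
            unalignedCheckpoint.set(true);                          // flag set too late
        }
        return observed.get();
    }

    public static void main(String[] args) {
        System.out.println("flag set after complete:  " + observedFlag(false));
        System.out.println("flag set before complete: " + observedFlag(true));
    }
}
```

In the real code the three parties run on different threads, so the late write is a race rather than a certainty. The model only makes the losing interleaving deterministic.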
Based on this case, I think the solution is:
- CheckpointMetricsBuilder#setUnalignedCheckpoint(true) should be executed before channelStateFuture.complete(inflightBuffers), that is, CheckpointMetricsBuilder should be passed to PipelinedSubpartition#alignedBarrierTimeout.
- Add volatile to CheckpointMetricsBuilder#unalignedCheckpoint to ensure AsyncCheckpointRunnable can read it correctly.
I updated the solution here[3].
> Shouldn't this be set in the AsyncCheckpointRunnable thread via a code path similar to org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.SnapshotsFinalizeResult#bytesPersistedDuringAlignment?
This solution can work. Actually, I have tried it, and I found this code path is too complex: it involves many exception cases (in ChannelStateCheckpointWriter) and several futures (including dataFuture and resultFuture), and these futures are completed after merging channel state.
However, I implemented a POC version[4] using this solution. Core process:
- Add a CompletableFuture<Boolean> timeoutToUnaligned inside PipelinedSubpartition, and complete it when completing channelStateFuture.
- ChannelStateWriteResult (it's at subtask level) adds a CompletableFuture<Boolean> resultSubpartitionTimeoutToUnaligned; the future will be completed in the following cases:
  - true: any subpartition is switched from aligned to unaligned checkpoint.
  - false: this result was completed and no subpartition switched to unaligned checkpoint.
  - false: this result fails before any subpartition switched to unaligned checkpoint.
- ChannelStateWriteResult will pass the result to OperatorSnapshotFutures, which then passes it to AsyncCheckpointRunnable.
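The three completion cases listed above can be sketched with plain `CompletableFuture` composition. This is not the POC code from [4]: `TimeoutAggregationSketch` and `anySwitched` are hypothetical names, and a failure is modeled here as a per-subpartition future completing exceptionally, which is an assumption made for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch; names are illustrative and not Flink's API.
final class TimeoutAggregationSketch {

    /** Combines per-subpartition "switched to unaligned" futures into one subtask-level future. */
    static CompletableFuture<Boolean> anySwitched(List<CompletableFuture<Boolean>> perSubpartition) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();

        // Case "true": complete as soon as any subpartition reports a switch.
        for (CompletableFuture<Boolean> f : perSubpartition) {
            f.thenAccept(switched -> {
                if (switched) {
                    result.complete(true);
                }
            });
        }

        // Cases "false": everything finished (normally or with a failure).
        // The inputs are re-checked here so the outcome does not depend on the
        // order in which callbacks of the last input happen to run.
        CompletableFuture
                .allOf(perSubpartition.toArray(new CompletableFuture[0]))
                .whenComplete((ignored, failure) -> {
                    boolean any = false;
                    for (CompletableFuture<Boolean> f : perSubpartition) {
                        if (!f.isCompletedExceptionally() && f.join()) {
                            any = true;
                        }
                    }
                    result.complete(any); // no-op if already completed with true
                });
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<Boolean> a = new CompletableFuture<>();
        CompletableFuture<Boolean> b = new CompletableFuture<>();
        CompletableFuture<Boolean> combined = anySwitched(Arrays.asList(a, b));
        a.complete(false);
        b.complete(true);
        System.out.println("any subpartition switched: " + combined.join());
    }
}
```

Even in this reduced form the bookkeeping is noticeable, which is consistent with the concern in this thread that the approach adds a lot of machinery for a single flag.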
Solution 2 is more complex than solution 1, but it's more reasonable. Which one do you prefer?
[1] Line 738 in a81ffa6: checkpointMetrics.setUnalignedCheckpoint(checkpointOptions.isUnalignedCheckpoint());
[2] #22392 (comment)
[3] 1996fanrui@c23c5db
[4] 1996fanrui@d5f2537
Ehhh. [4] is indeed a bit complicated. I think I misspoke, I actually meant something a bit different ([5] below).
[3] Technically it works, but I don't like how fragile the contract there actually is, where the value of the volatile boolean unalignedCheckpoint is only valid if other methods/things happen in the correct order. To fix it, we would need something like:
[5] Change CheckpointMetricsBuilder#unalignedCheckpoint into some kind of CompletableFuture<Boolean>. Using that, AsyncCheckpointRunnable could just call CheckpointMetricsBuilder#unalignedCheckpoint.get() without considering whether that's safe. I had hoped it could be set exactly as in [3], but there is a problem: we know when to set it to true, but deciding when to set it to false would require quite a bit of logic :/
[6] Another potential solution would be to move the completion of the AsyncCheckpointRunnable out of the async thread and into the mailbox thread, which would also remove some race conditions and simplify the logic. But that's probably not worth doing for the sake of this single flag...
All in all, I'm starting to think that maybe your original idea, to approximate the true/false flag based on bytesPersistedDuringAlignment > 0, might be the lesser evil. The case where bytesPersistedDuringAlignment == 0 but the checkpoint barrier actually timed out in the output buffers is quite extreme/rare, shouldn't be that significant to the end user, and is probably not worth making the code so much more complicated.
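The approximation discussed here reduces to a single comparison. A minimal sketch, assuming a hypothetical helper `approximateUnaligned` (the name and its placement are illustrative, not the merged Flink code):

```java
// Hypothetical helper for illustration; not the actual Flink implementation.
final class UnalignedApproximationSketch {

    /** Approximates the unaligned flag from the number of bytes persisted during alignment. */
    static boolean approximateUnaligned(long bytesPersistedDuringAlignment) {
        return bytesPersistedDuringAlignment > 0;
    }

    public static void main(String[] args) {
        // In-flight data was persisted, so the checkpoint is reported as unaligned.
        System.out.println(approximateUnaligned(4096L));
        // Nothing was persisted. This is also what the rare "timed out with empty
        // output buffers" case reports, which is the known inaccuracy.
        System.out.println(approximateUnaligned(0L));
    }
}
```

The trade-off is that a checkpoint whose barrier timed out without any in-flight data is reported as aligned, in exchange for not threading extra state or futures through the checkpointing code path.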
> All in all, I'm starting to think that maybe your original idea, to approximate the true/false flag based on bytesPersistedDuringAlignment > 0, might be the lesser evil. The case where bytesPersistedDuringAlignment == 0 but the checkpoint barrier actually timed out in the output buffers is quite extreme/rare, shouldn't be that significant to the end user, and is probably not worth making the code so much more complicated.
I agree with you: these solutions are complicated, and it's probably not worth making the code so much more complicated, so I prefer to derive the unaligned checkpoint type from the persisted data.
Do you think that's OK? If yes, I can go ahead.
Yes, let's do that. Apart from that, let's create a ticket to explain the problem with the approximation, linking to this conversation, and set its priority to "not a priority".
Thanks for your quick response, I created FLINK-31864 to briefly explain it.
pnowojski
left a comment
Thanks for the fix!
Can you add a small unit test? Apart from that, LGTM. Feel free to merge after adding the unit test and with a green Azure build :)
Thanks for the quick feedback, updated.
What is the purpose of the change
Correct the unaligned checkpoint type after aligned barrier timeout to unaligned barrier on PipelinedSubpartition.
Brief change log
Correct the unaligned checkpoint type after aligned barrier timeout to unaligned barrier on PipelinedSubpartition.
Verifying this change
This change improved old tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
@Public(Evolving): no
Documentation