-
Notifications
You must be signed in to change notification settings - Fork 13k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-22638] Keep channels blocked on alignment timeout #15897
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit bab4983 (Sat Aug 28 12:11:27 UTC 2021) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
This commit keeps channels blocked in case an alignment timeout occurs. That way we prioritize the channels that we have not received the barrier yet. This solution is based on the assumption that all upstream operators are working with aligned checkpoints and we do not mind delaying the subsequent checkpoints on the blocked channels.
@pnowojski What do you think about those changes? |
@curcur Do you mind taking a look? |
.../flink/streaming/runtime/io/checkpointing/AbstractAlternatingAlignedBarrierHandlerState.java
Outdated
Show resolved
Hide resolved
.../apache/flink/streaming/runtime/io/checkpointing/AlternatingCollectingBarriersUnaligned.java
Outdated
Show resolved
Hide resolved
.../apache/flink/streaming/runtime/io/checkpointing/AlternatingCollectingBarriersUnaligned.java
Outdated
Show resolved
Hide resolved
...che/flink/streaming/runtime/io/checkpointing/AlternatingWaitingForFirstBarrierUnaligned.java
Outdated
Show resolved
Hide resolved
inputs[channelInfo.getGateIdx()].resumeConsumption(channelInfo); | ||
channelState.blockChannel(channelInfo); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am a bit struggling thinking of cases where out-of-order barriers come? Since transition happens when AlternatingWaitingForFirstBarrier#alignmentTimeout, and converting to priority events happen before transition. On the other hand, all barriers seen has already have the announcement removed.
...che/flink/streaming/runtime/io/checkpointing/AlternatingWaitingForFirstBarrierUnaligned.java
Outdated
Show resolved
Hide resolved
@@ -507,13 +477,20 @@ public void testAllChannelsUnblockedAfterAlignmentTimeout() throws Exception { | |||
|
|||
// we set timer on announcement and test channels do not produce announcements by themselves | |||
send(EventSerializer.toBuffer(new EventAnnouncement(checkpointBarrier, 0), true), 0, gate); | |||
send(checkpointBarrierBuffer, 0, gate); | |||
// emulate blocking channels on aligned barriers | |||
((TestInputChannel) gate.getChannel(0)).setBlocked(true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this necessary? The channel is blocked after receiving the first aliged barrier in
AbstractAlternatingAlignedBarrierHandlerState#barrierReceived
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it is. I agree it is a bit confusing. The network stack actually blocks the regular channels. Therefore the calls in BarrierHandlerState
are necessary only for bookkeeping to unblock them in the end. This emulates the network stack behaviour.
See
public abstract class IndexedInputGate extends InputGate implements CheckpointableInput {
....
@Override
public void blockConsumption(InputChannelInfo channelInfo) {
// Unused. Network stack is blocking consumption automatically by revoking credits.
}
...
}
@dawidwys Hey Dawid, thanks very much for the PR. I did one pass and I do not see any obvious problem with the change of "blocking after receiving barrier". There are some inline comments above + a few more comments in general (which may not directly related the change made here)
|
Ad. 2 I think correctness-wise it does not make a difference when the channels are unblocked. What we care about is that the channels are unblocked once a checkpoint finishes (either completed or aborted). I would not enforce that contract in unit tests. It is kind of covered by the performance tests. If we say at some point that actually it's better to unblock the channels as soon as possible we can change it easily. WDYT? |
@dawidwys , thanks for addressing the comments, I've approved it. As to the question about out-of-order cases. My concern is if we can not reason about it, it may potentially hide bugs behind it. But I understand this was there before this PR, so maybe we can discuss it after this PR. |
This commit keeps channels blocked in case an alignment timeout occurs. That way we prioritize the channels that we have not received the barrier yet. This solution is based on the assumption that all upstream operators are working with aligned checkpoints and we do not mind delaying the subsequent checkpoints on the blocked channels. This closes #15897
What is the purpose of the change
Improve the end to end checkpointing time in case alignment timeout occurs.
Brief change log
See the commits
Verifying this change
Updated tests.
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (yes / no)Documentation