[FLINK-20331][checkpointing][task] Don't fail the task if unaligned checkpoint was subsumed #14218
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot.
Automated Checks
Last check on commit 1109ad2 (Wed Nov 25 12:42:11 UTC 2020)
Mention the bot in a comment to re-run the automated checks.
Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
assert Thread.holdsLock(receivedBuffers);

if (checkpointId < lastBarrierId) {
    throw new CheckpointException(
I was in doubt whether this exception type is appropriate here or whether an Optional should be returned.
But an exception is ultimately needed at the upper level anyway, and the reason is only known here (so it would be harder to analyze the exception or the empty buffers later).
I think this is exactly the case for which CheckpointException was designed.
@@ -158,14 +163,16 @@ public void processBarrierAnnouncement(
     @Override
     public void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws IOException {
         final long cancelledId = cancelBarrier.getCheckpointId();
-        if (currentCheckpointId > cancelledId || (currentCheckpointId == cancelledId && numBarriersReceived == 0)) {
-            return;
+        if (cancelledId >= currentCheckpointId && (cancelledId > currentCheckpointId || numBarriersReceived > 0)) {
Please double-check this @AHeise :)
It's correct but it might be easier to do
cancelledId > currentCheckpointId || (currentCheckpointId == cancelledId && numBarriersReceived > 0)
So either it's a future checkpoint, or it's the current checkpoint and not yet canceled.
I like it!
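The equivalence discussed in this thread can be checked by brute force. The following is only a minimal sketch, not code from the PR: the class and method names are hypothetical, and it assumes numBarriersReceived is never negative.

```java
/** Hypothetical helper, not part of the PR: compares the formulations of the abort condition. */
class CancelConditionSketch {

    // Negation of the old early-return condition: true means "abort".
    static boolean negatedOld(long currentCheckpointId, long cancelledId, int numBarriersReceived) {
        return !(currentCheckpointId > cancelledId
                || (currentCheckpointId == cancelledId && numBarriersReceived == 0));
    }

    // Condition as written in the diff under review.
    static boolean asInDiff(long currentCheckpointId, long cancelledId, int numBarriersReceived) {
        return cancelledId >= currentCheckpointId
                && (cancelledId > currentCheckpointId || numBarriersReceived > 0);
    }

    // Simplified condition suggested in the review.
    static boolean asSuggested(long currentCheckpointId, long cancelledId, int numBarriersReceived) {
        return cancelledId > currentCheckpointId
                || (currentCheckpointId == cancelledId && numBarriersReceived > 0);
    }

    // Exhaustively compares all three over a small grid (assumes numBarriersReceived >= 0).
    static boolean allAgree() {
        for (long current = 0; current < 5; current++) {
            for (long cancelled = 0; cancelled < 5; cancelled++) {
                for (int barriers = 0; barriers < 4; barriers++) {
                    boolean expected = negatedOld(current, cancelled, barriers);
                    if (asInDiff(current, cancelled, barriers) != expected
                            || asSuggested(current, cancelled, barriers) != expected) {
                        return false;
                    }
                }
            }
        }
        return true;
    }
}
```

On this grid all three forms agree, which matches the reading "either a future checkpoint, or the current checkpoint that is not yet canceled".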
 // by setting the currentCheckpointId to this checkpoint while keeping the numBarriers
 // at zero means that no checkpoint barrier can start a new alignment
-currentCheckpointId = cancelledId;
+currentCheckpointId = Math.max(cancelledId, currentCheckpointId);
This is not necessary currently.
But because I extracted this method the assignment became more error-prone, so I added this check.
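To illustrate why the guard makes the extracted method safer, here is a minimal sketch. The class and the method name markAborted are hypothetical; only the two field names come from the diff, and resetting numBarriersReceived on abort is an assumption based on the code comment above.

```java
/** Hypothetical, simplified handler state; the class itself does not exist in Flink. */
class CheckpointIdSketch {
    long currentCheckpointId;
    int numBarriersReceived;

    CheckpointIdSketch(long currentCheckpointId, int numBarriersReceived) {
        this.currentCheckpointId = currentCheckpointId;
        this.numBarriersReceived = numBarriersReceived;
    }

    // Extracted abort step: the id never moves backwards, even if a caller passes an older id.
    void markAborted(long cancelledId) {
        currentCheckpointId = Math.max(cancelledId, currentCheckpointId);
        numBarriersReceived = 0; // assumption: aborting resets the barrier count
    }
}
```

With a plain assignment, a call with an older id would move currentCheckpointId backwards; with Math.max such a call leaves the id unchanged.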
Thank you very much. The fix looks very good.
I made a suggestion on the more complicated condition.
…heckpoint was subsumed
What is the purpose of the change
A check was added recently that RemoteInputChannel.lastBarrierSequenceNumber was not overwritten by a newer barrier. The check itself is valid; however, the task shouldn't fail in that case.
Relying on the assumption of at most one active unaligned checkpoint at a time, the older checkpoint should simply be declined.
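The intended behavior (keep the check, but decline instead of failing) could be sketched roughly as follows. This is an illustration under stated assumptions, not the PR's code: all class, method, and field names are hypothetical stand-ins, and the exception type is not Flink's CheckpointException.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a checkpoint exception; not Flink's CheckpointException. */
class SubsumedSketchException extends Exception {
    SubsumedSketchException(String message) {
        super(message);
    }
}

/** Hypothetical, heavily simplified task; none of these names exist in Flink. */
class DeclineSketch {
    final List<Long> declined = new ArrayList<>();
    final List<Long> persisted = new ArrayList<>();
    private long lastBarrierId = -1;

    // Remembers the newest barrier seen so far.
    void onBarrier(long checkpointId) {
        lastBarrierId = Math.max(lastBarrierId, checkpointId);
    }

    // The check stays in place: an overtaken checkpoint is reported together with its reason.
    private void persistInflightData(long checkpointId) throws SubsumedSketchException {
        if (checkpointId < lastBarrierId) {
            throw new SubsumedSketchException(
                    "Checkpoint " + checkpointId + " was subsumed by " + lastBarrierId);
        }
        persisted.add(checkpointId);
    }

    // The caller declines the subsumed checkpoint instead of letting the exception fail the task.
    void snapshot(long checkpointId) {
        try {
            persistInflightData(checkpointId);
        } catch (SubsumedSketchException e) {
            declined.add(checkpointId);
        }
    }
}
```

In this sketch, a snapshot request for checkpoint 1 that arrives after the barrier for checkpoint 2 ends up in declined, while the task keeps running and checkpoint 2 is persisted normally.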
Cleanup is needed for (all) channels, barrier controller, handler and checkpoint writer. Therefore, abort in SingleCheckpointBarrierHandler should be used.
Verifying this change
This change is already covered by an existing test: UnalignedCheckpointITCase ("parallel pipeline with mixed channels, p = 20").
Does this pull request potentially affect one of the following parts:
@Public(Evolving): no
Documentation