[FLINK-22379][runtime] CheckpointCoordinator checks the state of all subtasks before triggering the checkpoint #15728
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community review your pull request.
Automated Checks: last check on commit 87072bd (Sat Aug 28 11:19:55 UTC 2021).
Mention the bot in a comment to re-run the automated checks.
Review Progress: please see the Pull Request Review Guide for a full explanation of the review process. The bot tracks review progress through labels, which are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Bot commands: the @flinkbot bot supports the following commands:
     */
    CompletableFuture<CheckpointPlan> calculateCheckpointPlan();
    CompletableFuture<CheckpointPlan> calculateCheckpointPlan(boolean isUnalignedCheckpoint);
Perhaps it makes sense to make this flag a field of DefaultCheckpointPlanCalculator, because as I understand it, the checkpoint configuration cannot be changed at runtime.
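The suggestion could be sketched as follows. This is illustrative only: apart from the calculateCheckpointPlan method and the isUnalignedCheckpoint flag discussed in the thread, all types here are simplified stand-ins for the real Flink classes.

```java
import java.util.concurrent.CompletableFuture;

// Stand-in for Flink's CheckpointPlan; the real class carries task lists.
class CheckpointPlan {}

class DefaultCheckpointPlanCalculator {
    // The flag becomes immutable per-job state instead of a per-call
    // parameter, reflecting that the checkpointing mode cannot change at runtime.
    private final boolean isUnalignedCheckpoint;

    DefaultCheckpointPlanCalculator(boolean isUnalignedCheckpoint) {
        this.isUnalignedCheckpoint = isUnalignedCheckpoint;
    }

    CompletableFuture<CheckpointPlan> calculateCheckpointPlan() {
        // Callers no longer pass the flag; the calculator consults its own field.
        return CompletableFuture.completedFuture(new CheckpointPlan());
    }
}

public class FieldVsParameterSketch {
    public static void main(String[] args) throws Exception {
        DefaultCheckpointPlanCalculator calculator = new DefaultCheckpointPlanCalculator(true);
        System.out.println(calculator.calculateCheckpointPlan().get() != null);
    }
}
```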
    // when: Creating the checkpoint plan for aligned and unaligned checkpoints.
    runWithNotRunTask(twoSourcesBuilder, true);
    runWithNotRunTask(twoSourcesBuilder, false);
I'm not sure that testing both scenarios (aligned and unaligned) in one test is a good idea, but I didn't find a good example of how else to implement it.
    checkTasksStarted(
            isUnalignedCheckpoint
                    ? result.getTasksToWaitFor()
                    : result.getTasksToTrigger());
Do I understand correctly that the motivation to limit the change to UC mode only is that there is channel state to recover in UC mode?
If so, I think it would be better to always use result.getTasksToWaitFor(), because:
- operator state initialization is a part of the restore/init phase too
- the UC mode option can change, but the snapshot can still contain channel state that needs to be restored
WDYT?
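The unified check being proposed can be sketched like this. The Execution and CheckpointPlan shapes below are simplified stand-ins, not Flink's real classes; the point is only that the started-check would consult tasksToWaitFor regardless of the checkpointing mode.

```java
import java.util.List;

// Simplified stand-ins for Flink's execution/plan types.
enum ExecutionState { CREATED, DEPLOYING, RUNNING, FINISHED }

record Execution(String name, ExecutionState state) {}

record CheckpointPlan(List<Execution> tasksToTrigger, List<Execution> tasksToWaitFor) {}

public class UnifiedStartedCheckSketch {
    // Unified behavior: regardless of aligned vs unaligned mode, refuse to
    // trigger a checkpoint unless every task we would wait for is RUNNING.
    static boolean allTasksStarted(CheckpointPlan plan) {
        return plan.tasksToWaitFor().stream()
                .allMatch(e -> e.state() == ExecutionState.RUNNING);
    }

    public static void main(String[] args) {
        CheckpointPlan recovering = new CheckpointPlan(
                List.of(new Execution("source", ExecutionState.RUNNING)),
                List.of(new Execution("source", ExecutionState.RUNNING),
                        new Execution("sink", ExecutionState.DEPLOYING)));
        // The sink is still deploying, so no checkpoint is triggered yet.
        System.out.println(allTasksStarted(recovering));
    }
}
```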
Actually, I agree with you. But as I understand it, @AHeise has a different opinion?
The main motivation is the following:
- imagine you are running a large-scale application in AC
- you checkpoint every minute because of SLA
- after some failure, Flink is in recovery, and recovery takes 2 minutes
- we have implicit backpressure as any downstream task with large state will block upstream
previously:
- checkpoint barriers were added to the stream while recovering
- as soon as a task finishes recovery, it can also start checkpointing
with the change (and no distinction):
- during that time no checkpoints are triggered
- only after every task has been recovered is a checkpoint started at all
- the first checkpoint is delayed compared to before
However, after having written that, I'm not too sure if the new behavior is that bad:
- the checkpoint that is generated first after recovery is awfully close to the recovered checkpoint unless there is huge backpressure
- having multiple checkpoint barriers being backpressured during recovery might lead to quite a bit of I/O to the checkpoint storage after the last task has recovered.
So all in all, only high backpressure cases would really suffer from that change and we want to encourage UC for them. High throughput, large scale with AC might actually benefit from that changed behavior.
@AHeise, so as I understand from your comment, in general you don't have any objections and we can use result.getTasksToWaitFor() for both cases (AC and UC)?
What @AHeise described makes sense to me (skipping processing of data accumulated during recovery and passing barriers right away, reducing e2e latency) but it seems quite a narrow use-case:
- checkpoint timeout must be less than recovery time
- time to process the accumulated data must be significantly higher than checkpoint interval
So one alternative would be to add an option allowing power users to trigger checkpoints while still in recovery, but not trigger them by default. Such an option could also be added separately.
WDYT?
Do you mean 'checkpoint timeout must be greater than recovery time'? If I understand the checkpoint timeout correctly, the checkpoint will be discarded and started from the beginning if the checkpoint timeout is less than the recovery time.
Yes (it's a matter of wording).
Timeouts are counted as checkpoint failures, and if this count reaches a threshold then the job will fail.
Currently, the first checkpoint would complete after 1h 10 minutes. If we wait for recovery to complete before triggering the first checkpoint, it would complete after 2h.
That is given a checkpoint timeout that is big enough (the default is 10m; for larger setups I have seen 30m).
That is, I'm assuming both ways are fine with their disadvantages and we should just pick one. In doubt, pick the one that preserves the old behavior.
In this case, the old behavior (for AC) is combined with the new behavior (for UC), and this seems misleading from both the users' and the developers' point of view IMO.
In this case, the old behavior (for AC) is combined with the new behavior (for UC), and this seems misleading from both the users' and the developers' point of view IMO.
Also a fair point. I don't have a strong opinion on this anymore as I think both versions are good (and a bit bad). If you say that it's easier for the user (not 100% clear to me) and dev to unify it, then I'm for it.
ok, +1 from my side on unifying the behaviour between UC and AC.
One more question from my side: is this change compatible with FLIP-147, when some of the sources are already FINISHED or are about to finish? calculateAfterTasksFinished a couple of lines above suggests so, but can you double-check?
Good point, I also think that checking taskRunningStatus in calculateAfterTasksFinished should be enough.
@gaoyunhaii, could you please confirm that replacing checkTasksStarted(result.getTasksToTrigger()) with checkTasksStarted(result.getTasksToWaitFor()) is fine here?
Hi @rkhachatryan, it should be ok since the finished tasks would not be included in result.getTasksToWaitFor(); only running tasks would.
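The interplay described here can be sketched as follows. The types and helper names are hypothetical simplifications (the real logic lives in DefaultCheckpointPlanCalculator): FINISHED tasks are filtered out of tasksToWaitFor, so the started-check only ever sees tasks that still have to run.

```java
import java.util.List;
import java.util.stream.Collectors;

// Simplified stand-ins for Flink's execution types.
enum ExecutionState { CREATED, DEPLOYING, RUNNING, FINISHED }

record Execution(String name, ExecutionState state) {}

public class FinishedTasksExcludedSketch {
    // Hypothetical plan calculation: FINISHED tasks are excluded up front.
    static List<Execution> tasksToWaitFor(List<Execution> allTasks) {
        return allTasks.stream()
                .filter(e -> e.state() != ExecutionState.FINISHED)
                .collect(Collectors.toList());
    }

    // Hypothetical started-check: fails if any remaining task is not RUNNING.
    static void checkTasksStarted(List<Execution> tasks) {
        for (Execution e : tasks) {
            if (e.state() != ExecutionState.RUNNING) {
                throw new IllegalStateException("Task " + e.name() + " is not running yet");
            }
        }
    }

    public static void main(String[] args) {
        List<Execution> all = List.of(
                new Execution("finished-source", ExecutionState.FINISHED),
                new Execution("running-sink", ExecutionState.RUNNING));
        // The finished source is excluded, so the check passes.
        checkTasksStarted(tasksToWaitFor(all));
        System.out.println("check passed");
    }
}
```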
Thanks for updating the PR @akalash.
I have some remarks regarding DefaultCheckpointPlanCalculatorTest, PTAL.
All other changes LGTM.
    private void runWithNotRunTask(
            FunctionWithException<JobVertexID, ExecutionGraph, Exception> graphBuilder)
The new test structure seems more complex to me than it could be:
- it's not clear what vertex states combination is being tested eventually
- ...because the transition happens in several places
- ...and the transition filter is not applied to all vertices
- test names aren't very readable
- we have a Predicate field, a Function argument, and several functions
WDYT about always creating two vertices in the same place and using a boolean flag isSource, so that the state combination and DAG are obvious?
For example:
    for (boolean isSource : new boolean[] {true, false}) { // or pass as an argument
        for (ExecutionState nonRunningState : EnumSet.complementOf(EnumSet.of(ExecutionState.RUNNING))) {
            JobVertexID runningSource = new JobVertexID();
            JobVertexID notRunningVertex = new JobVertexID();
            ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                    .addJobVertex(runningSource, true)
                    .addJobVertex(notRunningVertex, isSource)
                    .setTransitToRunning(false)
            // transition runningSource to RUNNING
            // transition notRunningVertex to nonRunningState
            // validate
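A self-contained analog of the proposed loop structure is shown below. It replaces Flink's CheckpointExecutionGraphBuilder and real state transitions with a trivial stand-in predicate; only the iteration pattern (every isSource value crossed with every non-running state) mirrors the actual suggestion.

```java
import java.util.EnumSet;

// Simplified stand-in for Flink's ExecutionState; the real test would use
// CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder.
enum ExecutionState { CREATED, SCHEDULED, DEPLOYING, RUNNING, FINISHED, CANCELED, FAILED }

public class PlanCalculatorLoopSketch {
    // Stand-in for "plan calculation succeeds": both vertices must be RUNNING.
    static boolean planAllowed(ExecutionState sourceState, ExecutionState otherState) {
        return sourceState == ExecutionState.RUNNING && otherState == ExecutionState.RUNNING;
    }

    public static void main(String[] args) {
        int rejected = 0;
        for (boolean isSource : new boolean[] {true, false}) {
            for (ExecutionState nonRunningState :
                    EnumSet.complementOf(EnumSet.of(ExecutionState.RUNNING))) {
                // One vertex is RUNNING, the other is in some non-running state,
                // so the plan must be rejected in every iteration.
                if (!planAllowed(ExecutionState.RUNNING, nonRunningState)) {
                    rejected++;
                }
            }
        }
        System.out.println(rejected); // 2 isSource values * 6 non-running states
    }
}
```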
I've rewritten the tests mostly as you suggested, and I also collapsed them into one test. Please take a look.
Thanks!
LGTM
CI failure unrelated (FLINK-22406), merging.
    } catch (Exception e) {
        e.printStackTrace();
    }
Why are exceptions not logged via the logging framework?
I also wonder if we shouldn't rather fail the test if an exception is thrown while creating the job?
Would it work to change the signature of takeSavepoint() to use FunctionWithException instead of Function?
I agree, will remove it in #15858.
Thanks a lot!
[FLINK-22379][runtime] CheckpointCoordinator checks the state of all subtasks before triggering the checkpoint
What is the purpose of the change
The change avoids triggering a checkpoint until all subtasks are ready.
Brief change log
Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)
Documentation