[FLINK-19462][checkpointing] Update failed checkpoint stats #14635
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
Automated Checks
Last check on commit bd8d680 (Fri May 28 08:06:00 UTC 2021)
Warnings:
Mention the bot in a comment to re-run the automated checks.
Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Bot commands
The @flinkbot bot supports the following commands:
Thanks for the change, @rkhachatryan. I've left a couple of comments and questions (I might not have fully understood some things).
...src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
...src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
...ing-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFutures.java
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
0a44c3a to fcc06b3
Thanks for reviewing, @pnowojski.
026f0b5 to 2e65254
…arguments
This is a preparatory step for FLINK-19462. CheckpointStatsTracker is created with a fixed set of vertices. At the time of checkpoint creation this set can be different. As the checkpoint already carries the vertices, there is no need to store them as state. Besides that, changing the type from List<ExecutionJobVertex> to Map<JobVertexID, Integer> simplifies writing the tests.
This is a preparatory step for FLINK-19462.
Motivation:
1. Ability to report metrics without state snapshot (subsequent commit)
2. Consistency with other metrics
This is a preparatory step for FLINK-19462.
…stent
This is a preparatory step for FLINK-19462. Access to checkpoint stats by ID without calling history.snapshot() allows updating failed checkpoint stats without a PendingCheckpoint instance.
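The idea of looking up stats by checkpoint ID can be illustrated with a minimal sketch. All names here (`StatsHistorySketch`, `Stats`, `getById`, `reportSubtask`) are hypothetical and chosen for illustration; this is not Flink's actual history class, only a bounded map that assumes the oldest entries are evicted first.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical, simplified stats history; not Flink's actual implementation. */
final class StatsHistorySketch {

    /** Mutable per-checkpoint record (illustrative fields only). */
    static final class Stats {
        final long checkpointId;
        boolean failed;
        int reportedSubtasks;

        Stats(long checkpointId) {
            this.checkpointId = checkpointId;
        }
    }

    private final Map<Long, Stats> byId;

    StatsHistorySketch(final int maxSize) {
        // Insertion-ordered map that drops the oldest entry once the bound is exceeded.
        this.byId = new LinkedHashMap<Long, Stats>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Long, Stats> eldest) {
                return size() > maxSize;
            }
        };
    }

    Stats add(long checkpointId) {
        Stats stats = new Stats(checkpointId);
        byId.put(checkpointId, stats);
        return stats;
    }

    /** Direct lookup by ID; returns null if the checkpoint is unknown or already evicted. */
    Stats getById(long checkpointId) {
        return byId.get(checkpointId);
    }

    /** Records a late subtask report, even if the checkpoint has already failed. */
    boolean reportSubtask(long checkpointId) {
        Stats stats = getById(checkpointId);
        if (stats == null) {
            return false;
        }
        stats.reportedSubtasks++;
        return true;
    }
}
```

In this sketch a late report only needs the checkpoint ID, so the update path does not depend on any object that is discarded when the checkpoint is aborted.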
…heckpointStats
Inherit mutation semantics from PendingCheckpointStats to allow updates even after the checkpoint has failed.
I've tried out those changes and I think they are not working well as they are now. I've modified the WordCount example to randomly time out the async checkpoint phase, and I wanted to see what it looks like.
Two screenshots: https://imgur.com/a/sv6w2IJ
With this PR there is no way to know which task has failed the checkpoint. Also, the async phase timed out (5 seconds), yet the end-to-end duration is in the milliseconds range for all tasks/subtasks? The async duration is also in the [0ms, 5ms] range for all tasks, even the one that has timed out.
I think it's strictly necessary to:
- clearly mark which checkpoint for which subtask has failed
- if we were not able to collect/calculate a metric, show it as N/A, not just 0ms
I think it's almost a must-have to:
- correctly calculate the durations (end to end, sync, async, etc.) also for failed checkpoints, not just show N/A
Otherwise this is very misleading :(
(I haven't tried it with different kinds of failures, but this should be tried out before merging)
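One way to keep "not measured" distinct from "measured as 0 ms" is to model each duration as an optional value and to carry an explicit failure flag. The sketch below assumes hypothetical names (`SubtaskDurations`, `format`, `describe`); it is not the stats class used in this PR, only an illustration of the requirement.

```java
import java.util.OptionalLong;

/** Hypothetical holder for per-subtask durations; not a Flink class. */
final class SubtaskDurations {
    private final OptionalLong syncMillis;
    private final OptionalLong asyncMillis;
    private final boolean failed;

    SubtaskDurations(OptionalLong syncMillis, OptionalLong asyncMillis, boolean failed) {
        this.syncMillis = syncMillis;
        this.asyncMillis = asyncMillis;
        this.failed = failed;
    }

    /** Renders a duration for display; an absent value becomes "N/A" instead of "0ms". */
    static String format(OptionalLong millis) {
        return millis.isPresent() ? millis.getAsLong() + "ms" : "N/A";
    }

    /** Summary line that marks the failed subtask explicitly. */
    String describe() {
        return (failed ? "FAILED" : "OK")
                + " sync=" + format(syncMillis)
                + " async=" + format(asyncMillis);
    }
}
```

With this shape, a subtask whose async phase timed out before a duration was recorded would be shown as failed with an async value of N/A, while a genuine zero still renders as 0ms.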
Thanks a lot for trying it out.
It is not always the task that fails a checkpoint. Timeout decision is made by the
I don't see
A checkpoint can be cancelled before even being started on some subtasks.
2e65254 to 8900426
I've updated the PR (adding 4 new commits):
cc: @NicoK
So, as soon as we are through the sync phase, we will get stats (if the CP is aborted during the sync phase, that won't interrupt the sync part anyway and will wait for it to complete). If we haven't reached the sync phase yet, the timeout could be because of slowly moving barriers (no barrier was received yet) or slow alignment (some barriers received but not all). These could be derived from looking at backpressure, data skew, the starting times of other subtasks, or timings from previous subtasks. I think the current state is a good step forward and the stats look good 👍
Thanks for the update, @rkhachatryan. Apart from some minor comments, there is potentially one more issue. If, for example, AsyncCheckpointRunnable fails (throws an exception), I cannot see any stats for any subtasks that have finished after the failure. In that case I see only n/a for the whole subtask, which is a bit inconsistent with the timeout behaviour.
...b/web-dashboard/src/app/pages/job/checkpoints/subtask/job-checkpoints-subtask.component.html
This change introduces a new RPC from TM to JM. The existing one can't be used because it:
a) confirms the checkpoint
b) requires task state snapshot
The call is issued after cancelling task state-persisting futures upon receiving abortion notification. This way more precise metrics are available (compared to reporting from AsyncCheckpointRunnable after cancellation).
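The distinction between the two calls can be sketched as follows. The interface and class names (`CoordinatorGatewaySketch`, `CoordinatorSketch`, `MetricsSketch`) and their method signatures are assumptions made for illustration; they are not the gateway API added by this change.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical metrics payload; stands in for the real checkpoint metrics. */
final class MetricsSketch {
    final long syncDurationMillis;
    final long asyncDurationMillis;

    MetricsSketch(long syncDurationMillis, long asyncDurationMillis) {
        this.syncDurationMillis = syncDurationMillis;
        this.asyncDurationMillis = asyncDurationMillis;
    }
}

/** Hypothetical TM-to-JM gateway; method names are illustrative only. */
interface CoordinatorGatewaySketch {
    /** Existing-style call: confirms the checkpoint and requires a state snapshot. */
    void acknowledge(long checkpointId, int subtask, MetricsSketch metrics, Object stateSnapshot);

    /** New-style call: reports metrics only, without confirming anything. */
    void reportMetrics(long checkpointId, int subtask, MetricsSketch metrics);
}

final class CoordinatorSketch implements CoordinatorGatewaySketch {
    final Map<Long, Map<Integer, MetricsSketch>> stats = new HashMap<>();
    final Map<Long, Set<Integer>> acknowledged = new HashMap<>();

    @Override
    public void acknowledge(long checkpointId, int subtask, MetricsSketch metrics, Object stateSnapshot) {
        if (stateSnapshot == null) {
            throw new IllegalArgumentException("acknowledge requires a state snapshot");
        }
        acknowledged.computeIfAbsent(checkpointId, id -> new HashSet<>()).add(subtask);
        record(checkpointId, subtask, metrics);
    }

    @Override
    public void reportMetrics(long checkpointId, int subtask, MetricsSketch metrics) {
        // Stats are updated, but the subtask is not counted as having confirmed the checkpoint.
        record(checkpointId, subtask, metrics);
    }

    private void record(long checkpointId, int subtask, MetricsSketch metrics) {
        stats.computeIfAbsent(checkpointId, id -> new HashMap<>()).put(subtask, metrics);
    }
}
```

In this sketch a task that received an abort notification could call `reportMetrics` after cancelling its futures, so the coordinator sees the measured durations without treating the subtask as acknowledged.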
3618093 to bd8d680
Thanks for the review, @pnowojski.
As discussed offline, this happens because the failed upstream doesn't send a barrier downstream.
What is the purpose of the change
Update checkpoint statistics (shown in the web UI) even after a checkpoint fails
(this would facilitate investigation of issues with slow checkpointing).
With this change, failed checkpoint stats are updated when:
- AsyncCheckpointRunnable completes normally and reports snapshot as usual. CheckpointCoordinator was updated to handle these calls
Verifying this change
This change added tests and can be verified as follows:
- CheckpointCoordinatorTest.testCheckpointStatsUpdatedAfterFailure
- CheckpointCoordinatorTest.testAbortedCheckpointStatsUpdatedAfterFailure
- DataStreamAllroundTestProgram on local cluster:
Does this pull request potentially affect one of the following parts:
- @Public(Evolving): no
Documentation