[FLINK-10482] Fix double counting of checkpoint stat #7118
Force-pushed from f09a853 to dc836d7.
tillrohrmann
left a comment
Thanks for opening this PR @azagrebin. I had some minor comments and a question. Before merging this PR it would be great if you could address them.
        numFailedCheckpoints);
    }

    private boolean assertDecrementOfInProgressCheckpointsNumber() {
Maybe rename the method to canDecrementOfInProgressCheckpointsNumber.
    String errorMessage = "Incremented the completed number of checkpoints " +
        "without incrementing the in progress checkpoints before.";
    LOG.warn(errorMessage);
    LOG.debug("Inconsistent CheckpointStatsCounts", new IllegalStateException(errorMessage));
Let's remove this logging statement.
thanks, debugging leftover
      }, getMainThreadExecutor())
      .exceptionally(throwable -> {
-         if (cancelJob) {
+         if (cancelJob && executionGraph.getState() == JobStatus.RUNNING) {
What situation are we filtering out with this change?
        new ArrayList<>(pendingCheckpoints.values());
    pendingCheckpoints.clear();

    for (PendingCheckpoint p : pendingCheckpointsSnapshot) {
Just as a note: I guess we could also have solved the problem by changing JobMaster.java:965-975 into a handleAsync stage. The problem seems to be that aborting a PendingCheckpoint fails the savepoint future, which then directly triggers the exceptionally handler. If the exceptionally handler is decoupled from the stopCheckpointScheduler call, there should not be any concurrency problem.
Agree, this fix will solve the original problem without other changes.
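To illustrate the point about decoupling, here is a minimal, self-contained sketch in plain java.util.concurrent. It is not the JobMaster code: the class name, the trace labels, and the queue standing in for the main thread executor are all assumptions made for illustration. It only shows that a handler registered with exceptionally runs inside the completeExceptionally call, while a handler registered with handleAsync on a queueing executor runs after the completing call has returned.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical demo class, not part of Flink.
class HandlerDecouplingSketch {

    // Handler registered with exceptionally(): it runs synchronously in the
    // thread that fails the future, i.e. in the middle of the "abort".
    static List<String> runWithExceptionally() {
        List<String> trace = new ArrayList<>();
        CompletableFuture<String> savepointFuture = new CompletableFuture<>();
        savepointFuture.exceptionally(t -> {
            trace.add("handler");
            return null;
        });
        trace.add("abort-start");
        savepointFuture.completeExceptionally(new RuntimeException("aborted"));
        trace.add("abort-end");
        return trace;
    }

    // Handler registered with handleAsync(): failing the future only enqueues
    // the handler; it runs later, when the executor gets to it.
    static List<String> runWithHandleAsync() {
        List<String> trace = new ArrayList<>();
        List<Runnable> queue = new ArrayList<>();
        Executor queueingExecutor = queue::add; // stand-in for a main thread executor
        CompletableFuture<String> savepointFuture = new CompletableFuture<>();
        savepointFuture.handleAsync((path, t) -> {
            if (t != null) {
                trace.add("handler");
            }
            return path;
        }, queueingExecutor);
        trace.add("abort-start");
        savepointFuture.completeExceptionally(new RuntimeException("aborted"));
        trace.add("abort-end");
        // Drain the queue only after the abort has finished.
        for (Runnable task : new ArrayList<>(queue)) {
            task.run();
        }
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(runWithExceptionally());
        System.out.println(runWithHandleAsync());
    }
}
```

With exceptionally the trace is abort-start, handler, abort-end; with handleAsync it is abort-start, abort-end, handler, so the handler can no longer interleave with whatever the aborting code is doing.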
Thank you for the review @tillrohrmann! I addressed the comments.
tillrohrmann
left a comment
Thanks for updating the PR @azagrebin. LGTM. Merging.
Manually merged.
What is the purpose of the change
This PR fixes double counting of checkpoints in progress in their statistics.
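As a rough illustration of the guarded decrement discussed in the review, the following sketch uses a hypothetical, simplified counter class. It is not the actual CheckpointStatsCounts implementation, and all names in it are assumptions. It assumes the guard only protects the in-progress counter: a terminal event that arrives without a matching in-progress checkpoint is still counted, but the in-progress number never drops below zero.

```java
// Hypothetical simplified counters, not Flink's CheckpointStatsCounts.
class CheckpointCountsSketch {
    private long numInProgress;
    private long numCompleted;
    private long numFailed;

    void incrementInProgress() {
        numInProgress++;
    }

    void incrementCompleted() {
        if (canDecrementInProgress()) {
            numInProgress--;
        }
        numCompleted++;
    }

    void incrementFailed() {
        if (canDecrementInProgress()) {
            numInProgress--;
        }
        numFailed++;
    }

    // Returns false when a decrement would make the in-progress count negative,
    // which indicates that a terminal event was reported more than once.
    private boolean canDecrementInProgress() {
        return numInProgress > 0;
    }

    long getNumInProgress() {
        return numInProgress;
    }

    long getNumCompleted() {
        return numCompleted;
    }

    long getNumFailed() {
        return numFailed;
    }

    public static void main(String[] args) {
        CheckpointCountsSketch counts = new CheckpointCountsSketch();
        counts.incrementInProgress();
        counts.incrementFailed();
        counts.incrementFailed(); // same checkpoint reported as failed twice
        System.out.println("in progress: " + counts.getNumInProgress());
    }
}
```

In this sketch a checkpoint reported as failed twice leaves the in-progress count at 0 rather than -1. The guard only hides the symptom, which is why removing the second report at its source is the actual fix.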
Brief change log
Verifying this change
Submit DataStreamAllroundTestProgram in a loop on AWS EMR, wait for a failure, and check that the logs contain no messages about a negative number of checkpoints.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (no)
Documentation