[FLINK-23741][checkpointing] Properly decline triggerCheckpoint RPC if StreamTask is not running #16800
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot; I help the community with automated checks. Last check on commit b8906e0 (Thu Aug 12 17:11:17 UTC 2021). Mention the bot in a comment to re-run the automated checks.
Frankly I don't have a good idea how to test this in a meaningful way, especially the failure case. Maybe we should just rely on the WIP FLINK-21090 that actually discovered this problem?
Hi Piotr @pnowojski, thanks a lot for opening the PR! It seems to me the current implementation only gets the result of whether the trigger was successful after it actually executes the synchronous part? Perhaps we do not need to be that strict: although logically we only care about the "false" result, Akka has a timeout, so if the mail is queued for a long time or the synchronous part takes a long time, we might hit an AkkaAskTimeout and cancel the checkpoint wrongly?
I would prefer not to complicate this RPC chain with two different definitions of what a successful trigger means. The Akka timeout is a valid concern, though; I will double check whether this would indeed cause Akka timeouts.
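For illustration, the timeout concern can be sketched with plain JDK futures (a hypothetical model, not Akka's or Flink's actual API): if the acknowledgement is only completed once the mailbox thread processes the trigger mail, a busy mailbox can push the acknowledgement past the ask timeout even though the trigger would eventually succeed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;

public class AskTimeoutSketch {
    // Returns true if the acknowledgement arrives within the simulated ask timeout.
    static boolean acknowledgedWithin(long mailboxDelayMs, long askTimeoutMs) {
        CompletableFuture<String> ack = new CompletableFuture<>();
        // Simulate a busy mailbox that only processes the trigger mail after a delay.
        CompletableFuture.delayedExecutor(mailboxDelayMs, TimeUnit.MILLISECONDS)
                .execute(() -> ack.complete("ACK"));
        try {
            ack.orTimeout(askTimeoutMs, TimeUnit.MILLISECONDS).join();
            return true;
        } catch (CompletionException e) {
            // The RPC caller observes a timeout, never the eventual success.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(acknowledgedWithin(10, 1000)); // fast mailbox: acknowledged
        System.out.println(acknowledgedWithin(500, 50));  // slow mailbox: spurious timeout
    }
}
```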
```java
CompletableFuture<Acknowledge> resultFuture = new CompletableFuture<>();
task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions)
        .thenApply(
                triggerResult ->
                        triggerResult
                                ? resultFuture.complete(Acknowledge.get())
                                : resultFuture.completeExceptionally(
                                        new CheckpointException(
                                                "Task is not running?",
                                                CheckpointFailureReason
                                                        .TRIGGER_CHECKPOINT_FAILURE)));

return CompletableFuture.completedFuture(Acknowledge.get());
```
🤦 Note that in this version CompletableFuture.completedFuture(Acknowledge.get()) is returned regardless of whether the resultFuture succeeded or failed...
Sorry for also missing this issue...
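As a minimal illustration of the intended fix (a pure-JDK sketch with simplified stand-in types, not Flink's actual Acknowledge/CheckpointException classes): the RPC result future should only complete once the task reports whether the trigger actually happened, and that linked future, rather than an eagerly completed one, is what gets returned to the caller.

```java
import java.util.concurrent.CompletableFuture;

public class TriggerLinking {
    // Stand-in for StreamTask#triggerCheckpointAsync:
    // true means the task was running and accepted the trigger.
    static CompletableFuture<Boolean> triggerCheckpointBarrier(boolean taskRunning) {
        return CompletableFuture.completedFuture(taskRunning);
    }

    // Corrected pattern: the RPC acknowledgement is tied to the actual trigger outcome.
    static CompletableFuture<String> triggerCheckpoint(boolean taskRunning) {
        CompletableFuture<String> resultFuture = new CompletableFuture<>();
        triggerCheckpointBarrier(taskRunning)
                .thenAccept(
                        triggerResult -> {
                            if (triggerResult) {
                                resultFuture.complete("ACK");
                            } else {
                                resultFuture.completeExceptionally(
                                        new IllegalStateException("Task is not running"));
                            }
                        });
        // Return the linked future, NOT CompletableFuture.completedFuture(...).
        return resultFuture;
    }

    public static void main(String[] args) {
        System.out.println(triggerCheckpoint(true).join());
        System.out.println(triggerCheckpoint(false).isCompletedExceptionally());
    }
}
```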
Thanks very much Piotr @pnowojski for updating the PR! The new method looks good to me. The only concern is that logically there might sometimes be repeated declines, but in practice this should be rare. And since the change is now limited to Task, perhaps we could add some UT like:

```java
public void testDeclineCheckpointIfTaskIsNotRunning() throws Exception {
    TestCheckpointResponder testCheckpointResponder = new TestCheckpointResponder();
    final Task task =
            createTaskBuilder().setCheckpointResponder(testCheckpointResponder).build();
    task.triggerCheckpointBarrier(
            1,
            1,
            CheckpointOptions.alignedNoTimeout(
                    CheckpointType.CHECKPOINT,
                    CheckpointStorageLocationReference.getDefault()));
    assertEquals(1, testCheckpointResponder.getDeclineReports().size());
    assertEquals(
            CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY,
            testCheckpointResponder
                    .getDeclineReports()
                    .get(0)
                    .getCause()
                    .getCheckpointFailureReason());
}
```
Thanks @gaoyunhaii for your suggestion. Your test wouldn't actually exercise the cases I added, as the task wouldn't be in a running state, but it was easy to add coverage for all 4 declining cases.
gaoyunhaii
left a comment
Thanks very much Piotr @pnowojski for updating the PR! LGTM and +1 to merge~
@flinkbot run azure
With ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH enabled, the final checkpoint can deadlock (or time out after a very long time) if there is a race condition between selecting the tasks to trigger the checkpoint on and tasks finishing. FLINK-21246 was supposed to handle this, but it doesn't work as expected, because the futures from
org.apache.flink.runtime.taskexecutor.TaskExecutor#triggerCheckpoint
and
org.apache.flink.streaming.runtime.tasks.StreamTask#triggerCheckpointAsync
are not linked together. TaskExecutor#triggerCheckpoint reports that the checkpoint has been successfully triggered, while the StreamTask might actually have already finished.
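The failure mode can be sketched with plain CompletableFutures (hypothetical simplified signatures, not the actual Flink API): in the broken variant the acknowledgement is created independently of the trigger outcome, so a false result from an already-finished task is never surfaced to the checkpoint coordinator, which then waits on a checkpoint that will never happen.

```java
import java.util.concurrent.CompletableFuture;

public class UnlinkedFutures {
    // Stand-in for StreamTask#triggerCheckpointAsync on a task that already finished.
    static CompletableFuture<Boolean> triggerOnFinishedTask() {
        return CompletableFuture.completedFuture(false); // the trigger did not happen
    }

    // Broken: acknowledges the RPC regardless of the trigger outcome.
    static CompletableFuture<Boolean> brokenTriggerCheckpoint() {
        triggerOnFinishedTask(); // result is dropped on the floor
        return CompletableFuture.completedFuture(true); // "successfully triggered"
    }

    // Fixed: the RPC result is the trigger outcome itself.
    static CompletableFuture<Boolean> fixedTriggerCheckpoint() {
        return triggerOnFinishedTask();
    }

    public static void main(String[] args) {
        System.out.println("broken reports triggered: " + brokenTriggerCheckpoint().join());
        System.out.println("fixed reports triggered: " + fixedTriggerCheckpoint().join());
    }
}
```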
Verifying this change
TODO: implement test
Does this pull request potentially affect one of the following parts:
- @Public(Evolving): (yes / no)
Documentation