[FLINK-23233][runtime] Failing checkpoints before failover for failed events in OperatorCoordinator #16432
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request.

Automated Checks
Last check on commit c874338 (Thu Sep 23 18:02:32 UTC 2021)
Warnings:
Mention the bot in a comment to re-run the automated checks.

Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:
Force-pushed from 3f92e0c to fa6a87a
Thanks for creating this PR so fast @gaoyunhaii. I had a couple of comments and a question about which future to wait on. Maybe we can fix the problem also by waiting on the result handling future instead of waiting on the result future directly. PTAL.
@@ -108,4 +94,22 @@ void removeFromSet(CompletableFuture<?> future) {
            lock.unlock();
        }
    }

    @VisibleForTesting
    Collection<CompletableFuture<?>> getCurrentIncomplete() {
Suggested change:
-    Collection<CompletableFuture<?>> getCurrentIncomplete() {
+    Collection<CompletableFuture<?>> getCurrentIncompleteAndReset() {
Sorry, I made a mistake here: I initially wanted to make this method a pure getter that does not modify the state, and it would only be used in the tests for verification. Do you think it would also be ok if I remove `incompleteFutures.clear()` and keep the name `getCurrentIncomplete()`?
If `incompleteFutures.clear()` does not need to be called in this method, then it should be fine.
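For reference, a pure getter without the reset could look roughly like the following. This is only a sketch: the `lock` and `incompleteFutures` names are assumed from the surrounding diff and may not match the final class exactly.

```java
@VisibleForTesting
Collection<CompletableFuture<?>> getCurrentIncomplete() {
    lock.lock();
    try {
        // Hand out a defensive copy so tests can only inspect, not mutate,
        // the internally tracked futures; nothing is cleared here.
        return new ArrayList<>(incompleteFutures);
    } finally {
        lock.unlock();
    }
}
```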
executor.triggerAll();

// Finish the event sending. This will insert one runnable that handle
// failed events to the executor. And we pending this runnable to
What do you mean with "we pending this runnable"?
This should be "delay the runnable". The initial thought was to test the case where a new checkpoint is triggered before the stage that triggers the failover gets executed, thus we need some way to delay this stage until the checkpoint is triggered.
Technically we are not delaying it but ignoring it, right?
@Override
public void execute(@Nonnull Runnable command) {
    if (pendingNewRunnables) {
        pendingRunnables.add(command);
Will the `Runnables` in `pendingRunnables` ever be executed, or will they simply be ignored?
In the real case these `Runnables` would eventually be executed, and here whether they are executed would not change the result, since we are testing the logic before they get executed. I think I could also execute these `Runnables` at the end to be more consistent with the real case.
private final Queue<Runnable> pendingRunnables = new ArrayDeque<>();

public void setPendingNewRunnables(boolean pendingNewRunnables) {
Maybe a better name would be `ignoreExecuteCalls(boolean)` or so.
Perhaps we could change it to something like `delayNewRunnables`, and in the test we finally execute these `Runnables`?
Sounds good.
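A minimal sketch of what such a test executor could look like after the rename (class and method names here are illustrative, not the actual test code; assumes the usual `java.util` and `java.util.concurrent` imports):

```java
/** Test executor that can temporarily hold back submitted runnables. */
private static final class ManuallyDelayingExecutor implements Executor {

    private final Queue<Runnable> pendingRunnables = new ArrayDeque<>();
    private boolean delayNewRunnables;

    void delayNewRunnables(boolean delay) {
        this.delayNewRunnables = delay;
    }

    @Override
    public void execute(Runnable command) {
        if (delayNewRunnables) {
            // Queue instead of running, so the test can trigger a checkpoint
            // before the failover-handling runnable gets executed.
            pendingRunnables.add(command);
        } else {
            command.run();
        }
    }

    /** Eventually runs the delayed runnables, staying consistent with the real case. */
    void executePendingRunnables() {
        Runnable next;
        while ((next = pendingRunnables.poll()) != null) {
            next.run();
        }
    }
}
```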
result.handleAsync(
        (success, failure) -> {
            if (failure != null && subtaskAccess.isStillRunning()) {
                String msg =
                        String.format(
                                EVENT_LOSS_ERROR_MESSAGE,
                                evt,
                                subtaskAccess.subtaskName());
                subtaskAccess.triggerTaskFailover(new FlinkException(msg, failure));
            if (failure != null) {
                if (subtaskAccess.isStillRunning()) {
                    String msg =
                            String.format(
                                    EVENT_LOSS_ERROR_MESSAGE,
                                    evt,
                                    subtaskAccess.subtaskName());
                    subtaskAccess.triggerTaskFailover(
                            new FlinkException(msg, failure));
                }

            nonSuccessFuturesTrack.removeFailedFuture(result);
I am wondering whether it wouldn't be simpler to change `result.handleAsync` to `result.whenAsync` and then add the result of this operation to the `incompleteFuturesTracker`? That way we are sure that we will have handled the result before doing any other operations (e.g. failing/completing checkpoints).
Hi Till, thanks a lot for the review! First, for this issue, my initial concern about this method is that it might cause a deadlock:

- When a checkpoint gets to `completeCheckpointOnceEventsAreDone`, it would block the main thread and wait until all the pending event futures (with this method, the `whenAsync` ones) are done.
- When the event-sending result future finishes, the thread finishing it would also try to submit the `whenAsync` stage to the main thread, which would not get executed since the main thread is blocked.
Hmm, let me check whether this is the case. If the checkpoint blocks the main thread, then this is another serious problem that should not happen.
It doesn't block the main thread. It just creates a conjunct future of all pending event futures and chains the checkpoint completion to that future. That is all fully async.
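A rough sketch of that fully asynchronous pattern, using the plain JDK `CompletableFuture.allOf` for the conjunct future. The actual implementation uses Flink's own future utilities and executors, so the method name, parameters, and types here are illustrative only:

```java
static void completeCheckpointOnceEventsAreDone(
        Collection<CompletableFuture<?>> pendingEventFutures,
        CompletableFuture<byte[]> checkpointFuture,
        byte[] checkpointData) {

    // Conjunct future over all event futures that are still in flight.
    CompletableFuture<Void> allEventsDone =
            CompletableFuture.allOf(pendingEventFutures.toArray(new CompletableFuture<?>[0]));

    // Chain the checkpoint completion to the conjunct future: nothing blocks,
    // the checkpoint simply completes (or fails) once all pending events are handled.
    allEventsDone.whenComplete(
            (ignored, failure) -> {
                if (failure == null) {
                    checkpointFuture.complete(checkpointData);
                } else {
                    checkpointFuture.completeExceptionally(failure);
                }
            });
}
```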
Yes, indeed. Very sorry, I misunderstood the original implementation.
Force-pushed from 473db5d to 1b3a0d6
Hi Till, thanks a lot for the review! I addressed the inline comments, and as for the other possible option, I think it might cause a deadlock.
@gaoyunhaii could you show me where exactly the deadlock will happen? I could not find it based on your description. I think the checkpoint won't block the main thread.
Hi @tillrohrmann, sorry, I misunderstood the implementation of `completeCheckpointOnceEventsAreDone`. Then it seems indeed also ok to make the checkpoint future wait on the failure-handling future: since there won't be concurrent checkpoint triggers, no new checkpoint would arrive before the pending checkpoint finishes; and it should also be ok to delay the checkpoint for a little while to wait for failing the subtask. This method seems simpler based on the current implementation; do you prefer this method?
I am taking a look here; I would like to double-check two things before going ahead with this.
Technically, both solutions should work (modulo Stephan's investigation of the two things). If we consider error handling to be part of the event sending, then the …
To re-summarize the failure cause: A failed RPC (failed result future) leads to the failure of the checkpoint and triggers a task failure. Then the tracking structures are reset, assuming that the failure is taken care of.

I don't fully understand how this PR fixes that. It looks like it tries to change a bit where failure notifications are set, and that in the sending executor the future is now tracked together with the event sending:

sendingExecutor.execute(
        () -> {
            nonSuccessFuturesTrack.trackFuture(result);
            sender.sendEvent(sendAction, result);
        });

Alternative Solution

I think what @tillrohrmann suggested is the right direction: What we really need to guarantee is that by the time the event-sending future that we track is failed (due to RPC loss), we have already marked the job as failed, so no other checkpoint can be triggered.

So my proposal is to re-write the following section of the code:

Existing Code

Note how the future that goes into the tracker can complete before the subtask is marked as failed.

final Callable<CompletableFuture<Acknowledge>> sendAction =
        subtaskAccess.createEventSendAction(serializedEvent);
final CompletableFuture<Acknowledge> result = new CompletableFuture<>();
FutureUtils.assertNoException(
        result.handleAsync(
                (success, failure) -> {
                    if (failure != null && subtaskAccess.isStillRunning()) {
                        String msg =
                                String.format(
                                        EVENT_LOSS_ERROR_MESSAGE, evt, subtaskAccess.subtaskName());
                        subtaskAccess.triggerTaskFailover(new FlinkException(msg, failure));
                    }
                    return null;
                },
                sendingExecutor));
sendingExecutor.execute(() -> sender.sendEvent(sendAction, result));
return result;

Changed Code

Here, the future that is in the tracker is only complete once the subtask is marked as failed.

final CompletableFuture<Acknowledge> sendResult = new CompletableFuture<>();
sendingExecutor.execute(() -> sender.sendEvent(sendAction, sendResult));
final CompletableFuture<Acknowledge> result =
        sendResult.handleAsync(
                (success, failure) -> {
                    if (failure != null && subtaskAccess.isStillRunning()) {
                        String msg =
                                String.format(
                                        EVENT_LOSS_ERROR_MESSAGE, evt, subtaskAccess.subtaskName());
                        subtaskAccess.triggerTaskFailover(new FlinkException(msg, failure));
                    }
                    return success;
                },
                sendingExecutor);
incompleteFuturesTracker.trackFutureWhileIncomplete(result);
return result;

We do need to pass the incomplete futures tracker into the …
Thanks a lot @tillrohrmann for the review, and thanks a lot @StephanEwen for the detailed investigation and analysis! I also agree that chaining the futures as @tillrohrmann suggested would be correct and simpler. I'll update the PR according to this method very soon.

And for clarification, for the original method that used …, my understanding is that the order is ensured since both the above Runnable and … For all the events during checkpointing …

First, with my understanding, now we should request the implementation of …
Force-pushed from 1b3a0d6 to ff66945
Hi, I updated the PR according to the above comments. During the implementation and testing I found that we would still need to keep tracking the future atomic with the event sending: suppose the user thread keeps sending events, and now a checkpoint is triggered and finally we call …
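For illustration, keeping the tracking atomic with the sending roughly means submitting both as one runnable on the sending executor. The names follow the diff fragments above; this is a sketch, not necessarily the final code:

```java
sendingExecutor.execute(
        () -> {
            // Track the failure-handling future and hand the raw send future to the
            // sender within the same runnable, so a checkpoint triggered in between
            // cannot observe an event that is sent but not yet tracked (or vice versa).
            incompleteFuturesTracker.trackFutureWhileIncomplete(result);
            sender.sendEvent(sendAction, sendResult);
        });
```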
Thanks for updating this PR @gaoyunhaii. I do agree with you that starting to track the sending event result future needs to happen from the main thread. Otherwise we can track events which won't be sent because the `EventValve` is already shut.

I had a few more comments. Please take a look.
@@ -80,11 +86,14 @@
                                subtaskAccess.subtaskName());
                subtaskAccess.triggerTaskFailover(new FlinkException(msg, failure));
I think we should guard that this method does not throw an exception. If it does, then we should fail hard. This will ensure that we don't swallow this failure as a send event failure.
Maybe we can add a util `Runnables.assertNoException(Runnable)` that calls the `FatalExitExceptionHandler.INSTANCE`.
Got it, we indeed should not leak the exception. I added the util method and the check.
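A minimal sketch of such a utility. The `Runnables` class name follows Till's suggestion, `FatalExitExceptionHandler` is the existing Flink handler, and the exact placement and signature in the PR may differ:

```java
/** Utility for running code that must never throw. */
public final class Runnables {

    private Runnables() {}

    /**
     * Runs the given {@link Runnable} and, if it throws, hands the error to
     * {@link FatalExitExceptionHandler#INSTANCE} so the process fails hard
     * instead of the failure being swallowed as a send-event failure.
     */
    public static void assertNoException(Runnable runnable) {
        try {
            runnable.run();
        } catch (Throwable t) {
            FatalExitExceptionHandler.INSTANCE.uncaughtException(Thread.currentThread(), t);
        }
    }
}
```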
Thanks a lot @tillrohrmann for the review! I updated the PR according to the comments.
Thanks for updating this PR @gaoyunhaii. I think the changes look good to me. +1 for merging it after CI gives green light. Also feel free to clean up the commit history and squash the fixup commits.
+1 also from my side.
Force-pushed from 91fe935 to eb00c91
Thanks a lot @tillrohrmann @StephanEwen for the review! I squashed the commits.
@flinkbot run azure
Force-pushed from eb00c91 to 7135067
Force-pushed from 7135067 to c874338
What is the purpose of the change
This PR changes how checkpoints in the OperatorCoordinator track the results of previously sent events, so that failed events are kept until they have been processed (namely, until failover has been triggered for the corresponding subtasks). Otherwise there might be event losses: if a checkpoint is taken after an event fails to send, the subtask failover caused by the lost event won't be included in that checkpoint.
Brief change log
Verifying this change
This change can be verified by the added unit tests and by manually testing the previously failing cases.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): no

Documentation