[FLINK-17012][streaming] 'RUNNING' state split into 'RUNNING' and 'RECOVERING' in order to distinguish when the task is really running #15221
Conversation
Force-pushed from 9adddc4 to 94ca73a ([FLINK-17012][streaming] 'RUNNING' state split into 'RUNNING' and 'RECOVERING' in order to distinguish when the task is really running)
    }

    return CompletableFuture.allOf(recoveredFutures.toArray(new CompletableFuture[0]))
            .thenRun(mailboxProcessor::allActionsCompleted);
According to https://issues.apache.org/jira/browse/FLINK-19385, I suspect that this compound future can never finish in some cases. So perhaps there should be another way to detect when recovery is finished.
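For context, here is a minimal standalone sketch (not the PR's code; the future names are made up) of how such a compound future behaves: allOf completes only when every input future completes, so a single recovery future that never completes would keep the allActionsCompleted callback from ever being triggered.

    import java.util.List;
    import java.util.concurrent.CompletableFuture;

    public class AllOfSketch {
        public static void main(String[] args) {
            // Two hypothetical "gate recovered" futures.
            CompletableFuture<Void> gate1Recovered = new CompletableFuture<>();
            CompletableFuture<Void> gate2Recovered = new CompletableFuture<>();
            List<CompletableFuture<Void>> recoveredFutures = List.of(gate1Recovered, gate2Recovered);

            CompletableFuture<Void> all =
                    CompletableFuture.allOf(recoveredFutures.toArray(new CompletableFuture[0]))
                            .thenRun(() -> System.out.println("all recovery actions completed"));

            gate1Recovered.complete(null);
            System.out.println(all.isDone()); // false: the compound future is still pending
            // If gate2Recovered is never completed (the situation suspected above),
            // the callback never runs.
            gate2Recovered.complete(null);
            System.out.println(all.isDone()); // true
        }
    }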
@@ -172,6 +172,7 @@ public void drain() throws Exception {

    /** Runs the mailbox processing loop. This is where the main work is done. */
    public void runMailboxLoop() throws Exception {
        mailboxLoopRunning = true;
I'm not sure how legal it is to reset the flag to true. In fact, I didn't find a reason why MailboxProcessor cannot be restarted, but I could be missing something.
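To make the concern concrete, here is a toy, self-contained sketch (not Flink's MailboxProcessor) of a loop guarded by such a flag; resetting the flag at the top of the loop method is exactly what makes a second invocation possible after the loop has been stopped.

    // Toy stand-in for the mailbox loop flag, for illustration only.
    public class LoopFlagSketch {
        private volatile boolean loopRunning;
        private int iterations;

        public void runLoop() {
            loopRunning = true; // resetting to true here is what permits a restart
            while (loopRunning) {
                iterations++;
                if (iterations % 3 == 0) {
                    stop(); // pretend some mail asked the loop to stop
                }
            }
        }

        public void stop() {
            loopRunning = false;
        }

        public static void main(String[] args) {
            LoopFlagSketch sketch = new LoopFlagSketch();
            sketch.runLoop(); // runs until stop() is called
            sketch.runLoop(); // restarting works only because the flag is reset on entry
            System.out.println(sketch.iterations); // 6
        }
    }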
    ensureNotCanceled();

    if (!allGatesRecoveredFuture.isDone()) {
        throw new Exception("Mailbox loop interrupted before recovery was finished.");
It is not clear what type of exception should be thrown here.
    try {
        beforeInvoke();
        if (!restored) {
            throw new Exception("Could not perform the task because the state wasn't restored.");
The same question about the exception type.
@@ -740,7 +740,7 @@
      },
      "status" : {
        "type" : "string",
-       "enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING" ]
+       "enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RECOVERING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING" ]
It is not fully clear to me how I should handle the new state here. As you can see, I just changed the output in the test, but I don't think that is all that should be done. Should I declare somewhere that the API has changed? Or maybe I should bump some version or something similar?
I am not sure myself. @zentol WDYT?
The REST API docs should be updated; https://github.com/apache/flink/tree/master/flink-docs; https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/rest_api.html#jobs-jobid
@@ -1148,10 +1158,22 @@ private void maybeReleasePartitionsAndSendCancelRpcCall(
        }
    }

    boolean switchToRecovering() {
        if (switchTo(DEPLOYING, RECOVERING)) {
            sendPartitionInfos();
Please pay attention that I moved sendPartitionInfos from switchToRunning to switchToRecovering. There are no visible changes right now because the RECOVERING state does nothing. But in the future, the RECOVERING state will start the mailbox loop for a short period until recovery is finished, and only then will the RUNNING state start the mailbox loop again until all actions are processed. So we need to decide which state is the right one for sending partition info. In my opinion, according to the current semantics, the RECOVERING state is the right choice.
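To illustrate the lifecycle described above, here is a toy, self-contained sketch (not Flink code; all names except sendPartitionInfos are placeholders) of the two-phase idea: partition infos go out when entering RECOVERING, and the main processing loop only starts once the task switches to RUNNING.

    public class LifecycleSketch {
        enum State { DEPLOYING, RECOVERING, RUNNING }

        private State state = State.DEPLOYING;

        private void sendPartitionInfos() {
            System.out.println("sending cached partition infos while in state " + state);
        }

        void run() {
            state = State.RECOVERING;
            sendPartitionInfos(); // moved here from the switch to RUNNING
            System.out.println("short mailbox loop until recovery is finished");

            state = State.RUNNING;
            System.out.println("main mailbox loop until all actions are processed");
        }

        public static void main(String[] args) {
            new LifecycleSketch().run();
        }
    }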
@@ -46,7 +46,7 @@ public void testJobDetailsMarshalling() throws JsonProcessingException {
        9L,
        JobStatus.RUNNING,
        8L,
-       new int[] {1, 3, 3, 7, 4, 2, 7, 3, 3},
+       new int[] {1, 3, 3, 4, 7, 4, 2, 7, 3, 3},
I have no idea how to choose the right number, so I just picked a random one. If there is specific logic here, please share it with me.
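For what it is worth, a hedged guess at the logic (this assumes the array mirrors one counter per ExecutionState, indexed by ordinal, which would also explain why adding this PR's RECOVERING constant requires exactly one extra slot):

    import org.apache.flink.runtime.execution.ExecutionState;

    // Assumption, not verified against the test: one counter per ExecutionState,
    // indexed by ordinal. If that holds, adding RECOVERING grows the array from
    // 9 to 10 entries, and any non-negative value in the new slot is fine for a
    // pure (de)serialization test.
    public class TasksPerStateSketch {
        public static void main(String[] args) {
            int[] tasksPerState = new int[ExecutionState.values().length];
            tasksPerState[ExecutionState.RUNNING.ordinal()] = 7;
            tasksPerState[ExecutionState.RECOVERING.ordinal()] = 4;
            System.out.println(tasksPerState.length); // 10 once RECOVERING exists
        }
    }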
Thanks for the PR @akalash. I think there is one problem with checking backpressure.
@@ -719,7 +721,9 @@ private void updatePartitionConsumers(final IntermediateResultPartition partition
    // Consumer is deploying => cache the partition info which would be
    // sent after switching to running
    // ----------------------------------------------------------------
-   if (consumerState == DEPLOYING || consumerState == RUNNING) {
+   if (consumerState == DEPLOYING
Do we need the || consumerState == RUNNING here? Shouldn't we update only when switching from DEPLOYING to RECOVERING?
It looks like it can indeed be removed from here. But in general, calling sendUpdatePartitionInfoRpcCall in both the RECOVERING and RUNNING states is not a mistake. So I decided to leave it here, but maybe I am wrong. Maybe @zentol can share his opinion about that?
I wouldn't modify the existing logic.
@zentol Just to be really clear here. By "existing" do you mean the logic of this PR or prior to the PR?
If I understand this PR well, after the PR sendUpdatePartitionInfoRpcCall will be sent twice: when switching to RECOVERING and a second time when switching over to RUNNING.
I've seen you changed it @akalash, however it is still different than prior to your changes. We were sending the partition update info as soon as we started recovering, at the beginning of RUNNING; however, now we are sending it only after we finish recovering...
I am not sure what the consequences of the sendUpdatePartitionInfoRpcCall call are, so I would appreciate another opinion here.
I think @dawidwys is partially right. The logic currently is different compared to what it used to be. Also, I think it is now wrong if we ever enter this method for unaligned checkpoints. With UC, the consumer might be in the RECOVERING state and already need partition information to continue recovering.

So all in all, IMO the original change as proposed by @akalash made the most sense to me. If the consumer is deploying, just cache the update. If the consumer is running or recovering, send the RPC - in both cases StreamTask/Task will be able to process the request. Also, only such a change would preserve the previous behaviour, where on master RECOVERING is essentially/implicitly part of the RUNNING state.

"If I understand this PR well, after the PR sendUpdatePartitionInfoRpcCall will be sent twice: when switching to RECOVERING and a second time when switching over to RUNNING."

I don't think so. Here we will/should cache the update if the consumer is DEPLOYING, or send it to the consumer if it's RECOVERING or RUNNING. Cached updates are sent to the consumer once the consumer switches from DEPLOYING to RECOVERING in lines 1162-1163:

    if (switchTo(DEPLOYING, RECOVERING)) {
        sendPartitionInfos();
In general, right now I think that @dawidwys was initially right and we should just replace RUNNING with RECOVERING. But as a safer change, I can return RECOVERING into this condition.
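Summarising the thread, a minimal sketch of the behaviour being converged on (names are placeholders, not the final code, and the RECOVERING constant assumes this PR's change): cache while the consumer is deploying, send the RPC once it is recovering or running.

    import org.apache.flink.runtime.execution.ExecutionState;

    // Illustration only: how the update to a consumer could be dispatched per state.
    public class ConsumerUpdateSketch {
        static String dispatch(ExecutionState consumerState) {
            if (consumerState == ExecutionState.DEPLOYING) {
                return "cache partition info (flushed later by sendPartitionInfos)";
            } else if (consumerState == ExecutionState.RECOVERING
                    || consumerState == ExecutionState.RUNNING) {
                return "send update-partition-info RPC (the task can already process it)";
            }
            return "ignore";
        }

        public static void main(String[] args) {
            System.out.println(dispatch(ExecutionState.DEPLOYING));
            System.out.println(dispatch(ExecutionState.RUNNING));
        }
    }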
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -505,6 +505,7 @@ AbstractInvokable getInvokable() {
    public boolean isBackPressured() {
        if (invokable == null
                || consumableNotifyingPartitionWriters.length == 0
                || executionState != ExecutionState.RECOVERING
Shouldn't it be || (executionState != ExecutionState.RECOVERING && executionState != ExecutionState.RUNNING)?
The way it is now, one of the two last conditions is always true. Can we add a test for it if we don't have one?
Yep, you are absolutely right. My mistake.
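A minimal sketch of the corrected state check (assuming this PR's RECOVERING constant; the surrounding null/length checks from Task.java are omitted): back pressure can only be reported while the task is actually executing.

    import org.apache.flink.runtime.execution.ExecutionState;

    public class BackPressureCheckSketch {
        // The fixed condition: report back pressure only in RECOVERING or RUNNING;
        // the single "!= RECOVERING" check above is always true in one of the two states.
        static boolean mayReportBackPressure(ExecutionState executionState) {
            return executionState == ExecutionState.RECOVERING
                    || executionState == ExecutionState.RUNNING;
        }

        public static void main(String[] args) {
            System.out.println(mayReportBackPressure(ExecutionState.DEPLOYING));  // false
            System.out.println(mayReportBackPressure(ExecutionState.RECOVERING)); // true
            System.out.println(mayReportBackPressure(ExecutionState.RUNNING));    // true
        }
    }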
@@ -224,16 +224,21 @@ public static void canceledExecution(
    public static void setExecutionToRunning(
            DefaultScheduler scheduler, JobVertexID jvid, int subtask) {
        final ExecutionAttemptID attemptID = getAttemptId(scheduler, jvid, subtask);
        scheduler.updateTaskExecutionState(
Wouldn't it be better to have a separate setExecutionToRecovering? We do not switch over multiple states in any of the other methods and it might be surprising for future users of this class.
I made this method more general.
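One possible shape of such a generalised test helper (purely hypothetical: the actual method added in the PR, and the TaskExecutionState constructor arguments, may differ; this is a fragment meant to live in the same test-utility class as the diff above):

    // Hypothetical sketch of a state-agnostic helper replacing setExecutionToRunning /
    // setExecutionToRecovering. NOTE: the TaskExecutionState constructor used here is
    // an assumption; the real one may require additional arguments (e.g. the job id).
    public static void setExecutionToState(
            ExecutionState targetState, DefaultScheduler scheduler, JobVertexID jvid, int subtask) {
        final ExecutionAttemptID attemptID = getAttemptId(scheduler, jvid, subtask);
        scheduler.updateTaskExecutionState(new TaskExecutionState(attemptID, targetState));
    }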
@@ -52,6 +52,9 @@

    DEPLOYING,

    /** Restoring last possible valid state of the task if it has it. */
    RECOVERING,
For compatibility purposes it could make sense to move this to the very end, to ensure ordinals from previous jobs still map to the correct state.
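A small self-contained illustration of the ordinal argument (toy enums, not the real ExecutionState): a persisted ordinal from an older version keeps resolving to the same constant only if new constants are appended at the end.

    public class EnumOrdinalSketch {
        enum OldState { CREATED, DEPLOYING, RUNNING, FINISHED }
        // New constant inserted in the middle shifts the ordinals of RUNNING and FINISHED.
        enum NewStateInserted { CREATED, DEPLOYING, RECOVERING, RUNNING, FINISHED }
        // New constant appended at the end leaves all pre-existing ordinals untouched.
        enum NewStateAppended { CREATED, DEPLOYING, RUNNING, FINISHED, RECOVERING }

        public static void main(String[] args) {
            int storedOrdinal = OldState.RUNNING.ordinal(); // 2, as persisted by an "old" job
            System.out.println(NewStateInserted.values()[storedOrdinal]); // RECOVERING (wrong mapping)
            System.out.println(NewStateAppended.values()[storedOrdinal]); // RUNNING (correct mapping)
        }
    }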
Hmm... there are test failures in REST API tests. Could you take a look at those @akalash?
Now the test failure is unrelated. LGTM, merging.
This would be a breaking change for the history server, because
@stormy-ua The history server only serves json files; it never de-serializes anything. The deserializer is only used, if at all, by the Flink client, for which we do not provide any compatibility guarantees across versions.
Well I'll be damned, we actually do deserialize JobDetails in the HS, and it does run into an NPE.
I created this ticket - https://issues.apache.org/jira/browse/FLINK-23034. And I already have a PR - #16199. @zentol, if you have time, can you please review it? (It is a really easy fix.)
[FLINK-17012][streaming] 'RUNNING' state split into 'RUNNING' and 'RECOVERING' in order to distinguish when the task is really running
What is the purpose of the change
This pull request separates ExecutionState#RUNNING into two different states, RUNNING and RECOVERING. This needs to be done because right now, when the task switches to the RUNNING state, it can be doing recovery for some time, but from the user's side it looks like the task is doing nothing.
Brief change log
Verifying this change
This change is already covered by existing tests with minor changes, such as TaskTest, ExecutionGraph*.
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)
Documentation