[FLINK-21439][core] WIP: Adds Exception History for AdaptiveScheduler #15898
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community review pull requests through automated checks. Last check on commit 699bacf (Tue May 11 18:40:48 UTC 2021). Mention the bot in a comment to re-run the automated checks. The bot tracks the review progress through labels, applied according to the order of the review items; see the Pull Request Review Guide for a full explanation of the review process. For consensus, approval by a Flink committer or PMC member is required.
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
Thanks @bytesandwich for your draft, and thanks for your patience. I looked through your proposal and the changes already look good. I had a few remarks and questions, which are listed below. Looking forward to your reply.

I didn't go through the StopWithSavepointTest in detail yet: some of the tests are failing. Keep in mind that we also want to cover the other StateWithExecutionGraph implementations for the exception history.
.../java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java
```java
final Collection<RootExceptionHistoryEntry> copy = new ArrayList<>(exceptionHistory.size());
exceptionHistory.forEach(copy::add);
return copy;
```
We could think of moving this logic into BoundedFIFOQueue, considering that SchedulerBase uses the exact same code. WDYT?
Does making BoundedFIFOQueue an AbstractQueue and replacing this with `new ArrayList<>(<the queue>)` work?
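To illustrate the idea under discussion, here is a minimal sketch of what extending AbstractQueue buys: the `ArrayList(Collection)` copy constructor replaces the manual `forEach` copy. The class and field names below are illustrative stand-ins, not Flink's actual BoundedFIFOQueue implementation.

```java
import java.util.AbstractQueue;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

// Simplified stand-in for a bounded FIFO queue: once it extends AbstractQueue,
// callers can copy it with new ArrayList<>(queue) instead of iterating manually.
class BoundedFifoQueueSketch<T> extends AbstractQueue<T> {
    private final Queue<T> elements = new ArrayDeque<>();
    private final int maxSize;

    BoundedFifoQueueSketch(int maxSize) {
        this.maxSize = maxSize;
    }

    @Override
    public boolean offer(T element) {
        if (elements.size() >= maxSize) {
            elements.poll(); // evict the oldest entry to stay within the bound
        }
        return elements.offer(element);
    }

    @Override
    public T poll() {
        return elements.poll();
    }

    @Override
    public T peek() {
        return elements.peek();
    }

    @Override
    public Iterator<T> iterator() {
        return elements.iterator();
    }

    @Override
    public int size() {
        return elements.size();
    }

    public static void main(String[] args) {
        BoundedFifoQueueSketch<String> queue = new BoundedFifoQueueSketch<>(2);
        queue.add("a");
        queue.add("b");
        queue.add("c"); // "a" is evicted
        // Works because AbstractQueue implements Collection:
        List<String> copy = new ArrayList<>(queue);
        System.out.println(copy); // prints [b, c]
    }
}
```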
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
...runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
```java
 * The {@link FailureResult} describes how a failure shall be handled. Currently, there are two
 * alternatives: Either restarting the job or failing it.
 */
static final class FailureResult {
```
I'm wondering whether we should introduce a unit test for FailureResult, considering that it becomes more "powerful". And maybe moving it into AdaptiveScheduler might make sense? WDYT?
I think there's a follow-up refactor of handleAnyFailure etc. to be had.
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java
```java
assertThat(scheduler.howToHandleFailure(new Exception("test")).canRestart(), is(false));
assertThat(
        scheduler.howToHandleFailure(null, new Exception("test")).canRestart(), is(false));
```
Theoretically, we would have to test passing a non-null value here as well for the failingExecutionVertexId parameter. Introducing a FailureResultTest as mentioned above would free us from doing that.
Done
Thanks for addressing this. Could you move this case into its own test method?
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
```java
assertThat(failure.getTimestamp(), greaterThanOrEqualTo(start));
assertThat(failure.getTimestamp(), lessThanOrEqualTo(end));
assertThat(failure.getTaskManagerLocation(), Matchers.is(nullValue()));
assertThat(failure.getFailingTaskName(), Matchers.is(nullValue()));
```
There's an ExceptionHistoryEntryMatcher which you could use (and extend) instead.
Do we need to add the code from getFailureTimestamp in DefaultSchedulerTest to use that?
I think that's a good idea. That makes the test more precise as well.
Thanks @bytesandwich. The changes go in the right direction, I think. 👍 We're still missing the support of concurrent failures. I tried to pinpoint this in some of my comments below. Looking forward to your response.
```java
@Nullable private final Duration backoffTime;

/**
 * the {@link ExecutionVertexID} refering to the {@link ExecutionVertex} the failure is
```
```diff
- * the {@link ExecutionVertexID} refering to the {@link ExecutionVertex} the failure is
+ * The {@link ExecutionVertexID} refering to the {@link ExecutionVertex} the failure is
```

nit
```java
@Nullable
protected ExecutionVertexID getExecutionVertexId(ExecutionAttemptID id) {
    Execution execution = getExecutionGraph().getRegisteredExecutions().get(id);
    if (execution == null) {
```
Looks like I missed that last time: This seems to be wrong, doesn't it? This method returning null would lead to the failure being interpreted as a global one. It feels like the wrong location for this decision. I'd propose that the method expects the ID to be present. Setting the null value should be done in the handleGlobalFailure method explicitly. Alternatively, you could follow what DefaultScheduler/SchedulerBase are doing by returning an Optional and doing the state check in case of a successful update.
Is this the same behavior as the original method?

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java, line 519 (at d105eb3):

```java
return Optional.ofNullable(executionGraph.getRegisteredExecutions().get(executionAttemptId))
```
Not sure whether I understand you correctly: The method implementation exists in SchedulerBase as well, yes.
```java
 * @return The {@code FailureHandlingResultSnapshot}.
 */
public static FailureHandlingResultSnapshot create(
        Optional<ExecutionVertexID> failingExecutionVertexId,
```
Passing an Optional here causes an unnecessary wrapping in StateWithExecutionGraph:325 just to have it unwrapped in the method. Instead, we could do a failureHandlingResult.getExecutionVertexIdOfFailedTask().orElse(null) in the factory method above and make this parameter @Nullable.
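A small sketch of the difference being suggested (method and type names are simplified stand-ins, not the real Flink signatures): accepting a nullable reference directly avoids the wrap/unwrap round trip that an Optional parameter forces on the caller.

```java
import java.util.Optional;

// Contrasts an Optional-typed factory parameter with a nullable one.
class OptionalParamSketch {
    // Caller must wrap in Optional just so the factory can unwrap it again.
    static String createWithOptional(Optional<String> failingVertexId) {
        return failingVertexId.orElse("global failure");
    }

    // Preferred per the review comment: accept a nullable reference; the
    // caller unwraps once with result.getExecutionVertexIdOfFailedTask().orElse(null).
    static String createWithNullable(/* @Nullable */ String failingVertexId) {
        return failingVertexId != null ? failingVertexId : "global failure";
    }

    public static void main(String[] args) {
        Optional<String> fromResult = Optional.of("vertex-1");
        // Same outcome, one fewer wrapping layer in the second call:
        System.out.println(createWithOptional(fromResult));          // prints vertex-1
        System.out.println(createWithNullable(fromResult.orElse(null))); // prints vertex-1
    }
}
```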
```java
/**
 * Archive the details of an execution failure for future retrieval and inspection.
 *
 * @param failureHandlingResultSnapshot
```
```diff
- * @param failureHandlingResultSnapshot
+ * @param failureHandlingResultSnapshot The {@link FailureHandlingResultSnapshot} holding the failure information that needs to be archived.
```
nit: just to please the IDE and remove a warning.
```java
assertThat(scheduler.howToHandleFailure(new Exception("test")).canRestart(), is(false));
assertThat(
        scheduler.howToHandleFailure(null, new Exception("test")).canRestart(), is(false));
```
Thanks for addressing this. Could you move this case into its own test method?
```java
        Matchers.is(expectedException));
assertThat(failure.getTimestamp(), greaterThanOrEqualTo(start));
assertThat(failure.getTimestamp(), lessThanOrEqualTo(end));
assertThat(failure.getTaskManagerLocation(), Matchers.is(nullValue()));
```
```diff
- assertThat(failure.getTaskManagerLocation(), Matchers.is(nullValue()));
+ assertThat(failure.getTaskManagerLocation(), is(nullValue()));
```
nit: This code already imports org.hamcrest.core.Is.is statically. The Matchers. prefix is not necessary here.
```java
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
```
nit: I know it's not caused by you, but could you replace the org.junit.Assert.assertThat import with org.hamcrest.MatcherAssert.assertThat in a hotfix commit, just to have the deprecation warning removed?
```java
assertThat(failure.getTaskManagerLocation(), Matchers.is(nullValue()));
assertThat(failure.getFailingTaskName(), Matchers.is(nullValue()));
}
```
Can you add the concurrent failure test here as well? This should fail right now since we're not covering the failure archiving in the restart state.
```java
final FailureResult failureResult =
        context.howToHandleFailure(failingExecutionVertexId, cause);

archiveExecutionFailure(failingExecutionVertexId, cause);
```
Thinking about it once more: I guess it's not the right location to archive the failure, considering that we also want to identify concurrent failures. We haven't addressed that in this PR yet. To achieve that, we have to collect the failure snapshot here and pass it over to the next state (failure or restart). Any failure that pops up in these subsequent states has to be collected as well. The archiving should happen when re-instantiating the ExecutionGraph.
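A rough sketch of the hand-over being proposed (all names here are illustrative, not Flink's actual classes): the first failure is collected rather than archived immediately, the collection travels into the next state, failures observed there are recorded as concurrent, and archiving happens once at the end.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model of collecting a root failure plus concurrent failures
// across a state transition, archiving only when the next graph is built.
class FailureCollectionSketch {
    static class FailureCollection {
        final Throwable rootCause;
        final List<Throwable> concurrentFailures = new ArrayList<>();

        FailureCollection(Throwable rootCause) {
            this.rootCause = rootCause;
        }
    }

    static class Restarting {
        private final FailureCollection failures; // handed over from the previous state

        Restarting(FailureCollection failures) {
            this.failures = failures;
        }

        void onTaskFailure(Throwable cause) {
            // Failures that pop up while restarting count as concurrent failures.
            failures.concurrentFailures.add(cause);
        }

        int archive() {
            // Archiving happens once, when the new ExecutionGraph is created:
            // the root cause plus all concurrent failures form one history entry.
            return 1 + failures.concurrentFailures.size();
        }
    }

    public static void main(String[] args) {
        FailureCollection collection = new FailureCollection(new Exception("root"));
        Restarting restarting = new Restarting(collection);
        restarting.onTaskFailure(new Exception("concurrent-1"));
        System.out.println(restarting.archive()); // prints 2
    }
}
```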
We should also cover this in the corresponding StateWithExecutionGraphTest test implementations.
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java
@bytesandwich just letting you know: The feature freeze for Flink 1.14 is at the beginning of August 2021. How realistic is it to finish this PR by then? FYI: @zentol might take over as I'm out for the next weeks.
@bytesandwich ping
Hi @zentol, I just got back from a vacation, so I'm looking at this again. I'm not sure what we want specification-wise with concurrent failure support; I can imagine all sorts of things failing concurrently. It seems like concurrent failures would be best tested in a more elaborate integration test, to have a very clear expectation of correct behavior. Perhaps that could be a follow-up ticket, to make this current minimal exception handling landable quickly after I address the current feedback?
Regarding using [...]: FYI, I uploaded the changes I had made before vacation, which I think addressed the review. I'd like to see what the integration test runs do, since developing on Windows makes it hard to run the build.
Hi @bytesandwich, I'm back from vacation, so I am able to answer your questions. [...] Analogously, that has to be done for cases where the scheduler does not switch into [...]
The same way how you get the [...]. I hope that helped. Let me know if you have further questions.
Thanks @bytesandwich. I looked through your questions and changes and responded to all of them. Please see my comments above and below. Feel free to reach out for further questions.
.../java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java
.../java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java
This adds a first failing test case for AdaptiveScheduler to return an ExceptionHistory.
Hi @XComp, I just switched jobs so I was out for a bit. I see how you envision the test and I implemented it that way. PTAL!
Thanks, @bytesandwich. The changes look good. We're going in the right direction with it. Great! 👍 I added some comments to your code changes. Please check them below.

I didn't go through the tests yet. AzureCI seems to be failing quite a bit. Is this related to your changes?
```java
 * @param failingExecutionVertexId an {@link ExecutionVertexID} the failure originates from, or
 *     {@code None}.
```
```diff
- * @param failingExecutionVertexId an {@link ExecutionVertexID} the failure originates from, or
- *     {@code None}.
+ * @param failingExecutionVertexId the {@link ExecutionVertexID} referring to the {@link Execution} that failed. {@code null} should be used in case of a global failure.
```
I adapted the text a bit since we're not relying on Optional anymore, i.e. None is not exactly correct in this context.
```java
 * @param concurrentVertexIds {@link ExecutionVertexID} Task vertices concurrently failing with
 *     the {@code failingExecutionVertexID}.
```
```diff
- * @param concurrentVertexIds {@link ExecutionVertexID} Task vertices concurrently failing with
- *     the {@code failingExecutionVertexID}.
+ * @param concurrentVertexIds {@link ExecutionVertexID} referring to {@link Execution Executions} that failed while the initial failure of {@code failingExecutionVertexID} was handled.
```
```java
public static FailureHandlingResultSnapshot create(
        @Nullable ExecutionVertexID failingExecutionVertexId,
        Throwable rootCause,
        Set<ExecutionVertexID> concurrentVertexIds,
```
```diff
- Set<ExecutionVertexID> concurrentVertexIds,
+ Set<ExecutionVertexID> concurrentlyFailingExecutionVertexIds,
```
nit: a small suggestion to make the parameter more expressive.
```java
    }
}

void maybeArchiveExecutionFailure(TaskExecutionStateTransition taskExecutionStateTransition) {
```
```diff
- void maybeArchiveExecutionFailure(TaskExecutionStateTransition taskExecutionStateTransition) {
+ void archiveExecutionFailureIfFailed(TaskExecutionStateTransition taskExecutionStateTransition) {
```
as an idea to make it more explicit what the method does
```java
if (taskExecutionStateTransition.getExecutionState() != ExecutionState.FAILED) {
    return;
}
```
```diff
  if (taskExecutionStateTransition.getExecutionState() != ExecutionState.FAILED) {
      return;
  }
+
```
nit: To visually separate the failed state handling...
```java
}

void archiveExecutionFailure(
        @Nullable ExecutionVertexID failingExecutionVertexId, Throwable cause) {
```
Could we add some JavaDoc here as well to describe the @Nullable contract?
```java
private final Logger logger;

protected final ClassLoader userCodeClassLoader;
```
```diff
- protected final ClassLoader userCodeClassLoader;
+ private final ClassLoader userCodeClassLoader;
```
Sharing members with subclasses might not be the best. What about making userCodeClassLoader private and providing a getError(TaskExecutionStateTransition stateTransition) in StateWithExecutionGraph? All subclasses could use that method as well.
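A minimal sketch of the encapsulation being suggested (class and method names are simplified stand-ins for Flink's StateWithExecutionGraph hierarchy): the field stays private, and subclasses go through a protected method, so the base class remains free to change how the error is resolved.

```java
// Base state keeps userCodeClassLoader private and exposes behavior instead.
abstract class StateWithGraphSketch {
    private final ClassLoader userCodeClassLoader;

    StateWithGraphSketch(ClassLoader userCodeClassLoader) {
        this.userCodeClassLoader = userCodeClassLoader;
    }

    // Subclasses call this instead of touching the field directly. In Flink
    // this would deserialize the transition's SerializedThrowable with the
    // user code class loader; this sketch just passes the error through.
    protected Throwable getError(Throwable serializedError) {
        return serializedError;
    }
}

// A subclass such as Failing or Restarting needs no access to the loader.
class FailingSketch extends StateWithGraphSketch {
    FailingSketch(ClassLoader loader) {
        super(loader);
    }

    Throwable handle(Throwable raw) {
        return getError(raw); // no direct field access required
    }

    public static void main(String[] args) {
        FailingSketch failing = new FailingSketch(ClassLoader.getSystemClassLoader());
        Throwable raw = new RuntimeException("task failure");
        System.out.println(failing.handle(raw) == raw); // prints true
    }
}
```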
```java
}

@Override
boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionStateTransition) {
```
It's not enough to collect only task failures. We also want to collect global failures. Hence, you have to cover the archiving in Restarting.handleGlobalFailure.
```java
@Override
boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionStateTransition) {
    maybeArchiveExecutionFailure(taskExecutionStateTransition);
```
We also want to handle global failures here, i.e. we want to extend Failing.handleGlobalFailure.
```java
        OperatorCoordinatorHandler operatorCoordinatorHandler,
        Logger logger) {
    super(context, executionGraph, executionGraphHandler, operatorCoordinatorHandler, logger);
        Logger logger,
```
Just for the record: I verified that we do not collect failures while already being in the cancelling stage for the DefaultScheduler (see source). Hence, not implementing the archiving in this class is correct. 👍
@bytesandwich Any update on your side? If not, I might pick it up to finalize it for Flink 1.15 after the upcoming release of 1.14.
Hi @XComp, I think that would be a good idea, unless you feel that the changes you asked for are the final changes needed to land the diff? I am not working with Flink at my day job at the moment.
This PR is being marked as stale since it has not had any activity in the last 180 days. If you are having difficulty finding a reviewer, please reach out to the [community](https://flink.apache.org/what-is-flink/community/). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 90 days, it will be automatically closed.
This PR was superseded by #18689. Closing the PR.
What is the purpose of the change

This pull request will address saving exception histories in AdaptiveScheduler.

Brief change log
- Tests in AdaptiveSchedulerTest for 3 conditions: knowing the specific failing task, not knowing it, and concurrent failures.
- Exception history in AdaptiveScheduler, along with requestJob.
- archiveExecutionFailure in StateWithExecutionGraph to populate the queue in AdaptiveScheduler, along with uses of this method in archiveAnyFailure in Executing and StopWithSavepoint.
- failingExecutionVertexId added to Executing.FailureResult.
- userCodeClassLoader moved to StateWithExecutionGraph, used by Failing and Restarting.
- A FailureHandlingResultSnapshot factory method that accepts what we have available in AdaptiveScheduler when we have failures. I think maybe it's a broader type of FailureSnapshot now.

Verifying this change
This change adds exception state accumulation to the AdaptiveScheduler and has associated tests in AdaptiveSchedulerTest.
Does this pull request potentially affect one of the following parts:

- @Public(Evolving): no

Documentation