-
Notifications
You must be signed in to change notification settings - Fork 13.9k
[FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler #8922
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. DetailsThe Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
| final List<ExecutionVertex> executionVertices = StreamSupport | ||
| .stream(getAllExecutionVertices().spliterator(), false) | ||
| .collect(Collectors.toList()); | ||
| return SchedulingUtils.scheduleLazy(executionVertices, this); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SchedulingUtils could accept Iterable
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agreed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
| return new RestartAllStrategy.Factory(); | ||
|
|
||
| case PIPELINED_REGION_RESTART_STRATEGY_NAME: | ||
| return new AdaptedRestartPipelinedRegionStrategyNG.Factory(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indentation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
| } | ||
|
|
||
| private static void assertNoPendingCheckpoints(final CheckpointCoordinator checkpointCoordinator) { | ||
| assertThat(checkpointCoordinator.getPendingCheckpoints().keySet(), hasSize(0)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe also check that CheckpointCoordinator#getSuccessfulCheckpoints() is empty
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's not needed since we are not acknowledging any checkpoints from the executions.
| assertThat(failoverStrategy.getLastTasksToCancel(), containsInAnyOrder( | ||
| new ExecutionVertexID(ev11.getJobvertexId(), 0), | ||
| new ExecutionVertexID(ev21.getJobvertexId(), 0), | ||
| new ExecutionVertexID(ev21.getJobvertexId(), 1))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would be easier to understand if this were ev22.getJobvertexId
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
true, I am waiting for the PR that enables me to write ev22.getID()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed. Commit cherry picked from #8804.
| ev12.getCurrentExecutionAttempt().markFinished(); | ||
|
|
||
| // force FINISHED ev11 to fail to reset its partition | ||
| strategy.onTaskFailure(ev11.getCurrentExecutionAttempt(), new FlinkException("Fail for testing")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't believe this scenario to be possible in production. FAILED to FINISHED is not a valid state transition. Could we not fail one of the consumers instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's delete the test. The adapter is not calling IntermediateResultPartition#resetForNewExecution()
| final List<ExecutionVertex> executionVertices = StreamSupport | ||
| .stream(getAllExecutionVertices().spliterator(), false) | ||
| .collect(Collectors.toList()); | ||
| return SchedulingUtils.scheduleLazy(executionVertices, this); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agreed
|
|
||
| private final SlotProvider delegate; | ||
|
|
||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
duplicated empty line
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch, fixed
|
|
||
| final FailoverStrategy failoverStrategy = executionGraph.getFailoverStrategy(); | ||
| failoverStrategy.onTaskFailure(firstVertex.getCurrentExecutionAttempt(), new Exception("Test Exception")); | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
manualMainThreadExecutor.triggerAll() is needed here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't hurt but not needed since we are only interested in the state transition to ExecutionState.CANCELED.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The vertex directly transitions to CANCELED because it was no scheduled yet and the graph gets cancelled. It does not prove whether the local failover is skipped. (Even though a local failover is not skipped, the recovery action will be queued in main thread executor and not able to change the vertex state out from CANCELED).
If we want to verify "skip local failover if the job status is not Running", how about trigger task failure directly without graph.cancel()? In this case, the local failover will be skipped since the graph is in CREATED state, and the vertex will remain in CREATED state.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll add triggerAll() since it is a more realistic scenario, and it is less invasive (createExecutionGraph() already schedules the ExecutionGraph).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
|
|
||
| // trigger downstream regions to schedule | ||
| testingMainThreadExecutor.execute(() -> { | ||
| componentMainThreadExecutor.execute(() -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can invoke markFinished directly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, fixed it.
|
rebased to master |
| /** Config name for the {@link AdaptedRestartPipelinedRegionStrategyNG} */ | ||
| public static final String PIPELINED_REGION_RESTART_STRATEGY_NAME = "region"; | ||
|
|
||
| /** Config name for the {@link RestartPipelinedRegionStrategy} */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A "." is needed at the end of the statement.
Also for the other 3 config name comments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
tillrohrmann
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
Outdated
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
Outdated
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SchedulingUtils.java
Show resolved
Hide resolved
| } | ||
|
|
||
| /** | ||
| * Schedule vertices lazy. That means all vertices will be scheduled at once. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
eagerly
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SchedulingUtils.java
Outdated
Show resolved
Hide resolved
...rg/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
Show resolved
Hide resolved
...rg/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
Outdated
Show resolved
Hide resolved
...rg/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| /** | ||
| * Adapts {@link ScheduledExecutor} to {@link ComponentMainThreadExecutor}. | ||
| */ | ||
| public class ScheduledExecutorToComponentMainThreadExecutorAdapter implements ComponentMainThreadExecutor { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this class duplicates ComponentMainThreadExecutorServiceAdapter. A related class is TestingComponentMainThreadExecutorServiceAdapter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think so.
ComponentMainThreadExecutorServiceAdapter adapts ScheduledExecutorService. ScheduledExecutor is not a ScheduledExecutorService. There is no adapter from ScheduledExecutor to ScheduledExecutorService.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But there should be ScheduledExecutorServiceAdapter for ScheduledExecutorService -> ScheduledExecutor, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I made ComponentMainThreadExecutorServiceAdapter accept ScheduledExecutor and deleted my adapter. However, there are more pending clean ups that require a separate PR – for example, it does not make sense that TestingComponentMainThreadExecutorServiceAdapter extends ComponentMainThreadExecutorServiceAdapter.
|
rebased to master |
… from ManuallyTriggeredScheduledExecutor
|
Rebased to master and squashed commits. Merging as soon as build is green. |
…pt ScheduledExecutor
…gacy scheduling Implement adapter (AdaptedRestartPipelinedRegionStrategyNG) that adapts org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy to the legacy failover strategy interface (org.apache.flink.runtime.executiongraph.failover.FailoverStrategy). The new AdaptedRestartPipelinedRegionStrategyNG is chosen if config option jobmanager.execution.failover-strategy is set to "region". The legacy behavior can be enabled by setting the config option to "region-legacy".
What is the purpose of the change
Based on #8851
Brief change log
Verifying this change
This change is already covered by existing tests, such as RegionFailoverITCase.
This change added tests and can be verified as follows:
AdaptedRestartPipelinedRegionStrategyNGDoes this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)Documentation