Skip to content
Permalink
Browse files
[FLINK-17295][runtime] Simplify ExecutionVertexID retrieval in scheduler
  • Loading branch information
zhuzhurk committed May 20, 2022
1 parent 13cb93b commit 6ce16d03e5159795f33a28bf35c6f3da262e4926
Showing 2 changed files with 5 additions and 28 deletions.
@@ -119,7 +119,6 @@
import java.util.stream.StreamSupport;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/** Base class which can be used to implement {@link SchedulerNG}. */
public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling {
@@ -539,25 +538,6 @@ protected final void transitionToRunning() {
executionGraph.transitionToRunning();
}

protected Optional<ExecutionVertexID> getExecutionVertexId(
final ExecutionAttemptID executionAttemptId) {
return Optional.ofNullable(executionGraph.getRegisteredExecutions().get(executionAttemptId))
.map(this::getExecutionVertexId);
}

protected ExecutionVertexID getExecutionVertexIdOrThrow(
final ExecutionAttemptID executionAttemptId) {
return getExecutionVertexId(executionAttemptId)
.orElseThrow(
() ->
new IllegalStateException(
"Cannot find execution " + executionAttemptId));
}

private ExecutionVertexID getExecutionVertexId(final Execution execution) {
return execution.getVertex().getID();
}

public ExecutionVertex getExecutionVertex(final ExecutionVertexID executionVertexId) {
return executionGraph
.getAllVertices()
@@ -726,16 +706,14 @@ protected final void archiveFromFailureHandlingResult(
@Override
public final boolean updateTaskExecutionState(
final TaskExecutionStateTransition taskExecutionState) {
final Optional<ExecutionVertexID> executionVertexId =
getExecutionVertexId(taskExecutionState.getID());
final ExecutionVertexID executionVertexId =
taskExecutionState.getID().getExecutionVertexId();

boolean updateSuccess = executionGraph.updateState(taskExecutionState);

if (updateSuccess) {
checkState(executionVertexId.isPresent());

if (isNotifiable(executionVertexId.get(), taskExecutionState)) {
updateTaskExecutionStateInternal(executionVertexId.get(), taskExecutionState);
if (isNotifiable(executionVertexId, taskExecutionState)) {
updateTaskExecutionStateInternal(executionVertexId, taskExecutionState);
}
return true;
} else {
@@ -991,8 +991,7 @@ public void cancelWhileRestartingShouldWaitForRunningTasks() {
vertexIterator.next().getCurrentExecutionAttempt().getAttemptId();
final ExecutionAttemptID attemptId2 =
vertexIterator.next().getCurrentExecutionAttempt().getAttemptId();
final ExecutionVertexID executionVertex2 =
scheduler.getExecutionVertexIdOrThrow(attemptId2);
final ExecutionVertexID executionVertex2 = attemptId2.getExecutionVertexId();

scheduler.updateTaskExecutionState(
new TaskExecutionState(

0 comments on commit 6ce16d0

Please sign in to comment.