diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java index dbce6626fd3ee..f9e091d948751 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java @@ -89,7 +89,6 @@ import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TernaryBoolean; import org.apache.flink.util.concurrent.FutureUtils; -import org.apache.flink.util.concurrent.FutureUtils.ConjunctFuture; import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter; import org.slf4j.Logger; @@ -114,6 +113,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Function; import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -953,36 +953,25 @@ public void cancel() { || current == JobStatus.CREATED || current == JobStatus.RESTARTING) { if (transitionState(current, JobStatus.CANCELLING)) { - - incrementRestarts(); - - final CompletableFuture ongoingSchedulingFuture = schedulingFuture; - - // cancel ongoing scheduling action - if (ongoingSchedulingFuture != null) { - ongoingSchedulingFuture.cancel(false); - } - - final ConjunctFuture allTerminal = cancelVerticesAsync(); - allTerminal.whenComplete( - (Void value, Throwable throwable) -> { - if (throwable != null) { - transitionState( - JobStatus.CANCELLING, - JobStatus.FAILED, - new FlinkException( - "Could not cancel job " - + getJobName() - + " because not all execution job vertices could be cancelled.", - throwable)); - } else { - // cancellations may currently be overridden by failures which - // trigger - // restarts, so we need to pass a proper restart global version - // here - allVerticesInTerminalState(); - } - }); + resetExecutionGraph(ExecutionJobVertex::cancelWithFuture) + .whenComplete( + (Void value, Throwable throwable) -> { + if (throwable != null) { + transitionState( + JobStatus.CANCELLING, + JobStatus.FAILED, + new FlinkException( + "Could not cancel job " + + getJobName() + + " because not all execution job vertices could be cancelled.", + throwable)); + } else { + // cancellations may currently be overridden by failures + // which trigger restarts, so we need to pass a proper + // restart global version here + allVerticesInTerminalState(); + } + }); return; } @@ -1000,18 +989,26 @@ else if (current == JobStatus.FAILING) { } } - @VisibleForTesting - protected ConjunctFuture cancelVerticesAsync() { - final ArrayList> futures = - new ArrayList<>(verticesInCreationOrder.size()); + private CompletableFuture resetExecutionGraph( + Function> perVertexOperationAsync) { + assertRunningInJobMasterMainThread(); - // cancel all tasks (that still need cancelling) - for (ExecutionJobVertex ejv : verticesInCreationOrder) { - futures.add(ejv.cancelWithFuture()); + incrementRestarts(); + + // cancel ongoing scheduling action + if (schedulingFuture != null) { + schedulingFuture.cancel(false); } - // we build a future that is complete once all vertices have reached a terminal state - return FutureUtils.waitForAll(futures); + return applyToVertexAsync(perVertexOperationAsync); + } + + private CompletableFuture applyToVertexAsync( + Function> perVertexOperationAsync) { + return FutureUtils.waitForAll( + verticesInCreationOrder.stream() + .map(perVertexOperationAsync) + .collect(Collectors.toList())); } @Override @@ -1025,21 +1022,8 @@ public void suspend(Throwable suspensionCause) { } else if (transitionState(state, JobStatus.SUSPENDED, suspensionCause)) { initFailureCause(suspensionCause, System.currentTimeMillis()); - incrementRestarts(); - - // cancel ongoing scheduling action - if (schedulingFuture != null) { - schedulingFuture.cancel(false); - } - final ArrayList> executionJobVertexTerminationFutures = - new ArrayList<>(verticesInCreationOrder.size()); - - for (ExecutionJobVertex ejv : verticesInCreationOrder) { - executionJobVertexTerminationFutures.add(ejv.suspend()); - } - - final ConjunctFuture jobVerticesTerminationFuture = - FutureUtils.waitForAll(executionJobVertexTerminationFutures); + final CompletableFuture jobVerticesTerminationFuture = + resetExecutionGraph(ExecutionJobVertex::suspend); checkState(jobVerticesTerminationFuture.isDone(), "Suspend needs to happen atomically"); @@ -1313,7 +1297,7 @@ public void failJob(Throwable cause, long timestamp) { initFailureCause(cause, timestamp); FutureUtils.assertNoException( - cancelVerticesAsync() + applyToVertexAsync(ExecutionJobVertex::cancelWithFuture) .whenComplete( (aVoid, throwable) -> { if (transitionState(