Skip to content

Commit

Permalink
[hotfix][runtime] Refactors suspend and cancel logic
Browse files Browse the repository at this point in the history
suspend and cancel reset the ExecutionGraph in a similar way. I move the common logic into its own method to make this more prominent in the code.
  • Loading branch information
XComp committed Feb 27, 2024
1 parent 384f6b2 commit 320d612
Showing 1 changed file with 40 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TernaryBoolean;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.FutureUtils.ConjunctFuture;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;

import org.slf4j.Logger;
Expand All @@ -114,6 +113,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -953,36 +953,25 @@ public void cancel() {
|| current == JobStatus.CREATED
|| current == JobStatus.RESTARTING) {
if (transitionState(current, JobStatus.CANCELLING)) {

incrementRestarts();

final CompletableFuture<Void> ongoingSchedulingFuture = schedulingFuture;

// cancel ongoing scheduling action
if (ongoingSchedulingFuture != null) {
ongoingSchedulingFuture.cancel(false);
}

final ConjunctFuture<Void> allTerminal = cancelVerticesAsync();
allTerminal.whenComplete(
(Void value, Throwable throwable) -> {
if (throwable != null) {
transitionState(
JobStatus.CANCELLING,
JobStatus.FAILED,
new FlinkException(
"Could not cancel job "
+ getJobName()
+ " because not all execution job vertices could be cancelled.",
throwable));
} else {
// cancellations may currently be overridden by failures which
// trigger
// restarts, so we need to pass a proper restart global version
// here
allVerticesInTerminalState();
}
});
resetExecutionGraph(ExecutionJobVertex::cancelWithFuture)
.whenComplete(
(Void value, Throwable throwable) -> {
if (throwable != null) {
transitionState(
JobStatus.CANCELLING,
JobStatus.FAILED,
new FlinkException(
"Could not cancel job "
+ getJobName()
+ " because not all execution job vertices could be cancelled.",
throwable));
} else {
// cancellations may currently be overridden by failures
// which trigger restarts, so we need to pass a proper
// restart global version here
allVerticesInTerminalState();
}
});

return;
}
Expand All @@ -1000,18 +989,26 @@ else if (current == JobStatus.FAILING) {
}
}

@VisibleForTesting
protected ConjunctFuture<Void> cancelVerticesAsync() {
final ArrayList<CompletableFuture<?>> futures =
new ArrayList<>(verticesInCreationOrder.size());
private CompletableFuture<Void> resetExecutionGraph(
Function<ExecutionJobVertex, CompletableFuture<Void>> perVertexOperationAsync) {
assertRunningInJobMasterMainThread();

// cancel all tasks (that still need cancelling)
for (ExecutionJobVertex ejv : verticesInCreationOrder) {
futures.add(ejv.cancelWithFuture());
incrementRestarts();

// cancel ongoing scheduling action
if (schedulingFuture != null) {
schedulingFuture.cancel(false);
}

// we build a future that is complete once all vertices have reached a terminal state
return FutureUtils.waitForAll(futures);
return applyToVertexAsync(perVertexOperationAsync);
}

private CompletableFuture<Void> applyToVertexAsync(
Function<ExecutionJobVertex, CompletableFuture<Void>> perVertexOperationAsync) {
return FutureUtils.waitForAll(
verticesInCreationOrder.stream()
.map(perVertexOperationAsync)
.collect(Collectors.toList()));
}

@Override
Expand All @@ -1025,21 +1022,8 @@ public void suspend(Throwable suspensionCause) {
} else if (transitionState(state, JobStatus.SUSPENDED, suspensionCause)) {
initFailureCause(suspensionCause, System.currentTimeMillis());

incrementRestarts();

// cancel ongoing scheduling action
if (schedulingFuture != null) {
schedulingFuture.cancel(false);
}
final ArrayList<CompletableFuture<Void>> executionJobVertexTerminationFutures =
new ArrayList<>(verticesInCreationOrder.size());

for (ExecutionJobVertex ejv : verticesInCreationOrder) {
executionJobVertexTerminationFutures.add(ejv.suspend());
}

final ConjunctFuture<Void> jobVerticesTerminationFuture =
FutureUtils.waitForAll(executionJobVertexTerminationFutures);
final CompletableFuture<Void> jobVerticesTerminationFuture =
resetExecutionGraph(ExecutionJobVertex::suspend);

checkState(jobVerticesTerminationFuture.isDone(), "Suspend needs to happen atomically");

Expand Down Expand Up @@ -1313,7 +1297,7 @@ public void failJob(Throwable cause, long timestamp) {
initFailureCause(cause, timestamp);

FutureUtils.assertNoException(
cancelVerticesAsync()
applyToVertexAsync(ExecutionJobVertex::cancelWithFuture)
.whenComplete(
(aVoid, throwable) -> {
if (transitionState(
Expand Down

0 comments on commit 320d612

Please sign in to comment.