Skip to content

Commit

Permalink
[FLINK-8732] [flip6] Cancel ongoing scheduling operation
Browse files Browse the repository at this point in the history
Keeps track of ongoing scheduling operations in the ExecutionGraph and cancels
them in case of a concurrent cancel, suspend or fail call. This makes sure that
the original cause for termination is maintained.

This closes #5548.
  • Loading branch information
tillrohrmann committed Feb 22, 2018
1 parent 1e315f0 commit 519639c
Show file tree
Hide file tree
Showing 7 changed files with 176 additions and 43 deletions.
Expand Up @@ -358,7 +358,7 @@ public CompletableFuture<?> getReleaseFuture() {
// Actions
// --------------------------------------------------------------------------------------------

public boolean scheduleForExecution() {
public CompletableFuture<Void> scheduleForExecution() {
final ExecutionGraph executionGraph = getVertex().getExecutionGraph();
final SlotProvider resourceProvider = executionGraph.getSlotProvider();
final boolean allowQueued = executionGraph.isQueuedSchedulingAllowed();
Expand All @@ -377,14 +377,14 @@ public boolean scheduleForExecution() {
* @param queued Flag to indicate whether the scheduler may queue this task if it cannot
* immediately deploy it.
* @param locationPreferenceConstraint constraint for the location preferences
* @throws IllegalStateException Thrown, if the vertex is not in CREATED state, which is the only state that permits scheduling.
* @return Future which is completed once the Execution has been deployed
*/
public boolean scheduleForExecution(
public CompletableFuture<Void> scheduleForExecution(
SlotProvider slotProvider,
boolean queued,
LocationPreferenceConstraint locationPreferenceConstraint) {
final Time allocationTimeout = vertex.getExecutionGraph().getAllocationTimeout();
try {
final Time allocationTimeout = vertex.getExecutionGraph().getAllocationTimeout();
final CompletableFuture<Execution> allocationFuture = allocateAndAssignSlotForExecution(
slotProvider,
queued,
Expand All @@ -395,11 +395,10 @@ public boolean scheduleForExecution(
// that we directly deploy the tasks if the slot allocation future is completed. This is
// necessary for immediate deployment.
final CompletableFuture<Void> deploymentFuture = allocationFuture.handle(
(Execution ignored, Throwable throwable) -> {
(Execution ignored, Throwable throwable) -> {
if (throwable != null) {
markFailed(ExceptionUtils.stripCompletionException(throwable));
}
else {
} else {
try {
deploy();
} catch (Throwable t) {
Expand All @@ -415,10 +414,9 @@ public boolean scheduleForExecution(
allocationFuture.completeExceptionally(new IllegalArgumentException("The slot allocation future has not been completed yet."));
}

return true;
}
catch (IllegalExecutionStateException e) {
return false;
return deploymentFuture;
} catch (IllegalExecutionStateException e) {
return FutureUtils.completedExceptionally(e);
}
}

Expand Down
Expand Up @@ -77,6 +77,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
Expand All @@ -90,6 +92,7 @@
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -270,6 +273,12 @@ public class ExecutionGraph implements AccessExecutionGraph {
* strong reference to any user-defined classes.*/
private volatile ErrorInfo failureInfo;

/**
* Future for an ongoing or completed scheduling action.
*/
@Nullable
private volatile CompletableFuture<Void> schedulingFuture;

// ------ Fields that are relevant to the execution and need to be cleared before archiving -------

/** The coordinator for checkpoints, if snapshot checkpoints are enabled */
Expand Down Expand Up @@ -409,6 +418,8 @@ public ExecutionGraph(
// the failover strategy must be instantiated last, so that the execution graph
// is ready by the time the failover strategy sees it
this.failoverStrategy = checkNotNull(failoverStrategyFactory.create(this), "null failover strategy");

this.schedulingFuture = null;
LOG.info("Job recovers via failover strategy: {}", failoverStrategy.getStrategyName());
}

Expand Down Expand Up @@ -857,37 +868,60 @@ public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobExcepti

public void scheduleForExecution() throws JobException {

final long currentGlobalModVersion = globalModVersion;

if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {

final CompletableFuture<Void> newSchedulingFuture;

switch (scheduleMode) {

case LAZY_FROM_SOURCES:
scheduleLazy(slotProvider);
newSchedulingFuture = scheduleLazy(slotProvider);
break;

case EAGER:
scheduleEager(slotProvider, allocationTimeout);
newSchedulingFuture = scheduleEager(slotProvider, allocationTimeout);
break;

default:
throw new JobException("Schedule mode is invalid.");
}

if (state == JobStatus.RUNNING && currentGlobalModVersion == globalModVersion) {
schedulingFuture = newSchedulingFuture.whenCompleteAsync(
(Void ignored, Throwable throwable) -> {
if (throwable != null) {
failGlobal(ExceptionUtils.stripCompletionException(throwable));
}
},
futureExecutor);
} else {
newSchedulingFuture.cancel(false);
}
}
else {
throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
}
}

private void scheduleLazy(SlotProvider slotProvider) {
private CompletableFuture<Void> scheduleLazy(SlotProvider slotProvider) {

final ArrayList<CompletableFuture<Void>> schedulingFutures = new ArrayList<>(numVerticesTotal);

// simply take the vertices without inputs.
for (ExecutionJobVertex ejv : verticesInCreationOrder) {
if (ejv.getJobVertex().isInputVertex()) {
ejv.scheduleAll(
final CompletableFuture<Void> schedulingJobVertexFuture = ejv.scheduleAll(
slotProvider,
allowQueuedScheduling,
LocationPreferenceConstraint.ALL); // since it is an input vertex, the input based location preferences should be empty

schedulingFutures.add(schedulingJobVertexFuture);
}
}

return FutureUtils.waitForAll(schedulingFutures);
}

/**
Expand All @@ -896,8 +930,10 @@ private void scheduleLazy(SlotProvider slotProvider) {
* @param slotProvider The resource provider from which the slots are allocated
* @param timeout The maximum time that the deployment may take, before a
* TimeoutException is thrown.
* @returns Future which is completed once the {@link ExecutionGraph} has been scheduled.
* The future can also be completed exceptionally if an error happened.
*/
private void scheduleEager(SlotProvider slotProvider, final Time timeout) {
private CompletableFuture<Void> scheduleEager(SlotProvider slotProvider, final Time timeout) {
checkState(state == JobStatus.RUNNING, "job is not running currently");

// Important: reserve all the space we need up front.
Expand Down Expand Up @@ -925,9 +961,23 @@ private void scheduleEager(SlotProvider slotProvider, final Time timeout) {
// the future fails once one slot future fails.
final ConjunctFuture<Collection<Execution>> allAllocationsFuture = FutureUtils.combineAll(allAllocationFutures);

allAllocationsFuture.whenCompleteAsync(
(Collection<Execution> allAllocations, Throwable throwable) -> {
if (throwable != null) {
return allAllocationsFuture
.thenAccept(
(Collection<Execution> executionsToDeploy) -> {
for (Execution execution : executionsToDeploy) {
try {
execution.deploy();
} catch (Throwable t) {
throw new CompletionException(
new FlinkException(
String.format("Could not deploy execution %s.", execution),
t));
}
}
})
// Generate a more specific failure message for the eager scheduling
.exceptionally(
(Throwable throwable) -> {
final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
final Throwable resultThrowable;

Expand All @@ -942,18 +992,8 @@ private void scheduleEager(SlotProvider slotProvider, final Time timeout) {
resultThrowable = strippedThrowable;
}

failGlobal(resultThrowable);
} else {
try {
// successfully obtained all slots, now deploy
for (Execution execution : allAllocations) {
execution.deploy();
}
} catch (Throwable t) {
failGlobal(new FlinkException("Could not deploy executions.", t));
}
}
}, futureExecutor);
throw new CompletionException(resultThrowable);
});
}

public void cancel() {
Expand All @@ -966,6 +1006,13 @@ public void cancel() {
// make sure no concurrent local actions interfere with the cancellation
final long globalVersionForRestart = incrementGlobalModVersion();

final CompletableFuture<Void> ongoingSchedulingFuture = schedulingFuture;

// cancel ongoing scheduling action
if (ongoingSchedulingFuture != null) {
ongoingSchedulingFuture.cancel(false);
}

final ArrayList<CompletableFuture<?>> futures = new ArrayList<>(verticesInCreationOrder.size());

// cancel all tasks (that still need cancelling)
Expand Down Expand Up @@ -1057,6 +1104,13 @@ public void suspend(Throwable suspensionCause) {
// make sure no concurrent local actions interfere with the cancellation
incrementGlobalModVersion();

final CompletableFuture<Void> ongoingSchedulingFuture = schedulingFuture;

// cancel ongoing scheduling action
if (ongoingSchedulingFuture != null) {
ongoingSchedulingFuture.cancel(false);
}

for (ExecutionJobVertex ejv: verticesInCreationOrder) {
ejv.cancel();
}
Expand Down Expand Up @@ -1108,6 +1162,13 @@ else if (transitionState(current, JobStatus.FAILING, t)) {
// make sure no concurrent local or global actions interfere with the failover
final long globalVersionForRestart = incrementGlobalModVersion();

final CompletableFuture<Void> ongoingSchedulingFuture = schedulingFuture;

// cancel ongoing scheduling action
if (ongoingSchedulingFuture != null) {
ongoingSchedulingFuture.cancel(false);
}

// we build a future that is complete once all vertices have reached a terminal state
final ArrayList<CompletableFuture<?>> futures = new ArrayList<>(verticesInCreationOrder.size());

Expand Down
Expand Up @@ -34,6 +34,7 @@
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
Expand Down Expand Up @@ -472,18 +473,23 @@ public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult>
* @param slotProvider to allocate the slots from
* @param queued if the allocations can be queued
* @param locationPreferenceConstraint constraint for the location preferences
* @return Future which is completed once all {@link Execution} could be deployed
*/
public void scheduleAll(
public CompletableFuture<Void> scheduleAll(
SlotProvider slotProvider,
boolean queued,
LocationPreferenceConstraint locationPreferenceConstraint) {

final ExecutionVertex[] vertices = this.taskVertices;

final ArrayList<CompletableFuture<Void>> scheduleFutures = new ArrayList<>(vertices.length);

// kick off the tasks
for (ExecutionVertex ev : vertices) {
ev.scheduleForExecution(slotProvider, queued, locationPreferenceConstraint);
scheduleFutures.add(ev.scheduleForExecution(slotProvider, queued, locationPreferenceConstraint));
}

return FutureUtils.waitForAll(scheduleFutures);
}

/**
Expand Down
Expand Up @@ -607,9 +607,10 @@ public Execution resetForNewExecution(final long timestamp, final long originati
* @param slotProvider to allocate the slots from
* @param queued if the allocation can be queued
* @param locationPreferenceConstraint constraint for the location preferences
* @return
* @return Future which is completed once the execution is deployed. The future
* can also completed exceptionally.
*/
public boolean scheduleForExecution(
public CompletableFuture<Void> scheduleForExecution(
SlotProvider slotProvider,
boolean queued,
LocationPreferenceConstraint locationPreferenceConstraint) {
Expand Down
Expand Up @@ -28,8 +28,10 @@
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.SimpleSlotContext;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
Expand All @@ -40,8 +42,11 @@
import org.apache.flink.runtime.jobmanager.slots.TestingSlotOwner;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.TestLogger;
Expand Down Expand Up @@ -412,6 +417,54 @@ public void testEagerSchedulingWithSlotTimeout() throws Exception {
verify(taskManager, times(0)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
}

/**
* Tests that an ongoing scheduling operation does not fail the {@link ExecutionGraph}
* if it gets concurrently cancelled
*/
@Test
public void testSchedulingOperationCancellationWhenCancel() throws Exception {
final JobVertex jobVertex = new JobVertex("NoOp JobVertex");
jobVertex.setInvokableClass(NoOpInvokable.class);
jobVertex.setParallelism(2);
final JobGraph jobGraph = new JobGraph(jobVertex);
jobGraph.setScheduleMode(ScheduleMode.EAGER);
jobGraph.setAllowQueuedScheduling(true);

final CompletableFuture<LogicalSlot> slotFuture1 = new CompletableFuture<>();
final CompletableFuture<LogicalSlot> slotFuture2 = new CompletableFuture<>();
final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(2);
slotProvider.addSlots(jobVertex.getID(), new CompletableFuture[]{slotFuture1, slotFuture2});
final ExecutionGraph executionGraph = createExecutionGraph(jobGraph, slotProvider);

executionGraph.scheduleForExecution();

final CompletableFuture<?> releaseFuture = new CompletableFuture<>();

final TestingLogicalSlot slot = new TestingLogicalSlot(
new LocalTaskManagerLocation(),
new SimpleAckingTaskManagerGateway(),
0,
new AllocationID(),
new SlotRequestId(),
new SlotSharingGroupId(),
releaseFuture);
slotFuture1.complete(slot);

// cancel should change the state of all executions to CANCELLED
executionGraph.cancel();

// complete the now CANCELLED execution --> this should cause a failure
slotFuture2.complete(new TestingLogicalSlot());

Thread.sleep(1L);
// release the first slot to finish the cancellation
releaseFuture.complete(null);

// NOTE: This test will only occasionally fail without the fix since there is
// a race between the releaseFuture and the slotFuture2
assertThat(executionGraph.getTerminationFuture().get(), is(JobStatus.CANCELED));
}

// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
Expand Down
Expand Up @@ -304,7 +304,7 @@ public void testTerminationFutureIsCompletedAfterSlotRelease() throws Exception

ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];

assertTrue(executionVertex.scheduleForExecution(slotProvider, false, LocationPreferenceConstraint.ANY));
executionVertex.scheduleForExecution(slotProvider, false, LocationPreferenceConstraint.ANY).get();

Execution currentExecutionAttempt = executionVertex.getCurrentExecutionAttempt();

Expand Down

0 comments on commit 519639c

Please sign in to comment.