Skip to content

Commit

Permalink
[FLINK-15467][task] Wait in Task.cancelInvokable for completion
Browse files Browse the repository at this point in the history
Task.invokableCancelFuture is added instead of simply joining the source thread in cancelTask.
This is because cancelTask can be called from the TaskCanceler thread,
while resources are freed by Task.executingThread or upon its death.
  • Loading branch information
rkhachatryan committed Jun 19, 2020
1 parent 83a9427 commit 35c875a
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 5 deletions.
Expand Up @@ -93,4 +93,9 @@ public static Deadline now() {
public static Deadline fromNow(Duration duration) {
    // Read the monotonic clock first, then add the requested offset.
    final long nowNanos = System.nanoTime();
    final long offsetNanos = duration.toNanos();

    // addExact throws ArithmeticException instead of silently wrapping on overflow.
    final long deadlineNanos = Math.addExact(nowNanos, offsetNanos);
    return new Deadline(deadlineNanos);
}

/**
 * Renders this deadline as its raw {@code timeNanos} value followed by the unit suffix
 * {@code " ns"}.
 */
@Override
public String toString() {
    final String nanos = String.valueOf(timeNanos);
    return nanos + " ns";
}
}
Expand Up @@ -101,6 +101,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
Expand Down Expand Up @@ -256,7 +257,8 @@ public class Task implements Runnable, TaskSlotPayload, TaskActions, PartitionPr
// proper happens-before semantics on parallel modification
// ------------------------------------------------------------------------

/** atomic flag that makes sure the invokable is canceled exactly once upon error. */
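/** Atomic flag that makes sure the invokable is canceled exactly once upon error.
* NOTE: whenever this flag is set, {@link #invokableCancelFuture} must eventually be completed,
* because {@link #waitInvokableCancelCompletion()} blocks on that future once the flag is set. */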
private final AtomicBoolean invokableHasBeenCanceled;

/** The invokable of this task, if initialized. All accesses must copy the reference and
Expand All @@ -279,6 +281,8 @@ public class Task implements Runnable, TaskSlotPayload, TaskActions, PartitionPr
/** This class loader should be set as the context class loader for threads that may dynamically load user code. */
private ClassLoader userCodeClassLoader;

/**
 * Tracks completion of the invokable's cancellation: the future returned by the invokable's
 * {@code cancel()} is forwarded into this one, and {@link #waitInvokableCancelCompletion()}
 * waits on it before the task's resources are freed.
 */
private final CompletableFuture<Void> invokableCancelFuture = new CompletableFuture<>();

/**
* <p><b>IMPORTANT:</b> This constructor may not start any work that would need to
* be undone in the case of a failing task deployment.</p>
Expand Down Expand Up @@ -819,6 +823,8 @@ else if (transitionState(current, ExecutionState.FAILED, t)) {
}
finally {
try {
waitInvokableCancelCompletion();

LOG.info("Freeing task resources for {} ({}).", taskNameWithSubtask, executionId);

// clear the reference to the invokable. this helps guard against holding references
Expand Down Expand Up @@ -861,6 +867,21 @@ else if (transitionState(current, ExecutionState.FAILED, t)) {
}
}

/**
 * Blocks until {@link #invokableCancelFuture} is done, but only when
 * {@link #invokableHasBeenCanceled} has been set; otherwise returns immediately.
 *
 * <p>Interrupts received while waiting are deliberately swallowed and the wait is resumed,
 * since the canceling threads are expected to interrupt the thread running this method.
 *
 * <p>NOTE(review): completion is checked with {@code isDone()} before every {@code get()}.
 * If the future is already done at that check, {@code get()} is not called, so an
 * exceptional completion is only rethrown when a {@code get()} call actually observes it.
 *
 * @throws ExecutionException if a {@code get()} call observes that the future completed
 *     exceptionally
 */
private void waitInvokableCancelCompletion() throws ExecutionException {
    if (invokableHasBeenCanceled.get()) {
        LOG.debug("Waiting for invokable cancellation");

        while (true) {
            if (invokableCancelFuture.isDone()) {
                return;
            }
            try {
                invokableCancelFuture.get();
            } catch (InterruptedException ignored) {
                // interrupts from TaskCanceler and TaskInterrupter are expected here;
                // keep waiting until the cancellation future is done
            }
        }
    }
}

@VisibleForTesting
public static void setupPartitionsAndGates(
ResultPartitionWriter[] producedPartitions, InputGate[] inputGates) throws IOException {
Expand Down Expand Up @@ -1059,7 +1080,7 @@ else if (current == ExecutionState.RUNNING) {
// case the canceling could not continue

// The canceller calls cancel and interrupts the executing thread once
Runnable canceler = new TaskCanceler(LOG, this::closeNetworkResources, invokable, executingThread, taskNameWithSubtask);
Runnable canceler = new TaskCanceler(LOG, this::closeNetworkResources, invokable, executingThread, taskNameWithSubtask, invokableCancelFuture);

Thread cancelThread = new Thread(
executingThread.getThreadGroup(),
Expand Down Expand Up @@ -1285,7 +1306,7 @@ private void cancelInvokable(AbstractInvokable invokable) {
// in case of an exception during execution, we still call "cancel()" on the task
if (invokable != null && invokableHasBeenCanceled.compareAndSet(false, true)) {
try {
invokable.cancel();
FutureUtils.forward(invokable.cancel(), invokableCancelFuture);
}
catch (Throwable t) {
LOG.error("Error while canceling task {}.", taskNameWithSubtask, t);
Expand Down Expand Up @@ -1409,13 +1430,15 @@ private static class TaskCanceler implements Runnable {
private final AbstractInvokable invokable;
private final Thread executer;
private final String taskName;
private final CompletableFuture<Void> completionFuture;

TaskCanceler(Logger logger, Runnable networkResourcesCloser, AbstractInvokable invokable, Thread executer, String taskName) {
TaskCanceler(Logger logger, Runnable networkResourcesCloser, AbstractInvokable invokable, Thread executer, String taskName, CompletableFuture<Void> completionFuture) {
this.logger = logger;
this.networkResourcesCloser = networkResourcesCloser;
this.invokable = invokable;
this.executer = executer;
this.taskName = taskName;
this.completionFuture = completionFuture;
}

@Override
Expand All @@ -1424,7 +1447,7 @@ public void run() {
// the user-defined cancel method may throw errors.
// we need to continue despite that
try {
invokable.cancel();
FutureUtils.forward(invokable.cancel(), completionFuture);
} catch (Throwable t) {
ExceptionUtils.rethrowIfFatalError(t);
logger.error("Error while canceling the task {}.", taskName, t);
Expand Down
Expand Up @@ -92,6 +92,7 @@ public class TaskTest extends TestLogger {

private static OneShotLatch awaitLatch;
private static OneShotLatch triggerLatch;
private static CompletableFuture<Void> cancelCompletionFuture;

private ShuffleEnvironment<?, ?> shuffleEnvironment;

Expand All @@ -102,6 +103,7 @@ public class TaskTest extends TestLogger {
public void setup() {
    // Reset the static hooks that the test invokables in this class read,
    // so state from a previous test cannot leak into the next one.
    cancelCompletionFuture = new CompletableFuture<>();
    triggerLatch = new OneShotLatch();
    awaitLatch = new OneShotLatch();

    // Build a new shuffle environment for the test instance.
    shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build();
}
Expand Down Expand Up @@ -140,6 +142,33 @@ public void testRegularExecution() throws Exception {
taskManagerActions.validateListenerMessage(ExecutionState.FINISHED, task, null);
}

/**
 * Checks that a canceled task reaches {@code CANCELED} but does not complete its termination
 * future until the future returned by the invokable's {@code cancel()} has been completed.
 *
 * <p>The body runs inside try/finally so that {@code cancelCompletionFuture} is always
 * completed. Without that, a failing assertion would leave the task thread blocked on the
 * never-completed cancellation future and leak it into subsequent tests.
 */
@Test
public void testTaskTerminatesOnlyAfterInvokable() throws Exception {
    Task task = createTaskBuilder()
        .setInvokable(InvokableBlockingInCancelFuture.class)
        .setTaskManagerActions(new ProhibitFatalErrorTaskManagerActions())
        .build();

    task.startTaskThread();

    try {
        // wait until the invokable is running, then cancel it
        awaitLatch.await();
        task.cancelExecution();

        // let invoke() return and wait for a terminal state (or for the thread to die)
        triggerLatch.trigger();
        while (!task.getExecutionState().isTerminal() && task.getExecutingThread().isAlive()) {
            task.getExecutingThread().join(10);
        }
        assertEquals(ExecutionState.CANCELED, task.getExecutionState());
        assertFalse(task.getTerminationFuture().isDone());

        task.getExecutingThread().join(500); // allow some arbitrary delay for potential termination and re-check
        assertFalse(task.getTerminationFuture().isDone());

        // only once the cancel future completes may the task terminate
        cancelCompletionFuture.complete(null);
        task.getExecutingThread().join();
        assertTrue(task.getTerminationFuture().isDone());
    } finally {
        // no-op on the success path (already completed); on failure it unblocks the task thread
        cancelCompletionFuture.complete(null);
    }
}

@Test
public void testCancelRightAway() throws Exception {
final Task task = createTaskBuilder().build();
Expand Down Expand Up @@ -1164,6 +1193,27 @@ public CompletableFuture<Void> cancel() {
}
}

/**
 * Test {@link AbstractInvokable} whose {@code cancel()} returns the shared
 * {@code cancelCompletionFuture} as-is, so cancellation stays pending until the test
 * completes that future.
 */
public static final class InvokableBlockingInCancelFuture extends AbstractInvokable {

    public InvokableBlockingInCancelFuture(Environment env) {
        super(env);
    }

    /** Triggers {@code awaitLatch} and then waits on {@code triggerLatch}. */
    @Override
    public void invoke() throws InterruptedException {
        awaitLatch.trigger();
        triggerLatch.await();
    }

    /** Hands back the test-controlled future without completing it. */
    @Override
    public CompletableFuture<Void> cancel() {
        return cancelCompletionFuture;
    }
}

/**
* {@link AbstractInvokable} which blocks in cancel and is not interruptible.
*/
Expand Down

0 comments on commit 35c875a

Please sign in to comment.