From 43b00592bc4d003295bde98a55fe291350033dd1 Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Thu, 28 May 2015 19:40:08 +0200 Subject: [PATCH] [FLINK-2109] [runtime] Fix CancelTaskException handling --- .../flink/runtime/taskmanager/Task.java | 47 +++++++++------ .../flink/runtime/taskmanager/TaskTest.java | 57 ++++++++++++++++++- 2 files changed, 87 insertions(+), 17 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 40198dc94f4a8..625083786dd85 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -537,7 +537,7 @@ else if (current == ExecutionState.CANCELING) { // actual task core work // ---------------------------------------------------------------- - // we must make strictly sure that the invokable is accessible to teh cancel() call + // we must make strictly sure that the invokable is accessible to the cancel() call // by the time we switched to running. this.invokable = invokable; @@ -597,22 +597,25 @@ else if (current == ExecutionState.CANCELING) { // to failExternally() while (true) { ExecutionState current = this.executionState; + if (current == ExecutionState.RUNNING || current == ExecutionState.DEPLOYING) { - if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) { - // proper failure of the task. 
record the exception as the root cause - failureCause = t; - notifyObservers(ExecutionState.FAILED, t); - - // in case of an exception during execution, we still call "cancel()" on the task - if (invokable != null && this.invokable != null && invokableHasBeenCanceled.compareAndSet(false, true)) { - try { - invokable.cancel(); - } - catch (Throwable t2) { - LOG.error("Error while canceling task " + taskNameWithSubtask, t2); - } + if (t instanceof CancelTaskException) { + if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) { + cancelInvokable(); + + notifyObservers(ExecutionState.CANCELED, null); + break; + } + } + else { + if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) { + // proper failure of the task. record the exception as the root cause + failureCause = t; + cancelInvokable(); + + notifyObservers(ExecutionState.FAILED, t); + break; } - break; } } else if (current == ExecutionState.CANCELING) { @@ -746,7 +749,7 @@ public void cancelExecution() { } /** - * Marks task execution failed for an external reason (a reason other than th task code itself + * Marks task execution failed for an external reason (a reason other than the task code itself * throwing an exception). If the task is already in a terminal state * (such as FINISHED, CANCELED, FAILED), or if the task is already canceling this does nothing. 
* Otherwise it sets the state to FAILED, and, if the invokable code is running, @@ -962,6 +965,18 @@ private void executeAsyncCallRunnable(Runnable runnable, String callName) { // Utilities // ------------------------------------------------------------------------ + private void cancelInvokable() { + // in case of an exception during execution, we still call "cancel()" on the task + if (invokable != null && this.invokable != null && invokableHasBeenCanceled.compareAndSet(false, true)) { + try { + invokable.cancel(); + } + catch (Throwable t) { + LOG.error("Error while canceling task " + taskNameWithSubtask, t); + } + } + } + @Override public String toString() { return getTaskNameWithSubtasks() + " [" + executionState + ']'; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java index bcc7ffe917a1e..e9e761c44143b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; +import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ -604,6 +605,41 @@ public void testExecutionFailsAfterTaskMarkedFailed() { } } + @Test + public void testCancelTaskException() throws Exception { + final Task task = createTask(InvokableWithCancelTaskExceptionInInvoke.class); + + // Cause CancelTaskException. 
+ triggerLatch.trigger(); + + task.run(); + + assertEquals(ExecutionState.CANCELED, task.getExecutionState()); + } + + @Test + public void testCancelTaskExceptionAfterTaskMarkedFailed() throws Exception { + final Task task = createTask(InvokableWithCancelTaskExceptionInInvoke.class); + + task.startTaskThread(); + + // Wait till the task is in invoke. + awaitLatch.await(); + + task.failExternally(new Exception("external")); + assertEquals(ExecutionState.FAILED, task.getExecutionState()); + + // The CancelTaskException is thrown either because we trigger the latch + // or because the TaskCanceler interrupts the invokable. + triggerLatch.trigger(); + + task.getExecutingThread().join(); + + assertEquals(ExecutionState.FAILED, task.getExecutionState()); + assertTrue(task.isCanceledOrFailed()); + assertTrue(task.getFailureCause().getMessage().contains("external")); + } + @Test public void testOnPartitionStateUpdate() throws Exception { IntermediateDataSetID resultId = new IntermediateDataSetID(); @@ -900,7 +936,7 @@ public void invoke() { // fall through the loop } } - + throw new RuntimeException("test"); } } @@ -940,4 +976,23 @@ public void invoke() throws Exception { } } } + + public static final class InvokableWithCancelTaskExceptionInInvoke extends AbstractInvokable { + + @Override + public void registerInputOutput() { + } + + @Override + public void invoke() throws Exception { + awaitLatch.trigger(); + + try { + triggerLatch.await(); + } + finally { + throw new CancelTaskException(); + } + } + } }