Skip to content

Commit

Permalink
[FLINK-2040] [runtime] Tolerate out-of-order CANCELING and CANCELED messages.
Browse files Browse the repository at this point in the history
  • Loading branch information
StephanEwen committed May 19, 2015
1 parent ea23f28 commit 8aad7af
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 17 deletions.
Expand Up @@ -745,9 +745,7 @@ private void notifyFatalError(String message, Throwable cause) {
*/ */
public void cancelExecution() { public void cancelExecution() {
LOG.info("Attempting to cancel task " + taskNameWithSubtask); LOG.info("Attempting to cancel task " + taskNameWithSubtask);
if (cancelOrFailAndCancelInvokable(ExecutionState.CANCELING)) { cancelOrFailAndCancelInvokable(ExecutionState.CANCELING, null);
notifyObservers(ExecutionState.CANCELING, null);
}
} }


/** /**
Expand All @@ -761,34 +759,36 @@ public void cancelExecution() {
*/ */
public void failExternally(Throwable cause) { public void failExternally(Throwable cause) {
LOG.info("Attempting to fail task externally " + taskNameWithSubtask); LOG.info("Attempting to fail task externally " + taskNameWithSubtask);
if (cancelOrFailAndCancelInvokable(ExecutionState.FAILED)) { cancelOrFailAndCancelInvokable(ExecutionState.FAILED, cause);
failureCause = cause;
notifyObservers(ExecutionState.FAILED, cause);
}
} }


private boolean cancelOrFailAndCancelInvokable(ExecutionState targetState) { private void cancelOrFailAndCancelInvokable(ExecutionState targetState, Throwable cause) {
while (true) { while (true) {
ExecutionState current = this.executionState; ExecutionState current = this.executionState;


// if the task is already canceled (or canceling) or finished or failed, // if the task is already canceled (or canceling) or finished or failed,
// then we need not do anything // then we need not do anything
if (current.isTerminal() || current == ExecutionState.CANCELING) { if (current.isTerminal() || current == ExecutionState.CANCELING) {
return false; LOG.info("Task " + taskNameWithSubtask + " is already in state " + current);
return;
} }


if (current == ExecutionState.DEPLOYING || current == ExecutionState.CREATED) { if (current == ExecutionState.DEPLOYING || current == ExecutionState.CREATED) {
if (STATE_UPDATER.compareAndSet(this, current, targetState)) { if (STATE_UPDATER.compareAndSet(this, current, targetState)) {
// if we manage this state transition, then the invokable gets never called // if we manage this state transition, then the invokable gets never called
// we need not call cancel on it // we need not call cancel on it
return true; this.failureCause = cause;
notifyObservers(targetState, cause);
return;
} }
} }
else if (current == ExecutionState.RUNNING) { else if (current == ExecutionState.RUNNING) {
if (STATE_UPDATER.compareAndSet(this, ExecutionState.RUNNING, targetState)) { if (STATE_UPDATER.compareAndSet(this, ExecutionState.RUNNING, targetState)) {
// we are canceling / failing out of the running state // we are canceling / failing out of the running state
// we need to cancel the invokable // we need to cancel the invokable
if (invokable != null && invokableHasBeenCanceled.compareAndSet(false, true)) { if (invokable != null && invokableHasBeenCanceled.compareAndSet(false, true)) {
this.failureCause = cause;
notifyObservers(targetState, cause);
LOG.info("Triggering cancellation of task code {} ({}).", taskNameWithSubtask, executionId); LOG.info("Triggering cancellation of task code {} ({}).", taskNameWithSubtask, executionId);


// because the canceling may block on user code, we cancel from a separate thread // because the canceling may block on user code, we cancel from a separate thread
Expand All @@ -799,7 +799,7 @@ else if (current == ExecutionState.RUNNING) {
"Canceler for " + taskNameWithSubtask); "Canceler for " + taskNameWithSubtask);
cancelThread.start(); cancelThread.start();
} }
return true; return;
} }
} }
else { else {
Expand Down
Expand Up @@ -365,8 +365,7 @@ public void testCancelDuringRegisterInputOutput() {
assertNull(task.getFailureCause()); assertNull(task.getFailureCause());


validateUnregisterTask(task.getExecutionId()); validateUnregisterTask(task.getExecutionId());
validateListenerMessage(ExecutionState.CANCELING, task, false); validateCancelingAndCanceledListenerMessage(task);
validateListenerMessage(ExecutionState.CANCELED, task, false);
} }
catch (Exception e) { catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
Expand Down Expand Up @@ -431,8 +430,7 @@ public void testCancelDuringInvoke() {
validateUnregisterTask(task.getExecutionId()); validateUnregisterTask(task.getExecutionId());


validateListenerMessage(ExecutionState.RUNNING, task, false); validateListenerMessage(ExecutionState.RUNNING, task, false);
validateListenerMessage(ExecutionState.CANCELING, task, false); validateCancelingAndCanceledListenerMessage(task);
validateListenerMessage(ExecutionState.CANCELED, task, false);
} }
catch (Exception e) { catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
Expand Down Expand Up @@ -553,8 +551,7 @@ public void testExecutionFailesAfterCanceling() {
validateUnregisterTask(task.getExecutionId()); validateUnregisterTask(task.getExecutionId());


validateListenerMessage(ExecutionState.RUNNING, task, false); validateListenerMessage(ExecutionState.RUNNING, task, false);
validateListenerMessage(ExecutionState.CANCELING, task, false); validateCancelingAndCanceledListenerMessage(task);
validateListenerMessage(ExecutionState.CANCELED, task, false);
} }
catch (Exception e) { catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
Expand Down Expand Up @@ -721,6 +718,46 @@ private void validateListenerMessage(ExecutionState state, Task task, boolean ha
fail("interrupted"); fail("interrupted");
} }
} }

/**
 * Consumes the next two listener messages and verifies that they are the
 * {@code CANCELING} and {@code CANCELED} notifications for the given task,
 * accepting them in either order.
 *
 * <p>The order is not fixed because the two notifications may be sent by different
 * threads. It may (very rarely) happen that:
 * <ul>
 *   <li>OUTSIDE THREAD: call to cancel()</li>
 *   <li>OUTSIDE THREAD: atomic state change from running to canceling</li>
 *   <li>TASK THREAD: finishes, atomic change from canceling to canceled</li>
 *   <li>TASK THREAD: send notification that state is canceled</li>
 *   <li>OUTSIDE THREAD: send notification that state is canceling</li>
 * </ul>
 *
 * <p>Each message is awaited for up to 10 seconds. The check fails if a message is
 * missing, if a message carries a different job ID or execution ID than the task,
 * or if the two states are not exactly one {@code CANCELING} and one {@code CANCELED}.
 *
 * @param task the task whose job ID and execution ID both messages must carry
 */
private void validateCancelingAndCanceledListenerMessage(Task task) {
	try {
		// we may have to wait for a bit to give the actors time to receive the message
		// and put it into the queue
		TaskMessages.UpdateTaskExecutionState message1 =
				(TaskMessages.UpdateTaskExecutionState) listenerMessages.poll(10, TimeUnit.SECONDS);

		// fail fast: if the first message never arrived, there is no point in
		// blocking for another full timeout on the second one
		assertNotNull("There is no additional listener message", message1);

		TaskMessages.UpdateTaskExecutionState message2 =
				(TaskMessages.UpdateTaskExecutionState) listenerMessages.poll(10, TimeUnit.SECONDS);

		assertNotNull("There is no additional listener message", message2);

		TaskExecutionState taskState1 = message1.taskExecutionState();
		TaskExecutionState taskState2 = message2.taskExecutionState();

		assertEquals(task.getJobID(), taskState1.getJobID());
		assertEquals(task.getJobID(), taskState2.getJobID());
		assertEquals(task.getExecutionId(), taskState1.getID());
		assertEquals(task.getExecutionId(), taskState2.getID());

		ExecutionState state1 = taskState1.getExecutionState();
		ExecutionState state2 = taskState2.getExecutionState();

		// the notifications may legitimately arrive in any order (see the race described
		// in the method documentation), so only the pair of states is checked
		boolean cancelingThenCanceled =
				state1 == ExecutionState.CANCELING && state2 == ExecutionState.CANCELED;
		boolean canceledThenCanceling =
				state1 == ExecutionState.CANCELED && state2 == ExecutionState.CANCELING;

		// report the states that were actually received, so a failure can be diagnosed
		assertTrue("Expected one CANCELING and one CANCELED message (in any order), but got "
				+ state1 + " and " + state2,
				cancelingThenCanceled || canceledThenCanceling);
	}
	catch (InterruptedException e) {
		fail("interrupted");
	}
}


// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
// Mock invokable code // Mock invokable code
Expand Down

0 comments on commit 8aad7af

Please sign in to comment.