From 21a17aadbc0d3068e04a3b8dcf7fc7e84f35f9bd Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Wed, 20 Jan 2016 18:09:12 +0100 Subject: [PATCH] [hotfix] Fix interaction of Async calls/checkpointing/canceling Before, it could happen that a Task is canceled during snapshotting. Some State Backends would silently swallow exceptions resulting from this and the Task would get stuck until the cleanup logic gets to it. Now, we rethrow a CancelTaskException if isRunning is false in StreamTask after performing snapshots. This also moves the logic that swallows exceptions in case a task is not running anymore from StreamTask to the async caller in Task. --- .../flink/runtime/taskmanager/Task.java | 14 ++- .../streaming/runtime/tasks/StreamTask.java | 113 +++++++++--------- 2 files changed, 67 insertions(+), 60 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 9cc1be4f03eba..ed322eaaa64fe 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -882,7 +882,11 @@ public void run() { } } catch (Throwable t) { - failExternally(new RuntimeException("Error while triggering checkpoint for " + taskName, t)); + if (getExecutionState() == ExecutionState.RUNNING) { + failExternally(new RuntimeException( + "Error while triggering checkpoint for " + taskName, + t)); + } } } }; @@ -915,8 +919,12 @@ public void run() { statefulTask.notifyCheckpointComplete(checkpointID); } catch (Throwable t) { - // fail task if checkpoint confirmation failed. - failExternally(new RuntimeException("Error while confirming checkpoint", t)); + if (getExecutionState() == ExecutionState.RUNNING) { + // fail task if checkpoint confirmation failed. + failExternally(new RuntimeException( + "Error while confirming checkpoint", + t)); + } } } }; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 72f74ad85a260..65cfebb28ef03 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -29,6 +29,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; +import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; @@ -457,74 +458,72 @@ public boolean triggerCheckpoint(final long checkpointId, final long timestamp) operatorChain.broadcastCheckpointBarrier(checkpointId, timestamp); // now draw the state snapshot - try { - final StreamOperator[] allOperators = operatorChain.getAllOperators(); - final StreamTaskState[] states = new StreamTaskState[allOperators.length]; + final StreamOperator[] allOperators = operatorChain.getAllOperators(); + final StreamTaskState[] states = new StreamTaskState[allOperators.length]; - boolean hasAsyncStates = false; + boolean hasAsyncStates = false; - for (int i = 0; i < states.length; i++) { - StreamOperator operator = allOperators[i]; - if (operator != null) { - StreamTaskState state = operator.snapshotOperatorState(checkpointId, timestamp); - if (state.getOperatorState() instanceof AsynchronousStateHandle) { - hasAsyncStates = true; - } - if (state.getFunctionState() instanceof AsynchronousStateHandle) { - hasAsyncStates = true; - } - states[i] = state.isEmpty() ? null : state; + for (int i = 0; i < states.length; i++) { + StreamOperator operator = allOperators[i]; + if (operator != null) { + StreamTaskState state = operator.snapshotOperatorState(checkpointId, timestamp); + if (state.getOperatorState() instanceof AsynchronousStateHandle) { + hasAsyncStates = true; } + if (state.getFunctionState() instanceof AsynchronousStateHandle) { + hasAsyncStates = true; + } + states[i] = state.isEmpty() ? null : state; } + } + if (!isRunning) { + // Rethrow the cancel exception because some state backends could swallow + // exceptions and seem to exit cleanly. + throw new CancelTaskException(); + } - StreamTaskStateList allStates = new StreamTaskStateList(states); - - if (allStates.isEmpty()) { - getEnvironment().acknowledgeCheckpoint(checkpointId); - } else if (!hasAsyncStates) { - getEnvironment().acknowledgeCheckpoint(checkpointId, allStates); - } else { - // start a Thread that does the asynchronous materialization and - // then sends the checkpoint acknowledge - - Thread checkpointThread = new Thread() { - @Override - public void run() { - try { - for (StreamTaskState state : states) { - if (state != null) { - if (state.getFunctionState() instanceof AsynchronousStateHandle) { - AsynchronousStateHandle asyncState = (AsynchronousStateHandle) state.getFunctionState(); - state.setFunctionState((StateHandle) asyncState.materialize()); - } - if (state.getOperatorState() instanceof AsynchronousStateHandle) { - AsynchronousStateHandle asyncState = (AsynchronousStateHandle) state.getOperatorState(); - state.setOperatorState((StateHandle) asyncState.materialize()); - } + StreamTaskStateList allStates = new StreamTaskStateList(states); + + if (allStates.isEmpty()) { + getEnvironment().acknowledgeCheckpoint(checkpointId); + } else if (!hasAsyncStates) { + getEnvironment().acknowledgeCheckpoint(checkpointId, allStates); + } else { + // start a Thread that does the asynchronous materialization and + // then sends the checkpoint acknowledge + + Thread checkpointThread = new Thread() { + @Override + public void run() { + try { + for (StreamTaskState state : states) { + if (state != null) { + if (state.getFunctionState() instanceof AsynchronousStateHandle) { + AsynchronousStateHandle asyncState = (AsynchronousStateHandle) state.getFunctionState(); + state.setFunctionState((StateHandle) asyncState.materialize()); + } + if (state.getOperatorState() instanceof AsynchronousStateHandle) { + AsynchronousStateHandle asyncState = (AsynchronousStateHandle) state.getOperatorState(); + state.setOperatorState((StateHandle) asyncState.materialize()); } - } - StreamTaskStateList allStates = new StreamTaskStateList(states); - getEnvironment().acknowledgeCheckpoint(checkpointId, allStates); - } catch (Exception e) { - LOG.error("Caught exception while materializing asynchronous checkpoints.", e); - if (asyncException == null) { - asyncException = new AsynchronousException(e); } } - asyncCheckpointThreads.remove(this); - LOG.debug("Finished asynchronous checkpoints for checkpoint {} on task {}", checkpointId, getName()); + StreamTaskStateList allStates = new StreamTaskStateList(states); + getEnvironment().acknowledgeCheckpoint(checkpointId, allStates); + } catch (Exception e) { + LOG.error("Caught exception while materializing asynchronous checkpoints.", e); + if (asyncException == null) { + asyncException = new AsynchronousException(e); + } } - }; + asyncCheckpointThreads.remove(this); + LOG.debug("Finished asynchronous checkpoints for checkpoint {} on task {}", checkpointId, getName()); + } + }; - asyncCheckpointThreads.add(checkpointThread); - checkpointThread.start(); - } - } - catch (Exception e) { - if (isRunning) { - throw e; - } + asyncCheckpointThreads.add(checkpointThread); + checkpointThread.start(); } return true; } else {