From 2d8358a0331e81c0aa3c5113877cbf4db94711e9 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 7 Dec 2016 16:22:23 +0100 Subject: [PATCH 1/2] [FLINK-5278] Improve task and checkpoint related logging --- .../checkpoint/CheckpointCoordinator.java | 29 +-- .../checkpoint/CompletedCheckpoint.java | 3 +- .../ZooKeeperCompletedCheckpointStore.java | 75 +++++++- .../runtime/executiongraph/Execution.java | 9 +- .../flink/runtime/taskmanager/Task.java | 165 ++++++++++++------ ...ZooKeeperCompletedCheckpointStoreTest.java | 36 ++++ .../streaming/runtime/tasks/StreamTask.java | 25 ++- 7 files changed, 257 insertions(+), 85 deletions(-) create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 8ca4b2e8f9a37..da893eb235950 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -613,6 +613,7 @@ public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws E if (shutdown || message == null) { return false; } + if (!job.equals(message.getJob())) { LOG.error("Received AcknowledgeCheckpoint message for wrong job: {}", message); return false; @@ -651,8 +652,8 @@ public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws E completed = checkpoint.finalizeCheckpoint(); completedCheckpointStore.addCheckpoint(completed); - LOG.info("Completed checkpoint " + checkpointId + " (in " + - completed.getDuration() + " ms)"); + LOG.info("Completed checkpoint {} ({} bytes in {} ms).", checkpointId, + completed.getStateSize(), completed.getDuration()); if (LOG.isDebugEnabled()) { StringBuilder builder = new StringBuilder(); @@ -685,7 +686,7 @@ public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws E "the state handle to avoid lingering state.", message.getCheckpointId(), message.getTaskExecutionId(), message.getJob()); - discardState(message.getSubtaskState()); + discardState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState()); break; case DISCARDED: @@ -694,7 +695,7 @@ public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws E "state handle tp avoid lingering state.", message.getCheckpointId(), message.getTaskExecutionId(), message.getJob()); - discardState(message.getSubtaskState()); + discardState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState()); } } else if (checkpoint != null) { @@ -706,15 +707,17 @@ else if (checkpoint != null) { // message is for an unknown checkpoint, or comes too late (checkpoint disposed) if (recentPendingCheckpoints.contains(checkpointId)) { isPendingCheckpoint = true; - LOG.warn("Received late message for now expired checkpoint attempt {}.", checkpointId); + LOG.warn("Received late message for now expired checkpoint attempt {} from " + + "{} of job {}.", checkpointId, message.getTaskExecutionId(), message.getJob()); } else { - LOG.debug("Received message for an unknown checkpoint {}.", checkpointId); + LOG.debug("Received message for an unknown checkpoint {} from {} of job {}.", + checkpointId, message.getTaskExecutionId(), message.getJob()); isPendingCheckpoint = false; } // try to discard the state so that we don't have lingering state lying around - discardState(message.getSubtaskState()); + discardState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState()); } } @@ -947,19 +950,25 @@ public void run() { triggerCheckpoint(System.currentTimeMillis(), true); } catch (Exception e) { - LOG.error("Exception while triggering checkpoint", e); + LOG.error("Exception while triggering checkpoint.", e); } } } - private void discardState(final StateObject stateObject) { + private void discardState( + final JobID jobId, + final ExecutionAttemptID executionAttemptID, + final long checkpointId, + final StateObject stateObject) { executor.execute(new Runnable() { @Override public void run() { try { stateObject.discardState(); } catch (Exception e) { - LOG.warn("Could not properly discard state object.", e); + LOG.warn("Could not properly discard state object of checkpoint {} " + + "belonging to task {} of job {}.", checkpointId, executionAttemptID, jobId, + e); } } }); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java index 3c33ce3a4f045..ed650112f8f0e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java @@ -26,7 +26,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.io.Serializable; import java.util.Map; import java.util.Objects; @@ -164,7 +163,7 @@ private void discard() throws Exception { } } - public long getStateSize() throws IOException { + public long getStateSize() { long result = 0L; for (TaskState taskState : taskStates.values()) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java index 4add504f7e343..fdd0d409643f8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java @@ -163,7 +163,17 @@ public void recover() throws Exception { Tuple2, String> latest = initialCheckpoints .get(numberOfInitialCheckpoints - 1); - CompletedCheckpoint latestCheckpoint = latest.f0.retrieveState(); + CompletedCheckpoint latestCheckpoint; + long checkpointId = pathToCheckpointId(latest.f1); + + LOG.info("Trying to retrieve checkpoint {}.", checkpointId); + + try { + latestCheckpoint = latest.f0.retrieveState(); + } catch (Exception e) { + throw new Exception("Could not retrieve the completed checkpoint " + checkpointId + + " from the state storage.", e); + } checkpointStateHandles.add(latest); @@ -190,7 +200,7 @@ public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception { checkNotNull(checkpoint, "Checkpoint"); // First add the new one. If it fails, we don't want to loose existing data. - String path = String.format("/%s", checkpoint.getCheckpointID()); + String path = checkpointIdToPath(checkpoint.getCheckpointID()); final RetrievableStateHandle stateHandle = checkpointsInZooKeeper.add(path, checkpoint); @@ -298,14 +308,36 @@ private void remove( BackgroundCallback callback = new BackgroundCallback() { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { + final long checkpointId = pathToCheckpointId(stateHandleAndPath.f1); + try { if (event.getType() == CuratorEventType.DELETE) { if (event.getResultCode() == 0) { + Exception exception = null; + try { action.call(); - } finally { + } catch (Exception e) { + exception = new Exception("Could not execute callable action " + + "for checkpoint " + checkpointId + '.', e); + } + + try { // Discard the state handle stateHandleAndPath.f0.discardState(); + } catch (Exception e) { + Exception newException = new Exception("Could not discard meta " + + "data for completed checkpoint " + checkpointId + '.', e); + + if (exception == null) { + exception = newException; + } else { + exception.addSuppressed(newException); + } + } + + if (exception != null) { + throw exception; } } else { throw new IllegalStateException("Unexpected result code " + @@ -316,7 +348,7 @@ public void processResult(CuratorFramework client, CuratorEvent event) throws Ex event.getType() + " in '" + event + "' callback."); } } catch (Exception e) { - LOG.error("Failed to discard checkpoint.", e); + LOG.warn("Failed to discard checkpoint {}.", checkpointId, e); } } }; @@ -326,4 +358,39 @@ public void processResult(CuratorFramework client, CuratorEvent event) throws Ex // inconsistent state. checkpointsInZooKeeper.remove(stateHandleAndPath.f1, callback); } + + /** + * Convert a checkpoint id into a ZooKeeper path. + * + * @param checkpointId to convert to the path + * @return Path created from the given checkpoint id + */ + protected static String checkpointIdToPath(long checkpointId) { + return String.format("/%s", checkpointId); + } + + /** + * Converts a path to the checkpoint id. + * + * @param path in ZooKeeper + * @return Checkpoint id parsed from the path + */ + protected static long pathToCheckpointId(String path) { + try { + String numberString; + + // check if we have a leading slash + if ('/' == path.charAt(0) ) { + numberString = path.substring(1); + } else { + numberString = path; + } + return Long.parseLong(numberString); + } catch (NumberFormatException e) { + LOG.warn("Could not parse checkpoint id from {}. This indicates that the " + + "checkpoint id to path conversion has changed.", path); + + return -1L; + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 219d71ddbb1e8..16aebced5fde9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -1015,14 +1015,17 @@ private boolean transitionState(ExecutionState currentState, ExecutionState targ private boolean transitionState(ExecutionState currentState, ExecutionState targetState, Throwable error) { // sanity check if (currentState.isTerminal()) { - throw new IllegalStateException("Cannot leave terminal state " + currentState + " to transition to " + targetState + "."); + throw new IllegalStateException("Cannot leave terminal state " + currentState + " to transition to " + targetState + '.'); } if (STATE_UPDATER.compareAndSet(this, currentState, targetState)) { markTimestamp(targetState); - LOG.info(getVertex().getTaskNameWithSubtaskIndex() + " (" + getAttemptId() + ") switched from " - + currentState + " to " + targetState); + if (error == null) { + LOG.info("{} ({}) switched from {} to {}.", getVertex().getTaskNameWithSubtaskIndex(), getAttemptId(), currentState, targetState); + } else { + LOG.info("{} ({}) switched from {} to {}.", getVertex().getTaskNameWithSubtaskIndex(), getAttemptId(), currentState, targetState, error); + } // make sure that the state transition completes normally. // potential errors (in listeners may not affect the main logic) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 14ef1bff8f153..184c3b1dae9d3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -504,7 +504,7 @@ public void run() { while (true) { ExecutionState current = this.executionState; if (current == ExecutionState.CREATED) { - if (STATE_UPDATER.compareAndSet(this, ExecutionState.CREATED, ExecutionState.DEPLOYING)) { + if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) { // success, we can start our work break; } @@ -515,14 +515,14 @@ else if (current == ExecutionState.FAILED) { return; } else if (current == ExecutionState.CANCELING) { - if (STATE_UPDATER.compareAndSet(this, ExecutionState.CANCELING, ExecutionState.CANCELED)) { + if (transitionState(ExecutionState.CANCELING, ExecutionState.CANCELED)) { // we were immediately canceled. tell the TaskManager that we reached our final state notifyFinalState(); return; } } else { - throw new IllegalStateException("Invalid state for beginning of task operation"); + throw new IllegalStateException("Invalid state for beginning of operation of task " + this + '.'); } } @@ -543,7 +543,7 @@ else if (current == ExecutionState.CANCELING) { // first of all, get a user-code classloader // this may involve downloading the job's JAR files and/or classes - LOG.info("Loading JAR files for task " + taskNameWithSubtask); + LOG.info("Loading JAR files for task {}.", this); userCodeClassLoader = createUserCodeClassloader(libraryCache); final ExecutionConfig executionConfig = serializedExecutionConfig.deserializeValue(userCodeClassLoader); @@ -572,7 +572,7 @@ else if (current == ExecutionState.CANCELING) { // the registration must also strictly be undone // ---------------------------------------------------------------- - LOG.info("Registering task at network: " + this); + LOG.info("Registering task at network: {}.", this); network.registerTask(this); @@ -581,13 +581,15 @@ else if (current == ExecutionState.CANCELING) { for (Map.Entry entry : DistributedCache.readFileInfoFromConfig(jobConfiguration)) { - LOG.info("Obtaining local cache file for '" + entry.getKey() + '\''); + LOG.info("Obtaining local cache file for '{}'.", entry.getKey()); Future cp = fileCache.createTmpFile(entry.getKey(), entry.getValue(), jobId); distributedCacheEntries.put(entry.getKey(), cp); } } catch (Exception e) { - throw new Exception("Exception while adding files to distributed cache.", e); + throw new Exception( + String.format("Exception while adding files to distributed cache of task %s (%s).", taskNameWithSubtask, executionId), + e); } if (isCanceledOrFailed()) { @@ -638,7 +640,7 @@ else if (current == ExecutionState.CANCELING) { this.invokable = invokable; // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime - if (!STATE_UPDATER.compareAndSet(this, ExecutionState.DEPLOYING, ExecutionState.RUNNING)) { + if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) { throw new CancelTaskException(); } @@ -671,7 +673,7 @@ else if (current == ExecutionState.CANCELING) { // try to mark the task as finished // if that fails, the task was canceled/failed in the meantime - if (STATE_UPDATER.compareAndSet(this, ExecutionState.RUNNING, ExecutionState.FINISHED)) { + if (transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) { notifyObservers(ExecutionState.FINISHED, null); } else { @@ -694,7 +696,7 @@ else if (current == ExecutionState.CANCELING) { if (current == ExecutionState.RUNNING || current == ExecutionState.DEPLOYING) { if (t instanceof CancelTaskException) { - if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) { + if (transitionState(current, ExecutionState.CANCELED)) { cancelInvokable(); notifyObservers(ExecutionState.CANCELED, null); @@ -702,19 +704,19 @@ else if (current == ExecutionState.CANCELING) { } } else { - if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) { + if (transitionState(current, ExecutionState.FAILED, t)) { // proper failure of the task. record the exception as the root cause - LOG.error("Task execution failed. ", t); + String errorMessage = String.format("Execution of {} ({}) failed.", taskNameWithSubtask, executionId); failureCause = t; cancelInvokable(); - notifyObservers(ExecutionState.FAILED, t); + notifyObservers(ExecutionState.FAILED, new Exception(errorMessage, t)); break; } } } else if (current == ExecutionState.CANCELING) { - if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) { + if (transitionState(current, ExecutionState.CANCELED)) { notifyObservers(ExecutionState.CANCELED, null); break; } @@ -724,22 +726,22 @@ else if (current == ExecutionState.FAILED) { break; } // unexpected state, go to failed - else if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) { - LOG.error("Unexpected state in Task during an exception: " + current); + else if (transitionState(current, ExecutionState.FAILED, t)) { + LOG.error("Unexpected state in task {} ({}) during an exception: {}.", taskNameWithSubtask, executionId, current); break; } // else fall through the loop and } } catch (Throwable tt) { - String message = "FATAL - exception in task exception handler"; + String message = String.format("FATAL - exception in exception handler of task %s (%s).", taskNameWithSubtask, executionId); LOG.error(message, tt); notifyFatalError(message, tt); } } finally { try { - LOG.info("Freeing task resources for " + taskNameWithSubtask); + LOG.info("Freeing task resources for {} ({}).", taskNameWithSubtask, executionId); // stop the async dispatcher. // copy dispatcher reference to stack, against concurrent release @@ -767,7 +769,7 @@ else if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) { } catch (Throwable t) { // an error in the resource cleanup is fatal - String message = "FATAL - exception in task resource cleanup"; + String message = String.format("FATAL - exception in resource cleanup of task %s (%s).", taskNameWithSubtask, executionId); LOG.error(message, t); notifyFatalError(message, t); } @@ -779,7 +781,7 @@ else if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) { metrics.close(); } catch (Throwable t) { - LOG.error("Error during metrics de-registration", t); + LOG.error("Error during metrics de-registration of task {} ({}).", taskNameWithSubtask, executionId, t); } } } @@ -845,6 +847,39 @@ private void notifyFatalError(String message, Throwable cause) { taskManagerConnection.notifyFatalError(message, cause); } + /** + * Try to transition the execution state from the current state to the new state. + * + * @param currentState of the execution + * @param newState of the execution + * @return true if the transition was successful, otherwise false + */ + private boolean transitionState(ExecutionState currentState, ExecutionState newState) { + return transitionState(currentState, newState, null); + } + + /** + * Try to transition the execution state from the current state to the new state. + * + * @param currentState of the execution + * @param newState of the execution + * @param cause of the transition change or null + * @return true if the transition was successful, otherwise false + */ + private boolean transitionState(ExecutionState currentState, ExecutionState newState, Throwable cause) { + if (STATE_UPDATER.compareAndSet(this, currentState, newState)) { + if (cause == null) { + LOG.info("{} ({}) switched from {} to {}.", taskNameWithSubtask, executionId, currentState, newState); + } else { + LOG.info("{} ({}) switched from {} to {}.", taskNameWithSubtask, executionId, currentState, newState, cause); + } + + return true; + } else { + return false; + } + } + // ---------------------------------------------------------------------------------------------------------------- // Stopping / Canceling / Failing the task from the outside // ---------------------------------------------------------------------------------------------------------------- @@ -859,22 +894,22 @@ private void notifyFatalError(String message, Throwable cause) { * if the {@link AbstractInvokable} does not implement {@link StoppableTask} */ public void stopExecution() throws UnsupportedOperationException { - LOG.info("Attempting to stop task " + taskNameWithSubtask); - if(this.invokable instanceof StoppableTask) { + LOG.info("Attempting to stop task {} ({}).", taskNameWithSubtask, executionId); + if (invokable instanceof StoppableTask) { Runnable runnable = new Runnable() { @Override public void run() { try { - ((StoppableTask)Task.this.invokable).stop(); + ((StoppableTask)invokable).stop(); } catch(RuntimeException e) { - LOG.error("Stopping task " + taskNameWithSubtask + " failed.", e); + LOG.error("Stopping task {} ({}) failed.", taskNameWithSubtask, executionId, e); taskManagerConnection.failTask(executionId, e); } } }; - executeAsyncCallRunnable(runnable, "Stopping source task " + this.taskNameWithSubtask); + executeAsyncCallRunnable(runnable, String.format("Stopping source task %s (%s).", taskNameWithSubtask, executionId)); } else { - throw new UnsupportedOperationException("Stopping not supported by this task."); + throw new UnsupportedOperationException(String.format("Stopping not supported by task %s (%s).", taskNameWithSubtask, executionId)); } } @@ -887,7 +922,7 @@ public void run() { *

This method never blocks.

*/ public void cancelExecution() { - LOG.info("Attempting to cancel task " + taskNameWithSubtask); + LOG.info("Attempting to cancel task {} ({}).", taskNameWithSubtask, executionId); cancelOrFailAndCancelInvokable(ExecutionState.CANCELING, null); } @@ -902,37 +937,52 @@ public void cancelExecution() { */ @Override public void failExternally(Throwable cause) { - LOG.info("Attempting to fail task externally " + taskNameWithSubtask); + LOG.info("Attempting to fail task externally {} ({}).", taskNameWithSubtask, executionId); cancelOrFailAndCancelInvokable(ExecutionState.FAILED, cause); } private void cancelOrFailAndCancelInvokable(ExecutionState targetState, Throwable cause) { while (true) { - ExecutionState current = this.executionState; + ExecutionState current = executionState; // if the task is already canceled (or canceling) or finished or failed, // then we need not do anything if (current.isTerminal() || current == ExecutionState.CANCELING) { - LOG.info("Task " + taskNameWithSubtask + " is already in state " + current); + LOG.info("Task {} is already in state {}", taskNameWithSubtask, current); return; } if (current == ExecutionState.DEPLOYING || current == ExecutionState.CREATED) { - if (STATE_UPDATER.compareAndSet(this, current, targetState)) { + if (transitionState(current, targetState, cause)) { // if we manage this state transition, then the invokable gets never called // we need not call cancel on it this.failureCause = cause; - notifyObservers(targetState, cause); + notifyObservers( + targetState, + new Exception( + String.format( + "Cancel or fail execution of %s (%s).", + taskNameWithSubtask, + executionId), + cause)); return; } } else if (current == ExecutionState.RUNNING) { - if (STATE_UPDATER.compareAndSet(this, ExecutionState.RUNNING, targetState)) { + if (transitionState(ExecutionState.RUNNING, targetState, cause)) { // we are canceling / failing out of the running state // we need to cancel the invokable if (invokable != null && invokableHasBeenCanceled.compareAndSet(false, true)) { this.failureCause = cause; - notifyObservers(targetState, cause); + notifyObservers( + targetState, + new Exception( + String.format( + "Cancel or fail execution of %s (%s).", + taskNameWithSubtask, + executionId), + cause)); + LOG.info("Triggering cancellation of task code {} ({}).", taskNameWithSubtask, executionId); // because the canceling may block on user code, we cancel from a separate thread @@ -951,7 +1001,7 @@ else if (current == ExecutionState.RUNNING) { producedPartitions, inputGates); Thread cancelThread = new Thread(executingThread.getThreadGroup(), canceler, - "Canceler for " + taskNameWithSubtask); + String.format("Canceler for %s (%s).", taskNameWithSubtask, executionId)); cancelThread.setDaemon(true); cancelThread.start(); } @@ -959,7 +1009,8 @@ else if (current == ExecutionState.RUNNING) { } } else { - throw new IllegalStateException("Unexpected task state: " + current); + throw new IllegalStateException(String.format("Unexpected state: %s of task %s (%s).", + current, taskNameWithSubtask, executionId)); } } } @@ -973,13 +1024,6 @@ public void registerExecutionListener(TaskExecutionStateListener listener) { } private void notifyObservers(ExecutionState newState, Throwable error) { - if (error == null) { - LOG.info(taskNameWithSubtask + " switched to " + newState); - } - else { - LOG.info(taskNameWithSubtask + " switched to " + newState + " with exception.", error); - } - TaskExecutionState stateUpdate = new TaskExecutionState(jobId, executionId, newState, error); for (TaskExecutionStateListener listener : taskExecutionStateListeners) { @@ -1066,24 +1110,29 @@ public void run() { catch (Throwable t) { if (getExecutionState() == ExecutionState.RUNNING) { failExternally(new Exception( - "Error while triggering checkpoint for " + taskName, - t)); + "Error while triggering checkpoint " + checkpointID + " for " + + taskNameWithSubtask, t)); + } else { + LOG.debug("Encountered error while triggering checkpoint {} for " + + "{} ({}) while being not in state running.", checkpointID, + taskNameWithSubtask, executionId, t); } } } }; - executeAsyncCallRunnable(runnable, "Checkpoint Trigger for " + taskName); + executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId)); } else { checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID, new CheckpointDeclineTaskNotCheckpointingException(taskNameWithSubtask)); + + LOG.error("Task received a checkpoint request, but is not a checkpointing task - {} ({}).", + taskNameWithSubtask, executionId); - LOG.error("Task received a checkpoint request, but is not a checkpointing task - " - + taskNameWithSubtask); } } else { - LOG.debug("Declining checkpoint request for non-running task"); + LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId); // send back a message that we did not do the checkpoint checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID, @@ -1120,12 +1169,12 @@ public void run() { executeAsyncCallRunnable(runnable, "Checkpoint Confirmation for " + taskName); } else { - LOG.error("Task received a checkpoint commit notification, but is not a checkpoint committing task - " - + taskNameWithSubtask); + LOG.error("Task received a checkpoint commit notification, but is not a checkpoint committing task - {}.", + taskNameWithSubtask); } } else { - LOG.debug("Ignoring checkpoint commit notification for non-running task."); + LOG.debug("Ignoring checkpoint commit notification for non-running task {}.", taskNameWithSubtask); } } @@ -1228,14 +1277,14 @@ private void cancelInvokable() { invokable.cancel(); } catch (Throwable t) { - LOG.error("Error while canceling task " + taskNameWithSubtask, t); + LOG.error("Error while canceling task {}.", taskNameWithSubtask, t); } } } @Override public String toString() { - return taskNameWithSubtask + " [" + executionState + ']'; + return String.format("%s (%s) [%s]", taskNameWithSubtask, executionId, executionState); } /** @@ -1312,7 +1361,7 @@ public void run() { try { invokable.cancel(); } catch (Throwable t) { - logger.error("Error while canceling the task", t); + logger.error("Error while canceling the task {}.", taskName, t); } // Early release of input and output buffer pools. We do this @@ -1326,7 +1375,7 @@ public void run() { try { partition.destroyBufferPool(); } catch (Throwable t) { - LOG.error("Failed to release result partition buffer pool.", t); + LOG.error("Failed to release result partition buffer pool for task {}.", taskName, t); } } @@ -1334,7 +1383,7 @@ public void run() { try { inputGate.releaseAllResources(); } catch (Throwable t) { - LOG.error("Failed to release input gate.", t); + LOG.error("Failed to release input gate for task {}.", taskName, t); } } @@ -1352,7 +1401,7 @@ public void run() { watchDogThread.join(); } } catch (Throwable t) { - logger.error("Error in the task canceler", t); + logger.error("Error in the task canceler for task {}.", taskName, t); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java new file mode 100644 index 0000000000000..6ee014128a53b --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger { + + @Test + public void testPathConversion() { + final long checkpointId = 42L; + + final String path = ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpointId); + + assertEquals(checkpointId, ZooKeeperCompletedCheckpointStore.pathToCheckpointId(path)); + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 54f6c104c8a5b..88a29ab90ecf2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -211,7 +211,7 @@ public final void invoke() throws Exception { boolean disposed = false; try { // -------- Initialize --------- - LOG.debug("Initializing {}", getName()); + LOG.debug("Initializing {}.", getName()); asyncOperationsThreadPool = Executors.newCachedThreadPool(); @@ -528,8 +528,11 @@ public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws E catch (Exception e) { // propagate exceptions only if the task is still in "running" state if (isRunning) { - throw e; + throw new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() + + "for operator " + getName() + '.', e); } else { + LOG.debug("Could not perform checkpoint {} for operator {} while the " + + "invokable was not in state running.", checkpointMetaData.getCheckpointId(), getName(), e); return false; } } @@ -541,10 +544,12 @@ public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) th performCheckpoint(checkpointMetaData); } catch (CancelTaskException e) { - throw e; + throw new Exception("Operator " + getName() + " was cancelled while performing checkpoint " + + checkpointMetaData.getCheckpointId() + '.'); } catch (Exception e) { - throw new Exception("Error while performing a checkpoint", e); + throw new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() + " for operator " + + getName() + '.', e); } } @@ -678,7 +683,7 @@ private AbstractStateBackend createStateBackend() throws Exception { if (stateBackend != null) { // backend has been configured on the environment - LOG.info("Using user-defined state backend: " + stateBackend); + LOG.info("Using user-defined state backend: {}.", stateBackend); } else { // see if we have a backend specified in the configuration Configuration flinkConfig = getEnvironment().getTaskManagerInfo().getConfiguration(); @@ -697,8 +702,8 @@ private AbstractStateBackend createStateBackend() throws Exception { case "filesystem": FsStateBackend backend = new FsStateBackendFactory().createFromConfig(flinkConfig); - LOG.info("State backend is set to heap memory (checkpoints to filesystem \"" - + backend.getBasePath() + "\")"); + LOG.info("State backend is set to heap memory (checkpoints to filesystem \"{}\")", + backend.getBasePath()); stateBackend = backend; break; @@ -933,7 +938,11 @@ public void run() { } } catch (Exception e) { // registers the exception and tries to fail the whole task - AsynchronousException asyncException = new AsynchronousException(e); + AsynchronousException asyncException = new AsynchronousException( + new Exception( + "Could not materialize checkpoint " + checkpointMetaData.getCheckpointId() + + " for operator " + owner.getName() + '.', + e)); owner.handleAsyncException("Failure in asynchronous checkpoint materialization", asyncException); } finally { owner.cancelables.unregisterClosable(this); From 04a7f69ea5c98b86b7e0440c1ddecc23da848ec0 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Fri, 9 Dec 2016 14:17:54 +0100 Subject: [PATCH 2/2] Add more logging --- .../apache/flink/runtime/checkpoint/CheckpointCoordinator.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index da893eb235950..5f0fd74c43174 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -642,6 +642,9 @@ public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws E switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState())) { case SUCCESS: + LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {}.", + checkpointId, message.getTaskExecutionId(), message.getJob()); + if (checkpoint.isFullyAcknowledged()) { // record the time when this was completed, to calculate