From e872b5fbc454e379eab6788bae77c6bf3e2e98af Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Sat, 3 Dec 2016 20:15:35 +0100 Subject: [PATCH] [FLINK-5246] Don't discard checkpoint messages if they are unknown This is the case if the savepoint coordinator has triggered a checkpoint. The corresponding checkpoint messages are not known to the checkpoint coordinator and thus should not be discarded. Instead, the JobManager will now discard all messages which have been accepted by neither the CheckpointCoordinator nor the SavepointCoordinator. --- .../checkpoint/CheckpointCoordinator.java | 6 +++--- .../flink/runtime/jobmanager/JobManager.scala | 18 ++++++++++++++++++ 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 592bafce51d1a..a3e511f669721 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -735,15 +735,15 @@ else if (checkpoint != null) { if (recentPendingCheckpoints.contains(checkpointId)) { wasPendingCheckpoint = true; LOG.warn("Received late message for now expired checkpoint attempt {}.", checkpointId); + + // try to discard the state so that we don't have lingering state lying around + discardState(message.getState()); } else { LOG.debug("Received message for an unknown checkpoint {}.", checkpointId); wasPendingCheckpoint = false; } - // try to discard the state so that we don't have lingering state lying around - discardState(message.getState()); - return wasPendingCheckpoint; } } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 9f6e2db21c3ff..cbf7b5d094338 100644 --- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -1376,6 +1376,24 @@ class JobManager( // addressed to the periodic checkpoint coordinator. log.info("Received message for non-existing checkpoint " + ackMessage.getCheckpointId) + + val classLoader = Option(libraryCacheManager.getClassLoader(jid)) match { + case Some(userCodeClassLoader) => userCodeClassLoader + case None => getClass.getClassLoader + } + + future { + Option(ackMessage.getState()) match { + case Some(state) => + try { + state.deserializeValue(classLoader).discardState() + } catch { + case e: Exception => log.warn("Could not discard orphaned checkpoint " + + "state.", e) + } + case None => + } + }(ExecutionContext.fromExecutor(ioExecutor)) } } catch { case t: Throwable =>