Skip to content

Commit

Permalink
[FLINK-5246] Don't discard checkpoint messages if they are unknown
Browse files Browse the repository at this point in the history
This is the case if the savepoint coordinator has triggered a checkpoint. The corresponding
checkpoint messages are not known to the checkpoint coordinator and thus should not be
discarded. Instead, the JobManager will now discard all messages which have not been accepted
by neither the CheckpointCoordinator nor the SavepointCoordinator.

This closes #2930.
  • Loading branch information
tillrohrmann committed Dec 4, 2016
1 parent 020da2c commit da09d41
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -735,15 +735,15 @@ else if (checkpoint != null) {
if (recentPendingCheckpoints.contains(checkpointId)) {
wasPendingCheckpoint = true;
LOG.warn("Received late message for now expired checkpoint attempt {}.", checkpointId);

// try to discard the state so that we don't have lingering state lying around
discardState(message.getState());
}
else {
LOG.debug("Received message for an unknown checkpoint {}.", checkpointId);
wasPendingCheckpoint = false;
}

// try to discard the state so that we don't have lingering state lying around
discardState(message.getState());

return wasPendingCheckpoint;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1376,6 +1376,24 @@ class JobManager(
// addressed to the periodic checkpoint coordinator.
log.info("Received message for non-existing checkpoint " +
ackMessage.getCheckpointId)

val classLoader = Option(libraryCacheManager.getClassLoader(jid)) match {
case Some(userCodeClassLoader) => userCodeClassLoader
case None => getClass.getClassLoader
}

future {
Option(ackMessage.getState()) match {
case Some(state) =>
try {
state.deserializeValue(classLoader).discardState()
} catch {
case e: Exception => log.warn("Could not discard orphaned checkpoint " +
"state.", e)
}
case None =>
}
}(ExecutionContext.fromExecutor(ioExecutor))
}
} catch {
case t: Throwable =>
Expand Down

0 comments on commit da09d41

Please sign in to comment.