Skip to content

Commit

Permalink
[FLINK-2354] [runtime] Make revokeLeadership not remove the jobs from…
Browse files Browse the repository at this point in the history
… the state backend
  • Loading branch information
tillrohrmann authored and uce committed Oct 9, 2015
1 parent f8e3801 commit 72abfd6
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ class JobManager(
log.info(s"Stopping JobManager ${getAddress}.")

val newFuturesToComplete = cancelAndClearEverything(
new Exception("The JobManager is shutting down."))
new Exception("The JobManager is shutting down."),
true)

implicit val executionContext = context.dispatcher

Expand Down Expand Up @@ -247,7 +248,8 @@ class JobManager(
log.info(s"JobManager ${self.path.toSerializationFormat} was revoked leadership.")

val newFuturesToComplete = cancelAndClearEverything(
new Exception("JobManager is no longer the leader."))
new Exception("JobManager is no longer the leader."),
false)

futuresToComplete = Some(futuresToComplete.getOrElse(Seq()) ++ newFuturesToComplete)

Expand Down Expand Up @@ -1178,15 +1180,15 @@ class JobManager(
* might block. Therefore be careful not to block the actor thread.
*
* @param jobID ID of the job to remove and archive
* @param jobProperlyFinished true if the job shall be archived and removed from the state
* @param removeJobFromStateBackend true if the job shall be archived and removed from the state
* backend
*/
private def removeJob(jobID: JobID, jobProperlyFinished: Boolean): Option[Future[Unit]] = {
private def removeJob(jobID: JobID, removeJobFromStateBackend: Boolean): Option[Future[Unit]] = {
// Don't remove the job yet...
val futureOption = currentJobs.get(jobID) match {
case Some((eg, _)) =>
val result = if (jobProperlyFinished) {
Some(future {
val result = if (removeJobFromStateBackend) {
val futureOption = Some(future {
try {
// ...otherwise, we can have lingering resources when there is a concurrent shutdown
// and the ZooKeeper client is closed. Not removing the job immediately allow the
Expand All @@ -1205,6 +1207,8 @@ class JobManager(
case t: Throwable => log.error(s"Could not prepare the execution graph $eg for " +
"archiving.", t)
}

futureOption
} else {
None
}
Expand All @@ -1230,15 +1234,20 @@ class JobManager(
*
* @param cause Cause for the cancelling.
*/
private def cancelAndClearEverything(cause: Throwable): Seq[Future[Unit]] = {
private def cancelAndClearEverything(
cause: Throwable,
removeJobFromStateBackend: Boolean)
: Seq[Future[Unit]] = {
val futures = for ((jobID, (eg, jobInfo)) <- currentJobs) yield {
future {
try {
submittedJobGraphs.removeJobGraph(jobID)
}
catch {
case t: Throwable => {
log.error("Error during submitted job graph clean up.", t)
if (removeJobFromStateBackend) {
try {
submittedJobGraphs.removeJobGraph(jobID)
}
catch {
case t: Throwable => {
log.error("Error during submitted job graph clean up.", t)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,9 +298,9 @@ object JobManagerMessages {
/** Triggers the removal of the job with the given job ID
*
* @param jobID
* @param jobProperlyFinished true if the job has properly finished
* @param removeJobFromStateBackend true if the job has properly finished
*/
case class RemoveJob(jobID: JobID, jobProperlyFinished: Boolean = true)
case class RemoveJob(jobID: JobID, removeJobFromStateBackend: Boolean = true)
extends RequiresLeaderSessionID

/**
Expand Down

0 comments on commit 72abfd6

Please sign in to comment.