From 4a2f948224fe628c721adc4fae24199b0296c80f Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 29 Nov 2016 16:02:29 +0100 Subject: [PATCH] [FLINK-5197] [jm] Ignore outdated JobStatusChanged messages Outdated JobStatusChanged messages no longer trigger a RemoveJob message but are logged and ignored. This has the advantage, that an outdated JobStatusChanged message cannot interfere with a recovered job which can have the same job id. --- .../apache/flink/runtime/jobmanager/JobManager.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 4a4968f237564..cf60d4ecdf39b 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -746,7 +746,7 @@ class JobManager( } }(context.dispatcher) - case JobStatusChanged(jobID, newJobStatus, timeStamp, error) => + case msg @ JobStatusChanged(jobID, newJobStatus, timeStamp, error) => currentJobs.get(jobID) match { case Some((executionGraph, jobInfo)) => executionGraph.getJobName @@ -818,8 +818,7 @@ class JobManager( } }(context.dispatcher) } - case None => - self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true)) + case None => log.debug(s"Received $msg for nonexistent job $jobID.") } case ScheduleOrUpdateConsumers(jobId, partitionId) => @@ -956,7 +955,7 @@ class JobManager( futuresToComplete = Some(futuresToComplete.getOrElse(Seq()) :+ futureToComplete) case None => } - case None => + case None => log.debug(s"Tried to remove nonexistent job $jobID.") } case RemoveCachedJob(jobID) => @@ -1620,7 +1619,7 @@ class JobManager( // shutdown to release all resources. submittedJobGraphs.removeJobGraph(jobID) } catch { - case t: Throwable => log.error(s"Could not remove submitted job graph $jobID.", t) + case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", t) } }(context.dispatcher)) @@ -1629,7 +1628,7 @@ class JobManager( archive ! decorateMessage(ArchiveExecutionGraph(jobID, eg)) } catch { - case t: Throwable => log.error(s"Could not prepare the execution graph $eg for " + + case t: Throwable => log.warn(s"Could not prepare the execution graph $eg for " + "archiving.", t) }