From e2de9721e68990531efb113ca4275b4edd623d72 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 23 Feb 2015 22:54:58 -0800 Subject: [PATCH 1/2] Clear stages which have no corresponding active jobs. --- .../scala/org/apache/spark/ui/jobs/JobProgressListener.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 0b6fe70bd2062..f595565b7341b 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -130,6 +130,10 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { stages.take(toRemove).foreach { s => stageIdToData.remove((s.stageId, s.attemptId)) stageIdToInfo.remove(s.stageId) + stageIdToActiveJobIds.get(s.stageId).map(_.size) match { + case Some(0) => stageIdToActiveJobIds.remove(s.stageId) + case _ => + } } stages.trimStart(toRemove) } From 653b5bb14439b9b93b9f123f000c26c7933b0e33 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 23 Feb 2015 23:29:51 -0800 Subject: [PATCH 2/2] Fixed the fix and added test --- .../spark/ui/jobs/JobProgressListener.scala | 7 +++--- .../ui/jobs/JobProgressListenerSuite.scala | 22 +++++++++++++++++++ 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index f595565b7341b..937d95a934b59 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -130,10 +130,6 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { stages.take(toRemove).foreach { s => stageIdToData.remove((s.stageId, s.attemptId)) stageIdToInfo.remove(s.stageId) - stageIdToActiveJobIds.get(s.stageId).map(_.size) match { - case Some(0) => stageIdToActiveJobIds.remove(s.stageId) - case _ => - } } stages.trimStart(toRemove) } @@ -207,6 +203,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { for (stageId <- jobData.stageIds) { stageIdToActiveJobIds.get(stageId).foreach { jobsUsingStage => jobsUsingStage.remove(jobEnd.jobId) + if (jobsUsingStage.isEmpty) { + stageIdToActiveJobIds.remove(stageId) + } stageIdToInfo.get(stageId).foreach { stageInfo => if (stageInfo.submissionTime.isEmpty) { // if this stage is pending, it won't complete, so mark it as "skipped": diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index 6019282d2fb70..730a4b54f5aa1 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -88,6 +88,28 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc listener.completedStages.map(_.stageId).toSet should be (Set(50, 49, 48, 47, 46)) } + test("test clearing of stageIdToActiveJobs") { + val conf = new SparkConf() + conf.set("spark.ui.retainedStages", 5.toString) + val listener = new JobProgressListener(conf) + val jobId = 0 + val stageIds = 1 to 50 + // Start a job with 50 stages + listener.onJobStart(createJobStartEvent(jobId, stageIds)) + for (stageId <- stageIds) { + listener.onStageSubmitted(createStageStartEvent(stageId)) + } + listener.stageIdToActiveJobIds.size should be > 0 + + // Complete the stages and job + for (stageId <- stageIds) { + listener.onStageCompleted(createStageEndEvent(stageId, failed = false)) + } + listener.onJobEnd(createJobEndEvent(jobId, false)) + assertActiveJobsStateIsEmpty(listener) + listener.stageIdToActiveJobIds.size should be (0) + } + test("test LRU eviction of jobs") { val conf = new SparkConf() conf.set("spark.ui.retainedStages", 5.toString)