From 47525c6138597a01a6cd2408b95b0fdd4387e0c5 Mon Sep 17 00:00:00 2001
From: Xu Tingjun
Date: Fri, 17 Apr 2015 14:29:41 +0800
Subject: [PATCH 1/6] modify total stages/tasks on the AllJobsPage

---
 .../src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
index bd923d78a86ce..ed96d000a0702 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
@@ -74,14 +74,14 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
         {formattedDuration}
       </td>
       <td class="stage-progress-cell">
-        {job.completedStageIndices.size}/{job.stageIds.size - job.numSkippedStages}
+        {job.completedStageIndices.size}/{job.stageIds.size}
         {if (job.numFailedStages > 0) s"(${job.numFailedStages} failed)"}
         {if (job.numSkippedStages > 0) s"(${job.numSkippedStages} skipped)"}
       </td>
       <td class="progress-cell">
         {UIUtils.makeProgressBar(started = job.numActiveTasks, completed = job.numCompletedTasks,
          failed = job.numFailedTasks, skipped = job.numSkippedTasks,
-         total = job.numTasks - job.numSkippedTasks)}
+         total = job.numTasks)}
       </td>
     </tr>
   }

From b987ea71c5ada7877a4489ec5236d6f95752d738 Mon Sep 17 00:00:00 2001
From: Xutingjun
Date: Wed, 22 Apr 2015 10:36:04 +0800
Subject: [PATCH 2/6] delete skipped stages from completed set

---
 .../org/apache/spark/ui/jobs/AllJobsPage.scala     |  7 ++++---
 .../apache/spark/ui/jobs/JobProgressListener.scala |  2 +-
 .../scala/org/apache/spark/ui/jobs/UIData.scala    |  2 +-
 .../apache/spark/util/collection/OpenHashSet.scala | 13 +++++++++++++
 .../spark/util/collection/OpenHashSetSuite.scala   | 14 ++++++++++++++
 5 files changed, 33 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
index ed96d000a0702..08ee29a4598ac 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
@@ -74,14 +74,15 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
         {formattedDuration}
       </td>
       <td class="stage-progress-cell">
-        {job.completedStageIndices.size}/{job.stageIds.size}
+        {job.completedStageIndices.diff(job.skippedStageIndices).size}/
+        {job.stageIds.size - job.skippedStageIndices.size}
         {if (job.numFailedStages > 0) s"(${job.numFailedStages} failed)"}
-        {if (job.numSkippedStages > 0) s"(${job.numSkippedStages} skipped)"}
+        {if (job.skippedStageIndices.size > 0) s"(${job.skippedStageIndices.size} skipped)"}
       </td>
       <td class="progress-cell">
         {UIUtils.makeProgressBar(started = job.numActiveTasks, completed = job.numCompletedTasks,
          failed = job.numFailedTasks, skipped = job.numSkippedTasks,
-         total = job.numTasks)}
+         total = job.numTasks - job.numSkippedTasks)}
       </td>
     </tr>
   }
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 625596885faa1..4563c55863bfb 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -230,7 +230,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
         // if this stage is pending, it won't complete, so mark it as "skipped":
         skippedStages += stageInfo
         trimStagesIfNecessary(skippedStages)
-        jobData.numSkippedStages += 1
+        jobData.skippedStageIndices.add(stageId)
         jobData.numSkippedTasks += stageInfo.numTasks
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index 711a3697bda15..e6d178548a73c 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -64,7 +64,7 @@ private[jobs] object UIData {
     var numActiveStages: Int = 0,
     // This needs to be a set instead of a simple count to prevent double-counting of rerun stages:
     var completedStageIndices: OpenHashSet[Int] = new OpenHashSet[Int](),
-    var numSkippedStages: Int = 0,
+    var skippedStageIndices: OpenHashSet[Int] = new OpenHashSet[Int](),
     var numFailedStages: Int = 0
   )
 
diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
index 1501111a06655..6e23461dcdd7f 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
@@ -101,6 +101,19 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
   /** Return true if this set contains the specified element. */
   def contains(k: T): Boolean = getPos(k) != INVALID_POS
 
+  /** Return the difference of this set and another set. */
+  def diff(that: OpenHashSet[T]): OpenHashSet[T] = {
+    val result = new OpenHashSet[T]()
+    val iter = this.iterator
+    while (iter.hasNext) {
+      val value = iter.next()
+      if (!that.contains(value)) {
+        result.add(value)
+      }
+    }
+    result
+  }
+
   /**
    * Add an element to the set. If the set is over capacity after the insertion, grow the set
    * and rehash all elements.
diff --git a/core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala
index 68a03e3a0970f..5440a0315dbfb 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala
@@ -176,4 +176,18 @@ class OpenHashSetSuite extends FunSuite with Matchers {
     assert(set.size === 1000)
     assert(set.capacity > 1000)
   }
+
+  test("diff") {
+    val set1 = new OpenHashSet[Int]()
+    val set2 = new OpenHashSet[Int]()
+    set1.add(1)
+    set1.add(2)
+    set1.add(3)
+    set2.add(2)
+    val set3 = set1.diff(set2)
+    assert(set3.size == 2)
+    assert(set3.contains(1))
+    assert(set3.contains(3))
+    assert(!set3.contains(2))
+  }
 }
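For readers following along, a minimal sketch of how the diff method added in patch 2 behaves, mirroring the new unit test. It assumes a Spark build with this patch applied, and it is placed in the org.apache.spark.util.collection package because OpenHashSet is private[spark]:

    package org.apache.spark.util.collection

    object DiffSketch {
      def main(args: Array[String]): Unit = {
        val completed = new OpenHashSet[Int]()
        val skipped = new OpenHashSet[Int]()
        Seq(1, 2, 3).foreach(completed.add)  // stage ids recorded as completed
        skipped.add(2)                       // stage 2 was later marked skipped
        // Only elements absent from `skipped` survive the difference,
        // so the UI would display a completed count of 2:
        val effective = completed.diff(skipped)
        assert(effective.size == 2)
        assert(effective.contains(1) && effective.contains(3) && !effective.contains(2))
      }
    }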
From 9e23c7183e822a443bd14000619bc39c6aa8163d Mon Sep 17 00:00:00 2001
From: Xu Tingjun
Date: Wed, 22 Apr 2015 10:45:16 +0800
Subject: [PATCH 3/6] recover numSkippedStages

---
 core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala | 2 +-
 .../scala/org/apache/spark/ui/jobs/JobProgressListener.scala   | 1 +
 core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala      | 1 +
 3 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
index 08ee29a4598ac..ddb2c9c504f40 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
@@ -77,7 +77,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
         {job.completedStageIndices.diff(job.skippedStageIndices).size}/
         {job.stageIds.size - job.skippedStageIndices.size}
         {if (job.numFailedStages > 0) s"(${job.numFailedStages} failed)"}
-        {if (job.skippedStageIndices.size > 0) s"(${job.skippedStageIndices.size} skipped)"}
+        {if (job.numSkippedStages > 0) s"(${job.numSkippedStages} skipped)"}
       </td>
       <td class="progress-cell">
         {UIUtils.makeProgressBar(started = job.numActiveTasks, completed = job.numCompletedTasks,
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 4563c55863bfb..4b40fedf72f91 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -231,6 +231,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
         skippedStages += stageInfo
         trimStagesIfNecessary(skippedStages)
         jobData.skippedStageIndices.add(stageId)
+        jobData.numSkippedStages += 1
         jobData.numSkippedTasks += stageInfo.numTasks
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index e6d178548a73c..2da120442403f 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -65,6 +65,7 @@ private[jobs] object UIData {
     // This needs to be a set instead of a simple count to prevent double-counting of rerun stages:
     var completedStageIndices: OpenHashSet[Int] = new OpenHashSet[Int](),
     var skippedStageIndices: OpenHashSet[Int] = new OpenHashSet[Int](),
+    var numSkippedStages: Int = 0,
     var numFailedStages: Int = 0
   )
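The comment kept in UIData, "a set instead of a simple count to prevent double-counting of rerun stages", is the crux of this series. A tiny self-contained sketch (illustrative names, not the listener's real code) of the over-counting a bare counter suffers when a stage is rerun:

    import scala.collection.mutable

    object RerunCountSketch {
      def main(args: Array[String]): Unit = {
        var numCompletedStages = 0                         // naive counter
        val completedStageIndices = mutable.HashSet[Int]() // set keyed by stage id
        // Stage 3 completes, is rerun, and completes again:
        for (completedStageId <- Seq(3, 3)) {
          numCompletedStages += 1
          completedStageIndices.add(completedStageId)
        }
        println(numCompletedStages)         // 2 -- counts the rerun twice
        println(completedStageIndices.size) // 1 -- the figure the UI should show
      }
    }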
s"(${job.skippedStageIndices.size} skipped)"} + {if (job.numSkippedStages > 0) s"(${job.numSkippedStages} skipped)"} {UIUtils.makeProgressBar(started = job.numActiveTasks, completed = job.numCompletedTasks, diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 4563c55863bfb..4b40fedf72f91 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -231,6 +231,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { skippedStages += stageInfo trimStagesIfNecessary(skippedStages) jobData.skippedStageIndices.add(stageId) + jobData.numSkippedStages += 1 jobData.numSkippedTasks += stageInfo.numTasks } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala index e6d178548a73c..2da120442403f 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala @@ -65,6 +65,7 @@ private[jobs] object UIData { // This needs to be a set instead of a simple count to prevent double-counting of rerun stages: var completedStageIndices: OpenHashSet[Int] = new OpenHashSet[Int](), var skippedStageIndices: OpenHashSet[Int] = new OpenHashSet[Int](), + var numSkippedStages: Int = 0, var numFailedStages: Int = 0 ) From 6459238d57e4a0f5f2527366d3b16b539b57ea00 Mon Sep 17 00:00:00 2001 From: meiyoula <1039320815@qq.com> Date: Wed, 22 Apr 2015 15:24:03 +0800 Subject: [PATCH 4/6] delete space --- .../src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala index ddb2c9c504f40..2365112d94367 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala @@ -74,8 +74,8 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { {formattedDuration} - {job.completedStageIndices.diff(job.skippedStageIndices).size}/ - {job.stageIds.size - job.skippedStageIndices.size} + {job.completedStageIndices.diff(job.skippedStageIndices).size}/{ + job.stageIds.size - job.skippedStageIndices.size} {if (job.numFailedStages > 0) s"(${job.numFailedStages} failed)"} {if (job.numSkippedStages > 0) s"(${job.numSkippedStages} skipped)"} From 40ce94b400e5bb61107f1d1530c0a0eef14379fa Mon Sep 17 00:00:00 2001 From: Xutingjun Date: Thu, 23 Apr 2015 15:15:23 +0800 Subject: [PATCH 5/6] remove stage id from completed set if it retries again --- .../org/apache/spark/ui/jobs/AllJobsPage.scala | 3 +-- .../spark/ui/jobs/JobProgressListener.scala | 17 +++++++++++++++-- .../scala/org/apache/spark/ui/jobs/UIData.scala | 4 ++-- .../spark/util/collection/OpenHashSet.scala | 13 ------------- .../util/collection/OpenHashSetSuite.scala | 14 -------------- 5 files changed, 18 insertions(+), 33 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala index 2365112d94367..bd923d78a86ce 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala @@ -74,8 +74,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends 
WebUIPage("") { {formattedDuration} - {job.completedStageIndices.diff(job.skippedStageIndices).size}/{ - job.stageIds.size - job.skippedStageIndices.size} + {job.completedStageIndices.size}/{job.stageIds.size - job.numSkippedStages} {if (job.numFailedStages > 0) s"(${job.numFailedStages} failed)"} {if (job.numSkippedStages > 0) s"(${job.numSkippedStages} skipped)"} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 4b40fedf72f91..94d5dcca95c35 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -230,7 +230,6 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { // if this stage is pending, it won't complete, so mark it as "skipped": skippedStages += stageInfo trimStagesIfNecessary(skippedStages) - jobData.skippedStageIndices.add(stageId) jobData.numSkippedStages += 1 jobData.numSkippedTasks += stageInfo.numTasks } @@ -272,7 +271,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { ) { jobData.numActiveStages -= 1 if (stage.failureReason.isEmpty) { - jobData.completedStageIndices.add(stage.stageId) + if (!stage.submissionTime.isEmpty) { + jobData.completedStageIndices.add(stage.stageId) + } } else { jobData.numFailedStages += 1 } @@ -284,6 +285,18 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { val stage = stageSubmitted.stageInfo activeStages(stage.stageId) = stage pendingStages.remove(stage.stageId) + + // If a stage retries again, it should be removed from completedStageIndices set + for ( + activeJobsDependentOnStage <- stageIdToActiveJobIds.get(stage.stageId); + jobId <- activeJobsDependentOnStage; + jobData <- jobIdToData.get(jobId) + ) { + if (jobData.completedStageIndices.contains(stage.stageId)) { + jobData.completedStageIndices.remove(stage.stageId) + } + } + val poolName = Option(stageSubmitted.properties).map { p => p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME) }.getOrElse(DEFAULT_POOL_NAME) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala index 2da120442403f..a9bd39db7170e 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala @@ -22,6 +22,7 @@ import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo} import org.apache.spark.util.collection.OpenHashSet +import scala.collection.mutable import scala.collection.mutable.HashMap private[jobs] object UIData { @@ -63,8 +64,7 @@ private[jobs] object UIData { /* Stages */ var numActiveStages: Int = 0, // This needs to be a set instead of a simple count to prevent double-counting of rerun stages: - var completedStageIndices: OpenHashSet[Int] = new OpenHashSet[Int](), - var skippedStageIndices: OpenHashSet[Int] = new OpenHashSet[Int](), + var completedStageIndices: mutable.HashSet[Int] = new mutable.HashSet[Int](), var numSkippedStages: Int = 0, var numFailedStages: Int = 0 ) diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala index 6e23461dcdd7f..1501111a06655 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala +++ 
From a74254132c1a3d5965ac2f4877ab98773850ae0a Mon Sep 17 00:00:00 2001
From: Xu Tingjun
Date: Thu, 23 Apr 2015 19:12:49 +0800
Subject: [PATCH 6/6] delete the loop

---
 .../spark/ui/jobs/JobProgressListener.scala | 15 +++------------
 1 file changed, 3 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 94d5dcca95c35..5743648f48ef6 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -285,18 +285,6 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
     val stage = stageSubmitted.stageInfo
     activeStages(stage.stageId) = stage
     pendingStages.remove(stage.stageId)
-
-    // If a stage retries again, it should be removed from completedStageIndices set
-    for (
-      activeJobsDependentOnStage <- stageIdToActiveJobIds.get(stage.stageId);
-      jobId <- activeJobsDependentOnStage;
-      jobData <- jobIdToData.get(jobId)
-    ) {
-      if (jobData.completedStageIndices.contains(stage.stageId)) {
-        jobData.completedStageIndices.remove(stage.stageId)
-      }
-    }
-
     val poolName = Option(stageSubmitted.properties).map { p =>
       p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME)
     }.getOrElse(DEFAULT_POOL_NAME)
@@ -318,6 +306,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
       jobData <- jobIdToData.get(jobId)
     ) {
       jobData.numActiveStages += 1
+
+      // If a stage retries again, it should be removed from completedStageIndices set
+      jobData.completedStageIndices.remove(stage.stageId)
     }
   }
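Patch 6 folds the removal into the for-comprehension that already walks the jobs depending on the submitted stage. For readers less used to Scala, a self-contained sketch (illustrative data, not the listener's real state) of how such an Option-plus-collection comprehension desugars into nested foreach calls, running the body once per surviving binding and zero times on a lookup miss:

    import scala.collection.mutable

    object ForComprehensionSketch {
      def main(args: Array[String]): Unit = {
        val stageIdToActiveJobIds = mutable.HashMap(7 -> mutable.HashSet(1, 2))
        val jobIdToData = mutable.HashMap(1 -> "jobData for job 1") // job 2 already evicted
        for (
          activeJobsDependentOnStage <- stageIdToActiveJobIds.get(7); // Option generator
          jobId <- activeJobsDependentOnStage;                        // Set generator
          jobData <- jobIdToData.get(jobId)                           // Option generator
        ) {
          println(s"job $jobId -> $jobData") // runs exactly once, for job 1
        }
      }
    }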