From c19d82aa4c40889985cb913cf0aa38208facaa42 Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Wed, 14 Jan 2015 16:48:31 +0530 Subject: [PATCH 1/3] SPARK-5217 Spark UI should report pending stages during job execution on AllStagesPage. --- .../org/apache/spark/ui/jobs/AllStagesPage.scala | 12 ++++++++++++ .../apache/spark/ui/jobs/JobProgressListener.scala | 5 ++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala index b0f8ca2ab0d3f..63a4181c4d613 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala @@ -37,12 +37,18 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { val numCompletedStages = listener.numCompletedStages val failedStages = listener.failedStages.reverse.toSeq val numFailedStages = listener.numFailedStages + val pendingStages = listener.pendingStages.values.toSeq + val numWaitingStages = pendingStages.size val now = System.currentTimeMillis val activeStagesTable = new StageTableBase(activeStages.sortBy(_.submissionTime).reverse, parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler, killEnabled = parent.killEnabled) + val pendingStagesTable = + new StageTableBase(pendingStages.sortBy(_.submissionTime).reverse, + parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler, + killEnabled = false) val completedStagesTable = new StageTableBase(completedStages.sortBy(_.submissionTime).reverse, parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler, killEnabled = false) @@ -68,6 +74,10 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { Scheduling Mode: {listener.schedulingMode.map(_.toString).getOrElse("Unknown")} +
  • + Pending Stages: + {pendingStages.size} +
  • Active Stages: {activeStages.size} @@ -89,6 +99,8 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { } else { Seq[Node]() }} ++ +

    Pending Stages ({pendingStages.size})

    ++ + pendingStagesTable.toNodeSeq ++

    Active Stages ({activeStages.size})

    ++ activeStagesTable.toNodeSeq ++

    Completed Stages ({numCompletedStages})

    ++ diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index b0d3bed1300b3..4d200eeda86b9 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -56,6 +56,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { val jobIdToData = new HashMap[JobId, JobUIData] // Stages: + val pendingStages = new HashMap[StageId, StageInfo] val activeStages = new HashMap[StageId, StageInfo] val completedStages = ListBuffer[StageInfo]() val skippedStages = ListBuffer[StageInfo]() @@ -157,6 +158,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { stageIds = jobStart.stageIds, jobGroup = jobGroup, status = JobExecutionStatus.RUNNING) + jobStart.stageInfos.foreach(x => pendingStages(x.stageId) = x) // Compute (a potential underestimate of) the number of tasks that will be run by this job. 
// This may be an underestimate because the job start event references all of the result // stages' transitive stage dependencies, but some of these stages might be skipped if their @@ -187,6 +189,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { } jobData.completionTime = Option(jobEnd.time).filter(_ >= 0) + jobData.stageIds.foreach(pendingStages.remove) jobEnd.jobResult match { case JobSucceeded => completedJobs += jobData @@ -257,7 +260,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = synchronized { val stage = stageSubmitted.stageInfo activeStages(stage.stageId) = stage - + pendingStages.remove(stage.stageId) val poolName = Option(stageSubmitted.properties).map { p => p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME) }.getOrElse(DEFAULT_POOL_NAME) From 0992842e84cfe1eeaeea5b064cd55b522d0547b7 Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Fri, 16 Jan 2015 13:31:17 +0530 Subject: [PATCH 2/3] Switched to LinkedHashMap, changed the table order to active -> pending -> completed -> failed, and stopped reverse-sorting pending stages. 
--- .../org/apache/spark/ui/jobs/AllStagesPage.scala | 14 +++++++------- .../apache/spark/ui/jobs/JobProgressListener.scala | 4 ++-- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala index 63a4181c4d613..c9ab8562d649d 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala @@ -46,7 +46,7 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler, killEnabled = parent.killEnabled) val pendingStagesTable = - new StageTableBase(pendingStages.sortBy(_.submissionTime).reverse, + new StageTableBase(pendingStages.sortBy(_.submissionTime), parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler, killEnabled = false) val completedStagesTable = @@ -74,14 +74,14 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { Scheduling Mode: {listener.schedulingMode.map(_.toString).getOrElse("Unknown")}
  • -
  • - Pending Stages: - {pendingStages.size} -
  • Active Stages: {activeStages.size}
  • +
  • + Pending Stages: + {pendingStages.size} +
  • Completed Stages: {numCompletedStages} @@ -99,10 +99,10 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { } else { Seq[Node]() }} ++ -

    Pending Stages ({pendingStages.size})

    ++ - pendingStagesTable.toNodeSeq ++

    Active Stages ({activeStages.size})

    ++ activeStagesTable.toNodeSeq ++ +

    Pending Stages ({pendingStages.size})

    ++ + pendingStagesTable.toNodeSeq ++

    Completed Stages ({numCompletedStages})

    ++ completedStagesTable.toNodeSeq ++

    Failed Stages ({numFailedStages})

    ++ diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 4d200eeda86b9..b5fee3e8afd36 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -17,7 +17,7 @@ package org.apache.spark.ui.jobs -import scala.collection.mutable.{HashMap, HashSet, ListBuffer} +import scala.collection.mutable.{HashMap, HashSet, ListBuffer, LinkedHashMap} import org.apache.spark._ import org.apache.spark.annotation.DeveloperApi @@ -56,7 +56,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { val jobIdToData = new HashMap[JobId, JobUIData] // Stages: - val pendingStages = new HashMap[StageId, StageInfo] + val pendingStages = new LinkedHashMap[StageId, StageInfo] val activeStages = new HashMap[StageId, StageInfo] val completedStages = ListBuffer[StageInfo]() val skippedStages = ListBuffer[StageInfo]() From 3b11803ae9b64acba2d64ad02d1e31d756783eaf Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Fri, 16 Jan 2015 16:50:18 +0530 Subject: [PATCH 3/3] Review feedback. 
--- .../main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala | 5 ++--- .../scala/org/apache/spark/ui/jobs/JobProgressListener.scala | 4 ++-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala index c9ab8562d649d..1da7a988203db 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala @@ -33,12 +33,11 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { def render(request: HttpServletRequest): Seq[Node] = { listener.synchronized { val activeStages = listener.activeStages.values.toSeq + val pendingStages = listener.pendingStages.values.toSeq val completedStages = listener.completedStages.reverse.toSeq val numCompletedStages = listener.numCompletedStages val failedStages = listener.failedStages.reverse.toSeq val numFailedStages = listener.numFailedStages - val pendingStages = listener.pendingStages.values.toSeq - val numWaitingStages = pendingStages.size val now = System.currentTimeMillis val activeStagesTable = @@ -46,7 +45,7 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler, killEnabled = parent.killEnabled) val pendingStagesTable = - new StageTableBase(pendingStages.sortBy(_.submissionTime), + new StageTableBase(pendingStages.sortBy(_.submissionTime).reverse, parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler, killEnabled = false) val completedStagesTable = diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index b5fee3e8afd36..4d200eeda86b9 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ 
b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -17,7 +17,7 @@ package org.apache.spark.ui.jobs -import scala.collection.mutable.{HashMap, HashSet, ListBuffer, LinkedHashMap} +import scala.collection.mutable.{HashMap, HashSet, ListBuffer} import org.apache.spark._ import org.apache.spark.annotation.DeveloperApi @@ -56,7 +56,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { val jobIdToData = new HashMap[JobId, JobUIData] // Stages: - val pendingStages = new LinkedHashMap[StageId, StageInfo] + val pendingStages = new HashMap[StageId, StageInfo] val activeStages = new HashMap[StageId, StageInfo] val completedStages = ListBuffer[StageInfo]() val skippedStages = ListBuffer[StageInfo]()