From c19d82aa4c40889985cb913cf0aa38208facaa42 Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Wed, 14 Jan 2015 16:48:31 +0530 Subject: [PATCH] SPARK-5217 Spark UI should report pending stages during job execution on AllStagesPage. --- .../org/apache/spark/ui/jobs/AllStagesPage.scala | 12 ++++++++++++ .../apache/spark/ui/jobs/JobProgressListener.scala | 5 ++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala index b0f8ca2ab0d3f..63a4181c4d613 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala @@ -37,12 +37,18 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { val numCompletedStages = listener.numCompletedStages val failedStages = listener.failedStages.reverse.toSeq val numFailedStages = listener.numFailedStages + val pendingStages = listener.pendingStages.values.toSeq + val numWaitingStages = pendingStages.size val now = System.currentTimeMillis val activeStagesTable = new StageTableBase(activeStages.sortBy(_.submissionTime).reverse, parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler, killEnabled = parent.killEnabled) + val pendingStagesTable = + new StageTableBase(pendingStages.sortBy(_.submissionTime).reverse, + parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler, + killEnabled = false) val completedStagesTable = new StageTableBase(completedStages.sortBy(_.submissionTime).reverse, parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler, killEnabled = false) @@ -68,6 +74,10 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { Scheduling Mode: {listener.schedulingMode.map(_.toString).getOrElse("Unknown")} +
  • + Pending Stages: + {pendingStages.size} +
  • Active Stages: {activeStages.size} @@ -89,6 +99,8 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { } else { Seq[Node]() }} ++ +

    Pending Stages ({pendingStages.size})

    ++ + pendingStagesTable.toNodeSeq ++

    Active Stages ({activeStages.size})

    ++ activeStagesTable.toNodeSeq ++

    Completed Stages ({numCompletedStages})

    ++ diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index b0d3bed1300b3..4d200eeda86b9 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -56,6 +56,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { val jobIdToData = new HashMap[JobId, JobUIData] // Stages: + val pendingStages = new HashMap[StageId, StageInfo] val activeStages = new HashMap[StageId, StageInfo] val completedStages = ListBuffer[StageInfo]() val skippedStages = ListBuffer[StageInfo]() @@ -157,6 +158,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { stageIds = jobStart.stageIds, jobGroup = jobGroup, status = JobExecutionStatus.RUNNING) + jobStart.stageInfos.foreach(x => pendingStages(x.stageId) = x) // Compute (a potential underestimate of) the number of tasks that will be run by this job. // This may be an underestimate because the job start event references all of the result // stages' transitive stage dependencies, but some of these stages might be skipped if their @@ -187,6 +189,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { } jobData.completionTime = Option(jobEnd.time).filter(_ >= 0) + jobData.stageIds.foreach(pendingStages.remove) jobEnd.jobResult match { case JobSucceeded => completedJobs += jobData @@ -257,7 +260,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = synchronized { val stage = stageSubmitted.stageInfo activeStages(stage.stageId) = stage - + pendingStages.remove(stage.stageId) val poolName = Option(stageSubmitted.properties).map { p => p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME) }.getOrElse(DEFAULT_POOL_NAME)