diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
index b0f8ca2ab0d3f..63a4181c4d613 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
@@ -37,12 +37,18 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") {
val numCompletedStages = listener.numCompletedStages
val failedStages = listener.failedStages.reverse.toSeq
val numFailedStages = listener.numFailedStages
+ val pendingStages = listener.pendingStages.values.toSeq
+ val numWaitingStages = pendingStages.size
val now = System.currentTimeMillis
val activeStagesTable =
new StageTableBase(activeStages.sortBy(_.submissionTime).reverse,
parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler,
killEnabled = parent.killEnabled)
+ val pendingStagesTable =
+ new StageTableBase(pendingStages.sortBy(_.submissionTime).reverse,
+ parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler,
+ killEnabled = false)
val completedStagesTable =
new StageTableBase(completedStages.sortBy(_.submissionTime).reverse, parent.basePath,
parent.listener, isFairScheduler = parent.isFairScheduler, killEnabled = false)
@@ -68,6 +74,10 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") {
Scheduling Mode:
{listener.schedulingMode.map(_.toString).getOrElse("Unknown")}
+
+ Pending Stages:
+ {pendingStages.size}
+
Active Stages:
{activeStages.size}
@@ -89,6 +99,8 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") {
} else {
Seq[Node]()
}} ++
+ Pending Stages ({pendingStages.size})
++
+ pendingStagesTable.toNodeSeq ++
Active Stages ({activeStages.size})
++
activeStagesTable.toNodeSeq ++
Completed Stages ({numCompletedStages})
++
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index b0d3bed1300b3..4d200eeda86b9 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -56,6 +56,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
val jobIdToData = new HashMap[JobId, JobUIData]
// Stages:
+ val pendingStages = new HashMap[StageId, StageInfo]
val activeStages = new HashMap[StageId, StageInfo]
val completedStages = ListBuffer[StageInfo]()
val skippedStages = ListBuffer[StageInfo]()
@@ -157,6 +158,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
stageIds = jobStart.stageIds,
jobGroup = jobGroup,
status = JobExecutionStatus.RUNNING)
+ jobStart.stageInfos.foreach(x => pendingStages(x.stageId) = x)
// Compute (a potential underestimate of) the number of tasks that will be run by this job.
// This may be an underestimate because the job start event references all of the result
// stages' transitive stage dependencies, but some of these stages might be skipped if their
@@ -187,6 +189,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
}
jobData.completionTime = Option(jobEnd.time).filter(_ >= 0)
+ jobData.stageIds.foreach(pendingStages.remove)
jobEnd.jobResult match {
case JobSucceeded =>
completedJobs += jobData
@@ -257,7 +260,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = synchronized {
val stage = stageSubmitted.stageInfo
activeStages(stage.stageId) = stage
-
+ pendingStages.remove(stage.stageId)
val poolName = Option(stageSubmitted.properties).map {
p => p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME)
}.getOrElse(DEFAULT_POOL_NAME)