From 72e9021eaf26f31a82120505f8b764b18fbe8d48 Mon Sep 17 00:00:00 2001
From: Reynold Xin <rxin@apache.org>
Date: Thu, 17 Jul 2014 18:58:48 -0700
Subject: [PATCH] [SPARK-2299] Consolidate various stageIdTo* hash maps in
 JobProgressListener

This should reduce memory usage for the web ui as well as slightly increase
its speed in draining the UI event queue.

@andrewor14

Author: Reynold Xin <rxin@apache.org>

Closes #1262 from rxin/ui-consolidate-hashtables and squashes the following commits:

1ac3f97 [Reynold Xin] Oops. Properly handle description.
f5736ad [Reynold Xin] Code review comments.
b8828dc [Reynold Xin] Merge branch 'master' into ui-consolidate-hashtables
7a7b6c4 [Reynold Xin] Revert css change.
f959bb8 [Reynold Xin] [SPARK-2299] Consolidate various stageIdTo* hash maps in JobProgressListener to speed it up.
63256f5 [Reynold Xin] [SPARK-2320] Reduce <pre> block font size.
---
 .../spark/ui/jobs/ExecutorSummary.scala       |  36 ----
 .../apache/spark/ui/jobs/ExecutorTable.scala  |  29 ++--
 .../spark/ui/jobs/JobProgressListener.scala   | 156 +++++++-----------
 .../org/apache/spark/ui/jobs/StagePage.scala  |  37 ++---
 .../org/apache/spark/ui/jobs/StageTable.scala |  73 ++++----
 .../org/apache/spark/ui/jobs/UIData.scala     |  62 +++++++
 .../ui/jobs/JobProgressListenerSuite.scala    |  36 ++--
 7 files changed, 205 insertions(+), 224 deletions(-)
 delete mode 100644 core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
 create mode 100644 core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala

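[Editor's note] The heart of this patch is replacing roughly a dozen parallel `stageIdTo*` hash maps with a single `stageIdToData: HashMap[Int, StageUIData]`: every listener event then does one lookup instead of many, and evicting an old stage touches one map instead of thirteen. A minimal, self-contained sketch of the before/after shape (the field set here is trimmed for illustration; the real `StageUIData` is introduced in `UIData.scala` below):

```scala
import scala.collection.mutable.HashMap

object ConsolidationSketch {
  // Before: one map per statistic, all keyed by stage ID.
  val stageIdToTime = HashMap[Int, Long]()
  val stageIdToShuffleRead = HashMap[Int, Long]()
  val stageIdToTasksComplete = HashMap[Int, Int]()

  // After: one mutable record per stage.
  class StageUIData {
    var executorRunTime: Long = 0L
    var shuffleReadBytes: Long = 0L
    var numCompleteTasks: Int = 0
  }
  val stageIdToData = new HashMap[Int, StageUIData]

  // A task-end event becomes a single lookup plus in-place updates.
  def recordTaskEnd(stageId: Int, runTime: Long, shuffleRead: Long): Unit = {
    val data = stageIdToData.getOrElseUpdate(stageId, new StageUIData)
    data.executorRunTime += runTime
    data.shuffleReadBytes += shuffleRead
    data.numCompleteTasks += 1
  }
}
```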
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
deleted file mode 100644
index c4a8996c0b9a9..0000000000000
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.ui.jobs
-
-import org.apache.spark.annotation.DeveloperApi
-
-/**
- * :: DeveloperApi ::
- * Class for reporting aggregated metrics for each executor in stage UI.
- */
-@DeveloperApi
-class ExecutorSummary {
-  var taskTime : Long = 0
-  var failedTasks : Int = 0
-  var succeededTasks : Int = 0
-  var inputBytes: Long = 0
-  var shuffleRead : Long = 0
-  var shuffleWrite : Long = 0
-  var memoryBytesSpilled : Long = 0
-  var diskBytesSpilled : Long = 0
-}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index 52020954ea57c..0cc51c873727d 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -21,6 +21,7 @@ import scala.collection.mutable
 import scala.xml.Node
 
 import org.apache.spark.ui.{ToolTips, UIUtils}
+import org.apache.spark.ui.jobs.UIData.StageUIData
 import org.apache.spark.util.Utils
 
 /** Page showing executor summary */
@@ -64,11 +65,9 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) {
       executorIdToAddress.put(executorId, address)
     }
 
-    val executorIdToSummary = listener.stageIdToExecutorSummaries.get(stageId)
-    executorIdToSummary match {
-      case Some(x) =>
-        x.toSeq.sortBy(_._1).map { case (k, v) => {
-          // scalastyle:off
+    listener.stageIdToData.get(stageId) match {
+      case Some(stageData: StageUIData) =>
+        stageData.executorSummary.toSeq.sortBy(_._1).map { case (k, v) =>
           <tr>
             <td>{k}</td>
             <td>{executorIdToAddress.getOrElse(k, "CANNOT FIND ADDRESS")}</td>
@@ -76,16 +75,20 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) {
             <td>{v.failedTasks + v.succeededTasks}</td>
             <td>{v.failedTasks}</td>
             <td>{v.succeededTasks}</td>
-            <td>{Utils.bytesToString(v.inputBytes)}</td>
-            <td>{Utils.bytesToString(v.shuffleRead)}</td>
-            <td>{Utils.bytesToString(v.shuffleWrite)}</td>
-            <td>{Utils.bytesToString(v.memoryBytesSpilled)}</td>
-            <td>{Utils.bytesToString(v.diskBytesSpilled)}</td>
+            <td>
+              {Utils.bytesToString(v.inputBytes)}</td>
+            <td>
+              {Utils.bytesToString(v.shuffleRead)}</td>
+            <td>
+              {Utils.bytesToString(v.shuffleWrite)}</td>
+            <td>
+              {Utils.bytesToString(v.memoryBytesSpilled)}</td>
+            <td>
+              {Utils.bytesToString(v.diskBytesSpilled)}</td>
           </tr>
-          // scalastyle:on
         }
-      }
-      case _ => Seq[Node]()
+      case None =>
+        Seq.empty[Node]
     }
   }
 }
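[Editor's note] The rewrite above hinges on matching the `Option` returned by `stageIdToData.get` rather than indexing maps directly, so a stage the listener never saw renders as an empty table instead of throwing. A self-contained sketch of that shape, using a hypothetical trimmed-down `ExecSummary` in place of the patch's `ExecutorSummary`:

```scala
import scala.collection.mutable.HashMap
import scala.xml.Node

object ExecutorRowsSketch {
  // Hypothetical stand-in for the patch's per-executor aggregate.
  class ExecSummary(var succeededTasks: Int = 0, var failedTasks: Int = 0)

  // One <tr> per executor, sorted by executor ID; an unknown stage yields
  // an empty node sequence rather than a NoSuchElementException.
  def rows(stageIdToData: HashMap[Int, HashMap[String, ExecSummary]], stageId: Int): Seq[Node] =
    stageIdToData.get(stageId) match {
      case Some(executors) =>
        executors.toSeq.sortBy(_._1).map { case (execId, s) =>
          <tr><td>{execId}</td><td>{s.succeededTasks + s.failedTasks}</td></tr>
        }
      case None => Seq.empty[Node]
    }
}
```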
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 2286a7f952f28..efb527b4f03e6 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -25,6 +25,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.ui.jobs.UIData._
 
 /**
  * :: DeveloperApi ::
@@ -35,7 +36,7 @@ import org.apache.spark.storage.BlockManagerId
  * updating the internal data structures concurrently.
  */
 @DeveloperApi
-class JobProgressListener(conf: SparkConf) extends SparkListener {
+class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
 
   import JobProgressListener._
 
@@ -46,20 +47,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
   val completedStages = ListBuffer[StageInfo]()
   val failedStages = ListBuffer[StageInfo]()
 
-  // TODO: Should probably consolidate all following into a single hash map.
-  val stageIdToTime = HashMap[Int, Long]()
-  val stageIdToInputBytes = HashMap[Int, Long]()
-  val stageIdToShuffleRead = HashMap[Int, Long]()
-  val stageIdToShuffleWrite = HashMap[Int, Long]()
-  val stageIdToMemoryBytesSpilled = HashMap[Int, Long]()
-  val stageIdToDiskBytesSpilled = HashMap[Int, Long]()
-  val stageIdToTasksActive = HashMap[Int, HashMap[Long, TaskInfo]]()
-  val stageIdToTasksComplete = HashMap[Int, Int]()
-  val stageIdToTasksFailed = HashMap[Int, Int]()
-  val stageIdToTaskData = HashMap[Int, HashMap[Long, TaskUIData]]()
-  val stageIdToExecutorSummaries = HashMap[Int, HashMap[String, ExecutorSummary]]()
-  val stageIdToPool = HashMap[Int, String]()
-  val stageIdToDescription = HashMap[Int, String]()
+  val stageIdToData = new HashMap[Int, StageUIData]
+
   val poolToActiveStages = HashMap[String, HashMap[Int, StageInfo]]()
 
   val executorIdToBlockManagerId = HashMap[String, BlockManagerId]()
@@ -71,8 +60,12 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
   override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized {
     val stage = stageCompleted.stageInfo
     val stageId = stage.stageId
-    // Remove by stageId, rather than by StageInfo, in case the StageInfo is from storage
-    poolToActiveStages(stageIdToPool(stageId)).remove(stageId)
+    val stageData = stageIdToData.getOrElseUpdate(stageId, {
+      logWarning("Stage completed for unknown stage " + stageId)
+      new StageUIData
+    })
+
+    poolToActiveStages.get(stageData.schedulingPool).foreach(_.remove(stageId))
     activeStages.remove(stageId)
     if (stage.failureReason.isEmpty) {
       completedStages += stage
@@ -87,21 +80,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
   private def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
     if (stages.size > retainedStages) {
       val toRemove = math.max(retainedStages / 10, 1)
-      stages.take(toRemove).foreach { s =>
-        stageIdToTime.remove(s.stageId)
-        stageIdToInputBytes.remove(s.stageId)
-        stageIdToShuffleRead.remove(s.stageId)
-        stageIdToShuffleWrite.remove(s.stageId)
-        stageIdToMemoryBytesSpilled.remove(s.stageId)
-        stageIdToDiskBytesSpilled.remove(s.stageId)
-        stageIdToTasksActive.remove(s.stageId)
-        stageIdToTasksComplete.remove(s.stageId)
-        stageIdToTasksFailed.remove(s.stageId)
-        stageIdToTaskData.remove(s.stageId)
-        stageIdToExecutorSummaries.remove(s.stageId)
-        stageIdToPool.remove(s.stageId)
-        stageIdToDescription.remove(s.stageId)
-      }
+      stages.take(toRemove).foreach { s => stageIdToData.remove(s.stageId) }
       stages.trimStart(toRemove)
     }
   }
@@ -114,26 +93,27 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
     val poolName = Option(stageSubmitted.properties).map {
       p => p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME)
     }.getOrElse(DEFAULT_POOL_NAME)
-    stageIdToPool(stage.stageId) = poolName
 
-    val description = Option(stageSubmitted.properties).flatMap {
+    val stageData = stageIdToData.getOrElseUpdate(stage.stageId, new StageUIData)
+    stageData.schedulingPool = poolName
+
+    stageData.description = Option(stageSubmitted.properties).flatMap {
       p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
     }
-    description.map(d => stageIdToDescription(stage.stageId) = d)
 
     val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashMap[Int, StageInfo]())
     stages(stage.stageId) = stage
   }
 
   override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
-    val sid = taskStart.stageId
     val taskInfo = taskStart.taskInfo
     if (taskInfo != null) {
-      val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashMap[Long, TaskInfo]())
-      tasksActive(taskInfo.taskId) = taskInfo
-      val taskMap = stageIdToTaskData.getOrElse(sid, HashMap[Long, TaskUIData]())
-      taskMap(taskInfo.taskId) = new TaskUIData(taskInfo)
-      stageIdToTaskData(sid) = taskMap
+      val stageData = stageIdToData.getOrElseUpdate(taskStart.stageId, {
+        logWarning("Task start for unknown stage " + taskStart.stageId)
+        new StageUIData
+      })
+      stageData.numActiveTasks += 1
+      stageData.taskData.put(taskInfo.taskId, new TaskUIData(taskInfo))
     }
   }
 
@@ -143,88 +123,76 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
   }
 
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
-    val sid = taskEnd.stageId
     val info = taskEnd.taskInfo
-
     if (info != null) {
+      val stageData = stageIdToData.getOrElseUpdate(taskEnd.stageId, {
+        logWarning("Task end for unknown stage " + taskEnd.stageId)
+        new StageUIData
+      })
+
       // create executor summary map if necessary
-      val executorSummaryMap = stageIdToExecutorSummaries.getOrElseUpdate(key = sid,
-        op = new HashMap[String, ExecutorSummary]())
+      val executorSummaryMap = stageData.executorSummary
       executorSummaryMap.getOrElseUpdate(key = info.executorId, op = new ExecutorSummary)
 
-      val executorSummary = executorSummaryMap.get(info.executorId)
-      executorSummary match {
-        case Some(y) => {
-          // first update failed-task, succeed-task
-          taskEnd.reason match {
-            case Success =>
-              y.succeededTasks += 1
-            case _ =>
-              y.failedTasks += 1
-          }
-
-          // update duration
-          y.taskTime += info.duration
-
-          val metrics = taskEnd.taskMetrics
-          if (metrics != null) {
-            metrics.inputMetrics.foreach { y.inputBytes += _.bytesRead }
-            metrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead }
-            metrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten }
-            y.memoryBytesSpilled += metrics.memoryBytesSpilled
-            y.diskBytesSpilled += metrics.diskBytesSpilled
-          }
+      executorSummaryMap.get(info.executorId).foreach { y =>
+        // first update failed-task, succeed-task
+        taskEnd.reason match {
+          case Success =>
+            y.succeededTasks += 1
+          case _ =>
+            y.failedTasks += 1
+        }
+
+        // update duration
+        y.taskTime += info.duration
+
+        val metrics = taskEnd.taskMetrics
+        if (metrics != null) {
+          metrics.inputMetrics.foreach { y.inputBytes += _.bytesRead }
+          metrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead }
+          metrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten }
+          y.memoryBytesSpilled += metrics.memoryBytesSpilled
+          y.diskBytesSpilled += metrics.diskBytesSpilled
         }
-        case _ => {}
       }
 
-      val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashMap[Long, TaskInfo]())
-      // Remove by taskId, rather than by TaskInfo, in case the TaskInfo is from storage
-      tasksActive.remove(info.taskId)
+      stageData.numActiveTasks -= 1
 
       val (errorMessage, metrics): (Option[String], Option[TaskMetrics]) =
         taskEnd.reason match {
           case org.apache.spark.Success =>
-            stageIdToTasksComplete(sid) = stageIdToTasksComplete.getOrElse(sid, 0) + 1
+            stageData.numCompleteTasks += 1
             (None, Option(taskEnd.taskMetrics))
           case e: ExceptionFailure =>  // Handle ExceptionFailure because we might have metrics
-            stageIdToTasksFailed(sid) = stageIdToTasksFailed.getOrElse(sid, 0) + 1
+            stageData.numFailedTasks += 1
             (Some(e.toErrorString), e.metrics)
           case e: TaskFailedReason =>  // All other failure cases
-            stageIdToTasksFailed(sid) = stageIdToTasksFailed.getOrElse(sid, 0) + 1
+            stageData.numFailedTasks += 1
             (Some(e.toErrorString), None)
         }
 
-      stageIdToTime.getOrElseUpdate(sid, 0L)
-      val time = metrics.map(_.executorRunTime).getOrElse(0L)
-      stageIdToTime(sid) += time
 
-      stageIdToInputBytes.getOrElseUpdate(sid, 0L)
+      val taskRunTime = metrics.map(_.executorRunTime).getOrElse(0L)
+      stageData.executorRunTime += taskRunTime
       val inputBytes = metrics.flatMap(_.inputMetrics).map(_.bytesRead).getOrElse(0L)
-      stageIdToInputBytes(sid) += inputBytes
+      stageData.inputBytes += inputBytes
 
-      stageIdToShuffleRead.getOrElseUpdate(sid, 0L)
       val shuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead).getOrElse(0L)
-      stageIdToShuffleRead(sid) += shuffleRead
+      stageData.shuffleReadBytes += shuffleRead
 
-      stageIdToShuffleWrite.getOrElseUpdate(sid, 0L)
       val shuffleWrite =
         metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleBytesWritten).getOrElse(0L)
-      stageIdToShuffleWrite(sid) += shuffleWrite
+      stageData.shuffleWriteBytes += shuffleWrite
 
-      stageIdToMemoryBytesSpilled.getOrElseUpdate(sid, 0L)
       val memoryBytesSpilled = metrics.map(_.memoryBytesSpilled).getOrElse(0L)
-      stageIdToMemoryBytesSpilled(sid) += memoryBytesSpilled
+      stageData.memoryBytesSpilled += memoryBytesSpilled
 
-      stageIdToDiskBytesSpilled.getOrElseUpdate(sid, 0L)
       val diskBytesSpilled = metrics.map(_.diskBytesSpilled).getOrElse(0L)
-      stageIdToDiskBytesSpilled(sid) += diskBytesSpilled
+      stageData.diskBytesSpilled += diskBytesSpilled
 
-      val taskMap = stageIdToTaskData.getOrElse(sid, HashMap[Long, TaskUIData]())
-      taskMap(info.taskId) = new TaskUIData(info, metrics, errorMessage)
-      stageIdToTaskData(sid) = taskMap
+      stageData.taskData(info.taskId) = new TaskUIData(info, metrics, errorMessage)
     }
   }
 
   override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
     synchronized {
@@ -252,12 +220,6 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
 
 }
 
-@DeveloperApi
-case class TaskUIData(
-    taskInfo: TaskInfo,
-    taskMetrics: Option[TaskMetrics] = None,
-    errorMessage: Option[String] = None)
-
 private object JobProgressListener {
   val DEFAULT_POOL_NAME = "default"
   val DEFAULT_RETAINED_STAGES = 1000
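[Editor's note] A pattern worth calling out in the listener changes above: `onStageCompleted`, `onTaskStart`, and `onTaskEnd` all use a defensive `getOrElseUpdate`, because events can arrive for stages the listener never saw submitted (for example, after listener events are dropped under load). The code lazily creates an empty `StageUIData` and logs a warning instead of crashing on a missing key. A standalone sketch, with `println` standing in for Spark's `logWarning`:

```scala
import scala.collection.mutable.HashMap

object DefensiveLookupSketch {
  class StageUIData { var numActiveTasks: Int = 0 }

  val stageIdToData = new HashMap[Int, StageUIData]

  // Returns the stage's record, creating one (with a warning) for a stage
  // that was never registered via onStageSubmitted.
  def stageDataFor(stageId: Int): StageUIData =
    stageIdToData.getOrElseUpdate(stageId, {
      println(s"WARN: event received for unknown stage $stageId")
      new StageUIData
    })

  def onTaskStart(stageId: Int): Unit = stageDataFor(stageId).numActiveTasks += 1
}
```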
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 8c3821bd7c3eb..cab26b9e2f7d3 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -23,6 +23,7 @@ import javax.servlet.http.HttpServletRequest
 import scala.xml.Node
 
 import org.apache.spark.ui.{ToolTips, WebUIPage, UIUtils}
+import org.apache.spark.ui.jobs.UIData._
 import org.apache.spark.util.{Utils, Distribution}
 
 /** Page showing statistics and task list for a given stage */
@@ -34,8 +35,9 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
   def render(request: HttpServletRequest): Seq[Node] = {
     listener.synchronized {
       val stageId = request.getParameter("id").toInt
+      val stageDataOption = listener.stageIdToData.get(stageId)
 
-      if (!listener.stageIdToTaskData.contains(stageId)) {
+      if (stageDataOption.isEmpty || stageDataOption.get.taskData.isEmpty) {
         val content =
           <div>
             <h4>Summary Metrics</h4> No tasks have started yet
           </div>
@@ -45,23 +47,14 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
           "Details for Stage %s".format(stageId), parent.headerTabs, parent)
       }
 
-      val tasks = listener.stageIdToTaskData(stageId).values.toSeq.sortBy(_.taskInfo.launchTime)
+      val stageData = stageDataOption.get
+      val tasks = stageData.taskData.values.toSeq.sortBy(_.taskInfo.launchTime)
 
       val numCompleted = tasks.count(_.taskInfo.finished)
-      val inputBytes = listener.stageIdToInputBytes.getOrElse(stageId, 0L)
-      val hasInput = inputBytes > 0
-      val shuffleReadBytes = listener.stageIdToShuffleRead.getOrElse(stageId, 0L)
-      val hasShuffleRead = shuffleReadBytes > 0
-      val shuffleWriteBytes = listener.stageIdToShuffleWrite.getOrElse(stageId, 0L)
-      val hasShuffleWrite = shuffleWriteBytes > 0
-      val memoryBytesSpilled = listener.stageIdToMemoryBytesSpilled.getOrElse(stageId, 0L)
-      val diskBytesSpilled = listener.stageIdToDiskBytesSpilled.getOrElse(stageId, 0L)
-      val hasBytesSpilled = memoryBytesSpilled > 0 && diskBytesSpilled > 0
-
-      var activeTime = 0L
-      val now = System.currentTimeMillis
-      val tasksActive = listener.stageIdToTasksActive(stageId).values
-      tasksActive.foreach(activeTime += _.timeRunning(now))
+      val hasInput = stageData.inputBytes > 0
+      val hasShuffleRead = stageData.shuffleReadBytes > 0
+      val hasShuffleWrite = stageData.shuffleWriteBytes > 0
+      val hasBytesSpilled = stageData.memoryBytesSpilled > 0 && stageData.diskBytesSpilled > 0
 
       // scalastyle:off
       val summary =
@@ -69,34 +62,34 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
             <li>
               <strong>Total task time across all tasks: </strong>
-              {UIUtils.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)}
+              {UIUtils.formatDuration(stageData.executorRunTime)}
             </li>
             {if (hasInput)
               <li>
                 <strong>Input: </strong>
-                {Utils.bytesToString(inputBytes)}
+                {Utils.bytesToString(stageData.inputBytes)}
               </li>
             }
             {if (hasShuffleRead)
               <li>
                 <strong>Shuffle read: </strong>
-                {Utils.bytesToString(shuffleReadBytes)}
+                {Utils.bytesToString(stageData.shuffleReadBytes)}
              </li>
            }
            {if (hasShuffleWrite)
              <li>
                <strong>Shuffle write: </strong>
-               {Utils.bytesToString(shuffleWriteBytes)}
+               {Utils.bytesToString(stageData.shuffleWriteBytes)}
              </li>
            }
            {if (hasBytesSpilled)
              <li>
                <strong>Shuffle spill (memory): </strong>
-               {Utils.bytesToString(memoryBytesSpilled)}
+               {Utils.bytesToString(stageData.memoryBytesSpilled)}
              </li>
              <li>
                <strong>Shuffle spill (disk): </strong>
-               {Utils.bytesToString(diskBytesSpilled)}
+               {Utils.bytesToString(stageData.diskBytesSpilled)}
              </li>
            }
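[Editor's note] The StagePage change above is mostly mechanical substitution, but note the new early-out: `render` now checks `stageDataOption.isEmpty || stageDataOption.get.taskData.isEmpty` before touching any per-stage state, where the old code indexed several maps with `apply` and could throw. A reduced sketch of that guard, with the page content simplified to a string:

```scala
import scala.collection.mutable.HashMap

object RenderGuardSketch {
  // Stand-in for StageUIData: just the task table.
  class StageUIData { val taskData = new HashMap[Long, String] }

  def render(stageIdToData: HashMap[Int, StageUIData], stageId: Int): String = {
    val stageDataOption = stageIdToData.get(stageId)
    if (stageDataOption.isEmpty || stageDataOption.get.taskData.isEmpty) {
      return "Summary Metrics: No tasks have started yet"
    }
    val stageData = stageDataOption.get
    s"${stageData.taskData.size} tasks"
  }
}
```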
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index fd8d0b5cdde00..5f45c0ced5ec5 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -17,12 +17,11 @@
 
 package org.apache.spark.ui.jobs
 
-import java.util.Date
-
-import scala.collection.mutable.HashMap
 import scala.xml.Node
 
-import org.apache.spark.scheduler.{StageInfo, TaskInfo}
+import java.util.Date
+
+import org.apache.spark.scheduler.StageInfo
 import org.apache.spark.ui.{ToolTips, UIUtils}
 import org.apache.spark.util.Utils
 
@@ -71,14 +70,14 @@
     }
 
-  private def makeProgressBar(started: Int, completed: Int, failed: String, total: Int): Seq[Node] =
+  private def makeProgressBar(started: Int, completed: Int, failed: Int, total: Int): Seq[Node] =
   {
     val completeWidth = "width: %s%%".format((completed.toDouble/total)*100)
     val startWidth = "width: %s%%".format((started.toDouble/total)*100)
 
-      {completed}/{total} {failed}
+      {completed}/{total} { if (failed > 0) s"($failed failed)" else "" }
@@ -108,13 +107,23 @@ private[ui] class StageTableBase(
     }
 
-    listener.stageIdToDescription.get(s.stageId)
-      .map(d =>
-        <div>{d}</div>
-        <div>{nameLink} {killLink}</div>
-      )
-      .getOrElse(
-        <div>{nameLink} {killLink} {details}</div>
-      )
+    val stageDataOption = listener.stageIdToData.get(s.stageId)
+    // Too many nested map/flatMaps with options are just annoying to read. Do this imperatively.
+    if (stageDataOption.isDefined && stageDataOption.get.description.isDefined) {
+      val desc = stageDataOption.get.description
+      <div>{desc}</div>
+      <div>{nameLink} {killLink}</div>
+    } else {
+      <div>{killLink} {nameLink} {details}</div>
+    }
   }
 
   protected def stageRow(s: StageInfo): Seq[Node] = {
-    val poolName = listener.stageIdToPool.get(s.stageId)
+    val stageDataOption = listener.stageIdToData.get(s.stageId)
+    if (stageDataOption.isEmpty) {
+      return <td>{s.stageId}</td><td>No data available for this stage</td>
+    }
+
+    val stageData = stageDataOption.get
     val submissionTime = s.submissionTime match {
       case Some(t) => UIUtils.formatDate(new Date(t))
       case None => "Unknown"
@@ -124,35 +133,20 @@ private[ui] class StageTableBase(
       if (finishTime > t) finishTime - t else System.currentTimeMillis - t
     }
     val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown")
-    val startedTasks =
-      listener.stageIdToTasksActive.getOrElse(s.stageId, HashMap[Long, TaskInfo]()).size
-    val completedTasks = listener.stageIdToTasksComplete.getOrElse(s.stageId, 0)
-    val failedTasks = listener.stageIdToTasksFailed.getOrElse(s.stageId, 0) match {
-      case f if f > 0 => "(%s failed)".format(f)
-      case _ => ""
-    }
-    val totalTasks = s.numTasks
-    val inputSortable = listener.stageIdToInputBytes.getOrElse(s.stageId, 0L)
-    val inputRead = inputSortable match {
-      case 0 => ""
-      case b => Utils.bytesToString(b)
-    }
-    val shuffleReadSortable = listener.stageIdToShuffleRead.getOrElse(s.stageId, 0L)
-    val shuffleRead = shuffleReadSortable match {
-      case 0 => ""
-      case b => Utils.bytesToString(b)
-    }
-    val shuffleWriteSortable = listener.stageIdToShuffleWrite.getOrElse(s.stageId, 0L)
-    val shuffleWrite = shuffleWriteSortable match {
-      case 0 => ""
-      case b => Utils.bytesToString(b)
-    }
+
+    val inputRead = stageData.inputBytes
+    val inputReadWithUnit = if (inputRead > 0) Utils.bytesToString(inputRead) else ""
+    val shuffleRead = stageData.shuffleReadBytes
+    val shuffleReadWithUnit = if (shuffleRead > 0) Utils.bytesToString(shuffleRead) else ""
+    val shuffleWrite = stageData.shuffleWriteBytes
+    val shuffleWriteWithUnit = if (shuffleWrite > 0) Utils.bytesToString(shuffleWrite) else ""
 
     <td>{s.stageId}</td> ++
     {if (isFairScheduler) {
-      <td>{poolName.get}</td>
+      <td><a href={"%s/stages/pool?poolname=%s"
+        .format(UIUtils.prependBaseUri(basePath), stageData.schedulingPool)}>
+        {stageData.schedulingPool}</a></td>
     } else {
@@ -162,11 +156,12 @@ private[ui] class StageTableBase(
     <td>{submissionTime}</td>
     <td>{formattedDuration}</td>
-    <td>{makeProgressBar(startedTasks, completedTasks, failedTasks, totalTasks)}</td>
+    <td>{makeProgressBar(stageData.numActiveTasks, stageData.numCompleteTasks,
+      stageData.numFailedTasks, s.numTasks)}</td>
-    <td>{inputRead}</td>
-    <td>{shuffleRead}</td>
-    <td>{shuffleWrite}</td>
+    <td>{inputReadWithUnit}</td>
+    <td>{shuffleReadWithUnit}</td>
+    <td>{shuffleWriteWithUnit}</td>
   }
 
   /** Render an HTML row that represents a stage */
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
new file mode 100644
index 0000000000000..be11a11695b01
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.jobs
+
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.scheduler.TaskInfo
+
+import scala.collection.mutable.HashMap
+
+private[jobs] object UIData {
+
+  class ExecutorSummary {
+    var taskTime : Long = 0
+    var failedTasks : Int = 0
+    var succeededTasks : Int = 0
+    var inputBytes : Long = 0
+    var shuffleRead : Long = 0
+    var shuffleWrite : Long = 0
+    var memoryBytesSpilled : Long = 0
+    var diskBytesSpilled : Long = 0
+  }
+
+  class StageUIData {
+    var numActiveTasks: Int = _
+    var numCompleteTasks: Int = _
+    var numFailedTasks: Int = _
+
+    var executorRunTime: Long = _
+
+    var inputBytes: Long = _
+    var shuffleReadBytes: Long = _
+    var shuffleWriteBytes: Long = _
+    var memoryBytesSpilled: Long = _
+    var diskBytesSpilled: Long = _
+
+    var schedulingPool: String = ""
+    var description: Option[String] = None
+
+    var taskData = new HashMap[Long, TaskUIData]
+    var executorSummary = new HashMap[String, ExecutorSummary]
+  }
+
+  case class TaskUIData(
+      taskInfo: TaskInfo,
+      taskMetrics: Option[TaskMetrics] = None,
+      errorMessage: Option[String] = None)
+}
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index fa43b66c6cb5a..a8556624804bb 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -47,11 +47,11 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     }
 
     listener.completedStages.size should be (5)
-    listener.completedStages.filter(_.stageId == 50).size should be (1)
-    listener.completedStages.filter(_.stageId == 49).size should be (1)
-    listener.completedStages.filter(_.stageId == 48).size should be (1)
-    listener.completedStages.filter(_.stageId == 47).size should be (1)
-    listener.completedStages.filter(_.stageId == 46).size should be (1)
+    listener.completedStages.count(_.stageId == 50) should be (1)
+    listener.completedStages.count(_.stageId == 49) should be (1)
+    listener.completedStages.count(_.stageId == 48) should be (1)
+    listener.completedStages.count(_.stageId == 47) should be (1)
+    listener.completedStages.count(_.stageId == 46) should be (1)
   }
 
   test("test executor id to summary") {
@@ -59,9 +59,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     val listener = new JobProgressListener(conf)
     val taskMetrics = new TaskMetrics()
     val shuffleReadMetrics = new ShuffleReadMetrics()
-
-    // nothing in it
-    assert(listener.stageIdToExecutorSummaries.size == 0)
+    assert(listener.stageIdToData.size === 0)
 
     // finish this task, should get updated shuffleRead
     shuffleReadMetrics.remoteBytesRead = 1000
@@ -71,8 +69,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     var task = new ShuffleMapTask(0, null, null, 0, null)
     val taskType = Utils.getFormattedClassName(task)
     listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
-    assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-1", fail())
-      .shuffleRead == 1000)
+    assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-1", fail())
+      .shuffleRead === 1000)
 
     // finish a task with unknown executor-id, nothing should happen
     taskInfo =
@@ -80,7 +78,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     taskInfo.finishTime = 1
     task = new ShuffleMapTask(0, null, null, 0, null)
     listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
-    assert(listener.stageIdToExecutorSummaries.size == 1)
+    assert(listener.stageIdToData.size === 1)
 
     // finish this task, should get updated duration
     shuffleReadMetrics.remoteBytesRead = 1000
@@ -89,8 +87,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     taskInfo.finishTime = 1
     task = new ShuffleMapTask(0, null, null, 0, null)
     listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
-    assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-1", fail())
-      .shuffleRead == 2000)
+    assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-1", fail())
+      .shuffleRead === 2000)
 
     // finish this task, should get updated duration
     shuffleReadMetrics.remoteBytesRead = 1000
@@ -99,8 +97,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     taskInfo.finishTime = 1
     task = new ShuffleMapTask(0, null, null, 0, null)
    listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
-    assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-2", fail())
-      .shuffleRead == 1000)
+    assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-2", fail())
+      .shuffleRead === 1000)
   }
 
   test("test task success vs failure counting for different task end reasons") {
@@ -121,13 +119,17 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
       TaskKilled,
       ExecutorLostFailure,
       UnknownReason)
+    var failCount = 0
     for (reason <- taskFailedReasons) {
       listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, reason, taskInfo, metrics))
-      assert(listener.stageIdToTasksComplete.get(task.stageId) === None)
+      failCount += 1
+      assert(listener.stageIdToData(task.stageId).numCompleteTasks === 0)
+      assert(listener.stageIdToData(task.stageId).numFailedTasks === failCount)
    }
 
     // Make sure we count success as success.
     listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, metrics))
-    assert(listener.stageIdToTasksComplete.get(task.stageId) === Some(1))
+    assert(listener.stageIdToData(task.stageId).numCompleteTasks === 1)
+    assert(listener.stageIdToData(task.stageId).numFailedTasks === failCount)
   }
 }
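[Editor's note] The memory claim in the commit message follows directly from the new `trimIfNecessary`: evicting an old stage is now a single `remove` on `stageIdToData` instead of one per parallel map. A simplified, self-contained sketch of that eviction logic (types reduced for illustration):

```scala
import scala.collection.mutable.{HashMap, ListBuffer}

object TrimSketch {
  val retainedStages = 1000
  val stageIdToData = new HashMap[Int, AnyRef] // stand-in for HashMap[Int, StageUIData]

  // Drop the oldest ~10% of retained stages once the cap is exceeded;
  // each evicted stage is now exactly one map removal.
  def trimIfNecessary(stages: ListBuffer[Int]): Unit = {
    if (stages.size > retainedStages) {
      val toRemove = math.max(retainedStages / 10, 1)
      stages.take(toRemove).foreach(stageIdToData.remove)
      stages.trimStart(toRemove)
    }
  }
}
```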