From f5736adb1d5bb4a947471eca6a9e004104ba3602 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Wed, 16 Jul 2014 23:37:34 -0700
Subject: [PATCH] Code review comments.

---
 .../apache/spark/ui/jobs/ExecutorTable.scala  |  5 ++--
 .../spark/ui/jobs/JobProgressListener.scala   | 17 +++++++------
 .../org/apache/spark/ui/jobs/StagePage.scala  |  5 ++--
 .../org/apache/spark/ui/jobs/StageTable.scala | 24 ++++++++++++-------
 .../ui/jobs/JobProgressListenerSuite.scala    | 18 +++++++-------
 5 files changed, 38 insertions(+), 31 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index fc848552b63bc..0cc51c873727d 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -17,12 +17,11 @@
 
 package org.apache.spark.ui.jobs
 
-import org.apache.spark.ui.jobs.UIData.StageUIData
-
 import scala.collection.mutable
 import scala.xml.Node
 
 import org.apache.spark.ui.{ToolTips, UIUtils}
+import org.apache.spark.ui.jobs.UIData.StageUIData
 import org.apache.spark.util.Utils
 
 /** Page showing executor summary */
@@ -66,7 +65,7 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) {
       executorIdToAddress.put(executorId, address)
     }
 
-    listener.stageUIData.get(stageId) match {
+    listener.stageIdToData.get(stageId) match {
       case Some(stageData: StageUIData) =>
         stageData.executorSummary.toSeq.sortBy(_._1).map { case (k, v) =>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index fa4b441068d07..efb527b4f03e6 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -47,7 +47,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   val completedStages = ListBuffer[StageInfo]()
   val failedStages = ListBuffer[StageInfo]()
 
-  val stageUIData = new HashMap[Int, StageUIData]
+  val stageIdToData = new HashMap[Int, StageUIData]
 
   val poolToActiveStages = HashMap[String, HashMap[Int, StageInfo]]()
 
@@ -60,13 +60,12 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized {
     val stage = stageCompleted.stageInfo
     val stageId = stage.stageId
-    val stageData = stageUIData.getOrElseUpdate(stage.stageId, {
-      logWarning("Stage completed for unknown stage " + stage.stageId)
+    val stageData = stageIdToData.getOrElseUpdate(stageId, {
+      logWarning("Stage completed for unknown stage " + stageId)
       new StageUIData
     })
 
-    // Remove by stageId, rather than by StageInfo, in case the StageInfo is from storage
-    poolToActiveStages(stageData.schedulingPool).remove(stageId)
+    poolToActiveStages.get(stageData.schedulingPool).foreach(_.remove(stageId))
     activeStages.remove(stageId)
     if (stage.failureReason.isEmpty) {
       completedStages += stage
@@ -81,7 +80,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   private def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
     if (stages.size > retainedStages) {
       val toRemove = math.max(retainedStages / 10, 1)
-      stages.take(toRemove).foreach { s => stageUIData.remove(s.stageId) }
+      stages.take(toRemove).foreach { s => stageIdToData.remove(s.stageId) }
       stages.trimStart(toRemove)
     }
   }
@@ -95,7 +94,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
       p => p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME)
     }.getOrElse(DEFAULT_POOL_NAME)
 
-    val stageData = stageUIData.getOrElseUpdate(stage.stageId, new StageUIData)
+    val stageData = stageIdToData.getOrElseUpdate(stage.stageId, new StageUIData)
     stageData.schedulingPool = poolName
 
     stageData.description = Option(stageSubmitted.properties).flatMap {
@@ -109,7 +108,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
     val taskInfo = taskStart.taskInfo
     if (taskInfo != null) {
-      val stageData = stageUIData.getOrElseUpdate(taskStart.stageId, {
+      val stageData = stageIdToData.getOrElseUpdate(taskStart.stageId, {
         logWarning("Task start for unknown stage " + taskStart.stageId)
         new StageUIData
       })
@@ -126,7 +125,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
     val info = taskEnd.taskInfo
     if (info != null) {
-      val stageData = stageUIData.getOrElseUpdate(taskEnd.stageId, {
+      val stageData = stageIdToData.getOrElseUpdate(taskEnd.stageId, {
         logWarning("Task end for unknown stage " + taskEnd.stageId)
         new StageUIData
       })
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 40c8d2b41a13d..cab26b9e2f7d3 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -35,9 +35,9 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
   def render(request: HttpServletRequest): Seq[Node] = {
     listener.synchronized {
       val stageId = request.getParameter("id").toInt
-      val stageData = listener.stageUIData(stageId)
+      val stageDataOption = listener.stageIdToData.get(stageId)
 
-      if (stageData.taskData.isEmpty) {
+      if (stageDataOption.isEmpty || stageDataOption.get.taskData.isEmpty) {
         val content =
           <div>
             <h4>Summary Metrics</h4> No tasks have started yet
@@ -47,6 +47,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
           "Details for Stage %s".format(stageId), parent.headerTabs, parent)
       }
 
+      val stageData = stageDataOption.get
       val tasks = stageData.taskData.values.toSeq.sortBy(_.taskInfo.launchTime)
 
       val numCompleted = tasks.count(_.taskInfo.finished)
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index ff528df6b7942..916a273d8e17b 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -17,12 +17,11 @@
 
 package org.apache.spark.ui.jobs
 
-import java.util.Date
-
-import scala.collection.mutable.HashMap
 import scala.xml.Node
 
-import org.apache.spark.scheduler.{StageInfo, TaskInfo}
+import java.util.Date
+
+import org.apache.spark.scheduler.StageInfo
 import org.apache.spark.ui.{ToolTips, UIUtils}
 import org.apache.spark.util.Utils
 
@@ -108,14 +107,23 @@ private[ui] class StageTableBase(
       }
     }
 
-    listener.stageUIData(s.stageId).description match {
-      case Some(desc) =>
-        <div><em>{desc}</em></div>
-        <div>{nameLink} {killLink}</div>
-      case None =>
-        <div>{killLink} {nameLink} {details}</div>
+    val stageDataOption = listener.stageIdToData.get(s.stageId)
+    // Too many nested map/flatMaps with options are just annoying to read. Do this imperatively.
+    if (stageDataOption.isDefined && stageDataOption.get.description.isDefined) {
+      val desc = stageDataOption.get.description.get
+      <div><em>{desc}</em></div>
+      <div>{nameLink} {killLink}</div>
+    } else {
+      <div>{killLink} {nameLink} {details}</div>
     }
   }
 
   protected def stageRow(s: StageInfo): Seq[Node] = {
-    val stageData = listener.stageUIData(s.stageId)
+    val stageDataOption = listener.stageIdToData.get(s.stageId)
+    if (stageDataOption.isEmpty) {
+      return <td>{s.stageId}</td><td>No data available for this stage</td>
+    }
+
+    val stageData = stageDataOption.get
     val submissionTime = s.submissionTime match {
       case Some(t) => UIUtils.formatDate(new Date(t))
       case None => "Unknown"
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index b6b8293c37312..a8556624804bb 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -59,7 +59,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     val listener = new JobProgressListener(conf)
     val taskMetrics = new TaskMetrics()
     val shuffleReadMetrics = new ShuffleReadMetrics()
-    assert(listener.stageUIData.size === 0)
+    assert(listener.stageIdToData.size === 0)
 
     // finish this task, should get updated shuffleRead
     shuffleReadMetrics.remoteBytesRead = 1000
@@ -69,7 +69,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     var task = new ShuffleMapTask(0, null, null, 0, null)
     val taskType = Utils.getFormattedClassName(task)
     listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
-    assert(listener.stageUIData.getOrElse(0, fail()).executorSummary.getOrElse("exe-1", fail())
+    assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-1", fail())
       .shuffleRead === 1000)
 
     // finish a task with unknown executor-id, nothing should happen
@@ -78,7 +78,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     taskInfo.finishTime = 1
     task = new ShuffleMapTask(0, null, null, 0, null)
     listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
-    assert(listener.stageUIData.size === 1)
+    assert(listener.stageIdToData.size === 1)
 
     // finish this task, should get updated duration
     shuffleReadMetrics.remoteBytesRead = 1000
@@ -87,7 +87,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     taskInfo.finishTime = 1
     task = new ShuffleMapTask(0, null, null, 0, null)
     listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
-    assert(listener.stageUIData.getOrElse(0, fail()).executorSummary.getOrElse("exe-1", fail())
+    assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-1", fail())
       .shuffleRead === 2000)
 
     // finish this task, should get updated duration
@@ -97,7 +97,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     taskInfo.finishTime = 1
     task = new ShuffleMapTask(0, null, null, 0, null)
     listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
-    assert(listener.stageUIData.getOrElse(0, fail()).executorSummary.getOrElse("exe-2", fail())
+    assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-2", fail())
       .shuffleRead === 1000)
   }
 
@@ -123,13 +123,13 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     for (reason <- taskFailedReasons) {
       listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, reason, taskInfo, metrics))
       failCount += 1
-      assert(listener.stageUIData(task.stageId).numCompleteTasks === 0)
-      assert(listener.stageUIData(task.stageId).numFailedTasks === failCount)
+      assert(listener.stageIdToData(task.stageId).numCompleteTasks === 0)
+      assert(listener.stageIdToData(task.stageId).numFailedTasks === failCount)
     }
 
     // Make sure we count success as success.
     listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, metrics))
-    assert(listener.stageUIData(task.stageId).numCompleteTasks === 1)
-    assert(listener.stageUIData(task.stageId).numFailedTasks === failCount)
+    assert(listener.stageIdToData(task.stageId).numCompleteTasks === 1)
+    assert(listener.stageIdToData(task.stageId).numFailedTasks === failCount)
   }
 }
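
A note for readers: the recurring change in this patch is replacing bare map lookups
such as listener.stageUIData(stageId), which throw NoSuchElementException once
trimIfNecessary evicts an old stage, with Option-based lookups on the renamed
stageIdToData map. The following is a minimal, self-contained Scala sketch of that
idiom; the StageUIData stub, the LookupIdiomSketch object, and the main harness are
illustrative stand-ins, not code from the patch.

import scala.collection.mutable.HashMap

object LookupIdiomSketch {
  // Minimal stand-in for the listener's per-stage UI state; illustrative only.
  final class StageUIData {
    var schedulingPool: String = "default"
  }

  // Mirrors the shape of the two maps touched by the patch.
  val stageIdToData = new HashMap[Int, StageUIData]
  val poolToActiveStages = HashMap[String, HashMap[Int, String]]()

  def main(args: Array[String]): Unit = {
    val stageId = 42

    // Before the patch: stageIdToData(stageId) would throw NoSuchElementException
    // if the stage's entry had already been evicted.

    // After the patch: look up with get and handle the missing case explicitly.
    stageIdToData.get(stageId) match {
      case Some(stageData) =>
        // The pool may also have been cleaned up; foreach makes removal a no-op then.
        poolToActiveStages.get(stageData.schedulingPool).foreach(_.remove(stageId))
      case None =>
        println(s"No data available for stage $stageId")
    }
  }
}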