Skip to content

Commit

Permalink
Code review comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
rxin committed Jul 17, 2014
1 parent b8828dc commit f5736ad
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@

package org.apache.spark.ui.jobs

import org.apache.spark.ui.jobs.UIData.StageUIData

import scala.collection.mutable
import scala.xml.Node

import org.apache.spark.ui.{ToolTips, UIUtils}
import org.apache.spark.ui.jobs.UIData.StageUIData
import org.apache.spark.util.Utils

/** Page showing executor summary */
Expand Down Expand Up @@ -66,7 +65,7 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) {
executorIdToAddress.put(executorId, address)
}

listener.stageUIData.get(stageId) match {
listener.stageIdToData.get(stageId) match {
case Some(stageData: StageUIData) =>
stageData.executorSummary.toSeq.sortBy(_._1).map { case (k, v) =>
<tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
val completedStages = ListBuffer[StageInfo]()
val failedStages = ListBuffer[StageInfo]()

val stageUIData = new HashMap[Int, StageUIData]
val stageIdToData = new HashMap[Int, StageUIData]

val poolToActiveStages = HashMap[String, HashMap[Int, StageInfo]]()

Expand All @@ -60,13 +60,12 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized {
val stage = stageCompleted.stageInfo
val stageId = stage.stageId
val stageData = stageUIData.getOrElseUpdate(stage.stageId, {
logWarning("Stage completed for unknown stage " + stage.stageId)
val stageData = stageIdToData.getOrElseUpdate(stageId, {
logWarning("Stage completed for unknown stage " + stageId)
new StageUIData
})

// Remove by stageId, rather than by StageInfo, in case the StageInfo is from storage
poolToActiveStages(stageData.schedulingPool).remove(stageId)
poolToActiveStages.get(stageData.schedulingPool).foreach(_.remove(stageId))
activeStages.remove(stageId)
if (stage.failureReason.isEmpty) {
completedStages += stage
Expand All @@ -81,7 +80,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
private def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
if (stages.size > retainedStages) {
val toRemove = math.max(retainedStages / 10, 1)
stages.take(toRemove).foreach { s => stageUIData.remove(s.stageId) }
stages.take(toRemove).foreach { s => stageIdToData.remove(s.stageId) }
stages.trimStart(toRemove)
}
}
Expand All @@ -95,7 +94,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
p => p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME)
}.getOrElse(DEFAULT_POOL_NAME)

val stageData = stageUIData.getOrElseUpdate(stage.stageId, new StageUIData)
val stageData = stageIdToData.getOrElseUpdate(stage.stageId, new StageUIData)
stageData.schedulingPool = poolName

stageData.description = Option(stageSubmitted.properties).flatMap {
Expand All @@ -109,7 +108,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
val taskInfo = taskStart.taskInfo
if (taskInfo != null) {
val stageData = stageUIData.getOrElseUpdate(taskStart.stageId, {
val stageData = stageIdToData.getOrElseUpdate(taskStart.stageId, {
logWarning("Task start for unknown stage " + taskStart.stageId)
new StageUIData
})
Expand All @@ -126,7 +125,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
val info = taskEnd.taskInfo
if (info != null) {
val stageData = stageUIData.getOrElseUpdate(taskEnd.stageId, {
val stageData = stageIdToData.getOrElseUpdate(taskEnd.stageId, {
logWarning("Task end for unknown stage " + taskEnd.stageId)
new StageUIData
})
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
def render(request: HttpServletRequest): Seq[Node] = {
listener.synchronized {
val stageId = request.getParameter("id").toInt
val stageData = listener.stageUIData(stageId)
val stageDataOption = listener.stageIdToData.get(stageId)

if (stageData.taskData.isEmpty) {
if (stageDataOption.isEmpty || stageDataOption.get.taskData.isEmpty) {
val content =
<div>
<h4>Summary Metrics</h4> No tasks have started yet
Expand All @@ -47,6 +47,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
"Details for Stage %s".format(stageId), parent.headerTabs, parent)
}

val stageData = stageDataOption.get
val tasks = stageData.taskData.values.toSeq.sortBy(_.taskInfo.launchTime)

val numCompleted = tasks.count(_.taskInfo.finished)
Expand Down
24 changes: 16 additions & 8 deletions core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@

package org.apache.spark.ui.jobs

import java.util.Date

import scala.collection.mutable.HashMap
import scala.xml.Node

import org.apache.spark.scheduler.{StageInfo, TaskInfo}
import java.util.Date

import org.apache.spark.scheduler.StageInfo
import org.apache.spark.ui.{ToolTips, UIUtils}
import org.apache.spark.util.Utils

Expand Down Expand Up @@ -108,14 +107,23 @@ private[ui] class StageTableBase(
<pre class="stage-details collapsed">{s.details}</pre>
}

listener.stageUIData(s.stageId).description match {
case Some(desc) => <div><em>{desc}</em></div><div>{nameLink} {killLink}</div>
case None => <div>{killLink} {nameLink} {details}</div>
val stageDataOption = listener.stageIdToData.get(s.stageId)
// Too many nested map/flatMaps with options are just annoying to read. Do this imperatively.
if (stageDataOption.isDefined && stageDataOption.get.description.isDefined) {
      val desc = stageDataOption.get.description.get
<div><em>{desc}</em></div><div>{nameLink} {killLink}</div>
} else {
<div>{killLink} {nameLink} {details}</div>
}
}

protected def stageRow(s: StageInfo): Seq[Node] = {
val stageData = listener.stageUIData(s.stageId)
val stageDataOption = listener.stageIdToData.get(s.stageId)
if (stageDataOption.isEmpty) {
return <td>{s.stageId}</td><td>No data available for this stage</td>
}

val stageData = stageDataOption.get
val submissionTime = s.submissionTime match {
case Some(t) => UIUtils.formatDate(new Date(t))
case None => "Unknown"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
val listener = new JobProgressListener(conf)
val taskMetrics = new TaskMetrics()
val shuffleReadMetrics = new ShuffleReadMetrics()
assert(listener.stageUIData.size === 0)
assert(listener.stageIdToData.size === 0)

// finish this task, should get updated shuffleRead
shuffleReadMetrics.remoteBytesRead = 1000
Expand All @@ -69,7 +69,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
var task = new ShuffleMapTask(0, null, null, 0, null)
val taskType = Utils.getFormattedClassName(task)
listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
assert(listener.stageUIData.getOrElse(0, fail()).executorSummary.getOrElse("exe-1", fail())
assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-1", fail())
.shuffleRead === 1000)

// finish a task with unknown executor-id, nothing should happen
Expand All @@ -78,7 +78,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
taskInfo.finishTime = 1
task = new ShuffleMapTask(0, null, null, 0, null)
listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
assert(listener.stageUIData.size === 1)
assert(listener.stageIdToData.size === 1)

// finish this task, should get updated duration
shuffleReadMetrics.remoteBytesRead = 1000
Expand All @@ -87,7 +87,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
taskInfo.finishTime = 1
task = new ShuffleMapTask(0, null, null, 0, null)
listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
assert(listener.stageUIData.getOrElse(0, fail()).executorSummary.getOrElse("exe-1", fail())
assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-1", fail())
.shuffleRead === 2000)

// finish this task, should get updated duration
Expand All @@ -97,7 +97,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
taskInfo.finishTime = 1
task = new ShuffleMapTask(0, null, null, 0, null)
listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
assert(listener.stageUIData.getOrElse(0, fail()).executorSummary.getOrElse("exe-2", fail())
assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-2", fail())
.shuffleRead === 1000)
}

Expand All @@ -123,13 +123,13 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
for (reason <- taskFailedReasons) {
listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, reason, taskInfo, metrics))
failCount += 1
assert(listener.stageUIData(task.stageId).numCompleteTasks === 0)
assert(listener.stageUIData(task.stageId).numFailedTasks === failCount)
assert(listener.stageIdToData(task.stageId).numCompleteTasks === 0)
assert(listener.stageIdToData(task.stageId).numFailedTasks === failCount)
}

// Make sure we count success as success.
listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, metrics))
assert(listener.stageUIData(task.stageId).numCompleteTasks === 1)
assert(listener.stageUIData(task.stageId).numFailedTasks === failCount)
assert(listener.stageIdToData(task.stageId).numCompleteTasks === 1)
assert(listener.stageIdToData(task.stageId).numFailedTasks === failCount)
}
}

0 comments on commit f5736ad

Please sign in to comment.