Commit f959bb8

[SPARK-2299] Consolidate various stageIdTo* hash maps in JobProgressListener to speed it up.
rxin committed Jun 29, 2014
1 parent 63256f5 commit f959bb8
Showing 6 changed files with 131 additions and 176 deletions.
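
The consolidated per-stage holder lives in UIData.scala, presumably one of the three changed files not loaded below. The following Scala sketch reconstructs it purely from the reads and writes visible in the hunks that follow; the TaskUIData case class is copied from the definition this commit removes from JobProgressListener.scala, while the StageUIData field list is inferred and may differ from the committed source.

// Hypothetical reconstruction of org.apache.spark.ui.jobs.UIData,
// inferred from stageData.* usages in this diff; not the committed file.
package org.apache.spark.ui.jobs

import scala.collection.mutable.HashMap

import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler.TaskInfo

private[jobs] object UIData {

  class StageUIData {
    // Task counters updated by onTaskStart / onTaskEnd
    var numActiveTasks: Int = 0
    var numCompleteTasks: Int = 0
    var numFailedTasks: Int = 0

    // Aggregate metrics accumulated in onTaskEnd
    var executorRunTime: Long = 0L
    var shuffleReadBytes: Long = 0L
    var shuffleWriteBytes: Long = 0L
    var memoryBytesSpilled: Long = 0L
    var diskBytesSpilled: Long = 0L

    // Scheduling metadata set in onStageSubmitted
    var schedulingPool: String = ""
    var description: Option[String] = None

    // Per-task and per-executor detail previously kept in separate maps
    val taskData = new HashMap[Long, TaskUIData]
    val executorSummary = new HashMap[String, ExecutorSummary]
  }

  case class TaskUIData(
      taskInfo: TaskInfo,
      taskMetrics: Option[TaskMetrics] = None,
      errorMessage: Option[String] = None)
}

With this shape, one map lookup per listener event reaches all of the per-stage state, instead of a dozen independent hash probes keyed by stage id.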
27 changes: 15 additions & 12 deletions core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.ui.jobs
 
+import org.apache.spark.ui.jobs.UIData.StageUIData
+
 import scala.collection.mutable
 import scala.xml.Node
 
@@ -63,27 +65,28 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) {
       executorIdToAddress.put(executorId, address)
     }
 
-    val executorIdToSummary = listener.stageIdToExecutorSummaries.get(stageId)
-    executorIdToSummary match {
-      case Some(x) =>
-        x.toSeq.sortBy(_._1).map { case (k, v) => {
-          // scalastyle:off
+    listener.stageUIData.get(stageId) match {
+      case Some(stageData: StageUIData) =>
+        stageData.executorSummary.toSeq.sortBy(_._1).map { case (k, v) =>
           <tr>
             <td>{k}</td>
             <td>{executorIdToAddress.getOrElse(k, "CANNOT FIND ADDRESS")}</td>
             <td sorttable_customekey={v.taskTime.toString}>{UIUtils.formatDuration(v.taskTime)}</td>
             <td>{v.failedTasks + v.succeededTasks}</td>
             <td>{v.failedTasks}</td>
             <td>{v.succeededTasks}</td>
-            <td sorttable_customekey={v.shuffleRead.toString}>{Utils.bytesToString(v.shuffleRead)}</td>
-            <td sorttable_customekey={v.shuffleWrite.toString}>{Utils.bytesToString(v.shuffleWrite)}</td>
-            <td sorttable_customekey={v.memoryBytesSpilled.toString} >{Utils.bytesToString(v.memoryBytesSpilled)}</td>
-            <td sorttable_customekey={v.diskBytesSpilled.toString} >{Utils.bytesToString(v.diskBytesSpilled)}</td>
+            <td sorttable_customekey={v.shuffleRead.toString}>
+              {Utils.bytesToString(v.shuffleRead)}</td>
+            <td sorttable_customekey={v.shuffleWrite.toString}>
+              {Utils.bytesToString(v.shuffleWrite)}</td>
+            <td sorttable_customekey={v.memoryBytesSpilled.toString}>
+              {Utils.bytesToString(v.memoryBytesSpilled)}</td>
+            <td sorttable_customekey={v.diskBytesSpilled.toString}>
+              {Utils.bytesToString(v.diskBytesSpilled)}</td>
           </tr>
-          // scalastyle:on
-        }
-        }
-      case _ => Seq[Node]()
+        }
+      case None =>
+        Seq.empty[Node]
     }
   }
 }
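
The table body above now hinges on a single stageUIData lookup instead of one map probe per metric, and renders nothing when the stage is gone. A minimal standalone sketch of that lookup-or-empty guard, with hypothetical names (assumes the scala-xml module is on the classpath):

import scala.collection.mutable
import scala.xml.Node

object RenderGuard {
  class StageUIData {
    val executorSummary = mutable.HashMap[String, Long]() // executor id -> task time (ms)
  }
  val stageUIData = mutable.HashMap[Int, StageUIData]()

  // Old stages can be trimmed out of the map; render no rows rather than throw.
  def executorRows(stageId: Int): Seq[Node] =
    stageUIData.get(stageId) match {
      case Some(stageData) =>
        stageData.executorSummary.toSeq.sortBy(_._1).map { case (execId, taskTime) =>
          <tr><td>{execId}</td><td>{taskTime} ms</td></tr>
        }
      case None => Seq.empty[Node]
    }
}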
150 changes: 58 additions & 92 deletions core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -25,6 +25,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.ui.jobs.UIData._
 
 /**
  * :: DeveloperApi ::
@@ -35,7 +36,7 @@ import org.apache.spark.storage.BlockManagerId
  * updating the internal data structures concurrently.
  */
 @DeveloperApi
-class JobProgressListener(conf: SparkConf) extends SparkListener {
+class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
 
   import JobProgressListener._
 
@@ -51,19 +52,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
   var totalShuffleRead = 0L
   var totalShuffleWrite = 0L
 
-  // TODO: Should probably consolidate all following into a single hash map.
-  val stageIdToTime = HashMap[Int, Long]()
-  val stageIdToShuffleRead = HashMap[Int, Long]()
-  val stageIdToShuffleWrite = HashMap[Int, Long]()
-  val stageIdToMemoryBytesSpilled = HashMap[Int, Long]()
-  val stageIdToDiskBytesSpilled = HashMap[Int, Long]()
-  val stageIdToTasksActive = HashMap[Int, HashMap[Long, TaskInfo]]()
-  val stageIdToTasksComplete = HashMap[Int, Int]()
-  val stageIdToTasksFailed = HashMap[Int, Int]()
-  val stageIdToTaskData = HashMap[Int, HashMap[Long, TaskUIData]]()
-  val stageIdToExecutorSummaries = HashMap[Int, HashMap[String, ExecutorSummary]]()
-  val stageIdToPool = HashMap[Int, String]()
-  val stageIdToDescription = HashMap[Int, String]()
+  val stageUIData = new HashMap[Int, StageUIData]
 
   val poolToActiveStages = HashMap[String, HashMap[Int, StageInfo]]()
 
   val executorIdToBlockManagerId = HashMap[String, BlockManagerId]()
@@ -75,8 +65,13 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
   override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized {
     val stage = stageCompleted.stageInfo
     val stageId = stage.stageId
+    val stageData = stageUIData.getOrElseUpdate(stage.stageId, {
+      logWarning("Stage completed for unknown stage " + stage.stageId)
+      new StageUIData
+    })
 
+    // Remove by stageId, rather than by StageInfo, in case the StageInfo is from storage
-    poolToActiveStages(stageIdToPool(stageId)).remove(stageId)
+    poolToActiveStages(stageData.schedulingPool).remove(stageId)
     activeStages.remove(stageId)
     if (stage.failureReason.isEmpty) {
       completedStages += stage
@@ -91,20 +86,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
   private def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
     if (stages.size > retainedStages) {
       val toRemove = math.max(retainedStages / 10, 1)
-      stages.take(toRemove).foreach { s =>
-        stageIdToTime.remove(s.stageId)
-        stageIdToShuffleRead.remove(s.stageId)
-        stageIdToShuffleWrite.remove(s.stageId)
-        stageIdToMemoryBytesSpilled.remove(s.stageId)
-        stageIdToDiskBytesSpilled.remove(s.stageId)
-        stageIdToTasksActive.remove(s.stageId)
-        stageIdToTasksComplete.remove(s.stageId)
-        stageIdToTasksFailed.remove(s.stageId)
-        stageIdToTaskData.remove(s.stageId)
-        stageIdToExecutorSummaries.remove(s.stageId)
-        stageIdToPool.remove(s.stageId)
-        stageIdToDescription.remove(s.stageId)
-      }
+      stages.take(toRemove).foreach { s => stageUIData.remove(s.stageId) }
       stages.trimStart(toRemove)
     }
   }
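
Trimming now clears one map entry per evicted stage instead of twelve. For scale, under the default cap (DEFAULT_RETAINED_STAGES = 1000, defined in the companion object below), crossing the cap drops the oldest 10% in one batch. A standalone toy of that arithmetic, not Spark code:

import scala.collection.mutable.ListBuffer

val retainedStages = 1000                        // default cap
val stages = ListBuffer.tabulate(1001)(identity) // 1001 completed stage ids: 0..1000
if (stages.size > retainedStages) {
  val toRemove = math.max(retainedStages / 10, 1) // = 100
  stages.trimStart(toRemove)                      // evict the oldest 100
}
assert(stages.size == 901 && stages.head == 100)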
@@ -117,26 +99,27 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
     val poolName = Option(stageSubmitted.properties).map {
       p => p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME)
     }.getOrElse(DEFAULT_POOL_NAME)
-    stageIdToPool(stage.stageId) = poolName
 
-    val description = Option(stageSubmitted.properties).flatMap {
+    val stageData = stageUIData.getOrElseUpdate(stage.stageId, new StageUIData)
+    stageData.schedulingPool = poolName
+
+    stageData.description = Option(stageSubmitted.properties).flatMap {
       p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
     }
-    description.map(d => stageIdToDescription(stage.stageId) = d)
 
     val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashMap[Int, StageInfo]())
     stages(stage.stageId) = stage
   }
 
   override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
-    val sid = taskStart.stageId
     val taskInfo = taskStart.taskInfo
     if (taskInfo != null) {
-      val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashMap[Long, TaskInfo]())
-      tasksActive(taskInfo.taskId) = taskInfo
-      val taskMap = stageIdToTaskData.getOrElse(sid, HashMap[Long, TaskUIData]())
-      taskMap(taskInfo.taskId) = new TaskUIData(taskInfo)
-      stageIdToTaskData(sid) = taskMap
+      val stageData = stageUIData.getOrElseUpdate(taskStart.stageId, {
+        logWarning("Task start for unknown stage " + taskStart.stageId)
+        new StageUIData
+      })
+      stageData.numActiveTasks += 1
+      stageData.taskData.put(taskInfo.taskId, new TaskUIData(taskInfo))
     }
   }
 
@@ -146,86 +129,75 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
   }
 
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
-    val sid = taskEnd.stageId
     val info = taskEnd.taskInfo
 
     if (info != null) {
+      val stageData = stageUIData.getOrElseUpdate(taskEnd.stageId, {
+        logWarning("Task end for unknown stage " + taskEnd.stageId)
+        new StageUIData
+      })
+
       // create executor summary map if necessary
-      val executorSummaryMap = stageIdToExecutorSummaries.getOrElseUpdate(key = sid,
-        op = new HashMap[String, ExecutorSummary]())
+      val executorSummaryMap = stageData.executorSummary
       executorSummaryMap.getOrElseUpdate(key = info.executorId, op = new ExecutorSummary)
 
-      val executorSummary = executorSummaryMap.get(info.executorId)
-      executorSummary match {
-        case Some(y) => {
-          // first update failed-task, succeed-task
-          taskEnd.reason match {
-            case Success =>
-              y.succeededTasks += 1
-            case _ =>
-              y.failedTasks += 1
-          }
-
-          // update duration
-          y.taskTime += info.duration
-
-          val metrics = taskEnd.taskMetrics
-          if (metrics != null) {
-            metrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead }
-            metrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten }
-            y.memoryBytesSpilled += metrics.memoryBytesSpilled
-            y.diskBytesSpilled += metrics.diskBytesSpilled
-          }
+      executorSummaryMap.get(info.executorId).foreach { y =>
+        // first update failed-task, succeed-task
+        taskEnd.reason match {
+          case Success =>
+            y.succeededTasks += 1
+          case _ =>
+            y.failedTasks += 1
         }
-        case _ => {}
+
+        // update duration
+        y.taskTime += info.duration
+
+        val metrics = taskEnd.taskMetrics
+        if (metrics != null) {
+          metrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead }
+          metrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten }
+          y.memoryBytesSpilled += metrics.memoryBytesSpilled
+          y.diskBytesSpilled += metrics.diskBytesSpilled
+        }
       }
 
-      val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashMap[Long, TaskInfo]())
-      // Remove by taskId, rather than by TaskInfo, in case the TaskInfo is from storage
-      tasksActive.remove(info.taskId)
+      stageData.numActiveTasks -= 1
 
       val (errorMessage, metrics): (Option[String], Option[TaskMetrics]) =
         taskEnd.reason match {
           case org.apache.spark.Success =>
-            stageIdToTasksComplete(sid) = stageIdToTasksComplete.getOrElse(sid, 0) + 1
+            stageData.numCompleteTasks += 1
             (None, Option(taskEnd.taskMetrics))
           case e: ExceptionFailure => // Handle ExceptionFailure because we might have metrics
-            stageIdToTasksFailed(sid) = stageIdToTasksFailed.getOrElse(sid, 0) + 1
+            stageData.numFailedTasks += 1
            (Some(e.toErrorString), e.metrics)
           case e: TaskFailedReason => // All other failure cases
-            stageIdToTasksFailed(sid) = stageIdToTasksFailed.getOrElse(sid, 0) + 1
+            stageData.numFailedTasks += 1
            (Some(e.toErrorString), None)
        }
 
-      stageIdToTime.getOrElseUpdate(sid, 0L)
-      val time = metrics.map(_.executorRunTime).getOrElse(0L)
-      stageIdToTime(sid) += time
-      totalTime += time
+      val taskRunTime = metrics.map(_.executorRunTime).getOrElse(0L)
+      stageData.executorRunTime += taskRunTime
+      totalTime += taskRunTime
 
-      stageIdToShuffleRead.getOrElseUpdate(sid, 0L)
       val shuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead).getOrElse(0L)
-      stageIdToShuffleRead(sid) += shuffleRead
+      stageData.shuffleReadBytes += shuffleRead
       totalShuffleRead += shuffleRead
 
-      stageIdToShuffleWrite.getOrElseUpdate(sid, 0L)
       val shuffleWrite =
         metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleBytesWritten).getOrElse(0L)
-      stageIdToShuffleWrite(sid) += shuffleWrite
+      stageData.shuffleWriteBytes += shuffleWrite
       totalShuffleWrite += shuffleWrite
 
-      stageIdToMemoryBytesSpilled.getOrElseUpdate(sid, 0L)
       val memoryBytesSpilled = metrics.map(_.memoryBytesSpilled).getOrElse(0L)
-      stageIdToMemoryBytesSpilled(sid) += memoryBytesSpilled
+      stageData.memoryBytesSpilled += memoryBytesSpilled
 
-      stageIdToDiskBytesSpilled.getOrElseUpdate(sid, 0L)
       val diskBytesSpilled = metrics.map(_.diskBytesSpilled).getOrElse(0L)
-      stageIdToDiskBytesSpilled(sid) += diskBytesSpilled
+      stageData.diskBytesSpilled += diskBytesSpilled
 
-      val taskMap = stageIdToTaskData.getOrElse(sid, HashMap[Long, TaskUIData]())
-      taskMap(info.taskId) = new TaskUIData(info, metrics, errorMessage)
-      stageIdToTaskData(sid) = taskMap
+      stageData.taskData(info.taskId) = new TaskUIData(info, metrics, errorMessage)
     }
-  }
+  } // end of onTaskEnd
 
   override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
     synchronized {
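
Each handler above tolerates events for stages it never saw submitted, which can happen when events are replayed from logs or earlier events were dropped, by lazily creating an empty record and logging a warning rather than throwing. A minimal standalone sketch of that defensive pattern, with hypothetical names:

import scala.collection.mutable

class StageState { var numActiveTasks = 0 }
val byStage = mutable.HashMap[Int, StageState]()

def onTaskStarted(stageId: Int): Unit = {
  // Unknown stage: materialize an empty record instead of failing the listener.
  val state = byStage.getOrElseUpdate(stageId, {
    println(s"WARN: task start for unknown stage $stageId")
    new StageState
  })
  state.numActiveTasks += 1
}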
@@ -253,12 +225,6 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
 
 }
 
-@DeveloperApi
-case class TaskUIData(
-    taskInfo: TaskInfo,
-    taskMetrics: Option[TaskMetrics] = None,
-    errorMessage: Option[String] = None)
-
 private object JobProgressListener {
   val DEFAULT_POOL_NAME = "default"
   val DEFAULT_RETAINED_STAGES = 1000
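
DEFAULT_RETAINED_STAGES is only the fallback: the listener reads its cap from the spark.ui.retainedStages configuration key. A sketch of lowering it to shrink the driver memory this listener holds (standard SparkConf API; the value 500 is arbitrary):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("retained-stages-example")
  .set("spark.ui.retainedStages", "500") // keep at most ~500 completed/failed stages in the UI

val sc = new SparkContext(conf)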
30 changes: 12 additions & 18 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -23,6 +23,7 @@ import javax.servlet.http.HttpServletRequest
 import scala.xml.Node
 
 import org.apache.spark.ui.{WebUIPage, UIUtils}
+import org.apache.spark.ui.jobs.UIData._
 import org.apache.spark.util.{Utils, Distribution}
 
 /** Page showing statistics and task list for a given stage */
@@ -34,8 +35,9 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
   def render(request: HttpServletRequest): Seq[Node] = {
     listener.synchronized {
       val stageId = request.getParameter("id").toInt
+      val stageData = listener.stageUIData(stageId)
 
-      if (!listener.stageIdToTaskData.contains(stageId)) {
+      if (stageData.taskData.isEmpty) {
         val content =
           <div>
             <h4>Summary Metrics</h4> No tasks have started yet
@@ -45,50 +47,42 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
           "Details for Stage %s".format(stageId), parent.headerTabs, parent)
       }
 
-      val tasks = listener.stageIdToTaskData(stageId).values.toSeq.sortBy(_.taskInfo.launchTime)
+      val tasks = stageData.taskData.values.toSeq.sortBy(_.taskInfo.launchTime)
 
       val numCompleted = tasks.count(_.taskInfo.finished)
-      val shuffleReadBytes = listener.stageIdToShuffleRead.getOrElse(stageId, 0L)
-      val hasShuffleRead = shuffleReadBytes > 0
-      val shuffleWriteBytes = listener.stageIdToShuffleWrite.getOrElse(stageId, 0L)
-      val hasShuffleWrite = shuffleWriteBytes > 0
-      val memoryBytesSpilled = listener.stageIdToMemoryBytesSpilled.getOrElse(stageId, 0L)
-      val diskBytesSpilled = listener.stageIdToDiskBytesSpilled.getOrElse(stageId, 0L)
-      val hasBytesSpilled = memoryBytesSpilled > 0 && diskBytesSpilled > 0
 
-      var activeTime = 0L
-      val now = System.currentTimeMillis
-      val tasksActive = listener.stageIdToTasksActive(stageId).values
-      tasksActive.foreach(activeTime += _.timeRunning(now))
+      val hasShuffleRead = stageData.shuffleReadBytes > 0
+      val hasShuffleWrite = stageData.shuffleWriteBytes > 0
+      val hasBytesSpilled = stageData.memoryBytesSpilled > 0 && stageData.diskBytesSpilled > 0
 
       // scalastyle:off
       val summary =
         <div>
           <ul class="unstyled">
             <li>
               <strong>Total task time across all tasks: </strong>
-              {UIUtils.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)}
+              {UIUtils.formatDuration(stageData.executorRunTime)}
             </li>
             {if (hasShuffleRead)
               <li>
                 <strong>Shuffle read: </strong>
-                {Utils.bytesToString(shuffleReadBytes)}
+                {Utils.bytesToString(stageData.shuffleReadBytes)}
               </li>
             }
             {if (hasShuffleWrite)
               <li>
                 <strong>Shuffle write: </strong>
-                {Utils.bytesToString(shuffleWriteBytes)}
+                {Utils.bytesToString(stageData.shuffleWriteBytes)}
               </li>
             }
             {if (hasBytesSpilled)
               <li>
                 <strong>Shuffle spill (memory): </strong>
-                {Utils.bytesToString(memoryBytesSpilled)}
+                {Utils.bytesToString(stageData.memoryBytesSpilled)}
               </li>
               <li>
                 <strong>Shuffle spill (disk): </strong>
-                {Utils.bytesToString(diskBytesSpilled)}
+                {Utils.bytesToString(stageData.diskBytesSpilled)}
              </li>
            }
          </ul>
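
One wrinkle worth noting: render above fetches listener.stageUIData(stageId) with apply, which throws NoSuchElementException if the stage was already trimmed, whereas ExecutorTable uses .get and degrades gracefully. A hedged sketch of the more defensive variant (hypothetical; not part of this commit):

import scala.collection.mutable

class StageUIData { val taskData = mutable.HashMap[Long, String]() }
val stageUIData = mutable.HashMap[Int, StageUIData]()

def renderSummary(stageId: Int): String =
  stageUIData.get(stageId) match {
    case Some(stageData) if stageData.taskData.nonEmpty =>
      s"rendering ${stageData.taskData.size} tasks"
    case _ =>
      "No tasks have started yet" // also covers evicted or unknown stages
  }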
(Diffs for the remaining three changed files, presumably including the new UIData.scala, were not loaded on this page.)
