Skip to content

Commit

Permalink
[SPARK-2299] Consolidate various stageIdTo* hash maps in JobProgressListener
Browse files Browse the repository at this point in the history

This should reduce memory usage for the web ui as well as slightly increase its speed in draining the UI event queue.

@andrewor14

Author: Reynold Xin <rxin@apache.org>

Closes #1262 from rxin/ui-consolidate-hashtables and squashes the following commits:

1ac3f97 [Reynold Xin] Oops. Properly handle description.
f5736ad [Reynold Xin] Code review comments.
b8828dc [Reynold Xin] Merge branch 'master' into ui-consolidate-hashtables
7a7b6c4 [Reynold Xin] Revert css change.
f959bb8 [Reynold Xin] [SPARK-2299] Consolidate various stageIdTo* hash maps in JobProgressListener to speed it up.
63256f5 [Reynold Xin] [SPARK-2320] Reduce <pre> block font size.
  • Loading branch information
rxin committed Jul 18, 2014
1 parent 935fe65 commit 72e9021
Show file tree
Hide file tree
Showing 7 changed files with 205 additions and 224 deletions.
36 changes: 0 additions & 36 deletions core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala

This file was deleted.

29 changes: 16 additions & 13 deletions core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import scala.collection.mutable
import scala.xml.Node

import org.apache.spark.ui.{ToolTips, UIUtils}
import org.apache.spark.ui.jobs.UIData.StageUIData
import org.apache.spark.util.Utils

/** Page showing executor summary */
Expand Down Expand Up @@ -64,28 +65,30 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) {
executorIdToAddress.put(executorId, address)
}

val executorIdToSummary = listener.stageIdToExecutorSummaries.get(stageId)
executorIdToSummary match {
case Some(x) =>
x.toSeq.sortBy(_._1).map { case (k, v) => {
// scalastyle:off
listener.stageIdToData.get(stageId) match {
case Some(stageData: StageUIData) =>
stageData.executorSummary.toSeq.sortBy(_._1).map { case (k, v) =>
<tr>
<td>{k}</td>
<td>{executorIdToAddress.getOrElse(k, "CANNOT FIND ADDRESS")}</td>
<td sorttable_customekey={v.taskTime.toString}>{UIUtils.formatDuration(v.taskTime)}</td>
<td>{v.failedTasks + v.succeededTasks}</td>
<td>{v.failedTasks}</td>
<td>{v.succeededTasks}</td>
<td sorttable_customekey={v.inputBytes.toString}>{Utils.bytesToString(v.inputBytes)}</td>
<td sorttable_customekey={v.shuffleRead.toString}>{Utils.bytesToString(v.shuffleRead)}</td>
<td sorttable_customekey={v.shuffleWrite.toString}>{Utils.bytesToString(v.shuffleWrite)}</td>
<td sorttable_customekey={v.memoryBytesSpilled.toString} >{Utils.bytesToString(v.memoryBytesSpilled)}</td>
<td sorttable_customekey={v.diskBytesSpilled.toString} >{Utils.bytesToString(v.diskBytesSpilled)}</td>
<td sorttable_customekey={v.inputBytes.toString}>
{Utils.bytesToString(v.inputBytes)}</td>
<td sorttable_customekey={v.shuffleRead.toString}>
{Utils.bytesToString(v.shuffleRead)}</td>
<td sorttable_customekey={v.shuffleWrite.toString}>
{Utils.bytesToString(v.shuffleWrite)}</td>
<td sorttable_customekey={v.memoryBytesSpilled.toString}>
{Utils.bytesToString(v.memoryBytesSpilled)}</td>
<td sorttable_customekey={v.diskBytesSpilled.toString}>
{Utils.bytesToString(v.diskBytesSpilled)}</td>
</tr>
// scalastyle:on
}
}
case _ => Seq[Node]()
case None =>
Seq.empty[Node]
}
}
}
156 changes: 59 additions & 97 deletions core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.ui.jobs.UIData._

/**
* :: DeveloperApi ::
Expand All @@ -35,7 +36,7 @@ import org.apache.spark.storage.BlockManagerId
* updating the internal data structures concurrently.
*/
@DeveloperApi
class JobProgressListener(conf: SparkConf) extends SparkListener {
class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {

import JobProgressListener._

Expand All @@ -46,20 +47,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
val completedStages = ListBuffer[StageInfo]()
val failedStages = ListBuffer[StageInfo]()

// TODO: Should probably consolidate all following into a single hash map.
val stageIdToTime = HashMap[Int, Long]()
val stageIdToInputBytes = HashMap[Int, Long]()
val stageIdToShuffleRead = HashMap[Int, Long]()
val stageIdToShuffleWrite = HashMap[Int, Long]()
val stageIdToMemoryBytesSpilled = HashMap[Int, Long]()
val stageIdToDiskBytesSpilled = HashMap[Int, Long]()
val stageIdToTasksActive = HashMap[Int, HashMap[Long, TaskInfo]]()
val stageIdToTasksComplete = HashMap[Int, Int]()
val stageIdToTasksFailed = HashMap[Int, Int]()
val stageIdToTaskData = HashMap[Int, HashMap[Long, TaskUIData]]()
val stageIdToExecutorSummaries = HashMap[Int, HashMap[String, ExecutorSummary]]()
val stageIdToPool = HashMap[Int, String]()
val stageIdToDescription = HashMap[Int, String]()
val stageIdToData = new HashMap[Int, StageUIData]

val poolToActiveStages = HashMap[String, HashMap[Int, StageInfo]]()

val executorIdToBlockManagerId = HashMap[String, BlockManagerId]()
Expand All @@ -71,8 +60,12 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized {
val stage = stageCompleted.stageInfo
val stageId = stage.stageId
// Remove by stageId, rather than by StageInfo, in case the StageInfo is from storage
poolToActiveStages(stageIdToPool(stageId)).remove(stageId)
val stageData = stageIdToData.getOrElseUpdate(stageId, {
logWarning("Stage completed for unknown stage " + stageId)
new StageUIData
})

poolToActiveStages.get(stageData.schedulingPool).foreach(_.remove(stageId))
activeStages.remove(stageId)
if (stage.failureReason.isEmpty) {
completedStages += stage
Expand All @@ -87,21 +80,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
private def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
if (stages.size > retainedStages) {
val toRemove = math.max(retainedStages / 10, 1)
stages.take(toRemove).foreach { s =>
stageIdToTime.remove(s.stageId)
stageIdToInputBytes.remove(s.stageId)
stageIdToShuffleRead.remove(s.stageId)
stageIdToShuffleWrite.remove(s.stageId)
stageIdToMemoryBytesSpilled.remove(s.stageId)
stageIdToDiskBytesSpilled.remove(s.stageId)
stageIdToTasksActive.remove(s.stageId)
stageIdToTasksComplete.remove(s.stageId)
stageIdToTasksFailed.remove(s.stageId)
stageIdToTaskData.remove(s.stageId)
stageIdToExecutorSummaries.remove(s.stageId)
stageIdToPool.remove(s.stageId)
stageIdToDescription.remove(s.stageId)
}
stages.take(toRemove).foreach { s => stageIdToData.remove(s.stageId) }
stages.trimStart(toRemove)
}
}
Expand All @@ -114,26 +93,27 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
val poolName = Option(stageSubmitted.properties).map {
p => p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME)
}.getOrElse(DEFAULT_POOL_NAME)
stageIdToPool(stage.stageId) = poolName

val description = Option(stageSubmitted.properties).flatMap {
val stageData = stageIdToData.getOrElseUpdate(stage.stageId, new StageUIData)
stageData.schedulingPool = poolName

stageData.description = Option(stageSubmitted.properties).flatMap {
p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
}
description.map(d => stageIdToDescription(stage.stageId) = d)

val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashMap[Int, StageInfo]())
stages(stage.stageId) = stage
}

override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
val sid = taskStart.stageId
val taskInfo = taskStart.taskInfo
if (taskInfo != null) {
val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashMap[Long, TaskInfo]())
tasksActive(taskInfo.taskId) = taskInfo
val taskMap = stageIdToTaskData.getOrElse(sid, HashMap[Long, TaskUIData]())
taskMap(taskInfo.taskId) = new TaskUIData(taskInfo)
stageIdToTaskData(sid) = taskMap
val stageData = stageIdToData.getOrElseUpdate(taskStart.stageId, {
logWarning("Task start for unknown stage " + taskStart.stageId)
new StageUIData
})
stageData.numActiveTasks += 1
stageData.taskData.put(taskInfo.taskId, new TaskUIData(taskInfo))
}
}

Expand All @@ -143,88 +123,76 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
}

override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
val sid = taskEnd.stageId
val info = taskEnd.taskInfo

if (info != null) {
val stageData = stageIdToData.getOrElseUpdate(taskEnd.stageId, {
logWarning("Task end for unknown stage " + taskEnd.stageId)
new StageUIData
})

// create executor summary map if necessary
val executorSummaryMap = stageIdToExecutorSummaries.getOrElseUpdate(key = sid,
op = new HashMap[String, ExecutorSummary]())
val executorSummaryMap = stageData.executorSummary
executorSummaryMap.getOrElseUpdate(key = info.executorId, op = new ExecutorSummary)

val executorSummary = executorSummaryMap.get(info.executorId)
executorSummary match {
case Some(y) => {
// first update failed-task, succeed-task
taskEnd.reason match {
case Success =>
y.succeededTasks += 1
case _ =>
y.failedTasks += 1
}

// update duration
y.taskTime += info.duration

val metrics = taskEnd.taskMetrics
if (metrics != null) {
metrics.inputMetrics.foreach { y.inputBytes += _.bytesRead }
metrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead }
metrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten }
y.memoryBytesSpilled += metrics.memoryBytesSpilled
y.diskBytesSpilled += metrics.diskBytesSpilled
}
executorSummaryMap.get(info.executorId).foreach { y =>
// first update failed-task, succeed-task
taskEnd.reason match {
case Success =>
y.succeededTasks += 1
case _ =>
y.failedTasks += 1
}

// update duration
y.taskTime += info.duration

val metrics = taskEnd.taskMetrics
if (metrics != null) {
metrics.inputMetrics.foreach { y.inputBytes += _.bytesRead }
metrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead }
metrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten }
y.memoryBytesSpilled += metrics.memoryBytesSpilled
y.diskBytesSpilled += metrics.diskBytesSpilled
}
case _ => {}
}

val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashMap[Long, TaskInfo]())
// Remove by taskId, rather than by TaskInfo, in case the TaskInfo is from storage
tasksActive.remove(info.taskId)
stageData.numActiveTasks -= 1

val (errorMessage, metrics): (Option[String], Option[TaskMetrics]) =
taskEnd.reason match {
case org.apache.spark.Success =>
stageIdToTasksComplete(sid) = stageIdToTasksComplete.getOrElse(sid, 0) + 1
stageData.numCompleteTasks += 1
(None, Option(taskEnd.taskMetrics))
case e: ExceptionFailure => // Handle ExceptionFailure because we might have metrics
stageIdToTasksFailed(sid) = stageIdToTasksFailed.getOrElse(sid, 0) + 1
stageData.numFailedTasks += 1
(Some(e.toErrorString), e.metrics)
case e: TaskFailedReason => // All other failure cases
stageIdToTasksFailed(sid) = stageIdToTasksFailed.getOrElse(sid, 0) + 1
stageData.numFailedTasks += 1
(Some(e.toErrorString), None)
}

stageIdToTime.getOrElseUpdate(sid, 0L)
val time = metrics.map(_.executorRunTime).getOrElse(0L)
stageIdToTime(sid) += time

stageIdToInputBytes.getOrElseUpdate(sid, 0L)
val taskRunTime = metrics.map(_.executorRunTime).getOrElse(0L)
stageData.executorRunTime += taskRunTime
val inputBytes = metrics.flatMap(_.inputMetrics).map(_.bytesRead).getOrElse(0L)
stageIdToInputBytes(sid) += inputBytes
stageData.inputBytes += inputBytes

stageIdToShuffleRead.getOrElseUpdate(sid, 0L)
val shuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead).getOrElse(0L)
stageIdToShuffleRead(sid) += shuffleRead
stageData.shuffleReadBytes += shuffleRead

stageIdToShuffleWrite.getOrElseUpdate(sid, 0L)
val shuffleWrite =
metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleBytesWritten).getOrElse(0L)
stageIdToShuffleWrite(sid) += shuffleWrite
stageData.shuffleWriteBytes += shuffleWrite

stageIdToMemoryBytesSpilled.getOrElseUpdate(sid, 0L)
val memoryBytesSpilled = metrics.map(_.memoryBytesSpilled).getOrElse(0L)
stageIdToMemoryBytesSpilled(sid) += memoryBytesSpilled
stageData.memoryBytesSpilled += memoryBytesSpilled

stageIdToDiskBytesSpilled.getOrElseUpdate(sid, 0L)
val diskBytesSpilled = metrics.map(_.diskBytesSpilled).getOrElse(0L)
stageIdToDiskBytesSpilled(sid) += diskBytesSpilled
stageData.diskBytesSpilled += diskBytesSpilled

val taskMap = stageIdToTaskData.getOrElse(sid, HashMap[Long, TaskUIData]())
taskMap(info.taskId) = new TaskUIData(info, metrics, errorMessage)
stageIdToTaskData(sid) = taskMap
stageData.taskData(info.taskId) = new TaskUIData(info, metrics, errorMessage)
}
}
} // end of onTaskEnd

override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
synchronized {
Expand Down Expand Up @@ -252,12 +220,6 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {

}

@DeveloperApi
case class TaskUIData(
taskInfo: TaskInfo,
taskMetrics: Option[TaskMetrics] = None,
errorMessage: Option[String] = None)

private object JobProgressListener {
val DEFAULT_POOL_NAME = "default"
val DEFAULT_RETAINED_STAGES = 1000
Expand Down
Loading

0 comments on commit 72e9021

Please sign in to comment.