[SPARK-12149][WEB UI] Executor UI improvement suggestions - Color UI
Added color coding to the Executors page for Active Tasks, Failed Tasks, Completed Tasks and Task Time.

Active Tasks is shaded blue, with its range based on the percentage of total cores in use.
Failed Tasks is shaded red, with its range covering the first 10% of total tasks failed.
Completed Tasks is shaded green, ranging over 10% of total tasks (including failed and active tasks), but only when there are active or failed tasks on that executor.
Task Time is shaded red when GC time exceeds 10% of total task time, with its range corresponding directly to the fraction of total time spent in GC.
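
For reference, a minimal sketch in Scala of the opacity math described above (the helper names here are illustrative; the shipped logic is the taskData helper added to ExecutorsPage.scala in this commit). Each alpha lands in 0.5-1.0 and feeds an hsla() background color:

    // Active-task shading scales linearly with the fraction of task slots in use.
    def activeTasksAlpha(activeTasks: Int, maxTasks: Int): Double =
      if (maxTasks > 0) (activeTasks.toDouble / maxTasks) * 0.5 + 0.5 else 1.0

    // Failed-task shading saturates once 10% of all tasks have failed.
    def failedTasksAlpha(failedTasks: Int, totalTasks: Int): Double =
      if (totalTasks > 0) math.min(10 * failedTasks.toDouble / totalTasks, 1.0) * 0.5 + 0.5 else 1.0

    // Task-time shading saturates once GC time reaches 50% of total task time.
    def taskTimeAlpha(totalGCTime: Long, totalDuration: Long): Double =
      if (totalDuration > 0) math.min(totalGCTime.toDouble / totalDuration + 0.5, 1.0) else 1.0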

Author: Alex Bozarth <ajbozart@us.ibm.com>

Closes #10154 from ajbozarth/spark12149.
ajbozarth authored and Tom Graves committed Jan 25, 2016
1 parent ef8fb36 commit c037d25
Showing 7 changed files with 103 additions and 20 deletions.
core/src/main/scala/org/apache/spark/status/api/v1/api.scala (2 changes: 2 additions & 0 deletions)
@@ -55,11 +55,13 @@ class ExecutorSummary private[spark](
     val rddBlocks: Int,
     val memoryUsed: Long,
     val diskUsed: Long,
+    val maxTasks: Int,
     val activeTasks: Int,
     val failedTasks: Int,
     val completedTasks: Int,
     val totalTasks: Int,
     val totalDuration: Long,
+    val totalGCTime: Long,
     val totalInputBytes: Long,
     val totalShuffleRead: Long,
     val totalShuffleWrite: Long,
core/src/main/scala/org/apache/spark/ui/SparkUI.scala (2 changes: 1 addition & 1 deletion)
@@ -195,7 +195,7 @@ private[spark] object SparkUI {

     val environmentListener = new EnvironmentListener
     val storageStatusListener = new StorageStatusListener
-    val executorsListener = new ExecutorsListener(storageStatusListener)
+    val executorsListener = new ExecutorsListener(storageStatusListener, conf)
     val storageListener = new StorageListener(storageStatusListener)
     val operationGraphListener = new RDDOperationGraphListener(conf)

core/src/main/scala/org/apache/spark/ui/ToolTips.scala (3 changes: 3 additions & 0 deletions)
@@ -87,4 +87,7 @@ private[spark] object ToolTips {
       multiple operations (e.g. two map() functions) if they can be pipelined. Some operations
       also create multiple RDDs internally. Cached RDDs are shown in green.
     """
+
+  val TASK_TIME =
+    "Shaded red when garbage collection (GC) time is over 10% of task time"
 }
core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala (98 changes: 81 additions & 17 deletions)
@@ -50,6 +50,8 @@ private[ui] class ExecutorsPage(
     threadDumpEnabled: Boolean)
   extends WebUIPage("") {
   private val listener = parent.listener
+  // When GCTimePercent is edited change ToolTips.TASK_TIME to match
+  private val GCTimePercent = 0.1
 
   def render(request: HttpServletRequest): Seq[Node] = {
     val (storageStatusList, execInfo) = listener.synchronized {
@@ -77,7 +79,7 @@
       <th>Failed Tasks</th>
       <th>Complete Tasks</th>
       <th>Total Tasks</th>
-      <th>Task Time</th>
+      <th data-toggle="tooltip" title={ToolTips.TASK_TIME}>Task Time (GC Time)</th>
       <th><span data-toggle="tooltip" title={ToolTips.INPUT}>Input</span></th>
       <th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ}>Shuffle Read</span></th>
       <th>
@@ -129,13 +131,8 @@
       <td sorttable_customkey={diskUsed.toString}>
         {Utils.bytesToString(diskUsed)}
       </td>
-      <td>{info.activeTasks}</td>
-      <td>{info.failedTasks}</td>
-      <td>{info.completedTasks}</td>
-      <td>{info.totalTasks}</td>
-      <td sorttable_customkey={info.totalDuration.toString}>
-        {Utils.msDurationToString(info.totalDuration)}
-      </td>
+      {taskData(info.maxTasks, info.activeTasks, info.failedTasks, info.completedTasks,
+        info.totalTasks, info.totalDuration, info.totalGCTime)}
       <td sorttable_customkey={info.totalInputBytes.toString}>
         {Utils.bytesToString(info.totalInputBytes)}
       </td>
@@ -177,7 +174,6 @@
     val maximumMemory = execInfo.map(_.maxMemory).sum
     val memoryUsed = execInfo.map(_.memoryUsed).sum
     val diskUsed = execInfo.map(_.diskUsed).sum
-    val totalDuration = execInfo.map(_.totalDuration).sum
     val totalInputBytes = execInfo.map(_.totalInputBytes).sum
     val totalShuffleRead = execInfo.map(_.totalShuffleRead).sum
     val totalShuffleWrite = execInfo.map(_.totalShuffleWrite).sum
@@ -192,13 +188,13 @@
       <td sorttable_customkey={diskUsed.toString}>
         {Utils.bytesToString(diskUsed)}
       </td>
-      <td>{execInfo.map(_.activeTasks).sum}</td>
-      <td>{execInfo.map(_.failedTasks).sum}</td>
-      <td>{execInfo.map(_.completedTasks).sum}</td>
-      <td>{execInfo.map(_.totalTasks).sum}</td>
-      <td sorttable_customkey={totalDuration.toString}>
-        {Utils.msDurationToString(totalDuration)}
-      </td>
+      {taskData(execInfo.map(_.maxTasks).sum,
+        execInfo.map(_.activeTasks).sum,
+        execInfo.map(_.failedTasks).sum,
+        execInfo.map(_.completedTasks).sum,
+        execInfo.map(_.totalTasks).sum,
+        execInfo.map(_.totalDuration).sum,
+        execInfo.map(_.totalGCTime).sum)}
       <td sorttable_customkey={totalInputBytes.toString}>
         {Utils.bytesToString(totalInputBytes)}
       </td>
@@ -219,7 +215,7 @@
       <th>Failed Tasks</th>
       <th>Complete Tasks</th>
       <th>Total Tasks</th>
-      <th>Task Time</th>
+      <th data-toggle="tooltip" title={ToolTips.TASK_TIME}>Task Time (GC Time)</th>
       <th><span data-toggle="tooltip" title={ToolTips.INPUT}>Input</span></th>
       <th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ}>Shuffle Read</span></th>
       <th>
@@ -233,6 +229,70 @@
     </tbody>
   </table>
 }
+
+  private def taskData(
+      maxTasks: Int,
+      activeTasks: Int,
+      failedTasks: Int,
+      completedTasks: Int,
+      totalTasks: Int,
+      totalDuration: Long,
+      totalGCTime: Long):
+    Seq[Node] = {
+    // Determine Color Opacity from 0.5-1
+    // activeTasks range from 0 to maxTasks
+    val activeTasksAlpha =
+      if (maxTasks > 0) {
+        (activeTasks.toDouble / maxTasks) * 0.5 + 0.5
+      } else {
+        1
+      }
+    // failedTasks range max at 10% failure, alpha max = 1
+    val failedTasksAlpha =
+      if (totalTasks > 0) {
+        math.min(10 * failedTasks.toDouble / totalTasks, 1) * 0.5 + 0.5
+      } else {
+        1
+      }
+    // totalDuration range from 0 to 50% GC time, alpha max = 1
+    val totalDurationAlpha =
+      if (totalDuration > 0) {
+        math.min(totalGCTime.toDouble / totalDuration + 0.5, 1)
+      } else {
+        1
+      }
+
+    val tableData =
+      <td style={
+        if (activeTasks > 0) {
+          "background:hsla(240, 100%, 50%, " + activeTasksAlpha + ");color:white"
+        } else {
+          ""
+        }
+      }>{activeTasks}</td>
+      <td style={
+        if (failedTasks > 0) {
+          "background:hsla(0, 100%, 50%, " + failedTasksAlpha + ");color:white"
+        } else {
+          ""
+        }
+      }>{failedTasks}</td>
+      <td>{completedTasks}</td>
+      <td>{totalTasks}</td>
+      <td sorttable_customkey={totalDuration.toString} style={
+        // Red if GC time over GCTimePercent of total time
+        if (totalGCTime > GCTimePercent * totalDuration) {
+          "background:hsla(0, 100%, 50%, " + totalDurationAlpha + ");color:white"
+        } else {
+          ""
+        }
+      }>
+        {Utils.msDurationToString(totalDuration)}
+        ({Utils.msDurationToString(totalGCTime)})
+      </td>;
+
+    tableData
+  }
 }
 
 private[spark] object ExecutorsPage {
@@ -245,11 +305,13 @@ private[spark] object ExecutorsPage {
     val memUsed = status.memUsed
     val maxMem = status.maxMem
     val diskUsed = status.diskUsed
+    val maxTasks = listener.executorToTasksMax.getOrElse(execId, 0)
     val activeTasks = listener.executorToTasksActive.getOrElse(execId, 0)
     val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0)
     val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0)
     val totalTasks = activeTasks + failedTasks + completedTasks
     val totalDuration = listener.executorToDuration.getOrElse(execId, 0L)
+    val totalGCTime = listener.executorToJvmGCTime.getOrElse(execId, 0L)
     val totalInputBytes = listener.executorToInputBytes.getOrElse(execId, 0L)
     val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0L)
     val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0L)
@@ -261,11 +323,13 @@
       rddBlocks,
       memUsed,
       diskUsed,
+      maxTasks,
       activeTasks,
       failedTasks,
       completedTasks,
       totalTasks,
       totalDuration,
+      totalGCTime,
       totalInputBytes,
       totalShuffleRead,
       totalShuffleWrite,
core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala (10 changes: 8 additions & 2 deletions)
@@ -19,7 +19,7 @@ package org.apache.spark.ui.exec

 import scala.collection.mutable.HashMap
 
-import org.apache.spark.{ExceptionFailure, Resubmitted, SparkContext}
+import org.apache.spark.{ExceptionFailure, Resubmitted, SparkConf, SparkContext}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.{StorageStatus, StorageStatusListener}
@@ -43,11 +43,14 @@ private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "exec
  * A SparkListener that prepares information to be displayed on the ExecutorsTab
  */
 @DeveloperApi
-class ExecutorsListener(storageStatusListener: StorageStatusListener) extends SparkListener {
+class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: SparkConf)
+    extends SparkListener {
+  val executorToTasksMax = HashMap[String, Int]()
   val executorToTasksActive = HashMap[String, Int]()
   val executorToTasksComplete = HashMap[String, Int]()
   val executorToTasksFailed = HashMap[String, Int]()
   val executorToDuration = HashMap[String, Long]()
+  val executorToJvmGCTime = HashMap[String, Long]()
   val executorToInputBytes = HashMap[String, Long]()
   val executorToInputRecords = HashMap[String, Long]()
   val executorToOutputBytes = HashMap[String, Long]()
@@ -62,6 +65,8 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
   override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = synchronized {
     val eid = executorAdded.executorId
     executorToLogUrls(eid) = executorAdded.executorInfo.logUrlMap
+    executorToTasksMax(eid) =
+      executorAdded.executorInfo.totalCores / conf.getInt("spark.task.cpus", 1)
     executorIdToData(eid) = ExecutorUIData(executorAdded.time)
   }

@@ -131,6 +136,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
         executorToShuffleWrite(eid) =
           executorToShuffleWrite.getOrElse(eid, 0L) + shuffleWrite.bytesWritten
       }
+      executorToJvmGCTime(eid) = executorToJvmGCTime.getOrElse(eid, 0L) + metrics.jvmGCTime
     }
   }
 }
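
A note on the slot math above: the per-executor task capacity is derived as totalCores / spark.task.cpus (default 1). For example, an executor advertising 8 cores with spark.task.cpus=2 can run at most 8 / 2 = 4 concurrent tasks, which is the point at which the Active Tasks cell reaches full opacity.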
core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json (2 changes: 2 additions & 0 deletions)
@@ -4,11 +4,13 @@
"rddBlocks" : 8,
"memoryUsed" : 28000128,
"diskUsed" : 0,
"maxTasks" : 0,
"activeTasks" : 0,
"failedTasks" : 1,
"completedTasks" : 31,
"totalTasks" : 32,
"totalDuration" : 8820,
"totalGCTime" : 352,
"totalInputBytes" : 28000288,
"totalShuffleRead" : 0,
"totalShuffleWrite" : 13180,
project/MimaExcludes.scala (6 changes: 6 additions & 0 deletions)
@@ -127,6 +127,9 @@ object MimaExcludes {
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.storage.ExternalBlockStore$"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.storage.ExternalBlockManager"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.storage.ExternalBlockStore")
+      ) ++ Seq(
+        // SPARK-12149 Added new fields to ExecutorSummary
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.status.api.v1.ExecutorSummary.this")
       ) ++
       // SPARK-12665 Remove deprecated and unused classes
       Seq(
@@ -301,6 +304,9 @@ object MimaExcludes {
       // SPARK-3580 Add getNumPartitions method to JavaRDD
       ProblemFilters.exclude[MissingMethodProblem](
         "org.apache.spark.api.java.JavaRDDLike.getNumPartitions")
+    ) ++ Seq(
+      // SPARK-12149 Added new fields to ExecutorSummary
+      ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.status.api.v1.ExecutorSummary.this")
     ) ++
     // SPARK-11314: YARN backend moved to yarn sub-module and MiMA complains even though it's a
     // private class.
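
Since ExecutorSummary is exposed through the REST API (status/api/v1), the new maxTasks and totalGCTime fields also surface in the executors endpoint, as the updated JSON expectation file above shows. A quick way to eyeball this against a running driver (a sketch only; the port 4040 and the app id "app-1" are placeholders, and scala.io.Source is just a convenient stdlib HTTP fetch):

    import scala.io.Source

    object ExecutorsApiCheck {
      def main(args: Array[String]): Unit = {
        // List executors for one application; each entry should now carry
        // "maxTasks" and "totalGCTime" alongside the existing task counters.
        val url = "http://localhost:4040/api/v1/applications/app-1/executors"
        println(Source.fromURL(url).mkString)
      }
    }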
