From f0a725d2bc3fd0d42af88bc1488241b41c552a6f Mon Sep 17 00:00:00 2001
From: Alex Bozarth
Date: Fri, 8 Jan 2016 12:37:57 -0800
Subject: [PATCH 1/5] Added a TOTALS row to the executors UI

---
 .../apache/spark/ui/exec/ExecutorsPage.scala  | 56 +++++++++++++++----
 1 file changed, 46 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index 1a29b0f412603..c162598040b63 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -61,10 +61,28 @@ private[ui] class ExecutorsPage(
     val execInfoSorted = execInfo.sortBy(_.id)
     val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty

+    val sumInfo = new ExecutorSummary(
+      "",
+      "",
+      execInfo.map(_.rddBlocks).sum,
+      execInfo.map(_.memoryUsed).sum,
+      execInfo.map(_.diskUsed).sum,
+      execInfo.map(_.activeTasks).sum,
+      execInfo.map(_.failedTasks).sum,
+      execInfo.map(_.completedTasks).sum,
+      execInfo.map(_.totalTasks).sum,
+      execInfo.map(_.totalDuration).sum,
+      execInfo.map(_.totalInputBytes).sum,
+      execInfo.map(_.totalShuffleRead).sum,
+      execInfo.map(_.totalShuffleWrite).sum,
+      execInfo.map(_.maxMemory).sum,
+      Map.empty
+    )
+
     val execTable =
       <table class={UIUtils.TABLE_CLASS_STRIPED}>
         <thead>
           <th>Executor ID</th>
           <th>Address</th>
           <th>RDD Blocks</th>
           <th>Storage Memory</th>
@@ -88,6 +106,7 @@ private[ui] class ExecutorsPage(
           {if (threadDumpEnabled) <th class="sorttable_nosort">Thread Dump</th> else Seq.empty}
         </thead>
         <tbody>
+          {execRow(sumInfo, logsExist)}
          {execInfoSorted.map(execRow(_, logsExist))}
        </tbody>
      </table>
@@ -114,12 +133,23 @@ private[ui] class ExecutorsPage(

   /** Render an HTML row representing an executor */
   private def execRow(info: ExecutorSummary, logsExist: Boolean): Seq[Node] = {
+    val totalRow = info.id.isEmpty && info.hostPort.isEmpty;
     val maximumMemory = info.maxMemory
     val memoryUsed = info.memoryUsed
     val diskUsed = info.diskUsed
-      {info.id}
-      {info.hostPort}
+      {
+        if (totalRow)
+
+        else
+          {info.id}
+      }
+      {
+        if (totalRow)
+          "TOTALS"
+        else
+          {info.hostPort}
+      }
       {info.rddBlocks}
       {Utils.bytesToString(memoryUsed)} /
@@ -148,12 +178,14 @@ private[ui] class ExecutorsPage(
     if (logsExist) {
       {
-        info.executorLogs.map { case (logName, logUrl) =>
-          <div>
-            <a href={logUrl}>
-              {logName}
-            </a>
-          </div>
+        if (!totalRow) {
+          info.executorLogs.map { case (logName, logUrl) =>
+            <div>
+              <a href={logUrl}>
+                {logName}
+              </a>
+            </div>
+          }
         }
       }
@@ -163,7 +195,11 @@ private[ui] class ExecutorsPage(
     if (threadDumpEnabled) {
       val encodedId = URLEncoder.encode(info.id, "UTF-8")
-      <a href={s"threadDump/?executorId=${encodedId}"}>Thread Dump</a>
+      {
+        if (!totalRow) {
+          <a href={s"threadDump/?executorId=${encodedId}"}>Thread Dump</a>
+        }
+      }
     } else {
       Seq.empty
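The idea behind PATCH 1, reduced to a standalone sketch: fold the per-executor metrics into one synthetic summary record (empty `id`/`hostPort` marks it as the TOTALS row) and push it through the same row renderer as the real executors. The `ExecutorSummary` below is a simplified stand-in with only a handful of the real class's fields, and `renderRow` is a plain-text stand-in for the HTML `execRow`; both are illustrative assumptions, not Spark code.

```scala
// Standalone sketch of the PATCH 1 approach: sum per-executor metrics into one
// synthetic "TOTALS" record and render it through the same row renderer.
// `ExecutorSummary` here is a simplified stand-in, not Spark's real class.
case class ExecutorSummary(
    id: String,
    hostPort: String,
    rddBlocks: Int,
    memoryUsed: Long,
    diskUsed: Long,
    activeTasks: Int,
    totalTasks: Int)

object TotalsRowSketch {
  // Empty id/hostPort mark the synthetic row, mirroring
  // `totalRow = info.id.isEmpty && info.hostPort.isEmpty` in the patch.
  def totals(execInfo: Seq[ExecutorSummary]): ExecutorSummary =
    ExecutorSummary(
      id = "",
      hostPort = "",
      rddBlocks = execInfo.map(_.rddBlocks).sum,
      memoryUsed = execInfo.map(_.memoryUsed).sum,
      diskUsed = execInfo.map(_.diskUsed).sum,
      activeTasks = execInfo.map(_.activeTasks).sum,
      totalTasks = execInfo.map(_.totalTasks).sum)

  // Plain-text stand-in for execRow: label the synthetic row "TOTALS".
  def renderRow(info: ExecutorSummary): String = {
    val isTotals = info.id.isEmpty && info.hostPort.isEmpty
    val label = if (isTotals) "TOTALS" else s"${info.id} @ ${info.hostPort}"
    f"$label%-24s blocks=${info.rddBlocks}%-6d mem=${info.memoryUsed}%-12d tasks=${info.activeTasks}/${info.totalTasks}"
  }

  def main(args: Array[String]): Unit = {
    val execInfo = Seq(
      ExecutorSummary("0", "node1:4040", 10, 512L * 1024 * 1024, 0L, 2, 40),
      ExecutorSummary("1", "node2:4040", 7, 256L * 1024 * 1024, 0L, 1, 35))
    (totals(execInfo) +: execInfo).foreach(row => println(renderRow(row)))
  }
}
```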
From bbd9c0d9066a68286310bccb9e1fbe36d3375371 Mon Sep 17 00:00:00 2001
From: Alex Bozarth
Date: Fri, 8 Jan 2016 16:17:14 -0800
Subject: [PATCH 2/5] fixed style issue

---
 .../org/apache/spark/ui/exec/ExecutorsPage.scala | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index c162598040b63..cc52d8078fc92 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -139,16 +139,18 @@ private[ui] class ExecutorsPage(
     val diskUsed = info.diskUsed
       {
-        if (totalRow)
+        if (totalRow) {

-        else
+        } else {
           {info.id}
+        }
       }
       {
-        if (totalRow)
-          "TOTALS"
-        else
+        if (totalRow) {
+          TOTALS
+        } else {
           {info.hostPort}
+        }
       }
       {info.rddBlocks}
From 0b5ce0479403860943b6b4d939c2bb763525b1fe Mon Sep 17 00:00:00 2001
From: Alex Bozarth
Date: Tue, 12 Jan 2016 09:57:37 -0800
Subject: [PATCH 3/5] Revert "Added a TOTALS row to the executors UI"

This reverts commit f0a725d2bc3fd0d42af88bc1488241b41c552a6f.
---
 .../apache/spark/ui/exec/ExecutorsPage.scala  | 58 ++++---------------
 1 file changed, 10 insertions(+), 48 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index cc52d8078fc92..1a29b0f412603 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -61,28 +61,10 @@ private[ui] class ExecutorsPage(
     val execInfoSorted = execInfo.sortBy(_.id)
     val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty

-    val sumInfo = new ExecutorSummary(
-      "",
-      "",
-      execInfo.map(_.rddBlocks).sum,
-      execInfo.map(_.memoryUsed).sum,
-      execInfo.map(_.diskUsed).sum,
-      execInfo.map(_.activeTasks).sum,
-      execInfo.map(_.failedTasks).sum,
-      execInfo.map(_.completedTasks).sum,
-      execInfo.map(_.totalTasks).sum,
-      execInfo.map(_.totalDuration).sum,
-      execInfo.map(_.totalInputBytes).sum,
-      execInfo.map(_.totalShuffleRead).sum,
-      execInfo.map(_.totalShuffleWrite).sum,
-      execInfo.map(_.maxMemory).sum,
-      Map.empty
-    )
-
     val execTable =
       <table class={UIUtils.TABLE_CLASS_STRIPED}>
         <thead>
           <th>Executor ID</th>
           <th>Address</th>
           <th>RDD Blocks</th>
           <th>Storage Memory</th>
@@ -106,7 +88,6 @@ private[ui] class ExecutorsPage(
           {if (threadDumpEnabled) <th class="sorttable_nosort">Thread Dump</th> else Seq.empty}
         </thead>
         <tbody>
-          {execRow(sumInfo, logsExist)}
          {execInfoSorted.map(execRow(_, logsExist))}
        </tbody>
      </table>
@@ -133,25 +114,12 @@ private[ui] class ExecutorsPage(

   /** Render an HTML row representing an executor */
   private def execRow(info: ExecutorSummary, logsExist: Boolean): Seq[Node] = {
-    val totalRow = info.id.isEmpty && info.hostPort.isEmpty;
     val maximumMemory = info.maxMemory
     val memoryUsed = info.memoryUsed
     val diskUsed = info.diskUsed
-      {
-        if (totalRow) {
-
-        } else {
-          {info.id}
-        }
-      }
-      {
-        if (totalRow) {
-          TOTALS
-        } else {
-          {info.hostPort}
-        }
-      }
+      {info.id}
+      {info.hostPort}
       {info.rddBlocks}
       {Utils.bytesToString(memoryUsed)} /
@@ -180,14 +148,12 @@ private[ui] class ExecutorsPage(
     if (logsExist) {
       {
-        if (!totalRow) {
-          info.executorLogs.map { case (logName, logUrl) =>
-            <div>
-              <a href={logUrl}>
-                {logName}
-              </a>
-            </div>
-          }
+        info.executorLogs.map { case (logName, logUrl) =>
+          <div>
+            <a href={logUrl}>
+              {logName}
+            </a>
+          </div>
         }
       }
@@ -197,11 +163,7 @@ private[ui] class ExecutorsPage(
     if (threadDumpEnabled) {
       val encodedId = URLEncoder.encode(info.id, "UTF-8")
-      {
-        if (!totalRow) {
-          <a href={s"threadDump/?executorId=${encodedId}"}>Thread Dump</a>
-        }
-      }
+      <a href={s"threadDump/?executorId=${encodedId}"}>Thread Dump</a>
     } else {
       Seq.empty
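Before the alternative version below, a side note on the rendering pattern both approaches rely on: Scala expressions are spliced into scala-xml literals with braces, and a String branch renders as a text node. A minimal, self-contained illustration, not Spark code, assuming the scala-xml module is on the classpath (it is bundled with the Scala versions Spark used at the time):

```scala
import scala.xml.Node

// Minimal illustration of the brace-splicing pattern used throughout ExecutorsPage:
// the if/else lives inside the XML literal's braces, and a plain String branch is
// serialized as text when the node is rendered.
object XmlSpliceSketch {
  def idCell(totalRow: Boolean, id: String): Node =
    <td>{
      if (totalRow) {
        "TOTALS"
      } else {
        id
      }
    }</td>

  def main(args: Array[String]): Unit = {
    println(idCell(totalRow = true, id = "3"))   // <td>TOTALS</td>
    println(idCell(totalRow = false, id = "3"))  // <td>3</td>
  }
}
```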
From 8b13783ea8eafcc5d2db9fdd8258694fc9f2e9f4 Mon Sep 17 00:00:00 2001
From: Alex Bozarth
Date: Mon, 11 Jan 2016 16:34:00 -0800
Subject: [PATCH 4/5] Alt version of SPARK-12716

---
 .../apache/spark/ui/exec/ExecutorsPage.scala  | 70 +++++++++++++++++--
 1 file changed, 63 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index 1a29b0f412603..869cb9555b2f7 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -93,14 +93,10 @@ private[ui] class ExecutorsPage(
     val content =
       <div class="row-fluid">
         <div class="span12">
-          <ul class="unstyled">
-            <li><strong>Memory:</strong>
-              {Utils.bytesToString(memUsed)} Used
-              ({Utils.bytesToString(maxMem)} Total)</li>
-            <li><strong>Disk:</strong> {Utils.bytesToString(diskUsed)} Used</li>
-          </ul>
+          <h4>Totals for {execInfo.size} Executors</h4>
+          {execSummary(execInfo)}
         </div>
       </div>
@@ -172,6 +168,66 @@ private[ui] class ExecutorsPage(
   }

+  private def execSummary(execInfo: Seq[ExecutorSummary]): Seq[Node] = {
+    val maximumMemory = execInfo.map(_.maxMemory).sum
+    val memoryUsed = execInfo.map(_.memoryUsed).sum
+    val diskUsed = execInfo.map(_.diskUsed).sum
+    val totalDuration = execInfo.map(_.totalDuration).sum
+    val totalInputBytes = execInfo.map(_.totalInputBytes).sum
+    val totalShuffleRead = execInfo.map(_.totalShuffleRead).sum
+    val totalShuffleWrite = execInfo.map(_.totalShuffleWrite).sum
+
+    val sumContent =
+      <tr>
+        <td>{execInfo.map(_.rddBlocks).sum}</td>
+        <td>
+          {Utils.bytesToString(memoryUsed)} /
+          {Utils.bytesToString(maximumMemory)}
+        </td>
+        <td>
+          {Utils.bytesToString(diskUsed)}
+        </td>
+        <td>{execInfo.map(_.activeTasks).sum}</td>
+        <td>{execInfo.map(_.failedTasks).sum}</td>
+        <td>{execInfo.map(_.completedTasks).sum}</td>
+        <td>{execInfo.map(_.totalTasks).sum}</td>
+        <td>
+          {Utils.msDurationToString(totalDuration)}
+        </td>
+        <td>
+          {Utils.bytesToString(totalInputBytes)}
+        </td>
+        <td>
+          {Utils.bytesToString(totalShuffleRead)}
+        </td>
+        <td>
+          {Utils.bytesToString(totalShuffleWrite)}
+        </td>
+      </tr>;
+
+    <table class={UIUtils.TABLE_CLASS_STRIPED}>
+      <thead>
+        <th>RDD Blocks</th>
+        <th>Storage Memory</th>
+        <th>Disk Used</th>
+        <th>Active Tasks</th>
+        <th>Failed Tasks</th>
+        <th>Complete Tasks</th>
+        <th>Total Tasks</th>
+        <th>Task Time</th>
+        <th>Input</th>
+        <th>Shuffle Read</th>
+        <th>
+          <span data-toggle="tooltip" title={ToolTips.SHUFFLE_WRITE}>
+            Shuffle Write
+          </span>
+        </th>
+      </thead>
+      <tbody>
+        {sumContent}
+      </tbody>
+    </table>
+  }
 }

 private[spark] object ExecutorsPage {
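A standalone sketch of what PATCH 4's execSummary computes, with the HTML left out: sum each metric once across all executors and format the totals for display. `bytesToString` here is a simplified stand-in for Spark's `Utils.bytesToString`, and the field names and sample numbers are illustrative assumptions, not Spark code.

```scala
// Sketch of the PATCH 4 approach: instead of a synthetic row inside the executors
// table, aggregate everything once and render a single summary line/row.
object ExecSummarySketch {
  case class ExecStats(memoryUsed: Long, maxMemory: Long, diskUsed: Long, totalDuration: Long)

  // Simplified stand-in for Spark's Utils.bytesToString.
  def bytesToString(size: Long): String = {
    val kb = 1L << 10; val mb = 1L << 20; val gb = 1L << 30
    if (size >= gb) f"${size.toDouble / gb}%.1f GB"
    else if (size >= mb) f"${size.toDouble / mb}%.1f MB"
    else if (size >= kb) f"${size.toDouble / kb}%.1f KB"
    else s"$size B"
  }

  def summaryLine(execInfo: Seq[ExecStats]): String = {
    val memoryUsed = execInfo.map(_.memoryUsed).sum
    val maximumMemory = execInfo.map(_.maxMemory).sum
    val diskUsed = execInfo.map(_.diskUsed).sum
    val totalDuration = execInfo.map(_.totalDuration).sum
    s"Totals for ${execInfo.size} Executors: " +
      s"${bytesToString(memoryUsed)} / ${bytesToString(maximumMemory)} storage memory, " +
      s"${bytesToString(diskUsed)} disk, ${totalDuration / 1000.0} s task time"
  }

  def main(args: Array[String]): Unit = {
    val execInfo = Seq(
      ExecStats(512L << 20, 2L << 30, 0L, 120000L),
      ExecStats(256L << 20, 2L << 30, 0L, 90000L))
    println(summaryLine(execInfo))
  }
}
```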
From 4b5f7c7153fbf72c72b35b3e305261137f1bdd1b Mon Sep 17 00:00:00 2001
From: Alex Bozarth
Date: Thu, 14 Jan 2016 14:54:57 -0800
Subject: [PATCH 5/5] Removed unused vals and added table header

---
 .../main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index 869cb9555b2f7..33973df364865 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -53,9 +53,6 @@ private[ui] class ExecutorsPage(
   def render(request: HttpServletRequest): Seq[Node] = {
     val storageStatusList = listener.storageStatusList
-    val maxMem = storageStatusList.map(_.maxMem).sum
-    val memUsed = storageStatusList.map(_.memUsed).sum
-    val diskUsed = storageStatusList.map(_.diskUsed).sum
     val execInfo = for (statusId <- 0 until storageStatusList.size) yield
       ExecutorsPage.getExecInfo(listener, statusId)
     val execInfoSorted = execInfo.sortBy(_.id)
     val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty
@@ -101,6 +98,7 @@ private[ui] class ExecutorsPage(
       </div>
       <div class="row-fluid">
         <div class="span12">
+          <h4>Active Executors</h4>
           {execTable}
         </div>
       </div>;
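A rough sketch of how the page body reads after PATCH 4 and PATCH 5: a "Totals" summary table followed by the per-executor table under an "Active Executors" header. The markup here is simplified and the object name is invented; the real render() builds richer nodes and hands them to UIUtils.headerSparkPage. Assumes a 2.11/2.12-era Scala with the scala-xml module available, the same XML-literal pattern ExecutorsPage itself uses.

```scala
import scala.xml.Node

// Rough sketch of the page layout after PATCH 4/5: summary table first, then the
// executors table, each under its own header. Adjacent XML literals ending in ";"
// form a node sequence, mirroring how `content` is built in the patch.
object ExecutorsPageLayoutSketch {
  def pageBody(numExecutors: Int, execSummary: Node, execTable: Node): Seq[Node] =
    <div>
      <h4>Totals for {numExecutors} Executors</h4>
      {execSummary}
    </div>
    <div>
      <h4>Active Executors</h4>
      {execTable}
    </div>;

  def main(args: Array[String]): Unit = {
    val summary = <table><tr><td>totals go here</td></tr></table>
    val table = <table><tr><td>one row per executor</td></tr></table>
    pageBody(2, summary, table).foreach(node => println(node))
  }
}
```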