From 95c70b8487f913d977ae70b308442127ddfa3f2a Mon Sep 17 00:00:00 2001 From: Tom Magrino Date: Fri, 10 Jun 2016 14:11:30 -0700 Subject: [PATCH 1/4] Add links to executor logs from stage details page in UI. --- .../apache/spark/ui/jobs/ExecutorTable.scala | 9 ++++ .../org/apache/spark/ui/jobs/StagePage.scala | 42 +++++++++++++++---- .../org/apache/spark/ui/jobs/StagesTab.scala | 1 + 3 files changed, 45 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index 1268f44596f8a..5a40c52094724 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -84,6 +84,7 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage Shuffle Spill (Memory) Shuffle Spill (Disk) }} + Logs {createExecutorTable()} @@ -147,6 +148,14 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage {Utils.bytesToString(v.diskBytesSpilled)} }} + + {val logs = parent.executorsListener.executorToLogUrls(k) + if (logs.isEmpty) { + "No Logs Found" + } else logs.map { + case (logName, logUrl) =>
{logName}
+ }} + } case None => diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 5183c80ab4526..a31e0d7c453a3 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -30,6 +30,7 @@ import org.apache.spark.{InternalAccumulator, SparkConf} import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo, TaskLocality} import org.apache.spark.ui._ +import org.apache.spark.ui.exec.ExecutorsListener import org.apache.spark.ui.jobs.UIData._ import org.apache.spark.util.{Utils, Distribution} @@ -39,6 +40,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { private val progressListener = parent.progressListener private val operationGraphListener = parent.operationGraphListener + private val executorsListener = parent.executorsListener private val TIMELINE_LEGEND = {
@@ -292,7 +294,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { currentTime, pageSize = taskPageSize, sortColumn = taskSortColumn, - desc = taskSortDesc + desc = taskSortDesc, + executorsListener = executorsListener ) (_taskTable, _taskTable.table(taskPage)) } catch { @@ -829,7 +832,8 @@ private[ui] class TaskTableRowData( val shuffleRead: Option[TaskTableRowShuffleReadData], val shuffleWrite: Option[TaskTableRowShuffleWriteData], val bytesSpilled: Option[TaskTableRowBytesSpilledData], - val error: String) + val error: String, + val logs: Map[String, String]) private[ui] class TaskDataSource( tasks: Seq[TaskUIData], @@ -842,7 +846,8 @@ private[ui] class TaskDataSource( currentTime: Long, pageSize: Int, sortColumn: String, - desc: Boolean) extends PagedDataSource[TaskTableRowData](pageSize) { + desc: Boolean, + executorsListener: ExecutorsListener) extends PagedDataSource[TaskTableRowData](pageSize) { import StagePage._ // Convert TaskUIData to TaskTableRowData which contains the final contents to show in the table @@ -985,6 +990,8 @@ private[ui] class TaskDataSource( None } + val logs = executorsListener.executorToLogUrls.getOrElse(info.executorId, Map.empty) + new TaskTableRowData( info.index, info.taskId, @@ -1008,7 +1015,8 @@ private[ui] class TaskDataSource( shuffleRead, shuffleWrite, bytesSpilled, - errorMessage.getOrElse("")) + errorMessage.getOrElse(""), + logs) } /** @@ -1186,6 +1194,16 @@ private[ui] class TaskDataSource( override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = Ordering.String.compare(x.error, y.error) } + case "Logs" => new Ordering[TaskTableRowData] { + override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = + if (x.logs.isEmpty == y.logs.isEmpty) { + return 0 + } else if (x.logs.isEmpty) { + return -1; + } else { + return 1; + } + } case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn") } if (desc) { @@ -1210,7 +1228,8 @@ private[ui] class TaskPagedTable( currentTime: Long, pageSize: Int, sortColumn: String, - desc: Boolean) extends PagedTable[TaskTableRowData] { + desc: Boolean, + executorsListener: ExecutorsListener) extends PagedTable[TaskTableRowData] { // We only track peak memory used for unsafe operators private val displayPeakExecutionMemory = conf.getBoolean("spark.sql.unsafe.enabled", true) @@ -1230,7 +1249,8 @@ private[ui] class TaskPagedTable( currentTime, pageSize, sortColumn, - desc) + desc, + executorsListener) override def pageLink(page: Int): String = { val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8") @@ -1291,7 +1311,8 @@ private[ui] class TaskPagedTable( } else { Nil }} ++ - Seq(("Errors", "")) + Seq(("Errors", "")) ++ + Seq(("Logs", "")) if (!taskHeadersAndCssClasses.map(_._1).contains(sortColumn)) { throw new IllegalArgumentException(s"Unknown column: $sortColumn") @@ -1379,6 +1400,13 @@ private[ui] class TaskPagedTable( {task.bytesSpilled.get.diskBytesSpilledReadable} }} {errorMessageCell(task.error)} + + {if (task.logs.isEmpty) { + "No Logs Found" + } else task.logs.map { + case (logName, logUrl) =>
{logName}
+ }} + } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala index 5989f0035b270..7792c5a882eff 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala @@ -29,6 +29,7 @@ private[ui] class StagesTab(parent: SparkUI) extends SparkUITab(parent, "stages" val killEnabled = parent.killEnabled val progressListener = parent.jobProgressListener val operationGraphListener = parent.operationGraphListener + val executorsListener = parent.executorsListener attachPage(new AllStagesPage(this)) attachPage(new StagePage(this)) From f8667c66ee142173d6d4873fd6264b4d3ce482b6 Mon Sep 17 00:00:00 2001 From: Tom Magrino Date: Fri, 10 Jun 2016 16:59:08 -0700 Subject: [PATCH 2/4] Small fixs for missing indices and testing. --- .../scala/org/apache/spark/ui/jobs/ExecutorTable.scala | 2 +- .../test/scala/org/apache/spark/ui/StagePageSuite.scala | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index 5a40c52094724..e7baf5b1b3bf0 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -149,7 +149,7 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage }} - {val logs = parent.executorsListener.executorToLogUrls(k) + {val logs = parent.executorsListener.executorToLogUrls.getOrElse(k, Map.empty) if (logs.isEmpty) { "No Logs Found" } else logs.map { diff --git a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala index 86699e7f56953..924836e0e3a78 100644 --- a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala @@ -20,12 +20,12 @@ package org.apache.spark.ui import javax.servlet.http.HttpServletRequest import scala.xml.Node - -import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS} - +import org.mockito.Mockito.{RETURNS_SMART_NULLS, mock, when} import org.apache.spark._ import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler._ +import org.apache.spark.storage.StorageStatusListener +import org.apache.spark.ui.exec.ExecutorsListener import org.apache.spark.ui.jobs.{JobProgressListener, StagePage, StagesTab} import org.apache.spark.ui.scope.RDDOperationGraphListener @@ -62,11 +62,13 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext { private def renderStagePage(conf: SparkConf): Seq[Node] = { val jobListener = new JobProgressListener(conf) val graphListener = new RDDOperationGraphListener(conf) + val executorsListener = new ExecutorsListener(new StorageStatusListener()) val tab = mock(classOf[StagesTab], RETURNS_SMART_NULLS) val request = mock(classOf[HttpServletRequest]) when(tab.conf).thenReturn(conf) when(tab.progressListener).thenReturn(jobListener) when(tab.operationGraphListener).thenReturn(graphListener) + when(tab.executorsListener).thenReturn(executorsListener) when(tab.appName).thenReturn("testing") when(tab.headerTabs).thenReturn(Seq.empty) when(request.getParameter("id")).thenReturn("0") From 38b2fb78d21e306d67971501aa459535b48e43a5 Mon Sep 17 00:00:00 2001 From: Tom Magrino Date: Mon, 13 Jun 2016 09:17:59 -0700 Subject: [PATCH 3/4] Code style fix. --- core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala index 924836e0e3a78..f6233450956d5 100644 --- a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala @@ -20,7 +20,9 @@ package org.apache.spark.ui import javax.servlet.http.HttpServletRequest import scala.xml.Node -import org.mockito.Mockito.{RETURNS_SMART_NULLS, mock, when} + +import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS} + import org.apache.spark._ import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler._ From d15000ac5c0aaf058413e5d7c64a327b0be80e20 Mon Sep 17 00:00:00 2001 From: Tom Magrino Date: Tue, 14 Jun 2016 12:23:24 -0700 Subject: [PATCH 4/4] Changes to address UI consistency comments by @ajbozarth. --- .../scala/org/apache/spark/ui/jobs/ExecutorTable.scala | 4 +--- .../main/scala/org/apache/spark/ui/jobs/StagePage.scala | 8 +++----- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index e7baf5b1b3bf0..1810f990ec566 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -150,9 +150,7 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage }} {val logs = parent.executorsListener.executorToLogUrls.getOrElse(k, Map.empty) - if (logs.isEmpty) { - "No Logs Found" - } else logs.map { + logs.map { case (logName, logUrl) => }} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index a31e0d7c453a3..23520f67a3808 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -1194,7 +1194,7 @@ private[ui] class TaskDataSource( override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = Ordering.String.compare(x.error, y.error) } - case "Logs" => new Ordering[TaskTableRowData] { + case "Executor Logs" => new Ordering[TaskTableRowData] { override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = if (x.logs.isEmpty == y.logs.isEmpty) { return 0 @@ -1312,7 +1312,7 @@ private[ui] class TaskPagedTable( Nil }} ++ Seq(("Errors", "")) ++ - Seq(("Logs", "")) + Seq(("Executor Logs", "")) if (!taskHeadersAndCssClasses.map(_._1).contains(sortColumn)) { throw new IllegalArgumentException(s"Unknown column: $sortColumn") @@ -1401,9 +1401,7 @@ private[ui] class TaskPagedTable( }} {errorMessageCell(task.error)} - {if (task.logs.isEmpty) { - "No Logs Found" - } else task.logs.map { + {task.logs.map { case (logName, logUrl) => }}