From c368f885aa539da622f95093c51205af11c9d7a1 Mon Sep 17 00:00:00 2001 From: cenyuhai Date: Tue, 6 Sep 2016 13:25:53 +0800 Subject: [PATCH 01/11] limit timeline executor events --- .../apache/spark/ui/exec/ExecutorsTab.scala | 32 +++++++-- .../apache/spark/ui/jobs/AllJobsPage.scala | 66 +++++++++--------- .../org/apache/spark/ui/jobs/JobPage.scala | 67 +++++++++---------- .../org/apache/spark/ui/jobs/UIData.scala | 5 -- 4 files changed, 92 insertions(+), 78 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index 676f4457510c2..5c35039d34072 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -17,6 +17,7 @@ package org.apache.spark.ui.exec +import scala.collection.mutable import scala.collection.mutable.HashMap import org.apache.spark.{ExceptionFailure, Resubmitted, SparkConf, SparkContext} @@ -24,7 +25,6 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.scheduler._ import org.apache.spark.storage.{StorageStatus, StorageStatusListener} import org.apache.spark.ui.{SparkUI, SparkUITab} -import org.apache.spark.ui.jobs.UIData.ExecutorUIData private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "executors") { val listener = parent.executorsListener @@ -59,7 +59,9 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar val executorToShuffleRead = HashMap[String, Long]() val executorToShuffleWrite = HashMap[String, Long]() val executorToLogUrls = HashMap[String, Map[String, String]]() - val executorIdToData = HashMap[String, ExecutorUIData]() + var executorEvents = new mutable.ListBuffer[SparkListenerEvent]() + + val MAX_EXECUTOR_LIMIT = conf.getInt("spark.ui.timeline.executors.maximum", 1000) def activeStorageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList @@ -70,15 +72,33 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar executorToLogUrls(eid) = executorAdded.executorInfo.logUrlMap executorToTotalCores(eid) = executorAdded.executorInfo.totalCores executorToTasksMax(eid) = executorToTotalCores(eid) / conf.getInt("spark.task.cpus", 1) - executorIdToData(eid) = new ExecutorUIData(executorAdded.time) + executorEvents += executorAdded + if (executorEvents.size > MAX_EXECUTOR_LIMIT) { + executorEvents = executorEvents.drop(1) + } } override def onExecutorRemoved( executorRemoved: SparkListenerExecutorRemoved): Unit = synchronized { + executorEvents += executorRemoved + if (executorEvents.size > MAX_EXECUTOR_LIMIT) { + executorEvents = executorEvents.drop(1) + } val eid = executorRemoved.executorId - val uiData = executorIdToData(eid) - uiData.finishTime = Some(executorRemoved.time) - uiData.finishReason = Some(executorRemoved.reason) + executorToTotalCores.remove(eid) + executorToTasksMax.remove(eid) + executorToTasksActive.remove(eid) + executorToTasksComplete.remove(eid) + executorToTasksFailed.remove(eid) + executorToDuration.remove(eid) + executorToJvmGCTime.remove(eid) + executorToInputBytes.remove(eid) + executorToInputRecords.remove(eid) + executorToOutputBytes.remove(eid) + executorToOutputRecords.remove(eid) + executorToShuffleRead.remove(eid) + executorToShuffleWrite.remove(eid) + executorToLogUrls.remove(eid) } override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = { diff --git 
a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
index e5363ce8ca9dc..c04964ec66479 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
@@ -28,9 +28,9 @@ import scala.xml._
 import org.apache.commons.lang3.StringEscapeUtils
 
 import org.apache.spark.JobExecutionStatus
-import org.apache.spark.scheduler.StageInfo
+import org.apache.spark.scheduler._
 import org.apache.spark.ui._
-import org.apache.spark.ui.jobs.UIData.{ExecutorUIData, JobUIData, StageUIData}
+import org.apache.spark.ui.jobs.UIData.{JobUIData, StageUIData}
 import org.apache.spark.util.Utils
 
 /** Page showing list of all ongoing and recently finished jobs */
@@ -123,55 +123,55 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
     }
   }
 
-  private def makeExecutorEvent(executorUIDatas: HashMap[String, ExecutorUIData]): Seq[String] = {
+  private def makeExecutorEvent(executorUIDatas: Seq[SparkListenerEvent]):
+      Seq[String] = {
     val events = ListBuffer[String]()
     executorUIDatas.foreach {
-      case (executorId, event) =>
+      case a: SparkListenerExecutorAdded =>
         val addedEvent =
           s"""
             |{
            |  'className': 'executor added',
            |  'group': 'executors',
-           |  'start': new Date(${event.startTime}),
+           |  'start': new Date(${a.time}),
            |  'content': '<div class="executor-event-content"' +
            |    'data-toggle="tooltip" data-placement="bottom"' +
-           |    'data-title="Executor ${executorId}<br>' +
-           |    'Added at ${UIUtils.formatDate(new Date(event.startTime))}"' +
-           |    'data-html="true">Executor ${executorId} added</div>'
+           |    'data-title="Executor ${a.executorId}<br>' +
+           |    'Added at ${UIUtils.formatDate(new Date(a.time))}"' +
+           |    'data-html="true">Executor ${a.executorId} added</div>'
            |}
          """.stripMargin
         events += addedEvent
 
+      case e: SparkListenerExecutorRemoved =>
+        val removedEvent =
+          s"""
+            |{
+            |  'className': 'executor removed',
+            |  'group': 'executors',
+            |  'start': new Date(${e.time}),
+            |  'content': '<div class="executor-event-content"' +
+            |    'data-toggle="tooltip" data-placement="bottom"' +
+            |    'data-title="Executor ${e.executorId}<br>' +
+            |    'Removed at ${UIUtils.formatDate(new Date(e.time))}' +
+            |    '${
+                    if (e.reason != null) {
+                      s"""<br>Reason: ${e.reason.replace("\n", " ")}"""
+                    } else {
+                      ""
+                    }
+                 }"' +
+            |    'data-html="true">Executor ${e.executorId} removed</div>'
+            |}
+          """.stripMargin
+        events += removedEvent
 
-        if (event.finishTime.isDefined) {
-          val removedEvent =
-            s"""
-              |{
-              |  'className': 'executor removed',
-              |  'group': 'executors',
-              |  'start': new Date(${event.finishTime.get}),
-              |  'content': '<div class="executor-event-content"' +
-              |    'data-toggle="tooltip" data-placement="bottom"' +
-              |    'data-title="Executor ${executorId}<br>' +
-              |    'Removed at ${UIUtils.formatDate(new Date(event.finishTime.get))}' +
-              |    '${
-                      if (event.finishReason.isDefined) {
-                        s"""<br>Reason: ${event.finishReason.get.replace("\n", " ")}"""
-                      } else {
-                        ""
-                      }
-                   }"' +
-              |    'data-html="true">Executor ${executorId} removed</div>'
-              |}
-            """.stripMargin
-          events += removedEvent
-        }
     }
     events.toSeq
   }
 
   private def makeTimeline(
       jobs: Seq[JobUIData],
-      executors: HashMap[String, ExecutorUIData],
+      executors: Seq[SparkListenerEvent],
       startTime: Long): Seq[Node] = {
 
     val jobEventJsonAsStrSeq = makeJobEvent(jobs)
@@ -353,7 +353,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
     var content = summary
     val executorListener = parent.executorListener
     content ++= makeTimeline(activeJobs ++ completedJobs ++ failedJobs,
-      executorListener.executorIdToData, startTime)
+      executorListener.executorEvents, startTime)
 
     if (shouldShowActiveJobs) {
       content ++= <h4 id="active">Active Jobs ({activeJobs.size})</h4> ++
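Both pages now build the executor timeline straight from the retained listener events (executorEvents) rather than from a HashMap of ExecutorUIData. A minimal, self-contained sketch of that pattern follows; the event case classes are stand-ins for Spark's SparkListenerExecutorAdded/SparkListenerExecutorRemoved so the sketch compiles without a Spark dependency, and the emitted strings only approximate the real timeline JSON:

import scala.collection.mutable.ListBuffer

// Stand-ins for Spark's executor lifecycle events (illustration only).
sealed trait ExecutorEvent
case class ExecutorAdded(time: Long, executorId: String) extends ExecutorEvent
case class ExecutorRemoved(time: Long, executorId: String, reason: String) extends ExecutorEvent

object TimelineSketch {
  // Same shape as the rewritten makeExecutorEvent: pattern match on the event type
  // and emit one timeline entry per event, instead of walking per-executor UI data.
  def makeExecutorEvent(events: Seq[ExecutorEvent]): Seq[String] = {
    val out = ListBuffer[String]()
    events.foreach {
      case a: ExecutorAdded =>
        out += s"{'className': 'executor added', 'start': new Date(${a.time}), " +
          s"'content': 'Executor ${a.executorId} added'}"
      case e: ExecutorRemoved =>
        out += s"{'className': 'executor removed', 'start': new Date(${e.time}), " +
          s"'content': 'Executor ${e.executorId} removed (${e.reason.replace("\n", " ")})'}"
    }
    out.toSeq
  }

  def main(args: Array[String]): Unit =
    makeExecutorEvent(Seq(ExecutorAdded(0L, "1"), ExecutorRemoved(42L, "1", "lost worker")))
      .foreach(println)
}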

diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
index 0ec42d68d3dcc..2f7f8976a8899 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
@@ -20,15 +20,14 @@ package org.apache.spark.ui.jobs
 import java.util.Date
 import javax.servlet.http.HttpServletRequest
 
-import scala.collection.mutable.{Buffer, HashMap, ListBuffer}
+import scala.collection.mutable.{Buffer, ListBuffer}
 import scala.xml.{Node, NodeSeq, Unparsed, Utility}
 
 import org.apache.commons.lang3.StringEscapeUtils
 
 import org.apache.spark.JobExecutionStatus
-import org.apache.spark.scheduler.StageInfo
+import org.apache.spark.scheduler._
 import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
-import org.apache.spark.ui.jobs.UIData.ExecutorUIData
 
 /** Page showing statistics and stage list for a given job */
 private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
@@ -93,55 +92,55 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
     }
   }
 
-  def makeExecutorEvent(executorUIDatas: HashMap[String, ExecutorUIData]): Seq[String] = {
+  def makeExecutorEvent(executorUIDatas: Seq[SparkListenerEvent]): Seq[String] = {
     val events = ListBuffer[String]()
     executorUIDatas.foreach {
-      case (executorId, event) =>
+      case a: SparkListenerExecutorAdded =>
         val addedEvent =
           s"""
             |{
            |  'className': 'executor added',
            |  'group': 'executors',
-           |  'start': new Date(${event.startTime}),
+           |  'start': new Date(${a.time}),
            |  'content': '<div class="executor-event-content"' +
            |    'data-toggle="tooltip" data-placement="bottom"' +
-           |    'data-title="Executor ${executorId}<br>' +
-           |    'Added at ${UIUtils.formatDate(new Date(event.startTime))}"' +
-           |    'data-html="true">Executor ${executorId} added</div>'
+           |    'data-title="Executor ${a.executorId}<br>' +
+           |    'Added at ${UIUtils.formatDate(new Date(a.time))}"' +
+           |    'data-html="true">Executor ${a.executorId} added</div>'
            |}
          """.stripMargin
         events += addedEvent
 
-        if (event.finishTime.isDefined) {
-          val removedEvent =
-            s"""
-              |{
-              |  'className': 'executor removed',
-              |  'group': 'executors',
-              |  'start': new Date(${event.finishTime.get}),
-              |  'content': '<div class="executor-event-content"' +
-              |    'data-toggle="tooltip" data-placement="bottom"' +
-              |    'data-title="Executor ${executorId}<br>' +
-              |    'Removed at ${UIUtils.formatDate(new Date(event.finishTime.get))}' +
-              |    '${
-                      if (event.finishReason.isDefined) {
-                        s"""<br>Reason: ${event.finishReason.get.replace("\n", " ")}"""
-                      } else {
-                        ""
-                      }
-                   }"' +
-              |    'data-html="true">Executor ${executorId} removed</div>'
-              |}
-            """.stripMargin
-          events += removedEvent
-        }
+      case e: SparkListenerExecutorRemoved =>
+        val removedEvent =
+          s"""
+            |{
+            |  'className': 'executor removed',
+            |  'group': 'executors',
+            |  'start': new Date(${e.time}),
+            |  'content': '<div class="executor-event-content"' +
+            |    'data-toggle="tooltip" data-placement="bottom"' +
+            |    'data-title="Executor ${e.executorId}<br>' +
+            |    'Removed at ${UIUtils.formatDate(new Date(e.time))}' +
+            |    '${
+                    if (e.reason != null) {
+                      s"""<br>Reason: ${e.reason.replace("\n", " ")}"""
+                    } else {
+                      ""
+                    }
+                 }"' +
+            |    'data-html="true">Executor ${e.executorId} removed</div>
' + |} + """.stripMargin + events += removedEvent + } events.toSeq } private def makeTimeline( stages: Seq[StageInfo], - executors: HashMap[String, ExecutorUIData], + executors: Seq[SparkListenerEvent], appStartTime: Long): Seq[Node] = { val stageEventJsonAsStrSeq = makeStageEvent(stages) @@ -319,7 +318,7 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") { val operationGraphListener = parent.operationGraphListener content ++= makeTimeline(activeStages ++ completedStages ++ failedStages, - executorListener.executorIdToData, appStartTime) + executorListener.executorEvents, appStartTime) content ++= UIUtils.showDagVizForJob( jobId, operationGraphListener.getOperationGraphForJob(jobId)) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala index 20dde7cec827e..50d1f0d09b2a1 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala @@ -179,11 +179,6 @@ private[spark] object UIData { } } - class ExecutorUIData( - val startTime: Long, - var finishTime: Option[Long] = None, - var finishReason: Option[String] = None) - case class TaskMetricsUIData( executorDeserializeTime: Long, executorRunTime: Long, From ba1791870ad048f8365bf322e16c545408da0d8f Mon Sep 17 00:00:00 2001 From: cenyuhai Date: Tue, 6 Sep 2016 15:01:14 +0800 Subject: [PATCH 02/11] exclude executorIdToData() --- project/MimaExcludes.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 688218f6f43af..09e05feee4349 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -419,6 +419,7 @@ object MimaExcludes { ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.StorageStatusListener.this"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.streaming.scheduler.BatchInfo.streamIdToNumRecords"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.storageStatusList"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorIdToData"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.this"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.storage.StorageListener.storageStatusList"), ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ExceptionFailure.apply"), From 9169901bf3d43779aaea8abce0cbdc1439c26825 Mon Sep 17 00:00:00 2001 From: cenyuhai Date: Wed, 7 Sep 2016 15:03:53 +0800 Subject: [PATCH 03/11] use case class instead of HashMap --- .../apache/spark/ui/exec/ExecutorsPage.scala | 26 +-- .../apache/spark/ui/exec/ExecutorsTab.scala | 166 ++++++++++-------- 2 files changed, 106 insertions(+), 86 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index 982e8915a8ded..ddec06ef3e779 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -83,18 +83,22 @@ private[spark] object ExecutorsPage { val memUsed = status.memUsed val maxMem = status.maxMem val diskUsed = status.diskUsed - val totalCores = listener.executorToTotalCores.getOrElse(execId, 0) - val maxTasks = listener.executorToTasksMax.getOrElse(execId, 0) - val activeTasks = 
listener.executorToTasksActive.getOrElse(execId, 0) - val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0) - val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0) + val totalCores = listener.executorToTaskSummary.get(execId).map(_.totalCores).getOrElse(0) + val maxTasks = listener.executorToTaskSummary.get(execId).map(_.tasksMax).getOrElse(0) + val activeTasks = listener.executorToTaskSummary.get(execId).map(_.tasksActive).getOrElse(0) + val failedTasks = listener.executorToTaskSummary.get(execId).map(_.tasksFailed).getOrElse(0) + val completedTasks = listener.executorToTaskSummary.get(execId) + .map(_.tasksComplete).getOrElse(0) val totalTasks = activeTasks + failedTasks + completedTasks - val totalDuration = listener.executorToDuration.getOrElse(execId, 0L) - val totalGCTime = listener.executorToJvmGCTime.getOrElse(execId, 0L) - val totalInputBytes = listener.executorToInputBytes.getOrElse(execId, 0L) - val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0L) - val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0L) - val executorLogs = listener.executorToLogUrls.getOrElse(execId, Map.empty) + val totalDuration = listener.executorToTaskSummary.get(execId).map(_.duration).getOrElse(0L) + val totalGCTime = listener.executorToTaskSummary.get(execId).map(_.jvmGCTime).getOrElse(0L) + val totalInputBytes = listener.executorToTaskSummary.get(execId).map(_.inputBytes).getOrElse(0L) + val totalShuffleRead = listener.executorToTaskSummary.get(execId) + .map(_.shuffleRead).getOrElse(0L) + val totalShuffleWrite = listener.executorToTaskSummary.get(execId) + .map(_.shuffleWrite).getOrElse(0L) + val executorLogs = listener.executorToTaskSummary.get(execId) + .map(_.executorLogs).getOrElse(Map.empty) new ExecutorSummary( execId, diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index 5c35039d34072..030d89a949626 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -17,8 +17,7 @@ package org.apache.spark.ui.exec -import scala.collection.mutable -import scala.collection.mutable.HashMap +import scala.collection.mutable.{LinkedHashMap, ListBuffer} import org.apache.spark.{ExceptionFailure, Resubmitted, SparkConf, SparkContext} import org.apache.spark.annotation.DeveloperApi @@ -38,6 +37,25 @@ private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "exec } } +case class ExecutorTaskSummary( + var executorId: String, + var totalCores: Int = 0, + var tasksMax: Int = 0, + var tasksActive: Int = 0, + var tasksFailed: Int = 0, + var tasksComplete: Int = 0, + var duration: Long = 0L, + var jvmGCTime: Long = 0L, + var inputBytes: Long = 0L, + var inputRecords: Long = 0L, + var outputBytes: Long = 0L, + var outputRecords: Long = 0L, + var shuffleRead: Long = 0L, + var shuffleWrite: Long = 0L, + var executorLogs: Map[String, String] = Map.empty, + var isAlive: Boolean = true +) + /** * :: DeveloperApi :: * A SparkListener that prepares information to be displayed on the ExecutorsTab @@ -45,23 +63,12 @@ private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "exec @DeveloperApi class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: SparkConf) extends SparkListener { - val executorToTotalCores = HashMap[String, Int]() - val executorToTasksMax = HashMap[String, Int]() - val executorToTasksActive = 
HashMap[String, Int]() - val executorToTasksComplete = HashMap[String, Int]() - val executorToTasksFailed = HashMap[String, Int]() - val executorToDuration = HashMap[String, Long]() - val executorToJvmGCTime = HashMap[String, Long]() - val executorToInputBytes = HashMap[String, Long]() - val executorToInputRecords = HashMap[String, Long]() - val executorToOutputBytes = HashMap[String, Long]() - val executorToOutputRecords = HashMap[String, Long]() - val executorToShuffleRead = HashMap[String, Long]() - val executorToShuffleWrite = HashMap[String, Long]() - val executorToLogUrls = HashMap[String, Map[String, String]]() - var executorEvents = new mutable.ListBuffer[SparkListenerEvent]() - - val MAX_EXECUTOR_LIMIT = conf.getInt("spark.ui.timeline.executors.maximum", 1000) + var executorToTaskSummary = LinkedHashMap[String, ExecutorTaskSummary]() + var executorEvents = new ListBuffer[SparkListenerEvent]() + + private val maxTimelineExecutors = conf.getInt("spark.ui.timeline.executors.maximum", 1000) + private val retainedDeadExecutors = conf.getInt("spark.ui.retainedDeadExecutors", 100) + private var deadExecutorCount: Int = 0 def activeStorageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList @@ -69,12 +76,22 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = synchronized { val eid = executorAdded.executorId - executorToLogUrls(eid) = executorAdded.executorInfo.logUrlMap - executorToTotalCores(eid) = executorAdded.executorInfo.totalCores - executorToTasksMax(eid) = executorToTotalCores(eid) / conf.getInt("spark.task.cpus", 1) + if (!executorToTaskSummary.contains(eid)) { + executorToTaskSummary(eid) = ExecutorTaskSummary(eid) + } + executorToTaskSummary(eid).executorLogs = executorAdded.executorInfo.logUrlMap + executorToTaskSummary(eid).totalCores = executorAdded.executorInfo.totalCores + executorToTaskSummary(eid).tasksMax = + executorToTaskSummary(eid).totalCores / conf.getInt("spark.task.cpus", 1) executorEvents += executorAdded - if (executorEvents.size > MAX_EXECUTOR_LIMIT) { - executorEvents = executorEvents.drop(1) + if (executorEvents.size > maxTimelineExecutors) { + executorEvents.remove(0) + } + if (deadExecutorCount > retainedDeadExecutors) { + executorToTaskSummary.filter(t => !t._2.isAlive).remove(0).foreach { e => + executorToTaskSummary.remove(e.executorId) + deadExecutorCount = deadExecutorCount - 1 + } } } @@ -82,23 +99,10 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar executorRemoved: SparkListenerExecutorRemoved): Unit = synchronized { executorEvents += executorRemoved if (executorEvents.size > MAX_EXECUTOR_LIMIT) { - executorEvents = executorEvents.drop(1) + executorEvents.remove(0) } - val eid = executorRemoved.executorId - executorToTotalCores.remove(eid) - executorToTasksMax.remove(eid) - executorToTasksActive.remove(eid) - executorToTasksComplete.remove(eid) - executorToTasksFailed.remove(eid) - executorToDuration.remove(eid) - executorToJvmGCTime.remove(eid) - executorToInputBytes.remove(eid) - executorToInputRecords.remove(eid) - executorToOutputBytes.remove(eid) - executorToOutputRecords.remove(eid) - executorToShuffleRead.remove(eid) - executorToShuffleWrite.remove(eid) - executorToLogUrls.remove(eid) + deadExecutorCount = deadExecutorCount + 1 + executorToTaskSummary(eid).isAlive = false } override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = { @@ -107,52 
+111,64 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar s.blockManagerId.executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER || s.blockManagerId.executorId == SparkContext.DRIVER_IDENTIFIER } - storageStatus.foreach { s => executorToLogUrls(s.blockManagerId.executorId) = logs.toMap } + storageStatus.foreach { s => + val eid = s.blockManagerId.executorId + if (!executorToTaskSummary.contains(eid)) { + executorToTaskSummary(eid) = ExecutorTaskSummary(eid) + } + executorToTaskSummary(eid).executorLogs = logs.toMap + } } } override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized { val eid = taskStart.taskInfo.executorId - executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1 + if (executorToTaskSummary.contains(eid)) { + executorToTaskSummary(eid).tasksActive = executorToTaskSummary(eid).tasksActive + 1 + } } override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized { val info = taskEnd.taskInfo if (info != null) { val eid = info.executorId - taskEnd.reason match { - case Resubmitted => - // Note: For resubmitted tasks, we continue to use the metrics that belong to the - // first attempt of this task. This may not be 100% accurate because the first attempt - // could have failed half-way through. The correct fix would be to keep track of the - // metrics added by each attempt, but this is much more complicated. - return - case e: ExceptionFailure => - executorToTasksFailed(eid) = executorToTasksFailed.getOrElse(eid, 0) + 1 - case _ => - executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1 - } - - executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 1) - 1 - executorToDuration(eid) = executorToDuration.getOrElse(eid, 0L) + info.duration - - // Update shuffle read/write - val metrics = taskEnd.taskMetrics - if (metrics != null) { - executorToInputBytes(eid) = - executorToInputBytes.getOrElse(eid, 0L) + metrics.inputMetrics.bytesRead - executorToInputRecords(eid) = - executorToInputRecords.getOrElse(eid, 0L) + metrics.inputMetrics.recordsRead - executorToOutputBytes(eid) = - executorToOutputBytes.getOrElse(eid, 0L) + metrics.outputMetrics.bytesWritten - executorToOutputRecords(eid) = - executorToOutputRecords.getOrElse(eid, 0L) + metrics.outputMetrics.recordsWritten - - executorToShuffleRead(eid) = - executorToShuffleRead.getOrElse(eid, 0L) + metrics.shuffleReadMetrics.remoteBytesRead - executorToShuffleWrite(eid) = - executorToShuffleWrite.getOrElse(eid, 0L) + metrics.shuffleWriteMetrics.bytesWritten - executorToJvmGCTime(eid) = executorToJvmGCTime.getOrElse(eid, 0L) + metrics.jvmGCTime + if (executorToTaskSummary.contains(eid)) { + taskEnd.reason match { + case Resubmitted => + // Note: For resubmitted tasks, we continue to use the metrics that belong to the + // first attempt of this task. This may not be 100% accurate because the first attempt + // could have failed half-way through. The correct fix would be to keep track of the + // metrics added by each attempt, but this is much more complicated. 
+ return + case e: ExceptionFailure => + executorToTaskSummary(eid).tasksFailed = executorToTaskSummary(eid).tasksFailed + 1 + case _ => + executorToTaskSummary(eid).tasksComplete = executorToTaskSummary(eid).tasksComplete + 1 + } + if (executorToTaskSummary(eid).tasksActive >= 1) { + executorToTaskSummary(eid).tasksActive = executorToTaskSummary(eid).tasksActive - 1 + } + executorToTaskSummary(eid).duration = executorToTaskSummary(eid).duration + info.duration + + // Update shuffle read/write + val metrics = taskEnd.taskMetrics + if (metrics != null) { + executorToTaskSummary(eid).inputBytes = + executorToTaskSummary(eid).inputBytes + metrics.inputMetrics.bytesRead + executorToTaskSummary(eid).inputRecords = + executorToTaskSummary(eid).inputRecords + metrics.inputMetrics.recordsRead + executorToTaskSummary(eid).outputBytes = + executorToTaskSummary(eid).outputBytes + metrics.outputMetrics.bytesWritten + executorToTaskSummary(eid).outputRecords = + executorToTaskSummary(eid).outputRecords + metrics.outputMetrics.recordsWritten + + executorToTaskSummary(eid).shuffleRead = + executorToTaskSummary(eid).shuffleRead + metrics.shuffleReadMetrics.remoteBytesRead + executorToTaskSummary(eid).shuffleWrite = + executorToTaskSummary(eid).shuffleWrite + metrics.shuffleWriteMetrics.bytesWritten + executorToTaskSummary(eid).jvmGCTime = + executorToTaskSummary(eid).jvmGCTime + metrics.jvmGCTime + } } } } From 2d445cb6046dada9422edaa2f42451d4665ca840 Mon Sep 17 00:00:00 2001 From: cenyuhai Date: Thu, 8 Sep 2016 00:22:44 +0800 Subject: [PATCH 04/11] change executorToLogUrls to ExecutorTaskSummary.executorLogs --- .../scala/org/apache/spark/ui/exec/ExecutorsTab.scala | 11 +++++------ .../org/apache/spark/ui/jobs/ExecutorTable.scala | 3 ++- .../scala/org/apache/spark/ui/jobs/StagePage.scala | 4 ++-- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index 030d89a949626..3f39aa1ddaab9 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -88,21 +88,20 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar executorEvents.remove(0) } if (deadExecutorCount > retainedDeadExecutors) { - executorToTaskSummary.filter(t => !t._2.isAlive).remove(0).foreach { e => - executorToTaskSummary.remove(e.executorId) - deadExecutorCount = deadExecutorCount - 1 - } + val head = executorToTaskSummary.filter(t => !t._2.isAlive).head + executorToTaskSummary.remove(head._1) + deadExecutorCount = deadExecutorCount - 1 } } override def onExecutorRemoved( executorRemoved: SparkListenerExecutorRemoved): Unit = synchronized { executorEvents += executorRemoved - if (executorEvents.size > MAX_EXECUTOR_LIMIT) { + if (executorEvents.size > maxTimelineExecutors) { executorEvents.remove(0) } deadExecutorCount = deadExecutorCount + 1 - executorToTaskSummary(eid).isAlive = false + executorToTaskSummary(executorRemoved.executorId).isAlive = false } override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index 133c3b1b9aca8..81a43720758b2 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -118,7 +118,8 @@ 
private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage
{k}
{ - val logs = parent.executorsListener.executorToLogUrls.getOrElse(k, Map.empty) + val logs = parent.executorsListener.executorToTaskSummary.get(k) + .map(_.executorLogs).orElse(Map.empty) logs.map { case (logName, logUrl) => } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index ea7acc4734dff..f063172a49763 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -1009,8 +1009,8 @@ private[ui] class TaskDataSource( None } - val logs = executorsListener.executorToLogUrls.getOrElse(info.executorId, Map.empty) - + val logs = executorsListener.executorToTaskSummary.get(info.executorId) + .map(_.executorLogs).orElse(Map.empty) new TaskTableRowData( info.index, info.taskId, From a7e261c78dd086d4084e4559ea7aafbca0973e6f Mon Sep 17 00:00:00 2001 From: cenyuhai Date: Thu, 8 Sep 2016 01:25:38 +0800 Subject: [PATCH 05/11] fix compile error --- .../src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala | 2 +- core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index 81a43720758b2..9fb3f35fd9685 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -119,7 +119,7 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage
{ val logs = parent.executorsListener.executorToTaskSummary.get(k) - .map(_.executorLogs).orElse(Map.empty) + .map(_.executorLogs).getOrElse(Map.empty) logs.map { case (logName, logUrl) => } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index f063172a49763..0ae99c961b436 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -1010,7 +1010,7 @@ private[ui] class TaskDataSource( } val logs = executorsListener.executorToTaskSummary.get(info.executorId) - .map(_.executorLogs).orElse(Map.empty) + .map(_.executorLogs).getOrElse(Map.empty) new TaskTableRowData( info.index, info.taskId, From 4b865e5931dc7f5df2d8d7305bcef07dbf7b97b2 Mon Sep 17 00:00:00 2001 From: cenyuhai Date: Thu, 8 Sep 2016 11:32:42 +0800 Subject: [PATCH 06/11] exclude all binary compatibility errors --- project/MimaExcludes.scala | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 09e05feee4349..e4c70c999fe8d 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -420,6 +420,17 @@ object MimaExcludes { ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.streaming.scheduler.BatchInfo.streamIdToNumRecords"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.storageStatusList"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorIdToData"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToTasksActive"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToTasksComplete"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToInputRecords"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToShuffleRead"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToTasksFailed"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToShuffleWrite"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToDuration"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToInputBytes"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToLogUrls"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToOutputBytes"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToOutputRecords"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.this"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.storage.StorageListener.storageStatusList"), ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ExceptionFailure.apply"), From a7f0ec323bc52ff1071ce999a39cc882dee039d3 Mon Sep 17 00:00:00 2001 From: cenyuhai Date: Fri, 9 Sep 2016 00:51:15 +0800 Subject: [PATCH 07/11] fix errors in history executor tab --- .../apache/spark/ui/exec/ExecutorsTab.scala | 80 ++++++++++--------- 1 file changed, 41 insertions(+), 39 
deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index 3f39aa1ddaab9..a8ad6e81279e0 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -122,52 +122,54 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized { val eid = taskStart.taskInfo.executorId - if (executorToTaskSummary.contains(eid)) { - executorToTaskSummary(eid).tasksActive = executorToTaskSummary(eid).tasksActive + 1 + if (!executorToTaskSummary.contains(eid)) { + executorToTaskSummary(eid) = ExecutorTaskSummary(eid) } + executorToTaskSummary(eid).tasksActive = executorToTaskSummary(eid).tasksActive + 1 } override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized { val info = taskEnd.taskInfo if (info != null) { val eid = info.executorId - if (executorToTaskSummary.contains(eid)) { - taskEnd.reason match { - case Resubmitted => - // Note: For resubmitted tasks, we continue to use the metrics that belong to the - // first attempt of this task. This may not be 100% accurate because the first attempt - // could have failed half-way through. The correct fix would be to keep track of the - // metrics added by each attempt, but this is much more complicated. - return - case e: ExceptionFailure => - executorToTaskSummary(eid).tasksFailed = executorToTaskSummary(eid).tasksFailed + 1 - case _ => - executorToTaskSummary(eid).tasksComplete = executorToTaskSummary(eid).tasksComplete + 1 - } - if (executorToTaskSummary(eid).tasksActive >= 1) { - executorToTaskSummary(eid).tasksActive = executorToTaskSummary(eid).tasksActive - 1 - } - executorToTaskSummary(eid).duration = executorToTaskSummary(eid).duration + info.duration - - // Update shuffle read/write - val metrics = taskEnd.taskMetrics - if (metrics != null) { - executorToTaskSummary(eid).inputBytes = - executorToTaskSummary(eid).inputBytes + metrics.inputMetrics.bytesRead - executorToTaskSummary(eid).inputRecords = - executorToTaskSummary(eid).inputRecords + metrics.inputMetrics.recordsRead - executorToTaskSummary(eid).outputBytes = - executorToTaskSummary(eid).outputBytes + metrics.outputMetrics.bytesWritten - executorToTaskSummary(eid).outputRecords = - executorToTaskSummary(eid).outputRecords + metrics.outputMetrics.recordsWritten - - executorToTaskSummary(eid).shuffleRead = - executorToTaskSummary(eid).shuffleRead + metrics.shuffleReadMetrics.remoteBytesRead - executorToTaskSummary(eid).shuffleWrite = - executorToTaskSummary(eid).shuffleWrite + metrics.shuffleWriteMetrics.bytesWritten - executorToTaskSummary(eid).jvmGCTime = - executorToTaskSummary(eid).jvmGCTime + metrics.jvmGCTime - } + if (!executorToTaskSummary.contains(eid)) { + executorToTaskSummary(eid) = ExecutorTaskSummary(eid) + } + taskEnd.reason match { + case Resubmitted => + // Note: For resubmitted tasks, we continue to use the metrics that belong to the + // first attempt of this task. This may not be 100% accurate because the first attempt + // could have failed half-way through. The correct fix would be to keep track of the + // metrics added by each attempt, but this is much more complicated. 
+ return + case e: ExceptionFailure => + executorToTaskSummary(eid).tasksFailed = executorToTaskSummary(eid).tasksFailed + 1 + case _ => + executorToTaskSummary(eid).tasksComplete = executorToTaskSummary(eid).tasksComplete + 1 + } + if (executorToTaskSummary(eid).tasksActive >= 1) { + executorToTaskSummary(eid).tasksActive = executorToTaskSummary(eid).tasksActive - 1 + } + executorToTaskSummary(eid).duration = executorToTaskSummary(eid).duration + info.duration + + // Update shuffle read/write + val metrics = taskEnd.taskMetrics + if (metrics != null) { + executorToTaskSummary(eid).inputBytes = + executorToTaskSummary(eid).inputBytes + metrics.inputMetrics.bytesRead + executorToTaskSummary(eid).inputRecords = + executorToTaskSummary(eid).inputRecords + metrics.inputMetrics.recordsRead + executorToTaskSummary(eid).outputBytes = + executorToTaskSummary(eid).outputBytes + metrics.outputMetrics.bytesWritten + executorToTaskSummary(eid).outputRecords = + executorToTaskSummary(eid).outputRecords + metrics.outputMetrics.recordsWritten + + executorToTaskSummary(eid).shuffleRead = + executorToTaskSummary(eid).shuffleRead + metrics.shuffleReadMetrics.remoteBytesRead + executorToTaskSummary(eid).shuffleWrite = + executorToTaskSummary(eid).shuffleWrite + metrics.shuffleWriteMetrics.bytesWritten + executorToTaskSummary(eid).jvmGCTime = + executorToTaskSummary(eid).jvmGCTime + metrics.jvmGCTime } } } From 0080f146a6d33c3510e623a9498c85677b39a7ba Mon Sep 17 00:00:00 2001 From: cenyuhai Date: Sat, 10 Sep 2016 12:04:27 +0800 Subject: [PATCH 08/11] change code style --- .../apache/spark/ui/exec/ExecutorsPage.scala | 41 ++++++------------- .../apache/spark/ui/exec/ExecutorsTab.scala | 41 ++++++++----------- 2 files changed, 30 insertions(+), 52 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index ddec06ef3e779..2b4c62ede79c3 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -83,22 +83,7 @@ private[spark] object ExecutorsPage { val memUsed = status.memUsed val maxMem = status.maxMem val diskUsed = status.diskUsed - val totalCores = listener.executorToTaskSummary.get(execId).map(_.totalCores).getOrElse(0) - val maxTasks = listener.executorToTaskSummary.get(execId).map(_.tasksMax).getOrElse(0) - val activeTasks = listener.executorToTaskSummary.get(execId).map(_.tasksActive).getOrElse(0) - val failedTasks = listener.executorToTaskSummary.get(execId).map(_.tasksFailed).getOrElse(0) - val completedTasks = listener.executorToTaskSummary.get(execId) - .map(_.tasksComplete).getOrElse(0) - val totalTasks = activeTasks + failedTasks + completedTasks - val totalDuration = listener.executorToTaskSummary.get(execId).map(_.duration).getOrElse(0L) - val totalGCTime = listener.executorToTaskSummary.get(execId).map(_.jvmGCTime).getOrElse(0L) - val totalInputBytes = listener.executorToTaskSummary.get(execId).map(_.inputBytes).getOrElse(0L) - val totalShuffleRead = listener.executorToTaskSummary.get(execId) - .map(_.shuffleRead).getOrElse(0L) - val totalShuffleWrite = listener.executorToTaskSummary.get(execId) - .map(_.shuffleWrite).getOrElse(0L) - val executorLogs = listener.executorToTaskSummary.get(execId) - .map(_.executorLogs).getOrElse(Map.empty) + val taskSummary = listener.executorToTaskSummary.getOrElse(execId, ExecutorTaskSummary(execId)) new ExecutorSummary( execId, @@ -107,19 +92,19 @@ private[spark] object 
ExecutorsPage { rddBlocks, memUsed, diskUsed, - totalCores, - maxTasks, - activeTasks, - failedTasks, - completedTasks, - totalTasks, - totalDuration, - totalGCTime, - totalInputBytes, - totalShuffleRead, - totalShuffleWrite, + taskSummary.totalCores, + taskSummary.tasksMax, + taskSummary.tasksActive, + taskSummary.tasksFailed, + taskSummary.tasksComplete, + taskSummary.tasksActive + taskSummary.tasksFailed + taskSummary.tasksComplete, + taskSummary.duration, + taskSummary.jvmGCTime, + taskSummary.inputBytes, + taskSummary.shuffleRead, + taskSummary.shuffleWrite, maxMem, - executorLogs + taskSummary.executorLogs ) } } diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index a8ad6e81279e0..7404031e1246c 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -68,7 +68,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar private val maxTimelineExecutors = conf.getInt("spark.ui.timeline.executors.maximum", 1000) private val retainedDeadExecutors = conf.getInt("spark.ui.retainedDeadExecutors", 100) - private var deadExecutorCount: Int = 0 + private var deadExecutorCount = 0 def activeStorageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList @@ -88,9 +88,9 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar executorEvents.remove(0) } if (deadExecutorCount > retainedDeadExecutors) { - val head = executorToTaskSummary.filter(t => !t._2.isAlive).head + val head = executorToTaskSummary.filter(e => !e._2.isAlive).head executorToTaskSummary.remove(head._1) - deadExecutorCount = deadExecutorCount - 1 + deadExecutorCount -= 1 } } @@ -100,7 +100,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar if (executorEvents.size > maxTimelineExecutors) { executorEvents.remove(0) } - deadExecutorCount = deadExecutorCount + 1 + deadExecutorCount += 1 executorToTaskSummary(executorRemoved.executorId).isAlive = false } @@ -125,7 +125,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar if (!executorToTaskSummary.contains(eid)) { executorToTaskSummary(eid) = ExecutorTaskSummary(eid) } - executorToTaskSummary(eid).tasksActive = executorToTaskSummary(eid).tasksActive + 1 + executorToTaskSummary(eid).tasksActive += 1 } override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized { @@ -143,33 +143,26 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar // metrics added by each attempt, but this is much more complicated. 
return case e: ExceptionFailure => - executorToTaskSummary(eid).tasksFailed = executorToTaskSummary(eid).tasksFailed + 1 + executorToTaskSummary(eid).tasksFailed += 1 case _ => - executorToTaskSummary(eid).tasksComplete = executorToTaskSummary(eid).tasksComplete + 1 + executorToTaskSummary(eid).tasksComplete += 1 } if (executorToTaskSummary(eid).tasksActive >= 1) { - executorToTaskSummary(eid).tasksActive = executorToTaskSummary(eid).tasksActive - 1 + executorToTaskSummary(eid).tasksActive -= 1 } - executorToTaskSummary(eid).duration = executorToTaskSummary(eid).duration + info.duration + executorToTaskSummary(eid).duration += info.duration // Update shuffle read/write val metrics = taskEnd.taskMetrics if (metrics != null) { - executorToTaskSummary(eid).inputBytes = - executorToTaskSummary(eid).inputBytes + metrics.inputMetrics.bytesRead - executorToTaskSummary(eid).inputRecords = - executorToTaskSummary(eid).inputRecords + metrics.inputMetrics.recordsRead - executorToTaskSummary(eid).outputBytes = - executorToTaskSummary(eid).outputBytes + metrics.outputMetrics.bytesWritten - executorToTaskSummary(eid).outputRecords = - executorToTaskSummary(eid).outputRecords + metrics.outputMetrics.recordsWritten - - executorToTaskSummary(eid).shuffleRead = - executorToTaskSummary(eid).shuffleRead + metrics.shuffleReadMetrics.remoteBytesRead - executorToTaskSummary(eid).shuffleWrite = - executorToTaskSummary(eid).shuffleWrite + metrics.shuffleWriteMetrics.bytesWritten - executorToTaskSummary(eid).jvmGCTime = - executorToTaskSummary(eid).jvmGCTime + metrics.jvmGCTime + executorToTaskSummary(eid).inputBytes += metrics.inputMetrics.bytesRead + executorToTaskSummary(eid).inputRecords += metrics.inputMetrics.recordsRead + executorToTaskSummary(eid).outputBytes += metrics.outputMetrics.bytesWritten + executorToTaskSummary(eid).outputRecords += metrics.outputMetrics.recordsWritten + + executorToTaskSummary(eid).shuffleRead += metrics.shuffleReadMetrics.remoteBytesRead + executorToTaskSummary(eid).shuffleWrite += metrics.shuffleWriteMetrics.bytesWritten + executorToTaskSummary(eid).jvmGCTime += metrics.jvmGCTime } } } From 4dda55ca2f614228fbd6f926fd201073894a8abf Mon Sep 17 00:00:00 2001 From: cenyuhai Date: Sat, 10 Sep 2016 16:48:38 +0800 Subject: [PATCH 09/11] add private[ui] for new case class --- .../main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala | 4 +--- .../main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index 2b4c62ede79c3..7953d77fd7ece 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -17,14 +17,12 @@ package org.apache.spark.ui.exec -import java.net.URLEncoder import javax.servlet.http.HttpServletRequest import scala.xml.Node import org.apache.spark.status.api.v1.ExecutorSummary -import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage} -import org.apache.spark.util.Utils +import org.apache.spark.ui.{UIUtils, WebUIPage} // This isn't even used anymore -- but we need to keep it b/c of a MiMa false positive private[ui] case class ExecutorSummaryInfo( diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index 7404031e1246c..856af5d5c07c7 100644 --- 
a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -37,7 +37,7 @@ private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "exec } } -case class ExecutorTaskSummary( +private[ui] case class ExecutorTaskSummary( var executorId: String, var totalCores: Int = 0, var tasksMax: Int = 0, From c7258916b8f34cc31edcb7033e783d990a3fa769 Mon Sep 17 00:00:00 2001 From: cenyuhai Date: Sat, 10 Sep 2016 17:31:54 +0800 Subject: [PATCH 10/11] simplifying codes --- .../apache/spark/ui/exec/ExecutorsTab.scala | 55 ++++++++----------- 1 file changed, 23 insertions(+), 32 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index 856af5d5c07c7..008de2bf3320d 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -76,13 +76,10 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = synchronized { val eid = executorAdded.executorId - if (!executorToTaskSummary.contains(eid)) { - executorToTaskSummary(eid) = ExecutorTaskSummary(eid) - } - executorToTaskSummary(eid).executorLogs = executorAdded.executorInfo.logUrlMap - executorToTaskSummary(eid).totalCores = executorAdded.executorInfo.totalCores - executorToTaskSummary(eid).tasksMax = - executorToTaskSummary(eid).totalCores / conf.getInt("spark.task.cpus", 1) + val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid)) + taskSummary.executorLogs = executorAdded.executorInfo.logUrlMap + taskSummary.totalCores = executorAdded.executorInfo.totalCores + taskSummary.tasksMax = taskSummary.totalCores / conf.getInt("spark.task.cpus", 1) executorEvents += executorAdded if (executorEvents.size > maxTimelineExecutors) { executorEvents.remove(0) @@ -101,7 +98,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar executorEvents.remove(0) } deadExecutorCount += 1 - executorToTaskSummary(executorRemoved.executorId).isAlive = false + executorToTaskSummary.get(executorRemoved.executorId).map(e => e.isAlive = false) } override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = { @@ -112,29 +109,23 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar } storageStatus.foreach { s => val eid = s.blockManagerId.executorId - if (!executorToTaskSummary.contains(eid)) { - executorToTaskSummary(eid) = ExecutorTaskSummary(eid) - } - executorToTaskSummary(eid).executorLogs = logs.toMap + val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid)) + taskSummary.executorLogs = logs.toMap } } } override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized { val eid = taskStart.taskInfo.executorId - if (!executorToTaskSummary.contains(eid)) { - executorToTaskSummary(eid) = ExecutorTaskSummary(eid) - } - executorToTaskSummary(eid).tasksActive += 1 + val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid)) + taskSummary.tasksActive += 1 } override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized { val info = taskEnd.taskInfo if (info != null) { val eid = info.executorId - if (!executorToTaskSummary.contains(eid)) { - executorToTaskSummary(eid) = ExecutorTaskSummary(eid) - } + val 
taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid)) taskEnd.reason match { case Resubmitted => // Note: For resubmitted tasks, we continue to use the metrics that belong to the @@ -143,26 +134,26 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar // metrics added by each attempt, but this is much more complicated. return case e: ExceptionFailure => - executorToTaskSummary(eid).tasksFailed += 1 + taskSummary.tasksFailed += 1 case _ => - executorToTaskSummary(eid).tasksComplete += 1 + taskSummary.tasksComplete += 1 } - if (executorToTaskSummary(eid).tasksActive >= 1) { - executorToTaskSummary(eid).tasksActive -= 1 + if (taskSummary.tasksActive >= 1) { + taskSummary.tasksActive -= 1 } - executorToTaskSummary(eid).duration += info.duration + taskSummary.duration += info.duration // Update shuffle read/write val metrics = taskEnd.taskMetrics if (metrics != null) { - executorToTaskSummary(eid).inputBytes += metrics.inputMetrics.bytesRead - executorToTaskSummary(eid).inputRecords += metrics.inputMetrics.recordsRead - executorToTaskSummary(eid).outputBytes += metrics.outputMetrics.bytesWritten - executorToTaskSummary(eid).outputRecords += metrics.outputMetrics.recordsWritten - - executorToTaskSummary(eid).shuffleRead += metrics.shuffleReadMetrics.remoteBytesRead - executorToTaskSummary(eid).shuffleWrite += metrics.shuffleWriteMetrics.bytesWritten - executorToTaskSummary(eid).jvmGCTime += metrics.jvmGCTime + taskSummary.inputBytes += metrics.inputMetrics.bytesRead + taskSummary.inputRecords += metrics.inputMetrics.recordsRead + taskSummary.outputBytes += metrics.outputMetrics.bytesWritten + taskSummary.outputRecords += metrics.outputMetrics.recordsWritten + + taskSummary.shuffleRead += metrics.shuffleReadMetrics.remoteBytesRead + taskSummary.shuffleWrite += metrics.shuffleWriteMetrics.bytesWritten + taskSummary.jvmGCTime += metrics.jvmGCTime } } } From ac995249a58d297b45a08f2f88ff6bf87240df6e Mon Sep 17 00:00:00 2001 From: cenyuhai Date: Mon, 12 Sep 2016 20:44:19 +0800 Subject: [PATCH 11/11] remove unnecessary variable --- .../scala/org/apache/spark/ui/exec/ExecutorsTab.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index 008de2bf3320d..678571fd4f5ac 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -68,7 +68,6 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar private val maxTimelineExecutors = conf.getInt("spark.ui.timeline.executors.maximum", 1000) private val retainedDeadExecutors = conf.getInt("spark.ui.retainedDeadExecutors", 100) - private var deadExecutorCount = 0 def activeStorageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList @@ -84,10 +83,11 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar if (executorEvents.size > maxTimelineExecutors) { executorEvents.remove(0) } - if (deadExecutorCount > retainedDeadExecutors) { - val head = executorToTaskSummary.filter(e => !e._2.isAlive).head + + val deadExecutors = executorToTaskSummary.filter(e => !e._2.isAlive) + if (deadExecutors.size > retainedDeadExecutors) { + val head = deadExecutors.head executorToTaskSummary.remove(head._1) - deadExecutorCount -= 1 } } @@ -97,8 +97,7 @@ class ExecutorsListener(storageStatusListener: 
StorageStatusListener, conf: Spar if (executorEvents.size > maxTimelineExecutors) { executorEvents.remove(0) } - deadExecutorCount += 1 - executorToTaskSummary.get(executorRemoved.executorId).map(e => e.isAlive = false) + executorToTaskSummary.get(executorRemoved.executorId).foreach(e => e.isAlive = false) } override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
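Net effect of the series: the listener keeps two bounded structures instead of the many per-executor HashMaps — a ListBuffer of timeline events capped at spark.ui.timeline.executors.maximum, and a LinkedHashMap of ExecutorTaskSummary entries whose dead executors are trimmed against spark.ui.retainedDeadExecutors. A rough, self-contained sketch of that retention policy, with hard-coded caps standing in for the SparkConf lookups and a trimmed-down summary class:

import scala.collection.mutable.{LinkedHashMap, ListBuffer}

// Rough sketch of the retention policy; the two caps are hard-coded stand-ins for
// spark.ui.timeline.executors.maximum and spark.ui.retainedDeadExecutors.
object RetentionSketch {
  final case class TaskSummary(executorId: String, var isAlive: Boolean = true)

  val maxTimelineExecutors = 1000
  val retainedDeadExecutors = 100

  val executorEvents = ListBuffer[String]()                        // timeline events, oldest first
  val executorToTaskSummary = LinkedHashMap[String, TaskSummary]() // insertion-ordered, like the listener's map

  private def record(event: String): Unit = {
    executorEvents += event
    if (executorEvents.size > maxTimelineExecutors) executorEvents.remove(0) // drop the oldest event
  }

  def executorAdded(id: String): Unit = {
    record(s"executor $id added")
    executorToTaskSummary.getOrElseUpdate(id, TaskSummary(id))
    // Once too many dead executors are retained, evict the oldest one
    // (the final patch filters on every add instead of keeping a counter).
    val dead = executorToTaskSummary.filter(e => !e._2.isAlive)
    if (dead.size > retainedDeadExecutors) executorToTaskSummary.remove(dead.head._1)
  }

  def executorRemoved(id: String): Unit = {
    record(s"executor $id removed")
    executorToTaskSummary.get(id).foreach(e => e.isAlive = false)
  }
}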