diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
index 645ede26a0879..7750a096230cb 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
@@ -28,7 +28,7 @@ private[v1] class AllRDDResource(ui: SparkUI) {
 
   @GET
   def rddList(): Seq[RDDStorageInfo] = {
-    val storageStatusList = ui.storageListener.storageStatusList
+    val storageStatusList = ui.storageListener.activeStorageStatusList
     val rddInfos = ui.storageListener.rddInfoList
     rddInfos.map{rddInfo =>
       AllRDDResource.getRDDStorageInfo(rddInfo.id, rddInfo, storageStatusList,
@@ -44,7 +44,7 @@ private[spark] object AllRDDResource {
       rddId: Int,
       listener: StorageListener,
       includeDetails: Boolean): Option[RDDStorageInfo] = {
-    val storageStatusList = listener.storageStatusList
+    val storageStatusList = listener.activeStorageStatusList
     listener.rddInfoList.find { _.id == rddId }.map { rddInfo =>
       getRDDStorageInfo(rddId, rddInfo, storageStatusList, includeDetails)
     }
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala
index 3bdba922328c2..6ca59c2f3caeb 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala
@@ -31,9 +31,9 @@ private[v1] class ExecutorListResource(ui: SparkUI) {
     listener.synchronized {
       // The follow codes should be protected by `listener` to make sure no executors will be
       // removed before we query their status. See SPARK-12784.
-      val storageStatusList = listener.storageStatusList
+      val storageStatusList = listener.activeStorageStatusList
       (0 until storageStatusList.size).map { statusId =>
-        ExecutorsPage.getExecInfo(listener, statusId)
+        ExecutorsPage.getExecInfo(listener, statusId, isActive = true)
       }
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index d116e68c17f18..909dd0c07ea63 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -54,6 +54,7 @@ class ExecutorStageSummary private[spark](
 class ExecutorSummary private[spark](
     val id: String,
     val hostPort: String,
+    val isActive: Boolean,
     val rddBlocks: Int,
     val memoryUsed: Long,
     val diskUsed: Long,
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
index d98aae8ff0c68..f552b498a76de 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -19,6 +19,7 @@ package org.apache.spark.storage
 
 import scala.collection.mutable
 
+import org.apache.spark.SparkConf
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.scheduler._
 
@@ -29,14 +30,20 @@ import org.apache.spark.scheduler._
  * This class is thread-safe (unlike JobProgressListener)
  */
 @DeveloperApi
-class StorageStatusListener extends SparkListener {
+class StorageStatusListener(conf: SparkConf) extends SparkListener {
   // This maintains only blocks that are cached (i.e. storage level is not StorageLevel.NONE)
   private[storage] val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()
+  private[storage] val deadExecutorStorageStatus = new mutable.ListBuffer[StorageStatus]()
+  private[this] val retainedDeadExecutors = conf.getInt("spark.ui.retainedDeadExecutors", 100)
 
   def storageStatusList: Seq[StorageStatus] = synchronized {
     executorIdToStorageStatus.values.toSeq
   }
 
+  def deadStorageStatusList: Seq[StorageStatus] = synchronized {
+    deadExecutorStorageStatus.toSeq
+  }
+
   /** Update storage status list to reflect updated block statuses */
   private def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {
     executorIdToStorageStatus.get(execId).foreach { storageStatus =>
@@ -87,8 +94,12 @@ class StorageStatusListener extends SparkListener {
   override def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved) {
     synchronized {
       val executorId = blockManagerRemoved.blockManagerId.executorId
-      executorIdToStorageStatus.remove(executorId)
+      executorIdToStorageStatus.remove(executorId).foreach { status =>
+        deadExecutorStorageStatus += status
+      }
+      if (deadExecutorStorageStatus.size > retainedDeadExecutors) {
+        deadExecutorStorageStatus.trimStart(1)
+      }
     }
   }
-
 }
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index 6cc30eeaf5d82..5324a7682960b 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -196,7 +196,7 @@ private[spark] object SparkUI {
     }
 
     val environmentListener = new EnvironmentListener
-    val storageStatusListener = new StorageStatusListener
+    val storageStatusListener = new StorageStatusListener(conf)
    val executorsListener = new ExecutorsListener(storageStatusListener, conf)
    val storageListener = new StorageListener(storageStatusListener)
    val operationGraphListener = new RDDOperationGraphListener(conf)
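The core of the change is in StorageStatusListener above: the storage status of a removed executor is no longer discarded but appended to the bounded deadExecutorStorageStatus buffer, and the oldest entry is dropped once spark.ui.retainedDeadExecutors (default 100) is exceeded. Below is a minimal, self-contained sketch of that retention policy. It is not part of the patch, and DeadExecutorLog is a hypothetical stand-in for the listener that tracks only executor IDs instead of StorageStatus objects.

// Not part of the patch: standalone sketch of the dead-executor retention policy.
import scala.collection.mutable

class DeadExecutorLog(retainedDeadExecutors: Int) {
  private val dead = new mutable.ListBuffer[String]()

  // Mirrors onBlockManagerRemoved: remember the removed executor, then drop
  // the oldest entry once the buffer exceeds the configured bound.
  def executorRemoved(execId: String): Unit = {
    dead += execId
    if (dead.size > retainedDeadExecutors) {
      dead.trimStart(1)
    }
  }

  def deadList: Seq[String] = dead.toSeq
}

object DeadExecutorLogExample extends App {
  val log = new DeadExecutorLog(retainedDeadExecutors = 2)
  Seq("1", "2", "3").foreach(log.executorRemoved)
  println(log.deadList)  // only the two most recently removed executors, "2" and "3", remain
}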
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index e1f754999912b..eba7a312ba81f 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -54,24 +54,30 @@ private[ui] class ExecutorsPage(
   private val GCTimePercent = 0.1
 
   def render(request: HttpServletRequest): Seq[Node] = {
-    val (storageStatusList, execInfo) = listener.synchronized {
+    val (activeExecutorInfo, deadExecutorInfo) = listener.synchronized {
       // The follow codes should be protected by `listener` to make sure no executors will be
       // removed before we query their status. See SPARK-12784.
-      val _storageStatusList = listener.storageStatusList
-      val _execInfo = {
-        for (statusId <- 0 until _storageStatusList.size)
-          yield ExecutorsPage.getExecInfo(listener, statusId)
+      val _activeExecutorInfo = {
+        for (statusId <- 0 until listener.activeStorageStatusList.size)
+          yield ExecutorsPage.getExecInfo(listener, statusId, isActive = true)
       }
-      (_storageStatusList, _execInfo)
+      val _deadExecutorInfo = {
+        for (statusId <- 0 until listener.deadStorageStatusList.size)
+          yield ExecutorsPage.getExecInfo(listener, statusId, isActive = false)
+      }
+      (_activeExecutorInfo, _deadExecutorInfo)
     }
+
+    val execInfo = activeExecutorInfo ++ deadExecutorInfo
     val execInfoSorted = execInfo.sortBy(_.id)
     val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty
 
-    val execTable =
+    val execTable = {
       <table class={UIUtils.TABLE_CLASS_STRIPED_SORTABLE}>
         <thead>
           <th>Executor ID</th>
           <th>Address</th>
+          <th>Status</th>
           <th>RDD Blocks</th>
           <th>Storage Memory</th>
           <th>Disk Used</th>
@@ -98,22 +104,28 @@ private[ui] class ExecutorsPage(
           {execInfoSorted.map(execRow(_, logsExist))}
         </tbody>
       </table>
+    }
 
     val content =
       <div class="row">
         <div class="span12">
-          <h4>Totals for {execInfo.size} Executors</h4>
-          {execSummary(execInfo)}
+          <h4>Dead Executors({deadExecutorInfo.size})</h4>
+        </div>
+      </div>
+      <div class="row">
+        <div class="span12">
+          <h4>Active Executors({activeExecutorInfo.size})</h4>
+          {execSummary(activeExecutorInfo)}
         </div>
       </div>
       <div class="row">
         <div class="span12">
-          <h4>Active Executors</h4>
+          <h4>Executors</h4>
           {execTable}
         </div>
       </div>;
 
-    UIUtils.headerSparkPage("Executors (" + execInfo.size + ")", content, parent)
+    UIUtils.headerSparkPage("Executors", content, parent)
   }
 
   /** Render an HTML row representing an executor */
@@ -121,9 +133,19 @@ private[ui] class ExecutorsPage(
     val maximumMemory = info.maxMemory
     val memoryUsed = info.memoryUsed
     val diskUsed = info.diskUsed
+    val executorStatus =
+      if (info.isActive) {
+        "Active"
+      } else {
+        "Dead"
+      }
+
     <tr>
       <td>{info.id}</td>
       <td>{info.hostPort}</td>
+      <td>
+        {executorStatus}
+      </td>
       <td>{info.rddBlocks}</td>
       <td sorttable_customkey={memoryUsed.toString}>
         {Utils.bytesToString(memoryUsed)} /
@@ -161,10 +183,14 @@ private[ui] class ExecutorsPage(
       }
       {
         if (threadDumpEnabled) {
-          val encodedId = URLEncoder.encode(info.id, "UTF-8")
-          <td>
-            <a href={s"threadDump/?executorId=${encodedId}"}>Thread Dump</a>
-          </td>
+          if (info.isActive) {
+            val encodedId = URLEncoder.encode(info.id, "UTF-8")
+            <td>
+              <a href={s"threadDump/?executorId=${encodedId}"}>Thread Dump</a>
+            </td>
+          } else {
+            <td> </td>
+          }
         } else {
           Seq.empty
         }
       }
@@ -236,14 +262,13 @@ private[ui] class ExecutorsPage(
   }
 
   private def taskData(
-    maxTasks: Int,
-    activeTasks: Int,
-    failedTasks: Int,
-    completedTasks: Int,
-    totalTasks: Int,
-    totalDuration: Long,
-    totalGCTime: Long):
-  Seq[Node] = {
+      maxTasks: Int,
+      activeTasks: Int,
+      failedTasks: Int,
+      completedTasks: Int,
+      totalTasks: Int,
+      totalDuration: Long,
+      totalGCTime: Long): Seq[Node] = {
     // Determine Color Opacity from 0.5-1
     // activeTasks range from 0 to maxTasks
     val activeTasksAlpha =
@@ -302,8 +327,15 @@ private[spark] object ExecutorsPage {
   /** Represent an executor's info as a map given a storage status index */
-  def getExecInfo(listener: ExecutorsListener, statusId: Int): ExecutorSummary = {
-    val status = listener.storageStatusList(statusId)
+  def getExecInfo(
+      listener: ExecutorsListener,
+      statusId: Int,
+      isActive: Boolean): ExecutorSummary = {
+    val status = if (isActive) {
+      listener.activeStorageStatusList(statusId)
+    } else {
+      listener.deadStorageStatusList(statusId)
+    }
     val execId = status.blockManagerId.executorId
     val hostPort = status.blockManagerId.hostPort
     val rddBlocks = status.numBlocks
@@ -326,6 +358,7 @@ private[spark] object ExecutorsPage {
     new ExecutorSummary(
       execId,
       hostPort,
+      isActive,
       rddBlocks,
       memUsed,
       diskUsed,
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index dcfebe92ed805..788f35ec77d9f 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -61,7 +61,9 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
   val executorToLogUrls = HashMap[String, Map[String, String]]()
   val executorIdToData = HashMap[String, ExecutorUIData]()
 
-  def storageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList
+  def activeStorageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList
+
+  def deadStorageStatusList: Seq[StorageStatus] = storageStatusListener.deadStorageStatusList
 
   override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = synchronized {
     val eid = executorAdded.executorId
@@ -81,7 +83,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
 
   override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
     applicationStart.driverLogs.foreach { logs =>
-      val storageStatus = storageStatusList.find { s =>
+      val storageStatus = activeStorageStatusList.find { s =>
         s.blockManagerId.executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER ||
         s.blockManagerId.executorId == SparkContext.DRIVER_IDENTIFIER
       }
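With the listener and page changes above, the Executors tab and the v1 status API now report executors that have been removed, distinguished from live ones by the new isActive field on ExecutorSummary. The sketch below shows how a client of the executor-list endpoint could use that flag. It is not part of the patch, and ExecutorSummaryLite is a hypothetical, trimmed-down stand-in for org.apache.spark.status.api.v1.ExecutorSummary with made-up sample values.

// Not part of the patch: hypothetical client-side helper, not Spark code.
case class ExecutorSummaryLite(id: String, hostPort: String, isActive: Boolean, rddBlocks: Int)

object ExecutorStatusExample extends App {
  // Mirrors what the rewritten ExecutorsPage does: split the combined list
  // into active and dead executors and report both counts.
  def summarize(executors: Seq[ExecutorSummaryLite]): String = {
    val (active, dead) = executors.partition(_.isActive)
    s"Active Executors(${active.size}), Dead Executors(${dead.size})"
  }

  val executors = Seq(
    ExecutorSummaryLite("1", "host-a:43851", isActive = true, rddBlocks = 8),
    ExecutorSummaryLite("2", "host-b:43852", isActive = false, rddBlocks = 0))

  println(summarize(executors))  // Active Executors(1), Dead Executors(1)
}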
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
index f1e28b4e1e9c2..8f75b586e1399 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
@@ -43,7 +43,7 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends Bloc
   private[ui] val _rddInfoMap = mutable.Map[Int, RDDInfo]() // exposed for testing
 
-  def storageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList
+  def activeStorageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList
 
   /** Filter RDD info to include only those with cached partitions */
   def rddInfoList: Seq[RDDInfo] = synchronized {
@@ -54,7 +54,7 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends Bloc
   private def updateRDDInfo(updatedBlocks: Seq[(BlockId, BlockStatus)]): Unit = {
     val rddIdsToUpdate = updatedBlocks.flatMap { case (bid, _) => bid.asRDDId.map(_.rddId) }.toSet
     val rddInfosToUpdate = _rddInfoMap.values.toSeq.filter { s => rddIdsToUpdate.contains(s.id) }
-    StorageUtils.updateRddInfo(rddInfosToUpdate, storageStatusList)
+    StorageUtils.updateRddInfo(rddInfosToUpdate, activeStorageStatusList)
   }
 
   /**
diff --git a/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
index 9d5d224e55176..4a88eeee747dc 100644
--- a/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
@@ -1,6 +1,7 @@
 [ {
   "id" : "<driver>",
   "hostPort" : "localhost:57971",
+  "isActive" : true,
   "rddBlocks" : 8,
   "memoryUsed" : 28000128,
   "diskUsed" : 0,
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
index 9de434166bba3..14daa003bc5a6 100644
--- a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.storage
 
-import org.apache.spark.{SparkFunSuite, Success}
+import org.apache.spark.{SparkConf, SparkFunSuite, Success}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
 
@@ -29,9 +29,11 @@ class StorageStatusListenerSuite extends SparkFunSuite {
   private val bm2 = BlockManagerId("fat", "duck", 2)
   private val taskInfo1 = new TaskInfo(0, 0, 0, 0, "big", "dog", TaskLocality.ANY, false)
   private val taskInfo2 = new TaskInfo(0, 0, 0, 0, "fat", "duck", TaskLocality.ANY, false)
+  private val conf = new SparkConf()
 
   test("block manager added/removed") {
-    val listener = new StorageStatusListener
+    conf.set("spark.ui.retainedDeadExecutors", "1")
+    val listener = new StorageStatusListener(conf)
 
     // Block manager add
     assert(listener.executorIdToStorageStatus.size === 0)
@@ -53,14 +55,18 @@ class StorageStatusListenerSuite extends SparkFunSuite {
     assert(listener.executorIdToStorageStatus.size === 1)
     assert(!listener.executorIdToStorageStatus.get("big").isDefined)
     assert(listener.executorIdToStorageStatus.get("fat").isDefined)
+    assert(listener.deadExecutorStorageStatus.size === 1)
+    assert(listener.deadExecutorStorageStatus(0).blockManagerId.executorId.equals("big"))
     listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(1L, bm2))
     assert(listener.executorIdToStorageStatus.size === 0)
     assert(!listener.executorIdToStorageStatus.get("big").isDefined)
     assert(!listener.executorIdToStorageStatus.get("fat").isDefined)
+    assert(listener.deadExecutorStorageStatus.size === 1)
+    assert(listener.deadExecutorStorageStatus(0).blockManagerId.executorId.equals("fat"))
   }
 
   test("task end without updated blocks") {
-    val listener = new StorageStatusListener
+    val listener = new StorageStatusListener(conf)
     listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
     listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L))
     val taskMetrics = new TaskMetrics
@@ -77,7 +83,7 @@ class StorageStatusListenerSuite extends SparkFunSuite {
   }
 
   test("task end with updated blocks") {
-    val listener = new StorageStatusListener
+    val listener = new StorageStatusListener(conf)
     listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
     listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L))
     val taskMetrics1 = new TaskMetrics
@@ -126,7 +132,7 @@ class StorageStatusListenerSuite extends SparkFunSuite {
   }
 
   test("unpersist RDD") {
-    val listener = new StorageStatusListener
+    val listener = new StorageStatusListener(conf)
     listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
     val taskMetrics1 = new TaskMetrics
     val taskMetrics2 = new TaskMetrics
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
index d1dbf7c1558b2..6b7c538ac8549 100644
--- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.ui.storage
 
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.{SparkFunSuite, Success}
+import org.apache.spark.{SparkConf, SparkFunSuite, Success}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
 import org.apache.spark.storage._
@@ -44,7 +44,7 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
 
   before {
     bus = new LiveListenerBus
-    storageStatusListener = new StorageStatusListener
+    storageStatusListener = new StorageStatusListener(new SparkConf())
     storageListener = new StorageListener(storageStatusListener)
     bus.addListener(storageStatusListener)
     bus.addListener(storageListener)
diff --git a/docs/configuration.md b/docs/configuration.md
index 568eca9b5fa95..4b1b00720b006 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -584,6 +584,13 @@ Apart from these, the following properties are also available, and may be useful
     How many finished batches the Spark UI and status APIs remember before garbage collecting.
   </td>
 </tr>
+<tr>
+  <td><code>spark.ui.retainedDeadExecutors</code></td>
+  <td>100</td>
+  <td>
+    How many dead executors the Spark UI and status APIs remember before garbage collecting.
+  </td>
+</tr>
 </table>
 
 #### Compression and Serialization
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 47693aa29db61..b12aefcf836ab 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -272,6 +272,9 @@ object MimaExcludes {
         // SPARK-13220 Deprecate yarn-client and yarn-cluster mode
         ProblemFilters.exclude[MissingMethodProblem](
           "org.apache.spark.SparkContext.org$apache$spark$SparkContext$$createTaskScheduler")
+      ) ++ Seq (
+        // SPARK-7729 Executor which has been killed should also be displayed on Executor Tab
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.status.api.v1.ExecutorSummary.this")
       )
     case v if v.startsWith("1.6") =>
       Seq(
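The spark.ui.retainedDeadExecutors property documented above bounds how many dead executors the UI and status APIs remember. A sketch of setting it programmatically follows; it is not part of the patch, the application name and value are arbitrary, and the same setting can also be passed with --conf on spark-submit.

// Sketch only: configure the dead-executor retention limit on a SparkContext.
import org.apache.spark.{SparkConf, SparkContext}

object RetainedDeadExecutorsExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("retained-dead-executors-example")
      .set("spark.ui.retainedDeadExecutors", "10")  // default is 100
    val sc = new SparkContext(conf)
    // In a cluster deployment, executors that are later lost or killed stay
    // visible on the Executors tab, marked "Dead", up to the configured limit.
    sc.stop()
  }
}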