[SPARK-7729][UI] Executor which has been killed should also be displayed on Executor Tab

cc andrewor14 squito. Dead executors should also be displayed on the Executor tab, as shown below:
![image](https://cloud.githubusercontent.com/assets/545478/11492707/ae55d7f6-982b-11e5-919a-b62cd84684b2.png)

Author: Lianhui Wang <lianhuiwang09@gmail.com>

This patch had conflicts when merged, resolved by
Committer: Andrew Or <andrew@databricks.com>

Closes #10058 from lianhuiwang/SPARK-7729.
lianhuiwang authored and Andrew Or committed Feb 23, 2016
1 parent 5d69eaf commit 9f42633
Showing 13 changed files with 108 additions and 44 deletions.
core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
@@ -28,7 +28,7 @@ private[v1] class AllRDDResource(ui: SparkUI) {

@GET
def rddList(): Seq[RDDStorageInfo] = {
- val storageStatusList = ui.storageListener.storageStatusList
+ val storageStatusList = ui.storageListener.activeStorageStatusList
val rddInfos = ui.storageListener.rddInfoList
rddInfos.map{rddInfo =>
AllRDDResource.getRDDStorageInfo(rddInfo.id, rddInfo, storageStatusList,
@@ -44,7 +44,7 @@ private[spark] object AllRDDResource {
rddId: Int,
listener: StorageListener,
includeDetails: Boolean): Option[RDDStorageInfo] = {
- val storageStatusList = listener.storageStatusList
+ val storageStatusList = listener.activeStorageStatusList
listener.rddInfoList.find { _.id == rddId }.map { rddInfo =>
getRDDStorageInfo(rddId, rddInfo, storageStatusList, includeDetails)
}
core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala
@@ -31,9 +31,9 @@ private[v1] class ExecutorListResource(ui: SparkUI) {
listener.synchronized {
// The follow codes should be protected by `listener` to make sure no executors will be
// removed before we query their status. See SPARK-12784.
- val storageStatusList = listener.storageStatusList
+ val storageStatusList = listener.activeStorageStatusList
(0 until storageStatusList.size).map { statusId =>
- ExecutorsPage.getExecInfo(listener, statusId)
+ ExecutorsPage.getExecInfo(listener, statusId, isActive = true)
}
}
}
core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -54,6 +54,7 @@ class ExecutorStageSummary private[spark](
class ExecutorSummary private[spark](
val id: String,
val hostPort: String,
+ val isActive: Boolean,
val rddBlocks: Int,
val memoryUsed: Long,
val diskUsed: Long,
core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -19,6 +19,7 @@ package org.apache.spark.storage

import scala.collection.mutable

+ import org.apache.spark.SparkConf
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler._

@@ -29,14 +30,20 @@ import org.apache.spark.scheduler._
* This class is thread-safe (unlike JobProgressListener)
*/
@DeveloperApi
- class StorageStatusListener extends SparkListener {
+ class StorageStatusListener(conf: SparkConf) extends SparkListener {
// This maintains only blocks that are cached (i.e. storage level is not StorageLevel.NONE)
private[storage] val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()
+ private[storage] val deadExecutorStorageStatus = new mutable.ListBuffer[StorageStatus]()
+ private[this] val retainedDeadExecutors = conf.getInt("spark.ui.retainedDeadExecutors", 100)

def storageStatusList: Seq[StorageStatus] = synchronized {
executorIdToStorageStatus.values.toSeq
}

+ def deadStorageStatusList: Seq[StorageStatus] = synchronized {
+   deadExecutorStorageStatus.toSeq
+ }

/** Update storage status list to reflect updated block statuses */
private def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {
executorIdToStorageStatus.get(execId).foreach { storageStatus =>
@@ -87,8 +94,12 @@ class StorageStatusListener extends SparkListener {
override def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved) {
synchronized {
val executorId = blockManagerRemoved.blockManagerId.executorId
- executorIdToStorageStatus.remove(executorId)
+ executorIdToStorageStatus.remove(executorId).foreach { status =>
+   deadExecutorStorageStatus += status
+ }
+ if (deadExecutorStorageStatus.size > retainedDeadExecutors) {
+   deadExecutorStorageStatus.trimStart(1)
+ }
}
}

}
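The eviction above is a plain FIFO cap. Below is a standalone sketch of the same pattern with made-up executor IDs; it is illustrative only, not part of the patch:

```scala
import scala.collection.mutable

// Cap the dead-executor buffer at `retained` entries; once the cap is
// exceeded, trimStart(1) evicts the oldest entry, mirroring the
// onBlockManagerRemoved logic above.
val retained = 2
val dead = new mutable.ListBuffer[String]()
for (execId <- Seq("exec-1", "exec-2", "exec-3")) {
  dead += execId
  if (dead.size > retained) dead.trimStart(1)
}
assert(dead.toSeq == Seq("exec-2", "exec-3")) // "exec-1" was evicted first
```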
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -196,7 +196,7 @@ private[spark] object SparkUI {
}

val environmentListener = new EnvironmentListener
- val storageStatusListener = new StorageStatusListener
+ val storageStatusListener = new StorageStatusListener(conf)
val executorsListener = new ExecutorsListener(storageStatusListener, conf)
val storageListener = new StorageListener(storageStatusListener)
val operationGraphListener = new RDDOperationGraphListener(conf)
83 changes: 58 additions & 25 deletions core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -54,24 +54,30 @@ private[ui] class ExecutorsPage(
private val GCTimePercent = 0.1

def render(request: HttpServletRequest): Seq[Node] = {
- val (storageStatusList, execInfo) = listener.synchronized {
+ val (activeExecutorInfo, deadExecutorInfo) = listener.synchronized {
// The follow codes should be protected by `listener` to make sure no executors will be
// removed before we query their status. See SPARK-12784.
- val _storageStatusList = listener.storageStatusList
- val _execInfo = {
-   for (statusId <- 0 until _storageStatusList.size)
-     yield ExecutorsPage.getExecInfo(listener, statusId)
- }
- (_storageStatusList, _execInfo)
+ val _activeExecutorInfo = {
+   for (statusId <- 0 until listener.activeStorageStatusList.size)
+     yield ExecutorsPage.getExecInfo(listener, statusId, isActive = true)
+ }
+ val _deadExecutorInfo = {
+   for (statusId <- 0 until listener.deadStorageStatusList.size)
+     yield ExecutorsPage.getExecInfo(listener, statusId, isActive = false)
+ }
+ (_activeExecutorInfo, _deadExecutorInfo)
}

+ val execInfo = activeExecutorInfo ++ deadExecutorInfo
val execInfoSorted = execInfo.sortBy(_.id)
val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty

- val execTable =
+ val execTable = {
<table class={UIUtils.TABLE_CLASS_STRIPED_SORTABLE}>
<thead>
<th>Executor ID</th>
<th>Address</th>
+ <th>Status</th>
<th>RDD Blocks</th>
<th><span data-toggle="tooltip" title={ToolTips.STORAGE_MEMORY}>Storage Memory</span></th>
<th>Disk Used</th>
Expand All @@ -98,32 +104,48 @@ private[ui] class ExecutorsPage(
{execInfoSorted.map(execRow(_, logsExist))}
</tbody>
</table>
+ }

val content =
<div class="row">
<div class="span12">
<h4>Totals for {execInfo.size} Executors</h4>
{execSummary(execInfo)}
+ <h4>Dead Executors({deadExecutorInfo.size})</h4>
</div>
</div>
<div class="row">
<div class="span12">
<h4>Active Executors({activeExecutorInfo.size})</h4>
{execSummary(activeExecutorInfo)}
</div>
</div>
<div class = "row">
<div class="span12">
- <h4>Active Executors</h4>
+ <h4>Executors</h4>
{execTable}
</div>
</div>;

UIUtils.headerSparkPage("Executors (" + execInfo.size + ")", content, parent)
UIUtils.headerSparkPage("Executors", content, parent)
}

/** Render an HTML row representing an executor */
private def execRow(info: ExecutorSummary, logsExist: Boolean): Seq[Node] = {
val maximumMemory = info.maxMemory
val memoryUsed = info.memoryUsed
val diskUsed = info.diskUsed
+ val executorStatus =
+   if (info.isActive) {
+     "Active"
+   } else {
+     "Dead"
+   }

<tr>
<td>{info.id}</td>
<td>{info.hostPort}</td>
+ <td sorttable_customkey={executorStatus.toString}>
+   {executorStatus}
+ </td>
<td>{info.rddBlocks}</td>
<td sorttable_customkey={memoryUsed.toString}>
{Utils.bytesToString(memoryUsed)} /
@@ -161,10 +183,14 @@
}
{
if (threadDumpEnabled) {
- val encodedId = URLEncoder.encode(info.id, "UTF-8")
- <td>
-   <a href={s"threadDump/?executorId=${encodedId}"}>Thread Dump</a>
- </td>
+ if (info.isActive) {
+   val encodedId = URLEncoder.encode(info.id, "UTF-8")
+   <td>
+     <a href={s"threadDump/?executorId=${encodedId}"}>Thread Dump</a>
+   </td>
+ } else {
+   <td> </td>
+ }
} else {
Seq.empty
}
@@ -236,14 +262,13 @@
}

private def taskData(
-   maxTasks: Int,
-   activeTasks: Int,
-   failedTasks: Int,
-   completedTasks: Int,
-   totalTasks: Int,
-   totalDuration: Long,
-   totalGCTime: Long):
- Seq[Node] = {
+     maxTasks: Int,
+     activeTasks: Int,
+     failedTasks: Int,
+     completedTasks: Int,
+     totalTasks: Int,
+     totalDuration: Long,
+     totalGCTime: Long): Seq[Node] = {
// Determine Color Opacity from 0.5-1
// activeTasks range from 0 to maxTasks
val activeTasksAlpha =
@@ -302,8 +327,15 @@

private[spark] object ExecutorsPage {
/** Represent an executor's info as a map given a storage status index */
- def getExecInfo(listener: ExecutorsListener, statusId: Int): ExecutorSummary = {
-   val status = listener.storageStatusList(statusId)
+ def getExecInfo(
+     listener: ExecutorsListener,
+     statusId: Int,
+     isActive: Boolean): ExecutorSummary = {
+   val status = if (isActive) {
+     listener.activeStorageStatusList(statusId)
+   } else {
+     listener.deadStorageStatusList(statusId)
+   }
val execId = status.blockManagerId.executorId
val hostPort = status.blockManagerId.hostPort
val rddBlocks = status.numBlocks
@@ -326,6 +358,7 @@
new ExecutorSummary(
execId,
hostPort,
+ isActive,
rddBlocks,
memUsed,
diskUsed,
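For illustration, here is a hedged sketch of how the combined active-plus-dead view is assembled with the new isActive flag, mirroring render() above. The object and method names are hypothetical, and the code must sit under org.apache.spark.ui.exec because getExecInfo is private[spark]:

```scala
package org.apache.spark.ui.exec

import org.apache.spark.status.api.v1.ExecutorSummary

// Hypothetical helper (not in this patch): one summary per executor,
// dead executors included, built the same way ExecutorsPage.render() does.
private[ui] object AllExecutorSummaries {
  def from(listener: ExecutorsListener): Seq[ExecutorSummary] = {
    val active = (0 until listener.activeStorageStatusList.size)
      .map(i => ExecutorsPage.getExecInfo(listener, i, isActive = true))
    val dead = (0 until listener.deadStorageStatusList.size)
      .map(i => ExecutorsPage.getExecInfo(listener, i, isActive = false))
    (active ++ dead).sortBy(_.id)
  }
}
```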
core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -61,7 +61,9 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: SparkConf)
val executorToLogUrls = HashMap[String, Map[String, String]]()
val executorIdToData = HashMap[String, ExecutorUIData]()

- def storageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList
+ def activeStorageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList
+
+ def deadStorageStatusList: Seq[StorageStatus] = storageStatusListener.deadStorageStatusList

override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = synchronized {
val eid = executorAdded.executorId
@@ -81,7 +83,7 @@

override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
applicationStart.driverLogs.foreach { logs =>
- val storageStatus = storageStatusList.find { s =>
+ val storageStatus = activeStorageStatusList.find { s =>
s.blockManagerId.executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER ||
s.blockManagerId.executorId == SparkContext.DRIVER_IDENTIFIER
}
core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
@@ -43,7 +43,7 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends BlockStatusListener

private[ui] val _rddInfoMap = mutable.Map[Int, RDDInfo]() // exposed for testing

- def storageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList
+ def activeStorageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList

/** Filter RDD info to include only those with cached partitions */
def rddInfoList: Seq[RDDInfo] = synchronized {
@@ -54,7 +54,7 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends BlockStatusListener
private def updateRDDInfo(updatedBlocks: Seq[(BlockId, BlockStatus)]): Unit = {
val rddIdsToUpdate = updatedBlocks.flatMap { case (bid, _) => bid.asRDDId.map(_.rddId) }.toSet
val rddInfosToUpdate = _rddInfoMap.values.toSeq.filter { s => rddIdsToUpdate.contains(s.id) }
- StorageUtils.updateRddInfo(rddInfosToUpdate, storageStatusList)
+ StorageUtils.updateRddInfo(rddInfosToUpdate, activeStorageStatusList)
}

/**
core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
@@ -1,6 +1,7 @@
[ {
"id" : "<driver>",
"hostPort" : "localhost:57971",
"isActive" : true,
"rddBlocks" : 8,
"memoryUsed" : 28000128,
"diskUsed" : 0,
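ExecutorSummary also backs the REST API, so the new field appears in the executor list endpoint. Note that ExecutorListResource above still serves active executors only (it passes isActive = true), so every entry reports true here. A hedged sketch against a locally running application; host, port, and app ID are placeholders:

```scala
import scala.io.Source

// Placeholder coordinates: point at a live SparkContext's UI and its app ID.
val appId = "local-1455892345678"
val url = s"http://localhost:4040/api/v1/applications/$appId/executors"
val json = Source.fromURL(url).mkString
// Each executor object should now carry an "isActive" field.
assert(json.contains("\"isActive\""))
```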
core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
@@ -17,7 +17,7 @@

package org.apache.spark.storage

- import org.apache.spark.{SparkFunSuite, Success}
+ import org.apache.spark.{SparkConf, SparkFunSuite, Success}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._

@@ -29,9 +29,11 @@ class StorageStatusListenerSuite extends SparkFunSuite {
private val bm2 = BlockManagerId("fat", "duck", 2)
private val taskInfo1 = new TaskInfo(0, 0, 0, 0, "big", "dog", TaskLocality.ANY, false)
private val taskInfo2 = new TaskInfo(0, 0, 0, 0, "fat", "duck", TaskLocality.ANY, false)
+ private val conf = new SparkConf()

test("block manager added/removed") {
- val listener = new StorageStatusListener
+ conf.set("spark.ui.retainedDeadExecutors", "1")
+ val listener = new StorageStatusListener(conf)

// Block manager add
assert(listener.executorIdToStorageStatus.size === 0)
@@ -53,14 +55,18 @@
assert(listener.executorIdToStorageStatus.size === 1)
assert(!listener.executorIdToStorageStatus.get("big").isDefined)
assert(listener.executorIdToStorageStatus.get("fat").isDefined)
+ assert(listener.deadExecutorStorageStatus.size === 1)
+ assert(listener.deadExecutorStorageStatus(0).blockManagerId.executorId.equals("big"))
listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(1L, bm2))
assert(listener.executorIdToStorageStatus.size === 0)
assert(!listener.executorIdToStorageStatus.get("big").isDefined)
assert(!listener.executorIdToStorageStatus.get("fat").isDefined)
+ assert(listener.deadExecutorStorageStatus.size === 1)
+ assert(listener.deadExecutorStorageStatus(0).blockManagerId.executorId.equals("fat"))
}

test("task end without updated blocks") {
- val listener = new StorageStatusListener
+ val listener = new StorageStatusListener(conf)
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L))
val taskMetrics = new TaskMetrics
@@ -77,7 +83,7 @@
}

test("task end with updated blocks") {
- val listener = new StorageStatusListener
+ val listener = new StorageStatusListener(conf)
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L))
val taskMetrics1 = new TaskMetrics
@@ -126,7 +132,7 @@
}

test("unpersist RDD") {
- val listener = new StorageStatusListener
+ val listener = new StorageStatusListener(conf)
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
val taskMetrics1 = new TaskMetrics
val taskMetrics2 = new TaskMetrics
core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.ui.storage

import org.scalatest.BeforeAndAfter

- import org.apache.spark.{SparkFunSuite, Success}
+ import org.apache.spark.{SparkConf, SparkFunSuite, Success}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.storage._
@@ -44,7 +44,7 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {

before {
bus = new LiveListenerBus
- storageStatusListener = new StorageStatusListener
+ storageStatusListener = new StorageStatusListener(new SparkConf())
storageListener = new StorageListener(storageStatusListener)
bus.addListener(storageStatusListener)
bus.addListener(storageListener)
7 changes: 7 additions & 0 deletions docs/configuration.md
@@ -584,6 +584,13 @@ Apart from these, the following properties are also available, and may be useful
How many finished batches the Spark UI and status APIs remember before garbage collecting.
</td>
</tr>
+ <tr>
+   <td><code>spark.ui.retainedDeadExecutors</code></td>
+   <td>100</td>
+   <td>
+     How many dead executors the Spark UI and status APIs remember before garbage collecting.
+   </td>
+ </tr>
</table>

#### Compression and Serialization
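A hedged usage sketch for the new setting, capping retained dead executors at 10. The app name and master are placeholders; the property can equally be set in spark-defaults.conf or via --conf on spark-submit:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Keep at most 10 dead executors in the UI and status APIs (default: 100).
val conf = new SparkConf()
  .setAppName("retained-dead-executors-demo")
  .setMaster("local[2]")
  .set("spark.ui.retainedDeadExecutors", "10")
val sc = new SparkContext(conf)
```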
3 changes: 3 additions & 0 deletions project/MimaExcludes.scala
@@ -272,6 +272,9 @@ object MimaExcludes {
// SPARK-13220 Deprecate yarn-client and yarn-cluster mode
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.SparkContext.org$apache$spark$SparkContext$$createTaskScheduler")
+ ) ++ Seq (
+   // SPARK-7729 Executor which has been killed should also be displayed on Executor Tab
+   ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.status.api.v1.ExecutorSummary.this")
)
case v if v.startsWith("1.6") =>
Seq(
