Skip to content

Commit

Permalink
SPARK-7729:Executor which has been killed should also be displayed on…
Browse files Browse the repository at this point in the history
… Executors Tab.
  • Loading branch information
archit.thakur committed May 19, 2015
1 parent df34793 commit 7a82254
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 5 deletions.
Expand Up @@ -32,10 +32,15 @@ import org.apache.spark.scheduler._
class StorageStatusListener extends SparkListener {
// This maintains only blocks that are cached (i.e. storage level is not StorageLevel.NONE)
private[storage] val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()
private[storage] val removedExecutorIdToStorageStatus = mutable.Map[String, StorageStatus]()

def storageStatusList: Seq[StorageStatus] = synchronized {
executorIdToStorageStatus.values.toSeq
}
def removedExecutorStorageStatusList: Seq[StorageStatus] = synchronized{
removedExecutorIdToStorageStatus.values.toSeq
}


/** Update storage status list to reflect updated block statuses */
private def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {
Expand Down Expand Up @@ -87,6 +92,8 @@ class StorageStatusListener extends SparkListener {
override def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved) {
synchronized {
val executorId = blockManagerRemoved.blockManagerId.executorId
removedExecutorIdToStorageStatus.put(executorId,
executorIdToStorageStatus.get(executorId).get)
executorIdToStorageStatus.remove(executorId)
}
}
Expand Down
16 changes: 11 additions & 5 deletions core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
Expand Up @@ -25,6 +25,7 @@ import scala.xml.Node
import org.apache.spark.status.api.v1.ExecutorSummary
import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
import org.apache.spark.util.Utils
import org.apache.spark.storage.StorageStatus

// This isn't even used anymore -- but we need to keep it b/c of a MiMa false positive
private[ui] case class ExecutorSummaryInfo(
Expand All @@ -41,6 +42,7 @@ private[ui] case class ExecutorSummaryInfo(
totalInputBytes: Long,
totalShuffleRead: Long,
totalShuffleWrite: Long,
isAlive: Boolean,
maxMemory: Long,
executorLogs: Map[String, String])

Expand All @@ -52,12 +54,11 @@ private[ui] class ExecutorsPage(
private val listener = parent.listener

def render(request: HttpServletRequest): Seq[Node] = {
val storageStatusList = listener.storageStatusList
val storageStatusList = listener.storageStatusList ++ listener.removedExecutorStorageStatusList
val maxMem = storageStatusList.map(_.maxMem).sum
val memUsed = storageStatusList.map(_.memUsed).sum
val diskUsed = storageStatusList.map(_.diskUsed).sum
val execInfo = for (statusId <- 0 until storageStatusList.size) yield
ExecutorsPage.getExecInfo(listener, statusId)
val execInfo = for (status <- storageStatusList) yield getExecInfo(status)
val execInfoSorted = execInfo.sortBy(_.id)
val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty

Expand All @@ -84,6 +85,7 @@ private[ui] class ExecutorsPage(
Shuffle Write
</span>
</th>
<th>Executor Status</th>
{if (logsExist) <th class="sorttable_nosort">Logs</th> else Seq.empty}
{if (threadDumpEnabled) <th class="sorttable_nosort">Thread Dump</th> else Seq.empty}
</thead>
Expand Down Expand Up @@ -144,6 +146,9 @@ private[ui] class ExecutorsPage(
<td sorttable_customkey={info.totalShuffleWrite.toString}>
{Utils.bytesToString(info.totalShuffleWrite)}
</td>
<td sorttable_customkey={info.isAlive.toString}>
{if(info.isAlive) "Alive" else "Killed"}
</td>
{
if (logsExist) {
<td>
Expand Down Expand Up @@ -176,8 +181,7 @@ private[ui] class ExecutorsPage(

private[spark] object ExecutorsPage {
/** Represent an executor's info as a map given a storage status index */
def getExecInfo(listener: ExecutorsListener, statusId: Int): ExecutorSummary = {
val status = listener.storageStatusList(statusId)
private def getExecInfo(status: StorageStatus): ExecutorSummaryInfo = {
val execId = status.blockManagerId.executorId
val hostPort = status.blockManagerId.hostPort
val rddBlocks = status.numBlocks
Expand All @@ -192,6 +196,7 @@ private[spark] object ExecutorsPage {
val totalInputBytes = listener.executorToInputBytes.getOrElse(execId, 0L)
val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0L)
val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0L)
val isAlive = listener.storageStatusList.contains(status)
val executorLogs = listener.executorToLogUrls.getOrElse(execId, Map.empty)

new ExecutorSummary(
Expand All @@ -208,6 +213,7 @@ private[spark] object ExecutorsPage {
totalInputBytes,
totalShuffleRead,
totalShuffleWrite,
isAlive,
maxMem,
executorLogs
)
Expand Down
Expand Up @@ -58,6 +58,9 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
val executorIdToData = HashMap[String, ExecutorUIData]()

def storageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList
def removedExecutorStorageStatusList: Seq[StorageStatus] = {
storageStatusListener.removedExecutorStorageStatusList
}

override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = synchronized {
val eid = executorAdded.executorId
Expand Down

0 comments on commit 7a82254

Please sign in to comment.