Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-7729][UI]Executor which has been killed should also be displayed on Executor Tab #10058

Closed
wants to merge 26 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
bd40d49
init commit
lianhuiwang Dec 1, 2015
97dbd62
rename activeStorageStatusList
lianhuiwang Dec 1, 2015
16e175d
fix scala style
lianhuiwang Dec 1, 2015
7b244ff
address CodingCat's comments
lianhuiwang Dec 1, 2015
b106cfe
fix minor error
lianhuiwang Dec 2, 2015
47255fa
remove synchronized
lianhuiwang Dec 2, 2015
325149f
address vanzin's comments
lianhuiwang Dec 4, 2015
6532e01
add REST api
lianhuiwang Dec 5, 2015
a99175b
add mima exclusion
lianhuiwang Dec 6, 2015
122d3f2
fix REST API's test
lianhuiwang Dec 6, 2015
1433c04
Merge branch 'master' of https://github.com/apache/spark into SPARK-7729
lianhuiwang Dec 15, 2015
f6b4739
combine active and dead executors
lianhuiwang Dec 15, 2015
ada7e14
fix style
lianhuiwang Dec 15, 2015
677996c
fix style
lianhuiwang Dec 15, 2015
308eade
fix style
lianhuiwang Dec 15, 2015
a1a04fc
fix mima's fail
lianhuiwang Dec 16, 2015
1608041
Merge branch 'apache-master' into SPARK-7729
lianhuiwang Dec 16, 2015
49ef6e9
Merge branch 'apache-master' into SPARK-7729
lianhuiwang Feb 2, 2016
7c7ca97
update with master
lianhuiwang Feb 2, 2016
35eef9a
update with master
lianhuiwang Feb 2, 2016
f749a5f
update with master
lianhuiwang Feb 2, 2016
8f0be11
update with master
lianhuiwang Feb 2, 2016
c632d39
Merge branch 'apache2' into SPARK-7729
lianhuiwang Feb 21, 2016
96950c6
address andrewor's comments
lianhuiwang Feb 21, 2016
9ef6c5b
Merge branch 'apache-master' into SPARK-7729
lianhuiwang Feb 23, 2016
c88afa8
Merge branch 'apache-master' into SPARK-7729
lianhuiwang Feb 23, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ private[v1] class AllRDDResource(ui: SparkUI) {

@GET
def rddList(): Seq[RDDStorageInfo] = {
val storageStatusList = ui.storageListener.storageStatusList
val storageStatusList = ui.storageListener.activeStorageStatusList
val rddInfos = ui.storageListener.rddInfoList
rddInfos.map{rddInfo =>
AllRDDResource.getRDDStorageInfo(rddInfo.id, rddInfo, storageStatusList,
Expand All @@ -44,7 +44,7 @@ private[spark] object AllRDDResource {
rddId: Int,
listener: StorageListener,
includeDetails: Boolean): Option[RDDStorageInfo] = {
val storageStatusList = listener.storageStatusList
val storageStatusList = listener.activeStorageStatusList
listener.rddInfoList.find { _.id == rddId }.map { rddInfo =>
getRDDStorageInfo(rddId, rddInfo, storageStatusList, includeDetails)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ private[v1] class ExecutorListResource(ui: SparkUI) {
@GET
def executorList(): Seq[ExecutorSummary] = {
val listener = ui.executorsListener
val storageStatusList = listener.storageStatusList
(0 until storageStatusList.size).map { statusId =>
ExecutorsPage.getExecInfo(listener, statusId)
listener.synchronized {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe I missed something, why we need synchronized here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

more than one thread use listener object. There are rest api, executor page in UI, and SparkListener.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still confused about the reason on imposing the sequential access here.....can you give an example of the problem without this synchronized....

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because ExecutorsPage.getExecInfo() use executorToTasksActive, executorToTasksFailed of ExecutorsListener and SparkListenerBus thread also change value of executorToTasksActive,executorToTasksFailed in onPostEvent() , we need to use synchronized there. It is better that synchronized is used in ExecutorsPage.getExecInfo().

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it is necessary, essentially you are imposing that even the GET operations are to be processed sequentially....

val storageStatusList = listener.activeStorageStatusList
(0 until storageStatusList.size).map { statusId =>
ExecutorsPage.getExecInfo(listener, statusId, true)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.storage
import scala.collection.mutable

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.SparkConf
import org.apache.spark.scheduler._

/**
Expand All @@ -29,14 +30,20 @@ import org.apache.spark.scheduler._
* This class is thread-safe (unlike JobProgressListener)
*/
@DeveloperApi
class StorageStatusListener extends SparkListener {
class StorageStatusListener(conf: SparkConf) extends SparkListener {
// This maintains only blocks that are cached (i.e. storage level is not StorageLevel.NONE)
private[storage] val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()
private[storage] val deadExecutorStorageStatus = new mutable.ListBuffer[StorageStatus]()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you keep this name for backward compatibility

private[this] val retainedDeadExecutors = conf.getInt("spark.ui.retainedDeadExecutors", 100)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would you like to update the document as well to reflect this config entry?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I have done it.


def storageStatusList: Seq[StorageStatus] = synchronized {
def activeStorageStatusList: Seq[StorageStatus] = synchronized {
executorIdToStorageStatus.values.toSeq
}

def deadStorageStatusList: Seq[StorageStatus] = synchronized {
deadExecutorStorageStatus.toSeq
}

/** Update storage status list to reflect updated block statuses */
private def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {
executorIdToStorageStatus.get(execId).foreach { storageStatus =>
Expand All @@ -52,7 +59,7 @@ class StorageStatusListener extends SparkListener {

/** Update storage status list to reflect the removal of an RDD from the cache */
private def updateStorageStatus(unpersistedRDDId: Int) {
storageStatusList.foreach { storageStatus =>
activeStorageStatusList.foreach { storageStatus =>
storageStatus.rddBlocksById(unpersistedRDDId).foreach { case (blockId, _) =>
storageStatus.removeBlock(blockId)
}
Expand Down Expand Up @@ -87,8 +94,13 @@ class StorageStatusListener extends SparkListener {
override def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved) {
synchronized {
val executorId = blockManagerRemoved.blockManagerId.executorId
executorIdToStorageStatus.remove(executorId)
val removedStorageStatus = executorIdToStorageStatus.remove(executorId)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: looks nicer if you do executorIdToStorageStatus.remove(executorId).foreach { status =>

if (removedStorageStatus.isDefined) {
deadExecutorStorageStatus += removedStorageStatus.get
if (deadExecutorStorageStatus.size > retainedDeadExecutors) {
deadExecutorStorageStatus.trimStart(1)
}
}
}
}

}
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/SparkUI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ private[spark] object SparkUI {
}

val environmentListener = new EnvironmentListener
val storageStatusListener = new StorageStatusListener
val storageStatusListener = new StorageStatusListener(conf)
val executorsListener = new ExecutorsListener(storageStatusListener)
val storageListener = new StorageListener(storageStatusListener)
val operationGraphListener = new RDDOperationGraphListener(conf)
Expand Down
118 changes: 69 additions & 49 deletions core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node

import org.apache.spark.status.api.v1.ExecutorSummary
import org.apache.spark.storage.StorageStatus
import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
import org.apache.spark.util.Utils

Expand Down Expand Up @@ -52,68 +53,82 @@ private[ui] class ExecutorsPage(
private val listener = parent.listener

def render(request: HttpServletRequest): Seq[Node] = {
val storageStatusList = listener.storageStatusList
listener.synchronized {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar situation here

val activeStorageStatusList = listener.activeStorageStatusList
val activeExecutorsTable = listingExecTable(activeStorageStatusList, true)
val deadStorageStatusList = listener.deadStorageStatusList
val deadExecutorsTable = listingExecTable(deadStorageStatusList, false)
val content =
<span>
<h4>ActiveExecutors({activeStorageStatusList.size})</h4> {activeExecutorsTable}
<h4>DeadExecutors({deadStorageStatusList.size})</h4> {deadExecutorsTable}
</span>

UIUtils.headerSparkPage("Executors", content, parent)
}
}

private def listingExecTable(storageStatusList: Seq[StorageStatus], isActive: Boolean)
: Seq[Node] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Return types shall be put to next line with 2 space indent.

val maxMem = storageStatusList.map(_.maxMem).sum
val memUsed = storageStatusList.map(_.memUsed).sum
val diskUsed = storageStatusList.map(_.diskUsed).sum
val execInfo = for (statusId <- 0 until storageStatusList.size) yield
ExecutorsPage.getExecInfo(listener, statusId)
ExecutorsPage.getExecInfo(listener, statusId, isActive)
val execInfoSorted = execInfo.sortBy(_.id)
val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty
val isShowThreadDump = threadDumpEnabled && isActive
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ifShowThreadDump shall be a more proper name?


val execTable =
<table class={UIUtils.TABLE_CLASS_STRIPED_SORTABLE}>
<thead>
<th>Executor ID</th>
<th>Address</th>
<th>RDD Blocks</th>
<th><span data-toggle="tooltip" title={ToolTips.STORAGE_MEMORY}>Storage Memory</span></th>
<th>Disk Used</th>
<th>Active Tasks</th>
<th>Failed Tasks</th>
<th>Complete Tasks</th>
<th>Total Tasks</th>
<th>Task Time</th>
<th><span data-toggle="tooltip" title={ToolTips.INPUT}>Input</span></th>
<th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ}>Shuffle Read</span></th>
<th>
<!-- Place the shuffle write tooltip on the left (rather than the default position
// scalastyle:off
<div class="row-fluid">
<div class="span12">
{
if (isActive) {
<ul class="unstyled">
<li><strong>Memory:</strong>
{Utils.bytesToString(memUsed)} Used
({Utils.bytesToString(maxMem)} Total) </li>
<li><strong>Disk:</strong> {Utils.bytesToString(diskUsed)} Used </li>
</ul>
}
}
<table class={UIUtils.TABLE_CLASS_STRIPED_SORTABLE}>
<thead>
<th>Executor ID</th>
<th>Address</th>
<th>RDD Blocks</th>
<th><span data-toggle="tooltip" title={ToolTips.STORAGE_MEMORY}>Storage Memory</span></th>
<th>Disk Used</th>
<th>Active Tasks</th>
<th>Failed Tasks</th>
<th>Complete Tasks</th>
<th>Total Tasks</th>
<th>Task Time</th>
<th><span data-toggle="tooltip" title={ToolTips.INPUT}>Input</span></th>
<th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ}>Shuffle Read</span></th>
<th>
<!-- Place the shuffle write tooltip on the left (rather than the default position
of on top) because the shuffle write column is the last column on the right side and
the tooltip is wider than the column, so it doesn't fit on top. -->
<span data-toggle="tooltip" data-placement="left" title={ToolTips.SHUFFLE_WRITE}>
Shuffle Write
</span>
</th>
{if (logsExist) <th class="sorttable_nosort">Logs</th> else Seq.empty}
{if (threadDumpEnabled) <th class="sorttable_nosort">Thread Dump</th> else Seq.empty}
</thead>
<tbody>
{execInfoSorted.map(execRow(_, logsExist))}
</tbody>
</table>

val content =
<div class="row-fluid">
<div class="span12">
<ul class="unstyled">
<li><strong>Memory:</strong>
{Utils.bytesToString(memUsed)} Used
({Utils.bytesToString(maxMem)} Total) </li>
<li><strong>Disk:</strong> {Utils.bytesToString(diskUsed)} Used </li>
</ul>
</div>
</th>
{if (logsExist) <th class="sorttable_nosort">Logs</th> else Seq.empty}
{if (isShowThreadDump) <th class="sorttable_nosort">Thread Dump</th> else Seq.empty}
</thead>
<tbody>
{execInfoSorted.map(execRow(_, logsExist, isShowThreadDump))}
</tbody>
</table>
</div>
<div class = "row">
<div class="span12">
{execTable}
</div>
</div>;

UIUtils.headerSparkPage("Executors (" + execInfo.size + ")", content, parent)
</div>
// scalastyle:on
}

/** Render an HTML row representing an executor */
private def execRow(info: ExecutorSummary, logsExist: Boolean): Seq[Node] = {
private def execRow(info: ExecutorSummary, logsExist: Boolean, isShowThreadDump: Boolean)
: Seq[Node] = {
val maximumMemory = info.maxMemory
val memoryUsed = info.memoryUsed
val diskUsed = info.diskUsed
Expand Down Expand Up @@ -160,7 +175,7 @@ private[ui] class ExecutorsPage(
}
}
{
if (threadDumpEnabled) {
if (isShowThreadDump) {
val encodedId = URLEncoder.encode(info.id, "UTF-8")
<td>
<a href={s"threadDump/?executorId=${encodedId}"}>Thread Dump</a>
Expand All @@ -176,8 +191,13 @@ private[ui] class ExecutorsPage(

private[spark] object ExecutorsPage {
/** Represent an executor's info as a map given a storage status index */
def getExecInfo(listener: ExecutorsListener, statusId: Int): ExecutorSummary = {
val status = listener.storageStatusList(statusId)
def getExecInfo(listener: ExecutorsListener, statusId: Int, isActive: Boolean)
: ExecutorSummary = {
val status = if (isActive) {
listener.activeStorageStatusList(statusId)
} else {
listener.deadStorageStatusList(statusId)
}
val execId = status.blockManagerId.executorId
val hostPort = status.blockManagerId.hostPort
val rddBlocks = status.numBlocks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
val executorToLogUrls = HashMap[String, Map[String, String]]()
val executorIdToData = HashMap[String, ExecutorUIData]()

def storageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList
def activeStorageStatusList: Seq[StorageStatus] = storageStatusListener.activeStorageStatusList

def deadStorageStatusList: Seq[StorageStatus] = storageStatusListener.deadStorageStatusList
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this used anywhere


override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = synchronized {
val eid = executorAdded.executorId
Expand All @@ -75,7 +77,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp

override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
applicationStart.driverLogs.foreach { logs =>
val storageStatus = storageStatusList.find { s =>
val storageStatus = activeStorageStatusList.find { s =>
s.blockManagerId.executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER ||
s.blockManagerId.executorId == SparkContext.DRIVER_IDENTIFIER
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends Bloc

private[ui] val _rddInfoMap = mutable.Map[Int, RDDInfo]() // exposed for testing

def storageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList
def activeStorageStatusList: Seq[StorageStatus] = storageStatusListener.activeStorageStatusList

/** Filter RDD info to include only those with cached partitions */
def rddInfoList: Seq[RDDInfo] = synchronized {
Expand All @@ -54,7 +54,7 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends Bloc
private def updateRDDInfo(updatedBlocks: Seq[(BlockId, BlockStatus)]): Unit = {
val rddIdsToUpdate = updatedBlocks.flatMap { case (bid, _) => bid.asRDDId.map(_.rddId) }.toSet
val rddInfosToUpdate = _rddInfoMap.values.toSeq.filter { s => rddIdsToUpdate.contains(s.id) }
StorageUtils.updateRddInfo(rddInfosToUpdate, storageStatusList)
StorageUtils.updateRddInfo(rddInfosToUpdate, activeStorageStatusList)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.storage

import org.apache.spark.{SparkFunSuite, Success}
import org.apache.spark.{SparkConf, SparkFunSuite, Success}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._

Expand All @@ -29,9 +29,10 @@ class StorageStatusListenerSuite extends SparkFunSuite {
private val bm2 = BlockManagerId("fat", "duck", 2)
private val taskInfo1 = new TaskInfo(0, 0, 0, 0, "big", "dog", TaskLocality.ANY, false)
private val taskInfo2 = new TaskInfo(0, 0, 0, 0, "fat", "duck", TaskLocality.ANY, false)
private val conf = new SparkConf()

test("block manager added/removed") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you update this test to add checks for the dead executors list you're adding?

val listener = new StorageStatusListener
val listener = new StorageStatusListener(conf)

// Block manager add
assert(listener.executorIdToStorageStatus.size === 0)
Expand Down Expand Up @@ -60,7 +61,7 @@ class StorageStatusListenerSuite extends SparkFunSuite {
}

test("task end without updated blocks") {
val listener = new StorageStatusListener
val listener = new StorageStatusListener(conf)
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L))
val taskMetrics = new TaskMetrics
Expand All @@ -77,7 +78,7 @@ class StorageStatusListenerSuite extends SparkFunSuite {
}

test("task end with updated blocks") {
val listener = new StorageStatusListener
val listener = new StorageStatusListener(conf)
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L))
val taskMetrics1 = new TaskMetrics
Expand Down Expand Up @@ -126,7 +127,7 @@ class StorageStatusListenerSuite extends SparkFunSuite {
}

test("unpersist RDD") {
val listener = new StorageStatusListener
val listener = new StorageStatusListener(conf)
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
val taskMetrics1 = new TaskMetrics
val taskMetrics2 = new TaskMetrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.ui.storage

import org.scalatest.BeforeAndAfter
import org.apache.spark.{SparkFunSuite, Success}
import org.apache.spark.{SparkConf, SparkFunSuite, Success}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.storage._
Expand All @@ -43,7 +43,7 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {

before {
bus = new LiveListenerBus
storageStatusListener = new StorageStatusListener
storageStatusListener = new StorageStatusListener(new SparkConf())
storageListener = new StorageListener(storageStatusListener)
bus.addListener(storageStatusListener)
bus.addListener(storageListener)
Expand Down