[SPARK-7729][UI] Executor which has been killed should also be displayed on Executor Tab #10058

Status: Closed. Wants to merge 26 commits (changes shown from 22 of the 26 commits).

Commits
bd40d49  init commit (lianhuiwang, Dec 1, 2015)
97dbd62  rename activeStorageStatusList (lianhuiwang, Dec 1, 2015)
16e175d  fix scala style (lianhuiwang, Dec 1, 2015)
7b244ff  address CodingCat's comments (lianhuiwang, Dec 1, 2015)
b106cfe  fix minor error (lianhuiwang, Dec 2, 2015)
47255fa  remove synchronized (lianhuiwang, Dec 2, 2015)
325149f  address vanzin's comments (lianhuiwang, Dec 4, 2015)
6532e01  add REST api (lianhuiwang, Dec 5, 2015)
a99175b  add mima exclusion (lianhuiwang, Dec 6, 2015)
122d3f2  fix REST API's test (lianhuiwang, Dec 6, 2015)
1433c04  Merge branch 'master' of https://github.com/apache/spark into SPARK-7729 (lianhuiwang, Dec 15, 2015)
f6b4739  combine active and dead executors (lianhuiwang, Dec 15, 2015)
ada7e14  fix style (lianhuiwang, Dec 15, 2015)
677996c  fix style (lianhuiwang, Dec 15, 2015)
308eade  fix style (lianhuiwang, Dec 15, 2015)
a1a04fc  fix mima's fail (lianhuiwang, Dec 16, 2015)
1608041  Merge branch 'apache-master' into SPARK-7729 (lianhuiwang, Dec 16, 2015)
49ef6e9  Merge branch 'apache-master' into SPARK-7729 (lianhuiwang, Feb 2, 2016)
7c7ca97  update with master (lianhuiwang, Feb 2, 2016)
35eef9a  update with master (lianhuiwang, Feb 2, 2016)
f749a5f  update with master (lianhuiwang, Feb 2, 2016)
8f0be11  update with master (lianhuiwang, Feb 2, 2016)
c632d39  Merge branch 'apache2' into SPARK-7729 (lianhuiwang, Feb 21, 2016)
96950c6  address andrewor's comments (lianhuiwang, Feb 21, 2016)
9ef6c5b  Merge branch 'apache-master' into SPARK-7729 (lianhuiwang, Feb 23, 2016)
c88afa8  Merge branch 'apache-master' into SPARK-7729 (lianhuiwang, Feb 23, 2016)
@@ -28,7 +28,7 @@ private[v1] class AllRDDResource(ui: SparkUI) {
 
   @GET
   def rddList(): Seq[RDDStorageInfo] = {
-    val storageStatusList = ui.storageListener.storageStatusList
+    val storageStatusList = ui.storageListener.activeStorageStatusList
     val rddInfos = ui.storageListener.rddInfoList
     rddInfos.map{rddInfo =>
       AllRDDResource.getRDDStorageInfo(rddInfo.id, rddInfo, storageStatusList,
@@ -44,7 +44,7 @@ private[spark] object AllRDDResource {
       rddId: Int,
       listener: StorageListener,
       includeDetails: Boolean): Option[RDDStorageInfo] = {
-    val storageStatusList = listener.storageStatusList
+    val storageStatusList = listener.activeStorageStatusList
     listener.rddInfoList.find { _.id == rddId }.map { rddInfo =>
       getRDDStorageInfo(rddId, rddInfo, storageStatusList, includeDetails)
     }
@@ -31,9 +31,9 @@ private[v1] class ExecutorListResource(ui: SparkUI) {
     listener.synchronized {
[Contributor] Maybe I missed something: why do we need synchronized here?

[Contributor, author] More than one thread uses the listener object: the REST API, the executors page in the UI, and the SparkListener event bus.

[Contributor] I'm still confused about the reason for imposing sequential access here. Can you give an example of the problem without this synchronized?

[Contributor, author] Because ExecutorsPage.getExecInfo() reads executorToTasksActive and executorToTasksFailed from ExecutorsListener while the SparkListenerBus thread mutates them in onPostEvent(), we need synchronized here. It would be better if the synchronization lived inside ExecutorsPage.getExecInfo().

[Contributor] I don't think it is necessary; essentially you are imposing that even GET operations are processed sequentially.

       // The follow codes should be protected by `listener` to make sure no executors will be
       // removed before we query their status. See SPARK-12784.
-      val storageStatusList = listener.storageStatusList
+      val storageStatusList = listener.activeStorageStatusList
       (0 until storageStatusList.size).map { statusId =>
-        ExecutorsPage.getExecInfo(listener, statusId)
+        ExecutorsPage.getExecInfo(listener, statusId, isActive = true)
       }
     }
   }
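To make the race in that exchange concrete, here is a minimal, self-contained sketch; ToyListener and its methods are invented for illustration and are not Spark code:

import scala.collection.mutable

// Stand-in for ExecutorsListener: the event-bus thread mutates task counters
// while UI and REST threads read them, so both sides take the same lock.
class ToyListener {
  private val executorToTasksActive = mutable.Map[String, Int]().withDefaultValue(0)

  def onTaskStart(execId: String): Unit = synchronized {
    executorToTasksActive(execId) += 1
  }

  def onTaskEnd(execId: String): Unit = synchronized {
    executorToTasksActive(execId) -= 1
  }

  // Readers take the lock too; without it a reader may observe a
  // partially applied update from the event-bus thread.
  def snapshot: Map[String, Int] = synchronized { executorToTasksActive.toMap }
}

object RaceSketch extends App {
  val listener = new ToyListener
  val bus = new Thread(() => (1 to 100000).foreach { _ =>
    listener.onTaskStart("exec-1")
    listener.onTaskEnd("exec-1")
  })
  bus.start()
  println(listener.snapshot) // always a consistent snapshot, never a torn update
  bus.join()
}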
@@ -54,6 +54,7 @@ class ExecutorStageSummary private[spark](
 class ExecutorSummary private[spark](
     val id: String,
     val hostPort: String,
+    val isActive: Boolean,
     val rddBlocks: Int,
     val memoryUsed: Long,
     val diskUsed: Long,
@@ -19,6 +19,7 @@ package org.apache.spark.storage
 
 import scala.collection.mutable
 
+import org.apache.spark.SparkConf
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.scheduler._
 
@@ -29,14 +30,20 @@ import org.apache.spark.scheduler._
  * This class is thread-safe (unlike JobProgressListener)
  */
 @DeveloperApi
-class StorageStatusListener extends SparkListener {
+class StorageStatusListener(conf: SparkConf) extends SparkListener {
   // This maintains only blocks that are cached (i.e. storage level is not StorageLevel.NONE)
   private[storage] val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()
+  private[storage] val deadExecutorStorageStatus = new mutable.ListBuffer[StorageStatus]()

[Contributor] Can you keep this name for backward compatibility?

+  private[this] val retainedDeadExecutors = conf.getInt("spark.ui.retainedDeadExecutors", 100)

[Contributor] Would you like to update the documentation as well to reflect this config entry?

[Contributor, author] Thanks. I have done it.

-  def storageStatusList: Seq[StorageStatus] = synchronized {
+  def activeStorageStatusList: Seq[StorageStatus] = synchronized {
     executorIdToStorageStatus.values.toSeq
   }
 
+  def deadStorageStatusList: Seq[StorageStatus] = synchronized {
+    deadExecutorStorageStatus.toSeq
+  }
+
   /** Update storage status list to reflect updated block statuses */
   private def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {
     executorIdToStorageStatus.get(execId).foreach { storageStatus =>
@@ -52,7 +59,7 @@ class StorageStatusListener extends SparkListener {
   /** Update storage status list to reflect the removal of an RDD from the cache */
   private def updateStorageStatus(unpersistedRDDId: Int) {
-    storageStatusList.foreach { storageStatus =>
+    activeStorageStatusList.foreach { storageStatus =>
       storageStatus.rddBlocksById(unpersistedRDDId).foreach { case (blockId, _) =>
         storageStatus.removeBlock(blockId)
       }
@@ -87,8 +94,12 @@ class StorageStatusListener extends SparkListener {
   override def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved) {
     synchronized {
       val executorId = blockManagerRemoved.blockManagerId.executorId
-      executorIdToStorageStatus.remove(executorId)
+      executorIdToStorageStatus.remove(executorId).foreach { status =>
+        deadExecutorStorageStatus += status
+      }
+      if (deadExecutorStorageStatus.size > retainedDeadExecutors) {
+        deadExecutorStorageStatus.trimStart(1)
+      }
     }
   }
 
 }
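The retention logic above is small enough to exercise on its own. The following is a hedged, standalone sketch; the class and demo names are invented, and only the buffer and limit names mirror the PR:

import scala.collection.mutable

// Bounded dead-executor buffer: the oldest entry is evicted once the size
// exceeds the retention limit, mirroring the trimStart(1) call above.
class DeadExecutorBuffer(retainedDeadExecutors: Int) {
  private val deadExecutorStorageStatus = mutable.ListBuffer[String]()

  def onBlockManagerRemoved(executorId: String): Unit = synchronized {
    deadExecutorStorageStatus += executorId
    if (deadExecutorStorageStatus.size > retainedDeadExecutors) {
      deadExecutorStorageStatus.trimStart(1) // drop the oldest dead executor
    }
  }

  def deadStorageStatusList: Seq[String] = synchronized {
    deadExecutorStorageStatus.toSeq
  }
}

object DeadExecutorBufferDemo extends App {
  val buf = new DeadExecutorBuffer(retainedDeadExecutors = 2)
  Seq("exec-1", "exec-2", "exec-3").foreach(buf.onBlockManagerRemoved)
  // exec-1 was evicted; only the two most recent dead executors remain.
  assert(buf.deadStorageStatusList == Seq("exec-2", "exec-3"))
}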
core/src/main/scala/org/apache/spark/ui/SparkUI.scala (1 addition, 1 deletion)
@@ -196,7 +196,7 @@ private[spark] object SparkUI {
     }
 
     val environmentListener = new EnvironmentListener
-    val storageStatusListener = new StorageStatusListener
+    val storageStatusListener = new StorageStatusListener(conf)
     val executorsListener = new ExecutorsListener(storageStatusListener, conf)
     val storageListener = new StorageListener(storageStatusListener)
     val operationGraphListener = new RDDOperationGraphListener(conf)
core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala (56 additions, 25 deletions)
@@ -54,24 +54,30 @@ private[ui] class ExecutorsPage(
   private val GCTimePercent = 0.1
 
   def render(request: HttpServletRequest): Seq[Node] = {
-    val (storageStatusList, execInfo) = listener.synchronized {
+    val (activeExecutorInfo, deadExecutorInfo) = listener.synchronized {
       // The follow codes should be protected by `listener` to make sure no executors will be
       // removed before we query their status. See SPARK-12784.
-      val _storageStatusList = listener.storageStatusList
-      val _execInfo = {
-        for (statusId <- 0 until _storageStatusList.size)
-          yield ExecutorsPage.getExecInfo(listener, statusId)
+      val _activeExecutorInfo = {
+        for (statusId <- 0 until listener.activeStorageStatusList.size)
+          yield ExecutorsPage.getExecInfo(listener, statusId, isActive = true)
       }
-      (_storageStatusList, _execInfo)
+      val _deadExecutorInfo = {
+        for (statusId <- 0 until listener.deadStorageStatusList.size)
+          yield ExecutorsPage.getExecInfo(listener, statusId, isActive = false)
+      }
+      (_activeExecutorInfo, _deadExecutorInfo)
     }
 
+    val execInfo = activeExecutorInfo ++ deadExecutorInfo
     val execInfoSorted = execInfo.sortBy(_.id)
     val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty
 
-    val execTable =
+    val execTable = {
       <table class={UIUtils.TABLE_CLASS_STRIPED_SORTABLE}>
         <thead>
           <th>Executor ID</th>
           <th>Address</th>
+          <th>Status</th>
           <th>RDD Blocks</th>
           <th><span data-toggle="tooltip" title={ToolTips.STORAGE_MEMORY}>Storage Memory</span></th>
           <th>Disk Used</th>
@@ -97,32 +103,48 @@ private[ui] class ExecutorsPage(
           {execInfoSorted.map(execRow(_, logsExist))}
         </tbody>
       </table>
+    }
 
     val content =
       <div class="row">
        <div class="span12">
-          <h4>Totals for {execInfo.size} Executors</h4>
-          {execSummary(execInfo)}
+          <h4>Dead Executors({deadExecutorInfo.size})</h4>
         </div>
       </div>
+      <div class="row">
+        <div class="span12">
+          <h4>Active Executors({activeExecutorInfo.size})</h4>
+          {execSummary(activeExecutorInfo)}
+        </div>
+      </div>
       <div class = "row">
         <div class="span12">
-          <h4>Active Executors</h4>
+          <h4>Executors</h4>
           {execTable}
         </div>
       </div>;
 
-    UIUtils.headerSparkPage("Executors (" + execInfo.size + ")", content, parent)
+    UIUtils.headerSparkPage("Executors", content, parent)
   }
 
   /** Render an HTML row representing an executor */
   private def execRow(info: ExecutorSummary, logsExist: Boolean): Seq[Node] = {
     val maximumMemory = info.maxMemory
     val memoryUsed = info.memoryUsed
     val diskUsed = info.diskUsed
+    val executorStatus =
+      if (info.isActive) {
+        "Active"
+      } else {
+        "Dead"
+      }
 
     <tr>
       <td>{info.id}</td>
       <td>{info.hostPort}</td>
+      <td sorttable_customkey={executorStatus.toString}>
+        {executorStatus}
+      </td>
       <td>{info.rddBlocks}</td>
       <td sorttable_customkey={memoryUsed.toString}>
         {Utils.bytesToString(memoryUsed)} /
@@ -159,10 +181,14 @@ private[ui] class ExecutorsPage(
       }
       {
         if (threadDumpEnabled) {
[Contributor] You can do threadDumpEnabled && info.isActive.

[Contributor, author] When thread dump is enabled, I think <td> </td> is better than Seq.empty for a dead executor, because active and dead executors share the same table; an empty cell keeps the table aligned.

-          val encodedId = URLEncoder.encode(info.id, "UTF-8")
-          <td>
-            <a href={s"threadDump/?executorId=${encodedId}"}>Thread Dump</a>
-          </td>
+          if (info.isActive) {
+            val encodedId = URLEncoder.encode(info.id, "UTF-8")
+            <td>
+              <a href={s"threadDump/?executorId=${encodedId}"}>Thread Dump</a>
+            </td>
+          } else {
+            <td> </td>
+          }
         } else {
           Seq.empty
         }
@@ -231,14 +257,13 @@ private[ui] class ExecutorsPage(
   }
 
   private def taskData(
-    maxTasks: Int,
-    activeTasks: Int,
-    failedTasks: Int,
-    completedTasks: Int,
-    totalTasks: Int,
-    totalDuration: Long,
-    totalGCTime: Long):
-  Seq[Node] = {
+      maxTasks: Int,
+      activeTasks: Int,
+      failedTasks: Int,
+      completedTasks: Int,
+      totalTasks: Int,
+      totalDuration: Long,
+      totalGCTime: Long): Seq[Node] = {
     // Determine Color Opacity from 0.5-1
     // activeTasks range from 0 to maxTasks
     val activeTasksAlpha =
@@ -297,8 +322,13 @@ private[ui] class ExecutorsPage(
 
 private[spark] object ExecutorsPage {
   /** Represent an executor's info as a map given a storage status index */
-  def getExecInfo(listener: ExecutorsListener, statusId: Int): ExecutorSummary = {
-    val status = listener.storageStatusList(statusId)
+  def getExecInfo(listener: ExecutorsListener, statusId: Int, isActive: Boolean)
+    : ExecutorSummary = {
[Contributor] Style nit; the signature should be formatted as:

def getExecInfo(
    listener: ExecutorsListener,
    statusId: Int,
    isActive: Boolean): ExecutorSummary = {

+    val status = if (isActive) {
+      listener.activeStorageStatusList(statusId)
+    } else {
+      listener.deadStorageStatusList(statusId)
+    }
     val execId = status.blockManagerId.executorId
     val hostPort = status.blockManagerId.hostPort
     val rddBlocks = status.numBlocks
@@ -320,6 +350,7 @@ private[spark] object ExecutorsPage {
     new ExecutorSummary(
       execId,
       hostPort,
+      isActive,
       rddBlocks,
       memUsed,
       diskUsed,
@@ -60,7 +60,9 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: SparkConf)
   val executorToLogUrls = HashMap[String, Map[String, String]]()
   val executorIdToData = HashMap[String, ExecutorUIData]()
 
-  def storageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList
+  def activeStorageStatusList: Seq[StorageStatus] = storageStatusListener.activeStorageStatusList
+
+  def deadStorageStatusList: Seq[StorageStatus] = storageStatusListener.deadStorageStatusList
[Contributor] Is this used anywhere?


   override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = synchronized {
     val eid = executorAdded.executorId
@@ -80,7 +82,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: SparkConf)
 
   override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
     applicationStart.driverLogs.foreach { logs =>
-      val storageStatus = storageStatusList.find { s =>
+      val storageStatus = activeStorageStatusList.find { s =>
         s.blockManagerId.executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER ||
         s.blockManagerId.executorId == SparkContext.DRIVER_IDENTIFIER
       }
@@ -43,7 +43,7 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends BlockStatusListener
 
   private[ui] val _rddInfoMap = mutable.Map[Int, RDDInfo]() // exposed for testing
 
-  def storageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList
+  def activeStorageStatusList: Seq[StorageStatus] = storageStatusListener.activeStorageStatusList
 
   /** Filter RDD info to include only those with cached partitions */
   def rddInfoList: Seq[RDDInfo] = synchronized {
@@ -54,7 +54,7 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends BlockStatusListener
   private def updateRDDInfo(updatedBlocks: Seq[(BlockId, BlockStatus)]): Unit = {
     val rddIdsToUpdate = updatedBlocks.flatMap { case (bid, _) => bid.asRDDId.map(_.rddId) }.toSet
     val rddInfosToUpdate = _rddInfoMap.values.toSeq.filter { s => rddIdsToUpdate.contains(s.id) }
-    StorageUtils.updateRddInfo(rddInfosToUpdate, storageStatusList)
+    StorageUtils.updateRddInfo(rddInfosToUpdate, activeStorageStatusList)
   }
 
   /**
@@ -1,6 +1,7 @@
 [ {
   "id" : "<driver>",
   "hostPort" : "localhost:57971",
+  "isActive" : true,
   "rddBlocks" : 8,
   "memoryUsed" : 28000128,
   "diskUsed" : 0,
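The fixture above is the expected payload of the v1 executor-list endpoint. A hedged sketch for inspecting the new field on a live driver follows; the port and application id are assumptions for illustration:

import scala.io.Source

// Fetch the executor list from the status REST API of a running driver UI.
object ExecutorListProbe extends App {
  val appId = "app-20160223123456-0000" // hypothetical application id
  val url = s"http://localhost:4040/api/v1/applications/$appId/executors"
  val json = Source.fromURL(url).mkString
  // Each ExecutorSummary object now carries "isActive" : true | false.
  println(json)
}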
@@ -17,7 +17,7 @@
 
 package org.apache.spark.storage
 
-import org.apache.spark.{SparkFunSuite, Success}
+import org.apache.spark.{SparkConf, SparkFunSuite, Success}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
 
@@ -29,9 +29,11 @@ class StorageStatusListenerSuite extends SparkFunSuite {
   private val bm2 = BlockManagerId("fat", "duck", 2)
   private val taskInfo1 = new TaskInfo(0, 0, 0, 0, "big", "dog", TaskLocality.ANY, false)
   private val taskInfo2 = new TaskInfo(0, 0, 0, 0, "fat", "duck", TaskLocality.ANY, false)
+  private val conf = new SparkConf()
 
   test("block manager added/removed") {
[Contributor] Can you update this test to add checks for the dead executors list you're adding?
-    val listener = new StorageStatusListener
+    conf.set("spark.ui.retainedDeadExecutors", "1")
+    val listener = new StorageStatusListener(conf)
 
     // Block manager add
     assert(listener.executorIdToStorageStatus.size === 0)
@@ -53,14 +55,18 @@ class StorageStatusListenerSuite extends SparkFunSuite {
     assert(listener.executorIdToStorageStatus.size === 1)
     assert(!listener.executorIdToStorageStatus.get("big").isDefined)
     assert(listener.executorIdToStorageStatus.get("fat").isDefined)
+    assert(listener.deadExecutorStorageStatus.size === 1)
+    assert(listener.deadExecutorStorageStatus(0).blockManagerId.executorId.equals("big"))
     listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(1L, bm2))
     assert(listener.executorIdToStorageStatus.size === 0)
     assert(!listener.executorIdToStorageStatus.get("big").isDefined)
     assert(!listener.executorIdToStorageStatus.get("fat").isDefined)
+    assert(listener.deadExecutorStorageStatus.size === 1)
+    assert(listener.deadExecutorStorageStatus(0).blockManagerId.executorId.equals("fat"))
   }
 
   test("task end without updated blocks") {
-    val listener = new StorageStatusListener
+    val listener = new StorageStatusListener(conf)
     listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
     listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L))
     val taskMetrics = new TaskMetrics
@@ -77,7 +83,7 @@ class StorageStatusListenerSuite extends SparkFunSuite {
   }
 
   test("task end with updated blocks") {
-    val listener = new StorageStatusListener
+    val listener = new StorageStatusListener(conf)
     listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
     listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L))
     val taskMetrics1 = new TaskMetrics
@@ -126,7 +132,7 @@ class StorageStatusListenerSuite extends SparkFunSuite {
   }
 
   test("unpersist RDD") {
-    val listener = new StorageStatusListener
+    val listener = new StorageStatusListener(conf)
     listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
     val taskMetrics1 = new TaskMetrics
     val taskMetrics2 = new TaskMetrics
@@ -19,7 +19,7 @@ package org.apache.spark.ui.storage
 
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.{SparkFunSuite, Success}
+import org.apache.spark.{SparkConf, SparkFunSuite, Success}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
 import org.apache.spark.storage._
@@ -44,7 +44,7 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
 
   before {
     bus = new LiveListenerBus
-    storageStatusListener = new StorageStatusListener
+    storageStatusListener = new StorageStatusListener(new SparkConf())
     storageListener = new StorageListener(storageStatusListener)
     bus.addListener(storageStatusListener)
     bus.addListener(storageListener)
docs/configuration.md (7 additions, 0 deletions)
@@ -574,6 +574,13 @@ Apart from these, the following properties are also available, and may be useful
     How many finished batches the Spark UI and status APIs remember before garbage collecting.
   </td>
 </tr>
+<tr>
+  <td><code>spark.ui.retainedDeadExecutors</code></td>
+  <td>100</td>
+  <td>
+    How many dead executors the Spark UI and status APIs remember before garbage collecting.
+  </td>
+</tr>
 </table>
 
 #### Compression and Serialization
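For readers wiring this up, the new entry is set like any other SparkConf property. A minimal sketch, with the master, app name, and the value 25 chosen purely for illustration:

import org.apache.spark.{SparkConf, SparkContext}

object RetainedDeadExecutorsDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("retained-dead-executors-demo")
      // Keep at most 25 dead executors on the Executors tab (default is 100).
      .set("spark.ui.retainedDeadExecutors", "25")
    val sc = new SparkContext(conf)
    // ... run jobs; executors that die now stay visible, flagged as Dead ...
    sc.stop()
  }
}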