
SPARK-7729:Executor which has been killed should also be displayed on… #6263

Closed
wants to merge 25 commits into from
Commits
7a82254
SPARK-7729:Executor which has been killed should also be displayed on…
May 19, 2015
84563df
Revert "SPARK-7729:Executor which has been killed should also be disp…
May 19, 2015
33eb9f5
SPARK-7729: Executor which has been killed should also be displayed o…
Nov 4, 2015
f004f21
SPARK-7729: review comments incorporated.
Nov 5, 2015
050e949
SPARK-7729: review comments incorporated - 2.
Nov 5, 2015
cc387cd
SPARK-7729: review comment.
Nov 5, 2015
1fdffc5
SPARK-7729: review comments - 4.
Nov 5, 2015
cef7c8b
SPARK-7729: another few minor tweeks.
Nov 16, 2015
a179aa1
SPARK-7729: ordering correction.
Nov 17, 2015
1a20371
SPARK-7729: developer api annotation.
Nov 17, 2015
608a24b
Merge remote-tracking branch 'upstream/master'
Nov 20, 2015
0983858
SPARK-7729: Executor which has been killed should also be displayed o…
Nov 4, 2015
6e85c85
SPARK-7729: review comments incorporated.
Nov 5, 2015
7c18511
SPARK-7729: review comments incorporated - 2.
Nov 5, 2015
a791bcc
SPARK-7729: review comment.
Nov 5, 2015
06042ed
SPARK-7729: review comments - 4.
Nov 5, 2015
ee12fe1
SPARK-7729: another few minor tweeks.
Nov 16, 2015
e031a17
SPARK-7729: ordering correction.
Nov 17, 2015
a0e7cef
SPARK-7729: developer api annotation.
Nov 17, 2015
3e23321
Merge branch 'SPARK-7729' of https://github.com/archit279thakur/spark…
Nov 20, 2015
b827f8f
SPARK-7729: scalastyle issues fixing.
Nov 20, 2015
33fc892
SPARK-7729: rebasing with master.
Nov 20, 2015
2cf4f71
SPARK-7729: scalastyle again.
Nov 20, 2015
e1577dc
SPARK-7729: build failure correction.
Nov 20, 2015
826587f
SPARK-7729: scalastyle again.
Nov 20, 2015
@@ -59,6 +59,7 @@ class ExecutorSummary private[spark](
val totalInputBytes: Long,
val totalShuffleRead: Long,
val totalShuffleWrite: Long,
val isAlive: Boolean,
val maxMemory: Long,
val executorLogs: Map[String, String])

@@ -17,26 +17,55 @@

package org.apache.spark.storage

import java.util.concurrent.TimeUnit

import scala.collection.JavaConversions.collectionAsScalaIterable
Contributor: Avoid using JavaConversions; prefer JavaConverters, which forces you to call .asScala, making the conversion much clearer to future code readers. The convention is to import scala.collection.JavaConverters._.
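For readers unfamiliar with the distinction, a minimal, hypothetical sketch of the explicit-conversion style the reviewer is asking for (names are illustrative; on newer Scala versions scala.jdk.CollectionConverters replaces JavaConverters):

```scala
import java.util.{ArrayList => JArrayList}

import scala.collection.JavaConverters._

object ConvertersSketch {
  def main(args: Array[String]): Unit = {
    val javaList = new JArrayList[String]()
    javaList.add("exec-1")
    javaList.add("exec-2")
    // .asScala makes the Java-to-Scala conversion visible at the call site,
    // unlike JavaConversions, which applies an invisible implicit conversion.
    val scalaSeq: Seq[String] = javaList.asScala.toSeq
    println(scalaSeq.mkString(", "))
  }
}
```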

Author: Sure.

import scala.collection.mutable
import scala.language.reflectiveCalls
Contributor: unused?

Author: I am getting this warning on compilation (though many are already being thrown):

[warn] core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala:174: reflective access of structural type member method advance should be enabled
[warn] by making the implicit value scala.language.reflectiveCalls visible.
[warn] This can be achieved by adding the import clause 'import scala.language.reflectiveCalls'
[warn] or by setting the compiler option -language:reflectiveCalls.
[warn] See the Scala docs for value scala.language.reflectiveCalls for a discussion
[warn] why the feature should be explicitly enabled.
[warn] ticker.advance(5, TimeUnit.SECONDS)

That's why I imported it. I checked again, and I am still getting the warning.

Contributor: OK, so that warning is in the StorageStatusListenerSuite, not StorageStatusListener. You're getting it because you're calling ticker.advance without having explicitly defined a class or interface with that method, so Scala uses reflection to dispatch the call. If you change the test as I suggested, the warning will disappear and you can remove the import.
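To illustrate the reviewer's point, a hypothetical sketch (not PR code): a structural type lets you call a method on any value that happens to define it, but the call is dispatched via runtime reflection, which is what triggers the reflectiveCalls warning. A small named class makes the dispatch static, so neither the import nor the compiler flag is needed.

```scala
import scala.language.reflectiveCalls

object ReflectiveCallsSketch {
  // Structural type: accepts any value with an advance(Long) method.
  // The call ticker.advance(...) here is dispatched via reflection,
  // which is why the compiler asks for scala.language.reflectiveCalls.
  def viaStructuralType(ticker: { def advance(by: Long): Unit }): Unit =
    ticker.advance(5L)

  // Named class: the same call compiles to ordinary virtual dispatch,
  // so no language import or -language:reflectiveCalls flag is required.
  class ManualTicker {
    var nanos = 0L
    def advance(by: Long): Unit = nanos += by
  }

  def main(args: Array[String]): Unit = {
    val t = new ManualTicker
    viaStructuralType(t) // ManualTicker structurally satisfies the parameter type
    t.advance(5L)        // direct call, no reflection involved
    println(t.nanos)
  }
}
```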


import org.apache.spark.SparkConf
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler._

import com.google.common.base.Ticker
import com.google.common.cache.CacheBuilder
Contributor: Import ordering -- these go above the o.a.spark imports, with a line break in between. See https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide#SparkCodeStyleGuide-Imports


/**
* :: DeveloperApi ::
* A SparkListener that maintains executor storage status.
*
* This class is thread-safe (unlike JobProgressListener)
*/

object StorageStatusListener {
Contributor: @DeveloperApi

val TIME_TO_EXPIRE_KILLED_EXECUTOR = "spark.ui.timeToExpireKilledExecutor"
Contributor: Do we really need this class? How about just exposing this string to the end user?

}

@DeveloperApi
class StorageStatusListener extends SparkListener {
class StorageStatusListener(conf: SparkConf) extends SparkListener {
var ticker = Ticker.systemTicker()

private [storage] def this(conf: SparkConf, ticker: Ticker) = {
this(conf)
this.ticker = ticker
}
Contributor: You can avoid the var here, though the Scala syntax is a little tricky:

class StorageStatusListener private[storage](conf: SparkConf, ticker: Ticker) {
  def this(conf: SparkConf) = {
    this(conf, Ticker.systemTicker())
  }

Author: Yes, this definitely looks better.


import StorageStatusListener._

// This maintains only blocks that are cached (i.e. storage level is not StorageLevel.NONE)
private[storage] val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()
private[storage] val removedExecutorIdToStorageStatus = CacheBuilder.newBuilder().
expireAfterWrite(conf.getTimeAsSeconds(TIME_TO_EXPIRE_KILLED_EXECUTOR, "0"), TimeUnit.SECONDS).
ticker(ticker).build[String, StorageStatus]()
Contributor: Super nit: can you put each chained call on its own line, with the . at the beginning of the line? So:

private[storage] val removedExecutorIdToStorageStatus = CacheBuilder.newBuilder()
  .expireAfterWrite(...)
  .ticker(ticker)
  .build...

Contributor: Also, does this need to be private[storage], or can it just be private?

Author: I was accessing it in tests, so I kept it private[storage]. I can make it private, and the addition of new tests (if required) can make it private[storage] again?
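The cache under discussion relies on Guava's expireAfterWrite together with a pluggable Ticker. A minimal, self-contained sketch of that mechanism (illustrative names, assuming Guava on the classpath):

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong

import com.google.common.base.Ticker
import com.google.common.cache.CacheBuilder

object ExpiringCacheSketch {
  def main(args: Array[String]): Unit = {
    // A fake ticker lets a test control "time" instead of the wall clock.
    val nanos = new AtomicLong()
    val fakeTicker = new Ticker { override def read(): Long = nanos.get() }

    val cache = CacheBuilder.newBuilder()
      .expireAfterWrite(5, TimeUnit.SECONDS)
      .ticker(fakeTicker)
      .build[String, String]()

    cache.put("exec-1", "removed-status")
    println(cache.getIfPresent("exec-1")) // entry still present

    // Advance the fake clock past the expiry window.
    nanos.addAndGet(TimeUnit.SECONDS.toNanos(6))
    println(cache.getIfPresent("exec-1")) // entry has expired -> null
  }
}
```

Entries are evicted lazily relative to whatever time source the ticker provides, which is what makes the expiry behavior deterministic in tests.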


def storageStatusList: Seq[StorageStatus] = synchronized {
executorIdToStorageStatus.values.toSeq
}


def removedExecutorStorageStatusList: Seq[StorageStatus] = synchronized{
Contributor: nit: space before {

Author: Thanks for pointing it out. Corrected.

removedExecutorIdToStorageStatus.asMap().values().toSeq
}

/** Update storage status list to reflect updated block statuses */
private def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {
executorIdToStorageStatus.get(execId).foreach { storageStatus =>
@@ -87,6 +116,8 @@ class StorageStatusListener extends SparkListener {
override def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved) {
synchronized {
val executorId = blockManagerRemoved.blockManagerId.executorId
removedExecutorIdToStorageStatus.put(executorId,
executorIdToStorageStatus.get(executorId).get)
Contributor: I really think it would be nice to also keep the time the executor was removed (from blockManagerRemoved.time) to show in the UI.

executorIdToStorageStatus.remove(executorId)
}
}
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -175,7 +175,7 @@ private[spark] object SparkUI {
}

val environmentListener = new EnvironmentListener
val storageStatusListener = new StorageStatusListener
val storageStatusListener = new StorageStatusListener(conf)
val executorsListener = new ExecutorsListener(storageStatusListener)
val storageListener = new StorageListener(storageStatusListener)
val operationGraphListener = new RDDOperationGraphListener(conf)
17 changes: 13 additions & 4 deletions core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -25,6 +25,7 @@ import scala.xml.Node
import org.apache.spark.status.api.v1.ExecutorSummary
import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
import org.apache.spark.util.Utils
import org.apache.spark.storage.StorageStatus
Contributor: nit: ordering


// This isn't even used anymore -- but we need to keep it b/c of a MiMa false positive
private[ui] case class ExecutorSummaryInfo(
@@ -41,6 +42,7 @@ private[ui] case class ExecutorSummaryInfo(
totalInputBytes: Long,
totalShuffleRead: Long,
totalShuffleWrite: Long,
isAlive: Boolean,
maxMemory: Long,
executorLogs: Map[String, String])

@@ -49,15 +51,15 @@ private[ui] class ExecutorsPage(
parent: ExecutorsTab,
threadDumpEnabled: Boolean)
extends WebUIPage("") {

private val listener = parent.listener

def render(request: HttpServletRequest): Seq[Node] = {
val storageStatusList = listener.storageStatusList
val storageStatusList = listener.storageStatusList ++ listener.removedExecutorStorageStatusList
val maxMem = storageStatusList.map(_.maxMem).sum
val memUsed = storageStatusList.map(_.memUsed).sum
val diskUsed = storageStatusList.map(_.diskUsed).sum
val execInfo = for (statusId <- 0 until storageStatusList.size) yield
ExecutorsPage.getExecInfo(listener, statusId)
val execInfo = for (statusId <- 0 until storageStatusList.size) yield ExecutorsPage.getExecInfo(listener, statusId)
val execInfoSorted = execInfo.sortBy(_.id)
val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty

@@ -84,6 +86,7 @@ private[ui] class ExecutorsPage(
Shuffle Write
</span>
</th>
<th>Executor Status</th>
{if (logsExist) <th class="sorttable_nosort">Logs</th> else Seq.empty}
{if (threadDumpEnabled) <th class="sorttable_nosort">Thread Dump</th> else Seq.empty}
</thead>
@@ -144,6 +147,9 @@ private[ui] class ExecutorsPage(
<td sorttable_customkey={info.totalShuffleWrite.toString}>
{Utils.bytesToString(info.totalShuffleWrite)}
</td>
<td sorttable_customkey={info.isAlive.toString}>
{if(info.isAlive) "Alive" else "Killed"}
</td>
{
if (logsExist) {
<td>
@@ -177,13 +183,15 @@ private[ui] class ExecutorsPage(
private[spark] object ExecutorsPage {
/** Represent an executor's info as a map given a storage status index */
def getExecInfo(listener: ExecutorsListener, statusId: Int): ExecutorSummary = {

val status = listener.storageStatusList(statusId)
val execId = status.blockManagerId.executorId
val hostPort = status.blockManagerId.hostPort
val rddBlocks = status.numBlocks
val memUsed = status.memUsed
val maxMem = status.maxMem
val diskUsed = status.diskUsed
val isAlive = listener.storageStatusList.contains(status)
val activeTasks = listener.executorToTasksActive.getOrElse(execId, 0)
val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0)
val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0)
@@ -193,7 +201,7 @@ private[spark] object ExecutorsPage {
val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0L)
val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0L)
val executorLogs = listener.executorToLogUrls.getOrElse(execId, Map.empty)

new ExecutorSummary(
execId,
hostPort,
@@ -208,6 +216,7 @@ private[spark] object ExecutorsPage {
totalInputBytes,
totalShuffleRead,
totalShuffleWrite,
isAlive,
maxMem,
executorLogs
)
@@ -58,6 +58,9 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
val executorIdToData = HashMap[String, ExecutorUIData]()

def storageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList
def removedExecutorStorageStatusList: Seq[StorageStatus] = {
storageStatusListener.removedExecutorStorageStatusList
}

override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = synchronized {
val eid = executorAdded.executorId
@@ -17,10 +17,16 @@

package org.apache.spark.storage

import org.scalatest.FunSuite
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.SparkConf
import org.apache.spark.Success
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.scalatest.FunSuite

Contributor: Spurious extra doc; I don't think you use SparkConfSuite, and the import of SparkConf is out of order.

import com.google.common.base.Ticker
Contributor: nit: import ordering & grouping. org.scalatest & com.google go in the same group, above the org.apache.spark group.

Author: Thanks for pointing it out. Corrected.


/**
* Test the behavior of StorageStatusListener in response to all relevant events.
@@ -30,9 +36,10 @@ class StorageStatusListenerSuite extends FunSuite {
private val bm2 = BlockManagerId("fat", "duck", 2)
private val taskInfo1 = new TaskInfo(0, 0, 0, 0, "big", "dog", TaskLocality.ANY, false)
private val taskInfo2 = new TaskInfo(0, 0, 0, 0, "fat", "duck", TaskLocality.ANY, false)
val conf = new SparkConf()

test("block manager added/removed") {
val listener = new StorageStatusListener
val listener = new StorageStatusListener(conf)

// Block manager add
assert(listener.executorIdToStorageStatus.size === 0)
@@ -61,7 +68,7 @@ class StorageStatusListenerSuite extends FunSuite {
}

test("task end without updated blocks") {
val listener = new StorageStatusListener
val listener = new StorageStatusListener(conf)
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L))
val taskMetrics = new TaskMetrics
@@ -78,7 +85,7 @@ class StorageStatusListenerSuite extends FunSuite {
}

test("task end with updated blocks") {
val listener = new StorageStatusListener
val listener = new StorageStatusListener(conf)
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L))
val taskMetrics1 = new TaskMetrics
@@ -127,7 +134,7 @@ class StorageStatusListenerSuite extends FunSuite {
}

test("unpersist RDD") {
val listener = new StorageStatusListener
val listener = new StorageStatusListener(conf)
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
val taskMetrics1 = new TaskMetrics
val taskMetrics2 = new TaskMetrics
@@ -150,4 +157,21 @@ class StorageStatusListenerSuite extends FunSuite {
listener.onUnpersistRDD(SparkListenerUnpersistRDD(1))
assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
}

test("Killed Executor Entry removed after configurable time") {
val localtestconf = new SparkConf().set(StorageStatusListener.TIME_TO_EXPIRE_KILLED_EXECUTOR,"5s")
Contributor: nit: line too long

val ticker = new Ticker {
val nanos = new AtomicLong()
def advance(time: Long, timeUnit: TimeUnit) = {
nanos.addAndGet(timeUnit.toNanos(time))
}
override def read() = {
nanos.getAndAdd(0)
}
}
val listener = new StorageStatusListener(localtestconf, ticker)
listener.removedExecutorIdToStorageStatus.put("1", new StorageStatus(null, 50))
ticker.advance(5, TimeUnit.SECONDS)
assert(listener.removedExecutorIdToStorageStatus.asMap.get("1") == null)
Contributor: This is more complicated than it needs to be -- no need for an atomic (there is only one thread here); you can just use a long. Also, I'd check the removedExecutorStorageStatusList method rather than the cache itself:

    class MyTicker extends Ticker {
      var t = 0L
      override def read(): Long = t
    }
    val ticker = new MyTicker
    val listener = new StorageStatusListener(localtestconf, ticker)
    listener.removedExecutorIdToStorageStatus.put("1", new StorageStatus(null, 50))
    assert(listener.removedExecutorStorageStatusList.nonEmpty)
    ticker.t = 5000000001L
    assert(listener.removedExecutorStorageStatusList.isEmpty)

}
}
@@ -22,6 +22,7 @@ import org.apache.spark.Success
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.storage._
import org.apache.spark.SparkConf
Contributor: nit: import ordering; this should go above the rest of the Spark imports (because class imports sort before package imports). Also, while you're touching this, there should be a blank line between the scalatest and spark imports.


/**
* Test various functionality in the StorageListener that supports the StorageTab.
@@ -43,7 +44,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {

before {
bus = new LiveListenerBus
storageStatusListener = new StorageStatusListener
storageStatusListener = new StorageStatusListener(new SparkConf())
storageListener = new StorageListener(storageStatusListener)
bus.addListener(storageStatusListener)
bus.addListener(storageListener)