SPARK-7729: Executor which has been killed should also be displayed on… #6263
Changes from 2 commits
StorageStatusListener.scala
@@ -18,25 +18,45 @@
package org.apache.spark.storage

import scala.collection.mutable

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler._
import com.google.common.cache.CacheBuilder
import java.util.concurrent.TimeUnit
import com.google.common.cache.CacheLoader
import org.apache.spark.SparkConf

import scala.collection.JavaConversions._

Review comment: nit: import ordering
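Per the Spark style guide linked at the end of this review (java, then scala, then third-party, then org.apache.spark imports, each group separated by a blank line), the nit presumably asks for a grouping along these lines; this is a suggestion, not part of the diff:

```scala
import java.util.concurrent.TimeUnit

import scala.collection.JavaConversions._
import scala.collection.mutable

import com.google.common.cache.{CacheBuilder, CacheLoader}

import org.apache.spark.SparkConf
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler._
```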
/**
 * :: DeveloperApi ::
 * A SparkListener that maintains executor storage status.
 *
 * This class is thread-safe (unlike JobProgressListener)
 */
object StorageStatusListener {

  val TIME_TO_EXPIRE_KILLED_EXECUTOR = "spark.ui.timeToExpireKilledExecutor"

Review comment: do we really need this class? how about just exposing this string to the end user?
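A minimal sketch of that alternative (not part of the PR), assuming the same imports and `conf` constructor parameter as in the diff above, with the key read directly where the cache is built so the companion object could be dropped; the key name and default are the ones already in this diff:

```scala
// Hypothetical restructuring: document "spark.ui.timeToExpireKilledExecutor" as a
// user-facing setting and read it inline instead of exposing a constant here.
private[storage] val removedExecutorIdToStorageStatus = CacheBuilder.newBuilder()
  .expireAfterWrite(
    conf.getTimeAsSeconds("spark.ui.timeToExpireKilledExecutor", "0"), TimeUnit.SECONDS)
  .build[String, StorageStatus]()
```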
}

@DeveloperApi
class StorageStatusListener extends SparkListener {
class StorageStatusListener(conf: SparkConf) extends SparkListener {

  import StorageStatusListener._

  // This maintains only blocks that are cached (i.e. storage level is not StorageLevel.NONE)
  private[storage] val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()
  private[storage] val removedExecutorIdToStorageStatus = CacheBuilder.newBuilder().
    expireAfterWrite(conf.getTimeAsSeconds(TIME_TO_EXPIRE_KILLED_EXECUTOR, "0"), TimeUnit.SECONDS).
    build[String, StorageStatus]()

  def storageStatusList: Seq[StorageStatus] = synchronized {
    executorIdToStorageStatus.values.toSeq
  }

  def removedExecutorStorageStatusList: Seq[StorageStatus] = synchronized{

Review comment: nit: space before {
Reply: Thanks for pointing it out. Corrected.
    removedExecutorIdToStorageStatus.asMap().values().toSeq
  }

  /** Update storage status list to reflect updated block statuses */
  private def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {
    executorIdToStorageStatus.get(execId).foreach { storageStatus =>

@@ -87,6 +107,8 @@ class StorageStatusListener extends SparkListener {
  override def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved) {
    synchronized {
      val executorId = blockManagerRemoved.blockManagerId.executorId
      removedExecutorIdToStorageStatus.put(executorId,
        executorIdToStorageStatus.get(executorId).get)

Review comment: I really think it would be nice to keep the time the executor was removed as well from
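The comment is cut off here; assuming it refers to the timestamp carried by the SparkListenerBlockManagerRemoved event, one hedged way to keep it would be a small wrapper value. The class and field names below are illustrative, not from the PR:

```scala
// Illustrative only: keep the removal time next to the executor's last StorageStatus.
// Assumes the cache's value type is widened to this wrapper class.
private[storage] case class RemovedExecutorStatus(status: StorageStatus, removedAtMs: Long)

override def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved) {
  synchronized {
    val executorId = blockManagerRemoved.blockManagerId.executorId
    // remove() returns an Option, which also avoids calling .get on a possibly missing key
    executorIdToStorageStatus.remove(executorId).foreach { status =>
      removedExecutorIdToStorageStatus.put(
        executorId, RemovedExecutorStatus(status, blockManagerRemoved.time))
    }
  }
}
```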
      executorIdToStorageStatus.remove(executorId)
    }
  }
ExecutorsPage.scala
@@ -25,6 +25,7 @@ import scala.xml.Node
import org.apache.spark.status.api.v1.ExecutorSummary
import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
import org.apache.spark.util.Utils
import org.apache.spark.storage.StorageStatus

Review comment: nit: ordering
// This isn't even used anymore -- but we need to keep it b/c of a MiMa false positive
private[ui] case class ExecutorSummaryInfo(

@@ -41,6 +42,7 @@ private[ui] case class ExecutorSummaryInfo(
    totalInputBytes: Long,
    totalShuffleRead: Long,
    totalShuffleWrite: Long,
    isAlive: Boolean,
    maxMemory: Long,
    executorLogs: Map[String, String])

@@ -49,15 +51,15 @@ private[ui] class ExecutorsPage(
    parent: ExecutorsTab,
    threadDumpEnabled: Boolean)
  extends WebUIPage("") {

  private val listener = parent.listener

  def render(request: HttpServletRequest): Seq[Node] = {
    val storageStatusList = listener.storageStatusList
    val storageStatusList = listener.storageStatusList ++ listener.removedExecutorStorageStatusList
    val maxMem = storageStatusList.map(_.maxMem).sum
    val memUsed = storageStatusList.map(_.memUsed).sum
    val diskUsed = storageStatusList.map(_.diskUsed).sum
    val execInfo = for (statusId <- 0 until storageStatusList.size) yield
      ExecutorsPage.getExecInfo(listener, statusId)
    val execInfo = for (statusId <- 0 until storageStatusList.size) yield ExecutorsPage.getExecInfo(listener, statusId)
    val execInfoSorted = execInfo.sortBy(_.id)
    val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty
@@ -84,6 +86,7 @@ private[ui] class ExecutorsPage(
            Shuffle Write
          </span>
        </th>
        <th>Executor Status</th>
        {if (logsExist) <th class="sorttable_nosort">Logs</th> else Seq.empty}
        {if (threadDumpEnabled) <th class="sorttable_nosort">Thread Dump</th> else Seq.empty}
      </thead>

@@ -144,6 +147,9 @@ private[ui] class ExecutorsPage(
      <td sorttable_customkey={info.totalShuffleWrite.toString}>
        {Utils.bytesToString(info.totalShuffleWrite)}
      </td>
      <td sorttable_customkey={info.isAlive.toString}>
        {if(info.isAlive) "Alive" else "Killed"}
      </td>
      {
        if (logsExist) {
          <td>
@@ -177,13 +183,15 @@ private[spark] object ExecutorsPage {
  /** Represent an executor's info as a map given a storage status index */
  def getExecInfo(listener: ExecutorsListener, statusId: Int): ExecutorSummary = {

    val status = listener.storageStatusList(statusId)
    val execId = status.blockManagerId.executorId
    val hostPort = status.blockManagerId.hostPort
    val rddBlocks = status.numBlocks
    val memUsed = status.memUsed
    val maxMem = status.maxMem
    val diskUsed = status.diskUsed
    val isAlive = listener.storageStatusList.contains(status)
    val activeTasks = listener.executorToTasksActive.getOrElse(execId, 0)
    val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0)
    val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0)

@@ -193,7 +201,7 @@ private[spark] object ExecutorsPage {
    val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0L)
    val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0L)
    val executorLogs = listener.executorToLogUrls.getOrElse(execId, Map.empty)

    new ExecutorSummary(
      execId,
      hostPort,

@@ -208,6 +216,7 @@ private[spark] object ExecutorsPage {
      totalInputBytes,
      totalShuffleRead,
      totalShuffleWrite,
      isAlive,
      maxMem,
      executorLogs
    )
StorageStatusListenerSuite.scala
@@ -21,6 +21,13 @@ import org.scalatest.FunSuite
import org.apache.spark.Success
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.SparkConfSuite
import org.apache.spark.SparkConf

/**
 * Test the behavior of StorageStatusListener in response to all relevant events.
 */

Review comment: spurious extra doc, I don't think you use
/**
 * Test the behavior of StorageStatusListener in response to all relevant events.

@@ -30,9 +37,10 @@ class StorageStatusListenerSuite extends FunSuite {
  private val bm2 = BlockManagerId("fat", "duck", 2)
  private val taskInfo1 = new TaskInfo(0, 0, 0, 0, "big", "dog", TaskLocality.ANY, false)
  private val taskInfo2 = new TaskInfo(0, 0, 0, 0, "fat", "duck", TaskLocality.ANY, false)
  val conf = new SparkConf()

  test("block manager added/removed") {
    val listener = new StorageStatusListener
    val listener = new StorageStatusListener(conf)

    // Block manager add
    assert(listener.executorIdToStorageStatus.size === 0)

@@ -61,7 +69,7 @@ class StorageStatusListenerSuite extends FunSuite {
  }

  test("task end without updated blocks") {
    val listener = new StorageStatusListener
    val listener = new StorageStatusListener(conf)
    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L))
    val taskMetrics = new TaskMetrics
@@ -78,7 +86,7 @@
  }

  test("task end with updated blocks") {
    val listener = new StorageStatusListener
    val listener = new StorageStatusListener(conf)
    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L))
    val taskMetrics1 = new TaskMetrics

@@ -127,7 +135,7 @@
  }

  test("unpersist RDD") {
    val listener = new StorageStatusListener
    val listener = new StorageStatusListener(conf)
    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
    val taskMetrics1 = new TaskMetrics
    val taskMetrics2 = new TaskMetrics

@@ -150,4 +158,12 @@
    listener.onUnpersistRDD(SparkListenerUnpersistRDD(1))
    assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
  }

  test("Killed Executor Entry removed after configurable time") {
    val localtestconf = new SparkConf().set(StorageStatusListener.TIME_TO_EXPIRE_KILLED_EXECUTOR,"5s")

Review comment: nit: line too long
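For example, the same setting could simply be wrapped to stay under the line-length limit (formatting only, not code from the PR):

```scala
// One possible wrapping of the over-long line flagged above.
val localtestconf = new SparkConf()
  .set(StorageStatusListener.TIME_TO_EXPIRE_KILLED_EXECUTOR, "5s")
```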
    val listener = new StorageStatusListener(localtestconf)
    listener.removedExecutorIdToStorageStatus.put("1", new StorageStatus(null, 50))
    Thread.sleep(5500)

Review comment: you can avoid sleeping by using
Reply: For that we'll have to set an arbitrary ticker to the main cache. We would not want to set any arbitrary ticker to the original cache. Creating a new cache in the test would not be testing of our functionality and would be equivalent of testing just the Guava Cache's code, right? Please correct, if wrong.
Review comment: I think you can do something in between -- StorageStatusListener can have a
Sleeping isn't the worst thing in this case -- often it leads to flaky tests, though I don't think that would be the case here. Still, 5 seconds is awfully long for this test when it should take a tiny fraction of that, and it adds up over all the tests.
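The last comment trails off; assuming the suggestion is to let StorageStatusListener accept a Guava Ticker that is forwarded to the cache builder, a minimal sketch could look like this (the extra constructor parameter is an assumption, not code from the PR):

```scala
package org.apache.spark.storage

import java.util.concurrent.TimeUnit

import com.google.common.base.Ticker
import com.google.common.cache.CacheBuilder

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.SparkListener

// Illustrative signature only: a defaulted ticker parameter lets tests drive the
// Guava cache's clock (Ticker.read() is in nanoseconds) without Thread.sleep.
class StorageStatusListener(conf: SparkConf, ticker: Ticker = Ticker.systemTicker())
  extends SparkListener {

  import StorageStatusListener._

  private[storage] val removedExecutorIdToStorageStatus = CacheBuilder.newBuilder()
    .ticker(ticker)
    .expireAfterWrite(
      conf.getTimeAsSeconds(TIME_TO_EXPIRE_KILLED_EXECUTOR, "0"), TimeUnit.SECONDS)
    .build[String, StorageStatus]()

  // ... the rest of the listener stays as in the diff above
}
```

The reviewer's MyTicker snippet in the next comment then passes such a manual ticker as the second constructor argument.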
    assert(listener.removedExecutorIdToStorageStatus.asMap.get("1") == null)

Review comment: this is more complicated than it needs to be -- no need for an atomic (there is only one thread here), you can just use a long. Also I'd check the

```scala
class MyTicker extends Ticker {
  var t = 0L
  override def read(): Long = t
}
val ticker = new MyTicker
val listener = new StorageStatusListener(localtestconf, ticker)
listener.removedExecutorIdToStorageStatus.put("1", new StorageStatus(null, 50))
assert(listener.removedExecutorStorageStatusList.nonEmpty)
ticker.t = 5000000001L
assert(listener.removedExecutorStorageStatusList.isEmpty)
```

  }
}
StorageTabSuite.scala
@@ -22,6 +22,7 @@ import org.apache.spark.Success
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.storage._
import org.apache.spark.SparkConf

Review comment: nit: import ordering, should go above the rest of the spark imports (b/c class imports sort before package imports). Also while you're touching this, there should be a blank line between the scalatest and spark imports
/**
 * Test various functionality in the StorageListener that supports the StorageTab.

@@ -43,7 +44,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {

  before {
    bus = new LiveListenerBus
    storageStatusListener = new StorageStatusListener
    storageStatusListener = new StorageStatusListener(new SparkConf())
    storageListener = new StorageListener(storageStatusListener)
    bus.addListener(storageStatusListener)
    bus.addListener(storageListener)
Review comment: import ordering -- these go above the o.a.spark imports, with a linebreak in between. See https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide#SparkCodeStyleGuide-Imports