Skip to content

Commit

Permalink
address vanzin's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
lianhuiwang committed Dec 4, 2015
1 parent 47255fa commit 325149f
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,11 @@ class StorageStatusListener(conf: SparkConf) extends SparkListener {
override def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved) {
synchronized {
val executorId = blockManagerRemoved.blockManagerId.executorId
val removedStorageStatus = executorIdToStorageStatus.remove(executorId)
if (removedStorageStatus.isDefined) {
deadExecutorStorageStatus += removedStorageStatus.get
if (deadExecutorStorageStatus.size > retainedDeadExecutors) {
deadExecutorStorageStatus.trimStart(1)
}
executorIdToStorageStatus.remove(executorId).foreach { status =>
deadExecutorStorageStatus += status
}
if (deadExecutorStorageStatus.size > retainedDeadExecutors) {
deadExecutorStorageStatus.trimStart(1)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class StorageStatusListenerSuite extends SparkFunSuite {
private val conf = new SparkConf()

test("block manager added/removed") {
conf.set("spark.ui.retainedDeadExecutors", "1")
val listener = new StorageStatusListener(conf)

// Block manager add
Expand All @@ -54,10 +55,14 @@ class StorageStatusListenerSuite extends SparkFunSuite {
assert(listener.executorIdToStorageStatus.size === 1)
assert(!listener.executorIdToStorageStatus.get("big").isDefined)
assert(listener.executorIdToStorageStatus.get("fat").isDefined)
assert(listener.deadExecutorStorageStatus.size === 1)
assert(listener.deadExecutorStorageStatus(0).blockManagerId.executorId.equals("big"))
listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(1L, bm2))
assert(listener.executorIdToStorageStatus.size === 0)
assert(!listener.executorIdToStorageStatus.get("big").isDefined)
assert(!listener.executorIdToStorageStatus.get("fat").isDefined)
assert(listener.deadExecutorStorageStatus.size === 1)
assert(listener.deadExecutorStorageStatus(0).blockManagerId.executorId.equals("fat"))
}

test("task end without updated blocks") {
Expand Down

0 comments on commit 325149f

Please sign in to comment.