[SPARK-2845] Add timestamps to block manager events.
These are not used by the UI but are useful when analysing the
logs from a Spark job.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #654 from vanzin/bm-event-tstamp and squashes the following commits:

d5d6e66 [Marcelo Vanzin] Fix tests.
ec06218 [Marcelo Vanzin] Review feedback.
f134dbc [Marcelo Vanzin] Merge branch 'master' into bm-event-tstamp
b495b7c [Marcelo Vanzin] Merge branch 'master' into bm-event-tstamp
7d2fe9e [Marcelo Vanzin] Review feedback.
d6f381c [Marcelo Vanzin] Update tests added after patch was created.
45e3bf8 [Marcelo Vanzin] Fix unit test after merge.
b37a10f [Marcelo Vanzin] Use === in test assertions.
ef72824 [Marcelo Vanzin] Handle backwards compatibility with 1.0.0.
aca1151 [Marcelo Vanzin] Fix unit test to check new fields.
efdda8e [Marcelo Vanzin] Add timestamps to block manager events.
Marcelo Vanzin authored and andrewor14 committed Sep 3, 2014
1 parent e5d3768 commit ccc69e2
Showing 6 changed files with 58 additions and 24 deletions.
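The diff below adds a time field (epoch milliseconds) to the two block manager listener events and threads it through the JSON event-log protocol. As a rough illustration of how the new field can be consumed, here is a hypothetical listener that is not part of this commit (the class name is invented); it is only a sketch of the kind of log analysis the commit message refers to:

import org.apache.spark.scheduler.{SparkListener, SparkListenerBlockManagerAdded, SparkListenerBlockManagerRemoved}

// Hypothetical helper (not in this commit): prints a timeline of block manager
// registrations so executor churn can be correlated with other timestamped logs.
class BlockManagerTimelineListener extends SparkListener {
  override def onBlockManagerAdded(added: SparkListenerBlockManagerAdded): Unit = {
    // `time` is the field introduced by this commit (System.currentTimeMillis() at registration).
    println(s"[${added.time}] block manager added: ${added.blockManagerId.hostPort}, maxMem=${added.maxMem}")
  }

  override def onBlockManagerRemoved(removed: SparkListenerBlockManagerRemoved): Unit = {
    println(s"[${removed.time}] block manager removed: ${removed.blockManagerId.hostPort}")
  }
}

Such a listener would be registered with SparkContext.addSparkListener; events replayed from logs written by Spark 1.0.0 carry time = -1L (see the JsonProtocol changes below).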
@@ -67,11 +67,11 @@ case class SparkListenerEnvironmentUpdate(environmentDetails: Map[String, Seq[(S
extends SparkListenerEvent

@DeveloperApi
-case class SparkListenerBlockManagerAdded(blockManagerId: BlockManagerId, maxMem: Long)
+case class SparkListenerBlockManagerAdded(time: Long, blockManagerId: BlockManagerId, maxMem: Long)
extends SparkListenerEvent

@DeveloperApi
-case class SparkListenerBlockManagerRemoved(blockManagerId: BlockManagerId)
+case class SparkListenerBlockManagerRemoved(time: Long, blockManagerId: BlockManagerId)
extends SparkListenerEvent

@DeveloperApi
@@ -203,7 +203,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
blockLocations.remove(blockId)
}
}
-listenerBus.post(SparkListenerBlockManagerRemoved(blockManagerId))
+listenerBus.post(SparkListenerBlockManagerRemoved(System.currentTimeMillis(), blockManagerId))
}

private def expireDeadHosts() {
@@ -325,6 +325,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
}

private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
+val time = System.currentTimeMillis()
if (!blockManagerInfo.contains(id)) {
blockManagerIdByExecutor.get(id.executorId) match {
case Some(manager) =>
@@ -340,9 +341,9 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
id.hostPort, Utils.bytesToString(maxMemSize)))

blockManagerInfo(id) =
-new BlockManagerInfo(id, System.currentTimeMillis(), maxMemSize, slaveActor)
+new BlockManagerInfo(id, time, maxMemSize, slaveActor)
}
-listenerBus.post(SparkListenerBlockManagerAdded(id, maxMemSize))
+listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))
}

private def updateBlockInfo(
12 changes: 8 additions & 4 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -152,13 +152,15 @@ private[spark] object JsonProtocol {
val blockManagerId = blockManagerIdToJson(blockManagerAdded.blockManagerId)
("Event" -> Utils.getFormattedClassName(blockManagerAdded)) ~
("Block Manager ID" -> blockManagerId) ~
("Maximum Memory" -> blockManagerAdded.maxMem)
("Maximum Memory" -> blockManagerAdded.maxMem) ~
("Timestamp" -> blockManagerAdded.time)
}

def blockManagerRemovedToJson(blockManagerRemoved: SparkListenerBlockManagerRemoved): JValue = {
val blockManagerId = blockManagerIdToJson(blockManagerRemoved.blockManagerId)
("Event" -> Utils.getFormattedClassName(blockManagerRemoved)) ~
("Block Manager ID" -> blockManagerId)
("Block Manager ID" -> blockManagerId) ~
("Timestamp" -> blockManagerRemoved.time)
}

def unpersistRDDToJson(unpersistRDD: SparkListenerUnpersistRDD): JValue = {
@@ -466,12 +468,14 @@ private[spark] object JsonProtocol {
def blockManagerAddedFromJson(json: JValue): SparkListenerBlockManagerAdded = {
val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID")
val maxMem = (json \ "Maximum Memory").extract[Long]
-SparkListenerBlockManagerAdded(blockManagerId, maxMem)
+val time = Utils.jsonOption(json \ "Timestamp").map(_.extract[Long]).getOrElse(-1L)
+SparkListenerBlockManagerAdded(time, blockManagerId, maxMem)
}

def blockManagerRemovedFromJson(json: JValue): SparkListenerBlockManagerRemoved = {
val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID")
-SparkListenerBlockManagerRemoved(blockManagerId)
+val time = Utils.jsonOption(json \ "Timestamp").map(_.extract[Long]).getOrElse(-1L)
+SparkListenerBlockManagerRemoved(time, blockManagerId)
}

def unpersistRDDFromJson(json: JValue): SparkListenerUnpersistRDD = {
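JsonProtocol writes and reads the event log, so with this change the per-application log produced when spark.eventLog.enabled is set gains a "Timestamp" field on both events, and the getOrElse(-1L) fallback keeps logs written by Spark 1.0.0 readable. A minimal offline-analysis sketch follows, with several assumptions: the object name and default log path are invented, and it lives under org.apache.spark only because JsonProtocol is private[spark].

package org.apache.spark.examples

import scala.io.Source

import org.json4s.jackson.JsonMethods.parse

import org.apache.spark.scheduler.{SparkListenerBlockManagerAdded, SparkListenerBlockManagerRemoved}
import org.apache.spark.util.JsonProtocol

// Hypothetical tool (not in this commit): reconstructs a block manager timeline
// from an event log, one JSON-encoded SparkListenerEvent per line.
object BlockManagerEventTimeline {
  def main(args: Array[String]): Unit = {
    // Invented default path; pass the real event-log file as the first argument.
    val path = if (args.nonEmpty) args(0) else "/tmp/spark-events/app-1234/EVENT_LOG_1"
    Source.fromFile(path).getLines()
      .map(line => JsonProtocol.sparkEventFromJson(parse(line)))
      .foreach {
        case SparkListenerBlockManagerAdded(time, id, maxMem) =>
          // time is -1 for logs written by Spark 1.0.0, which had no "Timestamp" field.
          println(s"$time ADD    ${id.hostPort} maxMem=$maxMem")
        case SparkListenerBlockManagerRemoved(time, id) =>
          println(s"$time REMOVE ${id.hostPort}")
        case _ => // ignore all other event types
      }
  }
}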
@@ -36,34 +36,34 @@ class StorageStatusListenerSuite extends FunSuite {

// Block manager add
assert(listener.executorIdToStorageStatus.size === 0)
-listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
+listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
assert(listener.executorIdToStorageStatus.size === 1)
assert(listener.executorIdToStorageStatus.get("big").isDefined)
assert(listener.executorIdToStorageStatus("big").blockManagerId === bm1)
assert(listener.executorIdToStorageStatus("big").maxMem === 1000L)
assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
-listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L))
+listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L))
assert(listener.executorIdToStorageStatus.size === 2)
assert(listener.executorIdToStorageStatus.get("fat").isDefined)
assert(listener.executorIdToStorageStatus("fat").blockManagerId === bm2)
assert(listener.executorIdToStorageStatus("fat").maxMem === 2000L)
assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)

// Block manager remove
-listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(bm1))
+listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(1L, bm1))
assert(listener.executorIdToStorageStatus.size === 1)
assert(!listener.executorIdToStorageStatus.get("big").isDefined)
assert(listener.executorIdToStorageStatus.get("fat").isDefined)
-listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(bm2))
+listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(1L, bm2))
assert(listener.executorIdToStorageStatus.size === 0)
assert(!listener.executorIdToStorageStatus.get("big").isDefined)
assert(!listener.executorIdToStorageStatus.get("fat").isDefined)
}

test("task end without updated blocks") {
val listener = new StorageStatusListener
-listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
-listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L))
+listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
+listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L))
val taskMetrics = new TaskMetrics

// Task end with no updated blocks
@@ -79,8 +79,8 @@ class StorageStatusListenerSuite extends FunSuite {

test("task end with updated blocks") {
val listener = new StorageStatusListener
-listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
-listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L))
+listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
+listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L))
val taskMetrics1 = new TaskMetrics
val taskMetrics2 = new TaskMetrics
val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L, 0L))
@@ -128,7 +128,7 @@ class StorageStatusListenerSuite extends FunSuite {

test("unpersist RDD") {
val listener = new StorageStatusListener
-listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
+listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
val taskMetrics1 = new TaskMetrics
val taskMetrics2 = new TaskMetrics
val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L, 0L))
@@ -108,7 +108,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
val myRddInfo1 = rddInfo1
val myRddInfo2 = rddInfo2
val stageInfo0 = new StageInfo(0, 0, "0", 100, Seq(myRddInfo0, myRddInfo1, myRddInfo2), "details")
-bus.postToAll(SparkListenerBlockManagerAdded(bm1, 1000L))
+bus.postToAll(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
assert(storageListener._rddInfoMap.size === 3)
assert(storageListener.rddInfoList.size === 0) // not cached
@@ -175,7 +175,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
val block1 = (RDDBlockId(1, 1), BlockStatus(memOnly, 200L, 0L, 0L))
taskMetrics0.updatedBlocks = Some(Seq(block0))
taskMetrics1.updatedBlocks = Some(Seq(block1))
-bus.postToAll(SparkListenerBlockManagerAdded(bm1, 1000L))
+bus.postToAll(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
assert(storageListener.rddInfoList.size === 0)
bus.postToAll(SparkListenerTaskEnd(0, 0, "big", Success, taskInfo, taskMetrics0))
37 changes: 33 additions & 4 deletions core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -21,6 +21,9 @@ import java.util.Properties

import scala.collection.Map

+import org.json4s.DefaultFormats
+import org.json4s.JsonDSL._
+import org.json4s.JsonAST._
import org.json4s.jackson.JsonMethods._
import org.scalatest.FunSuite

@@ -52,9 +55,9 @@ class JsonProtocolSuite extends FunSuite {
"System Properties" -> Seq(("Username", "guest"), ("Password", "guest")),
"Classpath Entries" -> Seq(("Super library", "/tmp/super_library"))
))
-val blockManagerAdded = SparkListenerBlockManagerAdded(
+val blockManagerAdded = SparkListenerBlockManagerAdded(1L,
BlockManagerId("Stars", "In your multitude...", 300), 500)
-val blockManagerRemoved = SparkListenerBlockManagerRemoved(
+val blockManagerRemoved = SparkListenerBlockManagerRemoved(2L,
BlockManagerId("Scarce", "to be counted...", 100))
val unpersistRdd = SparkListenerUnpersistRDD(12345)
val applicationStart = SparkListenerApplicationStart("The winner of all", 42L, "Garfield")
@@ -151,6 +154,28 @@ class JsonProtocolSuite extends FunSuite {
assert(newMetrics.inputMetrics.isEmpty)
}

test("BlockManager events backward compatibility") {
// SparkListenerBlockManagerAdded/Removed in Spark 1.0.0 do not have a "time" property.
val blockManagerAdded = SparkListenerBlockManagerAdded(1L,
BlockManagerId("Stars", "In your multitude...", 300), 500)
val blockManagerRemoved = SparkListenerBlockManagerRemoved(2L,
BlockManagerId("Scarce", "to be counted...", 100))

val oldBmAdded = JsonProtocol.blockManagerAddedToJson(blockManagerAdded)
.removeField({ _._1 == "Timestamp" })

val deserializedBmAdded = JsonProtocol.blockManagerAddedFromJson(oldBmAdded)
assert(SparkListenerBlockManagerAdded(-1L, blockManagerAdded.blockManagerId,
blockManagerAdded.maxMem) === deserializedBmAdded)

val oldBmRemoved = JsonProtocol.blockManagerRemovedToJson(blockManagerRemoved)
.removeField({ _._1 == "Timestamp" })

val deserializedBmRemoved = JsonProtocol.blockManagerRemovedFromJson(oldBmRemoved)
assert(SparkListenerBlockManagerRemoved(-1L, blockManagerRemoved.blockManagerId) ===
deserializedBmRemoved)
}


/** -------------------------- *
| Helper test running methods |
@@ -242,8 +267,10 @@
assertEquals(e1.environmentDetails, e2.environmentDetails)
case (e1: SparkListenerBlockManagerAdded, e2: SparkListenerBlockManagerAdded) =>
assert(e1.maxMem === e2.maxMem)
+assert(e1.time === e2.time)
assertEquals(e1.blockManagerId, e2.blockManagerId)
case (e1: SparkListenerBlockManagerRemoved, e2: SparkListenerBlockManagerRemoved) =>
+assert(e1.time === e2.time)
assertEquals(e1.blockManagerId, e2.blockManagerId)
case (e1: SparkListenerUnpersistRDD, e2: SparkListenerUnpersistRDD) =>
assert(e1.rddId == e2.rddId)
@@ -945,7 +972,8 @@ class JsonProtocolSuite extends FunSuite {
| "Host": "In your multitude...",
| "Port": 300
| },
| "Maximum Memory": 500
| "Maximum Memory": 500,
| "Timestamp": 1
|}
"""

@@ -957,7 +985,8 @@
| "Executor ID": "Scarce",
| "Host": "to be counted...",
| "Port": 100
-| }
+| },
+| "Timestamp": 2
|}
"""
