From 1d1b18d80bd0ff2f2546821ad637eb07c3df59f2 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Tue, 16 Sep 2014 20:44:04 +0900 Subject: [PATCH 1/4] Added Cache Hit Count and Cache Miss Count metrics --- .../main/scala/org/apache/spark/CacheManager.scala | 3 +++ .../scala/org/apache/spark/executor/TaskMetrics.scala | 4 ++++ .../apache/spark/ui/jobs/JobProgressListener.scala | 11 +++++++++++ .../scala/org/apache/spark/ui/jobs/StagePage.scala | 8 ++++++++ .../main/scala/org/apache/spark/ui/jobs/UIData.scala | 4 ++++ .../scala/org/apache/spark/util/JsonProtocol.scala | 4 ++++ 6 files changed, 34 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index f8584b90cabe6..b8a41cafcabb7 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -45,9 +45,12 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { case Some(blockResult) => // Partition is already materialized, so just return its values context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics) + context.taskMetrics.cacheHitCount += 1 new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]]) case None => + + context.taskMetrics.cacheMissCount += 1 // Acquire a lock for loading this partition // If another thread already holds the lock, wait for it to finish return its results val storedValues = acquireLockForPartition[T](key) diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 99a88c13456df..3b14520812e39 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -115,6 +115,10 @@ class TaskMetrics extends Serializable { */ var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None + var cacheHitCount: Long = _ + + var cacheMissCount: Long = _ + /** * A task may have multiple shuffle readers for multiple dependencies. To avoid synchronization * issues from readers in different threads, in-progress tasks use a ShuffleReadMetrics for each diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index eaeb861f59e5a..417b81e0dceb6 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -227,6 +227,17 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { val timeDelta = taskMetrics.executorRunTime - oldMetrics.map(_.executorRunTime).getOrElse(0L) stageData.executorRunTime += timeDelta + + val cacheHitCountDelta = + taskMetrics.cacheHitCount - oldMetrics.map(_.cacheHitCount).getOrElse(0L) + stageData.cacheHitCount += cacheHitCountDelta + execSummary.cacheHitCount += cacheHitCountDelta + + val cacheMissCountDelta = + taskMetrics.cacheMissCount - oldMetrics.map(_.cacheMissCount).getOrElse(0L) + stageData.cacheMissCount += cacheMissCountDelta + execSummary.cacheMissCount += cacheMissCountDelta + } override def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate) { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index db01be596e073..4bbf6753b8a87 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -56,6 +56,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { val hasShuffleRead = stageData.shuffleReadBytes > 0 val hasShuffleWrite = stageData.shuffleWriteBytes > 0 val hasBytesSpilled = stageData.memoryBytesSpilled > 0 && stageData.diskBytesSpilled > 0 + val hasCacheHitRatio = stageData.cacheHitCount > 0 || stageData.cacheMissCount > 0 // scalastyle:off val summary = @@ -93,6 +94,13 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { {Utils.bytesToString(stageData.diskBytesSpilled)} } + {if (hasCacheHitRatio) +
  • + Cache hit ratio: {stageData.cacheHitCount.toDouble / + (stageData.cacheHitCount + stageData.cacheMissCount)} + % +
  • + } // scalastyle:on diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala index a336bf7e1ed02..138d5ef249c31 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala @@ -34,6 +34,8 @@ private[jobs] object UIData { var shuffleWrite : Long = 0 var memoryBytesSpilled : Long = 0 var diskBytesSpilled : Long = 0 + var cacheHitCount: Long = 0 + var cacheMissCount: Long = 0 } class StageUIData { @@ -49,6 +51,8 @@ private[jobs] object UIData { var shuffleWriteBytes: Long = _ var memoryBytesSpilled: Long = _ var diskBytesSpilled: Long = _ + var cacheHitCount: Long = _ + var cacheMissCount: Long = _ var schedulingPool: String = "" var description: Option[String] = None diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index c4dddb2d1037e..4cc709aa7b27c 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -248,6 +248,8 @@ private[spark] object JsonProtocol { ("Result Serialization Time" -> taskMetrics.resultSerializationTime) ~ ("Memory Bytes Spilled" -> taskMetrics.memoryBytesSpilled) ~ ("Disk Bytes Spilled" -> taskMetrics.diskBytesSpilled) ~ + ("Cache Hit Count" -> taskMetrics.cacheHitCount) ~ + ("Cache Miss Count" -> taskMetrics.cacheMissCount) ~ ("Shuffle Read Metrics" -> shuffleReadMetrics) ~ ("Shuffle Write Metrics" -> shuffleWriteMetrics) ~ ("Input Metrics" -> inputMetrics) ~ @@ -571,6 +573,8 @@ private[spark] object JsonProtocol { metrics.resultSerializationTime = (json \ "Result Serialization Time").extract[Long] metrics.memoryBytesSpilled = (json \ "Memory Bytes Spilled").extract[Long] metrics.diskBytesSpilled = (json \ "Disk Bytes Spilled").extract[Long] + metrics.cacheHitCount = (json \ "Cache Hit Count").extract[Long] + metrics.cacheMissCount = (json \ "Cache Miss Count").extract[Long] metrics.setShuffleReadMetrics( Utils.jsonOption(json \ "Shuffle Read Metrics").map(shuffleReadMetricsFromJson)) metrics.shuffleWriteMetrics = From 678c676004f180f4087807cac6a472338d796b3e Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Tue, 16 Sep 2014 22:09:07 +0900 Subject: [PATCH 2/4] Modified StagePage.scala --- core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 4bbf6753b8a87..cceb3c584a773 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -96,7 +96,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { } {if (hasCacheHitRatio)
  • - Cache hit ratio: {stageData.cacheHitCount.toDouble / + Cache hit ratio: {stageData.cacheHitCount * 100.0 / (stageData.cacheHitCount + stageData.cacheMissCount)} %
  • From 05724f84b5927e589a76a4f3cc7e3b8161996512 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Wed, 17 Sep 2014 00:03:40 +0900 Subject: [PATCH 3/4] Modified ExecutorTable.scala to display cache hit ratio --- .../scala/org/apache/spark/ui/jobs/ExecutorTable.scala | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index 2987dc04494a5..18581473871be 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -49,6 +49,7 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: JobPr Shuffle Write Shuffle Spill (Memory) Shuffle Spill (Disk) + Cache Hit Ratio {createExecutorTable()} @@ -85,6 +86,14 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: JobPr {Utils.bytesToString(v.memoryBytesSpilled)} {Utils.bytesToString(v.diskBytesSpilled)} + { if (v.cacheHitCount > 0 || v.cacheMissCount > 0) { + + {v.cacheHitCount * 100 / (v.cacheHitCount + v.cacheMissCount)} % + + } else { + + }} } case None => From 8a2000a71c3f9c0a0d58f02b035f7470621d1474 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Wed, 17 Sep 2014 01:46:29 +0900 Subject: [PATCH 4/4] Modified JsonProtocolSuite to adjust modified JsonProtocol --- .../apache/spark/util/JsonProtocolSuite.scala | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 2b45d8b695853..998751f42d0cc 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -43,10 +43,10 @@ class JsonProtocolSuite extends FunSuite { SparkListenerTaskGettingResult(makeTaskInfo(1000L, 2000, 5, 3000L, true)) val taskEnd = SparkListenerTaskEnd(1, 0, "ShuffleMapTask", Success, makeTaskInfo(123L, 234, 67, 345L, false), - makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = false)) + makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, 180, 20, hasHadoopInput = false)) val taskEndWithHadoopInput = SparkListenerTaskEnd(1, 0, "ShuffleMapTask", Success, makeTaskInfo(123L, 234, 67, 345L, false), - makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true)) + makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, 90, 10, hasHadoopInput = true)) val jobStart = SparkListenerJobStart(10, Seq[Int](1, 2, 3, 4), properties) val jobEnd = SparkListenerJobEnd(20, JobSucceeded) val environmentUpdate = SparkListenerEnvironmentUpdate(Map[String, Seq[(String, String)]]( @@ -83,7 +83,7 @@ class JsonProtocolSuite extends FunSuite { testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L)) testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L)) testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false)) - testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false)) + testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8, 500, 500, hasHadoopInput = false)) testBlockManagerId(BlockManagerId("Hong", "Kong", 500)) // StorageLevel @@ -146,7 +146,7 @@ class JsonProtocolSuite extends FunSuite { test("InputMetrics backward compatibility") { // InputMetrics were added after 1.0.1. - val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = true) + val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, 3, 7, hasHadoopInput = true) assert(metrics.inputMetrics.nonEmpty) val newJson = JsonProtocol.taskMetricsToJson(metrics) val oldJson = newJson.removeField { case (field, _) => field == "Input Metrics" } @@ -551,6 +551,8 @@ class JsonProtocolSuite extends FunSuite { d: Long, e: Int, f: Int, + g: Long, + h: Long, hasHadoopInput: Boolean) = { val t = new TaskMetrics val sw = new ShuffleWriteMetrics @@ -561,6 +563,8 @@ class JsonProtocolSuite extends FunSuite { t.jvmGCTime = d t.resultSerializationTime = a + b t.memoryBytesSpilled = a + c + t.cacheHitCount = g + t.cacheMissCount = h if (hasHadoopInput) { val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) @@ -805,6 +809,8 @@ class JsonProtocolSuite extends FunSuite { | "Result Serialization Time": 700, | "Memory Bytes Spilled": 800, | "Disk Bytes Spilled": 0, + | "Cache Hit Count": 180, + | "Cache Miss Count": 20, | "Shuffle Read Metrics": { | "Shuffle Finish Time": 900, | "Remote Blocks Fetched": 800, @@ -889,6 +895,8 @@ class JsonProtocolSuite extends FunSuite { | "Result Serialization Time": 700, | "Memory Bytes Spilled": 800, | "Disk Bytes Spilled": 0, + | "Cache Hit Count": 90, + | "Cache Miss Count": 10, | "Shuffle Write Metrics": { | "Shuffle Bytes Written": 1200, | "Shuffle Write Time": 1500