Skip to content

Commit

Permalink
Add two more metrics: state cache hit / miss count in HDFS state provider
Browse files Browse the repository at this point in the history
  • Loading branch information
HeartSaVioR committed Jul 2, 2018
1 parent b620254 commit c9aada5
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming.state

import java.io._
import java.util.Locale
import java.util.concurrent.atomic.LongAdder

import scala.collection.JavaConverters._
import scala.collection.mutable
Expand Down Expand Up @@ -182,7 +183,9 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit

/**
 * Snapshot of the provider-level custom metrics: estimated byte size of the
 * in-memory `loadedMaps` cache, number of cached versions, and the lifetime
 * cache hit / miss counters. Synchronized so the size estimate and counts are
 * taken against a consistent view of `loadedMaps`.
 */
def getCustomMetricsForProvider(): Map[StateStoreCustomMetric, Long] = synchronized {
  val estimatedCacheBytes = SizeEstimator.estimate(loadedMaps)
  Map(
    metricProviderLoaderMapSizeBytes -> estimatedCacheBytes,
    metricProviderLoaderCountOfVersionsInMap -> loadedMaps.size.toLong,
    metricLoadedMapCacheHit -> loadedMapCacheHitCount.sum(),
    metricLoadedMapCacheMiss -> loadedMapCacheMissCount.sum())
}

/** Get the state store for making updates to create a new `version` of the store. */
Expand Down Expand Up @@ -230,7 +233,8 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
}

/**
 * Custom metrics this provider can report, in the same order they are
 * populated by getCustomMetricsForProvider(): cache size in bytes, cached
 * version count, and the cache hit / miss counters.
 */
override def supportedCustomMetrics: Seq[StateStoreCustomMetric] = {
  Seq(
    metricProviderLoaderMapSizeBytes,
    metricProviderLoaderCountOfVersionsInMap,
    metricLoadedMapCacheHit,
    metricLoadedMapCacheMiss)
}

override def toString(): String = {
Expand All @@ -251,6 +255,9 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
private lazy val fm = CheckpointFileManager.create(baseDir, hadoopConf)
private lazy val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf)

// Lifetime counters for the in-memory version cache: incremented in loadMap
// when a requested version is found in (hit) or absent from (miss) loadedMaps.
// LongAdder keeps concurrent increments cheap; values are read via sum() when
// building the custom-metrics snapshot.
private val loadedMapCacheHitCount: LongAdder = new LongAdder
private val loadedMapCacheMissCount: LongAdder = new LongAdder

// Size-typed custom metric: estimated in-memory footprint (bytes) of the
// loadedMaps cache, computed with SizeEstimator in getCustomMetricsForProvider().
private lazy val metricProviderLoaderMapSizeBytes: StateStoreCustomSizeMetric =
StateStoreCustomSizeMetric("providerLoadedMapSizeBytes",
"estimated size of states cache in provider")
Expand All @@ -259,6 +266,14 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
StateStoreCustomAverageMetric("providerLoadedMapCountOfVersions",
"count of versions in states cache in provider")

// Sum-typed custom metric backed by loadedMapCacheHitCount: total lookups in
// loadMap that were served directly from the in-memory loadedMaps cache.
private lazy val metricLoadedMapCacheHit: StateStoreCustomMetric =
StateStoreCustomSumMetric("loadedMapCacheHitCount",
"count of cache hit on states cache in provider")

// Sum-typed custom metric backed by loadedMapCacheMissCount: total lookups in
// loadMap that missed the cache and had to read snapshot/delta files instead.
private lazy val metricLoadedMapCacheMiss: StateStoreCustomMetric =
StateStoreCustomSumMetric("loadedMapCacheMissCount",
"count of cache miss on states cache in provider")

private case class StoreFile(version: Long, path: Path, isSnapshot: Boolean)

private def commitUpdates(newVersion: Long, map: MapType, output: DataOutputStream): Unit = {
Expand Down Expand Up @@ -290,13 +305,16 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
// Shortcut if the map for this version is already there to avoid a redundant put.
val loadedCurrentVersionMap = synchronized { loadedMaps.get(version) }
if (loadedCurrentVersionMap.isDefined) {
loadedMapCacheHitCount.increment()
return loadedCurrentVersionMap.get
}

logWarning(s"The state for version $version doesn't exist in loadedMaps. " +
"Reading snapshot file and delta files if needed..." +
"Note that this is normal for the first batch of starting query.")

loadedMapCacheMissCount.increment()

val (result, elapsedMs) = Utils.timeTakenMs {
val snapshotCurrentVersionMap = readSnapshotFile(version)
if (snapshotCurrentVersionMap.isDefined) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ trait StateStoreCustomMetric {
def desc: String
}

// Concrete custom-metric descriptors. The subtype determines how the driver-side
// SQLMetric is created (see StateStoreWriter.stateStoreCustomMetrics: Sum maps to
// SQLMetrics.createMetric, Average to createAverageMetric; Size/Timing presumably
// map to their size/timing counterparts — the mapping continues past this hunk).
case class StateStoreCustomSumMetric(name: String, desc: String) extends StateStoreCustomMetric
case class StateStoreCustomAverageMetric(name: String, desc: String) extends StateStoreCustomMetric
case class StateStoreCustomSizeMetric(name: String, desc: String) extends StateStoreCustomMetric
case class StateStoreCustomTimingMetric(name: String, desc: String) extends StateStoreCustomMetric
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,8 @@ class SymmetricHashJoinStateManager(
keyWithIndexToValueMetrics.numKeys, // represent each buffered row only once
keyToNumValuesMetrics.memoryUsedBytes + keyWithIndexToValueMetrics.memoryUsedBytes,
keyWithIndexToValueMetrics.customMetrics.map {
case (s @ StateStoreCustomSumMetric(_, desc), value) =>
s.copy(desc = newDesc(desc)) -> value
case (s @ StateStoreCustomAverageMetric(_, desc), value) =>
s.copy(desc = newDesc(desc)) -> value
case (s @ StateStoreCustomSizeMetric(_, desc), value) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
private def stateStoreCustomMetrics: Map[String, SQLMetric] = {
val provider = StateStoreProvider.create(sqlContext.conf.stateStoreProviderClass)
provider.supportedCustomMetrics.map {
case StateStoreCustomSumMetric(name, desc) =>
name -> SQLMetrics.createMetric(sparkContext, desc)
case StateStoreCustomAverageMetric(name, desc) =>
name -> SQLMetrics.createAverageMetric(sparkContext, desc)
case StateStoreCustomSizeMetric(name, desc) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,12 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
assert(loadedMapSize.isDefined)
val initialLoadedMapSize = loadedMapSize.get._2
assert(initialLoadedMapSize >= 0)
var cacheHitCount = store.metrics.customMetrics.find(_._1.name == "loadedMapCacheHitCount")
assert(cacheHitCount.isDefined)
assert(cacheHitCount.get._2 == 0)
var cacheMissCount = store.metrics.customMetrics.find(_._1.name == "loadedMapCacheMissCount")
assert(cacheMissCount.isDefined)
assert(cacheMissCount.get._2 == 0)

put(store, "a", 1)
assert(store.metrics.numKeys === 1)
Expand All @@ -540,6 +546,12 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
assert(loadedMapSize.isDefined)
val loadedMapSizeForVersion1 = loadedMapSize.get._2
assert(loadedMapSizeForVersion1 > initialLoadedMapSize)
cacheHitCount = store.metrics.customMetrics.find(_._1.name == "loadedMapCacheHitCount")
assert(cacheHitCount.isDefined)
assert(cacheHitCount.get._2 == 0)
cacheMissCount = store.metrics.customMetrics.find(_._1.name == "loadedMapCacheMissCount")
assert(cacheMissCount.isDefined)
assert(cacheMissCount.get._2 == 0)

val storeV2 = provider.getStore(1)
assert(!storeV2.hasCommitted)
Expand All @@ -556,6 +568,12 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
assert(loadedMapSize.isDefined)
val loadedMapSizeForVersion1And2 = loadedMapSize.get._2
assert(loadedMapSizeForVersion1And2 > loadedMapSizeForVersion1)
cacheHitCount = store.metrics.customMetrics.find(_._1.name == "loadedMapCacheHitCount")
assert(cacheHitCount.isDefined)
assert(cacheHitCount.get._2 == 1)
cacheMissCount = store.metrics.customMetrics.find(_._1.name == "loadedMapCacheMissCount")
assert(cacheMissCount.isDefined)
assert(cacheMissCount.get._2 == 0)

val reloadedProvider = newStoreProvider(store.id)
// intended to load version 2 instead of 1
Expand All @@ -569,6 +587,14 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
.find(_._1.name == "providerLoadedMapSizeBytes")
assert(loadedMapSize.isDefined)
assert(loadedMapSize.get._2 === loadedMapSizeForVersion1)
cacheHitCount = reloadedStore.metrics.customMetrics
.find(_._1.name == "loadedMapCacheHitCount")
assert(cacheHitCount.isDefined)
assert(cacheHitCount.get._2 == 0)
cacheMissCount = reloadedStore.metrics.customMetrics
.find(_._1.name == "loadedMapCacheMissCount")
assert(cacheMissCount.isDefined)
assert(cacheMissCount.get._2 == 1)

// now we are loading version 2
val reloadedStoreV2 = reloadedProvider.getStore(2)
Expand All @@ -580,6 +606,14 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
.find(_._1.name == "providerLoadedMapSizeBytes")
assert(loadedMapSize.isDefined)
assert(loadedMapSize.get._2 > loadedMapSizeForVersion1)
cacheHitCount = reloadedStoreV2.metrics.customMetrics
.find(_._1.name == "loadedMapCacheHitCount")
assert(cacheHitCount.isDefined)
assert(cacheHitCount.get._2 == 0)
cacheMissCount = reloadedStoreV2.metrics.customMetrics
.find(_._1.name == "loadedMapCacheMissCount")
assert(cacheMissCount.isDefined)
assert(cacheMissCount.get._2 == 2)
}

override def newStoreProvider(): HDFSBackedStateStoreProvider = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
| "numRowsUpdated" : 1,
| "memoryUsedBytes" : 2,
| "customMetrics" : {
| "loadedMapCacheHitCount" : 1,
| "loadedMapCacheMissCount" : 0,
| "providerLoadedMapSizeBytes" : 3
| }
| } ],
Expand Down Expand Up @@ -234,7 +236,8 @@ object StreamingQueryStatusAndProgressSuite {
"watermark" -> "2016-12-05T20:54:20.827Z").asJava),
stateOperators = Array(new StateOperatorProgress(
numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2,
customMetrics = new java.util.HashMap(Map("providerLoadedMapSizeBytes" -> 3L)
customMetrics = new java.util.HashMap(Map("providerLoadedMapSizeBytes" -> 3L,
"loadedMapCacheHitCount" -> 1L, "loadedMapCacheMissCount" -> 0L)
.mapValues(long2Long).asJava)
)),
sources = Array(
Expand Down

0 comments on commit c9aada5

Please sign in to comment.