Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,12 @@ def get_number_of_public_methods(clz):
)
self.assertEqual(
get_number_of_public_methods("org.apache.spark.sql.streaming.StateOperatorProgress"),
27,
# SPARK-56537: bumped from 27 to 30 due to the new snapshotCustomMetricNames
# constructor parameter (getter + synthetic default) and the new internal
# copyForNoExecution() method on StateOperatorProgress. Both are non-public
# API (private[spark] / private[sql]) so they are not mirrored on the
# Python side; only the count needs updating.
30,
msg,
)
self.assertEqual(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ class StateOperatorProgress private[spark] (
val numRowsDroppedByWatermark: Long,
val numShufflePartitions: Long,
val numStateStoreInstances: Long,
val customMetrics: ju.Map[String, JLong] = new ju.HashMap())
val customMetrics: ju.Map[String, JLong] = new ju.HashMap(),
// Names of customMetrics entries treated as snapshots of state-store status;
// preserved by copyForNoExecution() and not surfaced in JSON output.
private[spark] val snapshotCustomMetricNames: Set[String] = Set.empty)
extends Serializable {

/** The compact JSON representation of this progress. */
Expand All @@ -81,7 +84,34 @@ class StateOperatorProgress private[spark] (
numRowsDroppedByWatermark = newNumRowsDroppedByWatermark,
numShufflePartitions = numShufflePartitions,
numStateStoreInstances = numStateStoreInstances,
customMetrics = customMetrics)
customMetrics = customMetrics,
snapshotCustomMetricNames = snapshotCustomMetricNames)

/**
 * Produces the progress to report for a trigger in which no batch was executed.
 *
 * Values tied to a single batch are reset to zero: `numRowsUpdated`, `numRowsRemoved`,
 * `numRowsDroppedByWatermark`, `allUpdatesTimeMs`, `allRemovalsTimeMs`, `commitTimeMs`, and
 * every `customMetrics` entry whose key is absent from `snapshotCustomMetricNames`. All other
 * fields, and the custom metrics named in `snapshotCustomMetricNames`, are carried over
 * unchanged. Every key of `customMetrics` is kept in the result, and this instance is not
 * modified.
 */
private[sql] def copyForNoExecution(): StateOperatorProgress = {
  val zero: JLong = 0L
  val resetMetrics = new ju.HashMap[String, JLong](customMetrics.size())
  customMetrics.forEach { (metricName, metricValue) =>
    // Snapshot entries keep their last observed value; everything else is batch-bound.
    val carried = if (snapshotCustomMetricNames.contains(metricName)) metricValue else zero
    resetMetrics.put(metricName, carried)
  }

  new StateOperatorProgress(
    operatorName = operatorName,
    // Snapshot-style fields: copied as-is.
    numRowsTotal = numRowsTotal,
    memoryUsedBytes = memoryUsedBytes,
    numShufflePartitions = numShufflePartitions,
    numStateStoreInstances = numStateStoreInstances,
    // Batch-bound fields: nothing ran, so nothing was counted or timed.
    numRowsUpdated = 0L,
    numRowsRemoved = 0L,
    numRowsDroppedByWatermark = 0L,
    allUpdatesTimeMs = 0L,
    allRemovalsTimeMs = 0L,
    commitTimeMs = 0L,
    customMetrics = resetMetrics,
    snapshotCustomMetricNames = snapshotCustomMetricNames)
}

private[sql] def jsonValue: JValue = {
("operatorName" -> JString(operatorName)) ~
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ trait StatefulOperatorCustomMetric {
def name: String
def desc: String
def createSQLMetric(sparkContext: SparkContext): SQLMetric
/**
 * Whether this metric reflects current state rather than work done in a single batch.
 *
 * Snapshot metrics are preserved on no-data trigger events, while non-snapshot metrics
 * are reset to zero there. Defaults to `false`, so a metric is treated as per-batch unless
 * an implementation overrides this. Mirrors `StateStoreCustomMetric.isSnapshot`.
 */
def isSnapshot: Boolean = false
}

/** Custom stateful operator metric for simple "count" gauge */
Expand Down Expand Up @@ -402,7 +405,8 @@ trait StateStoreWriter
numRowsDroppedByWatermark = longMetric("numRowsDroppedByWatermark").value,
numShufflePartitions = stateInfo.map(_.numPartitions.toLong).getOrElse(-1L),
numStateStoreInstances = longMetric("numStateStoreInstances").value,
javaConvertedCustomMetrics
javaConvertedCustomMetrics,
snapshotCustomMetricNames
)
}

Expand Down Expand Up @@ -475,17 +479,43 @@ trait StateStoreWriter
}.toMap
}

private def stateStoreInstanceMetrics: Map[StateStoreInstanceMetric, SQLMetric] = {
// All instance metrics with their (partitionId, storeName) bindings; consumed by
// both `stateStoreInstanceMetrics` (for SQLMetric registration) and
// `snapshotCustomMetricNames` (for the snapshot-name set). The result is a
// serializable Seq so storing it as a lazy val on this trait is safe even when
// the enclosing SparkPlan is shipped to executors. The provider itself is NOT
// stored as a field (it is non-serializable), so each consumer below recreates
// it locally.
private lazy val stateStoreInstanceMetricsWithIds: Seq[StateStoreInstanceMetric] = {
val provider = StateStoreProvider.create(conf.stateStoreProviderClass)
val maxPartitions = stateInfo.map(_.numPartitions).getOrElse(conf.defaultNumShufflePartitions)

val maxPartitions =
stateInfo.map(_.numPartitions).getOrElse(conf.defaultNumShufflePartitions)
(0 until maxPartitions).flatMap { partitionId =>
provider.supportedInstanceMetrics.flatMap { metric =>
stateStoreNames.map { storeName =>
val metricWithPartition = metric.withNewId(partitionId, storeName)
(metricWithPartition, metricWithPartition.createSQLMetric(sparkContext))
}
stateStoreNames.map(metric.withNewId(partitionId, _))
}
}
}

// Keys of customMetrics whose values are snapshots of state rather than per-batch work.
// StateOperatorProgress.copyForNoExecution() keeps these values on no-data trigger events
// and zeroes every other custom metric. The set is the union of:
//   - provider-level custom metrics declaring isSnapshot = true,
//   - operator-level custom metrics declaring isSnapshot = true,
//   - every instance metric name (instance metrics use sentinel inits like -1 with
//     monotonic combine, so they are always snapshot-style).
private lazy val snapshotCustomMetricNames: Set[String] = {
  // The provider is created locally on purpose; it is not kept as a field.
  val providerLevel = StateStoreProvider.create(conf.stateStoreProviderClass)
    .supportedCustomMetrics
    .filter(_.isSnapshot)
    .map(_.name)
    .toSet
  val operatorLevel = customStatefulOperatorMetrics
    .filter(_.isSnapshot)
    .map(_.name)
    .toSet
  val instanceLevel = stateStoreInstanceMetricsWithIds.map(_.name).toSet
  providerLevel ++ operatorLevel ++ instanceLevel
}

// Pairs every (partitionId, storeName)-bound instance metric with a freshly created
// SQLMetric, keyed by the instance metric itself.
private def stateStoreInstanceMetrics: Map[StateStoreInstanceMetric, SQLMetric] = {
  val metricPairs =
    for (instanceMetric <- stateStoreInstanceMetricsWithIds)
      yield instanceMetric -> instanceMetric.createSQLMetric(sparkContext)
  metricPairs.toMap
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -648,13 +648,9 @@ abstract class ProgressContext(
* New execution stats will only retain the values as a snapshot of the query status.
* (E.g. for stateful operators, numRowsTotal is a snapshot of the status, whereas
* numRowsUpdated is bound to the batch.)
* TODO(SPARK-56537): We do not seem to clear up all values in StateOperatorProgress which are
* bound to the batch. Fix this.
*/
private def resetExecStatsForNoExecution(originExecStats: ExecutionStats): ExecutionStats = {
val newStatefulOperators = originExecStats.stateOperators.map { so =>
so.copy(newNumRowsUpdated = 0, newNumRowsDroppedByWatermark = 0, newNumRowsRemoved = 0)
}
val newStatefulOperators = originExecStats.stateOperators.map(_.copyForNoExecution())
val newEventTimeStats = if (originExecStats.eventTimeStats.contains("watermark")) {
Map("watermark" -> progressReporter.formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with

private lazy val metricStateOnCurrentVersionSizeBytes: StateStoreCustomSizeMetric =
StateStoreCustomSizeMetric("stateOnCurrentVersionSizeBytes",
"estimated size of state only on current version")
"estimated size of state only on current version",
isSnapshot = true)

private lazy val metricLoadedMapCacheHit: StateStoreCustomMetric =
StateStoreCustomSumMetric("loadedMapCacheHitCount",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1488,22 +1488,27 @@ object RocksDBStateStoreProvider {
val CUSTOM_METRIC_FLUSH_WRITTEN_BYTES = StateStoreCustomSizeMetric(
"rocksdbTotalBytesWrittenByFlush",
"RocksDB: flush - total bytes written by flush")
// Snapshot metrics: read current RocksDB state, preserved on no-data trigger events.
val CUSTOM_METRIC_PINNED_BLOCKS_MEM_USAGE = StateStoreCustomSizeMetric(
"rocksdbPinnedBlocksMemoryUsage",
"RocksDB: memory usage for pinned blocks")
"RocksDB: memory usage for pinned blocks",
isSnapshot = true)
val CUSTOM_METRIC_NUM_INTERNAL_COL_FAMILIES_KEYS = StateStoreCustomSizeMetric(
"rocksdbNumInternalColFamiliesKeys",
"RocksDB: number of internal keys for internal column families")
"RocksDB: number of internal keys for internal column families",
isSnapshot = true)
val CUSTOM_METRIC_NUM_EXTERNAL_COL_FAMILIES = StateStoreCustomSizeMetric(
"rocksdbNumExternalColumnFamilies",
"RocksDB: number of external column families")
"RocksDB: number of external column families",
isSnapshot = true)
val CUSTOM_METRIC_NUM_INTERNAL_COL_FAMILIES = StateStoreCustomSizeMetric(
"rocksdbNumInternalColumnFamilies",
"RocksDB: number of internal column families")
"RocksDB: number of internal column families",
isSnapshot = true)

// Total SST file size
// Total SST file size (snapshot).
val CUSTOM_METRIC_SST_FILE_SIZE = StateStoreCustomSizeMetric(
"rocksdbSstFileSize", "RocksDB: size of all SST files")
"rocksdbSstFileSize", "RocksDB: size of all SST files", isSnapshot = true)
val CUSTOM_METRIC_NUM_SNAPSHOTS_AUTO_REPAIRED = StateStoreCustomSumMetric(
"rocksdbNumSnapshotsAutoRepaired",
"RocksDB: number of snapshots that were automatically repaired during store load")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,10 @@ trait StateStoreCustomMetric {
def desc: String
def withNewDesc(desc: String): StateStoreCustomMetric
def createSQLMetric(sparkContext: SparkContext): SQLMetric

/**
 * Whether this metric reflects current store state (e.g. file size, memory) rather than
 * per-batch work.
 *
 * Snapshot metrics are preserved on no-data trigger events. Defaults to `false`, so a
 * metric is treated as per-batch unless an implementation overrides this.
 */
def isSnapshot: Boolean = false
}

case class StateStoreCustomSumMetric(name: String, desc: String) extends StateStoreCustomMetric {
Expand All @@ -546,7 +550,10 @@ case class StateStoreCustomSumMetric(name: String, desc: String) extends StateSt
SQLMetrics.createMetric(sparkContext, desc)
}

case class StateStoreCustomSizeMetric(name: String, desc: String) extends StateStoreCustomMetric {
case class StateStoreCustomSizeMetric(
name: String,
desc: String,
override val isSnapshot: Boolean = false) extends StateStoreCustomMetric {
override def withNewDesc(desc: String): StateStoreCustomSizeMetric = copy(desc = desc)

override def createSQLMetric(sparkContext: SparkContext): SQLMetric =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,16 @@ package org.apache.spark.sql.execution.streaming
import org.scalatest.concurrent.PatienceConfiguration.Timeout

import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
import org.apache.spark.sql.functions.{count, timestamp_seconds, window}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.{OutputMode, StreamTest, Trigger}
import org.apache.spark.sql.streaming.{OutputMode, StateOperatorProgress, StreamTest, Trigger}
import org.apache.spark.sql.streaming.util.StreamManualClock

class ProgressReporterSuite extends StreamTest {
import testImplicits._

test("no-data batch resets numRowsRemoved to zero" +
test("no-data batch resets all per-batch StateOperatorProgress fields to zero" +
" via resetExecStatsForNoExecution") {
val clock = new StreamManualClock
val input = MemoryStream[Int]
Expand Down Expand Up @@ -73,7 +74,7 @@ class ProgressReporterSuite extends StreamTest {
.exists(_.stateOperators.head.numRowsRemoved > 0)
assert(removed, "Expected numRowsRemoved > 0")
},
// Idle trigger finishNoExecutionTrigger calls
// Idle trigger: finishNoExecutionTrigger calls
// resetExecStatsForNoExecution which must zero out
// per-batch metrics.
AdvanceManualClock(1 * 1000),
Expand All @@ -94,10 +95,135 @@ class ProgressReporterSuite extends StreamTest {
assert(so.numRowsUpdated === 0, s"numRowsUpdated=${so.numRowsUpdated}")
assert(so.numRowsDroppedByWatermark === 0,
s"numRowsDroppedByWatermark=${so.numRowsDroppedByWatermark}")
assert(so.allUpdatesTimeMs === 0,
s"allUpdatesTimeMs=${so.allUpdatesTimeMs}")
assert(so.allRemovalsTimeMs === 0,
s"allRemovalsTimeMs=${so.allRemovalsTimeMs}")
assert(so.commitTimeMs === 0,
s"commitTimeMs=${so.commitTimeMs}")
}
},
StopStream
)
}
}

// End-to-end check against the RocksDB state store provider: runs one data batch, then an
// idle trigger, and compares the stateful operator's customMetrics between the two progress
// events. Per-batch metrics must be 0 on the idle event; snapshot metrics must be unchanged.
test("SPARK-56537: no-data batch resets per-batch customMetrics but" +
" preserves snapshot customMetrics (RocksDB)") {
// Manual clock so each processing-time trigger fires only when the test advances it.
val clock = new StreamManualClock
val input = MemoryStream[Int]
// Windowed count with a watermark, so the plan contains a stateful operator.
val agg = input.toDF()
.select(timestamp_seconds($"value") as "ts", $"value")
.withWatermark("ts", "10 seconds")
.groupBy(window($"ts", "10 seconds"))
.agg(count("*") as "cnt")
.select($"window".getField("start").cast("long"), $"cnt")

// RocksDB provider is required for the rocksdb* metric names asserted below.
// NOTE(review): the two "0" settings presumably make idle triggers emit a progress event
// without delay — confirm against the SQLConf definitions.
withSQLConf(
SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
classOf[RocksDBStateStoreProvider].getName,
SQLConf.STREAMING_POLLING_DELAY.key -> "0",
SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "0") {
testStream(agg, outputMode = OutputMode.Update)(
StartStream(
Trigger.ProcessingTime("1 second"),
triggerClock = clock),
// Batch 0: real data, populates customMetrics with non-zero per-batch values.
AddData(input, 1, 2, 3),
AdvanceManualClock(1 * 1000),
CheckNewAnswer((0L, 3L)),
// Idle trigger.
AdvanceManualClock(1 * 1000),
Execute("verify customMetrics behavior on idle trigger") { q =>
// Retried until it passes or times out, since the idle progress event may not have
// been recorded yet when this action first runs.
eventually(Timeout(streamingTimeout)) {
val progress = q.recentProgress.filter(_.stateOperators.nonEmpty)
// A progress entry with an "addBatch" duration is treated as a data batch.
val lastDataIdx = progress.lastIndexWhere { p =>
p.durationMs.containsKey("addBatch")
}
assert(lastDataIdx >= 0, "no data batch found")
// The first later entry without "addBatch" is treated as the idle trigger; indexWhere
// returns -1 when there is none, which fails the assertion below and triggers a retry.
val idleIdx = progress.indexWhere(
!_.durationMs.containsKey("addBatch"), lastDataIdx + 1)
assert(idleIdx > lastDataIdx,
"no idle trigger found after data batch")

val dataCm = progress(lastDataIdx).stateOperators.head.customMetrics
val idleCm = progress(idleIdx).stateOperators.head.customMetrics

// Per-batch RocksDB metrics: zeroed on idle. The metric must be present
// in the map (we keep keys consistent across data and idle progress).
Seq("rocksdbCommitFlushLatency",
"rocksdbPutCount",
"rocksdbTotalBytesWritten").foreach { k =>
assert(idleCm.containsKey(k), s"$k missing on idle")
assert(idleCm.get(k) === 0L,
s"per-batch metric $k expected 0 on idle, got ${idleCm.get(k)}")
}

// Snapshot RocksDB metrics: value unchanged across idle trigger.
Seq("rocksdbPinnedBlocksMemoryUsage",
"rocksdbNumInternalColFamiliesKeys",
"rocksdbNumExternalColumnFamilies",
"rocksdbNumInternalColumnFamilies",
"rocksdbSstFileSize").foreach { k =>
assert(idleCm.containsKey(k), s"$k missing on idle")
assert(idleCm.get(k) === dataCm.get(k),
s"snapshot metric $k changed across idle trigger: " +
s"data=${dataCm.get(k)} idle=${idleCm.get(k)}")
}
}
},
StopStream
)
}
}

test("SPARK-56537: copyForNoExecution zeroes per-batch fields and preserves snapshot fields") {
  // One entry registered as a snapshot, two that are not.
  val snapshotNames = Set("snapshotSize")
  val inputMetrics = new java.util.HashMap[String, java.lang.Long]()
  inputMetrics.put("snapshotSize", 999L)
  inputMetrics.put("perBatchCounter", 50L)
  inputMetrics.put("perBatchTimer", 100L)

  // Every numeric field is non-zero so an accidental reset or carry-over is detectable.
  val before = new StateOperatorProgress(
    operatorName = "op",
    numRowsTotal = 50L,
    numRowsUpdated = 10L,
    allUpdatesTimeMs = 7L,
    numRowsRemoved = 3L,
    allRemovalsTimeMs = 5L,
    commitTimeMs = 11L,
    memoryUsedBytes = 2048L,
    numRowsDroppedByWatermark = 2L,
    numShufflePartitions = 4L,
    numStateStoreInstances = 4L,
    customMetrics = inputMetrics,
    snapshotCustomMetricNames = snapshotNames)

  val after = before.copyForNoExecution()

  // Row counters bound to a batch come back as zero.
  assert(after.numRowsUpdated === 0L)
  assert(after.numRowsRemoved === 0L)
  assert(after.numRowsDroppedByWatermark === 0L)

  // Timers bound to a batch come back as zero.
  assert(after.allUpdatesTimeMs === 0L)
  assert(after.allRemovalsTimeMs === 0L)
  assert(after.commitTimeMs === 0L)

  // Fields describing current state are carried over untouched.
  assert(after.operatorName === "op")
  assert(after.numRowsTotal === 50L)
  assert(after.memoryUsedBytes === 2048L)
  assert(after.numShufflePartitions === 4L)
  assert(after.numStateStoreInstances === 4L)

  // Custom metrics: only the registered snapshot keeps its value.
  assert(after.customMetrics.get("snapshotSize") === 999L)
  assert(after.customMetrics.get("perBatchTimer") === 0L)
  assert(after.customMetrics.get("perBatchCounter") === 0L)

  // The snapshot-name set travels with the copy, so later copy() calls still see it.
  assert(after.snapshotCustomMetricNames === Set("snapshotSize"))

  // The source instance, including its metrics map, is left as it was.
  assert(before.numRowsUpdated === 10L)
  assert(before.customMetrics.get("perBatchTimer") === 100L)
}
}