From ad08a5e430eb32bab53d0b650e90b16ab79c7857 Mon Sep 17 00:00:00 2001 From: Dhruv Patel <71836462+DHRUV6029@users.noreply.github.com> Date: Fri, 8 May 2026 23:38:37 -0700 Subject: [PATCH] [SPARK-56537][SS] Reset per-batch time fields and customMetrics on no batch trigger progress event ### What changes were proposed in this pull request? This PR is a follow-up to SPARK-56464 (commit 930c3039871), which left a `TODO(SPARK-56537)` in `ProgressReporter#resetExecStatsForNoExecution` to track the remaining per-batch fields on `StateOperatorProgress` that were not being reset on no-data trigger progress events. Three changes: 1. Reset the per-batch time fields on no-data trigger progress events. `allUpdatesTimeMs`, `allRemovalsTimeMs`, and `commitTimeMs` are now reset to 0 alongside the row-count fields (`numRowsUpdated`, `numRowsRemoved`, `numRowsDroppedByWatermark`) that were already handled by SPARK-56464. 2. Reset per-batch entries of `customMetrics` while preserving snapshot entries. `StateOperatorProgress.customMetrics` carries values from two metric registries (`StateStoreCustomMetric` for provider-level, `StatefulOperatorCustomMetric` for operator-level) and conflates per-batch counters/timings with snapshot reads of state-store status (current memory usage, key counts, file size). On a no-data trigger we now zero per-batch entries and preserve snapshot entries. The snapshot/per-batch distinction is encoded at the metric definition via a new `isSnapshot: Boolean` flag on `StateStoreCustomMetric` (default `false`). The six snapshot Size metrics are marked at their definitions: - RocksDB (5): `rocksdbSstFileSize`, `rocksdbPinnedBlocksMemoryUsage`, `rocksdbNumInternalColFamiliesKeys`, `rocksdbNumExternalColumnFamilies`, `rocksdbNumInternalColumnFamilies`. - HDFSBackedStateStoreProvider (1): `stateOnCurrentVersionSizeBytes`. `StateStoreCustomTimingMetric` and `StateStoreCustomSumMetric` keep using the trait default (always per-batch). Operator-level `StatefulOperatorCustomSumMetric` instances (declared by `BaseStreamingDeduplicateExec`, `StreamingSymmetricHashJoinExec`, and `TransformWithStateExecBase`) are also always per-batch. 3. Centralize the reset semantics in a new `copyForNoExecution()` method on `StateOperatorProgress` instead of growing `copy(...)`'s parameter list further. The method takes no parameters; it inspects the operator instance's `snapshotCustomMetricNames` (a new `private[spark]` constructor field, defaulted to `Set.empty`, populated at progress build time by `StateStoreWriter.getProgress`) to decide which `customMetrics` keys to preserve. The existing 3-arg `copy(newNumRowsUpdated, newNumRowsDroppedByWatermark, newNumRowsRemoved)` signature is unchanged; its body is updated to thread `snapshotCustomMetricNames` through so `SessionWindowStateStoreSaveExec.getProgress` round-trip preserves it. The `TODO(SPARK-56537)` comment is removed from `ProgressReporter#resetExecStatsForNoExecution`, whose body is reduced to a single delegating map: `originExecStats.stateOperators.map(_.copyForNoExecution())`. ### Why are the changes needed? Today, on a no-data ("idle") trigger progress event, `StateOperatorProgress` carries the previous batch's values for `allUpdatesTimeMs`, `allRemovalsTimeMs`, `commitTimeMs`, and most of `customMetrics`. To a user reading `query.lastProgress` / `query.recentProgress` during an idle period this looks like work was performed when none was. It is also a known source of test flakiness. The `TODO(SPARK-56537)` left by SPARK-56464 in `ProgressReporter#resetExecStatsForNoExecution` explicitly tracks this follow-up. The design was discussed on the JIRA ticket and confirmed before implementation: - Encode snapshot semantics at the metric definition (option (2b) in the audit comment), not via a hardcoded whitelist in the reset routine. - Add a new `copyForNoExecution()` method on `StateOperatorProgress` rather than growing the existing `copy(...)` argument list further (3 args after SPARK-56464 would have become 6+). ### Does this PR introduce _any_ user-facing change? No public API change. User-visible behavior change: idle-trigger progress events emitted via `StreamingQueryListener.QueryProgressEvent`, `query.lastProgress`, and `query.recentProgress` will now report `0` for all per-batch fields and per-batch `customMetrics` entries instead of carrying stale values from the previous data batch. Snapshot fields (`numRowsTotal`, `memoryUsedBytes`, `numShufflePartitions`, `numStateStoreInstances`, snapshot custom metrics) are unchanged. Same direction as the SPARK-56464 fix; this PR completes the audit. ### How was this patch tested? New and updated tests in `ProgressReporterSuite.scala`: 1. Extended the SPARK-56464 test with assertions that the three time fields (`allUpdatesTimeMs`, `allRemovalsTimeMs`, `commitTimeMs`) are reset to 0 on the idle trigger, alongside the existing row-count assertions. Test description updated from "no-data batch resets numRowsRemoved to zero" to "no-data batch resets all per-batch StateOperatorProgress fields to zero" to reflect the broader scope. 2. New test "SPARK-56537: no-data batch resets per-batch customMetrics but preserves snapshot customMetrics (RocksDB)" exercising the per-batch / snapshot split end-to-end against a real `RocksDBStateStoreProvider`. The test runs one data batch, advances the manual clock to trigger an idle progress event, then asserts that 3 per-batch RocksDB metrics (`rocksdbCommitFlushLatency`, `rocksdbPutCount`, `rocksdbTotalBytesWritten`) are reset to 0 on idle, while 5 snapshot RocksDB metrics (`rocksdbPinnedBlocksMemoryUsage`, `rocksdbNumInternalColFamiliesKeys`, `rocksdbNumExternalColumnFamilies`, `rocksdbNumInternalColumnFamilies`, `rocksdbSstFileSize`) are preserved across the idle trigger. Local verification: - `build/sbt 'sql/testOnly *ProgressReporterSuite'` -> 2/2 tests pass. - `build/sbt 'sql/testOnly *ProgressReporterSuite *StreamingQueryStatusAndProgressSuite *StreamingAggregationSuite *StreamingDeduplicationSuite *MultiStatefulOperatorsSuite'` -> 240 tests pass in 7m 16s. - `dev/mima` -> no exclusions required. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Opus 4.7 --- .../streaming/test_streaming_listener.py | 7 +- .../apache/spark/sql/streaming/progress.scala | 34 ++++- .../stateful/statefulOperators.scala | 46 ++++-- .../streaming/runtime/ProgressReporter.scala | 6 +- .../state/HDFSBackedStateStoreProvider.scala | 3 +- .../state/RocksDBStateStoreProvider.scala | 17 ++- .../streaming/state/StateStore.scala | 9 +- .../streaming/ProgressReporterSuite.scala | 132 +++++++++++++++++- 8 files changed, 227 insertions(+), 27 deletions(-) diff --git a/python/pyspark/sql/tests/streaming/test_streaming_listener.py b/python/pyspark/sql/tests/streaming/test_streaming_listener.py index 620186f70b665..b4922f54b2170 100644 --- a/python/pyspark/sql/tests/streaming/test_streaming_listener.py +++ b/python/pyspark/sql/tests/streaming/test_streaming_listener.py @@ -312,7 +312,12 @@ def get_number_of_public_methods(clz): ) self.assertEqual( get_number_of_public_methods("org.apache.spark.sql.streaming.StateOperatorProgress"), - 27, + # SPARK-56537: bumped from 27 to 30 due to the new snapshotCustomMetricNames + # constructor parameter (getter + synthetic default) and the new internal + # copyForNoExecution() method on StateOperatorProgress. Both are non-public + # API (private[spark] / private[sql]) so they are not mirrored on the + # Python side; only the count needs updating. + 30, msg, ) self.assertEqual( diff --git a/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala index 0502936e3cc4e..619d8fb53311a 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala @@ -56,7 +56,10 @@ class StateOperatorProgress private[spark] ( val numRowsDroppedByWatermark: Long, val numShufflePartitions: Long, val numStateStoreInstances: Long, - val customMetrics: ju.Map[String, JLong] = new ju.HashMap()) + val customMetrics: ju.Map[String, JLong] = new ju.HashMap(), + // Names of customMetrics entries treated as snapshots of state-store status; + // preserved by copyForNoExecution() and not surfaced in JSON output. + private[spark] val snapshotCustomMetricNames: Set[String] = Set.empty) extends Serializable { /** The compact JSON representation of this progress. */ @@ -81,7 +84,34 @@ class StateOperatorProgress private[spark] ( numRowsDroppedByWatermark = newNumRowsDroppedByWatermark, numShufflePartitions = numShufflePartitions, numStateStoreInstances = numStateStoreInstances, - customMetrics = customMetrics) + customMetrics = customMetrics, + snapshotCustomMetricNames = snapshotCustomMetricNames) + + /** + * Returns a copy of this progress suitable for a no-data trigger event. Per-batch fields (row + * counts, time-Ms fields, and customMetrics entries not in `snapshotCustomMetricNames`) are + * zeroed; snapshot fields and snapshot customMetrics entries are preserved. + */ + private[sql] def copyForNoExecution(): StateOperatorProgress = { + val newCustomMetrics = new ju.HashMap[String, JLong](customMetrics.size()) + customMetrics.forEach { (k, v) => + newCustomMetrics.put(k, if (snapshotCustomMetricNames.contains(k)) v else 0L) + } + new StateOperatorProgress( + operatorName = operatorName, + numRowsTotal = numRowsTotal, + numRowsUpdated = 0L, + allUpdatesTimeMs = 0L, + numRowsRemoved = 0L, + allRemovalsTimeMs = 0L, + commitTimeMs = 0L, + memoryUsedBytes = memoryUsedBytes, + numRowsDroppedByWatermark = 0L, + numShufflePartitions = numShufflePartitions, + numStateStoreInstances = numStateStoreInstances, + customMetrics = newCustomMetrics, + snapshotCustomMetricNames = snapshotCustomMetricNames) + } private[sql] def jsonValue: JValue = { ("operatorName" -> JString(operatorName)) ~ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/statefulOperators.scala index 76b395d225042..9561206eb55f9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/statefulOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/statefulOperators.scala @@ -137,6 +137,9 @@ trait StatefulOperatorCustomMetric { def name: String def desc: String def createSQLMetric(sparkContext: SparkContext): SQLMetric + // True if the metric reflects current state rather than per-batch work; snapshot + // metrics are preserved on no-data trigger events. Mirrors StateStoreCustomMetric. + def isSnapshot: Boolean = false } /** Custom stateful operator metric for simple "count" gauge */ @@ -402,7 +405,8 @@ trait StateStoreWriter numRowsDroppedByWatermark = longMetric("numRowsDroppedByWatermark").value, numShufflePartitions = stateInfo.map(_.numPartitions.toLong).getOrElse(-1L), numStateStoreInstances = longMetric("numStateStoreInstances").value, - javaConvertedCustomMetrics + javaConvertedCustomMetrics, + snapshotCustomMetricNames ) } @@ -475,17 +479,43 @@ trait StateStoreWriter }.toMap } - private def stateStoreInstanceMetrics: Map[StateStoreInstanceMetric, SQLMetric] = { + // All instance metrics with their (partitionId, storeName) bindings; consumed by + // both `stateStoreInstanceMetrics` (for SQLMetric registration) and + // `snapshotCustomMetricNames` (for the snapshot-name set). The result is a + // serializable Seq so storing it as a lazy val on this trait is safe even when + // the enclosing SparkPlan is shipped to executors. The provider itself is NOT + // stored as a field (it is non-serializable), so each consumer below recreates + // it locally. + private lazy val stateStoreInstanceMetricsWithIds: Seq[StateStoreInstanceMetric] = { val provider = StateStoreProvider.create(conf.stateStoreProviderClass) - val maxPartitions = stateInfo.map(_.numPartitions).getOrElse(conf.defaultNumShufflePartitions) - + val maxPartitions = + stateInfo.map(_.numPartitions).getOrElse(conf.defaultNumShufflePartitions) (0 until maxPartitions).flatMap { partitionId => provider.supportedInstanceMetrics.flatMap { metric => - stateStoreNames.map { storeName => - val metricWithPartition = metric.withNewId(partitionId, storeName) - (metricWithPartition, metricWithPartition.createSQLMetric(sparkContext)) - } + stateStoreNames.map(metric.withNewId(partitionId, _)) } + } + } + + // Names of customMetrics entries treated as snapshots; preserved by + // StateOperatorProgress.copyForNoExecution() on no-data trigger events. Includes + // provider- and operator-level metrics with isSnapshot = true, and all instance + // metric names (instance metrics use sentinel inits like -1 with monotonic + // combine, so they are always snapshot-style). + private lazy val snapshotCustomMetricNames: Set[String] = { + val provider = StateStoreProvider.create(conf.stateStoreProviderClass) + val customSnapshots = provider.supportedCustomMetrics.collect { + case m if m.isSnapshot => m.name + }.toSet + val operatorSnapshots = customStatefulOperatorMetrics.collect { + case m if m.isSnapshot => m.name + }.toSet + customSnapshots ++ operatorSnapshots ++ stateStoreInstanceMetricsWithIds.map(_.name).toSet + } + + private def stateStoreInstanceMetrics: Map[StateStoreInstanceMetric, SQLMetric] = { + stateStoreInstanceMetricsWithIds.map { metric => + (metric, metric.createSQLMetric(sparkContext)) }.toMap } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ProgressReporter.scala index 73b75df1a599d..161696fb92607 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ProgressReporter.scala @@ -648,13 +648,9 @@ abstract class ProgressContext( * New execution stats will only retain the values as a snapshot of the query status. * (E.g. for stateful operators, numRowsTotal is a snapshot of the status, whereas * numRowsUpdated is bound to the batch.) - * TODO(SPARK-56537): We do not seem to clear up all values in StateOperatorProgress which are - * bound to the batch. Fix this. */ private def resetExecStatsForNoExecution(originExecStats: ExecutionStats): ExecutionStats = { - val newStatefulOperators = originExecStats.stateOperators.map { so => - so.copy(newNumRowsUpdated = 0, newNumRowsDroppedByWatermark = 0, newNumRowsRemoved = 0) - } + val newStatefulOperators = originExecStats.stateOperators.map(_.copyForNoExecution()) val newEventTimeStats = if (originExecStats.eventTimeStats.contains("watermark")) { Map("watermark" -> progressReporter.formatTimestamp(offsetSeqMetadata.batchWatermarkMs)) } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index 29a17b4eb7ec9..2562f1ff3304e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -566,7 +566,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with private lazy val metricStateOnCurrentVersionSizeBytes: StateStoreCustomSizeMetric = StateStoreCustomSizeMetric("stateOnCurrentVersionSizeBytes", - "estimated size of state only on current version") + "estimated size of state only on current version", + isSnapshot = true) private lazy val metricLoadedMapCacheHit: StateStoreCustomMetric = StateStoreCustomSumMetric("loadedMapCacheHitCount", diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala index 45168f4071328..b3d734c71f919 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala @@ -1488,22 +1488,27 @@ object RocksDBStateStoreProvider { val CUSTOM_METRIC_FLUSH_WRITTEN_BYTES = StateStoreCustomSizeMetric( "rocksdbTotalBytesWrittenByFlush", "RocksDB: flush - total bytes written by flush") + // Snapshot metrics: read current RocksDB state, preserved on no-data trigger events. val CUSTOM_METRIC_PINNED_BLOCKS_MEM_USAGE = StateStoreCustomSizeMetric( "rocksdbPinnedBlocksMemoryUsage", - "RocksDB: memory usage for pinned blocks") + "RocksDB: memory usage for pinned blocks", + isSnapshot = true) val CUSTOM_METRIC_NUM_INTERNAL_COL_FAMILIES_KEYS = StateStoreCustomSizeMetric( "rocksdbNumInternalColFamiliesKeys", - "RocksDB: number of internal keys for internal column families") + "RocksDB: number of internal keys for internal column families", + isSnapshot = true) val CUSTOM_METRIC_NUM_EXTERNAL_COL_FAMILIES = StateStoreCustomSizeMetric( "rocksdbNumExternalColumnFamilies", - "RocksDB: number of external column families") + "RocksDB: number of external column families", + isSnapshot = true) val CUSTOM_METRIC_NUM_INTERNAL_COL_FAMILIES = StateStoreCustomSizeMetric( "rocksdbNumInternalColumnFamilies", - "RocksDB: number of internal column families") + "RocksDB: number of internal column families", + isSnapshot = true) - // Total SST file size + // Total SST file size (snapshot). val CUSTOM_METRIC_SST_FILE_SIZE = StateStoreCustomSizeMetric( - "rocksdbSstFileSize", "RocksDB: size of all SST files") + "rocksdbSstFileSize", "RocksDB: size of all SST files", isSnapshot = true) val CUSTOM_METRIC_NUM_SNAPSHOTS_AUTO_REPAIRED = StateStoreCustomSumMetric( "rocksdbNumSnapshotsAutoRepaired", "RocksDB: number of snapshots that were automatically repaired during store load") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index e3601f1ef2246..ad067b8edcc3a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ -537,6 +537,10 @@ trait StateStoreCustomMetric { def desc: String def withNewDesc(desc: String): StateStoreCustomMetric def createSQLMetric(sparkContext: SparkContext): SQLMetric + + // True if the metric reflects current store state (e.g. file size, memory) rather + // than per-batch work; snapshot metrics are preserved on no-data trigger events. + def isSnapshot: Boolean = false } case class StateStoreCustomSumMetric(name: String, desc: String) extends StateStoreCustomMetric { @@ -546,7 +550,10 @@ case class StateStoreCustomSumMetric(name: String, desc: String) extends StateSt SQLMetrics.createMetric(sparkContext, desc) } -case class StateStoreCustomSizeMetric(name: String, desc: String) extends StateStoreCustomMetric { +case class StateStoreCustomSizeMetric( + name: String, + desc: String, + override val isSnapshot: Boolean = false) extends StateStoreCustomMetric { override def withNewDesc(desc: String): StateStoreCustomSizeMetric = copy(desc = desc) override def createSQLMetric(sparkContext: SparkContext): SQLMetric = diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala index da037936849e6..134bfc6b914b4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala @@ -20,15 +20,16 @@ package org.apache.spark.sql.execution.streaming import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.apache.spark.sql.execution.streaming.runtime.MemoryStream +import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider import org.apache.spark.sql.functions.{count, timestamp_seconds, window} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.streaming.{OutputMode, StreamTest, Trigger} +import org.apache.spark.sql.streaming.{OutputMode, StateOperatorProgress, StreamTest, Trigger} import org.apache.spark.sql.streaming.util.StreamManualClock class ProgressReporterSuite extends StreamTest { import testImplicits._ - test("no-data batch resets numRowsRemoved to zero" + + test("no-data batch resets all per-batch StateOperatorProgress fields to zero" + " via resetExecStatsForNoExecution") { val clock = new StreamManualClock val input = MemoryStream[Int] @@ -73,7 +74,7 @@ class ProgressReporterSuite extends StreamTest { .exists(_.stateOperators.head.numRowsRemoved > 0) assert(removed, "Expected numRowsRemoved > 0") }, - // Idle trigger — finishNoExecutionTrigger calls + // Idle trigger: finishNoExecutionTrigger calls // resetExecStatsForNoExecution which must zero out // per-batch metrics. AdvanceManualClock(1 * 1000), @@ -94,10 +95,135 @@ class ProgressReporterSuite extends StreamTest { assert(so.numRowsUpdated === 0, s"numRowsUpdated=${so.numRowsUpdated}") assert(so.numRowsDroppedByWatermark === 0, s"numRowsDroppedByWatermark=${so.numRowsDroppedByWatermark}") + assert(so.allUpdatesTimeMs === 0, + s"allUpdatesTimeMs=${so.allUpdatesTimeMs}") + assert(so.allRemovalsTimeMs === 0, + s"allRemovalsTimeMs=${so.allRemovalsTimeMs}") + assert(so.commitTimeMs === 0, + s"commitTimeMs=${so.commitTimeMs}") } }, StopStream ) } } + + test("SPARK-56537: no-data batch resets per-batch customMetrics but" + + " preserves snapshot customMetrics (RocksDB)") { + val clock = new StreamManualClock + val input = MemoryStream[Int] + val agg = input.toDF() + .select(timestamp_seconds($"value") as "ts", $"value") + .withWatermark("ts", "10 seconds") + .groupBy(window($"ts", "10 seconds")) + .agg(count("*") as "cnt") + .select($"window".getField("start").cast("long"), $"cnt") + + withSQLConf( + SQLConf.STATE_STORE_PROVIDER_CLASS.key -> + classOf[RocksDBStateStoreProvider].getName, + SQLConf.STREAMING_POLLING_DELAY.key -> "0", + SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "0") { + testStream(agg, outputMode = OutputMode.Update)( + StartStream( + Trigger.ProcessingTime("1 second"), + triggerClock = clock), + // Batch 0: real data, populates customMetrics with non-zero per-batch values. + AddData(input, 1, 2, 3), + AdvanceManualClock(1 * 1000), + CheckNewAnswer((0L, 3L)), + // Idle trigger. + AdvanceManualClock(1 * 1000), + Execute("verify customMetrics behavior on idle trigger") { q => + eventually(Timeout(streamingTimeout)) { + val progress = q.recentProgress.filter(_.stateOperators.nonEmpty) + val lastDataIdx = progress.lastIndexWhere { p => + p.durationMs.containsKey("addBatch") + } + assert(lastDataIdx >= 0, "no data batch found") + val idleIdx = progress.indexWhere( + !_.durationMs.containsKey("addBatch"), lastDataIdx + 1) + assert(idleIdx > lastDataIdx, + "no idle trigger found after data batch") + + val dataCm = progress(lastDataIdx).stateOperators.head.customMetrics + val idleCm = progress(idleIdx).stateOperators.head.customMetrics + + // Per-batch RocksDB metrics: zeroed on idle. The metric must be present + // in the map (we keep keys consistent across data and idle progress). + Seq("rocksdbCommitFlushLatency", + "rocksdbPutCount", + "rocksdbTotalBytesWritten").foreach { k => + assert(idleCm.containsKey(k), s"$k missing on idle") + assert(idleCm.get(k) === 0L, + s"per-batch metric $k expected 0 on idle, got ${idleCm.get(k)}") + } + + // Snapshot RocksDB metrics: value unchanged across idle trigger. + Seq("rocksdbPinnedBlocksMemoryUsage", + "rocksdbNumInternalColFamiliesKeys", + "rocksdbNumExternalColumnFamilies", + "rocksdbNumInternalColumnFamilies", + "rocksdbSstFileSize").foreach { k => + assert(idleCm.containsKey(k), s"$k missing on idle") + assert(idleCm.get(k) === dataCm.get(k), + s"snapshot metric $k changed across idle trigger: " + + s"data=${dataCm.get(k)} idle=${idleCm.get(k)}") + } + } + }, + StopStream + ) + } + } + + test("SPARK-56537: copyForNoExecution zeroes per-batch fields and preserves snapshot fields") { + val customMetrics = new java.util.HashMap[String, java.lang.Long]() + customMetrics.put("perBatchTimer", 100L) + customMetrics.put("perBatchCounter", 50L) + customMetrics.put("snapshotSize", 999L) + val orig = new StateOperatorProgress( + operatorName = "op", + numRowsTotal = 50L, + numRowsUpdated = 10L, + allUpdatesTimeMs = 7L, + numRowsRemoved = 3L, + allRemovalsTimeMs = 5L, + commitTimeMs = 11L, + memoryUsedBytes = 2048L, + numRowsDroppedByWatermark = 2L, + numShufflePartitions = 4L, + numStateStoreInstances = 4L, + customMetrics = customMetrics, + snapshotCustomMetricNames = Set("snapshotSize")) + + val out = orig.copyForNoExecution() + + // Per-batch fields are zeroed. + assert(out.numRowsUpdated === 0L) + assert(out.allUpdatesTimeMs === 0L) + assert(out.numRowsRemoved === 0L) + assert(out.allRemovalsTimeMs === 0L) + assert(out.commitTimeMs === 0L) + assert(out.numRowsDroppedByWatermark === 0L) + + // Snapshot fields are preserved. + assert(out.operatorName === "op") + assert(out.numRowsTotal === 50L) + assert(out.memoryUsedBytes === 2048L) + assert(out.numShufflePartitions === 4L) + assert(out.numStateStoreInstances === 4L) + + // customMetrics: per-batch zeroed, snapshot preserved. + assert(out.customMetrics.get("perBatchTimer") === 0L) + assert(out.customMetrics.get("perBatchCounter") === 0L) + assert(out.customMetrics.get("snapshotSize") === 999L) + + // Original is not mutated. + assert(orig.numRowsUpdated === 10L) + assert(orig.customMetrics.get("perBatchTimer") === 100L) + + // snapshotCustomMetricNames is preserved so subsequent copy() round-trips work. + assert(out.snapshotCustomMetricNames === Set("snapshotSize")) + } }