diff --git a/python/pyspark/sql/tests/streaming/test_streaming_listener.py b/python/pyspark/sql/tests/streaming/test_streaming_listener.py index 620186f70b665..b4922f54b2170 100644 --- a/python/pyspark/sql/tests/streaming/test_streaming_listener.py +++ b/python/pyspark/sql/tests/streaming/test_streaming_listener.py @@ -312,7 +312,12 @@ def get_number_of_public_methods(clz): ) self.assertEqual( get_number_of_public_methods("org.apache.spark.sql.streaming.StateOperatorProgress"), - 27, + # SPARK-56537: bumped from 27 to 30 due to the new snapshotCustomMetricNames + # constructor parameter (getter + synthetic default) and the new internal + # copyForNoExecution() method on StateOperatorProgress. Both are non-public + # API (private[spark] / private[sql]) so they are not mirrored on the + # Python side; only the count needs updating. + 30, msg, ) self.assertEqual( diff --git a/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala index 0502936e3cc4e..619d8fb53311a 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala @@ -56,7 +56,10 @@ class StateOperatorProgress private[spark] ( val numRowsDroppedByWatermark: Long, val numShufflePartitions: Long, val numStateStoreInstances: Long, - val customMetrics: ju.Map[String, JLong] = new ju.HashMap()) + val customMetrics: ju.Map[String, JLong] = new ju.HashMap(), + // Names of customMetrics entries treated as snapshots of state-store status; + // preserved by copyForNoExecution() and not surfaced in JSON output. + private[spark] val snapshotCustomMetricNames: Set[String] = Set.empty) extends Serializable { /** The compact JSON representation of this progress. */ @@ -81,7 +84,34 @@ class StateOperatorProgress private[spark] ( numRowsDroppedByWatermark = newNumRowsDroppedByWatermark, numShufflePartitions = numShufflePartitions, numStateStoreInstances = numStateStoreInstances, - customMetrics = customMetrics) + customMetrics = customMetrics, + snapshotCustomMetricNames = snapshotCustomMetricNames) + + /** + * Returns a copy of this progress suitable for a no-data trigger event. Per-batch fields (row + * counts, time-Ms fields, and customMetrics entries not in `snapshotCustomMetricNames`) are + * zeroed; snapshot fields and snapshot customMetrics entries are preserved. + */ + private[sql] def copyForNoExecution(): StateOperatorProgress = { + val newCustomMetrics = new ju.HashMap[String, JLong](customMetrics.size()) + customMetrics.forEach { (k, v) => + newCustomMetrics.put(k, if (snapshotCustomMetricNames.contains(k)) v else 0L) + } + new StateOperatorProgress( + operatorName = operatorName, + numRowsTotal = numRowsTotal, + numRowsUpdated = 0L, + allUpdatesTimeMs = 0L, + numRowsRemoved = 0L, + allRemovalsTimeMs = 0L, + commitTimeMs = 0L, + memoryUsedBytes = memoryUsedBytes, + numRowsDroppedByWatermark = 0L, + numShufflePartitions = numShufflePartitions, + numStateStoreInstances = numStateStoreInstances, + customMetrics = newCustomMetrics, + snapshotCustomMetricNames = snapshotCustomMetricNames) + } private[sql] def jsonValue: JValue = { ("operatorName" -> JString(operatorName)) ~ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/statefulOperators.scala index 76b395d225042..9561206eb55f9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/statefulOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/statefulOperators.scala @@ -137,6 +137,9 @@ trait StatefulOperatorCustomMetric { def name: String def desc: String def createSQLMetric(sparkContext: SparkContext): SQLMetric + // True if the metric reflects current state rather than per-batch work; snapshot + // metrics are preserved on no-data trigger events. Mirrors StateStoreCustomMetric. + def isSnapshot: Boolean = false } /** Custom stateful operator metric for simple "count" gauge */ @@ -402,7 +405,8 @@ trait StateStoreWriter numRowsDroppedByWatermark = longMetric("numRowsDroppedByWatermark").value, numShufflePartitions = stateInfo.map(_.numPartitions.toLong).getOrElse(-1L), numStateStoreInstances = longMetric("numStateStoreInstances").value, - javaConvertedCustomMetrics + javaConvertedCustomMetrics, + snapshotCustomMetricNames ) } @@ -475,17 +479,43 @@ trait StateStoreWriter }.toMap } - private def stateStoreInstanceMetrics: Map[StateStoreInstanceMetric, SQLMetric] = { + // All instance metrics with their (partitionId, storeName) bindings; consumed by + // both `stateStoreInstanceMetrics` (for SQLMetric registration) and + // `snapshotCustomMetricNames` (for the snapshot-name set). The result is a + // serializable Seq so storing it as a lazy val on this trait is safe even when + // the enclosing SparkPlan is shipped to executors. The provider itself is NOT + // stored as a field (it is non-serializable), so each consumer below recreates + // it locally. + private lazy val stateStoreInstanceMetricsWithIds: Seq[StateStoreInstanceMetric] = { val provider = StateStoreProvider.create(conf.stateStoreProviderClass) - val maxPartitions = stateInfo.map(_.numPartitions).getOrElse(conf.defaultNumShufflePartitions) - + val maxPartitions = + stateInfo.map(_.numPartitions).getOrElse(conf.defaultNumShufflePartitions) (0 until maxPartitions).flatMap { partitionId => provider.supportedInstanceMetrics.flatMap { metric => - stateStoreNames.map { storeName => - val metricWithPartition = metric.withNewId(partitionId, storeName) - (metricWithPartition, metricWithPartition.createSQLMetric(sparkContext)) - } + stateStoreNames.map(metric.withNewId(partitionId, _)) } + } + } + + // Names of customMetrics entries treated as snapshots; preserved by + // StateOperatorProgress.copyForNoExecution() on no-data trigger events. Includes + // provider- and operator-level metrics with isSnapshot = true, and all instance + // metric names (instance metrics use sentinel inits like -1 with monotonic + // combine, so they are always snapshot-style). + private lazy val snapshotCustomMetricNames: Set[String] = { + val provider = StateStoreProvider.create(conf.stateStoreProviderClass) + val customSnapshots = provider.supportedCustomMetrics.collect { + case m if m.isSnapshot => m.name + }.toSet + val operatorSnapshots = customStatefulOperatorMetrics.collect { + case m if m.isSnapshot => m.name + }.toSet + customSnapshots ++ operatorSnapshots ++ stateStoreInstanceMetricsWithIds.map(_.name).toSet + } + + private def stateStoreInstanceMetrics: Map[StateStoreInstanceMetric, SQLMetric] = { + stateStoreInstanceMetricsWithIds.map { metric => + (metric, metric.createSQLMetric(sparkContext)) }.toMap } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ProgressReporter.scala index 73b75df1a599d..161696fb92607 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ProgressReporter.scala @@ -648,13 +648,9 @@ abstract class ProgressContext( * New execution stats will only retain the values as a snapshot of the query status. * (E.g. for stateful operators, numRowsTotal is a snapshot of the status, whereas * numRowsUpdated is bound to the batch.) - * TODO(SPARK-56537): We do not seem to clear up all values in StateOperatorProgress which are - * bound to the batch. Fix this. */ private def resetExecStatsForNoExecution(originExecStats: ExecutionStats): ExecutionStats = { - val newStatefulOperators = originExecStats.stateOperators.map { so => - so.copy(newNumRowsUpdated = 0, newNumRowsDroppedByWatermark = 0, newNumRowsRemoved = 0) - } + val newStatefulOperators = originExecStats.stateOperators.map(_.copyForNoExecution()) val newEventTimeStats = if (originExecStats.eventTimeStats.contains("watermark")) { Map("watermark" -> progressReporter.formatTimestamp(offsetSeqMetadata.batchWatermarkMs)) } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index 29a17b4eb7ec9..2562f1ff3304e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -566,7 +566,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with private lazy val metricStateOnCurrentVersionSizeBytes: StateStoreCustomSizeMetric = StateStoreCustomSizeMetric("stateOnCurrentVersionSizeBytes", - "estimated size of state only on current version") + "estimated size of state only on current version", + isSnapshot = true) private lazy val metricLoadedMapCacheHit: StateStoreCustomMetric = StateStoreCustomSumMetric("loadedMapCacheHitCount", diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala index 45168f4071328..b3d734c71f919 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala @@ -1488,22 +1488,27 @@ object RocksDBStateStoreProvider { val CUSTOM_METRIC_FLUSH_WRITTEN_BYTES = StateStoreCustomSizeMetric( "rocksdbTotalBytesWrittenByFlush", "RocksDB: flush - total bytes written by flush") + // Snapshot metrics: read current RocksDB state, preserved on no-data trigger events. val CUSTOM_METRIC_PINNED_BLOCKS_MEM_USAGE = StateStoreCustomSizeMetric( "rocksdbPinnedBlocksMemoryUsage", - "RocksDB: memory usage for pinned blocks") + "RocksDB: memory usage for pinned blocks", + isSnapshot = true) val CUSTOM_METRIC_NUM_INTERNAL_COL_FAMILIES_KEYS = StateStoreCustomSizeMetric( "rocksdbNumInternalColFamiliesKeys", - "RocksDB: number of internal keys for internal column families") + "RocksDB: number of internal keys for internal column families", + isSnapshot = true) val CUSTOM_METRIC_NUM_EXTERNAL_COL_FAMILIES = StateStoreCustomSizeMetric( "rocksdbNumExternalColumnFamilies", - "RocksDB: number of external column families") + "RocksDB: number of external column families", + isSnapshot = true) val CUSTOM_METRIC_NUM_INTERNAL_COL_FAMILIES = StateStoreCustomSizeMetric( "rocksdbNumInternalColumnFamilies", - "RocksDB: number of internal column families") + "RocksDB: number of internal column families", + isSnapshot = true) - // Total SST file size + // Total SST file size (snapshot). val CUSTOM_METRIC_SST_FILE_SIZE = StateStoreCustomSizeMetric( - "rocksdbSstFileSize", "RocksDB: size of all SST files") + "rocksdbSstFileSize", "RocksDB: size of all SST files", isSnapshot = true) val CUSTOM_METRIC_NUM_SNAPSHOTS_AUTO_REPAIRED = StateStoreCustomSumMetric( "rocksdbNumSnapshotsAutoRepaired", "RocksDB: number of snapshots that were automatically repaired during store load") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index e3601f1ef2246..ad067b8edcc3a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ -537,6 +537,10 @@ trait StateStoreCustomMetric { def desc: String def withNewDesc(desc: String): StateStoreCustomMetric def createSQLMetric(sparkContext: SparkContext): SQLMetric + + // True if the metric reflects current store state (e.g. file size, memory) rather + // than per-batch work; snapshot metrics are preserved on no-data trigger events. + def isSnapshot: Boolean = false } case class StateStoreCustomSumMetric(name: String, desc: String) extends StateStoreCustomMetric { @@ -546,7 +550,10 @@ case class StateStoreCustomSumMetric(name: String, desc: String) extends StateSt SQLMetrics.createMetric(sparkContext, desc) } -case class StateStoreCustomSizeMetric(name: String, desc: String) extends StateStoreCustomMetric { +case class StateStoreCustomSizeMetric( + name: String, + desc: String, + override val isSnapshot: Boolean = false) extends StateStoreCustomMetric { override def withNewDesc(desc: String): StateStoreCustomSizeMetric = copy(desc = desc) override def createSQLMetric(sparkContext: SparkContext): SQLMetric = diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala index da037936849e6..134bfc6b914b4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala @@ -20,15 +20,16 @@ package org.apache.spark.sql.execution.streaming import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.apache.spark.sql.execution.streaming.runtime.MemoryStream +import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider import org.apache.spark.sql.functions.{count, timestamp_seconds, window} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.streaming.{OutputMode, StreamTest, Trigger} +import org.apache.spark.sql.streaming.{OutputMode, StateOperatorProgress, StreamTest, Trigger} import org.apache.spark.sql.streaming.util.StreamManualClock class ProgressReporterSuite extends StreamTest { import testImplicits._ - test("no-data batch resets numRowsRemoved to zero" + + test("no-data batch resets all per-batch StateOperatorProgress fields to zero" + " via resetExecStatsForNoExecution") { val clock = new StreamManualClock val input = MemoryStream[Int] @@ -73,7 +74,7 @@ class ProgressReporterSuite extends StreamTest { .exists(_.stateOperators.head.numRowsRemoved > 0) assert(removed, "Expected numRowsRemoved > 0") }, - // Idle trigger — finishNoExecutionTrigger calls + // Idle trigger: finishNoExecutionTrigger calls // resetExecStatsForNoExecution which must zero out // per-batch metrics. AdvanceManualClock(1 * 1000), @@ -94,10 +95,135 @@ class ProgressReporterSuite extends StreamTest { assert(so.numRowsUpdated === 0, s"numRowsUpdated=${so.numRowsUpdated}") assert(so.numRowsDroppedByWatermark === 0, s"numRowsDroppedByWatermark=${so.numRowsDroppedByWatermark}") + assert(so.allUpdatesTimeMs === 0, + s"allUpdatesTimeMs=${so.allUpdatesTimeMs}") + assert(so.allRemovalsTimeMs === 0, + s"allRemovalsTimeMs=${so.allRemovalsTimeMs}") + assert(so.commitTimeMs === 0, + s"commitTimeMs=${so.commitTimeMs}") } }, StopStream ) } } + + test("SPARK-56537: no-data batch resets per-batch customMetrics but" + + " preserves snapshot customMetrics (RocksDB)") { + val clock = new StreamManualClock + val input = MemoryStream[Int] + val agg = input.toDF() + .select(timestamp_seconds($"value") as "ts", $"value") + .withWatermark("ts", "10 seconds") + .groupBy(window($"ts", "10 seconds")) + .agg(count("*") as "cnt") + .select($"window".getField("start").cast("long"), $"cnt") + + withSQLConf( + SQLConf.STATE_STORE_PROVIDER_CLASS.key -> + classOf[RocksDBStateStoreProvider].getName, + SQLConf.STREAMING_POLLING_DELAY.key -> "0", + SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "0") { + testStream(agg, outputMode = OutputMode.Update)( + StartStream( + Trigger.ProcessingTime("1 second"), + triggerClock = clock), + // Batch 0: real data, populates customMetrics with non-zero per-batch values. + AddData(input, 1, 2, 3), + AdvanceManualClock(1 * 1000), + CheckNewAnswer((0L, 3L)), + // Idle trigger. + AdvanceManualClock(1 * 1000), + Execute("verify customMetrics behavior on idle trigger") { q => + eventually(Timeout(streamingTimeout)) { + val progress = q.recentProgress.filter(_.stateOperators.nonEmpty) + val lastDataIdx = progress.lastIndexWhere { p => + p.durationMs.containsKey("addBatch") + } + assert(lastDataIdx >= 0, "no data batch found") + val idleIdx = progress.indexWhere( + !_.durationMs.containsKey("addBatch"), lastDataIdx + 1) + assert(idleIdx > lastDataIdx, + "no idle trigger found after data batch") + + val dataCm = progress(lastDataIdx).stateOperators.head.customMetrics + val idleCm = progress(idleIdx).stateOperators.head.customMetrics + + // Per-batch RocksDB metrics: zeroed on idle. The metric must be present + // in the map (we keep keys consistent across data and idle progress). + Seq("rocksdbCommitFlushLatency", + "rocksdbPutCount", + "rocksdbTotalBytesWritten").foreach { k => + assert(idleCm.containsKey(k), s"$k missing on idle") + assert(idleCm.get(k) === 0L, + s"per-batch metric $k expected 0 on idle, got ${idleCm.get(k)}") + } + + // Snapshot RocksDB metrics: value unchanged across idle trigger. + Seq("rocksdbPinnedBlocksMemoryUsage", + "rocksdbNumInternalColFamiliesKeys", + "rocksdbNumExternalColumnFamilies", + "rocksdbNumInternalColumnFamilies", + "rocksdbSstFileSize").foreach { k => + assert(idleCm.containsKey(k), s"$k missing on idle") + assert(idleCm.get(k) === dataCm.get(k), + s"snapshot metric $k changed across idle trigger: " + + s"data=${dataCm.get(k)} idle=${idleCm.get(k)}") + } + } + }, + StopStream + ) + } + } + + test("SPARK-56537: copyForNoExecution zeroes per-batch fields and preserves snapshot fields") { + val customMetrics = new java.util.HashMap[String, java.lang.Long]() + customMetrics.put("perBatchTimer", 100L) + customMetrics.put("perBatchCounter", 50L) + customMetrics.put("snapshotSize", 999L) + val orig = new StateOperatorProgress( + operatorName = "op", + numRowsTotal = 50L, + numRowsUpdated = 10L, + allUpdatesTimeMs = 7L, + numRowsRemoved = 3L, + allRemovalsTimeMs = 5L, + commitTimeMs = 11L, + memoryUsedBytes = 2048L, + numRowsDroppedByWatermark = 2L, + numShufflePartitions = 4L, + numStateStoreInstances = 4L, + customMetrics = customMetrics, + snapshotCustomMetricNames = Set("snapshotSize")) + + val out = orig.copyForNoExecution() + + // Per-batch fields are zeroed. + assert(out.numRowsUpdated === 0L) + assert(out.allUpdatesTimeMs === 0L) + assert(out.numRowsRemoved === 0L) + assert(out.allRemovalsTimeMs === 0L) + assert(out.commitTimeMs === 0L) + assert(out.numRowsDroppedByWatermark === 0L) + + // Snapshot fields are preserved. + assert(out.operatorName === "op") + assert(out.numRowsTotal === 50L) + assert(out.memoryUsedBytes === 2048L) + assert(out.numShufflePartitions === 4L) + assert(out.numStateStoreInstances === 4L) + + // customMetrics: per-batch zeroed, snapshot preserved. + assert(out.customMetrics.get("perBatchTimer") === 0L) + assert(out.customMetrics.get("perBatchCounter") === 0L) + assert(out.customMetrics.get("snapshotSize") === 999L) + + // Original is not mutated. + assert(orig.numRowsUpdated === 10L) + assert(orig.customMetrics.get("perBatchTimer") === 100L) + + // snapshotCustomMetricNames is preserved so subsequent copy() round-trips work. + assert(out.snapshotCustomMetricNames === Set("snapshotSize")) + } }