Commits (42)
958b491
SPARK-51358 Introduce snapshot upload lag detection through StateStor…
zecookiez Mar 1, 2025
7ffadd8
SPARK-51358 Make test less flaky
zecookiez Mar 1, 2025
3c6a5f9
SPARK-51358 Update logging and event listener init
zecookiez Mar 4, 2025
6056856
SPARK-51358 Remove log
zecookiez Mar 4, 2025
41eaba4
SPARK-51358 Remove setListener
zecookiez Mar 4, 2025
4117326
SPARK-51358 Remove setListener call
zecookiez Mar 4, 2025
d039f73
SPARK-51358 Add additional detail to docstring
zecookiez Mar 6, 2025
cf6da39
SPARK-51358 Fix StateStoreSuite and use log interpolator
zecookiez Mar 7, 2025
e51b491
Merge branch 'master' into SPARK-51358
zecookiez Mar 7, 2025
6ea790f
SPARK-51358 Update coordinator logic, add additional configurations, …
zecookiez Mar 13, 2025
f2b84d4
SPARK-51358 Clean up comments and styling
zecookiez Mar 13, 2025
77aa7db
SPARK-51358 Temporarily add faulty provider for tests
zecookiez Mar 13, 2025
3148211
SPARK-51358 Switch to SparkConf
zecookiez Mar 13, 2025
6ba4dcf
SPARK-51358 Use multipliers for alert thresholds
zecookiez Mar 18, 2025
b0c3e81
Merge branch 'master' into SPARK-51358
zecookiez Mar 18, 2025
11b5343
SPARK-51358 Compare diff to batches instead of relative instances
zecookiez Mar 18, 2025
6ed3366
SPARK-51358 Verify config turns lag reports off properly
zecookiez Mar 18, 2025
8cb4bbf
SPARK-51358 Switch event reporting to an object
zecookiez Mar 19, 2025
8b1fd5b
SPARK-51358 Fix feedback
zecookiez Mar 20, 2025
4ada285
SPARK-51358 Add additional edge cases and HDFS support and tests
zecookiez Mar 21, 2025
3d79a80
SPARK-51358 Make report interval's granularity per query
zecookiez Mar 21, 2025
a100f49
SPARK-51358 Better handling of edge case with wiped out coordinator
zecookiez Mar 21, 2025
2c07bf3
SPARK-51358 Use version requirement as well for lagging stores report
zecookiez Mar 22, 2025
9b7b75e
SPARK-51358 Try separate coordinatorRef
zecookiez Mar 24, 2025
7a3dca4
SPARK-51358 Clean up fix for coordinatorRef
zecookiez Mar 24, 2025
ea73d47
SPARK-51358 Switch to case class for query start
zecookiez Mar 24, 2025
3de7008
SPARK-51358 Add simultaneous query test
zecookiez Mar 25, 2025
b46dc63
Merge branch 'master' into SPARK-51358
zecookiez Mar 25, 2025
70b7a8a
SPARK-51358 Remove additional faulty providers from tests
zecookiez Mar 25, 2025
a69d44e
SPARK-51358 Switch default to version 0 and clean up rest of feedback
zecookiez Mar 26, 2025
29000ec
SPARK-51358 Add additional tests
zecookiez Mar 27, 2025
1723546
Merge branch 'master' into SPARK-51358
zecookiez Apr 2, 2025
9856463
SPARK-51358 Fix case for AvailableNow and repeated restarts
zecookiez Apr 2, 2025
b2a7ccb
SPARK-51358 Add extra tests for HDFS as well
zecookiez Apr 3, 2025
ec585a3
SPARK-51358 Report timestamp when loading snapshot
zecookiez Apr 4, 2025
f2edfa5
Merge branch 'apache:master' into SPARK-51358
zecookiez Apr 4, 2025
1710fe4
SPARK-51358 Bump to retrigger
zecookiez Apr 4, 2025
d8b2184
SPARK-51358 Fix test
zecookiez Apr 4, 2025
00b01da
Merge branch 'master' into SPARK-51358
zecookiez Apr 9, 2025
735b356
SPARK-51358 Fix merge and nits, and rename the event listener to forw…
zecookiez Apr 9, 2025
2d60ea9
SPARK-51358 Clean up and dedupe test code
zecookiez Apr 10, 2025
e4c0cf9
SPARK-51358 Add comment for coordRef entry point
zecookiez Apr 10, 2025
@@ -511,6 +511,7 @@ private[spark] object LogKeys {
case object NUM_ITERATIONS extends LogKey
case object NUM_KAFKA_PULLS extends LogKey
case object NUM_KAFKA_RECORDS_PULLED extends LogKey
case object NUM_LAGGING_STORES extends LogKey
case object NUM_LEADING_SINGULAR_VALUES extends LogKey
case object NUM_LEFT_PARTITION_VALUES extends LogKey
case object NUM_LOADED_ENTRIES extends LogKey
@@ -751,6 +752,9 @@ private[spark] object LogKeys {
case object SLEEP_TIME extends LogKey
case object SLIDE_DURATION extends LogKey
case object SMALLEST_CLUSTER_INDEX extends LogKey
case object SNAPSHOT_EVENT extends LogKey
case object SNAPSHOT_EVENT_TIME_DELTA extends LogKey
case object SNAPSHOT_EVENT_VERSION_DELTA extends LogKey
case object SNAPSHOT_VERSION extends LogKey
case object SOCKET_ADDRESS extends LogKey
case object SOURCE extends LogKey
@@ -2332,6 +2332,70 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val STATE_STORE_COORDINATOR_MULTIPLIER_FOR_MIN_VERSION_DIFF_TO_LOG =
buildConf("spark.sql.streaming.stateStore.multiplierForMinVersionDiffToLog")
.internal()
.doc(
"Determines the version threshold for logging warnings when a state store falls behind. " +
"The coordinator logs a warning when the store's uploaded snapshot version trails the " +
"query's latest version by the configured number of deltas needed to create a snapshot, " +
"times this multiplier."
)
.version("4.1.0")
.longConf
.checkValue(k => k >= 1L, "Must be greater than or equal to 1")
.createWithDefault(5L)

val STATE_STORE_COORDINATOR_MULTIPLIER_FOR_MIN_TIME_DIFF_TO_LOG =
buildConf("spark.sql.streaming.stateStore.multiplierForMinTimeDiffToLog")
.internal()
.doc(
"Determines the time threshold for logging warnings when a state store falls behind. " +
"The coordinator logs a warning when the store's uploaded snapshot timestamp trails the " +

[Inline review thread]
Contributor: Would we also detect the case where it's likely that maintenance runs very infrequently? For example, if the user is running an AvailableNow query with small batch durations?

Contributor Author: Just added a case for this - thanks!

"current time by the configured maintenance interval, times this multiplier."
)
.version("4.1.0")
.longConf
.checkValue(k => k >= 1L, "Must be greater than or equal to 1")
.createWithDefault(10L)

val STATE_STORE_COORDINATOR_REPORT_SNAPSHOT_UPLOAD_LAG =
buildConf("spark.sql.streaming.stateStore.coordinatorReportSnapshotUploadLag")
.internal()
.doc(
"When enabled, the state store coordinator will report state stores whose snapshot " +
"have not been uploaded for some time. See the conf snapshotLagReportInterval for " +
"the minimum time between reports, and the conf multiplierForMinVersionDiffToLog " +
"and multiplierForMinTimeDiffToLog for the logging thresholds."
)
.version("4.1.0")
.booleanConf
.createWithDefault(true)

val STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL =
buildConf("spark.sql.streaming.stateStore.snapshotLagReportInterval")
.internal()
.doc(
"The minimum amount of time between the state store coordinator's reports on " +
"state store instances trailing behind in snapshot uploads."
)
.version("4.1.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(TimeUnit.MINUTES.toMillis(5))

val STATE_STORE_COORDINATOR_MAX_LAGGING_STORES_TO_REPORT =
buildConf("spark.sql.streaming.stateStore.maxLaggingStoresToReport")
.internal()
.doc(
"Maximum number of state stores the coordinator will report as trailing in " +
"snapshot uploads. Stores are selected based on the most lagging behind in " +
"snapshot version."
)
.version("4.1.0")
.intConf
.checkValue(k => k >= 0, "Must be greater than or equal to 0")
.createWithDefault(5)

val FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION =
buildConf("spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion")
.internal()
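
To make the two multiplier confs above concrete, here is a minimal, hypothetical sketch of the thresholds their doc strings describe; it is not the coordinator code in this PR. The parameters `minDeltasForSnapshot` and `maintenanceIntervalMs` stand in for the existing minDeltasForSnapshot and maintenanceInterval settings those doc strings reference.

// Hypothetical sketch only; the actual threshold logic lives in the coordinator.
case class LagThresholds(minVersionDiff: Long, minTimeDiffMs: Long)

def lagThresholds(
    minDeltasForSnapshot: Long,    // existing conf: deltas needed to create a snapshot
    maintenanceIntervalMs: Long,   // existing conf: maintenance interval in ms
    versionMultiplier: Long,       // multiplierForMinVersionDiffToLog
    timeMultiplier: Long           // multiplierForMinTimeDiffToLog
  ): LagThresholds = {
  LagThresholds(
    minVersionDiff = minDeltasForSnapshot * versionMultiplier,
    minTimeDiffMs = maintenanceIntervalMs * timeMultiplier)
}

// A store would be flagged once either bound is exceeded.
def isLagging(
    latestVersion: Long,
    lastUploadedSnapshotVersion: Long,
    lastUploadTimeMs: Long,
    nowMs: Long,
    thresholds: LagThresholds): Boolean = {
  (latestVersion - lastUploadedSnapshotVersion) > thresholds.minVersionDiff ||
    (nowMs - lastUploadTimeMs) > thresholds.minTimeDiffMs
}
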
@@ -5931,6 +5995,21 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
def stateStoreSkipNullsForStreamStreamJoins: Boolean =
getConf(STATE_STORE_SKIP_NULLS_FOR_STREAM_STREAM_JOINS)

def stateStoreCoordinatorMultiplierForMinVersionDiffToLog: Long =
getConf(STATE_STORE_COORDINATOR_MULTIPLIER_FOR_MIN_VERSION_DIFF_TO_LOG)

def stateStoreCoordinatorMultiplierForMinTimeDiffToLog: Long =
getConf(STATE_STORE_COORDINATOR_MULTIPLIER_FOR_MIN_TIME_DIFF_TO_LOG)

def stateStoreCoordinatorReportSnapshotUploadLag: Boolean =
getConf(STATE_STORE_COORDINATOR_REPORT_SNAPSHOT_UPLOAD_LAG)

def stateStoreCoordinatorSnapshotLagReportInterval: Long =
getConf(STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL)

def stateStoreCoordinatorMaxLaggingStoresToReport: Int =
getConf(STATE_STORE_COORDINATOR_MAX_LAGGING_STORES_TO_REPORT)

def checkpointLocation: Option[String] = getConf(CHECKPOINT_LOCATION)

def isUnsupportedOperationCheckEnabled: Boolean = getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED)
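
The snapshotLagReportInterval and maxLaggingStoresToReport accessors above bound how often the coordinator reports and how many stores it names. The following is a hypothetical sketch of that shaping logic under those confs; the store-id representation and function name are illustrative, and the real selection happens inside the coordinator added by this PR.

// Hypothetical sketch: rate-limit reports per query and cap them at the N most
// lagging stores, ordered by how far behind their uploaded snapshot version is.
def selectStoresToReport(
    laggingStores: Seq[(String, Long)],  // (store id, last uploaded snapshot version)
    lastReportTimeMs: Long,
    nowMs: Long,
    reportIntervalMs: Long,              // snapshotLagReportInterval
    maxStoresToReport: Int               // maxLaggingStoresToReport
  ): Seq[(String, Long)] = {
  if (nowMs - lastReportTimeMs < reportIntervalMs) {
    Seq.empty                            // too soon since the last report for this query
  } else {
    laggingStores.sortBy(_._2).take(maxStoresToReport)  // lowest uploaded version first
  }
}
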
@@ -53,7 +53,7 @@ class StreamingQueryManager private[sql] (
with Logging {

private[sql] val stateStoreCoordinator =
StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env)
StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env, sqlConf)
private val listenerBus =
new StreamingQueryListenerBus(Some(sparkSession.sparkContext.listenerBus))

@@ -71,7 +71,8 @@ class IncrementalExecution(
MutableMap[Long, Array[Array[String]]] = MutableMap[Long, Array[Array[String]]](),
val stateSchemaMetadatas: MutableMap[Long, StateSchemaBroadcast] =
MutableMap[Long, StateSchemaBroadcast](),
mode: CommandExecutionMode.Value = CommandExecutionMode.ALL)
mode: CommandExecutionMode.Value = CommandExecutionMode.ALL,
val isTerminatingTrigger: Boolean = false)
extends QueryExecution(sparkSession, logicalPlan, mode = mode) with Logging {

// Modified planner with stateful operations.
@@ -858,7 +858,8 @@ class MicroBatchExecution(
watermarkPropagator,
execCtx.previousContext.isEmpty,
currentStateStoreCkptId,
stateSchemaMetadatas)
stateSchemaMetadatas,
isTerminatingTrigger = trigger.isInstanceOf[AvailableNowTrigger.type])
execCtx.executionPlan.executedPlan // Force the lazy generation of execution plan
}
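
The isTerminatingTrigger flag set above for Trigger.AvailableNow is forwarded to the coordinator via logLaggingStateStores (see the ProgressReporter change below). As a purely hypothetical illustration of why the flag matters (short-lived runs may never reach the maintenance interval, so a wall-clock check alone would be noisy), a coordinator could weight the two lag checks differently. The actual decision logic lives in StateStoreCoordinator and is not part of this excerpt.

// Hypothetical sketch only; names and behavior here are assumptions, not the PR's code.
def shouldWarnForLag(
    versionLag: Long,
    timeLagMs: Long,
    minVersionDiff: Long,
    minTimeDiffMs: Long,
    isTerminatingTrigger: Boolean): Boolean = {
  if (isTerminatingTrigger) {
    // Short-lived AvailableNow runs may never hit the maintenance interval,
    // so rely on version lag instead of wall-clock lag.
    versionLag > minVersionDiff
  } else {
    versionLag > minVersionDiff || timeLagMs > minTimeDiffMs
  }
}
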

@@ -36,6 +36,7 @@ import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, ReportsSinkMetrics, ReportsSourceMetrics, SparkDataStream}
import org.apache.spark.sql.execution.{QueryExecution, StreamSourceAwareSparkPlan}
import org.apache.spark.sql.execution.datasources.v2.{MicroBatchScanExec, StreamingDataSourceV2ScanRelation, StreamWriterCommitProgress}
import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryIdleEvent, QueryProgressEvent}
import org.apache.spark.util.{Clock, Utils}
@@ -61,6 +62,12 @@ class ProgressReporter(
val noDataProgressEventInterval: Long =
sparkSession.sessionState.conf.streamingNoDataProgressEventInterval

val coordinatorReportSnapshotUploadLag: Boolean =
sparkSession.sessionState.conf.stateStoreCoordinatorReportSnapshotUploadLag

val stateStoreCoordinator: StateStoreCoordinatorRef =
sparkSession.sessionState.streamingQueryManager.stateStoreCoordinator

private val timestampFormat =
DateTimeFormatter
.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
@@ -283,6 +290,17 @@ abstract class ProgressContext(
progressReporter.lastNoExecutionProgressEventTime = triggerClock.getTimeMillis()
progressReporter.updateProgress(newProgress)

// Ask the state store coordinator to log all lagging state stores
if (progressReporter.coordinatorReportSnapshotUploadLag) {

[Inline review thread]
Contributor: nit: should this check to see if it is enabled be done in the logLaggingStateStores func?

Contributor Author: We do it on both sides right now - the check is also here for readability and to avoid the overhead of retrieving the coordinator ref and sending the RPC call.

val latestVersion = lastEpochId + 1
progressReporter.stateStoreCoordinator
.logLaggingStateStores(
lastExecution.runId,
latestVersion,
lastExecution.isTerminatingTrigger
)
}

// Update the value since this trigger executes a batch successfully.
this.execStatsOnLatestExecutedBatch = Some(execStats)

@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.streaming.state

import java.io._
import java.util
import java.util.Locale
import java.util.{Locale, UUID}
import java.util.concurrent.atomic.{AtomicLong, LongAdder}

import scala.collection.mutable
@@ -551,6 +551,10 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
val snapshotCurrentVersionMap = readSnapshotFile(version)
if (snapshotCurrentVersionMap.isDefined) {
synchronized { putStateIntoStateCacheMap(version, snapshotCurrentVersionMap.get) }

// Report the loaded snapshot's version to the coordinator
reportSnapshotUploadToCoordinator(version)

return snapshotCurrentVersionMap.get
}

@@ -580,6 +584,10 @@ }
}

synchronized { putStateIntoStateCacheMap(version, resultMap) }

// Report the last available snapshot's version to the coordinator
reportSnapshotUploadToCoordinator(lastAvailableVersion)

resultMap
}

@@ -699,6 +707,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
log"for ${MDC(LogKeys.OP_TYPE, opType)}")
// Compare and update with the version that was just uploaded.
lastUploadedSnapshotVersion.updateAndGet(v => Math.max(version, v))
// Report the snapshot upload event to the coordinator
reportSnapshotUploadToCoordinator(version)
}

/**
@@ -1043,6 +1053,18 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
CompressionCodec.createCodec(sparkConf, storeConf.compressionCodec),
keySchema, valueSchema)
}

/** Reports to the coordinator the store's latest snapshot version */
private def reportSnapshotUploadToCoordinator(version: Long): Unit = {
if (storeConf.reportSnapshotUploadLag) {
// Attach the query run ID and current timestamp to the RPC message
val runId = UUID.fromString(StateStoreProvider.getRunId(hadoopConf))
val currentTimestamp = System.currentTimeMillis()
StateStoreProvider.coordinatorRef.foreach(
_.snapshotUploaded(StateStoreProviderId(stateStoreId, runId), version, currentTimestamp)
)
}
}
}

/** [[StateStoreChangeDataReader]] implementation for [[HDFSBackedStateStoreProvider]] */
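
The snapshotUploaded call above sends a (provider id, version, timestamp) message to the coordinator. The coordinator-side changes are not included in this excerpt, so the following is only a minimal sketch of the bookkeeping such a message implies; the class and method names are illustrative.

import scala.collection.mutable

// Hypothetical sketch of per-provider tracking on the coordinator side.
case class SnapshotUploadInfo(version: Long, timestampMs: Long)

class SnapshotUploadTracker[ProviderId] {
  private val latestUploads = mutable.Map.empty[ProviderId, SnapshotUploadInfo]

  // Keep only the newest snapshot version reported for each provider.
  def snapshotUploaded(id: ProviderId, version: Long, timestampMs: Long): Unit =
    synchronized {
      if (latestUploads.get(id).forall(_.version < version)) {
        latestUploads(id) = SnapshotUploadInfo(version, timestampMs)
      }
    }

  def lastUpload(id: ProviderId): Option[SnapshotUploadInfo] =
    synchronized { latestUploads.get(id) }
}
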
@@ -64,6 +64,7 @@ case object StoreTaskCompletionListener extends RocksDBOpType("store_task_comple
* @param stateStoreId StateStoreId for the state store
* @param localRootDir Root directory on local disk that is used for the working and checkpointing dirs
* @param hadoopConf Hadoop configuration for talking to the remote file system
* @param eventForwarder The RocksDBEventForwarder object for reporting events to the coordinator
*/
class RocksDB(
dfsRootDir: String,
@@ -73,7 +74,8 @@ class RocksDB(
loggingId: String = "",
useColumnFamilies: Boolean = false,
enableStateStoreCheckpointIds: Boolean = false,
partitionId: Int = 0) extends Logging {
partitionId: Int = 0,
eventForwarder: Option[RocksDBEventForwarder] = None) extends Logging {

import RocksDB._

@@ -403,6 +405,9 @@ class RocksDB(
// Initialize maxVersion upon successful load from DFS
fileManager.setMaxSeenVersion(version)

// Report this snapshot version to the coordinator
reportSnapshotUploadToCoordinator(latestSnapshotVersion)

openLocalRocksDB(metadata)

if (loadedVersion != version) {
@@ -480,6 +485,9 @@ class RocksDB(
// Initialize maxVersion upon successful load from DFS
fileManager.setMaxSeenVersion(version)

// Report this snapshot version to the coordinator
reportSnapshotUploadToCoordinator(latestSnapshotVersion)

openLocalRocksDB(metadata)

if (loadedVersion != version) {
@@ -617,6 +625,8 @@ class RocksDB(
loadedVersion = -1 // invalidate loaded data
throw t
}
// Report this snapshot version to the coordinator
reportSnapshotUploadToCoordinator(snapshotVersion)
this
}

@@ -1495,13 +1505,25 @@ class RocksDB(
log"Current lineage: ${MDC(LogKeys.LINEAGE, lineageManager)}")
// Compare and update with the version that was just uploaded.
lastUploadedSnapshotVersion.updateAndGet(v => Math.max(snapshot.version, v))
// Report snapshot upload event to the coordinator.
reportSnapshotUploadToCoordinator(snapshot.version)
} finally {
snapshot.close()
}

fileManagerMetrics
}

/** Reports to the coordinator, through the event forwarder, that a snapshot finished uploading */
private def reportSnapshotUploadToCoordinator(version: Long): Unit = {
if (conf.reportSnapshotUploadLag) {
// Note that we still report snapshot versions even when changelog checkpointing is disabled.
// The coordinator needs a way to determine whether upload messages are disabled or not,
// which would be different between RocksDB and HDFS stores due to changelog checkpointing.
eventForwarder.foreach(_.reportSnapshotUploaded(version))
}
}

/** Create a native RocksDB logger that forwards native logs to log4j with correct log levels. */
private def createLogger(): Logger = {
val dbLogger = new Logger(rocksDbOptions.infoLogLevel()) {
@@ -1768,7 +1790,8 @@ case class RocksDBConf(
highPriorityPoolRatio: Double,
compressionCodec: String,
allowFAllocate: Boolean,
compression: String)
compression: String,
reportSnapshotUploadLag: Boolean)

object RocksDBConf {
/** Common prefix of all confs in SQLConf that affects RocksDB */
@@ -1951,7 +1974,8 @@ object RocksDBConf {
getRatioConf(HIGH_PRIORITY_POOL_RATIO_CONF),
storeConf.compressionCodec,
getBooleanConf(ALLOW_FALLOCATE_CONF),
getStringConf(COMPRESSION_CONF))
getStringConf(COMPRESSION_CONF),
storeConf.reportSnapshotUploadLag)
}

def apply(): RocksDBConf = apply(new StateStoreConf())
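
Unlike the HDFS-backed provider, RocksDB reports through the eventForwarder parameter rather than calling the coordinator directly. The forwarder class added by this PR is not shown in this excerpt; assuming it mirrors the provider's reportSnapshotUploadToCoordinator pattern above, it could look roughly like the sketch below. Only reportSnapshotUploaded(version) is taken from the diff; the class name, constructor, and placement are assumptions.

import java.util.UUID

// Hypothetical sketch; the real RocksDBEventForwarder in this PR may differ.
// Assumes it lives in org.apache.spark.sql.execution.streaming.state alongside the providers.
class RocksDBEventForwarderSketch(queryRunId: UUID, stateStoreId: StateStoreId) {
  def reportSnapshotUploaded(version: Long): Unit = {
    // Same pattern as HDFSBackedStateStoreProvider: tag the store with the query run ID,
    // timestamp the event, and send it to the coordinator if one is available.
    val providerId = StateStoreProviderId(stateStoreId, queryRunId)
    StateStoreProvider.coordinatorRef.foreach(
      _.snapshotUploaded(providerId, version, System.currentTimeMillis()))
  }
}
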