[SPARK-46906][SS] Add a check for stateful operator change for streaming #44927

Closed · wants to merge 11 commits · showing changes from 5 commits
common/utils/src/main/resources/error/error-classes.json (+7, −0)
@@ -3311,6 +3311,13 @@
],
"sqlState" : "42601"
},
"STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA" : {
"message" : [
"Streaming stateful operator name does not match with the operator in state metadata. This likely to happen when user changes stateful operator of existing streaming query.",
jingz-db marked this conversation as resolved.
Show resolved Hide resolved
"Stateful operators in the metadata: [<OpsInMetadataSeq>]; Stateful operators in current batch: [<OpsInCurBatchSeq>]"
],
"sqlState" : "42K03"
},
"STREAM_FAILED" : {
"message" : [
"Query [id = <id>, runId = <runId>] terminated with exception: <message>"
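For reference, the two placeholders in the new `STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA` entry are filled from the parameter map built in `QueryExecutionErrors` below. A minimal sketch of the substitution (operator names are illustrative, taken from the tests later in this PR):

```scala
// Illustrative only: how the message template's placeholders get their values.
val messageParameters = Map(
  "OpsInMetadataSeq" -> Seq("dedupe").mkString(", "),
  "OpsInCurBatchSeq" -> Seq("dedupeWithinWatermark").mkString(", "))
// Rendered (abridged):
//   Stateful operators in the metadata: [dedupe];
//   Stateful operators in current batch: [dedupeWithinWatermark]
```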
docs/sql-error-conditions.md (+7, −0)
@@ -2085,6 +2085,13 @@ The checkpoint seems to be only run with older Spark version(s). Run the streami

'`<optionName>`' must be specified.

### STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA

[SQLSTATE: 42K03](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)

Streaming stateful operator name does not match with the operator in state metadata. This is likely to happen when a user changes the stateful operator of an existing streaming query.

Contributor: nit: when user adds/removes/changes

Stateful operators in the metadata: [`<OpsInMetadataSeq>`]; Stateful operators in current batch: [`<OpsInCurBatchSeq>`]

### STREAM_FAILED

[SQLSTATE: XXKST](sql-error-conditions-sqlstates.html#class-XX-internal-error)
@@ -1702,6 +1702,18 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionErrors
new NoSuchElementException("State is either not defined or has already been removed")
}

  def statefulOperatorNotMatchInStateMetadataError(
      opsInMetadataSeq: Seq[String],
      opsInCurBatchSeq: Seq[String]): SparkRuntimeException = {
    new SparkRuntimeException(
      errorClass = "STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA",
messageParameters = Map(
"OpsInMetadataSeq" -> opsInMetadataSeq.mkString(", "),
"OpsInCurBatchSeq" -> opsInCurBatchSeq.mkString(", "))
)
}

def cannotSetTimeoutDurationError(): SparkUnsupportedOperationException = {
new SparkUnsupportedOperationException(errorClass = "_LEGACY_ERROR_TEMP_2203")
}
@@ -186,7 +186,7 @@ class StateMetadataPartitionReader(
} else Array.empty
}

-  private def allOperatorStateMetadata: Array[OperatorStateMetadata] = {
+  private[sql] def allOperatorStateMetadata: Array[OperatorStateMetadata] = {
val stateDir = new Path(checkpointLocation, "state")
val opIds = fileManager
.list(stateDir, pathNameCanBeParsedAsLongFilter).map(f => pathToLong(f.getPath)).sorted
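Note: `allOperatorStateMetadata` is widened to `private[sql]` so that `IncrementalExecution` (below) can call it when rebuilding the operatorId -> operatorName map from the checkpoint.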
@@ -29,15 +29,17 @@ import org.apache.spark.sql.catalyst.expressions.{CurrentBatchTimestamp, Express
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern._
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.{LocalLimitExec, QueryExecution, SparkPlan, SparkPlanner, UnaryExecNode}
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, MergingSessionsExec, ObjectHashAggregateExec, SortAggregateExec, UpdatingSessionsExec}
import org.apache.spark.sql.execution.datasources.v2.state.StateMetadataPartitionReader
import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
import org.apache.spark.sql.execution.python.FlatMapGroupsInPandasWithStateExec
import org.apache.spark.sql.execution.streaming.sources.WriteToMicroBatchDataSourceV1
-import org.apache.spark.sql.execution.streaming.state.OperatorStateMetadataWriter
+import org.apache.spark.sql.execution.streaming.state.{OperatorStateMetadataV1, OperatorStateMetadataWriter}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.OutputMode
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SerializableConfiguration, Utils}

/**
* A variant of [[QueryExecution]] that allows the execution of the given [[LogicalPlan]]
@@ -82,6 +84,39 @@ class IncrementalExecution(
.map(SQLConf.SHUFFLE_PARTITIONS.valueConverter)
.getOrElse(sparkSession.sessionState.conf.numShufflePartitions)

private def stateCheckpointLocationExists(stateCheckpointLocation: Path): Boolean = {
val fileManager =
CheckpointFileManager.create(stateCheckpointLocation, hadoopConf)
fileManager.exists(stateCheckpointLocation)
}

// A map of all (operatorId -> operatorName) in the state metadata
private lazy val opMapInMetadata: Map[Long, String] = {
var ret = Map.empty[Long, String]
if (stateCheckpointLocationExists(new Path(checkpointLocation))) {
try {
val reader = new StateMetadataPartitionReader(
new Path(checkpointLocation).getParent.toString,
new SerializableConfiguration(hadoopConf))
val opMetadataList = reader.allOperatorStateMetadata
ret = opMetadataList.map { operatorMetadata =>
val metadataInfoV1 = operatorMetadata.asInstanceOf[OperatorStateMetadataV1].operatorInfo
metadataInfoV1.operatorId -> metadataInfoV1.operatorName
}.toMap
} catch {
case e: Exception =>
          // No need to throw a fatal error here; fall back to an empty map.
          logWarning("Error reading metadata path for stateful operator. This may be " +
            "due to no prior committed batch, or the query previously running on " +
            s"older versions: ${e.getMessage}")
}
}
ret
}
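Note (inferred from the metadata writer below and the tests): the operator metadata read here is stored one file per operator at `<checkpointLocation>/<operatorId>/_metadata/metadata`, each holding an `OperatorStateMetadataV1` with the operator's id and name.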

// A map of all stateful operators in the physical plan
private var opMapInPhysicalPlan: Map[Long, String] = Map.empty[Long, String]

/**
* See [SPARK-18339]
* Walk the optimized logical plan and replace CurrentBatchTimestamp
@@ -184,9 +219,20 @@
}
}

object getOpInCurBatchRule extends SparkPlanPartialRule {
override val rule: PartialFunction[SparkPlan, SparkPlan] = {
case stateStoreWriter: StateStoreWriter if isFirstBatch =>
opMapInPhysicalPlan = opMapInPhysicalPlan ++
Map(stateStoreWriter.getStateInfo.operatorId -> stateStoreWriter.shortName)
stateStoreWriter
}
}

object WriteStatefulOperatorMetadataRule extends SparkPlanPartialRule {
override val rule: PartialFunction[SparkPlan, SparkPlan] = {
case stateStoreWriter: StateStoreWriter if isFirstBatch =>
val metadata = stateStoreWriter.operatorStateMetadata()
val metadataWriter = new OperatorStateMetadataWriter(new Path(
checkpointLocation, stateStoreWriter.getStateInfo.operatorId.toString), hadoopConf)
@@ -387,8 +433,29 @@
rulesToCompose.reduceLeft { (ruleA, ruleB) => ruleA orElse ruleB }
}

private def checkOperatorValidWithMetadata(): Unit = {
Contributor: Shall we inline all the logic (e.g. building opMapInMetadata and opMapInPhysicalPlan) here? I don't see these fields used anywhere other than here; let's scope fields and methods as narrowly as possible.

That said, you don't need a rule to build opMapInPhysicalPlan. Let's just use foreach to traverse the plan and build opMapInPhysicalPlan.
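A minimal sketch of what the reviewer suggests (hypothetical helper, not part of this diff):

```scala
// Build the operatorId -> operatorName map with a plain traversal
// instead of a transform rule.
private def collectStatefulOperators(plan: SparkPlan): Map[Long, String] = {
  var ops = Map.empty[Long, String]
  plan.foreach {
    case writer: StateStoreWriter =>
      ops += (writer.getStateInfo.operatorId -> writer.shortName)
    case _ =>
  }
  ops
}
```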

(opMapInMetadata.keySet ++ opMapInPhysicalPlan.keySet).foreach { opId =>
val opInMetadata = opMapInMetadata.getOrElse(opId, "not found")
val opInCurBatch = opMapInPhysicalPlan.getOrElse(opId, "not found")
if (opInMetadata != opInCurBatch) {
throw QueryExecutionErrors.statefulOperatorNotMatchInStateMetadataError(
opMapInMetadata.values.toSeq,
Contributor: Shall we print the association between opId and opName in the error message? It may be hard to understand what is mismatching from the opNames alone.
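A sketch of that suggestion (illustrative; the author's reply further down shows the final message taking this shape):

```scala
// Hypothetical formatting that pairs each operatorId with its name.
opMapInMetadata.map { case (id, name) =>
  s"(OperatorId: $id -> OperatorName: $name)"
}.toSeq
```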

opMapInPhysicalPlan.values.toSeq
)
}
}
}

override def apply(plan: SparkPlan): SparkPlan = {
val planWithStateOpId = plan transform composedRule
    // Collect the stateful operators in the current batch.
    planWithStateOpId transform getOpInCurBatchRule.rule
    // Check before writing the metadata so that an added operator is also detected.
    // Only run the check when the streaming query is restarting (first batch of a
    // restart, i.e. currentBatchId != 0) and the metadata is non-empty.
    if (isFirstBatch && currentBatchId != 0 && opMapInMetadata.nonEmpty) {
      checkOperatorValidWithMetadata()
    }
    // This rule doesn't change the plan, but it has the side effect of writing the
    // metadata to the checkpoint directory of each stateful operator.
planWithStateOpId transform WriteStatefulOperatorMetadataRule.rule
@@ -240,7 +240,7 @@ trait StateStoreWriter extends StatefulOperator with PythonSQLMetrics { self: SparkPlan =>
}

/** Name to output in [[StreamingOperatorProgress]] to identify operator type */
-  protected def shortName: String = "defaultName"
+  def shortName: String = "defaultName"
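Note: `shortName` loses `protected` here so that `IncrementalExecution` can record each operator's name when building `opMapInPhysicalPlan`.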

/**
* Should the MicroBatchExecution run another batch based on this stateful operator and the
@@ -19,16 +19,18 @@ package org.apache.spark.sql.execution.streaming.state

import org.apache.hadoop.fs.Path

import org.apache.spark.SparkRuntimeException
import org.apache.spark.sql.{Column, Row}
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Append
import org.apache.spark.sql.execution.datasources.v2.state.{StateDataSourceUnspecifiedRequiredOption, StateSourceOptions}
-import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, MemoryStream}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{OutputMode, StreamTest}
-import org.apache.spark.sql.streaming.OutputMode.Complete
+import org.apache.spark.sql.streaming.OutputMode.{Complete, Update}
import org.apache.spark.sql.test.SharedSparkSession


class OperatorStateMetadataSuite extends StreamTest with SharedSparkSession {

import testImplicits._

private lazy val hadoopConf = spark.sessionState.newHadoopConf()
@@ -45,7 +47,6 @@ class OperatorStateMetadataSuite extends StreamTest with SharedSparkSession {
assert(operatorMetadata.operatorInfo == expectedMetadata.operatorInfo &&
operatorMetadata.stateStoreInfo.sameElements(expectedMetadata.stateStoreInfo))
}

test("Serialize and deserialize stateful operator metadata") {
withTempDir { checkpointDir =>
val statePath = new Path(checkpointDir.toString, "state/0")
@@ -215,4 +216,117 @@
checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601",
Map("optionName" -> StateSourceOptions.PATH))
}

test("Operator metadata path non-existence should not fail query") {
withTempDir { checkpointDir =>
val inputData = MemoryStream[Int]
val aggregated =
inputData.toDF()
.groupBy($"value")
.agg(count("*"))
.as[(Int, Long)]

testStream(aggregated, Complete)(
StartStream(checkpointLocation = checkpointDir.toString),
AddData(inputData, 3),
CheckLastBatch((3, 1)),
StopStream
)

// Delete operator metadata path
      val metadataPath = new Path(checkpointDir.toString, "state/0/_metadata/metadata")
val fm = CheckpointFileManager.create(new Path(checkpointDir.getCanonicalPath), hadoopConf)
fm.delete(metadataPath)

// Restart the query
testStream(aggregated, Complete)(
StartStream(checkpointLocation = checkpointDir.toString),
AddData(inputData, 3),
CheckLastBatch((3, 2)),
StopStream
)
}
}

test("Changing operator - " +
"replace, add, remove operators will trigger error with debug message") {
jingz-db marked this conversation as resolved.
Show resolved Hide resolved
withTempDir { checkpointDir =>
val inputData = MemoryStream[Int]
val stream = inputData.toDF().withColumn("eventTime", timestamp_seconds($"value"))

testStream(stream.dropDuplicates())(
StartStream(checkpointLocation = checkpointDir.toString),
AddData(inputData, 1),
ProcessAllAvailable(),
StopStream
)

def checkOpChangeError(OpsInMetadataSeq: Seq[String],
Contributor: nit: if you use more than two lines to define a method, the params should start on the second line of the definition (in other words, all params should appear at the same indentation). https://github.com/databricks/scala-style-guide?tab=readme-ov-file#indent Also, params should start with a lowercase letter.

Contributor (author): Do we have any automation tool for checking this other than ./dev/scalafmt? That command is listed on the Spark developer tools wiki, and is actually quite messy: it touches all existing files rather than only formatting my change.

Contributor: Maybe the community has to run the formatter over the whole codebase at once. I'm not sure scalafmt can handle all of the styles, though. It is still good to be familiar with the Databricks Scala style guide; it doesn't only contain styles that automation can handle.

Contributor (author): Got it! Thanks Jungtaek!

OpsInCurBatchSeq: Seq[String],
ex: Throwable): Unit = {
checkError(ex.asInstanceOf[SparkRuntimeException],
"STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA", "42K03",
Map("OpsInMetadataSeq" -> OpsInMetadataSeq.mkString(", "),
"OpsInCurBatchSeq" -> OpsInCurBatchSeq.mkString(", "))
)
}

// replace dropDuplicates with dropDuplicatesWithinWatermark
testStream(stream.withWatermark("eventTime", "10 seconds")
.dropDuplicatesWithinWatermark(), Append)(
StartStream(checkpointLocation = checkpointDir.toString),
AddData(inputData, 2),
ExpectFailure[SparkRuntimeException] {
(t: Throwable) => {
checkOpChangeError(Seq("dedupe"), Seq("dedupeWithinWatermark"), t)
}
}
)

    // replace dedupe with an aggregation
testStream(stream.groupBy("value").count(), Update)(
StartStream(checkpointLocation = checkpointDir.toString),
AddData(inputData, 3),
ExpectFailure[SparkRuntimeException] {
(t: Throwable) => {
checkOpChangeError(Seq("dedupe"), Seq("stateStoreSave"), t)
}
}
)

// add operator
testStream(stream.dropDuplicates()
.groupBy("value").count(), Update)(
StartStream(checkpointLocation = checkpointDir.toString),
AddData(inputData, 3),
ExpectFailure[SparkRuntimeException] {
(t: Throwable) => {
checkOpChangeError(Seq("dedupe"), Seq("stateStoreSave", "dedupe"), t)
}
}
)

// remove operator
Contributor: Please split this into a separate test case if it is fully isolated from the other checks.

Btw, this brings up food for thought: do we disallow a stateful query becoming stateless? E.g. could you simply test the removal of a stateful operator with checkpointDir rather than spinning up another checkpoint? It's OK if we have been supporting the case (although undocumented) and we keep supporting it. If not, we could just test the case using checkpointDir.

Contributor (author):

> Do we disallow stateful query to be stateless?

We don't allow it even before adding this operator check; streaming throws an error with a "state path not found" message.

> E.g. could you simply test the removal of stateful operator with checkpointDir rather than spinning up another checkpoint?

Done. Restarting a stateless query from a stateful query now triggers an error with this message:

[STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA] Streaming stateful operator name does not match with the operator in state metadata. This is likely to happen when a user adds/removes/changes the stateful operator of an existing streaming query.
Stateful operators in the metadata: [(OperatorId: 0 -> OperatorName: dedupe)]; Stateful operators in current batch: []. SQLSTATE: 42K03

withTempDir { newCheckpointDir =>
val inputData = MemoryStream[Int]
val stream = inputData.toDF().withColumn("eventTime", timestamp_seconds($"value"))

testStream(stream.dropDuplicates().groupBy("value").count(), Update)(
StartStream(checkpointLocation = newCheckpointDir.toString),
AddData(inputData, 1),
ProcessAllAvailable(),
StopStream
)
testStream(stream.dropDuplicates(), Update)(
StartStream(checkpointLocation = newCheckpointDir.toString),
AddData(inputData, 3),
ExpectFailure[SparkRuntimeException] {
(t: Throwable) => {
checkOpChangeError(Seq("stateStoreSave", "dedupe"), Seq("dedupe"), t)
}
}
)
}
}
}
}
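For context beyond the test harness, a minimal sketch of the user-level scenario the new check guards against (hypothetical program; assumes a Spark build with this patch, and the checkpoint path and rate source are illustrative):

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical repro: run a dedupe query against a checkpoint, stop it,
// then restart from the same checkpoint with a different stateful operator.
object StatefulOperatorChangeRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]").appName("op-change-repro").getOrCreate()
    import spark.implicits._

    val input = spark.readStream.format("rate")
      .option("rowsPerSecond", "5").load()
    val ckpt = "/tmp/op-change-ckpt"

    // Run 1: stateful dedupe; let at least one batch commit, then stop.
    val q1 = input.select($"value").dropDuplicates()
      .writeStream.format("console").outputMode("update")
      .option("checkpointLocation", ckpt).start()
    Thread.sleep(10000)
    q1.stop()

    // Run 2: same checkpoint, different stateful operator. With this patch the
    // first batch fails with STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA
    // (the metadata records "dedupe", while the new plan has "stateStoreSave").
    val q2 = input.groupBy($"value" % 10).count()
      .writeStream.format("console").outputMode("update")
      .option("checkpointLocation", ckpt).start()
    q2.awaitTermination()
  }
}
```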