[SPARK-46906][SS] Add a check for stateful operator change for streaming #44927

Closed
wants to merge 11 commits into from

Changes from 2 commits
8 changes: 7 additions & 1 deletion common/utils/src/main/resources/error/error-classes.json
@@ -3317,6 +3317,12 @@
],
"sqlState" : "XXKST"
},
"STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA" : {
"message" : [
"Streaming stateful operator name does not match with the operator in state metadata with the same operator id (id: <operatorId>). Stateful Operator name for current batch: <currentOperatorName>; Operator name in the state metadata: <stateMetadataOperatorName>."
Contributor:
Can we explain to the customer why this occurs? For example: "changing the stateful operator of an existing streaming query is not allowed."

],
"sqlState" : "42K03"
},
"SUM_OF_LIMIT_AND_OFFSET_EXCEEDS_MAX_INT" : {
"message" : [
"The sum of the LIMIT clause and the OFFSET clause must not be greater than the maximum 32-bit integer value (2,147,483,647) but found limit = <limit>, offset = <offset>."
@@ -7443,4 +7449,4 @@
],
"sqlState" : "P0001"
}
}
},
6 changes: 6 additions & 0 deletions docs/sql-error-conditions.md
@@ -2091,6 +2091,12 @@ The checkpoint seems to be only run with older Spark version(s). Run the streami

Query [id = `<id>`, runId = `<runId>`] terminated with exception: `<message>`

### STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA

* [SQLSTATE: 42K03](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)

Streaming stateful operator name does not match with the operator in state metadata with the same operator id (id: `<operatorId>`). Stateful Operator name for current batch: `<currentOperatorName>`; Operator name in the state metadata: `<stateMetadataOperatorName>`.

### SUM_OF_LIMIT_AND_OFFSET_EXCEEDS_MAX_INT

[SQLSTATE: 22003](sql-error-conditions-sqlstates.html#class-22-data-exception)
QueryExecutionErrors.scala
@@ -1702,6 +1702,19 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
new NoSuchElementException("State is either not defined or has already been removed")
}

def statefulOperatorNotMatchInStateMetadataError(
operatorId: Long,
currentOperatorName: String,
stateMetadataOperatorName: String): SparkRuntimeException = {
new SparkRuntimeException(
errorClass = "STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA",
messageParameters = Map(
"operatorId" -> operatorId.toString,
"currentOperatorName" -> currentOperatorName,
"stateMetadataOperatorName" -> stateMetadataOperatorName)
)
}

def cannotSetTimeoutDurationError(): SparkUnsupportedOperationException = {
new SparkUnsupportedOperationException(errorClass = "_LEGACY_ERROR_TEMP_2203")
}
IncrementalExecution.scala
@@ -29,12 +29,13 @@ import org.apache.spark.sql.catalyst.expressions.{CurrentBatchTimestamp, Express
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern._
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.{LocalLimitExec, QueryExecution, SparkPlan, SparkPlanner, UnaryExecNode}
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, MergingSessionsExec, ObjectHashAggregateExec, SortAggregateExec, UpdatingSessionsExec}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
import org.apache.spark.sql.execution.python.FlatMapGroupsInPandasWithStateExec
import org.apache.spark.sql.execution.streaming.sources.WriteToMicroBatchDataSourceV1
import org.apache.spark.sql.execution.streaming.state.OperatorStateMetadataWriter
import org.apache.spark.sql.execution.streaming.state.{OperatorStateMetadataReader, OperatorStateMetadataV1, OperatorStateMetadataWriter}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.util.Utils
@@ -184,6 +185,41 @@ class IncrementalExecution(
}
}

/**
Contributor:
Maybe better to compare at a higher level - construct a map of opId -> operatorName for both the physical plan and the state metadata, and compare the two (a rough sketch is shown below).

With this approach we can also give a better error message for all cases: 1) addition of new stateful operator(s), 2) removal of existing stateful operator(s), 3) replacement of existing stateful operator(s). Case 3) can also happen if someone tries to replace dropDuplicates with dropDuplicatesWithinWatermark after encountering some issue.

Worth noting that we consider the state metadata as the source of truth in this PR - so if there is no state metadata prior to this, it is just the same as letting Spark create a new state metadata file and then doing the comparison (which cannot perform any real check). So the check could also be done after executing the physical planning rules, maybe at the end of state.apply().
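
A rough sketch of the suggested map-based comparison, assuming the state metadata has already been read into an opId -> operatorName map (the standalone helper and its namesInMetadata argument are hypothetical; the PR as written checks each operator individually):

import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.streaming.StateStoreWriter

// Compare the stateful operators in the physical plan against the operator names
// recorded in the state metadata, keyed by operator id. namesInMetadata is assumed
// to have been built elsewhere from the OperatorStateMetadata files (hypothetical).
def checkStatefulOperators(plan: SparkPlan, namesInMetadata: Map[Long, String]): Unit = {
  // opId -> operator short name for every stateful operator in the physical plan.
  val namesInPlan: Map[Long, String] = plan.collect {
    case writer: StateStoreWriter => writer.getStateInfo.operatorId -> writer.shortName
  }.toMap

  // Case 3): the same operator id appears on both sides but with different names.
  namesInPlan.foreach { case (opId, nameInPlan) =>
    namesInMetadata.get(opId).filter(_ != nameInPlan).foreach { nameInMetadata =>
      throw QueryExecutionErrors.statefulOperatorNotMatchInStateMetadataError(
        opId, nameInPlan, nameInMetadata)
    }
  }
  // Cases 1) and 2), added or removed operators, are the ids present on only one
  // side (namesInPlan.keySet diff namesInMetadata.keySet, and vice versa) and
  // could be reported with dedicated error messages.
}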

Contributor Author:
Thanks for the review, Jungtaek! I also like the idea of adding a map.

"So the check could also be done after executing the physical planning rules, maybe at the end of state.apply()"

I tried to add the check after WriteStatefulOperatorMetadataRule, but that would miss the case of an operator being added after a restart (because the additional operator has already been written to the metadata). So I keep the check before WriteStatefulOperatorMetadataRule and omit the check if the metadata is empty.
It is also worth noting that if we did not perform the check (and fail the query) before writing to the metadata, incorrect info would be written to the state metadata.

* Read the existing operator state metadata entry that has the same operator id
* as the current operator, and check whether the operator names match.
* Throw an error if they do not match.
*/
object checkOperatorInMetadata extends SparkPlanPartialRule {
override val rule: PartialFunction[SparkPlan, SparkPlan] = {
case stateStoreWriter: StateStoreWriter if isFirstBatch =>
val opId = stateStoreWriter.getStateInfo.operatorId
try {
val metadataPathToCheck = new Path(checkpointLocation, opId.toString)
logInfo("Reading from operator metadata, check if stateful operator with " +
"the same id from committed batch is the same operator of current stateful operator. " +
s"Stateful operator metadata path to check: ${metadataPathToCheck.toString}")
val operatorMetadata: OperatorStateMetadataV1 = new OperatorStateMetadataReader(
metadataPathToCheck, hadoopConf).read().asInstanceOf[OperatorStateMetadataV1]
val operatorInMetadata = operatorMetadata.operatorInfo.operatorName
if (operatorInMetadata != stateStoreWriter.shortName) {
throw QueryExecutionErrors.statefulOperatorNotMatchInStateMetadataError(
opId, stateStoreWriter.shortName, operatorInMetadata)
}
} catch {
case e: java.io.FileNotFoundException =>
// no need to throw fatal error
logWarning("Error reading metadata path for stateful operator. " +
"This may due to no prior committed batch, or previously run on lower versions. " +
"Trying to read operator metadata for stateful operator " +
s"$opId: ${e.toString}")
case e: Exception =>
throw e
}
stateStoreWriter
}
}

object WriteStatefulOperatorMetadataRule extends SparkPlanPartialRule {
override val rule: PartialFunction[SparkPlan, SparkPlan] = {
case stateStoreWriter: StateStoreWriter if isFirstBatch =>
@@ -389,6 +425,9 @@

override def apply(plan: SparkPlan): SparkPlan = {
val planWithStateOpId = plan transform composedRule
// Check the operator name and fail the query if the operator has changed;
// this rule does not modify the plan.
planWithStateOpId transform checkOperatorInMetadata.rule
// The rule doesn't change the plan but cause the side effect that metadata is written
// in the checkpoint directory of stateful operator.
planWithStateOpId transform WriteStatefulOperatorMetadataRule.rule
statefulOperators.scala
@@ -240,7 +240,7 @@ trait StateStoreWriter extends StatefulOperator with PythonSQLMetrics { self: Sp
}

/** Name to output in [[StateOperatorProgress]] to identify operator type */
protected def shortName: String = "defaultName"
def shortName: String = "defaultName"

/**
* Should the MicroBatchExecution run another batch based on this stateful operator and the
OperatorStateMetadataSuite.scala
@@ -19,12 +19,14 @@ package org.apache.spark.sql.execution.streaming.state

import org.apache.hadoop.fs.Path

import org.apache.spark.SparkRuntimeException
import org.apache.spark.sql.{Column, Row}
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Append
import org.apache.spark.sql.execution.datasources.v2.state.{StateDataSourceUnspecifiedRequiredOption, StateSourceOptions}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, MemoryStream}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{OutputMode, StreamTest}
import org.apache.spark.sql.streaming.OutputMode.Complete
import org.apache.spark.sql.streaming.OutputMode.{Complete, Update}
import org.apache.spark.sql.test.SharedSparkSession


@@ -215,4 +217,93 @@ class OperatorStateMetadataSuite extends StreamTest with SharedSparkSession {
checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601",
Map("optionName" -> StateSourceOptions.PATH))
}

test("Operator metadata path non-existence should not fail query") {
withTempDir { checkpointDir =>
val inputData = MemoryStream[Int]
val aggregated =
inputData.toDF()
.groupBy($"value")
.agg(count("*"))
.as[(Int, Long)]

testStream(aggregated, Complete)(
StartStream(checkpointLocation = checkpointDir.toString),
AddData(inputData, 3),
CheckLastBatch((3, 1)),
StopStream
)

// Delete operator metadata path
val metadataPath = new Path(checkpointDir.toString, s"state/0/_metadata/metadata")
val fm = CheckpointFileManager.create(new Path(checkpointDir.getCanonicalPath), hadoopConf)
fm.delete(metadataPath)

// Restart the query
testStream(aggregated, Complete)(
StartStream(checkpointLocation = checkpointDir.toString),
AddData(inputData, 3),
CheckLastBatch((3, 2)),
StopStream
)
}
}

test("Restarting query - " +
"checking operator name of the same operator id is the same in the metadata") {
withTempDir { checkpointDir =>
val inputData = MemoryStream[Int]
val stream = inputData.toDF().withColumn("eventTime", timestamp_seconds($"value"))

testStream(stream
.withWatermark("eventTime", "10 seconds")
.dropDuplicatesWithinWatermark())(
StartStream(checkpointLocation = checkpointDir.toString),
AddData(inputData, 1),
ProcessAllAvailable(),
StopStream
)

def checkOpChangeError(opName: String, ex: Throwable): Unit = {
checkError(ex.asInstanceOf[SparkRuntimeException],
"STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA", "42K03",
Map("operatorId" -> 0.toString,
"currentOperatorName" -> opName,
"stateMetadataOperatorName" -> "dedupeWithinWatermark")
)
}

testStream(stream.dropDuplicates(), Append)(
StartStream(checkpointLocation = checkpointDir.toString),
AddData(inputData, 2),
ExpectFailure[SparkRuntimeException] {
(t: Throwable) => {
checkOpChangeError("dedupe", t)
}
}
)

testStream(stream.groupBy("value").count(), Update)(
StartStream(checkpointLocation = checkpointDir.toString),
AddData(inputData, 3),
ExpectFailure[SparkRuntimeException] {
(t: Throwable) => {
checkOpChangeError("stateStoreSave", t)
}
}
)

testStream(stream
.groupBy(session_window($"eventTime", "10 seconds").as("session"), $"value")
.agg(count("*").as("numEvents")), Complete)(
StartStream(checkpointLocation = checkpointDir.toString),
AddData(inputData, 3),
ExpectFailure[SparkRuntimeException] {
(t: Throwable) => {
checkOpChangeError("sessionWindowStateStoreSaveExec", t)
}
}
)
}
}
}