[SPARK-46906][SS] Add a check for stateful operator change for streaming #44927
@@ -3317,6 +3317,12 @@
],
"sqlState" : "XXKST"
},
"STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA" : {
"message" : [
"Streaming stateful operator name does not match with the operator in state metadata with the same operator id (id: <operatorId>). Stateful Operator name for current batch: <currentOperatorName>; Operator name in the state metadata: <stateMetadataOperatorName>."
Can we explain to the customer why this occurs? For example: "Changing the stateful operator of an existing streaming query is not allowed."
LGTM
@@ -184,6 +185,41 @@ class IncrementalExecution(
}
}

/**
It may be better to compare at a higher level: construct a map of opId -> operatorName for both the physical plan and the state metadata, and compare the two.
With this approach we can also give a better error message for all cases: 1) addition of new stateful operator(s), 2) removal of existing stateful operator(s), 3) replacement of existing stateful operator(s). Case 3) can also happen if someone tries to replace dropDuplicates with dropDuplicatesWithinWatermark after encountering some issue.
Note that we treat the state metadata as the source of truth in this PR. If there is no prior state metadata, this is the same as letting Spark create a new state metadata file and then doing the comparison (which cannot perform any real check, though). So the check can also be done after executing physical planning rules, maybe at the end of state.apply().
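A minimal sketch of the map comparison suggested above, assuming both sides have already been reduced to plain `Map[Long, String]` values. The `OperatorDiff` type and the `diff` helper are hypothetical names for illustration, not part of Spark or of this PR; the operator names in the usage note are illustrative strings.

```scala
object OperatorMapComparison {
  // Hypothetical result type for this sketch; not part of Spark.
  final case class OperatorDiff(
      added: Map[Long, String],
      removed: Map[Long, String],
      replaced: Map[Long, (String, String)]) {
    def isEmpty: Boolean = added.isEmpty && removed.isEmpty && replaced.isEmpty
  }

  /** Compares opId -> operatorName maps, treating the state metadata as the source of truth. */
  def diff(inMetadata: Map[Long, String], inPlan: Map[Long, String]): OperatorDiff = {
    // Case 1: operator ids present in the plan but unknown to the metadata.
    val added = inPlan.filter { case (id, _) => !inMetadata.contains(id) }
    // Case 2: operator ids recorded in the metadata but missing from the plan.
    val removed = inMetadata.filter { case (id, _) => !inPlan.contains(id) }
    // Case 3: same operator id on both sides, different operator name.
    val replaced = inMetadata.collect {
      case (id, oldName) if inPlan.get(id).exists(_ != oldName) =>
        (id, (oldName, inPlan(id)))
    }
    OperatorDiff(added, removed, replaced)
  }
}
```

For example, `OperatorMapComparison.diff(Map(0L -> "dedupe"), Map(0L -> "dedupeWithinWatermark"))` reports a single replacement for operator id 0, so an error message built from it could name all three kinds of change separately.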
Thanks for the review Jungtaek! I also like the idea of adding a map.
> So the check can also be done after executing physical planning rules, maybe at the end of state.apply()
I tried adding the check after WriteStatefulOperatorMetadataRule, but that misses the case where an operator is added after restart (because the additional operator has already been written to the metadata). So I keep the check before WriteStatefulOperatorMetadataRule and omit it if the metadata is empty.
It is also worth noting that if we do not perform the check and fail the query before writing to the metadata, incorrect information will be written to the state metadata.
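The ordering argument can be sketched with an in-memory stand-in for the state metadata. Everything here (`MetadataStore`, `planBatch`) is a hypothetical simplification, not Spark's API: it only shows that validating before writing keeps a failed restart from overwriting the recorded operators, and that an empty store skips the check.

```scala
object CheckBeforeWriteSketch {
  // Hypothetical in-memory stand-in for the state metadata; not Spark's API.
  final class MetadataStore { var ops: Map[Long, String] = Map.empty }

  /** Validates the plan against existing metadata, then records the plan's operators. */
  def planBatch(store: MetadataStore, opsInPlan: Map[Long, String]): Either[String, Unit] = {
    // Skip the check when there is no prior metadata to compare against.
    if (store.ops.nonEmpty && store.ops != opsInPlan) {
      Left(s"operators changed: metadata=${store.ops}, plan=$opsInPlan")
    } else {
      // Write only after the check has passed, so a failed run leaves the metadata intact.
      store.ops = opsInPlan
      Right(())
    }
  }
}
```

If the write happened first, a newly added operator would already be in the store by the time the comparison ran, and the comparison would trivially succeed.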
Another pass.
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -387,8 +433,29 @@ class IncrementalExecution(
rulesToCompose.reduceLeft { (ruleA, ruleB) => ruleA orElse ruleB }
}

private def checkOperatorValidWithMetadata(): Unit = {
Shall we inline all the logic here, e.g. building opMapInMetadata and opMapInPhysicalPlan? I don't see these fields used anywhere else. Let's keep the scope of fields and methods as narrow as possible.
That said, you don't need a rule to build opMapInPhysicalPlan. Let's just use foreach to traverse the plan and build opMapInPhysicalPlan.
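A rough sketch of the foreach-based traversal, using a hypothetical, simplified plan tree rather than Spark's physical plan classes. The `Node`, `Stateless`, and `Stateful` types are assumptions made so that the example is self-contained; Spark's `TreeNode` offers a comparable pre-order `foreach`.

```scala
import scala.collection.mutable

object PlanTraversalSketch {
  // Hypothetical, simplified plan node with a pre-order foreach.
  sealed trait Node {
    def children: Seq[Node]
    def foreach(f: Node => Unit): Unit = {
      f(this)
      children.foreach(_.foreach(f))
    }
  }
  final case class Stateless(children: Seq[Node]) extends Node
  final case class Stateful(opId: Long, shortName: String, children: Seq[Node]) extends Node

  /** Collects opId -> operator name for every stateful node, with no rule involved. */
  def statefulOps(plan: Node): Map[Long, String] = {
    val ops = mutable.Map.empty[Long, String]
    plan.foreach {
      case s: Stateful => ops(s.opId) = s.shortName
      case _ => ()
    }
    ops.toMap
  }
}
```

Because the map is a local value inside one method, nothing needs to be exposed as a field on the enclosing class.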
val opInCurBatch = opMapInPhysicalPlan.getOrElse(opId, "not found")
if (opInMetadata != opInCurBatch) {
throw QueryExecutionErrors.statefulOperatorNotMatchInStateMetadataError(
opMapInMetadata.values.toSeq,
Shall we print the association between opId and opName in the error message? With operator names alone, it may be hard to tell what is mismatched.
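One way to render the association, sketched under the assumption that the operators are available as a `Map[Long, String]`. The `render` helper is hypothetical; the per-entry format follows the error message quoted later in this thread, and the `", "` separator between entries is an assumption.

```scala
object OperatorMessageSketch {
  /** Renders opId -> opName pairs in a stable order for use in an error message. */
  def render(ops: Map[Long, String]): String =
    ops.toSeq
      .sortBy { case (id, _) => id }
      .map { case (id, name) => s"(OperatorId: $id -> OperatorName: $name)" }
      .mkString("[", ", ", "]")
}
```

Sorting by operator id keeps the message deterministic, which also makes it easier to assert on in tests.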
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
docs/sql-error-conditions.md
[SQLSTATE: 42K03](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)

Streaming stateful operator name does not match with the operator in state metadata. This likely to happen when user changes stateful operator of existing streaming query.
nit: when user adds/removes/changes
...c/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala
StopStream
)

def checkOpChangeError(OpsInMetadataSeq: Seq[String],
nit: if the method definition takes more than two lines, the params should start on the second line of the definition. (In other words, all params should appear at the same indentation.)
https://github.com/databricks/scala-style-guide?tab=readme-ov-file#indent
Also, param names should start with a lowercase letter.
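As an illustration of that layout, here is a hypothetical method (not the one under review, whose full signature is not shown in this thread) with each parameter on its own line, indented four spaces, and named in lowerCamelCase:

```scala
object StyleSketch {
  // Hypothetical helper used only to show the parameter layout.
  def describeOps(
      opsInMetadata: Seq[String],
      opsInCurBatch: Seq[String]): String = {
    s"metadata=${opsInMetadata.mkString(",")}; current=${opsInCurBatch.mkString(",")}"
  }
}
```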
Do we have any automation tool for checking this other than ./dev/scalafmt? That command is listed on the Spark developer tools wiki, but it is quite messy: it touches all existing files instead of formatting only my code change.
Maybe the community has to run the formatter once over the whole codebase. I'm not sure scalafmt can handle all of the style rules, though. It is still good to familiarize yourself with the Databricks Scala style guide; it covers more than the styles that automation can handle.
Got it! Thanks Jungtaek!
...c/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala
}
)

// remove operator
Please split this into a separate test case if it is fully isolated from the other checks.
Btw, this brings up food for thought. Do we disallow a stateful query from becoming stateless? E.g. could you simply test the removal of the stateful operator with checkpointDir rather than spinning up another checkpoint?
It's OK if we have been supporting that case (although undocumented) and keep supporting it. If not, we could just test the case using checkpointDir.
> Do we disallow a stateful query from becoming stateless?
This was not allowed even before adding the operator check: streaming throws an error with the message "state path not found".
> E.g. could you simply test the removal of the stateful operator with checkpointDir rather than spinning up another checkpoint?
Done. Restarting a stateful query as a stateless query now triggers an error with the message:
[STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA] Streaming stateful operator name does not match with the operator in state metadata. This likely to happen when user adds/removes/changes stateful operator of existing streaming query.
Stateful operators in the metadata: [(OperatorId: 0 -> OperatorName: dedupe)]; Stateful operators in current batch: []. SQLSTATE: 42K03
1ae65c4 to 72dfeb9
72dfeb9 to 7ecb493
Thanks Jungtaek for your thorough code review!
+1 with super-nit (assume you'll make a fix). Thanks for the work!
testStream(restartStream, Update)(
StartStream(checkpointLocation = checkpointDir.toString),
AddData(inputData, 3),
ExpectFailure[SparkRuntimeException] { t => {
super nit: the extra {} after { t => is unnecessary. Multiple lines are allowed after =>.
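To illustrate the nit, here is a small sketch with a hypothetical `expectFailure` helper standing in for a test action that takes a failure-checking callback. It is not the `ExpectFailure` used in the test under review; the exception type and message are also made up for the example.

```scala
object LambdaBracesSketch {
  // Hypothetical stand-in for a test action that takes a failure-checking callback.
  def expectFailure(check: Throwable => Unit): Throwable => Unit = check

  // Redundant: an extra block wraps the lambda body.
  val verbose: Throwable => Unit = expectFailure { t => {
    assert(t.isInstanceOf[IllegalStateException])
    assert(t.getMessage.contains("mismatch"))
  }}

  // Preferred: the body may span multiple lines directly after `=>`.
  val concise: Throwable => Unit = expectFailure { t =>
    assert(t.isInstanceOf[IllegalStateException])
    assert(t.getMessage.contains("mismatch"))
  }
}
```

Both forms behave the same; the second simply drops one level of braces.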
+1 pending CI.
@jingz-db Mind retriggering GA? You can either manually do this in your fork or simply push an empty commit to do this automatically. Thanks!
Thanks! Merging to master.
What changes were proposed in this pull request?
Currently, users get a misleading error, org.apache.spark.sql.execution.streaming.state.StateSchemaNotCompatible, if they restart a query from the same checkpoint location after changing its stateful operator. This PR catches such cases and throws a new error with an informative message.
After physical planning and before the execution phase, we read the state metadata for the current operator id to fetch the operator name recorded for the committed batch with the same operator id. If the operator names do not match, we throw the error.
Why are the changes needed?
The current error message is misleading to users. We should provide a message that guides them to the real root cause of the error.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Unit tests.
Was this patch authored or co-authored using generative AI tooling?
No