[SPARK-57003][SQL][SS] Widen stateful operator output and state schema nullability#56061
[SPARK-57003][SQL][SS] Widen stateful operator output and state schema nullability#56061HeartSaVioR wants to merge 9 commits into
Conversation
|
cc. @cloud-fan Please take a look, thanks! |
cloud-fan
left a comment
There was a problem hiding this comment.
Summary
Prior state and problem. A stateful operator's state schema is built from its input attributes, and historically the schema (including nullability) gets recorded on the first batch and re-validated against the input schema on every subsequent batch. IncrementalExecution.optimizedPlan is recomputed every microbatch — same analyzed plan, but the optimizer runs fresh each batch and observes per-batch data state. Rules like PropagateEmptyRelation collapse one branch of a Union when that branch is empty in some microbatch: the surviving branch's nullability becomes the Union's output nullability, propagates into the stateful operator above, and the stateful operator's own child.output-derived output flips with it. Two downstream consequences from that drift: (1) state schema drift triggers STATE_STORE_KEY_SCHEMA_NOT_COMPATIBLE on restart, since the existing equalsIgnoreNameAndCompatibleNullability check rejects nullable→non-nullable narrowing; (2) operators above the stateful op see "non-nullable" output for one batch, codegen skips null checks, then state-restored rows from a prior nullable batch carry actual nulls (NPE).
Design approach. Three independent components, all gated on spark.sql.streaming.statefulOperator.alwaysNullableOutput.enabled (default true, pinned per-query via OffsetSeq at batch 0 so existing queries keep their old behavior):
- (a) Stateful physical execs widen the state key/value schemas they register via
validateAndMaybeEvolveStateSchemaand pass tomapPartitionsWith*StateStoreto fully nullable (outer + nestedasNullable). This stabilizes the on-disk schema. - (b) Stateful logical operators (
Aggregate,Join,Distinct,Deduplicate,DeduplicateWithinWatermark,GlobalLimit,FlatMapGroupsWithState,TransformWithState, etc.) and their physical execs widen their declaredoutputto fully nullable. Drivers above see nullable inputs even if the optimizer would have inferred non-nullable.isStateful/containsStatefulOperatoronLogicalPlan(from the previous commit) provide the gating mechanism. - (c) New optimizer rule
WidenStatefulOperatorAttributeNullabilityruns afterUpdateAttributeNullabilityin both the main optimizer (viaIncrementalExecution.optimizedPlan) and AQE. It bottom-up walks subtrees containing a stateful operator and deep-widensAttributeReferences whoseexprIdmatchesp.output ++ p.children.flatMap(_.output). This catches references that the per-opoutputoverride on its own would not (e.g. nested-struct nullability inside expression bodies, references in ancestorProject/Filter).
Key design decisions.
- Conf pinned via
OffsetSeqat batch 0 (new entry inOffsetSeqMetadata.relevantSQLConfs+relevantSQLConfDefaultValueswith"false"default for pre-existing queries). Restart-safe migration. isStatefullives onLogicalPlanitself rather than a marker trait, so the rule can use a uniformcontainsStatefulOperatorcheck without re-deriving statefulness via pattern matching against the union of stateful types. Tradeoff: tiny generic-API bloat for a streaming-only concept onLogicalPlan. Defensible.- The state schema compatibility check (
StateSchemaCompatibilityChecker) is unchanged — the design relies on both stored and new schemas being widened so the existing strict-nullability check trivially passes. (See the inline comment on the SQLConf doc string.)
Implementation sketch. New file WidenStatefulOperatorAttributeNullability.scala (catalyst/analysis) holds both the helper object WidenStatefulOpNullability (deep-widen + state-schema widen + output widen) and the rule. Stateful logical operators in basicLogicalOperators.scala, object.scala, pythonLogicalOperators.scala get isStateful + output-widening overrides. Stateful execs in statefulOperators.scala, streamingLimits.scala, StreamingSymmetricHashJoinExec.scala, TransformWithStateExec.scala, TransformWithStateInPySparkExec.scala, FlatMapGroupsWithStateExec.scala, FlatMapGroupsInPandasWithStateExec.scala get output overrides; the agg / dedup / join execs also widen the state schemas they register and open. IncrementalExecution.optimizedPlan and AQEOptimizer get the new rule batch. New regression suite StreamingStatefulOperatorNullabilityDriftSuite covers the Union-branch-drop restart for aggregate / dedup / dedup-within-watermark, plus the codegen-NPE struct-key case and rule-level scope / recursion checks.
General notes
- Test coverage gaps. The new drift suite covers Aggregate, Deduplicate, DeduplicateWithinWatermark. Missing union-branch-drop restart cases for: stream-stream
Join,FlatMapGroupsWithState,TransformWithState. The last two are especially worth adding given the component-(a) gap noted inline on the helper Scaladoc — their grouping-key state schemas can still drift. - Import ordering is off in six files. The new
import org.apache.spark.sql.catalyst.analysis.WidenStatefulOpNullabilityis placed afterorg.apache.spark.sql.types.*/org.apache.spark.sql.streaming.*inTransformWithStateExec.scala,TransformWithStateInPySparkExec.scala,FlatMapGroupsWithStateExec.scala,FlatMapGroupsInPandasWithStateExec.scala,StreamingSymmetricHashJoinExec.scala,streamingLimits.scala. Spark convention is alphabetical withinorg.apache.spark.*, so it should go among the otherorg.apache.spark.sql.catalyst.*imports. [SPARK-XXXXX]placeholder. Both commits in this branch still have[SPARK-XXXXX]in the subject; the PR description referencesSPARK-57003. Needs the JIRA ID before merge.
This is a substantive fix for a long-standing streaming / optimizer interaction issue, and the three-component design is sound. Inline comments below cover the more substantive items.
| * - (a) `widenStateSchema`: explicit `asNullable` at every state-schema construction | ||
| * site in each stateful physical exec. |
There was a problem hiding this comment.
Component (a) is described as applying "at every state-schema construction site in each stateful physical exec," but several execs are missing the explicit widening:
FlatMapGroupsWithStateExec.validateAndMaybeEvolveStateSchema(FlatMapGroupsWithStateExec.scala~L203):groupingAttributes.toStructTypeis registered un-widened; and the twoStateStore.get/mapPartitionsWithStateStorecalls indoExecute(~L247-263) open state stores with the un-widened key schema.FlatMapGroupsInPandasWithStateExecinherits the same base, so it has the same gap.TransformWithStateExec:getColFamilySchemas'sdefaultSchema(~L143-145),validateAndMaybeEvolveStateSchema(viavalidateAndWriteStateSchemaat ~L380), and theStateStore.get/mapPartitionsWithStateStorecalls (~L406-417, ~L428-435) all usekeyExpressions.toStructType/keyEncoder.schemaun-widened.TransformWithStateInPySparkExec: same pattern.
Grouping attributes are input-derived and subject to the same nullability drift the rest of the fix is preventing. Component (c) may incidentally widen the references via the logical-plan rewrite, but having component (a) skip these execs makes the defense-in-depth claim of the design false and leaves a real gap if (c) misses for any reason (rule excluded, unresolved subplan, etc.). Either add widenStateSchema(...) at these sites for consistency with StateStoreSaveExec / BaseStreamingDeduplicateExec / StreamingSymmetricHashJoinExec, or tighten the wording here to describe which execs are intentionally exempt and why.
There was a problem hiding this comment.
Let's make sure nullability widening is applied to those operators as well - I thought we did it and looks like we missed it.
| * 1. At a stateful operator: rewrite every `AttributeReference` inside the operator's | ||
| * internal expressions via [[WidenStatefulOpNullability#deepWidenAttribute]] whenever | ||
| * the attribute's `exprId` matches one in the operator's own (already widened via | ||
| * component (b)) `output`. | ||
| * | ||
| * 2. At non-stateful ancestor operators: rewrite `AttributeReference`s whose `exprId` is | ||
| * in `children.flatMap(_.output)` (already widened thanks to component (b)). |
There was a problem hiding this comment.
These two bullets describe a split — (1) "at a stateful operator" matches against "the operator's own ... output", (2) "at non-stateful ancestor operators" matches against children.flatMap(_.output). But the implementation below uses the same union (p.output ++ p.children.flatMap(_.output)) for every node it visits, with no branch on isStateful. Either rewrite this section to describe the actual uniform behavior, or change the code to take different exprId sources for the two cases (the more conservative version would also help with the over-widening concern in the next comment).
There was a problem hiding this comment.
Let's address this as a part of next comment.
| case p: LeafNode => p | ||
| case p if !p.containsStatefulOperator => p | ||
| case p => | ||
| val widenableExprIds: Set[ExprId] = (p.output ++ p.children.flatMap(_.output)) |
There was a problem hiding this comment.
Because widenableExprIds always pulls from both p.output and all of p.children.flatMap(_.output), when an operator has a mix of stateful and non-stateful children (e.g. a non-stream-stream Join above a streaming aggregate on one side and a batch source on the other), references to the non-stateful sibling's attributes are also deep-widened. The docstring above implies this happens only against attributes "already widened thanks to component (b)" — but the non-stateful sibling's attributes are not. The widening is always correctness-safe (nullable is a valid weakening), so this is a docs / over-widening concern, not a bug. Worth either restricting to children whose subtrees contain a stateful operator (p.children.filter(_.containsStatefulOperator).flatMap(_.output)), or acknowledging the over-widening in the comment so future readers don't expect the narrower behavior the docstring promises.
There was a problem hiding this comment.
I think it's more native to restrict the widening to the proper scope, as long as it doesn't trigger major complication. Let's see whether it is simple enough - if not I'll keep the code but update the relevant code comment (and maybe PR description).
| class StateSchemaCompatibilityCheckerWithNullabilityWideningDisabledSuite | ||
| extends StateSchemaCompatibilityCheckerTestMixin { | ||
|
|
||
| private def applyNewSchemaToNestedFieldInValue(newNestedSchema: StructType): StructType = { | ||
| applyNewSchemaToNestedField(valueSchema, newNestedSchema, "value3") | ||
| override protected def sparkConf: org.apache.spark.SparkConf = { | ||
| super.sparkConf.set(SQLConf.STATEFUL_OPERATOR_ALWAYS_NULLABLE_OUTPUT.key, "false") | ||
| } |
There was a problem hiding this comment.
The only thing this suite changes vs the parent is setting STATEFUL_OPERATOR_ALWAYS_NULLABLE_OUTPUT=false. But StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema doesn't read that conf — the schemas it receives are exactly what the test passes in, and no production widening helper is invoked from these tests. The four storing nullable column into non-nullable column ... tests and the two changing the name of nested field ... tests therefore pass identically with the conf at either value. The conf separation is cosmetic.
Two follow-ups:
- The
changing the name of nested field ...pair is unrelated to nullability — there's no reason for these to live in a "NullabilityWideningDisabled" suite. Move them back to the main suite. - For the four nullability tests, the intent appears to be "these are no longer reachable in production with widening on," but the unit tests don't simulate that path — they exercise the checker in isolation. Either consolidate them back into the main suite (they still validate the unchanged checker behavior, which is worth keeping), or rework the setup so the conf actually has an observable effect (e.g. call through the production widening helpers in the test).
There was a problem hiding this comment.
Yeah good finding. StateSchemaCompatibilityChecker used the config during development of this PR and we changed it while the test is left behind. Mostly moving back to main suite (and reverting the refactor) seems valid.
| import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LocalRelation, Project} | ||
| import org.apache.spark.sql.types.IntegerType | ||
|
|
||
| withSQLConf(SQLConf.STATEFUL_OPERATOR_ALWAYS_NULLABLE_OUTPUT.key -> "true") { |
There was a problem hiding this comment.
STATEFUL_OPERATOR_ALWAYS_NULLABLE_OUTPUT.key -> "true" is redundant — true is the default. Either drop the withSQLConf wrapper, or change to "false" and assert the rule no-ops (which would be a useful additional case).
There was a problem hiding this comment.
I'll check whether we have a test for disabling the config - if there is one, we can just remove withSQLConf here. Otherwise obviously we need to have a separate test, with removing the withSQLConf here.
| import org.apache.spark.sql.execution.streaming.state._ | ||
| import org.apache.spark.sql.internal.SQLConf | ||
| import org.apache.spark.sql.streaming._ | ||
| import org.apache.spark.sql.catalyst.analysis.WidenStatefulOpNullability |
There was a problem hiding this comment.
Import out of order — org.apache.spark.sql.catalyst.analysis.WidenStatefulOpNullability should sit alongside the other org.apache.spark.sql.catalyst.* imports near the top of the org.apache.spark.* block, not after org.apache.spark.sql.streaming._. Same issue in TransformWithStateInPySparkExec.scala, FlatMapGroupsWithStateExec.scala, FlatMapGroupsInPandasWithStateExec.scala, StreamingSymmetricHashJoinExec.scala, and streamingLimits.scala.
fda259a to
ad5ac11
Compare
c40fceb to
b7ca2a4
Compare
…onf.scala Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
cloud-fan
left a comment
There was a problem hiding this comment.
Re-review (HEAD f69c587): No new code issues. The two real findings from the prior round are still pending:
FlatMapGroupsWithStateExec/TransformWithStateExecstill pass un-widened state schemas atvalidateAndMaybeEvolveStateSchemaand the executor-sideStateStore.get/mapPartitionsWithStateStore/StateStoreProvider.createAndInitsites (3289376355).WidenStatefulOperatorAttributeNullability.applystill widens attributes from bothp.outputand every child's output, so a node with mixed stateful / non-stateful children would also widen references to the non-stateful side (3289376367).
Both were acknowledged in the prior thread; flagging only to confirm they're not yet in HEAD.
…estrict over-widening, add tests
- Component (a): add widenStateSchema to FlatMapGroupsWithStateExec,
TransformWithStateExec, and TransformWithStateInPySparkExec
(including user-defined state variable col family schemas)
- Component (c): restrict child output widening to stateful subtrees
only at non-stateful ancestors; stateful operators still widen all
children's output for internal expression references
- Revert cosmetic test suite refactor in StateSchemaCompatibilityCheckerSuite
- Drop redundant withSQLConf("true") in drift suite
- Replace non-deterministic current_timestamp() with timestamp_seconds()
- Add union-branch-drop restart tests for stream-stream join,
flatMapGroupsWithState, and transformWithState
- Improve assertJournaledStateSchemaAllNullable to discover all state
stores and schema formats (v2 file, v3 directory, _stateSchema)
The test expected non-existent sub-condition STATE_STORE_KEY_SCHEMA_NOT_COMPATIBLE.NULLABILITY_CHANGED but the actual error uses STATE_STORE_KEY_SCHEMA_NOT_COMPATIBLE without a sub-condition.
|
I've addressed review comments - @cloud-fan would you mind taking another look? Thanks! |
cloud-fan
left a comment
There was a problem hiding this comment.
Re-review status (HEAD e675474): 6 prior findings addressed (3289376355, 3289376359, 3289376397, 3289376398, 3289376402, and the SQLConf doc claim in 3289376372 was rewritten to be accurate), 1 partially addressed (3289376367 — see inline). 2 new (1 late catch I missed last round, 1 follow-up to 3289376367). Approving — the remaining items are doc/comment cleanups that don't block.
| .doc("When true, every streaming stateful operator reports its output schema with " + | ||
| "nullable=true on all columns (including nested struct fields, array elements, and " + | ||
| "map values), the state schema is widened at every construction site, and the state " + | ||
| "schema is widened at every construction site, so the existing state schema " + |
There was a problem hiding this comment.
Apologies — late catch from the prior re-review, the suggestion accept in f69c587 left two copies of the same clause. Line 3448 ends ... the state schema is widened at every construction site, and the state and line 3449 starts schema is widened at every construction site, so .... One should go — likely the intent was to combine the original "state schema is widened" + the new "compatibility check trivially passes" parts into a single sentence.
| "schema is widened at every construction site, so the existing state schema " + | |
| "map values), and the state schema is widened at every construction site, so the " + | |
| "existing state schema compatibility check trivially passes regardless of input " + | |
| "nullability. " + |
There was a problem hiding this comment.
I think this should change L3448 if this is the suggestion? I'll fix it manually.
| p.children.filter(_.containsStatefulOperator).flatMap(_.output) | ||
| } | ||
| val widenableExprIds: Set[ExprId] = | ||
| (p.output ++ childOutputs) |
There was a problem hiding this comment.
Filtering childOutputs to stateful subtrees only tightens half of the union — p.output is still pulled in unconditionally. For an Inner / Outer / Full Join, Join.output = left.output ++ right.output carries the non-stateful side's exprIds, so for a non-stream-stream Join above [stateful, batch], references to the batch side in the join condition still end up in widenableExprIds and still get widened. (For LeftSemi / LeftAnti where Join.output = left.output, the filter does fully help.)
Not a blocker for this PR, but it means the comment at lines 90-91 ("only children whose subtrees contain a stateful operator are included, to avoid unnecessary widening of non-stateful siblings") slightly overstates the scope for mixed-stateful Joins. Two ways to reconcile in a follow-up:
- Tighten the code: compute
statefulExprIdsfrom stateful children's outputs and use(p.output.filter(ar => statefulExprIds.contains(ar.exprId)) ++ statefulExprIds-attrs)so the Inner/Outer-join case is also handled. - Or keep the partial fix and weaken the comment to call out the mixed-stateful
Joincaveat.
Happy with either path.
There was a problem hiding this comment.
Actually, stateless operator does not need to widen the output columns. The reason we widen the output columns for stateful operator is because we enforce the state schema to be nullable and also enforce the output schema to be nullable. We do not do the same with stateless operator. I'm making change.
| "restarts. The effective value is pinned per query via the offset log at batch 0, " + | ||
| "so pre-existing queries keep their original behavior; only newly started queries " + | ||
| "pick this up.") | ||
| .version("4.1.0") |
There was a problem hiding this comment.
self-review: This should be 4.3.0.
…g binding policy - Remove 4 duplicate test names in StateSchemaCompatibilityCheckerSuite (the original suite already had them) - Revert TWS getColFamilySchemas key/value widening: user-defined state variables have composite keys (groupingKey + userKey) where the user's key part should not be widened; the grouping key part is already widened by component (c) at the logical plan level - Add withBindingPolicy(ConfigBindingPolicy.SESSION) to the new config
What changes were proposed in this pull request?
Introduce a three-component fix for stateful-operator nullability drift, gated by
spark.sql.streaming.statefulOperator.alwaysNullableOutput.enabled(pinned per-query via the offset log):WidenStatefulOpNullability.widenStateSchema: every stateful physical exec widens its state key/value schema to fully nullable at construction. This coversStateStoreSaveExec,BaseStreamingDeduplicateExec,StreamingSymmetricHashJoinExec,FlatMapGroupsWithStateExec,TransformWithStateExec(including user-defined state variable col family schemas),TransformWithStateInPySparkExec, andStreamingGlobalLimitExec.WidenStatefulOpNullability.widenOutputForStatefulOp: every stateful logical and physical operator widens its declaredoutputto fully nullable.WidenStatefulOperatorAttributeNullability: an optimizer rule that widensAttributeReferences inside stateful ops' internal expressions and propagates upward through ancestor expressions. The rule usesresolveOperatorsUp(bottom-up) and scopes the widening precisely: at a stateful operator, all children's output is included (for internal expression references like grouping keys); at non-stateful ancestors, only children whose subtrees contain a stateful operator are included, avoiding unnecessary widening of non-stateful siblings. The node's ownp.outputis excluded for non-stateful ancestors because the bottom-up traversal guarantees children are already transformed.With the above fix, we aim to ensure the state schema to be "fully" nullable (top level column, nested column, and collection types) regardless of the input schema, and the output schema of the stateful operator to be also "fully" nullable as well. The change of output schema for stateful operator is necessary, because even if the input schema is non-nullable, state can produce the null value, hence the output can be nullable.
Why are the changes needed?
This has been a long standing issue of streaming engine vs Query Optimizer.
By the nature of streaming query, the query is meant to be long-running, in many cases spans to multiple Spark versions. Also, the logical plan is not always the same across batches (e.g. there are multiple stream sources and one of the source does not have a new data at batch N). This puts the streaming query to be affected by analyzer and optimizer.
The state schema of stateful operator is mostly determined by the input schema of the stateful operator, and nullability isn't an exception. If the input schema has a nullable column, state schema would have a nullable column. Vice versa with non-nullable column.
For Query Optimizer, one of the optimizations is to flip the nullability, say, nullable to non-nullable if appropriate. This can be done directly or indirectly, and the most problematic case is when the optimization is applied "selectively".
The one of easy example is the elimination of Union: for the streaming query with multiple streams using Union, batch N could have one stream be non-empty while another stream to be empty. For that case,
PropagateEmptyRelationcan drop emptyUnionbranches, causing a per-column nullability flip that propagates into a stateful operator's state schema across microbatches or restarts. This causes eitherSTATE_STORE_KEY_SCHEMA_NOT_COMPATIBLEon restart or a codegen NPE when state-restored rows carry nulls in columns declared non-nullable.Does this PR introduce any user-facing change?
No user-visible behavior change for new queries (all stateful operator outputs become nullable, which is semantically correct). Existing queries keep their original behavior via the offset log gate.
How was this patch tested?
New
StreamingStatefulOperatorNullabilityDriftSuitecovering:_stateSchema).deepWidenAttributerecursion into nested types.Was this patch authored or co-authored using generative AI tooling?
Yes. Generated-by: Claude 4.7 Opus