
[SPARK-57001][SS] Hoist isStateful / containsStatefulOperator onto LogicalPlan #56057

Closed
HeartSaVioR wants to merge 2 commits into apache:master from HeartSaVioR:hoist-isStateful-logicalplan

@HeartSaVioR
Contributor

What changes were proposed in this pull request?

Introduce two new methods on LogicalPlan:

  • def isStateful: Boolean = false -- per-operator declaration of whether the node is a streaming stateful operator (one whose state is kept across microbatches).
  • def containsStatefulOperator: Boolean -- subtree-level check, memoized.

Override isStateful on the operators that are streaming stateful: Aggregate, Join (stream-stream), GlobalLimit, Distinct, Deduplicate, DeduplicateWithinWatermark, FlatMapGroupsWithState, FlatMapGroupsInPandasWithState, TransformWithState, TransformWithStateInPySpark.
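
As a rough illustration of the shape described above, here is a minimal, self-contained sketch. It uses a toy `Plan` trait rather than Catalyst's real `LogicalPlan`, and every class and object name in it (`Plan`, `Relation`, `HoistSketch`, and so on) is hypothetical. It assumes the overrides are gated on the child being streaming, as the `Distinct` override in this PR is.

```scala
// Toy stand-in for Catalyst's LogicalPlan; all names here are illustrative only.
sealed trait Plan {
  def children: Seq[Plan]
  // Streaming-ness is introduced by leaves and bubbles up the tree.
  def isStreaming: Boolean = children.exists(_.isStreaming)
  // Per-node declaration; stateful operators override it.
  def isStateful: Boolean = false
  // Subtree-level check (this node included), evaluated at most once per node.
  private lazy val cachedContainsStateful: Boolean =
    isStateful || children.exists(_.containsStatefulOperator)
  def containsStatefulOperator: Boolean = cachedContainsStateful
}

final case class Relation(streaming: Boolean) extends Plan {
  def children: Seq[Plan] = Nil
  override def isStreaming: Boolean = streaming
}
final case class Filter(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
}
final case class Aggregate(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
  // Only a streaming aggregate keeps state across microbatches.
  override def isStateful: Boolean = child.isStreaming
}
final case class Join(left: Plan, right: Plan) extends Plan {
  def children: Seq[Plan] = Seq(left, right)
  // Only a stream-stream join is stateful.
  override def isStateful: Boolean = left.isStreaming && right.isStreaming
}

object HoistSketch {
  // Batch aggregate: containsStatefulOperator is false.
  val batchAgg: Plan = Aggregate(Relation(streaming = false))
  // Stateless Filter over a streaming aggregate: the subtree check is true.
  val streamAgg: Plan = Filter(Aggregate(Relation(streaming = true)))
  // Stream-static join: not stateful.
  val streamStaticJoin: Plan =
    Join(Relation(streaming = true), Relation(streaming = false))
}
```

In this sketch `HoistSketch.streamAgg.isStateful` is false (the root is a plain `Filter`) while `HoistSketch.streamAgg.containsStatefulOperator` is true, which is the distinction between the two methods.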

Why are the changes needed?

This will serve as a convenient utility for future work. Currently, each rule has to re-derive the stateful-operator check via pattern matching.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

N/A.

Was this patch authored or co-authored using generative AI tooling?

Yes. Generated-by: Claude 4.6 Opus

…o `LogicalPlan`

### What changes were proposed in this pull request?

Introduce two new methods on `LogicalPlan`:

- `def isStateful: Boolean = false` -- per-operator declaration of whether the node
  is a streaming stateful operator (one whose state is kept across microbatches).
- `def containsStatefulOperator: Boolean` -- subtree-level check, memoized.

Override `isStateful` on the operators that are streaming stateful:
`Aggregate`, `Join` (stream-stream), `GlobalLimit`, `Distinct`,
`Deduplicate`, `DeduplicateWithinWatermark`, `FlatMapGroupsWithState`,
`FlatMapGroupsInPandasWithState`, `TransformWithState`,
`TransformWithStateInPySpark`.

### Why are the changes needed?

Several upcoming streaming-side rules (e.g. an optimizer rule that widens
`AttributeReference` nullability around stateful operators) need an
`isStateful` / `containsStatefulOperator` notion on `LogicalPlan` itself
rather than having each rule re-derive the stateful-operator check via
pattern matching.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Existing `UnsupportedOperationCheckerSuite` and streaming test suites
cover the behavior preservation. No new tests are added in this commit;
subsequent PRs that build on `isStateful` will add targeted tests.

### Was this patch authored or co-authored using generative AI tooling?

Yes.
@HeartSaVioR
Contributor Author

cc. @cloud-fan Please take a look, thanks!

Contributor

@cloud-fan cloud-fan left a comment

What this PR does

Adds two methods on LogicalPlan:

  • def isStateful: Boolean = false — per-node predicate, overridden on the ten logical operators that become StateStoreWriters at execution (Aggregate, stream-stream Join, GlobalLimit, Distinct, Deduplicate, DeduplicateWithinWatermark, FlatMapGroupsWithState, FlatMapGroupsInPandasWithState, TransformWithState, TransformWithStateInPySpark), each gated on child.isStreaming (or left.isStreaming && right.isStreaming for Join).
  • def containsStatefulOperator: Boolean — memoized subtree-level OR, backed by a private[this] lazy val.

Design notes

The two-method shape is well-motivated: consumers ask two distinct questions. The per-node question ("is this the stateful op?") is what lets plan.foreach { sub => if (sub.isStateful) ... } and plan.collect { case p if p.isStateful => p } replace ten-arm pattern matches. The subtree question ("does this plan contain any?") is what MicroBatchExecution.disableAQESupportInStatelessIfUnappropriated and SequentialUnionAnalysis care about. isStreaming only ever needs the subtree question (only leaf relations introduce the property), which is why one method suffices there.
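
To make the per-node question concrete, the following sketch contrasts an enumerating pattern match with a predicate-based `collect`. It runs against a toy `Plan` trait, not Catalyst; the `collect` helper is only loosely modelled on `TreeNode.collect`, and `CollectSketch`, `viaPatternMatch`, and `viaPredicate` are hypothetical names. Three operators stand in for the ten in the PR.

```scala
// Toy stand-in for Catalyst's LogicalPlan; all names here are illustrative only.
sealed trait Plan {
  def children: Seq[Plan]
  def isStreaming: Boolean = children.exists(_.isStreaming)
  def isStateful: Boolean = false
  // Pre-order traversal, loosely modelled on TreeNode.collect.
  def collect[B](pf: PartialFunction[Plan, B]): Seq[B] =
    pf.lift(this).toList ++ children.flatMap(_.collect(pf))
}

final case class Relation(streaming: Boolean) extends Plan {
  def children: Seq[Plan] = Nil
  override def isStreaming: Boolean = streaming
}
final case class Project(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
}
final case class Aggregate(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
  override def isStateful: Boolean = child.isStreaming
}
final case class Deduplicate(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
  override def isStateful: Boolean = child.isStreaming
}
final case class GlobalLimit(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
  override def isStateful: Boolean = child.isStreaming
}

object CollectSketch {
  // Before: each rule enumerates the stateful operators itself.
  def viaPatternMatch(plan: Plan): Seq[Plan] = plan.collect[Plan] {
    case a: Aggregate if a.child.isStreaming => a
    case d: Deduplicate if d.child.isStreaming => d
    case l: GlobalLimit if l.child.isStreaming => l
  }

  // After: operators declare statefulness, and the rule only asks.
  def viaPredicate(plan: Plan): Seq[Plan] =
    plan.collect[Plan] { case p if p.isStateful => p }

  val plan: Plan =
    GlobalLimit(Project(Deduplicate(Relation(streaming = true))))
}
```

Both helpers return the `GlobalLimit` and `Deduplicate` nodes for `CollectSketch.plan`, but only the first has to be edited when a new stateful operator is added.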

Coverage check against the physical StateStoreWriter operators (StreamingGlobalLimitExec, StateStoreSaveExec / SessionWindowStateStoreSaveExec from Aggregate, StreamingDeduplicateExec, StreamingDeduplicateWithinWatermarkExec, StreamingSymmetricHashJoinExec, FlatMapGroupsWithStateExec, TransformWithStateExec(InPySpark)): the override list is complete. StreamingLocalLimitExec is correctly excluded (not a StateStoreWriter); UpdateEventTimeColumnExec is correctly excluded (not stateful).

One concern worth addressing in this PR

The new isStateful set matches MicroBatchExecution.containsStatefulOperator exactly, but diverges from UnsupportedOperationChecker.isStatefulOperation in two places: Deduplicate counts as stateful here regardless of whether its keys carry an event-time column, and streaming GlobalLimit is included here but not there. Those two checks aren't really competing — isStatefulOperation is scoped to the chained-watermark-correctness analysis ("ops that can emit late rows"), while the PR's isStateful is the broader runtime "uses a StateStoreWriter" view — but the PR description ("currently we ask each rule to re-derive the stateful-operator check via pattern matching") implies this should replace such pattern matches, which would include isStatefulOperation. Migrating those callers blindly would be a silent semantic change.

The fix is documentation: pin down in the Scaladoc what question this API answers (runtime / StateStoreWriter view) and explicitly note that isStatefulOperation answers a narrower question and is not a straightforward callee for replacement. See the inline comment on LogicalPlan.scala.
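
The divergence can be sketched on a toy model. The code below is not the real `UnsupportedOperationChecker`; the `narrowIsStatefulOperation` function only paraphrases the two differences described above, and the `keysHaveEventTime` flag is a hypothetical simplification of the event-time check on deduplication keys.

```scala
// Toy stand-in for Catalyst's LogicalPlan; all names here are illustrative only.
sealed trait Plan {
  def children: Seq[Plan]
  def isStreaming: Boolean = children.exists(_.isStreaming)
  // Broad runtime view: "becomes a StateStoreWriter at execution".
  def isStateful: Boolean = false
}

final case class Relation(streaming: Boolean) extends Plan {
  def children: Seq[Plan] = Nil
  override def isStreaming: Boolean = streaming
}
final case class Aggregate(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
  override def isStateful: Boolean = child.isStreaming
}
// keysHaveEventTime is a hypothetical flag standing in for the key inspection.
final case class Deduplicate(child: Plan, keysHaveEventTime: Boolean) extends Plan {
  def children: Seq[Plan] = Seq(child)
  override def isStateful: Boolean = child.isStreaming
}
final case class GlobalLimit(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
  override def isStateful: Boolean = child.isStreaming
}

object DivergenceSketch {
  // Narrow view, paraphrasing the analyzer-side check described in this review.
  def narrowIsStatefulOperation(p: Plan): Boolean = p match {
    case a: Aggregate   => a.isStreaming
    case d: Deduplicate => d.isStreaming && d.keysHaveEventTime
    case _              => false // streaming GlobalLimit is deliberately absent
  }

  private val stream = Relation(streaming = true)
  val agg: Plan = Aggregate(stream)
  val dedupNoEventTime: Plan = Deduplicate(stream, keysHaveEventTime = false)
  val limit: Plan = GlobalLimit(stream)

  // Plans on which the two views disagree.
  val disagreements: Seq[Plan] =
    Seq(agg, dedupNoEventTime, limit)
      .filter(p => p.isStateful != narrowIsStatefulOperation(p))
}
```

In this sketch the two views agree on the streaming aggregate and disagree on the other two plans, so swapping one predicate for the other would change behavior for exactly those operators.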

Follow-ups (non-blocking)

  • MicroBatchExecution.containsStatefulOperator (the private def in the AQE-disable check, around line 527) is now exactly equivalent to analyzedPlan.containsStatefulOperator — natural cleanup in a follow-up.
  • A small unit test (batch plan → false; streaming aggregate/dedup/limit/join/etc. → containsStatefulOperator == true; memoization fires once) would help guard the future migrations.
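
A sketch of what such a test could check, written against a toy `Plan` trait instead of Catalyst and plain `assert` instead of a Spark test suite. The per-node `evaluations` counter is a hypothetical hook added here only so the memoization can be observed; the real `LogicalPlan` has no such field.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Toy stand-in for Catalyst's LogicalPlan; all names here are illustrative only.
sealed trait Plan {
  def children: Seq[Plan]
  def isStreaming: Boolean = children.exists(_.isStreaming)
  def isStateful: Boolean = false
  // Hypothetical hook: counts how often the memoized body runs for this node.
  val evaluations = new AtomicInteger(0)
  private lazy val cached: Boolean = {
    evaluations.incrementAndGet()
    isStateful || children.exists(_.containsStatefulOperator)
  }
  def containsStatefulOperator: Boolean = cached
}

final case class Relation(streaming: Boolean) extends Plan {
  def children: Seq[Plan] = Nil
  override def isStreaming: Boolean = streaming
}
final case class Aggregate(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
  override def isStateful: Boolean = child.isStreaming
}
final case class GlobalLimit(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
  override def isStateful: Boolean = child.isStreaming
}
final case class Join(left: Plan, right: Plan) extends Plan {
  def children: Seq[Plan] = Seq(left, right)
  override def isStateful: Boolean = left.isStreaming && right.isStreaming
}

object StatefulChecks {
  def run(): Unit = {
    // Batch plan: nothing is stateful.
    val batch: Plan = Aggregate(Relation(streaming = false))
    assert(!batch.containsStatefulOperator)

    // Streaming plans: every root is stateful.
    val stream = Relation(streaming = true)
    val plans: Seq[Plan] =
      Seq(Aggregate(stream), GlobalLimit(stream), Join(stream, stream))
    plans.foreach(p => assert(p.containsStatefulOperator))

    // Asking again must not re-run the memoized body.
    plans.foreach { p =>
      val again = p.containsStatefulOperator
      assert(again && p.evaluations.get == 1)
    }
  }
}
```

Calling `StatefulChecks.run()` completes silently when all three properties hold.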

/** Marks if a streaming node is a stateful operator. */
def isStateful: Boolean = false

/** Marks if a subplan contains a stateful operator. */
Contributor

Two suggestions for the Scaladoc:

  1. "Marks if" is awkward — these return a boolean rather than marking anything. "Whether …" or "Returns true if …" is more conventional. For containsStatefulOperator, please also say it includes this (the body reads isStateful || children.exists(...)).

  2. More substantively, please nail down what "stateful" means here. The new definition is the streaming-runtime view (any operator that becomes a StateStoreWriter at execution) and matches MicroBatchExecution.containsStatefulOperator exactly. It diverges from UnsupportedOperationChecker.isStatefulOperation on two operators: Deduplicate is stateful here regardless of whether keys carry an event-time column, and streaming GlobalLimit is included here but not there. Calling that out — and noting that isStatefulOperation is intentionally narrower (scoped to the chained-watermark correctness check) and isn't a drop-in replacement target — will keep future PRs from silently swapping callers and changing analyzer semantics. Worth naming which existing checks are intended replacement targets, too.

Contributor Author

For 1, makes sense for the wording - I'll update.

For 2, I think any divergence is mostly a bug. Deduplicate should be marked as stateful regardless of the event-time column. Streaming GlobalLimit should also be marked as stateful, although it's a niche usage.

Contributor Author

OK, it looks like this is not about this PR but about the way we check for stateful operators in UnsupportedOperationChecker. That check is a bit nuanced and not the same as this one, but it would be good to unify them if unification doesn't hurt. That should be a follow-up rather than part of this PR, though.

Contributor Author

Done, e1e208e

As commented above, item 2 isn't addressed in this PR. FYI.

final override val nodePatterns: Seq[TreePattern] = Seq(DISTINCT_LIKE)
override protected def withNewChildInternal(newChild: LogicalPlan): Distinct =
  copy(child = newChild)
override def isStateful: Boolean = child.isStreaming
Contributor

This override is non-obvious at the Distinct layer — Distinct doesn't directly become a StateStoreWriter. The existing comment in UnsupportedOperationChecker.isStatefulOperation explains it: "Since the Distinct node will be replaced to Aggregate in the optimizer rule ReplaceDistinctWithAggregate, here we also need to check all Distinct node by assuming it as Aggregate." Worth preserving that rationale here, or at least a // see ReplaceDistinctWithAggregate one-liner.
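
The rationale can be illustrated with a toy rewrite. The sketch below is not the real `ReplaceDistinctWithAggregate` rule (which also fills in grouping and aggregate expressions); `replaceDistinct` and `DistinctSketch` are hypothetical names on a toy `Plan` trait. It only shows that marking `Distinct` as stateful keeps the answer stable across the rewrite.

```scala
// Toy stand-in for Catalyst's LogicalPlan; all names here are illustrative only.
sealed trait Plan {
  def children: Seq[Plan]
  def isStreaming: Boolean = children.exists(_.isStreaming)
  def isStateful: Boolean = false
}

final case class Relation(streaming: Boolean) extends Plan {
  def children: Seq[Plan] = Nil
  override def isStreaming: Boolean = streaming
}
final case class Aggregate(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
  override def isStateful: Boolean = child.isStreaming
}
final case class Distinct(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
  // See ReplaceDistinctWithAggregate: Distinct is rewritten into Aggregate
  // by the optimizer, so it is treated as one here.
  override def isStateful: Boolean = child.isStreaming
}

object DistinctSketch {
  // Toy version of the rewrite: every Distinct becomes an Aggregate.
  def replaceDistinct(plan: Plan): Plan = plan match {
    case Distinct(child)  => Aggregate(replaceDistinct(child))
    case Aggregate(child) => Aggregate(replaceDistinct(child))
    case r: Relation      => r
  }

  val streamingDistinct: Plan = Distinct(Relation(streaming = true))
  val rewritten: Plan = replaceDistinct(streamingDistinct)
}
```

Here `streamingDistinct.isStateful` and `rewritten.isStateful` are both true, so a rule that runs before the optimizer sees the same answer as one that runs after it.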

Contributor Author

Agreed. It's good to have a comment, since Distinct is not directly converted in physical planning but rather goes through an operator rewrite. Let's add a code comment to briefly explain it.

Contributor Author

Done, e1e208e

@HeartSaVioR
Contributor Author

HeartSaVioR commented May 24, 2026

Just my 2 cents.

The AI comment is very verbose: this PR has only 21 lines of changes, and what I got from the AI is a wall of text of (more than) 50 lines.

Can we at least push the LLM to be concise, or adjust how the comment is posted, so that the PR author can easily skip parts of it? I don't believe the PR author needs to read the whole content; the major part is for future reviewers. Even worse, if I were to generate the PR description with an LLM rather than write it myself, both the PR description and the PR comments would be very verbose, and humans would have to spend time reading through them.

My biggest worry is that, given the cost of reading through a verbose wall of text, PR authors will skip trying to understand PR comments altogether and simply give the PR link to an LLM to address the review comments, making the whole development loop almost human-less. I'm not sure the community is on the same page that this is the way to go.

@HeartSaVioR HeartSaVioR requested a review from cloud-fan May 24, 2026 07:09
@cloud-fan
Contributor

Good point. I need the summary notes to understand the PR, but I don't need to post them, as the information is likely already known to the PR author. Will improve it soon.

Contributor

@cloud-fan cloud-fan left a comment

LGTM, thanks!

@HeartSaVioR
Contributor Author

Thanks for the review!

@HeartSaVioR
Contributor Author

Merging to master/4.x.

HeartSaVioR added a commit that referenced this pull request May 26, 2026

Closes #56057 from HeartSaVioR/hoist-isStateful-logicalplan.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>