
[SPARK-57196][SQL] Make UnionExec whole-stage codegen thread-safe #56252

Closed
gengliangwang wants to merge 2 commits into apache:master from gengliangwang:spark-union-codegen-threadsafe

@gengliangwang
Member

What changes were proposed in this pull request?

UnionExec whole-stage codegen fusion (SPARK-56482) kept per-emission codegen state in mutable instance fields on the plan node: currentEmittingChild (set in doProduce, read in doConsume to pick a child's projection) and numOutputRowsTerm (the once-per-stage metric term). This PR moves both fields to ThreadLocal, isolating the state to the single thread that runs a given doCodeGen pass.
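A minimal sketch of the field change described above, assuming a hypothetical `FusedUnionSketch` class rather than Spark's actual `UnionExec`. The names `projections`, `doProduce`, and `doConsume` only mirror the shape of the real methods; their bodies are illustrative stand-ins.

```scala
// Hypothetical stand-in for a plan node; this is not Spark's UnionExec.
final class FusedUnionSketch(projections: IndexedSeq[String]) {
  // Before the fix this would be `private var currentEmittingChild = -1`,
  // shared by every thread that touches the node. With a ThreadLocal, each
  // thread gets its own slot, initialised to -1.
  private val currentEmittingChild: ThreadLocal[Int] = new ThreadLocal[Int] {
    override def initialValue(): Int = -1
  }

  // Stand-in for doProduce: opens an emission window per child, then closes it.
  def doProduce(): Seq[String] = {
    try {
      projections.indices.map { i =>
        currentEmittingChild.set(i)
        doConsume()
      }
    } finally {
      currentEmittingChild.set(-1) // reset so stale state cannot be reused
    }
  }

  // Stand-in for doConsume: picks the projection of the child being emitted.
  def doConsume(): String = {
    val i = currentEmittingChild.get
    require(i >= 0, "doConsume invoked outside doProduce emission window")
    projections(i)
  }
}
```

Calling `doConsume()` outside a `doProduce()` call fails the `require`, which matches the error message quoted in this PR.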

Why are the changes needed?

A single UnionExec instance can have its whole-stage codegen driven by more than one thread at the same time: a reused exchange/subquery stage is generated concurrently with the main plan, and async subquery / dynamic-partition-pruning execution can overlap a driver-side doCodeGen. With the shared mutable field, a racing doProduce resets currentEmittingChild to -1 while another thread is still inside doConsume, tripping:

java.lang.IllegalArgumentException: requirement failed:
  UnionExec.doConsume invoked outside doProduce emission window

This surfaced as a flaky LogicalPlanTagInSparkPlanSuite.q2 failure (q2 contains a UNION, and union fusion is enabled by default). Each doCodeGen pass is itself single-threaded (produce -> doConsume run inline on one thread), so a ThreadLocal isolates the state per pass without the cross-thread race, while preserving the existing per-stage semantics (the metric term is still computed once per pass).
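The interleaving behind the failure can be forced deterministically with two latches. This is a sketch under stated assumptions: `SharedStateNode` and its `open`/`reset`/`read` methods are hypothetical stand-ins for the pre-fix shared field, not Spark code.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger

object SharedFieldRaceSketch {
  // Unsafe variant: one mutable field shared by all threads (hypothetical node).
  final class SharedStateNode {
    @volatile private var currentEmittingChild: Int = -1
    def open(i: Int): Unit = { currentEmittingChild = i }
    def reset(): Unit = { currentEmittingChild = -1 }
    def read(): Int = currentEmittingChild
  }

  // Forces the bad ordering: thread A opens its window, thread B resets the
  // field, then A reads it. Returns the value A observes.
  def observedByA(): Int = {
    val node = new SharedStateNode
    val opened = new CountDownLatch(1)
    val clobbered = new CountDownLatch(1)
    val seen = new AtomicInteger(Int.MinValue)

    val a = new Thread(() => {
      node.open(0)          // A's doProduce selects child 0
      opened.countDown()
      clobbered.await()     // B runs in this gap
      seen.set(node.read()) // A's doConsume reads the shared field
    })
    val b = new Thread(() => {
      opened.await()
      node.reset()          // B's doProduce finishes and resets to -1
      clobbered.countDown()
    })
    a.start(); b.start()
    a.join(); b.join()
    seen.get
  }
}
```

Thread A observes `-1` even though it set the field to `0`, which is the condition that trips the `require` in `doConsume`.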

Does this PR introduce any user-facing change?

No. It removes an intermittent internal code-generation failure; the generated code and query results are unchanged.

How was this patch tested?

Added a UnionCodegenSuite test, "SPARK-57196: concurrent codegen of a shared UnionExec stage is thread-safe", that drives doCodeGen() on one shared fused UnionExec stage from 8 threads. It reproduces the "outside doProduce emission window" failure on the unpatched code and passes with this fix. Also verified the full UnionCodegenSuite (43 tests), its ANSI/AQE variants, and LogicalPlanTagInSparkPlanSuite q2 all pass.
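The shape of such a concurrency test might look like the following sketch. It does not reproduce the actual `UnionCodegenSuite` test; `Node`, `codegenPass`, and `runConcurrently` are hypothetical names, and the pass body only imitates a produce/consume cycle over per-thread state.

```scala
import java.util.concurrent.{Callable, CountDownLatch, Executors, TimeUnit}

object ConcurrentCodegenSketch {
  // Hypothetical node that keeps its per-pass state in a ThreadLocal.
  final class Node(numChildren: Int) {
    private val current: ThreadLocal[Int] = new ThreadLocal[Int] {
      override def initialValue(): Int = -1
    }

    // One simulated pass: select each child, then read the selection back.
    def codegenPass(): Seq[Int] =
      try {
        (0 until numChildren).map { i =>
          current.set(i)
          Thread.`yield`() // encourage interleaving with other threads
          val seen = current.get
          require(seen >= 0, "doConsume invoked outside doProduce emission window")
          seen
        }
      } finally {
        current.set(-1)
      }
  }

  // Runs passes on one shared node from several threads and returns the
  // number of passes whose output was wrong.
  def runConcurrently(threads: Int, passesPerThread: Int): Int = {
    val node = new Node(4)
    val pool = Executors.newFixedThreadPool(threads)
    val start = new CountDownLatch(1)
    try {
      val futures = (1 to threads).map { _ =>
        pool.submit(new Callable[Int] {
          override def call(): Int = {
            start.await() // release all workers at once
            (1 to passesPerThread).count(_ => node.codegenPass() != Seq(0, 1, 2, 3))
          }
        })
      }
      start.countDown()
      futures.map(_.get(30, TimeUnit.SECONDS)).sum
    } finally {
      pool.shutdownNow()
    }
  }
}
```

With per-thread state, `runConcurrently(8, 200)` should report zero bad passes; a `require` failure in any worker would surface as an `ExecutionException` from `Future.get`.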

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Opus 4.8)

Co-authored-by: Isaac
gengliangwang requested a review from cloud-fan on June 1, 2026, 20:31
@cloud-fan (Contributor) left a comment

0 blocking, 2 non-blocking, 0 nits.
Correct, well-motivated, well-tested thread-safety fix. Two non-blocking design questions only.

Design / architecture (2)

  • basicPhysicalOperators.scala:1004: pass-scoped codegen state stored as per-thread ThreadLocal, while the same method already routes a sibling piece through ctx — why ThreadLocal over ctx-scoped storage? — see inline
  • (general): is UnionExec uniquely exposed, or do other CodegenSupport operators with varying per-emission instance state have the same race — worth a documented invariant?

Verification

Traced the race: numOutputRowsTerm is ctx.addReferenceObj (a per-CodegenContext references[N] term) and currentEmittingChild cycles per child and resets to -1, so both vary per pass and a shared field lets a concurrent doProduce clobber a value another thread reads in doConsume. Each doCodeGen pass runs produce->doConsume inline on one thread and passes don't nest on a thread, so per-thread ThreadLocal == per-pass isolation — the race is removed and the metric stays once-per-stage. doConsume's require(i >= 0) still fires before numOutputRowsTerm.get, so a null term can't leak into generated code. The benign control case (CodegenSupport.parent, also a shared mutable field but always set to the same value) confirms the fix targets exactly the fields that vary.
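The guard-ordering point can be illustrated with a small sketch. It assumes hypothetical names (`GuardOrderSketch`, `withWindow`, `emit`) and a made-up generated-code string; the only fact relied on is that `ThreadLocal.get` returns `null` for a `ThreadLocal[String]` that has not been set on the current thread.

```scala
object GuardOrderSketch {
  private val emittingChild: ThreadLocal[Int] = new ThreadLocal[Int] {
    override def initialValue(): Int = -1
  }
  // get returns null on a thread that has not set a value yet.
  private val metricTerm: ThreadLocal[String] = new ThreadLocal[String]

  // Stand-in for doConsume: the guard runs before the term is read, so a
  // null term can never be spliced into the generated string.
  def emit(): String = {
    val i = emittingChild.get
    require(i >= 0, "doConsume invoked outside doProduce emission window")
    s"${metricTerm.get}.add(1);"
  }

  // Stand-in for the emission window that doProduce opens and closes.
  def withWindow[T](child: Int, term: String)(body: => T): T = {
    emittingChild.set(child)
    metricTerm.set(term)
    try body finally {
      emittingChild.set(-1)
      metricTerm.remove()
    }
  }
}
```

Outside a window, `emit()` fails at the `require` and never reaches `metricTerm.get`.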

// is still in `doConsume`. Each `doCodeGen` pass is itself single-threaded
// (`produce` -> `doConsume` run inline on one thread), so a `ThreadLocal`
// isolates the state per pass without that cross-thread race.
@transient private lazy val numOutputRowsTerm = new ThreadLocal[String]
Contributor


These two fields are pass-scoped codegen state but stored as per-thread ThreadLocal. The framework's established home for pass-scoped codegen state is CodegenContext (INPUT_ROW, currentVars, currentPartitionIndexVar, freshNamePrefix), created fresh per doCodeGen pass — and this very method already routes a sibling piece of pass state, currentPartitionIndexVar, through ctx (saved/restored just below). So two pieces of the same kind of state now use two different mechanisms here. ctx-scoped storage would be exactly per-pass (ThreadLocal's per-thread isolation is only correct because passes don't nest on a thread) and consistent with the existing pattern. The tradeoff — and why this is a question, not a blocker — is that adding union-specific fields to the shared CodegenContext pollutes a class every operator uses, whereas ThreadLocal stays localized here. Could you add a sentence (comment or PR description) on why ThreadLocal was preferred over threading these through ctx?

Member Author


Good point -- you're right that this is per-pass state, and ThreadLocal is correct only because per-pass and per-thread coincide here (a pass runs inline on one thread, and passes don't nest). I've expanded the comment to spell out the scope and the rationale.

The reason I preferred ThreadLocal over ctx: CodegenContext has no general-purpose per-pass attribute map -- currentPartitionIndexVar and friends are named fields. Threading currentEmittingChild / numOutputRowsTerm through ctx would mean adding UnionExec-specific fields to a class every operator instantiates, whereas the ThreadLocal keeps the state local to the one node that needs it. That's the tradeoff you described, and I landed on keeping it localized.

Happy to move it onto ctx instead if you feel the consistency with currentPartitionIndexVar outweighs the pollution -- let me know your preference.
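For contrast, the ctx-scoped alternative discussed in this thread follows a save/restore pattern on a named context field. The sketch below uses a hypothetical `SketchContext` with a single named field; the placeholder value `"partitionIndex"` and the helper `withPartitionIndexVar` are assumptions for illustration, not Spark's `CodegenContext` API.

```scala
object CtxScopedSketch {
  // Hypothetical per-pass context exposing only named fields (no generic map).
  final class SketchContext {
    var currentPartitionIndexVar: String = "partitionIndex" // placeholder value
  }

  // Save the field, override it for the duration of `body`, then restore it.
  def withPartitionIndexVar[T](ctx: SketchContext, v: String)(body: => T): T = {
    val saved = ctx.currentPartitionIndexVar
    ctx.currentPartitionIndexVar = v
    try body finally {
      ctx.currentPartitionIndexVar = saved
    }
  }
}
```

State stored this way is scoped to one context instance, so it is per-pass by construction. The cost named in the discussion is that each new piece of operator-specific state needs its own field on a class every operator shares.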

…cal over ctx

Address review feedback: explain that the state is valid only for one
doCodeGen pass, why per-thread isolation is correct here, and why it is
kept in a ThreadLocal rather than threaded through CodegenContext.

Co-authored-by: Isaac
@cloud-fan (Contributor) left a comment

1 addressed, 1 remaining, 0 new.
The expanded comment cleanly answers my ctx-vs-ThreadLocal question — I'm good with ThreadLocal here: the "ctx has no general-purpose per-pass attribute map, and adding UnionExec-specific fields would pollute a class every operator uses" argument outweighs the consistency-with-currentPartitionIndexVar point. Thanks for documenting it.

Design / architecture (1, non-blocking, remaining)

  • The broader round-1 question is still open: are other CodegenSupport operators that keep varying per-emission state in instance fields exposed to the same race, or is UnionExec's per-child multiplexing genuinely unique? Optional follow-up — not blocking this PR.

Verification

Re-confirmed the documented invariant against source: a doCodeGen pass runs produce->doConsume inline on one thread and passes never nest, so per-thread ThreadLocal == per-pass isolation; require(i >= 0) (1088) still fires before numOutputRowsTerm.get (1097). The three new comment claims all check out (currentPartitionIndexVar save/restore at 1054/1072; CodegenContext exposes only named fields, no general map; reset-guards-stale at 1071).

@gengliangwang
Member Author

@cloud-fan thanks for the review. Merging to master/4.x/4.2

@gengliangwang
Member Author

cc @huaxingao as well

gengliangwang added a commit that referenced this pull request Jun 3, 2026
Closes #56252 from gengliangwang/spark-union-codegen-threadsafe.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
(cherry picked from commit 694c848)
Signed-off-by: Gengliang Wang <gengliang@apache.org>