Skip to content

[SPARK-56203][SQL] Fix race condition in SortExec.rowSorter#55006

Closed
peter-toth wants to merge 1 commit intoapache:masterfrom
peter-toth:SPARK-56203-fix-sortexec-rowsorter
Closed

[SPARK-56203][SQL] Fix race condition in SortExec.rowSorter#55006
peter-toth wants to merge 1 commit intoapache:masterfrom
peter-toth:SPARK-56203-fix-sortexec-rowsorter

Conversation

@peter-toth
Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

Replace private[sql] var rowSorter: ThreadLocal[UnsafeExternalRowSorter] with @transient private[sql] lazy val rowSorter: ThreadLocal[UnsafeExternalRowSorter] in SortExec.

Remove the rowSorter = new ThreadLocal() reassignment that was inside createSorter().

Why are the changes needed?

SortExec is a shared plan object: the same instance is used by all tasks that execute different partitions of the same stage. In the original code, createSorter() would write rowSorter = new ThreadLocal() — an unsynchronised write to a shared var. If two tasks (threads T0 and T1) called createSorter() concurrently:

  1. T0 writes rowSorter = ThreadLocal_0, sets ThreadLocal_0.set(sorter_0)
  2. T1 writes rowSorter = ThreadLocal_1, sets ThreadLocal_1.set(sorter_1)
  3. T0's cleanupResources() reads rowSorter — now points to ThreadLocal_1 — calls ThreadLocal_1.get() on thread T0 → nullsorter_0 is leaked

With a stable lazy val, the ThreadLocal object is created once (Scala lazy-val initialisation is thread-safe). Every call to createSorter() just calls rowSorter.set(newSorter) on the same object. Because ThreadLocal gives each thread an independent slot, T0's slot and T1's slot are completely separate; neither task can observe or clobber the other's sorter reference.

@transient is required because ThreadLocal is not Serializable; after deserialisation the lazy val re-initialises to a fresh ThreadLocal on first access, which is the correct behaviour on an executor.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing SortSuite tests cover correct sort output and spill behaviour.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Sonnet 4.6

### What changes were proposed in this pull request?

Replace `private[sql] var rowSorter: ThreadLocal[UnsafeExternalRowSorter]`
with `@transient private[sql] lazy val rowSorter: ThreadLocal[UnsafeExternalRowSorter]`
in `SortExec`.

Remove the `rowSorter = new ThreadLocal()` reassignment that was inside
`createSorter()`.

### Why are the changes needed?

`SortExec` is a shared plan object: the same instance is used by all tasks
that execute different partitions of the same stage.  In the original code,
`createSorter()` would write `rowSorter = new ThreadLocal()` — an
unsynchronised write to a shared `var`.  If two tasks (threads T0 and T1)
called `createSorter()` concurrently:

1. T0 writes `rowSorter = ThreadLocal_0`, sets `ThreadLocal_0.set(sorter_0)`
2. T1 writes `rowSorter = ThreadLocal_1`, sets `ThreadLocal_1.set(sorter_1)`
3. T0's `cleanupResources()` reads `rowSorter` — now points to `ThreadLocal_1`
   — calls `ThreadLocal_1.get()` on thread T0 → `null` → `sorter_0` is leaked

With a stable `lazy val`, the `ThreadLocal` object is created once (Scala
lazy-val initialisation is thread-safe).  Every call to `createSorter()` just
calls `rowSorter.set(newSorter)` on the same object.  Because `ThreadLocal`
gives each thread an independent slot, T0's slot and T1's slot are completely
separate; neither task can observe or clobber the other's sorter reference.

`@transient` is required because `ThreadLocal` is not `Serializable`; after
deserialisation the lazy val re-initialises to a fresh `ThreadLocal` on first
access, which is the correct behaviour on an executor.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added `SortSuite` test "cleanupResources is safe when createSorter was never
called" which verifies that `cleanupResources()` is a no-op when called before
any sorter is created (the empty-partition case that previously required a
`rowSorter \!= null` guard on the now-removed reassignable `var`).

Existing `SortSuite` tests cover correct sort output and spill behaviour.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Sonnet 4.6
Copy link
Copy Markdown
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, LGTM.

I guess this was the root cause of SortSuite flakiness, @peter-toth ?

@peter-toth
Copy link
Copy Markdown
Contributor Author

I guess this was the root cause of SortSuite flakiness, @peter-toth ?

Could be, but I found the bug in a different way.

@dongjoon-hyun
Copy link
Copy Markdown
Member

All tests passed. Merged to master for Apache Spark 4.2.0.

@peter-toth
Copy link
Copy Markdown
Contributor Author

Thank you for the prompt review @dongjoon-hyun.

@dongjoon-hyun
Copy link
Copy Markdown
Member

Thank you. Feel free to backport this if you want to use this in the live release branches.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants