Skip to content

[SPARK-56410][SQL][CORE] Add bounded k-way merge support in UnsafeExternalSorter to reduce OOM risk#55275

Closed
ivoson wants to merge 13 commits into
apache:masterfrom
ivoson:SPARK-56410-k-way-merge
Closed

[SPARK-56410][SQL][CORE] Add bounded k-way merge support in UnsafeExternalSorter to reduce OOM risk#55275
ivoson wants to merge 13 commits into
apache:masterfrom
ivoson:SPARK-56410-k-way-merge

Conversation

@ivoson
Copy link
Copy Markdown
Contributor

@ivoson ivoson commented Apr 9, 2026

What changes were proposed in this pull request?

Added bounded multi-round k-way merge to UnsafeExternalSorter to prevent OOM during sort-merge when there are many spill files. Previously, getSortedIterator() opened all spill readers simultaneously (~3MB per reader), causing OOM with hundreds of spills.

  • New UnsafeSorterBoundedSpillMerger class that merges spill files in rounds of at most K files (configurable merge factor, default 64), writing intermediate results to temp spill files, then merging those until the count fits in a single final round.
  • New internal config spark.unsafe.sorter.spill.merge.factor (default 64, set -1 to disable) that controls the maximum number of concurrent spill readers during merge. At 64 readers x 3MB = ~192MB, well within typical executor heap sizes.
  • Added merge observability logging — logs spill count, merge factor, and round information at merge time to aid future debugging.

How It Works

For example with 680 spills:

                    BEFORE (Current Behavior - OOM)
                    ================================

  680 spill files opened ALL AT ONCE
  +------+ +------+ +------+ +------+       +------+
  |Spill | |Spill | |Spill | |Spill |  ...  |Spill |
  |  1   | |  2   | |  3   | |  4   |       | 680  |
  +--+---+ +--+---+ +--+---+ +--+---+       +--+---+
     |        |        |        |               |
     |  3MB   |  3MB   |  3MB   |  3MB    3MB   |
     v        v        v        v               v
  +-----------------------------------------------------+
  |          PriorityQueue (680 readers)                 |
  |          680 x 3MB = ~2 GB buffers                  |
  |                   OOM!                               |
  +-----------------------------------------------------+


                    AFTER (Bounded Merge - Safe)
                    ============================

  Example: 680 spill files, merge factor K = 64

  --- Round 1: merge groups of 64 ----------------------

  Group 1 (64 files)    Group 2 (64 files)       Group 11 (remaining)
  +--++--+    +--+     +--++--+    +---+         +--++--+  +---+
  |S1||S2|... |S64|    |S65||S66|...|S128|  ...  |  ||  |..|680|
  +-++ +-+    +-++     +-++-+-+    +-+-+         +-++-++  +-+-+
    |   |       |        |   |       |             |   |     |
    v   v       v        v   v       v             v   v     v
  +--------------+     +--------------+          +--------------+
  | Merge (<=64  |     | Merge (<=64  |          | Merge (<=64  |
  |  readers)    |     |  readers)    |          |  readers)    |
  | ~192MB max   |     | ~192MB max   |          | ~192MB max   |
  +------+-------+     +------+-------+          +------+-------+
         |                    |                         |
         v                    v                         v
     +--------+          +--------+                +--------+
     |Temp    |          |Temp    |                |Temp    |
     |File 1  |          |File 2  |       ...      |File 11 |
     +--------+          +--------+                +--------+

  --- Round 2 (Final): 11 files <= 64, merge directly ---

     +--------+ +--------+           +--------+  +---------+
     |Temp    | |Temp    |    ...    |Temp    |  |In-Memory|
     |File 1  | |File 2  |           |File 11 |  |  Data   |
     +---+----+ +---+----+           +---+----+  +----+----+
         |          |                    |             |
         v          v                    v             v
     +------------------------------------------------------+
     |        Final PriorityQueue (<= 12 readers)           |
     |         12 x 3MB = ~36 MB  Safe!                     |
     +------------------------+-----------------------------+
                              |
                              v
                      +---------------+
                      | Sorted Output |
                      +---------------+


  --- Memory Comparison ------------------------------------

  Before:  680 readers x 3MB = ~2,040 MB  ->  OOM
  After:    64 readers x 3MB =   ~192 MB  ->  Safe

Why are the changes needed?

When UnsafeExternalSorter accumulates a large number of spill files, the merge phase opens all spill readers simultaneously. Each UnsafeSorterSpillReader allocates ~3MB of buffers (1MB NioBufferedFileInputStream + 1MB ReadAheadInputStream + 1MB record byte array). With hundreds of spills, this means ~GB+ of merge buffers alone, causing OOM even when the executor has sufficient heap for normal operation.

Does this PR introduce any user-facing change?

No

How was this patch tested?

UTs added.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (claude-opus-4-6)

@ivoson ivoson force-pushed the SPARK-56410-k-way-merge branch from 36c11d4 to 5c2a13c Compare April 13, 2026 02:45
@ivoson ivoson force-pushed the SPARK-56410-k-way-merge branch 3 times, most recently from cbf2183 to 1bb0a4a Compare April 13, 2026 05:37
@ivoson ivoson force-pushed the SPARK-56410-k-way-merge branch from 1bb0a4a to 43e6d8f Compare April 13, 2026 05:48
@ivoson ivoson marked this pull request as ready for review April 13, 2026 06:00
@ivoson ivoson changed the title [WIP][SPARK-56410][SQL][CORE] Add bounded k-way merge support in UnsafeExternalSorter to reduce OOM risk [SPARK-56410][SQL][CORE] Add bounded k-way merge support in UnsafeExternalSorter to reduce OOM risk Apr 13, 2026
@ivoson
Copy link
Copy Markdown
Contributor Author

ivoson commented Apr 13, 2026

cc @mridulm @attilapiros @LuciferYang @Ngone51 @cloud-fan @sadikovi can you please review this PR? Thanks

@LuciferYang
Copy link
Copy Markdown
Contributor

also cc @pan3793

private long totalSortTimeNanos = 0L;
private volatile SpillableIterator readingIterator = null;

private int spillMergeFactor =
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reading config from SparkEnv.get() in a field initializer is fragile — SparkEnv may not be initialized yet, silently disabling the feature with the -1 fallback. This is atypical for Spark.

The predominant pattern in comparable classes is to pass SparkConf (or resolved values) through the constructor:

  • BypassMergeSortShuffleWriter — receives SparkConf as constructor param, reads config in constructor body.
  • ShuffleExternalSorter — receives SparkConf as constructor param, reads config in constructor body.
  • UnsafeExternalSorter itself — its other config values (initialSize, pageSizeBytes, numElementsForSpillThreshold, etc.) are all passed as constructor params by callers like UnsafeExternalRowSorter and UnsafeKVExternalSorter, which read SparkEnv.get().conf() at the call site.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, move it as a constructor parameter.

}

@Test
public void testBoundedMergeOddSpillCountWithCarryForward() throws Exception {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: seems all the tests only verify that the sorted output is correct, but none confirm whether the multi-round merge logic was actually triggered (as opposed to the single-round merge path). Is there any way to verify this?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add merge round verification.

Copy link
Copy Markdown
Contributor

@cloud-fan cloud-fan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Summary

Prior state: UnsafeExternalSorter.getSortedIterator() performed a single-round merge opening all spill readers simultaneously. Each UnsafeSorterSpillReader allocates ~3MB (1MB NioBufferedFileInputStream + 1MB ReadAheadInputStream + 1MB record byte array). Hundreds of spills → OOM.

Design approach: Bounded multi-round k-way merge processing at most mergeFactor (default 64) spill files per round, writing intermediate results to temp spill files. Bounds merge-phase memory to ~192MB regardless of spill count.

Key decisions: (1) Merge factor defaults to 64, configurable via internal config spark.unsafe.sorter.spill.merge.factor. (2) Original spill files are not deleted during intermediate rounds — ownership stays with UnsafeExternalSorter.

Implementation: New UnsafeSorterBoundedSpillMerger orchestrates multi-round merge, delegating each round to UnsafeSorterSpillMerger. UnsafeExternalSorter.getSortedIterator() adds a bounded-merge branch when spill count exceeds the factor. Cleanup integrates into cleanupResources().

round++;
List<UnsafeSorterSpillWriter> nextRoundSpills = new ArrayList<>();

logger.info("Bounded merge round {}: merging {} spill files with merge factor {}",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The round-level log shows spill count and merge factor but not the bytes of I/O incurred. When users tune the merge factor to balance memory vs. I/O, knowing the intermediate write cost per round would help diagnose performance. Consider accumulating the bytes written across groups in each round (the data is available from the ShuffleWriteMetrics in mergeGroupToSpill) and including it in this log message.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated, logs added for disk io.

Copy link
Copy Markdown
Contributor

@cloud-fan cloud-fan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. All prior review comments addressed — eager file deletion, constructor-based config, per-round I/O logging, and merge round test verification all look good.

Copy link
Copy Markdown
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's great to have this. Thank you, @ivoson , @cloud-fan , @LuciferYang , @pan3793 .

However, IIUC, the default value should be -1 at Apache Spark 4.2.0 in order to avoid a regression at the large memory Spark jobs.

  • The existing Spark jobs are tuned with the enough memory size to avoid OOM already.
  • This new configuration allows us to use less memory for the existing jobs (or new jobs).

@dongjoon-hyun
Copy link
Copy Markdown
Member

cc @peter-toth , too.

@ivoson
Copy link
Copy Markdown
Contributor Author

ivoson commented Apr 15, 2026

It's great to have this. Thank you, @ivoson , @cloud-fan , @LuciferYang , @pan3793 .

However, IIUC, the default value should be -1 at Apache Spark 4.2.0 in order to avoid a regression at the large memory Spark jobs.

  • The existing Spark jobs are tuned with the enough memory size to avoid OOM already.
  • This new configuration allows us to use less memory for the existing jobs (or new jobs).

Thanks @dongjoon-hyun . Updated the default value as -1.

@ivoson ivoson requested a review from dongjoon-hyun April 15, 2026 02:12
Copy link
Copy Markdown
Member

@Ngone51 Ngone51 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for working on this. It's a nice feature to have!

Comment on lines +224 to +230
while (sorted.hasNext()) {
sorted.loadNext();
outputWriter.write(
sorted.getBaseObject(), sorted.getBaseOffset(),
sorted.getRecordLength(), sorted.getKeyPrefix());
}
outputWriter.close();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This amplifies the disk I/O operation compared to the existing single round sort-merge? It seems like a trade-off between memory and perf. An appropriate merge factor would be crucial.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, for shared clusters/applications need to carefully choose a merge factor based on the workloads pattern.

logger.info("Final merge round: merging {} spill files",
MDC.of(LogKeys.NUM_SPILL_WRITERS, spillsToMerge.size()));

final UnsafeSorterSpillMerger finalMerger = new UnsafeSorterSpillMerger(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we only do the final merge only when spillsToMerge.size() + (inMemIterator != null ? 1 : 0) > 1?

I think there could be a case where spillsToMerge is 1 after bounded-merge (e.g., 200 spill files in total and the merge factor is 100) and inMemIterator is null. In that case, I think we can avoid the final merge.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thx, updated.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Take another look, since the loop condition is spillsToMerge.size() > mergeFactor and mergeFactor should be larger than 2.

So the num of spill should always > 1. And for the case 200 spills with 100 merge factor, in the final merge there will be 2 spill files.

readingIterator = new SpillableIterator(inMemSorter.getSortedIterator());
inMemIter = readingIterator;
}
return boundedMerger.merge(spillWriters, inMemIter);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After a bounded merge round, deleteConsumedFiles(group) deletes the original spill files. cleanupResources() later calls deleteSpillFiles() which iterates spillWriters and tries to delete the same files again. Both methods guard with file.exists(), so no crash occurs — but it's wasteful.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing consumed writers from spillWriters requires synchronizing the access — SpillableIterator.spill() can add()
to it from another thread. This is doable but means changing the locking scope of SpillableIterator.spill(), which
I'd prefer not to do in this PR. The current approach is safe — deleteSpillFiles() skips already-deleted files via
file.exists(). Pls let me know your thoughts.

merger.addSpillIfNotEmpty(reader);
}

if (totalRecords > Integer.MAX_VALUE) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

partitionWriters() guards against multi-writer groups exceeding Integer.MAX_VALUE total records. But a single writer with > Integer.MAX_VALUE records passes through as a size-1 group — the mergeGroupToSpill check catches that edge case. Both checks serve a purpose, but the relationship is not documented.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated with more comments, this is a defensive check here.

@ivoson ivoson force-pushed the SPARK-56410-k-way-merge branch from e8f3e2b to fe9af59 Compare April 15, 2026 10:38
Copy link
Copy Markdown
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, LGTM. Thank you, @ivoson and all.

@dongjoon-hyun
Copy link
Copy Markdown
Member

cc @peter-toth , too


final UnsafeSorterSpillMerger finalMerger = new UnsafeSorterSpillMerger(
recordComparator, prefixComparator,
spillsToMerge.size() + (inMemIterator != null ? 1 : 0));
Copy link
Copy Markdown
Contributor

@peter-toth peter-toth Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I believe the logic could be + (inMemIterator != null && inMemIterator.hasNext() ? 1 : 0), but it doesn't make much difference

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thx, updated.

@ivoson
Copy link
Copy Markdown
Contributor Author

ivoson commented Apr 20, 2026

cc @Ngone51 @dongjoon-hyun @LuciferYang Can you please help check that is this PR good to merge? Thanks.

.createWithDefault(1024 * 1024)

private[spark] val UNSAFE_SORTER_SPILL_MERGE_FACTOR =
ConfigBuilder("spark.unsafe.sorter.spill.merge.factor")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

spark.unsafe appears to be a new namespace. Should it be placed under spark.shuffle.spill instead?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

spark.unsafe.sorter.spill.* is actually an existing namespace — we already have below two configs used by UnsafeExternalSorter:

  • spark.unsafe.sorter.spill.read.ahead.enabled
  • spark.unsafe.sorter.spill.reader.buffer.size

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got

MDC.of(LogKeys.NUM_SPILL_WRITERS, spillWriters.size()),
MDC.of(LogKeys.MERGE_FACTOR, spillMergeFactor));

boundedMerger = new UnsafeSorterBoundedSpillMerger(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrap the assignment in a synchronized(this) block mirroring the one in cleanupResources() at line 376, or add a comment documenting that the ordering is guaranteed by task-completion semantics. (Note: getSortedIterator() has no existing synchronized block to extend.)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, comment added.

@ivoson ivoson requested a review from LuciferYang April 20, 2026 15:31
Copy link
Copy Markdown
Contributor

@LuciferYang LuciferYang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there are no more comments, I will merge it once GA passes.

@LuciferYang
Copy link
Copy Markdown
Contributor

Merged into master. Thanks @ivoson @cloud-fan @dongjoon-hyun @peter-toth @Ngone51 @pan3793

@LuciferYang
Copy link
Copy Markdown
Contributor

If this one need to merge this into branch-4.1, please submit a separate pr. @ivoson Thanks ~

longvu-db pushed a commit to longvu-db/spark that referenced this pull request Apr 22, 2026
…ernalSorter to reduce OOM risk

### What changes were proposed in this pull request?
Added bounded multi-round k-way merge to `UnsafeExternalSorter` to prevent OOM during sort-merge when there are many spill files. Previously, `getSortedIterator()` opened all spill readers simultaneously (~3MB per reader), causing OOM with hundreds of spills.

- **New `UnsafeSorterBoundedSpillMerger` class** that merges spill files in rounds of at most K files (configurable merge factor, default 64), writing intermediate results to temp spill files, then merging those until the count fits in a single final round.
- **New internal config `spark.unsafe.sorter.spill.merge.factor`** (default 64, set -1 to disable) that controls the maximum number of concurrent spill readers during merge. At 64 readers x 3MB = ~192MB, well within typical executor heap sizes.
- **Added merge observability logging** — logs spill count, merge factor, and round information at merge time to aid future debugging.

#### How It Works
For example with 680 spills:

```
                    BEFORE (Current Behavior - OOM)
                    ================================

  680 spill files opened ALL AT ONCE
  +------+ +------+ +------+ +------+       +------+
  |Spill | |Spill | |Spill | |Spill |  ...  |Spill |
  |  1   | |  2   | |  3   | |  4   |       | 680  |
  +--+---+ +--+---+ +--+---+ +--+---+       +--+---+
     |        |        |        |               |
     |  3MB   |  3MB   |  3MB   |  3MB    3MB   |
     v        v        v        v               v
  +-----------------------------------------------------+
  |          PriorityQueue (680 readers)                 |
  |          680 x 3MB = ~2 GB buffers                  |
  |                   OOM!                               |
  +-----------------------------------------------------+

                    AFTER (Bounded Merge - Safe)
                    ============================

  Example: 680 spill files, merge factor K = 64

  --- Round 1: merge groups of 64 ----------------------

  Group 1 (64 files)    Group 2 (64 files)       Group 11 (remaining)
  +--++--+    +--+     +--++--+    +---+         +--++--+  +---+
  |S1||S2|... |S64|    |S65||S66|...|S128|  ...  |  ||  |..|680|
  +-++ +-+    +-++     +-++-+-+    +-+-+         +-++-++  +-+-+
    |   |       |        |   |       |             |   |     |
    v   v       v        v   v       v             v   v     v
  +--------------+     +--------------+          +--------------+
  | Merge (<=64  |     | Merge (<=64  |          | Merge (<=64  |
  |  readers)    |     |  readers)    |          |  readers)    |
  | ~192MB max   |     | ~192MB max   |          | ~192MB max   |
  +------+-------+     +------+-------+          +------+-------+
         |                    |                         |
         v                    v                         v
     +--------+          +--------+                +--------+
     |Temp    |          |Temp    |                |Temp    |
     |File 1  |          |File 2  |       ...      |File 11 |
     +--------+          +--------+                +--------+

  --- Round 2 (Final): 11 files <= 64, merge directly ---

     +--------+ +--------+           +--------+  +---------+
     |Temp    | |Temp    |    ...    |Temp    |  |In-Memory|
     |File 1  | |File 2  |           |File 11 |  |  Data   |
     +---+----+ +---+----+           +---+----+  +----+----+
         |          |                    |             |
         v          v                    v             v
     +------------------------------------------------------+
     |        Final PriorityQueue (<= 12 readers)           |
     |         12 x 3MB = ~36 MB  Safe!                     |
     +------------------------+-----------------------------+
                              |
                              v
                      +---------------+
                      | Sorted Output |
                      +---------------+

  --- Memory Comparison ------------------------------------

  Before:  680 readers x 3MB = ~2,040 MB  ->  OOM
  After:    64 readers x 3MB =   ~192 MB  ->  Safe
```

### Why are the changes needed?
When `UnsafeExternalSorter` accumulates a large number of spill files, the merge phase opens all spill readers simultaneously. Each `UnsafeSorterSpillReader` allocates ~3MB of buffers (1MB `NioBufferedFileInputStream` + 1MB `ReadAheadInputStream` + 1MB record byte array). With hundreds of spills, this means ~GB+ of merge buffers alone, causing OOM even when the executor has sufficient heap for normal operation.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UTs added.

### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (claude-opus-4-6)

Closes apache#55275 from ivoson/SPARK-56410-k-way-merge.

Authored-by: Tengfei Huang <tengfei.huang@databricks.com>
Signed-off-by: yangjie01 <yangjie01@baidu.com>
@dongjoon-hyun
Copy link
Copy Markdown
Member

dongjoon-hyun commented Apr 22, 2026

If this one need to merge this into branch-4.1, please submit a separate pr. @ivoson Thanks ~

BTW, this is an improvement of Apache Spark 4.2.0 which we cannot backport to branch-4.1, @LuciferYang .

@ivoson
Copy link
Copy Markdown
Contributor Author

ivoson commented Apr 23, 2026

Thanks @dongjoon-hyun @LuciferYang . Let's keep it for spark 4.2.

@ivoson ivoson deleted the SPARK-56410-k-way-merge branch April 23, 2026 00:23
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants