feat(spill): optimize external sort with SpillFileMerger to reduce write amplification #288

Merged: zjw1111 merged 19 commits into alibaba:main from zjw1111:optimize-spill-performance on May 20, 2026

@zjw1111 (Collaborator) commented May 18, 2026

Purpose

Optimize external sort spill performance by introducing a SpillFileMerger (LSM-tree-like structure) to manage spill files, and add SetWriteBufferSpillThreadNumber API to control Arrow IPC threading during spill.

Key changes:

  • Add SpillFileMerger class that organizes spill files into levels and triggers a merge whenever a level accumulates at least max_fan_in files, reducing read/write amplification from O(N/K) to O(log_K(N)), where N is the number of spill files and K is the fan-in
  • Add dynamic estimation of spill parameters (spill_batch_size_, actual_max_fan_in_) based on actual row sizes to better utilize memory budget
  • Add WriteContextBuilder::SetWriteBufferSpillThreadNumber(int32_t) API to control the Arrow thread pool capacity and use_threads in SpillReader/SpillWriter. The setting is propagated as bool enable_multi_thread_spill through the constructor chain: KeyValueFileStoreWrite -> MergeTreeWriter -> WriteBuffer -> ExternalSortBuffer -> SpillReader/SpillWriter
  • Add validation for local-sort.max-num-file-handles config (must be >= 2)
  • Fix state inconsistency: MergeAndReplaceFiles now only deletes input files on success, relying on SpillChannelManager::Reset() for cleanup on failure
  • Fix use-after-free: reset merge_function_wrapper in SortMergeReader::Close and on flush error
  • Align core option defaults with Java Paimon
  • Fix 4 TEST_P misuse cases (tests not using GetParam())
  • Enhance ReadContext, ScanContext, and WriteContext tests with default values and option overrides
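
For readers unfamiliar with leveled merging, the idea behind the first bullet can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: `LeveledFileSet`, `MergeFn`, `AddFile`, and `FileCount` are hypothetical names, files are represented by plain strings, and failure handling and the final cleanup merge are left out.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Illustrative only: LeveledFileSet and MergeFn are hypothetical names,
// not the interface of the SpillFileMerger added in this PR.
class LeveledFileSet {
 public:
  // Merges the given input files into one output file and returns its name.
  using MergeFn = std::function<std::string(const std::vector<std::string>&)>;

  LeveledFileSet(int32_t max_fan_in, MergeFn merge_fn)
      : max_fan_in_(static_cast<std::size_t>(max_fan_in)),
        merge_fn_(std::move(merge_fn)) {
    assert(max_fan_in >= 2);  // merging fewer than two files makes no progress
    levels_.emplace_back();   // level 0 receives freshly spilled files
  }

  // Adds a new spill file at level 0, then cascades merges upward:
  // whenever a level holds max_fan_in files, they become one file
  // on the next level.
  void AddFile(const std::string& file) {
    levels_[0].push_back(file);
    for (std::size_t level_idx = 0; level_idx < levels_.size(); ++level_idx) {
      while (levels_[level_idx].size() >= max_fan_in_) {
        // Take exactly max_fan_in files from the front of this level.
        const auto first = levels_[level_idx].begin();
        const auto last = first + static_cast<std::ptrdiff_t>(max_fan_in_);
        const std::vector<std::string> inputs(first, last);
        std::string merged = merge_fn_(inputs);
        levels_[level_idx].erase(first, last);
        if (level_idx + 1 == levels_.size()) levels_.emplace_back();
        levels_[level_idx + 1].push_back(std::move(merged));
      }
    }
  }

  // Total number of files currently tracked across all levels.
  std::size_t FileCount() const {
    std::size_t count = 0;
    for (const auto& level : levels_) count += level.size();
    return count;
  }

 private:
  std::size_t max_fan_in_;
  MergeFn merge_fn_;
  std::vector<std::vector<std::string>> levels_;
};
```

With a fan-in of 2 and four spilled files, this sketch performs three merges and ends with a single file on level 2, so each input record is rewritten twice, which matches log_2(4). With a fan-in of 4 and only three files, no merge is triggered.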

Tests

  • SpillFileMergerTest.NoMergeBelowFanIn
  • SpillFileMergerTest.MergeTriggeredAtFanIn
  • SpillFileMergerTest.MinimalFanInTwo
  • SpillFileMergerTest.MultiLevelMerge
  • SpillFileMergerTest.ManyFilesWithFanInTwo
  • SpillFileMergerTest.FinalCleanupReducesFileCount
  • SpillFileMergerTest.FinalCleanupMergesSmallestFirst
  • SpillFileMergerTest.FinalCleanupNoOpWhenAlreadyBelowTarget
  • SpillFileMergerTest.FinalCleanupConvergesToTarget
  • SpillFileMergerTest.MergeFnFailurePreservesState
  • SpillFileMergerTest.ClearRemovesAllFiles
  • SpillFileMergerTest.SetMaxFanInAffectsMerge
  • SpillFileMergerTest.SetMaxFanInToLargerValueSuppressesMerge
  • SpillFileMergerTest.MergeOnlyTakesFanInFilesFromLevel
  • SortBufferTest.TestInMemorySortBufferEstimateMemoryUseForEachRow
  • MergeTreeWriterTest.TestSpillWithIOException
  • WriteBufferTest.TestSpillDiskQuotaEnforcement (added Write 3 case)
  • Enhanced ReadContextTest, ScanContextTest, WriteContextTest with default value and option override coverage

API and Format

  • New API: WriteContextBuilder::SetWriteBufferSpillThreadNumber(int32_t thread_number)
    • <= 0: disables Arrow IPC threading in spill (default)
    • > 0: sets arrow::SetCpuThreadPoolCapacity(thread_number) and enables use_threads in SpillReader/SpillWriter
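
The two cases above amount to a simple mapping from the thread number to the flag that is passed down the constructor chain. A minimal sketch of that mapping, assuming nothing about the real implementation: `SpillThreadingPlan` and `PlanSpillThreading` are hypothetical names used only here, and the sketch does not call Arrow.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper type for illustration; not part of the PR's API.
struct SpillThreadingPlan {
  bool enable_multi_thread_spill;  // the bool forwarded to SpillReader/SpillWriter
  int32_t pool_capacity;           // 0 means "leave the Arrow CPU pool untouched"
};

// Maps the user-facing thread number to the internal settings described above.
inline SpillThreadingPlan PlanSpillThreading(int32_t thread_number) {
  if (thread_number <= 0) {
    // Default: spill uses no Arrow IPC threading.
    return SpillThreadingPlan{false, 0};
  }
  // Positive values enable use_threads and would be handed to
  // arrow::SetCpuThreadPoolCapacity by the real code.
  return SpillThreadingPlan{true, thread_number};
}
```

For example, `PlanSpillThreading(4)` yields an enabled plan with a capacity of 4, while `PlanSpillThreading(0)` and any negative value yield a disabled plan.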

Documentation

No.

Generative AI tooling

Generated-by: Claude Code (Claude Opus 4.6)

…te amplification

Introduce LeveledMerger (LSM-tree-like structure) to manage spill files in levels,
reducing read/write amplification from O(N/K) to O(log_K(N)). Also adds dynamic
estimation of spill parameters based on actual row sizes.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings May 18, 2026 06:13
zjw1111 and others added 4 commits May 18, 2026 14:39
- Add Doxygen comments to public methods in LeveledMerger and InMemorySortBuffer
- Replace bare 'int' with 'int32_t' in leveled_merger_test.cpp
- Remove unused #include <numeric>
- Add integer type convention to docs/code-style.md

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- WriteBufferTest.TestMergeSpilledFilesSkipsWithSingleFile: update
  min file handles from 1 to 2 (now enforced minimum)
- WriteBufferTest.TestSpillDiskQuotaEnforcement Case 3: use single
  spill_file_size as quota since leveled compaction changes disk usage
- WriteInteTest: relax intermediate file count assertions to allow
  leveled merger's multi-level structure (files cleaned at read time)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…-free

When FlushWriteBuffer fails mid-operation (e.g., IO error during
rolling_writer->Write()), the producer thread may leave a KeyValue
cached in merge_function_wrapper_. That KeyValue holds Arrow data
referencing SpillReader::arrow_pool_ via raw pointer. After the
async producer/consumer is closed and SpillReaders are destroyed,
the cached KeyValue becomes a dangling reference, causing SEGV
during MergeTreeWriter destruction.

Fix: call merge_function_wrapper_->Reset() in the write_guard
ScopeGuard to release cached Arrow data before SpillReaders are
destroyed.
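
The ordering problem can be reproduced in miniature. The sketch below is a toy model under stated assumptions, not the project's code: `ToyPool` stands in for the pool owned by a SpillReader, `ToyWrapper` for merge_function_wrapper_, and the `ScopeGuard` here is a simplified local class. It only shows that a guard declared after the pool runs first, so the cached value is released while the pool is still alive.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Simplified scope guard: runs the callback when it goes out of scope.
class ScopeGuard {
 public:
  explicit ScopeGuard(std::function<void()> fn) : fn_(std::move(fn)) {}
  ~ScopeGuard() {
    if (fn_) fn_();
  }
  ScopeGuard(const ScopeGuard&) = delete;
  ScopeGuard& operator=(const ScopeGuard&) = delete;

 private:
  std::function<void()> fn_;
};

// Toy stand-in for a memory pool owned by a reader.
struct ToyPool {
  explicit ToyPool(std::vector<std::string>* sink) : events(sink) {}
  ~ToyPool() {
    assert(outstanding == 0);  // a buffer still alive here would dangle
    events->push_back("pool destroyed");
  }
  std::vector<std::string>* events;
  int outstanding = 0;  // buffers that still reference this pool
};

// Toy stand-in for a wrapper that caches a value backed by the pool.
class ToyWrapper {
 public:
  explicit ToyWrapper(std::vector<std::string>* sink) : events_(sink) {}
  void Cache(ToyPool* pool) {
    cached_pool_ = pool;
    ++pool->outstanding;
    events_->push_back("value cached");
  }
  void Reset() {
    if (cached_pool_ != nullptr) {
      --cached_pool_->outstanding;
      cached_pool_ = nullptr;
      events_->push_back("cache released");
    }
  }

 private:
  std::vector<std::string>* events_;
  ToyPool* cached_pool_ = nullptr;  // raw pointer, as in the bug description
};

// Simulates a flush that leaves a cached value behind and returns the event order.
inline std::vector<std::string> SimulateFailedFlush() {
  std::vector<std::string> events;
  ToyWrapper wrapper(&events);  // outlives the pool
  {
    ToyPool pool(&events);
    // Declared after the pool, so it is destroyed before the pool.
    ScopeGuard guard([&wrapper]() { wrapper.Reset(); });
    wrapper.Cache(&pool);
    // An IO error would leave this scope early at this point.
  }
  return events;
}
```

Without the guard, the pool would be destroyed while the wrapper still held its raw pointer, which is the dangling reference described above.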

Also adapts MergeTreeWriterTest assertions to accommodate leveled
compaction file count behavior.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Comment thread src/paimon/core/mergetree/external_sort_buffer.h
Comment thread src/paimon/core/mergetree/external_sort_buffer.cpp
Comment thread src/paimon/core/core_options.h
Comment thread src/paimon/core/mergetree/external_sort_buffer.cpp
Comment thread src/paimon/core/mergetree/spill_file_merger.cpp
Comment thread src/paimon/core/mergetree/spill_file_merger.cpp
Comment thread src/paimon/core/mergetree/merge_tree_writer.cpp
Comment thread src/paimon/core/mergetree/merge_tree_writer_test.cpp
zjw1111 and others added 3 commits May 18, 2026 23:09
…t use-after-free

Move the merge_function_wrapper_->Reset() into SortMergeReader::Close()
(both LoserTree and MinHeap variants) so that cached KeyValue data is
released before underlying readers are destroyed. This prevents Arrow
Buffer objects from calling Free() on a dangling pool pointer when
SpillReader is later destructed.

Also reorder the cleanup in MergeTreeCompactRewriter::RewriteCompaction
to call reader->Close() before merge_file_split_read_.reset(), ensuring
the Reset() triggered by Close() runs while all resource providers are
still alive.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…arity

- Rename class LeveledMerger → SpillFileMerger, files leveled_merger → spill_file_merger
- Rename Compact/Compaction methods to Merge (RunMergeIfNeeded, RunFinalMergeIfNeeded, etc.)
- Move kMinFanIn validation from CoreOptions to ExternalSortBuffer::Create
- Improve variable names (l→level_idx, a/b→lhs/rhs, n→files_to_merge, f→file)
- Convert 6 spill tests from TEST_P to TEST_F (no parameterization needed)
- Fix 4 misused TEST_P in btree_global_index and file_system tests
- Make test assertions exact instead of range-based

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Add TestSpillWithIOException exercising IO error injection across all
  spill code paths (write, intermediate merge, final merge, flush)
- Add SetMaxFanInToLargerValueSuppressesCompaction verifying dynamic fan-in
- Improve inline comments explaining leveled merge behavior in spill tests

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@zjw1111 zjw1111 requested review from Copilot and removed request for Copilot May 19, 2026 07:01
zjw1111 and others added 3 commits May 19, 2026 16:59
…mments

- Rename compact/compaction to merge in spill-related comments and test names
- Refine Write/FlushMemory return value variable names for clearer semantics
- Add Write 3 case to TestSpillDiskQuotaEnforcement
- Add comments in sort_buffer_test.cpp for Clear and empty batch behavior
- Fix ASSERT_LE to ASSERT_EQ for deterministic spill file counts in inte tests

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@zjw1111 zjw1111 requested review from Copilot and removed request for Copilot May 19, 2026 09:40
@zjw1111 changed the title from "feat(spill): optimize external sort with leveled merger to reduce write amplification" to "feat(spill): optimize external sort with SpillFileMerger to reduce write amplification" on May 20, 2026
lxy-9602 and others added 4 commits May 20, 2026 14:52
Add WriteContextBuilder::SetWriteBufferSpillThreadNumber(int32_t) to
control Arrow IPC thread usage during spill. When > 0, sets Arrow CPU
thread pool capacity and enables use_threads in SpillReader/SpillWriter.
The bool is passed through the full constructor chain:
KeyValueFileStoreWrite -> MergeTreeWriter -> WriteBuffer ->
ExternalSortBuffer -> SpillReader/SpillWriter.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
lucasfang previously approved these changes May 20, 2026

@lucasfang (Collaborator) left a comment

+1

@lxy-9602 (Collaborator) left a comment

+1

@zjw1111 zjw1111 merged commit d19b390 into alibaba:main May 20, 2026
9 checks passed
3 participants