feat: use byte-based target batch size for shuffle IPC blocks #3913

Closed

andygrove wants to merge 3 commits into apache:main from andygrove:byte-based-shuffle-batch-size-2

Conversation

andygrove (Member) commented Apr 8, 2026

Which issue does this PR close?

Partial fix for #3882

Rationale for this change

The native shuffle writer currently uses a row-based target batch size of 8192 rows for coalescing small batches before writing IPC blocks. For narrow schemas (few columns, small data types), this produces tiny blocks with disproportionate per-block IPC schema overhead.

A byte-based threshold ensures reasonably sized blocks regardless of schema width, improving shuffle write efficiency for narrow schemas without negatively impacting wide schemas.
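
The core idea is to buffer incoming batches until their combined in-memory size crosses the byte threshold, then concatenate them and write the result as a single IPC block, so per-block schema overhead is amortized over roughly 1 MiB of data instead of a few kilobytes. Below is a minimal sketch of that accumulation pattern using arrow-rs; the `ByteCoalescer` name is hypothetical and this is not the PR's actual `BufBatchWriter` code.

```rust
use std::sync::Arc;

use arrow::compute::concat_batches;
use arrow::datatypes::Schema;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Hypothetical byte-based coalescer: buffers batches until the accumulated
/// in-memory size reaches `target_batch_bytes`, then emits one concatenated batch.
struct ByteCoalescer {
    schema: Arc<Schema>,
    target_batch_bytes: usize,
    buffered: Vec<RecordBatch>,
    buffered_bytes: usize,
}

impl ByteCoalescer {
    fn new(schema: Arc<Schema>, target_batch_bytes: usize) -> Self {
        Self {
            schema,
            target_batch_bytes,
            buffered: Vec::new(),
            buffered_bytes: 0,
        }
    }

    /// Buffer a batch; return a concatenated batch once the byte threshold is reached.
    fn push(&mut self, batch: RecordBatch) -> Result<Option<RecordBatch>, ArrowError> {
        self.buffered_bytes += batch.get_array_memory_size();
        self.buffered.push(batch);
        if self.buffered_bytes >= self.target_batch_bytes {
            self.flush()
        } else {
            Ok(None)
        }
    }

    /// Concatenate whatever is buffered (e.g. at end of input) and reset the buffer.
    fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
        if self.buffered.is_empty() {
            return Ok(None);
        }
        let combined = concat_batches(&self.schema, &self.buffered)?;
        self.buffered.clear();
        self.buffered_bytes = 0;
        Ok(Some(combined))
    }
}
```

Each batch returned by `push` or `flush` would then be serialized as one IPC block, regardless of how many small input batches it was built from.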

What changes are included in this PR?

  • Replace Arrow's BatchCoalescer (row-based) with byte-based accumulation in BufBatchWriter — batches are buffered until their total memory size reaches the target, then concatenated and written as a single IPC block
  • Switch SinglePartitionShufflePartitioner buffering from row-count to byte-size threshold
  • Add target_batch_bytes parameter to MultiPartitionShuffleRepartitioner, while keeping the row-based batch_size for scratch space sizing and input batch slicing (which is about processing chunk limits, not output block size)
  • Add COMET_SHUFFLE_TARGET_BATCH_BYTES config (spark.comet.exec.shuffle.targetBatchBytes, default 1 MiB)
  • Add target_batch_bytes field to ShuffleWriter protobuf message
  • Add --target-batch-bytes CLI arg to the standalone shuffle benchmark tool
  • Fix bug: COMET_SHUFFLE_WRITE_BUFFER_SIZE used .max(Int.MaxValue) instead of .min(Int.MaxValue) when converting the Long value to an Int for protobuf, so roughly 2 GiB was always sent regardless of the configured value (see the sketch after this list)
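
The buffer-size bug in the last item comes down to clamping a 64-bit configured value into a 32-bit protobuf field with max instead of min. The actual fix is on the Scala/JVM side; the same semantics are sketched here in Rust purely for illustration.

```rust
fn main() {
    // A user configures a 4 MiB write buffer (a 64-bit value on the JVM side).
    let configured: i64 = 4 * 1024 * 1024;

    // Buggy clamp: max() can only raise the value, so anything below i32::MAX
    // is replaced by i32::MAX (~2 GiB) before being sent over protobuf.
    let buggy = configured.max(i32::MAX as i64);

    // Correct clamp: min() caps the value at i32::MAX and otherwise passes it through.
    let fixed = configured.min(i32::MAX as i64);

    assert_eq!(buggy, i32::MAX as i64);
    assert_eq!(fixed, 4 * 1024 * 1024);
    println!("buggy = {buggy}, fixed = {fixed}");
}
```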

How are these changes tested?

All 19 existing shuffle tests pass. The test_batch_coalescing_reduces_size test validates that byte-based coalescing still produces smaller output than writing uncoalesced batches.

Replace the row-based batch size (8192 rows) with a byte-based threshold
(default 1 MiB) for coalescing small batches before writing shuffle IPC
blocks. For narrow schemas, 8192 rows can produce tiny blocks with high
per-block schema overhead. A byte-based threshold ensures reasonably
sized blocks regardless of schema width.

Changes:
- Replace BatchCoalescer (row-based) with byte-based accumulation in
  BufBatchWriter
- Switch SinglePartitionShufflePartitioner buffering to byte-based
- Add target_batch_bytes parameter to MultiPartitionShuffleRepartitioner
  (keeping row-based batch_size for scratch space and input slicing)
- Add COMET_SHUFFLE_TARGET_BATCH_BYTES config (spark.comet.exec.shuffle.targetBatchBytes)
- Add target_batch_bytes field to ShuffleWriter protobuf message
- Fix bug: COMET_SHUFFLE_WRITE_BUFFER_SIZE used .max(Int.MaxValue)
  instead of .min(Int.MaxValue), always sending 2GB regardless of config
@andygrove (Member Author)

I tried running this with TPC-H @ 1TB and hit OOM

andygrove closed this Apr 9, 2026