
[VL] Remove buffering of sorted partitions in RSS writer to prevent OOM#11059

Merged
kerwin-zk merged 2 commits into apache:main from boneanxs:fix_celeborn_oom on Nov 17, 2025

Conversation

@boneanxs
Contributor

What changes are proposed in this pull request?

#10244 removed RssPartitionWriterOutputStream, which could buffer the whole partition in memory and cause an OOM.

This PR pushes data directly to rssClient when sortEvict is called, since rssClient itself also buffers data before sending it to the remote side.
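The memory effect of this change can be sketched with a small standalone model (hypothetical names such as `MockRssClient`, not the actual Gluten/Celeborn API): buffering accumulates the whole partition before a single push, while pushing on eviction bounds the writer's footprint to one evicted block.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for the RSS client: it keeps its own push
// buffer, so the writer does not need to accumulate a partition first.
struct MockRssClient {
  std::size_t pushed = 0;  // total bytes handed to the client
  void pushData(const std::string& block) { pushed += block.size(); }
};

// Pre-patch shape: every evicted block is appended to a partition-wide
// buffer, so the writer's peak memory grows with the partition size.
std::size_t evictBuffered(const std::vector<std::string>& blocks, MockRssClient& client) {
  std::string partitionBuffer;
  for (const auto& b : blocks) partitionBuffer += b;  // whole partition in memory
  std::size_t peak = partitionBuffer.size();
  client.pushData(partitionBuffer);
  return peak;  // peak bytes held by the writer
}

// Post-patch shape: each evicted block is pushed immediately, so the
// writer's peak memory is bounded by the largest single block.
std::size_t evictDirect(const std::vector<std::string>& blocks, MockRssClient& client) {
  std::size_t peak = 0;
  for (const auto& b : blocks) {
    peak = std::max(peak, b.size());
    client.pushData(b);  // client buffers/sends on its own schedule
  }
  return peak;
}
```

Both paths hand the same bytes to the client; only the writer-side peak differs.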

How was this patch tested?

@github-actions github-actions Bot added the VELOX label Nov 10, 2025
@boneanxs
Contributor Author

@marin-ma @kerwin-zk hey, could you please help review this? Thanks!

```cpp
ARROW_ASSIGN_OR_RAISE(
    auto rssOs, arrow::io::BufferOutputStream::Create(options_->pushBufferMaxSize, arrow::default_memory_pool()));
if (codec_ != nullptr) {
  ARROW_ASSIGN_OR_RAISE(
```
Contributor Author


Do we need to compress data here again for RSS shuffle? It looks like there's already compression inside shuffleClient: https://github.com/apache/celeborn/blob/5e4d80bb1e764b80f5d3462bb8ffb9061efc63b4/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java#L1052

@wForget
Member, Nov 10, 2025
FYI: we have disabled compression in the RSS client (Uniffle) and only retained compression in the Gluten shuffle.

Contributor Author


Makes sense. Celeborn also doesn't need to compress again IMO; let me remove it as well.

Member


> Makes sense. Celeborn also doesn't need to compress again IMO; let me remove it as well.

Maybe disabling compression on the Celeborn side via config (if that option exists) is sufficient, and there is no need to implicitly disable it in the Gluten codebase.

Contributor


@zuston This has already been implemented in the new version of Celeborn, and I'll adapt it accordingly later.

@github-actions github-actions Bot added the RSS label Nov 10, 2025
@github-actions

Run Gluten Clickhouse CI on x86

@marin-ma
Contributor

@boneanxs Thanks for identifying this issue. I wonder if this change may affect the shuffled data size. The original design was aimed at generating a smaller compressed output by flushing the compressed data with more buffered input.

@boneanxs
Contributor Author

> @boneanxs Thanks for identifying this issue. I wonder if this change may affect the shuffled data size. The original design was aimed at generating a smaller compressed output by flushing the compressed data with more buffered input.

@marin-ma Thanks for pointing that out. I tested it and found that it still produces less shuffle data than rss_sort. I can add more tests to measure how much the data volume increases compared to buffering the entire partition. However, even if the volume does increase, I think we still can't buffer the whole partition for large ones.
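The data-size concern can be illustrated with a toy cost model (the per-block overhead constant is an assumption for illustration, not a measured Celeborn value): each separately compressed and flushed block carries fixed framing overhead, so pushing many small blocks can yield more total shuffle bytes than compressing one large buffered block.

```cpp
#include <cassert>
#include <cstddef>

// Assumed fixed per-block cost (compression framing, headers);
// purely illustrative, not measured.
constexpr std::size_t kFrameOverhead = 64;

// Total shuffle bytes when a payload is compressed and flushed in
// blocks of blockSize: the payload itself plus one overhead per block.
std::size_t shuffleBytes(std::size_t totalPayload, std::size_t blockSize) {
  std::size_t blocks = (totalPayload + blockSize - 1) / blockSize;
  return totalPayload + blocks * kFrameOverhead;
}
```

Under this model, shrinking the flush block size only adds framing overhead linearly in the block count, which matches the observation that the volume increase stays modest.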

@wForget
Member

wForget commented Nov 11, 2025

> which could buffer the whole partition in memory and cause an OOM.

Doesn't the RSS shuffle writer trigger a spill?

@boneanxs
Contributor Author

> which could buffer the whole partition in memory and cause an OOM.

> Doesn't the RSS shuffle writer trigger a spill?

Spill still can't evict these buffers: sortEvict is called, and the evicted data is then buffered in Arrow's BufferOutputStream.
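A rough standalone model of the pre-patch behavior described above (names are hypothetical, not the actual writer classes): eviction drains the sort buffer, but the bytes only move into an in-memory output stream, so the writer's total footprint does not shrink.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical model of the pre-patch writer: sortEvict can drain the
// sort buffer, but the bytes just move into an in-memory stream (a
// stand-in for arrow::io::BufferOutputStream), so spilling does not
// actually release any memory.
struct WriterModel {
  std::vector<std::string> sortBuffer;  // rows awaiting eviction
  std::string streamBuffer;             // in-memory output stream

  std::size_t totalBytes() const {
    std::size_t n = streamBuffer.size();
    for (const auto& r : sortBuffer) n += r.size();
    return n;
  }

  // sortEvict before the patch: drain the sort buffer into the stream.
  void sortEvictToStream() {
    for (const auto& r : sortBuffer) streamBuffer += r;
    sortBuffer.clear();
  }
};
```

The sort buffer empties, but the total held by the writer is unchanged, which is why a spill alone cannot prevent the OOM.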

@wForget
Member

wForget commented Nov 11, 2025

> which could buffer the whole partition in memory and cause an OOM.

> Doesn't the RSS shuffle writer trigger a spill?

> Spill still can't evict these buffers: sortEvict is called, and the evicted data is then buffered in Arrow's BufferOutputStream.

That's right, thank you for your explanation.

@github-actions

Run Gluten Clickhouse CI on x86

@github-actions

Run Gluten Clickhouse CI on x86

@marin-ma
Contributor

marin-ma commented Nov 12, 2025

@boneanxs Thanks for following up.

> I tested it and found that it still produces less shuffle data than rss_sort.

Could you also compare with vanilla Spark + Celeborn?

> I can add more tests to measure how much the data volume increases compared to buffering the entire partition.

It would be nice if more performance results could be shared.

@boneanxs
Contributor Author

> @boneanxs Thanks for following up.
>
> Could you also compare with vanilla Spark + Celeborn?
>
> It would be nice if more performance results could be shared.

@marin-ma I ran a test query on TPC-DS (3TB) using the following SQL:

```sql
SELECT
  ss.ss_customer_sk,
  ss.ss_item_sk,
  ss.ss_ticket_number,
  ss.ss_store_sk,
  ss.ss_promo_sk,
  ss.ss_sold_date_sk,
  c.c_customer_id,
  c.c_first_name,
  c.c_last_name,
  SUM(ss.ss_net_paid) AS total_paid
FROM
  (SELECT /*+ REPARTITION(50) */ * FROM store_sales) ss
LEFT JOIN customer c
  ON ss.ss_customer_sk = c.c_customer_sk
GROUP BY
  ss.ss_customer_sk,
  ss.ss_item_sk,
  ss.ss_ticket_number,
  ss.ss_store_sk,
  ss.ss_promo_sk,
  ss.ss_sold_date_sk,
  c.c_customer_id,
  c.c_first_name,
  c.c_last_name
LIMIT 100;
```

When comparing the first shuffle stage, I didn’t observe any performance regression with this patch applied.

1. Vanilla Spark (screenshot)
2. Buffering all partitions + 512m off-heap (screenshot)
3. With this patch + 512m off-heap (screenshot)
4. With this patch + 256m off-heap, to force more spill (screenshot)

By counting occurrences of the log line `VeloxCelebornColumnarShuffleWriter: Gluten shuffle writer: Trying to push` for the same partition, I can confirm that more spills happen when the memory is reduced.

@marin-ma
Contributor

LGTM. Thanks for confirming the performance results!

@kerwin-zk kerwin-zk merged commit f2a6870 into apache:main Nov 17, 2025
100 of 102 checks passed