
[HUDI-9504] support in-memory buffer sort in append write #13409

Open
HuangZhenQiu wants to merge 1 commit into master from HUDI-9504-batch-sort

Conversation

HuangZhenQiu
Contributor

Change Logs

Add an in-memory buffer sort to the append write function to improve the Parquet compression ratio. In our experiments and testing, it improved the compression ratio by up to 300% with the right sort key and buffer size configuration.

Impact

Users can enable the feature through the buffer sort configuration options.
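
A minimal, hypothetical sketch of how the sort options discussed in this PR might be set on a Flink job, assuming the option keys settle on write.sort.enabled and write.sort.keys as suggested in the review below (the final names and defaults may differ):

import org.apache.flink.configuration.Configuration;

public class SortedAppendWriteExample {
  public static void main(String[] args) {
    // Hypothetical usage sketch; the option keys follow the naming discussed in
    // this PR and may change before the feature is merged.
    Configuration conf = new Configuration();
    conf.setString("write.sort.enabled", "true");          // enable in-memory buffer sort for append writes
    conf.setString("write.sort.keys", "city,event_time");  // example sort columns, comma separated
    // ...pass conf to the Hudi Flink pipeline / sink builder as usual.
  }
}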

Risk level (write none, low, medium or high below)

low

Documentation Update

This is a new feature. A Jira ticket will be created to update the website.

  • Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the ticket number here, and follow the instructions to make changes to the website.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@github-actions github-actions bot added the size:XL PR with lines of changes > 1000 label Jun 9, 2025
@HuangZhenQiu HuangZhenQiu force-pushed the HUDI-9504-batch-sort branch from 45cf09a to 2efb9bf on June 9, 2025 07:10
@HuangZhenQiu HuangZhenQiu force-pushed the HUDI-9504-batch-sort branch 5 times, most recently from 08ab684 to 123870c on June 11, 2025 21:12
@HuangZhenQiu
Contributor Author

@zhangyue19921010 @danny0405
Updated the diff with BinaryInMemorySortBuffer.

@github-actions github-actions bot added size:L PR with lines of changes in (300, 1000] and removed size:XL PR with lines of changes > 1000 labels Jun 12, 2025
@zhangyue19921010
Contributor

@zhangyue19921010 @danny0405 Updated the diff with BinaryInMemorySortBuffer.

Will finish my review later this week.

@xushiyan xushiyan requested a review from danny0405 June 13, 2025 14:05
Contributor

@danny0405 danny0405 left a comment

cc @cshuo for the first round of review

@@ -586,6 +586,27 @@ private FlinkOptions() {
.withDescription("Maximum memory in MB for a write task, when the threshold hits,\n"
+ "it flushes the max size data bucket to avoid OOM, default 1GB");

@AdvancedConfig
public static final ConfigOption<Boolean> WRITE_BUFFER_SORT_ENABLED = ConfigOptions
.key("write.buffer.sort.enabled")
Contributor

write.sort.enabled?


@AdvancedConfig
public static final ConfigOption<String> WRITE_BUFFER_SORT_KEYS = ConfigOptions
.key("write.buffer.sort.keys")
Contributor

write.sort.keys?


public AppendWriteFunctionWithBufferSort(Configuration config, RowType rowType) {
super(config, rowType);
this.writebufferSize = config.get(FlinkOptions.WRITE_BUFFER_SIZE);
Contributor

Maybe WRITE_BUFFER_SIZE is not needed, since memorySegmentPool has a limited memory size, which can be used to trigger buffer flushing.
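
A simplified, self-contained sketch of the flush trigger being suggested here: instead of counting records, flush when an estimated memory budget (standing in for the memory segment pool size) is exhausted. The class and field names are illustrative only, not the actual Flink or Hudi APIs:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Illustrative only: flushing is driven by a memory budget rather than a record count.
class MemoryBoundedSortBuffer<T> {
  private final List<T> buffer = new ArrayList<>();
  private final long maxBytes;   // budget standing in for the memory segment pool size
  private long usedBytes = 0L;

  MemoryBoundedSortBuffer(long maxBytes) {
    this.maxBytes = maxBytes;
  }

  // Returns true when the caller should sort and flush the buffer.
  boolean add(T record, long estimatedRecordBytes) {
    buffer.add(record);
    usedBytes += estimatedRecordBytes;
    return usedBytes >= maxBytes;
  }

  // Sorts the buffered records, returns them, and resets the buffer.
  List<T> drainSorted(Comparator<T> comparator) {
    buffer.sort(comparator);
    List<T> sorted = new ArrayList<>(buffer);
    buffer.clear();
    usedBytes = 0L;
    return sorted;
  }
}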

@zhangyue19921010
Contributor

@HuangZhenQiu Still working on this?

@HuangZhenQiu
Contributor Author

@zhangyue19921010 Yes. I was OOO last week. Will update the diff this week.

@HuangZhenQiu HuangZhenQiu force-pushed the HUDI-9504-batch-sort branch from 36ed297 to c692d75 on July 10, 2025 08:06
Contributor Author

@HuangZhenQiu HuangZhenQiu left a comment

Thanks for the review. @cshuo @zhangyue19921010

@HuangZhenQiu HuangZhenQiu force-pushed the HUDI-9504-batch-sort branch 3 times, most recently from 3f5ee19 to 1d86872 on July 11, 2025 16:50
@cshuo
Contributor

cshuo commented Jul 14, 2025

Thanks for the review. @cshuo @zhangyue19921010

Ok, will take another look soon.

Contributor

@cshuo cshuo left a comment

@HuangZhenQiu Thanks for updating, I left some comments.

@HuangZhenQiu HuangZhenQiu force-pushed the HUDI-9504-batch-sort branch 2 times, most recently from f27de92 to d5210d8 on July 18, 2025 17:38
@HuangZhenQiu HuangZhenQiu force-pushed the HUDI-9504-batch-sort branch 2 times, most recently from fd80089 to 3c2854e on July 18, 2025 17:49
@HuangZhenQiu
Contributor Author

@cshuo
Thanks for the valuable comments. I resolved all of them except the buffer size option. Shall we keep it to give users flexibility when adopting the feature?

@cshuo
Contributor

cshuo commented Jul 21, 2025

@cshuo Thanks for the valuable comments. I resolved all of them except the buffer size option. Shall we keep it to give users flexibility when adopting the feature?

Thanks for updating. It seems the PR doesn't fix the comments here, i.e., with the current implementation, records are only partially ordered within a parquet file, since a file may contain batches from multiple sortAndSend calls. We should keep all records within a file strictly ordered to fully leverage the advantages of the sorting.

@HuangZhenQiu
Contributor Author

Small files are not good for query performance. But if we keep a whole parquet file ordered, we lose data freshness: sort time increases a lot and causes high back pressure in the Flink job. Thus, we use the buffer size to control row-group-level ordering and the compression ratio. It is a trade-off that balances data freshness and storage size without keeping parquet files sorted at the file level. We will leverage table services to do the stitching later.

@cshuo
Contributor

cshuo commented Jul 21, 2025

Small files are not good for query performance.

As mentioned above, we can trigger flushing by buffer memory size and set the size properly to relieve the small-files pressure. Also, the current implementation doesn't seem to ensure that the data is ordered at the row group level either, since a row group is switched when it reaches the configured size limit, e.g., 120 MB by default (HoodieStorageConfig#PARQUET_BLOCK_SIZE).

But if we keep a whole parquet file ordered, we lose data freshness.

Actually, data freshness is determined by the checkpoint interval. The writer flushes and commits the written files during checkpointing; until that point the data remains invisible.

Sort time increases a lot and causes high back pressure in the Flink job.

Agreed that it will need more sort time to keep the whole file ordered. I'm not sure how significant the impact is; I remember @Alowator has an ingestion benchmark that includes sorting of the binary buffer here, and reported that the sort is fast enough that it doesn't affect write performance, with a default batch size of 256 MB to trigger flushing. Maybe you can double-check that. cc @HuangZhenQiu

@cshuo
Contributor

cshuo commented Jul 21, 2025

cc @danny0405 @zhangyue19921010 for final review.

@HuangZhenQiu HuangZhenQiu force-pushed the HUDI-9504-batch-sort branch 4 times, most recently from e3058eb to a1ae791 on July 29, 2025 01:01
@HuangZhenQiu HuangZhenQiu force-pushed the HUDI-9504-batch-sort branch from a1ae791 to 9446081 on July 29, 2025 17:09
@hudi-bot

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

*
* @throws IOException
*/
private void sortAndSend() throws IOException {
Contributor

During sort and send, we still use a common write helper, which means this sort is still a partial sort?

Contributor

Based on our experience, global sorting for Parquet files requires additional memory and inevitably leads to small-file issues, although the sorting process itself is highly efficient (accounting for less than 5% in flame graphs). For partial sorting implementations, I recommend uniformly updating all references (including configuration names, javadocs, method names, etc.) to use the term "Partial Order" to prevent unnecessary confusion among other users.

.key("write.sort.enabled")
.booleanType()
.defaultValue(false) // default no sort
.withDescription("Whether to enable buffer sort within append write function.");
Contributor

I still think we need to implement complete sorting semantics.

We can refer to StreamWriteFunction and use buckets to control the flush implementation.

Fortunately, most of the code in this PR can be reused; we just need to improve the flush control.
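
A rough sketch of the "complete sorting semantics" being asked for, using hypothetical helper names (not the actual StreamWriteFunction or write-helper APIs): buffer everything destined for one file as a bucket, sort the whole bucket once at flush time, and hand the fully ordered batch to the writer so each file is strictly ordered.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration: each bucket is sorted as a whole right before it is
// written, so every output file is strictly ordered instead of containing
// several independently sorted batches.
class SortedBucketFlusher<T> {
  private final List<T> bucket = new ArrayList<>();
  private final Comparator<T> sortOrder;
  private final Consumer<List<T>> fileWriter;  // stands in for the actual write helper

  SortedBucketFlusher(Comparator<T> sortOrder, Consumer<List<T>> fileWriter) {
    this.sortOrder = sortOrder;
    this.fileWriter = fileWriter;
  }

  void add(T record) {
    bucket.add(record);            // buffer until flush (e.g., memory limit or checkpoint)
  }

  void flush() {
    bucket.sort(sortOrder);        // one sort over the whole bucket -> fully ordered file
    fileWriter.accept(new ArrayList<>(bucket));
    bucket.clear();
  }
}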

Labels
size:L PR with lines of changes in (300, 1000]
6 participants