[HUDI-9504] support in-memory buffer sort in append write #13409
Conversation
Force-pushed from 45cf09a to 2efb9bf.
Force-pushed from 08ab684 to 123870c.
@zhangyue19921010 @danny0405 Will finish my review later this week.
cc @cshuo for the first round of review
@@ -586,6 +586,27 @@ private FlinkOptions() {
      .withDescription("Maximum memory in MB for a write task, when the threshold hits,\n"
          + "it flushes the max size data bucket to avoid OOM, default 1GB");

  @AdvancedConfig
  public static final ConfigOption<Boolean> WRITE_BUFFER_SORT_ENABLED = ConfigOptions
      .key("write.buffer.sort.enabled")
write.sort.enabled?
  @AdvancedConfig
  public static final ConfigOption<String> WRITE_BUFFER_SORT_KEYS = ConfigOptions
      .key("write.buffer.sort.keys")
write.sort.keys?
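For context on how write.buffer.sort.keys might be consumed, here is a minimal sketch that resolves the comma-separated key list into field positions of the row type; the helper name and the parsing details are assumptions for illustration, not taken from this PR:

import java.util.Arrays;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.configuration.FlinkOptions;

// Hypothetical helper: resolve "write.buffer.sort.keys" into field positions of the RowType.
static int[] resolveSortKeyIndices(Configuration conf, RowType rowType) {
  String keys = conf.get(FlinkOptions.WRITE_BUFFER_SORT_KEYS);
  return Arrays.stream(keys.split(","))
      .map(String::trim)
      .mapToInt(rowType::getFieldIndex) // returns -1 if the field does not exist
      .toArray();
}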
  public AppendWriteFunctionWithBufferSort(Configuration config, RowType rowType) {
    super(config, rowType);
    this.writebufferSize = config.get(FlinkOptions.WRITE_BUFFER_SIZE);
Maybe WRITE_BUFFER_SIZE is not needed, since memorySegmentPool has a limited memory size, which can be used to trigger buffer flushing.
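To illustrate the suggestion: if the sort buffer is backed by a bounded memory segment pool, a failed write can itself trigger the flush, making a separate byte-size threshold redundant. A minimal sketch, assuming a BinaryInMemorySortBuffer-style API whose write returns false once the pool is exhausted; buffer and flushBuffer are hypothetical names:

// Sketch only: `buffer` is assumed to be backed by a bounded MemorySegmentPool;
// `flushBuffer()` is a hypothetical method that sorts and emits the buffered
// records, then resets the buffer.
public void bufferRecord(RowData record) throws IOException {
  if (!buffer.write(record)) {   // false: the memory segment pool is exhausted
    flushBuffer();               // sort + hand off to the write helper, then reset
    if (!buffer.write(record)) { // retry; a single record should fit into an empty buffer
      throw new IOException("Record is too large for the sort buffer");
    }
  }
}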
@HuangZhenQiu Still working on this?
@zhangyue19921010 Yes, I was OOO last week. Will update the diff this week.
Force-pushed from 36ed297 to c692d75.
Thanks for the review. @cshuo @zhangyue19921010
Force-pushed from 3f5ee19 to 1d86872.
Ok, will take another look soon.
@HuangZhenQiu Thanks for updating, left some comments.
Force-pushed from f27de92 to d5210d8.
Force-pushed from fd80089 to 3c2854e.
@cshuo
Thanks for updating. It seems the PR doesn't fix the comments here, i.e., with the current impl, records are only partially ordered within a parquet file, since it may contain batches from multiple …
Small files are not good for query performance, but if we order a whole parquet file, we lose data freshness: sort time increases a lot and causes high back pressure in the Flink job. Thus, we use the buffer size to control row-group-level order and the compression ratio. It is a trade-off that achieves data freshness and storage size without keeping parquet-file-level sort. We will leverage table services to do the stitching later.
As mentioned above, we can trigger flushing by buffer memory size and set the size properly to relieve the small-file pressure. And the current impl doesn't seem to ensure the data is ordered at the row group level either, since the row group is switched when it reaches the configured size limit, e.g., 120MB by default.
Actually, data freshness is decided by the checkpoint interval. The writer flushes and commits the written files during checkpoint, until which point the data remains invisible.
Agree that it takes more sort time to keep the whole file ordered. Not sure how significant the impact is; I remember @Alowator has an ingestion benchmark that includes sorting of the binary buffer here, and said …
cc @danny0405 @zhangyue19921010 for final review.
Force-pushed from e3058eb to a1ae791.
Force-pushed from a1ae791 to 9446081.
   *
   * @throws IOException
   */
  private void sortAndSend() throws IOException {
During sort and send, we still use a common write helper, which means this sort is still a partial sort?
Based on our experience, global sorting for Parquet files requires additional memory and inevitably leads to small-file issues, although the sorting process itself is highly efficient (accounting for less than 5% in flame graphs). For partial-sorting implementations, I recommend uniformly updating all references, including configuration names, javadocs, and method names, to use the term "partial order" to prevent unnecessary confusion among other users.
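For readers following the thread, a minimal sketch of what a per-buffer sortAndSend can look like; field names such as sortBuffer, binarySerializer, and writerHelper are assumptions for illustration, not the PR's actual members. Because only the currently buffered records are sorted, each flush emits one ordered batch, which is exactly the partial order discussed above:

// Sort the in-memory buffer, then emit the records in sorted order.
private void sortAndSend() throws IOException {
  new QuickSort().sort(sortBuffer); // in-place sort over the binary sort buffer
  MutableObjectIterator<BinaryRowData> iterator = sortBuffer.getIterator();
  BinaryRowData reuse = binarySerializer.createInstance();
  BinaryRowData row;
  while ((row = iterator.next(reuse)) != null) {
    writerHelper.write(row); // common write helper: ordering holds per flushed batch only
  }
  sortBuffer.reset(); // release memory segments for the next batch
}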
.key("write.sort.enabled") | ||
.booleanType() | ||
.defaultValue(false) // default no sort | ||
.withDescription("Whether to enable buffer sort within append write function."); |
I still think we need to implement complete sorting semantics. We can refer to StreamWriteFunction and use buckets to control the flush implementation. Fortunately, most of the code in this PR can be reused; we just need to improve the flush control.
Change Logs
Add an in-memory buffer sort to the append write function to improve the Parquet compression ratio. From our experiments and testing, it can improve the compression ratio by 300% with the right sort key and buffer size configuration.
Impact
Users can use the feature by enabling the buffer sort configurations, for example as shown below.
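A sketch of a writer configuration enabling the feature; the path and sort keys are placeholders, and the option names follow this PR's diff (reviewers suggested renaming them to write.sort.*):

// Hypothetical usage: enable in-memory buffer sort on an append (insert) writer.
Configuration conf = new Configuration();
conf.set(FlinkOptions.PATH, "file:///tmp/hudi_sink");     // placeholder table path
conf.set(FlinkOptions.OPERATION, "insert");               // append write path
conf.set(FlinkOptions.WRITE_BUFFER_SORT_ENABLED, true);
conf.set(FlinkOptions.WRITE_BUFFER_SORT_KEYS, "name,ts"); // assumed comma-separated format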
Risk level (write none, low, medium or high below)
low
Documentation Update
It is a new feature. A Jira ticket will be created to update the website.
Contributor's checklist