[HUDI-6329] Adjust the partitioner automatically for flink consistent hashing index #9087
…uld detect whether clustering service occurs and automatically adjust the partitioner and write function if clustering service happens.
1bc4ea7
to
92e8459
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockCollector.java
HoodieFlinkTable table = writeClient.getHoodieTable();
Option<HoodieInstant> latestPendingReplaceInstant = table.getActiveTimeline().filterPendingReplaceTimeline().lastInstant();
if (latestPendingReplaceInstant.isPresent() && latestPendingReplaceInstant.get().getTimestamp().compareTo(lastRefreshInstant) > 0) {
LOG.info("Found new pending replacement commit. Last pending replacement commit is {}.", latestPendingReplaceInstant);
To make sure this is a CLUSTERING instant, you may also need to check the plan, because clustering and insert overwrite share the same instant type: REPLACE_COMMIT. There is a util method that can do this: ClusteringUtil#isClusteringInstant
+1. I could add a check here, using ClusteringUtils#getPendingClusteringInstantTimes to filter out pending clustering instants.
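The check discussed in this thread can be sketched in isolation. The snippet below is only an illustration, not the code in this PR: PendingInstant and ClusteringInstantFilter are hypothetical stand-ins for Hudi's timeline types, the hasClusteringPlan flag stands in for whatever plan inspection the real utility performs, and the "replacecommit" action string is used here purely as an illustrative value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for a pending timeline instant; not the real Hudi class.
final class PendingInstant {
  final String timestamp;          // instant time, compared lexicographically
  final String action;             // e.g. "replacecommit" (illustrative value)
  final boolean hasClusteringPlan; // assumed result of inspecting the instant's plan

  PendingInstant(String timestamp, String action, boolean hasClusteringPlan) {
    this.timestamp = timestamp;
    this.action = action;
    this.hasClusteringPlan = hasClusteringPlan;
  }
}

// Hypothetical helper: picks the latest pending instant that is really a clustering.
final class ClusteringInstantFilter {
  static final String REPLACE_COMMIT_ACTION = "replacecommit";

  static Optional<PendingInstant> latestPendingClustering(List<PendingInstant> pending) {
    PendingInstant latest = null;
    for (PendingInstant instant : pending) {
      // Insert overwrite shares the replace action, so the action alone is not enough.
      if (!REPLACE_COMMIT_ACTION.equals(instant.action) || !instant.hasClusteringPlan) {
        continue;
      }
      if (latest == null || instant.timestamp.compareTo(latest.timestamp) > 0) {
        latest = instant;
      }
    }
    return Optional.ofNullable(latest);
  }
}
```

With this shape, a newer pending insert overwrite is skipped and the older clustering instant is returned, which is the behavior the reviewer asks for.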
...flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
… bucket index 2. Check replacement is clustering
47e33ac
to
cc9a853
6329.patch.zip
private List<String> indexKeyFields;
private Map<String, Pair<String, ConsistentBucketIdentifier>> partitionToIdentifier;
private String lastRefreshInstant = HoodieTimeline.INIT_INSTANT_TS;
I don't see lastRefreshInstant being used anywhere.
Sorry, I missed that. It is used to check whether there is a new pending clustering request.
+1, fine with the change, though it may need more testing in production~
Change Logs
This PR is the third subtask of HUDI-4373.
It focuses on resolving the resizing cases in the write pipelines. It detects whether a clustering service has run and, if so, automatically adjusts the partitioner and write function.
PS: This work is a follow-up to #6737. Thanks for the contribution, @YuweiXiao.
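As a rough illustration of the detection step, the sketch below tracks the last instant at which the bucket metadata was refreshed and triggers a refresh only when a newer pending clustering instant appears. It is a minimal sketch under stated assumptions, not the implementation in this PR: PartitionerRefreshTracker and maybeRefresh are hypothetical names, the initial value mirrors the lastRefreshInstant field shown in the review but its literal is assumed, and the actual reload of partitioner state is reduced to a counter.

```java
import java.util.Optional;

// Hypothetical tracker; the real partitioner and write function hold more state.
final class PartitionerRefreshTracker {
  // Assumed initial value, chosen so that any real instant time compares greater.
  static final String INIT_INSTANT_TS = "00000000000000";

  private String lastRefreshInstant = INIT_INSTANT_TS;
  private int refreshCount = 0;

  // Returns true when a newer pending clustering instant was seen and a refresh ran.
  boolean maybeRefresh(Optional<String> latestPendingClusteringTs) {
    if (latestPendingClusteringTs.isPresent()
        && latestPendingClusteringTs.get().compareTo(lastRefreshInstant) > 0) {
      lastRefreshInstant = latestPendingClusteringTs.get();
      refreshCount++; // placeholder for reloading the bucket identifiers
      return true;
    }
    return false;
  }

  String lastRefreshInstant() {
    return lastRefreshInstant;
  }

  int refreshCount() {
    return refreshCount;
  }
}
```

Instant times are compared as strings, so the sketch relies on them being fixed-width and sortable, as in the timestamp comparison shown in the review above.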
Impact
NA
Risk level (write none, low medium or high below)
NA
Documentation Update
All documents update would be introduced in the final subtask of HUDI-4373.
Contributor's checklist