[HUDI-7030] Commit-based Clustering Plan Strategy#18251
[HUDI-7030] Commit-based Clustering Plan Strategy#18251prashantwason wants to merge 2 commits intoapache:masterfrom
Conversation
Summary: The first version of commit based clustering plan is scope to append only ingestion jobs Test Plan: unit tests Reviewers: O955 Project Hoodie Project Reviewer: Add blocking reviewers, #hoodie_blocking_reviewers, #ldap_hudi, vamshi, bkrishen, ureview Reviewed By: O955 Project Hoodie Project Reviewer: Add blocking reviewers, #hoodie_blocking_reviewers, #ldap_hudi, vamshi, bkrishen Tags: #has_java JIRA Issues: HUDI-7030 Differential Revision: https://code.uberinternal.com/D18833959
Summary: create clustering groups for remaining partitions Test Plan: unit tests Reviewers: O955 Project Hoodie Project Reviewer: Add blocking reviewers, #hoodie_blocking_reviewers, #ldap_hudi, ureview, vamshi Reviewed By: O955 Project Hoodie Project Reviewer: Add blocking reviewers, #hoodie_blocking_reviewers, #ldap_hudi, vamshi Subscribers: vamshi Tags: #has_java JIRA Issues: HUDI-7030, HUDI-7197 Differential Revision: https://code.uberinternal.com/D18935003
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #18251 +/- ##
============================================
- Coverage 57.29% 57.23% -0.07%
- Complexity 18542 18559 +17
============================================
Files 1945 1946 +1
Lines 106202 106395 +193
Branches 13130 13151 +21
============================================
+ Hits 60848 60891 +43
- Misses 39628 39780 +152
+ Partials 5726 5724 -2
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
nsivabalan
left a comment
There was a problem hiding this comment.
shared feedback for source code.
will review tests in next iteration.
| .noDefaultValue() | ||
| .markAdvanced() | ||
| .sinceVersion("0.14.0") | ||
| .withDocumentation("Last commit time to start clustering from, applied to commit-based clustering plan strategy"); |
There was a problem hiding this comment.
Shouldn't this be, ....earliest.commit.time.to.cluster
There was a problem hiding this comment.
can we call out in docs whether this is inclusive or exclusive?
| return getString(HoodieClusteringConfig.PARTITION_REGEX_PATTERN); | ||
| } | ||
|
|
||
| public String getClusteringLastCommit() { |
There was a problem hiding this comment.
lets fix all getter methods once we fix the config key
| List<FileSlice> fileSlices = new ArrayList<>(); | ||
| List<HoodieWriteStat> writeStats = commitMetadata.getWriteStats(partition); | ||
| for (HoodieWriteStat stat : writeStats) { | ||
| // Only consider base files (ignore log files for clustering) |
There was a problem hiding this comment.
we can't ignore log files for clustering in MOR table.
we have added FileGroupReader abstraction in general. So, we should not ignore any log files here.
There was a problem hiding this comment.
Or, we should confine this plan only for COW table.
| partitionToFiles.put(partition, eligibleFileSlices); | ||
| } | ||
|
|
||
| if (partitionToSize.get(partition) >= getWriteConfig().getClusteringMaxBytesInGroup()) { |
There was a problem hiding this comment.
but this does not strictly honor the max clustering group size right.
we should remove the last added entry so that we are <= ClusteringMaxBytesInGroup?
| // For partitions that have not been processed, create clustering groups | ||
| for (String partition : partitionToSize.keySet()) { | ||
| if (partitionToSize.get(partition) > 0) { | ||
| Pair<Stream<HoodieClusteringGroup>, Boolean> groupPair = buildClusteringGroupsForPartition(partition, partitionToFiles.get(partition)); |
There was a problem hiding this comment.
Are we not accounting for ClusteringMaxNumGroups here?
| </configuration> | ||
| </plugin> | ||
| <plugin> | ||
| <groupId>org.codehaus.mojo</groupId> |
There was a problem hiding this comment.
why do we need this change?
Summary
This PR introduces a new commit-based clustering plan strategy for Apache Hudi that processes data files based on their commit/ingestion order. This is particularly useful for streaming ingestion workloads where:
Key Changes
CommitBasedClusteringPlanStrategyclass - Processes commits in order and generates clustering plans based on commit patternsConfiguration
hoodie.clustering.plan.strategy.classCommitBasedClusteringPlanStrategyhoodie.clustering.plan.strategy.max.bytes.per.grouphoodie.clustering.plan.strategy.max.num.groupshoodie.clustering.plan.last.commitTest plan
CommitBasedClusteringPlanStrategy(7 tests, all passing)testGenerateClusteringPlanWithNoCommitstestGenerateClusteringPlanWithoutCheckpointtestGenerateClusteringPlanWithSingleCommitSinglePartition(parameterized)testGenerateClusteringPlanWithSingleCommitMultiPartitionstestGenerateClusteringPlanWithMultiCommitstestGenerateClusteringPlanWithReplaceCommit🤖 Generated with Claude Code