feat(flink): Support dynamic bucket for flink streaming with partitioned RLI #18640
111f50b to 36f9935
hudi-agent
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This chunk contains the test coverage for the new dynamic bucket / partitioned RLI write path: unit tests for the new RecordLevelIndexBackend lazy-eviction and dictionary-encoded bucket caches, the renamed GlobalRecordLevelIndexBackend and GlobalRecordIndexPartitioner, the partitioned RLI routing in RecordIndexPartitioner, and an end-to-end integration test (ITTestDynamicBucketStreamWrite) covering CoW/MoR, partitioned/non-partitioned, insert overwrite and bucket scaling. I traced the lazy-eviction math, the access-order semantics of the LinkedHashMap-based partitionBucketCaches, and the partitioner routing — the assertions hold for the chosen FILE_GROUP_COUNT/NUM_PARTITIONS pair. No correctness issues found. A few style/readability suggestions in the inline comments. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. A few small naming and simplification suggestions below.
cc @yihua
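The lazy-eviction and access-order semantics the review traced come from `LinkedHashMap`'s access-order mode. The sketch below is illustrative only — `PartitionBucketCache` and its capacity are hypothetical names, not Hudi's actual classes — but it shows the idiom: with `accessOrder = true`, iteration follows most-recent access, and overriding `removeEldestEntry` evicts the least recently used entry lazily, only when an insert exceeds capacity.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative stand-in for an access-order, lazily evicting per-partition cache.
class PartitionBucketCache<K, V> extends LinkedHashMap<K, V> {
  private final int maxEntries;

  PartitionBucketCache(int maxEntries) {
    // accessOrder = true: iteration order tracks most-recent access,
    // so the "eldest" entry is the least recently used one.
    super(16, 0.75f, true);
    this.maxEntries = maxEntries;
  }

  @Override
  protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
    // Lazy eviction: the LRU entry is dropped only when a put exceeds capacity.
    return size() > maxEntries;
  }

  public static void main(String[] args) {
    PartitionBucketCache<String, Integer> cache = new PartitionBucketCache<>(2);
    cache.put("p1", 1);
    cache.put("p2", 2);
    cache.get("p1");    // touch p1, so p2 becomes the eldest entry
    cache.put("p3", 3); // exceeds capacity: evicts p2
    System.out.println(cache.keySet()); // [p1, p3]
  }
}
```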
 */
protected Pair<List<HoodieFileGroupId>, HoodieData<HoodieRecord>> tagRecordsWithLocationForStreamingWrites(HoodieData<HoodieRecord> untaggedRecords,
                                                                                                           Set<String> enabledMetadataPartitions) {
  // no need to tag of the incoming records is empty.
🤖 nit: typo — "no need to tag of the incoming records is empty" should probably be "no need to tag records if the incoming data is empty".
    RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext()),
    writeProfile,
    writeConfig);
this.maxParallelism = RuntimeContextUtils.getMaxNumberOfParallelSubtasks(getRuntimeContext());
🤖 nit: getMaxNumberOfParallelSubtasks, getNumberOfParallelSubtasks, and getIndexOfThisSubtask are each called twice — once inside the BucketAssigner constructor args and again to assign the fields. Could you assign the three fields first and then pass them to the constructor?
public static boolean isStreamingIndexWriteEnabled(Configuration conf) {
  return conf.get(FlinkOptions.METADATA_ENABLED)
      && OptionsResolver.getIndexType(conf) == HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX
      && (OptionsResolver.getIndexType(conf) == HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX
🤖 nit: OptionsResolver.getIndexType(conf) is called twice in the same boolean expression — could you extract it to a local variable like the surrounding methods do?
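The extraction suggested above can be sketched as follows. This is a simplified, self-contained illustration — the `IndexType` enum and `resolveIndexType` helper here are hypothetical stand-ins for `HoodieIndex.IndexType` and `OptionsResolver.getIndexType`, and the boolean shape is condensed for the example:

```java
// Hypothetical stand-ins for HoodieIndex.IndexType / OptionsResolver.getIndexType;
// the refactor simply caches the repeated resolution in a local variable.
class IndexTypeCheck {
  enum IndexType { GLOBAL_RECORD_LEVEL_INDEX, RECORD_LEVEL_INDEX, BUCKET }

  static IndexType resolveIndexType(String configuredType) {
    return IndexType.valueOf(configuredType);
  }

  static boolean isStreamingIndexWriteEnabled(boolean metadataEnabled, String configuredType) {
    // Resolve once into a local instead of calling resolveIndexType twice.
    IndexType indexType = resolveIndexType(configuredType);
    return metadataEnabled
        && (indexType == IndexType.GLOBAL_RECORD_LEVEL_INDEX
            || indexType == IndexType.RECORD_LEVEL_INDEX);
  }

  public static void main(String[] args) {
    System.out.println(isStreamingIndexWriteEnabled(true, "GLOBAL_RECORD_LEVEL_INDEX")); // true
    System.out.println(isStreamingIndexWriteEnabled(true, "BUCKET")); // false
  }
}
```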
hudi-agent
left a comment
Thanks for the updates! This round adds an INDEX_RLI_CACHE_CONCURRENT_PARTITIONS_NUM config to drive inferMemorySizeForCache, raises the SamplingActionExecutor default record interval from 1000 to 10000, and consolidates the partitioned-RLI write-mode validation from Pipelines into a switch in HoodieTableFactory.checkIndexType. One thing worth a second look: the move dropped the !OptionsResolver.isMultiWriter(conf) check (so OCC is now silently accepted for partitioned RLI), and sanityCheck doesn't run on the HoodieFlinkStreamer path — see the inline comment to confirm intent. The three NITs from the prior pass (typo in HoodieBackedTableMetadataWriter, duplicate RuntimeContextUtils calls in DynamicBucketAssignFunction, duplicate getIndexType in OptionsResolver) are still open. Please take a look at the inline comment, and this should be ready for a Hudi committer or PMC member to take it from here.
ValidationUtils.checkArgument(OptionsResolver.isUpsertOperation(conf) || OptionsResolver.isInsertOverwrite(conf),
    "Partitioned record level index supports only Flink streaming upsert and insert overwrite.");
ValidationUtils.checkArgument(!OptionsResolver.isNonBlockingConcurrencyControl(conf),
    "Partitioned record level index does not support non-blocking concurrency control.");
🤖 The previous Pipelines.validateRecordLevelIndexStreamWrite had three checks; this case keeps the upsert/insertOverwrite and NBCC ones but drops !OptionsResolver.isMultiWriter(conf). Since isMultiWriter is true for both OCC and NBCC, OPTIMISTIC_CONCURRENCY_CONTROL is now silently accepted for partitioned RLI — was that intentional? Each writer bootstraps its own RecordLevelIndexBackend cache from MDT and won't observe the other writer's in-flight assignments. Also, sanityCheck doesn't run on the HoodieFlinkStreamer path, so misconfigurations there will no longer be rejected up front.
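If the drop was unintentional, restoring the third check would look roughly like the sketch below. This is an illustration only: the booleans are plain parameters standing in for the real `OptionsResolver` calls, `check` mimics `ValidationUtils.checkArgument`, and the multi-writer error message is invented for the example:

```java
// Sketch of keeping all three original validations, including the
// multi-writer one the consolidation dropped. Parameters stand in for
// OptionsResolver.isUpsertOperation/isInsertOverwrite,
// isNonBlockingConcurrencyControl, and isMultiWriter.
class PartitionedRliValidation {
  static void validate(boolean upsertOrInsertOverwrite,
                       boolean nonBlockingConcurrencyControl,
                       boolean multiWriter) {
    check(upsertOrInsertOverwrite,
        "Partitioned record level index supports only Flink streaming upsert and insert overwrite.");
    check(!nonBlockingConcurrencyControl,
        "Partitioned record level index does not support non-blocking concurrency control.");
    // The check the move lost: reject any multi-writer mode (OCC included),
    // since each writer bootstraps its own index cache from MDT.
    check(!multiWriter,
        "Partitioned record level index does not support multi-writer concurrency control.");
  }

  static void check(boolean condition, String message) {
    if (!condition) {
      throw new IllegalArgumentException(message);
    }
  }

  public static void main(String[] args) {
    validate(true, false, false); // single-writer upsert: passes
    try {
      validate(true, false, true); // OCC writer: rejected up front again
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```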
Describe the issue this Pull Request addresses
Flink streaming writes need a partitioned record-level-index path that can support dynamic bucket assignment without relying on global record-key-to-location lookups. The existing RLI integration was centered on global record locations and did not provide a partition-scoped backend or pipeline wiring for routing records to dynamic bucket file groups.
This PR adds the Flink dynamic bucket write path backed by partitioned RLI, separates global and partitioned index backend responsibilities, and wires the corresponding index write partitioning and table options.
Summary and Changelog
- Adds `DynamicBucketAssignFunction` and `DynamicBucketAssignOperator` to route Flink records through partition-scoped RLI-backed bucket assignment.
- Adds `RecordLevelIndexBackend` as the partitioned RLI backend with partition-local spillable caches.
- Adds `GlobalRecordIndexPartitioner` and updates `RecordIndexPartitioner` so index writes route consistently for global and partitioned RLI layouts.
- Updates `RECORD_LEVEL_INDEX` table setup in `HoodieTableFactory`, including metadata table streaming writes and RLI write buffer defaults.

Impact
`IndexBackend` abstraction.

Risk Level
low
Documentation Update
Contributor's checklist