feat: add flink continuous split enumerator #17562
HuangZhenQiu
left a comment
@xushiyan Please review the PR at your earliest convenience.
Pull request overview
This PR adds support for continuous/incremental reading of Hudi tables in Flink by introducing a new HoodieContinuousSplitEnumerator that can discover new splits from the Hudi timeline as commits occur.
Key Changes:
- Introduces continuous split enumeration infrastructure for streaming incremental reads
- Adds position tracking to maintain enumeration state across checkpoints
- Provides a configurable scan context to encapsulate scanning parameters
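The position-tracking idea in the list above can be illustrated with a minimal, dependency-free sketch. It is not the PR's implementation: `SketchTimeline`, `SketchEnumerator`, and their methods are hypothetical names, and instants are assumed to be strings that sort lexicographically in commit order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical stand-in for a table timeline: instant time -> file ids written by that commit.
class SketchTimeline {
    private final TreeMap<String, List<String>> commits = new TreeMap<>();

    void commit(String instant, List<String> fileIds) {
        commits.put(instant, fileIds);
    }

    // Returns the commits strictly after lastInstant, or all commits when lastInstant is null.
    TreeMap<String, List<String>> commitsAfter(String lastInstant) {
        if (lastInstant == null) {
            return new TreeMap<>(commits);
        }
        return new TreeMap<>(commits.tailMap(lastInstant, false));
    }
}

// Hypothetical enumerator: it remembers the last enumerated instant (the "position")
// so that a restore from a checkpoint can resume discovery from that point.
class SketchEnumerator {
    private final SketchTimeline timeline;
    private String lastEnumeratedInstant; // null means nothing has been enumerated yet

    SketchEnumerator(SketchTimeline timeline, String restoredInstant) {
        this.timeline = timeline;
        this.lastEnumeratedInstant = restoredInstant;
    }

    // One discovery round: emit a split per file of every new commit, then advance the position.
    List<String> discoverNewSplits() {
        TreeMap<String, List<String>> fresh = timeline.commitsAfter(lastEnumeratedInstant);
        List<String> splits = new ArrayList<>();
        fresh.forEach((instant, fileIds) ->
            fileIds.forEach(fileId -> splits.add(instant + ":" + fileId)));
        if (!fresh.isEmpty()) {
            lastEnumeratedInstant = fresh.lastKey();
        }
        return splits;
    }

    // The value a checkpoint would persist in this sketch.
    String snapshotPosition() {
        return lastEnumeratedInstant;
    }
}
```

In this sketch, a second call to `discoverNewSplits()` with no new commits returns an empty list, and an enumerator constructed with a previously snapshotted position only sees later commits.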
Reviewed changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| HoodieContinuousSplitEnumerator.java | Main enumerator implementation that periodically discovers new splits from Hudi commits |
| HoodieContinuousSplitBatch.java | Data class representing a batch of splits with instant range information |
| HoodieContinuousSplitDiscover.java | Interface for split discovery operations |
| DefaultHoodieSplitDiscover.java | Default implementation that delegates to IncrementalInputSplits |
| HoodieEnumeratorPosition.java | Tracks the last enumerated instant for incremental discovery |
| ScanContext.java | Encapsulates scan configuration including paths, instants, and skip options |
| HoodieSourceSplit.java | Adds static SPLIT_COUNTER for generating unique split IDs |
| HoodieSplitEnumeratorState.java | Extended to include last enumerated instant tracking |
| IncrementalInputSplits.java | Adds inputHoodieSourceSplits method to convert splits to HoodieSourceSplit type |
| AbstractHoodieSplitEnumerator.java | Updated to pass null values for new state parameters |
| TestHoodieContinuousSplitBatch.java | Comprehensive tests for HoodieContinuousSplitBatch |
| TestDefaultHoodieSplitDiscover.java | Tests for split discovery implementation |
| TestHoodieContinuousSplitEnumerator.java | Tests for continuous enumerator with mocks |
| TestScanContext.java | Tests for ScanContext builder and accessors |
| TestIncrementalInputSplits.java | Tests for the new inputHoodieSourceSplits method |
cshuo
left a comment
Thanks for the contribution; I left some comments.
public static HoodieContinuousSplitBatch fromResult(IncrementalInputSplits.Result result) {
  List<HoodieSourceSplit> splits = result.getInputSplits().stream().map(split ->
      new HoodieSourceSplit(
          HoodieSourceSplit.SPLIT_COUNTER.incrementAndGet(),
HoodieSourceSplit should not hold SPLIT_COUNTER itself; the counter is better managed by the caller that creates the splits. It could be initialized here.
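A rough sketch of what a caller-owned counter could look like, under the assumption that the split simply receives its number through the constructor. `NumberedSplit` and `SplitFactory` are hypothetical, simplified names and not classes from this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified split: it is handed its number and keeps no static state.
final class NumberedSplit {
    final int splitNum;
    final String path;

    NumberedSplit(int splitNum, String path) {
        this.splitNum = splitNum;
        this.path = path;
    }
}

// Hypothetical caller-side factory that owns the counter instead of the split class.
final class SplitFactory {
    private final AtomicInteger counter = new AtomicInteger();

    // Assigns consecutive numbers, starting at 1, in the order the paths are given.
    List<NumberedSplit> create(List<String> paths) {
        List<NumberedSplit> out = new ArrayList<>(paths.size());
        for (String path : paths) {
            out.add(new NumberedSplit(counter.incrementAndGet(), path));
        }
        return out;
    }
}
```

With the counter scoped to the creating object, each factory instance numbers its own splits independently, and tests do not share hidden static state.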
The current toString() is a human-readable description of the object and is not suitable as an ID. Also, shouldn't the ID uniquely identify a split that reads the same set of records? It should at least contain the file ID, which is currently missing.
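One way to read this suggestion is to keep `toString()` for logging and derive the ID from the fields that determine which records are read. The sketch below is an assumption-laden illustration: `IdentifiedSplit`, its four fields, and the `|` separator are hypothetical, and the real split may need more or different fields to be unique.

```java
import java.util.Objects;

// Hypothetical split that separates a stable identifier from its debug string.
final class IdentifiedSplit {
    private final String partitionPath;
    private final String fileId;
    private final String startInstant;
    private final String endInstant;

    IdentifiedSplit(String partitionPath, String fileId, String startInstant, String endInstant) {
        this.partitionPath = Objects.requireNonNull(partitionPath);
        this.fileId = Objects.requireNonNull(fileId);
        this.startInstant = Objects.requireNonNull(startInstant);
        this.endInstant = Objects.requireNonNull(endInstant);
    }

    // Deterministic ID built only from the identifying fields, including the file id.
    // Assumes none of the fields contains the '|' separator.
    String splitId() {
        return String.join("|", partitionPath, fileId, startInstant, endInstant);
    }

    // Human-readable form for logs; free to change without affecting splitId().
    @Override
    public String toString() {
        return "IdentifiedSplit{partition=" + partitionPath
            + ", fileId=" + fileId
            + ", range=(" + startInstant + ", " + endInstant + "]}";
    }
}
```

Two splits built from the same fields yield the same ID, and splits over different file groups yield different IDs, regardless of how the log format evolves.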
The PR also resolved #14417
Describe the issue this Pull Request addresses
Add HoodieContinuousSplitEnumerator for reading Hudi tables incrementally
Summary and Changelog
Add HoodieContinuousSplitEnumerator, DefaultHoodieContinuousSplitDiscover and HoodieContinuousSplitBatch classes
Impact
none
Risk Level
none
Documentation Update
Contributor's checklist