feat(flink): extend Flink quickstart example to use source v2 #18518
yihua
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Style & Readability Review — a few small readability suggestions below.
yihua
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Style & Readability Review — a few small readability suggestions: a flipped assertEquals argument order and magic number in the test, a redundant inline comment that duplicates the Javadoc, and a bare // ignored on a caught exception that could use a brief rationale.
yihua
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for contributing! The change is a straightforward extension of the quickstart to demonstrate Source V2. One small concern around silently swallowing ExecutionException from tableResult.await() — worth a quick look.
HuangZhenQiu
left a comment
Resolved AI comments.
yihua
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the follow-up — the cosmetic nits (removed duplicate inline comment, renamed table → resolvedTable, flipped assertEquals args) all look good. One thing to note: my prior question about silently swallowing ExecutionException from tableResult.await() in execBatchSelectSql wasn't addressed in this round and I didn't see a reply explaining why — would still be worth at least logging the cause so job failures aren't masked.
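A minimal sketch of the logging suggestion, not the code in this PR. `AwaitSketch`, `awaitJob`, and the `Future` argument are hypothetical stand-ins for `execBatchSelectSql` and `tableResult.await()`; `Future.get()` throws the same two checked exceptions, so the handling pattern should carry over.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for the quickstart helper; not the code in this PR.
final class AwaitSketch {
  private static final Logger LOG = Logger.getLogger(AwaitSketch.class.getName());

  // Waits for the job and reports whether it succeeded.
  // On failure the cause is logged instead of being swallowed.
  static boolean awaitJob(Future<?> job) {
    try {
      job.get();
      return true;
    } catch (ExecutionException e) {
      // Surface the underlying failure so a failed job is visible in the logs.
      LOG.log(Level.WARNING, "Job failed while awaiting result", e.getCause());
      return false;
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers can still observe the interruption.
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

Whether to log and continue or to rethrow depends on how the quickstart is expected to behave on job failure; the sketch only shows the log-and-continue variant.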
3a65c89 to 3da4553
hudi-agent
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR extends the Flink quickstart example to demonstrate the FLIP-27 Source V2 reader for both streaming (continuous) and batch (static) modes, and expands the IT test matrix to cover the new path. No issues flagged from this automated pass — a Hudi committer or PMC member can take it from here for a final review.
Style & Readability — a couple of small readability nits below — one on the silent exception catch and one on the builder call pattern.
cc @yihua
3da4553 to d716218
hudi-agent
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for working on this! The PR extends the Flink quickstart example to demonstrate Source V2 usage via a new useSourceV2 arg and adds a bounded batch query path, with tests parameterized across COW/MOR × V1/V2. No critical correctness issues flagged from this automated pass — a Hudi committer or PMC member can take it from here for a final review. A few small naming and clarity suggestions below.
cc @yihua
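For readers unfamiliar with the test matrix mentioned above, here is a rough sketch of how the four COW/MOR × V1/V2 cases could be enumerated. The `TestMatrixSketch` class and its `TableType` enum are hypothetical and only keep the example self-contained; the PR's actual test parameterization may be written differently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; not the parameter source used by the PR's test.
final class TestMatrixSketch {
  // Hypothetical enum standing in for the table type under test.
  enum TableType { COPY_ON_WRITE, MERGE_ON_READ }

  // Builds every (table type, useSourceV2) pair: 2 table types x 2 source versions.
  static List<Object[]> combinations() {
    List<Object[]> cases = new ArrayList<>();
    for (TableType type : TableType.values()) {
      for (boolean useSourceV2 : new boolean[] {false, true}) {
        cases.add(new Object[] {type, useSourceV2});
      }
    }
    return cases;
  }
}
```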
List<Row> batchRows = flinkQuickstart.queryBatchData();

// full table scan
assertEquals(8, batchRows.size());
🤖 nit: the magic 8 here has to be updated by hand whenever the dataset changes size. Could you use TestQuickstartData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT.size() (or wherever the expected row count lives) so the assertion stays in sync with the rest of the test?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
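A small sketch of deriving the expected count from the fixture rather than hard-coding it. `ExpectedCountSketch`, `EXPECTED_ROWS`, and the row values are hypothetical; `queryBatchData()` here is only a stand-in that returns a copy of the fixture, not the quickstart's real query.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the shared test fixture; the real constant may differ.
final class ExpectedCountSketch {
  static final List<String> EXPECTED_ROWS =
      Arrays.asList("id1", "id2", "id3", "id4", "id5", "id6", "id7", "id8");

  // Stand-in for flinkQuickstart.queryBatchData(); returns a copy of the fixture.
  static List<String> queryBatchData() {
    return new ArrayList<>(EXPECTED_ROWS);
  }

  // Compares against the fixture size, so the check follows the dataset if it grows or shrinks.
  static void checkFullScan(List<String> actual) {
    if (actual.size() != EXPECTED_ROWS.size()) {
      throw new AssertionError(
          "expected " + EXPECTED_ROWS.size() + " rows but got " + actual.size());
    }
  }
}
```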
d716218 to f8b72a7
hudi-agent
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR extends the Flink quickstart example to demonstrate Source V2 and adds a null-safety fallback for the split enumerator's metric group on Flink 1.17. No critical issues flagged from this automated pass — a Hudi committer or PMC member can take it from here for a final review. A couple of small naming/readability nits below.
cc @yihua
.setUnassignedSplitsGauge(() -> Long.valueOf(splitProvider.pendingSplitCount()));
this.enumeratorMetrics = new FlinkStreamReadMetrics(enumeratorContext.metricGroup(), tableName);
enumeratorMetrics.registerMetrics();
if (this.enumeratorContext.metricGroup() != null) {
🤖 nit: enumeratorContext.metricGroup() is called three times across lines 75–79 (null check, gauge registration, and metrics construction). Could you cache it in a local — e.g. SplitEnumeratorMetricGroup metricGroup = enumeratorContext.metricGroup(); — before the if, so all three uses reference the same local?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
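A rough sketch of the "cache it in a local" suggestion. The interfaces below are hypothetical stand-ins for Flink's `SplitEnumeratorContext` and `SplitEnumeratorMetricGroup`, and the gauge name is made up; only the shape of the constructor is the point.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for the metric group type.
interface MetricGroupSketch {
  void gauge(String name, Supplier<Long> supplier);
}

// Hypothetical stand-in for the enumerator context.
interface EnumeratorContextSketch {
  MetricGroupSketch metricGroup(); // may return null, as reported for Flink 1.17
}

final class EnumeratorSketch {
  final boolean metricsEnabled;

  EnumeratorSketch(EnumeratorContextSketch context) {
    // Read the metric group once so the null check and later uses see the same value.
    MetricGroupSketch metricGroup = context.metricGroup();
    if (metricGroup != null) {
      metricGroup.gauge("unassignedSplits", () -> 0L); // hypothetical gauge name
      metricsEnabled = true;
    } else {
      metricsEnabled = false;
    }
  }
}
```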
execConf.setString("restart-strategy", "fixed-delay");
execConf.setString("restart-strategy.fixed-delay.attempts", "0");
this.streamTableEnv = streamTableEnv;
this.tableEnvironment = streamTableEnv;
🤖 nit: the local variable streamTableEnv (declared a few lines above) wasn't renamed when the field was renamed to tableEnvironment, so the two names sit side-by-side here. Could you rename the local to tableEnvironment (or tableEnv) as well for consistency?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
this.enumeratorMetrics = new FlinkStreamReadMetrics(this.enumeratorContext.metricGroup(), tableName);
} else {
// The metrics group returned from enumeratorContext is null in Flink 1.17.
this.enumeratorMetrics = new FlinkStreamReadMetrics(UnregisteredMetricsGroup.createSplitEnumeratorMetricGroup(), tableName);
Is UnregisteredMetricsGroup the right way to fix the issue?
Probably not; UnregisteredMetricsGroup is usually used in tests only. Shall we leave it as null?
hudi-agent
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR extends the Flink quickstart to demonstrate the FLIP-27 Source V2 reader, parameterizes the existing IT to cover all four COW/MOR × V1/V2 combinations, and adds null-metric-group handling required by Flink 1.17. No new issues flagged from this automated pass beyond what prior rounds and other reviewers already raised — a Hudi committer or PMC member can take it from here for a final review. A couple of small naming suggestions below — overall the changes read cleanly.
cc @yihua
private EnvironmentSettings settings = null;
@Getter
private TableEnvironment streamTableEnv = null;
private TableEnvironment tableEnvironment = null;
🤖 nit: the rename from streamTableEnv to tableEnvironment loses the streaming-specific semantics — this field is the streaming counterpart to getBatchTableEnv(). Keeping it as streamTableEnv (or naming it streamingTableEnv) makes the distinction clearer at call sites.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
execConf.setString("restart-strategy", "fixed-delay");
execConf.setString("restart-strategy.fixed-delay.attempts", "0");
this.streamTableEnv = streamTableEnv;
this.tableEnvironment = streamTableEnv;
🤖 nit: the local variable here is still named streamTableEnv while the field was renamed to tableEnvironment — having two different names for the same value in a small method is confusing. Worth aligning them.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
} else {
// The metrics group returned from enumeratorContext is null in Flink 1.17.
enumeratorMetrics = null;
}
🤖 nit: inconsistent use of this. between the two branches (this.enumeratorMetrics = ... in the if-branch vs. enumeratorMetrics = null in the else-branch). Pick one for consistency in this constructor.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
There are test failures: TestHoodieStaticSplitEnumerator.testConstructorWithNullMetricGroup:238
fb8a874 to 01d8944
hudi-agent
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR extends the Flink quickstart to optionally route reads through the FLIP-27 Source V2 path and adds a defensive null-check around the enumerator metric group to handle the Flink 1.17 case where metricGroup() returns null. No new issues flagged from this automated pass — a Hudi committer or PMC member can take it from here for a final review. One small readability note in the enumerator constructor; the rest of the changes are clean.
cc @yihua
.setUnassignedSplitsGauge(() -> Long.valueOf(splitProvider.pendingSplitCount()));
this.enumeratorMetrics = new FlinkStreamReadMetrics(enumeratorContext.metricGroup(), tableName);
enumeratorMetrics.registerMetrics();
if (this.enumeratorContext.metricGroup() != null) {
🤖 nit: enumeratorContext.metricGroup() is called three times (null-check, gauge registration, and metric-group constructor). Could you assign it to a local variable once — SplitEnumeratorMetricGroup metricGroup = enumeratorContext.metricGroup(); — to make the null guard and the two usages below read more clearly?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
splitProvider.onDiscoveredSplits(result.getSplits());
position.get().getIssuedInstant().ifPresent(enumeratorMetrics::setIssuedInstant);
if (enumeratorMetrics != null) {
position.get().getIssuedInstant().ifPresent(enumeratorMetrics::setIssuedInstant);
Does this mean that for Flink releases below 1.18, these metrics would be unsupported?
Yes, I think so, since we can't get the metric group from the SplitEnumeratorContext. Any better suggestions?
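One possible alternative to scattering `if (enumeratorMetrics != null)` checks, sketched here as an assumption rather than as what the PR does, is a no-op metrics object. It does not make the metrics available on runtimes that return a null metric group; it only removes the null checks at call sites. All names below are hypothetical and do not mirror `FlinkStreamReadMetrics`.

```java
// Hypothetical interface; the PR's metrics class is not reproduced here.
interface ReadMetricsSketch {
  void setIssuedInstant(String instant);

  // No-op instance used when the runtime provides no metric group.
  ReadMetricsSketch NO_OP = instant -> { };
}

// Hypothetical implementation that remembers the last reported instant.
final class RecordingMetrics implements ReadMetricsSketch {
  String lastIssuedInstant;

  @Override
  public void setIssuedInstant(String instant) {
    lastIssuedInstant = instant;
  }
}

final class MetricsFactorySketch {
  // Returns a recording instance when metrics are available, otherwise the no-op.
  static ReadMetricsSketch create(boolean metricGroupAvailable) {
    return metricGroupAvailable ? new RecordingMetrics() : ReadMetricsSketch.NO_OP;
  }
}
```

With this shape, callers can invoke `setIssuedInstant` unconditionally; the trade-off is one extra type to maintain.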
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #18518 +/- ##
============================================
- Coverage 68.90% 68.06% -0.84%
- Complexity 28581 28909 +328
============================================
Files 2482 2518 +36
Lines 137053 140572 +3519
Branches 16713 17422 +709
============================================
+ Hits 94436 95681 +1245
- Misses 35009 37035 +2026
- Partials 7608 7856 +248
Describe the issue this Pull Request addresses
Extend Flink quick start example to explicitly use Flink Hudi Source V2
Closes #14428
Summary and Changelog
Impact
none
Risk Level
none
Documentation Update
none
Contributor's checklist