feat(flink): add metrics for RLIBootstrapOperator#18685
feat(flink): add metrics for RLIBootstrapOperator#18685HuangZhenQiu wants to merge 3 commits intoapache:masterfrom
Conversation
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR adds gauge metrics (file slices processed, records emitted, bootstrap cost, throughput) for the Flink RLIBootstrapOperator, populated once after bootstrap loading completes in initializeState() and snapshotted in open(). No correctness issues found. A few style/readability suggestions in the inline comments. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. One naming nit on the throughput metric constant — otherwise the code is clean.
cc @yihua
2eab756 to
ca601a2
Compare
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for working on this! The PR adds Flink metrics for the RLI bootstrap operator and accompanying tests. A couple of issues worth checking in the inline comments — the metrics object never seems to be updated with the real values after bootstrap completes, and the test file appears out of sync with the source constant name. Please take a look at the inline comments, and this should be ready for a Hudi committer or PMC member to take it from here. One naming issue worth fixing — the throughput metric constant and key both say "perMs" but the gauge value is computed in records/second.
ca601a2 to
70c496b
Compare
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for working on this! The PR adds a FlinkRLIBootstrapMetrics class and wires it into RLIBootstrapOperator to expose bootstrap-loading gauges. Earlier review rounds already flagged the main correctness concern (metrics being captured before preLoadRLIRecords runs) and naming inconsistencies, so no new issues are flagged from this automated pass — a Hudi committer or PMC member can take it from here for a final review. One naming mismatch worth fixing — the PerMs constant/key implies records-per-millisecond but the computation returns records-per-second.
There was a problem hiding this comment.
Pull request overview
This PR adds Flink-side metrics for the record-level-index bootstrap path so RLIBootstrapOperator can expose basic bootstrap counts and timing through the existing Hudi/Flink metrics infrastructure.
Changes:
- Add a new
FlinkRLIBootstrapMetricsmetrics class with gauges for file slices processed, index records emitted, bootstrap duration, and derived throughput. - Wire
RLIBootstrapOperatorto create/register the new metrics and publish bootstrap results. - Add unit tests for metric registration, default values, updates, and throughput calculation.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkRLIBootstrapMetrics.java |
Introduces the new bootstrap metric definitions and gauge calculations. |
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/RLIBootstrapOperator.java |
Hooks the operator into the new metrics class and captures bootstrap counters/timing. |
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/metrics/TestFlinkRLIBootstrapMetrics.java |
Adds unit tests covering metric names, registration, initial values, and update behavior. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR adds Flink metrics to track RLI bootstrap progress. One concern worth a closer look on the operator lifecycle ordering — please take a look at the inline comment, and this should be ready for a Hudi committer or PMC member to take it from here. A couple of small nits in the test file — naming/comment mismatch with the metric unit, and a prefer-assertTrue suggestion.
| conf.get(FlinkOptions.PATH)); | ||
| // Load RLI records | ||
| preLoadRLIRecords(); | ||
| this.metrics.updateLoadResult(numFileSlicesProcessed, loadedCnt, bootstrapCostMs); |
There was a problem hiding this comment.
🤖 In Flink's operator lifecycle, initializeState() runs before open(), so this.metrics is still null here and this call would NPE. Could you move the metrics initialization into initializeState() (before preLoadRLIRecords()), or guard this update with a null check? @danny0405 could you confirm the lifecycle ordering for this operator?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
There was a problem hiding this comment.
🤖 Agreed — StreamOperator#initializeState() runs before open() in Flink's lifecycle, so the call at line 85 will NPE on this.metrics. The current TestRLIBootstrapOperator only calls operator.open() in setUp, which is why the bug isn't caught there. Initializing metrics in initializeState() (or guarding the call) seems like the right fix.
| } | ||
|
|
||
| @Test | ||
| void testThroughputIsRecordsPerSecond() { |
There was a problem hiding this comment.
🤖 nit: the test name and the comment on the next line say "records/sec", but the metric (BOOTSTRAP_RECORD_PER_MS) and the asserted value (4.0 for 2000/500ms) are records-per-ms. Could you rename to testThroughputIsRecordsPerMs and update the comment to match? Otherwise it's easy for a future reader to think there's a unit bug.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
There was a problem hiding this comment.
🤖 Confirming this read: getThroughput() is numIndexRecordsEmitted / bootstrapCostMs (no * 1000), so 2000/500 = 4 is records-per-ms, matching the BOOTSTRAP_RECORD_PER_MS constant. Renaming the test to testThroughputIsRecordsPerMs (or aligning the production name + math to records/sec, if that's the intended unit) would resolve the ambiguity.
| @Test | ||
| void testAllMetricsAreRegistered() { | ||
| assertEquals(4, metricGroup.gauges.size()); | ||
| assertEquals(true, metricGroup.gauges.containsKey(NUM_FILE_SLICES_PROCESSED)); |
There was a problem hiding this comment.
🤖 nit: assertEquals(true, ...) reads better as assertTrue(...) here (and on the three lines below). Minor, but it expresses intent more directly.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| super.open(); | ||
| this.metrics = new FlinkRLIBootstrapMetrics(getRuntimeContext().getMetricGroup()); | ||
| this.metrics.registerMetrics(); | ||
| this.metrics.updateLoadResult(numFileSlicesProcessed, loadedCnt, bootstrapCostMs); |
There was a problem hiding this comment.
🤖 nit: this updateLoadResult call passes the just-default zero values, so it's a no-op (gauges already return 0 from the field defaults). Could you drop it and rely on the call in initializeState after preLoadRLIRecords to publish the real values?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
984685a to
3d05686
Compare
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR adds a metrics class and test coverage for the RLI bootstrap operator on Flink. Prior review rounds have already surfaced the main concerns (lifecycle ordering of initializeState() vs open() causing a potential NPE on this.metrics, the misleading BOOTSTRAP_RECORD_PER_MS naming when the value is actually records/sec, and the no-op zero-value update in open()). No additional issues flagged from this automated pass — please take a look at the prior inline comments, and this should be ready for a Hudi committer or PMC member to take it from here. One misleading unit label in a test method name — the comment on the same test compounds the confusion.
| conf.get(FlinkOptions.PATH)); | ||
| // Load RLI records | ||
| preLoadRLIRecords(); | ||
| this.metrics.updateLoadResult(numFileSlicesProcessed, loadedCnt, bootstrapCostMs); |
There was a problem hiding this comment.
🤖 Agreed — StreamOperator#initializeState() runs before open() in Flink's lifecycle, so the call at line 85 will NPE on this.metrics. The current TestRLIBootstrapOperator only calls operator.open() in setUp, which is why the bug isn't caught there. Initializing metrics in initializeState() (or guarding the call) seems like the right fix.
| } | ||
|
|
||
| @Test | ||
| void testThroughputIsRecordsPerSecond() { |
There was a problem hiding this comment.
🤖 Confirming this read: getThroughput() is numIndexRecordsEmitted / bootstrapCostMs (no * 1000), so 2000/500 = 4 is records-per-ms, matching the BOOTSTRAP_RECORD_PER_MS constant. Renaming the test to testThroughputIsRecordsPerMs (or aligning the production name + math to records/sec, if that's the intended unit) would resolve the ambiguity.
3d05686 to
2b3be32
Compare
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for working on this! The PR adds Flink metrics for the RLI bootstrap operator and accompanying tests. The critical issues (NPE during initializeState() lifecycle, metric naming/units, test coverage gap) have already been thoroughly raised in prior rounds — no new issues from this automated pass. Please address the previously flagged points, and this should be ready for a Hudi committer or PMC member to take it from here. A few minor naming and unit-consistency suggestions below.
| } | ||
|
|
||
| private double getThroughput() { | ||
| return bootstrapCostMs > 0 ? (double) numIndexRecordsEmitted / bootstrapCostMs : 0; |
There was a problem hiding this comment.
🤖 nit: getThroughput() doesn't reveal the unit — could you rename it to getRecordsPerMs() to stay consistent with the gauge key bootstrapRecordPerMs right above it?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| loadedCnt += 1; | ||
|
|
||
| // update the metrics every 1000 records | ||
| if (loadedCnt % 1000 == 0) { |
There was a problem hiding this comment.
🤖 nit: the magic number 1000 could be a named constant (e.g., METRICS_UPDATE_INTERVAL) so the sampling frequency is visible at the declaration site rather than buried in emitIndexRecord.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
|
|
||
| @Test | ||
| void testThroughputIsRecordsPerSecond() { | ||
| // 2000 records in 500 ms → 4000 records/sec |
There was a problem hiding this comment.
🤖 nit: the test name says RecordsPerSecond but the gauge is bootstrapRecordPerMs (records per millisecond) — could you rename it to testThroughputIsRecordsPerMs and fix the inline comment on the next line (4.0 records/ms, not 4000 records/sec)?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## master #18685 +/- ##
============================================
+ Coverage 68.07% 68.09% +0.01%
- Complexity 28943 28960 +17
============================================
Files 2519 2520 +1
Lines 140664 140684 +20
Branches 17428 17429 +1
============================================
+ Hits 95757 95798 +41
+ Misses 37043 37028 -15
+ Partials 7864 7858 -6
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
Describe the issue this Pull Request addresses
Add metrics for Flink RLI boot strap operator
Summary and Changelog
Impact
none
Risk Level
none
Documentation Update
none
Contributor's checklist