feat(flink): add metrics for RLI load time for bucket assign functions#18762
Conversation
2467ee7 to
bceeae4
Compare
cshuo
left a comment
There was a problem hiding this comment.
Thks for contributing, left some comments.
aac24a8 to
c3491bc
Compare
HuangZhenQiu
left a comment
There was a problem hiding this comment.
@cshuo Thanks for fixing the index lookup. Actually, it confuses me when I find test access index for 1001 times.
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR adds metrics for local/remote RLI lookup latency, key counts, and record buffering time to the Flink bucket assign functions. One edge case worth double-checking in the inline comments around partial buffer flushes. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. A few naming and simplification suggestions below.
| // Process the buffer if it reaches the configured size | ||
| if (recordBuffer.size() >= miniBatchSize) { | ||
| // Record how long the oldest record in the batch was buffered | ||
| delegateFunction.getMetrics().endRecordBuffering(); |
There was a problem hiding this comment.
🤖 endRecordBuffering() is only called when the buffer fills to miniBatchSize, but processBufferedRecords() is also invoked from prepareSnapshotPreBarrier() (line 202) and endInput() (line 223) on partial buffers. In those paths the timer is never stopped, so the buffering metric is dropped for partial batches and the next startRecordBuffering() will hit the Restarting timer for name: record_buffering, overriding the existing value warning in HoodieFlinkMetrics#startTimer on every checkpoint. Could you move the endRecordBuffering() call into processBufferedRecords() (after the recordBuffer.isEmpty() guard) so all three flush paths are covered?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| } | ||
|
|
||
| @VisibleForTesting | ||
| public long getLocalLookupKeysNumCount() { |
There was a problem hiding this comment.
🤖 nit: getLocalLookupKeysNumCount() is easy to misread — 'Num' (keys-per-lookup distribution) plus 'Count' (histogram sample count) together read as 'total key count', which is not what this returns. Something like getLocalLookupKeysSampleCount() would be less ambiguous (same concern for getRemoteLookupKeysNumCount()).
- AI-generated; verify before applying. React 👍/👎 to flag quality.
c3491bc to
d8f6c25
Compare
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! The PR adds histogram metrics for local/remote RLI cache lookup latency, key counts, and record buffering time across the Flink bucket-assign functions. The major concerns (timer leak when a partial buffer is flushed via prepareSnapshotPreBarrier/endInput, per-record hot-path overhead, and a few naming nits) have already been flagged in prior rounds. No additional correctness or architectural issues surfaced from this automated pass — a Hudi committer or PMC member can take it from here for a final review. A few small naming and readability suggestions below — magic constant, terse parameter name, and repeated null guards in the hot path.
cc @yihua
6752919 to
02d2590
Compare
| this.metaClient = StreamerUtil.createMetaClient(conf); | ||
| this.conf = conf; | ||
| this.recordIndexCache = new RecordIndexCache(conf, initCheckpointId); | ||
| registerMetrics(new UnregisteredMetricsGroup()); |
There was a problem hiding this comment.
initialize metrics, so that we don't need to do metrics != null check.
02d2590 to
8d6bdb3
Compare
|
cc @danny0405 |
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## master #18762 +/- ##
============================================
+ Coverage 68.16% 68.18% +0.01%
- Complexity 29158 29185 +27
============================================
Files 2521 2523 +2
Lines 141371 141427 +56
Branches 17549 17550 +1
============================================
+ Hits 96365 96425 +60
+ Misses 37076 37074 -2
+ Partials 7930 7928 -2
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the iterative improvements on this PR! The buffering and index-lookup metric instrumentation looks reasonable now — start/end pairings trace through correctly for the normal flow, checkpoint flushes, endInput, and index-record interleaving paths. The constructor-initializes-with-UnregisteredMetricsGroup pattern avoids the null-guard noise from earlier rounds. No new critical correctness issues flagged from this automated pass — a Hudi committer or PMC member can take it from here for a final review. A couple of naming inconsistencies worth tidying up.
cc @yihua
| /** Number of keys resolved from the local cache per lookup. */ | ||
| private final Histogram localLookupKeysNum; | ||
|
|
||
| /** Number of keys that missed the local cache and were fetched remotely per lookup. */ |
There was a problem hiding this comment.
🤖 nit: the fields/registered metric names use Num (e.g. localLookupKeysNum, "localLookupKeysNum") but the public methods use Count (e.g. updateLocalLookupKeysCount). Could you pick one suffix and use it consistently? Either rename the fields/strings to localLookupKeysCount/remoteLookupKeysCount, or rename the methods to updateLocalLookupKeysNum.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| this.delegateFunction.setCorrespondent(correspondent); | ||
| } | ||
|
|
||
| @VisibleForTesting |
There was a problem hiding this comment.
🤖 nit: getDelegateMetrics() leaks the internal delegation structure — if the implementation ever stops delegating, the name becomes misleading. Since BucketAssignFunction already exposes getMetrics(), could you use the same name here for consistency?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
Describe the issue this Pull Request addresses
Add metrics for RLI load time for bucket assign functions
Close #18733
Summary and Changelog
Impact
none
Risk Level
none
Documentation Update
none
Contributor's checklist