feat: add bucketassign minibatch cache hit/miss metrics#18761
Conversation
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR adds two simple counter metrics (bucketassign.minibatch.cache.hit.count / miss.count) to the GRLI backend by mirroring the existing RocksDBIndexBackend / FlinkRocksDBIndexMetrics pattern, with a re-registration guard and a null-guard at the call site. The hit/miss arithmetic (recordKeys.size() - missedKeys.size() vs missedKeys.size()) lines up with how the existing miss-collection loop builds missedKeys, and the test coverage exercises both the single-key and batch paths plus the unregistered-metrics path. No correctness issues found. A few style/readability suggestions in the inline comments. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. A couple of small naming and delegation nits below, but overall the code is clean and well-structured.
cc @yihua
| } | ||
|
|
||
| public void markCacheHit() { | ||
| cacheHitCount.inc(); |
There was a problem hiding this comment.
🤖 nit: could you have the no-arg markCacheHit() delegate to markCacheHit(1L) (and same for markCacheMiss())? Right now the two overloads are independent, so if any logic is ever added to the bulk path the single-increment path would silently miss it.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
508d5d8 to
3fc37dc
Compare
| metricGroup.counter(BUCKET_ASSIGN_MINIBATCH_CACHE_MISS_COUNT, cacheMissCount); | ||
| } | ||
|
|
||
| public void markCacheHit(long n) { |
There was a problem hiding this comment.
do you think the cache hit ratio makes more sense? hitCnt/missCnt + hitCnt instead of specific hit/miss numbers
There was a problem hiding this comment.
yeah we can emit cache hit ratio directly, but we will only get the ratio number per mini batch. keeping the cache hit / miss count can give us the flexibility of querying the ratio at any time range. wdyt?
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR adds cumulative hit/miss counters to the Flink bucket-assign GRLI cache metrics, wired in alongside the existing histograms. No correctness issues found. A few style/readability suggestions in the inline comments. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. A couple of small naming inconsistencies below — the new metric key values mix dot-separated style with the existing camelCase convention, and the n parameter in the public markCacheHit/markCacheMiss methods could use a more descriptive name.
cc @yihua
| private static final String LOCAL_INDEX_LOOKUP_KEY = "local_index_lookup"; | ||
| private static final String REMOTE_INDEX_LOOKUP_KEY = "remote_index_lookup"; | ||
|
|
||
| public static final String BUCKET_ASSIGN_MINIBATCH_CACHE_HIT_COUNT = "bucketassign.minibatch.cache.hit.count"; |
There was a problem hiding this comment.
🤖 nit: the metric name values use dot-separated format ("bucketassign.minibatch.cache.hit.count") while the other metrics registered in this class use camelCase ("remoteIndexLookupLatency", "localLookupKeysNum", etc.). Could you align them — e.g. "bucketAssignMinibatchCacheHitCount" / "bucketAssignMinibatchCacheMissCount"?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| } | ||
|
|
||
| public void markCacheHit(long n) { | ||
| if (n <= 0) { |
There was a problem hiding this comment.
🤖 nit: the parameter n is a bit opaque for a public method — could you rename it to count in both markCacheHit and markCacheMiss? Callers reading metrics.markCacheHit(hitCount) will see the argument, but the method signature itself is easier to reason about with count.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| metricGroup.histogram("localLookupKeysNum", localLookupKeysNum); | ||
| metricGroup.histogram("remoteLookupKeysNum", remoteLookupKeysNum); | ||
| metricGroup.counter(BUCKET_ASSIGN_MINIBATCH_CACHE_HIT_COUNT, cacheHitCount); | ||
| metricGroup.counter(BUCKET_ASSIGN_MINIBATCH_CACHE_MISS_COUNT, cacheMissCount); |
There was a problem hiding this comment.
We already have localLookupKeysNum and remoteLookupKeysNum, can we can add a hit ratio gauge metric , like 'lookupCacheHitRatio' based on these two fields?
There was a problem hiding this comment.
localLookupKeysNum and remoteLookupKeysNum are histogram, which has a 100s aggregation, if we want the counter data point, we should keep these 2. we can add the hit ratio gauge to get the ratio per mini batch., but keeping the cache hit / miss count can give us the flexibility of querying the ratio at any time range. wdyt?
There was a problem hiding this comment.
we can add the hit ratio gauge to get the ratio per mini batch
The per mini-batch hit ratio makes more sense instead of the global one, we can reuse the numbers used by updateRemoteLookupKeysCount & updateLocalLookupKeysCount, and keep the ratio as a gague metric?
There was a problem hiding this comment.
makes sense. addressed
bbdf42f to
d1435b0
Compare
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR replaces the previous hit/miss counter design with a single per-mini-batch hit-ratio gauge in FlinkIndexBackendMetrics and wires it up at the local-cache lookup site in GlobalRecordLevelIndexBackend. No correctness issues found. A few style/readability suggestions in the inline comments. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. One naming inconsistency in the metric key value worth resolving for dashboard clarity.
cc @yihua
| private static final String LOCAL_INDEX_LOOKUP_KEY = "local_index_lookup"; | ||
| private static final String REMOTE_INDEX_LOOKUP_KEY = "remote_index_lookup"; | ||
|
|
||
| public static final String BUCKET_ASSIGN_MINIBATCH_CACHE_HIT_RATIO = "bucketassign.minibatch.cache.hit_ratio"; |
There was a problem hiding this comment.
🤖 nit: the value "bucketassign.minibatch.cache.hit_ratio" uses dot-separated lowercase with an underscore, while every other metric registered in this class uses flat camelCase ("remoteIndexLookupLatency", "localLookupKeysNum", etc.). In Flink, dots in a metric name create sub-group path segments, so this gauge would land in a different namespace than the histograms — likely surprising on a dashboard. Could you align it to something like "minibatchCacheHitRatio" or "bucketAssignMinibatchCacheHitRatio"?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
Co-authored-by: Cursor <cursoragent@cursor.com>
d1435b0 to
e3548ca
Compare
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## master #18761 +/- ##
=========================================
Coverage 68.25% 68.25%
- Complexity 29324 29340 +16
=========================================
Files 2527 2527
Lines 141804 141868 +64
Branches 17622 17628 +6
=========================================
+ Hits 96787 96836 +49
- Misses 37052 37067 +15
Partials 7965 7965
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for working on this! This PR replaces the proposed counter pair with a single per-mini-batch lookupCacheHitRatio gauge on FlinkIndexBackendMetrics, wired up from GlobalRecordLevelIndexBackend.get(...) after the local cache lookup. The volatile-double + early-return-on-zero-total pattern looks safe for the Flink reporter/task-thread interaction, and the tests cover the relevant edge cases (zero total, negative inputs, all-hit, all-miss, end-to-end via the backend). No correctness issues found. A few style/readability suggestions in the inline comments. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. One javadoc phrasing could mislead future readers into thinking the method reads shared state; everything else looks clean.
cc @yihua
| * When the lookup observed no keys, the previous value is preserved so dashboards | ||
| * don't oscillate back to zero on idle mini-batches. | ||
| */ | ||
| public void updateLookupCacheHitRatio(long hitCount, long missCount) { |
There was a problem hiding this comment.
🤖 nit: the Javadoc says "from the hit/miss counts already fed into updateLocalLookupKeysCount and updateRemoteLookupKeysCount", which implies this method reads from shared state populated by those calls — but it actually uses its own hitCount/missCount parameters directly. Could you drop that cross-reference and just describe what the two parameters represent?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
Describe the issue this Pull Request addresses
The Flink bucket-assign operator on the global record-level index (GRLI) path keeps an in-memory
RecordIndexCacheto serve record-key lookups, falling back to the metadata table on a miss. Today the cache is a black box at runtime — operators have no visibility into how effective it is, which makes it impossible to alert on a degraded hit ratio (e.g. due to undersized cache, eviction churn, or key-skew patterns) or to tuneINDEX_RLI_CACHE_SIZEbased on real workload behavior.This PR adds two counter metrics so the in-memory cache hit / miss volume becomes observable and alertable. It builds on top of #18762, which recently introduced
FlinkIndexBackendMetricsand the per-call histograms (localLookupKeysNum,remoteLookupKeysNum). Those histograms describe the per-batch distribution; the new counters add the cumulative running totals that are cheaper to scrape and easier to ratio in dashboards/alerts.Summary and Changelog
Adds two Flink counter metrics on the bucket-assign operator's
MetricGroup, registered automatically whenever the GRLI backend is in use:bucketassign.minibatch.cache.hit.count— cumulative record-key lookups served by the in-memoryRecordIndexCache.bucketassign.minibatch.cache.miss.count— cumulative record-key lookups that fell back to a metadata-table read.Changelog:
org.apache.hudi.metrics.FlinkIndexBackendMetrics(existing class, introduced in feat(flink): add metrics for RLI load time for bucket assign functions #18762):SimpleCounterfieldscacheHitCount/cacheMissCountand public constantsBUCKET_ASSIGN_MINIBATCH_CACHE_HIT_COUNT/BUCKET_ASSIGN_MINIBATCH_CACHE_MISS_COUNT.registerMetrics()alongside the existing histograms.markCacheHit(long)/markCacheMiss(long)helpers that short-circuit onn <= 0so a single call site can drain a whole batch's hit/miss counts in one shot.@VisibleForTestinggettersgetCacheHitCount()/getCacheMissCount().org.apache.hudi.sink.partitioner.index.GlobalRecordLevelIndexBackend:get(List<String> recordKeys), alongside the existingupdateLocalLookupKeysCount(...)call, bumps the new hit/miss counters byrecordKeys.size() - missedKeys.size()andmissedKeys.size()respectively. No new field, no extra branching — same metrics object that the backend already constructs.TestFlinkIndexBackendMetrics(counter registration, increments, zero/negative no-op) andTestGlobalRecordLevelIndexBackend(end-to-end: counters bump correctly across a cold and a warm batch lookup; counters are registered on the suppliedMetricGroup).No code copied from other projects.
Impact
Counter.inc(long)calls per batch lookup, sitting next to the existing histogram update.Risk Level
low
The change is observability-only: cache lookup semantics are unchanged, no new public APIs, no new config keys, and the increment path is a constant-time pair of counter bumps next to existing metric updates. The backend constructor already pre-initializes
metricswith anUnregisteredMetricsGroup, so the new calls cannot NPE regardless of whetherregisterMetrics(...)was later invoked by the operator.Verification: new and existing unit tests covering the bucket-assign + metrics surface pass with
-Pspark3.3,flink1.18(TestFlinkIndexBackendMetrics,TestFlinkBucketAssignMetrics,TestGlobalRecordLevelIndexBackend).Documentation Update
none — no new config or user-facing feature beyond two additional metrics emitted by the existing operator. The metric names are self-descriptive and follow the same convention as the sibling histograms in
FlinkIndexBackendMetrics.Contributor's checklist