
feat(flink): add shard assign metrics for bucket assign function #18790

Open
HuangZhenQiu wants to merge 2 commits into apache:master from HuangZhenQiu:shard-assign-metrics

Conversation

Member

@HuangZhenQiu HuangZhenQiu commented May 20, 2026

Describe the issue this Pull Request addresses

Add shard assignment metrics for the bucket assign function. GlobalRecordIndexPartitioner assigns a portion of the RLI shards to each bucket assign task so that no two tasks access the same RLI shard. The new metric reports how many shards each task was assigned, so we can detect skew when the assignment is uneven.

Close #18732

Summary and Changelog

  1. GlobalRecordIndexPartitioner: Expose the file group count computation as a static package-private method and add a computeNumShardsAssigned helper.
  2. FlinkBucketAssignMetrics: Add a numShardsAssigned gauge backed by an AtomicInteger.
  3. BucketAssignFunction.open(): When global RLI is active, compute the shard count for this task and push it to metrics.
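
As a rough illustration of item 1, the per-task count implied by the rule fgIndex % numPartitions == taskIndex can be computed in closed form. This is a minimal standalone sketch, not the PR's code: the class name ShardCountSketch and the parameter list are assumptions, and only the method name computeNumShardsAssigned comes from the description above.

```java
/** Hypothetical standalone sketch; the PR's real helper may differ in signature and checks. */
final class ShardCountSketch {

  /**
   * Counts the file group indices in [0, numFileGroups) that satisfy
   * fgIndex % numPartitions == taskIndex.
   */
  static int computeNumShardsAssigned(int numFileGroups, int numPartitions, int taskIndex) {
    if (numFileGroups < 0 || numPartitions <= 0 || taskIndex < 0 || taskIndex >= numPartitions) {
      throw new IllegalArgumentException("invalid shard assignment arguments");
    }
    // Every task gets the same base share; the first `remainder` tasks get one extra shard.
    int base = numFileGroups / numPartitions;
    int remainder = numFileGroups % numPartitions;
    return base + (taskIndex < remainder ? 1 : 0);
  }

  public static void main(String[] args) {
    // 10 file groups over 4 tasks: tasks 0 and 1 own 3 shards, tasks 2 and 3 own 2.
    for (int task = 0; task < 4; task++) {
      System.out.println("task " + task + " -> " + computeNumShardsAssigned(10, 4, task));
    }
  }
}
```

Under this rule the largest and smallest per-task counts differ by at most one, so a wider spread in the gauge would point at something other than the modulo distribution.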

Impact

none

Risk Level

none

Documentation Update

none

Contributor's checklist

  • Read through contributor's guide
  • Enough context is provided in the sections above
  • Adequate tests were added if applicable

Contributor

@hudi-agent hudi-agent left a comment


🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for the contribution! This PR adds a numShardsAssigned gauge to FlinkBucketAssignMetrics so operators can detect skew in global-RLI shard assignment across bucket-assign subtasks. The new computeNumShardsAssigned helper correctly mirrors the fgIndex % numPartitions distribution used by the partitioner, the MDT read is wrapped in try-with-resources, and the initRliShardAssignMetric failure path safely degrades to the -1 sentinel rather than failing open(). No correctness issues found. A few style/readability suggestions in the inline comments. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. One Javadoc inaccuracy worth fixing; the rest of the code is clean and well-structured.

cc @yihua

}

/**
* Computes and registers the number of RLI shards assigned to this task when global RLI is active.
Contributor


🤖 nit: the word "registers" is misleading here — the gauge was already registered in registerMetrics() via metricGroup.gauge(...). This method only sets the gauge's value. Could you change it to something like "Computes and sets the value of the numShardsAssigned gauge..."?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

@github-actions github-actions Bot added the size:M PR with lines of changes in (100, 300] label May 20, 2026
@HuangZhenQiu HuangZhenQiu force-pushed the shard-assign-metrics branch from de04111 to f065508 Compare May 20, 2026 20:47
Contributor

@hudi-agent hudi-agent left a comment


🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for the contribution! This PR exposes per-task RLI shard counts as a Flink gauge so operators can detect skew under GlobalRecordIndexPartitioner. The math in computeNumShardsAssigned correctly mirrors the fgIndex % numPartitions distribution, and the failure path in initRliShardAssignMetric safely degrades to the -1 sentinel. One edge case worth thinking through is flagged inline regarding the bootstrap-with-global-RLI configuration where the partitioner isn't actually used upstream. Please take a look at the inline comment, and this should be ready for a Hudi committer or PMC member to take it from here. A couple of small naming/doc nits below.

* Each task owns the file groups whose index satisfies {@code fgIndex % numPartitions == taskIndex}.
*/
private void initRliShardAssignMetric() {
if (!OptionsResolver.isGlobalRecordLevelIndex(conf)) {
Contributor


🤖 In Pipelines.createBucketAssignStream (around line 464), GlobalRecordIndexPartitioner is only wired upstream when isGlobalRecordLevelIndex(conf) && !INDEX_BOOTSTRAP_ENABLED; the bootstrap-with-global-RLI case falls through to the else branch and uses plain BucketAssignFunction with keyBy(recordKey) instead. Should this guard also exclude OptionsResolver.isRLIWithBootstrap(conf) so the gauge only reports a value when the partitioner is actually routing records?

- AI-generated; verify before applying. React 👍/👎 to flag quality.
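
The condition raised in this comment can be written out as a small truth table. The sketch below is illustrative only: it uses plain booleans in place of the real OptionsResolver checks, and the class and method names are assumptions rather than Hudi APIs.

```java
/** Hypothetical sketch of the guard discussed above, using plain booleans instead of a Flink Configuration. */
final class ShardMetricGuardSketch {

  /**
   * Returns true only when the partitioner would actually be wired upstream,
   * that is, global RLI is enabled and index bootstrap is not.
   */
  static boolean partitionerRoutesRecords(boolean globalRecordLevelIndex, boolean indexBootstrapEnabled) {
    return globalRecordLevelIndex && !indexBootstrapEnabled;
  }

  public static void main(String[] args) {
    boolean[] flags = {false, true};
    for (boolean globalRli : flags) {
      for (boolean bootstrap : flags) {
        System.out.println("globalRli=" + globalRli + ", bootstrap=" + bootstrap
            + " -> report gauge: " + partitionerRoutesRecords(globalRli, bootstrap));
      }
    }
  }
}
```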

}

/**
* Computes and registers the number of RLI shards assigned to this task when global RLI is active.
Contributor


🤖 nit: "registers" is a bit misleading here — the gauge is already registered in registerMetrics() which runs just before this call. Could you change it to something like "Computes and sets the value of the numShardsAssigned gauge" so a reader doesn't go hunting for a second registration path?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

metricGroup.gauge("numShardsAssigned", numShardsAssigned::get);
}

public void setNumShardsAssigned(int count) {
Contributor


🤖 nit: the parameter count is a bit generic given the method is setNumShardsAssigned and the field is numShardsAssigned — could you rename it to numShards to keep the naming consistent throughout?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

@HuangZhenQiu HuangZhenQiu force-pushed the shard-assign-metrics branch from f065508 to 759652f Compare May 21, 2026 06:00
Contributor

@hudi-agent hudi-agent left a comment


🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for the contribution! This PR adds a numShardsAssigned gauge to track RLI shard distribution across bucket-assign tasks and introduces a JVM-level cache for the file group count to avoid repeated metadata table lookups. No new issues flagged from this automated pass — a Hudi committer or PMC member can take it from here for a final review. A couple of small naming/type nits below, otherwise the code is clean and readable.

cc @yihua

* Set once during open() when global RLI is active; remains -1 otherwise.
* Compare across task subtasks to detect skew in shard distribution.
*/
private final AtomicInteger numShardsAssigned = new AtomicInteger(-1);
Contributor


🤖 nit: could you extract -1 into a named constant (e.g. private static final int UNSET_SHARD_COUNT = -1)? It's referenced in the Javadoc, here, and in test assertions, so a constant would make the sentinel's meaning self-evident without readers having to recall the magic value.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

public class GlobalRecordIndexPartitioner implements Partitioner<HoodieKey> {
/**
* JVM-level cache to avoid repeatedly fetching the file group count from the metadata table.
* Keyed by table path; shared across all instances (partitioner + BucketAssignFunction) in the
Contributor


🤖 nit: it might be worth declaring the field as ConcurrentMap<String, Integer> rather than the concrete ConcurrentHashMap — the interface type makes the thread-safety contract visible to readers and leaves the implementation easy to swap (e.g. a bounded cache) without touching the field declaration.

- AI-generated; verify before applying. React 👍/👎 to flag quality.
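
A minimal sketch of what the interface-typed declaration could look like, assuming the cache is populated through computeIfAbsent. The field name NUM_FILE_GROUPS_CACHE comes from the PR; the class name, the getNumFileGroups method, and the loader parameter (a stand-in for the metadata table lookup) are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/** Hypothetical sketch; the loader stands in for the real metadata table read. */
final class FileGroupCountCacheSketch {

  // Declared by interface: readers see the thread-safety contract, and the
  // implementation can be swapped without touching the declaration.
  private static final ConcurrentMap<String, Integer> NUM_FILE_GROUPS_CACHE = new ConcurrentHashMap<>();

  /** Returns the cached count for the table path, invoking the loader only on a miss. */
  static int getNumFileGroups(String tablePath, Function<String, Integer> loader) {
    return NUM_FILE_GROUPS_CACHE.computeIfAbsent(tablePath, loader);
  }

  public static void main(String[] args) {
    System.out.println(getNumFileGroups("/tmp/hypothetical_table", path -> 16));
    // The second loader is not invoked because the path is already cached.
    System.out.println(getNumFileGroups("/tmp/hypothetical_table", path -> 32));
  }
}
```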

@HuangZhenQiu HuangZhenQiu force-pushed the shard-assign-metrics branch from 759652f to 29415c5 Compare May 21, 2026 16:33
Contributor

@hudi-agent hudi-agent left a comment


🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for working on this! The change adds a numShardsAssigned gauge to FlinkBucketAssignMetrics and a JVM-scoped cache so multiple BucketAssignFunction subtasks on the same TaskManager share a single MDT lookup. No additional issues flagged from this automated pass beyond what prior rounds have already raised — a Hudi committer or PMC member can take it from here for a final review. One Javadoc accuracy issue in BucketAssignFunction; the rest of the code is clean and well-commented.

cc @yihua

* Computes and registers the number of RLI shards assigned to this task when global RLI is active.
* Each task owns the file groups whose index satisfies {@code fgIndex % numPartitions == taskIndex}.
*/
private void initRliShardAssignMetric() {
Contributor


🤖 nit: the Javadoc says "Computes and registers the number of RLI shards" but the gauge is already registered inside registerMetrics() — this method only sets the gauge's value. Could you change it to "Computes and sets the numShardsAssigned metric value" to avoid sending future readers on a hunt for a registration call that isn't here?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

@HuangZhenQiu HuangZhenQiu force-pushed the shard-assign-metrics branch from 051351a to a3a2656 Compare May 25, 2026 05:40
Contributor

@hudi-agent hudi-agent left a comment


🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for the iterations on this PR! The shard assignment metric now properly caches the metadata table lookup at the JVM level and the shard count math in computeNumShardsAssigned correctly mirrors the modulo distribution used in partition(). No new issues flagged from this automated pass — a Hudi committer or PMC member can take it from here for a final review. A couple of small readability suggestions below.

cc @yihua

* Number of RLI file group shards assigned to this bucket assign task.
* Set once during open() when global RLI is active; remains -1 otherwise.
* Compare across task subtasks to detect skew in shard distribution.
*/
Contributor


🤖 nit: since this value is written exactly once (in open()) and only read by the gauge, an AtomicInteger overstates the concurrency requirement. A volatile int with a plain setter would convey intent more clearly.

- AI-generated; verify before applying. React 👍/👎 to flag quality.
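
One way to read this suggestion is sketched below, assuming a single write in open() and reads from a metrics reporter thread. The names numShardsAssigned, setNumShardsAssigned, and UNSET_SHARD_COUNT appear elsewhere in this review; the class name is hypothetical, and java.util.function.IntSupplier stands in for the Flink gauge so the example stays self-contained.

```java
import java.util.function.IntSupplier;

/** Hypothetical sketch; IntSupplier is a stand-in for the metric gauge, not the Flink API. */
final class ShardGaugeSketch {

  static final int UNSET_SHARD_COUNT = -1;

  // volatile is enough for a single writer with visibility to reader threads;
  // no atomic read-modify-write operation is needed.
  private volatile int numShardsAssigned = UNSET_SHARD_COUNT;

  void setNumShardsAssigned(int numShards) {
    this.numShardsAssigned = numShards;
  }

  /** The gauge reads the current field value each time it is polled. */
  IntSupplier gauge() {
    return () -> numShardsAssigned;
  }

  public static void main(String[] args) {
    ShardGaugeSketch metrics = new ShardGaugeSketch();
    IntSupplier gauge = metrics.gauge();
    System.out.println(gauge.getAsInt()); // sentinel until set
    metrics.setNumShardsAssigned(3);
    System.out.println(gauge.getAsInt());
  }
}
```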

*/
private static final ConcurrentHashMap<String, Integer> NUM_FILE_GROUPS_CACHE = new ConcurrentHashMap<>();

private final Configuration conf;
Contributor


🤖 nit: the cache is keyed only by table path, but the lambda captures the first conf it sees. Worth calling that out in the Javadoc so a future caller doesn't expect per-conf semantics when reusing the same path.

- AI-generated; verify before applying. React 👍/👎 to flag quality.
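
The behavior described here can be reproduced with a small standalone sketch. It assumes the cache is filled via computeIfAbsent with a lambda that closes over the caller's configuration; the class name, the Map-based configuration, and the "file.group.count" key are all hypothetical stand-ins, not Hudi or Flink APIs.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical sketch of a path-keyed cache whose loader captures the caller's conf. */
final class FirstConfWinsSketch {

  private static final ConcurrentMap<String, Integer> CACHE = new ConcurrentHashMap<>();

  /**
   * The key is only the table path, so the conf passed by the first caller for a
   * given path decides the cached value; later confs for that path are ignored.
   */
  static int numFileGroups(String tablePath, Map<String, Integer> conf) {
    return CACHE.computeIfAbsent(tablePath, path -> conf.getOrDefault("file.group.count", 0));
  }

  public static void main(String[] args) {
    System.out.println(numFileGroups("/tmp/hypothetical_table", Map.of("file.group.count", 8)));
    // Same path, different conf: the cached value from the first call is returned.
    System.out.println(numFileGroups("/tmp/hypothetical_table", Map.of("file.group.count", 16)));
  }
}
```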

@hudi-bot
Collaborator

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

@codecov-commenter

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 44.91%. Comparing base (b82a5b2) to head (a3a2656).
⚠️ Report is 2 commits behind head on master.

❗ There is a different number of reports uploaded between BASE (b82a5b2) and HEAD (a3a2656).

HEAD has 33 uploads less than BASE
Flag                         BASE (b82a5b2)   HEAD (a3a2656)
spark-client-hadoop-common   1                0
spark-scala-tests            12               0
spark-java-tests             18               0
utilities                    1                0
common-and-other-modules     1                0
Additional details and impacted files
@@              Coverage Diff              @@
##             master   #18790       +/-   ##
=============================================
- Coverage     68.25%   44.91%   -23.35%     
+ Complexity    29348     8573    -20775     
=============================================
  Files          2527     1202     -1325     
  Lines        141886    62973    -78913     
  Branches      17634     6851    -10783     
=============================================
- Hits          96849    28283    -68566     
+ Misses        37070    31562     -5508     
+ Partials       7967     3128     -4839     
Flag                         Coverage Δ
common-and-other-modules     ?
hadoop-mr-java-client        44.91% <ø> (+0.06%) ⬆️
spark-client-hadoop-common   ?
spark-java-tests             ?
spark-scala-tests            ?
utilities                    ?

see 2125 files with indirect coverage changes



Labels

size:M PR with lines of changes in (100, 300]

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add metrics for number of RLI shards per sub task in BucketAssign op

6 participants