Skip to content

feat(flink): support remote partitioner for simple bucket index#18897

Open
fhan688 wants to merge 3 commits into
apache:masterfrom
fhan688:flink-support-remote-partitioner-for-bucket-index
Open

feat(flink): support remote partitioner for simple bucket index#18897
fhan688 wants to merge 3 commits into
apache:masterfrom
fhan688:flink-support-remote-partitioner-for-bucket-index

Conversation

@fhan688
Copy link
Copy Markdown
Contributor

@fhan688 fhan688 commented Jun 2, 2026

Describe the issue this Pull Request addresses

This PR wires Flink simple bucket index writes to the existing timeline-service-backed remote partitioner path.

Today Flink simple bucket index routes records with local bucket-to-task mapping. When writer parallelism changes, bucket ownership can shift across subtasks, which can make the writer route records and load bucket file groups inconsistently. Hudi already has remote partition helper support in the timeline service and write config, but the Flink simple bucket index write pipeline does not use it.

This PR enables Flink writers to use remote bucket partition assignment when hoodie.bucket.index.remote.partitioner.enable=true.

Summary and Changelog

This PR adds remote partitioner support for Flink simple bucket index.

Changes:

  • Add Flink option hoodie.bucket.index.remote.partitioner.enable, backed by the existing core bucket remote partitioner config key.
  • Add Flink option resolution logic to enable remote partitioning only for simple bucket index with embedded timeline server enabled.
  • Add BucketIndexRemotePartitioner for Flink partitionCustom, using RemotePartitionHelper and NumBucketsFunction.
  • Wire the remote partitioner into Flink bulk insert and streaming write bucket index pipelines.
  • Make BucketStreamWriteFunction use remote partition assignment when deciding whether a bucket belongs to the current writer subtask.
  • Propagate the Flink option into HoodieWriteConfig.
  • Include the remote partitioner flag in EmbeddedTimelineService reuse identity to avoid sharing a timeline service across incompatible writer
    configs.
  • Add unit tests and an integration test covering the enabled remote partitioner path.

No code was copied from external sources.

Impact

This is a user-facing Flink write feature, disabled by default.

When hoodie.bucket.index.remote.partitioner.enable=true, Flink simple bucket index writers use the timeline service to determine bucket-to-task assignment. This change centralizes bucket-to-task assignment through the timeline service by reusing the existing remote partitioner capability. This makes the routing side and bucket-loading side use the same remote assignment source, instead of maintaining separate local assignment calculations.

Public API/config impact:

  • Adds Flink support for hoodie.bucket.index.remote.partitioner.enable.

Storage format impact:

  • None.

Performance impact:

  • Remote partitioning introduces timeline-service lookups during bucket routing. The option is disabled by default and only applies to simple bucket index when embedded timeline server is enabled.

Risk Level

medium

This changes Flink bucket index routing behavior when the new option is enabled. The default behavior is unchanged because the option defaults to false.

Risk mitigation:

  • The feature is gated to simple bucket index.
  • The feature requires embedded timeline server to be enabled.
  • Added unit tests for option resolution, write config propagation, remote partition calculation, and Flink write client config.
  • Added an integration test for Flink bucket stream writes with remote partitioner enabled.

Verification:

  • mvn -pl hudi-flink-datasource/hudi-flink -am -Dcheckstyle.skip=true -DskipITs -DfailIfNoTests=false -Dsurefire.failIfNoSpecifiedTests=false -Dtest=TestOptionsResolver,TestStreamerUtil,TestFlinkWriteClients,TestBucketIndexRemotePartitioner test
  • mvn -pl hudi-flink-datasource/hudi-flink -am -Dcheckstyle.skip=true -DfailIfNoTests=false -Dsurefire.failIfNoSpecifiedTests=false -Dtest=ITTestBucketStreamWrite#testRemotePartitioner test

Documentation Update

Required.

The Hudi website/config documentation should be updated to describe Flink support for:

  • hoodie.bucket.index.remote.partitioner.enable

The documentation should mention that this option applies to Flink simple bucket index writes, requires embedded timeline server, and defaults to false.

Contributor's checklist

  • Read through contributor's guide
  • Enough context is provided in the sections above
  • Adequate tests were added if applicable

@github-actions github-actions Bot added the size:L PR with lines of changes in (300, 1000] label Jun 2, 2026
@fhan688 fhan688 closed this Jun 2, 2026
@fhan688 fhan688 reopened this Jun 2, 2026
Copy link
Copy Markdown
Contributor

@hudi-agent hudi-agent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for the contribution! This PR adds remote partitioner support for Flink simple bucket index, gated behind a new option that requires simple bucket index + embedded timeline server, and propagates the setting into HoodieWriteConfig, EmbeddedTimelineService reuse identity, and both the bulk insert / streaming write pipelines. The implementation mirrors the existing Spark BucketPartitionUtils pattern, and the lazy load of RemotePartitionHelper on the task side is safe given the coordinator writes ViewStorageProperties during start() before subtasks process records. As a bonus, the rewritten TimelineServiceIdentifier.equals fixes a latent bug where two identifiers with null hostAddr would compare equal without checking the other fields. No correctness issues found. A few style/readability suggestions in the inline comments. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. A few naming nits below — method predicate convention, an ambiguous parameter name, and a missing is prefix on a boolean field.

cc @yihua

@danny0405
Copy link
Copy Markdown
Contributor

This improves bucket routing consistency for simple bucket index writes, especially when writer parallelism changes.

Can you explain a bit more about this part, the record keys are routed based on bucket ids instead of tasks, even if the task parallelism changes, the same key should still route to the same bucket?

@fhan688
Copy link
Copy Markdown
Contributor Author

fhan688 commented Jun 3, 2026

This improves bucket routing consistency for simple bucket index writes, especially when writer parallelism changes.

Can you explain a bit more about this part, the record keys are routed based on bucket ids instead of tasks, even if the task parallelism changes, the same key should still route to the same bucket?

Thanks for the correction. I agree that the current local simple bucket logic keeps the key-to-bucket mapping stable. A parallelism change only affects the bucket-to-task mapping, and both partitionCustom and BucketStreamWriteFunction#bootstrapIndexIfNeed recompute that mapping with the current parallelism, so it is not accurate to say that parallelism changes cause key routing inconsistency.

I updated the motivation to be more precise: this change centralizes bucket-to-task assignment through the timeline service by reusing the existing remote partitioner capability. This makes the routing side and bucket-loading side use the same remote assignment source, instead of maintaining separate local assignment calculations.

Copy link
Copy Markdown
Contributor

@hudi-agent hudi-agent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for the contribution! This PR wires the Flink simple bucket index path to the existing remote partitioner via the embedded timeline service, and also fixes a latent equals/hashCode bug in TimelineServiceIdentifier. One thing worth a second look is the withProps(...) reordering in FlinkWriteClients.getHoodieClientConfig, which silently flips override precedence for multiple settings (not just the timeline server flag this feature needs). Please take a look at the inline comment, and this should be ready for a Hudi committer or PMC member to take it from here. A couple of small readability suggestions around the remote partitioner's error handling.

@hudi-bot
Copy link
Copy Markdown
Collaborator

hudi-bot commented Jun 3, 2026

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

@codecov-commenter
Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 68.62745% with 16 lines in your changes missing coverage. Please review.
✅ Project coverage is 68.80%. Comparing base (af38b88) to head (35815a0).
⚠️ Report is 8 commits behind head on master.

Files with missing lines Patch % Lines
...ain/java/org/apache/hudi/sink/utils/Pipelines.java 0.00% 6 Missing ⚠️
...he/hudi/sink/bucket/BucketStreamWriteFunction.java 16.66% 4 Missing and 1 partial ⚠️
...sink/partitioner/BucketIndexRemotePartitioner.java 84.61% 3 Missing and 1 partial ⚠️
.../hudi/client/embedded/EmbeddedTimelineService.java 83.33% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18897      +/-   ##
============================================
+ Coverage     67.01%   68.80%   +1.79%     
- Complexity    28461    29187     +726     
============================================
  Files          2520     2521       +1     
  Lines        140046   140097      +51     
  Branches      17197    17210      +13     
============================================
+ Hits          93850    96399    +2549     
+ Misses        38529    35925    -2604     
- Partials       7667     7773     +106     
Flag Coverage Δ
common-and-other-modules 44.33% <68.62%> (-0.02%) ⬇️
hadoop-mr-java-client 44.87% <66.66%> (-0.04%) ⬇️
spark-client-hadoop-common 48.16% <66.66%> (-0.07%) ⬇️
spark-java-tests 49.35% <66.66%> (+0.02%) ⬆️
spark-scala-tests 45.25% <66.66%> (-0.02%) ⬇️
utilities 37.38% <66.66%> (?)

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...va/org/apache/hudi/configuration/FlinkOptions.java 99.78% <ø> (ø)
...org/apache/hudi/configuration/OptionsResolver.java 74.37% <100.00%> (+1.47%) ⬆️
...n/java/org/apache/hudi/util/FlinkWriteClients.java 96.09% <100.00%> (ø)
...c/main/java/org/apache/hudi/util/StreamerUtil.java 66.92% <100.00%> (+0.08%) ⬆️
.../hudi/client/embedded/EmbeddedTimelineService.java 88.88% <83.33%> (+2.27%) ⬆️
...sink/partitioner/BucketIndexRemotePartitioner.java 84.61% <84.61%> (ø)
...he/hudi/sink/bucket/BucketStreamWriteFunction.java 68.65% <16.66%> (-5.12%) ⬇️
...ain/java/org/apache/hudi/sink/utils/Pipelines.java 14.91% <0.00%> (-0.27%) ⬇️

... and 160 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Copy link
Copy Markdown
Contributor

@hudi-agent hudi-agent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for the contribution! This PR wires Flink simple bucket index writes to the existing remote partitioner path, and fixes a latent equals/hashCode bug in TimelineServiceIdentifier along the way. No new issues flagged from this automated pass beyond what previous review rounds have already raised — a Hudi committer or PMC member can take it from here for a final review. A couple of minor readability nits below; otherwise the code is clean.

cc @yihua

}

private void setRemotePartitionHelper(
BucketIndexRemotePartitioner<HoodieKey> partitioner,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: using reflection to inject the mock is brittle — could remotePartitionHelper be exposed via a package-private setter or test-only constructor so the test doesn't need setAccessible(true)?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

* Bucket index input partitioner backed by the embedded timeline service.
*
* @param <T> The type of object to hash
*/
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: the javadoc says @param <T> The type of object to hash, but T is bounded to HoodieKey and the class doesn't really hash — could you reword this (e.g. "the key type to partition by") to avoid confusion?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

@Override
public int partition(T key, int numPartitions) {
String partitionPath = normalizePartitionPath(key.getPartitionPath());
int numBuckets = numBucketsFunction.getNumBuckets(partitionPath);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: the two getRemotePartition overloads have very similar signatures (one takes NumBucketsFunction, the other int numBuckets) — could the private one be renamed (e.g. doGetRemotePartition) to make it obvious at call sites which is which?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

size:L PR with lines of changes in (300, 1000]

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants