Skip to content

fix(flink): Trigger a failover after pending instants recommitted for…#18793

Open
cshuo wants to merge 1 commit into
apache:masterfrom
cshuo:fix_rli_failover
Open

fix(flink): Trigger a failover after pending instants recommitted for…#18793
cshuo wants to merge 1 commit into
apache:masterfrom
cshuo:fix_rli_failover

Conversation

@cshuo
Copy link
Copy Markdown
Collaborator

@cshuo cshuo commented May 20, 2026

… both global and partitioned RLI

Describe the issue this Pull Request addresses

Flink streaming recovery can leave pending instants that need to be recommitted before record level index state is bootstrapped or reused after failover. The previous guard was tied specifically to global RLI bootstrap configuration, which made the recovery/failover handling too narrow for RLI-backed writes.

This PR adjusts the write coordinator recovery path so pending instant recommits and follow-up failover handling are driven from the coordinator's table-state RLI flag rather than the removed bootstrap-specific helper.

Summary and Changelog

  • Removed OptionsResolver.isRLIWithBootstrap, which only checked global RLI with index bootstrap enabled.
  • Updated StreamWriteOperatorCoordinator reset and subtask-reset paths to use tableState.isRecordLevelIndex when recommitting pending instants.
  • Updated bootstrap event handling to trigger job failover after pending instants are recommitted for RLI tables, allowing the RLI bootstrap operator to reload index state completely.
  • Added TableState.isRecordLevelIndex to centralize the coordinator's RLI-related recovery decision.

Impact

RLI recovery handling is intended to apply to both global and partitioned RLI paths when pending instants are recommitted after restart or failover.

Risk Level

low

Documentation Update

Contributor's checklist

  • Read through contributor's guide
  • Enough context is provided in the sections above
  • Adequate tests were added if applicable

Copy link
Copy Markdown
Contributor

@hudi-agent hudi-agent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for the contribution! This PR broadens the RLI recovery handling beyond just global RLI with bootstrap enabled. A couple of points worth double-checking around the scope of the behavioral change, since the new condition removes both the INDEX_BOOTSTRAP_ENABLED requirement and the global-only restriction. Please take a look at the inline comments, and this should be ready for a Hudi committer or PMC member to take it from here.

// start to recommit the instant.
boolean committed = recommitInstant(event.getCheckpointId(), event.getInstantTime(), eventBuffer);
if (committed && tableState.isRLIWithBootstrap) {
if (committed && tableState.isRecordLevelIndex) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 @danny0405 The old isRLIWithBootstrap also required INDEX_BOOTSTRAP_ENABLED=true, but the new isRecordLevelIndex drops that check. Looking at Pipelines.streamBootstrap, RLIBootstrapOperator is only added when INDEX_BOOTSTRAP_ENABLED || bounded. For RLI tables without bootstrap enabled (and for partitioned RLI, which never gets RLIBootstrapOperator), this now triggers a context.failJob(...) even though there's no RLI bootstrap operator in the pipeline to benefit from the reload. Is the broader failover intended (e.g. to refresh IndexWriteOperator state)? If so, could you also update the exception message — it still says "so that RLI bootstrap operator can load the record level index completely", which is misleading for the non-bootstrap streaming case.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

this.isDeltaTimeCompaction = OptionsResolver.isDeltaTimeCompaction(conf);
this.isStreamingIndexWriteEnabled = OptionsResolver.isStreamingIndexWriteEnabled(conf);
this.isRLIWithBootstrap = OptionsResolver.isRLIWithBootstrap(conf);
this.isRecordLevelIndex = OptionsResolver.isGlobalRecordLevelIndex(conf) || OptionsResolver.isRecordLevelIndex(conf);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 Minor — isRecordLevelIndex here is computed as the disjunction isGlobalRecordLevelIndex(conf) || isRecordLevelIndex(conf). Would it be cleaner to add a dedicated helper like OptionsResolver.isAnyRecordLevelIndex(conf) (or similar) so future call sites can't accidentally use just one of the two? Right now the static method OptionsResolver.isRecordLevelIndex only covers the partitioned variant, which makes the naming a bit easy to confuse.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

@github-actions github-actions Bot added the size:S PR with lines of changes in (10, 100] label May 20, 2026
@codecov-commenter
Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 50.00000% with 2 lines in your changes missing coverage. Please review.
✅ Project coverage is 68.25%. Comparing base (9565926) to head (365ed6c).

Files with missing lines Patch % Lines
...ache/hudi/sink/StreamWriteOperatorCoordinator.java 50.00% 1 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff            @@
##             master   #18793   +/-   ##
=========================================
  Coverage     68.25%   68.25%           
+ Complexity    29337    29335    -2     
=========================================
  Files          2527     2527           
  Lines        141858   141857    -1     
  Branches      17627    17628    +1     
=========================================
+ Hits          96827    96829    +2     
+ Misses        37068    37064    -4     
- Partials       7963     7964    +1     
Flag Coverage Δ
common-and-other-modules 44.42% <50.00%> (+<0.01%) ⬆️
hadoop-mr-java-client 44.91% <ø> (ø)
spark-client-hadoop-common 48.23% <ø> (-0.01%) ⬇️
spark-java-tests 48.85% <ø> (-0.03%) ⬇️
spark-scala-tests 44.93% <ø> (+<0.01%) ⬆️
utilities 37.44% <ø> (-0.02%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...org/apache/hudi/configuration/OptionsResolver.java 71.62% <ø> (-0.20%) ⬇️
...ache/hudi/sink/StreamWriteOperatorCoordinator.java 79.10% <50.00%> (+0.74%) ⬆️

... and 9 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@hudi-bot
Copy link
Copy Markdown
Collaborator

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

public void coordinatorFails() throws Exception {
this.coordinator.close();
if (isStreamingWriteIndexEnabled) {
this.coordinator.setExecutor(new MockCoordinatorExecutor(coordinatorContext));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we fix the potential thread leak?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

size:S PR with lines of changes in (10, 100]

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants