
fix(flink): Trigger a failover after pending instants recommitted for… #18789

Merged
danny0405 merged 1 commit into apache:release-1.2.0 from cshuo:fix_global_rli_recover
May 20, 2026

@cshuo
Collaborator

@cshuo cshuo commented May 20, 2026

… both global and partitioned RLI

Describe the issue this Pull Request addresses

Flink streaming recovery can leave pending instants that need to be recommitted before record level index state is bootstrapped or reused after failover. The previous guard was tied specifically to global RLI bootstrap configuration, which made the recovery/failover handling too narrow for RLI-backed writes.

This PR adjusts the write coordinator recovery path so pending instant recommits and follow-up failover handling are driven from the coordinator's table-state RLI flag rather than the removed bootstrap-specific helper.

Summary and Changelog

  • Removed OptionsResolver.isRLIWithBootstrap, which only checked global RLI with index bootstrap enabled.
  • Updated StreamWriteOperatorCoordinator reset and subtask-reset paths to use tableState.isRecordLevelIndex when recommitting pending instants.
  • Updated bootstrap event handling to trigger job failover after pending instants are recommitted for RLI tables, allowing the RLI bootstrap operator to reload index state completely.
  • Added TableState.isRecordLevelIndex to centralize the coordinator's RLI-related recovery decision.
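The changelog above can be read as the following minimal sketch. It is a self-contained model and not the Hudi source: `RliRecoverySketch`, its nested `TableState`, and `shouldFailJob` are hypothetical stand-ins, and the two constructor arguments stand in for whatever `OptionsResolver.isGlobalRecordLevelIndex(conf)` and `OptionsResolver.isRecordLevelIndex(conf)` return for the job configuration.

```java
// Hypothetical model of the coordinator's recovery decision; not the Hudi source.
final class RliRecoverySketch {

  /** Stand-in for the coordinator's table state. */
  static final class TableState {
    // True for either RLI variant (global or partitioned).
    final boolean isRecordLevelIndex;

    TableState(boolean globalRli, boolean partitionedRli) {
      this.isRecordLevelIndex = globalRli || partitionedRli;
    }
  }

  /** A failover is requested only when a pending instant was recommitted on an RLI table. */
  static boolean shouldFailJob(boolean committed, TableState tableState) {
    return committed && tableState.isRecordLevelIndex;
  }

  public static void main(String[] args) {
    TableState partitioned = new TableState(false, true);
    TableState nonRli = new TableState(false, false);
    System.out.println("partitioned RLI, recommitted: " + shouldFailJob(true, partitioned));
    System.out.println("partitioned RLI, nothing recommitted: " + shouldFailJob(false, partitioned));
    System.out.println("non-RLI table, recommitted: " + shouldFailJob(true, nonRli));
  }
}
```

In this model the bootstrap setting plays no part in the decision, which is the behavioral change the PR describes.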

Impact

RLI recovery handling is intended to apply to both global and partitioned RLI paths when pending instants are recommitted after restart or failover.

Risk Level

Documentation Update

Contributor's checklist

  • Read through contributor's guide
  • Enough context is provided in the sections above
  • Adequate tests were added if applicable

@cshuo cshuo force-pushed the fix_global_rli_recover branch 2 times, most recently from 3dfd618 to dd4852e Compare May 20, 2026 07:41
@yihua yihua added this to the release-1.2.0 milestone May 20, 2026
@cshuo cshuo force-pushed the fix_global_rli_recover branch from dd4852e to 77df4f3 Compare May 20, 2026 07:50
@github-actions github-actions Bot added the size:S PR with lines of changes in (10, 100] label May 20, 2026
Contributor

@hudi-agent hudi-agent left a comment

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for the contribution! This PR broadens the coordinator's RLI recovery/failover handling from a global-RLI-with-bootstrap-only gate to any RLI table (global or partitioned), centralizing the decision via a new tableState.isRecordLevelIndex flag. The logic appears correct — handleBootstrapEvent is only reachable when a bootstrap operator exists in the pipeline, and the broadened check also covers the bounded-mode case the previous gate missed. No correctness issues found. A few style/readability suggestions in the inline comments. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. One naming nit on the new TableState field.

cc @yihua

  final boolean isDeltaTimeCompaction;
  final boolean isStreamingIndexWriteEnabled;
- final boolean isRLIWithBootstrap;
+ final boolean isRecordLevelIndex;
Contributor

🤖 nit: could you rename isRecordLevelIndex to something like isAnyRecordLevelIndex? The field name is identical to OptionsResolver.isRecordLevelIndex(), which only covers one of the two variants, so a reader scanning the constructor assignment on the next screen (isGlobalRecordLevelIndex || isRecordLevelIndex) may have to stop and work out that the field is a broader OR of both.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

@cshuo cshuo force-pushed the fix_global_rli_recover branch 2 times, most recently from 3613b0c to 1f5a5a5 Compare May 20, 2026 09:19
Contributor

@hudi-agent hudi-agent left a comment

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for the contribution! The change broadens the recovery/failover guard from "global RLI with bootstrap enabled" to "any RLI table", which addresses the original concern. One edge case around the failover behavior when index.bootstrap.enabled=false may be worth double-checking — see the inline comment. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. One naming nit: the new isRecordLevelIndex field shadows an OptionsResolver method of the same name that only covers the non-global variant, which could mislead a future reader about its actual scope.

  // start to recommit the instant.
  boolean committed = recommitInstant(event.getCheckpointId(), event.getInstantTime(), eventBuffer);
- if (committed && tableState.isRLIWithBootstrap) {
+ if (committed && tableState.isRecordLevelIndex) {
Contributor

🤖 The old guard required INDEX_BOOTSTRAP_ENABLED=true, so an RLIBootstrapOperator (or BootstrapOperator) was guaranteed to be in the pipeline when this failJob fired. With the new isRecordLevelIndex flag, this branch also runs when index.bootstrap.enabled=false — but per Pipelines.streamBootstrap, no index_bootstrap operator is wired into the graph in that case. Could you confirm whether the global failover is still desired/needed when there is no bootstrap operator to reload state? If yes, the exception message ("so that RLI bootstrap operator can load the record level index completely") is misleading; if no, the condition may need to keep the INDEX_BOOTSTRAP_ENABLED check (or be replaced with one that reflects when index state must actually be re-bootstrapped). @danny0405 could you weigh in on the intended scope here?

- AI-generated; verify before applying. React 👍/👎 to flag quality.
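
The edge case raised in this comment can be enumerated with a small sketch. It is not the Hudi source: `GuardComparisonSketch` and its methods are hypothetical, the old guard is modeled from the PR summary ("global RLI with index bootstrap enabled"), and the two RLI variants are assumed to be mutually exclusive.

```java
// Hypothetical comparison of the old and new guards; not the Hudi source.
final class GuardComparisonSketch {

  // Old guard as described in the PR summary: global RLI with index bootstrap enabled.
  static boolean oldGuard(boolean globalRli, boolean partitionedRli, boolean bootstrapEnabled) {
    return globalRli && bootstrapEnabled;
  }

  // New guard: any RLI variant, regardless of the bootstrap setting.
  static boolean newGuard(boolean globalRli, boolean partitionedRli, boolean bootstrapEnabled) {
    return globalRli || partitionedRli;
  }

  // True for the configurations the reviewer asks about: the failover now fires
  // although index bootstrap is disabled.
  static boolean firesWithoutBootstrap(boolean globalRli, boolean partitionedRli, boolean bootstrapEnabled) {
    return newGuard(globalRli, partitionedRli, bootstrapEnabled) && !bootstrapEnabled;
  }

  public static void main(String[] args) {
    boolean[] flags = {false, true};
    for (boolean globalRli : flags) {
      for (boolean partitionedRli : flags) {
        if (globalRli && partitionedRli) {
          continue; // assumption: a table uses at most one RLI variant
        }
        for (boolean bootstrapEnabled : flags) {
          boolean before = oldGuard(globalRli, partitionedRli, bootstrapEnabled);
          boolean after = newGuard(globalRli, partitionedRli, bootstrapEnabled);
          if (before != after) {
            // Print only the configurations whose behavior changes with this PR.
            System.out.printf("global=%b partitioned=%b bootstrap=%b old=%b new=%b%n",
                globalRli, partitionedRli, bootstrapEnabled, before, after);
          }
        }
      }
    }
  }
}
```

Under these assumptions the guards differ for partitioned RLI with either bootstrap setting and for global RLI with bootstrap disabled. The two bootstrap-disabled rows are the ones the question to @danny0405 is about.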

  final boolean isDeltaTimeCompaction;
  final boolean isStreamingIndexWriteEnabled;
- final boolean isRLIWithBootstrap;
+ final boolean isRecordLevelIndex;
Contributor

🤖 nit: isRecordLevelIndex here shadows the name of OptionsResolver.isRecordLevelIndex(), which only covers the non-global variant — yet this field is true for either type. A reader seeing tableState.isRecordLevelIndex might assume it has the same scope as the resolver method. Something like isAnyRecordLevelIndex (or even just a brief Javadoc on the field) would make the "global OR non-global" intent explicit.

- AI-generated; verify before applying. React 👍/👎 to flag quality.
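
A minimal sketch of the scope mismatch this nit describes. Everything here is hypothetical and only mirrors the names in the comment: the `IndexType` enum and its constants are invented for illustration, the two static methods stand in for the resolver checks as the reviewer characterizes them, and `hasRecordLevelIndex` is one of the suggested names, not a field in the merged code.

```java
// Hypothetical illustration of the naming concern; not the Hudi source.
final class NamingScopeSketch {

  // Invented for this sketch; the real configuration is not modeled.
  enum IndexType { GLOBAL_RLI, PARTITIONED_RLI, OTHER }

  // Stand-in for the resolver check that covers only the global variant.
  static boolean isGlobalRecordLevelIndex(IndexType type) {
    return type == IndexType.GLOBAL_RLI;
  }

  // Stand-in for the resolver check that, per the review, covers only the non-global variant.
  static boolean isRecordLevelIndex(IndexType type) {
    return type == IndexType.PARTITIONED_RLI;
  }

  static final class TableState {
    /** True when the table uses either RLI variant (global or partitioned). */
    final boolean hasRecordLevelIndex;

    TableState(IndexType type) {
      this.hasRecordLevelIndex = isGlobalRecordLevelIndex(type) || isRecordLevelIndex(type);
    }
  }

  public static void main(String[] args) {
    IndexType type = IndexType.GLOBAL_RLI;
    // Same base name, different answers: this is the confusion the reviewer points at.
    System.out.println("resolver-style check: " + isRecordLevelIndex(type));
    System.out.println("table-state flag:     " + new TableState(type).hasRecordLevelIndex);
  }
}
```

With a distinct field name or a one-line Javadoc, the differing results for a global RLI table are no longer surprising at the call site.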

@cshuo cshuo force-pushed the fix_global_rli_recover branch from 1f5a5a5 to 240290e Compare May 20, 2026 10:31
Contributor

@hudi-agent hudi-agent left a comment

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for the contribution! This PR broadens the RLI recovery/failover guard in the Flink write coordinator from "global RLI with bootstrap enabled" to "any RLI table" by introducing TableState.isRecordLevelIndex. The main correctness concern — whether triggering a global failover is appropriate when no RLI bootstrap operator is guaranteed to be in the pipeline — was already raised in prior review rounds. No additional critical issues flagged from this automated pass; please address the existing inline comments, and this should be ready for a Hudi committer or PMC member to take it from here. One naming nit below — code is otherwise clean.

  this.isDeltaTimeCompaction = OptionsResolver.isDeltaTimeCompaction(conf);
  this.isStreamingIndexWriteEnabled = OptionsResolver.isStreamingIndexWriteEnabled(conf);
- this.isRLIWithBootstrap = OptionsResolver.isRLIWithBootstrap(conf);
+ this.isRecordLevelIndex = OptionsResolver.isGlobalRecordLevelIndex(conf) || OptionsResolver.isRecordLevelIndex(conf);
Contributor

🤖 nit: the field is named isRecordLevelIndex but it's the union of both variants (isGlobalRecordLevelIndex || isRecordLevelIndex), while OptionsResolver.isRecordLevelIndex() covers only one of the two. A future reader seeing tableState.isRecordLevelIndex at a call site might assume it's the narrower check. Could you use something like isAnyRecordLevelIndex or hasRecordLevelIndex to make the breadth explicit?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

@codecov-commenter

Codecov Report

❌ Patch coverage is 50.00000% with 2 lines in your changes missing coverage. Please review.
✅ Project coverage is 68.11%. Comparing base (f848a40) to head (240290e).

Files with missing lines | Patch % | Lines
...ache/hudi/sink/StreamWriteOperatorCoordinator.java | 50.00% | 1 Missing and 1 partial ⚠️
Additional details and impacted files
@@                 Coverage Diff                 @@
##             release-1.2.0   #18789      +/-   ##
===================================================
- Coverage            68.11%   68.11%   -0.01%     
+ Complexity           29188    29183       -5     
===================================================
  Files                 2527     2527              
  Lines               141496   141495       -1     
  Branches             17575    17576       +1     
===================================================
- Hits                 96381    96377       -4     
  Misses               37182    37182              
- Partials              7933     7936       +3     
Flag | Coverage Δ
common-and-other-modules | 44.38% <50.00%> (+<0.01%) ⬆️
hadoop-mr-java-client | 44.96% <ø> (-0.02%) ⬇️
spark-client-hadoop-common | 48.27% <ø> (-0.01%) ⬇️
spark-java-tests | 49.02% <ø> (+0.01%) ⬆️
spark-scala-tests | 44.82% <ø> (+<0.01%) ⬆️
utilities | 37.60% <ø> (+<0.01%) ⬆️

Files with missing lines | Coverage Δ
...org/apache/hudi/configuration/OptionsResolver.java | 71.62% <ø> (-0.20%) ⬇️
...ache/hudi/sink/StreamWriteOperatorCoordinator.java | 79.10% <50.00%> (+0.74%) ⬆️

... and 9 files with indirect coverage changes

@hudi-bot
Collaborator

CI report:

@danny0405
Contributor

@danny0405 danny0405 merged commit 7f0d64b into apache:release-1.2.0 May 20, 2026
65 checks passed
6 participants