fix: prevent heartbeat timer from being permanently killed by slow or delayed heartbeats #18904

Open
prashantwason wants to merge 3 commits into apache:master from prashantwason:heartbeat-timer-resilience

@prashantwason
Member

Describe the issue this Pull Request addresses

Closes #18903

HoodieHeartbeatClient can permanently stop generating heartbeats for an in-flight instant, causing a later commit to abort with HoodieException: Heartbeat for instant <t> has expired even though the writer is still alive. Two independent causes, both in updateHeartbeat():

  1. The heartbeat file is written synchronously on the Timer thread. Since the timer uses scheduleAtFixedRate, a slow or hung storage write blocks the thread and freezes all subsequent heartbeats for the instant.
  2. When a refresh is delayed past the tolerable interval, updateHeartbeat() calls Thread.currentThread().interrupt(), which permanently kills the timer thread — turning a transient delay (GC pause, driver stall, single slow write) into a permanent blackout.

Summary and Changelog

  • Perform the heartbeat file write on a bounded daemon executor and wait with a timeout (Future.get(heartbeatWriteTimeoutMs)), so a slow or hung storage call can no longer block the timer thread. The write timeout is one heartbeat interval; a timed-out write does not advance the last-heartbeat time and is retried on the next tick. A cached thread pool is used so that if one write hangs, subsequent ticks proceed on a fresh thread.
  • Remove the self-interrupt in updateHeartbeat(). Instead of Thread.currentThread().interrupt(), log a warning and continue refreshing. The commit-time check HeartbeatUtils.abortIfHeartbeatExpired() remains the sole enforcement point for staleness.
  • Shut the executor down in close().
  • Add TestHoodieHeartbeatClient.testTimerSurvivesHungHeartbeatWrite, which blocks the first heartbeat write and asserts the timer keeps generating heartbeats (covering both fixes).
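A minimal sketch of the timed-write idea from the first bullet, under stated assumptions: the class name, the `tick` method, and the choice to cancel the pending future on timeout are all hypothetical and are not taken from the PR's diff. It only illustrates how `Future.get` with a timeout keeps the calling (timer) thread from blocking on a hung write, and how a timed-out write leaves the last-heartbeat time untouched.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch; names are illustrative, not the HoodieHeartbeatClient code.
final class TimedHeartbeatWriteSketch {
  // Cached pool of daemon threads: a hung write occupies one thread,
  // and a later tick is served by a fresh one.
  private final ExecutorService writeExecutor = Executors.newCachedThreadPool(r -> {
    Thread t = new Thread(r, "heartbeat-writer-sketch");
    t.setDaemon(true);
    return t;
  });
  private final long writeTimeoutMs;
  private volatile Long lastHeartbeatTime; // stays null until a write succeeds

  TimedHeartbeatWriteSketch(long writeTimeoutMs) {
    this.writeTimeoutMs = writeTimeoutMs;
  }

  /** Runs one tick; returns true only if the write finished within the timeout. */
  boolean tick(Runnable storageWrite) {
    Future<?> pending = writeExecutor.submit(storageWrite);
    try {
      pending.get(writeTimeoutMs, TimeUnit.MILLISECONDS);
      lastHeartbeatTime = System.currentTimeMillis();
      return true;
    } catch (TimeoutException te) {
      pending.cancel(true); // best effort (an assumption of this sketch); next tick retries
      return false;
    } catch (ExecutionException ee) {
      return false; // the write itself failed; next tick retries
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt(); // preserve the caller's interrupt status
      return false;
    }
  }

  Long lastHeartbeatTime() {
    return lastHeartbeatTime;
  }

  void close() {
    writeExecutor.shutdownNow();
  }
}
```

In this sketch a caller would invoke `tick(...)` from the timer task with the storage write as the `Runnable`; whether the real change cancels the abandoned write or simply lets it finish is not stated in the description.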

Impact

No public API or config change. Heartbeat refresh becomes resilient to transient storage latency and driver pauses: a transient stall no longer permanently disables heartbeats for an instant. Staleness is still enforced at commit time, so correctness of the concurrency guard is unchanged.

Risk Level

low

Behavior change is confined to HoodieHeartbeatClient. Existing TestHoodieHeartbeatClient tests pass and a new regression test was added.

Documentation Update

none

Contributor's checklist

  • Read through contributor's guide
  • Enough context is provided in the sections above
  • Adequate tests were added if applicable

… delayed heartbeats

HoodieHeartbeatClient could permanently stop generating heartbeats for an
instant, causing later commits to abort with "Heartbeat for instant ... has
expired" even though the writer was still alive:

- The heartbeat file is written synchronously on the Timer thread. Because the
  timer uses scheduleAtFixedRate, a slow or hung storage write blocks the thread
  and freezes all subsequent heartbeats for that instant.
- When a heartbeat refresh is delayed past the tolerable interval,
  updateHeartbeat() called Thread.currentThread().interrupt(), permanently
  killing the timer thread and turning a transient delay into a permanent
  blackout.

Fix:
- Perform the heartbeat file write on a bounded daemon executor (Future.get with
  a per-interval timeout) so a slow or hung storage call cannot block the timer
  thread; a timed-out write is retried on the next tick.
- Remove the self-interrupt; log a warning and continue. The commit-time check
  HeartbeatUtils.abortIfHeartbeatExpired() remains the sole enforcement point.

Add TestHoodieHeartbeatClient.testTimerSurvivesHungHeartbeatWrite.
@github-actions github-actions Bot added the size:M PR with lines of changes in (100, 300] label Jun 2, 2026
Contributor

@hudi-agent hudi-agent left a comment

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for working on this! The PR addresses two real issues with the heartbeat client: (a) synchronous storage writes blocking the timer thread, and (b) the self-interrupt permanently killing the timer. The new design (executor with bounded timeout + remove self-interrupt) is reasonable. The inline comments raise two points: an edge case worth double-checking, namely what happens when the very first heartbeat write times out during start(), and a minor type inconsistency in the new field declaration. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here.

heartbeat.setInstantTime(instantTime);
heartbeat.setLastHeartbeatTime(newHeartbeatTime);
heartbeat.setNumHeartbeats(heartbeat.getNumHeartbeats() + 1);
} catch (TimeoutException te) {
Contributor

🤖 If the very first writeHeartbeatFile() call (invoked synchronously from start()) hits this timeout path, heartbeat.setLastHeartbeatTime(...) is never reached, so the Heartbeat's lastHeartbeatTime stays null. A subsequent call to isHeartbeatExpired() (e.g., from HeartbeatUtils.abortIfHeartbeatExpired() during a fast write-then-commit) would NPE at line 189 (currentTime - getLastHeartbeatTime() unboxes a null Long). Could lastHeartbeatTime be initialized conservatively here (or in start() before scheduling), or isHeartbeatExpired guarded against the null case, to close this corner?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

Member Author

Good catch — fixed. isHeartbeatExpired() now falls back to the DFS read when the in-memory last-heartbeat time is null (which can happen if the first synchronous write times out), so it no longer NPEs on the unboxing comparison. A missing heartbeat file reads as 0 and is correctly treated as expired.
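The null fallback described in this reply could look roughly like the sketch below. It is an illustration only: the class, the `isExpired` signature, and the `LongSupplier` standing in for the DFS read are hypothetical names, not the actual `isHeartbeatExpired()` code. The one behavior taken from the thread is that a missing heartbeat file reads as 0 and therefore counts as expired.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of a null-safe expiry check; not the Hudi implementation.
final class NullSafeExpirySketch {
  private NullSafeExpirySketch() {
  }

  /**
   * @param inMemoryLastHeartbeatMs last successful heartbeat, or null if no write has succeeded yet
   * @param storageRead fallback that reads the heartbeat time from storage (0 when the file is missing)
   */
  static boolean isExpired(Long inMemoryLastHeartbeatMs,
                           LongSupplier storageRead,
                           long nowMs,
                           long maxAllowableIntervalMs) {
    long last;
    if (inMemoryLastHeartbeatMs != null) {
      last = inMemoryLastHeartbeatMs; // safe to unbox: checked for null above
    } else {
      last = storageRead.getAsLong(); // fall back to storage instead of unboxing null
    }
    return nowMs - last > maxAllowableIntervalMs;
  }
}
```

With a missing file the fallback yields 0, so `nowMs - 0` exceeds any realistic interval and the instant is reported as expired rather than throwing.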

// Maximum time the timer thread will wait for a single heartbeat file write to complete before
// abandoning it and letting the next tick retry. Bounded to one interval so that a slow/hung
// storage write cannot block the timer thread (and thus freeze all subsequent heartbeats).
private final long heartbeatWriteTimeoutMs;
Contributor

🤖 nit: the two sibling duration fields (heartbeatIntervalInMs, maxAllowableHeartbeatIntervalInMs) are boxed Long — could you align heartbeatWriteTimeoutMs to Long as well to keep the field group consistent?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

Member Author

Done — changed heartbeatWriteTimeoutMs to boxed Long to match the sibling duration fields.

public void close() {
this.stopHeartbeatTimers();
this.instantToHeartbeatMap.clear();
synchronized (this) {
Contributor

Do we have concurrent close() calls on this client? Should we add an isClosed flag to avoid repeated closes?

Member Author

Added a closed flag — close() is now synchronized and idempotent (a second call is a no-op), and the executor getter throws if used after close so a late timer tick cannot resurrect the executor.

Member Author

Correction to my note above: CI caught that the heartbeat client is legitimately reused after close() (startCommit / acquireRollbackHeartbeatIfMultiWriter call start() again on a closed instance), so a sticky closed flag broke that and failed TestJavaHoodieBackedMetadata / TestHoodieJavaClientOnCopyOnWriteStorage. I removed the flag. close() is now idempotent via synchronized + the executor null-check (repeated/concurrent close is a no-op), while reuse-after-close still works (the executor is lazily re-created on the next heartbeat). Fixed in 165a2fc.
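The pattern described here (idempotent close through synchronization plus a null-check, with lazy re-creation on reuse) can be sketched as follows. The holder class and method names are hypothetical and chosen for illustration; the actual field and accessor names in HoodieHeartbeatClient are not shown in this thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: close() is a no-op when already closed, and the
// executor is re-created lazily if the holder is used again afterwards.
final class ReusableExecutorHolderSketch {
  private ExecutorService executor; // guarded by "this"

  synchronized ExecutorService getOrCreateExecutor() {
    if (executor == null) {
      executor = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "heartbeat-writer-sketch");
        t.setDaemon(true);
        return t;
      });
    }
    return executor;
  }

  synchronized void close() {
    if (executor != null) { // null-check makes repeated or concurrent close a no-op
      executor.shutdownNow();
      executor = null;      // lets the next getOrCreateExecutor() build a fresh pool
    }
  }
}
```

The trade-off versus a sticky closed flag is the one this comment describes: no flag means a late caller can bring the executor back, but it also keeps reuse-after-close working.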

// future heartbeats for this instant, turning a transient delay into a permanent blackout.
// Enforcement is done at commit time in HeartbeatUtils.abortIfHeartbeatExpired(), which is the
// correct and sole enforcement point.
log.warn("Missed generating heartbeat for instant {} within allowable interval {} ms; continuing to refresh",
Contributor

@danny0405 danny0405 Jun 3, 2026

This makes sense to some extent, but I do see a correctness risk: when the failed-writes rollback strategy is configured as LAZY, the async cleaner could roll back the current instant by removing some data files (without yet removing the metadata files on the timeline). If the write then goes on to commit, that commit loses data.

Should we also increase the tolerable missed-heartbeat count? The current default is 2; should we change it to 10 or 20?

Member Author

Thanks, this is an important catch. I reworked the expiry handling to preserve the original safety property:

  • On a detected lapse we now stop refreshing the heartbeat (cancel the timer) and do not advance the last-heartbeat time. So a lapsed writer still fails at commit via abortIfHeartbeatExpired() — it cannot resurrect a heartbeat that an async cleaner (LAZY policy) may already have acted on, which closes the data-loss window you described. The only change from the original is that the timer is cancelled cleanly instead of via Thread.currentThread().interrupt() (which permanently killed the timer thread on the first miss).
  • Separately, I raised the default hoodie.client.heartbeat.tolerable.misses from 2 to 10 as you suggested, so transient GC/storage pauses are far less likely to trip expiry in the first place. Combined with moving the heartbeat write off the timer thread (time-bounded), transient delays no longer cause spurious aborts, while a genuine lapse still aborts safely.
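The reworked lapse handling in the first bullet can be illustrated with a small sketch. Everything here is an assumption made for illustration (class name, `tick` method, explicit `nowMs` parameter); it only shows the shape of the logic: on a detected lapse, cancel the `java.util.Timer` from inside the task and leave the last-heartbeat time where it was, so a later expiry check still reports the lapse.

```java
import java.util.Timer;
import java.util.TimerTask;

// Hypothetical sketch of "stop refreshing on a lapse"; not the PR's code.
final class LapseCancelsTimerSketch {
  private final Timer timer = new Timer("heartbeat-sketch", true); // daemon timer thread
  private final long maxAllowableIntervalMs;
  private long lastHeartbeatMs;
  private boolean stopped;

  LapseCancelsTimerSketch(long startMs, long maxAllowableIntervalMs) {
    this.lastHeartbeatMs = startMs;
    this.maxAllowableIntervalMs = maxAllowableIntervalMs;
  }

  void start(long intervalMs) {
    timer.scheduleAtFixedRate(new TimerTask() {
      @Override
      public void run() {
        tick(System.currentTimeMillis());
      }
    }, intervalMs, intervalMs);
  }

  /** Returns true if the heartbeat was refreshed, false if refreshing has stopped. */
  synchronized boolean tick(long nowMs) {
    if (stopped) {
      return false;
    }
    if (nowMs - lastHeartbeatMs > maxAllowableIntervalMs) {
      stopped = true;
      timer.cancel(); // clean cancel instead of Thread.currentThread().interrupt()
      return false;   // last-heartbeat time is deliberately NOT advanced
    }
    lastHeartbeatMs = nowMs;
    return true;
  }

  /** Stand-in for the commit-time staleness check. */
  synchronized boolean isExpired(long nowMs) {
    return nowMs - lastHeartbeatMs > maxAllowableIntervalMs;
  }
}
```

Calling `Timer.cancel()` from within the running task is allowed by the `java.util.Timer` contract and guarantees that the current execution is the last one.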

- isHeartbeatExpired: handle a null last-heartbeat-time (the very first write
  can time out, leaving it unset) by falling back to the DFS read, avoiding an
  NPE on the unboxing comparison.
- On a detected lapse, stop refreshing the heartbeat (cancel the timer) and do
  not advance the last-heartbeat time, so a lapsed writer still aborts at commit
  via HeartbeatUtils.abortIfHeartbeatExpired() and cannot resurrect a heartbeat
  that a concurrent cleaner (LAZY failed-writes policy) may already have acted
  on. The timer is cancelled cleanly rather than via Thread.interrupt().
- close(): make idempotent via a closed flag and guard executor creation after
  close.
- Use boxed Long for heartbeatWriteTimeoutMs to match the sibling duration
  fields.
- Raise the default hoodie.client.heartbeat.tolerable.misses from 2 to 10 so
  transient driver pauses (e.g. GC) or storage-latency spikes do not abort a
  still-healthy writer.
A write client reuses its HoodieHeartbeatClient across operations: after
close(), startCommit() and acquireRollbackHeartbeatIfMultiWriter() call
heartbeatClient.start() again. The previously added "already closed" guard in
the executor accessor turned this valid reuse into a failure (observed in
TestJavaHoodieBackedMetadata and TestHoodieJavaClientOnCopyOnWriteStorage).

Remove the guard and the closed flag; close() remains idempotent via
synchronization plus the null-check on the executor, so repeated/concurrent
close() is still safe while reuse-after-close works (the executor is lazily
re-created on the next heartbeat).
// kill the timer thread (turning a transient delay into a permanent blackout on the first miss).
log.error("Missed generating heartbeat for instant {} within allowable interval {} ms; stopping heartbeat refresh",
instantTime, this.maxAllowableHeartbeatIntervalInMs);
heartbeat.getTimer().cancel();
Contributor

Can we replace the Timer with a Java ScheduledExecutorService? java.util.Timer is a legacy class (introduced in Java 1.3) with severe design flaws: if a TimerTask throws an uncaught exception, the entire timer thread dies, permanently killing all other scheduled tasks.
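For reference, a rough sketch of what a ScheduledExecutorService-based refresh could look like. The class and method names are hypothetical and this is not a proposed patch. One caveat worth keeping in mind: with `scheduleAtFixedRate`, an exception escaping the task suppresses that task's subsequent executions (other tasks on the scheduler keep running), so the sketch catches runtime exceptions inside the task body.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a scheduler-based heartbeat refresh; illustrative only.
final class ScheduledHeartbeatSketch {
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "heartbeat-scheduler-sketch");
        t.setDaemon(true);
        return t;
      });

  /** Schedules the refresh; the returned handle cancels only this instant's refresh. */
  ScheduledFuture<?> schedule(Runnable refresh, long intervalMs) {
    return scheduler.scheduleAtFixedRate(() -> {
      try {
        refresh.run();
      } catch (RuntimeException e) {
        // Swallow (a real client would log) so later executions are not suppressed.
      }
    }, 0L, intervalMs, TimeUnit.MILLISECONDS);
  }

  void close() {
    scheduler.shutdownNow();
  }
}
```

Cancelling the returned `ScheduledFuture` would play the role that `heartbeat.getTimer().cancel()` plays in the snippet above this comment.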

@hudi-bot
Collaborator

hudi-bot commented Jun 4, 2026

CI report:


@prashantwason
Member Author

CI on 165a2fce is green except for two unrelated, pre-existing flaky jobs:

  • test-spark-java17-java-tests-part2 (spark4.1) — FAILURE:
    • TestHoodieClientMultiWriter.testHoodieClientBasicMultiWriterWithEarlyConflictDetectionDirect[3]: the first attempt failed with a transient SparkException: Lost task ... HoodieUpsertException on an executor, and the retries then failed with Unable to instantiate class org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider (ZooKeeper connectivity). This test has a known flaky history (see #17793, "Fix flaky test in TestHoodieClientMultiWriter"). Surefire also reported Flakes: 1 for this run.
    • TestHoodieIndex.testTagLocationDuringUpdatesAndFailures[7] — transient Failed to upsert for commit time ....
  • test-spark-java17-java-tests-part2 (spark4.0) — CANCELLED (fail-fast collateral from the spark4.1 job).

None of these touch HoodieHeartbeatClient or the heartbeat config changed in this PR (the failures are in the ZK lock-provider and upsert code paths). The job this PR's change is actually relevant to — test-hudi-hadoop-mr-and-hudi-java-client — passes.

Could a committer please re-run the two failed test-spark-java17-java-tests-part2 jobs? Thanks!

@danny0405
Contributor

@prashantwason do you think this is a valuable suggestion? #18904 (comment)

@codecov-commenter

Codecov Report

❌ Patch coverage is 75.60976% with 10 lines in your changes missing coverage. Please review.
✅ Project coverage is 68.79%. Comparing base (b7adecc) to head (165a2fc).
⚠️ Report is 13 commits behind head on master.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...e/hudi/client/heartbeat/HoodieHeartbeatClient.java | 74.35% | 9 Missing and 1 partial ⚠️ |

Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18904      +/-   ##
============================================
- Coverage     68.81%   68.79%   -0.02%     
+ Complexity    29160    29097      -63     
============================================
  Files          2520     2517       -3     
  Lines        140056   139848     -208     
  Branches      17209    17213       +4     
============================================
- Hits          96373    96206     -167     
+ Misses        35909    35867      -42     
- Partials       7774     7775       +1     

| Flag | Coverage Δ |
|---|---|
| common-and-other-modules | 44.26% <75.60%> (-0.07%) ⬇️ |
| hadoop-mr-java-client | 44.75% <56.09%> (-0.13%) ⬇️ |
| spark-client-hadoop-common | 48.05% <56.09%> (-0.12%) ⬇️ |
| spark-java-tests | 49.33% <56.09%> (-0.03%) ⬇️ |
| spark-scala-tests | 45.22% <56.09%> (-0.03%) ⬇️ |
| utilities | 37.29% <56.09%> (-0.09%) ⬇️ |

| Files with missing lines | Coverage Δ |
|---|---|
| ...java/org/apache/hudi/config/HoodieWriteConfig.java | 89.97% <100.00%> (ø) |
| ...e/hudi/client/heartbeat/HoodieHeartbeatClient.java | 81.13% <74.35%> (-6.37%) ⬇️ |

... and 61 files with indirect coverage changes

Contributor

@hudi-agent hudi-agent left a comment

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for the iteration. The rollback-safety reasoning you added to the in-flight expiry path and the DFS fallback in isHeartbeatExpired() both look well thought through. One question remains: is the cleaner-rollback safety property fully preserved when the very first writeHeartbeatFile from start() times out and recovery only happens after the tolerable window? Please take a look at the inline comment, and this should be ready for a Hudi committer or PMC member to take it from here.

outputStream.close();
writeHeartbeatFile(instantTime);
Heartbeat heartbeat = instantToHeartbeatMap.get(instantTime);
if (heartbeat.getLastHeartbeatTime() != null && isHeartbeatExpired(instantTime)) {
Contributor

🤖 The comment below correctly explains why we must stop refreshing once a lapse has been observed — but this lastHeartbeatTime != null guard means the same protection isn't applied to the first-write-timeout case. If the synchronous writeHeartbeatFile from start() times out and the subsequent ticks also time out for longer than maxAllowableHeartbeatIntervalInMs before one finally lands, a concurrent LAZY-policy cleaner could have rolled back this instant in the gap (no file on DFS → static HoodieHeartbeatUtils.isHeartbeatExpired returns expired → getInstantsToRollbackForLazyCleanPolicy picks it up). When a tick finally succeeds, this guard skips the expiry check and sets a fresh lastHeartbeatTime, so the commit-time abortIfHeartbeatExpired() then sees a healthy in-memory time and proceeds — exactly the "commit on top of rolled-back files" outcome the comment is trying to prevent. Was this edge case considered? One option is to also abort if lastHeartbeatTime is null and currentTime - <start()-time> already exceeds maxAllowableHeartbeatIntervalInMs.

- AI-generated; verify before applying. React 👍/👎 to flag quality.
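The option suggested in the comment above (treat a null last-heartbeat time as lapsed once the time since start() exceeds the allowable interval) could be sketched like this. The class, method, and parameter names are hypothetical; in particular, recording a start time for the heartbeat is an assumption of the sketch, not something the diff is known to do.

```java
// Hypothetical sketch of a lapse check that also covers the first-write-timeout case.
final class FirstWriteLapseSketch {
  private FirstWriteLapseSketch() {
  }

  /**
   * @param lastHeartbeatMs last successful heartbeat, or null if no write has landed yet
   * @param startMs time at which start() was called for this instant (assumed to be recorded)
   */
  static boolean hasLapsed(Long lastHeartbeatMs, long startMs, long nowMs, long maxAllowableIntervalMs) {
    // With no successful write yet, measure the gap from start() instead of skipping the check.
    long reference = (lastHeartbeatMs != null) ? lastHeartbeatMs.longValue() : startMs;
    return nowMs - reference > maxAllowableIntervalMs;
  }
}
```

Under this rule, a tick that finally succeeds after the window has passed would be treated as a lapse and would stop refreshing, rather than setting a fresh last-heartbeat time.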

Labels

size:M PR with lines of changes in (100, 300]

Development

Successfully merging this pull request may close these issues.

Heartbeat timer can be permanently killed, causing valid in-flight commits to abort with "Heartbeat ... has expired"

5 participants