chore(flink): cherrypick append write code path fixes#18757
chore(flink): cherrypick append write code path fixes#18757HuangZhenQiu wants to merge 7 commits into
Conversation
…ial uncommitted write metadata event (apache#7611)
…ing heartbeat for lazy failed writes clean policy (apache#9135)
…delete heartbeat files for close (apache#9160)
…pache#9357) Co-authored-by: danny0405 <yuzhao.cyz@gmail.com>
… handle CheckPointComplete not finished (apache#10767) Test Plan: unit tests Reviewers: O955 Project Hoodie Project Reviewer: Add blocking reviewers, #hoodie_blocking_reviewers, peter.huang, vamshi Reviewed By: O955 Project Hoodie Project Reviewer: Add blocking reviewers, #hoodie_blocking_reviewers, peter.huang, vamshi Tags: #has_java JIRA Issues: HUDI-6847 Differential Revision: https://code.uberinternal.com/D17898557
67f0ffd to
bc2b16a
Compare
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the cherry-pick! This brings several Append-write-path fixes from master into release-0.14.2. The main piece I'd ask you to double-check is the ConcurrentHashMap → HashMap swap in HoodieHeartbeatClient, since the internal Timer thread reads/writes the map concurrently with the main thread. A couple of other questions in the inline comments. Please take a look at any inline comments, and this should be ready for a Hudi committer or PMC member to take it from here. One naming/import nit worth addressing; the rest of the changes look clean.
| this.heartbeatIntervalInMs = heartbeatIntervalInMs; | ||
| this.maxAllowableHeartbeatIntervalInMs = this.heartbeatIntervalInMs * numTolerableHeartbeatMisses; | ||
| this.instantToHeartbeatMap = new ConcurrentHashMap<>(); | ||
| this.instantToHeartbeatMap = new HashMap<>(); |
There was a problem hiding this comment.
🤖 I'm a bit worried about downgrading from ConcurrentHashMap to HashMap here. The HeartbeatTask runs on the internal Timer thread (Timer(true) in Heartbeat) and reads instantToHeartbeatMap via updateHeartbeat while the caller thread can be doing put/remove/clear (e.g. start, stop, close). Even though the class is @NotThreadSafe for external callers, the internal Timer thread isn't external — race between Timer thread get and main thread put/clear on a plain HashMap can corrupt the map state. Was there a specific reason for the change? @danny0405
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| return; | ||
| } | ||
|
|
||
| initInstant(instant); |
There was a problem hiding this comment.
🤖 Just to double-check the scope: this reuse path only triggers when tableState.operationType == INSERT. For other append-style ops (e.g. INSERT_OVERWRITE, BULK_INSERT) the bootstrap will still re-init a new instant and the prior pending one will be rolled back — is that intentional, or should this also cover those operation types where the same "data was snapshotted but not committed" scenario can occur?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy() && !WriteMetadataEvent.BOOTSTRAP_INSTANT.equals(this.instant)) { | ||
| writeClient.getHeartbeatClient().stop(this.instant); | ||
| } | ||
| // starts a new instant |
There was a problem hiding this comment.
🤖 With the explicit heartbeatClient.stop(this.instant) removed from initInstant, the only path that stops the prior instant's heartbeat is writeClient.commit(...) inside the recommit branch. If initInstant takes the reset()/no-recommit path (e.g. the instant was already in the completed timeline but a prior pending this.instant still has a live heartbeat, or the bootstrap path resets without recommit), does the prior heartbeat get cleaned up anywhere, or could it leak until process exit? Worth confirming there's no scenario where a stale heartbeat is left running for the previous this.instant.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| import org.apache.hudi.util.ClusteringUtil; | ||
| import org.apache.hudi.util.CompactionUtil; | ||
| import org.apache.hudi.util.FlinkWriteClients; | ||
| import org.apache.hudi.util.*; |
There was a problem hiding this comment.
🤖 nit: wildcard imports are generally discouraged in Java (and flagged by most checkstyle configs) — could you restore the four explicit imports (ClientIds, ClusteringUtil, CompactionUtil, FlinkWriteClients) rather than using org.apache.hudi.util.*?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for cherry-picking these fixes! This PR backports a set of upstream append-write fixes (heartbeat handling, bootstrap event reuse on partial failover, and related test helpers). Prior review rounds have already flagged the key concerns (HashMap thread-safety, heartbeat lifecycle on initInstant, INSERT-only scope, wildcard imports); no new correctness or architectural issues surfaced from this pass. Please take a look at the existing inline comments, and this should be ready for a Hudi committer or PMC member to take it from here. A couple of minor readability nits below — one on the log format placeholder and one on the Javadoc wording; the rest of the changes look clean.
cc @yihua
| if (this.metaClient.reloadActiveTimeline().filterInflightsAndRequested().containsInstant(this.instant) | ||
| && instant.equals(WriteMetadataEvent.BOOTSTRAP_INSTANT) | ||
| && this.tableState.operationType == WriteOperationType.INSERT) { | ||
| LOG.warn("Reuse current pending Instant {} with {} operationType, " |
There was a problem hiding this comment.
🤖 nit: the {} placeholder is always filled with WriteOperationType.INSERT.value() since we just checked operationType == INSERT — it could be confusing to a reader who wonders why it's a placeholder if it never varies. Could you either inline "INSERT" in the string, or use this.tableState.operationType.value() so it reads as a runtime value like the first {}?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| } | ||
|
|
||
| /** | ||
| * Triggers Job level fail, so the coordinator need re-create a new instance. |
There was a problem hiding this comment.
🤖 nit: "Triggers Job level fail" describes the scenario rather than what the method does, and it blurs the distinction with coordinatorFails() which already exists. Something like "Simulates a coordinator restart by closing and re-creating the coordinator instance" would be clearer — and "need re-create" should be "needs to re-create".
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| this.heartbeatIntervalInMs = heartbeatIntervalInMs; | ||
| this.maxAllowableHeartbeatIntervalInMs = this.heartbeatIntervalInMs * numTolerableHeartbeatMisses; | ||
| this.instantToHeartbeatMap = new ConcurrentHashMap<>(); | ||
| this.instantToHeartbeatMap = new HashMap<>(); |
Describe the issue this Pull Request addresses
Cherrypick Append write code path fixes
Summary and Changelog
#7611
#9357
#9867
#10119
#10767
#10312
Impact
none
Risk Level
none
Documentation Update
none
Contributor's checklist