[Fix][Zeta] Job stuck permanently after master failover, unable to complete (affects BATCH / bounded source / job shutdown phase)#10836
@davidzollo @dybyte Please help review any suggestions. Thank you very much
DanielLeens left a comment
Hi @nzw921rx, thanks for the detailed reproduction work here. I reviewed the latest head locally as seatunnel-review-10836 at 2fdbd8e2255e against upstream/dev. I did not run Maven locally in this batch; this is a source-level review plus current GitHub check metadata.
What This PR Fixes
- User pain point: after master failover, a bounded-source job can enter shutdown phase and then get stuck permanently instead of completing the final close/checkpoint path.
- Fix approach: the PR persists `readyToCloseStartingTask` into `runningJobStateIMap` and restores it after master switch so the new coordinator can continue the completion-point logic.
- One-line summary: the direction is right, but the latest head still only persists the fully complete `readyToCloseStartingTask` set, so the failover window is not actually closed for multi-source pipelines.
1. Code Review
1.1 Core Logic Analysis
- Exact change:
  - Persistence/restore logic: `seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java:431-488`
  - Starting-task composition: `seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java:333-350`
  - New E2E timing assumption: `seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MasterFailoverBatchJobIT.java:115-143` and `.../batch_fake_to_localfile_master_failover_template.conf:24-45`
- Before / after summary:

```java
readyToCloseStartingTask.add(taskLocation);
if (readyToCloseStartingTask.size() == plan.getStartingSubtasks().size()) {
    updateReadyToCloseStartingTask();
    tryTriggerPendingCheckpoint(CheckpointType.COMPLETED_POINT_TYPE);
}
```

- Key findings:
  - `startingSubtasks` contains one starting task per source action, not a guaranteed singleton.
  - The latest head only persists the set when `readyToCloseStartingTask.size() == plan.getStartingSubtasks().size()`.
  - So if source A has already reported `readyToClose`, source B has not, and the master fails in that middle window, the new coordinator still loses A's progress because nothing was written yet.
  - The new E2E waits until all rows are written and explicitly assumes every source task has already sent `LastCheckpointNotifyOperation`, so it never exercises that partial-progress failover window.
- Logic correctness deep dive:

```text
Pipeline planning
  -> PhysicalPlanGenerator.getEnumeratorTask() [333-350]
  -> one starting task is added for each source action
Shutdown path
  -> SourceSplitEnumeratorTask.RUNNING
  -> send LastCheckpointNotifyOperation when prepareCloseStatus becomes true
  -> CheckpointManager.readyToClose(taskLocation)
  -> CheckpointCoordinator.readyToClose(taskLocation) [431-436]
  -> add taskLocation into readyToCloseStartingTask
  -> persist only when the set is already complete
Failover window that still remains
  -> source A already called readyToClose
  -> source B has not yet called readyToClose
  -> master fails here
  -> no IMap snapshot exists for the partial set
  -> restored coordinator cannot recover source A's ready-to-close progress from persisted state
```
That means the current patch fixes the all-starting-tasks-already-ready-to-close case, but it still leaves the middle window open for pipelines with more than one source action.
1.2 Compatibility Impact
- Partially compatible.
- The change is internal and additive, but the failover contract is still incomplete for supported multi-source topologies.
1.3 Performance / Side Effects
- The new IMap write itself is cheap and acceptable.
- The risk is correctness, not overhead.
1.4 Error Handling and Logs
Issue 1: partial ready-to-close progress is still lost for multi-source failover windows
- Location: `CheckpointCoordinator.java:431-469`, `PhysicalPlanGenerator.java:333-350`
- Problem: persistence happens only when the ready-to-close set is already complete.
- Risk: a multi-source bounded job can still get stuck after master failover if the failure happens after some, but not all, source enumerators have entered the shutdown path.
- Best recommendation: persist `readyToCloseStartingTask` on every successful `readyToClose(...)` update, not only on the terminal all-sources-complete transition.
- Severity: High
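To make the recommended ordering concrete, here is a minimal sketch of persist-on-every-transition. A plain `HashMap`/`HashSet` stands in for the Hazelcast `runningJobStateIMap`, and the class, key name, and method signatures are illustrative assumptions, not the real coordinator code:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class ReadyToCloseSketch {
    // hypothetical stand-in for the Hazelcast runningJobStateIMap
    private final Map<String, Set<String>> runningJobState = new HashMap<>();
    private final Set<String> readyToCloseStartingTask = new HashSet<>();
    private static final String KEY = "ready-to-close-starting-task";

    // simplified readyToClose(): persist a defensive copy on EVERY transition,
    // so a master failover between source A and source B still finds A's progress;
    // returns true when all starting tasks have reported (completed-point trigger)
    boolean readyToClose(String taskLocation, int totalStartingTasks) {
        readyToCloseStartingTask.add(taskLocation);
        runningJobState.put(KEY, new HashSet<>(readyToCloseStartingTask));
        return readyToCloseStartingTask.size() == totalStartingTasks;
    }

    // what a restored coordinator would find in the persisted state after failover
    int persistedProgress() {
        Set<String> stored = runningJobState.get(KEY);
        return stored == null ? 0 : stored.size();
    }

    public static void main(String[] args) {
        ReadyToCloseSketch coordinator = new ReadyToCloseSketch();
        boolean complete = coordinator.readyToClose("source-A", 2);
        // master fails here: source B never reported, but A's progress is already durable
        if (complete || coordinator.persistedProgress() != 1) throw new AssertionError();
        if (!coordinator.readyToClose("source-B", 2)) throw new AssertionError();
        System.out.println("partial progress persisted before the terminal transition");
    }
}
```

The point of the sketch is only the write ordering: the snapshot write happens before the completeness check, so no failover window exists in which in-memory progress is ahead of the persisted state.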
2. Code Quality Assessment
2.1 Code Style
- The new persistence and restore helpers are readable and land at the right layer.
2.2 Test Coverage
- The new E2E is useful for the fully-finished-source case.
- It still does not cover the partial-progress multi-source failover window described above.
2.3 Documentation
- No user-facing docs update is required for this internal engine fix.
3. Architecture
3.1 Elegance of the Solution
- Persisting coordinator shutdown progress is the right long-term direction.
- The current snapshot timing is just too late.
3.2 Maintainability
- If partial progress is not made durable, this class of failover bug will stay difficult to reason about for multi-source jobs.
3.3 Extensibility
- Making the progress durable on each ready-to-close transition will scale better than only persisting the terminal state.
3.4 Historical Compatibility
- No historical API break, but the failover behavior is still incomplete.
4. Issue Summary
| No. | Issue | Location | Severity |
|---|---|---|---|
| 1 | Partial ready-to-close progress is still not durable across multi-source failover windows | CheckpointCoordinator.java:431-469, PhysicalPlanGenerator.java:333-350 | High |
5. Merge Conclusion
Conclusion: merge after fixes
- Blocking items
- Issue 1: persist `readyToCloseStartingTask` on every ready-to-close transition so master failover can recover partial progress, not only the fully-complete set.
- Suggested non-blocking follow-ups
- Extend the new E2E coverage with a true multi-source pipeline and trigger failover after only a subset of starting tasks has reported `readyToClose`.
Overall assessment: the repair direction is correct, but the latest head still does not fully close the real failover window it is trying to fix.
CI note: the current Build is still pending on the latest head, but the code-level blocker above already needs addressing before merge.
@DanielLeens Great suggestion, fixed.
DanielLeens left a comment
Hi @nzw921rx, thanks for the quick follow-up. I re-reviewed the latest head locally as seatunnel-review-10836 at 0182cd6a58bd against upstream/dev. I did not run Maven locally in this batch; this is a source-level review plus current GitHub check metadata. The important change from your previous update is that readyToCloseStartingTask is now persisted on every readyToClose(...) transition, so the multi-source partial-progress failover window Daniel called out earlier is actually covered on the latest head.
What This PR Fixes
- User pain point: after master failover, a bounded-source job could enter the shutdown path and then get stuck permanently because the new coordinator lost partial `readyToClose` progress from already-finished starting enumerators.
- Fix approach: the latest head persists `readyToCloseStartingTask` into `runningJobStateIMap` on every successful `readyToClose(...)`, restores that set in `restoreCoordinator(...)`, and adds a failover IT that triggers master switch while only a strict subset of starting tasks has reported ready-to-close.
- One-line summary: the latest head closes the real multi-source failover gap instead of only handling the all-sources-already-finished case.
1. Code Review
1.1 Core Logic Analysis
- Exact change:
  - persistence / restore logic: `seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java:161-183,431-488,543-578,940-979`
  - starting-task cardinality: `seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java:333-355`
  - regression coverage: `seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointCoordinatorFailoverIT.java:68-314`
- Before / after summary:

```java
// before
readyToCloseStartingTask.add(taskLocation);
if (readyToCloseStartingTask.size() == plan.getStartingSubtasks().size()) {
    tryTriggerPendingCheckpoint(CheckpointType.COMPLETED_POINT_TYPE);
}

// after
readyToCloseStartingTask.add(taskLocation);
updateReadyToCloseStartingTask();
if (readyToCloseStartingTask.size() == plan.getStartingSubtasks().size()) {
    tryTriggerPendingCheckpoint(CheckpointType.COMPLETED_POINT_TYPE);
}

// restore path
Set<TaskLocation> restoredReadyToClose = loadReadyToCloseStartingTask();
cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_RESET);
if (restoredReadyToClose != null && !restoredReadyToClose.isEmpty()) {
    readyToCloseStartingTask.addAll(restoredReadyToClose);
}
```

- Key findings:
  - `PhysicalPlanGenerator` still creates one starting task per source action, so multi-source pipelines are the real target shape here.
  - The latest head now persists the partial `readyToCloseStartingTask` set on every transition, not only when the set is already complete.
  - `restoreCoordinator(...)` reloads that persisted set before deciding whether to trigger a normal checkpoint or a completed-point checkpoint.
  - The new batch IT explicitly waits for `0 < readyToClose subset < total starting tasks` before triggering failover, so it exercises the exact window that used to be broken.
- Logic correctness deep dive:

```text
Pipeline planning
  -> PhysicalPlanGenerator.getEnumeratorTask() [333-355]
  -> one starting task is created for each source action
Shutdown path
  -> SourceSplitEnumeratorTask.RUNNING [334-339]
  -> prepareCloseStatus becomes true
  -> send LastCheckpointNotifyOperation(jobId, taskLocation)
  -> CheckpointManager.readyToClose(taskLocation) [209-216]
  -> CheckpointCoordinator.readyToClose(taskLocation) [431-436]
  -> add taskLocation into readyToCloseStartingTask
  -> persist a copy of the set into IMAP_RUNNING_JOB_STATE
  -> trigger COMPLETED_POINT only when the set is fully complete
Master failover recovery
  -> CheckpointCoordinator.restoreCoordinator(alreadyStarted=true) [543-578]
  -> loadReadyToCloseStartingTask() from IMap
  -> cleanPendingCheckpoint(CHECKPOINT_COORDINATOR_RESET) without deleting the IMap key
  -> restore the set into memory
  -> choose COMPLETED_POINT vs normal checkpoint based on restored set size
```
This means the normal runtime path does hit the new logic, and the previous partial-progress failover hole is no longer left open on the latest head.
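The restore-time branch described above can be sanity-checked with a minimal sketch. Plain Java collections stand in for the IMap state, and the two-value `CheckpointType` enum is a simplification for illustration, not the engine's real enum:

```java
import java.util.HashSet;
import java.util.Set;

public class RestoreDecisionSketch {
    // simplified stand-in for the engine's checkpoint types
    enum CheckpointType { COMPLETED_POINT_TYPE, CHECKPOINT_TYPE }

    // mirrors the restore branch: a fully restored set jumps to the completed
    // point, while a partial (or empty) restored set triggers a normal checkpoint
    // and keeps waiting for the remaining source notifications
    static CheckpointType decide(Set<String> restoredReadyToClose, int startingSubtasks) {
        Set<String> readyToCloseStartingTask = new HashSet<>();
        if (restoredReadyToClose != null && !restoredReadyToClose.isEmpty()) {
            readyToCloseStartingTask.addAll(restoredReadyToClose);
        }
        return readyToCloseStartingTask.size() == startingSubtasks
                ? CheckpointType.COMPLETED_POINT_TYPE
                : CheckpointType.CHECKPOINT_TYPE;
    }

    public static void main(String[] args) {
        Set<String> partial = new HashSet<>();
        partial.add("source-A");
        Set<String> full = new HashSet<>(partial);
        full.add("source-B");
        if (decide(partial, 2) != CheckpointType.CHECKPOINT_TYPE) throw new AssertionError();
        if (decide(full, 2) != CheckpointType.COMPLETED_POINT_TYPE) throw new AssertionError();
        System.out.println("partial -> normal checkpoint, full -> completed point");
    }
}
```

The useful property to preserve in review is that a lost or empty persisted set degrades to the normal checkpoint path rather than wrongly triggering completion.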
1.2 Compatibility Impact
- Fully compatible.
- No public API, config option, default value, protocol contract, or serialization format is changed.
- The new IMap key is internal coordinator state only.
1.3 Performance / Side Effects
- CPU / memory / GC impact is small: one bounded-size set copy plus one IMap write per starting-task `readyToClose` transition.
- Network impact is limited to that extra coordinator-state write and is justified by the failover durability gain.
- I do not see a new concurrency, retry, idempotency, or resource-release regression in the latest head.
1.4 Error Handling and Logs
Issue 1: latest Build is still queued, so there is not yet a full CI result on this head
- Location: GitHub checks / `Build`
- Problem: I do not see a remaining code-level blocker, but the latest head `0182cd6a58bd2c1124c5efbdf6c9b83b39b954a0` still does not have a finished `Build` result.
- Risk: if CI later finds an engine or e2e regression, that signal is not visible yet.
- Best recommendation: wait for the current `Build` run to finish green before merge.
- Severity: Low
2. Code Quality Assessment
2.1 Code Style
- The new persistence / restore helpers are readable and stay inside the coordinator boundary where this state belongs.
2.2 Test Coverage
- The new batch failover IT now covers the exact partial-progress window Daniel blocked before.
- The streaming IT also checks that checkpoint ids continue growing after master failover, which is a useful coordinator-liveness companion check.
2.3 Documentation
- No user-facing documentation update is required for this internal engine recovery fix.
3. Architecture
3.1 Elegance of the Solution
- This is a precise fix, not a workaround.
- Persisting shutdown progress at each `readyToClose` transition is the right durability point for this coordinator lifecycle.
3.2 Maintainability
- The lifecycle is now easier to reason about because the persisted source-of-truth and the restore path are explicit in one class.
3.3 Extensibility
- This approach is reusable for future coordinator failover fixes that also depend on transient shutdown-progress state.
3.4 Historical Compatibility
- Compatible with historical behavior and existing upgrade paths.
4. Issue Summary
| No. | Issue | Location | Severity |
|---|---|---|---|
| 1 | Latest Build is still queued, so there is not yet a full CI result on this head | GitHub checks / Build | Low |
5. Merge Conclusion
Conclusion: merge after fixes
- Blocking items
- Issue 1: wait for the current `Build` on head `0182cd6a58bd2c1124c5efbdf6c9b83b39b954a0` to finish green before merge.
- Suggested non-blocking follow-ups
- I do not see a new code-level follow-up that should block this round.
Overall assessment: the latest head does address the real partial readyToClose failover gap Daniel called out earlier. The runtime chain is now coherent, the recovery logic reloads the persisted set at the right point, and the new IT finally targets the real broken window instead of a happier end-state path.
DanielLeens left a comment
Hi @nzw921rx, thanks for the continued follow-up. I re-reviewed the latest head locally as seatunnel-review-10836 at c787d203a607 against upstream/dev. I did not run Maven locally in this batch; this is a source-level review plus current GitHub check metadata. The core runtime fix Daniel accepted earlier is still in place: readyToCloseStartingTask is now persisted on every transition and restored before checkpoint triggering after master failover. The part that changed again after Daniel's previous review is mostly the failover IT and related observability, and that newest test delta reopens one review concern from the verification side.
What This PR Fixes
- User pain point: after master failover, a bounded-source job can get stuck in the shutdown / completed-point path because the new coordinator loses already-finished source progress.
- Fix approach: the runtime code persists `readyToCloseStartingTask` into the running-job IMap on every `readyToClose(...)` transition, restores it after failover, and adds failover IT coverage around that path.
- One-line summary: the runtime repair still looks right, but the latest batch failover IT no longer proves the exact partial-progress window this PR is trying to protect.
1. Code Review
1.1 Core Logic Analysis
- Exact change:
  - runtime persistence / restore path: `seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java:431-488,543-578,941-980`
  - latest batch failover IT: `seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointCoordinatorFailoverIT.java:120-180`
  - streaming checkpoint-growth IT: `.../CheckpointCoordinatorFailoverIT.java:292-340`
  - batch test topology: `seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_to_localfile_master_failover_template.conf:24-153`
  - starting-task composition: `seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java:329-355`
- Before / after summary:

```java
// coordinator runtime path (unchanged since the previous round)
readyToCloseStartingTask.add(taskLocation);
updateReadyToCloseStartingTask();
if (readyToCloseStartingTask.size() == plan.getStartingSubtasks().size()) {
    tryTriggerPendingCheckpoint(CheckpointType.COMPLETED_POINT_TYPE);
}

// latest batch IT assertion
for (int pipelineId = 1; pipelineId <= sourceCount; pipelineId++) {
    Object stored = runningJobStateImap.get(readyToCloseImapKey);
    if (stored instanceof Set) {
        int size = ((Set<?>) stored).size();
        if (size > 0 && size < sourceCount) {
            observedSubsetSize.compareAndSet(-1, size);
            observedPipelineId.compareAndSet(-1, pipelineId);
        }
    }
}
```

- Key findings:
  - The real runtime path is still correct: `CheckpointCoordinator.readyToClose()` persists the set on every transition, and `restoreCoordinator()` reloads it before deciding whether to trigger a normal checkpoint or a completed point.
  - The batch IT topology defines five separate `FakeSource -> LocalFile` pairs, not one pipeline with five starting tasks.
  - `PhysicalPlanGenerator.getEnumeratorTask()` adds one starting task per source action within a pipeline, while `CheckpointCoordinator` compares readiness against `plan.getStartingSubtasks().size()` per pipeline.
  - The new batch IT now checks `0 < stored.size() < sourceCount` across pipeline ids, so it can be satisfied by a fully complete single-source pipeline (size = 1) without proving the real per-pipeline partial-progress window Daniel originally blocked on.
- Logic correctness deep dive:

```text
Batch test topology
  -> batch_fake_to_localfile_master_failover_template.conf [24-153]
  -> five FakeSource blocks producing table1..table5
  -> five LocalFile sinks consuming table1..table5
  -> disconnected source/sink pairs, so the job is not a single five-source pipeline
Pipeline planning
  -> PhysicalPlanGenerator.getEnumeratorTask(...) [329-355]
  -> one starting task is added for each source action inside that pipeline
Runtime fix path
  -> CheckpointCoordinator.readyToClose(taskLocation) [431-436]
  -> add taskLocation into readyToCloseStartingTask
  -> persist the current set into IMAP_RUNNING_JOB_STATE
  -> trigger COMPLETED_POINT only when size == plan.getStartingSubtasks().size()
  -> CheckpointCoordinator.restoreCoordinator(alreadyStarted=true) [543-578]
  -> load persisted set
  -> preserve it across CHECKPOINT_COORDINATOR_RESET
  -> choose COMPLETED_POINT vs normal checkpoint from restored size
Latest batch IT check
  -> CheckpointCoordinatorFailoverIT.testBatchJobCanFinishAfterMasterFailover() [120-155]
  -> scans pipelineId = 1..sourceCount
  -> accepts any stored set with 0 < size < sourceCount
Why that matters
  -> if a pipeline has exactly one starting task, size = 1 already means "fully complete for that pipeline"
  -> but the IT still accepts it because sourceCount = 5 globally
  -> so the test no longer guarantees it hit the real "partial readyToClose inside one pipeline" window
```
The runtime code still looks like the right fix, but the newest regression test no longer proves the precise failure window the PR description and earlier review were targeting.
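The false-positive shape is easy to demonstrate in isolation. The sketch below uses hypothetical sizes, not the real IT code: a single-starting-task pipeline that has fully finished still satisfies the global `0 < size < sourceCount` check, while a per-pipeline check correctly rejects it:

```java
public class SubsetCheckSketch {
    // the condition the latest IT uses: compares against the global sourceCount
    static boolean globalSubsetCheck(int storedSize, int sourceCount) {
        return storedSize > 0 && storedSize < sourceCount;
    }

    // the condition the coordinator contract actually needs: per-pipeline cardinality
    static boolean perPipelineSubsetCheck(int storedSize, int pipelineStartingSubtasks) {
        return storedSize > 0 && storedSize < pipelineStartingSubtasks;
    }

    public static void main(String[] args) {
        int sourceCount = 5;               // five disconnected FakeSource -> LocalFile pairs
        int pipelineStartingSubtasks = 1;  // each pipeline has exactly one starting task
        int storedSize = 1;                // that single task already finished readyToClose

        // the IT accepts this state even though the pipeline is fully complete
        if (!globalSubsetCheck(storedSize, sourceCount)) throw new AssertionError();
        // the per-pipeline check rejects it: nothing partial happened in this pipeline
        if (perPipelineSubsetCheck(storedSize, pipelineStartingSubtasks)) throw new AssertionError();
        System.out.println("global check passes while per-pipeline check fails: false positive");
    }
}
```

This is the gap Option A and Option B in Issue 1 below both aim to close: either make a pipeline with genuinely more than one starting task, or verify the cardinality before asserting the subset condition.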
1.2 Compatibility Impact
- Fully compatible from the runtime contract side.
- No public API, config option, default value, or serialization format is changed by the latest delta.
- The open concern is test precision, not backward compatibility.
1.3 Performance / Side Effects
- The extra IMap write on each `readyToClose(...)` transition is still small and justified.
- The streaming IT observability additions are also fine.
- The main side effect risk in the latest delta is verification quality: the batch IT became easier to satisfy than the actual bug window it is supposed to guard.
1.4 Error Handling and Logs
Issue 1: the latest batch failover IT no longer proves the real per-pipeline partial-progress window
- Location: `CheckpointCoordinatorFailoverIT.java:120-155`, `batch_fake_to_localfile_master_failover_template.conf:24-153`, `PhysicalPlanGenerator.java:329-355`, `CheckpointCoordinator.java:431-436,569-573`
- Problem: the current assertion compares each pipeline's persisted `readyToClose` set against the global `sourceCount`, even though `CheckpointCoordinator` reasons per pipeline and a pipeline in this topology can have only one starting task.
- Risk: the test can pass without ever exercising the real failover window where a pipeline has only a strict subset of its own starting tasks persisted before master failover.
- Best recommendation:
  - Option A: build a topology where one pipeline truly has multiple source actions, then assert `0 < size < plan.getStartingSubtasks().size()` for that pipeline.
  - Option B: keep the current multi-pipeline topology, but drive failover from a pipeline whose starting-task cardinality is explicitly verified to be > 1 before the assertion.
- Severity: Medium
Issue 2: the current Build job is red on the latest head
- Location: GitHub checks / `Build`
- Problem: the latest head `c787d203a6078a1f88f5bfb70b91fa4cbac7ea32` is not green yet.
- Risk: even if the runtime fix is right, the project gate is still signaling unresolved validation problems.
- Best recommendation: fix the batch IT precision above, rerun the Build job, and merge on green.
- Severity: Low
2. Code Quality Assessment
2.1 Code Style
- The runtime coordinator changes remain readable and localized.
- The new log cleanup is fine.
- The concern is not style; it is that the latest regression assertion drifted away from the exact bug shape.
2.2 Test Coverage
- The streaming checkpoint-growth check is a useful companion signal.
- The batch IT is the important regression proof for this PR, and that is exactly where the current blind spot now sits.
2.3 Documentation
- No user-facing docs update is required for this internal engine fix.
3. Architecture
3.1 Elegance of the Solution
- The runtime persistence/restore design is still the right fix.
- The newest batch IT, however, is no longer as precise as the runtime bug it is supposed to validate.
3.2 Maintainability
- Keeping the regression test aligned with the exact per-pipeline contract matters a lot here, because this is coordinator failover logic and later regressions will be hard to debug from production symptoms alone.
3.3 Extensibility
- The persisted ready-to-close state is a good foundation for future failover durability work.
- The test should stay equally precise, otherwise the safety net will not scale with future coordinator changes.
3.4 Historical Compatibility
- Compatible with historical behavior.
- No upgrade-path regression is visible from the code change itself.
4. Issue Summary
| No. | Issue | Location | Severity |
|---|---|---|---|
| 1 | Latest batch failover IT no longer proves the real per-pipeline partial-progress window | CheckpointCoordinatorFailoverIT.java, batch_fake_to_localfile_master_failover_template.conf, PhysicalPlanGenerator.java, CheckpointCoordinator.java | Medium |
| 2 | Latest-head Build is still red | GitHub checks / Build | Low |
5. Merge Conclusion
Conclusion: merge after fixes
- Blocking items
- Issue 1: tighten the batch failover IT so it really validates a strict subset of starting tasks inside one pipeline, not just any pipeline whose single starting task has already fully finished.
- Issue 2: rerun and get the current Build job on head `c787d203a6078a1f88f5bfb70b91fa4cbac7ea32` back to green.
- Suggested non-blocking follow-ups
- The streaming checkpoint-growth assertion is already a good companion check and should stay; my concern is specifically the batch proof shape above.
Overall assessment: the runtime coordinator fix still looks correct to me, and Daniel's earlier core concern about persisting partial readyToClose progress in the coordinator is still addressed in code. The latest head became weaker again on the verification side, though, because the batch IT no longer guarantees it hit the exact window this PR is supposed to protect.
@DanielLeens Thanks for the review. I added a UT that explicitly validates per-pipeline partial progress (1/3 starting tasks ready) is persisted and restored correctly across coordinator failover, and tightened the batch IT assertion to avoid the previous false-positive condition.
DanielLeens left a comment
Hi @nzw921rx, thanks for the continued follow-up here. I re-reviewed the latest head locally as seatunnel-review-10836 at b2a1eaea52f536e9de356aac4e27b4146935cb51 against upstream/dev. I did not run Maven in this batch; this is a source-level review based on the current full diff, the actual coordinator failover path, and the latest test changes. The important point from Daniel's earlier review is now addressed in runtime code: partial readyToClose progress is persisted on every transition and restored before the new coordinator decides which checkpoint path to continue.
What This PR Fixes
- User pain point: after master failover, a bounded-source job could get stuck in the shutdown / completion-point path because the new coordinator lost partial `readyToClose` progress from source enumerators that had already reached the close phase.
- Fix approach: the latest head stores `readyToCloseStartingTask` into `runningJobStateIMap` on every successful `readyToClose(...)`, restores that set in `restoreCoordinator(true)`, preserves the key across `CHECKPOINT_COORDINATOR_RESET`, and adds direct unit coverage for partial-progress and full-progress recovery.
- One-line summary: the latest head now closes the real partial-progress failover gap Daniel blocked earlier, and it does so in the coordinator runtime path itself rather than only in test timing.
1. Code Review
1.1 Core Logic Analysis
- Exact change:
  - runtime persistence / restore path: `seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java:431-488,543-578,975-979`
  - trigger path from source shutdown: `.../SourceSplitEnumeratorTask.java:334-339`, `.../LastCheckpointNotifyOperation.java:45-50`, `.../CheckpointManager.java:214-215`
  - direct regression coverage: `seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorTest.java:378-667`
  - end-to-end failover coverage: `seatunnel-e2e/.../CheckpointCoordinatorFailoverIT.java:67-339`
- Before / after:

```java
// before
readyToCloseStartingTask.add(taskLocation);
if (readyToCloseStartingTask.size() == plan.getStartingSubtasks().size()) {
    tryTriggerPendingCheckpoint(CheckpointType.COMPLETED_POINT_TYPE);
}

// after
readyToCloseStartingTask.add(taskLocation);
updateReadyToCloseStartingTask();
if (readyToCloseStartingTask.size() == plan.getStartingSubtasks().size()) {
    tryTriggerPendingCheckpoint(CheckpointType.COMPLETED_POINT_TYPE);
}

// restore path
Set<TaskLocation> restoredReadyToClose = loadReadyToCloseStartingTask();
cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_RESET);
if (restoredReadyToClose != null && !restoredReadyToClose.isEmpty()) {
    readyToCloseStartingTask.addAll(restoredReadyToClose);
}
if (readyToCloseStartingTask.size() == plan.getStartingSubtasks().size()) {
    tryTriggerPendingCheckpoint(CheckpointType.COMPLETED_POINT_TYPE);
} else {
    tryTriggerPendingCheckpoint(CHECKPOINT_TYPE);
}
```

- Key findings:
  - The normal runtime path does hit this change, because `readyToClose(...)` is triggered by `LastCheckpointNotifyOperation` from `SourceSplitEnumeratorTask` during source shutdown.
  - Daniel's earlier blocker is fixed in the latest head: partial `readyToCloseStartingTask` state is now durable before failover, not only after all starting tasks have already finished.
  - The new `CheckpointCoordinatorTest` directly validates the exact contract Daniel asked for: partial-progress restore continues with a normal checkpoint, while full-progress restore jumps to a completed point.
  - The updated batch E2E is now mainly an end-to-end "job still finishes after failover" proof, while the precise partial-progress contract is better covered by the new coordinator unit test.
- Full runtime chain:

```text
Source shutdown path
  -> SourceSplitEnumeratorTask.RUNNING [SourceSplitEnumeratorTask.java:334-339]
  -> when prepareCloseStatus becomes true
  -> send LastCheckpointNotifyOperation(jobId, taskLocation)
  -> LastCheckpointNotifyOperation.runInternal() [LastCheckpointNotifyOperation.java:45-50]
  -> server.getCoordinatorService()
  -> getJobMaster(jobId).getCheckpointManager().readyToClose(taskLocation)
  -> CheckpointManager.readyToClose() [CheckpointManager.java:214-215]
  -> getCheckpointCoordinator(taskLocation).readyToClose(taskLocation)
  -> CheckpointCoordinator.readyToClose() [CheckpointCoordinator.java:431-436]
  -> add taskLocation into readyToCloseStartingTask
  -> persist a copy of the current set into IMAP_RUNNING_JOB_STATE
  -> trigger COMPLETED_POINT only when all starting subtasks are ready
Master failover recovery
  -> CheckpointCoordinator.restoreCoordinator(alreadyStarted=true) [CheckpointCoordinator.java:543-578]
  -> loadReadyToCloseStartingTask() from IMap
  -> cleanPendingCheckpoint(CHECKPOINT_COORDINATOR_RESET)
  -> keep readyToCloseImapKey across reset [CheckpointCoordinator.java:975-979]
  -> restore the set into in-memory readyToCloseStartingTask
  -> if restored size == plan.getStartingSubtasks().size()
  -> trigger COMPLETED_POINT_TYPE
  -> else
  -> trigger normal CHECKPOINT_TYPE and wait for the remaining source notifications
```
- Logic correctness:
- The latest runtime chain is coherent now. The missing piece before was durability of partial shutdown progress; the latest code adds exactly that durability point.
- I do not see the earlier multi-source failover hole remaining in the latest head.
- The newly added unit tests are more convincing than the previous timing-based batch E2E alone, because they exercise the coordinator state directly.
- Local verification results:
  - `git fetch upstream pull/10836/head:seatunnel-review-10836`: passed; local review branch updated to `b2a1eaea52f536e9de356aac4e27b4146935cb51`.
  - `git log c787d203a607..HEAD`: passed; confirmed the new follow-up commits after Daniel's previous review.
  - `git diff upstream/dev...HEAD`: passed; core runtime change remains localized to `CheckpointCoordinator`, with the latest delta focused on UT/E2E coverage.
  - PR metadata from local queue snapshot: passed; after Daniel's previous review there was an author follow-up comment plus new commits, but no new line-level review thread requiring a separate `in_reply_to` response.
  - Maven compile / tests: not run in this review batch.
1.2 Compatibility Impact
- Fully compatible.
- No public API, config option, default value, protocol field, or serialization contract is changed.
- The behavior change is internal failover durability only.
1.3 Performance / Side Effects
- CPU / memory / GC overhead is small: one extra `IMap.set(...)` with a small `HashSet` snapshot per starting-task `readyToClose` transition.
- Network overhead is limited to that extra coordinator-state write and is justified by the failover durability gain.
- I do not see a new concurrency, retry, idempotency, or resource-release regression in the latest head.
- Failing the job when persistence itself cannot be retried successfully is the safer tradeoff than silently risking another unrecoverable stuck state.
1.4 Error Handling and Logs
No new blocking issue found.
Additional note:
- The current `Build` check is still queued on this head. That is a project gate to wait for, not a code-level defect I found in the latest implementation.
2. Code Quality Assessment
2.1 Code Style
- The new coordinator persistence / restore helpers are readable and stay in the right layer.
- Logging is also clearer now around checkpoint / pipeline / job identifiers.
2.2 Test Coverage
- `CheckpointCoordinatorTest` now directly covers partial-progress persistence, full-progress recovery, repeated restore idempotency, missing-key fallback, and corrupted-value fallback.
- `CheckpointCoordinatorFailoverIT` still gives end-to-end confidence that batch jobs finish and streaming jobs continue checkpoint growth after master failover.
- I do not see a blocking coverage gap in the latest head.
2.3 Documentation
- No user-facing documentation update is required for this internal engine failover fix.
3. Architecture
3.1 Elegance of the Solution
- This is a precise fix, not a workaround.
- It repairs the missing durable coordinator state instead of weakening the completion-point contract.
3.2 Maintainability
- Making the ready-to-close state explicit in IMap should make future failover debugging easier, because the recovery source-of-truth is now visible and lifecycle-aware.
3.3 Extensibility
- This sets a useful pattern for future coordinator failover fixes: if an in-memory lifecycle state must survive master switch, make that durability requirement explicit instead of relying on timing.
3.4 Historical Compatibility
- Compatible with historical behavior and existing upgrade paths.
4. Issue Summary
- No formal blocking issue found in the latest head.
5. Merge Conclusion
Conclusion: can merge
- Blocking items
- None from the code review side.
- Suggested non-blocking follow-ups
- Let the current `Build` check on the latest head finish green before the actual merge.
Overall assessment:
- The latest head does address the real partial `readyToClose` failover gap Daniel called out earlier. The runtime logic is now coherent, the restore path makes the correct checkpoint-vs-completed-point decision from persisted state, and the new unit coverage finally proves the exact contract directly instead of relying only on end-to-end timing.
```java
// Only remove the persisted ready-to-close state when the coordinator truly ends
// (completed/failed/cancelled). During a reset (master failover), the IMap entry
// must be preserved so restoreCoordinator() can recover from it.
if (closedReason != CheckpointCloseReason.CHECKPOINT_COORDINATOR_RESET) {
    runningJobStateIMap.remove(readyToCloseImapKey);
}
```
One lifecycle question: readyToCloseImapKey should survive coordinator reset/master failover, but it seems like it only needs to be cleaned up once when the job/pipeline is finally finished.
Since it is derived from checkpointStateImapKey, would it be more consistent to clean it up together with checkpointStateImapKey in the JobMaster lifecycle cleanup path?
Thanks for raising this. After rechecking the lifecycle, I agree that cleaning readyToCloseImapKey from the same final terminal path as checkpointStateImapKey would make the ownership clearer. I did not treat the current placement as a correctness blocker because this PR still keeps the map alive across coordinator reset/master failover and only removes it on the final cleanup path, so the restore window is preserved. Still, your suggestion is the cleaner lifecycle expression and I would support adopting it in a follow-up update.
@dybyte Thank you for your review, this is a great question.
From a design-consistency perspective, `readyToCloseImapKey` is derived from `checkpointStateImapKey`, so maintaining lifecycle consistency between them is logically sound and also a natural, positive direction. Your suggestion is constructive, and I agree with the semantic alignment it aims for.
The reason I currently lean toward “retain during failover, and clean up when the coordinator reaches a terminal state” is mainly based on two engineering considerations:
- `readyToCloseImapKey` is essentially a transient runtime state in the checkpoint coordination process. Its core value is recovery correctness, rather than long-term business-state representation. If it is bound to a longer job-wide lifecycle by default, then in large CDC clusters, such transient keys can scale with concurrency and introduce additional IMap storage and governance overhead.
- From an operations perspective, this state has limited external observability and diagnostic value during the checkpoint process. Keeping it beyond the coordination window provides low marginal benefit, while increasing the complexity of the overall state surface.
Therefore, I agree with the principle that derived state should maintain lifecycle consistency, while also aiming for a more practical balance between consistency and cost: ensure recoverability in reset/master-failover scenarios, and reclaim it promptly once the coordinator reaches a terminal state, so that temporary state does not linger unnecessarily.
I think this is more than a lifecycle-style preference.
Since readyToCloseImapKey is persisted in IMap and restored by restoreCoordinator() after master failover, it becomes part of the recoverable distributed state for the job/pipeline. If the active master fails after this key is written but before it is removed, I am not sure which recoverable owner guarantees that this key will eventually be cleaned up.
This looks similar to the cleanup ownership issue discussed in #10687. Even though #10687 has not been merged yet, I think the same principle applies here: once a key is persisted for master-failover recovery, its cleanup should also be failover-safe and idempotent.
So I think this key should either be included in the JobMaster final cleanup path together with the checkpoint state key, or we need a clear explanation of why it is safe to exclude it.
Without this clarification, I am concerned that this fix may leave an orphan/stale readyToCloseImapKey after master failover.
Thanks for pushing on this point. I rechecked the ownership path in the latest code, including CheckpointCoordinator.cleanPendingCheckpoint(...) and JobMaster.removeJobIMap().
You are right that JobMaster.removeJobIMap() currently removes checkpointStateImapKey but not readyToCloseImapKey, so the ownership is not expressed there as explicitly as in #10687. The reason I still do not read the current patch as leaving a stale-key correctness hole is that every terminal coordinator path I traced here still funnels through cleanPendingCheckpoint(...), and that method removes readyToCloseImapKey for all non-reset exits while preserving it only for CHECKPOINT_COORDINATOR_RESET, which is exactly the failover-recovery window this PR is fixing.
So from the current runtime chain, I still see a concrete cleanup owner and I do not view this as a blocker for the bug fix itself. That said, I agree your suggestion would make the lifecycle ownership easier to reason about, and mirroring the final cleanup in the JobMaster path as an extra safety net would be a reasonable hardening follow-up if the author touches this area again.
Thanks for the thoughtful review — this is a great point, and I agree this is more than a lifecycle-style preference; it is a cleanup-ownership issue.
You are right about the risk window: if a failure happens before cleanup, or if the job is killed before cleanup, a stale key can remain.
I’ll address this by adding cleanup in the JobMaster final cleanup path (together with the checkpoint state key). This makes final cleanup ownership failover-safe, and it is still safe because remove is idempotent.
Also, in the normal path, once all related checkpoints are completed, we will clean up readyToCloseImapKey promptly to avoid unnecessary long-term retention and IMap growth.
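The agreed plan (terminal-path cleanup in the coordinator plus an idempotent JobMaster safety net) could be sketched roughly as follows. `CleanupSketch`, the string reason constant, and the key names are illustrative placeholders, not the actual SeaTunnel APIs.

```java
import java.util.Map;

/** Sketch of dual-owner, idempotent cleanup of the ready-to-close key. */
public class CleanupSketch {
    static final String COORDINATOR_RESET = "CHECKPOINT_COORDINATOR_RESET";

    /** Coordinator terminal path: keep the key across a failover reset, drop it otherwise. */
    static void cleanPendingCheckpoint(
            Map<String, Object> state, String readyToCloseKey, String closedReason) {
        if (!COORDINATOR_RESET.equals(closedReason)) {
            state.remove(readyToCloseKey); // removing a missing key is a no-op
        }
    }

    /** JobMaster final cleanup: safety net covering kill-before-cleanup windows. */
    static void removeJobIMap(
            Map<String, Object> state, String checkpointStateKey, String readyToCloseKey) {
        state.remove(checkpointStateKey);
        state.remove(readyToCloseKey); // safe to repeat if the coordinator already removed it
    }
}
```

Because `Map.remove` is a no-op on an absent key, running both owners in any order, or twice, leaves the same final state, which is what makes the cleanup failover-safe.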
…n Imap and optimize the log format
@DanielLeens please help me review, thanks
DanielLeens left a comment
Thanks for the follow-up. I re-reviewed the latest head end to end after the new commits, including the checkpoint coordinator state machine, the readyToClose durability path, the reset vs terminal cleanup split, and the new UT/E2E coverage.
What This PR Fixes
- User pain point: after master failover, a bounded-source job can get stuck permanently during shutdown instead of finishing the last close/checkpoint sequence.
- Fix approach: this PR persists `readyToCloseStartingTask` into `runningJobStateIMap`, restores it on the new coordinator, and then decides whether recovery should continue with a normal checkpoint or a completed-point transition.
- One-line summary: the latest head now closes the partial `readyToClose` failover gap Daniel had previously called out, and I did not find a new blocking issue in the current version.
1. Code Review
1.1 Core Logic Analysis
- Exact change:
  - `readyToClose` persist/restore: `seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java:431-580`
  - reset vs terminal cleanup split: `.../CheckpointCoordinator.java:960-1005`
  - final JobMaster cleanup: `seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java:659-669`
  - UT coverage: `seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorTest.java:406-666`
  - E2E failover coverage: `seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointCoordinatorFailoverIT.java:67-353`
- Runtime chain:

```
Bounded source enters shutdown
  -> CheckpointCoordinator.readyToClose(taskLocation)
  -> readyToCloseStartingTask.add(taskLocation)
  -> updateReadyToCloseStartingTask()   // persist current partial/full set
  -> if all starting subtasks are ready
     -> trigger COMPLETED_POINT_TYPE

Master failover
  -> restoreCoordinator(true)
  -> loadReadyToCloseStartingTask()
  -> cleanPendingCheckpoint(CHECKPOINT_COORDINATOR_RESET)  // keep persisted key
  -> restore readyToCloseStartingTask
  -> notifyCompleted(latestCompletedCheckpoint)
  -> all restored starting subtasks ready -> COMPLETED_POINT_TYPE
  -> else -> normal CHECKPOINT_TYPE

Final cleanup
  -> cleanPendingCheckpoint(non-reset) -> remove(readyToCloseImapKey)
  -> JobMaster.removeJobIMap() -> remove(checkpointStateImapKey)
                               -> remove(readyToCloseImapKey)
```
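The failover branch of this chain reduces to one decision: does the restored set already cover every starting subtask? A minimal sketch of that decision, with a plain `Map` standing in for `runningJobStateIMap` and hypothetical class and method names throughout:

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Sketch of the restore-time decision (illustrative, not SeaTunnel's actual code). */
public class RestoreSketch {
    enum NextAction { COMPLETED_POINT, NORMAL_CHECKPOINT }

    static NextAction decide(
            Map<String, Set<Long>> runningJobState,
            String readyToCloseKey,
            Set<Long> allStartingSubtasks,
            Set<Long> readyToCloseStartingTask) {
        // Refill the in-memory set from the persisted snapshot (missing key -> empty set).
        Set<Long> persisted = runningJobState.getOrDefault(readyToCloseKey, new HashSet<>());
        readyToCloseStartingTask.addAll(persisted);
        // If every starting subtask had already reported before failover, go straight
        // to the completed-point transition; otherwise resume normal checkpointing.
        return readyToCloseStartingTask.containsAll(allStartingSubtasks)
                ? NextAction.COMPLETED_POINT
                : NextAction.NORMAL_CHECKPOINT;
    }
}
```

Note the missing-key fallback degrades to normal checkpointing, which matches the fallback behavior the new UTs exercise.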
- Key findings:
  - `readyToClose()` now persists partial progress on every transition, not only on the terminal "all sources ready" state.
  - `restoreCoordinator(true)` now makes the checkpoint-vs-completed-point decision from restored durable state, which is the missing piece Daniel had previously blocked on.
  - The latest cleanup split preserves the key across coordinator reset but removes it once the job/pipeline truly ends.
  - The new UTs directly cover partial restore, full restore, idempotent restore, and fallback behavior, which makes this much more convincing than the earlier timing-only E2E.
1.2 Compatibility Impact
- Fully compatible.
- No public API, config option, default value, protocol field, or serialization format changes.
1.3 Performance / Side Effects
- The extra IMap write on each `readyToClose` transition is small and justified by the durability gain.
- I do not see a new concurrency, retry, idempotency, or cleanup regression in the latest head.
1.4 Error Handling and Logs
- No formal blocking issue found in the latest head.
- Additional note:
  - The current `Build` check is still queued on this revision. That is something to wait for before merge, but it is not a code-level blocker from my side.
2. Code Quality Assessment
2.1 Code Style
- The lifecycle comments and state-handling structure are clearer now.
2.2 Test Coverage
- The UT additions finally exercise the exact state contract that was previously missing.
- I do not see a blocking coverage gap in the latest head.
2.3 Documentation
- No user-facing documentation update is required for this internal engine fix.
3. Architecture
3.1 Elegance of the Solution
- This is a precise fix, not a workaround.
3.2 Maintainability
- Making the `readyToClose` state durable and explicit should make future failover debugging easier.
3.3 Extensibility
- This sets a good pattern for future coordinator failover fixes: if an in-memory state must survive master switch, make that durability requirement explicit.
3.4 Historical Compatibility
- Compatible with historical behavior and upgrade paths.
4. Issue Summary
- No formal blocking issue found in the latest head.
5. Merge Conclusion
Conclusion: can merge
- Blocking items
- None from the code review side.
- Suggested non-blocking follow-ups
- Let the current `Build` check finish green on the latest head before the actual merge.
Overall assessment:
- The latest head now does close the real partial `readyToClose` failover window Daniel had flagged earlier.
- The runtime chain, recovery decision, and cleanup lifecycle are coherent in the current implementation.
Purpose of this pull request
Close #10834
When a BATCH job's source tasks finish reading and enter the shutdown phase (`PREPARE_CLOSE`), each source sends `LastCheckpointNotifyOperation` to the `CheckpointCoordinator`, which adds the task location to the in-memory set `readyToCloseStartingTask`. Once all sources have reported, the coordinator triggers a `COMPLETED_POINT_TYPE` checkpoint to finalize the job.

If a master failover occurs after sources have reported but before the final checkpoint completes, the new coordinator starts with an empty `readyToCloseStartingTask`. Since sources will not re-send the notification, the coordinator waits forever and the job is stuck.

Root cause: `readyToCloseStartingTask` was purely in-memory and not persisted to the distributed IMap, so it was lost on failover.
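The persisted key this PR introduces, `checkpoint_state_{jobId}_{pipelineId}_ready_to_close`, could be derived as in the sketch below. `ReadyToCloseKey` is a hypothetical helper; only the final key format is taken from the PR description.

```java
/** Hypothetical helper deriving the ready-to-close key from the base checkpoint-state key. */
public class ReadyToCloseKey {

    /** Base per-pipeline checkpoint-state key, per the format in the PR description. */
    static String checkpointStateKey(long jobId, int pipelineId) {
        return "checkpoint_state_" + jobId + "_" + pipelineId;
    }

    /** Ready-to-close key: the base key plus a fixed suffix, keeping the two keys related. */
    static String derive(long jobId, int pipelineId) {
        return checkpointStateKey(jobId, pipelineId) + "_ready_to_close";
    }
}
```

Deriving one key from the other is what motivates the lifecycle-consistency discussion in the review thread: the two keys identify the same pipeline, so they naturally share a cleanup owner.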
Fix (`CheckpointCoordinator`):
- When all source tasks have reported ready-to-close, persist the complete set to IMap under key `checkpoint_state_{jobId}_{pipelineId}_ready_to_close`. The write happens only when the set is full to avoid stale overwrites from concurrent `readyToClose` calls.
- In `restoreCoordinator()`, read the IMap entry before `cleanPendingCheckpoint` clears in-memory state, then refill `readyToCloseStartingTask` and immediately trigger `COMPLETED_POINT_TYPE` if all tasks had already reported.
- The IMap entry is removed only on normal job end (completed / failed / cancelled). During a coordinator reset (failover), the entry is preserved.
- Both write and read failures throw `RuntimeException` to fail the job fast rather than silently continuing with unrecoverable state.

How was this patch tested?
Tests (`CheckpointCoordinatorFailoverIT`):
- `testBatchJobCompletesAfterMasterFailover`: submits a BATCH job with 5 independent `FakeSource` actions (parallelism=5, 200 rows each → 5000 rows total). Once ~1/4 of the rows are written (the job is mid-flight, with some sources already having signalled `readyToClose` and persisted `readyToCloseStartingTask` to IMap while others are still reading), the test shuts down the master to trigger failover and asserts the job recovers and reaches `FINISHED` with the full row count.
- `testStreamJobContinuesAfterMasterFailover`: submits an UNBOUNDED STREAMING job; after enough rows are written (≥ `rowNum * parallelism / 4`) the master is shut down. Asserts (1) the job stays `RUNNING` on the new master, and (2) the checkpoint id in `IMAP_CHECKPOINT_ID` strictly grows over a window larger than `checkpoint.interval`, proving the new coordinator continues to trigger checkpoints. The job is explicitly cancelled at the end (UNBOUNDED jobs never finish naturally).
Does this PR introduce any user-facing change?
No.
Check list
incompatible-changes.md