[Fix][Zeta] Make deployTask idempotent during master failover recovery #10567
Conversation
### Issue 1: TOCTOU Race Condition (Theoretical Risk)

**Location:**

```java
synchronized (this) {
    if (executionContexts.containsKey(taskGroup.getTaskGroupLocation())) {
        logger.warning(...);
        return TaskDeployState.success();
    }
    deployLocalTask(taskGroup, classLoaders, taskJars);
    return TaskDeployState.success();
}
```

**Related Context:**
**Problem Description:** There is theoretically a TOCTOU (Time-of-Check-Time-of-Use) race condition.

**Potential Risks:**

**Impact Scope:**

**Severity:** MINOR

**Rationale:**

**Improvement Suggestions:** No code changes needed, but comments are recommended:

```java
synchronized (this) {
    // Note: If the task completes between the containsKey check and deployLocalTask,
    // a new executionContext will be created. This is acceptable because:
    // 1. The old context is moved to finishedExecutionContexts atomically by taskDone()
    // 2. Resources are cleaned up correctly (cancellationFutures, ClassLoaders)
    // 3. The new deployment proceeds normally
    // 4. This window is extremely small due to synchronized protection
    if (executionContexts.containsKey(taskGroup.getTaskGroupLocation())) {
        logger.warning(
                String.format(
                        "TaskGroupLocation %s already exists and is active, "
                                + "skipping redeploy for master failover recovery",
                        taskGroup.getTaskGroupLocation()));
        return TaskDeployState.success();
    }
    deployLocalTask(taskGroup, classLoaders, taskJars);
    return TaskDeployState.success();
}
```

### Issue 2: Risk of Silent Failure in Error Scenarios

**Location:**

```java
if (executionContexts.containsKey(taskGroup.getTaskGroupLocation())) {
    logger.warning(...);
    return TaskDeployState.success();
}
```

**Related Context:**
**Problem Description:** If the existing task in `executionContexts` has become unresponsive (zombie state), this method still returns success, silently masking the failure.

**Potential Risks:**

**Impact Scope:**

**Severity:** MINOR

**Rationale:**
**Improvement Suggestions:**

```java
if (executionContexts.containsKey(taskGroup.getTaskGroupLocation())) {
    TaskGroupContext existingContext = executionContexts.get(taskGroup.getTaskGroupLocation());
    // Optional: Verify task is still healthy by checking its last activity time
    // This requires adding a lastActivityTime field to TaskGroupContext
    logger.warning(
            String.format(
                    "TaskGroupLocation %s already exists and is active, "
                            + "skipping redeploy for master failover recovery",
                    taskGroup.getTaskGroupLocation()));
    return TaskDeployState.success();
}
```

```java
if (ExecutionState.RUNNING.equals(currExecutionState)) {
    if (!checkTaskGroupIsExecuting(taskGroupLocation)) {
        updateTaskState(ExecutionState.FAILING);
    }
}
```

Additional timeout checks are recommended:

```java
// After some timeout, if still RUNNING but no progress, mark as failed
```
Explain in JavaDoc:

```java
/**
 * ...
 * <p><b>Note:</b> If a task is in executionContexts but has become unresponsive
 * (zombie state), this method will still return success. The master relies on
 * other mechanisms (heartbeats, timeouts, health checks) to detect and handle
 * such scenarios.
 */
```

### Issue 3: Missing Test Cases

**Location:**

**Related Context:**

**Problem Description:** The current test suite does not cover idempotent deployment behavior under master failover scenarios. Missing test scenarios:

**Potential Risks:**

**Impact Scope:**

**Severity:** MAJOR

**Rationale:**
**Improvement Suggestions:** Add the following test cases:

```java
@Test
public void testRedeployDuringMasterFailover() throws InterruptedException {
    TaskExecutionService taskExecutionService = server.getTaskExecutionService();
    AtomicBoolean stop = new AtomicBoolean(false);
    long sleepTime = 5000; // Long enough to simulate a running task
    TestTask testTask = new TestTask(stop, sleepTime, true);
    TaskGroupLocation location =
            new TaskGroupLocation(jobId, pipeLineId, FLAKE_ID_GENERATOR.newId());
    TaskGroupDefaultImpl taskGroup =
            new TaskGroupDefaultImpl(location, "ts", Lists.newArrayList(testTask));

    // Create TaskGroupImmutableInformation
    List<Data> tasksData =
            Arrays.asList(nodeEngine.getSerializationService().toData(testTask));
    TaskGroupImmutableInformation taskInfo =
            new TaskGroupImmutableInformation(
                    jobId,
                    1,
                    TaskGroupType.INTERMEDIATE_BLOCKING_QUEUE,
                    location,
                    "testRedeployDuringMasterFailover",
                    tasksData,
                    Arrays.asList(Collections.emptySet()),
                    Arrays.asList(emptySet()));
    Data data = nodeEngine.getSerializationService().toData(taskInfo);

    // 1. First deployment
    TaskDeployState state1 = taskExecutionService.deployTask(data);
    assertTrue(state1.isSuccess());
    assertNotNull(taskExecutionService.getActiveExecutionContext(location));

    // 2. Simulate master failover: redeploy the same task
    TaskDeployState state2 = taskExecutionService.deployTask(data);
    assertTrue(state2.isSuccess(), "Second deployment should return success");

    // 3. Verify task is still in executionContexts (not moved to finished)
    assertNotNull(taskExecutionService.getActiveExecutionContext(location));

    // 4. Let the task complete
    stop.set(true);
    await().atMost(sleepTime + 5000, TimeUnit.MILLISECONDS)
            .untilAsserted(
                    () -> {
                        TaskGroupContext ctx = taskExecutionService.getExecutionContext(location);
                        // Task should be in finishedExecutionContexts after completion
                        assertNotNull(ctx);
                    });
}
```
```java
@Test
public void testRedeployAfterTaskCompletion() {
    TaskExecutionService taskExecutionService = server.getTaskExecutionService();
    AtomicBoolean stop = new AtomicBoolean(false);
    long sleepTime = 100;
    TestTask testTask = new TestTask(stop, sleepTime, true);
    TaskGroupLocation location =
            new TaskGroupLocation(jobId, pipeLineId, FLAKE_ID_GENERATOR.newId());

    // Deploy and wait for completion
    CompletableFuture<TaskExecutionState> future =
            deployLocalTask(
                    taskExecutionService,
                    new TaskGroupDefaultImpl(location, "ts", Lists.newArrayList(testTask)));
    stop.set(true);
    await().atMost(sleepTime + 5000, TimeUnit.MILLISECONDS)
            .untilAsserted(() -> assertEquals(FINISHED, future.get().getExecutionState()));

    // Verify task is in finishedExecutionContexts
    assertNotNull(taskExecutionService.getExecutionContext(location));
    try {
        taskExecutionService.getActiveExecutionContext(location);
        fail("Task should not be in executionContexts after completion");
    } catch (TaskGroupContextNotFoundException e) {
        // Expected
    }

    // Create new TaskGroupImmutableInformation for redeployment
    List<Data> tasksData =
            Arrays.asList(nodeEngine.getSerializationService().toData(testTask));
    TaskGroupImmutableInformation taskInfo =
            new TaskGroupImmutableInformation(
                    jobId,
                    1,
                    TaskGroupType.INTERMEDIATE_BLOCKING_QUEUE,
                    location,
                    "testRedeployAfterTaskCompletion",
                    tasksData,
                    Arrays.asList(Collections.emptySet()),
                    Arrays.asList(emptySet()));
    Data data = nodeEngine.getSerializationService().toData(taskInfo);

    // Redeploy the completed task
    TaskDeployState state = taskExecutionService.deployTask(data);
    assertTrue(state.isSuccess(), "Redeploy of completed task should succeed");

    // Verify a new executionContext is created
    assertNotNull(taskExecutionService.getActiveExecutionContext(location));

    // Cleanup
    taskExecutionService.cancelTaskGroup(location);
}
```

### Issue 4: Missing JavaDoc Documentation

**Location:** `public TaskDeployState deployTask(@NonNull TaskGroupImmutableInformation taskImmutableInfo)`

**Related Context:**
**Problem Description:** The `deployTask` method has no JavaDoc describing its idempotent behavior.

**Potential Risks:**

**Impact Scope:**

**Severity:** MINOR

**Rationale:**
**Improvement Suggestions:** Add complete JavaDoc:

```java
/**
 * Deploys a task group to this worker node.
 *
 * <p><b>Idempotency:</b> This operation is idempotent with respect to the task group location.
 * If a task group with the same {@link TaskGroupLocation} is already actively running in
 * {@code executionContexts}, this method returns {@link TaskDeployState#success()} without
 * redeploying. This behavior supports master failover recovery where the new master may
 * attempt to redeploy tasks that never stopped on the worker.
 *
 * <p><b>Master Failover Scenario:</b> During master rolling restart or failover:
 * <ul>
 *   <li>Workers keep tasks running in {@code executionContexts}</li>
 *   <li>The new master restores state and attempts to redeploy tasks</li>
 *   <li>This method detects the duplicate and returns success</li>
 *   <li>The master reconnects without interrupting the running task</li>
 *   <li>When the task completes, the worker notifies the master via
 *       {@link NotifyTaskStatusOperation}</li>
 * </ul>
 *
 * <p><b>Legitimate Redeployment:</b> Completed tasks are moved to {@code finishedExecutionContexts}
 * by {@link TaskGroupExecutionTracker#taskDone(Task)}, so {@code executionContexts.containsKey()}
 * returns false and a normal deploy proceeds.
 *
 * <p><b>Thread Safety:</b> This method is synchronized on {@code this} to ensure atomic
 * check-and-deploy semantics.
 *
 * @param taskImmutableInfo the immutable information of the task group to deploy
 * @return {@link TaskDeployState#success()} if deployment succeeded or the task is already
 *     running; {@link TaskDeployState#failed(Throwable)} if deployment failed (exceptions are
 *     caught and converted to a failed state rather than thrown)
 */
public TaskDeployState deployTask(@NonNull TaskGroupImmutableInformation taskImmutableInfo)
```

### Issue 5: Incomplete Log Tracing Information

**Location:**

```java
logger.warning(
        String.format(
                "TaskGroupLocation %s already exists and is active, "
                        + "skipping redeploy for master failover recovery",
                taskGroup.getTaskGroupLocation()));
```

**Related Context:**
**Problem Description:** Current logs lack the following tracing information:

**Potential Risks:**

**Impact Scope:**

**Severity:** MINOR

**Rationale:**
**Improvement Suggestions:** Enhance log content:

```java
logger.warning(
        String.format(
                "TaskGroupLocation %s already exists and is active, "
                        + "skipping redeploy for master failover recovery. "
                        + "ExecutionId: %s, JobId: %d, PipelineId: %d",
                taskGroup.getTaskGroupLocation(),
                taskImmutableInfo.getExecutionId(),
                taskGroup.getTaskGroupLocation().getJobId(),
                taskGroup.getTaskGroupLocation().getPipelineId()));
```

Or use structured logging (if supported):

```java
logger.warning(
        String.format(
                "TaskGroupLocation %s already exists and is active, skipping redeploy. "
                        + "reason=master_failover_recovery, executionId=%s, jobId=%d, pipelineId=%d",
                taskGroup.getTaskGroupLocation(),
                taskImmutableInfo.getExecutionId(),
                taskGroup.getTaskGroupLocation().getJobId(),
                taskGroup.getTaskGroupLocation().getPipelineId()));
```
@dybyte can you help with this?
**dybyte** left a comment:

> Could you please add test cases for this change?
@dybyte added the test cases for the change. Please review and give suggestions.
...ine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
**dybyte** left a comment:

> LGTM if CI passes. Thanks @ricky2129!
During master rolling restart, workers keep tasks running in executionContexts. The new master restores state and tries to re-deploy tasks that never stopped, causing a "TaskGroupLocation already exists" error, deploy failure, an infinite restore loop, and jobs showing FAILED in the UI while still writing to the sink.

Fix: when deployTask finds a TaskGroupLocation already in executionContexts (task actively running), return success instead of throwing. The worker will notify the master of the terminal state via NotifyTaskStatusOperation when the task eventually completes, maintaining correct lifecycle tracking.

Legitimate redeploy is unaffected: completed tasks are moved to finishedExecutionContexts by taskDone(), so executionContexts.containsKey() returns false and a normal deploy proceeds.
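The worker-side idempotency this commit describes can be sketched with a minimal, self-contained stand-in. This is only an illustration of the check-then-deploy idea: plain strings and a map replace SeaTunnel's `TaskGroupLocation`, `TaskGroupContext`, and `TaskDeployState`, and `deployTask`/`executionContexts` here are simplified stand-ins, not the real `TaskExecutionService` API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of idempotent deployTask(): a repeated deploy for a location that is
// already running reports success instead of throwing, so a failover master's
// redundant DeployTaskOperation is harmless.
public class DeploySketch {
    // Stand-in for the worker's map of actively running task groups.
    static final Map<String, Object> executionContexts = new ConcurrentHashMap<>();

    static synchronized String deployTask(String taskGroupLocation) {
        if (executionContexts.containsKey(taskGroupLocation)) {
            // Task never stopped on this worker: master failover redeploy,
            // skip the deploy and report success.
            return "success (already running)";
        }
        executionContexts.put(taskGroupLocation, new Object()); // deployLocalTask(...)
        return "success";
    }

    public static void main(String[] args) {
        System.out.println(deployTask("job-1/pipeline-1/task-1"));
        // Second call simulates the new master redeploying after failover.
        System.out.println(deployTask("job-1/pipeline-1/task-1"));
    }
}
```

In the real fix the second path also logs a warning and the running task later reports its terminal state via NotifyTaskStatusOperation.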
…ocess

During master failover, IMap may still hold DEPLOYING state if the master crashed after the worker finished deploying but before the master processed NotifyTaskStatusOperation and updated IMap to RUNNING. restoreExecutionState() previously called stateProcess() with whatever currExecutionState was set at construction time, causing stateProcess() to enter the DEPLOYING branch and send a redundant DeployTaskOperation to the worker, which already has the task running in executionContexts.

Fix: re-read currExecutionState from IMap in restoreExecutionState(), then check whether the task is actually executing on the worker. If DEPLOYING but the task is confirmed running, advance the state to RUNNING before calling stateProcess() so no redundant deploy is sent.

This is the master-side complement to the worker-side idempotent deployTask() fix. Together they make master failover recovery fully robust:

- Master side: avoids sending the redundant DeployTaskOperation
- Worker side: handles it gracefully if one is sent anyway
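The master-side re-read-and-advance logic in this commit can be sketched as follows. Everything here is a simplified stand-in under stated assumptions: a `ConcurrentHashMap` mocks the Hazelcast IMap, `checkTaskGroupIsExecuting` mocks the CheckTaskGroupIsExecutingOperation round-trip to the worker, and the names mirror the snippets in this discussion rather than the actual PhysicalVertex implementation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of restoreExecutionState(): re-read the state from the (mocked) IMap
// instead of trusting the value cached at construction time, and advance a
// stale DEPLOYING to RUNNING when the worker confirms the task is executing,
// so the state machine never re-enters the deploy branch.
public class RestoreSketch {
    enum ExecutionState { DEPLOYING, RUNNING, FAILING }

    // Stand-in for the Hazelcast IMap holding per-task-group execution state.
    static final Map<String, ExecutionState> stateIMap = new ConcurrentHashMap<>();

    // Stand-in for the worker round-trip; here it always confirms "running".
    static boolean checkTaskGroupIsExecuting(String taskGroupLocation) {
        return true;
    }

    static ExecutionState restoreExecutionState(String taskGroupLocation) {
        ExecutionState current = stateIMap.get(taskGroupLocation); // re-read
        if (current == ExecutionState.DEPLOYING
                && checkTaskGroupIsExecuting(taskGroupLocation)) {
            // Master crashed between the worker's deploy finishing and the
            // RUNNING update; advance so no redundant deploy is sent.
            current = ExecutionState.RUNNING;
            stateIMap.put(taskGroupLocation, current);
        }
        return current; // stateProcess() would continue from this state
    }

    public static void main(String[] args) {
        stateIMap.put("job-1/pipeline-1/task-1", ExecutionState.DEPLOYING);
        System.out.println(restoreExecutionState("job-1/pipeline-1/task-1"));
    }
}
```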
…nState fix

- testDeployTaskIdempotentWhenAlreadyRunning: verifies that calling deployTask() twice for the same active TaskGroupLocation returns TaskDeployState.success() on both calls and does not interrupt the running task (before the fix the second call threw RuntimeException)
- testRestoreExecutionStateAdvancesDeployingToRunningWhenTaskIsExecuting: verifies that restoreExecutionState() advances IMap state from DEPLOYING to RUNNING when CheckTaskGroupIsExecutingOperation confirms the task is already executing on the worker, so stateProcess() does not send a redundant DeployTaskOperation
…-side idempotency only

Worker-side deployTask() idempotency is sufficient for master failover recovery. The master-side checkTaskGroupIsExecuting() pre-check required an extra network round-trip without meaningful benefit, so it is removed. The test for it is also removed; testDeployTaskIdempotentWhenAlreadyRunning covers the remaining fix.
...nel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
Co-authored-by: David Zollo <davidzollo365@gmail.com>
…Service

The GitHub UI applied the reviewer suggestion with a 4-space continuation indent instead of the required 8-space continuation indent for method-call arguments. Spotless requires 8-space (double-indent) continuation for wrapped method parameters.
@davidzollo added the changes.
During master rolling restart, workers keep tasks running in executionContexts. The new master restores state and tries to re-deploy tasks that never stopped, causing a "TaskGroupLocation already exists" error, deploy failure, an infinite restore loop, and jobs showing FAILED in the UI while still writing to the sink.

Fix: when deployTask finds a TaskGroupLocation already in executionContexts (task actively running), return success instead of throwing. The worker will notify the master of the terminal state via NotifyTaskStatusOperation when the task eventually completes, maintaining correct lifecycle tracking.

Legitimate redeploy is unaffected: completed tasks are moved to finishedExecutionContexts by taskDone(), so executionContexts.containsKey() returns false and a normal deploy proceeds.
Purpose of this pull request
Does this PR introduce any user-facing change?
How was this patch tested?
Check list
New License Guide
incompatible-changes.md to describe the incompatibility caused by this PR.