From 02b5731f8af9996744426f9c2c3feb1a222d55d0 Mon Sep 17 00:00:00 2001 From: Gabriella Fu <2234862823@qq.com> Date: Wed, 13 May 2026 13:51:45 -0400 Subject: [PATCH 1/4] add a new test and process the exception --- .../processor/internals/TaskManager.java | 25 +++++++++++++------ .../processor/internals/TaskManagerTest.java | 24 ++++++++++++++++++ 2 files changed, 41 insertions(+), 8 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index e8a3442ec1324..947102aa3ecd9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -1057,18 +1057,27 @@ void handleRevocation(final Collection revokedPartitions) { "have been cleaned up by the handleAssignment callback.", remainingRevokedPartitions); } + // even if prepare or commit failed, we should still continue and complete suspending those tasks, + // so we would capture any exception and rethrow it at the end. 
some exceptions may be handled + // immediately and then swallowed, as such we just need to skip those dirty tasks in the checkpoint + final Set dirtyTasks = new TreeSet<>(Comparator.comparing(Task::id)); + boolean prepareCommitSucceeded = false; if (revokedTasksNeedCommit) { - prepareCommitAndAddOffsetsToMap(revokedActiveTasks, consumedOffsetsPerTask); - // if we need to commit any revoking task then we just commit all of those needed committing together - prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, consumedOffsetsPerTask); + try { + prepareCommitAndAddOffsetsToMap(revokedActiveTasks, consumedOffsetsPerTask); + // if we need to commit any revoking task then we just commit all of those needed committing together + prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, consumedOffsetsPerTask); + prepareCommitSucceeded = true; + } catch (final RuntimeException e) { + log.error("Exception caught while preparing to commit revoked tasks " + revokedActiveTasks, e); + firstException.compareAndSet(null, e); + dirtyTasks.addAll(revokedActiveTasks); + dirtyTasks.addAll(commitNeededActiveTasks); + } } - // even if commit failed, we should still continue and complete suspending those tasks, so we would capture - // any exception and rethrow it at the end. 
some exceptions may be handled immediately and then swallowed, - // as such we just need to skip those dirty tasks in the checkpoint - final Set dirtyTasks = new TreeSet<>(Comparator.comparing(Task::id)); try { - if (revokedTasksNeedCommit) { + if (revokedTasksNeedCommit && prepareCommitSucceeded) { // in handleRevocation we must call commitOffsetsOrTransaction() directly rather than // commitAndFillInConsumedOffsetsAndMetadataPerTaskMap() to make sure we don't skip the // offset commit because we are in a rebalance diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index baee5286cae1a..a6aa06d4f173f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -2989,6 +2989,30 @@ public void shouldSuspendActiveTasksDuringRevocation() { verify(task00).suspend(); } + @Test + public void shouldSuspendRevokedTasksWhenPrepareCommitThrows() { + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .withInputPartitions(taskId00Partitions) + .inState(State.RUNNING) + .build(); + + final TasksRegistry tasks = mock(TasksRegistry.class); + when(tasks.allInitializedTasks()).thenReturn(Set.of(task00)); + + when(task00.commitNeeded()).thenReturn(true); + when(task00.prepareCommit(true)).thenThrow(new TaskMigratedException("task migrated")); + + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); + + final StreamsException thrown = assertThrows(StreamsException.class, + () -> taskManager.handleRevocation(taskId00Partitions)); + + assertInstanceOf(TaskMigratedException.class, thrown); + + verify(task00).suspend(); + verify(task00, never()).postCommit(anyBoolean()); + } + @Test public void 
shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEosV2() { // task being revoked, needs commit From ad6bc7a839ebf9d5744ab0ecb778753a969409d6 Mon Sep 17 00:00:00 2001 From: Gabriella Fu <2234862823@qq.com> Date: Wed, 13 May 2026 18:12:50 -0400 Subject: [PATCH 2/4] no matter what throws an exception before suspend, we still suspend to avoid leaving the task in RUNNING --- .../processor/internals/TaskManager.java | 131 +++++++++--------- 1 file changed, 66 insertions(+), 65 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 947102aa3ecd9..45555a39a1520 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -1057,90 +1057,91 @@ void handleRevocation(final Collection revokedPartitions) { "have been cleaned up by the handleAssignment callback.", remainingRevokedPartitions); } - // even if prepare or commit failed, we should still continue and complete suspending those tasks, - // so we would capture any exception and rethrow it at the end. some exceptions may be handled - // immediately and then swallowed, as such we just need to skip those dirty tasks in the checkpoint + // even if prepare, commit, or postCommit failed, we must still suspend revoked tasks and unlock, + // so we use try-finally to guarantee that. Exceptions are captured and rethrown at the end. 
final Set dirtyTasks = new TreeSet<>(Comparator.comparing(Task::id)); boolean prepareCommitSucceeded = false; - if (revokedTasksNeedCommit) { - try { - prepareCommitAndAddOffsetsToMap(revokedActiveTasks, consumedOffsetsPerTask); - // if we need to commit any revoking task then we just commit all of those needed committing together - prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, consumedOffsetsPerTask); - prepareCommitSucceeded = true; - } catch (final RuntimeException e) { - log.error("Exception caught while preparing to commit revoked tasks " + revokedActiveTasks, e); - firstException.compareAndSet(null, e); - dirtyTasks.addAll(revokedActiveTasks); - dirtyTasks.addAll(commitNeededActiveTasks); - } - } - try { - if (revokedTasksNeedCommit && prepareCommitSucceeded) { - // in handleRevocation we must call commitOffsetsOrTransaction() directly rather than - // commitAndFillInConsumedOffsetsAndMetadataPerTaskMap() to make sure we don't skip the - // offset commit because we are in a rebalance - taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask); - } - } catch (final TaskCorruptedException e) { - log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}", - e.corruptedTasks()); - - // If we hit a TaskCorruptedException it must be EOS, just handle the cleanup for those corrupted tasks right here - dirtyTasks.addAll(tasks.initializedTasks(e.corruptedTasks())); - closeDirtyAndRevive(dirtyTasks, true); - } catch (final TimeoutException e) { - log.warn("Timed out while trying to commit all tasks during revocation, these will be cleaned and revived"); - - // If we hit a TimeoutException it must be ALOS, just close dirty and revive without wiping the state - dirtyTasks.addAll(consumedOffsetsPerTask.keySet()); - closeDirtyAndRevive(dirtyTasks, false); - } catch (final RuntimeException e) { - log.error("Exception caught while committing those revoked tasks " + revokedActiveTasks, e); - 
firstException.compareAndSet(null, e); - dirtyTasks.addAll(consumedOffsetsPerTask.keySet()); - } - - // we enforce checkpointing upon suspending a task: if it is resumed later we just proceed normally, if it is - // going to be closed we would checkpoint by then - for (final Task task : revokedActiveTasks) { - if (!dirtyTasks.contains(task)) { + if (revokedTasksNeedCommit) { try { - task.postCommit(true); + prepareCommitAndAddOffsetsToMap(revokedActiveTasks, consumedOffsetsPerTask); + // if we need to commit any revoking task then we just commit all of those needed committing together + prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, consumedOffsetsPerTask); + prepareCommitSucceeded = true; } catch (final RuntimeException e) { - log.error("Exception caught while post-committing task " + task.id(), e); - maybeSetFirstException(false, maybeWrapTaskException(e, task.id()), firstException); + log.error("Exception caught while preparing to commit revoked tasks " + revokedActiveTasks, e); + firstException.compareAndSet(null, e); + dirtyTasks.addAll(revokedActiveTasks); + dirtyTasks.addAll(commitNeededActiveTasks); } } - } - if (revokedTasksNeedCommit) { - for (final Task task : commitNeededActiveTasks) { + try { + if (revokedTasksNeedCommit && prepareCommitSucceeded) { + // in handleRevocation we must call commitOffsetsOrTransaction() directly rather than + // commitAndFillInConsumedOffsetsAndMetadataPerTaskMap() to make sure we don't skip the + // offset commit because we are in a rebalance + taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask); + } + } catch (final TaskCorruptedException e) { + log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}", + e.corruptedTasks()); + + // If we hit a TaskCorruptedException it must be EOS, just handle the cleanup for those corrupted tasks right here + dirtyTasks.addAll(tasks.initializedTasks(e.corruptedTasks())); + closeDirtyAndRevive(dirtyTasks, true); + } 
catch (final TimeoutException e) { + log.warn("Timed out while trying to commit all tasks during revocation, these will be cleaned and revived"); + + // If we hit a TimeoutException it must be ALOS, just close dirty and revive without wiping the state + dirtyTasks.addAll(consumedOffsetsPerTask.keySet()); + closeDirtyAndRevive(dirtyTasks, false); + } catch (final RuntimeException e) { + log.error("Exception caught while committing those revoked tasks " + revokedActiveTasks, e); + firstException.compareAndSet(null, e); + dirtyTasks.addAll(consumedOffsetsPerTask.keySet()); + } + + // we enforce checkpointing upon suspending a task: if it is resumed later we just proceed normally, if it is + // going to be closed we would checkpoint by then + for (final Task task : revokedActiveTasks) { if (!dirtyTasks.contains(task)) { try { - // for non-revoking active tasks, we should not enforce checkpoint - // since if it is EOS enabled, no checkpoint should be written while - // the task is in RUNNING tate - task.postCommit(false); + task.postCommit(true); } catch (final RuntimeException e) { log.error("Exception caught while post-committing task " + task.id(), e); maybeSetFirstException(false, maybeWrapTaskException(e, task.id()), firstException); } } } - } - for (final Task task : revokedActiveTasks) { - try { - task.suspend(); - } catch (final RuntimeException e) { - log.error("Caught the following exception while trying to suspend revoked task " + task.id(), e); - maybeSetFirstException(false, maybeWrapTaskException(e, task.id()), firstException); + if (revokedTasksNeedCommit) { + for (final Task task : commitNeededActiveTasks) { + if (!dirtyTasks.contains(task)) { + try { + // for non-revoking active tasks, we should not enforce checkpoint + // since if it is EOS enabled, no checkpoint should be written while + // the task is in RUNNING tate + task.postCommit(false); + } catch (final RuntimeException e) { + log.error("Exception caught while post-committing task " + 
task.id(), e); + maybeSetFirstException(false, maybeWrapTaskException(e, task.id()), firstException); + } + } + } + } + } finally { + for (final Task task : revokedActiveTasks) { + try { + task.suspend(); + } catch (final RuntimeException e) { + log.error("Caught the following exception while trying to suspend revoked task " + task.id(), e); + maybeSetFirstException(false, maybeWrapTaskException(e, task.id()), firstException); + } } - } - maybeUnlockTasks(lockedTaskIds); + maybeUnlockTasks(lockedTaskIds); + } if (firstException.get() != null) { throw firstException.get(); From 9b53b5e1c0a1e7c03dfff797a1d59df760ace817 Mon Sep 17 00:00:00 2001 From: Gabriella Fu <2234862823@qq.com> Date: Wed, 13 May 2026 18:18:18 -0400 Subject: [PATCH 3/4] exception handling: --- .../processor/internals/TaskManager.java | 18 +++++++------- .../processor/internals/TaskManagerTest.java | 24 +++++++++++++++++++ 2 files changed, 34 insertions(+), 8 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 45555a39a1520..cbd7ec2e3608e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -1070,7 +1070,7 @@ void handleRevocation(final Collection revokedPartitions) { prepareCommitSucceeded = true; } catch (final RuntimeException e) { log.error("Exception caught while preparing to commit revoked tasks " + revokedActiveTasks, e); - firstException.compareAndSet(null, e); + maybeSetFirstException(false, e, firstException); dirtyTasks.addAll(revokedActiveTasks); dirtyTasks.addAll(commitNeededActiveTasks); } @@ -1098,7 +1098,7 @@ void handleRevocation(final Collection revokedPartitions) { closeDirtyAndRevive(dirtyTasks, false); } catch (final RuntimeException e) { log.error("Exception caught while committing those revoked 
tasks " + revokedActiveTasks, e); - firstException.compareAndSet(null, e); + maybeSetFirstException(false, e, firstException); dirtyTasks.addAll(consumedOffsetsPerTask.keySet()); } @@ -1394,14 +1394,14 @@ void shutdown(final boolean clean) { executeAndMaybeSwallow( clean, () -> closeAndCleanUpTasks(activeTasks, standbyTasks, clean), - e -> firstException.compareAndSet(null, e), + e -> maybeSetFirstException(false, e, firstException), e -> log.warn("Ignoring an exception while unlocking remaining task directories.", e) ); executeAndMaybeSwallow( clean, activeTaskCreator::close, - e -> firstException.compareAndSet(null, e), + e -> maybeSetFirstException(false, e, firstException), e -> log.warn("Ignoring an exception while closing thread producer.", e) ); @@ -1412,7 +1412,7 @@ void shutdown(final boolean clean) { executeAndMaybeSwallow( clean, this::releaseLockedUnassignedTaskDirectories, - e -> firstException.compareAndSet(null, e), + e -> maybeSetFirstException(false, e, firstException), e -> log.warn("Ignoring an exception while unlocking remaining task directories.", e) ); @@ -1521,10 +1521,10 @@ private Collection tryCloseCleanActiveTasks(final Collection firstException) { if (!ignoreTaskMigrated || !(exception instanceof TaskMigratedException)) { - firstException.compareAndSet(null, exception); + if (!firstException.compareAndSet(null, exception)) { + firstException.get().addSuppressed(exception); + } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index a6aa06d4f173f..fe728588e7625 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -3013,6 +3013,30 @@ public void shouldSuspendRevokedTasksWhenPrepareCommitThrows() { verify(task00, never()).postCommit(anyBoolean()); } + 
@Test + public void shouldAttachSuppressedExceptionsWhenMultiplePhasesFailDuringRevocation() { + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .withInputPartitions(taskId00Partitions) + .inState(State.RUNNING) + .build(); + + final TasksRegistry tasks = mock(TasksRegistry.class); + when(tasks.allInitializedTasks()).thenReturn(Set.of(task00)); + + when(task00.commitNeeded()).thenReturn(true); + when(task00.prepareCommit(true)).thenThrow(new TaskMigratedException("task migrated")); + doThrow(new RuntimeException("suspend failed")).when(task00).suspend(); + + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); + + final StreamsException thrown = assertThrows(StreamsException.class, + () -> taskManager.handleRevocation(taskId00Partitions)); + + assertInstanceOf(TaskMigratedException.class, thrown); + assertEquals(1, thrown.getSuppressed().length); + assertInstanceOf(StreamsException.class, thrown.getSuppressed()[0]); + } + @Test public void shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEosV2() { // task being revoked, needs commit From e77f84c15fb8e46baa92cca5dfdb2e9dc6f03ff1 Mon Sep 17 00:00:00 2001 From: Gabriella Fu <2234862823@qq.com> Date: Fri, 15 May 2026 12:01:27 -0400 Subject: [PATCH 4/4] error message modification --- .../processor/internals/TaskManager.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index cbd7ec2e3608e..80f0f6e45f475 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -429,8 +429,8 @@ private void maybeThrowTaskExceptions(final Map taskEx if (exception instanceof TaskMigratedException) { lastTaskMigrated = 
(TaskMigratedException) exception; } else if (exception instanceof TaskCorruptedException) { - log.warn("Encounter corrupted task " + taskId + ", will group it with other corrupted tasks " + - "and handle together", exception); + log.warn("Encounter corrupted task {}, will group it with other corrupted tasks " + + "and handle together", taskId, exception); aggregatedCorruptedTaskIds.add(taskId); } else { ((StreamsException) exception).setTaskId(taskId); @@ -1069,7 +1069,7 @@ void handleRevocation(final Collection revokedPartitions) { prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, consumedOffsetsPerTask); prepareCommitSucceeded = true; } catch (final RuntimeException e) { - log.error("Exception caught while preparing to commit revoked tasks " + revokedActiveTasks, e); + log.error("Exception caught while preparing to commit revoked tasks {}", revokedActiveTasks, e); maybeSetFirstException(false, e, firstException); dirtyTasks.addAll(revokedActiveTasks); dirtyTasks.addAll(commitNeededActiveTasks); @@ -1097,7 +1097,7 @@ void handleRevocation(final Collection revokedPartitions) { dirtyTasks.addAll(consumedOffsetsPerTask.keySet()); closeDirtyAndRevive(dirtyTasks, false); } catch (final RuntimeException e) { - log.error("Exception caught while committing those revoked tasks " + revokedActiveTasks, e); + log.error("Exception caught while committing those revoked tasks {}", revokedActiveTasks, e); maybeSetFirstException(false, e, firstException); dirtyTasks.addAll(consumedOffsetsPerTask.keySet()); } @@ -1109,7 +1109,7 @@ void handleRevocation(final Collection revokedPartitions) { try { task.postCommit(true); } catch (final RuntimeException e) { - log.error("Exception caught while post-committing task " + task.id(), e); + log.error("Exception caught while post-committing task {}", task.id(), e); maybeSetFirstException(false, maybeWrapTaskException(e, task.id()), firstException); } } @@ -1124,7 +1124,7 @@ void handleRevocation(final Collection revokedPartitions) { 
// the task is in RUNNING tate task.postCommit(false); } catch (final RuntimeException e) { - log.error("Exception caught while post-committing task " + task.id(), e); + log.error("Exception caught while post-committing task {}", task.id(), e); maybeSetFirstException(false, maybeWrapTaskException(e, task.id()), firstException); } } @@ -1135,7 +1135,7 @@ void handleRevocation(final Collection revokedPartitions) { try { task.suspend(); } catch (final RuntimeException e) { - log.error("Caught the following exception while trying to suspend revoked task " + task.id(), e); + log.error("Caught the following exception while trying to suspend revoked task {}", task.id(), e); maybeSetFirstException(false, maybeWrapTaskException(e, task.id()), firstException); } } @@ -1537,7 +1537,7 @@ private Collection tryCloseCleanActiveTasks(final Collection tryCloseCleanActiveTasks(final Collection