diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index e8a3442ec1324..80f0f6e45f475 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -429,8 +429,8 @@ private void maybeThrowTaskExceptions(final Map taskEx if (exception instanceof TaskMigratedException) { lastTaskMigrated = (TaskMigratedException) exception; } else if (exception instanceof TaskCorruptedException) { - log.warn("Encounter corrupted task " + taskId + ", will group it with other corrupted tasks " + - "and handle together", exception); + log.warn("Encounter corrupted task {}, will group it with other corrupted tasks " + + "and handle together", taskId, exception); aggregatedCorruptedTaskIds.add(taskId); } else { ((StreamsException) exception).setTaskId(taskId); @@ -1057,81 +1057,91 @@ void handleRevocation(final Collection revokedPartitions) { "have been cleaned up by the handleAssignment callback.", remainingRevokedPartitions); } - if (revokedTasksNeedCommit) { - prepareCommitAndAddOffsetsToMap(revokedActiveTasks, consumedOffsetsPerTask); - // if we need to commit any revoking task then we just commit all of those needed committing together - prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, consumedOffsetsPerTask); - } - - // even if commit failed, we should still continue and complete suspending those tasks, so we would capture - // any exception and rethrow it at the end. some exceptions may be handled immediately and then swallowed, - // as such we just need to skip those dirty tasks in the checkpoint + // even if prepare, commit, or postCommit failed, we must still suspend revoked tasks and unlock, + // so we use try-finally to guarantee that. Exceptions are captured and rethrown at the end. 
final Set dirtyTasks = new TreeSet<>(Comparator.comparing(Task::id)); + boolean prepareCommitSucceeded = false; try { if (revokedTasksNeedCommit) { - // in handleRevocation we must call commitOffsetsOrTransaction() directly rather than - // commitAndFillInConsumedOffsetsAndMetadataPerTaskMap() to make sure we don't skip the - // offset commit because we are in a rebalance - taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask); - } - } catch (final TaskCorruptedException e) { - log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}", - e.corruptedTasks()); - - // If we hit a TaskCorruptedException it must be EOS, just handle the cleanup for those corrupted tasks right here - dirtyTasks.addAll(tasks.initializedTasks(e.corruptedTasks())); - closeDirtyAndRevive(dirtyTasks, true); - } catch (final TimeoutException e) { - log.warn("Timed out while trying to commit all tasks during revocation, these will be cleaned and revived"); - - // If we hit a TimeoutException it must be ALOS, just close dirty and revive without wiping the state - dirtyTasks.addAll(consumedOffsetsPerTask.keySet()); - closeDirtyAndRevive(dirtyTasks, false); - } catch (final RuntimeException e) { - log.error("Exception caught while committing those revoked tasks " + revokedActiveTasks, e); - firstException.compareAndSet(null, e); - dirtyTasks.addAll(consumedOffsetsPerTask.keySet()); - } - - // we enforce checkpointing upon suspending a task: if it is resumed later we just proceed normally, if it is - // going to be closed we would checkpoint by then - for (final Task task : revokedActiveTasks) { - if (!dirtyTasks.contains(task)) { try { - task.postCommit(true); + prepareCommitAndAddOffsetsToMap(revokedActiveTasks, consumedOffsetsPerTask); + // if we need to commit any revoking task then we just commit all of those needed committing together + prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, consumedOffsetsPerTask); + 
prepareCommitSucceeded = true; } catch (final RuntimeException e) { - log.error("Exception caught while post-committing task " + task.id(), e); - maybeSetFirstException(false, maybeWrapTaskException(e, task.id()), firstException); + log.error("Exception caught while preparing to commit revoked tasks {}", revokedActiveTasks, e); + maybeSetFirstException(false, e, firstException); + dirtyTasks.addAll(revokedActiveTasks); + dirtyTasks.addAll(commitNeededActiveTasks); } } - } - if (revokedTasksNeedCommit) { - for (final Task task : commitNeededActiveTasks) { + try { + if (revokedTasksNeedCommit && prepareCommitSucceeded) { + // in handleRevocation we must call commitOffsetsOrTransaction() directly rather than + // commitAndFillInConsumedOffsetsAndMetadataPerTaskMap() to make sure we don't skip the + // offset commit because we are in a rebalance + taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask); + } + } catch (final TaskCorruptedException e) { + log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}", + e.corruptedTasks()); + + // If we hit a TaskCorruptedException it must be EOS, just handle the cleanup for those corrupted tasks right here + dirtyTasks.addAll(tasks.initializedTasks(e.corruptedTasks())); + closeDirtyAndRevive(dirtyTasks, true); + } catch (final TimeoutException e) { + log.warn("Timed out while trying to commit all tasks during revocation, these will be cleaned and revived"); + + // If we hit a TimeoutException it must be ALOS, just close dirty and revive without wiping the state + dirtyTasks.addAll(consumedOffsetsPerTask.keySet()); + closeDirtyAndRevive(dirtyTasks, false); + } catch (final RuntimeException e) { + log.error("Exception caught while committing those revoked tasks {}", revokedActiveTasks, e); + maybeSetFirstException(false, e, firstException); + dirtyTasks.addAll(consumedOffsetsPerTask.keySet()); + } + + // we enforce checkpointing upon suspending a task: if it is resumed 
later we just proceed normally, if it is + // going to be closed we would checkpoint by then + for (final Task task : revokedActiveTasks) { if (!dirtyTasks.contains(task)) { try { - // for non-revoking active tasks, we should not enforce checkpoint - // since if it is EOS enabled, no checkpoint should be written while - // the task is in RUNNING tate - task.postCommit(false); + task.postCommit(true); } catch (final RuntimeException e) { - log.error("Exception caught while post-committing task " + task.id(), e); + log.error("Exception caught while post-committing task {}", task.id(), e); maybeSetFirstException(false, maybeWrapTaskException(e, task.id()), firstException); } } } - } - for (final Task task : revokedActiveTasks) { - try { - task.suspend(); - } catch (final RuntimeException e) { - log.error("Caught the following exception while trying to suspend revoked task " + task.id(), e); - maybeSetFirstException(false, maybeWrapTaskException(e, task.id()), firstException); + if (revokedTasksNeedCommit) { + for (final Task task : commitNeededActiveTasks) { + if (!dirtyTasks.contains(task)) { + try { + // for non-revoking active tasks, we should not enforce checkpoint + // since if it is EOS enabled, no checkpoint should be written while + // the task is in RUNNING state + task.postCommit(false); + } catch (final RuntimeException e) { + log.error("Exception caught while post-committing task {}", task.id(), e); + maybeSetFirstException(false, maybeWrapTaskException(e, task.id()), firstException); + } + } + } + } + } finally { + for (final Task task : revokedActiveTasks) { + try { + task.suspend(); + } catch (final RuntimeException e) { + log.error("Caught the following exception while trying to suspend revoked task {}", task.id(), e); + maybeSetFirstException(false, maybeWrapTaskException(e, task.id()), firstException); + } } - } - maybeUnlockTasks(lockedTaskIds); + maybeUnlockTasks(lockedTaskIds); + } if (firstException.get() != null) { throw firstException.get(); @@ 
-1384,14 +1394,14 @@ void shutdown(final boolean clean) { executeAndMaybeSwallow( clean, () -> closeAndCleanUpTasks(activeTasks, standbyTasks, clean), - e -> firstException.compareAndSet(null, e), + e -> maybeSetFirstException(false, e, firstException), e -> log.warn("Ignoring an exception while unlocking remaining task directories.", e) ); executeAndMaybeSwallow( clean, activeTaskCreator::close, - e -> firstException.compareAndSet(null, e), + e -> maybeSetFirstException(false, e, firstException), e -> log.warn("Ignoring an exception while closing thread producer.", e) ); @@ -1402,7 +1412,7 @@ void shutdown(final boolean clean) { executeAndMaybeSwallow( clean, this::releaseLockedUnassignedTaskDirectories, - e -> firstException.compareAndSet(null, e), + e -> maybeSetFirstException(false, e, firstException), e -> log.warn("Ignoring an exception while unlocking remaining task directories.", e) ); @@ -1511,10 +1521,10 @@ private Collection tryCloseCleanActiveTasks(final Collection tryCloseCleanActiveTasks(final Collection tryCloseCleanActiveTasks(final Collection firstException) { if (!ignoreTaskMigrated || !(exception instanceof TaskMigratedException)) { - firstException.compareAndSet(null, exception); + if (!firstException.compareAndSet(null, exception)) { + firstException.get().addSuppressed(exception); + } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index baee5286cae1a..fe728588e7625 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -2989,6 +2989,54 @@ public void shouldSuspendActiveTasksDuringRevocation() { verify(task00).suspend(); } + @Test + public void shouldSuspendRevokedTasksWhenPrepareCommitThrows() { + final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions) + .withInputPartitions(taskId00Partitions) + .inState(State.RUNNING) + .build(); + + final TasksRegistry tasks = mock(TasksRegistry.class); + when(tasks.allInitializedTasks()).thenReturn(Set.of(task00)); + + when(task00.commitNeeded()).thenReturn(true); + when(task00.prepareCommit(true)).thenThrow(new TaskMigratedException("task migrated")); + + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); + + final StreamsException thrown = assertThrows(StreamsException.class, + () -> taskManager.handleRevocation(taskId00Partitions)); + + assertInstanceOf(TaskMigratedException.class, thrown); + + verify(task00).suspend(); + verify(task00, never()).postCommit(anyBoolean()); + } + + @Test + public void shouldAttachSuppressedExceptionsWhenMultiplePhasesFailDuringRevocation() { + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .withInputPartitions(taskId00Partitions) + .inState(State.RUNNING) + .build(); + + final TasksRegistry tasks = mock(TasksRegistry.class); + when(tasks.allInitializedTasks()).thenReturn(Set.of(task00)); + + when(task00.commitNeeded()).thenReturn(true); + when(task00.prepareCommit(true)).thenThrow(new TaskMigratedException("task migrated")); + doThrow(new RuntimeException("suspend failed")).when(task00).suspend(); + + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); + + final StreamsException thrown = assertThrows(StreamsException.class, + () -> taskManager.handleRevocation(taskId00Partitions)); + + assertInstanceOf(TaskMigratedException.class, thrown); + assertEquals(1, thrown.getSuppressed().length); + assertInstanceOf(StreamsException.class, thrown.getSuppressed()[0]); + } + @Test public void shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEosV2() { // task being revoked, needs commit