From 9b4b193aeb5905a61de4228a711703b9b9d149d0 Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Fri, 7 Nov 2025 14:27:40 -0800 Subject: [PATCH 1/5] wip-1 --- .../processor/internals/TaskManagerTest.java | 216 ++++++------------ 1 file changed, 74 insertions(+), 142 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 01aa1d265413e..dc0ffd90bd2ae 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -85,7 +85,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -1928,7 +1927,7 @@ public void shouldSkipUnknownOffsetsWhenComputingOffsetSum() { mkEntry(t1p1changelog2, OffsetCheckpoint.OFFSET_UNKNOWN) )); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithoutStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, false); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, false); when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId01, restoringStatefulTask))); when(stateUpdater.tasks()).thenReturn(Set.of(restoringStatefulTask)); @@ -2180,7 +2179,7 @@ public void shouldCloseActiveTasksWhenHandlingLostTasks() { @Test public void shouldReInitializeStreamsProducerOnHandleLostAllIfEosV2Enabled() { - final TaskManager taskManager = setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, false); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, false); taskManager.handleLostAll(); @@ -3323,94 +3322,31 @@ public void shouldCloseActiveTasksAndPropagateExceptionsOnCleanShutdownWithExact } private void shouldCloseActiveTasksAndPropagateExceptionsOnCleanShutdown(final ProcessingMode processingMode) { - final TaskManager taskManager = setUpTaskManagerWithoutStateUpdater(processingMode, null, false); - final TopicPartition changelog = new TopicPartition("changelog", 0); - final Map> assignment = mkMap( - mkEntry(taskId00, taskId00Partitions), - mkEntry(taskId01, taskId01Partitions), - mkEntry(taskId02, taskId02Partitions), - mkEntry(taskId03, taskId03Partitions) - ); - final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { - @Override - public Set changelogPartitions() { - return singleton(changelog); - } - }; - final AtomicBoolean closedDirtyTask01 = new AtomicBoolean(false); - final AtomicBoolean closedDirtyTask02 = new AtomicBoolean(false); - final AtomicBoolean closedDirtyTask03 = new AtomicBoolean(false); - final Task task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) { - @Override - public void suspend() { - super.suspend(); - throw new TaskMigratedException("migrated", new RuntimeException("cause")); - } - - @Override - public void closeDirty() { - super.closeDirty(); - closedDirtyTask01.set(true); - } - }; - final Task task02 = new StateMachineTask(taskId02, taskId02Partitions, true, stateManager) { - @Override - public void suspend() { - super.suspend(); - throw new RuntimeException("oops"); - } - - @Override - public void closeDirty() { - super.closeDirty(); - closedDirtyTask02.set(true); - } - }; - final Task task03 = new StateMachineTask(taskId03, taskId03Partitions, true, stateManager) { - @Override - public void suspend() { - super.suspend(); - throw new RuntimeException("oops"); - } - - @Override - public void closeDirty() { - super.closeDirty(); - closedDirtyTask03.set(true); - } - }; - - when(activeTaskCreator.createTasks(any(), eq(assignment))) - .thenReturn(asList(task00, task01, task02, task03)); - - taskManager.handleAssignment(assignment, emptyMap()); + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RESTORING) + .withInputPartitions(taskId00Partitions).build(); + final StreamTask task01 = statefulTask(taskId01, taskId01ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId01Partitions).build(); + final StreamTask task02 = statefulTask(taskId02, taskId02ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId02Partitions).build(); + final StreamTask task03 = statefulTask(taskId03, taskId03ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId03Partitions).build(); - assertThat(task00.state(), is(Task.State.CREATED)); - assertThat(task01.state(), is(Task.State.CREATED)); - assertThat(task02.state(), is(Task.State.CREATED)); - assertThat(task03.state(), is(Task.State.CREATED)); + final TasksRegistry tasks = mock(TasksRegistry.class); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(processingMode, tasks); - taskManager.tryToCompleteRestoration(time.milliseconds(), null); + doThrow(new TaskMigratedException("migrated", new RuntimeException("cause"))) + .when(task01).suspend(); + doThrow(new RuntimeException("oops")) + .when(task02).suspend(); + doThrow(new RuntimeException("oops")) + .when(task03).suspend(); - assertThat(task00.state(), is(Task.State.RESTORING)); - assertThat(task01.state(), is(Task.State.RUNNING)); - assertThat(task02.state(), is(Task.State.RUNNING)); - assertThat(task03.state(), is(Task.State.RUNNING)); - assertThat( - taskManager.activeTaskMap(), - Matchers.equalTo( - mkMap( - mkEntry(taskId00, task00), - mkEntry(taskId01, task01), - mkEntry(taskId02, task02), - mkEntry(taskId03, task03) - ) - ) - ); - assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); - verify(changeLogReader).enforceRestoreActive(); - verify(changeLogReader).completedChangelogs(); + when(tasks.activeTasks()).thenReturn(Set.of(task00, task01, task02, task03)); final RuntimeException exception = assertThrows( RuntimeException.class, @@ -3418,62 +3354,54 @@ public void closeDirty() { ); assertThat(exception.getCause().getMessage(), is("oops")); - assertThat(closedDirtyTask01.get(), is(true)); - assertThat(closedDirtyTask02.get(), is(true)); - assertThat(closedDirtyTask03.get(), is(true)); - assertThat(task00.state(), is(Task.State.CLOSED)); - assertThat(task01.state(), is(Task.State.CLOSED)); - assertThat(task02.state(), is(Task.State.CLOSED)); - assertThat(task03.state(), is(Task.State.CLOSED)); + // Verify tasks that threw exceptions were closed dirty + verify(task00).prepareCommit(true); + verify(task00).suspend(); + verify(task00).closeClean(); + verify(task01).prepareCommit(true); + verify(task01, times(2)).suspend(); + verify(task01).closeDirty(); + verify(task02).prepareCommit(true); + verify(task02, times(2)).suspend(); + verify(task02).closeDirty(); + verify(task03).prepareCommit(true); + verify(task03, times(2)).suspend(); + verify(task03).closeDirty(); + assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap()); assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); - // the active task creator should also get closed (so that it closes the thread producer if applicable) verify(activeTaskCreator).close(); + verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE)); } @Test public void shouldCloseActiveTasksAndPropagateStreamsProducerExceptionsOnCleanShutdown() { - final TopicPartition changelog = new TopicPartition("changelog", 0); - final Map> assignment = mkMap( - mkEntry(taskId00, taskId00Partitions) - ); - final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { - @Override - public Set changelogPartitions() { - return singleton(changelog); - } - }; - - when(activeTaskCreator.createTasks(any(), eq(assignment))).thenReturn(singletonList(task00)); - doThrow(new RuntimeException("whatever")).when(activeTaskCreator).close(); + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId00Partitions) + .build(); - taskManager.handleAssignment(assignment, emptyMap()); + final TasksRegistry tasks = mock(TasksRegistry.class); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); - assertThat(task00.state(), is(Task.State.CREATED)); + doThrow(new RuntimeException("whatever")).when(activeTaskCreator).close(); - taskManager.tryToCompleteRestoration(time.milliseconds(), null); + when(tasks.activeTasks()).thenReturn(Set.of(task00)); - assertThat(task00.state(), is(Task.State.RESTORING)); - assertThat( - taskManager.activeTaskMap(), - Matchers.equalTo( - mkMap( - mkEntry(taskId00, task00) - ) - ) + final RuntimeException exception = assertThrows( + RuntimeException.class, + () -> taskManager.shutdown(true) ); - assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); - verify(changeLogReader).enforceRestoreActive(); - verify(changeLogReader).completedChangelogs(); - - final RuntimeException exception = assertThrows(RuntimeException.class, () -> taskManager.shutdown(true)); - assertThat(task00.state(), is(Task.State.CLOSED)); assertThat(exception.getMessage(), is("whatever")); + + verify(task00).prepareCommit(true); + verify(task00).suspend(); + verify(task00).closeClean(); assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap()); assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); - // the active task creator should also get closed (so that it closes the thread producer if applicable) verify(activeTaskCreator).close(); + verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE)); } @Test @@ -3509,31 +3437,35 @@ public Map prepareCommit(final boolean clean) @Test public void shouldSuspendAllRevokedActiveTasksAndPropagateSuspendException() { - final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); + // will not be revoked + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId00Partitions).build(); - final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) { - @Override - public void suspend() { - super.suspend(); - throw new RuntimeException("task 0_1 suspend boom!"); - } - }; + // will be revoked and throws exception during suspend + final StreamTask task01 = statefulTask(taskId01, taskId01ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId01Partitions).build(); + doThrow(new RuntimeException("task 0_1 suspend boom!")).when(task01).suspend(); - final StateMachineTask task02 = new StateMachineTask(taskId02, taskId02Partitions, true, stateManager); + // will be revoked with no exception + final StreamTask task02 = statefulTask(taskId02, taskId02ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId02Partitions).build(); - taskManager.addTask(task00); - taskManager.addTask(task01); - taskManager.addTask(task02); + final TasksRegistry tasks = mock(TasksRegistry.class); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02)); final RuntimeException thrown = assertThrows(RuntimeException.class, () -> taskManager.handleRevocation(union(HashSet::new, taskId01Partitions, taskId02Partitions))); - assertThat(thrown.getCause().getMessage(), is("task 0_1 suspend boom!")); - assertThat(task00.state(), is(Task.State.CREATED)); - assertThat(task01.state(), is(Task.State.SUSPENDED)); - assertThat(task02.state(), is(Task.State.SUSPENDED)); + assertThat(thrown.getCause().getMessage(), is("task 0_1 suspend boom!")); + verify(task01).suspend(); + verify(task02).suspend(); + verify(task00, never()).suspend(); verifyNoInteractions(activeTaskCreator); } From 77849eac4eb760d2bbeec6caa3db9eaf2b89defb Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Fri, 7 Nov 2025 20:20:31 -0800 Subject: [PATCH 2/5] wip-2 --- .../processor/internals/TaskManagerTest.java | 151 ++++++++++-------- 1 file changed, 80 insertions(+), 71 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index dc0ffd90bd2ae..2bc07b514a588 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -3539,29 +3539,36 @@ public void suspend() { @Test public void shouldCloseStandbyTasksOnShutdown() { - final Map> assignment = singletonMap(taskId00, taskId00Partitions); - final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, false, stateManager); + final TasksRegistry tasks = mock(TasksRegistry.class); + final StandbyTask standbyTask00 = standbyTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId00Partitions) + .build(); - // `handleAssignment` - when(standbyTaskCreator.createTasks(assignment)).thenReturn(singletonList(task00)); + when(stateUpdater.tasks()).thenReturn(Set.of(standbyTask00)); + when(stateUpdater.standbyTasks()).thenReturn(Set.of(standbyTask00)); - taskManager.handleAssignment(emptyMap(), assignment); - assertThat(task00.state(), is(Task.State.CREATED)); + final CompletableFuture futureForStandbyTask = new CompletableFuture<>(); + when(stateUpdater.remove(taskId00)).thenReturn(futureForStandbyTask); - taskManager.tryToCompleteRestoration(time.milliseconds(), null); - assertThat(task00.state(), is(Task.State.RUNNING)); - assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap()); - assertThat(taskManager.standbyTaskMap(), Matchers.equalTo(singletonMap(taskId00, task00))); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + + futureForStandbyTask.complete(new StateUpdater.RemovedTaskResult(standbyTask00)); // simulate successful removal taskManager.shutdown(true); - assertThat(task00.state(), is(Task.State.CLOSED)); - assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap()); - assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); + + verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE)); + + verify(tasks).addTask(standbyTask00); + + verify(standbyTask00).prepareCommit(true); + verify(standbyTask00).postCommit(true); + verify(standbyTask00).suspend(); + verify(standbyTask00).closeClean(); + // the active task creator should also get closed (so that it closes the thread producer if applicable) verify(activeTaskCreator).close(); - // `tryToCompleteRestoration` - verify(consumer).assignment(); - verify(consumer).resume(eq(emptySet())); + verifyNoInteractions(consumer); } @Test @@ -3671,36 +3678,29 @@ public void shouldShutDownStateUpdaterAndCloseDirtyTasksFailedDuringRemoval() { } @Test - public void shouldInitializeNewActiveTasks() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); - when(consumer.assignment()).thenReturn(assignment); + public void shouldInitialiseNewStandbyTasks() { + final StandbyTask task01 = standbyTask(taskId01, taskId01ChangelogPartitions) + .inState(State.CREATED) + .withInputPartitions(taskId01Partitions) + .build(); - when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))) - .thenReturn(singletonList(task00)); + final Map> assignment = taskId01Assignment; + final TasksRegistry tasks = mock(TasksRegistry.class); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); - taskManager.handleAssignment(taskId00Assignment, emptyMap()); - assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); + when(standbyTaskCreator.createTasks(assignment)).thenReturn(singletonList(task01)); - assertThat(task00.state(), is(Task.State.RUNNING)); - assertThat(taskManager.activeTaskMap(), Matchers.equalTo(singletonMap(taskId00, task00))); - assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); - // verifies that we actually resume the assignment at the end of restoration. - verify(consumer).resume(assignment); - } + taskManager.handleAssignment(emptyMap(), assignment); - @Test - public void shouldInitialiseNewStandbyTasks() { - final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager); + verify(tasks).addPendingTasksToInit(singletonList(task01)); - when(consumer.assignment()).thenReturn(assignment); - when(standbyTaskCreator.createTasks(taskId01Assignment)).thenReturn(singletonList(task01)); + when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task01)); - taskManager.handleAssignment(emptyMap(), taskId01Assignment); - assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); + taskManager.checkStateUpdater(time.milliseconds(), noOpResetter); - assertThat(task01.state(), is(Task.State.RUNNING)); - assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap()); - assertThat(taskManager.standbyTaskMap(), Matchers.equalTo(singletonMap(taskId01, task01))); + verify(task01).initializeIfNeeded(); + verify(stateUpdater).add(task01); + verifyNoInteractions(consumer); } @Test @@ -3877,14 +3877,25 @@ public void shouldNotCommitActiveAndStandbyTasksWhileRebalanceInProgress() { @Test public void shouldCommitViaConsumerIfEosDisabled() { - final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager); + final StreamTask task01 = statefulTask(taskId01, taskId01ChangelogPartitions) + .withInputPartitions(taskId01Partitions) + .inState(State.RUNNING) + .build(); final Map offsets = singletonMap(t1p1, new OffsetAndMetadata(0L, null)); - task01.setCommittableOffsetsAndMetadata(offsets); - task01.setCommitNeeded(); - taskManager.addTask(task01); - taskManager.commitAll(); + when(task01.commitNeeded()).thenReturn(true); + when(task01.prepareCommit(true)).thenReturn(offsets); + + final TasksRegistry tasks = mock(TasksRegistry.class); + when(tasks.allTasks()).thenReturn(Set.of(task01)); + + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + + assertThat(taskManager.commitAll(), equalTo(1)); + verify(task01, times(2)).commitNeeded(); + verify(task01).prepareCommit(true); + verify(task01).postCommit(false); verify(consumer).commitSync(offsets); } @@ -3939,50 +3950,48 @@ public void shouldCommitViaProducerIfEosV2Enabled() { @Test public void shouldPropagateExceptionFromActiveCommit() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { - @Override - public Map prepareCommit(final boolean clean) { - throw new RuntimeException("opsh."); - } - }; - - when(consumer.assignment()).thenReturn(assignment); - when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00)); + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .withInputPartitions(taskId00Partitions) + .inState(State.RUNNING) + .build(); - taskManager.handleAssignment(taskId00Assignment, emptyMap()); - assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); + when(task00.commitNeeded()).thenReturn(true); + when(task00.prepareCommit(true)).thenThrow(new RuntimeException("opsh.")); - assertThat(task00.state(), is(Task.State.RUNNING)); + final TasksRegistry tasks = mock(TasksRegistry.class); + when(tasks.allTasks()).thenReturn(Set.of(task00)); - task00.setCommitNeeded(); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); final RuntimeException thrown = - assertThrows(RuntimeException.class, () -> taskManager.commitAll()); + assertThrows(RuntimeException.class, taskManager::commitAll); assertThat(thrown.getMessage(), equalTo("opsh.")); + + verify(task00).commitNeeded(); + verify(task00).prepareCommit(true); } @Test public void shouldPropagateExceptionFromStandbyCommit() { - final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager) { - @Override - public Map prepareCommit(final boolean clean) { - throw new RuntimeException("opsh."); - } - }; - - when(consumer.assignment()).thenReturn(assignment); - when(standbyTaskCreator.createTasks(taskId01Assignment)).thenReturn(singletonList(task01)); + final StandbyTask task01 = standbyTask(taskId01, taskId01ChangelogPartitions) + .withInputPartitions(taskId01Partitions) + .inState(State.RUNNING) + .build(); - taskManager.handleAssignment(emptyMap(), taskId01Assignment); - assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); + when(task01.commitNeeded()).thenReturn(true); + when(task01.prepareCommit(true)).thenThrow(new RuntimeException("opsh.")); - assertThat(task01.state(), is(Task.State.RUNNING)); + final TasksRegistry tasks = mock(TasksRegistry.class); + when(tasks.allTasks()).thenReturn(Set.of(task01)); - task01.setCommitNeeded(); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); final RuntimeException thrown = assertThrows(RuntimeException.class, () -> taskManager.commitAll()); assertThat(thrown.getMessage(), equalTo("opsh.")); + + verify(task01).commitNeeded(); + verify(task01).prepareCommit(true); } @Test From 7a0dac2ad10b702b852fbc17fae04a881c6f48a8 Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Mon, 10 Nov 2025 20:17:49 -0800 Subject: [PATCH 3/5] wip-3 --- .../processor/internals/TaskManagerTest.java | 129 ++++++++++-------- 1 file changed, 72 insertions(+), 57 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 2bc07b514a588..cc76c1c8a23a9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -4003,27 +4003,24 @@ public void shouldSendPurgeData() { final InOrder inOrder = inOrder(adminClient); - final Map purgableOffsets = new HashMap<>(); - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { - @Override - public Map purgeableOffsets() { - return purgableOffsets; - } - }; - - when(consumer.assignment()).thenReturn(assignment); - when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00)); + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .withInputPartitions(taskId00Partitions) + .inState(State.RUNNING) + .build(); - taskManager.handleAssignment(taskId00Assignment, emptyMap()); - assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); + when(task00.purgeableOffsets()) + .thenReturn(new HashMap<>()) + .thenReturn(singletonMap(t1p1, 5L)) + .thenReturn(singletonMap(t1p1, 17L)); - assertThat(task00.state(), is(Task.State.RUNNING)); + final TasksRegistry tasks = mock(TasksRegistry.class); + when(tasks.allTasks()).thenReturn(Set.of(task00)); - purgableOffsets.put(t1p1, 5L); - taskManager.maybePurgeCommittedRecords(); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); - purgableOffsets.put(t1p1, 17L); - taskManager.maybePurgeCommittedRecords(); + taskManager.maybePurgeCommittedRecords(); // no-op + taskManager.maybePurgeCommittedRecords(); // sends purge for offset 5L + taskManager.maybePurgeCommittedRecords(); // sends purge for offset 17L inOrder.verify(adminClient).deleteRecords(singletonMap(t1p1, RecordsToDelete.beforeOffset(5L))); inOrder.verify(adminClient).deleteRecords(singletonMap(t1p1, RecordsToDelete.beforeOffset(17L))); @@ -4036,29 +4033,27 @@ public void shouldNotSendPurgeDataIfPreviousNotDone() { when(adminClient.deleteRecords(singletonMap(t1p1, RecordsToDelete.beforeOffset(5L)))) .thenReturn(new DeleteRecordsResult(singletonMap(t1p1, futureDeletedRecords))); - final Map purgableOffsets = new HashMap<>(); - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { - @Override - public Map purgeableOffsets() { - return purgableOffsets; - } - }; + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .withInputPartitions(taskId00Partitions) + .inState(State.RUNNING) + .build(); - when(consumer.assignment()).thenReturn(assignment); - when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00)); + when(task00.purgeableOffsets()) + .thenReturn(new HashMap<>()) + .thenReturn(singletonMap(t1p1, 5L)) + .thenReturn(singletonMap(t1p1, 17L)); - taskManager.handleAssignment(taskId00Assignment, emptyMap()); - assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); + final TasksRegistry tasks = mock(TasksRegistry.class); + when(tasks.allTasks()).thenReturn(Set.of(task00)); - assertThat(task00.state(), is(Task.State.RUNNING)); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); - purgableOffsets.put(t1p1, 5L); + taskManager.maybePurgeCommittedRecords(); taskManager.maybePurgeCommittedRecords(); // this call should be a no-op. - // this is verified, as there is no expectation on adminClient for this second call, + // this is verified, as there is no expectation on adminClient for this third call, // so it would fail verification if we invoke the admin client again. - purgableOffsets.put(t1p1, 17L); taskManager.maybePurgeCommittedRecords(); } @@ -4366,7 +4361,6 @@ public boolean maybePunctuateStreamTime() { @Test public void shouldPunctuateActiveTasks() { - final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) .withInputPartitions(taskId00Partitions) .inState(State.RUNNING) @@ -4759,45 +4753,66 @@ public void suspend() { @Test public void shouldConvertActiveTaskToStandbyTask() { - final StreamTask activeTask = mock(StreamTask.class); - when(activeTask.id()).thenReturn(taskId00); - when(activeTask.inputPartitions()).thenReturn(taskId00Partitions); - when(activeTask.isActive()).thenReturn(true); - - final StandbyTask standbyTask = mock(StandbyTask.class); - when(standbyTask.id()).thenReturn(taskId00); + final StreamTask activeTaskToRecycle = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId00Partitions).build(); + final StandbyTask recycledStandbyTask = standbyTask(taskId00, taskId00ChangelogPartitions) + .inState(State.CREATED) + .withInputPartitions(taskId00Partitions).build(); + final TasksRegistry tasks = mock(TasksRegistry.class); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); - when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(activeTask)); - when(standbyTaskCreator.createStandbyTaskFromActive(any(), eq(taskId00Partitions))).thenReturn(standbyTask); + when(activeTaskCreator.createTasks(consumer, taskId00Assignment)).thenReturn(singletonList(activeTaskToRecycle)); + when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle, taskId00Partitions)) + .thenReturn(recycledStandbyTask); + // create active task taskManager.handleAssignment(taskId00Assignment, Collections.emptyMap()); + + // convert active to standby + when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToRecycle)); + final CompletableFuture future = new CompletableFuture<>(); + when(stateUpdater.remove(activeTaskToRecycle.id())).thenReturn(future); + future.complete(new StateUpdater.RemovedTaskResult(activeTaskToRecycle)); + taskManager.handleAssignment(Collections.emptyMap(), taskId00Assignment); - verify(activeTaskCreator).createTasks(any(), eq(emptyMap())); + verify(activeTaskCreator).createTasks(consumer, emptyMap()); verify(standbyTaskCreator, times(2)).createTasks(Collections.emptyMap()); - verifyNoInteractions(consumer); + verify(standbyTaskCreator).createStandbyTaskFromActive(activeTaskToRecycle, taskId00Partitions); + verify(tasks).addPendingTasksToInit(Collections.singleton(recycledStandbyTask)); } @Test public void shouldConvertStandbyTaskToActiveTask() { - final StandbyTask standbyTask = mock(StandbyTask.class); - when(standbyTask.id()).thenReturn(taskId00); - when(standbyTask.isActive()).thenReturn(false); - when(standbyTask.prepareCommit(true)).thenReturn(Collections.emptyMap()); - - final StreamTask activeTask = mock(StreamTask.class); - when(activeTask.id()).thenReturn(taskId00); - when(activeTask.inputPartitions()).thenReturn(taskId00Partitions); - when(standbyTaskCreator.createTasks(taskId00Assignment)).thenReturn(singletonList(standbyTask)); - when(activeTaskCreator.createActiveTaskFromStandby(eq(standbyTask), eq(taskId00Partitions), any())) - .thenReturn(activeTask); + final StandbyTask standbyTaskToRecycle = standbyTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId00Partitions).build(); + final StreamTask recycledActiveTask = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(State.CREATED) + .withInputPartitions(taskId00Partitions).build(); + final TasksRegistry tasks = mock(TasksRegistry.class); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + + when(standbyTaskCreator.createTasks(taskId00Assignment)).thenReturn(singletonList(standbyTaskToRecycle)); + when(activeTaskCreator.createActiveTaskFromStandby(standbyTaskToRecycle, taskId00Partitions, consumer)) + .thenReturn(recycledActiveTask); + // create standby task taskManager.handleAssignment(Collections.emptyMap(), taskId00Assignment); + + // convert standby to active + when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToRecycle)); + final CompletableFuture future = new CompletableFuture<>(); + when(stateUpdater.remove(standbyTaskToRecycle.id())).thenReturn(future); + future.complete(new StateUpdater.RemovedTaskResult(standbyTaskToRecycle)); + taskManager.handleAssignment(taskId00Assignment, Collections.emptyMap()); - verify(activeTaskCreator, times(2)).createTasks(any(), eq(emptyMap())); + verify(activeTaskCreator, times(2)).createTasks(consumer, emptyMap()); verify(standbyTaskCreator).createTasks(Collections.emptyMap()); - verifyNoInteractions(consumer); + verify(activeTaskCreator).createActiveTaskFromStandby(standbyTaskToRecycle, taskId00Partitions, consumer); + verify(tasks).addPendingTasksToInit(Collections.singleton(recycledActiveTask)); } @Test From 5e36ed5dd2103df2f7aa2a9106e724c32d51cec6 Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Thu, 13 Nov 2025 18:40:55 -0800 Subject: [PATCH 4/5] change task to running and removed redundant task --- .../streams/processor/internals/TaskManagerTest.java | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index cc76c1c8a23a9..9c7a5501a9091 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -3324,7 +3324,7 @@ public void shouldCloseActiveTasksAndPropagateExceptionsOnCleanShutdownWithExact private void shouldCloseActiveTasksAndPropagateExceptionsOnCleanShutdown(final ProcessingMode processingMode) { final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) - .inState(State.RESTORING) + .inState(State.RUNNING) .withInputPartitions(taskId00Partitions).build(); final StreamTask task01 = statefulTask(taskId01, taskId01ChangelogPartitions) .inState(State.RUNNING) @@ -3332,9 +3332,6 @@ private void shouldCloseActiveTasksAndPropagateExceptionsOnCleanShutdown(final P final StreamTask task02 = statefulTask(taskId02, taskId02ChangelogPartitions) .inState(State.RUNNING) .withInputPartitions(taskId02Partitions).build(); - final StreamTask task03 = statefulTask(taskId03, taskId03ChangelogPartitions) - .inState(State.RUNNING) - .withInputPartitions(taskId03Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); final TaskManager taskManager = setUpTaskManagerWithStateUpdater(processingMode, tasks); @@ -3343,10 +3340,8 @@ private void shouldCloseActiveTasksAndPropagateExceptionsOnCleanShutdown(final P .when(task01).suspend(); doThrow(new RuntimeException("oops")) .when(task02).suspend(); - doThrow(new RuntimeException("oops")) - .when(task03).suspend(); - when(tasks.activeTasks()).thenReturn(Set.of(task00, task01, task02, task03)); + when(tasks.activeTasks()).thenReturn(Set.of(task00, task01, task02)); final RuntimeException exception = assertThrows( RuntimeException.class, @@ -3364,9 +3359,6 @@ private void shouldCloseActiveTasksAndPropagateExceptionsOnCleanShutdown(final P verify(task02).prepareCommit(true); verify(task02, times(2)).suspend(); verify(task02).closeDirty(); - verify(task03).prepareCommit(true); - verify(task03, times(2)).suspend(); - verify(task03).closeDirty(); assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap()); assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); From 745ebb9c7fd385e9336943f7c0fcf6a0dc941f5b Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Thu, 13 Nov 2025 18:41:37 -0800 Subject: [PATCH 5/5] nit: typo and comment fix --- .../kafka/streams/processor/internals/TaskManagerTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 9c7a5501a9091..dc323f019b1fd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -3670,7 +3670,7 @@ public void shouldShutDownStateUpdaterAndCloseDirtyTasksFailedDuringRemoval() { } @Test - public void shouldInitialiseNewStandbyTasks() { + public void shouldInitializeNewStandbyTasks() { final StandbyTask task01 = standbyTask(taskId01, taskId01ChangelogPartitions) .inState(State.CREATED) .withInputPartitions(taskId01Partitions) @@ -4044,8 +4044,8 @@ public void shouldNotSendPurgeDataIfPreviousNotDone() { taskManager.maybePurgeCommittedRecords(); // this call should be a no-op. - // this is verified, as there is no expectation on adminClient for this third call, - // so it would fail verification if we invoke the admin client again. + // because the previous deleteRecords request + // has not completed yet, so no new request is sent. taskManager.maybePurgeCommittedRecords(); }