From aefe738f6fec1fd8589a31fb0a4bf8cb5bd9a14b Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Mon, 3 Nov 2025 20:04:37 -0800 Subject: [PATCH 1/5] wip-1 --- .../processor/internals/TaskManagerTest.java | 448 ++++++++++-------- 1 file changed, 259 insertions(+), 189 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 0156d101ed8db..ff42f7197869d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -2872,88 +2872,87 @@ public void shouldAddNewActiveTasks() { @Test public void shouldNotCompleteRestorationIfTasksCannotInitialize() { + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .withInputPartitions(taskId00Partitions) + .inState(State.CREATED) + .build(); + + final TasksRegistry tasks = mock(TasksRegistry.class); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); final Map> assignment = mkMap( - mkEntry(taskId00, taskId00Partitions), - mkEntry(taskId01, taskId01Partitions) + mkEntry(taskId00, taskId00Partitions) ); - final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { - @Override - public void initializeIfNeeded() { - throw new LockException("can't lock"); - } - }; - final Task task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) { - @Override - public void initializeIfNeeded() { - throw new TimeoutException("timed out"); - } - }; - - when(activeTaskCreator.createTasks(any(), eq(assignment))).thenReturn(asList(task00, task01)); + when(activeTaskCreator.createTasks(any(), eq(assignment))) + .thenReturn(singletonList(task00)); taskManager.handleAssignment(assignment, emptyMap()); - assertThat(task00.state(), is(Task.State.CREATED)); - assertThat(task01.state(), is(Task.State.CREATED)); + verify(tasks).addPendingTasksToInit(singletonList(task00)); - assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(false)); + when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00)); + final LockException lockException = new LockException("can't lock"); + doThrow(lockException).when(task00).initializeIfNeeded(); + when(tasks.hasPendingTasksToInit()).thenReturn(true); - assertThat(task00.state(), is(Task.State.CREATED)); - assertThat(task01.state(), is(Task.State.CREATED)); - assertThat( - taskManager.activeTaskMap(), - Matchers.equalTo(mkMap(mkEntry(taskId00, task00), mkEntry(taskId01, task01))) + final boolean restorationComplete = taskManager.checkStateUpdater(time.milliseconds(), noOpResetter); + + assertFalse(restorationComplete); + verify(task00).initializeIfNeeded(); + verify(tasks, times(2)).addPendingTasksToInit( + argThat(tasksToInit -> tasksToInit.contains(task00)) ); - assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); - verify(changeLogReader).enforceRestoreActive(); + verify(stateUpdater, never()).add(task00); verifyNoInteractions(consumer); } @Test public void shouldNotCompleteRestorationIfTaskCannotCompleteRestoration() { - final Map> assignment = mkMap( - mkEntry(taskId00, taskId00Partitions) - ); - final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { - @Override - public void completeRestoration(final java.util.function.Consumer> offsetResetter) { - throw new TimeoutException("timeout!"); - } - }; - - when(activeTaskCreator.createTasks(any(), eq(assignment))).thenReturn(singletonList(task00)); + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .withInputPartitions(taskId00Partitions) + .inState(State.RESTORING) + .build(); - taskManager.handleAssignment(assignment, emptyMap()); + final TasksRegistry tasks = mock(TasksRegistry.class); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); - assertThat(task00.state(), is(Task.State.CREATED)); + when(stateUpdater.restoresActiveTasks()).thenReturn(true); + when(stateUpdater.drainRestoredActiveTasks(any(Duration.class))).thenReturn(Set.of(task00)); + final TimeoutException timeoutException = new TimeoutException("timeout!"); + doThrow(timeoutException).when(task00).completeRestoration(any()); - assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(false)); + final boolean restorationComplete = taskManager.checkStateUpdater(time.milliseconds(), noOpResetter); - assertThat(task00.state(), is(Task.State.RESTORING)); - assertThat( - taskManager.activeTaskMap(), - Matchers.equalTo(mkMap(mkEntry(taskId00, task00))) - ); - assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); - verify(changeLogReader).enforceRestoreActive(); - verifyNoInteractions(consumer); + assertFalse(restorationComplete); + verify(task00).completeRestoration(any()); + verify(stateUpdater).add(task00); + verify(tasks, never()).addTask(task00); + verify(consumer, never()).resume(task00.inputPartitions()); } @Test public void shouldSuspendActiveTasksDuringRevocation() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .withInputPartitions(taskId00Partitions) + .inState(State.RUNNING) + .build(); + + final TasksRegistry tasks = mock(TasksRegistry.class); + when(tasks.allTasks()).thenReturn(Set.of(task00)); + final Map offsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); - task00.setCommittableOffsetsAndMetadata(offsets); - when(consumer.assignment()).thenReturn(assignment); - when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00)); + when(task00.commitNeeded()).thenReturn(true); + when(task00.prepareCommit(true)).thenReturn(offsets); + doNothing().when(task00).postCommit(true); + doNothing().when(task00).suspend(); - taskManager.handleAssignment(taskId00Assignment, emptyMap()); - assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); - assertThat(task00.state(), is(Task.State.RUNNING)); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); taskManager.handleRevocation(taskId00Partitions); - assertThat(task00.state(), is(Task.State.SUSPENDED)); + + verify(task00).prepareCommit(true); + verify(task00).postCommit(true); + verify(task00).suspend(); } @SuppressWarnings("removal") @@ -3041,218 +3040,272 @@ public void shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEo @Test public void shouldCommitAllNeededTasksOnHandleRevocation() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); + // revoked task that needs commit + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .withInputPartitions(taskId00Partitions) + .inState(State.RUNNING) + .build(); final Map offsets00 = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); - task00.setCommittableOffsetsAndMetadata(offsets00); - task00.setCommitNeeded(); + when(task00.commitNeeded()).thenReturn(true); + when(task00.prepareCommit(true)).thenReturn(offsets00); - final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager); + // non revoked task that needs commit + final StreamTask task01 = statefulTask(taskId01, taskId01ChangelogPartitions) + .withInputPartitions(taskId01Partitions) + .inState(State.RUNNING) + .build(); final Map offsets01 = singletonMap(t1p1, new OffsetAndMetadata(1L, null)); - task01.setCommittableOffsetsAndMetadata(offsets01); - task01.setCommitNeeded(); + when(task01.commitNeeded()).thenReturn(true); + when(task01.prepareCommit(true)).thenReturn(offsets01); - final StateMachineTask task02 = new StateMachineTask(taskId02, taskId02Partitions, true, stateManager); - final Map offsets02 = singletonMap(t1p2, new OffsetAndMetadata(2L, null)); - task02.setCommittableOffsetsAndMetadata(offsets02); + // non revoked task that does NOT need commit + final StreamTask task02 = statefulTask(taskId02, taskId02ChangelogPartitions) + .withInputPartitions(taskId02Partitions) + .inState(State.RUNNING) + .build(); + when(task02.commitNeeded()).thenReturn(false); - final StateMachineTask task10 = new StateMachineTask(taskId10, taskId10Partitions, false, stateManager); + // standby task (not be affected by revocation) + final StandbyTask task03 = standbyTask(taskId03, taskId03ChangelogPartitions) + .withInputPartitions(taskId03Partitions) + .inState(State.RUNNING) + .build(); final Map expectedCommittedOffsets = new HashMap<>(); expectedCommittedOffsets.putAll(offsets00); expectedCommittedOffsets.putAll(offsets01); - final Map> assignmentActive = mkMap( - mkEntry(taskId00, taskId00Partitions), - mkEntry(taskId01, taskId01Partitions), - mkEntry(taskId02, taskId02Partitions) - ); - - final Map> assignmentStandby = mkMap( - mkEntry(taskId10, taskId10Partitions) - ); - when(consumer.assignment()).thenReturn(assignment); - - when(activeTaskCreator.createTasks(any(), eq(assignmentActive))) - .thenReturn(asList(task00, task01, task02)); - when(standbyTaskCreator.createTasks(assignmentStandby)) - .thenReturn(singletonList(task10)); + final TasksRegistry tasks = mock(TasksRegistry.class); + when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02, task03)); - taskManager.handleAssignment(assignmentActive, assignmentStandby); - assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); - assertThat(task00.state(), is(Task.State.RUNNING)); - assertThat(task01.state(), is(Task.State.RUNNING)); - assertThat(task02.state(), is(Task.State.RUNNING)); - assertThat(task10.state(), is(Task.State.RUNNING)); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); taskManager.handleRevocation(taskId00Partitions); - assertThat(task00.commitNeeded, is(false)); - assertThat(task00.commitPrepared, is(true)); - assertThat(task01.commitNeeded, is(false)); - assertThat(task01.commitPrepared, is(true)); - assertThat(task02.commitPrepared, is(false)); - assertThat(task10.commitPrepared, is(false)); + // both tasks needing commit had prepareCommit called + verify(task00).prepareCommit(true); + verify(task01).prepareCommit(true); + verify(task02, never()).prepareCommit(anyBoolean()); + verify(task03, never()).prepareCommit(anyBoolean()); verify(consumer).commitSync(expectedCommittedOffsets); + + // revoked task suspended + verify(task00).suspend(); + verify(task00).postCommit(true); + + // non-revoked task with commit was also post-committed (but not suspended) + verify(task01).postCommit(false); + verify(task01, never()).suspend(); + + // task02 and task03 should not be affected + verify(task02, never()).postCommit(anyBoolean()); + verify(task02, never()).suspend(); + verify(task03, never()).postCommit(anyBoolean()); + verify(task03, never()).suspend(); } @Test public void shouldNotCommitIfNoRevokedTasksNeedCommitting() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); - - final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager); - task01.setCommitNeeded(); + // task00 being revoked, no commit needed + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .withInputPartitions(taskId00Partitions) + .inState(State.RUNNING) + .build(); - final StateMachineTask task02 = new StateMachineTask(taskId02, taskId02Partitions, true, stateManager); + // task01 NOT being revoked, commit needed + final StreamTask task01 = statefulTask(taskId01, taskId01ChangelogPartitions) + .withInputPartitions(taskId01Partitions) + .inState(State.RUNNING) + .build(); - final Map> assignmentActive = mkMap( - mkEntry(taskId00, taskId00Partitions), - mkEntry(taskId01, taskId01Partitions), - mkEntry(taskId02, taskId02Partitions) - ); + // task02 NOT being revoked, no commit needed + final StreamTask task02 = statefulTask(taskId02, taskId02ChangelogPartitions) + .withInputPartitions(taskId02Partitions) + .inState(State.RUNNING) + .build(); - when(consumer.assignment()).thenReturn(assignment); + final TasksRegistry tasks = mock(TasksRegistry.class); + when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02)); - when(activeTaskCreator.createTasks(any(), eq(assignmentActive))) - .thenReturn(asList(task00, task01, task02)); + when(task00.commitNeeded()).thenReturn(false); + when(task01.commitNeeded()).thenReturn(true); // only task01 needs commit + when(task02.commitNeeded()).thenReturn(false); - taskManager.handleAssignment(assignmentActive, Collections.emptyMap()); - assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); - assertThat(task00.state(), is(Task.State.RUNNING)); - assertThat(task01.state(), is(Task.State.RUNNING)); - assertThat(task02.state(), is(Task.State.RUNNING)); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); taskManager.handleRevocation(taskId00Partitions); - assertThat(task00.commitPrepared, is(false)); - assertThat(task01.commitPrepared, is(false)); - assertThat(task02.commitPrepared, is(false)); + verify(task00, never()).prepareCommit(anyBoolean()); + verify(task01, never()).prepareCommit(anyBoolean()); + verify(task02, never()).prepareCommit(anyBoolean()); + + verify(task00).suspend(); + verify(task01, never()).suspend(); + verify(task02, never()).suspend(); } @Test public void shouldNotCommitIfNoRevokedTasksNeedCommittingWithEOSv2() { - final TaskManager taskManager = setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, false); - - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); - - final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager); - task01.setCommitNeeded(); + // task00 being revoked, no commit needed + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .withInputPartitions(taskId00Partitions) + .inState(State.RUNNING) + .build(); - final StateMachineTask task02 = new StateMachineTask(taskId02, taskId02Partitions, true, stateManager); + // task01 NOT being revoked, commit needed + final StreamTask task01 = statefulTask(taskId01, taskId01ChangelogPartitions) + .withInputPartitions(taskId01Partitions) + .inState(State.RUNNING) + .build(); - final Map> assignmentActive = mkMap( - mkEntry(taskId00, taskId00Partitions), - mkEntry(taskId01, taskId01Partitions), - mkEntry(taskId02, taskId02Partitions) - ); + // task02 NOT being revoked, no commit needed + final StreamTask task02 = statefulTask(taskId02, taskId02ChangelogPartitions) + .withInputPartitions(taskId02Partitions) + .inState(State.RUNNING) + .build(); - when(consumer.assignment()).thenReturn(assignment); + final TasksRegistry tasks = mock(TasksRegistry.class); + when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02)); - when(activeTaskCreator.createTasks(any(), eq(assignmentActive))) - .thenReturn(asList(task00, task01, task02)); + when(task00.commitNeeded()).thenReturn(false); + when(task01.commitNeeded()).thenReturn(true); // only task01 needs commit + when(task02.commitNeeded()).thenReturn(false); - taskManager.handleAssignment(assignmentActive, Collections.emptyMap()); - assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); - assertThat(task00.state(), is(Task.State.RUNNING)); - assertThat(task01.state(), is(Task.State.RUNNING)); - assertThat(task02.state(), is(Task.State.RUNNING)); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, tasks); taskManager.handleRevocation(taskId00Partitions); - assertThat(task00.commitPrepared, is(false)); - assertThat(task01.commitPrepared, is(false)); - assertThat(task02.commitPrepared, is(false)); + verify(task00, never()).prepareCommit(anyBoolean()); + verify(task01, never()).prepareCommit(anyBoolean()); + verify(task02, never()).prepareCommit(anyBoolean()); + + verify(task00).suspend(); + verify(task01, never()).suspend(); + verify(task02, never()).suspend(); } @Test public void shouldNotCommitOnHandleAssignmentIfNoTaskClosed() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); - final Map offsets00 = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); - task00.setCommittableOffsetsAndMetadata(offsets00); - task00.setCommitNeeded(); - - final StateMachineTask task10 = new StateMachineTask(taskId10, taskId10Partitions, false, stateManager); + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId00Partitions).build(); + final StandbyTask task01 = standbyTask(taskId01, taskId01ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId01Partitions).build(); - final Map> assignmentActive = singletonMap(taskId00, taskId00Partitions); - final Map> assignmentStandby = singletonMap(taskId10, taskId10Partitions); + final TasksRegistry tasks = mock(TasksRegistry.class); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); - when(consumer.assignment()).thenReturn(assignment); + when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00)); + when(stateUpdater.tasks()).thenReturn(Set.of(task01)); - when(activeTaskCreator.createTasks(any(), eq(assignmentActive))).thenReturn(singleton(task00)); - when(standbyTaskCreator.createTasks(assignmentStandby)).thenReturn(singletonList(task10)); + final Map> assignmentActive = singletonMap(taskId00, taskId00Partitions); + final Map> assignmentStandby = singletonMap(taskId01, taskId01Partitions); taskManager.handleAssignment(assignmentActive, assignmentStandby); - assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); - assertThat(task00.state(), is(Task.State.RUNNING)); - assertThat(task10.state(), is(Task.State.RUNNING)); - taskManager.handleAssignment(assignmentActive, assignmentStandby); + // active task stays in task manager + verify(tasks, never()).removeTask(task00); + verify(task00, never()).prepareCommit(anyBoolean()); + verify(task00, never()).postCommit(anyBoolean()); + + // standby task not removed from state updater + verify(stateUpdater, never()).remove(task01.id()); + verify(task01, never()).prepareCommit(anyBoolean()); + verify(task01, never()).postCommit(anyBoolean()); - assertThat(task00.commitNeeded, is(true)); - assertThat(task10.commitPrepared, is(false)); + verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap()); + verify(standbyTaskCreator).createTasks(Collections.emptyMap()); } @Test public void shouldNotCommitOnHandleAssignmentIfOnlyStandbyTaskClosed() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); - final Map offsets00 = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); - task00.setCommittableOffsetsAndMetadata(offsets00); - task00.setCommitNeeded(); - - final StateMachineTask task10 = new StateMachineTask(taskId10, taskId10Partitions, false, stateManager); + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId00Partitions).build(); + final StandbyTask task01 = standbyTask(taskId01, taskId01ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId01Partitions).build(); - final Map> assignmentActive = singletonMap(taskId00, taskId00Partitions); - final Map> assignmentStandby = singletonMap(taskId10, taskId10Partitions); + final TasksRegistry tasks = mock(TasksRegistry.class); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); - when(consumer.assignment()).thenReturn(assignment); + when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00)); + when(stateUpdater.tasks()).thenReturn(Set.of(task01)); - when(activeTaskCreator.createTasks(any(), eq(assignmentActive))).thenReturn(singleton(task00)); - when(standbyTaskCreator.createTasks(assignmentStandby)).thenReturn(singletonList(task10)); + // mock to remove standby task from state updater + final CompletableFuture future = new CompletableFuture<>(); + when(stateUpdater.remove(task01.id())).thenReturn(future); + future.complete(new StateUpdater.RemovedTaskResult(task01)); - taskManager.handleAssignment(assignmentActive, assignmentStandby); - assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); - assertThat(task00.state(), is(Task.State.RUNNING)); - assertThat(task10.state(), is(Task.State.RUNNING)); + final Map> assignmentActive = singletonMap(taskId00, taskId00Partitions); taskManager.handleAssignment(assignmentActive, Collections.emptyMap()); - assertThat(task00.commitNeeded, is(true)); + verify(task00, never()).prepareCommit(anyBoolean()); + verify(task00, never()).postCommit(anyBoolean()); + + verify(stateUpdater).remove(task01.id()); + verify(task01).suspend(); + verify(task01).closeClean(); + + verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap()); + verify(standbyTaskCreator).createTasks(Collections.emptyMap()); } @Test public void shouldNotCommitCreatedTasksOnRevocationOrClosure() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(State.CREATED) + .withInputPartitions(taskId00Partitions) + .build(); - when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00)); + final TasksRegistry tasks = mock(TasksRegistry.class); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + + when(activeTaskCreator.createTasks(consumer, taskId00Assignment)) + .thenReturn(singletonList(task00)); taskManager.handleAssignment(taskId00Assignment, emptyMap()); - assertThat(task00.state(), is(Task.State.CREATED)); + verify(tasks).addPendingTasksToInit(singletonList(task00)); + // when handle revocation is called, the tasks in pendingTasksToInit are NOT affected + // by revocation. They remain in the pending queue untouched taskManager.handleRevocation(taskId00Partitions); - assertThat(task00.state(), is(Task.State.SUSPENDED)); + // tasks in pendingTasksToInit are not managed by handleRevocation + verify(task00, never()).suspend(); + verify(task00, never()).prepareCommit(anyBoolean()); + + when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00)); + + // this calls handleTasksPendingInitialization() + // which drains pendingTasksToInit and closes those tasks taskManager.handleAssignment(emptyMap(), emptyMap()); - assertThat(task00.state(), is(Task.State.CLOSED)); + + // close clean without ever being committed + verify(task00).closeClean(); + verify(task00, never()).prepareCommit(anyBoolean()); } @Test public void shouldPassUpIfExceptionDuringSuspend() { - final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { - @Override - public void suspend() { - super.suspend(); - throw new RuntimeException("KABOOM!"); - } - }; + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .withInputPartitions(taskId00Partitions) + .inState(State.RUNNING) + .build(); - when(consumer.assignment()).thenReturn(assignment); - when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00)); - taskManager.handleAssignment(taskId00Assignment, emptyMap()); - assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); - assertThat(task00.state(), is(Task.State.RUNNING)); + doThrow(new RuntimeException("KABOOM!")).when(task00).suspend(); + + final TasksRegistry tasks = mock(TasksRegistry.class); + when(tasks.allTasks()).thenReturn(Set.of(task00)); + + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); assertThrows(RuntimeException.class, () -> taskManager.handleRevocation(taskId00Partitions)); - assertThat(task00.state(), is(Task.State.SUSPENDED)); + + verify(task00).suspend(); } @Test @@ -4805,9 +4858,26 @@ public void shouldConvertStandbyTaskToActiveTask() { @Test public void shouldListNotPausedTasks() { - handleAssignment(taskId00Assignment, taskId01Assignment, emptyMap()); + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .withInputPartitions(taskId00Partitions) + .inState(State.RUNNING) + .build(); + + final StreamTask task01 = statefulTask(taskId01, taskId01ChangelogPartitions) + .withInputPartitions(taskId01Partitions) + .inState(State.RUNNING) + .build(); + + final TasksRegistry tasks = mock(TasksRegistry.class); + when(tasks.allTasks()).thenReturn(Set.of(task00, task01)); + + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + + when(stateUpdater.tasks()).thenReturn(Collections.emptySet()); assertEquals(2, taskManager.notPausedTasks().size()); + assertTrue(taskManager.notPausedTasks().containsKey(taskId00)); + assertTrue(taskManager.notPausedTasks().containsKey(taskId01)); topologyMetadata.pauseTopology(UNNAMED_TOPOLOGY); From 3376140dbb7056b4eaaef3a569b89ddca836385f Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Tue, 4 Nov 2025 14:34:15 -0800 Subject: [PATCH 2/5] parameterize tests --- .../processor/internals/TaskManagerTest.java | 49 +++---------------- 1 file changed, 6 insertions(+), 43 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index ff42f7197869d..a88f3507b2479 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -61,6 +61,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -3105,8 +3107,9 @@ public void shouldCommitAllNeededTasksOnHandleRevocation() { verify(task03, never()).suspend(); } - @Test - public void shouldNotCommitIfNoRevokedTasksNeedCommitting() { + @ParameterizedTest + @EnumSource(ProcessingMode.class) + public void shouldNotCommitIfNoRevokedTasksNeedCommitting(final ProcessingMode processingMode) { // task00 being revoked, no commit needed final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) .withInputPartitions(taskId00Partitions) @@ -3132,47 +3135,7 @@ public void shouldNotCommitIfNoRevokedTasksNeedCommitting() { when(task01.commitNeeded()).thenReturn(true); // only task01 needs commit when(task02.commitNeeded()).thenReturn(false); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); - - taskManager.handleRevocation(taskId00Partitions); - - verify(task00, never()).prepareCommit(anyBoolean()); - verify(task01, never()).prepareCommit(anyBoolean()); - verify(task02, never()).prepareCommit(anyBoolean()); - - verify(task00).suspend(); - verify(task01, never()).suspend(); - verify(task02, never()).suspend(); - } - - @Test - public void shouldNotCommitIfNoRevokedTasksNeedCommittingWithEOSv2() { - // task00 being revoked, no commit needed - final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) - .withInputPartitions(taskId00Partitions) - .inState(State.RUNNING) - .build(); - - // task01 NOT being revoked, commit needed - final StreamTask task01 = statefulTask(taskId01, taskId01ChangelogPartitions) - .withInputPartitions(taskId01Partitions) - .inState(State.RUNNING) - .build(); - - // task02 NOT being revoked, no commit needed - final StreamTask task02 = statefulTask(taskId02, taskId02ChangelogPartitions) - .withInputPartitions(taskId02Partitions) - .inState(State.RUNNING) - .build(); - - final TasksRegistry tasks = mock(TasksRegistry.class); - when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02)); - - when(task00.commitNeeded()).thenReturn(false); - when(task01.commitNeeded()).thenReturn(true); // only task01 needs commit - when(task02.commitNeeded()).thenReturn(false); - - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, tasks); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(processingMode, tasks); taskManager.handleRevocation(taskId00Partitions); From ee4969b4fb68b7c960f809fac75bdd842d6838e9 Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Tue, 4 Nov 2025 14:38:28 -0800 Subject: [PATCH 3/5] added missing verification --- .../kafka/streams/processor/internals/TaskManagerTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index a88f3507b2479..14630f2b0ed80 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -2929,6 +2929,7 @@ public void shouldNotCompleteRestorationIfTaskCannotCompleteRestoration() { verify(stateUpdater).add(task00); verify(tasks, never()).addTask(task00); verify(consumer, never()).resume(task00.inputPartitions()); + verifyNoInteractions(consumer); } @Test From fa92b0b47b43b9188f6e434552cca6b834b67b72 Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Thu, 6 Nov 2025 06:08:24 -0800 Subject: [PATCH 4/5] minor nits --- .../kafka/streams/processor/internals/TaskManagerTest.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 14630f2b0ed80..2fa59c354aa58 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -2928,7 +2928,6 @@ public void shouldNotCompleteRestorationIfTaskCannotCompleteRestoration() { verify(task00).completeRestoration(any()); verify(stateUpdater).add(task00); verify(tasks, never()).addTask(task00); - verify(consumer, never()).resume(task00.inputPartitions()); verifyNoInteractions(consumer); } @@ -2946,8 +2945,6 @@ public void shouldSuspendActiveTasksDuringRevocation() { when(task00.commitNeeded()).thenReturn(true); when(task00.prepareCommit(true)).thenReturn(offsets); - doNothing().when(task00).postCommit(true); - doNothing().when(task00).suspend(); final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); From e2da034cf3b9d2ba08c49be80bcb9edd59ec85ee Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Thu, 6 Nov 2025 06:19:26 -0800 Subject: [PATCH 5/5] revert shouldNotCompleteRestorationIfTasksCannotInitialize to original --- .../processor/internals/TaskManagerTest.java | 48 ++++++++++--------- 1 file changed, 26 insertions(+), 22 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 2fa59c354aa58..7db9c957fe0d1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -2874,36 +2874,40 @@ public void shouldAddNewActiveTasks() { @Test public void shouldNotCompleteRestorationIfTasksCannotInitialize() { - final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) - .withInputPartitions(taskId00Partitions) - .inState(State.CREATED) - .build(); - - final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); final Map> assignment = mkMap( - mkEntry(taskId00, taskId00Partitions) + mkEntry(taskId00, taskId00Partitions), + mkEntry(taskId01, taskId01Partitions) ); + final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { + @Override + public void initializeIfNeeded() { + throw new LockException("can't lock"); + } + }; + final Task task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) { + @Override + public void initializeIfNeeded() { + throw new TimeoutException("timed out"); + } + }; - when(activeTaskCreator.createTasks(any(), eq(assignment))) - .thenReturn(singletonList(task00)); - taskManager.handleAssignment(assignment, emptyMap()); + when(activeTaskCreator.createTasks(any(), eq(assignment))).thenReturn(asList(task00, task01)); - verify(tasks).addPendingTasksToInit(singletonList(task00)); + taskManager.handleAssignment(assignment, emptyMap()); - when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00)); - final LockException lockException = new LockException("can't lock"); - doThrow(lockException).when(task00).initializeIfNeeded(); - when(tasks.hasPendingTasksToInit()).thenReturn(true); + assertThat(task00.state(), is(Task.State.CREATED)); + assertThat(task01.state(), is(Task.State.CREATED)); - final boolean restorationComplete = taskManager.checkStateUpdater(time.milliseconds(), noOpResetter); + assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(false)); - assertFalse(restorationComplete); - verify(task00).initializeIfNeeded(); - verify(tasks, times(2)).addPendingTasksToInit( - argThat(tasksToInit -> tasksToInit.contains(task00)) + assertThat(task00.state(), is(Task.State.CREATED)); + assertThat(task01.state(), is(Task.State.CREATED)); + assertThat( + taskManager.activeTaskMap(), + Matchers.equalTo(mkMap(mkEntry(taskId00, task00), mkEntry(taskId01, task01))) ); - verify(stateUpdater, never()).add(task00); + assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); + verify(changeLogReader).enforceRestoreActive(); verifyNoInteractions(consumer); }