From c92e015f6bca3c47678b33a844d48b087223804f Mon Sep 17 00:00:00 2001 From: Christo Lolov Date: Thu, 20 Apr 2023 16:00:08 +0100 Subject: [PATCH 1/2] KAFKA-14133: Migrate ChangeLogReader mock in TaskManagerTest to Mockito --- .../processor/internals/TaskManagerTest.java | 244 ++++++++---------- 1 file changed, 112 insertions(+), 132 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 6072c96ca6e2..0918c3b2ad8c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -184,7 +184,7 @@ public class TaskManagerTest { private InternalTopologyBuilder topologyBuilder; @Mock(type = MockType.DEFAULT) private StateDirectory stateDirectory; - @Mock(type = MockType.NICE) + @org.mockito.Mock private ChangelogReader changeLogReader; @Mock(type = MockType.STRICT) private Consumer consumer; @@ -1826,7 +1826,7 @@ public void shouldCloseActiveUnassignedSuspendedTasksWhenClosingRevokedTasks() { task00.setCommittableOffsetsAndMetadata(offsets); // first `handleAssignment` - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00)); expect(activeTaskCreator.createTasks(anyObject(), eq(emptyMap()))).andStubReturn(emptyList()); activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00); @@ -1841,7 +1841,7 @@ public void shouldCloseActiveUnassignedSuspendedTasksWhenClosingRevokedTasks() { consumer.commitSync(offsets); expectLastCall(); - replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator, consumer); taskManager.handleAssignment(taskId00Assignment, emptyMap()); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); @@ -1866,14 +1866,14 @@ public void closeClean() { }; // first `handleAssignment` - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00)); expect(activeTaskCreator.createTasks(anyObject(), eq(emptyMap()))).andStubReturn(emptyList()); activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00); expectLastCall(); expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(emptyList()); - replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator, consumer); taskManager.handleAssignment(taskId00Assignment, emptyMap()); taskManager.handleRevocation(taskId00Partitions); @@ -1897,7 +1897,7 @@ public void shouldCloseActiveTasksWhenHandlingLostTasks() throws Exception { final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, false); // `handleAssignment` - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00)); expect(standbyTaskCreator.createTasks(eq(taskId01Assignment))).andStubReturn(singletonList(task01)); @@ -1916,7 +1916,7 @@ public void shouldCloseActiveTasksWhenHandlingLostTasks() throws Exception { activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00); expectLastCall(); - replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator, consumer); taskManager.handleAssignment(taskId00Assignment, taskId01Assignment); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); @@ -1959,7 +1959,7 @@ public void shouldThrowWhenHandlingClosingTasksOnProducerCloseError() { task00.setCommittableOffsetsAndMetadata(offsets); // `handleAssignment` - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00)); expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(emptyList()); @@ -1969,7 +1969,7 @@ public void shouldThrowWhenHandlingClosingTasksOnProducerCloseError() { activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00); expectLastCall().andThrow(new RuntimeException("KABOOM!")); - replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator, consumer); taskManager.handleAssignment(taskId00Assignment, emptyMap()); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); @@ -2009,11 +2009,11 @@ public void postCommit(final boolean enforceCheckpoint) { }; // `handleAssignment` - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00)); expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet()); expect(consumer.assignment()).andReturn(taskId00Partitions); - replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator, consumer); taskManager.handleAssignment(taskId00Assignment, emptyMap()); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), tp -> assertThat(tp, is(empty()))), is(true)); @@ -2047,11 +2047,11 @@ public void suspend() { } }; - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00)); expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet()); expect(consumer.assignment()).andReturn(taskId00Partitions); - replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator, consumer); taskManager.handleAssignment(taskId00Assignment, emptyMap()); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), tp -> assertThat(tp, is(empty()))), is(true)); @@ -2086,12 +2086,13 @@ public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() { .andStubReturn(asList(corruptedTask, nonCorruptedTask)); expect(standbyTaskCreator.createTasks(anyObject())) .andStubReturn(Collections.emptySet()); - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); expect(consumer.assignment()).andReturn(taskId00Partitions); // check that we should not commit empty map either consumer.commitSync(eq(emptyMap())); expectLastCall().andStubThrow(new AssertionError("should not invoke commitSync when offset map is empty")); - replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + + replay(activeTaskCreator, standbyTaskCreator, consumer); taskManager.handleAssignment(assignment, emptyMap()); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), tp -> assertThat(tp, is(empty()))), is(true)); @@ -2129,7 +2130,7 @@ public void shouldNotCommitNonRunningNonCorruptedTasks() { expect(standbyTaskCreator.createTasks(anyObject())) .andStubReturn(Collections.emptySet()); expect(consumer.assignment()).andReturn(taskId00Partitions); - replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator, consumer); taskManager.handleAssignment(assignment, emptyMap()); @@ -2163,9 +2164,9 @@ public Map prepareCommit() { expect(standbyTaskCreator.createTasks(eq(taskId00Assignment))).andStubReturn(singleton(corruptedStandby)); expect(activeTaskCreator.createTasks(anyObject(), eq(taskId01Assignment))).andStubReturn(singleton(runningNonCorruptedActive)); - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); - replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator, consumer); taskManager.handleAssignment(taskId01Assignment, taskId00Assignment); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); @@ -2204,11 +2205,11 @@ public void shouldNotAttemptToCommitInHandleCorruptedDuringARebalance() { expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(asList(corruptedActive, uncorruptedActive)); expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet()); - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); expect(consumer.assignment()).andStubReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions)); - replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader, stateDirectory, stateManager); + replay(activeTaskCreator, standbyTaskCreator, consumer, stateDirectory, stateManager); uncorruptedActive.setCommittableOffsetsAndMetadata(offsets); @@ -2256,14 +2257,14 @@ public void markChangelogAsCorrupted(final Collection partitions expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(asList(corruptedActive, uncorruptedActive)); expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet()); - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); consumer.commitSync(offsets); expectLastCall().andThrow(new TimeoutException()); expect(consumer.assignment()).andStubReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions)); - replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator, consumer); taskManager.handleAssignment(assignment, emptyMap()); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); @@ -2332,7 +2333,7 @@ public void markChangelogAsCorrupted(final Collection partitions expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(asList(corruptedActiveTask, uncorruptedActiveTask)); expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet()); - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); final ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata("appId"); expect(consumer.groupMetadata()).andReturn(groupMetadata); @@ -2341,7 +2342,7 @@ public void markChangelogAsCorrupted(final Collection partitions expect(consumer.assignment()).andStubReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions)); - replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader, stateManager); + replay(activeTaskCreator, standbyTaskCreator, consumer, stateManager); taskManager.handleAssignment(assignment, emptyMap()); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); @@ -2409,7 +2410,7 @@ public void markChangelogAsCorrupted(final Collection partitions mkEntry(taskId02, taskId02Partitions) ); - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); expect(activeTaskCreator.createTasks(anyObject(), eq(assignmentActive))).andReturn(asList(revokedActiveTask, unrevokedActiveTaskWithCommitNeeded, unrevokedActiveTaskWithoutCommitNeeded)); expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet()); @@ -2419,7 +2420,7 @@ public void markChangelogAsCorrupted(final Collection partitions expectLastCall().andThrow(new TimeoutException()); expect(consumer.assignment()).andStubReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions, taskId02Partitions)); - replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator, consumer); taskManager.handleAssignment(assignmentActive, emptyMap()); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); @@ -2473,7 +2474,7 @@ public void markChangelogAsCorrupted(final Collection partitions mkEntry(taskId02, taskId02Partitions) ); - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); expect(activeTaskCreator.createTasks(anyObject(), eq(assignmentActive))).andReturn(asList(revokedActiveTask, unrevokedActiveTask, unrevokedActiveTaskWithoutCommitNeeded)); expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet()); @@ -2487,7 +2488,7 @@ public void markChangelogAsCorrupted(final Collection partitions expect(consumer.assignment()).andStubReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions, taskId02Partitions)); - replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader, stateManager); + replay(activeTaskCreator, standbyTaskCreator, consumer, stateManager); taskManager.handleAssignment(assignmentActive, emptyMap()); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); @@ -2512,13 +2513,13 @@ public void markChangelogAsCorrupted(final Collection partitions public void shouldCloseStandbyUnassignedTasksWhenCreatingNewTasks() { final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, false); - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); expect(standbyTaskCreator.createTasks(eq(taskId00Assignment))).andStubReturn(singletonList(task00)); expect(activeTaskCreator.createTasks(anyObject(), anyObject())).andStubReturn(Collections.emptySet()); expect(standbyTaskCreator.createTasks(eq(Collections.emptyMap()))).andStubReturn(Collections.emptySet()); consumer.commitSync(Collections.emptyMap()); expectLastCall(); - replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator, consumer); taskManager.handleAssignment(emptyMap(), taskId00Assignment); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); @@ -2535,14 +2536,14 @@ public void shouldAddNonResumedSuspendedTasks() { final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true); final Task task01 = new StateMachineTask(taskId01, taskId01Partitions, false); - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); // expect these calls twice (because we're going to tryToCompleteRestoration twice) - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andReturn(singletonList(task00)); expect(activeTaskCreator.createTasks(anyObject(), eq(Collections.emptyMap()))).andReturn(Collections.emptySet()); expect(standbyTaskCreator.createTasks(eq(taskId01Assignment))).andReturn(singletonList(task01)).anyTimes(); expect(standbyTaskCreator.createTasks(eq(Collections.emptyMap()))).andReturn(Collections.emptySet()); - replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator, consumer); taskManager.handleAssignment(taskId00Assignment, taskId01Assignment); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); @@ -2561,13 +2562,13 @@ public void shouldAddNonResumedSuspendedTasks() { public void shouldUpdateInputPartitionsAfterRebalance() { final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true); - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); // expect these calls twice (because we're going to tryToCompleteRestoration twice) - expectRestoreToBeCompleted(consumer, changeLogReader, false); + expectRestoreToBeCompleted(consumer); expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andReturn(singletonList(task00)); expect(activeTaskCreator.createTasks(anyObject(), eq(Collections.emptyMap()))).andReturn(Collections.emptySet()); expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet()); - replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator, consumer); taskManager.handleAssignment(taskId00Assignment, emptyMap()); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); @@ -2579,7 +2580,7 @@ public void shouldUpdateInputPartitionsAfterRebalance() { assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); assertThat(task00.state(), is(Task.State.RUNNING)); assertEquals(newPartitionsSet, task00.inputPartitions()); - verify(activeTaskCreator, consumer, changeLogReader); + verify(activeTaskCreator, consumer); } @Test @@ -2587,15 +2588,13 @@ public void shouldAddNewActiveTasks() { final Map> assignment = taskId00Assignment; final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true); - expect(changeLogReader.completedChangelogs()).andReturn(emptySet()); expect(consumer.assignment()).andReturn(emptySet()); consumer.resume(eq(emptySet())); expectLastCall(); - changeLogReader.enforceRestoreActive(); expectLastCall(); expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(singletonList(task00)); expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andStubReturn(emptyList()); - replay(consumer, activeTaskCreator, standbyTaskCreator, changeLogReader); + replay(consumer, activeTaskCreator, standbyTaskCreator); taskManager.handleAssignment(assignment, emptyMap()); @@ -2607,6 +2606,7 @@ public void shouldAddNewActiveTasks() { assertThat(taskManager.activeTaskMap(), Matchers.equalTo(singletonMap(taskId00, task00))); assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); verify(activeTaskCreator); + Mockito.verify(changeLogReader).enforceRestoreActive(); } @Test @@ -2630,15 +2630,13 @@ public void initializeIfNeeded() { consumer.commitSync(Collections.emptyMap()); expectLastCall(); - expect(changeLogReader.completedChangelogs()).andReturn(emptySet()); expect(consumer.assignment()).andReturn(emptySet()); consumer.resume(eq(emptySet())); expectLastCall(); - changeLogReader.enforceRestoreActive(); expectLastCall(); expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(asList(task00, task01)); expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andStubReturn(emptyList()); - replay(consumer, activeTaskCreator, standbyTaskCreator, changeLogReader); + replay(consumer, activeTaskCreator, standbyTaskCreator); taskManager.handleAssignment(assignment, emptyMap()); @@ -2655,6 +2653,7 @@ public void initializeIfNeeded() { ); assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); verify(activeTaskCreator); + Mockito.verify(changeLogReader).enforceRestoreActive(); } @Test @@ -2671,15 +2670,13 @@ public void completeRestoration(final java.util.function.Consumer offsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); task00.setCommittableOffsetsAndMetadata(offsets); - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andReturn(singletonList(task00)); expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet()); consumer.commitSync(offsets); expectLastCall(); - replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator, consumer); taskManager.handleAssignment(taskId00Assignment, emptyMap()); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); @@ -2752,7 +2750,7 @@ public void shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEo final Map> assignmentStandby = mkMap( mkEntry(taskId10, taskId10Partitions) ); - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); expect(activeTaskCreator.createTasks(anyObject(), eq(assignmentActive))) .andReturn(asList(task00, task01, task02)); @@ -2776,7 +2774,7 @@ public void shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEo task10.committedOffsets(); EasyMock.expectLastCall(); - replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator, consumer); taskManager.handleAssignment(assignmentActive, assignmentStandby); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); @@ -2824,7 +2822,7 @@ public void shouldCommitAllNeededTasksOnHandleRevocation() { final Map> assignmentStandby = mkMap( mkEntry(taskId10, taskId10Partitions) ); - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); expect(activeTaskCreator.createTasks(anyObject(), eq(assignmentActive))) .andReturn(asList(task00, task01, task02)); @@ -2835,7 +2833,7 @@ public void shouldCommitAllNeededTasksOnHandleRevocation() { consumer.commitSync(expectedCommittedOffsets); expectLastCall(); - replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator, consumer); taskManager.handleAssignment(assignmentActive, assignmentStandby); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); @@ -2866,14 +2864,14 @@ public void shouldNotCommitOnHandleAssignmentIfNoTaskClosed() { final Map> assignmentActive = singletonMap(taskId00, taskId00Partitions); final Map> assignmentStandby = singletonMap(taskId10, taskId10Partitions); - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); expect(activeTaskCreator.createTasks(anyObject(), eq(assignmentActive))).andReturn(singleton(task00)); expect(activeTaskCreator.createTasks(anyObject(), eq(Collections.emptyMap()))).andReturn(Collections.emptySet()); expect(standbyTaskCreator.createTasks(eq(assignmentStandby))).andReturn(singletonList(task10)); expect(standbyTaskCreator.createTasks(eq(Collections.emptyMap()))).andReturn(Collections.emptySet()); - replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator, consumer); taskManager.handleAssignment(assignmentActive, assignmentStandby); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); @@ -2898,14 +2896,14 @@ public void shouldNotCommitOnHandleAssignmentIfOnlyStandbyTaskClosed() { final Map> assignmentActive = singletonMap(taskId00, taskId00Partitions); final Map> assignmentStandby = singletonMap(taskId10, taskId10Partitions); - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); expect(activeTaskCreator.createTasks(anyObject(), eq(assignmentActive))).andReturn(singleton(task00)); expect(activeTaskCreator.createTasks(anyObject(), eq(Collections.emptyMap()))).andReturn(Collections.emptySet()); expect(standbyTaskCreator.createTasks(eq(assignmentStandby))).andReturn(singletonList(task10)); expect(standbyTaskCreator.createTasks(eq(Collections.emptyMap()))).andReturn(Collections.emptySet()); - replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator, consumer); taskManager.handleAssignment(assignmentActive, assignmentStandby); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); @@ -2926,7 +2924,7 @@ public void shouldNotCommitCreatedTasksOnRevocationOrClosure() { activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(eq(taskId00)); expectLastCall().once(); expect(activeTaskCreator.createTasks(anyObject(), eq(Collections.emptyMap()))).andReturn(Collections.emptySet()); - replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator, consumer); taskManager.handleAssignment(taskId00Assignment, emptyMap()); assertThat(task00.state(), is(Task.State.CREATED)); @@ -2948,10 +2946,10 @@ public void suspend() { } }; - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andReturn(singletonList(task00)); expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet()); - replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator, consumer); taskManager.handleAssignment(taskId00Assignment, emptyMap()); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); assertThat(task00.state(), is(Task.State.RUNNING)); @@ -3020,9 +3018,6 @@ public void closeDirty() { } }; - resetToStrict(changeLogReader); - changeLogReader.enforceRestoreActive(); - expect(changeLogReader.completedChangelogs()).andReturn(emptySet()); expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))) .andStubReturn(asList(task00, task01, task02, task03)); activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject()); @@ -3030,7 +3025,7 @@ public void closeDirty() { activeTaskCreator.closeThreadProducerIfNeeded(); expectLastCall(); expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andStubReturn(emptyList()); - replay(activeTaskCreator, standbyTaskCreator, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator); taskManager.handleAssignment(assignment, emptyMap()); @@ -3057,6 +3052,7 @@ public void closeDirty() { ) ); assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); + Mockito.verify(changeLogReader).enforceRestoreActive(); final RuntimeException exception = assertThrows( RuntimeException.class, @@ -3074,7 +3070,7 @@ public void closeDirty() { assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap()); assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); // the active task creator should also get closed (so that it closes the thread producer if applicable) - verify(activeTaskCreator, changeLogReader); + verify(activeTaskCreator); } @Test @@ -3092,16 +3088,13 @@ public Set changelogPartitions() { final Map offsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); task00.setCommittableOffsetsAndMetadata(offsets); - resetToStrict(changeLogReader); - changeLogReader.enforceRestoreActive(); - expect(changeLogReader.completedChangelogs()).andReturn(emptySet()); expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(singletonList(task00)); activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(eq(taskId00)); expectLastCall().andThrow(new RuntimeException("whatever")); activeTaskCreator.closeThreadProducerIfNeeded(); expectLastCall(); expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andStubReturn(emptyList()); - replay(activeTaskCreator, standbyTaskCreator, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator); taskManager.handleAssignment(assignment, emptyMap()); @@ -3119,6 +3112,7 @@ public Set changelogPartitions() { ) ); assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); + Mockito.verify(changeLogReader).enforceRestoreActive(); final RuntimeException exception = assertThrows(RuntimeException.class, () -> taskManager.shutdown(true)); @@ -3127,7 +3121,7 @@ public Set changelogPartitions() { assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap()); assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); // the active task creator should also get closed (so that it closes the thread producer if applicable) - verify(activeTaskCreator, changeLogReader); + verify(activeTaskCreator); } @Test @@ -3143,16 +3137,13 @@ public Set changelogPartitions() { } }; - resetToStrict(changeLogReader); - changeLogReader.enforceRestoreActive(); - expect(changeLogReader.completedChangelogs()).andReturn(emptySet()); expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(singletonList(task00)); activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(eq(taskId00)); expectLastCall(); activeTaskCreator.closeThreadProducerIfNeeded(); expectLastCall().andThrow(new RuntimeException("whatever")); expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andStubReturn(emptyList()); - replay(activeTaskCreator, standbyTaskCreator, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator); taskManager.handleAssignment(assignment, emptyMap()); @@ -3170,6 +3161,7 @@ public Set changelogPartitions() { ) ); assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); + Mockito.verify(changeLogReader).enforceRestoreActive(); final RuntimeException exception = assertThrows(RuntimeException.class, () -> taskManager.shutdown(true)); @@ -3178,7 +3170,7 @@ public Set changelogPartitions() { assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap()); assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); // the active task creator should also get closed (so that it closes the thread producer if applicable) - verify(activeTaskCreator, changeLogReader); + verify(activeTaskCreator); } @Test @@ -3272,16 +3264,13 @@ public void suspend() { } }; - resetToStrict(changeLogReader); - changeLogReader.enforceRestoreActive(); - expect(changeLogReader.completedChangelogs()).andReturn(emptySet()); expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(asList(task00, task01, task02)); activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject()); expectLastCall().andThrow(new RuntimeException("whatever")).times(3); activeTaskCreator.closeThreadProducerIfNeeded(); expectLastCall().andThrow(new RuntimeException("whatever all")); expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andStubReturn(emptyList()); - replay(activeTaskCreator, standbyTaskCreator, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator); taskManager.handleAssignment(assignment, emptyMap()); @@ -3305,6 +3294,7 @@ public void suspend() { ) ); assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); + Mockito.verify(changeLogReader).enforceRestoreActive(); taskManager.shutdown(false); @@ -3314,7 +3304,7 @@ public void suspend() { assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap()); assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); // the active task creator should also get closed (so that it closes the thread producer if applicable) - verify(activeTaskCreator, changeLogReader); + verify(activeTaskCreator); } @Test @@ -3327,7 +3317,6 @@ public void shouldCloseStandbyTasksOnShutdown() { expect(standbyTaskCreator.createTasks(eq(assignment))).andStubReturn(singletonList(task00)); // `tryToCompleteRestoration` - expect(changeLogReader.completedChangelogs()).andReturn(emptySet()); expect(consumer.assignment()).andReturn(emptySet()); consumer.resume(eq(emptySet())); expectLastCall(); @@ -3338,7 +3327,7 @@ public void shouldCloseStandbyTasksOnShutdown() { activeTaskCreator.closeThreadProducerIfNeeded(); expectLastCall(); - replay(consumer, activeTaskCreator, standbyTaskCreator, changeLogReader); + replay(consumer, activeTaskCreator, standbyTaskCreator); taskManager.handleAssignment(emptyMap(), assignment); assertThat(task00.state(), is(Task.State.CREATED)); @@ -3436,11 +3425,11 @@ public void shouldShutDownStateUpdaterAndAddRemovedTasksToTaskRegistry() { @Test public void shouldInitializeNewActiveTasks() { final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true); - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))) .andStubReturn(singletonList(task00)); expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet()); - replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator, consumer); taskManager.handleAssignment(taskId00Assignment, emptyMap()); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); @@ -3456,11 +3445,11 @@ public void shouldInitializeNewActiveTasks() { public void shouldInitializeNewStandbyTasks() { final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, false); - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); expect(activeTaskCreator.createTasks(anyObject(), anyObject())).andStubReturn(Collections.emptySet()); expect(standbyTaskCreator.createTasks(eq(taskId01Assignment))).andStubReturn(singletonList(task01)); - replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator, consumer); taskManager.handleAssignment(emptyMap(), taskId01Assignment); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); @@ -3492,7 +3481,7 @@ public void shouldCommitActiveAndStandbyTasks() { task00.setCommittableOffsetsAndMetadata(offsets); final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, false); - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))) .andStubReturn(singletonList(task00)); expect(standbyTaskCreator.createTasks(eq(taskId01Assignment))) @@ -3500,7 +3489,7 @@ public void shouldCommitActiveAndStandbyTasks() { consumer.commitSync(offsets); expectLastCall(); - replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator, consumer); taskManager.handleAssignment(taskId00Assignment, taskId01Assignment); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); @@ -3536,7 +3525,7 @@ public void shouldCommitProvidedTasksIfNeeded() { mkEntry(taskId05, taskId05Partitions) ); - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); expect(activeTaskCreator.createTasks(anyObject(), eq(assignmentActive))) .andStubReturn(Arrays.asList(task00, task01, task02)); expect(standbyTaskCreator.createTasks(eq(assignmentStandby))) @@ -3544,7 +3533,7 @@ public void shouldCommitProvidedTasksIfNeeded() { consumer.commitSync(eq(emptyMap())); - replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator, consumer); taskManager.handleAssignment(assignmentActive, assignmentStandby); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); @@ -3570,12 +3559,12 @@ public void shouldCommitProvidedTasksIfNeeded() { public void shouldNotCommitOffsetsIfOnlyStandbyTasksAssigned() { final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, false); - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); expect(activeTaskCreator.createTasks(anyObject(), anyObject())).andStubReturn(Collections.emptySet()); expect(standbyTaskCreator.createTasks(eq(taskId00Assignment))).andStubReturn(singletonList(task00)); expectLastCall(); - replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator, consumer); taskManager.handleAssignment(Collections.emptyMap(), taskId00Assignment); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); @@ -3595,13 +3584,13 @@ public void shouldNotCommitActiveAndStandbyTasksWhileRebalanceInProgress() throw makeTaskFolders(taskId00.toString(), task01.toString()); expectLockObtainedFor(taskId00, taskId01); - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))) .andStubReturn(singletonList(task00)); expect(standbyTaskCreator.createTasks(eq(taskId01Assignment))) .andStubReturn(singletonList(task01)); - replay(activeTaskCreator, standbyTaskCreator, stateDirectory, consumer, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator, stateDirectory, consumer); taskManager.handleAssignment(taskId00Assignment, taskId01Assignment); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); @@ -3710,10 +3699,10 @@ public Map prepareCommit() { } }; - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00)); expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet()); - replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator, consumer); taskManager.handleAssignment(taskId00Assignment, emptyMap()); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); @@ -3736,11 +3725,11 @@ public Map prepareCommit() { } }; - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); expect(activeTaskCreator.createTasks(anyObject(), anyObject())).andStubReturn(Collections.emptySet()); expect(standbyTaskCreator.createTasks(eq(taskId01Assignment))).andStubReturn(singletonList(task01)); - replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator, consumer); taskManager.handleAssignment(emptyMap(), taskId01Assignment); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); @@ -3771,11 +3760,11 @@ public Map purgeableOffsets() { } }; - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00)); expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet()); - replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator, consumer); taskManager.handleAssignment(taskId00Assignment, emptyMap()); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); @@ -3807,10 +3796,10 @@ public Map purgeableOffsets() { } }; - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00)); expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet()); - replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator, consumer); taskManager.handleAssignment(taskId00Assignment, emptyMap()); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); @@ -3833,14 +3822,14 @@ public Map purgeableOffsets() { public void shouldIgnorePurgeDataErrors() { final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true); - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); final KafkaFutureImpl futureDeletedRecords = new KafkaFutureImpl<>(); final DeleteRecordsResult deleteRecordsResult = new DeleteRecordsResult(singletonMap(t1p1, futureDeletedRecords)); futureDeletedRecords.completeExceptionally(new Exception("KABOOM!")); expect(adminClient.deleteRecords(anyObject())).andReturn(deleteRecordsResult).times(2); - replay(activeTaskCreator, adminClient, consumer, changeLogReader); + replay(activeTaskCreator, adminClient, consumer); taskManager.addTask(task00); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); @@ -3884,7 +3873,7 @@ public void shouldMaybeCommitAllActiveTasksThatNeedCommit() { mkEntry(taskId10, taskId10Partitions) ); - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); expect(activeTaskCreator.createTasks(anyObject(), eq(assignmentActive))) .andStubReturn(asList(task00, task01, task02, task03)); expect(standbyTaskCreator.createTasks(eq(assignmentStandby))) @@ -3892,7 +3881,7 @@ public void shouldMaybeCommitAllActiveTasksThatNeedCommit() { consumer.commitSync(expectedCommittedOffsets); expectLastCall(); - replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator, consumer); taskManager.handleAssignment(assignmentActive, assignmentStandby); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); @@ -3928,10 +3917,10 @@ public void shouldProcessActiveTasks() { assignment.put(taskId00, taskId00Partitions); assignment.put(taskId01, taskId01Partitions); - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(Arrays.asList(task00, task01)); expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet()); - replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator, consumer); taskManager.handleAssignment(assignment, emptyMap()); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); @@ -4041,10 +4030,10 @@ public boolean process(final long wallClockTime) { } }; - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00)); expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet()); - replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator, consumer); taskManager.handleAssignment(taskId00Assignment, emptyMap()); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); @@ -4066,11 +4055,11 @@ public boolean process(final long wallClockTime) { } }; - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))) .andStubReturn(singletonList(task00)); expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet()); - replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator, consumer); taskManager.handleAssignment(taskId00Assignment, emptyMap()); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); @@ -4095,10 +4084,10 @@ public boolean maybePunctuateStreamTime() { } }; - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00)); expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet()); - replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator, consumer); taskManager.handleAssignment(taskId00Assignment, emptyMap()); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); @@ -4117,10 +4106,10 @@ public boolean maybePunctuateStreamTime() { } }; - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00)); expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet()); - replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator, consumer); taskManager.handleAssignment(taskId00Assignment, emptyMap()); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); @@ -4144,12 +4133,12 @@ public boolean maybePunctuateSystemTime() { } }; - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))) .andStubReturn(singletonList(task00)); expect(standbyTaskCreator.createTasks(anyObject())) .andStubReturn(Collections.emptySet()); - replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator, consumer); taskManager.handleAssignment(taskId00Assignment, emptyMap()); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); @@ -4169,10 +4158,9 @@ public Set changelogPartitions() { } }; - expect(changeLogReader.completedChangelogs()).andReturn(emptySet()); expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00)); expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet()); - replay(activeTaskCreator, standbyTaskCreator, changeLogReader, consumer); + replay(activeTaskCreator, standbyTaskCreator, consumer); taskManager.handleAssignment(taskId00Assignment, emptyMap()); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(false)); @@ -4188,13 +4176,13 @@ public void shouldHaveRemainingPartitionsUncleared() { final Map offsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); task00.setCommittableOffsetsAndMetadata(offsets); - expectRestoreToBeCompleted(consumer, changeLogReader); + expectRestoreToBeCompleted(consumer); expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andReturn(singletonList(task00)); expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet()); consumer.commitSync(offsets); expectLastCall(); - replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + replay(activeTaskCreator, standbyTaskCreator, consumer); try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(TaskManager.class)) { LogCaptureAppender.setClassLoggerToDebug(TaskManager.class); @@ -4351,8 +4339,8 @@ private Map handleAssignment(final Map consumer, - final ChangelogReader changeLogReader) { - expectRestoreToBeCompleted(consumer, changeLogReader, true); - } - - private static void expectRestoreToBeCompleted(final Consumer consumer, - final ChangelogReader changeLogReader, - final boolean changeLogUpdateRequired) { + private static void expectRestoreToBeCompleted(final Consumer consumer) { final Set assignment = singleton(new TopicPartition("assignment", 0)); expect(consumer.assignment()).andReturn(assignment); consumer.resume(assignment); expectLastCall(); - expect(changeLogReader.completedChangelogs()).andReturn(emptySet()).times(changeLogUpdateRequired ? 1 : 0, 1); } private static KafkaFutureImpl completedFuture() { From 1a9f6e770738a1143445385d9040e91daabf9de1 Mon Sep 17 00:00:00 2001 From: Christo Lolov Date: Tue, 2 May 2023 12:00:10 +0100 Subject: [PATCH 2/2] Address Bruno's comments --- .../kafka/streams/processor/internals/TaskManagerTest.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 0918c3b2ad8c..48d046e1b903 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -2591,7 +2591,6 @@ public void shouldAddNewActiveTasks() { expect(consumer.assignment()).andReturn(emptySet()); consumer.resume(eq(emptySet())); expectLastCall(); - expectLastCall(); expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(singletonList(task00)); expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andStubReturn(emptyList()); replay(consumer, activeTaskCreator, standbyTaskCreator); @@ -2633,7 +2632,6 @@ public void initializeIfNeeded() { expect(consumer.assignment()).andReturn(emptySet()); consumer.resume(eq(emptySet())); expectLastCall(); - expectLastCall(); expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(asList(task00, task01)); expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andStubReturn(emptyList()); replay(consumer, activeTaskCreator, standbyTaskCreator); @@ -3053,6 +3051,7 @@ public void closeDirty() { ); assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); Mockito.verify(changeLogReader).enforceRestoreActive(); + Mockito.verify(changeLogReader).completedChangelogs(); final RuntimeException exception = assertThrows( RuntimeException.class, @@ -3113,6 +3112,7 @@ public Set changelogPartitions() { ); assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); Mockito.verify(changeLogReader).enforceRestoreActive(); + Mockito.verify(changeLogReader).completedChangelogs(); final RuntimeException exception = assertThrows(RuntimeException.class, () -> taskManager.shutdown(true)); @@ -3162,6 +3162,7 @@ public Set changelogPartitions() { ); assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); Mockito.verify(changeLogReader).enforceRestoreActive(); + Mockito.verify(changeLogReader).completedChangelogs(); final RuntimeException exception = assertThrows(RuntimeException.class, () -> taskManager.shutdown(true)); @@ -3295,6 +3296,7 @@ public void suspend() { ); assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); Mockito.verify(changeLogReader).enforceRestoreActive(); + Mockito.verify(changeLogReader).completedChangelogs(); taskManager.shutdown(false);