From fdb6da1379e2337757503f4dac5acca6cf0c71b0 Mon Sep 17 00:00:00 2001 From: Christo Lolov Date: Thu, 20 Jul 2023 17:16:18 +0100 Subject: [PATCH] KAFKA-14133: Migrate various mocks in TaskManagerTest to Mockito (#13874) Reviewers: Divij Vaidya --- .../processor/internals/TaskManagerTest.java | 97 ++++++++----------- 1 file changed, 41 insertions(+), 56 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 22da72feecdc..3c626a8adba2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -2014,10 +2014,7 @@ public void shouldReAddRevivedTasksToStateUpdater() { @Test public void shouldReviveCorruptTasks() { - final ProcessorStateManager stateManager = EasyMock.createStrictMock(ProcessorStateManager.class); - stateManager.markChangelogAsCorrupted(taskId00Partitions); - EasyMock.expectLastCall().once(); - replay(stateManager); + final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class); final AtomicBoolean enforcedCheckpoint = new AtomicBoolean(false); final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { @@ -2050,15 +2047,13 @@ public void postCommit(final boolean enforceCheckpoint) { assertThat(taskManager.activeTaskMap(), is(singletonMap(taskId00, task00))); assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); - verify(stateManager); verify(consumer); + Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions); } @Test public void shouldReviveCorruptTasksEvenIfTheyCannotCloseClean() { - final ProcessorStateManager stateManager = EasyMock.createStrictMock(ProcessorStateManager.class); - stateManager.markChangelogAsCorrupted(taskId00Partitions); - replay(stateManager); + final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class); final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { @Override @@ -2085,15 +2080,13 @@ public void suspend() { assertThat(taskManager.activeTaskMap(), is(singletonMap(taskId00, task00))); assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); - verify(stateManager); verify(consumer); + Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions); } @Test public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() { - final ProcessorStateManager stateManager = EasyMock.createStrictMock(ProcessorStateManager.class); - stateManager.markChangelogAsCorrupted(taskId00Partitions); - replay(stateManager); + final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class); final StateMachineTask corruptedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); final StateMachineTask nonCorruptedTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager); @@ -2125,13 +2118,12 @@ public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() { assertThat(corruptedTask.partitionsForOffsetReset, equalTo(taskId00Partitions)); verify(consumer); + Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions); } @Test public void shouldNotCommitNonRunningNonCorruptedTasks() { - final ProcessorStateManager stateManager = EasyMock.createStrictMock(ProcessorStateManager.class); - stateManager.markChangelogAsCorrupted(taskId00Partitions); - replay(stateManager); + final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class); final StateMachineTask corruptedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); final StateMachineTask nonRunningNonCorruptedTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager); @@ -2158,13 +2150,12 @@ public void shouldNotCommitNonRunningNonCorruptedTasks() { assertFalse(nonRunningNonCorruptedTask.commitPrepared); verify(consumer); + Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions); } @Test public void shouldCleanAndReviveCorruptedStandbyTasksBeforeCommittingNonCorruptedTasks() { - final ProcessorStateManager stateManager = EasyMock.createStrictMock(ProcessorStateManager.class); - stateManager.markChangelogAsCorrupted(taskId00Partitions); - replay(stateManager); + final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class); final StateMachineTask corruptedStandby = new StateMachineTask(taskId00, taskId00Partitions, false, stateManager); final StateMachineTask runningNonCorruptedActive = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) { @@ -2199,11 +2190,12 @@ public Map prepareCommit() { assertThat(corruptedStandby.commitPrepared, is(true)); assertThat(corruptedStandby.state(), is(Task.State.CREATED)); verify(consumer); + Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions); } @Test public void shouldNotAttemptToCommitInHandleCorruptedDuringARebalance() { - final ProcessorStateManager stateManager = EasyMock.createNiceMock(ProcessorStateManager.class); + final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class); expect(stateDirectory.listNonEmptyTaskDirectories()).andStubReturn(new ArrayList<>()); final StateMachineTask corruptedActive = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); @@ -2224,7 +2216,7 @@ public void shouldNotAttemptToCommitInHandleCorruptedDuringARebalance() { expect(consumer.assignment()).andStubReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions)); - replay(consumer, stateDirectory, stateManager); + replay(consumer, stateDirectory); uncorruptedActive.setCommittableOffsetsAndMetadata(offsets); @@ -2250,10 +2242,8 @@ public void shouldNotAttemptToCommitInHandleCorruptedDuringARebalance() { } @Test - public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitWithALSO() { - final ProcessorStateManager stateManager = EasyMock.createStrictMock(ProcessorStateManager.class); - stateManager.markChangelogAsCorrupted(taskId00Partitions); - replay(stateManager); + public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitWithALOS() { + final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class); final StateMachineTask corruptedActive = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); final StateMachineTask uncorruptedActive = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) { @@ -2310,6 +2300,7 @@ public void markChangelogAsCorrupted(final Collection partitions assertThat(corruptedActive.state(), is(Task.State.CREATED)); assertThat(uncorruptedActive.state(), is(Task.State.CREATED)); verify(consumer); + Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions); } @Test @@ -2317,7 +2308,7 @@ public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCo final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false); final StreamsProducer producer = mock(StreamsProducer.class); when(activeTaskCreator.threadProducer()).thenReturn(producer); - final ProcessorStateManager stateManager = EasyMock.createMock(ProcessorStateManager.class); + final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class); final AtomicBoolean corruptedTaskChangelogMarkedAsCorrupted = new AtomicBoolean(false); final StateMachineTask corruptedActiveTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { @@ -2327,7 +2318,6 @@ public void markChangelogAsCorrupted(final Collection partitions corruptedTaskChangelogMarkedAsCorrupted.set(true); } }; - stateManager.markChangelogAsCorrupted(taskId00ChangelogPartitions); final AtomicBoolean uncorruptedTaskChangelogMarkedAsCorrupted = new AtomicBoolean(false); final StateMachineTask uncorruptedActiveTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) { @@ -2339,7 +2329,6 @@ public void markChangelogAsCorrupted(final Collection partitions }; final Map offsets = singletonMap(t1p1, new OffsetAndMetadata(0L, null)); uncorruptedActiveTask.setCommittableOffsetsAndMetadata(offsets); - stateManager.markChangelogAsCorrupted(taskId01ChangelogPartitions); // handleAssignment final Map> assignment = new HashMap<>(); @@ -2357,7 +2346,7 @@ public void markChangelogAsCorrupted(final Collection partitions expect(consumer.assignment()).andStubReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions)); - replay(consumer, stateManager); + replay(consumer); taskManager.handleAssignment(assignment, emptyMap()); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); @@ -2394,6 +2383,8 @@ public void markChangelogAsCorrupted(final Collection partitions assertThat(corruptedTaskChangelogMarkedAsCorrupted.get(), is(true)); assertThat(uncorruptedTaskChangelogMarkedAsCorrupted.get(), is(true)); verify(consumer); + Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00ChangelogPartitions); + Mockito.verify(stateManager).markChangelogAsCorrupted(taskId01ChangelogPartitions); } @Test @@ -2454,7 +2445,7 @@ public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCo final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false); final StreamsProducer producer = mock(StreamsProducer.class); when(activeTaskCreator.threadProducer()).thenReturn(producer); - final ProcessorStateManager stateManager = EasyMock.createMock(ProcessorStateManager.class); + final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class); final StateMachineTask revokedActiveTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); final Map revokedActiveTaskOffsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); @@ -2479,9 +2470,6 @@ public void markChangelogAsCorrupted(final Collection partitions expectedCommittedOffsets.putAll(revokedActiveTaskOffsets); expectedCommittedOffsets.putAll(unrevokedTaskOffsets); - stateManager.markChangelogAsCorrupted(taskId00ChangelogPartitions); - stateManager.markChangelogAsCorrupted(taskId01ChangelogPartitions); - final Map> assignmentActive = mkMap( mkEntry(taskId00, taskId00Partitions), mkEntry(taskId01, taskId01Partitions), @@ -2500,7 +2488,7 @@ public void markChangelogAsCorrupted(final Collection partitions expect(consumer.assignment()).andStubReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions, taskId02Partitions)); - replay(consumer, stateManager); + replay(consumer); taskManager.handleAssignment(assignmentActive, emptyMap()); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); @@ -2519,6 +2507,8 @@ public void markChangelogAsCorrupted(final Collection partitions assertThat(revokedActiveTask.state(), is(State.SUSPENDED)); assertThat(unrevokedActiveTask.state(), is(State.CREATED)); assertThat(unrevokedActiveTaskWithoutCommitNeeded.state(), is(State.RUNNING)); + Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00ChangelogPartitions); + Mockito.verify(stateManager).markChangelogAsCorrupted(taskId01ChangelogPartitions); } @Test @@ -3594,24 +3584,23 @@ public void shouldCommitViaConsumerIfEosDisabled() { @Test public void shouldCommitViaProducerIfEosAlphaEnabled() { - final StreamsProducer producer = EasyMock.mock(StreamsProducer.class); + final StreamsProducer producer = Mockito.mock(StreamsProducer.class); when(activeTaskCreator.streamsProducerForTask(any(TaskId.class))) .thenReturn(producer); final Map offsetsT01 = singletonMap(t1p1, new OffsetAndMetadata(0L, null)); final Map offsetsT02 = singletonMap(t1p2, new OffsetAndMetadata(1L, null)); - producer.commitTransaction(offsetsT01, new ConsumerGroupMetadata("appId")); - expectLastCall(); - producer.commitTransaction(offsetsT02, new ConsumerGroupMetadata("appId")); - expectLastCall(); + shouldCommitViaProducerIfEosEnabled(ProcessingMode.EXACTLY_ONCE_ALPHA, offsetsT01, offsetsT02); - shouldCommitViaProducerIfEosEnabled(ProcessingMode.EXACTLY_ONCE_ALPHA, producer, offsetsT01, offsetsT02); + Mockito.verify(producer).commitTransaction(offsetsT01, new ConsumerGroupMetadata("appId")); + Mockito.verify(producer).commitTransaction(offsetsT02, new ConsumerGroupMetadata("appId")); + Mockito.verifyNoMoreInteractions(producer); } @Test public void shouldCommitViaProducerIfEosV2Enabled() { - final StreamsProducer producer = EasyMock.mock(StreamsProducer.class); + final StreamsProducer producer = Mockito.mock(StreamsProducer.class); when(activeTaskCreator.threadProducer()).thenReturn(producer); final Map offsetsT01 = singletonMap(t1p1, new OffsetAndMetadata(0L, null)); @@ -3620,14 +3609,13 @@ public void shouldCommitViaProducerIfEosV2Enabled() { allOffsets.putAll(offsetsT01); allOffsets.putAll(offsetsT02); - producer.commitTransaction(allOffsets, new ConsumerGroupMetadata("appId")); - expectLastCall(); + shouldCommitViaProducerIfEosEnabled(ProcessingMode.EXACTLY_ONCE_V2, offsetsT01, offsetsT02); - shouldCommitViaProducerIfEosEnabled(ProcessingMode.EXACTLY_ONCE_V2, producer, offsetsT01, offsetsT02); + Mockito.verify(producer).commitTransaction(allOffsets, new ConsumerGroupMetadata("appId")); + Mockito.verifyNoMoreInteractions(producer); } private void shouldCommitViaProducerIfEosEnabled(final ProcessingMode processingMode, - final StreamsProducer producer, final Map offsetsT01, final Map offsetsT02) { final TaskManager taskManager = setUpTaskManager(processingMode, false); @@ -3643,11 +3631,11 @@ private void shouldCommitViaProducerIfEosEnabled(final ProcessingMode processing reset(consumer); expect(consumer.groupMetadata()).andStubReturn(new ConsumerGroupMetadata("appId")); - replay(consumer, producer); + replay(consumer); taskManager.commitAll(); - verify(producer, consumer); + verify(consumer); } @Test @@ -4533,21 +4521,18 @@ public void suspend() { @Test public void shouldConvertActiveTaskToStandbyTask() { - final StreamTask activeTask = EasyMock.mock(StreamTask.class); - expect(activeTask.id()).andStubReturn(taskId00); - expect(activeTask.inputPartitions()).andStubReturn(taskId00Partitions); - expect(activeTask.isActive()).andStubReturn(true); - expect(activeTask.prepareCommit()).andStubReturn(Collections.emptyMap()); + final StreamTask activeTask = Mockito.mock(StreamTask.class); + when(activeTask.id()).thenReturn(taskId00); + when(activeTask.inputPartitions()).thenReturn(taskId00Partitions); + when(activeTask.isActive()).thenReturn(true); - final StandbyTask standbyTask = EasyMock.mock(StandbyTask.class); - expect(standbyTask.id()).andStubReturn(taskId00); + final StandbyTask standbyTask = Mockito.mock(StandbyTask.class); + when(standbyTask.id()).thenReturn(taskId00); when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(activeTask)); - activeTask.prepareRecycle(); - expectLastCall().once(); when(standbyTaskCreator.createStandbyTaskFromActive(Mockito.any(), Mockito.eq(taskId00Partitions))).thenReturn(standbyTask); - replay(activeTask, standbyTask, consumer); + replay(consumer); taskManager.handleAssignment(taskId00Assignment, Collections.emptyMap()); taskManager.handleAssignment(Collections.emptyMap(), taskId00Assignment);