Skip to content

Commit

Permalink
KAFKA-14133: Migrate various mocks in TaskManagerTest to Mockito (apa…
Browse files Browse the repository at this point in the history
…che#13874)

Reviewers: Divij Vaidya <diviv@amazon.com>
  • Loading branch information
clolov authored and jeqo committed Aug 15, 2023
1 parent e907db9 commit fdb6da1
Showing 1 changed file with 41 additions and 56 deletions.
Expand Up @@ -2014,10 +2014,7 @@ public void shouldReAddRevivedTasksToStateUpdater() {

@Test
public void shouldReviveCorruptTasks() {
final ProcessorStateManager stateManager = EasyMock.createStrictMock(ProcessorStateManager.class);
stateManager.markChangelogAsCorrupted(taskId00Partitions);
EasyMock.expectLastCall().once();
replay(stateManager);
final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);

final AtomicBoolean enforcedCheckpoint = new AtomicBoolean(false);
final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
Expand Down Expand Up @@ -2050,15 +2047,13 @@ public void postCommit(final boolean enforceCheckpoint) {
assertThat(taskManager.activeTaskMap(), is(singletonMap(taskId00, task00)));
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());

verify(stateManager);
verify(consumer);
Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
}

@Test
public void shouldReviveCorruptTasksEvenIfTheyCannotCloseClean() {
final ProcessorStateManager stateManager = EasyMock.createStrictMock(ProcessorStateManager.class);
stateManager.markChangelogAsCorrupted(taskId00Partitions);
replay(stateManager);
final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);

final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
@Override
Expand All @@ -2085,15 +2080,13 @@ public void suspend() {
assertThat(taskManager.activeTaskMap(), is(singletonMap(taskId00, task00)));
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());

verify(stateManager);
verify(consumer);
Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
}

@Test
public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
final ProcessorStateManager stateManager = EasyMock.createStrictMock(ProcessorStateManager.class);
stateManager.markChangelogAsCorrupted(taskId00Partitions);
replay(stateManager);
final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);

final StateMachineTask corruptedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
final StateMachineTask nonCorruptedTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
Expand Down Expand Up @@ -2125,13 +2118,12 @@ public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
assertThat(corruptedTask.partitionsForOffsetReset, equalTo(taskId00Partitions));

verify(consumer);
Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
}

@Test
public void shouldNotCommitNonRunningNonCorruptedTasks() {
final ProcessorStateManager stateManager = EasyMock.createStrictMock(ProcessorStateManager.class);
stateManager.markChangelogAsCorrupted(taskId00Partitions);
replay(stateManager);
final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);

final StateMachineTask corruptedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
final StateMachineTask nonRunningNonCorruptedTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
Expand All @@ -2158,13 +2150,12 @@ public void shouldNotCommitNonRunningNonCorruptedTasks() {

assertFalse(nonRunningNonCorruptedTask.commitPrepared);
verify(consumer);
Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
}

@Test
public void shouldCleanAndReviveCorruptedStandbyTasksBeforeCommittingNonCorruptedTasks() {
final ProcessorStateManager stateManager = EasyMock.createStrictMock(ProcessorStateManager.class);
stateManager.markChangelogAsCorrupted(taskId00Partitions);
replay(stateManager);
final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);

final StateMachineTask corruptedStandby = new StateMachineTask(taskId00, taskId00Partitions, false, stateManager);
final StateMachineTask runningNonCorruptedActive = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) {
Expand Down Expand Up @@ -2199,11 +2190,12 @@ public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
assertThat(corruptedStandby.commitPrepared, is(true));
assertThat(corruptedStandby.state(), is(Task.State.CREATED));
verify(consumer);
Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
}

@Test
public void shouldNotAttemptToCommitInHandleCorruptedDuringARebalance() {
final ProcessorStateManager stateManager = EasyMock.createNiceMock(ProcessorStateManager.class);
final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
expect(stateDirectory.listNonEmptyTaskDirectories()).andStubReturn(new ArrayList<>());

final StateMachineTask corruptedActive = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
Expand All @@ -2224,7 +2216,7 @@ public void shouldNotAttemptToCommitInHandleCorruptedDuringARebalance() {

expect(consumer.assignment()).andStubReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions));

replay(consumer, stateDirectory, stateManager);
replay(consumer, stateDirectory);

uncorruptedActive.setCommittableOffsetsAndMetadata(offsets);

Expand All @@ -2250,10 +2242,8 @@ public void shouldNotAttemptToCommitInHandleCorruptedDuringARebalance() {
}

@Test
public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitWithALSO() {
final ProcessorStateManager stateManager = EasyMock.createStrictMock(ProcessorStateManager.class);
stateManager.markChangelogAsCorrupted(taskId00Partitions);
replay(stateManager);
public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitWithALOS() {
final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);

final StateMachineTask corruptedActive = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
final StateMachineTask uncorruptedActive = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) {
Expand Down Expand Up @@ -2310,14 +2300,15 @@ public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions
assertThat(corruptedActive.state(), is(Task.State.CREATED));
assertThat(uncorruptedActive.state(), is(Task.State.CREATED));
verify(consumer);
Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
}

@Test
public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringHandleCorruptedWithEOS() {
final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
final StreamsProducer producer = mock(StreamsProducer.class);
when(activeTaskCreator.threadProducer()).thenReturn(producer);
final ProcessorStateManager stateManager = EasyMock.createMock(ProcessorStateManager.class);
final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);

final AtomicBoolean corruptedTaskChangelogMarkedAsCorrupted = new AtomicBoolean(false);
final StateMachineTask corruptedActiveTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
Expand All @@ -2327,7 +2318,6 @@ public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions
corruptedTaskChangelogMarkedAsCorrupted.set(true);
}
};
stateManager.markChangelogAsCorrupted(taskId00ChangelogPartitions);

final AtomicBoolean uncorruptedTaskChangelogMarkedAsCorrupted = new AtomicBoolean(false);
final StateMachineTask uncorruptedActiveTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) {
Expand All @@ -2339,7 +2329,6 @@ public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions
};
final Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p1, new OffsetAndMetadata(0L, null));
uncorruptedActiveTask.setCommittableOffsetsAndMetadata(offsets);
stateManager.markChangelogAsCorrupted(taskId01ChangelogPartitions);

// handleAssignment
final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
Expand All @@ -2357,7 +2346,7 @@ public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions

expect(consumer.assignment()).andStubReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions));

replay(consumer, stateManager);
replay(consumer);

taskManager.handleAssignment(assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
Expand Down Expand Up @@ -2394,6 +2383,8 @@ public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions
assertThat(corruptedTaskChangelogMarkedAsCorrupted.get(), is(true));
assertThat(uncorruptedTaskChangelogMarkedAsCorrupted.get(), is(true));
verify(consumer);
Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00ChangelogPartitions);
Mockito.verify(stateManager).markChangelogAsCorrupted(taskId01ChangelogPartitions);
}

@Test
Expand Down Expand Up @@ -2454,7 +2445,7 @@ public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCo
final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
final StreamsProducer producer = mock(StreamsProducer.class);
when(activeTaskCreator.threadProducer()).thenReturn(producer);
final ProcessorStateManager stateManager = EasyMock.createMock(ProcessorStateManager.class);
final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);

final StateMachineTask revokedActiveTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
final Map<TopicPartition, OffsetAndMetadata> revokedActiveTaskOffsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null));
Expand All @@ -2479,9 +2470,6 @@ public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions
expectedCommittedOffsets.putAll(revokedActiveTaskOffsets);
expectedCommittedOffsets.putAll(unrevokedTaskOffsets);

stateManager.markChangelogAsCorrupted(taskId00ChangelogPartitions);
stateManager.markChangelogAsCorrupted(taskId01ChangelogPartitions);

final Map<TaskId, Set<TopicPartition>> assignmentActive = mkMap(
mkEntry(taskId00, taskId00Partitions),
mkEntry(taskId01, taskId01Partitions),
Expand All @@ -2500,7 +2488,7 @@ public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions

expect(consumer.assignment()).andStubReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions, taskId02Partitions));

replay(consumer, stateManager);
replay(consumer);

taskManager.handleAssignment(assignmentActive, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
Expand All @@ -2519,6 +2507,8 @@ public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions
assertThat(revokedActiveTask.state(), is(State.SUSPENDED));
assertThat(unrevokedActiveTask.state(), is(State.CREATED));
assertThat(unrevokedActiveTaskWithoutCommitNeeded.state(), is(State.RUNNING));
Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00ChangelogPartitions);
Mockito.verify(stateManager).markChangelogAsCorrupted(taskId01ChangelogPartitions);
}

@Test
Expand Down Expand Up @@ -3594,24 +3584,23 @@ public void shouldCommitViaConsumerIfEosDisabled() {

@Test
public void shouldCommitViaProducerIfEosAlphaEnabled() {
final StreamsProducer producer = EasyMock.mock(StreamsProducer.class);
final StreamsProducer producer = Mockito.mock(StreamsProducer.class);
when(activeTaskCreator.streamsProducerForTask(any(TaskId.class)))
.thenReturn(producer);

final Map<TopicPartition, OffsetAndMetadata> offsetsT01 = singletonMap(t1p1, new OffsetAndMetadata(0L, null));
final Map<TopicPartition, OffsetAndMetadata> offsetsT02 = singletonMap(t1p2, new OffsetAndMetadata(1L, null));

producer.commitTransaction(offsetsT01, new ConsumerGroupMetadata("appId"));
expectLastCall();
producer.commitTransaction(offsetsT02, new ConsumerGroupMetadata("appId"));
expectLastCall();
shouldCommitViaProducerIfEosEnabled(ProcessingMode.EXACTLY_ONCE_ALPHA, offsetsT01, offsetsT02);

shouldCommitViaProducerIfEosEnabled(ProcessingMode.EXACTLY_ONCE_ALPHA, producer, offsetsT01, offsetsT02);
Mockito.verify(producer).commitTransaction(offsetsT01, new ConsumerGroupMetadata("appId"));
Mockito.verify(producer).commitTransaction(offsetsT02, new ConsumerGroupMetadata("appId"));
Mockito.verifyNoMoreInteractions(producer);
}

@Test
public void shouldCommitViaProducerIfEosV2Enabled() {
final StreamsProducer producer = EasyMock.mock(StreamsProducer.class);
final StreamsProducer producer = Mockito.mock(StreamsProducer.class);
when(activeTaskCreator.threadProducer()).thenReturn(producer);

final Map<TopicPartition, OffsetAndMetadata> offsetsT01 = singletonMap(t1p1, new OffsetAndMetadata(0L, null));
Expand All @@ -3620,14 +3609,13 @@ public void shouldCommitViaProducerIfEosV2Enabled() {
allOffsets.putAll(offsetsT01);
allOffsets.putAll(offsetsT02);

producer.commitTransaction(allOffsets, new ConsumerGroupMetadata("appId"));
expectLastCall();
shouldCommitViaProducerIfEosEnabled(ProcessingMode.EXACTLY_ONCE_V2, offsetsT01, offsetsT02);

shouldCommitViaProducerIfEosEnabled(ProcessingMode.EXACTLY_ONCE_V2, producer, offsetsT01, offsetsT02);
Mockito.verify(producer).commitTransaction(allOffsets, new ConsumerGroupMetadata("appId"));
Mockito.verifyNoMoreInteractions(producer);
}

private void shouldCommitViaProducerIfEosEnabled(final ProcessingMode processingMode,
final StreamsProducer producer,
final Map<TopicPartition, OffsetAndMetadata> offsetsT01,
final Map<TopicPartition, OffsetAndMetadata> offsetsT02) {
final TaskManager taskManager = setUpTaskManager(processingMode, false);
Expand All @@ -3643,11 +3631,11 @@ private void shouldCommitViaProducerIfEosEnabled(final ProcessingMode processing

reset(consumer);
expect(consumer.groupMetadata()).andStubReturn(new ConsumerGroupMetadata("appId"));
replay(consumer, producer);
replay(consumer);

taskManager.commitAll();

verify(producer, consumer);
verify(consumer);
}

@Test
Expand Down Expand Up @@ -4533,21 +4521,18 @@ public void suspend() {

@Test
public void shouldConvertActiveTaskToStandbyTask() {
final StreamTask activeTask = EasyMock.mock(StreamTask.class);
expect(activeTask.id()).andStubReturn(taskId00);
expect(activeTask.inputPartitions()).andStubReturn(taskId00Partitions);
expect(activeTask.isActive()).andStubReturn(true);
expect(activeTask.prepareCommit()).andStubReturn(Collections.emptyMap());
final StreamTask activeTask = Mockito.mock(StreamTask.class);
when(activeTask.id()).thenReturn(taskId00);
when(activeTask.inputPartitions()).thenReturn(taskId00Partitions);
when(activeTask.isActive()).thenReturn(true);

final StandbyTask standbyTask = EasyMock.mock(StandbyTask.class);
expect(standbyTask.id()).andStubReturn(taskId00);
final StandbyTask standbyTask = Mockito.mock(StandbyTask.class);
when(standbyTask.id()).thenReturn(taskId00);

when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(activeTask));
activeTask.prepareRecycle();
expectLastCall().once();
when(standbyTaskCreator.createStandbyTaskFromActive(Mockito.any(), Mockito.eq(taskId00Partitions))).thenReturn(standbyTask);

replay(activeTask, standbyTask, consumer);
replay(consumer);

taskManager.handleAssignment(taskId00Assignment, Collections.emptyMap());
taskManager.handleAssignment(Collections.emptyMap(), taskId00Assignment);
Expand Down

0 comments on commit fdb6da1

Please sign in to comment.