Skip to content

Commit

Permalink
KAFKA-14133: Migrate ActiveTaskCreatorTest, ChangelogTopicsTest and G…
Browse files Browse the repository at this point in the history
…lobalProcessorContextImplTest to Mockito (#14209)

Reviewers: Divij Vaidya <diviv@amazon.com>
  • Loading branch information
clolov committed Aug 16, 2023
1 parent cfe49d1 commit d0e9e94
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,10 @@
import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.MockClientSupplier;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.easymock.MockType;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import java.io.File;
import java.util.Collections;
Expand All @@ -50,26 +49,24 @@
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.mock;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.reset;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.core.IsNot.not;
import static org.junit.Assert.assertThrows;
import static java.util.Collections.emptySet;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@RunWith(EasyMockRunner.class)
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class ActiveTaskCreatorTest {

@Mock(type = MockType.NICE)
@Mock
private InternalTopologyBuilder builder;
@Mock(type = MockType.NICE)
@Mock
private StateDirectory stateDirectory;
@Mock(type = MockType.NICE)
@Mock
private ChangelogReader changeLogReader;

private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
Expand Down Expand Up @@ -476,21 +473,16 @@ private void createTasks() {
final ProcessorTopology topology = mock(ProcessorTopology.class);
final SourceNode sourceNode = mock(SourceNode.class);

reset(builder, stateDirectory);
expect(builder.topologyConfigs()).andStubReturn(new TopologyConfig(new StreamsConfig(properties)));
expect(builder.buildSubtopology(0)).andReturn(topology).anyTimes();
expect(topology.sinkTopics()).andStubReturn(emptySet());
expect(stateDirectory.getOrCreateDirectoryForTask(task00)).andReturn(mock(File.class));
expect(stateDirectory.checkpointFileFor(task00)).andReturn(mock(File.class));
expect(stateDirectory.getOrCreateDirectoryForTask(task01)).andReturn(mock(File.class));
expect(stateDirectory.checkpointFileFor(task01)).andReturn(mock(File.class));
expect(topology.storeToChangelogTopic()).andReturn(Collections.emptyMap()).anyTimes();
expect(topology.source("topic")).andReturn(sourceNode).anyTimes();
expect(sourceNode.getTimestampExtractor()).andReturn(mock(TimestampExtractor.class)).anyTimes();
expect(topology.globalStateStores()).andReturn(Collections.emptyList()).anyTimes();
expect(topology.terminalNodes()).andStubReturn(Collections.singleton(sourceNode.name()));
expect(topology.sources()).andStubReturn(Collections.singleton(sourceNode));
replay(builder, stateDirectory, topology, sourceNode);
when(builder.topologyConfigs()).thenReturn(new TopologyConfig(new StreamsConfig(properties)));
when(builder.buildSubtopology(0)).thenReturn(topology);
when(topology.sinkTopics()).thenReturn(emptySet());
when(stateDirectory.getOrCreateDirectoryForTask(task00)).thenReturn(mock(File.class));
when(stateDirectory.checkpointFileFor(task00)).thenReturn(mock(File.class));
when(stateDirectory.getOrCreateDirectoryForTask(task01)).thenReturn(mock(File.class));
when(stateDirectory.checkpointFileFor(task01)).thenReturn(mock(File.class));
when(topology.source("topic")).thenReturn(sourceNode);
when(sourceNode.getTimestampExtractor()).thenReturn(mock(TimestampExtractor.class));
when(topology.sources()).thenReturn(Collections.singleton(sourceNode));

final StreamsConfig config = new StreamsConfig(properties);
activeTaskCreator = new ActiveTaskCreator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;

import java.util.Collections;
import java.util.Map;
Expand All @@ -32,13 +34,12 @@
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_0;

import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.mock;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class ChangelogTopicsTest {

private static final String SOURCE_TOPIC_NAME = "source";
Expand Down Expand Up @@ -83,16 +84,14 @@ public class ChangelogTopicsTest {

@Test
public void shouldNotContainChangelogsForStatelessTasks() {
expect(internalTopicManager.makeReady(Collections.emptyMap())).andStubReturn(Collections.emptySet());
when(internalTopicManager.makeReady(Collections.emptyMap())).thenReturn(Collections.emptySet());
final Map<Subtopology, TopicsInfo> topicGroups = mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO2));
final Map<Subtopology, Set<TaskId>> tasksForTopicGroup = mkMap(mkEntry(SUBTOPOLOGY_0, mkSet(TASK_0_0, TASK_0_1, TASK_0_2)));
replay(internalTopicManager);

final ChangelogTopics changelogTopics =
new ChangelogTopics(internalTopicManager, topicGroups, tasksForTopicGroup, "[test] ");
changelogTopics.setup();

verify(internalTopicManager);
assertThat(changelogTopics.preExistingPartitionsFor(TASK_0_0), is(Collections.emptySet()));
assertThat(changelogTopics.preExistingPartitionsFor(TASK_0_1), is(Collections.emptySet()));
assertThat(changelogTopics.preExistingPartitionsFor(TASK_0_2), is(Collections.emptySet()));
Expand All @@ -102,18 +101,16 @@ public void shouldNotContainChangelogsForStatelessTasks() {

@Test
public void shouldNotContainAnyPreExistingChangelogsIfChangelogIsNewlyCreated() {
expect(internalTopicManager.makeReady(mkMap(mkEntry(CHANGELOG_TOPIC_NAME1, CHANGELOG_TOPIC_CONFIG))))
.andStubReturn(mkSet(CHANGELOG_TOPIC_NAME1));
when(internalTopicManager.makeReady(mkMap(mkEntry(CHANGELOG_TOPIC_NAME1, CHANGELOG_TOPIC_CONFIG))))
.thenReturn(mkSet(CHANGELOG_TOPIC_NAME1));
final Map<Subtopology, TopicsInfo> topicGroups = mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1));
final Set<TaskId> tasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
final Map<Subtopology, Set<TaskId>> tasksForTopicGroup = mkMap(mkEntry(SUBTOPOLOGY_0, tasks));
replay(internalTopicManager);

final ChangelogTopics changelogTopics =
new ChangelogTopics(internalTopicManager, topicGroups, tasksForTopicGroup, "[test] ");
changelogTopics.setup();

verify(internalTopicManager);
assertThat(CHANGELOG_TOPIC_CONFIG.numberOfPartitions().orElse(Integer.MIN_VALUE), is(3));
assertThat(changelogTopics.preExistingPartitionsFor(TASK_0_0), is(Collections.emptySet()));
assertThat(changelogTopics.preExistingPartitionsFor(TASK_0_1), is(Collections.emptySet()));
Expand All @@ -124,18 +121,16 @@ public void shouldNotContainAnyPreExistingChangelogsIfChangelogIsNewlyCreated()

@Test
public void shouldOnlyContainPreExistingNonSourceBasedChangelogs() {
expect(internalTopicManager.makeReady(mkMap(mkEntry(CHANGELOG_TOPIC_NAME1, CHANGELOG_TOPIC_CONFIG))))
.andStubReturn(Collections.emptySet());
when(internalTopicManager.makeReady(mkMap(mkEntry(CHANGELOG_TOPIC_NAME1, CHANGELOG_TOPIC_CONFIG))))
.thenReturn(Collections.emptySet());
final Map<Subtopology, TopicsInfo> topicGroups = mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1));
final Set<TaskId> tasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
final Map<Subtopology, Set<TaskId>> tasksForTopicGroup = mkMap(mkEntry(SUBTOPOLOGY_0, tasks));
replay(internalTopicManager);

final ChangelogTopics changelogTopics =
new ChangelogTopics(internalTopicManager, topicGroups, tasksForTopicGroup, "[test] ");
changelogTopics.setup();

verify(internalTopicManager);
assertThat(CHANGELOG_TOPIC_CONFIG.numberOfPartitions().orElse(Integer.MIN_VALUE), is(3));
final TopicPartition changelogPartition0 = new TopicPartition(CHANGELOG_TOPIC_NAME1, 0);
final TopicPartition changelogPartition1 = new TopicPartition(CHANGELOG_TOPIC_NAME1, 1);
Expand All @@ -152,17 +147,15 @@ public void shouldOnlyContainPreExistingNonSourceBasedChangelogs() {

@Test
public void shouldOnlyContainPreExistingSourceBasedChangelogs() {
expect(internalTopicManager.makeReady(Collections.emptyMap())).andStubReturn(Collections.emptySet());
when(internalTopicManager.makeReady(Collections.emptyMap())).thenReturn(Collections.emptySet());
final Map<Subtopology, TopicsInfo> topicGroups = mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO3));
final Set<TaskId> tasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
final Map<Subtopology, Set<TaskId>> tasksForTopicGroup = mkMap(mkEntry(SUBTOPOLOGY_0, tasks));
replay(internalTopicManager);

final ChangelogTopics changelogTopics =
new ChangelogTopics(internalTopicManager, topicGroups, tasksForTopicGroup, "[test] ");
changelogTopics.setup();

verify(internalTopicManager);
final TopicPartition changelogPartition0 = new TopicPartition(SOURCE_TOPIC_NAME, 0);
final TopicPartition changelogPartition1 = new TopicPartition(SOURCE_TOPIC_NAME, 1);
final TopicPartition changelogPartition2 = new TopicPartition(SOURCE_TOPIC_NAME, 2);
Expand All @@ -178,18 +171,16 @@ public void shouldOnlyContainPreExistingSourceBasedChangelogs() {

@Test
public void shouldContainBothTypesOfPreExistingChangelogs() {
expect(internalTopicManager.makeReady(mkMap(mkEntry(CHANGELOG_TOPIC_NAME1, CHANGELOG_TOPIC_CONFIG))))
.andStubReturn(Collections.emptySet());
when(internalTopicManager.makeReady(mkMap(mkEntry(CHANGELOG_TOPIC_NAME1, CHANGELOG_TOPIC_CONFIG))))
.thenReturn(Collections.emptySet());
final Map<Subtopology, TopicsInfo> topicGroups = mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO4));
final Set<TaskId> tasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
final Map<Subtopology, Set<TaskId>> tasksForTopicGroup = mkMap(mkEntry(SUBTOPOLOGY_0, tasks));
replay(internalTopicManager);

final ChangelogTopics changelogTopics =
new ChangelogTopics(internalTopicManager, topicGroups, tasksForTopicGroup, "[test] ");
changelogTopics.setup();

verify(internalTopicManager);
assertThat(CHANGELOG_TOPIC_CONFIG.numberOfPartitions().orElse(Integer.MIN_VALUE), is(3));
final TopicPartition changelogPartition0 = new TopicPartition(CHANGELOG_TOPIC_NAME1, 0);
final TopicPartition changelogPartition1 = new TopicPartition(CHANGELOG_TOPIC_NAME1, 1);
Expand Down

0 comments on commit d0e9e94

Please sign in to comment.