Skip to content

Commit

Permalink
KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams t…
Browse files Browse the repository at this point in the history
…ests (apache#12821)

Batch 1 of the tests detailed in https://issues.apache.org/jira/browse/KAFKA-14132 which use PowerMock/EasyMock and need to be moved to Mockito.

Reviewer: Bruno Cadonna <cadonna@apache.org>
  • Loading branch information
clolov authored and guozhangwang committed Jan 25, 2023
1 parent 52978a5 commit 24db020
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 273 deletions.
Expand Up @@ -31,8 +31,8 @@
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import java.util.Arrays;
import java.util.Collections;
Expand All @@ -47,22 +47,18 @@
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_0;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_1;

import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.mock;
import static org.easymock.EasyMock.niceMock;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@RunWith(PowerMockRunner.class)
@PrepareForTest({Cluster.class})
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class RepartitionTopicsTest {

private static final String SOURCE_TOPIC_NAME1 = "source1";
Expand Down Expand Up @@ -100,35 +96,36 @@ public class RepartitionTopicsTest {
);
final StreamsConfig config = new DummyStreamsConfig();

final InternalTopologyBuilder internalTopologyBuilder = mock(InternalTopologyBuilder.class);
final InternalTopicManager internalTopicManager = mock(InternalTopicManager.class);
final CopartitionedTopicsEnforcer copartitionedTopicsEnforcer = mock(CopartitionedTopicsEnforcer.class);
final Cluster clusterMetadata = niceMock(Cluster.class);
@Mock
InternalTopologyBuilder internalTopologyBuilder;
@Mock
InternalTopicManager internalTopicManager;
@Mock
CopartitionedTopicsEnforcer copartitionedTopicsEnforcer;
@Mock
Cluster clusterMetadata;

@Before
public void setup() {
expect(internalTopologyBuilder.hasNamedTopology()).andStubReturn(false);
expect(internalTopologyBuilder.topologyName()).andStubReturn(null);
when(internalTopologyBuilder.hasNamedTopology()).thenReturn(false);
when(internalTopologyBuilder.topologyName()).thenReturn(null);
}

@Test
public void shouldSetupRepartitionTopics() {
expect(internalTopologyBuilder.subtopologyToTopicsInfo())
.andReturn(mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1), mkEntry(SUBTOPOLOGY_1, TOPICS_INFO2)));
when(internalTopologyBuilder.subtopologyToTopicsInfo())
.thenReturn(mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1), mkEntry(SUBTOPOLOGY_1, TOPICS_INFO2)));
final Set<String> coPartitionGroup1 = mkSet(SOURCE_TOPIC_NAME1, SOURCE_TOPIC_NAME2);
final Set<String> coPartitionGroup2 = mkSet(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_NAME2);
final List<Set<String>> coPartitionGroups = Arrays.asList(coPartitionGroup1, coPartitionGroup2);
expect(internalTopologyBuilder.copartitionGroups()).andReturn(coPartitionGroups);
copartitionedTopicsEnforcer.enforce(eq(coPartitionGroup1), anyObject(), eq(clusterMetadata));
copartitionedTopicsEnforcer.enforce(eq(coPartitionGroup2), anyObject(), eq(clusterMetadata));
expect(internalTopicManager.makeReady(
when(internalTopologyBuilder.copartitionGroups()).thenReturn(coPartitionGroups);
when(internalTopicManager.makeReady(
mkMap(
mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_CONFIG1),
mkEntry(REPARTITION_TOPIC_NAME2, REPARTITION_TOPIC_CONFIG2)
))
).andReturn(Collections.emptySet());
).thenReturn(Collections.emptySet());
setupCluster();
replay(internalTopicManager, internalTopologyBuilder, clusterMetadata);
final RepartitionTopics repartitionTopics = new RepartitionTopics(
new TopologyMetadata(internalTopologyBuilder, config),
internalTopicManager,
Expand All @@ -139,7 +136,6 @@ public void shouldSetupRepartitionTopics() {

repartitionTopics.setup();

verify(internalTopicManager, internalTopologyBuilder);
final Map<TopicPartition, PartitionInfo> topicPartitionsInfo = repartitionTopics.topicPartitionsInfo();
assertThat(topicPartitionsInfo.size(), is(6));
verifyRepartitionTopicPartitionInfo(topicPartitionsInfo, REPARTITION_TOPIC_NAME1, 0);
Expand All @@ -151,22 +147,17 @@ public void shouldSetupRepartitionTopics() {

assertThat(repartitionTopics.topologiesWithMissingInputTopics().isEmpty(), is(true));
assertThat(repartitionTopics.missingSourceTopicExceptions().isEmpty(), is(true));

verify(copartitionedTopicsEnforcer).enforce(eq(coPartitionGroup1), any(), eq(clusterMetadata));
verify(copartitionedTopicsEnforcer).enforce(eq(coPartitionGroup2), any(), eq(clusterMetadata));
}

@Test
public void shouldReturnMissingSourceTopics() {
final Set<String> missingSourceTopics = mkSet(SOURCE_TOPIC_NAME1);
expect(internalTopologyBuilder.subtopologyToTopicsInfo())
.andReturn(mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1), mkEntry(SUBTOPOLOGY_1, TOPICS_INFO2)));
expect(internalTopologyBuilder.copartitionGroups()).andReturn(Collections.emptyList());
copartitionedTopicsEnforcer.enforce(eq(Collections.emptySet()), anyObject(), eq(clusterMetadata));
expect(internalTopicManager.makeReady(
mkMap(
mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_CONFIG1)
))
).andReturn(Collections.emptySet());
when(internalTopologyBuilder.subtopologyToTopicsInfo())
.thenReturn(mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1), mkEntry(SUBTOPOLOGY_1, TOPICS_INFO2)));
setupClusterWithMissingTopics(missingSourceTopics);
replay(internalTopicManager, internalTopologyBuilder, clusterMetadata);
final RepartitionTopics repartitionTopics = new RepartitionTopics(
new TopologyMetadata(internalTopologyBuilder, config),
internalTopicManager,
Expand All @@ -190,20 +181,12 @@ public void shouldReturnMissingSourceTopics() {
public void shouldThrowTaskAssignmentExceptionIfPartitionCountCannotBeComputedForAllRepartitionTopics() {
final RepartitionTopicConfig repartitionTopicConfigWithoutPartitionCount =
new RepartitionTopicConfig(REPARTITION_WITHOUT_PARTITION_COUNT, TOPIC_CONFIG5);
expect(internalTopologyBuilder.subtopologyToTopicsInfo())
.andReturn(mkMap(
when(internalTopologyBuilder.subtopologyToTopicsInfo())
.thenReturn(mkMap(
mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1),
mkEntry(SUBTOPOLOGY_1, setupTopicInfoWithRepartitionTopicWithoutPartitionCount(repartitionTopicConfigWithoutPartitionCount))
));
expect(internalTopologyBuilder.copartitionGroups()).andReturn(Collections.emptyList());
copartitionedTopicsEnforcer.enforce(eq(Collections.emptySet()), anyObject(), eq(clusterMetadata));
expect(internalTopicManager.makeReady(
mkMap(
mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_CONFIG1)
))
).andReturn(Collections.emptySet());
setupCluster();
replay(internalTopicManager, internalTopologyBuilder, clusterMetadata);
final RepartitionTopics repartitionTopics = new RepartitionTopics(
new TopologyMetadata(internalTopologyBuilder, config),
internalTopicManager,
Expand All @@ -230,20 +213,12 @@ public void shouldThrowTaskAssignmentExceptionIfSourceTopicHasNoPartitionCount()
),
Collections.emptyMap()
);
expect(internalTopologyBuilder.subtopologyToTopicsInfo())
.andReturn(mkMap(
when(internalTopologyBuilder.subtopologyToTopicsInfo())
.thenReturn(mkMap(
mkEntry(SUBTOPOLOGY_0, topicsInfo),
mkEntry(SUBTOPOLOGY_1, setupTopicInfoWithRepartitionTopicWithoutPartitionCount(repartitionTopicConfigWithoutPartitionCount))
));
expect(internalTopologyBuilder.copartitionGroups()).andReturn(Collections.emptyList());
copartitionedTopicsEnforcer.enforce(eq(Collections.emptySet()), anyObject(), eq(clusterMetadata));
expect(internalTopicManager.makeReady(
mkMap(
mkEntry(REPARTITION_WITHOUT_PARTITION_COUNT, repartitionTopicConfigWithoutPartitionCount)
))
).andReturn(Collections.emptySet());
setupClusterWithMissingPartitionCounts(mkSet(SOURCE_TOPIC_NAME1));
replay(internalTopicManager, internalTopologyBuilder, clusterMetadata);
final RepartitionTopics repartitionTopics = new RepartitionTopics(
new TopologyMetadata(internalTopologyBuilder, config),
internalTopicManager,
Expand Down Expand Up @@ -275,22 +250,20 @@ public void shouldSetRepartitionTopicPartitionCountFromUpstreamExternalSourceTop
),
Collections.emptyMap()
);
expect(internalTopologyBuilder.subtopologyToTopicsInfo())
.andReturn(mkMap(
when(internalTopologyBuilder.subtopologyToTopicsInfo())
.thenReturn(mkMap(
mkEntry(SUBTOPOLOGY_0, topicsInfo),
mkEntry(SUBTOPOLOGY_1, setupTopicInfoWithRepartitionTopicWithoutPartitionCount(repartitionTopicConfigWithoutPartitionCount))
));
expect(internalTopologyBuilder.copartitionGroups()).andReturn(Collections.emptyList());
copartitionedTopicsEnforcer.enforce(eq(Collections.emptySet()), anyObject(), eq(clusterMetadata));
expect(internalTopicManager.makeReady(
when(internalTopologyBuilder.copartitionGroups()).thenReturn(Collections.emptyList());
when(internalTopicManager.makeReady(
mkMap(
mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_CONFIG1),
mkEntry(REPARTITION_TOPIC_NAME2, REPARTITION_TOPIC_CONFIG2),
mkEntry(REPARTITION_WITHOUT_PARTITION_COUNT, repartitionTopicConfigWithoutPartitionCount)
))
).andReturn(Collections.emptySet());
).thenReturn(Collections.emptySet());
setupCluster();
replay(internalTopicManager, internalTopologyBuilder, clusterMetadata);
final RepartitionTopics repartitionTopics = new RepartitionTopics(
new TopologyMetadata(internalTopologyBuilder, config),
internalTopicManager,
Expand All @@ -301,7 +274,6 @@ public void shouldSetRepartitionTopicPartitionCountFromUpstreamExternalSourceTop

repartitionTopics.setup();

verify(internalTopicManager, internalTopologyBuilder);
final Map<TopicPartition, PartitionInfo> topicPartitionsInfo = repartitionTopics.topicPartitionsInfo();
assertThat(topicPartitionsInfo.size(), is(9));
verifyRepartitionTopicPartitionInfo(topicPartitionsInfo, REPARTITION_TOPIC_NAME1, 0);
Expand Down Expand Up @@ -332,22 +304,20 @@ public void shouldSetRepartitionTopicPartitionCountFromUpstreamInternalRepartiti
),
Collections.emptyMap()
);
expect(internalTopologyBuilder.subtopologyToTopicsInfo())
.andReturn(mkMap(
when(internalTopologyBuilder.subtopologyToTopicsInfo())
.thenReturn(mkMap(
mkEntry(SUBTOPOLOGY_0, topicsInfo),
mkEntry(SUBTOPOLOGY_1, setupTopicInfoWithRepartitionTopicWithoutPartitionCount(repartitionTopicConfigWithoutPartitionCount))
));
expect(internalTopologyBuilder.copartitionGroups()).andReturn(Collections.emptyList());
copartitionedTopicsEnforcer.enforce(eq(Collections.emptySet()), anyObject(), eq(clusterMetadata));
expect(internalTopicManager.makeReady(
when(internalTopologyBuilder.copartitionGroups()).thenReturn(Collections.emptyList());
when(internalTopicManager.makeReady(
mkMap(
mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_CONFIG1),
mkEntry(REPARTITION_TOPIC_NAME2, REPARTITION_TOPIC_CONFIG2),
mkEntry(REPARTITION_WITHOUT_PARTITION_COUNT, repartitionTopicConfigWithoutPartitionCount)
))
).andReturn(Collections.emptySet());
).thenReturn(Collections.emptySet());
setupCluster();
replay(internalTopicManager, internalTopologyBuilder, clusterMetadata);
final RepartitionTopics repartitionTopics = new RepartitionTopics(
new TopologyMetadata(internalTopologyBuilder, config),
internalTopicManager,
Expand All @@ -358,7 +328,6 @@ public void shouldSetRepartitionTopicPartitionCountFromUpstreamInternalRepartiti

repartitionTopics.setup();

verify(internalTopicManager, internalTopologyBuilder);
final Map<TopicPartition, PartitionInfo> topicPartitionsInfo = repartitionTopics.topicPartitionsInfo();
assertThat(topicPartitionsInfo.size(), is(10));
verifyRepartitionTopicPartitionInfo(topicPartitionsInfo, REPARTITION_TOPIC_NAME1, 0);
Expand All @@ -384,10 +353,9 @@ public void shouldNotSetupRepartitionTopicsWhenTopologyDoesNotContainAnyRepartit
Collections.emptyMap(),
Collections.emptyMap()
);
expect(internalTopologyBuilder.subtopologyToTopicsInfo())
.andReturn(mkMap(mkEntry(SUBTOPOLOGY_0, topicsInfo)));
when(internalTopologyBuilder.subtopologyToTopicsInfo())
.thenReturn(mkMap(mkEntry(SUBTOPOLOGY_0, topicsInfo)));
setupCluster();
replay(internalTopicManager, internalTopologyBuilder, clusterMetadata);
final RepartitionTopics repartitionTopics = new RepartitionTopics(
new TopologyMetadata(internalTopologyBuilder, config),
internalTopicManager,
Expand All @@ -398,7 +366,6 @@ public void shouldNotSetupRepartitionTopicsWhenTopologyDoesNotContainAnyRepartit

repartitionTopics.setup();

verify(internalTopicManager, internalTopologyBuilder);
final Map<TopicPartition, PartitionInfo> topicPartitionsInfo = repartitionTopics.topicPartitionsInfo();
assertThat(topicPartitionsInfo, is(Collections.emptyMap()));

Expand Down Expand Up @@ -447,13 +414,9 @@ private void setupClusterWithMissingTopicsAndMissingPartitionCounts(final Set<St
SOME_OTHER_TOPIC
);
topics.removeAll(missingTopics);
expect(clusterMetadata.topics()).andStubReturn(topics);
expect(clusterMetadata.partitionCountForTopic(SOURCE_TOPIC_NAME1))
.andStubReturn(topicsWithMissingPartitionCounts.contains(SOURCE_TOPIC_NAME1) ? null : 3);
expect(clusterMetadata.partitionCountForTopic(SOURCE_TOPIC_NAME2))
.andStubReturn(topicsWithMissingPartitionCounts.contains(SOURCE_TOPIC_NAME2) ? null : 1);
expect(clusterMetadata.partitionCountForTopic(SOURCE_TOPIC_NAME3))
.andStubReturn(topicsWithMissingPartitionCounts.contains(SOURCE_TOPIC_NAME3) ? null : 2);
when(clusterMetadata.topics()).thenReturn(topics);
when(clusterMetadata.partitionCountForTopic(SOURCE_TOPIC_NAME1))
.thenReturn(topicsWithMissingPartitionCounts.contains(SOURCE_TOPIC_NAME1) ? null : 3);
}

private TopicsInfo setupTopicInfoWithRepartitionTopicWithoutPartitionCount(final RepartitionTopicConfig repartitionTopicConfigWithoutPartitionCount) {
Expand Down

0 comments on commit 24db020

Please sign in to comment.