From de61b8fc299d95425a8eb003a2b532e2d73cf91d Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Thu, 26 Jan 2017 16:26:03 +0000 Subject: [PATCH] improve test coverage --- .../internals/StreamPartitionAssignor.java | 109 +++++++------- .../CopartitionedTopicsValidatorTest.java | 137 ++++++++++++++++++ .../StreamPartitionAssignorTest.java | 128 +++++----------- 3 files changed, 237 insertions(+), 137 deletions(-) create mode 100644 streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java index c790a1e99970b..1ad6dbcbfb4db 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java @@ -124,7 +124,7 @@ public String toString() { } } - private static class InternalTopicMetadata { + static class InternalTopicMetadata { public final InternalTopicConfig config; public int numPartitions; @@ -160,6 +160,7 @@ public int compare(TopicPartition p1, TopicPartition p2) { private Map> activeTasks; private InternalTopicManager internalTopicManager; + private CopartitionedTopicsValidator copartitionedTopicsValidator; /** * We need to have the PartitionAssignor and its StreamThread to be mutually accessible @@ -211,6 +212,8 @@ public void configure(Map configs) { configs.containsKey(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG) ? (Long) configs.get(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG) : WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT); + + this.copartitionedTopicsValidator = new CopartitionedTopicsValidator(streamThread.getName()); } @Override @@ -642,54 +645,7 @@ private void ensureCopartitioning(Collection> copartitionGroups, Map allRepartitionTopicsNumPartitions, Cluster metadata) { for (Set copartitionGroup : copartitionGroups) { - ensureCopartitioning(copartitionGroup, allRepartitionTopicsNumPartitions, metadata); - } - } - - private void ensureCopartitioning(Set copartitionGroup, - Map allRepartitionTopicsNumPartitions, - Cluster metadata) { - int numPartitions = UNKNOWN; - - for (String topic : copartitionGroup) { - if (!allRepartitionTopicsNumPartitions.containsKey(topic)) { - Integer partitions = metadata.partitionCountForTopic(topic); - - if (partitions == null) - throw new TopologyBuilderException(String.format("stream-thread [%s] Topic not found: %s", streamThread.getName(), topic)); - - if (numPartitions == UNKNOWN) { - numPartitions = partitions; - } else if (numPartitions != partitions) { - String[] topics = copartitionGroup.toArray(new String[copartitionGroup.size()]); - Arrays.sort(topics); - throw new TopologyBuilderException(String.format("stream-thread [%s] Topics not co-partitioned: [%s]", streamThread.getName(), Utils.mkString(Arrays.asList(topics), ","))); - } - } else { - if (allRepartitionTopicsNumPartitions.get(topic).numPartitions == NOT_AVAILABLE) { - numPartitions = NOT_AVAILABLE; - break; - } - } - } - - // if all topics for this co-partition group is repartition topics, - // then set the number of partitions to be the maximum of the number of partitions. - if (numPartitions == UNKNOWN) { - for (Map.Entry entry: allRepartitionTopicsNumPartitions.entrySet()) { - if (copartitionGroup.contains(entry.getKey())) { - int partitions = entry.getValue().numPartitions; - if (partitions > numPartitions) { - numPartitions = partitions; - } - } - } - } - // enforce co-partitioning restrictions to repartition topics by updating their number of partitions - for (Map.Entry entry : allRepartitionTopicsNumPartitions.entrySet()) { - if (copartitionGroup.contains(entry.getKey())) { - entry.getValue().numPartitions = numPartitions; - } + copartitionedTopicsValidator.validate(copartitionGroup, allRepartitionTopicsNumPartitions, metadata); } } @@ -757,4 +713,59 @@ public String toString() { public void close() { internalTopicManager.close(); } + + static class CopartitionedTopicsValidator { + private final String threadName; + + CopartitionedTopicsValidator(final String threadName) { + this.threadName = threadName; + } + + void validate(final Set copartitionGroup, + final Map allRepartitionTopicsNumPartitions, + final Cluster metadata) { + int numPartitions = UNKNOWN; + + for (final String topic : copartitionGroup) { + if (!allRepartitionTopicsNumPartitions.containsKey(topic)) { + final Integer partitions = metadata.partitionCountForTopic(topic); + + if (partitions == null) { + throw new TopologyBuilderException(String.format("stream-thread [%s] Topic not found: %s", threadName, topic)); + } + + if (numPartitions == UNKNOWN) { + numPartitions = partitions; + } else if (numPartitions != partitions) { + final String[] topics = copartitionGroup.toArray(new String[copartitionGroup.size()]); + Arrays.sort(topics); + throw new TopologyBuilderException(String.format("stream-thread [%s] Topics not co-partitioned: [%s]", threadName, Utils.mkString(Arrays.asList(topics), ","))); + } + } else if (allRepartitionTopicsNumPartitions.get(topic).numPartitions == NOT_AVAILABLE) { + numPartitions = NOT_AVAILABLE; + break; + } + } + + // if all topics for this co-partition group is repartition topics, + // then set the number of partitions to be the maximum of the number of partitions. + if (numPartitions == UNKNOWN) { + for (Map.Entry entry: allRepartitionTopicsNumPartitions.entrySet()) { + if (copartitionGroup.contains(entry.getKey())) { + final int partitions = entry.getValue().numPartitions; + if (partitions > numPartitions) { + numPartitions = partitions; + } + } + } + } + // enforce co-partitioning restrictions to repartition topics by updating their number of partitions + for (Map.Entry entry : allRepartitionTopicsNumPartitions.entrySet()) { + if (copartitionGroup.contains(entry.getKey())) { + entry.getValue().numPartitions = numPartitions; + } + } + + } + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java new file mode 100644 index 0000000000000..e7235b03c9bc3 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.errors.TopologyBuilderException; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +public class CopartitionedTopicsValidatorTest { + + private final StreamPartitionAssignor.CopartitionedTopicsValidator validator + = new StreamPartitionAssignor.CopartitionedTopicsValidator("thread"); + private final Map partitions = new HashMap<>(); + private final Cluster cluster = Cluster.empty(); + + @Before + public void before() { + partitions.put(new TopicPartition("first", 0), new PartitionInfo("first", 0, null, null, null)); + partitions.put(new TopicPartition("first", 1), new PartitionInfo("first", 1, null, null, null)); + partitions.put(new TopicPartition("second", 0), new PartitionInfo("second", 0, null, null, null)); + partitions.put(new TopicPartition("second", 1), new PartitionInfo("second", 1, null, null, null)); + } + + @Test(expected = TopologyBuilderException.class) + public void shouldThrowTopologyBuilderExceptionIfNoPartitionsFoundForCoPartitionedTopic() throws Exception { + validator.validate(Collections.singleton("topic"), + Collections.emptyMap(), + cluster); + } + + @Test(expected = TopologyBuilderException.class) + public void shouldThrowTopologyBuilderExceptionIfPartitionCountsForCoPartitionedTopicsDontMatch() throws Exception { + partitions.remove(new TopicPartition("second", 0)); + validator.validate(Utils.mkSet("first", "second"), + Collections.emptyMap(), + cluster.withPartitions(partitions)); + } + + + @Test + public void shouldEnforceCopartitioningOnRepartitionTopics() throws Exception { + final StreamPartitionAssignor.InternalTopicMetadata metadata = createTopicMetadata("repartitioned", 10); + + validator.validate(Utils.mkSet("first", "second", metadata.config.name()), + Collections.singletonMap(metadata.config.name(), + metadata), + cluster.withPartitions(partitions)); + + assertThat(metadata.numPartitions, equalTo(2)); + } + + + @Test + public void shouldSetNumPartitionsToMaximumPartitionsWhenAllTopicsAreRepartitionTopics() throws Exception { + final StreamPartitionAssignor.InternalTopicMetadata one = createTopicMetadata("one", 1); + final StreamPartitionAssignor.InternalTopicMetadata two = createTopicMetadata("two", 15); + final StreamPartitionAssignor.InternalTopicMetadata three = createTopicMetadata("three", 5); + final Map repartitionTopicConfig = new HashMap<>(); + + repartitionTopicConfig.put(one.config.name(), one); + repartitionTopicConfig.put(two.config.name(), two); + repartitionTopicConfig.put(three.config.name(), three); + + validator.validate(Utils.mkSet(one.config.name(), + two.config.name(), + three.config.name()), + repartitionTopicConfig, + cluster + ); + + assertThat(one.numPartitions, equalTo(15)); + assertThat(two.numPartitions, equalTo(15)); + assertThat(three.numPartitions, equalTo(15)); + } + + @Test + public void shouldSetRepartitionTopicsPartitionCountToNotAvailableIfAnyNotAvaliable() throws Exception { + final StreamPartitionAssignor.InternalTopicMetadata one = createTopicMetadata("one", 1); + final StreamPartitionAssignor.InternalTopicMetadata two = createTopicMetadata("two", StreamPartitionAssignor.NOT_AVAILABLE); + final Map repartitionTopicConfig = new HashMap<>(); + + repartitionTopicConfig.put(one.config.name(), one); + repartitionTopicConfig.put(two.config.name(), two); + + validator.validate(Utils.mkSet("first", + "second", + one.config.name(), + two.config.name()), + repartitionTopicConfig, + cluster.withPartitions(partitions)); + + assertThat(one.numPartitions, equalTo(StreamPartitionAssignor.NOT_AVAILABLE)); + assertThat(two.numPartitions, equalTo(StreamPartitionAssignor.NOT_AVAILABLE)); + + } + + private StreamPartitionAssignor.InternalTopicMetadata createTopicMetadata(final String repartitionTopic, + final int partitions) { + final InternalTopicConfig repartitionTopicConfig + = new InternalTopicConfig(repartitionTopic, + Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), + Collections.emptyMap()); + + + final StreamPartitionAssignor.InternalTopicMetadata metadata + = new StreamPartitionAssignor.InternalTopicMetadata(repartitionTopicConfig); + metadata.numPartitions = partitions; + return metadata; + } + +} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java index eff2179abcb5e..6503038e5b3ef 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.clients.consumer.internals.PartitionAssignor; import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; @@ -99,7 +100,11 @@ public class StreamPartitionAssignorTest { private final TaskId task1 = new TaskId(0, 1); private final TaskId task2 = new TaskId(0, 2); private final TaskId task3 = new TaskId(0, 3); - private String userEndPoint = "localhost:2171"; + private final String userEndPoint = "localhost:2171"; + private final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); + private final MockClientSupplier mockClientSupplier = new MockClientSupplier(); + private final TopologyBuilder builder = new TopologyBuilder(); + private final StreamsConfig config = new StreamsConfig(configProps()); private Properties configProps() { return new Properties() { @@ -115,9 +120,6 @@ private Properties configProps() { @SuppressWarnings("unchecked") @Test public void testSubscription() throws Exception { - StreamsConfig config = new StreamsConfig(configProps()); - - TopologyBuilder builder = new TopologyBuilder(); builder.addSource("source1", "topic1"); builder.addSource("source2", "topic2"); builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); @@ -142,7 +144,6 @@ public Set cachedTasks() { } }; - StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); partitionAssignor.configure(config.getConsumerConfigs(thread, "test", clientId)); PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1", "topic2")); @@ -159,9 +160,6 @@ public Set cachedTasks() { @Test public void testAssignBasic() throws Exception { - StreamsConfig config = new StreamsConfig(configProps()); - - TopologyBuilder builder = new TopologyBuilder(); builder.addSource("source1", "topic1"); builder.addSource("source2", "topic2"); builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); @@ -180,12 +178,10 @@ public void testAssignBasic() throws Exception { String client1 = "client1"; - MockClientSupplier mockClientSupplier = new MockClientSupplier(); StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); - StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1)); partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer)); @@ -236,7 +232,6 @@ public void testAssignWithPartialTopology() throws Exception { props.put(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, SingleGroupPartitionGrouperStub.class); StreamsConfig config = new StreamsConfig(props); - TopologyBuilder builder = new TopologyBuilder(); builder.addSource("source1", "topic1"); builder.addProcessor("processor1", new MockProcessorSupplier(), "source1"); builder.addStateStore(new MockStateStoreSupplier("store1", false), "processor1"); @@ -249,10 +244,8 @@ public void testAssignWithPartialTopology() throws Exception { UUID uuid1 = UUID.randomUUID(); String client1 = "client1"; - final MockClientSupplier mockClientSupplier = new MockClientSupplier(); StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); - StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1)); partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer)); Map subscriptions = new HashMap<>(); @@ -275,9 +268,6 @@ public void testAssignWithPartialTopology() throws Exception { @Test public void testAssignEmptyMetadata() throws Exception { - StreamsConfig config = new StreamsConfig(configProps()); - - TopologyBuilder builder = new TopologyBuilder(); builder.addSource("source1", "topic1"); builder.addSource("source2", "topic2"); builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); @@ -295,7 +285,6 @@ public void testAssignEmptyMetadata() throws Exception { StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); - StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1)); Map subscriptions = new HashMap<>(); @@ -336,9 +325,6 @@ public void testAssignEmptyMetadata() throws Exception { @Test public void testAssignWithNewTasks() throws Exception { - StreamsConfig config = new StreamsConfig(configProps()); - - TopologyBuilder builder = new TopologyBuilder(); builder.addSource("source1", "topic1"); builder.addSource("source2", "topic2"); builder.addSource("source3", "topic3"); @@ -355,12 +341,9 @@ public void testAssignWithNewTasks() throws Exception { UUID uuid2 = UUID.randomUUID(); String client1 = "client1"; - - MockClientSupplier mockClientSupplier = new MockClientSupplier(); StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); - StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1)); partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer)); @@ -399,9 +382,7 @@ public void testAssignWithNewTasks() throws Exception { @Test public void testAssignWithStates() throws Exception { - StreamsConfig config = new StreamsConfig(configProps()); String applicationId = "test"; - TopologyBuilder builder = new TopologyBuilder(); builder.setApplicationId(applicationId); builder.addSource("source1", "topic1"); builder.addSource("source2", "topic2"); @@ -428,10 +409,8 @@ public void testAssignWithStates() throws Exception { String client1 = "client1"; - MockClientSupplier mockClientSupplier = new MockClientSupplier(); StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); - StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1)); partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer)); @@ -496,7 +475,6 @@ public void testAssignWithStandbyReplicas() throws Exception { props.setProperty(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1"); StreamsConfig config = new StreamsConfig(props); - TopologyBuilder builder = new TopologyBuilder(); builder.addSource("source1", "topic1"); builder.addSource("source2", "topic2"); builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); @@ -515,11 +493,9 @@ public void testAssignWithStandbyReplicas() throws Exception { UUID uuid2 = UUID.randomUUID(); String client1 = "client1"; - final MockClientSupplier mockClientSupplier = new MockClientSupplier(); StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); - StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1)); partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer)); @@ -568,8 +544,6 @@ public void testAssignWithStandbyReplicas() throws Exception { @Test public void testOnAssignment() throws Exception { - StreamsConfig config = new StreamsConfig(configProps()); - TopicPartition t2p3 = new TopicPartition("topic2", 3); TopologyBuilder builder = new TopologyBuilder(); @@ -580,10 +554,9 @@ public void testOnAssignment() throws Exception { UUID uuid = UUID.randomUUID(); String client1 = "client1"; - StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + StreamThread thread = new StreamThread(builder, config, mockClientSupplier, "test", client1, uuid, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); - StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); partitionAssignor.configure(config.getConsumerConfigs(thread, "test", client1)); List activeTaskList = Utils.mkList(task0, task3); @@ -604,9 +577,7 @@ public void testOnAssignment() throws Exception { @Test public void testAssignWithInternalTopics() throws Exception { - StreamsConfig config = new StreamsConfig(configProps()); String applicationId = "test"; - TopologyBuilder builder = new TopologyBuilder(); builder.setApplicationId(applicationId); builder.addInternalTopic("topicX"); builder.addSource("source1", "topic1"); @@ -620,14 +591,12 @@ public void testAssignWithInternalTopics() throws Exception { UUID uuid1 = UUID.randomUUID(); String client1 = "client1"; - MockClientSupplier clientSupplier = new MockClientSupplier(); - StreamThread thread10 = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); - StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1)); - MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(thread10.config, clientSupplier.restoreConsumer); + MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer); partitionAssignor.setInternalTopicManager(internalTopicManager); Map subscriptions = new HashMap<>(); @@ -644,9 +613,7 @@ public void testAssignWithInternalTopics() throws Exception { @Test public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() throws Exception { - StreamsConfig config = new StreamsConfig(configProps()); String applicationId = "test"; - TopologyBuilder builder = new TopologyBuilder(); builder.setApplicationId(applicationId); builder.addInternalTopic("topicX"); builder.addSource("source1", "topic1"); @@ -663,14 +630,11 @@ public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() throw UUID uuid1 = UUID.randomUUID(); String client1 = "client1"; - MockClientSupplier clientSupplier = new MockClientSupplier(); - - StreamThread thread10 = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); - StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1)); - MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(thread10.config, clientSupplier.restoreConsumer); + MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer); partitionAssignor.setInternalTopicManager(internalTopicManager); Map subscriptions = new HashMap<>(); @@ -690,7 +654,6 @@ public void shouldAddUserDefinedEndPointToSubscription() throws Exception { final Properties properties = configProps(); properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:8080"); final StreamsConfig config = new StreamsConfig(properties); - final TopologyBuilder builder = new TopologyBuilder(); final String applicationId = "application-id"; builder.setApplicationId(applicationId); builder.addSource("source", "input"); @@ -700,12 +663,9 @@ public void shouldAddUserDefinedEndPointToSubscription() throws Exception { final UUID uuid1 = UUID.randomUUID(); final String client1 = "client1"; - final MockClientSupplier clientSupplier = new MockClientSupplier(); - - final StreamThread streamThread = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + final StreamThread streamThread = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); - final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client1)); final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("input")); final SubscriptionInfo subscriptionInfo = SubscriptionInfo.decode(subscription.userData()); @@ -718,7 +678,6 @@ public void shouldMapUserEndPointToTopicPartitions() throws Exception { final String myEndPoint = "localhost:8080"; properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, myEndPoint); final StreamsConfig config = new StreamsConfig(properties); - final TopologyBuilder builder = new TopologyBuilder(); final String applicationId = "application-id"; builder.setApplicationId(applicationId); builder.addSource("source", "topic1"); @@ -730,14 +689,12 @@ public void shouldMapUserEndPointToTopicPartitions() throws Exception { final UUID uuid1 = UUID.randomUUID(); final String client1 = "client1"; - final MockClientSupplier clientSupplier = new MockClientSupplier(); - - final StreamThread streamThread = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + final StreamThread streamThread = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client1)); - partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamThread.config, clientSupplier.restoreConsumer)); + partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamThread.config, mockClientSupplier.restoreConsumer)); final Map subscriptions = new HashMap<>(); final Set emptyTasks = Collections.emptySet(); @@ -761,18 +718,14 @@ public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() thr final StreamsConfig config = new StreamsConfig(properties); final UUID uuid1 = UUID.randomUUID(); final String client1 = "client1"; - final TopologyBuilder builder = new TopologyBuilder(); final String applicationId = "application-id"; builder.setApplicationId(applicationId); - final MockClientSupplier clientSupplier = new MockClientSupplier(); - - final StreamThread streamThread = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, + final StreamThread streamThread = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); - final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); - partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamThread.config, clientSupplier.restoreConsumer)); + partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamThread.config, mockClientSupplier.restoreConsumer)); try { partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client1)); @@ -790,18 +743,14 @@ public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() th final StreamsConfig config = new StreamsConfig(properties); final UUID uuid1 = UUID.randomUUID(); final String client1 = "client1"; - final TopologyBuilder builder = new TopologyBuilder(); final String applicationId = "application-id"; builder.setApplicationId(applicationId); - final MockClientSupplier clientSupplier = new MockClientSupplier(); - final StreamThread streamThread = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, + final StreamThread streamThread = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); - final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); - try { partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client1)); Assert.fail("expected to an exception due to invalid config"); @@ -812,7 +761,6 @@ public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() th @Test public void shouldExposeHostStateToTopicPartitionsOnAssignment() throws Exception { - final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); List topic = Collections.singletonList(new TopicPartition("topic", 0)); final Map> hostState = Collections.singletonMap(new HostInfo("localhost", 80), @@ -826,8 +774,6 @@ public void shouldExposeHostStateToTopicPartitionsOnAssignment() throws Exceptio @Test public void shouldSetClusterMetadataOnAssignment() throws Exception { - final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); - final List topic = Collections.singletonList(new TopicPartition("topic", 0)); final Map> hostState = Collections.singletonMap(new HostInfo("localhost", 80), @@ -848,7 +794,6 @@ public void shouldSetClusterMetadataOnAssignment() throws Exception { @Test public void shouldReturnEmptyClusterMetadataIfItHasntBeenBuilt() throws Exception { - final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); final Cluster cluster = partitionAssignor.clusterMetadata(); assertNotNull(cluster); } @@ -916,11 +861,8 @@ public Object apply(Object value1, Object value2) { final UUID uuid = UUID.randomUUID(); final String client = "client1"; - final StreamsConfig config = new StreamsConfig(configProps()); - final MockClientSupplier mockClientSupplier = new MockClientSupplier(); final StreamThread streamThread = new StreamThread(builder, config, mockClientSupplier, applicationId, client, uuid, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); - final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client)); final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(streamThread.config, mockClientSupplier.restoreConsumer); partitionAssignor.setInternalTopicManager(mockInternalTopicManager); @@ -954,7 +896,6 @@ public Object apply(Object value1, Object value2) { @Test public void shouldUpdatePartitionHostInfoMapOnAssignment() throws Exception { - final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); final TopicPartition partitionOne = new TopicPartition("topic", 1); final TopicPartition partitionTwo = new TopicPartition("topic", 2); final Map> firstHostState = Collections.singletonMap( @@ -972,7 +913,6 @@ public void shouldUpdatePartitionHostInfoMapOnAssignment() throws Exception { @Test public void shouldUpdateClusterMetadataOnAssignment() throws Exception { - final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); final TopicPartition topicOne = new TopicPartition("topic", 1); final TopicPartition topicTwo = new TopicPartition("topic2", 2); final Map> firstHostState = Collections.singletonMap( @@ -987,15 +927,6 @@ public void shouldUpdateClusterMetadataOnAssignment() throws Exception { assertEquals(Utils.mkSet("topic", "topic2"), partitionAssignor.clusterMetadata().topics()); } - private PartitionAssignor.Assignment createAssignment(final Map> firstHostState) { - final AssignmentInfo info = new AssignmentInfo(Collections.emptyList(), - Collections.>emptyMap(), - firstHostState); - - return new PartitionAssignor.Assignment( - Collections.emptyList(), info.encode()); - } - @Test public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() throws Exception { final Properties props = configProps(); @@ -1009,10 +940,8 @@ public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() throws Except final UUID uuid = UUID.randomUUID(); final String client = "client1"; - final MockClientSupplier mockClientSupplier = new MockClientSupplier(); final StreamThread streamThread = new StreamThread(builder, config, mockClientSupplier, applicationId, client, uuid, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); - final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client)); partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamThread.config, mockClientSupplier.restoreConsumer)); @@ -1046,6 +975,29 @@ public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() throws Except assertThat(allAssignedPartitions, equalTo(allPartitions)); } + @Test(expected = KafkaException.class) + public void shouldThrowKafkaExceptionIfStreamThreadNotConfigured() throws Exception { + partitionAssignor.configure(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1)); + } + + @Test(expected = KafkaException.class) + public void shouldThrowKafkaExceptionIfStreamThreadConfigIsNotStreamThreadInstance() throws Exception { + final Map config = new HashMap<>(); + config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); + config.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, "i am not a stream thread"); + + partitionAssignor.configure(config); + } + + private PartitionAssignor.Assignment createAssignment(final Map> firstHostState) { + final AssignmentInfo info = new AssignmentInfo(Collections.emptyList(), + Collections.>emptyMap(), + firstHostState); + + return new PartitionAssignor.Assignment( + Collections.emptyList(), info.encode()); + } + private AssignmentInfo checkAssignment(Set expectedTopics, PartitionAssignor.Assignment assignment) { // This assumed 1) DefaultPartitionGrouper is used, and 2) there is a only one topic group.