From 6cc1149d22ef85d6b0c5ace3d7e624f4f66fefa6 Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Wed, 31 Aug 2016 12:51:16 +0100 Subject: [PATCH 1/2] build Cluster in StreamPartitionAssignor.onAssignment --- .../internals/StreamPartitionAssignor.java | 19 +++++++++++ .../StreamPartitionAssignorTest.java | 32 +++++++++++++++++++ 2 files changed, 51 insertions(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java index fd70a010b060c..b84c0cbb88a69 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java @@ -532,6 +532,22 @@ public void onAssignment(Assignment assignment) { } this.partitionToTaskIds = partitionToTaskIds; this.partitionsByHostState = info.partitionsByHostState; + // only need to build when not coordinator + if (metadataWithInternalTopics == null) { + final Collection> values = partitionsByHostState.values(); + final Map topicToPartitionInfo = new HashMap<>(); + final List partitionInfos = new ArrayList<>(); + for (Set value : values) { + for (TopicPartition topicPartition : value) { + topicToPartitionInfo.put(topicPartition, new PartitionInfo(topicPartition.topic(), + topicPartition.partition(), + null, + new Node[0], + new Node[0])); + } + } + metadataWithInternalTopics = Cluster.empty().withPartitions(topicToPartitionInfo); + } } public Map> getPartitionsByHostState() { @@ -542,6 +558,9 @@ public Map> getPartitionsByHostState() { } public Cluster clusterMetadata() { + if (metadataWithInternalTopics == null) { + return Cluster.empty(); + } return metadataWithInternalTopics; } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java index 9d261bbd8d2fe..e3009665aafd1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java @@ -52,6 +52,7 @@ import java.util.UUID; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; public class StreamPartitionAssignorTest { @@ -691,6 +692,37 @@ public void shouldExposeHostStateToTopicPartitionsOnAssignment() throws Exceptio assertEquals(hostState, partitionAssignor.getPartitionsByHostState()); } + @Test + public void shouldSetClusterMetadataOnAssignment() throws Exception { + final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); + + final List topic = Arrays.asList(new TopicPartition("topic", 0)); + final Map> hostState = + Collections.singletonMap(new HostInfo("localhost", 80), + Collections.singleton(new TopicPartition("topic", 0))); + final AssignmentInfo assignmentInfo = new AssignmentInfo(Collections.singletonList(new TaskId(0, 0)), + Collections.>emptyMap(), + hostState); + + + partitionAssignor.onAssignment(new PartitionAssignor.Assignment(topic, assignmentInfo.encode())); + final Cluster cluster = partitionAssignor.clusterMetadata(); + final List partitionInfos = cluster.partitionsForTopic("topic"); + final PartitionInfo partitionInfo = partitionInfos.get(0); + assertEquals(1, partitionInfos.size()); + assertEquals("topic", partitionInfo.topic()); + assertEquals(0, partitionInfo.partition()); + } + + @Test + public void shouldReturnEmptyClusterMetadataIfItHasntBeenBuilt() throws Exception { + final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); + final Cluster cluster = partitionAssignor.clusterMetadata(); + assertNotNull(cluster); + + } + + private class MockInternalTopicManager extends InternalTopicManager { public Map readyTopics = new HashMap<>(); From 04a5dc83e7a9b8b00eb9a49c20a4d3628c23ad0f Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Wed, 31 Aug 2016 12:56:07 +0100 Subject: [PATCH 2/2] remove redundant field --- .../streams/processor/internals/StreamPartitionAssignor.java | 1 - 1 file changed, 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java index b84c0cbb88a69..09e192d883ff5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java @@ -536,7 +536,6 @@ public void onAssignment(Assignment assignment) { if (metadataWithInternalTopics == null) { final Collection> values = partitionsByHostState.values(); final Map topicToPartitionInfo = new HashMap<>(); - final List partitionInfos = new ArrayList<>(); for (Set value : values) { for (TopicPartition topicPartition : value) { topicToPartitionInfo.put(topicPartition, new PartitionInfo(topicPartition.topic(),