From 708718c1be23fad25fa6206f665cbb619c1b5097 Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Mon, 19 Oct 2015 12:38:06 -0700 Subject: [PATCH 01/13] partition grouping --- .../processor/DefaultPartitionGrouper.java | 108 ++++++++++++++++++ .../streams/processor/PartitionGrouper.java | 32 ++++++ .../streams/processor/TopologyBuilder.java | 80 ++++++++++++- .../processor/internals/QuickUnion.java | 57 +++++++++ 4 files changed, 272 insertions(+), 5 deletions(-) create mode 100644 streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java create mode 100644 streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java create mode 100644 streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java new file mode 100644 index 000000000000..43f8f91f61ae --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.processor; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; + +public class DefaultPartitionGrouper implements PartitionGrouper { + + public Map> groups(Collection> topicGroups, KafkaConsumer consumer) { + + List> sortedTopicGroups = sort(topicGroups); + + Map> groups = new HashMap<>(); + + int groupId = 0; + for (List topicGroup : sortedTopicGroups) { + int numPartitions = ensureCopartitioning(topicGroup, consumer); + + for (int partitionId = 0; partitionId < numPartitions; partitionId++) { + ArrayList group = new ArrayList<>(topicGroup.size()); + for (String topic : topicGroup) { + group.add(new TopicPartition(topic, partitionId)); + } + groups.put(groupId++, Collections.unmodifiableList(group)); + } + } + + return Collections.unmodifiableMap(groups); + } + + protected int ensureCopartitioning(List topicGroup, KafkaConsumer consumer) { + int numPartitions = -1; + + for (String topic : topicGroup) { + List infos = consumer.partitionsFor(topic); + + if (infos == null) + throw new KafkaException("topic not found :" + topic); + + if (numPartitions == -1) { + numPartitions = infos.size(); + } else if (numPartitions != infos.size()) { + throw new KafkaException("not copartitioned : [" + toString(topicGroup, ", ") + "]"); + } + } + return numPartitions; + } + + protected List> sort(Collection> topicGroups) { + TreeMap sortedMap = new TreeMap<>(); + + for (Set group : topicGroups) { + String[] arr = group.toArray(new String[group.size()]); + Arrays.sort(arr); + sortedMap.put(arr[0], arr); + } + + ArrayList> list = new ArrayList(sortedMap.size()); + for (String[] arr : sortedMap.values()) { + list.add(Arrays.asList(arr)); + } + + return list; + } + + protected CharSequence toString(Collection set, String separator) { + StringBuilder sb = new StringBuilder(); + Iterator iter = set.iterator(); + if (iter.hasNext()) { + sb.append(iter.next().toString()); + + while (iter.hasNext()) { + sb.append(separator); + sb.append(iter.next().toString()); + } + } + return sb; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java new file mode 100644 index 000000000000..8dd27f3f5572 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.processor; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public interface PartitionGrouper { + + Map> groups(Collection> topicGroups, KafkaConsumer consumer); + +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java index 833e29b9d9ee..8a558965a743 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java @@ -22,10 +22,13 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.processor.internals.ProcessorTopology; +import org.apache.kafka.streams.processor.internals.QuickUnion; import org.apache.kafka.streams.processor.internals.SinkNode; import org.apache.kafka.streams.processor.internals.SourceNode; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -45,10 +48,14 @@ public class TopologyBuilder { // list of node factories in a topological order - private ArrayList nodeFactories = new ArrayList<>(); + private final ArrayList nodeFactories = new ArrayList<>(); - private Set nodeNames = new HashSet<>(); - private Set sourceTopicNames = new HashSet<>(); + private final Set nodeNames = new HashSet<>(); + private final Set sourceTopicNames = new HashSet<>(); + + private final PartitionGrouper partitionGrouper; + private final QuickUnion nodeGroups = new QuickUnion<>(); + private final HashMap nodeToTopics = new HashMap<>(); private interface NodeFactory { ProcessorNode build(); @@ -111,9 +118,18 @@ public ProcessorNode build() { } /** - * Create a new builder. + * Create a new builder with the default PartitionGrouper. */ - public TopologyBuilder() {} + public TopologyBuilder() { + this(new DefaultPartitionGrouper()); + } + + /** + * Create a new builder with the specified PartitionGrouper + */ + public TopologyBuilder(PartitionGrouper partitionGrouper) { + this.partitionGrouper = partitionGrouper; + } /** * Add a new source that consumes the named topics and forwards the messages to child processor and/or sink nodes. @@ -158,6 +174,9 @@ public final TopologyBuilder addSource(String name, Deserializer keyDeserializer nodeNames.add(name); nodeFactories.add(new SourceNodeFactory(name, topics, keyDeserializer, valDeserializer)); + nodeToTopics.put(name, topics.clone()); + nodeGroups.add(name); + return this; } @@ -237,9 +256,60 @@ public final TopologyBuilder addProcessor(String name, ProcessorSupplier supplie nodeNames.add(name); nodeFactories.add(new ProcessorNodeFactory(name, parentNames, supplier)); + nodeGroups.add(name); return this; } + /** + * Declares that the streams of the specified processor nodes must be copartitioned. + * + * @param nodeName + * @param otherNodeNames + */ + public void copartition(String nodeName, String... 
otherNodeNames) { + for (String other : otherNodeNames) { + nodeGroups.unite(nodeName, other); + } + } + + /** + * Returns the grouped topics + * + * @return groups of topic names + */ + public Collection> topicGroups() { + HashMap> topicGroupMap = new HashMap<>(); + + // collect topics by node groups + for (String nodeName : nodeNames) { + String[] topics = nodeToTopics.get(nodeName); + if (topics != null) { + String root = nodeGroups.root(nodeName); + Set topicGroup = topicGroupMap.get(root); + if (topicGroup == null) { + topicGroup = new HashSet<>(); + topicGroupMap.put(root, topicGroup); + } + topicGroup.addAll(Arrays.asList(topics)); + } + } + + // make the data unmodifiable, then return + List> topicGroups = new ArrayList<>(topicGroupMap.size()); + for (Set group : topicGroupMap.values()) { + topicGroups.add(Collections.unmodifiableSet(group)); + } + return Collections.unmodifiableList(topicGroups); + } + + /** + * Returns the instance of PartitionGrouper for this TopologyBuilder + * @return + */ + public PartitionGrouper partitionGrouper() { + return partitionGrouper; + } + /** * Build the topology. This is typically called automatically when passing this builder into the * {@link KafkaStreaming#KafkaStreaming(TopologyBuilder, StreamingConfig)} constructor. diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java new file mode 100644 index 000000000000..5487eba19e5a --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.processor.internals; + +import java.util.HashMap; +import java.util.NoSuchElementException; + +public class QuickUnion { + + private HashMap ids = new HashMap<>(); + + public void add(T id) { + ids.put(id, id); + } + + public T root(T id) { + T current = id; + T parent = ids.get(current); + + if (parent == null) + throw new NoSuchElementException("id: " + id.toString()); + + while (!parent.equals(current)) { + // do the path compression + T grandparent = ids.get(parent); + ids.put(current, grandparent); + + current = parent; + parent = grandparent; + } + return current; + } + + public void unite(T id1, T id2) { + T root1 = root(id1); + T root2 = root(id2); + + if (!root1.equals(root2)) + ids.put(root1, root2); + } + +} From d2bae046b5509022e2821a2c5eb08853d228e791 Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Mon, 19 Oct 2015 13:19:54 -0700 Subject: [PATCH 02/13] wip --- .../streams/processor/DefaultPartitionGrouper.java | 14 ++++++-------- .../kafka/streams/processor/PartitionGrouper.java | 4 ++-- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java index 43f8f91f61ae..d058abd05165 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java @@ -17,7 +17,7 @@ package org.apache.kafka.streams.processor; -import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; @@ -35,15 +35,13 @@ public class DefaultPartitionGrouper implements PartitionGrouper { - public Map> groups(Collection> topicGroups, KafkaConsumer consumer) { - - List> sortedTopicGroups = sort(topicGroups); - + public Map> groups(Collection> topicGroups, Cluster metadata) { Map> groups = new HashMap<>(); + List> sortedTopicGroups = sort(topicGroups); int groupId = 0; for (List topicGroup : sortedTopicGroups) { - int numPartitions = ensureCopartitioning(topicGroup, consumer); + int numPartitions = ensureCopartitioning(topicGroup, metadata); for (int partitionId = 0; partitionId < numPartitions; partitionId++) { ArrayList group = new ArrayList<>(topicGroup.size()); @@ -57,11 +55,11 @@ public Map> groups(Collection> topicGr return Collections.unmodifiableMap(groups); } - protected int ensureCopartitioning(List topicGroup, KafkaConsumer consumer) { + protected int ensureCopartitioning(List topicGroup, Cluster metadata) { int numPartitions = -1; for (String topic : topicGroup) { - List infos = consumer.partitionsFor(topic); + List infos = metadata.partitionsForTopic(topic); if (infos == null) throw new KafkaException("topic not found :" + topic); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java index 8dd27f3f5572..f7a05587b80a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java @@ -17,7 +17,7 @@ package org.apache.kafka.streams.processor; -import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.Cluster; import 
org.apache.kafka.common.TopicPartition; import java.util.Collection; @@ -27,6 +27,6 @@ public interface PartitionGrouper { - Map> groups(Collection> topicGroups, KafkaConsumer consumer); + Map> groups(Collection> topicGroups, Cluster metadata); } From 86fa8110b23ee1992fbd19daa08c63a4b427448e Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Tue, 20 Oct 2015 13:01:37 -0700 Subject: [PATCH 03/13] long task id --- .../apache/kafka/streams/StreamingConfig.java | 37 ++++++++--- .../kstream/SlidingWindowSupplier.java | 4 +- .../kstream/internals/KStreamJoin.java | 4 -- .../processor/DefaultPartitionGrouper.java | 14 ++-- .../streams/processor/PartitionGrouper.java | 28 +++++++- .../streams/processor/ProcessorContext.java | 11 +--- .../streams/processor/TopologyBuilder.java | 22 +------ .../internals/PartitionAssignorImpl.java | 66 +++++++++++++++++++ .../internals/ProcessorContextImpl.java | 43 ++---------- .../internals/ProcessorStateManager.java | 14 ++-- .../processor/internals/StreamTask.java | 15 +++-- .../processor/internals/StreamThread.java | 6 +- .../streams/state/MeteredKeyValueStore.java | 4 +- .../streams/state/RocksDBKeyValueStore.java | 8 +-- .../state/KeyValueStoreTestDriver.java | 56 ++++++++-------- .../kafka/test/MockProcessorContext.java | 11 +--- 16 files changed, 199 insertions(+), 144 deletions(-) create mode 100644 streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionAssignorImpl.java diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java index 93df4c249450..c544d57d1af6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java @@ -24,6 +24,8 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.streams.processor.DefaultPartitionGrouper; +import org.apache.kafka.streams.processor.PartitionGrouper; import java.util.Map; @@ -70,6 +72,10 @@ public class StreamingConfig extends AbstractConfig { public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "timestamp.extractor"; private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp extractor class that implements the TimestampExtractor interface."; + /** partition.grouper */ + public static final String PARTITION_GROUPER_CLASS_CONFIG = "partition.grouper"; + private static final String PARTITION_GROUPER_CLASS_DOC = "Partition grouper class that implements the PartitionGrouper interface."; + /** client.id */ public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG; @@ -108,15 +114,15 @@ public class StreamingConfig extends AbstractConfig { Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC) .define(STATE_DIR_CONFIG, - Type.STRING, - SYSTEM_TEMP_DIRECTORY, - Importance.MEDIUM, - STATE_DIR_DOC) + Type.STRING, + SYSTEM_TEMP_DIRECTORY, + Importance.MEDIUM, + STATE_DIR_DOC) .define(COMMIT_INTERVAL_MS_CONFIG, - Type.LONG, - 30000, - Importance.HIGH, - COMMIT_INTERVAL_MS_DOC) + Type.LONG, + 30000, + Importance.HIGH, + COMMIT_INTERVAL_MS_DOC) .define(POLL_MS_CONFIG, Type.LONG, 100, @@ -167,6 +173,11 @@ public class StreamingConfig extends AbstractConfig { Type.CLASS, Importance.HIGH, TIMESTAMP_EXTRACTOR_CLASS_DOC) + .define(PARTITION_GROUPER_CLASS_CONFIG, + Type.CLASS, + DefaultPartitionGrouper.class, + Importance.HIGH, + 
PARTITION_GROUPER_CLASS_DOC) .define(BOOTSTRAP_SERVERS_CONFIG, Type.STRING, Importance.HIGH, @@ -190,10 +201,20 @@ public class StreamingConfig extends AbstractConfig { CommonClientConfigs.METRICS_NUM_SAMPLES_DOC); } + public static class InternalConfig { + public static final String PARTITION_GROUPER_INSTANCE = "__partition.grouper.instance__"; + } + public StreamingConfig(Map props) { super(CONFIG, props); } + public Map getConsumerConfigs(PartitionGrouper partitionGrouper) { + Map props = getConsumerConfigs(); + props.put(StreamingConfig.InternalConfig.PARTITION_GROUPER_INSTANCE, partitionGrouper); + return props; + } + public Map getConsumerConfigs() { Map props = this.originals(); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java index bf6b4dcd818e..6d7fdedab8dd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java @@ -75,6 +75,7 @@ public Window get() { public class SlidingWindow extends WindowSupport implements Window { private final Object lock = new Object(); private ProcessorContext context; + private int partition; private int slotNum; // used as a key for Kafka log compaction private LinkedList list = new LinkedList(); private HashMap> map = new HashMap<>(); @@ -82,6 +83,7 @@ public class SlidingWindow extends WindowSupport implements Window { @Override public void init(ProcessorContext context) { this.context = context; + this.partition = context.statePartition(); SlidingWindowRegistryCallback restoreFunc = new SlidingWindowRegistryCallback(); context.register(this, restoreFunc); @@ -210,7 +212,7 @@ public void flush() { if (offset != combined.length) throw new IllegalStateException("serialized length does not match"); - collector.send(new ProducerRecord<>(name, context.id(), slot, combined), byteArraySerializer, byteArraySerializer); + collector.send(new ProducerRecord<>(name, partition, slot, combined), byteArraySerializer, byteArraySerializer); } values.clearDirtyValues(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java index 997953f579d5..5e8186ebe5a0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java @@ -59,10 +59,6 @@ public KStreamJoinProcessor(String windowName) { public void init(ProcessorContext context) { super.init(context); - // check if these two streams are joinable - if (!context.joinable()) - throw new IllegalStateException("Streams are not joinable."); - final Window window = (Window) context.getStateStore(windowName); this.finder = new Finder() { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java index d058abd05165..9ef1d0342967 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java @@ -33,23 +33,27 @@ import java.util.Set; import java.util.TreeMap; -public class DefaultPartitionGrouper implements PartitionGrouper { +public class DefaultPartitionGrouper extends PartitionGrouper 
{ - public Map> groups(Collection> topicGroups, Cluster metadata) { - Map> groups = new HashMap<>(); + public Map> partitionGroups(Cluster metadata) { + Map> groups = new HashMap<>(); List> sortedTopicGroups = sort(topicGroups); - int groupId = 0; + long groupId = 0; for (List topicGroup : sortedTopicGroups) { int numPartitions = ensureCopartitioning(topicGroup, metadata); for (int partitionId = 0; partitionId < numPartitions; partitionId++) { + long taskId = (groupId << 32) | (long) partitionId; + ArrayList group = new ArrayList<>(topicGroup.size()); for (String topic : topicGroup) { group.add(new TopicPartition(topic, partitionId)); } - groups.put(groupId++, Collections.unmodifiableList(group)); + + groups.put(taskId, Collections.unmodifiableList(group)); } + groupId++; } return Collections.unmodifiableMap(groups); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java index f7a05587b80a..2110da61c663 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java @@ -25,8 +25,32 @@ import java.util.Map; import java.util.Set; -public interface PartitionGrouper { +public abstract class PartitionGrouper { - Map> groups(Collection> topicGroups, Cluster metadata); + protected Collection> topicGroups; + + private Map> partitionToTaskIds; + + /** + * Returns a map of task ids to groups of partitions. The task id is the 64 bit integer + * which uniquely identifies a task. The higher 32 bit integer is an id assigned to a topic group. + * The lower 32 bit integer is a partition id with which the task's local states are associated. + * + * @param metadata + * @return a map of task ids to groups of partitions + */ + public abstract Map> partitionGroups(Cluster metadata); + + public final void topicGroups(Collection> topicGroups) { + this.topicGroups = topicGroups; + } + + public final void partitionToTaskIds(Map> partitionToTaskIds) { + this.partitionToTaskIds = partitionToTaskIds; + } + + public final Set taskIds(TopicPartition partition) { + return partitionToTaskIds.get(partition); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java index adffe0ec8bf8..2164c12042b3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java @@ -26,11 +26,11 @@ public interface ProcessorContext { /** - * Returns the partition group id + * Returns the state partition id * - * @return partition group id + * @return the state partition id */ - int id(); + int statePartition(); /** * Returns the key serializer @@ -74,11 +74,6 @@ public interface ProcessorContext { */ StreamingMetrics metrics(); - /** - * Check if this process's incoming streams are joinable - */ - boolean joinable(); - /** * Registers and possibly restores the specified storage engine. 
* diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java index 8a558965a743..e0d4990ccafc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java @@ -53,7 +53,6 @@ public class TopologyBuilder { private final Set nodeNames = new HashSet<>(); private final Set sourceTopicNames = new HashSet<>(); - private final PartitionGrouper partitionGrouper; private final QuickUnion nodeGroups = new QuickUnion<>(); private final HashMap nodeToTopics = new HashMap<>(); @@ -118,18 +117,9 @@ public ProcessorNode build() { } /** - * Create a new builder with the default PartitionGrouper. + * Create a new builder. */ - public TopologyBuilder() { - this(new DefaultPartitionGrouper()); - } - - /** - * Create a new builder with the specified PartitionGrouper - */ - public TopologyBuilder(PartitionGrouper partitionGrouper) { - this.partitionGrouper = partitionGrouper; - } + public TopologyBuilder() {} /** * Add a new source that consumes the named topics and forwards the messages to child processor and/or sink nodes. @@ -302,14 +292,6 @@ public Collection> topicGroups() { return Collections.unmodifiableList(topicGroups); } - /** - * Returns the instance of PartitionGrouper for this TopologyBuilder - * @return - */ - public PartitionGrouper partitionGrouper() { - return partitionGrouper; - } - /** * Build the topology. This is typically called automatically when passing this builder into the * {@link KafkaStreaming#KafkaStreaming(TopologyBuilder, StreamingConfig)} constructor. diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionAssignorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionAssignorImpl.java new file mode 100644 index 000000000000..aa83b7e745ba --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionAssignorImpl.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.common.Configurable; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.StreamingConfig; +import org.apache.kafka.streams.processor.PartitionGrouper; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class PartitionAssignorImpl implements Configurable { + + private PartitionGrouper partitionGrouper; + + @SuppressWarnings("unchecked") + public void configure(Map configs) { + Object o = configs.get(StreamingConfig.InternalConfig.PARTITION_GROUPER_INSTANCE); + if (o == null) + throw new KafkaException("PartitionGrouper is not specified"); + + if (!PartitionGrouper.class.isInstance(o)) + throw new KafkaException(o.getClass().getName() + " is not an instance of " + PartitionGrouper.class.getName()); + + partitionGrouper = (PartitionGrouper) o; + } + + public void partitionToTaskIds(List partitions, ByteBuffer data) { + Map> partitionToTaskIds = new HashMap<>(); + + int i = 0; + for (TopicPartition partition : partitions) { + Set taskIds = partitionToTaskIds.get(partition); + if (taskIds == null) { + taskIds = new HashSet<>(); + partitionToTaskIds.put(partition, taskIds); + } + taskIds.add(data.getLong(i * 8)); + i += 4; + } + partitionGrouper.partitionToTaskIds(Collections.unmodifiableMap(partitionToTaskIds)); + } + +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index 5cb53a41831f..0d54ab3796d0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -18,7 +18,6 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.StreamingConfig; @@ -31,17 +30,12 @@ import org.slf4j.LoggerFactory; import java.io.File; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; public class ProcessorContextImpl implements ProcessorContext, RecordCollector.Supplier { private static final Logger log = LoggerFactory.getLogger(ProcessorContextImpl.class); - private final int id; + private final long id; private final StreamTask task; private final StreamingMetrics metrics; private final RecordCollector collector; @@ -55,7 +49,7 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S private boolean initialized; @SuppressWarnings("unchecked") - public ProcessorContextImpl(int id, + public ProcessorContextImpl(long id, StreamTask task, StreamingConfig config, RecordCollector collector, @@ -85,37 +79,8 @@ public void initialized() { } @Override - public boolean joinable() { - Set partitions = this.task.partitions(); - Map> partitionsById = new HashMap<>(); - int firstId = -1; - for (TopicPartition partition : partitions) { - if (!partitionsById.containsKey(partition.partition())) { - partitionsById.put(partition.partition(), new ArrayList()); - } - 
partitionsById.get(partition.partition()).add(partition.topic()); - - if (firstId < 0) - firstId = partition.partition(); - } - - List topics = partitionsById.get(firstId); - for (List topicsPerPartition : partitionsById.values()) { - if (topics.size() != topicsPerPartition.size()) - return false; - - for (String topic : topicsPerPartition) { - if (!topics.contains(topic)) - return false; - } - } - - return true; - } - - @Override - public int id() { - return id; + public int statePartition() { + return (int) (id & 0xFFFFFFFFL); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 59a6394678f6..3cb9cead8887 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -44,7 +44,7 @@ public class ProcessorStateManager { public static final String CHECKPOINT_FILE_NAME = ".checkpoint"; public static final String LOCK_FILE_NAME = ".lock"; - private final int id; + private final int partition; private final File baseDir; private final FileLock directoryLock; private final Map stores; @@ -52,8 +52,8 @@ public class ProcessorStateManager { private final Map restoredOffsets; private final Map checkpointedOffsets; - public ProcessorStateManager(int id, File baseDir, Consumer restoreConsumer) throws IOException { - this.id = id; + public ProcessorStateManager(int partition, File baseDir, Consumer restoreConsumer) throws IOException { + this.partition = partition; this.baseDir = baseDir; this.stores = new HashMap<>(); this.restoreConsumer = restoreConsumer; @@ -109,14 +109,14 @@ public void register(StateStore store, StateRestoreCallback stateRestoreCallback if (restoreConsumer.listTopics().containsKey(store.name())) { boolean partitionNotFound = true; for (PartitionInfo partitionInfo : restoreConsumer.partitionsFor(store.name())) { - if (partitionInfo.partition() == id) { + if (partitionInfo.partition() == partition) { partitionNotFound = false; break; } } if (partitionNotFound) - throw new IllegalStateException("Store " + store.name() + "'s change log does not contain the partition for group " + id); + throw new IllegalStateException("Store " + store.name() + "'s change log does not contain the partition " + partition); } else { throw new IllegalStateException("Change log topic for store " + store.name() + " does not exist yet"); @@ -127,7 +127,7 @@ public void register(StateStore store, StateRestoreCallback stateRestoreCallback // ---- try to restore the state from change-log ---- // // subscribe to the store's partition - TopicPartition storePartition = new TopicPartition(store.name(), id); + TopicPartition storePartition = new TopicPartition(store.name(), partition); if (!restoreConsumer.subscription().isEmpty()) { throw new IllegalStateException("Restore consumer should have not subscribed to any partitions beforehand"); } @@ -201,7 +201,7 @@ public void close(Map ackedOffsets) throws IOException { Map checkpointOffsets = new HashMap<>(); for (String storeName : stores.keySet()) { - TopicPartition part = new TopicPartition(storeName, id); + TopicPartition part = new TopicPartition(storeName, partition); // only checkpoint the offset to the offsets file if it is persistent; if (stores.get(storeName).persistent()) { diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index a94202f8fab2..e035f2902c3f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -45,7 +45,7 @@ public class StreamTask implements Punctuator { private static final Logger log = LoggerFactory.getLogger(StreamTask.class); - private final int id; + private final long id; private final int maxBufferedSize; private final Consumer consumer; @@ -76,7 +76,7 @@ public class StreamTask implements Punctuator { * @param config the {@link StreamingConfig} specified by the user * @param metrics the {@link StreamingMetrics} created by the thread */ - public StreamTask(int id, + public StreamTask(long id, Consumer consumer, Producer producer, Consumer restoreConsumer, @@ -110,12 +110,13 @@ public StreamTask(int id, // create the record recordCollector that maintains the produced offsets this.recordCollector = new RecordCollector(producer); - log.info("Creating restoration consumer client for stream task [" + id + "]"); + log.info("Creating restoration consumer client for stream task #" + id()); // create the processor state manager try { - File stateFile = new File(config.getString(StreamingConfig.STATE_DIR_CONFIG), Integer.toString(id)); - this.stateMgr = new ProcessorStateManager(id, stateFile, restoreConsumer); + int partition = (int) (id & 0xFFFFFFFF); + File stateFile = new File(config.getString(StreamingConfig.STATE_DIR_CONFIG), Integer.toString(partition)); + this.stateMgr = new ProcessorStateManager(partition, stateFile, restoreConsumer); } catch (IOException e) { throw new KafkaException("Error while creating the state manager", e); } @@ -136,8 +137,8 @@ public StreamTask(int id, this.processorContext.initialized(); } - public int id() { - return id; + public String id() { + return (id >> 32) + ":" + (id & 0xFFFFFFFF); } public Set partitions() { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 4a6833254d45..04a584dc6e69 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -39,6 +39,7 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamingConfig; import org.apache.kafka.streams.StreamingMetrics; +import org.apache.kafka.streams.processor.PartitionGrouper; import org.apache.kafka.streams.processor.TopologyBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,6 +66,7 @@ public class StreamThread extends Thread { protected final StreamingConfig config; protected final TopologyBuilder builder; + protected final PartitionGrouper partitionGrouper; protected final Producer producer; protected final Consumer consumer; protected final Consumer restoreConsumer; @@ -119,6 +121,7 @@ public StreamThread(TopologyBuilder builder, this.config = config; this.builder = builder; this.clientId = clientId; + this.partitionGrouper = config.getConfiguredInstance(StreamingConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class); // set the producer and consumer clients this.producer = (producer != null) ? 
producer : createProducer(); @@ -155,7 +158,7 @@ private Producer createProducer() { private Consumer createConsumer() { log.info("Creating consumer client for stream thread [" + this.getName() + "]"); - return new KafkaConsumer<>(config.getConsumerConfigs(), + return new KafkaConsumer<>(config.getConsumerConfigs(partitionGrouper), new ByteArrayDeserializer(), new ByteArrayDeserializer()); } @@ -232,6 +235,7 @@ private void runLoop() { try { int totalNumBuffered = 0; + partitionGrouper.topicGroups(builder.topicGroups()); consumer.subscribe(new ArrayList<>(builder.sourceTopics()), rebalanceListener); while (stillRunning()) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java index 90eee054371c..8da350d6b99f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java @@ -71,7 +71,7 @@ public MeteredKeyValueStore(final String name, final KeyValueStore inner, this.restoreTime = this.metrics.addLatencySensor(metricGrp, name, "restore", "store-name", name); this.topic = name; - this.partition = context.id(); + this.partition = context.statePartition(); this.context = context; @@ -241,4 +241,4 @@ public void close() { } -} \ No newline at end of file +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java index 32897ea5e79d..751e6b804bcc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java @@ -41,7 +41,7 @@ * * @param the type of keys * @param the type of values - * + * * @see Stores#create(String, ProcessorContext) */ public class RocksDBKeyValueStore extends MeteredKeyValueStore { @@ -81,7 +81,7 @@ private static class RocksDBStore implements KeyValueStore { public RocksDBStore(String name, ProcessorContext context, Serdes serdes) { this.topic = name; - this.partition = context.id(); + this.partition = context.statePartition(); this.context = context; this.serdes = serdes; @@ -166,7 +166,7 @@ public void putAll(List> entries) { for (Entry entry : entries) put(entry.key(), entry.value()); } - + @Override public V delete(K key) { V value = get(key); @@ -281,4 +281,4 @@ public boolean hasNext() { } } -} \ No newline at end of file +} diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java index 8fdbfff3f6f8..23a40b1ad5d8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java @@ -47,14 +47,14 @@ *
  * <p>
  * <h3>Basic usage</h3>
  * This component can be used to help test a {@link KeyValueStore}'s ability to read and write entries.
- * 
+ *
  * <pre>
  * // Create the test driver ...
  * KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
  * KeyValueStore<Integer, String> store = Stores.create("my-store", driver.context())
  *                                              .withIntegerKeys().withStringValues()
  *                                              .inMemory().build();
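  * // The store is created against the driver's context, so every entry the store
  * // writes (or flushes) to its backing topic is captured for the assertions below.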
- * 
+ *
  * // Verify that the store reads and writes correctly ...
  * store.put(0, "zero");
  * store.put(1, "one");
@@ -69,7 +69,7 @@
  * assertEquals("five", store.get(5));
  * assertNull(store.get(3));
  * store.delete(5);
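  * // delete(5) marks the entry as removed; the driver will report it via
  * // flushedEntryRemoved(5) once the store is flushed below.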
- * 
+ *
  * // Flush the store and verify all current entries were properly flushed ...
  * store.flush();
  * assertEquals("zero", driver.flushedEntryStored(0));
@@ -77,14 +77,14 @@
  * assertEquals("two", driver.flushedEntryStored(2));
  * assertEquals("four", driver.flushedEntryStored(4));
  * assertEquals(null, driver.flushedEntryStored(5));
- * 
+ *
  * assertEquals(false, driver.flushedEntryRemoved(0));
  * assertEquals(false, driver.flushedEntryRemoved(1));
  * assertEquals(false, driver.flushedEntryRemoved(2));
  * assertEquals(false, driver.flushedEntryRemoved(4));
  * assertEquals(true, driver.flushedEntryRemoved(5));
  * </pre>
- * 
+ *
  * <p>
  * <h3>Restoring a store</h3>
  * This component can be used to test whether a {@link KeyValueStore} implementation properly
@@ -94,30 +94,30 @@
  * To do this, create an instance of this driver component, {@link #addEntryToRestoreLog(Object, Object) add entries} that will be
  * passed to the store upon creation (simulating the entries that were previously flushed to the topic), and then create the store
  * using this driver's {@link #context() ProcessorContext}:
- * 
+ *
  * <pre>
  * // Create the test driver ...
  * KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
- * 
+ *
  * // Add any entries that will be restored to any store that uses the driver's context ...
  * driver.addRestoreEntry(0, "zero");
  * driver.addRestoreEntry(1, "one");
  * driver.addRestoreEntry(2, "two");
  * driver.addRestoreEntry(4, "four");
- * 
+ *
  * // Create the store, which should register with the context and automatically
  * // receive the restore entries ...
  * KeyValueStore<Integer, String> store = Stores.create("my-store", driver.context())
  *                                              .withIntegerKeys().withStringValues()
  *                                              .inMemory().build();
- * 
+ *
  * // Verify that the store's contents were properly restored ...
  * assertEquals(0, driver.checkForRestoredEntries(store));
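  * // checkForRestoredEntries(store) returns the number of restore entries that are
  * // missing from the store, so 0 means every logged entry was restored.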
- * 
+ *
  * // and there are no other entries ...
  * assertEquals(4, driver.sizeOf(store));
  * </pre>
- * + * * @param the type of keys placed in the store * @param the type of values placed in the store */ @@ -163,7 +163,7 @@ public void close() { * value serializers and deserializers. This can be used when the actual serializers and deserializers are supplied to the * store during creation, which should eliminate the need for a store to depend on the ProcessorContext's default key and * value serializers and deserializers. - * + * * @return the test driver; never null */ public static KeyValueStoreTestDriver create() { @@ -181,7 +181,7 @@ public static KeyValueStoreTestDriver create() { * deserializers for the given built-in key and value types (e.g., {@code String.class}, {@code Integer.class}, * {@code Long.class}, and {@code byte[].class}). This can be used when store is created to rely upon the * ProcessorContext's default key and value serializers and deserializers. - * + * * @param keyClass the class for the keys; must be one of {@code String.class}, {@code Integer.class}, * {@code Long.class}, or {@code byte[].class} * @param valueClass the class for the values; must be one of {@code String.class}, {@code Integer.class}, @@ -198,7 +198,7 @@ public static KeyValueStoreTestDriver create(Class keyClass, Cla * {@link ProcessorContext#forward(Object, Object) forwarded} by the store and that provides the specified serializers and * deserializers. This can be used when store is created to rely upon the ProcessorContext's default key and value serializers * and deserializers. - * + * * @param keySerializer the key serializer for the {@link ProcessorContext}; may not be null * @param keyDeserializer the key deserializer for the {@link ProcessorContext}; may not be null * @param valueSerializer the value serializer for the {@link ProcessorContext}; may not be null @@ -245,7 +245,7 @@ public void send(ProducerRecord record, Serializer keySeria this.context = new MockProcessorContext(null, serdes.keySerializer(), serdes.keyDeserializer(), serdes.valueSerializer(), serdes.valueDeserializer(), recordCollector) { @Override - public int id() { + public int statePartition() { return 1; } @@ -283,7 +283,7 @@ public File stateDir() { /** * Set the directory that should be used by the store for local disk storage. - * + * * @param dir the directory; may be null if no local storage is allowed */ public void useStateDir(File dir) { @@ -320,25 +320,25 @@ private void restoreEntries(StateRestoreCallback func) { *
      * <p>
      * To create such a test, create the test driver, call this method one or more times, and then create the
      * {@link KeyValueStore}. Your tests can then check whether the store contains the entries from the log.
-     * 
+     *
      * <pre>
      * // Set up the driver and pre-populate the log ...
      * KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
      * driver.addRestoreEntry(1,"value1");
      * driver.addRestoreEntry(2,"value2");
      * driver.addRestoreEntry(3,"value3");
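      * // Each addRestoreEntry(...) call simulates a record that was previously flushed
      * // to the store's change log; the entries are replayed into the store on creation.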
-     * 
+     *
      * // Create the store using the driver's context ...
      * ProcessorContext context = driver.context();
      * KeyValueStore<Integer, String> store = ...
-     * 
+     *
      * // Verify that the store's contents were properly restored from the log ...
      * assertEquals(0, driver.checkForRestoredEntries(store));
-     * 
+     *
      * // and there are no other entries ...
      * assertEquals(3, driver.sizeOf(store));
      * </pre>
-     * 
+     *
      * @param key the key for the entry
      * @param value the value for the entry
      * @see #checkForRestoredEntries(KeyValueStore)
      */
     public void addEntryToRestoreLog(K key, V value) {
@@ -354,7 +354,7 @@ public void addEntryToRestoreLog(K key, V value) {
      * <p>
* If the {@link KeyValueStore}'s are to be restored upon its startup, be sure to {@link #addEntryToRestoreLog(Object, Object) * add the restore entries} before creating the store with the {@link ProcessorContext} returned by this method. - * + * * @return the processing context; never null * @see #addEntryToRestoreLog(Object, Object) */ @@ -365,7 +365,7 @@ public ProcessorContext context() { /** * Get the entries that are restored to a KeyValueStore when it is constructed with this driver's {@link #context() * ProcessorContext}. - * + * * @return the restore entries; never null but possibly a null iterator */ public Iterable> restoredEntries() { @@ -375,7 +375,7 @@ public Iterable> restoredEntries() { /** * Utility method that will count the number of {@link #addEntryToRestoreLog(Object, Object) restore entries} missing from the * supplied store. - * + * * @param store the store that is to have all of the {@link #restoredEntries() restore entries} * @return the number of restore entries missing from the store, or 0 if all restore entries were found * @see #addEntryToRestoreLog(Object, Object) @@ -395,7 +395,7 @@ public int checkForRestoredEntries(KeyValueStore store) { /** * Utility method to compute the number of entries within the store. - * + * * @param store the key value store using this {@link #context()}. * @return the number of entries */ @@ -410,7 +410,7 @@ public int sizeOf(KeyValueStore store) { /** * Retrieve the value that the store {@link KeyValueStore#flush() flushed} with the given key. - * + * * @param key the key * @return the value that was flushed with the key, or {@code null} if no such key was flushed or if the entry with this * key was {@link #flushedEntryStored(Object) removed} upon flush @@ -421,7 +421,7 @@ public V flushedEntryStored(K key) { /** * Determine whether the store {@link KeyValueStore#flush() flushed} the removal of the given key. 
- * + * * @param key the key * @return {@code true} if the entry with the given key was removed when flushed, or {@code false} if the entry was not * removed when last flushed @@ -438,4 +438,4 @@ public void clear() { flushedEntries.clear(); flushedRemovals.clear(); } -} \ No newline at end of file +} diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java index 761f5ce902ef..fb6e5ae4b9c5 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java @@ -82,13 +82,8 @@ public void setTime(long timestamp) { } @Override - public int id() { - return -1; - } - - @Override - public boolean joinable() { - return true; + public int statePartition() { + return 0; } @Override @@ -174,4 +169,4 @@ public long timestamp() { return this.timestamp; } -} \ No newline at end of file +} From e4ecf39b9ab0b0f4c915a4f43cfe771b1de69f7f Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Wed, 21 Oct 2015 12:33:05 -0700 Subject: [PATCH 04/13] joinability --- .../kafka/streams/kstream/KStreamBuilder.java | 10 ++- .../kstream/internals/KStreamImpl.java | 28 ++++--- .../internals/KStreamWindowedImpl.java | 19 ++++- .../processor/DefaultPartitionGrouper.java | 59 +++++-------- .../streams/processor/TopologyBuilder.java | 83 ++++++++++++------- .../processor/internals/QuickUnion.java | 12 ++- .../processor/internals/StreamTask.java | 6 +- .../processor/internals/StreamThread.java | 57 +++++++++++-- .../kstream/internals/KStreamJoinTest.java | 33 +++++++- .../processor/TopologyBuilderTest.java | 62 +++++++++++++- .../processor/internals/StreamThreadTest.java | 75 +++++++++++------ 11 files changed, 323 insertions(+), 121 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java index 2d4dcc72f805..5b3feb6b97f0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java @@ -21,6 +21,8 @@ import org.apache.kafka.streams.kstream.internals.KStreamImpl; import org.apache.kafka.streams.processor.TopologyBuilder; +import java.util.Collections; + /** * KStreamBuilder is the class to create KStream instances. */ @@ -31,7 +33,7 @@ public KStreamBuilder() { } /** - * Creates a KStream instance for the specified topic. The stream is added to the default synchronization group. + * Creates a KStream instance for the specified topic. * The default deserializers specified in the config are used. * * @param topics the topic names, if empty default to all the topics in the config @@ -42,11 +44,11 @@ public KStream from(String... topics) { addSource(name, topics); - return new KStreamImpl<>(this, name); + return new KStreamImpl<>(this, name, Collections.singleton(name)); } /** - * Creates a KStream instance for the specified topic. The stream is added to the default synchronization group. + * Creates a KStream instance for the specified topic. 
* * @param keyDeserializer key deserializer used to read this source KStream, * if not specified the default deserializer defined in the configs will be used @@ -60,6 +62,6 @@ public KStream from(Deserializer keyDeserializer, Dese addSource(name, keyDeserializer, valDeserializer, topics); - return new KStreamImpl<>(this, name); + return new KStreamImpl<>(this, name, Collections.singleton(name)); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 8f56e0968b9d..404193a815d5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -32,6 +32,8 @@ import org.apache.kafka.streams.processor.TopologyBuilder; import java.lang.reflect.Array; +import java.util.Collections; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; public class KStreamImpl implements KStream { @@ -72,10 +74,12 @@ public class KStreamImpl implements KStream { protected final TopologyBuilder topology; protected final String name; + protected final Set sourceNodes; - public KStreamImpl(TopologyBuilder topology, String name) { + public KStreamImpl(TopologyBuilder topology, String name, Set sourceNodes) { this.topology = topology; this.name = name; + this.sourceNodes = sourceNodes; } @Override @@ -84,7 +88,7 @@ public KStream filter(Predicate predicate) { topology.addProcessor(name, new KStreamFilter<>(predicate, false), this.name); - return new KStreamImpl<>(topology, name); + return new KStreamImpl<>(topology, name, sourceNodes); } @Override @@ -93,7 +97,7 @@ public KStream filterOut(final Predicate predicate) { topology.addProcessor(name, new KStreamFilter<>(predicate, true), this.name); - return new KStreamImpl<>(topology, name); + return new KStreamImpl<>(topology, name, sourceNodes); } @Override @@ -102,7 +106,7 @@ public KStream map(KeyValueMapper> mappe topology.addProcessor(name, new KStreamMap<>(mapper), this.name); - return new KStreamImpl<>(topology, name); + return new KStreamImpl<>(topology, name, null); } @Override @@ -111,7 +115,7 @@ public KStream mapValues(ValueMapper mapper) { topology.addProcessor(name, new KStreamMapValues<>(mapper), this.name); - return new KStreamImpl<>(topology, name); + return new KStreamImpl<>(topology, name, sourceNodes); } @Override @@ -120,7 +124,7 @@ public KStream flatMap(KeyValueMapper(mapper), this.name); - return new KStreamImpl<>(topology, name); + return new KStreamImpl<>(topology, name, null); } @Override @@ -129,7 +133,7 @@ public KStream flatMapValues(ValueMapper> mapper) { topology.addProcessor(name, new KStreamFlatMapValues<>(mapper), this.name); - return new KStreamImpl<>(topology, name); + return new KStreamImpl<>(topology, name, sourceNodes); } @Override @@ -138,7 +142,7 @@ public KStreamWindowed with(WindowSupplier windowSupplier) { topology.addProcessor(name, new KStreamWindow<>(windowSupplier), this.name); - return new KStreamWindowedImpl<>(topology, name, windowSupplier); + return new KStreamWindowedImpl<>(topology, name, sourceNodes, windowSupplier); } @Override @@ -154,7 +158,7 @@ public KStream[] branch(Predicate... 
predicates) { topology.addProcessor(childName, new KStreamPassThrough(), branchName); - branchChildren[i] = new KStreamImpl<>(topology, childName); + branchChildren[i] = new KStreamImpl<>(topology, childName, sourceNodes); } return branchChildren; @@ -174,7 +178,7 @@ public KStream through(String topic, topology.addSource(sourceName, keyDeserializer, valDeserializer, topic); - return new KStreamImpl<>(topology, sourceName); + return new KStreamImpl<>(topology, sourceName, Collections.emptySet()); } @Override @@ -202,7 +206,7 @@ public KStream transform(TransformerSupplier(transformerSupplier), this.name); - return new KStreamImpl<>(topology, name); + return new KStreamImpl<>(topology, name, null); } @Override @@ -211,7 +215,7 @@ public KStream transformValues(ValueTransformerSupplier value topology.addProcessor(name, new KStreamTransformValues<>(valueTransformerSupplier), this.name); - return new KStreamImpl<>(topology, name); + return new KStreamImpl<>(topology, name, sourceNodes); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java index 93160122e0ce..4e9f4c69da26 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java @@ -17,18 +17,22 @@ package org.apache.kafka.streams.kstream.internals; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamWindowed; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.WindowSupplier; import org.apache.kafka.streams.processor.TopologyBuilder; +import java.util.HashSet; +import java.util.Set; + public final class KStreamWindowedImpl extends KStreamImpl implements KStreamWindowed { private final WindowSupplier windowSupplier; - public KStreamWindowedImpl(TopologyBuilder topology, String name, WindowSupplier windowSupplier) { - super(topology, name); + public KStreamWindowedImpl(TopologyBuilder topology, String name, Set sourceNodes, WindowSupplier windowSupplier) { + super(topology, name, sourceNodes); this.windowSupplier = windowSupplier; } @@ -36,6 +40,14 @@ public KStreamWindowedImpl(TopologyBuilder topology, String name, WindowSupplier public KStream join(KStreamWindowed other, ValueJoiner valueJoiner) { String thisWindowName = this.windowSupplier.name(); String otherWindowName = ((KStreamWindowedImpl) other).windowSupplier.name(); + Set thisSourceNodes = this.sourceNodes; + Set otherSourceNodes = ((KStreamWindowedImpl) other).sourceNodes; + + if (thisSourceNodes == null || otherSourceNodes == null) + throw new KafkaException("not joinable"); + + Set allSourceNodes = new HashSet<>(sourceNodes); + allSourceNodes.addAll(((KStreamWindowedImpl) other).sourceNodes); KStreamJoin joinThis = new KStreamJoin<>(otherWindowName, valueJoiner); KStreamJoin joinOther = new KStreamJoin<>(thisWindowName, KStreamJoin.reverseJoiner(valueJoiner)); @@ -48,7 +60,8 @@ public KStream join(KStreamWindowed other, ValueJoiner(topology, joinMergeName); + return new KStreamImpl<>(topology, joinMergeName, allSourceNodes); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java index 9ef1d0342967..9df3a615ae05 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java @@ -27,7 +27,6 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -41,40 +40,35 @@ public Map> partitionGroups(Cluster metadata) { long groupId = 0; for (List topicGroup : sortedTopicGroups) { - int numPartitions = ensureCopartitioning(topicGroup, metadata); + for (String topic : topicGroup) { + List infos = metadata.partitionsForTopic(topic); - for (int partitionId = 0; partitionId < numPartitions; partitionId++) { - long taskId = (groupId << 32) | (long) partitionId; + if (infos == null) + throw new KafkaException("topic not found :" + topic); - ArrayList group = new ArrayList<>(topicGroup.size()); - for (String topic : topicGroup) { + int numPartitions = infos.size(); + + for (int partitionId = 0; partitionId < numPartitions; partitionId++) { + long taskId = (groupId << 32) | (long) partitionId; + + List group = groups.get(taskId); + + if (group == null) { + group = new ArrayList<>(topicGroup.size()); + groups.put(taskId, group); + } group.add(new TopicPartition(topic, partitionId)); } - - groups.put(taskId, Collections.unmodifiableList(group)); } groupId++; } - return Collections.unmodifiableMap(groups); - } - - protected int ensureCopartitioning(List topicGroup, Cluster metadata) { - int numPartitions = -1; - - for (String topic : topicGroup) { - List infos = metadata.partitionsForTopic(topic); - - if (infos == null) - throw new KafkaException("topic not found :" + topic); - - if (numPartitions == -1) { - numPartitions = infos.size(); - } else if (numPartitions != infos.size()) { - throw new KafkaException("not copartitioned : [" + toString(topicGroup, ", ") + "]"); - } + // make the data unmodifiable, then return + Map> unmodifiableGroups = new HashMap<>(); + for (Map.Entry> entry : groups.entrySet()) { + unmodifiableGroups.put(entry.getKey(), Collections.unmodifiableList(entry.getValue())); } - return numPartitions; + return Collections.unmodifiableMap(unmodifiableGroups); } protected List> sort(Collection> topicGroups) { @@ -94,17 +88,4 @@ protected List> sort(Collection> topicGroups) { return list; } - protected CharSequence toString(Collection set, String separator) { - StringBuilder sb = new StringBuilder(); - Iterator iter = set.iterator(); - if (iter.hasNext()) { - sb.append(iter.next().toString()); - - while (iter.hasNext()) { - sb.append(separator); - sb.append(iter.next().toString()); - } - } - return sb; - } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java index e0d4990ccafc..a475e1ec5da0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java @@ -54,6 +54,7 @@ public class TopologyBuilder { private final Set sourceTopicNames = new HashSet<>(); private final QuickUnion nodeGroups = new QuickUnion<>(); + private final List> copartitionSourceGroups = new ArrayList<>(); private final HashMap nodeToTopics = new HashMap<>(); private interface NodeFactory { @@ -247,49 +248,75 @@ public final TopologyBuilder addProcessor(String name, ProcessorSupplier supplie nodeNames.add(name); nodeFactories.add(new ProcessorNodeFactory(name, 
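
The 64-bit task id built in partitionGroups() packs the topic-group index into the high 32 bits and the partition id into the low 32 bits, which is why a task's state directory can be named by the full id without two groups colliding on the same partition number. A minimal sketch of the encode/decode pair; the class and method names are illustrative:

    // Illustrative encode/decode of the packed task id used in this patch.
    final class TaskIdCodec {
        static long pack(int groupId, int partitionId) {
            return ((long) groupId << 32) | (partitionId & 0xFFFFFFFFL);
        }

        static int groupId(long taskId) {
            return (int) (taskId >>> 32);        // high 32 bits
        }

        static int partitionId(long taskId) {
            return (int) (taskId & 0xFFFFFFFFL); // low 32 bits, as StreamTask unpacks it
        }

        public static void main(String[] args) {
            long taskId = pack(1, 2);
            System.out.println(Long.toHexString(taskId)); // prints 100000002
        }
    }
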
parentNames, supplier)); nodeGroups.add(name); + nodeGroups.unite(name, parentNames); return this; } /** - * Declares that the streams of the specified processor nodes must be copartitioned. + * Returns the topic groups. + * A topic group is a group of topics in the same task. * - * @param nodeName - * @param otherNodeNames + * @return groups of topic names */ - public void copartition(String nodeName, String... otherNodeNames) { - for (String other : otherNodeNames) { - nodeGroups.unite(nodeName, other); + public Collection> topicGroups() { + List> topicGroups = new ArrayList<>(); + + for (Set nodeGroup : generateNodeGroups(nodeGroups)) { + Set topicGroup = new HashSet<>(); + for (String node : nodeGroup) { + String[] topics = nodeToTopics.get(node); + if (topics != null) + topicGroup.addAll(Arrays.asList(topics)); + } + topicGroups.add(Collections.unmodifiableSet(topicGroup)); } + + return Collections.unmodifiableList(topicGroups); } - /** - * Returns the grouped topics - * - * @return groups of topic names - */ - public Collection> topicGroups() { - HashMap> topicGroupMap = new HashMap<>(); + private Collection> generateNodeGroups(QuickUnion grouping) { + HashMap> nodeGroupMap = new HashMap<>(); - // collect topics by node groups for (String nodeName : nodeNames) { - String[] topics = nodeToTopics.get(nodeName); - if (topics != null) { - String root = nodeGroups.root(nodeName); - Set topicGroup = topicGroupMap.get(root); - if (topicGroup == null) { - topicGroup = new HashSet<>(); - topicGroupMap.put(root, topicGroup); - } - topicGroup.addAll(Arrays.asList(topics)); + String root = grouping.root(nodeName); + Set nodeGroup = nodeGroupMap.get(root); + if (nodeGroup == null) { + nodeGroup = new HashSet<>(); + nodeGroupMap.put(root, nodeGroup); } + nodeGroup.add(nodeName); } - // make the data unmodifiable, then return - List> topicGroups = new ArrayList<>(topicGroupMap.size()); - for (Set group : topicGroupMap.values()) { - topicGroups.add(Collections.unmodifiableSet(group)); + return nodeGroupMap.values(); + } + + /** + * Asserts that the streams of the specified source nodes must be copartitioned. + * + * @param sourceNodes a set of source node names + */ + public void copartitionSources(Collection sourceNodes) { + copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes))); + } + + /** + * Returns the copartition groups. + * A copartition group is a group of topics that are required to be copartitioned. 
+ * + * @return groups of topic names + */ + public Collection> copartitionGroups() { + List> list = new ArrayList<>(copartitionSourceGroups.size()); + for (Set nodeNames : copartitionSourceGroups) { + Set copartitionGroup = new HashSet<>(); + for (String node : nodeNames) { + String[] topics = nodeToTopics.get(node); + if (topics != null) + copartitionGroup.addAll(Arrays.asList(topics)); + } + list.add(Collections.unmodifiableSet(copartitionGroup)); } - return Collections.unmodifiableList(topicGroups); + return Collections.unmodifiableList(list); } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java index 5487eba19e5a..087cbd2591e2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java @@ -28,6 +28,10 @@ public void add(T id) { ids.put(id, id); } + public boolean exists(T id) { + return ids.containsKey(id); + } + public T root(T id) { T current = id; T parent = ids.get(current); @@ -46,7 +50,13 @@ public T root(T id) { return current; } - public void unite(T id1, T id2) { + public void unite(T id1, T... idList) { + for (T id2 : idList) { + unitePair(id1, id2); + } + } + + private void unitePair(T id1, T id2) { T root1 = root(id1); T root2 = root(id2); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index f34ea7ca7c25..629c17c6d0bf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -117,7 +117,7 @@ public StreamTask(long id, // create the processor state manager try { int partition = (int) (id & 0xFFFFFFFF); - File stateFile = new File(config.getString(StreamingConfig.STATE_DIR_CONFIG), Integer.toString(partition)); + File stateFile = new File(config.getString(StreamingConfig.STATE_DIR_CONFIG), Long.toString(id)); this.stateMgr = new ProcessorStateManager(partition, stateFile, restoreConsumer); } catch (IOException e) { throw new KafkaException("Error while creating the state manager", e); @@ -139,8 +139,8 @@ public StreamTask(long id, this.processorContext.initialized(); } - public String id() { - return (id >> 32) + ":" + (id & 0xFFFFFFFF); + public long id() { + return id; } public Set partitions() { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 7f07a2657446..88c254ab4bfe 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -25,6 +25,7 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.MeasurableStat; import org.apache.kafka.common.metrics.Metrics; @@ -48,12 +49,16 @@ import java.io.IOException; import java.nio.channels.FileLock; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import 
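
QuickUnion is a plain union-find over arbitrary ids; the varargs unite() added here lets TopologyBuilder merge a processor with all of its parent nodes in a single call, so connected nodes collapse into one node group. A usage sketch, assuming the QuickUnion<T> from this patch is importable:

    import org.apache.kafka.streams.processor.internals.QuickUnion;

    public class QuickUnionSketch {
        public static void main(String[] args) {
            QuickUnion<String> qu = new QuickUnion<>();
            qu.add("source-1");
            qu.add("source-2");
            qu.add("processor-1");

            // merge the processor with both of its parents in one call
            qu.unite("processor-1", "source-1", "source-2");

            // all three now share a single root, i.e. they form one node group
            System.out.println(qu.root("source-1").equals(qu.root("processor-1"))); // true
            System.out.println(qu.root("source-2").equals(qu.root("processor-1"))); // true
        }
    }
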
java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -71,7 +76,7 @@ public class StreamThread extends Thread { protected final Consumer consumer; protected final Consumer restoreConsumer; - private final Map tasks; + private final Map tasks; private final String clientId; private final Time time; private final File stateDir; @@ -194,7 +199,7 @@ public void close() { running.set(false); } - public Map tasks() { + public Map tasks() { return Collections.unmodifiableMap(tasks); } @@ -236,6 +241,8 @@ private void runLoop() { int totalNumBuffered = 0; boolean requiresPoll = true; + ensureCopartitioning(builder.copartitionGroups()); + partitionGrouper.topicGroups(builder.topicGroups()); consumer.subscribe(new ArrayList<>(builder.sourceTopics()), rebalanceListener); @@ -369,7 +376,7 @@ protected void maybeClean() { if (stateDirs != null) { for (File dir : stateDirs) { try { - Integer id = Integer.parseInt(dir.getName()); + Long id = Long.parseLong(dir.getName()); // try to acquire the exclusive lock on the state directory FileLock directoryLock = null; @@ -401,7 +408,7 @@ protected void maybeClean() { } } - protected StreamTask createStreamTask(int id, Collection partitionsForTask) { + protected StreamTask createStreamTask(long id, Collection partitionsForTask) { sensors.taskCreationSensor.record(); return new StreamTask(id, consumer, producer, restoreConsumer, partitionsForTask, builder.build(), config, sensors); @@ -412,7 +419,7 @@ private void addPartitions(Collection assignment) { // TODO: change this hard-coded co-partitioning behavior for (TopicPartition partition : partitions) { - final Integer id = partition.partition(); + final Long id = (long) partition.partition(); StreamTask task = tasks.get(id); if (task == null) { // get the partitions for the task @@ -451,6 +458,46 @@ private void removePartitions() { tasks.clear(); } + private void ensureCopartitioning(Collection> copartitionGroups) { + for (Set copartitionGroup : copartitionGroups) { + ensureCopartitioning(copartitionGroup); + } + } + + private void ensureCopartitioning(Set copartitionGroup) { + int numPartitions = -1; + + for (String topic : copartitionGroup) { + List infos = consumer.partitionsFor(topic); + + if (infos == null) + throw new KafkaException("topic not found: " + topic); + + if (numPartitions == -1) { + numPartitions = infos.size(); + } else if (numPartitions != infos.size()) { + String[] topics = copartitionGroup.toArray(new String[copartitionGroup.size()]); + Arrays.sort(topics); + throw new KafkaException("topics not copartitioned: [" + toString(Arrays.asList(topics), ",") + "]"); + } + } + } + + protected CharSequence toString(Collection coll, String separator) { + StringBuilder sb = new StringBuilder(); + Iterator iter = coll.iterator(); + if (iter.hasNext()) { + sb.append(iter.next().toString()); + + while (iter.hasNext()) { + sb.append(separator); + sb.append(iter.next().toString()); + } + } + return sb; + } + + private class StreamingMetricsImpl implements StreamingMetrics { final Metrics metrics; final String metricGrpName; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamJoinTest.java index 58899faeda09..12bed1788fa3 100644 --- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamJoinTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.kstream.internals; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.Utils; @@ -32,12 +33,18 @@ import org.apache.kafka.test.UnlimitedWindowDef; import org.junit.Test; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + import static org.junit.Assert.assertEquals; public class KStreamJoinTest { private String topic1 = "topic1"; private String topic2 = "topic2"; + private String dummyTopic = "dummyTopic"; private IntegerDeserializer keyDeserializer = new IntegerDeserializer(); private StringDeserializer valDeserializer = new StringDeserializer(); @@ -88,6 +95,7 @@ public void testJoin() { KStream stream1; KStream stream2; + KStream dummyStream; KStreamWindowed windowed1; KStreamWindowed windowed2; MockProcessorSupplier processor; @@ -96,11 +104,17 @@ public void testJoin() { processor = new MockProcessorSupplier<>(); stream1 = builder.from(keyDeserializer, valDeserializer, topic1); stream2 = builder.from(keyDeserializer, valDeserializer, topic2); + dummyStream = builder.from(keyDeserializer, valDeserializer, dummyTopic); windowed1 = stream1.with(new UnlimitedWindowDef("window1")); windowed2 = stream2.with(new UnlimitedWindowDef("window2")); windowed1.join(windowed2, joiner).process(processor); + Collection> copartitionGroups = builder.copartitionGroups(); + + assertEquals(1, copartitionGroups.size()); + assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); + KStreamTestDriver driver = new KStreamTestDriver(builder); driver.setTime(0L); @@ -160,5 +174,22 @@ public void testJoin() { } } - // TODO: test for joinability + @Test(expected = KafkaException.class) + public void testNotJoinable() { + KStreamBuilder builder = new KStreamBuilder(); + + KStream stream1; + KStream stream2; + KStreamWindowed windowed1; + KStreamWindowed windowed2; + MockProcessorSupplier processor; + + processor = new MockProcessorSupplier<>(); + stream1 = builder.from(keyDeserializer, valDeserializer, topic1).map(keyValueMapper); + stream2 = builder.from(keyDeserializer, valDeserializer, topic2); + windowed1 = stream1.with(new UnlimitedWindowDef("window1")); + windowed2 = stream2.with(new UnlimitedWindowDef("window2")); + + windowed1.join(windowed2, joiner).process(processor); + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java index 00522d53f616..1e622c03cbf5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java @@ -22,6 +22,13 @@ import org.apache.kafka.test.MockProcessorSupplier; import org.junit.Test; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.TreeMap; + public class TopologyBuilderTest { @Test(expected = TopologyException.class) @@ -94,6 +101,59 @@ public void testSourceTopics() { builder.addSource("source-2", "topic-2"); builder.addSource("source-3", "topic-3"); - 
assertEquals(builder.sourceTopics().size(), 3); + assertEquals(3, builder.sourceTopics().size()); } + + @Test + public void testTopicGroups() { + final TopologyBuilder builder = new TopologyBuilder(); + + builder.addSource("source-1", "topic-1", "topic-1x"); + builder.addSource("source-2", "topic-2"); + builder.addSource("source-3", "topic-3"); + builder.addSource("source-4", "topic-4"); + builder.addSource("source-5", "topic-5"); + + builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1"); + + builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2", "processor-1"); + builder.copartitionSources(list("source-1", "source-2")); + + builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4"); + + List> topicGroups = sort(builder.topicGroups()); + + assertEquals(3, topicGroups.size()); + assertEquals(list(list("topic-1", "topic-1x", "topic-2"), list("topic-3", "topic-4"), list("topic-5")), topicGroups); + + List> copartitionGroups = sort(builder.copartitionGroups()); + + assertEquals(list(list("topic-1", "topic-1x", "topic-2")), copartitionGroups); + } + + private List> sort(Collection> topicGroups) { + TreeMap sortedMap = new TreeMap<>(); + + for (Set group : topicGroups) { + String[] arr = group.toArray(new String[group.size()]); + Arrays.sort(arr); + sortedMap.put(arr[0], arr); + } + + ArrayList> list = new ArrayList(sortedMap.size()); + for (String[] arr : sortedMap.values()) { + list.add(Arrays.asList(arr)); + } + + return list; + } + + private List list(T... elems) { + List s = new ArrayList(); + for (T elem : elems) { + s.add(elem); + } + return s; + } + } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index a7e707e65de4..ea4d978e77d3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -35,6 +35,7 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamingConfig; import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.kafka.test.MockProcessorSupplier; import org.junit.Test; import java.io.File; @@ -55,6 +56,13 @@ public class StreamThreadTest { private TopicPartition t1p2 = new TopicPartition("topic1", 2); private TopicPartition t2p1 = new TopicPartition("topic2", 1); private TopicPartition t2p2 = new TopicPartition("topic2", 2); + private TopicPartition t3p1 = new TopicPartition("topic3", 1); + private TopicPartition t3p2 = new TopicPartition("topic3", 2); + + private final long g1p1 = (0L << 32) | 1L; // TODO: (1L << 32 | 1L) + private final long g1p2 = (0L << 32) | 2L; // TODO: (1L << 32 | 1L) + private final long g2p1 = (0L << 32) | 1L; // TODO: (2L << 32 | 1L) + private final long g2p2 = (0L << 32) | 2L; // TODO: (2L << 32 | 2L) private Properties configProps() { return new Properties() { @@ -73,7 +81,7 @@ private Properties configProps() { private static class TestStreamTask extends StreamTask { public boolean committed = false; - public TestStreamTask(int id, + public TestStreamTask(long id, Consumer consumer, Producer producer, Consumer restoreConsumer, @@ -102,12 +110,15 @@ public void testPartitionAssignmentChange() throws Exception { final MockConsumer mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST); TopologyBuilder builder = new TopologyBuilder(); 
+ builder.addSource("source0", "topic0"); builder.addSource("source1", "topic1"); builder.addSource("source2", "topic2"); + builder.addSource("source3", "topic3"); + builder.addProcessor("processor", new MockProcessorSupplier(), "source2", "source3"); StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", new Metrics(), new SystemTime()) { @Override - protected StreamTask createStreamTask(int id, Collection partitionsForTask) { + protected StreamTask createStreamTask(long id, Collection partitionsForTask) { return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, builder.build(), config); } }; @@ -128,8 +139,8 @@ protected StreamTask createStreamTask(int id, Collection partiti rebalanceListener.onPartitionsRevoked(revokedPartitions); rebalanceListener.onPartitionsAssigned(assignedPartitions); - assertTrue(thread.tasks().containsKey(1)); - assertEquals(expectedGroup1, thread.tasks().get(1).partitions()); + assertTrue(thread.tasks().containsKey(g1p1)); + assertEquals(expectedGroup1, thread.tasks().get(g1p1).partitions()); assertEquals(1, thread.tasks().size()); revokedPartitions = assignedPartitions; @@ -139,8 +150,8 @@ protected StreamTask createStreamTask(int id, Collection partiti rebalanceListener.onPartitionsRevoked(revokedPartitions); rebalanceListener.onPartitionsAssigned(assignedPartitions); - assertTrue(thread.tasks().containsKey(2)); - assertEquals(expectedGroup2, thread.tasks().get(2).partitions()); + assertTrue(thread.tasks().containsKey(g1p2)); + assertEquals(expectedGroup2, thread.tasks().get(g1p2).partitions()); assertEquals(1, thread.tasks().size()); revokedPartitions = assignedPartitions; @@ -151,25 +162,41 @@ protected StreamTask createStreamTask(int id, Collection partiti rebalanceListener.onPartitionsRevoked(revokedPartitions); rebalanceListener.onPartitionsAssigned(assignedPartitions); - assertTrue(thread.tasks().containsKey(1)); - assertTrue(thread.tasks().containsKey(2)); - assertEquals(expectedGroup1, thread.tasks().get(1).partitions()); - assertEquals(expectedGroup2, thread.tasks().get(2).partitions()); + assertTrue(thread.tasks().containsKey(g1p1)); + assertTrue(thread.tasks().containsKey(g1p2)); + assertEquals(expectedGroup1, thread.tasks().get(g1p1).partitions()); + assertEquals(expectedGroup2, thread.tasks().get(g1p2).partitions()); + assertEquals(2, thread.tasks().size()); + + revokedPartitions = assignedPartitions; + assignedPartitions = Arrays.asList(t2p1, t2p2, t3p1, t3p2); + expectedGroup1 = new HashSet<>(Arrays.asList(t2p1, t3p1)); + expectedGroup2 = new HashSet<>(Arrays.asList(t2p2, t3p2)); + + rebalanceListener.onPartitionsRevoked(revokedPartitions); + rebalanceListener.onPartitionsAssigned(assignedPartitions); + + assertTrue(thread.tasks().containsKey(g2p1)); + assertTrue(thread.tasks().containsKey(g2p2)); + assertEquals(expectedGroup1, thread.tasks().get(g2p1).partitions()); + assertEquals(expectedGroup2, thread.tasks().get(g2p2).partitions()); assertEquals(2, thread.tasks().size()); + /* TODO: revokedPartitions = assignedPartitions; - assignedPartitions = Arrays.asList(t1p1, t1p2, t2p1, t2p2); - expectedGroup1 = new HashSet<>(Arrays.asList(t1p1, t2p1)); - expectedGroup2 = new HashSet<>(Arrays.asList(t1p2, t2p2)); + assignedPartitions = Arrays.asList(t1p1, t2p1, t3p1); + expectedGroup1 = new HashSet<>(Arrays.asList(t1p1)); + expectedGroup2 = new HashSet<>(Arrays.asList(t2p1, t3p1)); rebalanceListener.onPartitionsRevoked(revokedPartitions); 
rebalanceListener.onPartitionsAssigned(assignedPartitions); - assertTrue(thread.tasks().containsKey(1)); - assertTrue(thread.tasks().containsKey(2)); - assertEquals(expectedGroup1, thread.tasks().get(1).partitions()); - assertEquals(expectedGroup2, thread.tasks().get(2).partitions()); + assertTrue(thread.tasks().containsKey(g1p1)); + assertTrue(thread.tasks().containsKey(g2p1)); + assertEquals(expectedGroup1, thread.tasks().get(g1p1).partitions()); + assertEquals(expectedGroup2, thread.tasks().get(g2p1).partitions()); assertEquals(2, thread.tasks().size()); + */ revokedPartitions = assignedPartitions; assignedPartitions = Collections.emptyList(); @@ -214,7 +241,7 @@ public void maybeClean() { super.maybeClean(); } @Override - protected StreamTask createStreamTask(int id, Collection partitionsForTask) { + protected StreamTask createStreamTask(long id, Collection partitionsForTask) { return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, builder.build(), config); } }; @@ -235,7 +262,7 @@ protected StreamTask createStreamTask(int id, Collection partiti Map prevTasks; // - // Assign t1p1 and t1p2. This should create Task 1 & 2 + // Assign t1p1 and t1p2. This should create Task g1p1 & g1p2 // revokedPartitions = Collections.emptyList(); assignedPartitions = Arrays.asList(t1p1, t1p2); @@ -258,7 +285,7 @@ protected StreamTask createStreamTask(int id, Collection partiti assertTrue(stateDir3.exists()); assertTrue(extraDir.exists()); - // all state directories except for task 1 & 2 will be removed. the extra directory should still exists + // all state directories except for task g1p1 & g1p2 will be removed. the extra directory should still exists mockTime.sleep(11L); thread.maybeClean(); assertTrue(stateDir1.exists()); @@ -267,7 +294,7 @@ protected StreamTask createStreamTask(int id, Collection partiti assertTrue(extraDir.exists()); // - // Revoke t1p1 and t1p2. This should remove Task 1 & 2 + // Revoke t1p1 and t1p2. 
This should remove Task g1p1 & g1p2 // revokedPartitions = assignedPartitions; assignedPartitions = Collections.emptyList(); @@ -286,7 +313,7 @@ protected StreamTask createStreamTask(int id, Collection partiti // no task assertTrue(thread.tasks().isEmpty()); - // all state directories for task 1 & 2 still exist before the cleanup delay time + // all state directories for task g1p1 & g1p2 still exist before the cleanup delay time mockTime.sleep(cleanupDelay - 10L); thread.maybeClean(); assertTrue(stateDir1.exists()); @@ -294,7 +321,7 @@ protected StreamTask createStreamTask(int id, Collection partiti assertFalse(stateDir3.exists()); assertTrue(extraDir.exists()); - // all state directories for task 1 & 2 are removed + // all state directories for task g1p1 & g1p2 are removed mockTime.sleep(11L); thread.maybeClean(); assertFalse(stateDir1.exists()); @@ -332,7 +359,7 @@ public void maybeCommit() { super.maybeCommit(); } @Override - protected StreamTask createStreamTask(int id, Collection partitionsForTask) { + protected StreamTask createStreamTask(long id, Collection partitionsForTask) { return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, builder.build(), config); } }; From f68723bab83c3a3f1c15872f4f24bc932df8198f Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Thu, 22 Oct 2015 11:21:31 -0700 Subject: [PATCH 05/13] partition assignor --- .../apache/kafka/streams/StreamingConfig.java | 3 +- .../processor/DefaultPartitionGrouper.java | 48 ++++--- .../streams/processor/PartitionGrouper.java | 15 ++- .../KafkaStreamingPartitionAssignor.java | 121 ++++++++++++++++++ .../internals/PartitionAssignorImpl.java | 66 ---------- .../processor/internals/StreamTask.java | 11 +- .../processor/internals/StreamThread.java | 56 ++++---- .../DefaultPartitionGrouperTest.java | 93 ++++++++++++++ .../processor/internals/QuickUnionTest.java | 74 +++++++++++ .../processor/internals/StreamThreadTest.java | 87 ++++++++++--- 10 files changed, 426 insertions(+), 148 deletions(-) create mode 100644 streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java delete mode 100644 streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionAssignorImpl.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/processor/internals/QuickUnionTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java index c544d57d1af6..9203b9adc7a2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java @@ -26,6 +26,7 @@ import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.streams.processor.DefaultPartitionGrouper; import org.apache.kafka.streams.processor.PartitionGrouper; +import org.apache.kafka.streams.processor.internals.KafkaStreamingPartitionAssignor; import java.util.Map; @@ -220,7 +221,7 @@ public Map getConsumerConfigs() { // set consumer default property values props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); - props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "range"); + props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, KafkaStreamingPartitionAssignor.class.getName()); // remove properties that are not required for consumers 
props.remove(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java index 9df3a615ae05..f87cfa81bc70 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java @@ -34,43 +34,49 @@ public class DefaultPartitionGrouper extends PartitionGrouper { - public Map> partitionGroups(Cluster metadata) { - Map> groups = new HashMap<>(); + public Map> partitionGroups(Cluster metadata) { + Map> groups = new HashMap<>(); List> sortedTopicGroups = sort(topicGroups); - long groupId = 0; + int taskId = 0; for (List topicGroup : sortedTopicGroups) { - for (String topic : topicGroup) { - List infos = metadata.partitionsForTopic(topic); + int maxNumPartitions = maxNumPartitions(metadata, topicGroup); - if (infos == null) - throw new KafkaException("topic not found :" + topic); + for (int partitionId = 0; partitionId < maxNumPartitions; partitionId++) { + List group = new ArrayList<>(topicGroup.size()); - int numPartitions = infos.size(); - - for (int partitionId = 0; partitionId < numPartitions; partitionId++) { - long taskId = (groupId << 32) | (long) partitionId; - - List group = groups.get(taskId); - - if (group == null) { - group = new ArrayList<>(topicGroup.size()); - groups.put(taskId, group); + for (String topic : topicGroup) { + if (partitionId < metadata.partitionsForTopic(topic).size()) { + group.add(new TopicPartition(topic, partitionId)); } - group.add(new TopicPartition(topic, partitionId)); } + groups.put(taskId++, group); } - groupId++; } // make the data unmodifiable, then return - Map> unmodifiableGroups = new HashMap<>(); - for (Map.Entry> entry : groups.entrySet()) { + Map> unmodifiableGroups = new HashMap<>(); + for (Map.Entry> entry : groups.entrySet()) { unmodifiableGroups.put(entry.getKey(), Collections.unmodifiableList(entry.getValue())); } return Collections.unmodifiableMap(unmodifiableGroups); } + protected int maxNumPartitions(Cluster metadata, List topics) { + int maxNumPartitions = 0; + for (String topic : topics) { + List infos = metadata.partitionsForTopic(topic); + + if (infos == null) + throw new KafkaException("topic not found :" + topic); + + int numPartitions = infos.size(); + if (numPartitions > maxNumPartitions) + maxNumPartitions = numPartitions; + } + return maxNumPartitions; + } + protected List> sort(Collection> topicGroups) { TreeMap sortedMap = new TreeMap<>(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java index 2110da61c663..a17721965a2a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.Cluster; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.processor.internals.KafkaStreamingPartitionAssignor; import java.util.Collection; import java.util.List; @@ -29,7 +30,7 @@ public abstract class PartitionGrouper { protected Collection> topicGroups; - private Map> partitionToTaskIds; + private KafkaStreamingPartitionAssignor partitionAssignor = null; /** * Returns a map of task ids to groups of partitions. 
The task id is the 64 bit integer @@ -39,18 +40,18 @@ public abstract class PartitionGrouper { * @param metadata * @return a map of task ids to groups of partitions */ - public abstract Map> partitionGroups(Cluster metadata); + public abstract Map> partitionGroups(Cluster metadata); - public final void topicGroups(Collection> topicGroups) { + public void topicGroups(Collection> topicGroups) { this.topicGroups = topicGroups; } - public final void partitionToTaskIds(Map> partitionToTaskIds) { - this.partitionToTaskIds = partitionToTaskIds; + public void partitionAssignor(KafkaStreamingPartitionAssignor partitionAssignor) { + this.partitionAssignor = partitionAssignor; } - public final Set taskIds(TopicPartition partition) { - return partitionToTaskIds.get(partition); + public Set taskIds(TopicPartition partition) { + return partitionAssignor.taskIds(partition); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java new file mode 100644 index 000000000000..36c50c3769a7 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.clients.consumer.internals.PartitionAssignor; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Configurable; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.StreamingConfig; +import org.apache.kafka.streams.processor.PartitionGrouper; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Configurable { + + private PartitionGrouper partitionGrouper; + private Map> partitionToTaskIds; + + @Override + public void configure(Map configs) { + Object o = configs.get(StreamingConfig.InternalConfig.PARTITION_GROUPER_INSTANCE); + if (o == null) + throw new KafkaException("PartitionGrouper is not specified"); + + if (!PartitionGrouper.class.isInstance(o)) + throw new KafkaException(o.getClass().getName() + " is not an instance of " + PartitionGrouper.class.getName()); + + partitionGrouper = (PartitionGrouper) o; + partitionGrouper.partitionAssignor(this); + } + + @Override + public String name() { + return "streaming"; + } + + @Override + public Subscription subscription(Set topics) { + return new Subscription(new ArrayList<>(topics)); + } + + @Override + public Map assign(Cluster metadata, Map subscriptions) { + Map> partitionGroups = partitionGrouper.partitionGroups(metadata); + + String[] clientIds = subscriptions.keySet().toArray(new String[subscriptions.size()]); + Integer[] taskIds = partitionGroups.keySet().toArray(new Integer[partitionGroups.size()]); + + Map assignment = new HashMap<>(); + + for (int i = 0; i < clientIds.length; i++) { + List partitions = new ArrayList<>(); + List ids = new ArrayList<>(); + for (int j = i; j < taskIds.length; j += clientIds.length) { + Integer taskId = taskIds[j]; + for (TopicPartition partition : partitionGroups.get(taskId)) { + partitions.add(partition); + ids.add(taskId); + } + } + // encode task ids + ByteBuffer buf = ByteBuffer.allocate(ids.size() * 4); + for (Integer id : ids) { + buf.putInt(id); + } + buf.rewind(); + assignment.put(clientIds[i], new Assignment(partitions, buf)); + } + + return assignment; + } + + @Override + public void onAssignment(Assignment assignment) { + List partitions = assignment.partitions(); + ByteBuffer data = assignment.userData(); + + Map> partitionToTaskIds = new HashMap<>(); + + int i = 0; + for (TopicPartition partition : partitions) { + Set taskIds = partitionToTaskIds.get(partition); + if (taskIds == null) { + taskIds = new HashSet<>(); + partitionToTaskIds.put(partition, taskIds); + } + // decode a task id + data.rewind(); + taskIds.add(data.getInt(i * 4)); + i++; + } + this.partitionToTaskIds = partitionToTaskIds; + } + + public Set taskIds(TopicPartition partition) { + return partitionToTaskIds.get(partition); + } + +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionAssignorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionAssignorImpl.java deleted file mode 100644 index aa83b7e745ba..000000000000 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionAssignorImpl.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
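
assign() above deals task ids out round-robin across the subscribed clients and carries each member's per-partition task id list in the assignment's userData bytes; onAssignment() then reads the i-th id back by absolute position while walking the partition list. A minimal standalone sketch of just that ByteBuffer round trip, with illustrative task id values:

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.List;

    public class UserDataRoundTrip {
        public static void main(String[] args) {
            List<Integer> ids = Arrays.asList(1, 1, 4, 5); // one task id per partition

            // encode: one 4-byte int per assigned partition
            ByteBuffer buf = ByteBuffer.allocate(ids.size() * 4);
            for (Integer id : ids)
                buf.putInt(id);
            buf.rewind();

            // decode: absolute-index reads, so the buffer position never matters
            for (int i = 0; i < ids.size(); i++)
                System.out.println("partition " + i + " -> task " + buf.getInt(i * 4));
        }
    }
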
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.processor.internals; - -import org.apache.kafka.common.Configurable; -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.streams.StreamingConfig; -import org.apache.kafka.streams.processor.PartitionGrouper; - -import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -public class PartitionAssignorImpl implements Configurable { - - private PartitionGrouper partitionGrouper; - - @SuppressWarnings("unchecked") - public void configure(Map configs) { - Object o = configs.get(StreamingConfig.InternalConfig.PARTITION_GROUPER_INSTANCE); - if (o == null) - throw new KafkaException("PartitionGrouper is not specified"); - - if (!PartitionGrouper.class.isInstance(o)) - throw new KafkaException(o.getClass().getName() + " is not an instance of " + PartitionGrouper.class.getName()); - - partitionGrouper = (PartitionGrouper) o; - } - - public void partitionToTaskIds(List partitions, ByteBuffer data) { - Map> partitionToTaskIds = new HashMap<>(); - - int i = 0; - for (TopicPartition partition : partitions) { - Set taskIds = partitionToTaskIds.get(partition); - if (taskIds == null) { - taskIds = new HashSet<>(); - partitionToTaskIds.put(partition, taskIds); - } - taskIds.add(data.getLong(i * 8)); - i += 4; - } - partitionGrouper.partitionToTaskIds(Collections.unmodifiableMap(partitionToTaskIds)); - } - -} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 629c17c6d0bf..1de6f9bd6afd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -45,7 +45,7 @@ public class StreamTask implements Punctuator { private static final Logger log = LoggerFactory.getLogger(StreamTask.class); - private final long id; + private final int id; private final int maxBufferedSize; private final Consumer consumer; @@ -78,7 +78,7 @@ public class StreamTask implements Punctuator { * @param config the {@link StreamingConfig} specified by the user * @param metrics the {@link StreamingMetrics} created by the thread */ - public StreamTask(long id, + public StreamTask(int id, Consumer consumer, Producer producer, Consumer restoreConsumer, @@ -116,9 +116,8 @@ public StreamTask(long id, // create the processor state manager try { - int partition = (int) (id & 0xFFFFFFFF); - File stateFile = new File(config.getString(StreamingConfig.STATE_DIR_CONFIG), Long.toString(id)); - this.stateMgr = new ProcessorStateManager(partition, stateFile, restoreConsumer); + File 
stateFile = new File(config.getString(StreamingConfig.STATE_DIR_CONFIG), Integer.toString(id)); + this.stateMgr = new ProcessorStateManager(id, stateFile, restoreConsumer); } catch (IOException e) { throw new KafkaException("Error while creating the state manager", e); } @@ -139,7 +138,7 @@ public StreamTask(long id, this.processorContext.initialized(); } - public long id() { + public int id() { return id; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 88c254ab4bfe..fd197eaf0581 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -76,7 +76,7 @@ public class StreamThread extends Thread { protected final Consumer consumer; protected final Consumer restoreConsumer; - private final Map tasks; + private final Map tasks; private final String clientId; private final Time time; private final File stateDir; @@ -127,6 +127,7 @@ public StreamThread(TopologyBuilder builder, this.builder = builder; this.clientId = clientId; this.partitionGrouper = config.getConfiguredInstance(StreamingConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class); + this.partitionGrouper.topicGroups(builder.topicGroups()); // set the producer and consumer clients this.producer = (producer != null) ? producer : createProducer(); @@ -199,7 +200,7 @@ public void close() { running.set(false); } - public Map tasks() { + public Map tasks() { return Collections.unmodifiableMap(tasks); } @@ -243,7 +244,6 @@ private void runLoop() { ensureCopartitioning(builder.copartitionGroups()); - partitionGrouper.topicGroups(builder.topicGroups()); consumer.subscribe(new ArrayList<>(builder.sourceTopics()), rebalanceListener); while (stillRunning()) { @@ -376,7 +376,7 @@ protected void maybeClean() { if (stateDirs != null) { for (File dir : stateDirs) { try { - Long id = Long.parseLong(dir.getName()); + int id = Integer.parseInt(dir.getName()); // try to acquire the exclusive lock on the state directory FileLock directoryLock = null; @@ -408,34 +408,35 @@ protected void maybeClean() { } } - protected StreamTask createStreamTask(long id, Collection partitionsForTask) { + protected StreamTask createStreamTask(int id, Collection partitionsForTask) { sensors.taskCreationSensor.record(); return new StreamTask(id, consumer, producer, restoreConsumer, partitionsForTask, builder.build(), config, sensors); } private void addPartitions(Collection assignment) { - HashSet partitions = new HashSet<>(assignment); - - // TODO: change this hard-coded co-partitioning behavior - for (TopicPartition partition : partitions) { - final Long id = (long) partition.partition(); - StreamTask task = tasks.get(id); - if (task == null) { - // get the partitions for the task - HashSet partitionsForTask = new HashSet<>(); - for (TopicPartition part : partitions) - if (part.partition() == id) - partitionsForTask.add(part); - - // create the task - try { - task = createStreamTask(id, partitionsForTask); - } catch (Exception e) { - log.error("Failed to create a task #" + id + " in thread [" + this.getName() + "]: ", e); - throw e; + + HashMap> partitionsForTask = new HashMap<>(); + + for (TopicPartition partition : assignment) { + Set taskIds = partitionGrouper.taskIds(partition); + for (Integer taskId : taskIds) { + Set partitions = partitionsForTask.get(taskId); + if (partitions == null) { 
+ partitions = new HashSet<>(); + partitionsForTask.put(taskId, partitions); } - tasks.put(id, task); + partitions.add(partition); + } + } + + // create the tasks + for (Integer taskId : partitionsForTask.keySet()) { + try { + tasks.put(taskId, createStreamTask(taskId, partitionsForTask.get(taskId))); + } catch (Exception e) { + log.error("Failed to create a task #" + taskId + " in thread [" + this.getName() + "]: ", e); + throw e; } } @@ -458,6 +459,10 @@ private void removePartitions() { tasks.clear(); } + public PartitionGrouper partitionGrouper() { + return partitionGrouper; + } + private void ensureCopartitioning(Collection> copartitionGroups) { for (Set copartitionGroup : copartitionGroups) { ensureCopartitioning(copartitionGroup); @@ -497,7 +502,6 @@ protected CharSequence toString(Collection coll, String separator) { return sb; } - private class StreamingMetricsImpl implements StreamingMetrics { final Metrics metrics; final String metricGrpName; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java new file mode 100644 index 000000000000..3273a77c4c3a --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.processor; + +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertEquals; + +public class DefaultPartitionGrouperTest { + + private List infos = Arrays.asList( + new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]) + ); + + private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos); + + @Test + public void testGrouping() { + PartitionGrouper grouper = new DefaultPartitionGrouper(); + int taskId; + Map> expected; + + grouper.topicGroups(Arrays.asList(set("topic1"), set("topic2"))); + + expected = new HashMap<>(); + taskId = 0; + expected.put(taskId++, list(new TopicPartition("topic1", 0))); + expected.put(taskId++, list(new TopicPartition("topic1", 1))); + expected.put(taskId++, list(new TopicPartition("topic1", 2))); + expected.put(taskId++, list(new TopicPartition("topic2", 0))); + expected.put(taskId, list(new TopicPartition("topic2", 1))); + + assertEquals(expected, grouper.partitionGroups(metadata)); + + grouper.topicGroups(Arrays.asList(set("topic1", "topic2"))); + + expected = new HashMap<>(); + taskId = 0; + expected.put(taskId++, list(new TopicPartition("topic1", 0), new TopicPartition("topic2", 0))); + expected.put(taskId++, list(new TopicPartition("topic1", 1), new TopicPartition("topic2", 1))); + expected.put(taskId, list(new TopicPartition("topic1", 2))); + + assertEquals(expected, grouper.partitionGroups(metadata)); + } + + private Set set(T... items) { + Set set = new HashSet<>(); + for (T item : items) { + set.add(item); + } + return set; + } + + private List list(T... items) { + List set = new ArrayList<>(); + for (T item : items) { + set.add(item); + } + return set; + } + +} diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/QuickUnionTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/QuickUnionTest.java new file mode 100644 index 000000000000..d91ecf266195 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/QuickUnionTest.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
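
The second half of the test above pins down the grouper's behavior when grouped topics have different partition counts: tasks are created up to the maximum count across the group, and a task whose partition does not exist for some topic simply omits that topic instead of failing (the hard copartitioning check now lives in StreamThread). Driving the grouper directly, as a fragment reusing the metadata field built in the test:

    PartitionGrouper grouper = new DefaultPartitionGrouper();
    grouper.topicGroups(Arrays.asList(
        new HashSet<>(Arrays.asList("topic1", "topic2")))); // 3 and 2 partitions

    Map<Integer, List<TopicPartition>> groups = grouper.partitionGroups(metadata);
    // task 0 -> [topic1-0, topic2-0]
    // task 1 -> [topic1-1, topic2-1]
    // task 2 -> [topic1-2]            (topic2 has no partition 2, so it is skipped)
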
+ */ + +package org.apache.kafka.streams.processor.internals; + +import org.junit.Test; + +import java.util.HashSet; +import java.util.Set; + +import static org.junit.Assert.assertEquals; + +public class QuickUnionTest { + + @SuppressWarnings("unchecked") + @Test + public void testQuickUnion() { + QuickUnion qu = new QuickUnion<>(); + + long[] ids = { + 1L, 2L, 3L, 4L, 5L + }; + + for (long id : ids) { + qu.add(id); + } + + assertEquals(5, roots(qu, ids).size()); + + qu.unite(1L, 2L); + assertEquals(4, roots(qu, ids).size()); + assertEquals(qu.root(1L), qu.root(2L)); + + qu.unite(3L, 4L); + assertEquals(3, roots(qu, ids).size()); + assertEquals(qu.root(1L), qu.root(2L)); + assertEquals(qu.root(3L), qu.root(4L)); + + qu.unite(1L, 5L); + assertEquals(2, roots(qu, ids).size()); + assertEquals(qu.root(1L), qu.root(2L)); + assertEquals(qu.root(2L), qu.root(5L)); + assertEquals(qu.root(3L), qu.root(4L)); + + qu.unite(3L, 5L); + assertEquals(1, roots(qu, ids).size()); + assertEquals(qu.root(1L), qu.root(2L)); + assertEquals(qu.root(2L), qu.root(3L)); + assertEquals(qu.root(3L), qu.root(4L)); + assertEquals(qu.root(4L), qu.root(5L)); + } + + private Set roots(QuickUnion qu, long... ids) { + HashSet roots = new HashSet<>(); + for (long id : ids) { + roots.add(qu.root(id)); + } + return roots; + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index ea4d978e77d3..cf4bb08b593e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -25,8 +25,12 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.PartitionAssignor; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.ByteArraySerializer; @@ -34,6 +38,7 @@ import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamingConfig; +import org.apache.kafka.streams.processor.PartitionGrouper; import org.apache.kafka.streams.processor.TopologyBuilder; import org.apache.kafka.test.MockProcessorSupplier; import org.junit.Test; @@ -59,10 +64,28 @@ public class StreamThreadTest { private TopicPartition t3p1 = new TopicPartition("topic3", 1); private TopicPartition t3p2 = new TopicPartition("topic3", 2); - private final long g1p1 = (0L << 32) | 1L; // TODO: (1L << 32 | 1L) - private final long g1p2 = (0L << 32) | 2L; // TODO: (1L << 32 | 1L) - private final long g2p1 = (0L << 32) | 1L; // TODO: (2L << 32 | 1L) - private final long g2p2 = (0L << 32) | 2L; // TODO: (2L << 32 | 2L) + private List infos = Arrays.asList( + new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]), 
+ new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]) + ); + + private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos); + + PartitionAssignor.Subscription subscription = new PartitionAssignor.Subscription(Arrays.asList("topic1", "topic2", "topic3")); + + // task0 is unused + private final int task1 = 1; + private final int task2 = 2; + // task3 is unused + private final int task4 = 4; + private final int task5 = 5; private Properties configProps() { return new Properties() { @@ -81,7 +104,7 @@ private Properties configProps() { private static class TestStreamTask extends StreamTask { public boolean committed = false; - public TestStreamTask(long id, + public TestStreamTask(int id, Consumer consumer, Producer producer, Consumer restoreConsumer, @@ -110,7 +133,6 @@ public void testPartitionAssignmentChange() throws Exception { final MockConsumer mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST); TopologyBuilder builder = new TopologyBuilder(); - builder.addSource("source0", "topic0"); builder.addSource("source1", "topic1"); builder.addSource("source2", "topic2"); builder.addSource("source3", "topic3"); @@ -118,11 +140,13 @@ public void testPartitionAssignmentChange() throws Exception { StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", new Metrics(), new SystemTime()) { @Override - protected StreamTask createStreamTask(long id, Collection partitionsForTask) { + protected StreamTask createStreamTask(int id, Collection partitionsForTask) { return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, builder.build(), config); } }; + initPartitionGrouper(thread); + ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener; assertTrue(thread.tasks().isEmpty()); @@ -139,8 +163,8 @@ protected StreamTask createStreamTask(long id, Collection partit rebalanceListener.onPartitionsRevoked(revokedPartitions); rebalanceListener.onPartitionsAssigned(assignedPartitions); - assertTrue(thread.tasks().containsKey(g1p1)); - assertEquals(expectedGroup1, thread.tasks().get(g1p1).partitions()); + assertTrue(thread.tasks().containsKey(task1)); + assertEquals(expectedGroup1, thread.tasks().get(task1).partitions()); assertEquals(1, thread.tasks().size()); revokedPartitions = assignedPartitions; @@ -150,8 +174,8 @@ protected StreamTask createStreamTask(long id, Collection partit rebalanceListener.onPartitionsRevoked(revokedPartitions); rebalanceListener.onPartitionsAssigned(assignedPartitions); - assertTrue(thread.tasks().containsKey(g1p2)); - assertEquals(expectedGroup2, thread.tasks().get(g1p2).partitions()); + assertTrue(thread.tasks().containsKey(task2)); + assertEquals(expectedGroup2, thread.tasks().get(task2).partitions()); assertEquals(1, thread.tasks().size()); revokedPartitions = assignedPartitions; @@ -162,10 +186,10 @@ protected StreamTask createStreamTask(long id, Collection partit rebalanceListener.onPartitionsRevoked(revokedPartitions); rebalanceListener.onPartitionsAssigned(assignedPartitions); - assertTrue(thread.tasks().containsKey(g1p1)); - assertTrue(thread.tasks().containsKey(g1p2)); - assertEquals(expectedGroup1, 
thread.tasks().get(g1p1).partitions()); - assertEquals(expectedGroup2, thread.tasks().get(g1p2).partitions()); + assertTrue(thread.tasks().containsKey(task1)); + assertTrue(thread.tasks().containsKey(task2)); + assertEquals(expectedGroup1, thread.tasks().get(task1).partitions()); + assertEquals(expectedGroup2, thread.tasks().get(task2).partitions()); assertEquals(2, thread.tasks().size()); revokedPartitions = assignedPartitions; @@ -176,10 +200,10 @@ protected StreamTask createStreamTask(long id, Collection partit rebalanceListener.onPartitionsRevoked(revokedPartitions); rebalanceListener.onPartitionsAssigned(assignedPartitions); - assertTrue(thread.tasks().containsKey(g2p1)); - assertTrue(thread.tasks().containsKey(g2p2)); - assertEquals(expectedGroup1, thread.tasks().get(g2p1).partitions()); - assertEquals(expectedGroup2, thread.tasks().get(g2p2).partitions()); + assertTrue(thread.tasks().containsKey(task4)); + assertTrue(thread.tasks().containsKey(task5)); + assertEquals(expectedGroup1, thread.tasks().get(task4).partitions()); + assertEquals(expectedGroup2, thread.tasks().get(task5).partitions()); assertEquals(2, thread.tasks().size()); /* TODO: @@ -240,12 +264,15 @@ public void testMaybeClean() throws Exception { public void maybeClean() { super.maybeClean(); } + @Override - protected StreamTask createStreamTask(long id, Collection partitionsForTask) { + protected StreamTask createStreamTask(int id, Collection partitionsForTask) { return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, builder.build(), config); } }; + initPartitionGrouper(thread); + ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener; assertTrue(thread.tasks().isEmpty()); @@ -358,12 +385,15 @@ public void testMaybeCommit() throws Exception { public void maybeCommit() { super.maybeCommit(); } + @Override - protected StreamTask createStreamTask(long id, Collection partitionsForTask) { + protected StreamTask createStreamTask(int id, Collection partitionsForTask) { return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, builder.build(), config); } }; + initPartitionGrouper(thread); + ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener; List revokedPartitions; @@ -414,4 +444,19 @@ protected StreamTask createStreamTask(long id, Collection partit Utils.delete(baseDir); } } + + private void initPartitionGrouper(StreamThread thread) { + PartitionGrouper partitionGrouper = thread.partitionGrouper(); + + KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor(); + + partitionAssignor.configure( + Collections.singletonMap(StreamingConfig.InternalConfig.PARTITION_GROUPER_INSTANCE, partitionGrouper) + ); + + Map assignments = + partitionAssignor.assign(metadata, Collections.singletonMap("client", subscription)); + + partitionAssignor.onAssignment(assignments.get("client")); + } } From 13f3ad703960581229d511287f27345c567b5d3e Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Thu, 22 Oct 2015 11:34:52 -0700 Subject: [PATCH 06/13] complete undoing long taskid --- .../kafka/streams/kstream/SlidingWindowSupplier.java | 2 +- .../apache/kafka/streams/processor/ProcessorContext.java | 6 +++--- .../streams/processor/internals/ProcessorContextImpl.java | 8 ++++---- .../apache/kafka/streams/state/MeteredKeyValueStore.java | 4 ++-- .../apache/kafka/streams/state/RocksDBKeyValueStore.java | 2 +- .../kafka/streams/state/KeyValueStoreTestDriver.java | 2 +- 
.../java/org/apache/kafka/test/MockProcessorContext.java | 2 +- 7 files changed, 13 insertions(+), 13 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java index 6d7fdedab8dd..1d5312335cc9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java @@ -83,7 +83,7 @@ public class SlidingWindow extends WindowSupport implements Window { @Override public void init(ProcessorContext context) { this.context = context; - this.partition = context.statePartition(); + this.partition = context.id(); SlidingWindowRegistryCallback restoreFunc = new SlidingWindowRegistryCallback(); context.register(this, restoreFunc); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java index 2164c12042b3..e7cf2573f2d6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java @@ -26,11 +26,11 @@ public interface ProcessorContext { /** - * Returns the state partition id + * Returns the task id * - * @return the state partition id + * @return the task id */ - int statePartition(); + int id(); /** * Returns the key serializer diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index 0d54ab3796d0..dfc838ce72be 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -35,7 +35,7 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S private static final Logger log = LoggerFactory.getLogger(ProcessorContextImpl.class); - private final long id; + private final int id; private final StreamTask task; private final StreamingMetrics metrics; private final RecordCollector collector; @@ -49,7 +49,7 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S private boolean initialized; @SuppressWarnings("unchecked") - public ProcessorContextImpl(long id, + public ProcessorContextImpl(int id, StreamTask task, StreamingConfig config, RecordCollector collector, @@ -79,8 +79,8 @@ public void initialized() { } @Override - public int statePartition() { - return (int) (id & 0xFFFFFFFFL); + public int id() { + return id; } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java index cb1147cd5e16..779bc75db8a3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java @@ -73,7 +73,7 @@ public MeteredKeyValueStore(final String name, final KeyValueStore inner, this.restoreTime = this.metrics.addLatencySensor(metricGrp, name, "restore", "store-name", name); this.topic = name; - this.partition = context.statePartition(); + this.partition = context.id(); this.context = context; @@ -171,7 +171,7 @@ public V delete(K key) { /** * Called when the underlying {@link #inner} 
{@link KeyValueStore} removes an entry in response to a call from this * store other than {@link #delete(Object)}. - * + * * @param key the key for the entry that the inner store removed */ protected void removed(K key) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java index 751e6b804bcc..7393bb10bcf4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java @@ -81,7 +81,7 @@ private static class RocksDBStore implements KeyValueStore { public RocksDBStore(String name, ProcessorContext context, Serdes serdes) { this.topic = name; - this.partition = context.statePartition(); + this.partition = context.id(); this.context = context; this.serdes = serdes; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java index 23a40b1ad5d8..4dfa9c21a492 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java @@ -245,7 +245,7 @@ public void send(ProducerRecord record, Serializer keySeria this.context = new MockProcessorContext(null, serdes.keySerializer(), serdes.keyDeserializer(), serdes.valueSerializer(), serdes.valueDeserializer(), recordCollector) { @Override - public int statePartition() { + public int id() { return 1; } diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java index fb6e5ae4b9c5..16df9c5529a8 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java @@ -82,7 +82,7 @@ public void setTime(long timestamp) { } @Override - public int statePartition() { + public int id() { return 0; } From 98f3bcc1896fd159ccbbd37fc65b1d9d6f568bb9 Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Thu, 22 Oct 2015 11:45:38 -0700 Subject: [PATCH 07/13] fix a test --- .../processor/internals/StreamThreadTest.java | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index cf4bb08b593e..cbb2558c6147 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -206,7 +206,6 @@ protected StreamTask createStreamTask(int id, Collection partiti assertEquals(expectedGroup2, thread.tasks().get(task5).partitions()); assertEquals(2, thread.tasks().size()); - /* TODO: revokedPartitions = assignedPartitions; assignedPartitions = Arrays.asList(t1p1, t2p1, t3p1); expectedGroup1 = new HashSet<>(Arrays.asList(t1p1)); @@ -215,12 +214,11 @@ protected StreamTask createStreamTask(int id, Collection partiti rebalanceListener.onPartitionsRevoked(revokedPartitions); rebalanceListener.onPartitionsAssigned(assignedPartitions); - assertTrue(thread.tasks().containsKey(g1p1)); - assertTrue(thread.tasks().containsKey(g2p1)); - assertEquals(expectedGroup1, thread.tasks().get(g1p1).partitions()); - 
assertEquals(expectedGroup2, thread.tasks().get(g2p1).partitions()); + assertTrue(thread.tasks().containsKey(task1)); + assertTrue(thread.tasks().containsKey(task4)); + assertEquals(expectedGroup1, thread.tasks().get(task1).partitions()); + assertEquals(expectedGroup2, thread.tasks().get(task4).partitions()); assertEquals(2, thread.tasks().size()); - */ revokedPartitions = assignedPartitions; assignedPartitions = Collections.emptyList(); @@ -289,7 +287,7 @@ protected StreamTask createStreamTask(int id, Collection partiti Map prevTasks; // - // Assign t1p1 and t1p2. This should create Task g1p1 & g1p2 + // Assign t1p1 and t1p2. This should create task1 & task2 // revokedPartitions = Collections.emptyList(); assignedPartitions = Arrays.asList(t1p1, t1p2); @@ -312,7 +310,7 @@ protected StreamTask createStreamTask(int id, Collection partiti assertTrue(stateDir3.exists()); assertTrue(extraDir.exists()); - // all state directories except for task g1p1 & g1p2 will be removed. the extra directory should still exists + // all state directories except for tasks task1 & task2 will be removed. the extra directory should still exist mockTime.sleep(11L); thread.maybeClean(); assertTrue(stateDir1.exists()); @@ -321,7 +319,7 @@ protected StreamTask createStreamTask(int id, Collection partiti assertTrue(extraDir.exists()); // - // Revoke t1p1 and t1p2. This should remove Task g1p1 & g1p2 + // Revoke t1p1 and t1p2. This should remove task1 & task2 // revokedPartitions = assignedPartitions; assignedPartitions = Collections.emptyList(); @@ -340,7 +338,7 @@ protected StreamTask createStreamTask(int id, Collection partiti // no task assertTrue(thread.tasks().isEmpty()); - // all state directories for task g1p1 & g1p2 still exist before the cleanup delay time + // all state directories for tasks task1 & task2 still exist before the cleanup delay time mockTime.sleep(cleanupDelay - 10L); thread.maybeClean(); assertTrue(stateDir1.exists()); @@ -348,7 +346,7 @@ protected StreamTask createStreamTask(int id, Collection partiti assertFalse(stateDir3.exists()); assertTrue(extraDir.exists()); - // all state directories for task g1p1 & g1p2 are removed + // all state directories for tasks task1 & task2 are removed mockTime.sleep(11L); thread.maybeClean(); assertFalse(stateDir1.exists()); From cc3da9979c939b10afdf17815854275f646c5a7a Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Thu, 22 Oct 2015 14:43:58 -0700 Subject: [PATCH 08/13] fix comment --- .../org/apache/kafka/streams/processor/PartitionGrouper.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java index a17721965a2a..82bb36a2f896 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java @@ -33,9 +33,7 @@ public abstract class PartitionGrouper { private KafkaStreamingPartitionAssignor partitionAssignor = null; /** - * Returns a map of task ids to groups of partitions. The task id is the 64 bit integer - * which uniquely identifies a task. The higher 32 bit integer is an id assigned to a topic group. - * The lower 32 bit integer is a partition id with which the task's local states are associated. + * Returns a map of task ids to groups of partitions. 
* * @param metadata * @return a map of task ids to groups of partitions From d96054a08fcc0288a0b25d8e59b198b626fe1262 Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Thu, 22 Oct 2015 14:54:28 -0700 Subject: [PATCH 09/13] address github comments --- .../KafkaStreamingPartitionAssignor.java | 34 +++++++++++++------ 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java index 36c50c3769a7..ee5bb93ad2af 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java @@ -24,6 +24,8 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.StreamingConfig; import org.apache.kafka.streams.processor.PartitionGrouper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -35,6 +37,8 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Configurable { + private static final Logger log = LoggerFactory.getLogger(KafkaStreamingPartitionAssignor.class); + private PartitionGrouper partitionGrouper; private Map> partitionToTaskIds; @@ -80,8 +84,10 @@ public Map assign(Cluster metadata, Map assign(Cluster metadata, Map partitions = assignment.partitions(); ByteBuffer data = assignment.userData(); + data.rewind(); Map> partitionToTaskIds = new HashMap<>(); - int i = 0; - for (TopicPartition partition : partitions) { - Set taskIds = partitionToTaskIds.get(partition); - if (taskIds == null) { - taskIds = new HashSet<>(); - partitionToTaskIds.put(partition, taskIds); + // check version + int version = data.getInt(); + if (version == 1) { + for (TopicPartition partition : partitions) { + Set taskIds = partitionToTaskIds.get(partition); + if (taskIds == null) { + taskIds = new HashSet<>(); + partitionToTaskIds.put(partition, taskIds); + } + // decode a task id + taskIds.add(data.getInt()); } - // decode a task id - data.rewind(); - taskIds.add(data.getInt(i * 4)); - i++; + } else { + KafkaException ex = new KafkaException("unknown assignment data version: " + version); + log.error(ex.getMessage(), ex); + throw ex; } this.partitionToTaskIds = partitionToTaskIds; } From d0322aa0d91540b2a71cd10d9d54486716d23774 Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Thu, 22 Oct 2015 15:03:16 -0700 Subject: [PATCH 10/13] address github comments --- .../processor/internals/QuickUnionTest.java | 25 ++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/QuickUnionTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/QuickUnionTest.java index d91ecf266195..c40e881e0ba7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/QuickUnionTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/QuickUnionTest.java @@ -23,12 +23,13 @@ import java.util.Set; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; public class QuickUnionTest { @SuppressWarnings("unchecked") @Test - public void testQuickUnion() { + public void testUnite() { QuickUnion qu = new QuickUnion<>(); long[] ids = { @@ -64,6 +65,28 @@ public void testQuickUnion() { 
assertEquals(qu.root(4L), qu.root(5L)); } + @Test + public void testUniteMany() { + QuickUnion qu = new QuickUnion<>(); + + long[] ids = { + 1L, 2L, 3L, 4L, 5L + }; + + for (long id : ids) { + qu.add(id); + } + + assertEquals(5, roots(qu, ids).size()); + + qu.unite(1L, 2L, 3L, 4L); + assertEquals(2, roots(qu, ids).size()); + assertEquals(qu.root(1L), qu.root(2L)); + assertEquals(qu.root(2L), qu.root(3L)); + assertEquals(qu.root(3L), qu.root(4L)); + assertNotEquals(qu.root(1L), qu.root(5L)); + } + private Set roots(QuickUnion qu, long... ids) { HashSet roots = new HashSet<>(); for (long id : ids) { From 5119b78faef082a5702276f39b401980275c1c94 Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Thu, 22 Oct 2015 15:55:52 -0700 Subject: [PATCH 11/13] do not set PARTITION_ASSIGNMENT_STRATEGY_CONFIG for a restore consumer --- .../src/main/java/org/apache/kafka/streams/StreamingConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java index 9203b9adc7a2..a0aef48d8681 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java @@ -213,6 +213,7 @@ public StreamingConfig(Map props) { public Map getConsumerConfigs(PartitionGrouper partitionGrouper) { Map props = getConsumerConfigs(); props.put(StreamingConfig.InternalConfig.PARTITION_GROUPER_INSTANCE, partitionGrouper); + props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, KafkaStreamingPartitionAssignor.class.getName()); return props; } @@ -221,7 +222,6 @@ public Map getConsumerConfigs() { // set consumer default property values props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); - props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, KafkaStreamingPartitionAssignor.class.getName()); // remove properties that are not required for consumers props.remove(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG); From 2cbe4039a97330fe19baa634704dd681e3666e93 Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Mon, 26 Oct 2015 09:48:52 -0700 Subject: [PATCH 12/13] address github comments --- .../processor/TopologyBuilderTest.java | 34 +++++++------------ 1 file changed, 12 insertions(+), 22 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java index 1e622c03cbf5..b6d4e32904c8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java @@ -23,11 +23,10 @@ import org.junit.Test; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.TreeMap; public class TopologyBuilderTest { @@ -121,35 +120,26 @@ public void testTopicGroups() { builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4"); - List> topicGroups = sort(builder.topicGroups()); + Collection> topicGroups = builder.topicGroups(); assertEquals(3, topicGroups.size()); - assertEquals(list(list("topic-1", "topic-1x", "topic-2"), list("topic-3", "topic-4"), list("topic-5")), topicGroups); + assertEquals(set(set("topic-1", "topic-1x", "topic-2"), set("topic-3", "topic-4"), set("topic-5")), new HashSet<>(topicGroups)); - List> 
copartitionGroups = sort(builder.copartitionGroups()); + Collection> copartitionGroups = builder.copartitionGroups(); - assertEquals(list(list("topic-1", "topic-1x", "topic-2")), copartitionGroups); + assertEquals(set(set("topic-1", "topic-1x", "topic-2")), new HashSet<>(copartitionGroups)); } - private List> sort(Collection> topicGroups) { - TreeMap sortedMap = new TreeMap<>(); - - for (Set group : topicGroups) { - String[] arr = group.toArray(new String[group.size()]); - Arrays.sort(arr); - sortedMap.put(arr[0], arr); - } - - ArrayList> list = new ArrayList(sortedMap.size()); - for (String[] arr : sortedMap.values()) { - list.add(Arrays.asList(arr)); + private List list(T... elems) { + List s = new ArrayList(); + for (T elem : elems) { + s.add(elem); } - - return list; + return s; } - private List list(T... elems) { - List s = new ArrayList(); + private Set set(T... elems) { + Set s = new HashSet(); for (T elem : elems) { s.add(elem); } From 9ba5087a84269cd3eeeb6715cb922c151ec9c5e7 Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Mon, 26 Oct 2015 10:36:10 -0700 Subject: [PATCH 13/13] address github comments --- .../org/apache/kafka/common/utils/Utils.java | 35 +++++++++++++++- .../processor/internals/StreamThread.java | 17 +------- .../DefaultPartitionGrouperTest.java | 41 ++++++------------- .../processor/TopologyBuilderTest.java | 21 +++------- 4 files changed, 51 insertions(+), 63 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index bc0e6455fef6..974cf1e4f615 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -32,6 +32,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.Properties; @@ -574,10 +575,40 @@ public static ByteBuffer ensureCapacity(ByteBuffer existingBuffer, int newLength * @param the type of element * @return Set */ - public static HashSet mkSet(T... elems) { + public static Set mkSet(T... elems) { return new HashSet<>(Arrays.asList(elems)); } - + + /* + * Creates a list + * @param elems the elements + * @param the type of element + * @return List + */ + public static List mkList(T... 
elems) { + return Arrays.asList(elems); + } + + + /* + * Create a string from a collection + * @param coll the collection + * @param separator the separator + */ + public static CharSequence mkString(Collection coll, String separator) { + StringBuilder sb = new StringBuilder(); + Iterator iter = coll.iterator(); + if (iter.hasNext()) { + sb.append(iter.next().toString()); + + while (iter.hasNext()) { + sb.append(separator); + sb.append(iter.next().toString()); + } + } + return sb; + } + /** * Recursively delete the given file/directory and any subfiles (if any exist) * diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index fd197eaf0581..e3803a108948 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -54,7 +54,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -483,25 +482,11 @@ private void ensureCopartitioning(Set copartitionGroup) { } else if (numPartitions != infos.size()) { String[] topics = copartitionGroup.toArray(new String[copartitionGroup.size()]); Arrays.sort(topics); - throw new KafkaException("topics not copartitioned: [" + toString(Arrays.asList(topics), ",") + "]"); + throw new KafkaException("topics not copartitioned: [" + Utils.mkString(Arrays.asList(topics), ",") + "]"); } } } - protected CharSequence toString(Collection coll, String separator) { - StringBuilder sb = new StringBuilder(); - Iterator iter = coll.iterator(); - if (iter.hasNext()) { - sb.append(iter.next().toString()); - - while (iter.hasNext()) { - sb.append(separator); - sb.append(iter.next().toString()); - } - } - return sb; - } - private class StreamingMetricsImpl implements StreamingMetrics { final Metrics metrics; final String metricGrpName; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java index 3273a77c4c3a..388955e4ba12 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java @@ -21,15 +21,14 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import static org.apache.kafka.common.utils.Utils.mkList; +import static org.apache.kafka.common.utils.Utils.mkSet; import org.junit.Test; -import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import static org.junit.Assert.assertEquals; @@ -51,43 +50,27 @@ public void testGrouping() { int taskId; Map> expected; - grouper.topicGroups(Arrays.asList(set("topic1"), set("topic2"))); + grouper.topicGroups(mkList(mkSet("topic1"), mkSet("topic2"))); expected = new HashMap<>(); taskId = 0; - expected.put(taskId++, list(new TopicPartition("topic1", 0))); - expected.put(taskId++, list(new TopicPartition("topic1", 1))); - expected.put(taskId++, list(new TopicPartition("topic1", 2))); - expected.put(taskId++, list(new TopicPartition("topic2", 0))); - expected.put(taskId, list(new 
TopicPartition("topic2", 1))); + expected.put(taskId++, mkList(new TopicPartition("topic1", 0))); + expected.put(taskId++, mkList(new TopicPartition("topic1", 1))); + expected.put(taskId++, mkList(new TopicPartition("topic1", 2))); + expected.put(taskId++, mkList(new TopicPartition("topic2", 0))); + expected.put(taskId, mkList(new TopicPartition("topic2", 1))); assertEquals(expected, grouper.partitionGroups(metadata)); - grouper.topicGroups(Arrays.asList(set("topic1", "topic2"))); + grouper.topicGroups(mkList(mkSet("topic1", "topic2"))); expected = new HashMap<>(); taskId = 0; - expected.put(taskId++, list(new TopicPartition("topic1", 0), new TopicPartition("topic2", 0))); - expected.put(taskId++, list(new TopicPartition("topic1", 1), new TopicPartition("topic2", 1))); - expected.put(taskId, list(new TopicPartition("topic1", 2))); + expected.put(taskId++, mkList(new TopicPartition("topic1", 0), new TopicPartition("topic2", 0))); + expected.put(taskId++, mkList(new TopicPartition("topic1", 1), new TopicPartition("topic2", 1))); + expected.put(taskId, mkList(new TopicPartition("topic1", 2))); assertEquals(expected, grouper.partitionGroups(metadata)); } - private Set set(T... items) { - Set set = new HashSet<>(); - for (T item : items) { - set.add(item); - } - return set; - } - - private List list(T... items) { - List set = new ArrayList<>(); - for (T item : items) { - set.add(item); - } - return set; - } - } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java index b6d4e32904c8..05d24d39c3b8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java @@ -19,10 +19,11 @@ import static org.junit.Assert.assertEquals; +import static org.apache.kafka.common.utils.Utils.mkSet; import org.apache.kafka.test.MockProcessorSupplier; import org.junit.Test; -import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashSet; import java.util.List; @@ -123,27 +124,15 @@ public void testTopicGroups() { Collection> topicGroups = builder.topicGroups(); assertEquals(3, topicGroups.size()); - assertEquals(set(set("topic-1", "topic-1x", "topic-2"), set("topic-3", "topic-4"), set("topic-5")), new HashSet<>(topicGroups)); + assertEquals(mkSet(mkSet("topic-1", "topic-1x", "topic-2"), mkSet("topic-3", "topic-4"), mkSet("topic-5")), new HashSet<>(topicGroups)); Collection> copartitionGroups = builder.copartitionGroups(); - assertEquals(set(set("topic-1", "topic-1x", "topic-2")), new HashSet<>(copartitionGroups)); + assertEquals(mkSet(mkSet("topic-1", "topic-1x", "topic-2")), new HashSet<>(copartitionGroups)); } private List list(T... elems) { - List s = new ArrayList(); - for (T elem : elems) { - s.add(elem); - } - return s; - } - - private Set set(T... elems) { - Set s = new HashSet(); - for (T elem : elems) { - s.add(elem); - } - return s; + return Arrays.asList(elems); } }
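A note for readers following PATCH 06/07 ("complete undoing long taskid"): the encoding being removed, per the javadoc deleted in PATCH 08 and the TODO constants deleted from StreamThreadTest, packed two ints into one 64-bit task id. A minimal illustration of that retired layout (the class and variable names here are hypothetical, not from the patches):

    public class TaskIdLayout {
        public static void main(String[] args) {
            int topicGroupId = 1; // occupied the high 32 bits
            int partitionId = 2;  // occupied the low 32 bits
            long taskId = ((long) topicGroupId << 32) | (partitionId & 0xFFFFFFFFL);
            // ProcessorContextImpl.statePartition() recovered the partition like this:
            int statePartition = (int) (taskId & 0xFFFFFFFFL);
            System.out.println("group " + (taskId >>> 32) + ", partition " + statePartition);
        }
    }

After the series, a task id is a plain sequential int handed out by the partition grouper, so statePartition() collapses into id().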
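On PATCH 09: the old onAssignment() rewound the buffer inside the loop and read task ids at absolute offsets (data.getInt(i * 4)); the new code rewinds once, reads a leading version int with relative gets, and rejects unknown versions. The matching encoder in assign() is not part of this excerpt, but the version-1 layout implied by the decoder is a 4-byte version followed by one 4-byte task id per assigned partition, in partition order. A sketch of that layout, under those assumptions:

    import java.nio.ByteBuffer;
    import java.util.List;

    public class AssignmentUserData {
        // version-1 userData: [int version = 1][int taskId] * numPartitions
        static ByteBuffer encodeV1(List<Integer> taskIdsInPartitionOrder) {
            ByteBuffer data = ByteBuffer.allocate(4 + 4 * taskIdsInPartitionOrder.size());
            data.putInt(1); // version
            for (int taskId : taskIdsInPartitionOrder)
                data.putInt(taskId); // one id per partition, same order as partitions()
            data.rewind();
            return data;
        }
    }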
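The QuickUnion class exercised in PATCH 10 is added earlier in the series and its body does not appear in this excerpt. For orientation, here is a minimal union-find that satisfies both testUnite and testUniteMany (generic elements, varargs unite, root lookup with path flattening); the real class may differ in details such as how unknown ids are handled:

    import java.util.HashMap;
    import java.util.Map;

    public class QuickUnion<T> {
        private final Map<T, T> ids = new HashMap<>();

        public void add(T id) {
            ids.put(id, id); // every element starts as its own root
        }

        public T root(T id) {
            T current = id; // assumes id was previously add()ed
            T parent = ids.get(current);
            while (!parent.equals(current)) {
                ids.put(current, ids.get(parent)); // path flattening: skip to grandparent
                current = parent;
                parent = ids.get(current);
            }
            return current;
        }

        @SafeVarargs
        public final void unite(T id1, T... idList) {
            for (T id2 : idList)
                ids.put(root(id2), root(id1)); // hang each tree under id1's root
        }
    }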
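PATCH 13 moves the small collection helpers into org.apache.kafka.common.utils.Utils so the tests stop re-implementing them. Their behavior, as defined by the added code:

    import java.util.Arrays;
    import java.util.List;
    import java.util.Set;
    import org.apache.kafka.common.utils.Utils;

    public class UtilsDemo {
        public static void main(String[] args) {
            Set<String> topics = Utils.mkSet("topic-1", "topic-2");    // HashSet of the elements
            List<String> ordered = Utils.mkList("topic-1", "topic-2"); // fixed-size, Arrays.asList-backed
            CharSequence joined = Utils.mkString(Arrays.asList("topic-1", "topic-2"), ", ");
            System.out.println(topics + " / " + ordered + " / " + joined); // prints "... / topic-1, topic-2"
        }
    }

Note that Utils.mkString returns the StringBuilder itself as a CharSequence; call toString() on the result if a String is needed.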