From 5f43586cbba99f2df79af26cc31b0cb01741f9ee Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Wed, 17 May 2017 23:03:12 -0700 Subject: [PATCH] TRIVIAL: Remove redundant asMap utility in ConsumerProtocol --- .../consumer/internals/ConsumerProtocol.java | 20 +++---------------- 1 file changed, 3 insertions(+), 17 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java index f8be9a075a1e8..920c2957c4dc9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java @@ -23,11 +23,10 @@ import org.apache.kafka.common.protocol.types.SchemaException; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.protocol.types.Type; +import org.apache.kafka.common.utils.CollectionUtils; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -124,7 +123,8 @@ public static ByteBuffer serializeAssignment(PartitionAssignor.Assignment assign Struct struct = new Struct(ASSIGNMENT_V0); struct.set(USER_DATA_KEY_NAME, assignment.userData()); List topicAssignments = new ArrayList<>(); - for (Map.Entry> topicEntry : asMap(assignment.partitions()).entrySet()) { + Map> partitionsByTopic = CollectionUtils.groupDataByTopic(assignment.partitions()); + for (Map.Entry> topicEntry : partitionsByTopic.entrySet()) { Struct topicAssignment = new Struct(TOPIC_ASSIGNMENT_V0); topicAssignment.set(TOPIC_KEY_NAME, topicEntry.getKey()); topicAssignment.set(PARTITIONS_KEY_NAME, topicEntry.getValue().toArray()); @@ -146,18 +146,4 @@ private static void checkVersionCompatibility(short version) { // otherwise, assume versions can be parsed as V0 } - private static Map> asMap(Collection partitions) { - Map> partitionMap = new HashMap<>(); - for (TopicPartition partition : partitions) { - String topic = partition.topic(); - List topicPartitions = partitionMap.get(topic); - if (topicPartitions == null) { - topicPartitions = new ArrayList<>(); - partitionMap.put(topic, topicPartitions); - } - topicPartitions.add(partition.partition()); - } - return partitionMap; - } - }