From 938e4e11d9cbdf6e3d6b70b2f619c8d4872c6b7f Mon Sep 17 00:00:00 2001 From: zqh Date: Wed, 27 Jan 2016 14:59:09 +0800 Subject: [PATCH] valueFactory's key is consumerId the key of partitionAssignment's valueFactory is consumerId, not topic. partitionAssignment.getAndMaybePut(threadId.consumer) --- core/src/main/scala/kafka/consumer/PartitionAssignor.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala index 5a1bdd0dee506..30a2e2fce4f1f 100755 --- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala +++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala @@ -72,7 +72,7 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging { def assign(ctx: AssignmentContext) = { - val valueFactory = (topic: String) => new mutable.HashMap[TopicAndPartition, ConsumerThreadId] + val valueFactory = (consumerId: String) => new mutable.HashMap[TopicAndPartition, ConsumerThreadId] val partitionAssignment = new Pool[String, mutable.Map[TopicAndPartition, ConsumerThreadId]](Some(valueFactory)) @@ -131,7 +131,7 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging { class RangeAssignor() extends PartitionAssignor with Logging { def assign(ctx: AssignmentContext) = { - val valueFactory = (topic: String) => new mutable.HashMap[TopicAndPartition, ConsumerThreadId] + val valueFactory = (consumerId: String) => new mutable.HashMap[TopicAndPartition, ConsumerThreadId] val partitionAssignment = new Pool[String, mutable.Map[TopicAndPartition, ConsumerThreadId]](Some(valueFactory)) for (topic <- ctx.myTopicThreadIds.keySet) {