Skip to content

Commit

Permalink
KAFKA-1030 Addition of partitions requires bouncing all the consumers…
Browse files Browse the repository at this point in the history
… of that topic; reviewed by Neha, Swapnil, Joel
  • Loading branch information
nehanarkhede committed Sep 17, 2013
1 parent aebf746 commit c6ca971
Showing 1 changed file with 2 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import java.util.UUID
import kafka.serializer._
import kafka.utils.ZkUtils._
import kafka.common._
import kafka.client.ClientUtils
import com.yammer.metrics.core.Gauge
import kafka.metrics._
import scala.Some
Expand Down Expand Up @@ -422,17 +421,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
true
}
else {
val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet,
brokers,
config.clientId,
config.socketTimeoutMs,
correlationId.getAndIncrement).topicsMetadata
val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
topicsMetadata.foreach(m => {
val topic = m.topic
val partitions = m.partitionsMetadata.map(m1 => m1.partitionId)
partitionsPerTopicMap.put(topic, partitions)
})
val partitionsAssignmentPerTopicMap = getPartitionAssignmentForTopics(zkClient, myTopicThreadIdsMap.keySet.toSeq)
val partitionsPerTopicMap = partitionsAssignmentPerTopicMap.map(p => (p._1, p._2.keySet.toSeq))

/**
* fetchers must be stopped to avoid data duplication, since if the current
Expand Down

0 comments on commit c6ca971

Please sign in to comment.