KAFKA-820 Topic metadata request handling fails to return all metadata about replicas; reviewed by Jun Rao
nehanarkhede committed Mar 22, 2013
1 parent 7b14eba commit 08b2a37
Showing 3 changed files with 31 additions and 19 deletions.
36 changes: 24 additions & 12 deletions core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -24,6 +24,7 @@ import kafka.utils.{Logging, ZkUtils}
 import org.I0Itec.zkclient.ZkClient
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import scala.collection._
+import mutable.ListBuffer
 import scala.collection.mutable
 import kafka.common._
 import scala.Some
@@ -111,26 +112,33 @@ object AdminUtils extends Logging {
       var replicaInfo: Seq[Broker] = Nil
       var isrInfo: Seq[Broker] = Nil
       try {
-        try {
-          leaderInfo = leader match {
-            case Some(l) => Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head)
-            case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition)
-          }
-        } catch {
-          case e => throw new LeaderNotAvailableException("Leader not available for topic %s partition %d".format(topic, partition), e)
+        leaderInfo = leader match {
+          case Some(l) =>
+            try {
+              Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head)
+            } catch {
+              case e => throw new LeaderNotAvailableException("Leader not available for topic %s partition %d".format(topic, partition), e)
+            }
+          case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition)
         }
         try {
           replicaInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas.map(id => id.toInt))
           isrInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas)
         } catch {
           case e => throw new ReplicaNotAvailableException(e)
         }
+        if(replicaInfo.size < replicas.size)
+          throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
+            replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
+        if(isrInfo.size < inSyncReplicas.size)
+          throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
+            inSyncReplicas.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
         new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
       } catch {
         case e =>
           error("Error while fetching metadata for partition [%s,%d]".format(topic, partition), e)
           new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo,
-                                ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+            ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
       }
     }
     new TopicMetadata(topic, partitionMetadata)
@@ -143,19 +151,23 @@ object AdminUtils extends Logging {
   private def getBrokerInfoFromCache(zkClient: ZkClient,
                                      cachedBrokerInfo: scala.collection.mutable.Map[Int, Broker],
                                      brokerIds: Seq[Int]): Seq[Broker] = {
-    brokerIds.map { id =>
+    var failedBrokerIds: ListBuffer[Int] = new ListBuffer()
+    val brokerMetadata = brokerIds.map { id =>
       val optionalBrokerInfo = cachedBrokerInfo.get(id)
       optionalBrokerInfo match {
-        case Some(brokerInfo) => brokerInfo // return broker info from the cache
+        case Some(brokerInfo) => Some(brokerInfo) // return broker info from the cache
         case None => // fetch it from zookeeper
           ZkUtils.getBrokerInfo(zkClient, id) match {
             case Some(brokerInfo) =>
               cachedBrokerInfo += (id -> brokerInfo)
-              brokerInfo
-            case None => throw new BrokerNotAvailableException("Failed to fetch broker info for broker " + id)
+              Some(brokerInfo)
+            case None =>
+              failedBrokerIds += id
+              None
           }
       }
     }
+    brokerMetadata.filter(_.isDefined).map(_.get)
   }
 
   private def getWrappedIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
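The net effect of the AdminUtils change: a broker missing from both the cache and ZooKeeper no longer aborts the whole metadata request with BrokerNotAvailableException; the lookup returns whatever brokers it could resolve, and the caller raises ReplicaNotAvailableException only for the ids that came back short. Below is a minimal, self-contained sketch of that pattern; Broker, zkRegistry, and getBrokerInfo here are simplified stand-ins for illustration, not Kafka's real classes.

// Hypothetical stand-ins for Kafka's Broker and ZooKeeper lookup.
case class Broker(id: Int, host: String)

object BrokerLookupSketch {
  // Pretend ZooKeeper registry: broker 3 is missing.
  private val zkRegistry = Map(1 -> Broker(1, "host1"), 2 -> Broker(2, "host2"))

  // Mirrors the new getBrokerInfoFromCache shape: cache hits and successful
  // ZooKeeper fetches become Some(broker); unresolvable ids become None
  // instead of throwing and failing the entire request.
  def getBrokerInfo(cache: scala.collection.mutable.Map[Int, Broker],
                    brokerIds: Seq[Int]): Seq[Broker] = {
    val brokerMetadata = brokerIds.map { id =>
      cache.get(id).orElse {
        zkRegistry.get(id).map { broker =>
          cache += (id -> broker) // cache the ZooKeeper hit for next time
          broker
        }
      }
    }
    brokerMetadata.filter(_.isDefined).map(_.get) // keep only resolved brokers
  }

  def main(args: Array[String]): Unit = {
    val cache = scala.collection.mutable.Map[Int, Broker]()
    val replicas = Seq(1, 2, 3)
    val replicaInfo = getBrokerInfo(cache, replicas)
    // As in the patched caller, a size check decides whether to complain.
    if (replicaInfo.size < replicas.size)
      println("Replica information not available for brokers: " +
        replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
  }
}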
10 changes: 5 additions & 5 deletions core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
@@ -80,12 +80,12 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
         if(tmd.errorCode == ErrorMapping.NoError){
           topicPartitionInfo.put(tmd.topic, tmd)
         } else
-          warn("Error while fetching metadata for topic [%s]: [%s]".format(tmd.topic, tmd), ErrorMapping.exceptionFor(tmd.errorCode))
+          warn("Error while fetching metadata [%s] for topic [%s]: %s ".format(tmd, tmd.topic, ErrorMapping.exceptionFor(tmd.errorCode).getClass))
         tmd.partitionsMetadata.foreach(pmd =>{
-          if (pmd.errorCode != ErrorMapping.NoError){
-            warn("Error while fetching metadata for topic partition [%s,%d]: [%s]".format(tmd.topic, pmd.partitionId, pmd),
-              ErrorMapping.exceptionFor(pmd.errorCode))
-          }
+          if (pmd.errorCode != ErrorMapping.NoError && pmd.errorCode == ErrorMapping.LeaderNotAvailableCode) {
+            warn("Error while fetching metadata %s for topic partition [%s,%d]: [%s]".format(pmd, tmd.topic, pmd.partitionId,
+              ErrorMapping.exceptionFor(pmd.errorCode).getClass))
+          } // any other error code (e.g. ReplicaNotAvailable) can be ignored since the producer does not need to access the replica and isr metadata
         })
       })
       producerPool.updateProducer(topicsMetadata)
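On the producer side, only a missing leader is worth warning about, because the producer never reads the replica or isr metadata. A small sketch of that filtering, with made-up error-code constants and a simplified PartitionMeta standing in for Kafka's ErrorMapping and PartitionMetadata:

object MetadataErrorFilterSketch {
  // Hypothetical error codes for illustration; Kafka's real values live in ErrorMapping.
  val NoError = 0
  val LeaderNotAvailableCode = 5
  val ReplicaNotAvailableCode = 9

  case class PartitionMeta(partitionId: Int, errorCode: Int)

  def main(args: Array[String]): Unit = {
    val partitions = Seq(PartitionMeta(0, NoError),
                         PartitionMeta(1, LeaderNotAvailableCode),
                         PartitionMeta(2, ReplicaNotAvailableCode))
    partitions.foreach { pmd =>
      // A leaderless partition blocks sends, so warn about it; a missing
      // replica or isr entry does not, so it is deliberately ignored.
      if (pmd.errorCode == LeaderNotAvailableCode)
        println("warn: no leader available for partition %d".format(pmd.partitionId))
    }
  }
}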
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/producer/ProducerPool.scala
@@ -43,9 +43,9 @@ class ProducerPool(val config: ProducerConfig) extends Logging {
   private val syncProducers = new HashMap[Int, SyncProducer]
   private val lock = new Object()
 
-  def updateProducer(topicMetadatas: Seq[TopicMetadata]) {
+  def updateProducer(topicMetadata: Seq[TopicMetadata]) {
     val newBrokers = new collection.mutable.HashSet[Broker]
-    topicMetadatas.foreach(tmd => {
+    topicMetadata.foreach(tmd => {
       tmd.partitionsMetadata.foreach(pmd => {
         if(pmd.leader.isDefined)
           newBrokers+=(pmd.leader.get)
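updateProducer itself only needs the leader of each partition in order to (re)connect producers. A sketch of that extraction, again with hypothetical Broker/PartitionMeta/TopicMeta types rather than Kafka's own:

object LeaderExtractionSketch {
  case class Broker(id: Int, host: String)
  case class PartitionMeta(partitionId: Int, leader: Option[Broker])
  case class TopicMeta(topic: String, partitionsMetadata: Seq[PartitionMeta])

  def main(args: Array[String]): Unit = {
    val metadata = Seq(TopicMeta("t1",
      Seq(PartitionMeta(0, Some(Broker(1, "host1"))),
          PartitionMeta(1, None)))) // partition 1 currently has no leader
    val newBrokers = new scala.collection.mutable.HashSet[Broker]
    metadata.foreach(tmd =>
      tmd.partitionsMetadata.foreach { pmd =>
        if (pmd.leader.isDefined) // skip leaderless partitions
          newBrokers += pmd.leader.get
      })
    println(newBrokers) // Set(Broker(1,host1))
  }
}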
