Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

KAFKA-774 Periodic refresh of topic metadata on the producer does not check for error code in the response; reviewed by Swapnil Ghike
  • Loading branch information...
commit 6989dac889f2afd3cd192d345d3bd29995e605b9 1 parent 37ca9db
@nehanarkhede nehanarkhede authored
View
57 core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -87,55 +87,50 @@ object AdminUtils extends Logging {
case e2 => throw new AdministrationException(e2.toString)
}
}
-
- def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient): TopicMetadata =
+
+ def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient): TopicMetadata =
fetchTopicMetadataFromZk(topic, zkClient, new mutable.HashMap[Int, Broker])
def fetchTopicMetadataFromZk(topics: Set[String], zkClient: ZkClient): Set[TopicMetadata] = {
val cachedBrokerInfo = new mutable.HashMap[Int, Broker]()
topics.map(topic => fetchTopicMetadataFromZk(topic, zkClient, cachedBrokerInfo))
}
-
+
private def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient, cachedBrokerInfo: mutable.HashMap[Int, Broker]): TopicMetadata = {
if(ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) {
val topicPartitionAssignment = ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic)).get(topic).get
val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
-
val partitionMetadata = sortedPartitions.map { partitionMap =>
- val partition = partitionMap._1
- val replicas = partitionMap._2
- val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partition)
- val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
- debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader)
+ val partition = partitionMap._1
+ val replicas = partitionMap._2
+ val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partition)
+ val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
+ debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader)
- var leaderInfo: Option[Broker] = None
- var replicaInfo: Seq[Broker] = Nil
- var isrInfo: Seq[Broker] = Nil
- try {
+ var leaderInfo: Option[Broker] = None
+ var replicaInfo: Seq[Broker] = Nil
+ var isrInfo: Seq[Broker] = Nil
try {
- leaderInfo = leader match {
- case Some(l) => Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head)
- case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition)
+ try {
+ leaderInfo = leader match {
+ case Some(l) => Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head)
+ case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition)
+ }
+ } catch {
+ case e => throw new LeaderNotAvailableException("Leader not available for topic %s partition %d".format(topic, partition), e)
+ }
+ try {
+ replicaInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas.map(id => id.toInt))
+ isrInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas)
+ } catch {
+ case e => throw new ReplicaNotAvailableException(e)
}
- } catch {
- case e => throw new LeaderNotAvailableException("Leader not available for topic %s partition %d".format(topic, partition))
- }
-
- try {
- replicaInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas.map(id => id.toInt))
- isrInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas)
- } catch {
- case e => throw new ReplicaNotAvailableException(e)
- }
-
new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
} catch {
- case e: ReplicaNotAvailableException =>
+ case e =>
+ error("Error while fetching metadata for partition [%s,%d]".format(topic, partition), e)
new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo,
ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
- case le: LeaderNotAvailableException =>
- new PartitionMetadata(partition, None, replicaInfo, isrInfo,
- ErrorMapping.codeFor(le.getClass.asInstanceOf[Class[Throwable]]))
}
}
new TopicMetadata(topic, partitionMetadata)
View
7 core/src/main/scala/kafka/common/LeaderNotAvailableException.scala
@@ -20,6 +20,7 @@ package kafka.common
/**
* Thrown when a request is made for partition, but no leader exists for that partition
*/
-class LeaderNotAvailableException(message: String) extends RuntimeException(message) {
- def this() = this(null)
-}
+class LeaderNotAvailableException(message: String, cause: Throwable) extends RuntimeException(message, cause) {
+ def this(message: String) = this(message, null)
+ def this() = this(null, null)
+}
View
5 core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
@@ -80,10 +80,11 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
if(tmd.errorCode == ErrorMapping.NoError){
topicPartitionInfo.put(tmd.topic, tmd)
} else
- warn("Metadata for topic [%s] is erroneous: [%s]".format(tmd.topic, tmd), ErrorMapping.exceptionFor(tmd.errorCode))
+ warn("Error while fetching metadata for topic [%s]: [%s]".format(tmd.topic, tmd), ErrorMapping.exceptionFor(tmd.errorCode))
tmd.partitionsMetadata.foreach(pmd =>{
if (pmd.errorCode != ErrorMapping.NoError){
- debug("Metadata for topic partition [%s, %d] is errornous: [%s]".format(tmd.topic, pmd.partitionId, pmd), ErrorMapping.exceptionFor(pmd.errorCode))
+ warn("Error while fetching metadata for topic partition [%s,%d]: [%s]".format(tmd.topic, pmd.partitionId, pmd),
+ ErrorMapping.exceptionFor(pmd.errorCode))
}
})
})
View
1  core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -81,7 +81,6 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
}
if(outstandingProduceRequests.size > 0) {
producerStats.failedSendRate.mark()
-
val correlationIdEnd = correlationId.get()
error("Failed to send requests for topics %s with correlation ids in [%d,%d]".format(outstandingProduceRequests.map(_.topic).mkString(","),
correlationIdStart, correlationIdEnd-1))
Please sign in to comment.
Something went wrong with that request. Please try again.