Permalink
Browse files

KAFKA-816 Reduce noise in Kafka server logs due to NotLeaderForPartit…

…ionException; reviewed by Jun Rao
  • Loading branch information...
1 parent 08b2a37 commit 51421fcc0111031bb77f779a6f6c00520d526a34 @nehanarkhede nehanarkhede committed Mar 22, 2013
Showing with 27 additions and 1 deletion.
  1. +27 −1 core/src/main/scala/kafka/server/KafkaApis.scala
@@ -197,10 +197,19 @@ class KafkaApis(val requestChannel: RequestChannel,
.format(messages.size, topicAndPartition.topic, topicAndPartition.partition, start, end))
ProduceResult(topicAndPartition, start, end)
} catch {
+ // NOTE: Failed produce requests is not incremented for UnknownTopicOrPartitionException and NotLeaderForPartitionException
+ // since failed produce requests metric is supposed to indicate failure of a broker in handling a produce request
+ // for a partition it is the leader for
case e: KafkaStorageException =>
fatal("Halting due to unrecoverable I/O error while handling produce request: ", e)
Runtime.getRuntime.halt(1)
null
+ case utpe: UnknownTopicOrPartitionException =>
+ warn(utpe.getMessage)
+ new ProduceResult(topicAndPartition, utpe)
+ case nle: NotLeaderForPartitionException =>
+ warn(nle.getMessage)
+ new ProduceResult(topicAndPartition, nle)
case e =>
BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
@@ -278,7 +287,16 @@ class KafkaApis(val requestChannel: RequestChannel,
new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, messages)
}
} catch {
- case t: Throwable =>
+ // NOTE: Failed fetch requests is not incremented for UnknownTopicOrPartitionException and NotLeaderForPartitionException
+ // since failed fetch requests metric is supposed to indicate failure of a broker in handling a fetch request
+ // for a partition it is the leader for
+ case utpe: UnknownTopicOrPartitionException =>
+ warn(utpe.getMessage)
+ new FetchResponsePartitionData(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty)
+ case nle: NotLeaderForPartitionException =>
+ warn(nle.getMessage)
+ new FetchResponsePartitionData(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty)
+ case t =>
BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
BrokerTopicStats.getBrokerAllTopicsStats.failedFetchRequestRate.mark()
error("error when processing request " + (topic, partition, offset, fetchSize), t)
@@ -344,6 +362,14 @@ class KafkaApis(val requestChannel: RequestChannel,
}
(topicAndPartition, PartitionOffsetsResponse(ErrorMapping.NoError, offsets))
} catch {
+ // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages
+ // are typically transient and there is no value in logging the entire stack trace for the same
+ case utpe: UnknownTopicOrPartitionException =>
+ warn(utpe.getMessage)
+ (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]), Nil) )
+ case nle: NotLeaderForPartitionException =>
+ warn(nle.getMessage)
+ (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), Nil) )
case e =>
warn("Error while responding to offset request", e)
(topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), Nil) )

0 comments on commit 51421fc

Please sign in to comment.