Permalink
Browse files

kafka-801; Fix MessagesInPerSec mbean to count uncompressed message r…

…ate; patched by Jun Rao; reviewed by Neha Narkhede
  • Loading branch information...
junrao committed Mar 13, 2013
1 parent 290d5e0 commit dd9676163956b3e577808a1e9744aa9fb5e83e4e
Showing with 3 additions and 3 deletions.
  1. +3 −3 core/src/main/scala/kafka/log/Log.scala
@@ -265,9 +265,6 @@ private[kafka] class Log(val dir: File,
if(messageSetInfo.count == 0) {
(-1L, -1L)
} else {
- BrokerTopicStats.getBrokerTopicStats(topicName).messagesInRate.mark(messageSetInfo.count)
- BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(messageSetInfo.count)
-
// trim any invalid bytes or partial messages before appending it to the on-disk log
var validMessages = trimInvalidBytes(messages)
@@ -288,6 +285,9 @@ private[kafka] class Log(val dir: File,
case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
}
val lastOffset = offsetCounter.get - 1
+ val numMessages = lastOffset - firstOffset + 1
+ BrokerTopicStats.getBrokerTopicStats(topicName).messagesInRate.mark(numMessages)
+ BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numMessages)
(firstOffset, lastOffset)
} else {
require(messageSetInfo.offsetsMonotonic, "Out of order offsets found in " + messages)

0 comments on commit dd96761

Please sign in to comment.