KAFKA-5194: KIP-153: Include only client traffic in BytesOutPerSec me… #3003
Conversation
…tric

Also added 2 new metrics to account for incoming/outgoing traffic due to internal replication:
- ReplicationBytesInPerSec
- ReplicationBytesOutPerSec
Thanks for the PR, left one suggestion and one question.
BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).bytesOutRate.mark(data.records.sizeInBytes)
BrokerTopicStats.getBrokerAllTopicsStats.bytesOutRate.mark(data.records.sizeInBytes)
} else {
BrokerTopicStats.getBrokerAllTopicsStats.replicationBytesOutRate.mark(data.records.sizeInBytes)
We should have a method in BrokerTopicStats that takes a topic and isFromFollower and does this logic.
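The suggested helper could look roughly like this. This is a self-contained sketch, not the code that was merged: `Meter` is an in-memory stand-in for the Yammer meter, and the method and object names (`updateBytesOut`, `BytesOutSketch`) are hypothetical.

```scala
// Hypothetical sketch of the suggested helper; names and the in-memory Meter
// are illustrative stand-ins, not the code that landed in the PR.
object BytesOutSketch {
  class Meter {
    private var total: Long = 0L
    def mark(value: Long): Unit = total += value
    def count: Long = total
  }
  class Stats {
    val bytesOutRate = new Meter
    val replicationBytesOutRate = new Meter
  }
  private val allTopics = new Stats
  private val perTopic = scala.collection.mutable.Map.empty[String, Stats]

  def topicStats(topic: String): Stats = perTopic.getOrElseUpdate(topic, new Stats)
  def allTopicsStats: Stats = allTopics

  // Route bytes-out to the replication meter for follower fetches, and to the
  // per-topic and all-topics client meters otherwise.
  def updateBytesOut(topic: String, isFromFollower: Boolean, size: Long): Unit =
    if (isFromFollower)
      allTopics.replicationBytesOutRate.mark(size)
    else {
      topicStats(topic).bytesOutRate.mark(size)
      allTopics.bytesOutRate.mark(size)
    }
}
```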
Done
@@ -119,6 +119,8 @@ class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup {
val failedFetchRequestRate = newMeter(BrokerTopicStats.FailedFetchRequestsPerSec, "requests", TimeUnit.SECONDS, tags)
val totalProduceRequestRate = newMeter(BrokerTopicStats.TotalProduceRequestsPerSec, "requests", TimeUnit.SECONDS, tags)
val totalFetchRequestRate = newMeter(BrokerTopicStats.TotalFetchRequestsPerSec, "requests", TimeUnit.SECONDS, tags)
val replicationBytesInRate = newMeter(BrokerTopicStats.ReplicationBytesInPerSec, "bytes", TimeUnit.SECONDS, tags)
val replicationBytesOutRate = newMeter(BrokerTopicStats.ReplicationBytesOutPerSec, "bytes", TimeUnit.SECONDS, tags)
Probably good to move this higher, next to the other bytes metrics.
We should only create these meters if name is None.
@@ -112,6 +112,7 @@ class ReplicaFetcherThread(name: String,
trace(s"Follower ${replica.brokerId} set replica high watermark for partition $topicPartition to $followerHighWatermark")
if (quota.isThrottled(topicPartition))
quota.record(records.sizeInBytes)
BrokerTopicStats.getBrokerAllTopicsStats.replicationBytesInRate.mark(records.sizeInBytes)
For bytesOut, we track everything in the same place (KafkaApis). For bytesIn, it's a bit scattered. For the leader we do the check in ReplicaManager, which means that we count the bytes from GroupCoordinator.doSyncGroup, but we don't count the bytes from GroupMetadataManager.cleanupGroupMetadata. Not sure if this is by design. @junrao probably knows.
Thanks for the feedback! Instead of guarding the creation of the 2 new replication metrics in BrokerTopicMetrics, I moved them into a new class.
@mimaison, with the latest change, the metric names are: kafka.server:type=BrokerReplicationMetrics,name=ReplicationBytesInPerSec. Did you really intend to do that?
@ijuma have you seen #3003 (comment)? It seemed cleaner to have them in a new class even if they are under a new name. As these are new metrics I thought that was acceptable. If you'd prefer to have them under BrokerTopicStats, I can move them back. Maybe there's a better way in Scala (with Option?)
@mimaison I saw the comment, but it wasn't clear if you realised what the metric names would be. Metric names are public and their consistency is more important than internal aspects. Yeah, using something like:

private[server] val replicationBytesInRate = if (name.isEmpty) Some(newMeter(...)) else None

And maybe introduce methods in BrokerTopicStats that wrap the Option handling.
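The shape being suggested, reduced to a self-contained sketch: `Meter` here is a stub standing in for the Yammer meter, and the real `newMeter` arguments are elided. Only the all-topics instance (where `name` is `None`) creates the replication meters, so no per-topic MBean is registered for them.

```scala
// Self-contained sketch of the Option-guarded meter creation; Meter is a stub
// for the Yammer meter and newMeter's real arguments are elided.
object OptionMeterSketch {
  class Meter {
    private var total: Long = 0L
    def mark(value: Long): Unit = total += value
    def count: Long = total
  }

  // name is Some(topic) for per-topic metrics and None for the all-topics
  // instance, mirroring BrokerTopicMetrics(name: Option[String]).
  class BrokerTopicMetricsSketch(name: Option[String]) {
    private def newMeter(): Meter = new Meter
    // Replication meters are only created on the all-topics instance, so no
    // per-topic MBean is ever registered for them.
    val replicationBytesInRate: Option[Meter] =
      if (name.isEmpty) Some(newMeter()) else None
  }
}
```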
A couple of minor comments, LGTM otherwise.
@@ -115,6 +115,8 @@ class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup {
val bytesInRate = newMeter(BrokerTopicStats.BytesInPerSec, "bytes", TimeUnit.SECONDS, tags)
val bytesOutRate = newMeter(BrokerTopicStats.BytesOutPerSec, "bytes", TimeUnit.SECONDS, tags)
val bytesRejectedRate = newMeter(BrokerTopicStats.BytesRejectedPerSec, "bytes", TimeUnit.SECONDS, tags)
private[server] val replicationBytesInRate = if (name.isEmpty) Some(newMeter(BrokerTopicStats.ReplicationBytesInPerSec, "bytes", TimeUnit.SECONDS, tags)) else None
private[server] val replicationBytesOutRate = if (name.isEmpty) Some(newMeter(BrokerTopicStats.ReplicationBytesOutPerSec, "bytes", TimeUnit.SECONDS, tags)) else None
These lines are a bit long.
@@ -152,9 +159,30 @@ object BrokerTopicStats extends Logging {
stats.getAndMaybePut(topic)
}

def updateReplicationBytesIn(value: Long) {
if (getBrokerAllTopicsStats.replicationBytesInRate.isDefined) {
getBrokerAllTopicsStats.replicationBytesInRate.get.mark(value)
You should use foreach instead of isDefined/get. Same for the other methods where this applies.
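The suggested idiom, as a minimal self-contained sketch (`Meter` is a stand-in for the Yammer meter): `Option.foreach` applies the function only when the value is present, replacing the `isDefined`/`get` pair in one expression.

```scala
// Sketch of the review suggestion: prefer Option.foreach over isDefined/get.
// Meter is a minimal in-memory stand-in for the Yammer meter.
object ForeachSketch {
  class Meter {
    private var total: Long = 0L
    def mark(value: Long): Unit = total += value
    def count: Long = total
  }

  val replicationBytesInRate: Option[Meter] = Some(new Meter)

  // Instead of:
  //   if (replicationBytesInRate.isDefined) replicationBytesInRate.get.mark(value)
  // write:
  def updateReplicationBytesIn(value: Long): Unit =
    replicationBytesInRate.foreach(_.mark(value))
}
```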
I had forgotten. About the test, can we simply have a test that verifies that the metrics are updated? That should be OK even with the existing limitations, right?
Thanks for the feedback. We've added a test that verifies metrics are updated when producing/consuming. Hopefully it's what you had in mind. |
@mimaison : Thanks for the patch. Looks good. Just a few minor comments.
}
}

def updateReplicationBytesOut(value: Long) {
This can be private?
removeMetric(BrokerTopicStats.FailedProduceRequestsPerSec, tags)
removeMetric(BrokerTopicStats.FailedFetchRequestsPerSec, tags)
removeMetric(BrokerTopicStats.TotalProduceRequestsPerSec, tags)
removeMetric(BrokerTopicStats.TotalFetchRequestsPerSec, tags)
Remove the extra new line.
@Test
def testBrokerTopicMetrics() {
val replicationBytesIn = "ReplicationBytesInPerSec"
val replicationBytesOut = "ReplicationBytesOutPerSec"
Could we just use the constants in BrokerTopicStats?
createTopic(zkUtils, topic, 1, numNodes, servers)
// Topic metrics can take a moment to be created
TestUtils.waitUntilTrue(
() => Metrics.defaultRegistry.allMetrics.asScala.filterKeys(k => k.getMBeanName.endsWith(topic)).size > 1,
Hmm, the BytesIn metric is only created when there is data produced to that topic. So, perhaps we should publish some messages first?
With @mpburg
Thanks for the updates, LGTM. I included a few changes before merging as per my comments below.
.getOrElse(fail(s"Unable to find metric ${metricName}"))
._2.asInstanceOf[Meter]
.count
}
I changed this to:
Metrics.defaultRegistry.allMetrics.asScala
.filterKeys(_.getMBeanName.endsWith(metricName))
.values
.headOption
.getOrElse(fail(s"Unable to find metric $metricName"))
.asInstanceOf[Meter]
.count
assertTrue(getMeterCount(bytesOut) > initialBytesOut)
}

private def getMeterCount(metricName: String): Long = {
I renamed this to meterCount, we generally avoid the get prefix in Kafka.
val replicationBytesIn = BrokerTopicStats.ReplicationBytesInPerSec
val replicationBytesOut = BrokerTopicStats.ReplicationBytesOutPerSec
val bytesIn = BrokerTopicStats.BytesInPerSec + ",topic=" + topic
val bytesOut = BrokerTopicStats.BytesOutPerSec + ",topic=" + topic
I changed this to use String interpolation.
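For illustration, the change swaps `+` concatenation for an s-interpolated string when building the metric name (the constant value and topic name below are placeholders):

```scala
// Illustration of the change: + concatenation replaced by an s-interpolated
// string for the metric name; values here are placeholders.
object InterpolationSketch {
  val BytesInPerSec = "BytesInPerSec"
  val topic = "testtopic"

  // Before: BytesInPerSec + ",topic=" + topic
  // After:
  val bytesIn = s"$BytesInPerSec,topic=$topic"
}
```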
for (record <- consumer.poll(50).asScala)
records.add(record)
} while (records.size < numMessages)
records.asScala.toList
I refactored this into 2 separate methods, made sure the consumer was closed, checked the exact number of messages and added a timeout.
We had a transient test failure after this was merged: @mimaison, would you be able to investigate?
Yes I'm having a look |
@mimaison, it's not clear to me how this can happen. Since we are producing messages with acks=-1, the producer only gets a response after the data has been replicated. And we need at least two fetch requests from the follower for the leader to know that the data has been replicated. The follower initialises
Oh, it's probably min.isr. |
Yes that's probably it, sending a new PR now |
I was holding off submitting my PR, but since it's been a while and we need to fix this ASAP, I did it now: |
We had a look at creating an integration test to verify the Bytes/Replication In/Out metrics but ran into issues.
When running with IntegrationTestHarness, all the brokers used the same Yammer Metrics registry and the metric names collided.
Also, although we only update the global value for the 2 new metrics, we can see an inner node for each topic (that stays at 0),
e.g. kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesOutPerSec,topic=testtopic
It looks like something forced by BrokerTopicMetrics. If there's no way to remove them, then we might want to update them accordingly to give them meaningful values.