-
Notifications
You must be signed in to change notification settings - Fork 14k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-4982: add listener tags to socket-server-metrics #3004
Conversation
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR, seems fine, just left some minor comments. Can we write a simple test?
@@ -400,7 +398,7 @@ private[kafka] class Processor(val id: Int, | |||
|
|||
private val newConnections = new ConcurrentLinkedQueue[SocketChannel]() | |||
private val inflightResponses = mutable.Map[String, RequestChannel.Response]() | |||
private val metricTags = Map("networkProcessor" -> id.toString).asJava | |||
val metricTags = Map("protocol" -> securityProtocol.name, "listener" -> listenerName.value, "networkProcessor" -> id.toString).asJava |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This line is a bit long.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
@@ -105,9 +99,13 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time | |||
} | |||
} | |||
|
|||
val allIoWaitRatioMetricNames = processors.map { p => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should probably move this to inside the Gauge
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good point
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
added unit test and fixed test failure
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
private val metricTags = Map("networkProcessor" -> id.toString).asJava | ||
val metricTags = Map( | ||
"protocol" -> securityProtocol.name, "listener" -> listenerName.value, "networkProcessor" -> id.toString | ||
).asJava | ||
|
||
newGauge("IdlePercent", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The KIP simply mentions SelectorMetrics (in Kafka Metrics), but you have also changed the tags for IdlePercent which is a Yammer Metric. Is that intentional?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes it was intentional, as "io-wait-ratio" is still a per-network-thread metric and JConsole was grouping it with KafkaMetrics, so I didn't spot it coming from a different registry.
IMHO it would look too odd to leave that just tagged with the processor thread number. I'll clarify the KIP
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You need to update the compatibility section too. @junrao, is this OK?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@edoardocomar I had a closer look and I think you misunderstood the issue. io-wait-ratio
is a Selector metric. The issue that I'm raising is with regards to IdlePercent
:
newGauge("IdlePercent",
new Gauge[Double] {
def value = {
Option(metrics.metric(metrics.metricName("io-wait-ratio", "socket-server-metrics", metricTags))).fold(0.0)(_.value)
}
},
metricTags.asScala
See the metricTags
passed to it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You didn't do the same for NetworkProcessorAvgIdlePercent
though. My preference would be to leave the Yammer metrics alone and just add the tags to the underlying SelectorMetrics, but worth waiting for @junrao's opinion.
Edit: NetworkProcessorAvgIdlePercent
is not per network process, so that's OK.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ijuma On second thought, as the (per-processor) IdlePercent
is an alias to the (per-processor) io-wait-ratio
we could leave the former unchanged for compatibility. And do the multilevel tagging only in the latter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that was my suggestion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Jun also agreed that we should just add the new tag to the SelectorMetrics.
|
||
assertEquals(2, metricsNames.size) | ||
assert(metricsNames.head.getMBeanName.endsWith("protocol=PLAINTEXT,listener=PLAINTEXT,networkProcessor=0")) | ||
assert(metricsNames.last.getMBeanName.endsWith("protocol=TRACE,listener=TRACE,networkProcessor=1")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In Kafka, we typically call it a security protocol, not just a protocol. The latter is clearly more concise though. @junrao, do you think protocol
is OK?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as below we agreed to remove it, leaving only the listener tag
private val metricTags = Map("networkProcessor" -> id.toString).asJava | ||
val metricTags = Map( | ||
"protocol" -> securityProtocol.name, "listener" -> listenerName.value, "networkProcessor" -> id.toString | ||
).asJava |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking at the JMX screenshot, the order is important, so you should use a LinkedHashMap
. Also, it seems like listener
should be the first entry as it uniquely identifies the listener.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had the protocol as first one, as when using a treeview representation, allows for the listeners to be shown as children.
In fact, protocol/listener/networkProcessor is the order from the most generic to the most specific tag.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's true. I guess the ideal scenario in my mind would be to just have two levels: listener name and network processor. And the security protocol would be an attribute value. However, I'm not sure how easy it is to do that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could omit the securityProtocol altogether. Although that's another deviation from the KIP, as you say the listener is the main differentiator.
I started from the need to distinguish between metrics, but in our setup we have no listener labels, just protocols. That's why we initially came up with a 3-level tagging
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't find a way to add a string value ... that could be a future enhancement ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, let's just remove the security protocol algother. Synced with Jun and he agrees that this is fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
Do you think you'll be able to update the PR today? If so, we can probably get it merged today. |
removed protol tag, left yammer metric with older tagging
Thanks for the detailed feedback. I've updated the PR |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, I included a couple of minor changes as described below before merging.
@@ -400,15 +398,16 @@ private[kafka] class Processor(val id: Int, | |||
|
|||
private val newConnections = new ConcurrentLinkedQueue[SocketChannel]() | |||
private val inflightResponses = mutable.Map[String, RequestChannel.Response]() | |||
private val metricTags = Map("networkProcessor" -> id.toString).asJava | |||
val metricTags = SortedMap("listener" -> listenerName.value, "networkProcessor" -> id.toString).asJava |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A LinkedHashMap is better because it makes it obvious that the order of declaration is important. With a SortedMap, adding a third key could change the order.
assertFalse(kafkaMetricNames.isEmpty) | ||
val expectedListeners = Set("PLAINTEXT","TRACE") | ||
kafkaMetricNames.foreach { kafkaMetricName => { | ||
System.err.println(kafkaMetricName.tags) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was probably not intentional.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oops thanks for spotting. Shame syserr/syout are't caught by something akin to a findbugs for Scala
Added tags