-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-5968: Create/remove request metrics during broker startup/shutdown #3991
Conversation
@ijuma Can you review, please? Thank you... |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR, looks good overall, just a few nitpicks.
} | ||
|
||
def sendShutdownRequest(): Unit = requestQueue.put(ShutdownRequest) | ||
|
||
} | ||
|
||
object RequestMetrics { | ||
class RequestChannelMetrics { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe this can be an inner class in the RequestChannel
object called Metrics
or MetricsMap
.
metricsMap.put(name, new RequestMetrics(name)) | ||
} | ||
|
||
def apply(metricName: String) = metricsMap(metricName) | ||
def requestMetrics(metricName: String) = metricsMap(metricName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we not leave it as apply
?
@@ -365,7 +369,7 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup { | |||
else | |||
None | |||
|
|||
private val errorMeters = mutable.Map[Errors, ErrorMeter]() | |||
val errorMeters = mutable.Map[Errors, ErrorMeter]() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's good to avoid exposing mutable collections publicly. Typically, one can expose the operations required instead.
removeMetric("TotalTimeMs", tags) | ||
removeMetric("ResponseSendTimeMs", tags) | ||
removeMetric("RequestBytes", tags) | ||
removeMetric("ResponseSendTimeMs", tags) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe introduce constants?
@@ -53,7 +53,8 @@ object RequestChannel extends Logging { | |||
val context: RequestContext, | |||
val startTimeNanos: Long, | |||
memoryPool: MemoryPool, | |||
@volatile private var buffer: ByteBuffer) extends BaseRequest { | |||
@volatile private var buffer: ByteBuffer, | |||
requestChannelMetrics: RequestChannelMetrics) extends BaseRequest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just call it metrics
?
@@ -247,6 +249,8 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe | |||
) | |||
} | |||
|
|||
def requestMetrics(name: String): RequestMetrics = requestChannelMetrics.requestMetrics(name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Call it metrics
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed this method.
@ijuma Thanks for the review, updated. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Just a minor style comment. No re-review necessary, feel free to merge after the tests pass.
} | ||
|
||
class RequestMetrics(name: String) extends KafkaMetricsGroup { | ||
val tags = Map("request" -> name) | ||
val requestRate = newMeter("RequestsPerSec", "requests", TimeUnit.SECONDS, tags) | ||
val requestRate = newMeter(RequestMetrics.RequestsPerSec, "requests", TimeUnit.SECONDS, tags) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A common pattern is doing the following at the top of the class:
import RequestMetrics._
Then the class has access to the members of the companion object and we don't need to add RequestMetrics
everywhere.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ijuma Thank you, updated
Replaces the static
RequestMetrics
object with a class so that metrics are created and removed during broker startup and shutdown to avoid metrics tests being affected by metrics left behind by previous tests. Also reinstateskafka.api.MetricsTest
which was failing frequently earlier due to tests removing the static request metrics.