diff --git a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala index 1e7af76aeb457..e44ef65cf06c6 100644 --- a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala +++ b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala @@ -19,9 +19,8 @@ package kafka.server import java.util.concurrent.LinkedBlockingDeque import java.util.concurrent.atomic.AtomicReference - import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler} -import kafka.utils.Logging +import kafka.metrics.KafkaMetricsGroup import org.apache.kafka.clients._ import org.apache.kafka.common.Node import org.apache.kafka.common.metrics.Metrics @@ -120,7 +119,8 @@ class BrokerToControllerChannelManagerImpl( channelName: String, threadNamePrefix: Option[String], retryTimeoutMs: Long -) extends BrokerToControllerChannelManager with Logging { +) extends BrokerToControllerChannelManager with KafkaMetricsGroup { + private val responseQueueSizeMetricName = "NumRequestsForwardingToController" private val logContext = new LogContext(s"[broker-${config.brokerId}-to-controller] ") private val manualMetadataUpdater = new ManualMetadataUpdater() private val apiVersions = new ApiVersions() @@ -129,9 +129,11 @@ class BrokerToControllerChannelManagerImpl( def start(): Unit = { requestThread.start() + newGauge(responseQueueSizeMetricName, () => requestThread.queueSize) } def shutdown(): Unit = { + removeMetric(responseQueueSizeMetricName) requestThread.shutdown() info(s"Broker to controller channel manager for $channelName shutdown") }