Skip to content

Commit

Permalink
KAFKA-14866: Remove controller module metrics when broker is shutting…
Browse files Browse the repository at this point in the history
… down (#13473)


Reviewers: Mickael Maison <mickael.maison@gmail.com>, Divij Vaidya <diviv@amazon.com>
  • Loading branch information
hudeqi committed Jun 6, 2023
1 parent 04f2f6a commit 9ebe395
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 15 deletions.
67 changes: 52 additions & 15 deletions core/src/main/scala/kafka/controller/KafkaController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import kafka.admin.AdminOperationException
import kafka.api._
import kafka.common._
import kafka.cluster.Broker
import kafka.controller.KafkaController.{AlterReassignmentsCallback, ElectLeadersCallback, ListReassignmentsCallback, UpdateFeaturesCallback}
import kafka.controller.KafkaController.{ActiveBrokerCountMetricName, ActiveControllerCountMetricName, AlterReassignmentsCallback, ControllerStateMetricName, ElectLeadersCallback, FencedBrokerCountMetricName, GlobalPartitionCountMetricName, GlobalTopicCountMetricName, ListReassignmentsCallback, OfflinePartitionsCountMetricName, PreferredReplicaImbalanceCountMetricName, ReplicasIneligibleToDeleteCountMetricName, ReplicasToDeleteCountMetricName, TopicsIneligibleToDeleteCountMetricName, TopicsToDeleteCountMetricName, UpdateFeaturesCallback}
import kafka.coordinator.transaction.ZkProducerIdManager
import kafka.server._
import kafka.server.metadata.ZkFinalizedFeatureCache
Expand Down Expand Up @@ -69,6 +69,35 @@ object KafkaController extends Logging {
type ListReassignmentsCallback = Either[Map[TopicPartition, ReplicaAssignment], ApiError] => Unit
type AlterReassignmentsCallback = Either[Map[TopicPartition, ApiError], ApiError] => Unit
type UpdateFeaturesCallback = Either[ApiError, Map[String, ApiError]] => Unit

private val ActiveControllerCountMetricName = "ActiveControllerCount"
private val OfflinePartitionsCountMetricName = "OfflinePartitionsCount"
private val PreferredReplicaImbalanceCountMetricName = "PreferredReplicaImbalanceCount"
private val ControllerStateMetricName = "ControllerState"
private val GlobalTopicCountMetricName = "GlobalTopicCount"
private val GlobalPartitionCountMetricName = "GlobalPartitionCount"
private val TopicsToDeleteCountMetricName = "TopicsToDeleteCount"
private val ReplicasToDeleteCountMetricName = "ReplicasToDeleteCount"
private val TopicsIneligibleToDeleteCountMetricName = "TopicsIneligibleToDeleteCount"
private val ReplicasIneligibleToDeleteCountMetricName = "ReplicasIneligibleToDeleteCount"
private val ActiveBrokerCountMetricName = "ActiveBrokerCount"
private val FencedBrokerCountMetricName = "FencedBrokerCount"

// package private for testing
private[controller] val MetricNames = Set(
ActiveControllerCountMetricName,
OfflinePartitionsCountMetricName,
PreferredReplicaImbalanceCountMetricName,
ControllerStateMetricName,
GlobalTopicCountMetricName,
GlobalPartitionCountMetricName,
TopicsToDeleteCountMetricName,
ReplicasToDeleteCountMetricName,
TopicsIneligibleToDeleteCountMetricName,
ReplicasIneligibleToDeleteCountMetricName,
ActiveBrokerCountMetricName,
FencedBrokerCountMetricName
)
}

class KafkaController(val config: KafkaConfig,
Expand Down Expand Up @@ -144,19 +173,19 @@ class KafkaController(val config: KafkaConfig,
/* single-thread scheduler to clean expired tokens */
private val tokenCleanScheduler = new KafkaScheduler(1, true, "delegation-token-cleaner")

metricsGroup.newGauge("ActiveControllerCount", () => if (isActive) 1 else 0)
metricsGroup.newGauge("OfflinePartitionsCount", () => offlinePartitionCount)
metricsGroup.newGauge("PreferredReplicaImbalanceCount", () => preferredReplicaImbalanceCount)
metricsGroup.newGauge("ControllerState", () => state.value)
metricsGroup.newGauge("GlobalTopicCount", () => globalTopicCount)
metricsGroup.newGauge("GlobalPartitionCount", () => globalPartitionCount)
metricsGroup.newGauge("TopicsToDeleteCount", () => topicsToDeleteCount)
metricsGroup.newGauge("ReplicasToDeleteCount", () => replicasToDeleteCount)
metricsGroup.newGauge("TopicsIneligibleToDeleteCount", () => ineligibleTopicsToDeleteCount)
metricsGroup.newGauge("ReplicasIneligibleToDeleteCount", () => ineligibleReplicasToDeleteCount)
metricsGroup.newGauge("ActiveBrokerCount", () => activeBrokerCount)
metricsGroup.newGauge(ActiveControllerCountMetricName, () => if (isActive) 1 else 0)
metricsGroup.newGauge(OfflinePartitionsCountMetricName, () => offlinePartitionCount)
metricsGroup.newGauge(PreferredReplicaImbalanceCountMetricName, () => preferredReplicaImbalanceCount)
metricsGroup.newGauge(ControllerStateMetricName, () => state.value)
metricsGroup.newGauge(GlobalTopicCountMetricName, () => globalTopicCount)
metricsGroup.newGauge(GlobalPartitionCountMetricName, () => globalPartitionCount)
metricsGroup.newGauge(TopicsToDeleteCountMetricName, () => topicsToDeleteCount)
metricsGroup.newGauge(ReplicasToDeleteCountMetricName, () => replicasToDeleteCount)
metricsGroup.newGauge(TopicsIneligibleToDeleteCountMetricName, () => ineligibleTopicsToDeleteCount)
metricsGroup.newGauge(ReplicasIneligibleToDeleteCountMetricName, () => ineligibleReplicasToDeleteCount)
metricsGroup.newGauge(ActiveBrokerCountMetricName, () => activeBrokerCount)
// FencedBrokerCount metric is always 0 in the ZK controller.
metricsGroup.newGauge("FencedBrokerCount", () => 0)
metricsGroup.newGauge(FencedBrokerCountMetricName, () => 0)

/**
* Returns true if this broker is the current controller.
Expand Down Expand Up @@ -196,8 +225,12 @@ class KafkaController(val config: KafkaConfig,
* shuts down the controller channel manager, if one exists (i.e. if it was the current controller)
*/
def shutdown(): Unit = {
eventManager.close()
onControllerResignation()
try {
eventManager.close()
onControllerResignation()
} finally {
removeMetrics()
}
}

/**
Expand Down Expand Up @@ -502,6 +535,10 @@ class KafkaController(val config: KafkaConfig,
info("Resigned")
}

private def removeMetrics(): Unit = {
KafkaController.MetricNames.foreach(metricsGroup.removeMetric)
}

/*
* This callback is invoked by the controller's LogDirEventNotificationListener with the list of broker ids who
* have experienced new log directory failures. In response the controller should send LeaderAndIsrRequest
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.controller

import kafka.server.metadata.ZkMetadataCache
import kafka.server.{BrokerFeatures, DelegationTokenManager, KafkaConfig}
import kafka.utils.{MockTime, TestUtils}
import kafka.zk.{BrokerInfo, KafkaZkClient}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.junit.jupiter.api.{BeforeEach, Test}
import org.mockito.ArgumentMatchers
import org.mockito.ArgumentMatchers.{any, anyString}
import org.mockito.Mockito.{mock, mockConstruction, times, verify, verifyNoMoreInteractions}

class KafkaControllerTest {
var config: KafkaConfig = _

@BeforeEach
def setUp(): Unit = {
val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
config = KafkaConfig.fromProps(props)
}

@Test
def testRemoveMetricsOnClose(): Unit = {
val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup])
try {
val kafkaController = new KafkaController(
config = config,
zkClient = mock(classOf[KafkaZkClient]),
time = new MockTime(),
metrics = mock(classOf[Metrics]),
initialBrokerInfo = mock(classOf[BrokerInfo]),
initialBrokerEpoch = 0,
tokenManager = mock(classOf[DelegationTokenManager]),
brokerFeatures = mock(classOf[BrokerFeatures]),
featureCache = mock(classOf[ZkMetadataCache])
)

// shutdown kafkaController so that metrics are removed
kafkaController.shutdown()

val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(0)
val numMetricsRegistered = KafkaController.MetricNames.size
verify(mockMetricsGroup, times(numMetricsRegistered)).newGauge(anyString(), any())
KafkaController.MetricNames.foreach(metricName => verify(mockMetricsGroup).newGauge(ArgumentMatchers.eq(metricName), any()))
// verify that each metric is removed
verify(mockMetricsGroup, times(numMetricsRegistered)).removeMetric(anyString())
KafkaController.MetricNames.foreach(verify(mockMetricsGroup).removeMetric(_))

// assert that we have verified all invocations on
verifyNoMoreInteractions(mockMetricsGroup)
} finally {
mockMetricsGroupCtor.close()
}
}

}

0 comments on commit 9ebe395

Please sign in to comment.