diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala index e83de5a01a..11842ce939 100755 --- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala +++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala @@ -30,6 +30,7 @@ import org.apache.kafka.server.metrics.KafkaMetricsGroup import java.util.Collections import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit} +import scala.collection.Set import scala.collection.mutable import scala.jdk.CollectionConverters._ @@ -632,6 +633,11 @@ class BrokerTopicStats(configOpt: java.util.Optional[KafkaConfig] = java.util.Op } } + def removeRedundantMetrics(topicPartitions: Set[TopicPartition]): Unit = { + val topicPartitionsToRemove = partitionStats.keys.diff(topicPartitions) + topicPartitionsToRemove.foreach(removeMetrics) + } + def updateBytesOut(topic: String, isFollower: Boolean, isReassignment: Boolean, value: Long): Unit = { if (isFollower) { if (isReassignment) diff --git a/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala b/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala index 839006c486..eaea705003 100644 --- a/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala +++ b/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala @@ -83,13 +83,18 @@ class ElasticReplicaManager( addPartitionsToTxnManager: Option[AddPartitionsToTxnManager] = None, directoryEventHandler: DirectoryEventHandler = DirectoryEventHandler.NOOP, private val fastFetchExecutor: ExecutorService = Executors.newFixedThreadPool(4, ThreadUtils.createThreadFactory("kafka-apis-fast-fetch-executor-%d", true)), - private val slowFetchExecutor: ExecutorService = Executors.newFixedThreadPool(12, ThreadUtils.createThreadFactory("kafka-apis-slow-fetch-executor-%d", true)) + private val slowFetchExecutor: ExecutorService = Executors.newFixedThreadPool(12, ThreadUtils.createThreadFactory("kafka-apis-slow-fetch-executor-%d", true)), + private val partitionMetricsCleanerExecutor: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor(ThreadUtils.createThreadFactory("kafka-partition-metrics-cleaner", true)), ) extends ReplicaManager(config, metrics, time, scheduler, logManager, remoteLogManager, quotaManagers, metadataCache, logDirFailureChannel, alterPartitionManager, brokerTopicStats, isShuttingDown, zkClient, delayedProducePurgatoryParam, delayedFetchPurgatoryParam, delayedDeleteRecordsPurgatoryParam, delayedElectLeaderPurgatoryParam, delayedRemoteFetchPurgatoryParam, threadNamePrefix, brokerEpochSupplier, addPartitionsToTxnManager, directoryEventHandler) { + partitionMetricsCleanerExecutor.scheduleAtFixedRate(() => { + brokerTopicStats.removeRedundantMetrics(allPartitions.keys) + }, 1, 1, TimeUnit.HOURS) + protected val openingPartitions = new ConcurrentHashMap[TopicPartition, CompletableFuture[Void]]() protected val closingPartitions = new ConcurrentHashMap[TopicPartition, CompletableFuture[Void]]() @@ -189,6 +194,7 @@ class ElasticReplicaManager( getPartition(topicPartition) match { case hostedPartition: HostedPartition.Online => if (allPartitions.remove(topicPartition, hostedPartition)) { + brokerTopicStats.removeMetrics(topicPartition) maybeRemoveTopicMetrics(topicPartition.topic) // AutoMQ for Kafka inject start if (ElasticLogManager.enabled()) {