diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 78afdc86b40b..ff8a687b5eeb 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -179,6 +179,7 @@ class LogCleaner(initialConfig: CleanerConfig, */ def removeMetrics(): Unit = { LogCleaner.MetricNames.foreach(metricsGroup.removeMetric) + cleanerManager.removeMetrics() } /** diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala index ef5df50ca8f0..e8a56f4567a8 100755 --- a/core/src/main/scala/kafka/log/LogCleanerManager.scala +++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala @@ -88,17 +88,23 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], /* for coordinating the pausing and the cleaning of a partition */ private val pausedCleaningCond = lock.newCondition() + // Avoid adding legacy tags for a metric when initializing `LogCleanerManager` + GaugeMetricNameWithTag.clear() /* gauges for tracking the number of partitions marked as uncleanable for each log directory */ for (dir <- logDirs) { - metricsGroup.newGauge("uncleanable-partitions-count", + val metricTag = Map("logDirectory" -> dir.getAbsolutePath).asJava + metricsGroup.newGauge(UncleanablePartitionsCountMetricName, () => inLock(lock) { uncleanablePartitions.get(dir.getAbsolutePath).map(_.size).getOrElse(0) }, - Map("logDirectory" -> dir.getAbsolutePath).asJava + metricTag ) + GaugeMetricNameWithTag.computeIfAbsent(UncleanablePartitionsCountMetricName, k => new java.util.ArrayList[java.util.Map[String, String]]()) + .add(metricTag) } /* gauges for tracking the number of uncleanable bytes from uncleanable partitions for each log directory */ for (dir <- logDirs) { - metricsGroup.newGauge("uncleanable-bytes", + val metricTag = Map("logDirectory" -> dir.getAbsolutePath).asJava + metricsGroup.newGauge(UncleanableBytesMetricName, () => inLock(lock) { uncleanablePartitions.get(dir.getAbsolutePath) match { case Some(partitions) => @@ -116,17 +122,19 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], case None => 0 } }, - Map("logDirectory" -> dir.getAbsolutePath).asJava + metricTag ) + GaugeMetricNameWithTag.computeIfAbsent(UncleanableBytesMetricName, k => new java.util.ArrayList[java.util.Map[String, String]]()) + .add(metricTag) } /* a gauge for tracking the cleanable ratio of the dirtiest log */ @volatile private var dirtiestLogCleanableRatio = 0.0 - metricsGroup.newGauge("max-dirty-percent", () => (100 * dirtiestLogCleanableRatio).toInt) + metricsGroup.newGauge(MaxDirtyPercentMetricName, () => (100 * dirtiestLogCleanableRatio).toInt) /* a gauge for tracking the time since the last log cleaner run, in milli seconds */ @volatile private var timeOfLastRun: Long = Time.SYSTEM.milliseconds - metricsGroup.newGauge("time-since-last-run-ms", () => Time.SYSTEM.milliseconds - timeOfLastRun) + metricsGroup.newGauge(TimeSinceLastRunMsMetricName, () => Time.SYSTEM.milliseconds - timeOfLastRun) /** * @return the position processed for all logs. @@ -538,6 +546,13 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], logDirsToRemove.foreach { uncleanablePartitions.remove } } } + + def removeMetrics(): Unit = { + GaugeMetricNameNoTag.foreach(metricsGroup.removeMetric) + GaugeMetricNameWithTag.asScala.foreach(metricNameAndTags => { + metricNameAndTags._2.asScala.foreach(tag => metricsGroup.removeMetric(metricNameAndTags._1, tag)) + }) + } } /** @@ -555,6 +570,17 @@ private case class OffsetsToClean(firstDirtyOffset: Long, } private[log] object LogCleanerManager extends Logging { + private val UncleanablePartitionsCountMetricName = "uncleanable-partitions-count" + private val UncleanableBytesMetricName = "uncleanable-bytes" + private val MaxDirtyPercentMetricName = "max-dirty-percent" + private val TimeSinceLastRunMsMetricName = "time-since-last-run-ms" + + private[log] val GaugeMetricNameWithTag = new java.util.HashMap[String, java.util.List[java.util.Map[String, String]]]() + + private[log] val GaugeMetricNameNoTag = Set( + MaxDirtyPercentMetricName, + TimeSinceLastRunMsMetricName + ) def isCompactAndDelete(log: UnifiedLog): Boolean = { log.config.compact && log.config.delete diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index 5a3ce6e43dde..f6e4f82f8736 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -30,6 +30,7 @@ import org.apache.kafka.server.util.MockTime import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, ProducerStateManagerConfig} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, Test} +import org.mockito.ArgumentMatchers import org.mockito.ArgumentMatchers.{any, anyString} import org.mockito.Mockito.{mockConstruction, times, verify, verifyNoMoreInteractions} @@ -71,7 +72,7 @@ class LogCleanerTest { val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup]) try { val logCleaner = new LogCleaner(new CleanerConfig(true), - logDirs = Array(TestUtils.tempDir()), + logDirs = Array(TestUtils.tempDir(), TestUtils.tempDir()), logs = new Pool[TopicPartition, UnifiedLog](), logDirFailureChannel = new LogDirFailureChannel(1), time = time) @@ -83,11 +84,27 @@ class LogCleanerTest { val numMetricsRegistered = LogCleaner.MetricNames.size verify(mockMetricsGroup, times(numMetricsRegistered)).newGauge(anyString(), any()) - // verify that each metric is removed + // verify that each metric in `LogCleaner` is removed LogCleaner.MetricNames.foreach(verify(mockMetricsGroup).removeMetric(_)) + // verify that each metric in `LogCleanerManager` is removed + val mockLogCleanerManagerMetricsGroup = mockMetricsGroupCtor.constructed.get(1) + LogCleanerManager.GaugeMetricNameNoTag.foreach(metricName => verify(mockLogCleanerManagerMetricsGroup).newGauge(ArgumentMatchers.eq(metricName), any())) + LogCleanerManager.GaugeMetricNameWithTag.asScala.foreach(metricNameAndTags => { + metricNameAndTags._2.asScala.foreach(tags => { + verify(mockLogCleanerManagerMetricsGroup).newGauge(ArgumentMatchers.eq(metricNameAndTags._1), any(), ArgumentMatchers.eq(tags)) + }) + }) + LogCleanerManager.GaugeMetricNameNoTag.foreach(verify(mockLogCleanerManagerMetricsGroup).removeMetric(_)) + LogCleanerManager.GaugeMetricNameWithTag.asScala.foreach(metricNameAndTags => { + metricNameAndTags._2.asScala.foreach(tags => { + verify(mockLogCleanerManagerMetricsGroup).removeMetric(ArgumentMatchers.eq(metricNameAndTags._1), ArgumentMatchers.eq(tags)) + }) + }) + // assert that we have verified all invocations on verifyNoMoreInteractions(mockMetricsGroup) + verifyNoMoreInteractions(mockLogCleanerManagerMetricsGroup) } finally { mockMetricsGroupCtor.close() }