Skip to content

Commit

Permalink
KAFKA-15129: [1/N] Remove metrics in LogCleanerManager when LogCleane…
Browse files Browse the repository at this point in the history
…r shutdown (#13924)

Reviewers: Divij Vaidya <diviv@amazon.com>, Christo Lolov <lolovc@amazon.com>

---------

Co-authored-by: Deqi Hu <deqi.hu@shopee.com>
  • Loading branch information
hudeqi and Deqi Hu committed Jul 3, 2023
1 parent 0ae1d22 commit 48eb8c9
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 8 deletions.
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/log/LogCleaner.scala
Expand Up @@ -179,6 +179,7 @@ class LogCleaner(initialConfig: CleanerConfig,
*/
def removeMetrics(): Unit = {
LogCleaner.MetricNames.foreach(metricsGroup.removeMetric)
cleanerManager.removeMetrics()
}

/**
Expand Down
38 changes: 32 additions & 6 deletions core/src/main/scala/kafka/log/LogCleanerManager.scala
Expand Up @@ -88,17 +88,23 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
/* for coordinating the pausing and the cleaning of a partition */
private val pausedCleaningCond = lock.newCondition()

// Avoid adding legacy tags for a metric when initializing `LogCleanerManager`
GaugeMetricNameWithTag.clear()
/* gauges for tracking the number of partitions marked as uncleanable for each log directory */
for (dir <- logDirs) {
metricsGroup.newGauge("uncleanable-partitions-count",
val metricTag = Map("logDirectory" -> dir.getAbsolutePath).asJava
metricsGroup.newGauge(UncleanablePartitionsCountMetricName,
() => inLock(lock) { uncleanablePartitions.get(dir.getAbsolutePath).map(_.size).getOrElse(0) },
Map("logDirectory" -> dir.getAbsolutePath).asJava
metricTag
)
GaugeMetricNameWithTag.computeIfAbsent(UncleanablePartitionsCountMetricName, k => new java.util.ArrayList[java.util.Map[String, String]]())
.add(metricTag)
}

/* gauges for tracking the number of uncleanable bytes from uncleanable partitions for each log directory */
for (dir <- logDirs) {
metricsGroup.newGauge("uncleanable-bytes",
val metricTag = Map("logDirectory" -> dir.getAbsolutePath).asJava
metricsGroup.newGauge(UncleanableBytesMetricName,
() => inLock(lock) {
uncleanablePartitions.get(dir.getAbsolutePath) match {
case Some(partitions) =>
Expand All @@ -116,17 +122,19 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
case None => 0
}
},
Map("logDirectory" -> dir.getAbsolutePath).asJava
metricTag
)
GaugeMetricNameWithTag.computeIfAbsent(UncleanableBytesMetricName, k => new java.util.ArrayList[java.util.Map[String, String]]())
.add(metricTag)
}

/* a gauge for tracking the cleanable ratio of the dirtiest log */
@volatile private var dirtiestLogCleanableRatio = 0.0
metricsGroup.newGauge("max-dirty-percent", () => (100 * dirtiestLogCleanableRatio).toInt)
metricsGroup.newGauge(MaxDirtyPercentMetricName, () => (100 * dirtiestLogCleanableRatio).toInt)

/* a gauge for tracking the time since the last log cleaner run, in milli seconds */
@volatile private var timeOfLastRun: Long = Time.SYSTEM.milliseconds
metricsGroup.newGauge("time-since-last-run-ms", () => Time.SYSTEM.milliseconds - timeOfLastRun)
metricsGroup.newGauge(TimeSinceLastRunMsMetricName, () => Time.SYSTEM.milliseconds - timeOfLastRun)

/**
* @return the position processed for all logs.
Expand Down Expand Up @@ -538,6 +546,13 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
logDirsToRemove.foreach { uncleanablePartitions.remove }
}
}

def removeMetrics(): Unit = {
GaugeMetricNameNoTag.foreach(metricsGroup.removeMetric)
GaugeMetricNameWithTag.asScala.foreach(metricNameAndTags => {
metricNameAndTags._2.asScala.foreach(tag => metricsGroup.removeMetric(metricNameAndTags._1, tag))
})
}
}

/**
Expand All @@ -555,6 +570,17 @@ private case class OffsetsToClean(firstDirtyOffset: Long,
}

private[log] object LogCleanerManager extends Logging {
private val UncleanablePartitionsCountMetricName = "uncleanable-partitions-count"
private val UncleanableBytesMetricName = "uncleanable-bytes"
private val MaxDirtyPercentMetricName = "max-dirty-percent"
private val TimeSinceLastRunMsMetricName = "time-since-last-run-ms"

private[log] val GaugeMetricNameWithTag = new java.util.HashMap[String, java.util.List[java.util.Map[String, String]]]()

private[log] val GaugeMetricNameNoTag = Set(
MaxDirtyPercentMetricName,
TimeSinceLastRunMsMetricName
)

def isCompactAndDelete(log: UnifiedLog): Boolean = {
log.config.compact && log.config.delete
Expand Down
21 changes: 19 additions & 2 deletions core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
Expand Up @@ -30,6 +30,7 @@ import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, ProducerStateManagerConfig}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
import org.mockito.ArgumentMatchers
import org.mockito.ArgumentMatchers.{any, anyString}
import org.mockito.Mockito.{mockConstruction, times, verify, verifyNoMoreInteractions}

Expand Down Expand Up @@ -71,7 +72,7 @@ class LogCleanerTest {
val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup])
try {
val logCleaner = new LogCleaner(new CleanerConfig(true),
logDirs = Array(TestUtils.tempDir()),
logDirs = Array(TestUtils.tempDir(), TestUtils.tempDir()),
logs = new Pool[TopicPartition, UnifiedLog](),
logDirFailureChannel = new LogDirFailureChannel(1),
time = time)
Expand All @@ -83,11 +84,27 @@ class LogCleanerTest {
val numMetricsRegistered = LogCleaner.MetricNames.size
verify(mockMetricsGroup, times(numMetricsRegistered)).newGauge(anyString(), any())

// verify that each metric is removed
// verify that each metric in `LogCleaner` is removed
LogCleaner.MetricNames.foreach(verify(mockMetricsGroup).removeMetric(_))

// verify that each metric in `LogCleanerManager` is removed
val mockLogCleanerManagerMetricsGroup = mockMetricsGroupCtor.constructed.get(1)
LogCleanerManager.GaugeMetricNameNoTag.foreach(metricName => verify(mockLogCleanerManagerMetricsGroup).newGauge(ArgumentMatchers.eq(metricName), any()))
LogCleanerManager.GaugeMetricNameWithTag.asScala.foreach(metricNameAndTags => {
metricNameAndTags._2.asScala.foreach(tags => {
verify(mockLogCleanerManagerMetricsGroup).newGauge(ArgumentMatchers.eq(metricNameAndTags._1), any(), ArgumentMatchers.eq(tags))
})
})
LogCleanerManager.GaugeMetricNameNoTag.foreach(verify(mockLogCleanerManagerMetricsGroup).removeMetric(_))
LogCleanerManager.GaugeMetricNameWithTag.asScala.foreach(metricNameAndTags => {
metricNameAndTags._2.asScala.foreach(tags => {
verify(mockLogCleanerManagerMetricsGroup).removeMetric(ArgumentMatchers.eq(metricNameAndTags._1), ArgumentMatchers.eq(tags))
})
})

// assert that we have verified all invocations on
verifyNoMoreInteractions(mockMetricsGroup)
verifyNoMoreInteractions(mockLogCleanerManagerMetricsGroup)
} finally {
mockMetricsGroupCtor.close()
}
Expand Down

0 comments on commit 48eb8c9

Please sign in to comment.