diff --git a/build.gradle b/build.gradle
index 76dec6966800..250aed449117 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1638,6 +1638,8 @@ project(':storage:api') {
dependencies {
implementation project(':clients')
+ implementation project(':server-common')
+ implementation libs.metrics
implementation libs.slf4jApi
testImplementation project(':clients')
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 7ecc4d5a1d14..8148de598fad 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -261,6 +261,10 @@
+
+
+
+
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 33bde33882f6..d07d60ce3a87 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -111,6 +111,7 @@
import java.util.stream.Stream;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
+import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC;
/**
* This class is responsible for
@@ -123,8 +124,6 @@ public class RemoteLogManager implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogManager.class);
private static final String REMOTE_LOG_READER_THREAD_NAME_PREFIX = "remote-log-reader";
- public static final String REMOTE_LOG_READER_METRICS_NAME_PREFIX = "RemoteLogReader";
- public static final String REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT = "RemoteLogManagerTasksAvgIdlePercent";
private final RemoteLogManagerConfig rlmConfig;
private final int brokerId;
private final String logDir;
@@ -185,7 +184,7 @@ public RemoteLogManager(RemoteLogManagerConfig rlmConfig,
delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs();
rlmScheduledThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize());
- metricsGroup.newGauge(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT, new Gauge() {
+ metricsGroup.newGauge(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC.getName(), new Gauge() {
@Override
public Double value() {
return rlmScheduledThreadPool.getIdlePercent();
@@ -195,13 +194,12 @@ public Double value() {
remoteStorageReaderThreadPool = new RemoteStorageThreadPool(
REMOTE_LOG_READER_THREAD_NAME_PREFIX,
rlmConfig.remoteLogReaderThreads(),
- rlmConfig.remoteLogReaderMaxPendingTasks(),
- REMOTE_LOG_READER_METRICS_NAME_PREFIX
+ rlmConfig.remoteLogReaderMaxPendingTasks()
);
}
private void removeMetrics() {
- metricsGroup.removeMetric(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT);
+ metricsGroup.removeMetric(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC.getName());
remoteStorageReaderThreadPool.removeMetrics();
}
diff --git a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
index d5403e0c57aa..aed480beba5d 100644
--- a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
+++ b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
@@ -38,6 +38,7 @@
import java.util.Collections;
import java.util.Optional;
+
import scala.compat.java8.OptionConverters;
@@ -171,7 +172,7 @@ public KafkaApis build() {
if (metrics == null) throw new RuntimeException("You must set metrics");
if (quotas == null) throw new RuntimeException("You must set quotas");
if (fetchManager == null) throw new RuntimeException("You must set fetchManager");
- if (brokerTopicStats == null) brokerTopicStats = new BrokerTopicStats();
+ if (brokerTopicStats == null) brokerTopicStats = new BrokerTopicStats(Optional.of(config));
if (apiVersionManager == null) throw new RuntimeException("You must set apiVersionManager");
return new KafkaApis(requestChannel,
diff --git a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
index 5860aa176936..2566e4bcfcbc 100644
--- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
+++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
@@ -54,7 +54,7 @@ public class ReplicaManagerBuilder {
private MetadataCache metadataCache = null;
private LogDirFailureChannel logDirFailureChannel = null;
private AlterPartitionManager alterPartitionManager = null;
- private BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+ private BrokerTopicStats brokerTopicStats = null;
private AtomicBoolean isShuttingDown = new AtomicBoolean(false);
private Optional remoteLogManager = Optional.empty();
private Optional zkClient = Optional.empty();
@@ -179,6 +179,7 @@ public ReplicaManager build() {
if (metadataCache == null) throw new RuntimeException("You must set metadataCache");
if (logDirFailureChannel == null) throw new RuntimeException("You must set logDirFailureChannel");
if (alterPartitionManager == null) throw new RuntimeException("You must set alterIsrManager");
+ if (brokerTopicStats == null) brokerTopicStats = new BrokerTopicStats(Optional.of(config));
return new ReplicaManager(config,
metrics,
time,
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 1d0bd0124b02..825d9eb8c19c 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -188,8 +188,7 @@ class BrokerServer(
kafkaScheduler.startup()
/* register broker metrics */
- brokerTopicStats = new BrokerTopicStats
-
+ brokerTopicStats = new BrokerTopicStats(java.util.Optional.of(config))
quotaManagers = QuotaFactory.instantiate(config, metrics, time, s"broker-${config.nodeId}-")
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index 7a9f166b56df..6191bf881205 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger
import com.yammer.metrics.core.Meter
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.utils.{KafkaThread, Time}
+import org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import java.util.Collections
@@ -227,7 +228,7 @@ class KafkaRequestHandlerPool(val brokerId: Int,
}
}
-class BrokerTopicMetrics(name: Option[String]) {
+class BrokerTopicMetrics(name: Option[String], configOpt: java.util.Optional[KafkaConfig]) {
private val metricsGroup = new KafkaMetricsGroup(this.getClass)
val tags: java.util.Map[String, String] = name match {
@@ -277,17 +278,12 @@ class BrokerTopicMetrics(name: Option[String]) {
BrokerTopicStats.TotalFetchRequestsPerSec -> MeterWrapper(BrokerTopicStats.TotalFetchRequestsPerSec, "requests"),
BrokerTopicStats.FetchMessageConversionsPerSec -> MeterWrapper(BrokerTopicStats.FetchMessageConversionsPerSec, "requests"),
BrokerTopicStats.ProduceMessageConversionsPerSec -> MeterWrapper(BrokerTopicStats.ProduceMessageConversionsPerSec, "requests"),
- BrokerTopicStats.RemoteCopyBytesPerSec -> MeterWrapper(BrokerTopicStats.RemoteCopyBytesPerSec, "bytes"),
- BrokerTopicStats.RemoteFetchBytesPerSec -> MeterWrapper(BrokerTopicStats.RemoteFetchBytesPerSec, "bytes"),
- BrokerTopicStats.RemoteFetchRequestsPerSec -> MeterWrapper(BrokerTopicStats.RemoteFetchRequestsPerSec, "requests"),
- BrokerTopicStats.RemoteCopyRequestsPerSec -> MeterWrapper(BrokerTopicStats.RemoteCopyRequestsPerSec, "requests"),
- BrokerTopicStats.FailedRemoteFetchRequestsPerSec -> MeterWrapper(BrokerTopicStats.FailedRemoteFetchRequestsPerSec, "requests"),
- BrokerTopicStats.FailedRemoteCopyRequestsPerSec -> MeterWrapper(BrokerTopicStats.FailedRemoteCopyRequestsPerSec, "requests"),
BrokerTopicStats.NoKeyCompactedTopicRecordsPerSec -> MeterWrapper(BrokerTopicStats.NoKeyCompactedTopicRecordsPerSec, "requests"),
BrokerTopicStats.InvalidMagicNumberRecordsPerSec -> MeterWrapper(BrokerTopicStats.InvalidMagicNumberRecordsPerSec, "requests"),
BrokerTopicStats.InvalidMessageCrcRecordsPerSec -> MeterWrapper(BrokerTopicStats.InvalidMessageCrcRecordsPerSec, "requests"),
BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec -> MeterWrapper(BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec, "requests")
).asJava)
+
if (name.isEmpty) {
metricTypeMap.put(BrokerTopicStats.ReplicationBytesInPerSec, MeterWrapper(BrokerTopicStats.ReplicationBytesInPerSec, "bytes"))
metricTypeMap.put(BrokerTopicStats.ReplicationBytesOutPerSec, MeterWrapper(BrokerTopicStats.ReplicationBytesOutPerSec, "bytes"))
@@ -295,6 +291,18 @@ class BrokerTopicMetrics(name: Option[String]) {
metricTypeMap.put(BrokerTopicStats.ReassignmentBytesOutPerSec, MeterWrapper(BrokerTopicStats.ReassignmentBytesOutPerSec, "bytes"))
}
+ configOpt.ifPresent(config =>
+ if (config.remoteLogManagerConfig.enableRemoteStorageSystem()) {
+ metricTypeMap.putAll(Map(
+ RemoteStorageMetrics.REMOTE_COPY_BYTES_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.REMOTE_COPY_BYTES_PER_SEC_METRIC.getName, "bytes"),
+ RemoteStorageMetrics.REMOTE_FETCH_BYTES_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.REMOTE_FETCH_BYTES_PER_SEC_METRIC.getName, "bytes"),
+ RemoteStorageMetrics.REMOTE_FETCH_REQUESTS_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.REMOTE_FETCH_REQUESTS_PER_SEC_METRIC.getName, "requests"),
+ RemoteStorageMetrics.REMOTE_COPY_REQUESTS_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.REMOTE_COPY_REQUESTS_PER_SEC_METRIC.getName, "requests"),
+ RemoteStorageMetrics.FAILED_REMOTE_FETCH_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.FAILED_REMOTE_FETCH_PER_SEC_METRIC.getName, "requests"),
+ RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName, "requests")
+ ).asJava)
+ })
+
// used for testing only
def metricMap: Map[String, MeterWrapper] = metricTypeMap.toMap
@@ -342,17 +350,17 @@ class BrokerTopicMetrics(name: Option[String]) {
def invalidOffsetOrSequenceRecordsPerSec: Meter = metricTypeMap.get(BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec).meter()
- def remoteCopyBytesRate: Meter = metricTypeMap.get(BrokerTopicStats.RemoteCopyBytesPerSec).meter()
+ def remoteCopyBytesRate: Meter = metricTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_BYTES_PER_SEC_METRIC.getName).meter()
- def remoteFetchBytesRate: Meter = metricTypeMap.get(BrokerTopicStats.RemoteFetchBytesPerSec).meter()
+ def remoteFetchBytesRate: Meter = metricTypeMap.get(RemoteStorageMetrics.REMOTE_FETCH_BYTES_PER_SEC_METRIC.getName).meter()
- def remoteFetchRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.RemoteFetchRequestsPerSec).meter()
+ def remoteFetchRequestRate: Meter = metricTypeMap.get(RemoteStorageMetrics.REMOTE_FETCH_REQUESTS_PER_SEC_METRIC.getName).meter()
- def remoteCopyRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.RemoteCopyRequestsPerSec).meter()
+ def remoteCopyRequestRate: Meter = metricTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_REQUESTS_PER_SEC_METRIC.getName).meter()
- def failedRemoteFetchRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.FailedRemoteFetchRequestsPerSec).meter()
+ def failedRemoteFetchRequestRate: Meter = metricTypeMap.get(RemoteStorageMetrics.FAILED_REMOTE_FETCH_PER_SEC_METRIC.getName).meter()
- def failedRemoteCopyRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.FailedRemoteCopyRequestsPerSec).meter()
+ def failedRemoteCopyRequestRate: Meter = metricTypeMap.get(RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName).meter()
def closeMetric(metricType: String): Unit = {
val meter = metricTypeMap.get(metricType)
@@ -378,27 +386,18 @@ object BrokerTopicStats {
val ProduceMessageConversionsPerSec = "ProduceMessageConversionsPerSec"
val ReassignmentBytesInPerSec = "ReassignmentBytesInPerSec"
val ReassignmentBytesOutPerSec = "ReassignmentBytesOutPerSec"
- val RemoteCopyBytesPerSec = "RemoteCopyBytesPerSec"
- val RemoteFetchBytesPerSec = "RemoteFetchBytesPerSec"
- val RemoteFetchRequestsPerSec = "RemoteFetchRequestsPerSec"
- val RemoteCopyRequestsPerSec = "RemoteCopyRequestsPerSec"
- val FailedRemoteFetchRequestsPerSec = "RemoteFetchErrorsPerSec"
- val FailedRemoteCopyRequestsPerSec = "RemoteCopyErrorsPerSec"
-
// These following topics are for LogValidator for better debugging on failed records
val NoKeyCompactedTopicRecordsPerSec = "NoKeyCompactedTopicRecordsPerSec"
val InvalidMagicNumberRecordsPerSec = "InvalidMagicNumberRecordsPerSec"
val InvalidMessageCrcRecordsPerSec = "InvalidMessageCrcRecordsPerSec"
val InvalidOffsetOrSequenceRecordsPerSec = "InvalidOffsetOrSequenceRecordsPerSec"
-
- private val valueFactory = (k: String) => new BrokerTopicMetrics(Some(k))
}
-class BrokerTopicStats extends Logging {
- import BrokerTopicStats._
+class BrokerTopicStats(configOpt: java.util.Optional[KafkaConfig] = java.util.Optional.empty()) extends Logging {
+ private val valueFactory = (k: String) => new BrokerTopicMetrics(Some(k), configOpt)
private val stats = new Pool[String, BrokerTopicMetrics](Some(valueFactory))
- val allTopicsStats = new BrokerTopicMetrics(None)
+ val allTopicsStats = new BrokerTopicMetrics(None, configOpt)
def topicStats(topic: String): BrokerTopicMetrics =
stats.getAndMaybePut(topic)
@@ -439,12 +438,12 @@ class BrokerTopicStats extends Logging {
topicMetrics.closeMetric(BrokerTopicStats.ProduceMessageConversionsPerSec)
topicMetrics.closeMetric(BrokerTopicStats.ReplicationBytesOutPerSec)
topicMetrics.closeMetric(BrokerTopicStats.ReassignmentBytesOutPerSec)
- topicMetrics.closeMetric(BrokerTopicStats.RemoteCopyBytesPerSec)
- topicMetrics.closeMetric(BrokerTopicStats.RemoteFetchBytesPerSec)
- topicMetrics.closeMetric(BrokerTopicStats.RemoteFetchRequestsPerSec)
- topicMetrics.closeMetric(BrokerTopicStats.RemoteCopyRequestsPerSec)
- topicMetrics.closeMetric(BrokerTopicStats.FailedRemoteFetchRequestsPerSec)
- topicMetrics.closeMetric(BrokerTopicStats.FailedRemoteCopyRequestsPerSec)
+ topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_COPY_BYTES_PER_SEC_METRIC.getName)
+ topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_FETCH_BYTES_PER_SEC_METRIC.getName)
+ topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_FETCH_REQUESTS_PER_SEC_METRIC.getName)
+ topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_COPY_REQUESTS_PER_SEC_METRIC.getName)
+ topicMetrics.closeMetric(RemoteStorageMetrics.FAILED_REMOTE_FETCH_PER_SEC_METRIC.getName)
+ topicMetrics.closeMetric(RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName)
}
}
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 0d10f1cdde39..a40c7d2bbea0 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -261,7 +261,7 @@ class KafkaServer(
metrics = Server.initializeMetrics(config, time, clusterId)
/* register broker metrics */
- _brokerTopicStats = new BrokerTopicStats
+ _brokerTopicStats = new BrokerTopicStats(java.util.Optional.of(config))
quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
KafkaBroker.notifyClusterListeners(clusterId, kafkaMetricsReporters ++ metrics.reporters.asScala)
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 941f4dc961e6..dedcea0e38db 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -59,7 +59,6 @@
import org.apache.kafka.storage.internals.log.LazyIndex;
import org.apache.kafka.storage.internals.log.OffsetIndex;
import org.apache.kafka.storage.internals.log.ProducerStateManager;
-import org.apache.kafka.storage.internals.log.RemoteStorageThreadPool;
import org.apache.kafka.storage.internals.log.TimeIndex;
import org.apache.kafka.storage.internals.log.TransactionIndex;
import org.apache.kafka.test.TestUtils;
@@ -88,14 +87,14 @@
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Properties;
+import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
-import java.util.stream.Collectors;
-import static kafka.log.remote.RemoteLogManager.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT;
-import static kafka.log.remote.RemoteLogManager.REMOTE_LOG_READER_METRICS_NAME_PREFIX;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
+import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC;
+import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_STORAGE_THREAD_POOL_METRICS;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -127,7 +126,8 @@ public class RemoteLogManagerTest {
RemoteStorageManager remoteStorageManager = mock(RemoteStorageManager.class);
RemoteLogMetadataManager remoteLogMetadataManager = mock(RemoteLogMetadataManager.class);
RemoteLogManagerConfig remoteLogManagerConfig = null;
- BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+
+ BrokerTopicStats brokerTopicStats = null;
RemoteLogManager remoteLogManager = null;
TopicIdPartition leaderTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("Leader", 0));
@@ -157,8 +157,10 @@ public List read() {
void setUp() throws Exception {
topicIds.put(leaderTopicIdPartition.topicPartition().topic(), leaderTopicIdPartition.topicId());
topicIds.put(followerTopicIdPartition.topicPartition().topic(), followerTopicIdPartition.topicId());
- Properties props = new Properties();
+ Properties props = kafka.utils.TestUtils.createDummyBrokerConfig();
+ props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true");
remoteLogManagerConfig = createRLMConfig(props);
+ brokerTopicStats = new BrokerTopicStats(Optional.of(KafkaConfig.fromProps(props)));
kafka.utils.TestUtils.clearYammerMetrics();
@@ -922,11 +924,8 @@ public RemoteLogMetadataManager createRemoteLogMetadataManager() {
KafkaMetricsGroup mockRlmMetricsGroup = mockMetricsGroupCtor.constructed().get(0);
KafkaMetricsGroup mockThreadPoolMetricsGroup = mockMetricsGroupCtor.constructed().get(1);
- List remoteLogManagerMetricNames = Collections.singletonList(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT);
- List remoteStorageThreadPoolMetricNames = RemoteStorageThreadPool.METRIC_SUFFIXES
- .stream()
- .map(suffix -> REMOTE_LOG_READER_METRICS_NAME_PREFIX + suffix)
- .collect(Collectors.toList());
+ List remoteLogManagerMetricNames = Collections.singletonList(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC.getName());
+ Set remoteStorageThreadPoolMetricNames = REMOTE_STORAGE_THREAD_POOL_METRICS;
verify(mockRlmMetricsGroup, times(remoteLogManagerMetricNames.size())).newGauge(anyString(), any());
// Verify that the RemoteLogManager metrics are removed
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java b/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
index 3e8596f93cd6..d1d35b5e5df1 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
@@ -17,9 +17,11 @@
package kafka.log.remote;
import kafka.server.BrokerTopicStats;
+import kafka.server.KafkaConfig;
import kafka.utils.TestUtils;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.Records;
+import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
@@ -30,6 +32,8 @@
import org.mockito.ArgumentCaptor;
import java.io.IOException;
+import java.util.Optional;
+import java.util.Properties;
import java.util.function.Consumer;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -44,13 +48,16 @@
public class RemoteLogReaderTest {
public static final String TOPIC = "test";
RemoteLogManager mockRLM = mock(RemoteLogManager.class);
- BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+ BrokerTopicStats brokerTopicStats = null;
LogOffsetMetadata logOffsetMetadata = new LogOffsetMetadata(100);
Records records = mock(Records.class);
@BeforeEach
public void setUp() {
TestUtils.clearYammerMetrics();
+ Properties props = kafka.utils.TestUtils.createDummyBrokerConfig();
+ props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true");
+ brokerTopicStats = new BrokerTopicStats(Optional.of(KafkaConfig.fromProps(props)));
}
@Test
diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
index 09e21ccc084a..c34d8ff0cdbc 100644
--- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
@@ -24,9 +24,12 @@ import org.apache.kafka.common.errors.{InvalidTopicException, UnknownTopicOrPart
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.authenticator.TestJaasConfig
+import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig, RemoteStorageMetrics}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
@@ -54,6 +57,12 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
+ if (testInfo.getDisplayName.contains("testMetrics") && testInfo.getDisplayName.endsWith("true")) {
+ // systemRemoteStorageEnabled is enabled
+ this.serverConfig.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true")
+ this.serverConfig.setProperty(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteStorageManager].getName)
+ this.serverConfig.setProperty(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName)
+ }
verifyNoRequestMetrics("Request metrics not removed in a previous test")
startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, kafkaServerJaasEntryName))
super.setUp(testInfo)
@@ -70,8 +79,9 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
* Verifies some of the metrics of producer, consumer as well as server.
*/
@nowarn("cat=deprecation")
- @Test
- def testMetrics(): Unit = {
+ @ParameterizedTest(name = "testMetrics with systemRemoteStorageEnabled: {0}")
+ @ValueSource(booleans = Array(true, false))
+ def testMetrics(systemRemoteStorageEnabled: Boolean): Unit = {
val topic = "topicWithOldMessageFormat"
val props = new Properties
props.setProperty(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, "0.9.0")
@@ -103,6 +113,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
generateAuthenticationFailure(tp)
verifyBrokerAuthenticationMetrics(server)
+ verifyRemoteStorageMetrics(systemRemoteStorageEnabled)
}
private def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]], numRecords: Int,
@@ -308,4 +319,17 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
}
assertTrue(metrics.isEmpty, s"$errorMessage: ${metrics.keys}")
}
+
+ private def verifyRemoteStorageMetrics(shouldContainMetrics: Boolean): Unit = {
+ val metrics = RemoteStorageMetrics.allMetrics().asScala.filter(name =>
+ KafkaYammerMetrics.defaultRegistry.allMetrics.asScala.find(metric => {
+ metric._1.getMBeanName().equals(name.getMBeanName)
+ }).isDefined
+ ).toList
+ if (shouldContainMetrics) {
+ assertEquals(RemoteStorageMetrics.allMetrics().size(), metrics.size, s"Only $metrics appear in the metrics")
+ } else {
+ assertEquals(0, metrics.size, s"$metrics should not appear in the metrics")
+ }
+ }
}
diff --git a/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala b/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala
index 5b0c07460543..bcdafd4bd4d3 100644
--- a/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala
+++ b/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala
@@ -25,8 +25,11 @@ import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.MockTime
-import org.junit.jupiter.api.Assertions.assertEquals
+import org.apache.kafka.server.log.remote.storage.{RemoteLogManagerConfig, RemoteStorageMetrics}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import org.mockito.ArgumentMatchers
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{mock, when}
@@ -77,4 +80,22 @@ class KafkaRequestHandlerTest {
assertEquals(Some(startTime + 2000000), request.callbackRequestDequeueTimeNanos)
assertEquals(Some(startTime + 3000000), request.callbackRequestCompleteTimeNanos)
}
+
+
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testTopicStats(systemRemoteStorageEnabled: Boolean): Unit = {
+ val topic = "topic"
+ val props = kafka.utils.TestUtils.createDummyBrokerConfig()
+ props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, systemRemoteStorageEnabled.toString)
+ val brokerTopicStats = new BrokerTopicStats(java.util.Optional.of(KafkaConfig.fromProps(props)))
+ brokerTopicStats.topicStats(topic)
+ RemoteStorageMetrics.brokerTopicStatsMetrics.forEach(metric => {
+ if (systemRemoteStorageEnabled) {
+ assertTrue(brokerTopicStats.topicStats(topic).metricMap.contains(metric.getName))
+ } else {
+ assertFalse(brokerTopicStats.topicStats(topic).metricMap.contains(metric.getName))
+ }
+ })
+ }
}
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 724833074110..92533d0e7468 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -3583,6 +3583,7 @@ class ReplicaManagerTest {
val tidp0 = new TopicIdPartition(topicId, tp0)
val props = new Properties()
+ props.put("zookeeper.connect", "test")
props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true.toString)
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteStorageManager].getName)
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName)
@@ -3591,7 +3592,7 @@ class ReplicaManagerTest {
val config = new AbstractConfig(RemoteLogManagerConfig.CONFIG_DEF, props)
val remoteLogManagerConfig = new RemoteLogManagerConfig(config)
val mockLog = mock(classOf[UnifiedLog])
- val brokerTopicStats = new BrokerTopicStats
+ val brokerTopicStats = new BrokerTopicStats(java.util.Optional.of(KafkaConfig.fromProps(props)))
val remoteLogManager = new RemoteLogManager(
remoteLogManagerConfig,
0,
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index e587c8cb6e8f..15ccb7d339b5 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -280,6 +280,10 @@ object TestUtils extends Logging {
Await.result(future, FiniteDuration(5, TimeUnit.MINUTES))
}
+ def createDummyBrokerConfig(): Properties = {
+ createBrokerConfig(0, "")
+ }
+
/**
* Create a test config for the provided parameters.
*
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index 12e2cd1468c8..511f51064e19 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -95,6 +95,7 @@
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -133,7 +134,7 @@ public void setup() throws IOException {
KafkaConfig config = new KafkaConfig(props);
LogConfig logConfig = createLogConfig();
- BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+ BrokerTopicStats brokerTopicStats = new BrokerTopicStats(Optional.empty());
LogDirFailureChannel logDirFailureChannel = Mockito.mock(LogDirFailureChannel.class);
List logDirs = Collections.singletonList(logDir);
logManager = new LogManagerBuilder().
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
index 13409a57044d..b3ab13fcb432 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
@@ -117,7 +117,7 @@ public class KRaftMetadataRequestBenchmark {
clientQuotaManager, clientRequestQuotaManager, controllerMutationQuotaManager, replicaQuotaManager,
replicaQuotaManager, replicaQuotaManager, Option.empty());
private FetchManager fetchManager = Mockito.mock(FetchManager.class);
- private BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+ private BrokerTopicStats brokerTopicStats = new BrokerTopicStats(Optional.empty());
private KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user");
private KafkaApis kafkaApis;
private RequestChannel.Request allTopicMetadataRequest;
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
index 5f0bcec62f03..187743ce0b15 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
@@ -122,7 +122,7 @@ public class MetadataRequestBenchmark {
clientQuotaManager, clientRequestQuotaManager, controllerMutationQuotaManager, replicaQuotaManager,
replicaQuotaManager, replicaQuotaManager, Option.empty());
private FetchManager fetchManager = Mockito.mock(FetchManager.class);
- private BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+ private BrokerTopicStats brokerTopicStats = new BrokerTopicStats(Optional.empty());
private KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user");
private KafkaApis kafkaApis;
private RequestChannel.Request allTopicMetadataRequest;
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
index 0e051ac5d896..b28ab90818f9 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
@@ -94,7 +94,7 @@ public void setup() throws IOException {
scheduler.startup();
LogConfig logConfig = new LogConfig(new Properties());
- BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+ BrokerTopicStats brokerTopicStats = new BrokerTopicStats(Optional.empty());
LogDirFailureChannel logDirFailureChannel = Mockito.mock(LogDirFailureChannel.class);
logManager = new LogManagerBuilder().
setLogDirs(Collections.singletonList(logDir)).
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
index a8fec969c872..eafe0e4b27ec 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
@@ -74,7 +74,7 @@ public class UpdateFollowerFetchStateBenchmark {
private Option topicId = OptionConverters.toScala(Optional.of(Uuid.randomUuid()));
private File logDir = new File(System.getProperty("java.io.tmpdir"), topicPartition.toString());
private KafkaScheduler scheduler = new KafkaScheduler(1, true, "scheduler");
- private BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+ private BrokerTopicStats brokerTopicStats = new BrokerTopicStats(Optional.empty());
private LogDirFailureChannel logDirFailureChannel = Mockito.mock(LogDirFailureChannel.class);
private long nextOffset = 0;
private LogManager logManager;
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java
index ac79fd357f8e..ade2a8fe0070 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java
@@ -36,6 +36,7 @@
import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.Optional;
import java.util.Random;
import java.util.stream.IntStream;
@@ -79,7 +80,7 @@ public enum Bytes {
ByteBuffer[] batchBuffers;
RequestLocal requestLocal;
LogValidator.MetricsRecorder validatorMetricsRecorder = UnifiedLog.newValidatorMetricsRecorder(
- new BrokerTopicStats().allTopicsStats());
+ new BrokerTopicStats(Optional.empty()).allTopicsStats());
@Setup
public void init() {
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
index 1315a2e183b8..b0502c955a54 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
@@ -56,6 +56,7 @@
import java.io.File;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -110,7 +111,7 @@ public void setup() {
new LogConfig(new Properties()), new MockConfigRepository(), new CleanerConfig(1, 4 * 1024 * 1024L, 0.9d,
1024 * 1024, 32 * 1024 * 1024, Double.MAX_VALUE, 15 * 1000, true), time, MetadataVersion.latest(), 4, false, Option.empty());
scheduler.startup();
- final BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+ final BrokerTopicStats brokerTopicStats = new BrokerTopicStats(Optional.empty());
final MetadataCache metadataCache =
MetadataCache.zkMetadataCache(this.brokerProperties.brokerId(),
this.brokerProperties.interBrokerProtocolVersion(),
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
index befa467e224d..0bbf934e6e88 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
@@ -63,6 +63,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -113,7 +114,7 @@ public void setup() {
this.metrics = new Metrics();
this.time = Time.SYSTEM;
this.failureChannel = new LogDirFailureChannel(brokerProperties.logDirs().size());
- final BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+ final BrokerTopicStats brokerTopicStats = new BrokerTopicStats(Optional.empty());
final List files =
JavaConverters.seqAsJavaList(brokerProperties.logDirs()).stream().map(File::new).collect(Collectors.toList());
CleanerConfig cleanerConfig = new CleanerConfig(1,
diff --git a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java
new file mode 100644
index 000000000000..7c5f74836ae5
--- /dev/null
+++ b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import com.yammer.metrics.core.MetricName;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * This class contains the metrics related to tiered storage feature, which is to have a centralized
+ * place to store them, so that we can verify all of them easily.
+ *
+ * @see kafka.api.MetricsTest
+ */
+public class RemoteStorageMetrics {
+ private static final String REMOTE_LOG_READER_METRICS_NAME_PREFIX = "RemoteLogReader";
+ private static final String REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT = "RemoteLogManagerTasksAvgIdlePercent";
+ private static final String TASK_QUEUE_SIZE = "TaskQueueSize";
+ private static final String AVG_IDLE_PERCENT = "AvgIdlePercent";
+ private static final String REMOTE_COPY_BYTES_PER_SEC = "RemoteCopyBytesPerSec";
+ private static final String REMOTE_FETCH_BYTES_PER_SEC = "RemoteFetchBytesPerSec";
+ private static final String REMOTE_FETCH_REQUESTS_PER_SEC = "RemoteFetchRequestsPerSec";
+ private static final String REMOTE_COPY_REQUESTS_PER_SEC = "RemoteCopyRequestsPerSec";
+ private static final String FAILED_REMOTE_FETCH_PER_SEC = "RemoteFetchErrorsPerSec";
+ private static final String FAILED_REMOTE_COPY_PER_SEC = "RemoteCopyErrorsPerSec";
+ private static final String REMOTE_LOG_READER_TASK_QUEUE_SIZE = REMOTE_LOG_READER_METRICS_NAME_PREFIX + TASK_QUEUE_SIZE;
+ private static final String REMOTE_LOG_READER_AVG_IDLE_PERCENT = REMOTE_LOG_READER_METRICS_NAME_PREFIX + AVG_IDLE_PERCENT;
+ public static final Set REMOTE_STORAGE_THREAD_POOL_METRICS = Collections.unmodifiableSet(
+ new HashSet<>(Arrays.asList(REMOTE_LOG_READER_TASK_QUEUE_SIZE, REMOTE_LOG_READER_AVG_IDLE_PERCENT)));
+
+ public final static MetricName REMOTE_COPY_BYTES_PER_SEC_METRIC = getMetricName(
+ "kafka.server", "BrokerTopicMetrics", REMOTE_COPY_BYTES_PER_SEC);
+ public final static MetricName REMOTE_FETCH_BYTES_PER_SEC_METRIC = getMetricName(
+ "kafka.server", "BrokerTopicMetrics", REMOTE_FETCH_BYTES_PER_SEC);
+ public final static MetricName REMOTE_FETCH_REQUESTS_PER_SEC_METRIC = getMetricName(
+ "kafka.server", "BrokerTopicMetrics", REMOTE_FETCH_REQUESTS_PER_SEC);
+ public final static MetricName REMOTE_COPY_REQUESTS_PER_SEC_METRIC = getMetricName(
+ "kafka.server", "BrokerTopicMetrics", REMOTE_COPY_REQUESTS_PER_SEC);
+ public final static MetricName FAILED_REMOTE_FETCH_PER_SEC_METRIC = getMetricName(
+ "kafka.server", "BrokerTopicMetrics", FAILED_REMOTE_FETCH_PER_SEC);
+ public final static MetricName FAILED_REMOTE_COPY_PER_SEC_METRIC = getMetricName(
+ "kafka.server", "BrokerTopicMetrics", FAILED_REMOTE_COPY_PER_SEC);
+ public final static MetricName REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC = getMetricName(
+ "kafka.log.remote", "RemoteLogManager", REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT);
+ public final static MetricName REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC = getMetricName(
+ "org.apache.kafka.storage.internals.log", "RemoteStorageThreadPool", REMOTE_LOG_READER_TASK_QUEUE_SIZE);
+ public final static MetricName REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC = getMetricName(
+ "org.apache.kafka.storage.internals.log", "RemoteStorageThreadPool", REMOTE_LOG_READER_AVG_IDLE_PERCENT);
+
+ public static Set allMetrics() {
+ Set metrics = new HashSet<>();
+ metrics.add(REMOTE_COPY_BYTES_PER_SEC_METRIC);
+ metrics.add(REMOTE_FETCH_BYTES_PER_SEC_METRIC);
+ metrics.add(REMOTE_FETCH_REQUESTS_PER_SEC_METRIC);
+ metrics.add(REMOTE_COPY_REQUESTS_PER_SEC_METRIC);
+ metrics.add(FAILED_REMOTE_FETCH_PER_SEC_METRIC);
+ metrics.add(FAILED_REMOTE_COPY_PER_SEC_METRIC);
+ metrics.add(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC);
+ metrics.add(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC);
+ metrics.add(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC);
+
+ return metrics;
+ }
+
+ public static Set brokerTopicStatsMetrics() {
+ Set metrics = new HashSet<>();
+ metrics.add(REMOTE_COPY_BYTES_PER_SEC_METRIC);
+ metrics.add(REMOTE_FETCH_BYTES_PER_SEC_METRIC);
+ metrics.add(REMOTE_FETCH_REQUESTS_PER_SEC_METRIC);
+ metrics.add(REMOTE_COPY_REQUESTS_PER_SEC_METRIC);
+ metrics.add(FAILED_REMOTE_FETCH_PER_SEC_METRIC);
+ metrics.add(FAILED_REMOTE_COPY_PER_SEC_METRIC);
+
+ return metrics;
+ }
+ private static MetricName getMetricName(String group, String type, String name) {
+ return KafkaYammerMetrics.getMetricName(group, type, name);
+ }
+}
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
index 22807a0271ba..9c903e51d4b7 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
@@ -23,28 +23,23 @@
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.slf4j.Logger;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC;
+import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC;
+import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_STORAGE_THREAD_POOL_METRICS;
+
public class RemoteStorageThreadPool extends ThreadPoolExecutor {
- public static final String TASK_QUEUE_SIZE = "TaskQueueSize";
- public static final String AVG_IDLE_PERCENT = "AvgIdlePercent";
- public static final Set METRIC_SUFFIXES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(TASK_QUEUE_SIZE, AVG_IDLE_PERCENT)));
private final Logger logger;
- private final String metricsNamePrefix;
private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup(this.getClass());
public RemoteStorageThreadPool(String threadNamePrefix,
int numThreads,
- int maxPendingTasks,
- String metricsNamePrefix) {
+ int maxPendingTasks) {
super(numThreads, numThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(maxPendingTasks),
new RemoteStorageThreadFactory(threadNamePrefix));
logger = new LogContext() {
@@ -54,14 +49,13 @@ public String logPrefix() {
}
}.logger(RemoteStorageThreadPool.class);
- this.metricsNamePrefix = metricsNamePrefix;
- metricsGroup.newGauge(metricsNamePrefix.concat(TASK_QUEUE_SIZE), new Gauge() {
+ metricsGroup.newGauge(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC.getName(), new Gauge() {
@Override
public Integer value() {
return RemoteStorageThreadPool.this.getQueue().size();
}
});
- metricsGroup.newGauge(metricsNamePrefix.concat(AVG_IDLE_PERCENT), new Gauge() {
+ metricsGroup.newGauge(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC.getName(), new Gauge() {
@Override
public Double value() {
return 1 - (double) RemoteStorageThreadPool.this.getActiveCount() / (double) RemoteStorageThreadPool.this.getCorePoolSize();
@@ -98,6 +92,6 @@ public Thread newThread(Runnable r) {
}
public void removeMetrics() {
- METRIC_SUFFIXES.forEach(metric -> metricsGroup.removeMetric(metricsNamePrefix.concat(metric)));
+ REMOTE_STORAGE_THREAD_POOL_METRICS.forEach(metricsGroup::removeMetric);
}
}