diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index 541aa947dd..61fcf92644 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -145,6 +145,7 @@
+
diff --git a/core/src/main/java/kafka/autobalancer/LoadRetriever.java b/core/src/main/java/kafka/autobalancer/LoadRetriever.java
index e6c487ec1d..71b757af6c 100644
--- a/core/src/main/java/kafka/autobalancer/LoadRetriever.java
+++ b/core/src/main/java/kafka/autobalancer/LoadRetriever.java
@@ -374,7 +374,9 @@ public void retrieve() {
}
updateClusterModel(record.value());
}
- logger.debug("Finished consuming {} metrics from {}.", records.count(), metricReporterTopic);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Finished consuming {} metrics from {}.", records.count(), metricReporterTopic);
+ }
} catch (InvalidTopicException e) {
checkAndCreateTopic();
} catch (Exception e) {
diff --git a/core/src/main/java/kafka/autobalancer/model/ClusterModel.java b/core/src/main/java/kafka/autobalancer/model/ClusterModel.java
index 4a4b0780c3..98da80ea3e 100644
--- a/core/src/main/java/kafka/autobalancer/model/ClusterModel.java
+++ b/core/src/main/java/kafka/autobalancer/model/ClusterModel.java
@@ -13,6 +13,7 @@
import com.automq.stream.utils.LogContext;
import kafka.autobalancer.common.AutoBalancerConstants;
+import kafka.log.stream.s3.telemetry.metrics.S3StreamKafkaMetricsManager;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.Utils;
@@ -21,7 +22,9 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -42,6 +45,7 @@ public class ClusterModel {
private final Map> brokerReplicaMap = new HashMap<>();
private final Map idToTopicNameMap = new HashMap<>();
private final Map> topicPartitionReplicaMap = new HashMap<>();
+ private final Map brokerMetricTimeMap = new ConcurrentHashMap<>();
public ClusterModel() {
this(null);
@@ -52,6 +56,21 @@ public ClusterModel(LogContext logContext) {
logContext = new LogContext("[ClusterModel]");
}
logger = logContext.logger(AutoBalancerConstants.AUTO_BALANCER_LOGGER_CLAZZ);
+ S3StreamKafkaMetricsManager.setAutoBalancerMetricsTimeMapSupplier(() -> {
+ clusterLock.lock();
+ try {
+ Map tmpMap = new HashMap<>();
+ for (Map.Entry entry : brokerMetricTimeMap.entrySet()) {
+ BrokerUpdater brokerUpdater = brokerMap.get(entry.getKey());
+ if (brokerUpdater != null && brokerUpdater.isValidInstance()) {
+ tmpMap.put(entry.getKey(), entry.getValue());
+ }
+ }
+ return tmpMap;
+ } finally {
+ clusterLock.unlock();
+ }
+ });
}
public ClusterModelSnapshot snapshot() {
@@ -123,7 +142,16 @@ public boolean updateBrokerMetrics(int brokerId, Map metricsMap, l
clusterLock.unlock();
}
if (brokerUpdater != null) {
- return brokerUpdater.update(metricsMap, time);
+ boolean ret = brokerUpdater.update(metricsMap, time);
+ if (ret) {
+ brokerMetricTimeMap.compute(brokerId, (k, v) -> {
+ if (v == null) {
+ return time;
+ }
+ return Math.max(v, time);
+ });
+ }
+ return ret;
}
return false;
}
@@ -140,7 +168,14 @@ public boolean updateTopicPartitionMetrics(int brokerId, TopicPartition tp, Map<
clusterLock.unlock();
}
if (replicaUpdater != null) {
- return replicaUpdater.update(metricsMap, time);
+ boolean ret = replicaUpdater.update(metricsMap, time);
+ brokerMetricTimeMap.compute(brokerId, (k, v) -> {
+ if (v == null) {
+ return time;
+ }
+ return Math.max(v, time);
+ });
+ return ret;
}
return false;
}
@@ -158,6 +193,7 @@ public void registerBroker(int brokerId, String rackId) {
brokerIdToRackMap.putIfAbsent(brokerId, rackId);
brokerMap.putIfAbsent(brokerId, brokerUpdater);
brokerReplicaMap.put(brokerId, new HashMap<>());
+ brokerMetricTimeMap.put(brokerId, 0L);
} finally {
clusterLock.unlock();
}
@@ -173,6 +209,7 @@ public void unregisterBroker(int brokerId) {
brokerIdToRackMap.remove(brokerId);
brokerMap.remove(brokerId);
brokerReplicaMap.remove(brokerId);
+ brokerMetricTimeMap.remove(brokerId);
} finally {
clusterLock.unlock();
}
@@ -183,6 +220,12 @@ public void changeBrokerStatus(int brokerId, boolean active) {
try {
brokerMap.computeIfPresent(brokerId, (id, brokerUpdater) -> {
brokerUpdater.setActive(active);
+ brokerMetricTimeMap.compute(brokerId, (k, v) -> {
+ if (!active) {
+ return null;
+ }
+ return Objects.requireNonNullElse(v, 0L);
+ });
return brokerUpdater;
});
} finally {
diff --git a/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryManager.java b/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryManager.java
index 7a6ad2b077..cddb0f09a9 100644
--- a/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryManager.java
+++ b/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryManager.java
@@ -48,6 +48,7 @@
import io.opentelemetry.sdk.trace.SpanProcessor;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
import io.opentelemetry.semconv.ResourceAttributes;
+import kafka.log.stream.s3.telemetry.metrics.S3StreamKafkaMetricsManager;
import kafka.server.KafkaConfig;
import kafka.server.KafkaRaftServer;
import org.apache.commons.lang3.StringUtils;
@@ -140,6 +141,9 @@ private void init() {
Meter meter = openTelemetrySdk.getMeter(TelemetryConstants.TELEMETRY_SCOPE_NAME);
S3StreamMetricsManager.configure(new MetricsConfig(metricsLevel(), Attributes.empty()));
S3StreamMetricsManager.initMetrics(meter, TelemetryConstants.KAFKA_METRICS_PREFIX);
+
+ S3StreamKafkaMetricsManager.configure(new MetricsConfig(metricsLevel(), Attributes.empty()));
+ S3StreamKafkaMetricsManager.initMetrics(meter, TelemetryConstants.KAFKA_METRICS_PREFIX);
}
LOGGER.info("Instrument manager initialized with metrics: {} (level: {}), trace: {} report interval: {}",
diff --git a/core/src/main/scala/kafka/log/stream/s3/telemetry/metrics/MultiAttributes.java b/core/src/main/scala/kafka/log/stream/s3/telemetry/metrics/MultiAttributes.java
new file mode 100644
index 0000000000..c28e7f57f5
--- /dev/null
+++ b/core/src/main/scala/kafka/log/stream/s3/telemetry/metrics/MultiAttributes.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2024, AutoMQ CO.,LTD.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file BSL.md
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+
+package kafka.log.stream.s3.telemetry.metrics;
+
+import com.automq.stream.s3.metrics.MetricsConfig;
+import com.automq.stream.s3.metrics.wrapper.ConfigListener;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class MultiAttributes implements ConfigListener {
+ private final Map attributesMap = new ConcurrentHashMap<>();
+ private final AttributeKey keyName;
+ private Attributes baseAttributes;
+
+ public MultiAttributes(Attributes baseAttributes, AttributeKey keyName) {
+ this.baseAttributes = baseAttributes;
+ this.keyName = keyName;
+ }
+
+ public Attributes get(K key) {
+ return attributesMap.computeIfAbsent(key, k -> buildAttributes(baseAttributes, Attributes.of(keyName, key)));
+ }
+
+ private Attributes buildAttributes(Attributes baseAttributes, Attributes attributes) {
+ return Attributes.builder().putAll(baseAttributes).putAll(attributes).build();
+ }
+
+ private void reBuildAttributes(Attributes baseAttributes) {
+ for (Map.Entry entry : attributesMap.entrySet()) {
+ attributesMap.replace(entry.getKey(), buildAttributes(baseAttributes, entry.getValue()));
+ }
+ }
+
+ @Override
+ public void onConfigChange(MetricsConfig metricsConfig) {
+ this.baseAttributes = metricsConfig.getBaseAttributes();
+ reBuildAttributes(metricsConfig.getBaseAttributes());
+ }
+}
diff --git a/core/src/main/scala/kafka/log/stream/s3/telemetry/metrics/S3StreamKafkaMetricsConstants.java b/core/src/main/scala/kafka/log/stream/s3/telemetry/metrics/S3StreamKafkaMetricsConstants.java
new file mode 100644
index 0000000000..3c341aec4b
--- /dev/null
+++ b/core/src/main/scala/kafka/log/stream/s3/telemetry/metrics/S3StreamKafkaMetricsConstants.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2024, AutoMQ CO.,LTD.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file BSL.md
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+
+package kafka.log.stream.s3.telemetry.metrics;
+
+import io.opentelemetry.api.common.AttributeKey;
+
+public class S3StreamKafkaMetricsConstants {
+ public static final String AUTO_BALANCER_METRICS_TIME_DELAY_METRIC_NAME = "auto_balancer_metrics_time_delay";
+ public static final AttributeKey LABEL_NODE_ID = AttributeKey.stringKey("node_id");
+}
diff --git a/core/src/main/scala/kafka/log/stream/s3/telemetry/metrics/S3StreamKafkaMetricsManager.java b/core/src/main/scala/kafka/log/stream/s3/telemetry/metrics/S3StreamKafkaMetricsManager.java
new file mode 100644
index 0000000000..26df5c2429
--- /dev/null
+++ b/core/src/main/scala/kafka/log/stream/s3/telemetry/metrics/S3StreamKafkaMetricsManager.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2024, AutoMQ CO.,LTD.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file BSL.md
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+
+package kafka.log.stream.s3.telemetry.metrics;
+
+import com.automq.stream.s3.metrics.MetricsConfig;
+import com.automq.stream.s3.metrics.MetricsLevel;
+import com.automq.stream.s3.metrics.NoopObservableLongGauge;
+import com.automq.stream.s3.metrics.wrapper.ConfigListener;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableLongGauge;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+
+public class S3StreamKafkaMetricsManager {
+
+ private static final List BASE_ATTRIBUTES_LISTENERS = new ArrayList<>();
+ private static final MultiAttributes BROKER_ATTRIBUTES = new MultiAttributes<>(Attributes.empty(),
+ S3StreamKafkaMetricsConstants.LABEL_NODE_ID);
+
+ static {
+ BASE_ATTRIBUTES_LISTENERS.add(BROKER_ATTRIBUTES);
+ }
+
+ private static ObservableLongGauge autoBalancerMetricsTimeDelay = new NoopObservableLongGauge();
+ private static Supplier