1 change: 1 addition & 0 deletions checkstyle/import-control-core.xml
@@ -145,6 +145,7 @@
<allow pkg="kafka.test" />
<allow pkg="org.apache.directory" />
<allow pkg="org.mockito" />
<allow pkg="kafka.log" />
</subpackage>

<subpackage name="s3shell">
4 changes: 3 additions & 1 deletion core/src/main/java/kafka/autobalancer/LoadRetriever.java
@@ -374,7 +374,9 @@ public void retrieve() {
}
updateClusterModel(record.value());
}
logger.debug("Finished consuming {} metrics from {}.", records.count(), metricReporterTopic);
if (logger.isDebugEnabled()) {
logger.debug("Finished consuming {} metrics from {}.", records.count(), metricReporterTopic);
}
} catch (InvalidTopicException e) {
checkAndCreateTopic();
} catch (Exception e) {
47 changes: 45 additions & 2 deletions core/src/main/java/kafka/autobalancer/model/ClusterModel.java
@@ -13,6 +13,7 @@

import com.automq.stream.utils.LogContext;
import kafka.autobalancer.common.AutoBalancerConstants;
import kafka.log.stream.s3.telemetry.metrics.S3StreamKafkaMetricsManager;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.Utils;
@@ -21,7 +22,9 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

@@ -42,6 +45,7 @@ public class ClusterModel {
private final Map<Integer, Map<TopicPartition, TopicPartitionReplicaUpdater>> brokerReplicaMap = new HashMap<>();
private final Map<Uuid, String> idToTopicNameMap = new HashMap<>();
private final Map<String, Map<Integer, Integer>> topicPartitionReplicaMap = new HashMap<>();
private final Map<Integer, Long> brokerMetricTimeMap = new ConcurrentHashMap<>();

public ClusterModel() {
this(null);
@@ -52,6 +56,21 @@ public ClusterModel(LogContext logContext) {
logContext = new LogContext("[ClusterModel]");
}
logger = logContext.logger(AutoBalancerConstants.AUTO_BALANCER_LOGGER_CLAZZ);
S3StreamKafkaMetricsManager.setAutoBalancerMetricsTimeMapSupplier(() -> {
clusterLock.lock();
try {
Map<Integer, Long> tmpMap = new HashMap<>();
for (Map.Entry<Integer, Long> entry : brokerMetricTimeMap.entrySet()) {
BrokerUpdater brokerUpdater = brokerMap.get(entry.getKey());
if (brokerUpdater != null && brokerUpdater.isValidInstance()) {
tmpMap.put(entry.getKey(), entry.getValue());
}
}
return tmpMap;
} finally {
clusterLock.unlock();
}
});
}

public ClusterModelSnapshot snapshot() {
@@ -123,7 +142,16 @@ public boolean updateBrokerMetrics(int brokerId, Map<Byte, Double> metricsMap, l
clusterLock.unlock();
}
if (brokerUpdater != null) {
return brokerUpdater.update(metricsMap, time);
boolean ret = brokerUpdater.update(metricsMap, time);
if (ret) {
brokerMetricTimeMap.compute(brokerId, (k, v) -> {
if (v == null) {
return time;
}
return Math.max(v, time);
});
}
return ret;
}
return false;
}
@@ -140,7 +168,14 @@ public boolean updateTopicPartitionMetrics(int brokerId, TopicPartition tp, Map<
clusterLock.unlock();
}
if (replicaUpdater != null) {
return replicaUpdater.update(metricsMap, time);
boolean ret = replicaUpdater.update(metricsMap, time);
brokerMetricTimeMap.compute(brokerId, (k, v) -> {
if (v == null) {
return time;
}
return Math.max(v, time);
});
return ret;
}
return false;
}
@@ -158,6 +193,7 @@ public void registerBroker(int brokerId, String rackId) {
brokerIdToRackMap.putIfAbsent(brokerId, rackId);
brokerMap.putIfAbsent(brokerId, brokerUpdater);
brokerReplicaMap.put(brokerId, new HashMap<>());
brokerMetricTimeMap.put(brokerId, 0L);
} finally {
clusterLock.unlock();
}
@@ -173,6 +209,7 @@ public void unregisterBroker(int brokerId) {
brokerIdToRackMap.remove(brokerId);
brokerMap.remove(brokerId);
brokerReplicaMap.remove(brokerId);
brokerMetricTimeMap.remove(brokerId);
} finally {
clusterLock.unlock();
}
@@ -183,6 +220,12 @@ public void changeBrokerStatus(int brokerId, boolean active) {
try {
brokerMap.computeIfPresent(brokerId, (id, brokerUpdater) -> {
brokerUpdater.setActive(active);
brokerMetricTimeMap.compute(brokerId, (k, v) -> {
if (!active) {
return null;
}
return Objects.requireNonNullElse(v, 0L);
});
return brokerUpdater;
});
} finally {
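Note: a minimal, self-contained sketch of the per-broker timestamp bookkeeping that ClusterModel performs in the hunks above. The class and method names below are illustrative only, not part of this PR: each metrics update keeps only the newest timestamp for a broker, and a supplier hands a copy of the map to the metrics manager.

```java
// Illustrative sketch only (names are hypothetical): keep the newest metric
// timestamp per broker and expose a copy through a Supplier, as ClusterModel
// does for S3StreamKafkaMetricsManager.
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

class BrokerMetricTimeSketch {
    private final Map<Integer, Long> brokerMetricTimeMap = new ConcurrentHashMap<>();

    // Called when a broker's metrics are updated; older samples never win.
    void recordMetricTime(int brokerId, long time) {
        brokerMetricTimeMap.compute(brokerId, (k, v) -> v == null ? time : Math.max(v, time));
    }

    // Supplier handed to the metrics manager; returns a defensive copy.
    Supplier<Map<Integer, Long>> timeMapSupplier() {
        return () -> new HashMap<>(brokerMetricTimeMap);
    }

    public static void main(String[] args) {
        BrokerMetricTimeSketch sketch = new BrokerMetricTimeSketch();
        sketch.recordMetricTime(1, 1_000L);
        sketch.recordMetricTime(1, 900L); // ignored: older than the stored value
        System.out.println(sketch.timeMapSupplier().get()); // {1=1000}
    }
}
```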
@@ -48,6 +48,7 @@
import io.opentelemetry.sdk.trace.SpanProcessor;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
import io.opentelemetry.semconv.ResourceAttributes;
import kafka.log.stream.s3.telemetry.metrics.S3StreamKafkaMetricsManager;
import kafka.server.KafkaConfig;
import kafka.server.KafkaRaftServer;
import org.apache.commons.lang3.StringUtils;
@@ -140,6 +141,9 @@ private void init() {
Meter meter = openTelemetrySdk.getMeter(TelemetryConstants.TELEMETRY_SCOPE_NAME);
S3StreamMetricsManager.configure(new MetricsConfig(metricsLevel(), Attributes.empty()));
S3StreamMetricsManager.initMetrics(meter, TelemetryConstants.KAFKA_METRICS_PREFIX);

S3StreamKafkaMetricsManager.configure(new MetricsConfig(metricsLevel(), Attributes.empty()));
S3StreamKafkaMetricsManager.initMetrics(meter, TelemetryConstants.KAFKA_METRICS_PREFIX);
}

LOGGER.info("Instrument manager initialized with metrics: {} (level: {}), trace: {} report interval: {}",
@@ -0,0 +1,51 @@
/*
* Copyright 2024, AutoMQ CO.,LTD.
*
* Use of this software is governed by the Business Source License
* included in the file BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package kafka.log.stream.s3.telemetry.metrics;

import com.automq.stream.s3.metrics.MetricsConfig;
import com.automq.stream.s3.metrics.wrapper.ConfigListener;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MultiAttributes<K> implements ConfigListener {
private final Map<K, Attributes> attributesMap = new ConcurrentHashMap<>();
private final AttributeKey<K> keyName;
private Attributes baseAttributes;

public MultiAttributes(Attributes baseAttributes, AttributeKey<K> keyName) {
this.baseAttributes = baseAttributes;
this.keyName = keyName;
}

public Attributes get(K key) {
return attributesMap.computeIfAbsent(key, k -> buildAttributes(baseAttributes, Attributes.of(keyName, key)));
}

private Attributes buildAttributes(Attributes baseAttributes, Attributes attributes) {
return Attributes.builder().putAll(baseAttributes).putAll(attributes).build();
}

private void reBuildAttributes(Attributes baseAttributes) {
for (Map.Entry<K, Attributes> entry : attributesMap.entrySet()) {
attributesMap.replace(entry.getKey(), buildAttributes(baseAttributes, entry.getValue()));
}
}

@Override
public void onConfigChange(MetricsConfig metricsConfig) {
this.baseAttributes = metricsConfig.getBaseAttributes();
reBuildAttributes(metricsConfig.getBaseAttributes());
}
}
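A hedged usage sketch of the new MultiAttributes helper added above; the base attribute key/value ("cluster"/"demo") are made up for illustration.

```java
// Illustrative only: per-node Attributes cached by node_id on top of made-up
// base attributes. MultiAttributes merges base + key attributes once per key.
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import kafka.log.stream.s3.telemetry.metrics.MultiAttributes;

public class MultiAttributesExample {
    public static void main(String[] args) {
        Attributes base = Attributes.of(AttributeKey.stringKey("cluster"), "demo");
        MultiAttributes<String> brokerAttributes =
                new MultiAttributes<>(base, AttributeKey.stringKey("node_id"));

        // First call builds and caches {cluster=demo, node_id=1}; later calls reuse it.
        Attributes nodeOne = brokerAttributes.get("1");
        System.out.println(nodeOne);
    }
}
```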
@@ -0,0 +1,19 @@
/*
* Copyright 2024, AutoMQ CO.,LTD.
*
* Use of this software is governed by the Business Source License
* included in the file BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package kafka.log.stream.s3.telemetry.metrics;

import io.opentelemetry.api.common.AttributeKey;

public class S3StreamKafkaMetricsConstants {
public static final String AUTO_BALANCER_METRICS_TIME_DELAY_METRIC_NAME = "auto_balancer_metrics_time_delay";
public static final AttributeKey<String> LABEL_NODE_ID = AttributeKey.stringKey("node_id");
}
@@ -0,0 +1,71 @@
/*
* Copyright 2024, AutoMQ CO.,LTD.
*
* Use of this software is governed by the Business Source License
* included in the file BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package kafka.log.stream.s3.telemetry.metrics;

import com.automq.stream.s3.metrics.MetricsConfig;
import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.NoopObservableLongGauge;
import com.automq.stream.s3.metrics.wrapper.ConfigListener;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableLongGauge;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

public class S3StreamKafkaMetricsManager {

private static final List<ConfigListener> BASE_ATTRIBUTES_LISTENERS = new ArrayList<>();
private static final MultiAttributes<String> BROKER_ATTRIBUTES = new MultiAttributes<>(Attributes.empty(),
S3StreamKafkaMetricsConstants.LABEL_NODE_ID);

static {
BASE_ATTRIBUTES_LISTENERS.add(BROKER_ATTRIBUTES);
}

private static ObservableLongGauge autoBalancerMetricsTimeDelay = new NoopObservableLongGauge();
private static Supplier<Map<Integer, Long>> autoBalancerMetricsTimeMapSupplier = Collections::emptyMap;
private static MetricsConfig metricsConfig = new MetricsConfig(MetricsLevel.INFO, Attributes.empty());

public static void configure(MetricsConfig metricsConfig) {
synchronized (BASE_ATTRIBUTES_LISTENERS) {
S3StreamKafkaMetricsManager.metricsConfig = metricsConfig;
for (ConfigListener listener : BASE_ATTRIBUTES_LISTENERS) {
listener.onConfigChange(metricsConfig);
}
}
}

public static void initMetrics(Meter meter, String prefix) {
autoBalancerMetricsTimeDelay = meter.gaugeBuilder(prefix + S3StreamKafkaMetricsConstants.AUTO_BALANCER_METRICS_TIME_DELAY_METRIC_NAME)
.setDescription("The time delay of auto balancer metrics per broker")
.setUnit("ms")
.ofLongs()
.buildWithCallback(result -> {
if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel())) {
Map<Integer, Long> metricsTimeDelayMap = autoBalancerMetricsTimeMapSupplier.get();
for (Map.Entry<Integer, Long> entry : metricsTimeDelayMap.entrySet()) {
long timestamp = entry.getValue();
long delay = timestamp == 0 ? -1 : System.currentTimeMillis() - timestamp;
result.record(delay, BROKER_ATTRIBUTES.get(String.valueOf(entry.getKey())));
}
}
});
}

public static void setAutoBalancerMetricsTimeMapSupplier(Supplier<Map<Integer, Long>> autoBalancerMetricsTimeMapSupplier) {
S3StreamKafkaMetricsManager.autoBalancerMetricsTimeMapSupplier = autoBalancerMetricsTimeMapSupplier;
}
}
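For orientation, a hedged sketch of how the pieces above fit together: configure the metrics level, register the gauge, then supply the per-broker timestamp map. The gauge callback reports System.currentTimeMillis() minus each broker's timestamp, or -1 while a broker's timestamp is still 0 (no metrics received yet). The no-op meter and the "kafka_stream_" prefix below are placeholders; the PR obtains the meter from the OpenTelemetry SDK and uses TelemetryConstants.KAFKA_METRICS_PREFIX.

```java
// Hedged wiring sketch; in the PR the Meter comes from the OpenTelemetry SDK
// configured in the telemetry manager, and the prefix is a shared constant.
import com.automq.stream.s3.metrics.MetricsConfig;
import com.automq.stream.s3.metrics.MetricsLevel;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.Meter;
import kafka.log.stream.s3.telemetry.metrics.S3StreamKafkaMetricsManager;

import java.util.Map;

public class AutoBalancerDelayMetricExample {
    public static void main(String[] args) {
        Meter meter = OpenTelemetry.noop().getMeter("example"); // placeholder meter

        // 1. Push the metrics level/base attributes to all registered listeners.
        S3StreamKafkaMetricsManager.configure(new MetricsConfig(MetricsLevel.INFO, Attributes.empty()));

        // 2. Register the auto_balancer_metrics_time_delay gauge under a prefix.
        S3StreamKafkaMetricsManager.initMetrics(meter, "kafka_stream_");

        // 3. Supply the per-broker "latest metric timestamp" map; the gauge callback
        //    turns each entry into a delay relative to the current wall clock.
        S3StreamKafkaMetricsManager.setAutoBalancerMetricsTimeMapSupplier(
            () -> Map.of(1, System.currentTimeMillis() - 5_000L));
    }
}
```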