diff --git a/core/src/main/java/kafka/autobalancer/detector/AnomalyDetector.java b/core/src/main/java/kafka/autobalancer/detector/AnomalyDetector.java index 55b144e536..94b3695aeb 100644 --- a/core/src/main/java/kafka/autobalancer/detector/AnomalyDetector.java +++ b/core/src/main/java/kafka/autobalancer/detector/AnomalyDetector.java @@ -27,6 +27,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.utils.ConfigUtils; +import org.apache.kafka.controller.es.ClusterLoads; import org.apache.kafka.server.metrics.s3stream.S3StreamKafkaMetricsManager; import java.util.ArrayList; @@ -70,12 +71,14 @@ public class AnomalyDetector extends AbstractResumableService { this.executionIntervalMs = executionIntervalMs; this.clusterModel = clusterModel; this.actionExecutor = actionExecutor; - this.executorService = Executors.newSingleThreadScheduledExecutor(new AutoBalancerThreadFactory("anomaly-detector")); + this.executorService = Executors.newScheduledThreadPool(2, new AutoBalancerThreadFactory("anomaly-detector")); this.goalsByPriority = goals; Collections.sort(this.goalsByPriority); this.excludedBrokers = excludedBrokers; + ClusterLoads.getInstance().updateExcludedBrokers(this.excludedBrokers); this.excludedTopics = excludedTopics; this.executorService.schedule(this::detect, detectInterval, TimeUnit.MILLISECONDS); + this.executorService.scheduleAtFixedRate(() -> clusterModel.updateClusterLoad(maxTolerateMetricsDelayMs), 30, 30, TimeUnit.SECONDS); S3StreamKafkaMetricsManager.setSlowBrokerSupplier(() -> this.slowBrokers); logger.info("detectInterval: {}ms, executionConcurrency: {}, executionIntervalMs: {}ms, goals: {}, excluded brokers: {}, excluded topics: {}", this.detectInterval, this.executionConcurrency, this.executionIntervalMs, this.goalsByPriority, this.excludedBrokers, this.excludedTopics); @@ -182,6 +185,7 @@ public void reconfigure(Map configs) { AutoBalancerControllerConfig tmp = new 
AutoBalancerControllerConfig(configs, false);
             this.excludedBrokers = tmp.getList(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXCLUDE_BROKER_IDS)
                     .stream().map(Integer::parseInt).collect(Collectors.toSet());
+            ClusterLoads.getInstance().updateExcludedBrokers(this.excludedBrokers);
         }
         if (configs.containsKey(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXCLUDE_TOPICS)) {
             AutoBalancerControllerConfig tmp = new AutoBalancerControllerConfig(configs, false);
diff --git a/core/src/main/java/kafka/autobalancer/model/ClusterModel.java b/core/src/main/java/kafka/autobalancer/model/ClusterModel.java
index dc83221c8c..569af4a299 100644
--- a/core/src/main/java/kafka/autobalancer/model/ClusterModel.java
+++ b/core/src/main/java/kafka/autobalancer/model/ClusterModel.java
@@ -13,6 +13,8 @@
 import com.automq.stream.utils.LogContext;
 import kafka.autobalancer.common.AutoBalancerConstants;
+import kafka.autobalancer.common.types.Resource;
+import org.apache.kafka.controller.es.ClusterLoads;
 import org.apache.kafka.server.metrics.s3stream.S3StreamKafkaMetricsManager;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
@@ -148,6 +150,52 @@ public ClusterModelSnapshot snapshot(Set excludedBrokerIds, Set
         return snapshot;
     }
 
+    /**
+     * Recomputes the per-broker and per-partition load from {@code brokerReplicaMap} and publishes the
+     * result to the {@link ClusterLoads} singleton.
+     *
+     * <p>Replica updaters whose {@code isValidInstance()} is false are skipped. If
+     * {@code get(now - maxToleratedMetricsDelay)} returns {@code null} for any remaining replica
+     * (presumably because its metrics are older than the tolerated delay -- confirm against
+     * {@code TopicPartitionReplicaUpdater}), the whole snapshot is discarded: {@code null} broker loads
+     * are published together with an empty partition-load map.
+     *
+     * @param maxToleratedMetricsDelay maximum accepted metrics delay in milliseconds
+     */
+    public void updateClusterLoad(long maxToleratedMetricsDelay) {
+        clusterLock.lock();
+        try {
+            Map<Integer, Double> brokerLoads = new HashMap<>();
+            Map<TopicPartition, Double> tpLoads = new HashMap<>();
+            boolean complete = true;
+            long oldestAcceptedTime = System.currentTimeMillis() - maxToleratedMetricsDelay;
+            for (Map.Entry<Integer, Map<TopicPartition, TopicPartitionReplicaUpdater>> entry : brokerReplicaMap.entrySet()) {
+                int brokerId = entry.getKey();
+                for (Map.Entry<TopicPartition, TopicPartitionReplicaUpdater> tpEntry : entry.getValue().entrySet()) {
+                    TopicPartitionReplicaUpdater replicaUpdater = tpEntry.getValue();
+                    if (!replicaUpdater.isValidInstance()) {
+                        continue;
+                    }
+                    TopicPartitionReplicaUpdater.TopicPartitionReplica replica =
+                        (TopicPartitionReplicaUpdater.TopicPartitionReplica) replicaUpdater.get(oldestAcceptedTime);
+                    if (replica == null) {
+                        complete = false;
+                        break;
+                    }
+                    // Compute once; the same value feeds both the partition and the broker total.
+                    double load = partitionLoad(replica);
+                    tpLoads.put(tpEntry.getKey(), load);
+                    brokerLoads.merge(brokerId, load, Double::sum);
+                }
+                if (!complete) {
+                    break;
+                }
+            }
+            ClusterLoads clusterLoads = ClusterLoads.getInstance();
+            // Publish partition loads before broker loads, and never publish a null partition map:
+            // a reader that observes fresh broker loads goes on to look up partition loads.
+            if (complete) {
+                clusterLoads.updatePartitionLoads(tpLoads);
+                clusterLoads.updateBrokerLoads(brokerLoads);
+            } else {
+                clusterLoads.updatePartitionLoads(new HashMap<TopicPartition, Double>());
+                clusterLoads.updateBrokerLoads(null);
+            }
+        } finally {
+            clusterLock.unlock();
+        }
+    }
+
+    /**
+     * Returns the load attributed to a single replica: the sum of its {@code NW_IN} and {@code NW_OUT}
+     * load values.
+     *
+     * @param replica replica whose load values are read
+     * @return inbound plus outbound network load of the replica
+     */
+    protected double partitionLoad(TopicPartitionReplicaUpdater.TopicPartitionReplica replica) {
+        return replica.loadValue(Resource.NW_IN) + replica.loadValue(Resource.NW_OUT);
+    }
+
     private void accumulateLoads(Map totalLoads, TopicPartitionReplicaUpdater.TopicPartitionReplica replica) {
         for (Map.Entry load : replica.getLoads().entrySet()) {
             byte resource = load.getKey();
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index e42c424ed9..02629315f7 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -826,9 +826,10 @@ public void replay(UpdateNextNodeIdRecord record) {
         nextNodeId.set(record.nodeId());
     }
 
-    public List<BrokerRegistration> getActiveBrokers() {
+    public List<Integer> getActiveBrokers() {
         return brokerRegistrations.values().stream()
-            .filter(broker -> isActive(broker.id()))
+            .map(BrokerRegistration::id)
+            .filter(this::isActive)
             .collect(Collectors.toList());
     }
     // AutoMQ inject end
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index e5cac250cc..3945fdd497 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -95,8 +95,8 @@ import org.apache.kafka.controller.es.AutoMQCreateTopicPolicy; import org.apache.kafka.controller.es.CreatePartitionPolicy; import org.apache.kafka.controller.es.ElasticCreatePartitionPolicy; +import org.apache.kafka.controller.es.LoadAwarePartitionLeaderSelector; import org.apache.kafka.controller.es.PartitionLeaderSelector; -import org.apache.kafka.controller.es.RandomPartitionLeaderSelector; import org.apache.kafka.controller.stream.TopicDeletion; import org.apache.kafka.image.writer.ImageWriterOptions; import org.apache.kafka.metadata.BrokerHeartbeatReply; @@ -2047,11 +2047,12 @@ void generateLeaderAndIsrUpdates(String context, builder.setTargetNode(brokerToAdd); } else { if (partitionLeaderSelector == null) { - partitionLeaderSelector = new RandomPartitionLeaderSelector(clusterControl.getActiveBrokers()); + partitionLeaderSelector = new LoadAwarePartitionLeaderSelector(clusterControl.getActiveBrokers(), + brokerId -> brokerId != brokerToRemove); } partitionLeaderSelector - .select(partition, br -> br.id() != brokerToRemove) - .ifPresent(broker -> builder.setTargetNode(broker.id())); + .select(new TopicPartition(topic.name(), topicIdPart.partitionId())) + .ifPresent(builder::setTargetNode); } if (fencing) { TopicPartition topicPartition = new TopicPartition(topic.name(), topicIdPart.partitionId()); diff --git a/metadata/src/main/java/org/apache/kafka/controller/es/ClusterLoads.java b/metadata/src/main/java/org/apache/kafka/controller/es/ClusterLoads.java new file mode 100644 index 0000000000..22fa6d022d --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/controller/es/ClusterLoads.java @@ -0,0 +1,69 @@ +/* + * Copyright 2024, AutoMQ CO.,LTD. 
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file BSL.md
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+
+package org.apache.kafka.controller.es;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Process-wide holder for the most recently published broker loads, partition loads and excluded
+ * broker ids.
+ *
+ * <p>Every field is {@code volatile} and is replaced as a whole by its {@code update*} method; the
+ * published collections themselves are neither copied nor wrapped.
+ */
+public class ClusterLoads {
+    /** Value returned by {@link #partitionLoad(TopicPartition)} when no load is known. */
+    public static final double INVALID = -1;
+    private static final long EXPIRE_TIME_MS = 60000; // 1 minute
+    private volatile static ClusterLoads instance = null;
+    private volatile Set<Integer> excludedBrokers;
+    private volatile Map<Integer, Double> brokerLoads;
+    private volatile Map<TopicPartition, Double> partitionLoads;
+    private volatile long lastUpdateTime = 0;
+
+    private ClusterLoads() {
+    }
+
+    /**
+     * Returns the shared instance, creating it on first use.
+     *
+     * @return the singleton {@code ClusterLoads}
+     */
+    public static ClusterLoads getInstance() {
+        if (instance == null) {
+            synchronized (ClusterLoads.class) {
+                if (instance == null) {
+                    instance = new ClusterLoads();
+                }
+            }
+        }
+        return instance;
+    }
+
+    /**
+     * Replaces the set of excluded broker ids.
+     *
+     * @param excludedBrokers new excluded broker ids; stored as given, without copying
+     */
+    public void updateExcludedBrokers(Set<Integer> excludedBrokers) {
+        this.excludedBrokers = excludedBrokers;
+    }
+
+    /**
+     * @return the last published excluded broker ids, or {@code null} if none were published
+     */
+    public Set<Integer> excludedBrokers() {
+        return excludedBrokers;
+    }
+
+    /**
+     * @return the last published broker loads, or {@code null} if the last publication is older than
+     *         one minute or was itself {@code null}
+     */
+    public Map<Integer, Double> brokerLoads() {
+        if (System.currentTimeMillis() - lastUpdateTime > EXPIRE_TIME_MS) {
+            return null;
+        }
+        return brokerLoads;
+    }
+
+    /**
+     * Looks up the published load of a partition.
+     *
+     * @param tp partition to look up
+     * @return the published load, or {@link #INVALID} if the partition is unknown or no partition
+     *         loads have been published
+     */
+    public double partitionLoad(TopicPartition tp) {
+        // Read the volatile field once; it is null until the first publication and may be
+        // replaced concurrently by updatePartitionLoads.
+        Map<TopicPartition, Double> loads = partitionLoads;
+        if (loads == null) {
+            return INVALID;
+        }
+        return loads.getOrDefault(tp, INVALID);
+    }
+
+    /**
+     * Replaces the broker loads and records the current time as the last update time.
+     *
+     * @param brokerLoads new broker loads, keyed by broker id; may be {@code null}
+     */
+    public void updateBrokerLoads(Map<Integer, Double> brokerLoads) {
+        this.brokerLoads = brokerLoads;
+        this.lastUpdateTime = System.currentTimeMillis();
+    }
+
+    /**
+     * Replaces the partition loads. Does not touch the last update time.
+     *
+     * @param partitionLoads new partition loads; may be {@code null}
+     */
+    public void updatePartitionLoads(Map<TopicPartition, Double> partitionLoads) {
+        this.partitionLoads = partitionLoads;
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/es/LoadAwarePartitionLeaderSelector.java b/metadata/src/main/java/org/apache/kafka/controller/es/LoadAwarePartitionLeaderSelector.java
new file mode 100644
index 0000000000..9927509915
--- /dev/null
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/es/LoadAwarePartitionLeaderSelector.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2024, AutoMQ CO.,LTD.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file BSL.md
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+
+package org.apache.kafka.controller.es;
+
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.function.Predicate;
+
+/**
+ * Selects, for each partition, the candidate broker with the lowest known load and then charges the
+ * partition's load to that broker so that later selections see the updated total.
+ *
+ * <p>Falls back to a {@link RandomPartitionLeaderSelector} when {@link ClusterLoads} has no broker
+ * loads, when no candidate broker remains, or when the partition's load is unknown.
+ */
+public class LoadAwarePartitionLeaderSelector implements PartitionLeaderSelector {
+    private static final Logger LOGGER = LoggerFactory.getLogger(LoadAwarePartitionLeaderSelector.class);
+    private final PriorityQueue<BrokerLoad> brokerLoads;
+    private final RandomPartitionLeaderSelector randomSelector;
+    private final Map<Integer, Double> brokerLoadMap;
+
+    /**
+     * @param aliveBrokers    ids of the brokers that may receive partitions
+     * @param brokerPredicate additional filter; brokers for which it returns {@code false} are never selected
+     */
+    public LoadAwarePartitionLeaderSelector(List<Integer> aliveBrokers, Predicate<Integer> brokerPredicate) {
+        Set<Integer> excludedBrokers = ClusterLoads.getInstance().excludedBrokers();
+        if (excludedBrokers == null) {
+            excludedBrokers = new HashSet<>();
+        }
+        List<Integer> availableBrokers = new ArrayList<>();
+        for (int broker : aliveBrokers) {
+            if (!excludedBrokers.contains(broker)) {
+                availableBrokers.add(broker);
+            }
+        }
+        brokerLoadMap = ClusterLoads.getInstance().brokerLoads();
+        if (brokerLoadMap == null) {
+            this.brokerLoads = null;
+            LOGGER.warn("No broker loads available, using random partition leader selector");
+        } else {
+            this.brokerLoads = new PriorityQueue<>();
+            for (int brokerId : availableBrokers) {
+                if (!brokerPredicate.test(brokerId)) {
+                    continue;
+                }
+                // Brokers without a published load are treated as idle.
+                brokerLoads.offer(new BrokerLoad(brokerId, brokerLoadMap.getOrDefault(brokerId, 0.0)));
+            }
+        }
+        this.randomSelector = new RandomPartitionLeaderSelector(availableBrokers, brokerPredicate);
+    }
+
+    /**
+     * Picks the least-loaded candidate broker for {@code tp} and adds the partition's load to it.
+     *
+     * @param tp partition that needs a new leader
+     * @return the chosen broker id, or whatever the random selector returns on fallback
+     */
+    @Override
+    public Optional<Integer> select(TopicPartition tp) {
+        if (this.brokerLoads == null || brokerLoads.isEmpty()) {
+            return randomSelector.select(tp);
+        }
+        double tpLoad = ClusterLoads.getInstance().partitionLoad(tp);
+        if (tpLoad == ClusterLoads.INVALID) {
+            return randomSelector.select(tp);
+        }
+        // The queue was just checked to be non-empty, so poll() returns an element here.
+        BrokerLoad candidate = brokerLoads.poll();
+        double load = candidate.load() + tpLoad;
+        candidate.setLoad(load);
+        // Writes into the map obtained from ClusterLoads, so the new total is visible to every
+        // other holder of that map.
+        brokerLoadMap.put(candidate.brokerId(), load);
+        brokerLoads.offer(candidate);
+        return Optional.of(candidate.brokerId());
+    }
+
+    /** Mutable (broker id, load) pair ordered by ascending load, then by ascending broker id. */
+    public static class BrokerLoad implements Comparable<BrokerLoad> {
+        private final int brokerId;
+        private double load;
+
+        public BrokerLoad(int brokerId, double load) {
+            this.brokerId = brokerId;
+            this.load = load;
+        }
+
+        public int brokerId() {
+            return brokerId;
+        }
+
+        public double load() {
+            return load;
+        }
+
+        public void setLoad(double load) {
+            this.load = load;
+        }
+
+        @Override
+        public int compareTo(BrokerLoad o) {
+            int byLoad = Double.compare(load, o.load);
+            // PriorityQueue breaks ties arbitrarily; fall back to the id for a stable order.
+            return byLoad != 0 ? byLoad : Integer.compare(brokerId, o.brokerId);
+        }
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/es/PartitionLeaderSelector.java b/metadata/src/main/java/org/apache/kafka/controller/es/PartitionLeaderSelector.java
index 8e7d77f120..7990b34ac5 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/es/PartitionLeaderSelector.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/es/PartitionLeaderSelector.java
@@ -17,22 +17,14 @@
 
 package org.apache.kafka.controller.es;
 
-import org.apache.kafka.metadata.BrokerRegistration;
-import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.common.TopicPartition;
 
 import java.util.Optional;
-import java.util.function.Predicate;
 
 public interface PartitionLeaderSelector {
     /**
      * Select a leader for the given partition.
*/
-    Optional<BrokerRegistration> select(PartitionRegistration partition, Predicate<BrokerRegistration> predicate);
-
-
-    default Optional<BrokerRegistration> select(PartitionRegistration partition) {
-        return select(partition, br -> true);
-    }
-
+    Optional<Integer> select(TopicPartition tp);
 }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/es/RandomPartitionLeaderSelector.java b/metadata/src/main/java/org/apache/kafka/controller/es/RandomPartitionLeaderSelector.java
index cb4190e54a..c136938924 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/es/RandomPartitionLeaderSelector.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/es/RandomPartitionLeaderSelector.java
@@ -17,33 +17,30 @@
 
 package org.apache.kafka.controller.es;
 
-import org.apache.kafka.metadata.BrokerRegistration;
-import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.common.TopicPartition;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
+/**
+ * Hands out the candidate brokers one after another, in an order shuffled once at construction,
+ * wrapping around at the end of the list.
+ */
 public class RandomPartitionLeaderSelector implements PartitionLeaderSelector {
-    private final List<BrokerRegistration> aliveBrokers;
+    private final List<Integer> aliveBrokers;
     private int selectNextIndex = 0;
 
-    public RandomPartitionLeaderSelector(List<BrokerRegistration> aliveBrokers) {
-        this.aliveBrokers = new ArrayList<>(aliveBrokers);
+    /**
+     * @param aliveBrokers ids of the brokers that may be selected
+     * @param predicate    filter applied once, up front; brokers failing it are never selected
+     */
+    public RandomPartitionLeaderSelector(List<Integer> aliveBrokers, Predicate<Integer> predicate) {
+        // Collectors.toList() gives no mutability guarantee, but shuffle needs a mutable list.
+        this.aliveBrokers = aliveBrokers.stream().filter(predicate).collect(Collectors.toCollection(ArrayList::new));
         Collections.shuffle(this.aliveBrokers);
     }
 
+    /**
+     * @param tp partition that needs a leader; not used for the choice
+     * @return the next broker id in the shuffled order, or empty if there is no candidate
+     */
     @Override
-    public Optional<BrokerRegistration> select(PartitionRegistration partition, Predicate<BrokerRegistration> predicate) {
-        for (int i = 0; i < aliveBrokers.size(); i++) {
-            BrokerRegistration broker = aliveBrokers.get(selectNextIndex);
-            selectNextIndex = (selectNextIndex + 1) % aliveBrokers.size();
-            if (predicate.test(broker)) {
-                return Optional.of(broker);
-            }
+    public Optional<Integer> select(TopicPartition tp) {
+        if (aliveBrokers.isEmpty()) {
+            return Optional.empty();
         }
-        return Optional.empty();
+        int broker = aliveBrokers.get(selectNextIndex);
+        selectNextIndex = (selectNextIndex + 1) % aliveBrokers.size();
+        return Optional.of(broker);
     }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/controller/es/LoadAwarePartitionLeaderSelectorTest.java b/metadata/src/test/java/org/apache/kafka/controller/es/LoadAwarePartitionLeaderSelectorTest.java
new file mode 100644
index 0000000000..3800ccb595
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/controller/es/LoadAwarePartitionLeaderSelectorTest.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2024, AutoMQ CO.,LTD.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file BSL.md
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+
+package org.apache.kafka.controller.es;
+
+import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class LoadAwarePartitionLeaderSelectorTest {
+
+    /**
+     * ClusterLoads is a process-wide singleton: clear it around every test so that the
+     * "no loads" phase really starts without loads and nothing set here leaks into other tests.
+     */
+    @BeforeEach
+    @AfterEach
+    public void resetClusterLoads() {
+        ClusterLoads.getInstance().updateExcludedBrokers(new HashSet<>());
+        ClusterLoads.getInstance().updatePartitionLoads(new HashMap<>());
+        ClusterLoads.getInstance().updateBrokerLoads(null);
+    }
+
+    @Test
+    public void testLoadAwarePartitionLeaderSelector() {
+        List<Integer> aliveBrokers = List.of(0, 1, 2, 3, 4, 5);
+        Set<Integer> brokerSet = new HashSet<>(aliveBrokers);
+        int brokerToRemove = 5;
+        LoadAwarePartitionLeaderSelector loadAwarePartitionLeaderSelector = new LoadAwarePartitionLeaderSelector(aliveBrokers, broker -> broker != brokerToRemove);
+
+        // fallback to random selector
+        int brokerId = loadAwarePartitionLeaderSelector.select(new TopicPartition("topic", 0)).orElse(-1);
+        Assertions.assertTrue(brokerSet.contains(brokerId));
+        Assertions.assertTrue(brokerId != brokerToRemove);
+
+        // load aware selector
+        Map<Integer, Double> brokerLoads = setUpCluster();
+        loadAwarePartitionLeaderSelector = new LoadAwarePartitionLeaderSelector(aliveBrokers, broker -> broker != brokerToRemove);
+
+        brokerId = loadAwarePartitionLeaderSelector.select(new TopicPartition("topic", 0)).orElse(-1);
+        Assertions.assertTrue(brokerSet.contains(brokerId));
+        Assertions.assertEquals(0, brokerId);
+        brokerId = loadAwarePartitionLeaderSelector.select(new TopicPartition("topic", 1)).orElse(-1);
+        Assertions.assertTrue(brokerSet.contains(brokerId));
+        Assertions.assertEquals(0, brokerId);
+        brokerId = loadAwarePartitionLeaderSelector.select(new TopicPartition("topic", 2)).orElse(-1);
+        Assertions.assertTrue(brokerSet.contains(brokerId));
+        Assertions.assertEquals(1, brokerId);
+        brokerId = loadAwarePartitionLeaderSelector.select(new TopicPartition("topic", 3)).orElse(-1);
+        Assertions.assertTrue(brokerSet.contains(brokerId));
+        Assertions.assertEquals(0, brokerId);
+
+        // select() writes the accumulated totals back into the published broker-load map
+        Assertions.assertEquals(35.0, brokerLoads.get(0));
+        Assertions.assertEquals(25.0, brokerLoads.get(1));
+        Assertions.assertEquals(20.0, brokerLoads.get(2));
+        Assertions.assertEquals(30.0, brokerLoads.get(3));
+        Assertions.assertEquals(40.0, brokerLoads.get(4));
+        Assertions.assertEquals(50.0, brokerLoads.get(5));
+
+        // tests exclude broker
+        brokerLoads = setUpCluster();
+        ClusterLoads.getInstance().updateExcludedBrokers(Set.of(1));
+        loadAwarePartitionLeaderSelector = new LoadAwarePartitionLeaderSelector(aliveBrokers, broker -> broker != brokerToRemove);
+
+        brokerId = loadAwarePartitionLeaderSelector.select(new TopicPartition("topic", 0)).orElse(-1);
+        Assertions.assertTrue(brokerSet.contains(brokerId));
+        Assertions.assertEquals(0, brokerId);
+        brokerId = loadAwarePartitionLeaderSelector.select(new TopicPartition("topic", 1)).orElse(-1);
+        Assertions.assertTrue(brokerSet.contains(brokerId));
+        Assertions.assertEquals(0, brokerId);
+        brokerId = loadAwarePartitionLeaderSelector.select(new TopicPartition("topic", 2)).orElse(-1);
+        Assertions.assertTrue(brokerSet.contains(brokerId));
+        Assertions.assertEquals(0, brokerId);
+        brokerId = loadAwarePartitionLeaderSelector.select(new TopicPartition("topic", 3)).orElse(-1);
+        Assertions.assertTrue(brokerSet.contains(brokerId));
+        Assertions.assertEquals(2, brokerId);
+
+        Assertions.assertEquals(30.0, brokerLoads.get(0));
+        Assertions.assertEquals(10.0, brokerLoads.get(1));
+        Assertions.assertEquals(40.0, brokerLoads.get(2));
+        Assertions.assertEquals(30.0, brokerLoads.get(3));
+        Assertions.assertEquals(40.0, brokerLoads.get(4));
+        Assertions.assertEquals(50.0, brokerLoads.get(5));
+    }
+
+    /**
+     * Publishes loads for brokers 1-5 (broker 0 has none) and for partitions topic-0..topic-3.
+     *
+     * @return the broker-load map handed to ClusterLoads, so the test can observe write-backs
+     */
+    private Map<Integer, Double> setUpCluster() {
+        Map<Integer, Double> brokerLoads = new HashMap<>();
+        brokerLoads.put(1, 10.0);
+        brokerLoads.put(2, 20.0);
+        brokerLoads.put(3, 30.0);
+        brokerLoads.put(4, 40.0);
+        brokerLoads.put(5, 50.0);
+        Map<TopicPartition, Double> partitionLoads = Map.of(
+            new TopicPartition("topic", 0), 5.0,
+            new TopicPartition("topic", 1), 10.0,
+            new TopicPartition("topic", 2), 15.0,
+            new TopicPartition("topic", 3), 20.0
+        );
+        ClusterLoads.getInstance().updateBrokerLoads(brokerLoads);
+        ClusterLoads.getInstance().updatePartitionLoads(partitionLoads);
+        return brokerLoads;
+    }
+}