Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.ConfigUtils;
import org.apache.kafka.controller.es.ClusterLoads;
import org.apache.kafka.server.metrics.s3stream.S3StreamKafkaMetricsManager;

import java.util.ArrayList;
Expand Down Expand Up @@ -70,12 +71,14 @@ public class AnomalyDetector extends AbstractResumableService {
this.executionIntervalMs = executionIntervalMs;
this.clusterModel = clusterModel;
this.actionExecutor = actionExecutor;
this.executorService = Executors.newSingleThreadScheduledExecutor(new AutoBalancerThreadFactory("anomaly-detector"));
this.executorService = Executors.newScheduledThreadPool(2, new AutoBalancerThreadFactory("anomaly-detector"));
this.goalsByPriority = goals;
Collections.sort(this.goalsByPriority);
this.excludedBrokers = excludedBrokers;
ClusterLoads.getInstance().updateExcludedBrokers(this.excludedBrokers);
this.excludedTopics = excludedTopics;
this.executorService.schedule(this::detect, detectInterval, TimeUnit.MILLISECONDS);
this.executorService.scheduleAtFixedRate(() -> clusterModel.updateClusterLoad(maxTolerateMetricsDelayMs), 30, 30, TimeUnit.SECONDS);
S3StreamKafkaMetricsManager.setSlowBrokerSupplier(() -> this.slowBrokers);
logger.info("detectInterval: {}ms, executionConcurrency: {}, executionIntervalMs: {}ms, goals: {}, excluded brokers: {}, excluded topics: {}",
this.detectInterval, this.executionConcurrency, this.executionIntervalMs, this.goalsByPriority, this.excludedBrokers, this.excludedTopics);
Expand Down Expand Up @@ -182,6 +185,7 @@ public void reconfigure(Map<String, Object> configs) {
AutoBalancerControllerConfig tmp = new AutoBalancerControllerConfig(configs, false);
this.excludedBrokers = tmp.getList(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXCLUDE_BROKER_IDS)
.stream().map(Integer::parseInt).collect(Collectors.toSet());
ClusterLoads.getInstance().updateExcludedBrokers(this.excludedBrokers);
}
if (configs.containsKey(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXCLUDE_TOPICS)) {
AutoBalancerControllerConfig tmp = new AutoBalancerControllerConfig(configs, false);
Expand Down
48 changes: 48 additions & 0 deletions core/src/main/java/kafka/autobalancer/model/ClusterModel.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

import com.automq.stream.utils.LogContext;
import kafka.autobalancer.common.AutoBalancerConstants;
import kafka.autobalancer.common.types.Resource;
import org.apache.kafka.controller.es.ClusterLoads;
import org.apache.kafka.server.metrics.s3stream.S3StreamKafkaMetricsManager;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
Expand Down Expand Up @@ -148,6 +150,52 @@ public ClusterModelSnapshot snapshot(Set<Integer> excludedBrokerIds, Set<String>
return snapshot;
}

/**
 * Recomputes the per-broker and per-partition loads from the replicas currently tracked in
 * {@code brokerReplicaMap} and publishes them to the {@link ClusterLoads} singleton.
 *
 * <p>Replica updaters that are not valid instances are skipped. If any remaining updater returns
 * {@code null} for the timestamp {@code now - maxToleratedMetricsDelay}, the whole round is
 * treated as invalid: broker loads are published as {@code null} and partition loads as an
 * empty map, so that readers never dereference a {@code null} partition map.
 *
 * <p>The computation and publication run while holding {@code clusterLock}.
 *
 * @param maxToleratedMetricsDelay subtracted from the current wall-clock time (milliseconds) to
 *                                 obtain the timestamp passed to each replica updater
 */
public void updateClusterLoad(long maxToleratedMetricsDelay) {
    clusterLock.lock();
    try {
        Map<Integer, Double> brokerLoads = new HashMap<>();
        Map<TopicPartition, Double> tpLoads = new HashMap<>();
        long oldestAcceptedTime = System.currentTimeMillis() - maxToleratedMetricsDelay;
        collect:
        for (Map.Entry<Integer, Map<TopicPartition, TopicPartitionReplicaUpdater>> entry : brokerReplicaMap.entrySet()) {
            int brokerId = entry.getKey();
            for (Map.Entry<TopicPartition, TopicPartitionReplicaUpdater> tpEntry : entry.getValue().entrySet()) {
                TopicPartitionReplicaUpdater replicaUpdater = tpEntry.getValue();
                if (!replicaUpdater.isValidInstance()) {
                    continue;
                }
                TopicPartitionReplicaUpdater.TopicPartitionReplica replica =
                    (TopicPartitionReplicaUpdater.TopicPartitionReplica) replicaUpdater.get(oldestAcceptedTime);
                if (replica == null) {
                    // One unusable replica invalidates the whole round. Broker loads become null
                    // (the "unavailable" signal), while partition loads become an empty map so a
                    // reader still holding the previous broker loads gets a lookup miss, not an NPE.
                    brokerLoads = null;
                    tpLoads = new HashMap<>();
                    break collect;
                }
                // Compute the load once and reuse it for both the partition and the broker total.
                double load = partitionLoad(replica);
                tpLoads.put(tpEntry.getKey(), load);
                brokerLoads.merge(brokerId, load, Double::sum);
            }
        }
        // Publish partition loads first: broker loads are what readers check for availability,
        // so the matching partition loads must already be in place when they become visible.
        ClusterLoads.getInstance().updatePartitionLoads(tpLoads);
        ClusterLoads.getInstance().updateBrokerLoads(brokerLoads);
    } finally {
        clusterLock.unlock();
    }
}

/**
 * Returns the load attributed to a single replica: the sum of its {@code NW_IN} and
 * {@code NW_OUT} load values.
 *
 * @param replica the replica whose load values are read
 * @return inbound plus outbound network load of the replica
 */
protected double partitionLoad(TopicPartitionReplicaUpdater.TopicPartitionReplica replica) {
    double inbound = replica.loadValue(Resource.NW_IN);
    double outbound = replica.loadValue(Resource.NW_OUT);
    return inbound + outbound;
}

private void accumulateLoads(Map<Byte, AbstractInstanceUpdater.Load> totalLoads, TopicPartitionReplicaUpdater.TopicPartitionReplica replica) {
for (Map.Entry<Byte, AbstractInstanceUpdater.Load> load : replica.getLoads().entrySet()) {
byte resource = load.getKey();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -826,9 +826,10 @@ public void replay(UpdateNextNodeIdRecord record) {
nextNodeId.set(record.nodeId());
}

public List<BrokerRegistration> getActiveBrokers() {
public List<Integer> getActiveBrokers() {
return brokerRegistrations.values().stream()
.filter(broker -> isActive(broker.id()))
.map(BrokerRegistration::id)
.filter(this::isActive)
.collect(Collectors.toList());
}
// AutoMQ inject end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@
import org.apache.kafka.controller.es.AutoMQCreateTopicPolicy;
import org.apache.kafka.controller.es.CreatePartitionPolicy;
import org.apache.kafka.controller.es.ElasticCreatePartitionPolicy;
import org.apache.kafka.controller.es.LoadAwarePartitionLeaderSelector;
import org.apache.kafka.controller.es.PartitionLeaderSelector;
import org.apache.kafka.controller.es.RandomPartitionLeaderSelector;
import org.apache.kafka.controller.stream.TopicDeletion;
import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
Expand Down Expand Up @@ -2047,11 +2047,12 @@ void generateLeaderAndIsrUpdates(String context,
builder.setTargetNode(brokerToAdd);
} else {
if (partitionLeaderSelector == null) {
partitionLeaderSelector = new RandomPartitionLeaderSelector(clusterControl.getActiveBrokers());
partitionLeaderSelector = new LoadAwarePartitionLeaderSelector(clusterControl.getActiveBrokers(),
brokerId -> brokerId != brokerToRemove);
}
partitionLeaderSelector
.select(partition, br -> br.id() != brokerToRemove)
.ifPresent(broker -> builder.setTargetNode(broker.id()));
.select(new TopicPartition(topic.name(), topicIdPart.partitionId()))
.ifPresent(builder::setTargetNode);
}
if (fencing) {
TopicPartition topicPartition = new TopicPartition(topic.name(), topicIdPart.partitionId());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2024, AutoMQ CO.,LTD.
*
* Use of this software is governed by the Business Source License
* included in the file BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package org.apache.kafka.controller.es;

import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.Set;

/**
 * Process-wide holder for the most recently published broker loads, partition loads and the set
 * of excluded brokers.
 *
 * <p>All fields are {@code volatile}, so a reference stored by one thread is visible to others.
 * The maps and sets are stored and returned by reference; no defensive copies are made.
 */
public class ClusterLoads {
    /** Sentinel returned by {@link #partitionLoad(TopicPartition)} when no load is known. */
    public static final double INVALID = -1;
    /** Broker loads older than this are reported as unavailable. */
    private static final long EXPIRE_TIME_MS = 60000; // 1 minute
    private static volatile ClusterLoads instance = null;
    private volatile Set<Integer> excludedBrokers;
    private volatile Map<Integer, Double> brokerLoads;
    private volatile Map<TopicPartition, Double> partitionLoads;
    private volatile long lastUpdateTime = 0;

    private ClusterLoads() {
    }

    /**
     * Returns the shared instance, creating it on first use.
     *
     * @return the singleton {@code ClusterLoads}
     */
    public static ClusterLoads getInstance() {
        if (instance == null) {
            synchronized (ClusterLoads.class) {
                if (instance == null) {
                    instance = new ClusterLoads();
                }
            }
        }
        return instance;
    }

    /**
     * Replaces the set of excluded brokers.
     *
     * @param excludedBrokers the new set; stored by reference
     */
    public void updateExcludedBrokers(Set<Integer> excludedBrokers) {
        this.excludedBrokers = excludedBrokers;
    }

    /**
     * @return the last set passed to {@link #updateExcludedBrokers(Set)}, or {@code null} if it
     *     was never called
     */
    public Set<Integer> excludedBrokers() {
        return excludedBrokers;
    }

    /**
     * Returns the last published broker loads if they are still fresh.
     *
     * @return the broker load map, or {@code null} when the last update is older than
     *     {@code EXPIRE_TIME_MS} or the published map itself was {@code null}
     */
    public Map<Integer, Double> brokerLoads() {
        if (System.currentTimeMillis() - lastUpdateTime > EXPIRE_TIME_MS) {
            return null;
        }
        return brokerLoads;
    }

    /**
     * Looks up the load of a partition.
     *
     * @param tp the partition to look up
     * @return the recorded load, or {@link #INVALID} when the partition is unknown or no
     *     partition loads are currently published
     */
    public double partitionLoad(TopicPartition tp) {
        // Read the volatile field once: it is null before the first update and may be replaced
        // (possibly with null) by a concurrent updater between a check and a use.
        Map<TopicPartition, Double> current = partitionLoads;
        if (current == null) {
            return INVALID;
        }
        return current.getOrDefault(tp, INVALID);
    }

    /**
     * Publishes new broker loads and records the publication time used for expiry.
     *
     * @param brokerLoads the new map, may be {@code null}; stored by reference
     */
    public void updateBrokerLoads(Map<Integer, Double> brokerLoads) {
        this.brokerLoads = brokerLoads;
        this.lastUpdateTime = System.currentTimeMillis();
    }

    /**
     * Publishes new partition loads.
     *
     * @param partitionLoads the new map, may be {@code null}; stored by reference
     */
    public void updatePartitionLoads(Map<TopicPartition, Double> partitionLoads) {
        this.partitionLoads = partitionLoads;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright 2024, AutoMQ CO.,LTD.
*
* Use of this software is governed by the Business Source License
* included in the file BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package org.apache.kafka.controller.es;

import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.function.Predicate;

/**
 * Leader selector that assigns each partition to the currently least-loaded eligible broker,
 * based on the loads published in {@link ClusterLoads}. It delegates to a
 * {@link RandomPartitionLeaderSelector} whenever broker loads or the partition's load are not
 * available.
 */
public class LoadAwarePartitionLeaderSelector implements PartitionLeaderSelector {
    private static final Logger LOGGER = LoggerFactory.getLogger(LoadAwarePartitionLeaderSelector.class);
    /** Min-heap of eligible brokers ordered by load; {@code null} when no loads were available. */
    private final PriorityQueue<BrokerLoad> brokerLoads;
    private final RandomPartitionLeaderSelector randomSelector;
    /** The map obtained from {@link ClusterLoads}; updated in place after each selection. */
    private final Map<Integer, Double> brokerLoadMap;

    /**
     * @param aliveBrokers    ids of brokers that may become leaders
     * @param brokerPredicate additional filter; brokers failing it are never selected
     */
    public LoadAwarePartitionLeaderSelector(List<Integer> aliveBrokers, Predicate<Integer> brokerPredicate) {
        Set<Integer> published = ClusterLoads.getInstance().excludedBrokers();
        Set<Integer> excluded = published != null ? published : new HashSet<>();

        // Drop brokers that are configured as excluded.
        List<Integer> candidates = new ArrayList<>();
        for (int id : aliveBrokers) {
            if (excluded.contains(id)) {
                continue;
            }
            candidates.add(id);
        }

        this.brokerLoadMap = ClusterLoads.getInstance().brokerLoads();
        PriorityQueue<BrokerLoad> queue = null;
        if (this.brokerLoadMap != null) {
            queue = new PriorityQueue<>();
            for (int id : candidates) {
                if (brokerPredicate.test(id)) {
                    // Brokers without a recorded load start at zero.
                    queue.offer(new BrokerLoad(id, this.brokerLoadMap.getOrDefault(id, 0.0)));
                }
            }
        } else {
            LOGGER.warn("No broker loads available, using random partition leader selector");
        }
        this.brokerLoads = queue;
        this.randomSelector = new RandomPartitionLeaderSelector(candidates, brokerPredicate);
    }

    /**
     * Picks a leader for {@code tp}. When loads are known, the least-loaded broker is chosen and
     * its load is increased by the partition's load, both in the heap and in the load map.
     *
     * @param tp the partition needing a leader
     * @return the chosen broker id, or whatever the random selector returns on fallback
     */
    @Override
    public Optional<Integer> select(TopicPartition tp) {
        boolean noBrokerLoads = this.brokerLoads == null || this.brokerLoads.isEmpty();
        if (noBrokerLoads) {
            return randomSelector.select(tp);
        }
        double partitionLoad = ClusterLoads.getInstance().partitionLoad(tp);
        if (partitionLoad == ClusterLoads.INVALID) {
            return randomSelector.select(tp);
        }
        BrokerLoad leastLoaded = brokerLoads.poll();
        if (leastLoaded == null) {
            return randomSelector.select(tp);
        }
        // Account for the new partition, then re-insert so the heap order reflects it.
        double projected = leastLoaded.load() + partitionLoad;
        leastLoaded.setLoad(projected);
        brokerLoadMap.put(leastLoaded.brokerId(), projected);
        brokerLoads.offer(leastLoaded);
        return Optional.of(leastLoaded.brokerId());
    }

    /** A broker id paired with its mutable load; ordered by ascending load. */
    public static class BrokerLoad implements Comparable<BrokerLoad> {
        private final int brokerId;
        private double load;

        public BrokerLoad(int brokerId, double load) {
            this.brokerId = brokerId;
            this.load = load;
        }

        public int brokerId() {
            return this.brokerId;
        }

        public double load() {
            return this.load;
        }

        public void setLoad(double load) {
            this.load = load;
        }

        @Override
        public int compareTo(BrokerLoad other) {
            return Double.compare(this.load, other.load);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,14 @@

package org.apache.kafka.controller.es;

import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.common.TopicPartition;

import java.util.Optional;
import java.util.function.Predicate;

public interface PartitionLeaderSelector {

/**
* Select a leader for the given partition.
*/
Optional<BrokerRegistration> select(PartitionRegistration partition, Predicate<BrokerRegistration> predicate);


default Optional<BrokerRegistration> select(PartitionRegistration partition) {
return select(partition, br -> true);
}

Optional<Integer> select(TopicPartition tp);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,30 @@

package org.apache.kafka.controller.es;

import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.common.TopicPartition;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class RandomPartitionLeaderSelector implements PartitionLeaderSelector {
private final List<BrokerRegistration> aliveBrokers;
private final List<Integer> aliveBrokers;
private int selectNextIndex = 0;

public RandomPartitionLeaderSelector(List<BrokerRegistration> aliveBrokers) {
this.aliveBrokers = new ArrayList<>(aliveBrokers);
public RandomPartitionLeaderSelector(List<Integer> aliveBrokers, Predicate<Integer> predicate) {
this.aliveBrokers = aliveBrokers.stream().filter(predicate).collect(Collectors.toList());
Collections.shuffle(this.aliveBrokers);
}

@Override
public Optional<BrokerRegistration> select(PartitionRegistration partition, Predicate<BrokerRegistration> predicate) {
for (int i = 0; i < aliveBrokers.size(); i++) {
BrokerRegistration broker = aliveBrokers.get(selectNextIndex);
selectNextIndex = (selectNextIndex + 1) % aliveBrokers.size();
if (predicate.test(broker)) {
return Optional.of(broker);
}
public Optional<Integer> select(TopicPartition tp) {
if (aliveBrokers.isEmpty()) {
return Optional.empty();
}
return Optional.empty();
int broker = aliveBrokers.get(selectNextIndex);
selectNextIndex = (selectNextIndex + 1) % aliveBrokers.size();
return Optional.of(broker);
}
}
Loading