Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions core/src/main/java/kafka/autobalancer/goals/AbstractGoal.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ protected double calculateCandidateActionScores(Collection<Goal> goalsByPriority
for (Goal goal : goalsByPriority) {
double score = goal.actionAcceptanceScore(action, cluster);
if (score == NOT_ACCEPTABLE) {
LOGGER.debug("action {} is not acceptable for goal {}", action, goal);
return NOT_ACCEPTABLE;
}
goalScoreMapByGroup.compute(goal.group(), (k, v) -> v == null ? new HashMap<>() : v).put(goal, score);
Expand Down Expand Up @@ -228,16 +229,23 @@ public double actionAcceptanceScore(Action action, ClusterModelSnapshot cluster)
return calculateAcceptanceScore(srcBrokerBefore, destBrokerBefore, srcBrokerAfter, destBrokerAfter);
}

@Override
public List<BrokerUpdater.Broker> getBrokersToOptimize(ClusterModelSnapshot cluster) {
List<BrokerUpdater.Broker> brokersToOptimize = new ArrayList<>();
for (BrokerUpdater.Broker broker : cluster.brokers()) {
if (!isBrokerAcceptable(broker)) {
LOGGER.warn("Broker {} violates goal {}", broker.getBrokerId(), name());
brokersToOptimize.add(broker);
}
}
return brokersToOptimize;
}

protected abstract boolean moveReplica(Action action, ClusterModelSnapshot cluster, BrokerUpdater.Broker src, BrokerUpdater.Broker dest);
protected abstract boolean isBrokerAcceptable(BrokerUpdater.Broker broker);
protected abstract double brokerScore(BrokerUpdater.Broker broker);
protected abstract void onBalanceFailed(BrokerUpdater.Broker broker);

@Override
public Set<BrokerUpdater.Broker> getEligibleBrokers(ClusterModelSnapshot cluster) {
return cluster.brokers().stream().filter(BrokerUpdater.Broker::isActive).collect(Collectors.toSet());
}

@Override
public int hashCode() {
return Objects.hashCode(name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,17 @@ public boolean isHardGoal() {
}

@Override
public List<Action> doOptimize(Set<BrokerUpdater.Broker> eligibleBrokers, ClusterModelSnapshot cluster,
public List<Action> doOptimize(List<BrokerUpdater.Broker> brokersToOptimize, ClusterModelSnapshot cluster,
Collection<Goal> goalsByPriority, Collection<Goal> optimizedGoals,
Map<String, Set<String>> goalsByGroup) {
List<Action> actions = new ArrayList<>();
List<BrokerUpdater.Broker> brokersToOptimize = new ArrayList<>();
for (BrokerUpdater.Broker broker : eligibleBrokers) {
if (!isBrokerAcceptable(broker)) {
LOGGER.warn("BrokerUpdater.Broker {} violates goal {}", broker.getBrokerId(), name());
brokersToOptimize.add(broker);
}
}
for (BrokerUpdater.Broker broker : brokersToOptimize) {
if (isBrokerAcceptable(broker)) {
continue;
}
List<BrokerUpdater.Broker> candidateBrokers =
eligibleBrokers.stream().filter(b -> b.getBrokerId() != broker.getBrokerId()).collect(Collectors.toList());
cluster.brokers().stream().filter(b -> b.getBrokerId() != broker.getBrokerId()
&& broker.load(resource()).isTrusted()).collect(Collectors.toList());
if (requireLessLoad(broker)) {
List<Action> brokerActions = tryReduceLoadByAction(ActionType.MOVE, cluster, broker, candidateBrokers,
goalsByPriority, optimizedGoals, goalsByGroup);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
import kafka.autobalancer.common.Action;
import kafka.autobalancer.common.ActionType;
import kafka.autobalancer.common.AutoBalancerConstants;
import kafka.autobalancer.common.types.Resource;
import kafka.autobalancer.model.AbstractInstanceUpdater;
import kafka.autobalancer.model.BrokerUpdater;
import kafka.autobalancer.model.ClusterModelSnapshot;
import kafka.autobalancer.model.ModelUtils;
import kafka.autobalancer.model.TopicPartitionReplicaUpdater;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;

import java.util.ArrayList;
Expand All @@ -35,6 +38,24 @@ public abstract class AbstractResourceGoal extends AbstractGoal {

protected abstract byte resource();

@Override
public List<BrokerUpdater.Broker> getBrokersToOptimize(ClusterModelSnapshot cluster) {
List<BrokerUpdater.Broker> brokersToOptimize = new ArrayList<>();
for (BrokerUpdater.Broker broker : cluster.brokers()) {
if (!isBrokerAcceptable(broker)) {
if (!broker.load(resource()).isTrusted()) {
// do not balance broker with untrusted load
LOGGER.warn("Broker {} has untrusted {} load, skip optimizing for {}", broker.getBrokerId(),
Resource.HUMAN_READABLE_RESOURCE_NAMES.get(resource()), name());
continue;
}
LOGGER.warn("Broker {} violates goal {}", broker.getBrokerId(), name());
brokersToOptimize.add(broker);
}
}
return brokersToOptimize;
}

@Override
protected boolean moveReplica(Action action, ClusterModelSnapshot cluster, BrokerUpdater.Broker src, BrokerUpdater.Broker dest) {
TopicPartitionReplicaUpdater.TopicPartitionReplica srcReplica = cluster.replica(action.getSrcBrokerId(), action.getSrcTopicPartition());
Expand Down Expand Up @@ -78,7 +99,7 @@ protected List<Action> tryReduceLoadByAction(ActionType actionType,
List<TopicPartitionReplicaUpdater.TopicPartitionReplica> srcReplicas = cluster
.replicasFor(srcBroker.getBrokerId())
.stream()
.sorted(Comparator.comparingDouble(r -> -r.load(resource()))) // higher load first
.sorted(Comparator.comparingDouble(r -> -r.loadValue(resource()))) // higher load first
.collect(Collectors.toList());
for (TopicPartitionReplicaUpdater.TopicPartitionReplica tp : srcReplicas) {
candidateBrokers.sort(lowLoadComparator()); // lower load first
Expand All @@ -88,8 +109,8 @@ protected List<Action> tryReduceLoadByAction(ActionType actionType,
optimizedGoals, goalsByGroup);
} else {
optionalAction = trySwapPartitionOut(cluster, tp, srcBroker, candidateBrokers, goalsByPriority,
optimizedGoals, goalsByGroup, Comparator.comparingDouble(p -> p.load(resource())),
(src, candidate) -> src.load(resource()) > candidate.load(resource()));
optimizedGoals, goalsByGroup, Comparator.comparingDouble(p -> p.loadValue(resource())),
(src, candidate) -> src.loadValue(resource()) > candidate.loadValue(resource()));
}

if (optionalAction.isPresent()) {
Expand Down Expand Up @@ -128,7 +149,7 @@ protected List<Action> tryIncreaseLoadByAction(ActionType actionType,
List<TopicPartitionReplicaUpdater.TopicPartitionReplica> candidateReplicas = cluster
.replicasFor(candidateBroker.getBrokerId())
.stream()
.sorted(Comparator.comparingDouble(r -> -r.load(resource()))) // higher load first
.sorted(Comparator.comparingDouble(r -> -r.loadValue(resource()))) // higher load first
.collect(Collectors.toList());
for (TopicPartitionReplicaUpdater.TopicPartitionReplica tp : candidateReplicas) {
Optional<Action> optionalAction;
Expand All @@ -137,8 +158,8 @@ protected List<Action> tryIncreaseLoadByAction(ActionType actionType,
goalsByPriority, optimizedGoals, goalsByGroup);
} else {
optionalAction = trySwapPartitionOut(cluster, tp, candidateBroker, List.of(srcBroker), goalsByPriority,
optimizedGoals, goalsByGroup, Comparator.comparingDouble(p -> p.load(resource())),
(src, candidate) -> src.load(resource()) > candidate.load(resource()));
optimizedGoals, goalsByGroup, Comparator.comparingDouble(p -> p.loadValue(resource())),
(src, candidate) -> src.loadValue(resource()) > candidate.loadValue(resource()));
}

if (optionalAction.isPresent()) {
Expand All @@ -155,6 +176,28 @@ protected List<Action> tryIncreaseLoadByAction(ActionType actionType,
return actionList;
}

@Override
public double actionAcceptanceScore(Action action, ClusterModelSnapshot cluster) {
if (validateAction(action.getSrcBrokerId(), action.getDestBrokerId(), action.getSrcTopicPartition(), cluster)) {
return super.actionAcceptanceScore(action, cluster);
}
return NOT_ACCEPTABLE;
}

boolean validateAction(int srcBrokerId, int destBrokerId, TopicPartition tp, ClusterModelSnapshot cluster) {
BrokerUpdater.Broker destBroker = cluster.broker(destBrokerId);
BrokerUpdater.Broker srcBroker = cluster.broker(srcBrokerId);
TopicPartitionReplicaUpdater.TopicPartitionReplica replica = cluster.replica(srcBrokerId, tp);
AbstractInstanceUpdater.Load replicaLoad = replica.load(resource());
if (!replicaLoad.isTrusted()) {
return false;
}
if (replicaLoad.getValue() == 0) {
return true;
}
return destBroker.load(resource()).isTrusted() && srcBroker.load(resource()).isTrusted();
}

protected abstract Comparator<BrokerUpdater.Broker> highLoadComparator();

protected abstract Comparator<BrokerUpdater.Broker> lowLoadComparator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
import kafka.autobalancer.model.BrokerUpdater;
import org.slf4j.Logger;

import java.util.Collection;
import java.util.Comparator;
import java.util.Set;

public abstract class AbstractResourceUsageDistributionGoal extends AbstractResourceDistributionGoal {
private static final Logger LOGGER = new LogContext().logger(AutoBalancerConstants.AUTO_BALANCER_LOGGER_CLAZZ);
private final Comparator<BrokerUpdater.Broker> highLoadComparator = Comparator.comparingDouble(b -> -b.load(resource()));
private final Comparator<BrokerUpdater.Broker> lowLoadComparator = Comparator.comparingDouble(b -> b.load(resource()));
private final Comparator<BrokerUpdater.Broker> highLoadComparator = Comparator.comparingDouble(b -> -b.loadValue(resource()));
private final Comparator<BrokerUpdater.Broker> lowLoadComparator = Comparator.comparingDouble(b -> b.loadValue(resource()));
protected Normalizer normalizer;
protected volatile long usageDetectThreshold;
protected volatile double usageAvgDeviationRatio;
Expand All @@ -35,9 +35,9 @@ public abstract class AbstractResourceUsageDistributionGoal extends AbstractReso
protected double usageDistUpperBound;

@Override
public void initialize(Set<BrokerUpdater.Broker> brokers) {
public void initialize(Collection<BrokerUpdater.Broker> brokers) {
byte resource = resource();
usageAvg = brokers.stream().mapToDouble(e -> e.load(resource)).sum() / brokers.size();
usageAvg = brokers.stream().mapToDouble(e -> e.loadValue(resource)).sum() / brokers.size();
usageAvgDeviation = usageAvg * usageAvgDeviationRatio;
usageDistLowerBound = Math.max(0, usageAvg * (1 - this.usageAvgDeviationRatio));
usageDistUpperBound = usageAvg * (1 + this.usageAvgDeviationRatio);
Expand All @@ -48,17 +48,17 @@ public void initialize(Set<BrokerUpdater.Broker> brokers) {

@Override
protected boolean requireLessLoad(BrokerUpdater.Broker broker) {
return broker.load(resource()) > usageDistUpperBound;
return broker.loadValue(resource()) > usageDistUpperBound;
}

@Override
protected boolean requireMoreLoad(BrokerUpdater.Broker broker) {
return broker.load(resource()) < usageDistLowerBound;
return broker.loadValue(resource()) < usageDistLowerBound;
}

@Override
public boolean isBrokerAcceptable(BrokerUpdater.Broker broker) {
double load = broker.load(resource());
double load = broker.loadValue(resource());
if (load < this.usageDetectThreshold) {
return true;
}
Expand All @@ -67,7 +67,7 @@ public boolean isBrokerAcceptable(BrokerUpdater.Broker broker) {

@Override
public double brokerScore(BrokerUpdater.Broker broker) {
double loadAvgDeviationAbs = Math.abs(usageAvg - broker.load(resource()));
double loadAvgDeviationAbs = Math.abs(usageAvg - broker.loadValue(resource()));
if (loadAvgDeviationAbs < usageAvgDeviation) {
return 1.0;
}
Expand Down
11 changes: 5 additions & 6 deletions core/src/main/java/kafka/autobalancer/goals/Goal.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

public interface Goal extends Configurable, Comparable<Goal> {

List<Action> doOptimize(Set<BrokerUpdater.Broker> eligibleBrokers, ClusterModelSnapshot cluster,
List<Action> doOptimize(List<BrokerUpdater.Broker> brokersToOptimize, ClusterModelSnapshot cluster,
Collection<Goal> goalsByPriority, Collection<Goal> optimizedGoals,
Map<String, Set<String>> goalsByGroup);

Expand All @@ -38,20 +38,19 @@ default List<Action> optimize(ClusterModelSnapshot cluster, Collection<Goal> goa

default List<Action> optimize(ClusterModelSnapshot cluster, Collection<Goal> goalsByPriority,
Collection<Goal> optimizedGoal, Map<String, Set<String>> goalsByGroup) {
Set<BrokerUpdater.Broker> eligibleBrokers = getEligibleBrokers(cluster);
goalsByPriority.forEach(e -> e.initialize(eligibleBrokers));
return doOptimize(eligibleBrokers, cluster, goalsByPriority, optimizedGoal, goalsByGroup);
goalsByPriority.forEach(e -> e.initialize(cluster.brokers()));
return doOptimize(getBrokersToOptimize(cluster), cluster, goalsByPriority, optimizedGoal, goalsByGroup);
}

void initialize(Set<BrokerUpdater.Broker> brokers);
void initialize(Collection<BrokerUpdater.Broker> brokers);

boolean isHardGoal();

String group();

double weight();

Set<BrokerUpdater.Broker> getEligibleBrokers(ClusterModelSnapshot cluster);
List<BrokerUpdater.Broker> getBrokersToOptimize(ClusterModelSnapshot cluster);

String name();

Expand Down
Loading