diff --git a/core/src/main/java/kafka/autobalancer/goals/AbstractGoal.java b/core/src/main/java/kafka/autobalancer/goals/AbstractGoal.java
index f67a1b5483..3db2afd7e2 100644
--- a/core/src/main/java/kafka/autobalancer/goals/AbstractGoal.java
+++ b/core/src/main/java/kafka/autobalancer/goals/AbstractGoal.java
@@ -97,6 +97,7 @@ protected double calculateCandidateActionScores(Collection<Goal> goalsByPriority,
         for (Goal goal : goalsByPriority) {
             double score = goal.actionAcceptanceScore(action, cluster);
             if (score == NOT_ACCEPTABLE) {
+                LOGGER.debug("action {} is not acceptable for goal {}", action, goal);
                 return NOT_ACCEPTABLE;
             }
             goalScoreMapByGroup.compute(goal.group(), (k, v) -> v == null ? new HashMap<>() : v).put(goal, score);
@@ -228,16 +229,23 @@ public double actionAcceptanceScore(Action action, ClusterModelSnapshot cluster) {
         return calculateAcceptanceScore(srcBrokerBefore, destBrokerBefore, srcBrokerAfter, destBrokerAfter);
     }
 
+    @Override
+    public List<BrokerUpdater.Broker> getBrokersToOptimize(ClusterModelSnapshot cluster) {
+        List<BrokerUpdater.Broker> brokersToOptimize = new ArrayList<>();
+        for (BrokerUpdater.Broker broker : cluster.brokers()) {
+            if (!isBrokerAcceptable(broker)) {
+                LOGGER.warn("Broker {} violates goal {}", broker.getBrokerId(), name());
+                brokersToOptimize.add(broker);
+            }
+        }
+        return brokersToOptimize;
+    }
+
     protected abstract boolean moveReplica(Action action, ClusterModelSnapshot cluster, BrokerUpdater.Broker src, BrokerUpdater.Broker dest);
 
     protected abstract boolean isBrokerAcceptable(BrokerUpdater.Broker broker);
 
     protected abstract double brokerScore(BrokerUpdater.Broker broker);
 
     protected abstract void onBalanceFailed(BrokerUpdater.Broker broker);
 
-    @Override
-    public Set<BrokerUpdater.Broker> getEligibleBrokers(ClusterModelSnapshot cluster) {
-        return cluster.brokers().stream().filter(BrokerUpdater.Broker::isActive).collect(Collectors.toSet());
-    }
-
     @Override
     public int hashCode() {
         return Objects.hashCode(name());
diff --git a/core/src/main/java/kafka/autobalancer/goals/AbstractResourceDistributionGoal.java b/core/src/main/java/kafka/autobalancer/goals/AbstractResourceDistributionGoal.java
index d52d6dc6ae..ffc8a2ac7f 100644
--- a/core/src/main/java/kafka/autobalancer/goals/AbstractResourceDistributionGoal.java
+++ b/core/src/main/java/kafka/autobalancer/goals/AbstractResourceDistributionGoal.java
@@ -35,23 +35,17 @@ public boolean isHardGoal() {
     }
 
     @Override
-    public List<Action> doOptimize(Set<BrokerUpdater.Broker> eligibleBrokers, ClusterModelSnapshot cluster,
+    public List<Action> doOptimize(List<BrokerUpdater.Broker> brokersToOptimize, ClusterModelSnapshot cluster,
                                    Collection<Goal> goalsByPriority, Collection<Goal> optimizedGoals,
                                    Map<String, Set<Goal>> goalsByGroup) {
         List<Action> actions = new ArrayList<>();
-        List<BrokerUpdater.Broker> brokersToOptimize = new ArrayList<>();
-        for (BrokerUpdater.Broker broker : eligibleBrokers) {
-            if (!isBrokerAcceptable(broker)) {
-                LOGGER.warn("BrokerUpdater.Broker {} violates goal {}", broker.getBrokerId(), name());
-                brokersToOptimize.add(broker);
-            }
-        }
         for (BrokerUpdater.Broker broker : brokersToOptimize) {
             if (isBrokerAcceptable(broker)) {
                 continue;
             }
             List<BrokerUpdater.Broker> candidateBrokers =
-                    eligibleBrokers.stream().filter(b -> b.getBrokerId() != broker.getBrokerId()).collect(Collectors.toList());
+                    cluster.brokers().stream().filter(b -> b.getBrokerId() != broker.getBrokerId()
+                            && b.load(resource()).isTrusted()).collect(Collectors.toList());
             if (requireLessLoad(broker)) {
                 List<Action> brokerActions = tryReduceLoadByAction(ActionType.MOVE, cluster, broker, candidateBrokers,
                         goalsByPriority, optimizedGoals, goalsByGroup);
diff --git a/core/src/main/java/kafka/autobalancer/goals/AbstractResourceGoal.java b/core/src/main/java/kafka/autobalancer/goals/AbstractResourceGoal.java
index 81d8084de4..d62c73e5cf 100644
--- a/core/src/main/java/kafka/autobalancer/goals/AbstractResourceGoal.java
+++ b/core/src/main/java/kafka/autobalancer/goals/AbstractResourceGoal.java
@@ -15,10 +15,13 @@
 import kafka.autobalancer.common.Action;
 import kafka.autobalancer.common.ActionType;
 import kafka.autobalancer.common.AutoBalancerConstants;
+import kafka.autobalancer.common.types.Resource;
+import kafka.autobalancer.model.AbstractInstanceUpdater;
 import kafka.autobalancer.model.BrokerUpdater;
 import kafka.autobalancer.model.ClusterModelSnapshot;
 import kafka.autobalancer.model.ModelUtils;
 import kafka.autobalancer.model.TopicPartitionReplicaUpdater;
+import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
@@ -35,6 +38,24 @@ public abstract class AbstractResourceGoal extends AbstractGoal {
 
     protected abstract byte resource();
 
+    @Override
+    public List<BrokerUpdater.Broker> getBrokersToOptimize(ClusterModelSnapshot cluster) {
+        List<BrokerUpdater.Broker> brokersToOptimize = new ArrayList<>();
+        for (BrokerUpdater.Broker broker : cluster.brokers()) {
+            if (!isBrokerAcceptable(broker)) {
+                if (!broker.load(resource()).isTrusted()) {
+                    // do not balance broker with untrusted load
+                    LOGGER.warn("Broker {} has untrusted {} load, skip optimizing for {}", broker.getBrokerId(),
+                            Resource.HUMAN_READABLE_RESOURCE_NAMES.get(resource()), name());
+                    continue;
+                }
+                LOGGER.warn("Broker {} violates goal {}", broker.getBrokerId(), name());
+                brokersToOptimize.add(broker);
+            }
+        }
+        return brokersToOptimize;
+    }
+
     @Override
     protected boolean moveReplica(Action action, ClusterModelSnapshot cluster, BrokerUpdater.Broker src, BrokerUpdater.Broker dest) {
         TopicPartitionReplicaUpdater.TopicPartitionReplica srcReplica = cluster.replica(action.getSrcBrokerId(), action.getSrcTopicPartition());
@@ -78,7 +99,7 @@ protected List<Action> tryReduceLoadByAction(ActionType actionType,
         List<TopicPartitionReplicaUpdater.TopicPartitionReplica> srcReplicas = cluster
                 .replicasFor(srcBroker.getBrokerId())
                 .stream()
-                .sorted(Comparator.comparingDouble(r -> -r.load(resource()))) // higher load first
+                .sorted(Comparator.comparingDouble(r -> -r.loadValue(resource()))) // higher load first
                 .collect(Collectors.toList());
         for (TopicPartitionReplicaUpdater.TopicPartitionReplica tp : srcReplicas) {
             candidateBrokers.sort(lowLoadComparator()); // lower load first
@@ -88,8 +109,8 @@ protected List<Action> tryReduceLoadByAction(ActionType actionType,
                         optimizedGoals, goalsByGroup);
             } else {
                 optionalAction = trySwapPartitionOut(cluster, tp, srcBroker, candidateBrokers, goalsByPriority,
-                        optimizedGoals, goalsByGroup, Comparator.comparingDouble(p -> p.load(resource())),
-                        (src, candidate) -> src.load(resource()) > candidate.load(resource()));
+                        optimizedGoals, goalsByGroup, Comparator.comparingDouble(p -> p.loadValue(resource())),
+                        (src, candidate) -> src.loadValue(resource()) > candidate.loadValue(resource()));
             }
 
             if (optionalAction.isPresent()) {
@@ -128,7 +149,7 @@ protected List<Action> tryIncreaseLoadByAction(ActionType actionType,
         List<TopicPartitionReplicaUpdater.TopicPartitionReplica> candidateReplicas = cluster
                 .replicasFor(candidateBroker.getBrokerId())
                 .stream()
-                .sorted(Comparator.comparingDouble(r -> -r.load(resource()))) // higher load first
+                .sorted(Comparator.comparingDouble(r -> -r.loadValue(resource()))) // higher load first
                 .collect(Collectors.toList());
         for (TopicPartitionReplicaUpdater.TopicPartitionReplica tp : candidateReplicas) {
             Optional<Action> optionalAction;
@@ -137,8 +158,8 @@ protected List<Action> tryIncreaseLoadByAction(ActionType actionType,
                         goalsByPriority, optimizedGoals, goalsByGroup);
             } else {
                 optionalAction = trySwapPartitionOut(cluster, tp, candidateBroker, List.of(srcBroker), goalsByPriority,
-                        optimizedGoals, goalsByGroup, Comparator.comparingDouble(p -> p.load(resource())),
-                        (src, candidate) -> src.load(resource()) > candidate.load(resource()));
+                        optimizedGoals, goalsByGroup, Comparator.comparingDouble(p -> p.loadValue(resource())),
+                        (src, candidate) -> src.loadValue(resource()) > candidate.loadValue(resource()));
             }
 
             if (optionalAction.isPresent()) {
@@ -155,6 +176,28 @@ protected List<Action> tryIncreaseLoadByAction(ActionType actionType,
         return actionList;
     }
 
+    @Override
+    public double actionAcceptanceScore(Action action, ClusterModelSnapshot cluster) {
+        if (validateAction(action.getSrcBrokerId(), action.getDestBrokerId(), action.getSrcTopicPartition(), cluster)) {
+            return super.actionAcceptanceScore(action, cluster);
+        }
+        return NOT_ACCEPTABLE;
+    }
+
+    boolean validateAction(int srcBrokerId, int destBrokerId, TopicPartition tp, ClusterModelSnapshot cluster) {
+        BrokerUpdater.Broker destBroker = cluster.broker(destBrokerId);
+        BrokerUpdater.Broker srcBroker = cluster.broker(srcBrokerId);
+        TopicPartitionReplicaUpdater.TopicPartitionReplica replica = cluster.replica(srcBrokerId, tp);
+        AbstractInstanceUpdater.Load replicaLoad = replica.load(resource());
+        if (!replicaLoad.isTrusted()) {
+            return false;
+        }
+        if (replicaLoad.getValue() == 0) {
+            return true;
+        }
+        return destBroker.load(resource()).isTrusted() && srcBroker.load(resource()).isTrusted();
+    }
+
     protected abstract Comparator<BrokerUpdater.Broker> highLoadComparator();
 
     protected abstract Comparator<BrokerUpdater.Broker> lowLoadComparator();
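Reviewer note: the validateAction gate above only lets an action be scored when the replica's own load is trusted and, for non-zero loads, both broker-level aggregates are trusted as well. A minimal standalone sketch of that decision table — the nested Load record is a simplified stand-in for kafka.autobalancer.model.AbstractInstanceUpdater.Load, not the real class:

```java
public class ValidateActionSketch {
    // Simplified stand-in for AbstractInstanceUpdater.Load (assumption, illustrative only).
    record Load(boolean trusted, double value) {}

    // Mirrors the rules in AbstractResourceGoal#validateAction.
    static boolean validateAction(Load replicaLoad, Load srcBrokerLoad, Load destBrokerLoad) {
        if (!replicaLoad.trusted()) {
            return false;          // never move a replica whose own load is untrusted
        }
        if (replicaLoad.value() == 0) {
            return true;           // zero-load replicas are always safe to move
        }
        // otherwise both broker aggregates must be trusted
        return destBrokerLoad.trusted() && srcBrokerLoad.trusted();
    }

    public static void main(String[] args) {
        System.out.println(validateAction(new Load(true, 100), new Load(true, 100), new Load(true, 0)));  // true
        System.out.println(validateAction(new Load(true, 100), new Load(false, 100), new Load(true, 0))); // false
        System.out.println(validateAction(new Load(true, 0), new Load(false, 100), new Load(false, 0)));  // true
    }
}
```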
diff --git a/core/src/main/java/kafka/autobalancer/goals/AbstractResourceUsageDistributionGoal.java b/core/src/main/java/kafka/autobalancer/goals/AbstractResourceUsageDistributionGoal.java
index 9a0d17eeb3..ace6258dc5 100644
--- a/core/src/main/java/kafka/autobalancer/goals/AbstractResourceUsageDistributionGoal.java
+++ b/core/src/main/java/kafka/autobalancer/goals/AbstractResourceUsageDistributionGoal.java
@@ -19,13 +19,13 @@
 import kafka.autobalancer.model.BrokerUpdater;
 import org.slf4j.Logger;
 
+import java.util.Collection;
 import java.util.Comparator;
-import java.util.Set;
 
 public abstract class AbstractResourceUsageDistributionGoal extends AbstractResourceDistributionGoal {
     private static final Logger LOGGER = new LogContext().logger(AutoBalancerConstants.AUTO_BALANCER_LOGGER_CLAZZ);
-    private final Comparator<BrokerUpdater.Broker> highLoadComparator = Comparator.comparingDouble(b -> -b.load(resource()));
-    private final Comparator<BrokerUpdater.Broker> lowLoadComparator = Comparator.comparingDouble(b -> b.load(resource()));
+    private final Comparator<BrokerUpdater.Broker> highLoadComparator = Comparator.comparingDouble(b -> -b.loadValue(resource()));
+    private final Comparator<BrokerUpdater.Broker> lowLoadComparator = Comparator.comparingDouble(b -> b.loadValue(resource()));
     protected Normalizer normalizer;
     protected volatile long usageDetectThreshold;
     protected volatile double usageAvgDeviationRatio;
@@ -35,9 +35,9 @@ public abstract class AbstractResourceUsageDistributionGoal extends AbstractResourceDistributionGoal {
     protected double usageDistUpperBound;
 
     @Override
-    public void initialize(Set<BrokerUpdater.Broker> brokers) {
+    public void initialize(Collection<BrokerUpdater.Broker> brokers) {
         byte resource = resource();
-        usageAvg = brokers.stream().mapToDouble(e -> e.load(resource)).sum() / brokers.size();
+        usageAvg = brokers.stream().mapToDouble(e -> e.loadValue(resource)).sum() / brokers.size();
         usageAvgDeviation = usageAvg * usageAvgDeviationRatio;
         usageDistLowerBound = Math.max(0, usageAvg * (1 - this.usageAvgDeviationRatio));
         usageDistUpperBound = usageAvg * (1 + this.usageAvgDeviationRatio);
@@ -48,17 +48,17 @@ public void initialize(Set<BrokerUpdater.Broker> brokers) {
 
     @Override
     protected boolean requireLessLoad(BrokerUpdater.Broker broker) {
-        return broker.load(resource()) > usageDistUpperBound;
+        return broker.loadValue(resource()) > usageDistUpperBound;
     }
 
     @Override
     protected boolean requireMoreLoad(BrokerUpdater.Broker broker) {
-        return broker.load(resource()) < usageDistLowerBound;
+        return broker.loadValue(resource()) < usageDistLowerBound;
     }
 
     @Override
     public boolean isBrokerAcceptable(BrokerUpdater.Broker broker) {
-        double load = broker.load(resource());
+        double load = broker.loadValue(resource());
         if (load < this.usageDetectThreshold) {
             return true;
         }
@@ -67,7 +67,7 @@ public boolean isBrokerAcceptable(BrokerUpdater.Broker broker) {
 
     @Override
     public double brokerScore(BrokerUpdater.Broker broker) {
-        double loadAvgDeviationAbs = Math.abs(usageAvg - broker.load(resource()));
+        double loadAvgDeviationAbs = Math.abs(usageAvg - broker.loadValue(resource()));
         if (loadAvgDeviationAbs < usageAvgDeviation) {
             return 1.0;
         }
diff --git a/core/src/main/java/kafka/autobalancer/goals/Goal.java b/core/src/main/java/kafka/autobalancer/goals/Goal.java
index 93a6a95e2e..2946154856 100644
--- a/core/src/main/java/kafka/autobalancer/goals/Goal.java
+++ b/core/src/main/java/kafka/autobalancer/goals/Goal.java
@@ -25,7 +25,7 @@ public interface Goal extends Configurable, Comparable<Goal> {
 
-    List<Action> doOptimize(Set<BrokerUpdater.Broker> eligibleBrokers, ClusterModelSnapshot cluster,
+    List<Action> doOptimize(List<BrokerUpdater.Broker> brokersToOptimize, ClusterModelSnapshot cluster,
                             Collection<Goal> goalsByPriority, Collection<Goal> optimizedGoals,
                             Map<String, Set<Goal>> goalsByGroup);
 
@@ -38,12 +38,11 @@ default List<Action> optimize(ClusterModelSnapshot cluster, Collection<Goal> goalsByPriority) {
 
     default List<Action> optimize(ClusterModelSnapshot cluster, Collection<Goal> goalsByPriority,
                                   Collection<Goal> optimizedGoal, Map<String, Set<Goal>> goalsByGroup) {
-        Set<BrokerUpdater.Broker> eligibleBrokers = getEligibleBrokers(cluster);
-        goalsByPriority.forEach(e -> e.initialize(eligibleBrokers));
-        return doOptimize(eligibleBrokers, cluster, goalsByPriority, optimizedGoal, goalsByGroup);
+        goalsByPriority.forEach(e -> e.initialize(cluster.brokers()));
+        return doOptimize(getBrokersToOptimize(cluster), cluster, goalsByPriority, optimizedGoal, goalsByGroup);
     }
 
-    void initialize(Set<BrokerUpdater.Broker> brokers);
+    void initialize(Collection<BrokerUpdater.Broker> brokers);
 
     boolean isHardGoal();
 
@@ -51,7 +50,7 @@ default List<Action> optimize(ClusterModelSnapshot cluster, Collection<Goal> goalsByPriority,
 
     double weight();
 
-    Set<BrokerUpdater.Broker> getEligibleBrokers(ClusterModelSnapshot cluster);
+    List<BrokerUpdater.Broker> getBrokersToOptimize(ClusterModelSnapshot cluster);
 
     String name();
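Reviewer note: initialize() now averages over all of cluster.brokers() instead of the old eligible-broker set. A minimal sketch of the bound arithmetic it performs, with plain doubles standing in for BrokerUpdater.Broker and illustrative numbers echoing the tests:

```java
public class UsageBoundsSketch {
    public static void main(String[] args) {
        double[] brokerLoads = {40e6, 80e6, 120e6};  // hypothetical per-broker NW_IN loads
        double usageAvgDeviationRatio = 0.2;         // the deviation ratio used in the tests

        double usageAvg = java.util.Arrays.stream(brokerLoads).sum() / brokerLoads.length; // 80e6
        double lower = Math.max(0, usageAvg * (1 - usageAvgDeviationRatio));               // 64e6
        double upper = usageAvg * (1 + usageAvgDeviationRatio);                            // 96e6

        // A broker above `upper` requires less load; below `lower` it requires more.
        System.out.printf("avg=%.0f lower=%.0f upper=%.0f%n", usageAvg, lower, upper);
    }
}
```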
diff --git a/core/src/main/java/kafka/autobalancer/model/AbstractInstanceUpdater.java b/core/src/main/java/kafka/autobalancer/model/AbstractInstanceUpdater.java
index a1058dcc17..74ac253a24 100644
--- a/core/src/main/java/kafka/autobalancer/model/AbstractInstanceUpdater.java
+++ b/core/src/main/java/kafka/autobalancer/model/AbstractInstanceUpdater.java
@@ -15,6 +15,8 @@
 import com.automq.stream.utils.LogContext;
 import kafka.autobalancer.common.AutoBalancerConstants;
 import kafka.autobalancer.common.types.Resource;
+import kafka.autobalancer.model.samples.AbstractTimeWindowSamples;
+import kafka.autobalancer.model.samples.SimpleTimeWindowSamples;
 import org.slf4j.Logger;
 
 import java.util.HashMap;
@@ -25,7 +27,7 @@ public abstract class AbstractInstanceUpdater {
     protected static final Logger LOGGER = new LogContext().logger(AutoBalancerConstants.AUTO_BALANCER_LOGGER_CLAZZ);
     protected final Lock lock = new ReentrantLock();
-    protected Map<Byte, Double> metricsMap = new HashMap<>();
+    protected Map<Byte, AbstractTimeWindowSamples> metricSampleMap = new HashMap<>();
     protected long timestamp = 0L;
 
     public boolean update(Map<Byte, Double> metricsMap, long time) {
@@ -50,13 +52,21 @@ public boolean update(Map<Byte, Double> metricsMap, long time) {
     protected void update0(Map<Byte, Double> metricsMap, long timestamp) {
         lock.lock();
         try {
-            this.metricsMap = metricsMap;
+            for (Map.Entry<Byte, Double> entry : metricsMap.entrySet()) {
+                byte metricType = entry.getKey();
+                double value = entry.getValue();
+                metricSampleMap.computeIfAbsent(metricType, k -> createSample(metricType)).append(value);
+            }
             this.timestamp = timestamp;
         } finally {
             lock.unlock();
         }
     }
 
+    protected AbstractTimeWindowSamples createSample(byte metricType) {
+        return new SimpleTimeWindowSamples(1, 1, 1);
+    }
+
     public long getTimestamp() {
         long timestamp;
         lock.lock();
@@ -78,9 +88,6 @@ public AbstractInstance get(long timeSince) {
             if (timestamp < timeSince) {
                 return null;
             }
-            if (!isValidInstance()) {
-                return null;
-            }
             return createInstance();
         } finally {
             lock.unlock();
@@ -96,7 +103,7 @@ public AbstractInstance get(long timeSince) {
     protected abstract boolean isValidInstance();
 
     public static abstract class AbstractInstance {
-        protected final Map<Byte, Double> loads = new HashMap<>();
+        protected final Map<Byte, Load> loads = new HashMap<>();
         protected final long timestamp;
 
         public AbstractInstance(long timestamp) {
@@ -105,20 +112,55 @@ public AbstractInstance(long timestamp) {
 
         public abstract AbstractInstance copy();
 
+        public void addLoad(byte resource, Load load) {
+            this.loads.compute(resource, (k, v) -> {
+                if (v == null) {
+                    return load;
+                }
+                v.add(load);
+                return v;
+            });
+        }
+
+        public void reduceLoad(byte resource, Load load) {
+            this.loads.compute(resource, (k, v) -> {
+                if (v == null) {
+                    return load;
+                }
+                v.reduceValue(load);
+                return v;
+            });
+        }
+
+        public void setLoad(byte resource, Load load) {
+            this.loads.put(resource, load);
+        }
+
         public void setLoad(byte resource, double value) {
-            this.loads.put(resource, value);
+            setLoad(resource, value, true);
+        }
+
+        public void setLoad(byte resource, double value, boolean trusted) {
+            this.loads.put(resource, new Load(trusted, value));
+        }
+
+        public Load load(byte resource) {
+            return loads.getOrDefault(resource, new Load(true, 0));
         }
 
-        public double load(byte resource) {
-            return this.loads.getOrDefault(resource, 0.0);
+        public double loadValue(byte resource) {
+            Load load = loads.get(resource);
+            return load == null ? 0 : load.getValue();
         }
 
-        public Map<Byte, Double> getLoads() {
+        public Map<Byte, Load> getLoads() {
             return this.loads;
         }
 
         protected void copyLoads(AbstractInstance other) {
-            this.loads.putAll(other.loads);
+            for (Map.Entry<Byte, Load> entry : other.loads.entrySet()) {
+                this.loads.put(entry.getKey(), new Load(entry.getValue()));
+            }
         }
 
         protected String timeString() {
@@ -126,16 +168,24 @@ protected String timeString() {
         }
 
         protected String loadString() {
+            return "Loads={" +
+                    buildLoadString() +
+                    "}";
+        }
+
+        protected String buildLoadString() {
             StringBuilder builder = new StringBuilder();
-            builder.append("Loads={");
             int index = 0;
-            for (Map.Entry<Byte, Double> entry : loads.entrySet()) {
-                builder.append(Resource.resourceString(entry.getKey(), entry.getValue()));
+            for (Map.Entry<Byte, Load> entry : loads.entrySet()) {
+                String resourceStr = Resource.resourceString(entry.getKey(), entry.getValue().getValue());
+                builder.append(resourceStr);
+                builder.append(" (");
+                builder.append(entry.getValue().isTrusted() ? "trusted" : "untrusted");
+                builder.append(")");
                 if (index++ != loads.size() - 1) {
                     builder.append(", ");
                 }
             }
-            builder.append("}");
             return builder.toString();
         }
 
@@ -144,4 +194,36 @@ public String toString() {
             return timeString() + ", " + loadString();
         }
     }
+
+    public static class Load {
+        private boolean trusted;
+        private double value;
+
+        public Load(boolean trusted, double value) {
+            this.trusted = trusted;
+            this.value = value;
+        }
+
+        public Load(Load other) {
+            this.trusted = other.trusted;
+            this.value = other.value;
+        }
+
+        public boolean isTrusted() {
+            return trusted;
+        }
+
+        public double getValue() {
+            return value;
+        }
+
+        public void add(Load load) {
+            this.value += load.value;
+            this.trusted &= load.trusted;
+        }
+
+        public void reduceValue(Load load) {
+            this.value -= load.value;
+        }
+    }
 }
diff --git a/core/src/main/java/kafka/autobalancer/model/BrokerUpdater.java b/core/src/main/java/kafka/autobalancer/model/BrokerUpdater.java
index 449bab06ca..d22791b702 100644
--- a/core/src/main/java/kafka/autobalancer/model/BrokerUpdater.java
+++ b/core/src/main/java/kafka/autobalancer/model/BrokerUpdater.java
@@ -12,6 +12,7 @@
 package kafka.autobalancer.model;
 
 import kafka.autobalancer.common.types.RawMetricTypes;
+import kafka.autobalancer.model.samples.SnapshotSamples;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -20,7 +21,7 @@ public class BrokerUpdater extends AbstractInstanceUpdater {
     private final int brokerId;
     private final String rack;
-    private final Map<Byte, MetricValueSequence> metricSequanceMap = new HashMap<>();
+    private final Map<Byte, SnapshotSamples> metricSequanceMap = new HashMap<>();
     private boolean active;
 
     public BrokerUpdater(int brokerId, String rack, boolean active) {
@@ -41,7 +42,7 @@ public boolean isActive() {
         return this.active;
     }
 
-    public Map<Byte, MetricValueSequence> metricSequenceMap() {
+    public Map<Byte, SnapshotSamples> metricSequenceMap() {
         return this.metricSequanceMap;
     }
 
@@ -71,7 +72,7 @@ protected void update0(Map<Byte, Double> metricsMap, long timestamp) {
             if (!RawMetricTypes.BROKER_METRICS.contains(entry.getKey())) {
                 continue;
             }
-            MetricValueSequence metric = metricSequanceMap.computeIfAbsent(entry.getKey(), k -> new MetricValueSequence());
+            SnapshotSamples metric = metricSequanceMap.computeIfAbsent(entry.getKey(), k -> new SnapshotSamples());
             metric.append(entry.getValue());
         }
     }
@@ -83,25 +84,27 @@ protected String name() {
 
     @Override
     protected AbstractInstance createInstance() {
+        return new Broker(brokerId, rack, timestamp, getMetricsSnapshot());
+    }
+
+    protected Map<Byte, Snapshot> getMetricsSnapshot() {
         Map<Byte, Snapshot> snapshotMap = new HashMap<>();
-        for (Map.Entry<Byte, MetricValueSequence> entry : metricSequanceMap.entrySet()) {
+        for (Map.Entry<Byte, SnapshotSamples> entry : metricSequanceMap.entrySet()) {
             snapshotMap.put(entry.getKey(), entry.getValue().snapshot());
         }
-        return new Broker(brokerId, rack, active, timestamp, snapshotMap);
+        return snapshotMap;
     }
 
     public static class Broker extends AbstractInstance {
         private final int brokerId;
         private final String rack;
         private final Map<Byte, Snapshot> metricsSnapshot;
-        private boolean active;
         private boolean isSlowBroker;
 
-        public Broker(int brokerId, String rack, boolean active, long timestamp, Map<Byte, Snapshot> metricsSnapshot) {
+        public Broker(int brokerId, String rack, long timestamp, Map<Byte, Snapshot> metricsSnapshot) {
             super(timestamp);
             this.brokerId = brokerId;
             this.rack = rack;
-            this.active = active;
             this.metricsSnapshot = metricsSnapshot;
             this.isSlowBroker = false;
         }
@@ -114,14 +117,6 @@ public String getRack() {
             return this.rack;
         }
 
-        public void setActive(boolean active) {
-            this.active = active;
-        }
-
-        public boolean isActive() {
-            return this.active;
-        }
-
        public boolean isSlowBroker() {
            return isSlowBroker;
        }
@@ -150,7 +145,6 @@ public int hashCode() {
         public String shortString() {
             return "Broker{" +
                     "brokerId=" + brokerId +
-                    ", active=" + active +
                     ", slow=" + isSlowBroker +
                     ", " + timeString() +
                     ", " + loadString() +
@@ -159,7 +153,7 @@ public String shortString() {
 
         @Override
         public Broker copy() {
-            Broker broker = new Broker(brokerId, rack, active, timestamp, null);
+            Broker broker = new Broker(brokerId, rack, timestamp, null);
             broker.copyLoads(this);
             return broker;
         }
@@ -168,7 +162,6 @@ public Broker copy() {
         public String toString() {
             return "Broker{" +
                     "brokerId=" + brokerId +
-                    ", active=" + active +
                     ", slow=" + isSlowBroker +
                     ", " + super.toString() +
                     "}";
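Reviewer note: the new Load class gives every load value a trusted bit with asymmetric semantics — add() ANDs the flags, so one untrusted contribution permanently taints the aggregate, while reduceValue() only subtracts the value and deliberately leaves the flag alone. A self-contained sketch of that behavior (same logic, simplified class, not the real kafka.autobalancer type):

```java
public class LoadTrustSketch {
    static class Load {
        boolean trusted;
        double value;
        Load(boolean trusted, double value) { this.trusted = trusted; this.value = value; }
        // Adding merges values and ANDs trust flags.
        void add(Load o) { value += o.value; trusted &= o.trusted; }
        // Reducing only subtracts the value; trust is not restored.
        void reduceValue(Load o) { value -= o.value; }
    }

    public static void main(String[] args) {
        Load total = new Load(true, 100);
        total.add(new Load(false, 50));          // 150, now untrusted
        total.reduceValue(new Load(false, 50));  // 100, still untrusted
        System.out.println(total.value + " trusted=" + total.trusted);
    }
}
```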
diff --git a/core/src/main/java/kafka/autobalancer/model/ClusterModel.java b/core/src/main/java/kafka/autobalancer/model/ClusterModel.java
index 59baf7c1db..e7e1de9467 100644
--- a/core/src/main/java/kafka/autobalancer/model/ClusterModel.java
+++ b/core/src/main/java/kafka/autobalancer/model/ClusterModel.java
@@ -98,12 +98,16 @@ public ClusterModelSnapshot snapshot(Set<Integer> excludedBrokerIds, Set<String> excludedTopics,
         long now = System.currentTimeMillis();
         for (Map.Entry<Integer, BrokerUpdater> entry : brokerMap.entrySet()) {
             int brokerId = entry.getKey();
-            BrokerUpdater.Broker broker = (BrokerUpdater.Broker) entry.getValue().get(now - maxToleratedMetricsDelay);
-            if (broker == null) {
-                logger.warn("Broker {} metrics is out of sync, will be ignored in this round", brokerId);
+            if (excludedBrokerIds.contains(brokerId)) {
                 continue;
             }
-            if (excludedBrokerIds.contains(brokerId)) {
+            BrokerUpdater brokerUpdater = entry.getValue();
+            if (!brokerUpdater.isValidInstance()) {
+                continue;
+            }
+            BrokerUpdater.Broker broker = (BrokerUpdater.Broker) brokerUpdater.get(now - maxToleratedMetricsDelay);
+            if (broker == null) {
+                logger.warn("Broker {} metrics is out of sync, will be ignored in this round", brokerId);
                 continue;
             }
             snapshot.addBroker(broker);
@@ -114,26 +118,26 @@ public ClusterModelSnapshot snapshot(Set<Integer> excludedBrokerIds, Set<String> excludedTopics,
             if (broker == null) {
                 continue;
             }
-            Map<Byte, Double> totalLoads = new HashMap<>();
+            Map<Byte, AbstractInstanceUpdater.Load> totalLoads = new HashMap<>();
             for (Map.Entry<TopicPartition, TopicPartitionReplicaUpdater> tpEntry : entry.getValue().entrySet()) {
                 TopicPartition tp = tpEntry.getKey();
+                TopicPartitionReplicaUpdater replicaUpdater = tpEntry.getValue();
+                if (!replicaUpdater.isValidInstance()) {
+                    continue;
+                }
                 TopicPartitionReplicaUpdater.TopicPartitionReplica replica =
-                        (TopicPartitionReplicaUpdater.TopicPartitionReplica) tpEntry.getValue().get(now - maxToleratedMetricsDelay);
+                        (TopicPartitionReplicaUpdater.TopicPartitionReplica) replicaUpdater.get(now - maxToleratedMetricsDelay);
                 if (replica == null) {
                     logger.warn("Broker {} has out of sync topic-partition {}, will be ignored in this round", brokerId, tp);
                     snapshot.removeBroker(brokerId);
                     break;
                 }
-                for (Map.Entry<Byte, Double> load : replica.getLoads().entrySet()) {
-                    byte resource = load.getKey();
-                    totalLoads.put(resource, totalLoads.getOrDefault(resource, 0.0) + load.getValue());
+                accumulateLoads(totalLoads, replica);
+                if (!excludedTopics.contains(tp.topic())) {
+                    snapshot.addTopicPartition(brokerId, tp, replica);
                 }
-                if (excludedTopics.contains(tp.topic())) {
-                    continue;
-                }
-                snapshot.addTopicPartition(brokerId, tp, replica);
             }
-            for (Map.Entry<Byte, Double> loadEntry : totalLoads.entrySet()) {
+            for (Map.Entry<Byte, AbstractInstanceUpdater.Load> loadEntry : totalLoads.entrySet()) {
                 broker.setLoad(loadEntry.getKey(), loadEntry.getValue());
             }
         }
@@ -144,6 +148,19 @@ public ClusterModelSnapshot snapshot(Set<Integer> excludedBrokerIds, Set<String> excludedTopics,
         return snapshot;
     }
 
+    private void accumulateLoads(Map<Byte, AbstractInstanceUpdater.Load> totalLoads,
+                                 TopicPartitionReplicaUpdater.TopicPartitionReplica replica) {
+        for (Map.Entry<Byte, AbstractInstanceUpdater.Load> load : replica.getLoads().entrySet()) {
+            byte resource = load.getKey();
+            totalLoads.compute(resource, (r, totalLoad) -> {
+                if (totalLoad == null) {
+                    return new AbstractInstanceUpdater.Load(load.getValue());
+                }
+                totalLoad.add(load.getValue());
+                return totalLoad;
+            });
+        }
+    }
+
     protected ClusterModelSnapshot createSnapshot() {
         return new ClusterModelSnapshot();
     }
diff --git a/core/src/main/java/kafka/autobalancer/model/ModelUtils.java b/core/src/main/java/kafka/autobalancer/model/ModelUtils.java
index f305d534fa..dd0aff6923 100644
--- a/core/src/main/java/kafka/autobalancer/model/ModelUtils.java
+++ b/core/src/main/java/kafka/autobalancer/model/ModelUtils.java
@@ -17,11 +17,10 @@ public class ModelUtils {
     public static void moveReplicaLoad(BrokerUpdater.Broker src, BrokerUpdater.Broker dest,
                                        TopicPartitionReplicaUpdater.TopicPartitionReplica replica) {
-        for (Map.Entry<Byte, Double> load : replica.getLoads().entrySet()) {
+        for (Map.Entry<Byte, AbstractInstanceUpdater.Load> load : replica.getLoads().entrySet()) {
             byte resource = load.getKey();
-            double delta = load.getValue();
-            src.setLoad(resource, src.load(resource) - delta);
-            dest.setLoad(resource, dest.load(resource) + delta);
+            src.reduceLoad(resource, load.getValue());
+            dest.addLoad(resource, load.getValue());
         }
     }
diff --git a/core/src/main/java/kafka/autobalancer/model/TopicPartitionReplicaUpdater.java b/core/src/main/java/kafka/autobalancer/model/TopicPartitionReplicaUpdater.java
index 2cd1abd58d..115eabedb4 100644
--- a/core/src/main/java/kafka/autobalancer/model/TopicPartitionReplicaUpdater.java
+++ b/core/src/main/java/kafka/autobalancer/model/TopicPartitionReplicaUpdater.java
@@ -13,6 +13,7 @@
 
 import kafka.autobalancer.common.types.Resource;
 import kafka.autobalancer.common.types.RawMetricTypes;
+import kafka.autobalancer.model.samples.AbstractTimeWindowSamples;
 import org.apache.kafka.common.TopicPartition;
 
 import java.util.Map;
@@ -47,24 +48,28 @@ protected boolean isValidInstance() {
     @Override
     protected AbstractInstance createInstance() {
         TopicPartitionReplica replica = new TopicPartitionReplica(tp, timestamp);
-        for (Map.Entry<Byte, Double> entry : metricsMap.entrySet()) {
+        processRawMetrics(replica);
+        return replica;
+    }
+
+    protected void processRawMetrics(TopicPartitionReplica replica) {
+        for (Map.Entry<Byte, AbstractTimeWindowSamples> entry : metricSampleMap.entrySet()) {
             byte metricType = entry.getKey();
-            double value = entry.getValue();
+            AbstractTimeWindowSamples samples = entry.getValue();
             if (!RawMetricTypes.PARTITION_METRICS.contains(metricType)) {
                 continue;
             }
             switch (metricType) {
                 case RawMetricTypes.PARTITION_BYTES_IN:
-                    replica.setLoad(Resource.NW_IN, value);
+                    replica.setLoad(Resource.NW_IN, samples.ofLoad());
                     break;
                 case RawMetricTypes.PARTITION_BYTES_OUT:
-                    replica.setLoad(Resource.NW_OUT, value);
+                    replica.setLoad(Resource.NW_OUT, samples.ofLoad());
                     break;
                 default:
                     break;
             }
         }
-        return replica;
     }
 
     public static class TopicPartitionReplica extends AbstractInstance {
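Reviewer note: accumulateLoads() folds per-replica loads into broker totals with Map#compute, so the trusted flag propagates through Load.add(). A functional restatement of the same merge rule — an immutable record replaces the mutable class purely for brevity, so this is a sketch, not the patch's code:

```java
import java.util.HashMap;
import java.util.Map;

public class AccumulateLoadsSketch {
    // Immutable stand-in for AbstractInstanceUpdater.Load (assumption).
    record Load(boolean trusted, double value) {
        Load plus(Load o) { return new Load(trusted && o.trusted, value + o.value); }
    }

    public static void main(String[] args) {
        Map<Byte, Load> totals = new HashMap<>();
        byte nwIn = 0; // stand-in for Resource.NW_IN
        for (Load replicaLoad : new Load[] {new Load(true, 40), new Load(false, 40)}) {
            // Same shape as ClusterModel#accumulateLoads: create on first sight, merge afterwards.
            totals.compute(nwIn, (k, v) -> v == null ? replicaLoad : v.plus(replicaLoad));
        }
        System.out.println(totals.get(nwIn)); // Load[trusted=false, value=80.0]
    }
}
```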
diff --git a/core/src/main/java/kafka/autobalancer/model/samples/AbstractTimeWindowSamples.java b/core/src/main/java/kafka/autobalancer/model/samples/AbstractTimeWindowSamples.java
new file mode 100644
index 0000000000..45882fd15a
--- /dev/null
+++ b/core/src/main/java/kafka/autobalancer/model/samples/AbstractTimeWindowSamples.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2024, AutoMQ CO.,LTD.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file BSL.md
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+
+package kafka.autobalancer.model.samples;
+
+import kafka.autobalancer.model.AbstractInstanceUpdater;
+
+import java.util.Deque;
+import java.util.LinkedList;
+
+public abstract class AbstractTimeWindowSamples {
+    protected final Deque<Window> windows;
+    private final int validWindowSize;
+    private final int maxWindowSize;
+    private final int windowCapacity;
+
+    public AbstractTimeWindowSamples(int validWindowSize, int maxWindowSize, int windowCapacity) {
+        if (validWindowSize <= 0 || validWindowSize > maxWindowSize || windowCapacity <= 0) {
+            throw new IllegalArgumentException("Invalid window size");
+        }
+        this.windows = new LinkedList<>();
+        this.validWindowSize = validWindowSize;
+        this.maxWindowSize = maxWindowSize;
+        this.windowCapacity = windowCapacity;
+    }
+
+    public void append(double value) {
+        Window window = windows.peekLast();
+        if (window == null || !window.append(value)) {
+            window = rollNextWindow();
+            window.append(value);
+        }
+        if (windows.size() > maxWindowSize) {
+            windows.pop();
+        }
+    }
+
+    public boolean isTrusted() {
+        if (windows.size() < validWindowSize) {
+            return false;
+        }
+        return isTrustedWindowData();
+    }
+
+    protected abstract boolean isTrustedWindowData();
+
+    public AbstractInstanceUpdater.Load ofLoad() {
+        return new AbstractInstanceUpdater.Load(isTrusted(), getLatest());
+    }
+
+    private Window rollNextWindow() {
+        Window window = new Window(windowCapacity);
+        windows.offer(window);
+        return window;
+    }
+
+    public double getLatest() {
+        Window window = windows.peekLast();
+        if (window == null) {
+            return 0;
+        }
+        return window.latest();
+    }
+
+    public int size() {
+        return windows.size();
+    }
+
+    protected static class Window {
+        private final double[] values;
+        private final int capacity;
+        private int size;
+        private double sum;
+
+        public Window(int capacity) {
+            this.capacity = capacity;
+            this.values = new double[capacity];
+            this.size = 0;
+        }
+
+        public boolean append(double value) {
+            if (size == capacity) {
+                return false;
+            }
+            values[size++] = value;
+            sum += value;
+            return true;
+        }
+
+        public int size() {
+            return size;
+        }
+
+        public double sum() {
+            return sum;
+        }
+
+        public double get(int index) {
+            if (index < 0 || index >= size) {
+                throw new IllegalArgumentException(String.format("Invalid index %d, size %d", index, size));
+            }
+            return values[index];
+        }
+
+        public double latest() {
+            if (size == 0) {
+                return 0;
+            }
+            return values[size - 1];
+        }
+    }
+}
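Reviewer note: assuming the classes above are on the classpath, the window bookkeeping works as follows — append() fills the tail window until windowCapacity is reached, then rolls a new one, and isTrusted() stays false until validWindowSize windows exist. A usage sketch:

```java
import kafka.autobalancer.model.samples.SimpleTimeWindowSamples;

public class WindowSamplesSketch {
    public static void main(String[] args) {
        // validWindowSize=2, maxWindowSize=4, windowCapacity=3
        SimpleTimeWindowSamples samples = new SimpleTimeWindowSamples(2, 4, 3);
        for (int i = 1; i <= 4; i++) {
            samples.append(i); // samples 1..3 fill window 1; sample 4 rolls window 2
        }
        System.out.println(samples.size());      // 2 windows
        System.out.println(samples.isTrusted()); // true: window count >= validWindowSize
        System.out.println(samples.getLatest()); // 4.0, the newest sample
    }
}
```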
diff --git a/core/src/main/java/kafka/autobalancer/model/samples/SimpleTimeWindowSamples.java b/core/src/main/java/kafka/autobalancer/model/samples/SimpleTimeWindowSamples.java
new file mode 100644
index 0000000000..8a4b59aa7e
--- /dev/null
+++ b/core/src/main/java/kafka/autobalancer/model/samples/SimpleTimeWindowSamples.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2024, AutoMQ CO.,LTD.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file BSL.md
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+
+package kafka.autobalancer.model.samples;
+
+public class SimpleTimeWindowSamples extends AbstractTimeWindowSamples {
+    public SimpleTimeWindowSamples(int validWindowSize, int maxWindowSize, int windowCapacity) {
+        super(validWindowSize, maxWindowSize, windowCapacity);
+    }
+
+    @Override
+    public boolean isTrustedWindowData() {
+        return true;
+    }
+}
diff --git a/core/src/main/java/kafka/autobalancer/model/MetricValueSequence.java b/core/src/main/java/kafka/autobalancer/model/samples/SnapshotSamples.java
similarity index 79%
rename from core/src/main/java/kafka/autobalancer/model/MetricValueSequence.java
rename to core/src/main/java/kafka/autobalancer/model/samples/SnapshotSamples.java
index 61668d6ea1..1f85d65b94 100644
--- a/core/src/main/java/kafka/autobalancer/model/MetricValueSequence.java
+++ b/core/src/main/java/kafka/autobalancer/model/samples/SnapshotSamples.java
@@ -9,28 +9,30 @@
  * by the Apache License, Version 2.0
  */
 
-package kafka.autobalancer.model;
+package kafka.autobalancer.model.samples;
+
+import kafka.autobalancer.model.Snapshot;
 
 import java.util.Deque;
 import java.util.LinkedList;
 
-public class MetricValueSequence {
+public class SnapshotSamples {
     private static final int DEFAULT_MAX_SIZE = 1024;
     private final Deque<Double> values;
     private final int maxSize;
     private Snapshot prev;
 
-    public MetricValueSequence() {
+    public SnapshotSamples() {
         this(DEFAULT_MAX_SIZE);
     }
 
-    public MetricValueSequence(int maxSize) {
+    public SnapshotSamples(int maxSize) {
         this.maxSize = maxSize;
         this.values = new LinkedList<>();
     }
 
-    public MetricValueSequence copy() {
-        MetricValueSequence copy = new MetricValueSequence(maxSize);
+    public SnapshotSamples copy() {
+        SnapshotSamples copy = new SnapshotSamples(maxSize);
         copy.values.addAll(values);
         return copy;
     }
diff --git a/core/src/test/java/kafka/autobalancer/LoadRetrieverTest.java b/core/src/test/java/kafka/autobalancer/LoadRetrieverTest.java
index abeb43d2e0..91df3904ae 100644
--- a/core/src/test/java/kafka/autobalancer/LoadRetrieverTest.java
+++ b/core/src/test/java/kafka/autobalancer/LoadRetrieverTest.java
@@ -93,8 +93,8 @@ private boolean checkConsumeRecord(ClusterModel clusterModel, int brokerId, long
             return false;
         }
 
-        return testReplica.load(Resource.NW_IN) != 0
-                && testReplica.load(Resource.NW_OUT) == 0;
+        return testReplica.loadValue(Resource.NW_IN) != 0
+                && testReplica.loadValue(Resource.NW_OUT) == 0;
     }
 
     @Test
diff --git a/core/src/test/java/kafka/autobalancer/detector/AnomalyDetectorTest.java b/core/src/test/java/kafka/autobalancer/detector/AnomalyDetectorTest.java
index f9fc276fe9..f8aa0aede5 100644
--- a/core/src/test/java/kafka/autobalancer/detector/AnomalyDetectorTest.java
+++ b/core/src/test/java/kafka/autobalancer/detector/AnomalyDetectorTest.java
@@ -273,13 +273,13 @@ public CompletableFuture<Void> execute(List<Action> actions) {
 
         ClusterModelSnapshot snapshot = clusterModel.snapshot();
 
-        double[] loadsBefore = snapshot.brokers().stream().map(b -> b.load(Resource.NW_IN)).mapToDouble(Double::doubleValue).toArray();
+        double[] loadsBefore = snapshot.brokers().stream().map(b -> b.loadValue(Resource.NW_IN)).mapToDouble(Double::doubleValue).toArray();
         double meanBefore = Arrays.stream(loadsBefore).sum() / loadsBefore.length;
         double stdDevBefore = calculateStdDev(meanBefore, loadsBefore);
         for (Action action : actionList) {
             snapshot.applyAction(action);
         }
-        double[] loadsAfter = snapshot.brokers().stream().map(b -> b.load(Resource.NW_IN)).mapToDouble(Double::doubleValue).toArray();
+        double[] loadsAfter = snapshot.brokers().stream().map(b -> b.loadValue(Resource.NW_IN)).mapToDouble(Double::doubleValue).toArray();
         double meanAfter = Arrays.stream(loadsBefore).sum() / loadsBefore.length;
         double stdDevAfter = calculateStdDev(meanAfter, loadsAfter);
         Assertions.assertEquals(meanBefore, meanAfter);
diff --git a/core/src/test/java/kafka/autobalancer/goals/AbstractGoalTest.java b/core/src/test/java/kafka/autobalancer/goals/AbstractGoalTest.java
deleted file mode 100644
index bb7828b219..0000000000
--- a/core/src/test/java/kafka/autobalancer/goals/AbstractGoalTest.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.autobalancer.goals;
-
-import kafka.autobalancer.common.Action;
-import kafka.autobalancer.common.ActionType;
-import kafka.autobalancer.common.types.Resource;
-import kafka.autobalancer.config.AutoBalancerControllerConfig;
-import kafka.autobalancer.model.BrokerUpdater.Broker;
-import kafka.autobalancer.model.ClusterModelSnapshot;
-import kafka.autobalancer.model.TopicPartitionReplicaUpdater.TopicPartitionReplica;
-import org.apache.kafka.common.TopicPartition;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
-
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.StringJoiner;
-
-@Tag("S3Unit")
-public class AbstractGoalTest extends GoalTestBase {
-
-    private final Map<String, Goal> goalMap = new HashMap<>();
-
-    @BeforeEach
-    public void setup() {
-        Map<String, Object> config = new HashMap<>();
-        config.put(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_GOALS, new StringJoiner(",")
-                .add(NetworkInUsageDistributionGoal.class.getName())
-                .add(NetworkOutUsageDistributionGoal.class.getName()).toString());
-        config.put(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_NETWORK_IN_USAGE_DISTRIBUTION_DETECT_THRESHOLD, 0);
-        config.put(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_NETWORK_OUT_USAGE_DISTRIBUTION_DETECT_THRESHOLD, 0);
-        config.put(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_NETWORK_IN_DISTRIBUTION_DETECT_AVG_DEVIATION, 0.2);
-        config.put(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_NETWORK_OUT_DISTRIBUTION_DETECT_AVG_DEVIATION, 0.2);
-        AutoBalancerControllerConfig controllerConfig = new AutoBalancerControllerConfig(config, false);
-        List<AbstractGoal> goalList = controllerConfig.getConfiguredInstances(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_GOALS, AbstractGoal.class);
-        goalList.sort(Comparator.reverseOrder());
-        for (AbstractGoal goal : goalList) {
-            goalMap.put(goal.name(), goal);
-        }
-    }
-
-    @Test
-    public void testCalculateAcceptanceScore() {
-        ClusterModelSnapshot cluster = new ClusterModelSnapshot();
-        Broker broker0 = createBroker(cluster, RACK, 0, true);
-        Broker broker1 = createBroker(cluster, RACK, 1, true);
-        Broker broker2 = createBroker(cluster, RACK, 2, true);
-
-        broker0.setLoad(Resource.NW_IN, 40 * 1024 * 1024);
-        broker1.setLoad(Resource.NW_IN, 80 * 1024 * 1024);
-        broker2.setLoad(Resource.NW_IN, 120 * 1024 * 1024);
-
-        TopicPartitionReplica replica0 = createTopicPartition(cluster, 0, TOPIC_0, 0);
-        replica0.setLoad(Resource.NW_IN, 40 * 1024 * 1024);
-
-        TopicPartitionReplica replica1 = createTopicPartition(cluster, 1, TOPIC_0, 1);
-        TopicPartitionReplica replica2 = createTopicPartition(cluster, 1, TOPIC_0, 2);
-        replica1.setLoad(Resource.NW_IN, 40 * 1024 * 1024);
-        replica2.setLoad(Resource.NW_IN, 40 * 1024 * 1024);
-
-        TopicPartitionReplica replica3 = createTopicPartition(cluster, 2, TOPIC_0, 3);
-        TopicPartitionReplica replica4 = createTopicPartition(cluster, 2, TOPIC_0, 4);
-        TopicPartitionReplica replica5 = createTopicPartition(cluster, 2, TOPIC_0, 5);
-        replica3.setLoad(Resource.NW_IN, 40 * 1024 * 1024);
-        replica4.setLoad(Resource.NW_IN, 40 * 1024 * 1024);
-        replica5.setLoad(Resource.NW_IN, 40 * 1024 * 1024);
-
-        Goal goal = goalMap.get(NetworkInUsageDistributionGoal.class.getSimpleName());
-        AbstractResourceDistributionGoal distributionGoal = (AbstractResourceDistributionGoal) goal;
-        distributionGoal.initialize(Set.of(broker0, broker1, broker2));
-
-        Assertions.assertEquals(0.319, goal.actionAcceptanceScore(new Action(ActionType.MOVE, new TopicPartition(TOPIC_0, 0), 0, 1), cluster), 0.001);
-        Assertions.assertEquals(0.319, goal.actionAcceptanceScore(new Action(ActionType.MOVE, new TopicPartition(TOPIC_0, 0), 0, 2), cluster), 0.001);
-
-        Assertions.assertEquals(0.5, goal.actionAcceptanceScore(new Action(ActionType.MOVE, new TopicPartition(TOPIC_0, 2), 1, 0), cluster), 0.001);
-        Assertions.assertEquals(0.319, goal.actionAcceptanceScore(new Action(ActionType.MOVE, new TopicPartition(TOPIC_0, 2), 1, 2), cluster), 0.001);
-
-        Assertions.assertEquals(0.608, goal.actionAcceptanceScore(new Action(ActionType.MOVE, new TopicPartition(TOPIC_0, 4), 2, 0), cluster), 0.001);
-        Assertions.assertEquals(0.5, goal.actionAcceptanceScore(new Action(ActionType.MOVE, new TopicPartition(TOPIC_0, 4), 2, 1), cluster), 0.001);
-
-        List<Action> actions = goal.optimize(cluster, List.of(goal), Collections.emptyList());
-        Assertions.assertFalse(actions.isEmpty());
-        Assertions.assertEquals(1, actions.size());
-        Assertions.assertEquals(2, actions.get(0).getSrcBrokerId());
-        Assertions.assertEquals(0, actions.get(0).getDestBrokerId());
-        Assertions.assertEquals(TOPIC_0, actions.get(0).getSrcTopicPartition().topic());
-        Assertions.assertTrue(Set.of(3, 4, 5).contains(actions.get(0).getSrcTopicPartition().partition()));
-        Assertions.assertNull(actions.get(0).getDestTopicPartition());
-    }
-}
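Reviewer note: the replacement test below leans on Mockito's partial-mock idiom — mock an abstract class, then doCallRealMethod() for the one method under test while every other member keeps its default stub. A self-contained sketch of the pattern with a hypothetical Checker class (not from the patch):

```java
import org.mockito.Mockito;

public class PartialMockSketch {
    public abstract static class Checker {
        boolean check(int v) { return v > 0; }   // concrete method we want to exercise
        abstract int threshold();                 // abstract member, left stubbed
    }

    public static void main(String[] args) {
        Checker checker = Mockito.mock(Checker.class);
        // Only check() runs its real body; threshold() returns Mockito's default (0).
        Mockito.doCallRealMethod().when(checker).check(Mockito.anyInt());
        System.out.println(checker.check(1));     // true: real implementation runs
        System.out.println(checker.threshold());  // 0: default stub
    }
}
```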
diff --git a/core/src/test/java/kafka/autobalancer/goals/AbstractResourceGoalTest.java b/core/src/test/java/kafka/autobalancer/goals/AbstractResourceGoalTest.java
new file mode 100644
index 0000000000..beb2bdea88
--- /dev/null
+++ b/core/src/test/java/kafka/autobalancer/goals/AbstractResourceGoalTest.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2024, AutoMQ CO.,LTD.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file BSL.md
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+
+package kafka.autobalancer.goals;
+
+import kafka.autobalancer.common.types.Resource;
+import kafka.autobalancer.model.BrokerUpdater;
+import kafka.autobalancer.model.ClusterModelSnapshot;
+import kafka.autobalancer.model.TopicPartitionReplicaUpdater;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+public class AbstractResourceGoalTest extends GoalTestBase {
+
+    @Test
+    public void testValidAction() {
+        AbstractResourceGoal goal = Mockito.mock(AbstractResourceGoal.class);
+        Mockito.doCallRealMethod().when(goal).validateAction(Mockito.anyInt(), Mockito.anyInt(), Mockito.any(), Mockito.any());
+        ClusterModelSnapshot cluster = new ClusterModelSnapshot();
+        BrokerUpdater.Broker srcBroker = createBroker(cluster, "", 0);
+        BrokerUpdater.Broker destBroker = createBroker(cluster, "", 1);
+        TopicPartitionReplicaUpdater.TopicPartitionReplica replica = createTopicPartition(cluster, 0, TOPIC_0, 0);
+        TopicPartition tp = new TopicPartition(TOPIC_0, 0);
+
+        // test valid action on trusted brokers
+        srcBroker.setLoad(Resource.NW_IN, 100, true);
+        destBroker.setLoad(Resource.NW_IN, 0, true);
+        replica.setLoad(Resource.NW_IN, 100, true);
+        Assertions.assertTrue(goal.validateAction(srcBroker.getBrokerId(), destBroker.getBrokerId(), tp, cluster));
+
+        // test valid action on untrusted brokers
+        srcBroker.setLoad(Resource.NW_IN, 100, false);
+        destBroker.setLoad(Resource.NW_IN, 0, false);
+        replica.setLoad(Resource.NW_IN, 0, true);
+        Assertions.assertTrue(goal.validateAction(srcBroker.getBrokerId(), destBroker.getBrokerId(), tp, cluster));
+
+        // test invalid action
+        srcBroker.setLoad(Resource.NW_IN, 100, false);
+        destBroker.setLoad(Resource.NW_IN, 0, false);
+        replica.setLoad(Resource.NW_IN, 100, true);
+        Assertions.assertFalse(goal.validateAction(srcBroker.getBrokerId(), destBroker.getBrokerId(), tp, cluster));
+
+        srcBroker.setLoad(Resource.NW_IN, 100, true);
+        destBroker.setLoad(Resource.NW_IN, 0, false);
+        replica.setLoad(Resource.NW_IN, 100, true);
+        Assertions.assertFalse(goal.validateAction(srcBroker.getBrokerId(), destBroker.getBrokerId(), tp, cluster));
+
+        srcBroker.setLoad(Resource.NW_IN, 100, false);
+        destBroker.setLoad(Resource.NW_IN, 0, true);
+        replica.setLoad(Resource.NW_IN, 100, true);
+        Assertions.assertFalse(goal.validateAction(srcBroker.getBrokerId(), destBroker.getBrokerId(), tp, cluster));
+
+        srcBroker.setLoad(Resource.NW_IN, 100, false);
+        destBroker.setLoad(Resource.NW_IN, 0, true);
+        replica.setLoad(Resource.NW_IN, 100, false);
+        Assertions.assertFalse(goal.validateAction(srcBroker.getBrokerId(), destBroker.getBrokerId(), tp, cluster));
+    }
+}
diff --git a/core/src/test/java/kafka/autobalancer/goals/GoalTestBase.java b/core/src/test/java/kafka/autobalancer/goals/GoalTestBase.java
index 57a96833b0..99310ef2cd 100644
--- a/core/src/test/java/kafka/autobalancer/goals/GoalTestBase.java
+++ b/core/src/test/java/kafka/autobalancer/goals/GoalTestBase.java
@@ -17,14 +17,25 @@
 
 package kafka.autobalancer.goals;
 
+import kafka.autobalancer.config.AutoBalancerControllerConfig;
 import kafka.autobalancer.model.BrokerUpdater.Broker;
 import kafka.autobalancer.model.ClusterModelSnapshot;
 import kafka.autobalancer.model.TopicPartitionReplicaUpdater.TopicPartitionReplica;
 import org.apache.kafka.common.TopicPartition;
 import org.junit.jupiter.api.Tag;
 
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.StringJoiner;
+
+import static kafka.autobalancer.common.types.Resource.NW_IN;
+import static kafka.autobalancer.common.types.Resource.NW_OUT;
+
 @Tag("S3Unit")
 public class GoalTestBase {
+    private final Map<String, Goal> goalMap = new HashMap<>();
     protected static final String RACK = "default";
     protected static final String TOPIC_0 = "TestTopic0";
     protected static final String TOPIC_1 = "TestTopic1";
@@ -32,9 +43,43 @@ public class GoalTestBase {
     protected static final String TOPIC_3 = "TestTopic3";
     protected static final String TOPIC_4 = "TestTopic4";
 
-    protected Broker createBroker(ClusterModelSnapshot cluster, String rack,
-                                  int brokerId, boolean active) {
-        Broker broker = new Broker(brokerId, rack, active, System.currentTimeMillis(), null);
+    protected void setup() {
+        Map<String, Object> config = new HashMap<>();
+        config.put(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_GOALS, new StringJoiner(",")
+                .add(NetworkInUsageDistributionGoal.class.getName())
+                .add(NetworkOutUsageDistributionGoal.class.getName()).toString());
+        config.put(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_NETWORK_IN_USAGE_DISTRIBUTION_DETECT_THRESHOLD, 0);
+        config.put(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_NETWORK_OUT_USAGE_DISTRIBUTION_DETECT_THRESHOLD, 0);
+        config.put(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_NETWORK_IN_DISTRIBUTION_DETECT_AVG_DEVIATION, 0.2);
+        config.put(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_NETWORK_OUT_DISTRIBUTION_DETECT_AVG_DEVIATION, 0.2);
+        AutoBalancerControllerConfig controllerConfig = new AutoBalancerControllerConfig(config, false);
+        List<AbstractGoal> goalList = controllerConfig.getConfiguredInstances(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_GOALS, AbstractGoal.class);
+        for (AbstractGoal goal : goalList) {
+            goalMap.put(goal.name(), goal);
+        }
+    }
+
+    protected Goal getGoalByResource(byte resource) {
+        Goal goal = null;
+        switch (resource) {
+            case NW_IN:
+                goal = goalMap.get(NetworkInUsageDistributionGoal.class.getSimpleName());
+                break;
+            case NW_OUT:
+                goal = goalMap.get(NetworkOutUsageDistributionGoal.class.getSimpleName());
+                break;
+            default:
+                break;
+        }
+        return goal;
+    }
+
+    protected Collection<Goal> getGoals() {
+        return goalMap.values();
+    }
+
+    protected Broker createBroker(ClusterModelSnapshot cluster, String rack, int brokerId) {
+        Broker broker = new Broker(brokerId, rack, System.currentTimeMillis(), null);
         cluster.addBroker(broker);
         return broker;
     }
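Reviewer note: the relocated testGoalConfig below pins down a fallback — linearNormalizerThreshold follows KafkaConfig.S3NetworkBaselineBandwidthProp() when that property is set and defaults to 100 MiB/s otherwise. A plain-Java restatement of the selection the assertions exercise (the helper is illustrative, not the real config plumbing):

```java
public class NormalizerThresholdSketch {
    static long linearNormalizerThreshold(Long s3NetworkBaselineBandwidth) {
        long defaultThreshold = 100L * 1024 * 1024; // 100 MiB/s fallback per the test
        return s3NetworkBaselineBandwidth == null ? defaultThreshold : s3NetworkBaselineBandwidth;
    }

    public static void main(String[] args) {
        System.out.println(linearNormalizerThreshold(null));              // 104857600
        System.out.println(linearNormalizerThreshold(60L * 1024 * 1024)); // 62914560
    }
}
```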
diff --git a/core/src/test/java/kafka/autobalancer/goals/AbstractResourceUsageDistributionGoalTest.java b/core/src/test/java/kafka/autobalancer/goals/ResourceUsageDistributionGoalTest.java
similarity index 63%
rename from core/src/test/java/kafka/autobalancer/goals/AbstractResourceUsageDistributionGoalTest.java
rename to core/src/test/java/kafka/autobalancer/goals/ResourceUsageDistributionGoalTest.java
index de2f8584c2..bf882f7293 100644
--- a/core/src/test/java/kafka/autobalancer/goals/AbstractResourceUsageDistributionGoalTest.java
+++ b/core/src/test/java/kafka/autobalancer/goals/ResourceUsageDistributionGoalTest.java
@@ -36,45 +36,51 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.StringJoiner;
 
 import static kafka.autobalancer.common.types.Resource.NW_IN;
 import static kafka.autobalancer.common.types.Resource.NW_OUT;
 
 @Tag("S3Unit")
-public class AbstractResourceUsageDistributionGoalTest extends GoalTestBase {
-    private final Map<String, Goal> goalMap = new HashMap<>();
+public class ResourceUsageDistributionGoalTest extends GoalTestBase {
 
     @BeforeEach
     public void setup() {
+        super.setup();
+    }
+
+    @Test
+    public void testGoalConfig() {
+
         Map<String, Object> config = new HashMap<>();
-        config.put(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_GOALS, new StringJoiner(",")
-                .add(NetworkInUsageDistributionGoal.class.getName())
-                .add(NetworkOutUsageDistributionGoal.class.getName()).toString());
-        config.put(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_NETWORK_IN_USAGE_DISTRIBUTION_DETECT_THRESHOLD, 0);
-        config.put(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_NETWORK_OUT_USAGE_DISTRIBUTION_DETECT_THRESHOLD, 0);
-        config.put(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_NETWORK_IN_DISTRIBUTION_DETECT_AVG_DEVIATION, 0.2);
-        config.put(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_NETWORK_OUT_DISTRIBUTION_DETECT_AVG_DEVIATION, 0.2);
+        config.put(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_GOALS, NetworkInUsageDistributionGoal.class.getName());
+        config.put(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_NETWORK_IN_USAGE_DISTRIBUTION_DETECT_THRESHOLD, 5 * 1024 * 1024);
+        config.put(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_NETWORK_IN_DISTRIBUTION_DETECT_AVG_DEVIATION, 0.1);
+        config.put(KafkaConfig.S3NetworkBaselineBandwidthProp(), 50 * 1024 * 1024);
+
         AutoBalancerControllerConfig controllerConfig = new AutoBalancerControllerConfig(config, false);
-        List<AbstractGoal> goalList = controllerConfig.getConfiguredInstances(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_GOALS, AbstractGoal.class);
-        for (AbstractGoal goal : goalList) {
-            goalMap.put(goal.name(), goal);
-        }
-    }
+        List<Goal> goals = controllerConfig.getConfiguredInstances(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_GOALS, Goal.class);
+        Assertions.assertEquals(1, goals.size());
+        Assertions.assertTrue(goals.get(0) instanceof NetworkInUsageDistributionGoal);
+        NetworkInUsageDistributionGoal networkInUsageDistributionGoal = (NetworkInUsageDistributionGoal) goals.get(0);
+        Assertions.assertEquals(5 * 1024 * 1024, networkInUsageDistributionGoal.usageDetectThreshold);
+        Assertions.assertEquals(0.1, networkInUsageDistributionGoal.usageAvgDeviationRatio);
+        Assertions.assertEquals(50 * 1024 * 1024, networkInUsageDistributionGoal.linearNormalizerThreshold);
 
-    private AbstractGoal getGoalByResource(byte resource) {
-        AbstractGoal goal = null;
-        switch (resource) {
-            case NW_IN:
-                goal = (AbstractGoal) goalMap.get(NetworkInUsageDistributionGoal.class.getSimpleName());
-                break;
-            case NW_OUT:
-                goal = (AbstractGoal) goalMap.get(NetworkOutUsageDistributionGoal.class.getSimpleName());
-                break;
-            default:
-                break;
-        }
-        return goal;
+        config.remove(KafkaConfig.S3NetworkBaselineBandwidthProp());
+        controllerConfig = new AutoBalancerControllerConfig(config, false);
+        goals = controllerConfig.getConfiguredInstances(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_GOALS, Goal.class);
+        Assertions.assertEquals(1, goals.size());
+        Assertions.assertTrue(goals.get(0) instanceof NetworkInUsageDistributionGoal);
+        networkInUsageDistributionGoal = (NetworkInUsageDistributionGoal) goals.get(0);
+        Assertions.assertEquals(100 * 1024 * 1024, networkInUsageDistributionGoal.linearNormalizerThreshold);
+
+        config.put(KafkaConfig.S3NetworkBaselineBandwidthProp(), String.valueOf(60 * 1024 * 1024));
+        controllerConfig = new AutoBalancerControllerConfig(config, false);
+        goals = controllerConfig.getConfiguredInstances(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_GOALS, Goal.class);
+        Assertions.assertEquals(1, goals.size());
+        Assertions.assertTrue(goals.get(0) instanceof NetworkInUsageDistributionGoal);
+        networkInUsageDistributionGoal = (NetworkInUsageDistributionGoal) goals.get(0);
+        Assertions.assertEquals(60 * 1024 * 1024, networkInUsageDistributionGoal.linearNormalizerThreshold);
     }
 
     private void testSingleResourceGoalScore(byte resource) {
@@ -86,10 +92,10 @@ private void testSingleResourceGoalScore(byte resource) {
         }
 
         ClusterModelSnapshot cluster = new ClusterModelSnapshot();
-        Broker broker0 = createBroker(cluster, RACK, 0, true);
-        Broker broker1 = createBroker(cluster, RACK, 1, true);
-        Broker broker2 = createBroker(cluster, RACK, 2, true);
-        Broker broker3 = createBroker(cluster, RACK, 3, true);
+        Broker broker0 = createBroker(cluster, RACK, 0);
+        Broker broker1 = createBroker(cluster, RACK, 1);
+        Broker broker2 = createBroker(cluster, RACK, 2);
+        Broker broker3 = createBroker(cluster, RACK, 3);
 
         double load0 = 600 * 1024 * 1024;
         broker0.setLoad(resource, load0);
@@ -145,41 +151,6 @@ private void testSingleResourceGoalScore(byte resource) {
         Assertions.assertEquals(0.50024, actionScore1, 0.00001);
     }
 
-    @Test
-    public void testGoalConfig() {
-
-        Map<String, Object> config = new HashMap<>();
-        config.put(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_GOALS, NetworkInUsageDistributionGoal.class.getName());
-        config.put(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_NETWORK_IN_USAGE_DISTRIBUTION_DETECT_THRESHOLD, 5 * 1024 * 1024);
-        config.put(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_NETWORK_IN_DISTRIBUTION_DETECT_AVG_DEVIATION, 0.1);
-        config.put(KafkaConfig.S3NetworkBaselineBandwidthProp(), 50 * 1024 * 1024);
-
-        AutoBalancerControllerConfig controllerConfig = new AutoBalancerControllerConfig(config, false);
-        List<Goal> goals = controllerConfig.getConfiguredInstances(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_GOALS, Goal.class);
-        Assertions.assertEquals(1, goals.size());
-        Assertions.assertTrue(goals.get(0) instanceof NetworkInUsageDistributionGoal);
-        NetworkInUsageDistributionGoal networkInUsageDistributionGoal = (NetworkInUsageDistributionGoal) goals.get(0);
-        Assertions.assertEquals(5 * 1024 * 1024, networkInUsageDistributionGoal.usageDetectThreshold);
-        Assertions.assertEquals(0.1, networkInUsageDistributionGoal.usageAvgDeviationRatio);
-        Assertions.assertEquals(50 * 1024 * 1024, networkInUsageDistributionGoal.linearNormalizerThreshold);
-
-        config.remove(KafkaConfig.S3NetworkBaselineBandwidthProp());
-        controllerConfig = new AutoBalancerControllerConfig(config, false);
-        goals = controllerConfig.getConfiguredInstances(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_GOALS, Goal.class);
-        Assertions.assertEquals(1, goals.size());
-        Assertions.assertTrue(goals.get(0) instanceof NetworkInUsageDistributionGoal);
-        networkInUsageDistributionGoal = (NetworkInUsageDistributionGoal) goals.get(0);
-        Assertions.assertEquals(100 * 1024 * 1024, networkInUsageDistributionGoal.linearNormalizerThreshold);
-
-        config.put(KafkaConfig.S3NetworkBaselineBandwidthProp(), String.valueOf(60 * 1024 * 1024));
-        controllerConfig = new AutoBalancerControllerConfig(config, false);
-        goals = controllerConfig.getConfiguredInstances(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_GOALS, Goal.class);
-        Assertions.assertEquals(1, goals.size());
-        Assertions.assertTrue(goals.get(0) instanceof NetworkInUsageDistributionGoal);
-        networkInUsageDistributionGoal = (NetworkInUsageDistributionGoal) goals.get(0);
-        Assertions.assertEquals(60 * 1024 * 1024, networkInUsageDistributionGoal.linearNormalizerThreshold);
-    }
-
     @Test
     public void testGoalScore() {
         testSingleResourceGoalScore(NW_IN);
@@ -187,50 +158,65 @@ public void testGoalScore() {
     }
 
     private void testSingleResourceDistributionOptimizeOneMove(byte resource) {
-        AbstractGoal goal = getGoalByResource(resource);
-        Assertions.assertNotNull(goal);
-
         ClusterModelSnapshot cluster = new ClusterModelSnapshot();
-        Broker broker0 = createBroker(cluster, RACK, 0, true);
-        Broker broker1 = createBroker(cluster, RACK, 1, true);
+        Broker broker0 = createBroker(cluster, RACK, 0);
+        Broker broker1 = createBroker(cluster, RACK, 1);
+        Broker broker2 = createBroker(cluster, RACK, 2);
 
-        double load0 = 80;
-        double load1 = 20;
-        broker0.setLoad(resource, load0);
-
-        broker1.setLoad(resource, load1);
+        broker0.setLoad(resource, 40 * 1024 * 1024);
+        broker1.setLoad(resource, 80 * 1024 * 1024);
+        broker2.setLoad(resource, 120 * 1024 * 1024);
 
         TopicPartitionReplica replica0 = createTopicPartition(cluster, 0, TOPIC_0, 0);
-        TopicPartitionReplica replica1 = createTopicPartition(cluster, 0, TOPIC_2, 0);
-        TopicPartitionReplica replica2 = createTopicPartition(cluster, 0, TOPIC_3, 0);
-        TopicPartitionReplica replica3 = createTopicPartition(cluster, 0, TOPIC_0, 1);
-        replica0.setLoad(resource, 20);
-        replica1.setLoad(resource, 30);
-        replica2.setLoad(resource, 15);
-        replica3.setLoad(resource, 15);
-        Assertions.assertEquals(load0, cluster.replicasFor(0).stream().mapToDouble(e -> e.load(resource)).sum());
+        replica0.setLoad(resource, 40 * 1024 * 1024);
 
-        TopicPartitionReplica replica4 = createTopicPartition(cluster, 1, TOPIC_4, 0);
-        TopicPartitionReplica replica5 = createTopicPartition(cluster, 1, TOPIC_2, 1);
-        replica4.setLoad(resource, 15);
-        replica5.setLoad(resource, 5);
-        Assertions.assertEquals(load1, cluster.replicasFor(1).stream().mapToDouble(e -> e.load(resource)).sum());
+        TopicPartitionReplica replica1 = createTopicPartition(cluster, 1, TOPIC_0, 1);
+        TopicPartitionReplica replica2 = createTopicPartition(cluster, 1, TOPIC_0, 2);
+        replica1.setLoad(resource, 40 * 1024 * 1024);
+        replica2.setLoad(resource, 40 * 1024 * 1024);
 
-        List<Action> actions = goal.optimize(cluster, goalMap.values(), Collections.emptyList());
-        Assertions.assertNotEquals(0, actions.size());
-        Assertions.assertNotNull(cluster);
-        for (Broker broker : cluster.brokers()) {
-            Assertions.assertTrue(goal.isBrokerAcceptable(broker));
-        }
+        TopicPartitionReplica replica3 = createTopicPartition(cluster, 2, TOPIC_0, 3);
+        TopicPartitionReplica replica4 = createTopicPartition(cluster, 2, TOPIC_0, 4);
+        TopicPartitionReplica replica5 = createTopicPartition(cluster, 2, TOPIC_0, 5);
+        replica3.setLoad(resource, 40 * 1024 * 1024);
+        replica4.setLoad(resource, 40 * 1024 * 1024);
+        replica5.setLoad(resource, 40 * 1024 * 1024);
+
+        Goal goal = getGoalByResource(resource);
+        goal.initialize(Set.of(broker0, broker1, broker2));
+
+        Assertions.assertEquals(0.319, goal.actionAcceptanceScore(new Action(ActionType.MOVE, new TopicPartition(TOPIC_0, 0), 0, 1), cluster), 0.001);
+        Assertions.assertEquals(0.319, goal.actionAcceptanceScore(new Action(ActionType.MOVE, new TopicPartition(TOPIC_0, 0), 0, 2), cluster), 0.001);
+
+        Assertions.assertEquals(0.5, goal.actionAcceptanceScore(new Action(ActionType.MOVE, new TopicPartition(TOPIC_0, 2), 1, 0), cluster), 0.001);
+        Assertions.assertEquals(0.319, goal.actionAcceptanceScore(new Action(ActionType.MOVE, new TopicPartition(TOPIC_0, 2), 1, 2), cluster), 0.001);
+
+        Assertions.assertEquals(0.608, goal.actionAcceptanceScore(new Action(ActionType.MOVE, new TopicPartition(TOPIC_0, 4), 2, 0), cluster), 0.001);
+        Assertions.assertEquals(0.5, goal.actionAcceptanceScore(new Action(ActionType.MOVE, new TopicPartition(TOPIC_0, 4), 2, 1), cluster), 0.001);
+
+        List<Action> actions = goal.optimize(cluster, List.of(goal), Collections.emptyList());
+        Assertions.assertFalse(actions.isEmpty());
+        Assertions.assertEquals(1, actions.size());
+        Assertions.assertEquals(2, actions.get(0).getSrcBrokerId());
+        Assertions.assertEquals(0, actions.get(0).getDestBrokerId());
+        Assertions.assertEquals(TOPIC_0, actions.get(0).getSrcTopicPartition().topic());
+        Assertions.assertTrue(Set.of(3, 4, 5).contains(actions.get(0).getSrcTopicPartition().partition()));
+        Assertions.assertNull(actions.get(0).getDestTopicPartition());
+    }
+
+    @Test
+    public void testSingleResourceDistributionOptimizeOneMove() {
+        testSingleResourceDistributionOptimizeOneMove(NW_IN);
+        testSingleResourceDistributionOptimizeOneMove(NW_OUT);
     }
 
     private void testSingleResourceDistributionOptimizeMultiMoveOut(byte resource) {
-        AbstractGoal goal = getGoalByResource(resource);
+        AbstractGoal goal = (AbstractGoal) getGoalByResource(resource);
         Assertions.assertNotNull(goal);
 
         ClusterModelSnapshot cluster = new ClusterModelSnapshot();
-        Broker broker0 = createBroker(cluster, RACK, 0, true);
-        Broker broker1 = createBroker(cluster, RACK, 1, true);
+        Broker broker0 = createBroker(cluster, RACK, 0);
+        Broker broker1 = createBroker(cluster, RACK, 1);
 
         double load0 = 80;
         double load1 = 10;
@@ -252,15 +238,15 @@ private void testSingleResourceDistributionOptimizeMultiMoveOut(byte resource) {
         replica4.setLoad(resource, 5);
         replica5.setLoad(resource, 10);
         replica6.setLoad(resource, 15);
-        Assertions.assertEquals(load0, cluster.replicasFor(0).stream().mapToDouble(e -> e.load(resource)).sum());
+        Assertions.assertEquals(load0, cluster.replicasFor(0).stream().mapToDouble(e -> e.loadValue(resource)).sum());
 
         TopicPartitionReplica replica7 = createTopicPartition(cluster, 1, TOPIC_1, 0);
         TopicPartitionReplica replica8 = createTopicPartition(cluster, 1, TOPIC_1, 1);
         replica7.setLoad(resource, 5);
         replica8.setLoad(resource, 5);
-        Assertions.assertEquals(load1, cluster.replicasFor(1).stream().mapToDouble(e -> e.load(resource)).sum());
+        Assertions.assertEquals(load1, cluster.replicasFor(1).stream().mapToDouble(e -> e.loadValue(resource)).sum());
 
-        List<Action> actions = goal.optimize(cluster, goalMap.values(), Collections.emptyList());
+        List<Action> actions = goal.optimize(cluster, getGoals(), Collections.emptyList());
         Assertions.assertNotEquals(0, actions.size());
         Assertions.assertNotNull(cluster);
         for (Broker broker : cluster.brokers()) {
@@ -269,12 +255,12 @@ private void testSingleResourceDistributionOptimizeMultiMoveOut(byte resource) {
     }
 
     private void testSingleResourceDistributionOptimizeMultiMoveIn(byte resource) {
-        AbstractGoal goal = getGoalByResource(resource);
+        AbstractGoal goal = (AbstractGoal) getGoalByResource(resource);
         Assertions.assertNotNull(goal);
 
         ClusterModelSnapshot cluster = new ClusterModelSnapshot();
-        Broker broker0 = createBroker(cluster, RACK, 0, true);
-        Broker broker1 = createBroker(cluster, RACK, 1, true);
+        Broker broker0 = createBroker(cluster, RACK, 0);
+        Broker broker1 = createBroker(cluster, RACK, 1);
 
         double load0 = 10;
         double load1 = 80;
@@ -286,7 +272,7 @@ private void testSingleResourceDistributionOptimizeMultiMoveIn(byte resource) {
         TopicPartitionReplica replica2 = createTopicPartition(cluster, 0, TOPIC_1, 1);
         replica1.setLoad(resource, 5);
         replica2.setLoad(resource, 5);
-        Assertions.assertEquals(load0, cluster.replicasFor(0).stream().mapToDouble(e -> e.load(resource)).sum());
+        Assertions.assertEquals(load0, cluster.replicasFor(0).stream().mapToDouble(e -> e.loadValue(resource)).sum());
 
         TopicPartitionReplica replica3 = createTopicPartition(cluster, 1, TOPIC_0, 0);
         TopicPartitionReplica replica4 = createTopicPartition(cluster, 1, TOPIC_0, 1);
@@ -302,9 +288,9 @@ private void testSingleResourceDistributionOptimizeMultiMoveIn(byte resource) {
         replica7.setLoad(resource, 5);
         replica8.setLoad(resource, 10);
         replica9.setLoad(resource, 15);
-        Assertions.assertEquals(load1, cluster.replicasFor(1).stream().mapToDouble(e -> e.load(resource)).sum());
+        Assertions.assertEquals(load1, cluster.replicasFor(1).stream().mapToDouble(e -> e.loadValue(resource)).sum());
 
-        List<Action> actions = goal.optimize(cluster, goalMap.values(), Collections.emptyList());
+        List<Action> actions = goal.optimize(cluster, getGoals(), Collections.emptyList());
         Assertions.assertNotEquals(0, actions.size());
         Assertions.assertNotNull(cluster);
         for (Broker broker : cluster.brokers()) {
@@ -313,26 +299,118 @@ private void testSingleResourceDistributionOptimizeMultiMoveIn(byte resource) {
     }
 
     @Test
-    public void testSingleResourceDistributionOptimizeOneMove() {
-        testSingleResourceDistributionOptimizeOneMove(NW_IN);
-        testSingleResourceDistributionOptimizeOneMove(NW_OUT);
-    }
-
-    @Test
-    public void testSingleResourceDistributionOptimizeMultiMoveOut() {
+    public void testSingleResourceDistributionOptimizeMultiMove() {
         testSingleResourceDistributionOptimizeMultiMoveOut(NW_IN);
         testSingleResourceDistributionOptimizeMultiMoveOut(NW_OUT);
         testSingleResourceDistributionOptimizeMultiMoveIn(NW_IN);
         testSingleResourceDistributionOptimizeMultiMoveIn(NW_OUT);
     }
 
+    private void testSkipUntrustedBroker(byte resource) {
+        ClusterModelSnapshot cluster = new ClusterModelSnapshot();
+        Broker broker0 = createBroker(cluster, RACK, 0);
+        Broker broker1 = createBroker(cluster, RACK, 1);
+
+        broker0.setLoad(resource, 0);
+        broker1.setLoad(resource, 80 * 1024 * 1024);
+
+        TopicPartitionReplica replica1 = createTopicPartition(cluster, 1, TOPIC_1, 0);
+        TopicPartitionReplica replica2 = createTopicPartition(cluster, 1, TOPIC_1, 1);
+        replica1.setLoad(resource, 35 * 1024 * 1024);
+        replica2.setLoad(resource, 45 * 1024 * 1024);
+
+        Goal goal = getGoalByResource(resource);
+        goal.initialize(Set.of(broker0, broker1));
+
+        List<Action> actions = goal.optimize(cluster, List.of(goal), Collections.emptyList());
+        Assertions.assertEquals(1, actions.size());
+        Assertions.assertEquals(new Action(ActionType.MOVE, new TopicPartition(TOPIC_1, 1), 1, 0), actions.get(0));
+
+        broker0.setLoad(resource, 0, false);
+        actions = goal.optimize(cluster, List.of(goal), Collections.emptyList());
+        Assertions.assertTrue(actions.isEmpty());
+
+        broker0.setLoad(resource, 0);
+        broker1.setLoad(resource, 80 * 1024 * 1024, false);
+        replica1.setLoad(resource, 40 * 1024 * 1024, false);
+        replica2.setLoad(resource, 40 * 1024 * 1024, false);
+        actions = goal.optimize(cluster, List.of(goal), Collections.emptyList());
+        Assertions.assertTrue(actions.isEmpty());
+    }
+
+    @Test
+    public void testSkipUntrustedBroker() {
+        testSkipUntrustedBroker(NW_IN);
+        testSkipUntrustedBroker(NW_OUT);
+    }
+
+    private ClusterModelSnapshot buildClusterModelSnapshot() {
+        ClusterModelSnapshot cluster = new ClusterModelSnapshot();
+        Broker broker0 = createBroker(cluster, RACK, 0);
+        Broker broker1 =
createBroker(cluster, RACK, 1); + + broker0.setLoad(NW_IN, 0); + broker0.setLoad(NW_OUT, 40 * 1024 * 1024); + broker1.setLoad(NW_IN, 80 * 1024 * 1024); + broker1.setLoad(NW_OUT, 40 * 1024 * 1024); + + TopicPartitionReplica replica0 = createTopicPartition(cluster, 0, TOPIC_1, 0); + replica0.setLoad(NW_IN, 0); + replica0.setLoad(NW_OUT, 40 * 1024 * 1024); + + TopicPartitionReplica replica1 = createTopicPartition(cluster, 1, TOPIC_1, 1); + TopicPartitionReplica replica2 = createTopicPartition(cluster, 1, TOPIC_1, 2); + replica1.setLoad(NW_IN, 45 * 1024 * 1024); + replica1.setLoad(NW_OUT, 20 * 1024 * 1024); + + replica2.setLoad(NW_IN, 35 * 1024 * 1024); + replica2.setLoad(NW_OUT, 20 * 1024 * 1024); + return cluster; + } + + @Test + public void testSkipUntrustedPartition() { + ClusterModelSnapshot cluster = buildClusterModelSnapshot(); + Broker broker0 = cluster.broker(0); + Broker broker1 = cluster.broker(1); + TopicPartitionReplica replica0 = cluster.replica(0, new TopicPartition(TOPIC_1, 0)); + TopicPartitionReplica replica1 = cluster.replica(1, new TopicPartition(TOPIC_1, 1)); + TopicPartitionReplica replica2 = cluster.replica(1, new TopicPartition(TOPIC_1, 2)); + + Goal nwInGoal = getGoalByResource(NW_IN); + Goal nwOutGoal = getGoalByResource(NW_OUT); + nwInGoal.initialize(Set.of(broker0, broker1)); + + List actions = nwInGoal.optimize(cluster, List.of(nwInGoal, nwOutGoal), Collections.emptyList()); + Assertions.assertEquals(1, actions.size()); + Assertions.assertEquals(new Action(ActionType.MOVE, new TopicPartition(TOPIC_1, 1), 1, 0), actions.get(0)); + + broker0.setLoad(NW_OUT, 40 * 1024 * 1024, false); + replica0.setLoad(NW_OUT, 40 * 1024 * 1024, false); + actions = nwInGoal.optimize(cluster, List.of(nwInGoal, nwOutGoal), Collections.emptyList()); + Assertions.assertTrue(actions.isEmpty()); + + // reset cluster + cluster = buildClusterModelSnapshot(); + replica1 = cluster.replica(1, new TopicPartition(TOPIC_1, 1)); + replica2 = cluster.replica(1, new TopicPartition(TOPIC_1, 2)); + replica1.setLoad(NW_OUT, 40 * 1024 * 1024); + replica2.setLoad(NW_OUT, 0, false); + actions = nwInGoal.optimize(cluster, List.of(nwInGoal, nwOutGoal), Collections.emptyList()); + Assertions.assertTrue(actions.isEmpty()); + + replica1.setLoad(NW_OUT, 40 * 1024 * 1024); + replica2.setLoad(NW_OUT, 0); + actions = nwInGoal.optimize(cluster, List.of(nwInGoal, nwOutGoal), Collections.emptyList()); + Assertions.assertEquals(1, actions.size()); + Assertions.assertEquals(new Action(ActionType.MOVE, new TopicPartition(TOPIC_1, 2), 1, 0), actions.get(0)); + } + private void testMultiGoalOptimizeWithOneToOneReplicaSwap(byte resource) { - AbstractGoal goal = getGoalByResource(resource); + AbstractGoal goal = (AbstractGoal) getGoalByResource(resource); Assertions.assertNotNull(goal); ClusterModelSnapshot cluster = new ClusterModelSnapshot(); - Broker broker0 = createBroker(cluster, RACK, 0, true); - Broker broker1 = createBroker(cluster, RACK, 1, true); + Broker broker0 = createBroker(cluster, RACK, 0); + Broker broker1 = createBroker(cluster, RACK, 1); broker0.setLoad(NW_IN, 90); broker0.setLoad(NW_OUT, 50); @@ -346,8 +424,8 @@ private void testMultiGoalOptimizeWithOneToOneReplicaSwap(byte resource) { replica0.setLoad(NW_OUT, 30); replica1.setLoad(NW_IN, 50); replica1.setLoad(NW_OUT, 20); - Assertions.assertEquals(90, cluster.replicasFor(0).stream().mapToDouble(e -> e.load(NW_IN)).sum()); - Assertions.assertEquals(50, cluster.replicasFor(0).stream().mapToDouble(e -> e.load(NW_OUT)).sum()); + Assertions.assertEquals(90, 
cluster.replicasFor(0).stream().mapToDouble(e -> e.loadValue(NW_IN)).sum()); + Assertions.assertEquals(50, cluster.replicasFor(0).stream().mapToDouble(e -> e.loadValue(NW_OUT)).sum()); TopicPartitionReplica replica2 = createTopicPartition(cluster, 1, TOPIC_0, 1); TopicPartitionReplica replica3 = createTopicPartition(cluster, 1, TOPIC_1, 1); @@ -355,10 +433,10 @@ private void testMultiGoalOptimizeWithOneToOneReplicaSwap(byte resource) { replica2.setLoad(NW_OUT, 50); replica3.setLoad(NW_IN, 15); replica3.setLoad(NW_OUT, 40); - Assertions.assertEquals(20, cluster.replicasFor(1).stream().mapToDouble(e -> e.load(NW_IN)).sum()); - Assertions.assertEquals(90, cluster.replicasFor(1).stream().mapToDouble(e -> e.load(NW_OUT)).sum()); + Assertions.assertEquals(20, cluster.replicasFor(1).stream().mapToDouble(e -> e.loadValue(NW_IN)).sum()); + Assertions.assertEquals(90, cluster.replicasFor(1).stream().mapToDouble(e -> e.loadValue(NW_OUT)).sum()); - List actions = goal.optimize(cluster, goalMap.values(), Collections.emptyList()); + List actions = goal.optimize(cluster, getGoals(), Collections.emptyList()); System.out.printf("Actions: %s%n", actions); Assertions.assertNotEquals(0, actions.size()); Assertions.assertNotNull(cluster); @@ -382,7 +460,7 @@ private void setupCluster(byte resource, ClusterModelSnapshot cluster, Broker br TopicPartitionReplica replica0 = createTopicPartition(cluster, 0, TOPIC_0, 0); replica0.setLoad(resource, 10); - Assertions.assertEquals(load0, cluster.replicasFor(0).stream().mapToDouble(e -> e.load(resource)).sum()); + Assertions.assertEquals(load0, cluster.replicasFor(0).stream().mapToDouble(e -> e.loadValue(resource)).sum()); TopicPartitionReplica replica1 = createTopicPartition(cluster, 1, TOPIC_0, 1); TopicPartitionReplica replica2 = createTopicPartition(cluster, 1, TOPIC_0, 2); @@ -390,30 +468,30 @@ private void setupCluster(byte resource, ClusterModelSnapshot cluster, Broker br replica1.setLoad(resource, 20); replica2.setLoad(resource, 40); replica3.setLoad(resource, 30); - Assertions.assertEquals(load1, cluster.replicasFor(1).stream().mapToDouble(e -> e.load(resource)).sum()); + Assertions.assertEquals(load1, cluster.replicasFor(1).stream().mapToDouble(e -> e.loadValue(resource)).sum()); } private void testNotIncreaseLoadForSlowBroker(byte resource) { - AbstractGoal goal = getGoalByResource(resource); + AbstractGoal goal = (AbstractGoal) getGoalByResource(resource); Assertions.assertNotNull(goal); // test with normal brokers ClusterModelSnapshot cluster = new ClusterModelSnapshot(); - Broker broker0 = createBroker(cluster, RACK, 0, true); - Broker broker1 = createBroker(cluster, RACK, 1, true); + Broker broker0 = createBroker(cluster, RACK, 0); + Broker broker1 = createBroker(cluster, RACK, 1); setupCluster(resource, cluster, broker0, broker1); - List actions = goal.optimize(cluster, goalMap.values(), Collections.emptyList()); + List actions = goal.optimize(cluster, getGoals(), Collections.emptyList()); Assertions.assertEquals(1, actions.size()); Assertions.assertEquals(new Action(ActionType.MOVE, new TopicPartition(TOPIC_0, 2), 1, 0), actions.get(0)); cluster.brokers().forEach(b -> Assertions.assertTrue(goal.isBrokerAcceptable(b))); // test with broker0 marked as slow broker cluster = new ClusterModelSnapshot(); - broker0 = createBroker(cluster, RACK, 0, true); + broker0 = createBroker(cluster, RACK, 0); broker0.setSlowBroker(true); - broker1 = createBroker(cluster, RACK, 1, true); + broker1 = createBroker(cluster, RACK, 1); setupCluster(resource, cluster, broker0, 
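Taken together, the goal-test changes above pin down the new trusted-load behavior: goals only derive actions from brokers and partitions whose load samples are marked trusted, and the test helpers grew a setLoad(resource, value, trusted) overload to control that flag. A minimal usage sketch of the pattern these tests rely on (a hypothetical extra test method reusing this class's RACK/TOPIC_1 constants and helpers; it is not part of the patch):

    @Test
    public void untrustedLoadSuppressesActions() {
        ClusterModelSnapshot cluster = new ClusterModelSnapshot();
        Broker broker0 = createBroker(cluster, RACK, 0);
        Broker broker1 = createBroker(cluster, RACK, 1);
        broker0.setLoad(NW_IN, 0);
        broker1.setLoad(NW_IN, 80 * 1024 * 1024);
        TopicPartitionReplica replica = createTopicPartition(cluster, 1, TOPIC_1, 0);
        replica.setLoad(NW_IN, 80 * 1024 * 1024);

        Goal goal = getGoalByResource(NW_IN);
        goal.initialize(Set.of(broker0, broker1));

        // Both samples trusted: the goal proposes moving load off broker1.
        Assertions.assertFalse(goal.optimize(cluster, List.of(goal), Collections.emptyList()).isEmpty());

        // Same load value, but the sample is now marked untrusted:
        // getBrokersToOptimize skips broker1 and no action is produced.
        broker1.setLoad(NW_IN, 80 * 1024 * 1024, false);
        Assertions.assertTrue(goal.optimize(cluster, List.of(goal), Collections.emptyList()).isEmpty());
    }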
diff --git a/core/src/test/java/kafka/autobalancer/model/ClusterModelTest.java b/core/src/test/java/kafka/autobalancer/model/ClusterModelTest.java
index 03a282e7f1..18a8f3dd7c 100644
--- a/core/src/test/java/kafka/autobalancer/model/ClusterModelTest.java
+++ b/core/src/test/java/kafka/autobalancer/model/ClusterModelTest.java
@@ -23,12 +23,14 @@
 import kafka.autobalancer.metricsreporter.metric.TopicPartitionMetrics;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
 import org.apache.kafka.common.metadata.PartitionChangeRecord;
 import org.apache.kafka.common.metadata.PartitionRecord;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord;
 import org.apache.kafka.common.metadata.RemoveTopicRecord;
 import org.apache.kafka.common.metadata.TopicRecord;
 import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
@@ -78,6 +80,41 @@ public void testUnregisterBroker() {
         Assertions.assertNull(clusterModel.brokerUpdater(2));
     }

+    @Test
+    public void testFencedBroker() {
+        RecordClusterModel clusterModel = new RecordClusterModel();
+        RegisterBrokerRecord record1 = new RegisterBrokerRecord()
+                .setBrokerId(1);
+        RegisterBrokerRecord record2 = new RegisterBrokerRecord()
+                .setBrokerId(2);
+        clusterModel.onBrokerRegister(record1);
+        clusterModel.onBrokerRegister(record2);
+
+        Assertions.assertEquals(1, ((BrokerUpdater.Broker) clusterModel.brokerUpdater(1).get()).getBrokerId());
+        Assertions.assertEquals(2, ((BrokerUpdater.Broker) clusterModel.brokerUpdater(2).get()).getBrokerId());
+
+        BrokerRegistrationChangeRecord fencedRecord = new BrokerRegistrationChangeRecord()
+                .setBrokerId(2)
+                .setFenced(BrokerRegistrationFencingChange.FENCE.value());
+        clusterModel.onBrokerRegistrationChanged(fencedRecord);
+
+        Assertions.assertTrue(clusterModel.brokerUpdater(1).isActive());
+        Assertions.assertFalse(clusterModel.brokerUpdater(2).isActive());
+
+        clusterModel.updateBrokerMetrics(1, Map.of(
+                RawMetricTypes.BROKER_APPEND_LATENCY_AVG_MS, 0.0,
+                RawMetricTypes.BROKER_MAX_PENDING_APPEND_LATENCY_MS, 0.0,
+                RawMetricTypes.BROKER_MAX_PENDING_FETCH_LATENCY_MS, 0.0), System.currentTimeMillis());
+        clusterModel.updateBrokerMetrics(2, Map.of(
+                RawMetricTypes.BROKER_APPEND_LATENCY_AVG_MS, 0.0,
+                RawMetricTypes.BROKER_MAX_PENDING_APPEND_LATENCY_MS, 0.0,
+                RawMetricTypes.BROKER_MAX_PENDING_FETCH_LATENCY_MS, 0.0), System.currentTimeMillis());
+
+        ClusterModelSnapshot snapshot = clusterModel.snapshot();
+        Assertions.assertNotNull(snapshot.broker(1));
+        Assertions.assertNull(snapshot.broker(2));
+    }
+
     @Test
     public void testCreateTopic() {
         RecordClusterModel clusterModel = new RecordClusterModel();
@@ -286,8 +323,8 @@ public void testExcludeTopics() {
         Assertions.assertEquals(1, replicas.size());
         Assertions.assertEquals(topicName, replicas.iterator().next().getTopicPartition().topic());
         Assertions.assertEquals(partition, replicas.iterator().next().getTopicPartition().partition());
-        Assertions.assertEquals(70, snapshot.broker(brokerId).load(Resource.NW_IN));
-        Assertions.assertEquals(60, snapshot.broker(brokerId).load(Resource.NW_OUT));
+        Assertions.assertEquals(70, snapshot.broker(brokerId).loadValue(Resource.NW_IN));
+        Assertions.assertEquals(60, snapshot.broker(brokerId).loadValue(Resource.NW_OUT));
     }

     @Test
@@ -409,6 +446,79 @@ public void testSlowBroker() {
         clusterModel.unregisterBroker(0);
     }

+    @Test
+    public void testTrustedMetrics() {
+        RecordClusterModel clusterModel = new RecordClusterModel();
+        String topicName = "testTopic";
+        Uuid topicId = Uuid.randomUuid();
+        int partition = 0;
+        int partition1 = 1;
+        int brokerId = 1;
+
+        RegisterBrokerRecord registerBrokerRecord = new RegisterBrokerRecord()
+                .setBrokerId(brokerId);
+        clusterModel.onBrokerRegister(registerBrokerRecord);
+        TopicRecord topicRecord = new TopicRecord()
+                .setName(topicName)
+                .setTopicId(topicId);
+        clusterModel.onTopicCreate(topicRecord);
+        PartitionRecord partitionRecord = new PartitionRecord()
+                .setLeader(brokerId)
+                .setTopicId(topicId)
+                .setPartitionId(partition);
+        clusterModel.onPartitionCreate(partitionRecord);
+        PartitionRecord partitionRecord1 = new PartitionRecord()
+                .setLeader(brokerId)
+                .setTopicId(topicId)
+                .setPartitionId(partition1);
+        clusterModel.onPartitionCreate(partitionRecord1);
+
+        long now = System.currentTimeMillis();
+
+        Assertions.assertTrue(clusterModel.updateBrokerMetrics(brokerId, Map.of(
+                RawMetricTypes.BROKER_APPEND_LATENCY_AVG_MS, 0.0,
+                RawMetricTypes.BROKER_MAX_PENDING_APPEND_LATENCY_MS, 0.0,
+                RawMetricTypes.BROKER_MAX_PENDING_FETCH_LATENCY_MS, 0.0), now));
+
+        TopicPartitionMetrics topicPartitionMetrics = new TopicPartitionMetrics(now, brokerId, "", topicName, partition);
+        topicPartitionMetrics.put(RawMetricTypes.PARTITION_BYTES_IN, 10);
+        topicPartitionMetrics.put(RawMetricTypes.PARTITION_BYTES_OUT, 15);
+        topicPartitionMetrics.put(RawMetricTypes.PARTITION_SIZE, 10);
+        Assertions.assertTrue(clusterModel.updateTopicPartitionMetrics(topicPartitionMetrics.brokerId(),
+                new TopicPartition(topicName, partition), topicPartitionMetrics.getMetricValueMap(), topicPartitionMetrics.time()));
+
+        topicPartitionMetrics = new TopicPartitionMetrics(now, brokerId, "", topicName, partition1);
+        topicPartitionMetrics.put(RawMetricTypes.PARTITION_BYTES_IN, 20);
+        topicPartitionMetrics.put(RawMetricTypes.PARTITION_BYTES_OUT, 25);
+        topicPartitionMetrics.put(RawMetricTypes.PARTITION_SIZE, 10);
+        Assertions.assertTrue(clusterModel.updateTopicPartitionMetrics(topicPartitionMetrics.brokerId(),
+                new TopicPartition(topicName, partition1), topicPartitionMetrics.getMetricValueMap(), topicPartitionMetrics.time()));
+
+        ClusterModelSnapshot snapshot = clusterModel.snapshot();
+        AbstractInstanceUpdater.Load brokerLoadIn = snapshot.broker(brokerId).load(Resource.NW_IN);
+        Assertions.assertTrue(brokerLoadIn.isTrusted());
+        Assertions.assertEquals(30, brokerLoadIn.getValue());
+
+        AbstractInstanceUpdater.Load brokerLoadOut = snapshot.broker(brokerId).load(Resource.NW_OUT);
+        Assertions.assertTrue(brokerLoadOut.isTrusted());
+        Assertions.assertEquals(40, brokerLoadOut.getValue());
+
+        Assertions.assertTrue(snapshot.broker(brokerId).load(Resource.NW_OUT).isTrusted());
+        AbstractInstanceUpdater.Load loadIn0 = snapshot.replica(brokerId, new TopicPartition(topicName, partition)).load(Resource.NW_IN);
+        Assertions.assertTrue(loadIn0.isTrusted());
+        Assertions.assertEquals(10, loadIn0.getValue());
+        AbstractInstanceUpdater.Load loadOut0 = snapshot.replica(brokerId, new TopicPartition(topicName, partition)).load(Resource.NW_OUT);
+        Assertions.assertTrue(loadOut0.isTrusted());
+        Assertions.assertEquals(15, loadOut0.getValue());
+
+        AbstractInstanceUpdater.Load loadIn1 = snapshot.replica(brokerId, new TopicPartition(topicName, partition1)).load(Resource.NW_IN);
+        Assertions.assertTrue(loadIn1.isTrusted());
+        Assertions.assertEquals(20, loadIn1.getValue());
+        AbstractInstanceUpdater.Load loadOut1 = snapshot.replica(brokerId, new TopicPartition(topicName, partition1)).load(Resource.NW_OUT);
+        Assertions.assertTrue(loadOut1.isTrusted());
+        Assertions.assertEquals(25, loadOut1.getValue());
+    }
+
     private BrokerMetrics createBrokerMetrics(int brokerId, double appendLatency, double pendingAppendLatency,
                                               double pendingFetchLatency) {
         long now = System.currentTimeMillis();
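In testTrustedMetrics, the broker-level expectations follow directly from the partition samples: the snapshot sums replica loads into the broker's load. The arithmetic behind the 30/40 assertions (plain Java as a stand-in for the actual ClusterModel aggregation, which the diff does not show):

    // Partition samples fed in above: (in=10, out=15) and (in=20, out=25).
    double[][] partitionLoads = {{10, 15}, {20, 25}};
    double nwIn = 0, nwOut = 0;
    for (double[] load : partitionLoads) {
        nwIn += load[0];   // 10 + 20 = 30
        nwOut += load[1];  // 15 + 25 = 40
    }

testFencedBroker pins the complementary liveness rule: a fenced broker keeps its updater in the model but is dropped from snapshots, so goals never see it.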
diff --git a/core/src/test/java/kafka/autobalancer/model/samples/SimpleTimeWindowSamplesTest.java b/core/src/test/java/kafka/autobalancer/model/samples/SimpleTimeWindowSamplesTest.java
new file mode 100644
index 0000000000..d548a0c266
--- /dev/null
+++ b/core/src/test/java/kafka/autobalancer/model/samples/SimpleTimeWindowSamplesTest.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2024, AutoMQ CO.,LTD.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file BSL.md
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+
+package kafka.autobalancer.model.samples;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class SimpleTimeWindowSamplesTest {
+    @Test
+    public void testWindow() {
+        AbstractTimeWindowSamples.Window window = new AbstractTimeWindowSamples.Window(5);
+        for (int i = 0; i < 1000; i++) {
+            if (i < 5) {
+                Assertions.assertTrue(window.append(i));
+            } else {
+                Assertions.assertFalse(window.append(i));
+            }
+        }
+        Assertions.assertEquals(4, window.latest());
+        Assertions.assertEquals(10, window.sum());
+    }
+
+    @Test
+    public void testAppend() {
+        AbstractTimeWindowSamples timeWindowValueSequence = new SimpleTimeWindowSamples(5, 10, 10);
+        Assertions.assertFalse(timeWindowValueSequence.isTrusted());
+
+        for (int i = 0; i < 20; i++) {
+            timeWindowValueSequence.append(0);
+        }
+        Assertions.assertEquals(2, timeWindowValueSequence.size());
+        Assertions.assertFalse(timeWindowValueSequence.isTrusted());
+
+        for (int i = 0; i < 10000; i++) {
+            timeWindowValueSequence.append(0);
+        }
+        Assertions.assertEquals(10, timeWindowValueSequence.size());
+        Assertions.assertTrue(timeWindowValueSequence.isTrusted());
+        for (int i = 0; i < 50; i++) {
+            timeWindowValueSequence.append(i);
+        }
+        Assertions.assertEquals(10, timeWindowValueSequence.size());
+        Assertions.assertTrue(timeWindowValueSequence.isTrusted());
+        Assertions.assertEquals(49, timeWindowValueSequence.getLatest());
+    }
+}
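This new test fixes the windowed-sampling contract behind isTrusted(): a Window of capacity 5 accepts exactly five appends and rejects the rest, latest() returns the last accepted value and sum() the total of accepted ones, and the sequence as a whole only reports trusted after enough complete windows accumulate (the three constructor arguments 5, 10, 10 plausibly control window sizing and the number of windows retained; the diff does not spell them out). A standalone sketch reproducing just the single-window behavior asserted in testWindow:

    // Fixed-capacity window sketch: append() accepts values until full,
    // then rejects them, matching SimpleTimeWindowSamplesTest#testWindow.
    final class FixedWindow {
        private final long[] values;
        private int count;

        FixedWindow(int capacity) {
            this.values = new long[capacity];
        }

        boolean append(long value) {
            if (count >= values.length) {
                return false; // window full: the value is rejected
            }
            values[count++] = value;
            return true;
        }

        long latest() {
            return count == 0 ? 0 : values[count - 1]; // last accepted value (4)
        }

        long sum() {
            long s = 0;
            for (int i = 0; i < count; i++) {
                s += values[i];
            }
            return s; // for accepted appends 0..4 this is 10, as asserted
        }
    }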
diff --git a/core/src/test/java/kafka/autobalancer/model/MetricValueSequenceTest.java b/core/src/test/java/kafka/autobalancer/model/samples/SnapshotSamplesTest.java
similarity index 89%
rename from core/src/test/java/kafka/autobalancer/model/MetricValueSequenceTest.java
rename to core/src/test/java/kafka/autobalancer/model/samples/SnapshotSamplesTest.java
index 989216e205..72b72da218 100644
--- a/core/src/test/java/kafka/autobalancer/model/MetricValueSequenceTest.java
+++ b/core/src/test/java/kafka/autobalancer/model/samples/SnapshotSamplesTest.java
@@ -9,15 +9,16 @@
  * by the Apache License, Version 2.0
  */

-package kafka.autobalancer.model;
+package kafka.autobalancer.model.samples;

+import kafka.autobalancer.model.Snapshot;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;

-public class MetricValueSequenceTest {
+public class SnapshotSamplesTest {
     @Test
     public void testAppendAndSnapshot() {
-        MetricValueSequence sequence = new MetricValueSequence(512);
+        SnapshotSamples sequence = new SnapshotSamples(512);
         for (int i = 0; i < 1000; i++) {
             sequence.append(i);
         }
diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
index b4f1889612..e1eb0835e0 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
@@ -102,6 +102,7 @@ public S3Stream(long streamId, long epoch, long startOffset, long nextOffset, St
         this.networkOutboundLimiter = networkOutboundLimiter;
         S3StreamMetricsManager.registerPendingStreamAppendLatencySupplier(streamId, () -> getHeadLatency(this.pendingAppendTimestamps));
         S3StreamMetricsManager.registerPendingStreamFetchLatencySupplier(streamId, () -> getHeadLatency(this.pendingFetchTimestamps));
+        NetworkStats.getInstance().createStreamReadBytesStats(streamId);
     }

     private long getHeadLatency(Deque<Long> timestamps) {
@@ -215,10 +216,10 @@ public CompletableFuture<FetchResult> fetch(FetchContext context,
             final long finalSize = totalSize;
             if (context.readOptions().fastRead()) {
                 networkOutboundLimiter.forceConsume(totalSize);
-                NetworkStats.getInstance().fastReadBytesStats(streamId).inc(finalSize);
+                NetworkStats.getInstance().fastReadBytesStats(streamId).ifPresent(counter -> counter.inc(finalSize));
             } else {
                 return networkOutboundLimiter.consume(ThrottleStrategy.CATCH_UP, totalSize).thenApply(nil -> {
-                    NetworkStats.getInstance().slowReadBytesStats(streamId).inc(finalSize);
+                    NetworkStats.getInstance().slowReadBytesStats(streamId).ifPresent(counter -> counter.inc(finalSize));
                     return rs;
                 });
             }
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/NetworkStats.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/NetworkStats.java
index 229e0ea65b..f4b77348c4 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/NetworkStats.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/NetworkStats.java
@@ -22,6 +22,7 @@
 import org.apache.commons.lang3.tuple.Pair;

 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;

 public class NetworkStats {
@@ -53,16 +54,18 @@ public CounterMetric networkUsageTotalStats(AsyncNetworkBandwidthLimiter.Type ty
                 : networkOutboundUsageTotalStats.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkOutboundUsageMetric(strategy));
     }

-    public Counter fastReadBytesStats(long streamId) {
-        return getStreamReadBytesStats(streamId).getLeft();
+    public Optional<Counter> fastReadBytesStats(long streamId) {
+        Pair<Counter, Counter> pair = streamReadBytesStats.getOrDefault(streamId, null);
+        return pair == null ? Optional.empty() : Optional.of(pair.getLeft());
     }

-    public Counter slowReadBytesStats(long streamId) {
-        return getStreamReadBytesStats(streamId).getRight();
+    public Optional<Counter> slowReadBytesStats(long streamId) {
+        Pair<Counter, Counter> pair = streamReadBytesStats.getOrDefault(streamId, null);
+        return pair == null ? Optional.empty() : Optional.of(pair.getRight());
     }

-    private Pair<Counter, Counter> getStreamReadBytesStats(long streamId) {
-        return streamReadBytesStats.computeIfAbsent(streamId, k -> new ImmutablePair<>(new Counter(), new Counter()));
+    public void createStreamReadBytesStats(long streamId) {
+        streamReadBytesStats.putIfAbsent(streamId, new ImmutablePair<>(new Counter(), new Counter()));
     }

     public void removeStreamReadBytesStats(long streamId) {
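Net effect of the S3Stream and NetworkStats changes: per-stream read counters are now created eagerly in the S3Stream constructor, the accessors return Optional, and the old computeIfAbsent-on-read path is gone, so recording against an unregistered (or already removed) stream becomes a silent no-op instead of resurrecting the entry. The resulting caller pattern, pieced together from the calls visible in this diff (streamId and bytesRead are placeholders):

    // When the stream is constructed: register its fast/slow read counters.
    NetworkStats.getInstance().createStreamReadBytesStats(streamId);

    // On each read: increment only if the stream is still registered.
    NetworkStats.getInstance().fastReadBytesStats(streamId)
            .ifPresent(counter -> counter.inc(bytesRead));

    // On stream close: drop the entry so late callbacks cannot re-create it.
    NetworkStats.getInstance().removeStreamReadBytesStats(streamId);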