diff --git a/core/src/main/java/kafka/autobalancer/common/normalizer/LinearNormalizer.java b/core/src/main/java/kafka/autobalancer/common/normalizer/LinearNormalizer.java deleted file mode 100644 index 2da619dd2d..0000000000 --- a/core/src/main/java/kafka/autobalancer/common/normalizer/LinearNormalizer.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright 2024, AutoMQ CO.,LTD. - * - * Use of this software is governed by the Business Source License - * included in the file BSL.md - * - * As of the Change Date specified in that file, in accordance with - * the Business Source License, use of this software will be governed - * by the Apache License, Version 2.0 - */ - -package kafka.autobalancer.common.normalizer; - -/** - * Linear normalizer that normalize the value to [0, 1] - */ -public class LinearNormalizer implements Normalizer { - public final double min; - public final double max; - - public LinearNormalizer(double min, double max) { - this.min = min; - this.max = max; - } - - @Override - public double normalize(double value) { - return (value - min) / (max - min); - } -} diff --git a/core/src/main/java/kafka/autobalancer/common/normalizer/Normalizer.java b/core/src/main/java/kafka/autobalancer/common/normalizer/Normalizer.java deleted file mode 100644 index d285be3d3b..0000000000 --- a/core/src/main/java/kafka/autobalancer/common/normalizer/Normalizer.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright 2024, AutoMQ CO.,LTD. - * - * Use of this software is governed by the Business Source License - * included in the file BSL.md - * - * As of the Change Date specified in that file, in accordance with - * the Business Source License, use of this software will be governed - * by the Apache License, Version 2.0 - */ - -package kafka.autobalancer.common.normalizer; - -public interface Normalizer { - - /** - * Normalize the value to [0, 1] - * - * @param value the value to normalize - * @return the normalized value - */ - double normalize(double value); - default double normalize(double value, boolean reverse) { - double normalizedValue = normalize(value); - return reverse ? (1 - normalizedValue) : normalizedValue; - } -} diff --git a/core/src/main/java/kafka/autobalancer/common/normalizer/StepNormalizer.java b/core/src/main/java/kafka/autobalancer/common/normalizer/StepNormalizer.java deleted file mode 100644 index 2ac3178bd1..0000000000 --- a/core/src/main/java/kafka/autobalancer/common/normalizer/StepNormalizer.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright 2024, AutoMQ CO.,LTD. - * - * Use of this software is governed by the Business Source License - * included in the file BSL.md - * - * As of the Change Date specified in that file, in accordance with - * the Business Source License, use of this software will be governed - * by the Apache License, Version 2.0 - */ - -package kafka.autobalancer.common.normalizer; - -/** - * Step normalizer that normalize the value to [0, 1], when value is less than stepVar, it will be normalized with - * LinearNormalizer, otherwise it will be normalized with a logarithmic function which approaches 1 while the value - * approaches infinity. - */ -public class StepNormalizer implements Normalizer { - private final double stepValue; - private final double stepVar; - private final Normalizer linearNormalizer; - - public StepNormalizer(double min, double stepVar, double stepValue) { - if (stepValue < 0 || stepValue > 1) { - throw new IllegalArgumentException("Step value must be in [0, 1]"); - } - this.stepVar = stepVar; - this.stepValue = stepValue; - this.linearNormalizer = new LinearNormalizer(min, this.stepVar); - } - - @Override - public double normalize(double value) { - if (value <= stepVar) { - return stepValue * linearNormalizer.normalize(value); - } - return stepValue + delta(value); - } - - private double delta(double value) { - return (1 - this.stepValue) * (1 - 1 / (Math.log(value) / Math.log(stepVar))); - } -} diff --git a/core/src/main/java/kafka/autobalancer/goals/AbstractGoal.java b/core/src/main/java/kafka/autobalancer/goals/AbstractGoal.java index 5a70c73c42..a242e1d20d 100644 --- a/core/src/main/java/kafka/autobalancer/goals/AbstractGoal.java +++ b/core/src/main/java/kafka/autobalancer/goals/AbstractGoal.java @@ -17,7 +17,6 @@ import kafka.autobalancer.common.AutoBalancerConstants; import kafka.autobalancer.model.BrokerUpdater; import kafka.autobalancer.model.ClusterModelSnapshot; -import kafka.autobalancer.model.ModelUtils; import kafka.autobalancer.model.TopicPartitionReplicaUpdater; import org.slf4j.Logger; @@ -117,9 +116,9 @@ protected double calculateAcceptanceScore(BrokerUpdater.Broker srcBrokerBefore, } if (!isSrcBrokerAcceptedBefore && !isSrcBrokerAcceptedAfter) { - return score <= POSITIVE_ACTION_SCORE_THRESHOLD ? 0.0 : score; + return score < POSITIVE_ACTION_SCORE_THRESHOLD ? 0.0 : score; } else if (!isDestBrokerAcceptedBefore && !isDestBrokerAcceptedAfter) { - return score <= POSITIVE_ACTION_SCORE_THRESHOLD ? 0.0 : score; + return score < POSITIVE_ACTION_SCORE_THRESHOLD ? 0.0 : score; } return score; } @@ -162,24 +161,16 @@ public double actionAcceptanceScore(Action action, ClusterModelSnapshot cluster) BrokerUpdater.Broker destBrokerBefore = cluster.broker(action.getDestBrokerId()); BrokerUpdater.Broker srcBrokerAfter = srcBrokerBefore.copy(); BrokerUpdater.Broker destBrokerAfter = destBrokerBefore.copy(); - TopicPartitionReplicaUpdater.TopicPartitionReplica srcReplica = cluster.replica(action.getSrcBrokerId(), action.getSrcTopicPartition()); - switch (action.getType()) { - case MOVE: - ModelUtils.moveReplicaLoad(srcBrokerAfter, destBrokerAfter, srcReplica); - break; - case SWAP: - ModelUtils.moveReplicaLoad(srcBrokerAfter, destBrokerAfter, srcReplica); - ModelUtils.moveReplicaLoad(destBrokerAfter, srcBrokerAfter, - cluster.replica(action.getDestBrokerId(), action.getDestTopicPartition())); - break; - default: - return 0.0; + if (!moveReplica(action, cluster, srcBrokerAfter, destBrokerAfter)) { + return 0.0; } return calculateAcceptanceScore(srcBrokerBefore, destBrokerBefore, srcBrokerAfter, destBrokerAfter); } + protected abstract boolean moveReplica(Action action, ClusterModelSnapshot cluster, BrokerUpdater.Broker src, BrokerUpdater.Broker dest); + @Override public Set getEligibleBrokers(ClusterModelSnapshot cluster) { return cluster.brokers().stream().filter(BrokerUpdater.Broker::isActive).collect(Collectors.toSet()); diff --git a/core/src/main/java/kafka/autobalancer/goals/AbstractResourceGoal.java b/core/src/main/java/kafka/autobalancer/goals/AbstractResourceGoal.java index da52482cf0..fa5f52b765 100644 --- a/core/src/main/java/kafka/autobalancer/goals/AbstractResourceGoal.java +++ b/core/src/main/java/kafka/autobalancer/goals/AbstractResourceGoal.java @@ -18,6 +18,7 @@ import kafka.autobalancer.common.Resource; import kafka.autobalancer.model.BrokerUpdater; import kafka.autobalancer.model.ClusterModelSnapshot; +import kafka.autobalancer.model.ModelUtils; import kafka.autobalancer.model.TopicPartitionReplicaUpdater; import org.slf4j.Logger; @@ -55,6 +56,24 @@ protected Optional trySwapPartitionOut(ClusterModelSnapshot cluster, return getAcceptableAction(candidateActionScores); } + @Override + protected boolean moveReplica(Action action, ClusterModelSnapshot cluster, BrokerUpdater.Broker src, BrokerUpdater.Broker dest) { + TopicPartitionReplicaUpdater.TopicPartitionReplica srcReplica = cluster.replica(action.getSrcBrokerId(), action.getSrcTopicPartition()); + switch (action.getType()) { + case MOVE: + ModelUtils.moveReplicaLoad(src, dest, srcReplica); + break; + case SWAP: + TopicPartitionReplicaUpdater.TopicPartitionReplica destReplica = cluster.replica(action.getDestBrokerId(), action.getDestTopicPartition()); + ModelUtils.moveReplicaLoad(src, dest, srcReplica); + ModelUtils.moveReplicaLoad(dest, src, destReplica); + break; + default: + return false; + } + return true; + } + /** * Try to reduce resource load by move or swap replicas out. * diff --git a/core/src/test/java/kafka/autobalancer/common/normalizer/NormalizerTest.java b/core/src/test/java/kafka/autobalancer/common/normalizer/NormalizerTest.java deleted file mode 100644 index cc6d9ab431..0000000000 --- a/core/src/test/java/kafka/autobalancer/common/normalizer/NormalizerTest.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.autobalancer.common.normalizer; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -public class NormalizerTest { - private static final double EPSILON = 0.0001; - - @Test - public void testLinearNormalize() { - Normalizer normalizer = new LinearNormalizer(0, 100); - Assertions.assertEquals(-0.1, normalizer.normalize(-10), EPSILON); - Assertions.assertEquals(0.0, normalizer.normalize(0), EPSILON); - Assertions.assertEquals(0.4, normalizer.normalize(40), EPSILON); - Assertions.assertEquals(1.0, normalizer.normalize(100), EPSILON); - Assertions.assertEquals(1.2, normalizer.normalize(120), EPSILON); - - Assertions.assertEquals(1.1, normalizer.normalize(-10, true), EPSILON); - Assertions.assertEquals(1.0, normalizer.normalize(0, true), EPSILON); - Assertions.assertEquals(0.6, normalizer.normalize(40, true), EPSILON); - Assertions.assertEquals(0.0, normalizer.normalize(100, true), EPSILON); - Assertions.assertEquals(-0.2, normalizer.normalize(120, true), EPSILON); - } - - @Test - public void testStepNormalizer() { - Normalizer normalizer = new StepNormalizer(0, 100, 0.9); - Assertions.assertEquals(-0.09, normalizer.normalize(-10), EPSILON); - Assertions.assertEquals(0, normalizer.normalize(0), EPSILON); - Assertions.assertEquals(0.36, normalizer.normalize(40), EPSILON); - Assertions.assertEquals(0.9, normalizer.normalize(100), EPSILON); - double v1 = normalizer.normalize(120); - double v2 = normalizer.normalize(Double.MAX_VALUE); - Assertions.assertTrue(v1 > 0.9 && v1 < 1.0); - Assertions.assertTrue(v2 > 0.9 && v2 < 1.0); - Assertions.assertTrue(v1 < v2); - } -}