From 3cc93114ad9207d363450bd532efb91559459ff1 Mon Sep 17 00:00:00 2001 From: Shichao Nie Date: Tue, 16 Jan 2024 15:31:52 +0800 Subject: [PATCH] feat(core): add unit test for AnomalyDetector Signed-off-by: Shichao Nie --- build.gradle | 1 + .../detector/AnomalyDetector.java | 3 +- .../detector/AnomalyDetectorBuilder.java | 2 +- .../detector/AnomalyDetectorTest.java | 145 ++++++++++++++++++ gradle/dependencies.gradle | 2 + 5 files changed, 150 insertions(+), 3 deletions(-) create mode 100644 core/src/test/java/kafka/autobalancer/detector/AnomalyDetectorTest.java diff --git a/build.gradle b/build.gradle index d1c16185a5..f8d5531588 100644 --- a/build.gradle +++ b/build.gradle @@ -1006,6 +1006,7 @@ project(':core') { testImplementation(libs.jfreechart) { exclude group: 'junit', module: 'junit' } + testImplementation(libs.commonMath3) generator project(':generator') } diff --git a/core/src/main/java/kafka/autobalancer/detector/AnomalyDetector.java b/core/src/main/java/kafka/autobalancer/detector/AnomalyDetector.java index ce9531ce33..616172f180 100644 --- a/core/src/main/java/kafka/autobalancer/detector/AnomalyDetector.java +++ b/core/src/main/java/kafka/autobalancer/detector/AnomalyDetector.java @@ -37,7 +37,6 @@ import java.util.concurrent.TimeUnit; public class AnomalyDetector { - public static final int UNLIMITED_ACTIONS_PER_DETECT = -1; private final Logger logger; private final List goalsByPriority; private final ClusterModel clusterModel; @@ -103,7 +102,7 @@ public void resume() { this.running = true; } - private void detect() { + void detect() { if (!this.running) { return; } diff --git a/core/src/main/java/kafka/autobalancer/detector/AnomalyDetectorBuilder.java b/core/src/main/java/kafka/autobalancer/detector/AnomalyDetectorBuilder.java index 2ff6d254de..c4f5b399bf 100644 --- a/core/src/main/java/kafka/autobalancer/detector/AnomalyDetectorBuilder.java +++ b/core/src/main/java/kafka/autobalancer/detector/AnomalyDetectorBuilder.java @@ -35,7 +35,7 @@ public class AnomalyDetectorBuilder { private LogContext logContext = null; private ClusterModel clusterModel = null; private ActionExecutorService executor = null; - private int maxActionsNumPerDetect = AnomalyDetector.UNLIMITED_ACTIONS_PER_DETECT; + private int maxActionsNumPerDetect = Integer.MAX_VALUE; private long detectIntervalMs = 60000; private long maxTolerateMetricsDelayMs = 30000; private long coolDownIntervalPerActionMs = 100; diff --git a/core/src/test/java/kafka/autobalancer/detector/AnomalyDetectorTest.java b/core/src/test/java/kafka/autobalancer/detector/AnomalyDetectorTest.java new file mode 100644 index 0000000000..7a3f11e98a --- /dev/null +++ b/core/src/test/java/kafka/autobalancer/detector/AnomalyDetectorTest.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.autobalancer.detector; + +import com.automq.stream.s3.metrics.TimerUtil; +import kafka.autobalancer.common.Action; +import kafka.autobalancer.common.RawMetricType; +import kafka.autobalancer.config.AutoBalancerControllerConfig; +import kafka.autobalancer.executor.ActionExecutorService; +import kafka.autobalancer.goals.Goal; +import kafka.autobalancer.goals.NetworkInDistributionGoal; +import kafka.autobalancer.goals.NetworkOutDistributionGoal; +import kafka.autobalancer.model.ClusterModel; +import org.apache.commons.math3.distribution.PoissonDistribution; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +public class AnomalyDetectorTest { + + @Test + public void testSchedulingTimeCost() { + ClusterModel clusterModel = new ClusterModel(); + + int brokerNum = 20; + for (int i = 0; i < brokerNum; i++) { + clusterModel.registerBroker(i, ""); + } + int topicNum = 5000; + int totalPartitionNum = 100000; + int partitionNumPerTopic = totalPartitionNum / topicNum; + Random r = new Random(); + int[] partitionNums = generatePartitionDist(totalPartitionNum, brokerNum); + Assertions.assertEquals(totalPartitionNum, Arrays.stream(partitionNums).sum()); + int currPartitionNum = 0; + int brokerIndex = 0; + for (int i = 0; i < topicNum; i++) { + Uuid topicId = Uuid.randomUuid(); + String topicName = "topic-" + i; + clusterModel.createTopic(topicId, topicName); + for (int j = 0; j < partitionNumPerTopic; j++) { + clusterModel.createPartition(topicId, j, brokerIndex); + Map metrics = generateRandomMetrics(r); + clusterModel.updateBrokerMetrics(brokerIndex, metrics, System.currentTimeMillis()); + clusterModel.updateTopicPartitionMetrics(brokerIndex, new TopicPartition(topicName, j), metrics, System.currentTimeMillis()); + currPartitionNum++; + if (currPartitionNum >= partitionNums[brokerIndex]) { + brokerIndex++; + currPartitionNum = 0; + } + } + } + + Map configs = new AutoBalancerControllerConfig(Collections.emptyMap(), false).originals(); + Goal goal0 = new NetworkInDistributionGoal(); + Goal goal1 = new NetworkOutDistributionGoal(); + goal0.configure(configs); + goal1.configure(configs); + + AnomalyDetector detector = new AnomalyDetectorBuilder() + .clusterModel(clusterModel) + .addGoal(goal0) + .addGoal(goal1) + .executor(new ActionExecutorService() { + @Override + public void start() { + + } + + @Override + public void shutdown() { + + } + + @Override + public void execute(Action action) { + + } + + @Override + public void execute(List actions) { + + } + }) + .build(); + + Assertions.assertTimeout(Duration.ofMillis(1000), () -> { + TimerUtil timerUtil = new TimerUtil(); + detector.resume(); + detector.detect(); + System.out.printf("Detect cost: %d ms%n", timerUtil.elapsedAs(TimeUnit.MILLISECONDS)); + }); + } + + private int[] generatePartitionDist(int totalPartitionNum, int brokerNum) { + int[] partitionNums = new int[brokerNum]; + PoissonDistribution poissonDistribution = new PoissonDistribution(10); + int[] samples = poissonDistribution.sample(brokerNum); + int sum = Arrays.stream(samples).sum(); + for (int i = 0; i < brokerNum; i++) { + partitionNums[i] = (int) (samples[i] * 1.0 / sum * totalPartitionNum); + } + int partitionSum = Arrays.stream(partitionNums).sum(); + partitionNums[0] += totalPartitionNum - partitionSum; + return partitionNums; + } + + private Map generateRandomMetrics(Random r) { + Map metrics = new HashMap<>(); + metrics.put(RawMetricType.BROKER_CAPACITY_NW_IN, 20.0 * 1024 * 1024); + metrics.put(RawMetricType.BROKER_CAPACITY_NW_OUT, 20.0 * 1024 * 1024); + metrics.put(RawMetricType.ALL_TOPIC_BYTES_IN, 20.0 * 1024 * 1024); + metrics.put(RawMetricType.ALL_TOPIC_BYTES_OUT, 20.0 * 1024 * 1024); + metrics.put(RawMetricType.BROKER_CPU_UTIL, 0.0); + metrics.put(RawMetricType.TOPIC_PARTITION_BYTES_OUT, r.nextDouble(0, 1024 * 1024)); + metrics.put(RawMetricType.TOPIC_PARTITION_BYTES_IN, r.nextDouble(0, 1024 * 1024)); + metrics.put(RawMetricType.PARTITION_SIZE, 0.0); + return metrics; + } +} diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index b920908e80..8bd9358f2f 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -129,6 +129,7 @@ versions += [ zstd: "1.5.2-1", commonLang: "3.12.0", commonio: "2.15.1", + commonMath3: "3.6.1", s3stream: "0.16.0-SNAPSHOT", opentelemetry: "1.32.0", opentelemetryAlpha: "1.32.0-alpha", @@ -227,6 +228,7 @@ libs += [ httpclient: "org.apache.httpcomponents:httpclient:$versions.httpclient", commonLang: "org.apache.commons:commons-lang3:$versions.commonLang", commonio : "commons-io:commons-io:$versions.commonio", + commonMath3: "org.apache.commons:commons-math3:$versions.commonMath3", nettyHttp2: "io.netty:netty-codec-http2:$versions.netty", s3stream: "com.automq.elasticstream:s3stream:$versions.s3stream", opentelemetryJava8: "io.opentelemetry.instrumentation:opentelemetry-runtime-telemetry-java8:$versions.opentelemetryAlpha",