Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -1006,6 +1006,7 @@ project(':core') {
testImplementation(libs.jfreechart) {
exclude group: 'junit', module: 'junit'
}
testImplementation(libs.commonMath3)

generator project(':generator')
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.util.concurrent.TimeUnit;

public class AnomalyDetector {
public static final int UNLIMITED_ACTIONS_PER_DETECT = -1;
private final Logger logger;
private final List<Goal> goalsByPriority;
private final ClusterModel clusterModel;
Expand Down Expand Up @@ -103,7 +102,7 @@ public void resume() {
this.running = true;
}

private void detect() {
void detect() {
if (!this.running) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class AnomalyDetectorBuilder {
private LogContext logContext = null;
private ClusterModel clusterModel = null;
private ActionExecutorService executor = null;
private int maxActionsNumPerDetect = AnomalyDetector.UNLIMITED_ACTIONS_PER_DETECT;
private int maxActionsNumPerDetect = Integer.MAX_VALUE;
private long detectIntervalMs = 60000;
private long maxTolerateMetricsDelayMs = 30000;
private long coolDownIntervalPerActionMs = 100;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.autobalancer.detector;

import com.automq.stream.s3.metrics.TimerUtil;
import kafka.autobalancer.common.Action;
import kafka.autobalancer.common.RawMetricType;
import kafka.autobalancer.config.AutoBalancerControllerConfig;
import kafka.autobalancer.executor.ActionExecutorService;
import kafka.autobalancer.goals.Goal;
import kafka.autobalancer.goals.NetworkInDistributionGoal;
import kafka.autobalancer.goals.NetworkOutDistributionGoal;
import kafka.autobalancer.model.ClusterModel;
import org.apache.commons.math3.distribution.PoissonDistribution;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * Timing smoke test for {@link AnomalyDetector}.
 *
 * <p>Builds an in-memory {@link ClusterModel} with a skewed partition placement and random
 * partition traffic, then checks that one detection pass over that model completes within a
 * fixed time budget. Inputs are generated from unseeded random sources, so the exact cluster
 * layout differs between runs.
 */
public class AnomalyDetectorTest {

    /**
     * Populates a 20-broker / 5000-topic / 100000-partition cluster model and asserts that a
     * single {@code detect()} call with the network-in and network-out distribution goals
     * finishes within one second.
     */
    @Test
    public void testSchedulingTimeCost() {
        ClusterModel clusterModel = new ClusterModel();

        int brokerNum = 20;
        for (int i = 0; i < brokerNum; i++) {
            clusterModel.registerBroker(i, "");
        }
        int topicNum = 5000;
        int totalPartitionNum = 100000;
        int partitionNumPerTopic = totalPartitionNum / topicNum;
        Random r = new Random();
        // Per-broker partition quota; the quotas always add up to totalPartitionNum.
        int[] partitionNums = generatePartitionDist(totalPartitionNum, brokerNum);
        Assertions.assertEquals(totalPartitionNum, Arrays.stream(partitionNums).sum());
        int currPartitionNum = 0;
        int brokerIndex = 0;
        for (int i = 0; i < topicNum; i++) {
            Uuid topicId = Uuid.randomUuid();
            String topicName = "topic-" + i;
            clusterModel.createTopic(topicId, topicName);
            for (int j = 0; j < partitionNumPerTopic; j++) {
                // Move on to the next broker that still has quota left BEFORE placing the
                // partition. A broker may legitimately have a quota of 0 (Poisson sample of 0);
                // checking only after placement would hand such a broker one partition anyway
                // and eventually push brokerIndex past the last registered broker.
                while (brokerIndex < brokerNum - 1 && currPartitionNum >= partitionNums[brokerIndex]) {
                    brokerIndex++;
                    currPartitionNum = 0;
                }
                clusterModel.createPartition(topicId, j, brokerIndex);
                Map<RawMetricType, Double> metrics = generateRandomMetrics(r);
                clusterModel.updateBrokerMetrics(brokerIndex, metrics, System.currentTimeMillis());
                clusterModel.updateTopicPartitionMetrics(brokerIndex, new TopicPartition(topicName, j), metrics, System.currentTimeMillis());
                currPartitionNum++;
            }
        }

        Map<String, ?> configs = new AutoBalancerControllerConfig(Collections.emptyMap(), false).originals();
        Goal goal0 = new NetworkInDistributionGoal();
        Goal goal1 = new NetworkOutDistributionGoal();
        goal0.configure(configs);
        goal1.configure(configs);

        AnomalyDetector detector = new AnomalyDetectorBuilder()
                .clusterModel(clusterModel)
                .addGoal(goal0)
                .addGoal(goal1)
                // No-op executor: only the cost of detection itself is measured, so generated
                // actions are intentionally dropped.
                .executor(new ActionExecutorService() {
                    @Override
                    public void start() {

                    }

                    @Override
                    public void shutdown() {

                    }

                    @Override
                    public void execute(Action action) {

                    }

                    @Override
                    public void execute(List<Action> actions) {

                    }
                })
                .build();

        Assertions.assertTimeout(Duration.ofMillis(1000), () -> {
            TimerUtil timerUtil = new TimerUtil();
            // detect() returns immediately unless the detector has been resumed.
            detector.resume();
            detector.detect();
            System.out.printf("Detect cost: %d ms%n", timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
        });
    }

    /**
     * Splits {@code totalPartitionNum} partitions across {@code brokerNum} brokers in proportion
     * to samples drawn from a Poisson distribution with mean 10.
     *
     * <p>Each broker's share is truncated to an integer; whatever is lost to truncation is added
     * to broker 0 so that the returned counts sum to exactly {@code totalPartitionNum}. An entry
     * may be 0 when the corresponding sample is 0.
     *
     * @param totalPartitionNum total number of partitions to distribute
     * @param brokerNum         number of brokers to distribute over
     * @return per-broker partition counts, indexed by broker id
     */
    private int[] generatePartitionDist(int totalPartitionNum, int brokerNum) {
        int[] partitionNums = new int[brokerNum];
        PoissonDistribution poissonDistribution = new PoissonDistribution(10);
        int[] samples = poissonDistribution.sample(brokerNum);
        int sum = Arrays.stream(samples).sum();
        for (int i = 0; i < brokerNum; i++) {
            partitionNums[i] = (int) (samples[i] * 1.0 / sum * totalPartitionNum);
        }
        // Give the truncation remainder to the first broker to keep the total exact.
        int partitionSum = Arrays.stream(partitionNums).sum();
        partitionNums[0] += totalPartitionNum - partitionSum;
        return partitionNums;
    }

    /**
     * Builds one metrics map that is used for both the broker-level and the partition-level
     * update: fixed 20 MiB values for the broker capacity and all-topic byte metrics, zero CPU
     * utilisation and partition size, and random partition in/out bytes in {@code [0, 1 MiB)}.
     *
     * @param r source of randomness for the partition traffic values
     * @return a new mutable map of metric type to value
     */
    private Map<RawMetricType, Double> generateRandomMetrics(Random r) {
        Map<RawMetricType, Double> metrics = new HashMap<>();
        metrics.put(RawMetricType.BROKER_CAPACITY_NW_IN, 20.0 * 1024 * 1024);
        metrics.put(RawMetricType.BROKER_CAPACITY_NW_OUT, 20.0 * 1024 * 1024);
        metrics.put(RawMetricType.ALL_TOPIC_BYTES_IN, 20.0 * 1024 * 1024);
        metrics.put(RawMetricType.ALL_TOPIC_BYTES_OUT, 20.0 * 1024 * 1024);
        metrics.put(RawMetricType.BROKER_CPU_UTIL, 0.0);
        metrics.put(RawMetricType.TOPIC_PARTITION_BYTES_OUT, r.nextDouble(0, 1024 * 1024));
        metrics.put(RawMetricType.TOPIC_PARTITION_BYTES_IN, r.nextDouble(0, 1024 * 1024));
        metrics.put(RawMetricType.PARTITION_SIZE, 0.0);
        return metrics;
    }
}
2 changes: 2 additions & 0 deletions gradle/dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ versions += [
zstd: "1.5.2-1",
commonLang: "3.12.0",
commonio: "2.15.1",
commonMath3: "3.6.1",
s3stream: "0.16.0-SNAPSHOT",
opentelemetry: "1.32.0",
opentelemetryAlpha: "1.32.0-alpha",
Expand Down Expand Up @@ -227,6 +228,7 @@ libs += [
httpclient: "org.apache.httpcomponents:httpclient:$versions.httpclient",
commonLang: "org.apache.commons:commons-lang3:$versions.commonLang",
commonio : "commons-io:commons-io:$versions.commonio",
commonMath3: "org.apache.commons:commons-math3:$versions.commonMath3",
nettyHttp2: "io.netty:netty-codec-http2:$versions.netty",
s3stream: "com.automq.elasticstream:s3stream:$versions.s3stream",
opentelemetryJava8: "io.opentelemetry.instrumentation:opentelemetry-runtime-telemetry-java8:$versions.opentelemetryAlpha",
Expand Down