Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions config/kraft/broker.properties
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,3 @@ s3.wal.path=/tmp/kraft-broker-logs/s3wal
############################# Settings for Auto Balancer #############################
# The metric reporter to collect and report metrics for Auto Balancer
metric.reporters=kafka.autobalancer.metricsreporter.AutoBalancerMetricsReporter

# The network inbound bandwidth in bytes/s, default 100MB/s. Used in NetworkInCapacityGoal and calculation of inbound bandwidth utilization
# autobalancer.reporter.network.in.capacity=104857600

# The network outbound bandwidth in bytes/s, default 100MB/s. Used in NetworkOutCapacityGoal and calculation of outbound bandwidth utilization
# autobalancer.reporter.network.out.capacity=104857600
6 changes: 0 additions & 6 deletions config/kraft/server.properties
Original file line number Diff line number Diff line change
Expand Up @@ -189,12 +189,6 @@ s3.wal.path=/tmp/kraft-combined-logs/s3wal
# The metric reporter to collect and report metrics for Auto Balancer
metric.reporters=kafka.autobalancer.metricsreporter.AutoBalancerMetricsReporter

# The network inbound bandwidth in bytes/s, default 100MB/s. Used in NetworkInCapacityGoal and calculation of inbound bandwidth utilization
# autobalancer.reporter.network.in.capacity=104857600

# The network outbound bandwidth in bytes/s, default 100MB/s. Used in NetworkOutCapacityGoal and calculation of outbound bandwidth utilization
# autobalancer.reporter.network.out.capacity=104857600

############################# Settings of Controller for Auto Balancer #############################
# Whether to enabled Auto Balancer in controller, default false
# autobalancer.controller.enable=false
Expand Down
24 changes: 17 additions & 7 deletions core/src/main/java/kafka/autobalancer/AutoBalancerManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
import java.util.List;
import java.util.Set;

public class AutoBalancerManager {
public class AutoBalancerManager implements AutoBalancerService {
private final Logger logger;
private final LoadRetriever loadRetriever;
private final AnomalyDetector anomalyDetector;
Expand All @@ -65,7 +65,7 @@ public AutoBalancerManager(Time time, KafkaConfig kafkaConfig, QuorumController
LogContext logContext = new LogContext(String.format("[AutoBalancerManager id=%d] ", quorumController.nodeId()));
logger = logContext.logger(AutoBalancerConstants.AUTO_BALANCER_LOGGER_CLAZZ);
AutoBalancerControllerConfig config = new AutoBalancerControllerConfig(kafkaConfig.props(), false);
RecordClusterModel clusterModel = new RecordClusterModel(new LogContext(String.format("[ClusterModel id=%d] ", quorumController.nodeId())));
RecordClusterModel clusterModel = buildClusterModel(quorumController.nodeId());
this.loadRetriever = new LoadRetriever(config, quorumController, clusterModel,
new LogContext(String.format("[LoadRetriever id=%d] ", quorumController.nodeId())));
ControllerActionExecutorService actionExecutorService = new ControllerActionExecutorService(config, quorumController,
Expand All @@ -77,7 +77,6 @@ public AutoBalancerManager(Time time, KafkaConfig kafkaConfig, QuorumController
.detectIntervalMs(config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ANOMALY_DETECT_INTERVAL_MS))
.maxTolerateMetricsDelayMs(config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ACCEPTED_METRICS_DELAY_MS))
.coolDownIntervalPerActionMs(config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_INTERVAL_MS))
.aggregateBrokerLoad(config.getBoolean(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_LOAD_AGGREGATION))
.clusterModel(clusterModel)
.executor(actionExecutorService)
.addGoals(config.getConfiguredInstances(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_GOALS, Goal.class))
Expand All @@ -95,19 +94,30 @@ public AutoBalancerManager(Time time, KafkaConfig kafkaConfig, QuorumController
raftClient.register(new AutoBalancerListener(registry, this.loadRetriever, this.anomalyDetector));
}

@Override
public void start() {
loadRetriever.start();
anomalyDetector.start();
logger.info("Started");
}

public void shutdown() throws InterruptedException {
anomalyDetector.shutdown();
loadRetriever.shutdown();
queue.close();
@Override
public void shutdown() {
try {
anomalyDetector.shutdown();
loadRetriever.shutdown();
queue.close();
} catch (Exception e) {
logger.error("Exception in shutdown", e);
}

logger.info("Shutdown completed");
}

public RecordClusterModel buildClusterModel(int nodeId) {
return new RecordClusterModel(new LogContext(String.format("[ClusterModel id=%d] ", nodeId)));
}

private Set<Integer> parseExcludedBrokers(AutoBalancerControllerConfig config) {
Set<Integer> excludedBrokers = new HashSet<>();
for (String brokerId : config.getList(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXCLUDE_BROKER_IDS)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
* limitations under the License.
*/

package kafka.autobalancer.goals;
package kafka.autobalancer;

public class GoalConstants {
public static final int NETWORK_DISTRIBUTION_GOAL_PRIORITY = 8;
public static final int NETWORK_CAPACITY_GOAL_PRIORITY = 10;
public interface AutoBalancerService {
void start();
void shutdown();
}
4 changes: 2 additions & 2 deletions core/src/main/java/kafka/autobalancer/LoadRetriever.java
Original file line number Diff line number Diff line change
Expand Up @@ -414,13 +414,13 @@ private void updateClusterModel(AutoBalancerMetrics metrics) {
switch (metrics.metricClassId()) {
case BROKER_METRIC:
BrokerMetrics brokerMetrics = (BrokerMetrics) metrics;
clusterModel.updateBrokerMetrics(brokerMetrics.brokerId(), brokerMetrics.getMetricTypeValueMap(), brokerMetrics.time());
clusterModel.updateBrokerMetrics(brokerMetrics.brokerId(), brokerMetrics.getMetricValueMap(), brokerMetrics.time());
break;
case PARTITION_METRIC:
TopicPartitionMetrics partitionMetrics = (TopicPartitionMetrics) metrics;
clusterModel.updateTopicPartitionMetrics(partitionMetrics.brokerId(),
new TopicPartition(partitionMetrics.topic(), partitionMetrics.partition()),
partitionMetrics.getMetricTypeValueMap(), partitionMetrics.time());
partitionMetrics.getMetricValueMap(), partitionMetrics.time());
break;
default:
logger.error("Not supported metrics version {}", metrics.metricClassId());
Expand Down
15 changes: 15 additions & 0 deletions core/src/main/java/kafka/autobalancer/common/Action.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package kafka.autobalancer.common;

import java.util.Objects;
import org.apache.kafka.common.TopicPartition;

public class Action {
Expand Down Expand Up @@ -78,4 +79,18 @@ public String prettyString() {
}
return toString();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Action action = (Action) o;
return srcBrokerId == action.srcBrokerId && destBrokerId == action.destBrokerId
&& Objects.equals(srcTp, action.srcTp) && Objects.equals(destTp, action.destTp) && type == action.type;
}

@Override
public int hashCode() {
return Objects.hash(srcTp, destTp, srcBrokerId, destBrokerId, type);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@

package kafka.autobalancer.common;

import com.automq.stream.utils.LogContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class AutoBalancerThreadFactory implements ThreadFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(AutoBalancerThreadFactory.class);
private static final Logger LOGGER = new LogContext().logger(AutoBalancerConstants.AUTO_BALANCER_LOGGER_CLAZZ);
private final String name;
private final boolean daemon;
private final AtomicInteger id = new AtomicInteger(0);
Expand Down
16 changes: 0 additions & 16 deletions core/src/main/java/kafka/autobalancer/common/RawMetricType.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
Expand All @@ -41,9 +40,6 @@ public enum RawMetricType {
TOPIC_PARTITION_BYTES_OUT(MetricScope.PARTITION, (byte) 5, (byte) 0),
PARTITION_SIZE(MetricScope.PARTITION, (byte) 6, (byte) 0),
BROKER_CPU_UTIL(MetricScope.BROKER, (byte) 7, (byte) 0);

private static final List<RawMetricType> CACHED_VALUES = List.of(RawMetricType.values());
private static final SortedMap<Byte, Set<RawMetricType>> BROKER_METRIC_TYPES_DIFF_BY_VERSION = buildBrokerMetricTypesDiffByVersion();
private static final List<RawMetricType> BROKER_METRIC_TYPES = Collections.unmodifiableList(buildMetricTypeList(MetricScope.BROKER));
private static final List<RawMetricType> PARTITION_METRIC_TYPES = Collections.unmodifiableList(buildMetricTypeList(MetricScope.PARTITION));
private final byte id;
Expand All @@ -60,18 +56,6 @@ public enum RawMetricType {
this.supportedVersionSince = supportedVersionSince;
}

public static List<RawMetricType> allMetricTypes() {
return Collections.unmodifiableList(CACHED_VALUES);
}

public static Map<Byte, Set<RawMetricType>> brokerMetricTypesDiffByVersion() {
return BROKER_METRIC_TYPES_DIFF_BY_VERSION;
}

public static Set<RawMetricType> brokerMetricTypesDiffForVersion(byte version) {
return BROKER_METRIC_TYPES_DIFF_BY_VERSION.get(version);
}

public static List<RawMetricType> partitionMetricTypes() {
return PARTITION_METRIC_TYPES;
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/kafka/autobalancer/common/Resource.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public enum Resource {
NW_OUT("NWOut", 2, 10),
UNKNOWN("UNKNOWN", 999, 0);

public static final Double IGNORED_CAPACITY_VALUE = -1.0;
public static final Double IGNORED_VALUE = -1.0;
// EPSILON_PERCENT defines the acceptable nuance when comparing the utilization of the resource.
// This nuance is generated due to precision loss when summing up float type utilization value.
// In stress test we find that for cluster of around 800,000 replicas, the summed up nuance can be
Expand Down Expand Up @@ -58,7 +58,7 @@ public static Resource of(int id) {

public String resourceString(double value) {
String valueStr = "";
if (value == IGNORED_CAPACITY_VALUE) {
if (value == IGNORED_VALUE) {
valueStr = "ignored";
} else {
switch (this) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.autobalancer.common.normalizer;

/**
* Linear normalizer that normalize the value to [0, 1]
*/
public class LinearNormalizer implements Normalizer {
public final double min;
public final double max;

public LinearNormalizer(double min, double max) {
this.min = min;
this.max = max;
}

@Override
public double normalize(double value) {
return (value - min) / (max - min);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.autobalancer.common.normalizer;

public interface Normalizer {

/**
* Normalize the value to [0, 1]
*
* @param value the value to normalize
* @return the normalized value
*/
double normalize(double value);
default double normalize(double value, boolean reverse) {
double normalizedValue = normalize(value);
return reverse ? (1 - normalizedValue) : normalizedValue;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.autobalancer.common.normalizer;

/**
* Step normalizer that normalize the value to [0, 1], when value is less than stepVar, it will be normalized with
* LinearNormalizer, otherwise it will be normalized with a logarithmic function which approaches 1 while the value
* approaches infinity.
*/
public class StepNormalizer implements Normalizer {
private final double stepValue;
private final double stepVar;
private final Normalizer linearNormalizer;

public StepNormalizer(double min, double stepVar, double stepValue) {
if (stepValue < 0 || stepValue > 1) {
throw new IllegalArgumentException("Step value must be in [0, 1]");
}
this.stepVar = stepVar;
this.stepValue = stepValue;
this.linearNormalizer = new LinearNormalizer(min, this.stepVar);
}

@Override
public double normalize(double value) {
if (value <= stepVar) {
return stepValue * linearNormalizer.normalize(value);
}
return stepValue + delta(value);
}

private double delta(double value) {
return (1 - this.stepValue) * (1 - 1 / (Math.log(value) / Math.log(stepVar)));
}
}
Loading