Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions config/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ log4j.appender.s3StreamThreadPoolAppender.File=${kafka.logs.dir}/s3stream-thread
log4j.appender.s3StreamThreadPoolAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.s3StreamThreadPoolAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

# Dedicated appender for the auto-balancer component: hourly-rolling file
# (DatePattern '.'yyyy-MM-dd-HH) written to ${kafka.logs.dir}/auto-balancer.log,
# using the same [%d] %p %m (%c)%n pattern as the other Kafka appenders.
log4j.appender.autoBalancerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.autoBalancerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.autoBalancerAppender.File=${kafka.logs.dir}/auto-balancer.log
log4j.appender.autoBalancerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.autoBalancerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

# Change the line below to adjust ZK client logging
log4j.logger.org.apache.zookeeper=INFO

Expand Down Expand Up @@ -124,3 +130,6 @@ log4j.additivity.state.change.logger=false
log4j.logger.kafka.authorizer.logger=INFO, authorizerAppender
log4j.additivity.kafka.authorizer.logger=false

# Route everything logged under the "kafka.autobalancer" logger name to the
# dedicated autoBalancerAppender; additivity=false keeps these messages out of
# the root/server logs.
log4j.logger.kafka.autobalancer=INFO, autoBalancerAppender
log4j.additivity.kafka.autobalancer=false

51 changes: 42 additions & 9 deletions core/src/main/java/kafka/autobalancer/AutoBalancerManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@

package kafka.autobalancer;

import com.automq.stream.utils.LogContext;
import kafka.autobalancer.common.AutoBalancerConstants;
import kafka.autobalancer.detector.AnomalyDetector;
import kafka.autobalancer.config.AutoBalancerControllerConfig;
import kafka.autobalancer.detector.AnomalyDetectorBuilder;
import kafka.autobalancer.goals.Goal;
import kafka.autobalancer.listeners.BrokerStatusListener;
import kafka.autobalancer.listeners.ClusterStatusListenerRegistry;
import kafka.autobalancer.listeners.TopicPartitionStatusListener;
import kafka.autobalancer.model.ClusterModel;
import kafka.autobalancer.executor.ControllerActionExecutorService;
import kafka.autobalancer.model.RecordClusterModel;
import kafka.server.KafkaConfig;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.MetadataRecordType;
Expand All @@ -32,7 +38,6 @@
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.controller.QuorumController;
import org.apache.kafka.queue.KafkaEventQueue;
Expand All @@ -45,7 +50,9 @@
import org.apache.kafka.snapshot.SnapshotReader;
import org.slf4j.Logger;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class AutoBalancerManager {
private final Logger logger;
Expand All @@ -56,21 +63,34 @@ public class AutoBalancerManager {

/**
 * Wires up the auto-balancer control loop on the controller node: builds the
 * cluster model, load retriever, action executor and anomaly detector, then
 * registers the metadata listeners with the Raft client.
 *
 * NOTE(review): this span is a unified-diff capture — lines from BOTH the old
 * and the new revision appear interleaved below. Lines tagged "pre-refactor"
 * were removed by the change and are superseded by the line(s) that follow
 * them; the span is NOT valid Java as it stands and needs reconciling against
 * the merged file.
 */
public AutoBalancerManager(Time time, KafkaConfig kafkaConfig, QuorumController quorumController, KafkaRaftClient<ApiMessageAndVersion> raftClient) {
LogContext logContext = new LogContext(String.format("[AutoBalancerManager id=%d] ", quorumController.nodeId()));
// pre-refactor: logged under the class name
logger = logContext.logger(AutoBalancerManager.class);
// post-refactor: logs under the shared "kafka.autobalancer" logger name so log4j routes it to auto-balancer.log
logger = logContext.logger(AutoBalancerConstants.AUTO_BALANCER_LOGGER_CLAZZ);
AutoBalancerControllerConfig config = new AutoBalancerControllerConfig(kafkaConfig.props(), false);
// pre-refactor: config-driven ClusterModel
ClusterModel clusterModel = new ClusterModel(config, new LogContext(String.format("[ClusterModel id=%d] ", quorumController.nodeId())));
// post-refactor: record-based model updated from metadata records
RecordClusterModel clusterModel = new RecordClusterModel(new LogContext(String.format("[ClusterModel id=%d] ", quorumController.nodeId())));
this.loadRetriever = new LoadRetriever(config, quorumController, clusterModel,
new LogContext(String.format("[LoadRetriever id=%d] ", quorumController.nodeId())));
// pre-refactor: ExecutionManager, replaced by ControllerActionExecutorService on the next line
ExecutionManager executionManager = new ExecutionManager(config, quorumController,
ControllerActionExecutorService actionExecutorService = new ControllerActionExecutorService(config, quorumController,
new LogContext(String.format("[ExecutionManager id=%d] ", quorumController.nodeId())));
// pre-refactor: detector constructed directly from config + execution manager
this.anomalyDetector = new AnomalyDetector(config, clusterModel, executionManager,
new LogContext(String.format("[AnomalyDetector id=%d] ", quorumController.nodeId())));
this.queue = new KafkaEventQueue(time, new LogContext(), "auto-balancer-");

// post-refactor: detector assembled via builder with explicit tuning knobs
// (step count, detect interval, accepted metrics delay, per-action cool-down,
// load aggregation, goals, and broker/topic exclusions).
this.anomalyDetector = new AnomalyDetectorBuilder()
.logContext(new LogContext(String.format("[AnomalyDetector id=%d] ", quorumController.nodeId())))
.maxActionsNumPerExecution(config.getInt(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_STEPS))
.detectIntervalMs(config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ANOMALY_DETECT_INTERVAL_MS))
.maxTolerateMetricsDelayMs(config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ACCEPTED_METRICS_DELAY_MS))
.coolDownIntervalPerActionMs(config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_INTERVAL_MS))
.aggregateBrokerLoad(config.getBoolean(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_LOAD_AGGREGATION))
.clusterModel(clusterModel)
.executor(actionExecutorService)
.addGoals(config.getConfiguredInstances(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_GOALS, Goal.class))
.excludedBrokers(parseExcludedBrokers(config))
.excludedTopics(config.getList(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXCLUDE_TOPICS))
.build();

// Fully qualified because the plain LogContext import now resolves to
// com.automq.stream.utils.LogContext (see the import swap in this diff).
this.queue = new KafkaEventQueue(time, new org.apache.kafka.common.utils.LogContext(), "auto-balancer-");
this.quorumController = quorumController;
ClusterStatusListenerRegistry registry = new ClusterStatusListenerRegistry();
registry.register((BrokerStatusListener) clusterModel);
registry.register((TopicPartitionStatusListener) clusterModel);
// pre-refactor line, superseded by the actionExecutorService registration below
registry.register(executionManager);
registry.register(actionExecutorService);
registry.register(this.loadRetriever);
raftClient.register(new AutoBalancerListener(registry, this.loadRetriever, this.anomalyDetector));
}
Expand All @@ -88,6 +108,19 @@ public void shutdown() throws InterruptedException {
logger.info("Shutdown completed");
}

/**
 * Parses the comma-separated broker id list configured under
 * {@code AUTO_BALANCER_CONTROLLER_EXCLUDE_BROKER_IDS} into a set of integer ids.
 * Entries that are not valid integers are skipped with a warning instead of
 * failing, so a single bad entry does not disable the balancer.
 *
 * @param config the auto-balancer controller configuration to read from
 * @return the set of broker ids excluded from balancing; possibly empty, never null
 */
private Set<Integer> parseExcludedBrokers(AutoBalancerControllerConfig config) {
    Set<Integer> excludedBrokers = new HashSet<>();
    for (String brokerId : config.getList(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXCLUDE_BROKER_IDS)) {
        try {
            // trim() tolerates stray whitespace around ids (e.g. "1, 2, 3")
            excludedBrokers.add(Integer.parseInt(brokerId.trim()));
        } catch (NumberFormatException e) {
            // Narrowed from Exception: parseInt can only throw NumberFormatException.
            // Exception passed as last arg so the stack trace is logged too.
            logger.warn("Failed to parse broker id {} from config", brokerId, e);
        }
    }
    return excludedBrokers;
}

class AutoBalancerListener implements RaftClient.Listener<ApiMessageAndVersion> {
private final ClusterStatusListenerRegistry registry;
private final LoadRetriever loadRetriever;
Expand Down
16 changes: 12 additions & 4 deletions core/src/main/java/kafka/autobalancer/LoadRetriever.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package kafka.autobalancer;

import com.automq.stream.utils.LogContext;
import kafka.autobalancer.common.AutoBalancerConstants;
import kafka.autobalancer.common.AutoBalancerThreadFactory;
import kafka.autobalancer.config.AutoBalancerConfig;
import kafka.autobalancer.config.AutoBalancerControllerConfig;
Expand All @@ -34,14 +36,14 @@
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.controller.Controller;
import org.apache.kafka.controller.ControllerRequestContext;
import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
Expand Down Expand Up @@ -97,7 +99,7 @@ public LoadRetriever(AutoBalancerControllerConfig config, Controller controller,
if (logContext == null) {
logContext = new LogContext("[LoadRetriever] ");
}
this.logger = logContext.logger(LoadRetriever.class);
this.logger = logContext.logger(AutoBalancerConstants.AUTO_BALANCER_LOGGER_CLAZZ);
this.controller = controller;
this.clusterModel = clusterModel;
this.bootstrapServerMap = new HashMap<>();
Expand Down Expand Up @@ -379,6 +381,8 @@ public void retrieve() {
updateClusterModel(record.value());
}
logger.debug("Finished consuming {} metrics from {}.", records.count(), metricReporterTopic);
} catch (InvalidTopicException e) {
checkAndCreateTopic();
} catch (Exception e) {
logger.error("Consumer poll error: {}", e.getMessage());
}
Expand Down Expand Up @@ -409,10 +413,14 @@ public void onLeaderChanged(boolean isLeader) {
private void updateClusterModel(AutoBalancerMetrics metrics) {
switch (metrics.metricClassId()) {
case BROKER_METRIC:
clusterModel.updateBroker((BrokerMetrics) metrics);
BrokerMetrics brokerMetrics = (BrokerMetrics) metrics;
clusterModel.updateBrokerMetrics(brokerMetrics.brokerId(), brokerMetrics.getMetricTypeValueMap(), brokerMetrics.time());
break;
case PARTITION_METRIC:
clusterModel.updateTopicPartition((TopicPartitionMetrics) metrics);
TopicPartitionMetrics partitionMetrics = (TopicPartitionMetrics) metrics;
clusterModel.updateTopicPartitionMetrics(partitionMetrics.brokerId(),
new TopicPartition(partitionMetrics.topic(), partitionMetrics.partition()),
partitionMetrics.getMetricTypeValueMap(), partitionMetrics.time());
break;
default:
logger.error("Not supported metrics version {}", metrics.metricClassId());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.autobalancer.common;

/**
 * Shared constants for the auto-balancer component.
 *
 * <p>Declared {@code final} with a private constructor because it is a pure
 * constants holder and must not be instantiated or subclassed.
 */
public final class AutoBalancerConstants {
    /**
     * Logger name shared by the auto-balancer classes; matches the
     * {@code log4j.logger.kafka.autobalancer} entry in config/log4j.properties,
     * which routes these messages to the dedicated auto-balancer.log appender.
     */
    public static final String AUTO_BALANCER_LOGGER_CLAZZ = "kafka.autobalancer";

    private AutoBalancerConstants() {
        // Utility class: no instances.
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
* Some portion of this file Copyright 2017 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
*/

package kafka.autobalancer.metricsreporter.metric;
package kafka.autobalancer.common;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -29,31 +29,23 @@
import java.util.SortedMap;
import java.util.TreeMap;

import static kafka.autobalancer.metricsreporter.metric.RawMetricType.MetricScope.BROKER;
import static kafka.autobalancer.metricsreporter.metric.RawMetricType.MetricScope.PARTITION;

/**
* This class was modified based on Cruise Control: com.linkedin.kafka.cruisecontrol.metricsreporter.metric.RawMetricType.
*/
/*
* The metric type helps the metric sampler to distinguish what metric a value is representing. These metrics are
* called raw metrics because they are the most basic information reported by the Kafka brokers without any processing.
* Each metric type has an id for serde purpose.
*/
public enum RawMetricType {
BROKER_CAPACITY_NW_IN(BROKER, (byte) 0, (byte) 0),
BROKER_CAPACITY_NW_OUT(BROKER, (byte) 1, (byte) 0),
ALL_TOPIC_BYTES_IN(BROKER, (byte) 2, (byte) 0),
ALL_TOPIC_BYTES_OUT(BROKER, (byte) 3, (byte) 0),
TOPIC_PARTITION_BYTES_IN(PARTITION, (byte) 4, (byte) 0),
TOPIC_PARTITION_BYTES_OUT(PARTITION, (byte) 5, (byte) 0),
PARTITION_SIZE(PARTITION, (byte) 6, (byte) 0),
BROKER_CPU_UTIL(BROKER, (byte) 7, (byte) 0);
BROKER_CAPACITY_NW_IN(MetricScope.BROKER, (byte) 0, (byte) 0),
BROKER_CAPACITY_NW_OUT(MetricScope.BROKER, (byte) 1, (byte) 0),
ALL_TOPIC_BYTES_IN(MetricScope.BROKER, (byte) 2, (byte) 0),
ALL_TOPIC_BYTES_OUT(MetricScope.BROKER, (byte) 3, (byte) 0),
TOPIC_PARTITION_BYTES_IN(MetricScope.PARTITION, (byte) 4, (byte) 0),
TOPIC_PARTITION_BYTES_OUT(MetricScope.PARTITION, (byte) 5, (byte) 0),
PARTITION_SIZE(MetricScope.PARTITION, (byte) 6, (byte) 0),
BROKER_CPU_UTIL(MetricScope.BROKER, (byte) 7, (byte) 0);

private static final List<RawMetricType> CACHED_VALUES = List.of(RawMetricType.values());
private static final SortedMap<Byte, Set<RawMetricType>> BROKER_METRIC_TYPES_DIFF_BY_VERSION = buildBrokerMetricTypesDiffByVersion();
private static final List<RawMetricType> BROKER_METRIC_TYPES = Collections.unmodifiableList(buildMetricTypeList(BROKER));
private static final List<RawMetricType> PARTITION_METRIC_TYPES = Collections.unmodifiableList(buildMetricTypeList(PARTITION));
private static final List<RawMetricType> BROKER_METRIC_TYPES = Collections.unmodifiableList(buildMetricTypeList(MetricScope.BROKER));
private static final List<RawMetricType> PARTITION_METRIC_TYPES = Collections.unmodifiableList(buildMetricTypeList(MetricScope.PARTITION));
private final byte id;
private final MetricScope metricScope;
private final byte supportedVersionSince;
Expand Down Expand Up @@ -103,7 +95,7 @@ public static RawMetricType forId(byte id) {
private static SortedMap<Byte, Set<RawMetricType>> buildBrokerMetricTypesDiffByVersion() {
SortedMap<Byte, Set<RawMetricType>> buildBrokerMetricTypesDiffByVersion = new TreeMap<>();
for (RawMetricType type : RawMetricType.values()) {
if (type.metricScope() == BROKER) {
if (type.metricScope() == MetricScope.BROKER) {
buildBrokerMetricTypesDiffByVersion.computeIfAbsent(type.supportedVersionSince(), t -> new HashSet<>()).add(type);
}
}
Expand Down
5 changes: 0 additions & 5 deletions core/src/main/java/kafka/autobalancer/common/Resource.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,6 @@

import java.util.List;

/**
* CPU: a host and broker-level resource.
* NW (in and out): a host-level resource.
* DISK: a broker-level resource.
*/
public enum Resource {
CPU("CPU", 0, 0.001),
NW_IN("NWIn", 1, 10),
Expand Down
Loading