diff --git a/config/log4j.properties b/config/log4j.properties index 8f9eac8f21..0e00c52b93 100644 --- a/config/log4j.properties +++ b/config/log4j.properties @@ -75,6 +75,12 @@ log4j.appender.s3StreamThreadPoolAppender.File=${kafka.logs.dir}/s3stream-thread log4j.appender.s3StreamThreadPoolAppender.layout=org.apache.log4j.PatternLayout log4j.appender.s3StreamThreadPoolAppender.layout.ConversionPattern=[%d] %p %m (%c)%n +log4j.appender.autoBalancerAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.autoBalancerAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.autoBalancerAppender.File=${kafka.logs.dir}/auto-balancer.log +log4j.appender.autoBalancerAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.autoBalancerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + # Change the line below to adjust ZK client logging log4j.logger.org.apache.zookeeper=INFO @@ -124,3 +130,6 @@ log4j.additivity.state.change.logger=false log4j.logger.kafka.authorizer.logger=INFO, authorizerAppender log4j.additivity.kafka.authorizer.logger=false +log4j.logger.kafka.autobalancer=INFO, autoBalancerAppender +log4j.additivity.kafka.autobalancer=false + diff --git a/core/src/main/java/kafka/autobalancer/AutoBalancerManager.java b/core/src/main/java/kafka/autobalancer/AutoBalancerManager.java index b9f62759aa..5d878fdd3a 100644 --- a/core/src/main/java/kafka/autobalancer/AutoBalancerManager.java +++ b/core/src/main/java/kafka/autobalancer/AutoBalancerManager.java @@ -17,11 +17,17 @@ package kafka.autobalancer; +import com.automq.stream.utils.LogContext; +import kafka.autobalancer.common.AutoBalancerConstants; +import kafka.autobalancer.detector.AnomalyDetector; import kafka.autobalancer.config.AutoBalancerControllerConfig; +import kafka.autobalancer.detector.AnomalyDetectorBuilder; +import kafka.autobalancer.goals.Goal; import kafka.autobalancer.listeners.BrokerStatusListener; import kafka.autobalancer.listeners.ClusterStatusListenerRegistry; import kafka.autobalancer.listeners.TopicPartitionStatusListener; -import kafka.autobalancer.model.ClusterModel; +import kafka.autobalancer.executor.ControllerActionExecutorService; +import kafka.autobalancer.model.RecordClusterModel; import kafka.server.KafkaConfig; import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord; import org.apache.kafka.common.metadata.MetadataRecordType; @@ -32,7 +38,6 @@ import org.apache.kafka.common.metadata.TopicRecord; import org.apache.kafka.common.metadata.UnregisterBrokerRecord; import org.apache.kafka.common.protocol.ApiMessage; -import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.controller.QuorumController; import org.apache.kafka.queue.KafkaEventQueue; @@ -45,7 +50,9 @@ import org.apache.kafka.snapshot.SnapshotReader; import org.slf4j.Logger; +import java.util.HashSet; import java.util.List; +import java.util.Set; public class AutoBalancerManager { private final Logger logger; @@ -56,21 +63,34 @@ public class AutoBalancerManager { public AutoBalancerManager(Time time, KafkaConfig kafkaConfig, QuorumController quorumController, KafkaRaftClient raftClient) { LogContext logContext = new LogContext(String.format("[AutoBalancerManager id=%d] ", quorumController.nodeId())); - logger = logContext.logger(AutoBalancerManager.class); + logger = logContext.logger(AutoBalancerConstants.AUTO_BALANCER_LOGGER_CLAZZ); AutoBalancerControllerConfig config = new AutoBalancerControllerConfig(kafkaConfig.props(), false); - ClusterModel clusterModel = new ClusterModel(config, new LogContext(String.format("[ClusterModel id=%d] ", quorumController.nodeId()))); + RecordClusterModel clusterModel = new RecordClusterModel(new LogContext(String.format("[ClusterModel id=%d] ", quorumController.nodeId()))); this.loadRetriever = new LoadRetriever(config, quorumController, clusterModel, new LogContext(String.format("[LoadRetriever id=%d] ", quorumController.nodeId()))); - ExecutionManager executionManager = new ExecutionManager(config, quorumController, + ControllerActionExecutorService actionExecutorService = new ControllerActionExecutorService(config, quorumController, new LogContext(String.format("[ExecutionManager id=%d] ", quorumController.nodeId()))); - this.anomalyDetector = new AnomalyDetector(config, clusterModel, executionManager, - new LogContext(String.format("[AnomalyDetector id=%d] ", quorumController.nodeId()))); - this.queue = new KafkaEventQueue(time, new LogContext(), "auto-balancer-"); + + this.anomalyDetector = new AnomalyDetectorBuilder() + .logContext(new LogContext(String.format("[AnomalyDetector id=%d] ", quorumController.nodeId()))) + .maxActionsNumPerExecution(config.getInt(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_STEPS)) + .detectIntervalMs(config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ANOMALY_DETECT_INTERVAL_MS)) + .maxTolerateMetricsDelayMs(config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ACCEPTED_METRICS_DELAY_MS)) + .coolDownIntervalPerActionMs(config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_INTERVAL_MS)) + .aggregateBrokerLoad(config.getBoolean(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_LOAD_AGGREGATION)) + .clusterModel(clusterModel) + .executor(actionExecutorService) + .addGoals(config.getConfiguredInstances(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_GOALS, Goal.class)) + .excludedBrokers(parseExcludedBrokers(config)) + .excludedTopics(config.getList(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXCLUDE_TOPICS)) + .build(); + + this.queue = new KafkaEventQueue(time, new org.apache.kafka.common.utils.LogContext(), "auto-balancer-"); this.quorumController = quorumController; ClusterStatusListenerRegistry registry = new ClusterStatusListenerRegistry(); registry.register((BrokerStatusListener) clusterModel); registry.register((TopicPartitionStatusListener) clusterModel); - registry.register(executionManager); + registry.register(actionExecutorService); registry.register(this.loadRetriever); raftClient.register(new AutoBalancerListener(registry, this.loadRetriever, this.anomalyDetector)); } @@ -88,6 +108,19 @@ public void shutdown() throws InterruptedException { logger.info("Shutdown completed"); } + private Set parseExcludedBrokers(AutoBalancerControllerConfig config) { + Set excludedBrokers = new HashSet<>(); + for (String brokerId : config.getList(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXCLUDE_BROKER_IDS)) { + try { + excludedBrokers.add(Integer.parseInt(brokerId)); + } catch (Exception e) { + logger.warn("Failed to parse broker id {} from config", brokerId); + } + + } + return excludedBrokers; + } + class AutoBalancerListener implements RaftClient.Listener { private final ClusterStatusListenerRegistry registry; private final LoadRetriever loadRetriever; diff --git a/core/src/main/java/kafka/autobalancer/LoadRetriever.java b/core/src/main/java/kafka/autobalancer/LoadRetriever.java index b62669d6ed..30ba401e62 100644 --- a/core/src/main/java/kafka/autobalancer/LoadRetriever.java +++ b/core/src/main/java/kafka/autobalancer/LoadRetriever.java @@ -17,6 +17,8 @@ package kafka.autobalancer; +import com.automq.stream.utils.LogContext; +import kafka.autobalancer.common.AutoBalancerConstants; import kafka.autobalancer.common.AutoBalancerThreadFactory; import kafka.autobalancer.config.AutoBalancerConfig; import kafka.autobalancer.config.AutoBalancerControllerConfig; @@ -34,6 +36,7 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.message.CreateTopicsRequestData; import org.apache.kafka.common.message.CreateTopicsResponseData; import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord; @@ -41,7 +44,6 @@ import org.apache.kafka.common.metadata.UnregisterBrokerRecord; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.controller.Controller; import org.apache.kafka.controller.ControllerRequestContext; import org.apache.kafka.metadata.BrokerRegistrationFencingChange; @@ -97,7 +99,7 @@ public LoadRetriever(AutoBalancerControllerConfig config, Controller controller, if (logContext == null) { logContext = new LogContext("[LoadRetriever] "); } - this.logger = logContext.logger(LoadRetriever.class); + this.logger = logContext.logger(AutoBalancerConstants.AUTO_BALANCER_LOGGER_CLAZZ); this.controller = controller; this.clusterModel = clusterModel; this.bootstrapServerMap = new HashMap<>(); @@ -379,6 +381,8 @@ public void retrieve() { updateClusterModel(record.value()); } logger.debug("Finished consuming {} metrics from {}.", records.count(), metricReporterTopic); + } catch (InvalidTopicException e) { + checkAndCreateTopic(); } catch (Exception e) { logger.error("Consumer poll error: {}", e.getMessage()); } @@ -409,10 +413,14 @@ public void onLeaderChanged(boolean isLeader) { private void updateClusterModel(AutoBalancerMetrics metrics) { switch (metrics.metricClassId()) { case BROKER_METRIC: - clusterModel.updateBroker((BrokerMetrics) metrics); + BrokerMetrics brokerMetrics = (BrokerMetrics) metrics; + clusterModel.updateBrokerMetrics(brokerMetrics.brokerId(), brokerMetrics.getMetricTypeValueMap(), brokerMetrics.time()); break; case PARTITION_METRIC: - clusterModel.updateTopicPartition((TopicPartitionMetrics) metrics); + TopicPartitionMetrics partitionMetrics = (TopicPartitionMetrics) metrics; + clusterModel.updateTopicPartitionMetrics(partitionMetrics.brokerId(), + new TopicPartition(partitionMetrics.topic(), partitionMetrics.partition()), + partitionMetrics.getMetricTypeValueMap(), partitionMetrics.time()); break; default: logger.error("Not supported metrics version {}", metrics.metricClassId()); diff --git a/core/src/main/java/kafka/autobalancer/common/AutoBalancerConstants.java b/core/src/main/java/kafka/autobalancer/common/AutoBalancerConstants.java new file mode 100644 index 0000000000..75735a6c5d --- /dev/null +++ b/core/src/main/java/kafka/autobalancer/common/AutoBalancerConstants.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.autobalancer.common; + +public class AutoBalancerConstants { + public static final String AUTO_BALANCER_LOGGER_CLAZZ = "kafka.autobalancer"; +} diff --git a/core/src/main/java/kafka/autobalancer/metricsreporter/metric/RawMetricType.java b/core/src/main/java/kafka/autobalancer/common/RawMetricType.java similarity index 78% rename from core/src/main/java/kafka/autobalancer/metricsreporter/metric/RawMetricType.java rename to core/src/main/java/kafka/autobalancer/common/RawMetricType.java index 48c0d39beb..e927357cb8 100644 --- a/core/src/main/java/kafka/autobalancer/metricsreporter/metric/RawMetricType.java +++ b/core/src/main/java/kafka/autobalancer/common/RawMetricType.java @@ -18,7 +18,7 @@ * Some portion of this file Copyright 2017 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information. */ -package kafka.autobalancer.metricsreporter.metric; +package kafka.autobalancer.common; import java.util.ArrayList; import java.util.Collections; @@ -29,31 +29,23 @@ import java.util.SortedMap; import java.util.TreeMap; -import static kafka.autobalancer.metricsreporter.metric.RawMetricType.MetricScope.BROKER; -import static kafka.autobalancer.metricsreporter.metric.RawMetricType.MetricScope.PARTITION; - /** * This class was modified based on Cruise Control: com.linkedin.kafka.cruisecontrol.metricsreporter.metric.RawMetricType. */ -/* - * The metric type helps the metric sampler to distinguish what metric a value is representing. These metrics are - * called raw metrics because they are the most basic information reported by the Kafka brokers without any processing. - * Each metric type has an id for serde purpose. - */ public enum RawMetricType { - BROKER_CAPACITY_NW_IN(BROKER, (byte) 0, (byte) 0), - BROKER_CAPACITY_NW_OUT(BROKER, (byte) 1, (byte) 0), - ALL_TOPIC_BYTES_IN(BROKER, (byte) 2, (byte) 0), - ALL_TOPIC_BYTES_OUT(BROKER, (byte) 3, (byte) 0), - TOPIC_PARTITION_BYTES_IN(PARTITION, (byte) 4, (byte) 0), - TOPIC_PARTITION_BYTES_OUT(PARTITION, (byte) 5, (byte) 0), - PARTITION_SIZE(PARTITION, (byte) 6, (byte) 0), - BROKER_CPU_UTIL(BROKER, (byte) 7, (byte) 0); + BROKER_CAPACITY_NW_IN(MetricScope.BROKER, (byte) 0, (byte) 0), + BROKER_CAPACITY_NW_OUT(MetricScope.BROKER, (byte) 1, (byte) 0), + ALL_TOPIC_BYTES_IN(MetricScope.BROKER, (byte) 2, (byte) 0), + ALL_TOPIC_BYTES_OUT(MetricScope.BROKER, (byte) 3, (byte) 0), + TOPIC_PARTITION_BYTES_IN(MetricScope.PARTITION, (byte) 4, (byte) 0), + TOPIC_PARTITION_BYTES_OUT(MetricScope.PARTITION, (byte) 5, (byte) 0), + PARTITION_SIZE(MetricScope.PARTITION, (byte) 6, (byte) 0), + BROKER_CPU_UTIL(MetricScope.BROKER, (byte) 7, (byte) 0); private static final List CACHED_VALUES = List.of(RawMetricType.values()); private static final SortedMap> BROKER_METRIC_TYPES_DIFF_BY_VERSION = buildBrokerMetricTypesDiffByVersion(); - private static final List BROKER_METRIC_TYPES = Collections.unmodifiableList(buildMetricTypeList(BROKER)); - private static final List PARTITION_METRIC_TYPES = Collections.unmodifiableList(buildMetricTypeList(PARTITION)); + private static final List BROKER_METRIC_TYPES = Collections.unmodifiableList(buildMetricTypeList(MetricScope.BROKER)); + private static final List PARTITION_METRIC_TYPES = Collections.unmodifiableList(buildMetricTypeList(MetricScope.PARTITION)); private final byte id; private final MetricScope metricScope; private final byte supportedVersionSince; @@ -103,7 +95,7 @@ public static RawMetricType forId(byte id) { private static SortedMap> buildBrokerMetricTypesDiffByVersion() { SortedMap> buildBrokerMetricTypesDiffByVersion = new TreeMap<>(); for (RawMetricType type : RawMetricType.values()) { - if (type.metricScope() == BROKER) { + if (type.metricScope() == MetricScope.BROKER) { buildBrokerMetricTypesDiffByVersion.computeIfAbsent(type.supportedVersionSince(), t -> new HashSet<>()).add(type); } } diff --git a/core/src/main/java/kafka/autobalancer/common/Resource.java b/core/src/main/java/kafka/autobalancer/common/Resource.java index f025dd3f42..e5e9ea2636 100644 --- a/core/src/main/java/kafka/autobalancer/common/Resource.java +++ b/core/src/main/java/kafka/autobalancer/common/Resource.java @@ -22,11 +22,6 @@ import java.util.List; -/** - * CPU: a host and broker-level resource. - * NW (in and out): a host-level resource. - * DISK: a broker-level resource. - */ public enum Resource { CPU("CPU", 0, 0.001), NW_IN("NWIn", 1, 10), diff --git a/core/src/main/java/kafka/autobalancer/AnomalyDetector.java b/core/src/main/java/kafka/autobalancer/detector/AnomalyDetector.java similarity index 59% rename from core/src/main/java/kafka/autobalancer/AnomalyDetector.java rename to core/src/main/java/kafka/autobalancer/detector/AnomalyDetector.java index 1b52ff0047..ce9531ce33 100644 --- a/core/src/main/java/kafka/autobalancer/AnomalyDetector.java +++ b/core/src/main/java/kafka/autobalancer/detector/AnomalyDetector.java @@ -15,21 +15,21 @@ * limitations under the License. */ -package kafka.autobalancer; +package kafka.autobalancer.detector; +import com.automq.stream.utils.LogContext; import kafka.autobalancer.common.Action; +import kafka.autobalancer.common.AutoBalancerConstants; import kafka.autobalancer.common.AutoBalancerThreadFactory; -import kafka.autobalancer.config.AutoBalancerControllerConfig; -import kafka.autobalancer.goals.AbstractGoal; +import kafka.autobalancer.executor.ActionExecutorService; +import kafka.autobalancer.goals.Goal; import kafka.autobalancer.model.BrokerUpdater; import kafka.autobalancer.model.ClusterModel; import kafka.autobalancer.model.ClusterModelSnapshot; import kafka.autobalancer.model.TopicPartitionReplicaUpdater; -import org.apache.kafka.common.utils.LogContext; import org.slf4j.Logger; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.Executors; @@ -37,52 +37,46 @@ import java.util.concurrent.TimeUnit; public class AnomalyDetector { + public static final int UNLIMITED_ACTIONS_PER_DETECT = -1; private final Logger logger; - private final List goalsByPriority; - private final int maxActionsNumPerExecution; - private final long executionInterval; + private final List goalsByPriority; private final ClusterModel clusterModel; private final ScheduledExecutorService executorService; - private final ExecutionManager executionManager; + private final ActionExecutorService actionExecutor; + private final Set excludedBrokers; + private final Set excludedTopics; + private final int maxActionsNumPerExecution; private final long detectInterval; - private final Set excludedBrokers = new HashSet<>(); - private final Set excludedTopics = new HashSet<>(); + private final long maxTolerateMetricsDelayMs; + private final long coolDownIntervalPerActionMs; + private final boolean aggregateBrokerLoad; private volatile boolean running; - public AnomalyDetector(AutoBalancerControllerConfig config, ClusterModel clusterModel, ExecutionManager executionManager, LogContext logContext) { - if (logContext == null) { - logContext = new LogContext("[AnomalyDetector] "); - } - this.logger = logContext.logger(AnomalyDetector.class); - this.goalsByPriority = config.getConfiguredInstances(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_GOALS, AbstractGoal.class); - this.maxActionsNumPerExecution = config.getInt(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_STEPS); - Collections.sort(this.goalsByPriority); - logger.info("Goals: {}", this.goalsByPriority); - this.detectInterval = config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ANOMALY_DETECT_INTERVAL_MS); - this.executionInterval = config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_INTERVAL_MS); - fetchExcludedConfig(config); + AnomalyDetector(LogContext logContext, int maxActionsNumPerDetect, long detectIntervalMs, long maxTolerateMetricsDelayMs, + long coolDownIntervalPerActionMs, boolean aggregateBrokerLoad, ClusterModel clusterModel, + ActionExecutorService actionExecutor, List goals, + Set excludedBrokers, Set excludedTopics) { + this.logger = logContext.logger(AutoBalancerConstants.AUTO_BALANCER_LOGGER_CLAZZ); + + this.maxActionsNumPerExecution = maxActionsNumPerDetect; + this.detectInterval = detectIntervalMs; + this.maxTolerateMetricsDelayMs = maxTolerateMetricsDelayMs; + this.coolDownIntervalPerActionMs = coolDownIntervalPerActionMs; + this.aggregateBrokerLoad = aggregateBrokerLoad; this.clusterModel = clusterModel; + this.actionExecutor = actionExecutor; this.executorService = Executors.newSingleThreadScheduledExecutor(new AutoBalancerThreadFactory("anomaly-detector")); - this.executionManager = executionManager; + this.goalsByPriority = goals; + Collections.sort(this.goalsByPriority); + this.excludedBrokers = excludedBrokers; + this.excludedTopics = excludedTopics; this.running = false; - } - - private void fetchExcludedConfig(AutoBalancerControllerConfig config) { - List brokerIds = config.getList(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXCLUDE_BROKER_IDS); - for (String brokerIdStr : brokerIds) { - try { - excludedBrokers.add(Integer.parseInt(brokerIdStr)); - } catch (NumberFormatException ignored) { - - } - } - List topics = config.getList(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXCLUDE_TOPICS); - excludedTopics.addAll(topics); - logger.info("Excluded brokers: {}, excluded topics: {}", excludedBrokers, excludedTopics); + logger.info("maxActionsNumPerDetect: {}, detectInterval: {}ms, coolDownIntervalPerAction: {}ms, goals: {}, excluded brokers: {}, excluded topics: {}", + this.maxActionsNumPerExecution, this.detectInterval, coolDownIntervalPerActionMs, this.goalsByPriority, this.excludedBrokers, this.excludedTopics); } public void start() { - this.executionManager.start(); + this.actionExecutor.start(); this.executorService.schedule(this::detect, detectInterval, TimeUnit.MILLISECONDS); logger.info("Started"); } @@ -90,7 +84,14 @@ public void start() { public void shutdown() throws InterruptedException { this.running = false; this.executorService.shutdown(); - this.executionManager.shutdown(); + try { + if (!this.executorService.awaitTermination(10, TimeUnit.SECONDS)) { + this.executorService.shutdownNow(); + } + } catch (InterruptedException ignored) { + } + + this.actionExecutor.shutdown(); logger.info("Shutdown completed"); } @@ -108,7 +109,8 @@ private void detect() { } logger.info("Start detect"); // The delay in processing kraft log could result in outdated cluster snapshot - ClusterModelSnapshot snapshot = this.clusterModel.snapshot(excludedBrokers, excludedTopics); + ClusterModelSnapshot snapshot = this.clusterModel.snapshot(excludedBrokers, excludedTopics, + this.maxTolerateMetricsDelayMs, this.aggregateBrokerLoad); for (BrokerUpdater.Broker broker : snapshot.brokers()) { logger.info("Broker status: {}", broker); @@ -118,13 +120,13 @@ private void detect() { } int availableActionNum = maxActionsNumPerExecution; - for (AbstractGoal goal : goalsByPriority) { + for (Goal goal : goalsByPriority) { if (!this.running) { break; } List actions = goal.optimize(snapshot, goalsByPriority); int size = Math.min(availableActionNum, actions.size()); - this.executionManager.appendActions(actions.subList(0, size)); + this.actionExecutor.execute(actions.subList(0, size)); availableActionNum -= size; if (availableActionNum <= 0) { logger.info("No more action can be executed in this round"); @@ -132,7 +134,7 @@ private void detect() { } } - long nextDelay = (maxActionsNumPerExecution - availableActionNum) * this.executionInterval + this.detectInterval; + long nextDelay = (maxActionsNumPerExecution - availableActionNum) * this.coolDownIntervalPerActionMs + this.detectInterval; this.executorService.schedule(this::detect, nextDelay, TimeUnit.MILLISECONDS); logger.info("Detect finished, next detect will be after {} ms", nextDelay); } diff --git a/core/src/main/java/kafka/autobalancer/detector/AnomalyDetectorBuilder.java b/core/src/main/java/kafka/autobalancer/detector/AnomalyDetectorBuilder.java new file mode 100644 index 0000000000..2ff6d254de --- /dev/null +++ b/core/src/main/java/kafka/autobalancer/detector/AnomalyDetectorBuilder.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.autobalancer.detector; + +import com.automq.stream.utils.LogContext; +import kafka.autobalancer.executor.ActionExecutorService; +import kafka.autobalancer.goals.Goal; +import kafka.autobalancer.model.ClusterModel; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class AnomalyDetectorBuilder { + private final List goalsByPriority = new ArrayList<>(); + private final Set excludedBrokers = new HashSet<>(); + private final Set excludedTopics = new HashSet<>(); + private LogContext logContext = null; + private ClusterModel clusterModel = null; + private ActionExecutorService executor = null; + private int maxActionsNumPerDetect = AnomalyDetector.UNLIMITED_ACTIONS_PER_DETECT; + private long detectIntervalMs = 60000; + private long maxTolerateMetricsDelayMs = 30000; + private long coolDownIntervalPerActionMs = 100; + private boolean aggregateBrokerLoad = true; + + public AnomalyDetectorBuilder() { + + } + + public AnomalyDetectorBuilder logContext(LogContext logContext) { + this.logContext = logContext; + return this; + } + + public AnomalyDetectorBuilder addGoal(Goal goal) { + this.goalsByPriority.add(goal); + return this; + } + + public AnomalyDetectorBuilder addGoals(List goals) { + this.goalsByPriority.addAll(goals); + return this; + } + + public AnomalyDetectorBuilder excludedBroker(Integer excludedBroker) { + this.excludedBrokers.add(excludedBroker); + return this; + } + + public AnomalyDetectorBuilder excludedBrokers(Collection excludedBrokers) { + this.excludedBrokers.addAll(excludedBrokers); + return this; + } + + public AnomalyDetectorBuilder excludedTopic(String excludedTopic) { + this.excludedTopics.add(excludedTopic); + return this; + } + + public AnomalyDetectorBuilder excludedTopics(Collection excludedTopics) { + this.excludedTopics.addAll(excludedTopics); + return this; + } + + public AnomalyDetectorBuilder clusterModel(ClusterModel clusterModel) { + this.clusterModel = clusterModel; + return this; + } + + public AnomalyDetectorBuilder executor(ActionExecutorService executor) { + this.executor = executor; + return this; + } + + public AnomalyDetectorBuilder maxActionsNumPerExecution(int maxActionsNumPerExecution) { + this.maxActionsNumPerDetect = maxActionsNumPerExecution; + return this; + } + + public AnomalyDetectorBuilder detectIntervalMs(long detectIntervalMs) { + this.detectIntervalMs = detectIntervalMs; + return this; + } + + public AnomalyDetectorBuilder maxTolerateMetricsDelayMs(long maxTolerateMetricsDelayMs) { + this.maxTolerateMetricsDelayMs = maxTolerateMetricsDelayMs; + return this; + } + + public AnomalyDetectorBuilder aggregateBrokerLoad(boolean aggregateBrokerLoad) { + this.aggregateBrokerLoad = aggregateBrokerLoad; + return this; + } + + public AnomalyDetectorBuilder coolDownIntervalPerActionMs(long coolDownIntervalPerActionMs) { + this.coolDownIntervalPerActionMs = coolDownIntervalPerActionMs; + return this; + } + + public AnomalyDetector build() { + if (logContext == null) { + logContext = new LogContext("[AnomalyDetector] "); + } + if (clusterModel == null) { + throw new IllegalArgumentException("ClusterModel must be set"); + } + if (executor == null) { + throw new IllegalArgumentException("Executor must be set"); + } + if (goalsByPriority.isEmpty()) { + throw new IllegalArgumentException("At least one goal must be set"); + } + return new AnomalyDetector(logContext, maxActionsNumPerDetect, detectIntervalMs, maxTolerateMetricsDelayMs, + coolDownIntervalPerActionMs, aggregateBrokerLoad, clusterModel, executor, goalsByPriority, excludedBrokers, excludedTopics); + } +} diff --git a/core/src/main/java/kafka/autobalancer/metricsreporter/exception/AutoBalancerMetricsReporterException.java b/core/src/main/java/kafka/autobalancer/executor/ActionExecutorService.java similarity index 51% rename from core/src/main/java/kafka/autobalancer/metricsreporter/exception/AutoBalancerMetricsReporterException.java rename to core/src/main/java/kafka/autobalancer/executor/ActionExecutorService.java index 93a0709e1e..c354c38692 100644 --- a/core/src/main/java/kafka/autobalancer/metricsreporter/exception/AutoBalancerMetricsReporterException.java +++ b/core/src/main/java/kafka/autobalancer/executor/ActionExecutorService.java @@ -14,26 +14,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -/* - * Copyright 2017 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information. - */ -package kafka.autobalancer.metricsreporter.exception; +package kafka.autobalancer.executor; -/** - * This class was modified based on Cruise Control: com.linkedin.kafka.cruisecontrol.metricsreporter.exception.CruiseControlMetricsReporterException. - */ -public class AutoBalancerMetricsReporterException extends Exception { +import kafka.autobalancer.common.Action; + +import java.util.List; + +public interface ActionExecutorService { + + void start(); - public AutoBalancerMetricsReporterException(String message, Throwable cause) { - super(message, cause); - } + void shutdown(); - public AutoBalancerMetricsReporterException(String message) { - super(message); - } + void execute(Action action); - public AutoBalancerMetricsReporterException(Throwable cause) { - super(cause); - } + void execute(List actions); } diff --git a/core/src/main/java/kafka/autobalancer/ExecutionManager.java b/core/src/main/java/kafka/autobalancer/executor/ControllerActionExecutorService.java similarity index 88% rename from core/src/main/java/kafka/autobalancer/ExecutionManager.java rename to core/src/main/java/kafka/autobalancer/executor/ControllerActionExecutorService.java index c0975aa39f..e8af327731 100644 --- a/core/src/main/java/kafka/autobalancer/ExecutionManager.java +++ b/core/src/main/java/kafka/autobalancer/executor/ControllerActionExecutorService.java @@ -15,10 +15,12 @@ * limitations under the License. */ -package kafka.autobalancer; +package kafka.autobalancer.executor; +import com.automq.stream.utils.LogContext; import kafka.autobalancer.common.Action; import kafka.autobalancer.common.ActionType; +import kafka.autobalancer.common.AutoBalancerConstants; import kafka.autobalancer.config.AutoBalancerControllerConfig; import kafka.autobalancer.listeners.BrokerStatusListener; import org.apache.kafka.common.TopicPartition; @@ -27,7 +29,6 @@ import org.apache.kafka.common.metadata.RegisterBrokerRecord; import org.apache.kafka.common.metadata.UnregisterBrokerRecord; import org.apache.kafka.common.utils.KafkaThread; -import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.controller.Controller; import org.apache.kafka.controller.ControllerRequestContext; @@ -43,43 +44,61 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; -public class ExecutionManager implements Runnable, BrokerStatusListener { - private final Logger logger; - private final Controller controller; +public class ControllerActionExecutorService implements ActionExecutorService, Runnable, BrokerStatusListener { private final BlockingQueue actionQueue = new ArrayBlockingQueue<>(1000); private final Set fencedBrokers = ConcurrentHashMap.newKeySet(); - private final long executionInterval; - private final KafkaThread dispatchThread; + private Logger logger; + private Controller controller; + private long executionInterval; + private KafkaThread dispatchThread; // TODO: optimize to per-broker concurrency control private long lastExecutionTime = 0L; private volatile boolean shutdown; - public ExecutionManager(AutoBalancerControllerConfig config, Controller controller) { + public ControllerActionExecutorService(AutoBalancerControllerConfig config, Controller controller) { this(config, controller, null); } - public ExecutionManager(AutoBalancerControllerConfig config, Controller controller, LogContext logContext) { + public ControllerActionExecutorService(AutoBalancerControllerConfig config, Controller controller, LogContext logContext) { if (logContext == null) { logContext = new LogContext("[ExecutionManager] "); } - this.logger = logContext.logger(ExecutionManager.class); + this.logger = logContext.logger(AutoBalancerConstants.AUTO_BALANCER_LOGGER_CLAZZ); this.controller = controller; this.executionInterval = config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_INTERVAL_MS); this.dispatchThread = KafkaThread.daemon("executor-dispatcher", this); } + @Override public void start() { this.shutdown = false; this.dispatchThread.start(); logger.info("Started"); } + @Override public void shutdown() { this.shutdown = true; this.dispatchThread.interrupt(); logger.info("Shutdown completed"); } + @Override + public void execute(Action action) { + try { + this.actionQueue.put(action); + } catch (InterruptedException ignored) { + + } + } + + @Override + public void execute(List actions) { + for (Action action : actions) { + execute(action); + } + } + @Override public void run() { while (!shutdown) { @@ -135,20 +154,6 @@ private AlterPartitionReassignmentsRequestData.ReassignableTopic buildTopic(Topi return topic; } - public void appendAction(Action action) { - try { - this.actionQueue.put(action); - } catch (InterruptedException ignored) { - - } - } - - public void appendActions(List actions) { - for (Action action : actions) { - appendAction(action); - } - } - @Override public void onBrokerRegister(RegisterBrokerRecord record) { fencedBrokers.remove(record.brokerId()); diff --git a/core/src/main/java/kafka/autobalancer/goals/AbstractGoal.java b/core/src/main/java/kafka/autobalancer/goals/AbstractGoal.java index 8673b57e06..f55b031264 100644 --- a/core/src/main/java/kafka/autobalancer/goals/AbstractGoal.java +++ b/core/src/main/java/kafka/autobalancer/goals/AbstractGoal.java @@ -18,20 +18,16 @@ package kafka.autobalancer.goals; import kafka.autobalancer.common.Action; -import kafka.autobalancer.model.BrokerUpdater.Broker; import kafka.autobalancer.model.ClusterModelSnapshot; import kafka.autobalancer.model.ModelUtils; +import kafka.autobalancer.model.BrokerUpdater.Broker; import kafka.autobalancer.model.TopicPartitionReplicaUpdater.TopicPartitionReplica; -import org.apache.kafka.common.Configurable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; -public abstract class AbstractGoal implements Goal, Configurable, Comparable { - private static final Logger LOGGER = LoggerFactory.getLogger(AbstractGoal.class); +public abstract class AbstractGoal implements Goal { protected static final double POSITIVE_ACTION_SCORE_THRESHOLD = 0.5; /** @@ -61,17 +57,18 @@ private double scoreDelta(Broker srcBrokerBefore, Broker destBrokerBefore, Broke * @return normalized score. 0 means not allowed action * > 0 means permitted action, but can be positive or negative for this goal */ - private double calculateAcceptanceScore(Broker srcBrokerBefore, Broker destBrokerBefore, Broker srcBrokerAfter, Broker destBrokerAfter) { + double calculateAcceptanceScore(Broker srcBrokerBefore, Broker destBrokerBefore, Broker srcBrokerAfter, Broker destBrokerAfter) { double score = scoreDelta(srcBrokerBefore, destBrokerBefore, srcBrokerAfter, destBrokerAfter); - boolean isSrcBrokerAcceptedBefore = isBrokerAcceptable(srcBrokerBefore); - boolean isDestBrokerAcceptedBefore = isBrokerAcceptable(destBrokerBefore); - boolean isSrcBrokerAcceptedAfter = isBrokerAcceptable(srcBrokerAfter); - boolean isDestBrokerAcceptedAfter = isBrokerAcceptable(destBrokerAfter); if (!isHardGoal()) { return score; } + boolean isSrcBrokerAcceptedBefore = isBrokerAcceptable(srcBrokerBefore); + boolean isDestBrokerAcceptedBefore = isBrokerAcceptable(destBrokerBefore); + boolean isSrcBrokerAcceptedAfter = isBrokerAcceptable(srcBrokerAfter); + boolean isDestBrokerAcceptedAfter = isBrokerAcceptable(destBrokerAfter); + if (isSrcBrokerAcceptedBefore && !isSrcBrokerAcceptedAfter) { return 0.0; } else if (isDestBrokerAcceptedBefore && !isDestBrokerAcceptedAfter) { @@ -113,21 +110,11 @@ public double actionAcceptanceScore(Action action, ClusterModelSnapshot cluster) return calculateAcceptanceScore(srcBrokerBefore, destBrokerBefore, srcBrokerAfter, destBrokerAfter); } - @Override - public int priority() { - return GoalUtils.priority(this); - } - @Override public Set getEligibleBrokers(ClusterModelSnapshot cluster) { return cluster.brokers().stream().filter(Broker::isActive).collect(Collectors.toSet()); } - @Override - public int compareTo(AbstractGoal other) { - return Integer.compare(other.priority(), this.priority()); - } - @Override public int hashCode() { return Objects.hashCode(name()); diff --git a/core/src/main/java/kafka/autobalancer/goals/AbstractResourceCapacityGoal.java b/core/src/main/java/kafka/autobalancer/goals/AbstractResourceCapacityGoal.java index c16e638388..8ef22378f2 100644 --- a/core/src/main/java/kafka/autobalancer/goals/AbstractResourceCapacityGoal.java +++ b/core/src/main/java/kafka/autobalancer/goals/AbstractResourceCapacityGoal.java @@ -19,8 +19,8 @@ import kafka.autobalancer.common.Action; import kafka.autobalancer.common.ActionType; -import kafka.autobalancer.model.BrokerUpdater.Broker; import kafka.autobalancer.model.ClusterModelSnapshot; +import kafka.autobalancer.model.BrokerUpdater.Broker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,7 +39,7 @@ public boolean isHardGoal() { } @Override - public List optimize(ClusterModelSnapshot cluster, Collection goalsByPriority) { + public List optimize(ClusterModelSnapshot cluster, Collection goalsByPriority) { List actions = new ArrayList<>(); validateConfig(); Set eligibleBrokers = getEligibleBrokers(cluster); diff --git a/core/src/main/java/kafka/autobalancer/goals/AbstractResourceDistributionGoal.java b/core/src/main/java/kafka/autobalancer/goals/AbstractResourceDistributionGoal.java index 948ec724a1..4488df0efc 100644 --- a/core/src/main/java/kafka/autobalancer/goals/AbstractResourceDistributionGoal.java +++ b/core/src/main/java/kafka/autobalancer/goals/AbstractResourceDistributionGoal.java @@ -53,7 +53,7 @@ public void validateConfig() { } @Override - public List optimize(ClusterModelSnapshot cluster, Collection goalsByPriority) { + public List optimize(ClusterModelSnapshot cluster, Collection goalsByPriority) { List actions = new ArrayList<>(); validateConfig(); Set eligibleBrokers = getEligibleBrokers(cluster); diff --git a/core/src/main/java/kafka/autobalancer/goals/AbstractResourceGoal.java b/core/src/main/java/kafka/autobalancer/goals/AbstractResourceGoal.java index afaceb2634..791869b7e4 100644 --- a/core/src/main/java/kafka/autobalancer/goals/AbstractResourceGoal.java +++ b/core/src/main/java/kafka/autobalancer/goals/AbstractResourceGoal.java @@ -39,7 +39,7 @@ public abstract class AbstractResourceGoal extends AbstractGoal { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractGoal.class); - abstract Resource resource(); + protected abstract Resource resource(); private Optional getAcceptableAction(List> candidateActionScores) { Action acceptableAction = null; @@ -51,8 +51,8 @@ private Optional getAcceptableAction(List> can return Optional.ofNullable(acceptableAction); } - private double normalizeGoalsScore(Map scoreMap) { - int totalWeight = scoreMap.keySet().stream().mapToInt(AbstractGoal::priority).sum(); + private double normalizeGoalsScore(Map scoreMap) { + int totalWeight = scoreMap.keySet().stream().mapToInt(Goal::priority).sum(); return scoreMap.entrySet().stream() .mapToDouble(entry -> entry.getValue() * (double) entry.getKey().priority() / totalWeight) .sum(); @@ -62,7 +62,7 @@ private Optional trySwapPartitionOut(ClusterModelSnapshot cluster, TopicPartitionReplicaUpdater.TopicPartitionReplica srcReplica, BrokerUpdater.Broker srcBroker, List candidates, - Collection goalsByPriority) { + Collection goalsByPriority) { List> candidateActionScores = new ArrayList<>(); for (BrokerUpdater.Broker candidate : candidates) { for (TopicPartitionReplicaUpdater.TopicPartitionReplica candidateReplica : cluster.replicasFor(candidate.getBrokerId())) { @@ -72,8 +72,8 @@ private Optional trySwapPartitionOut(ClusterModelSnapshot cluster, boolean isHardGoalViolated = false; Action action = new Action(ActionType.SWAP, srcReplica.getTopicPartition(), srcBroker.getBrokerId(), candidate.getBrokerId(), candidateReplica.getTopicPartition()); - Map scoreMap = new HashMap<>(); - for (AbstractGoal goal : goalsByPriority) { + Map scoreMap = new HashMap<>(); + for (Goal goal : goalsByPriority) { double score = goal.actionAcceptanceScore(action, cluster); if (goal.isHardGoal() && score == 0) { isHardGoalViolated = true; @@ -94,13 +94,13 @@ private Optional tryMovePartitionOut(ClusterModelSnapshot cluster, TopicPartitionReplicaUpdater.TopicPartitionReplica replica, BrokerUpdater.Broker srcBroker, List candidates, - Collection goalsByPriority) { + Collection goalsByPriority) { List> candidateActionScores = new ArrayList<>(); for (BrokerUpdater.Broker candidate : candidates) { boolean isHardGoalViolated = false; Action action = new Action(ActionType.MOVE, replica.getTopicPartition(), srcBroker.getBrokerId(), candidate.getBrokerId()); - Map scoreMap = new HashMap<>(); - for (AbstractGoal goal : goalsByPriority) { + Map scoreMap = new HashMap<>(); + for (Goal goal : goalsByPriority) { double score = goal.actionAcceptanceScore(action, cluster); if (goal.isHardGoal() && score == 0) { isHardGoalViolated = true; @@ -131,7 +131,7 @@ protected List tryReduceLoadByAction(ActionType actionType, ClusterModelSnapshot cluster, BrokerUpdater.Broker srcBroker, List candidateBrokers, - Collection goalsByPriority) { + Collection goalsByPriority) { List actionList = new ArrayList<>(); List srcReplicas = cluster .replicasFor(srcBroker.getBrokerId()) @@ -174,7 +174,7 @@ protected List tryIncreaseLoadByAction(ActionType actionType, ClusterModelSnapshot cluster, BrokerUpdater.Broker srcBroker, List candidateBrokers, - Collection goalsByPriority) { + Collection goalsByPriority) { List actionList = new ArrayList<>(); candidateBrokers.sort(Comparator.comparingDouble(b -> -b.utilizationFor(resource()))); // higher load first for (BrokerUpdater.Broker candidateBroker : candidateBrokers) { diff --git a/core/src/main/java/kafka/autobalancer/goals/Goal.java b/core/src/main/java/kafka/autobalancer/goals/Goal.java index 4b4c3ee0d0..34a38b4ef6 100644 --- a/core/src/main/java/kafka/autobalancer/goals/Goal.java +++ b/core/src/main/java/kafka/autobalancer/goals/Goal.java @@ -20,16 +20,17 @@ import kafka.autobalancer.common.Action; import kafka.autobalancer.model.BrokerUpdater; import kafka.autobalancer.model.ClusterModelSnapshot; +import org.apache.kafka.common.Configurable; import java.util.Collection; import java.util.List; import java.util.Set; -public interface Goal { +public interface Goal extends Configurable, Comparable { boolean isHardGoal(); - List optimize(ClusterModelSnapshot cluster, Collection goalsByPriority); + List optimize(ClusterModelSnapshot cluster, Collection goalsByPriority); void validateConfig(); @@ -53,4 +54,9 @@ public interface Goal { * @return action acceptance score, 0 for not accepted */ double actionAcceptanceScore(Action action, ClusterModelSnapshot cluster); + + @Override + default int compareTo(Goal other) { + return Integer.compare(other.priority(), this.priority()); + } } diff --git a/core/src/main/java/kafka/autobalancer/goals/GoalConstants.java b/core/src/main/java/kafka/autobalancer/goals/GoalConstants.java new file mode 100644 index 0000000000..f62397281e --- /dev/null +++ b/core/src/main/java/kafka/autobalancer/goals/GoalConstants.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.autobalancer.goals; + +public class GoalConstants { + public static final int NETWORK_DISTRIBUTION_GOAL_PRIORITY = 8; + public static final int NETWORK_CAPACITY_GOAL_PRIORITY = 10; +} diff --git a/core/src/main/java/kafka/autobalancer/goals/GoalUtils.java b/core/src/main/java/kafka/autobalancer/goals/GoalUtils.java index ee1c457fad..684ce108c0 100644 --- a/core/src/main/java/kafka/autobalancer/goals/GoalUtils.java +++ b/core/src/main/java/kafka/autobalancer/goals/GoalUtils.java @@ -21,22 +21,7 @@ import kafka.autobalancer.common.ActionType; import kafka.autobalancer.model.ClusterModelSnapshot; -import java.util.HashMap; -import java.util.Map; - public class GoalUtils { - private static final Map GOALS_PRIORITY_MAP = new HashMap<>(); - - static { - GOALS_PRIORITY_MAP.put(NetworkInCapacityGoal.class.getSimpleName(), 10); - GOALS_PRIORITY_MAP.put(NetworkOutCapacityGoal.class.getSimpleName(), 10); - GOALS_PRIORITY_MAP.put(NetworkInDistributionGoal.class.getSimpleName(), 8); - GOALS_PRIORITY_MAP.put(NetworkOutDistributionGoal.class.getSimpleName(), 8); - } - - public static int priority(AbstractGoal goal) { - return GOALS_PRIORITY_MAP.getOrDefault(goal.name(), 0); - } public static boolean isValidAction(Action action, ClusterModelSnapshot cluster) { if (cluster.broker(action.getSrcBrokerId()) == null diff --git a/core/src/main/java/kafka/autobalancer/goals/NetworkInCapacityGoal.java b/core/src/main/java/kafka/autobalancer/goals/NetworkInCapacityGoal.java index a50f24d67b..013ee76114 100644 --- a/core/src/main/java/kafka/autobalancer/goals/NetworkInCapacityGoal.java +++ b/core/src/main/java/kafka/autobalancer/goals/NetworkInCapacityGoal.java @@ -18,8 +18,8 @@ package kafka.autobalancer.goals; import kafka.autobalancer.common.Resource; -import kafka.autobalancer.config.AutoBalancerControllerConfig; import kafka.autobalancer.model.BrokerUpdater; +import kafka.autobalancer.config.AutoBalancerControllerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,7 +34,7 @@ public String name() { } @Override - Resource resource() { + protected Resource resource() { return Resource.NW_IN; } @@ -48,4 +48,9 @@ public void configure(Map configs) { public void onBalanceFailed(BrokerUpdater.Broker broker) { LOGGER.warn("Failed to reduce broker {} network inbound load after iterating all partitions", broker.getBrokerId()); } + + @Override + public int priority() { + return GoalConstants.NETWORK_CAPACITY_GOAL_PRIORITY; + } } diff --git a/core/src/main/java/kafka/autobalancer/goals/NetworkInDistributionGoal.java b/core/src/main/java/kafka/autobalancer/goals/NetworkInDistributionGoal.java index f1d4237a85..a581a720a2 100644 --- a/core/src/main/java/kafka/autobalancer/goals/NetworkInDistributionGoal.java +++ b/core/src/main/java/kafka/autobalancer/goals/NetworkInDistributionGoal.java @@ -18,8 +18,8 @@ package kafka.autobalancer.goals; import kafka.autobalancer.common.Resource; -import kafka.autobalancer.config.AutoBalancerControllerConfig; import kafka.autobalancer.model.BrokerUpdater; +import kafka.autobalancer.config.AutoBalancerControllerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,7 +34,7 @@ public String name() { } @Override - Resource resource() { + protected Resource resource() { return Resource.NW_IN; } @@ -49,4 +49,9 @@ public void configure(Map configs) { public void onBalanceFailed(BrokerUpdater.Broker broker) { LOGGER.warn("Failed to balance broker {} network inbound load after iterating all partitions", broker.getBrokerId()); } + + @Override + public int priority() { + return GoalConstants.NETWORK_DISTRIBUTION_GOAL_PRIORITY; + } } diff --git a/core/src/main/java/kafka/autobalancer/goals/NetworkOutCapacityGoal.java b/core/src/main/java/kafka/autobalancer/goals/NetworkOutCapacityGoal.java index e9d08961df..618db809ab 100644 --- a/core/src/main/java/kafka/autobalancer/goals/NetworkOutCapacityGoal.java +++ b/core/src/main/java/kafka/autobalancer/goals/NetworkOutCapacityGoal.java @@ -18,8 +18,8 @@ package kafka.autobalancer.goals; import kafka.autobalancer.common.Resource; -import kafka.autobalancer.config.AutoBalancerControllerConfig; import kafka.autobalancer.model.BrokerUpdater; +import kafka.autobalancer.config.AutoBalancerControllerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,7 +34,7 @@ public String name() { } @Override - Resource resource() { + protected Resource resource() { return Resource.NW_OUT; } @@ -48,4 +48,9 @@ public void configure(Map configs) { public void onBalanceFailed(BrokerUpdater.Broker broker) { LOGGER.warn("Failed to reduce broker {} network outbound load after iterating all partitions", broker.getBrokerId()); } + + @Override + public int priority() { + return GoalConstants.NETWORK_CAPACITY_GOAL_PRIORITY; + } } diff --git a/core/src/main/java/kafka/autobalancer/goals/NetworkOutDistributionGoal.java b/core/src/main/java/kafka/autobalancer/goals/NetworkOutDistributionGoal.java index 7376fbdd2e..7cf52890f5 100644 --- a/core/src/main/java/kafka/autobalancer/goals/NetworkOutDistributionGoal.java +++ b/core/src/main/java/kafka/autobalancer/goals/NetworkOutDistributionGoal.java @@ -18,8 +18,8 @@ package kafka.autobalancer.goals; import kafka.autobalancer.common.Resource; -import kafka.autobalancer.config.AutoBalancerControllerConfig; import kafka.autobalancer.model.BrokerUpdater; +import kafka.autobalancer.config.AutoBalancerControllerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,7 +34,7 @@ public String name() { } @Override - Resource resource() { + protected Resource resource() { return Resource.NW_OUT; } @@ -49,4 +49,9 @@ public void configure(Map configs) { public void onBalanceFailed(BrokerUpdater.Broker broker) { LOGGER.warn("Failed to balance broker {} network outbound load after iterating all partitions", broker.getBrokerId()); } + + @Override + public int priority() { + return GoalConstants.NETWORK_DISTRIBUTION_GOAL_PRIORITY; + } } diff --git a/core/src/main/java/kafka/autobalancer/metricsreporter/AutoBalancerMetricsReporter.java b/core/src/main/java/kafka/autobalancer/metricsreporter/AutoBalancerMetricsReporter.java index bf6022a3ae..ade23d5176 100644 --- a/core/src/main/java/kafka/autobalancer/metricsreporter/AutoBalancerMetricsReporter.java +++ b/core/src/main/java/kafka/autobalancer/metricsreporter/AutoBalancerMetricsReporter.java @@ -20,6 +20,7 @@ package kafka.autobalancer.metricsreporter; +import kafka.autobalancer.common.RawMetricType; import com.yammer.metrics.core.Metric; import com.yammer.metrics.core.MetricName; import com.yammer.metrics.core.MetricsRegistry; @@ -29,7 +30,6 @@ import kafka.autobalancer.metricsreporter.metric.BrokerMetrics; import kafka.autobalancer.metricsreporter.metric.MetricSerde; import kafka.autobalancer.metricsreporter.metric.MetricsUtils; -import kafka.autobalancer.metricsreporter.metric.RawMetricType; import kafka.autobalancer.metricsreporter.metric.YammerMetricProcessor; import kafka.server.KafkaConfig; import org.apache.kafka.clients.ClientUtils; diff --git a/core/src/main/java/kafka/autobalancer/metricsreporter/exception/UnknownVersionException.java b/core/src/main/java/kafka/autobalancer/metricsreporter/exception/UnknownVersionException.java index 43f437cbe4..9a9245eafd 100644 --- a/core/src/main/java/kafka/autobalancer/metricsreporter/exception/UnknownVersionException.java +++ b/core/src/main/java/kafka/autobalancer/metricsreporter/exception/UnknownVersionException.java @@ -26,7 +26,7 @@ /* Unknown version during Serialization/Deserialization. */ -public class UnknownVersionException extends AutoBalancerMetricsReporterException { +public class UnknownVersionException extends Exception { public UnknownVersionException(String msg) { super(msg); } diff --git a/core/src/main/java/kafka/autobalancer/metricsreporter/metric/AutoBalancerMetrics.java b/core/src/main/java/kafka/autobalancer/metricsreporter/metric/AutoBalancerMetrics.java index 2036c3f135..e5d6ff44c8 100644 --- a/core/src/main/java/kafka/autobalancer/metricsreporter/metric/AutoBalancerMetrics.java +++ b/core/src/main/java/kafka/autobalancer/metricsreporter/metric/AutoBalancerMetrics.java @@ -20,6 +20,8 @@ package kafka.autobalancer.metricsreporter.metric; +import kafka.autobalancer.common.RawMetricType; + import java.nio.ByteBuffer; import java.util.Collections; import java.util.HashMap; diff --git a/core/src/main/java/kafka/autobalancer/metricsreporter/metric/BrokerMetrics.java b/core/src/main/java/kafka/autobalancer/metricsreporter/metric/BrokerMetrics.java index d07e0ac5ae..fb8a4a423f 100644 --- a/core/src/main/java/kafka/autobalancer/metricsreporter/metric/BrokerMetrics.java +++ b/core/src/main/java/kafka/autobalancer/metricsreporter/metric/BrokerMetrics.java @@ -20,6 +20,7 @@ package kafka.autobalancer.metricsreporter.metric; +import kafka.autobalancer.common.RawMetricType; import kafka.autobalancer.metricsreporter.exception.UnknownVersionException; import java.nio.ByteBuffer; diff --git a/core/src/main/java/kafka/autobalancer/metricsreporter/metric/MetricsUtils.java b/core/src/main/java/kafka/autobalancer/metricsreporter/metric/MetricsUtils.java index 8ced714c4b..8c2f4ec96d 100644 --- a/core/src/main/java/kafka/autobalancer/metricsreporter/metric/MetricsUtils.java +++ b/core/src/main/java/kafka/autobalancer/metricsreporter/metric/MetricsUtils.java @@ -20,6 +20,7 @@ package kafka.autobalancer.metricsreporter.metric; +import kafka.autobalancer.common.RawMetricType; import com.yammer.metrics.core.Metric; import com.yammer.metrics.core.MetricName; import kafka.metrics.KafkaMetricsGroup$; diff --git a/core/src/main/java/kafka/autobalancer/metricsreporter/metric/TopicPartitionMetrics.java b/core/src/main/java/kafka/autobalancer/metricsreporter/metric/TopicPartitionMetrics.java index 9945696003..257c825946 100644 --- a/core/src/main/java/kafka/autobalancer/metricsreporter/metric/TopicPartitionMetrics.java +++ b/core/src/main/java/kafka/autobalancer/metricsreporter/metric/TopicPartitionMetrics.java @@ -20,6 +20,7 @@ package kafka.autobalancer.metricsreporter.metric; +import kafka.autobalancer.common.RawMetricType; import kafka.autobalancer.metricsreporter.exception.UnknownVersionException; import org.apache.kafka.common.TopicPartition; diff --git a/core/src/main/java/kafka/autobalancer/model/BrokerUpdater.java b/core/src/main/java/kafka/autobalancer/model/BrokerUpdater.java index ce37e6c208..78a75f6fb9 100644 --- a/core/src/main/java/kafka/autobalancer/model/BrokerUpdater.java +++ b/core/src/main/java/kafka/autobalancer/model/BrokerUpdater.java @@ -17,14 +17,13 @@ package kafka.autobalancer.model; +import kafka.autobalancer.common.RawMetricType; import kafka.autobalancer.common.Resource; -import kafka.autobalancer.metricsreporter.metric.AutoBalancerMetrics; -import kafka.autobalancer.metricsreporter.metric.MetricsUtils; -import kafka.autobalancer.metricsreporter.metric.RawMetricType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Objects; @@ -46,6 +45,7 @@ public static class Broker { private final double[] brokerCapacity = new double[Resource.cachedValues().size()]; private final double[] brokerLoad = new double[Resource.cachedValues().size()]; private final Set resources = new HashSet<>(); + private final Map metricsMap = new HashMap<>(); private boolean active; private long timestamp; @@ -84,6 +84,10 @@ public void setLoad(Resource resource, double value) { this.brokerLoad[resource.id()] = value; } + public void setMetricValue(RawMetricType metricType, double value) { + this.metricsMap.put(metricType, value); + } + public double load(Resource resource) { if (!resources.contains(resource)) { return 0.0; @@ -145,45 +149,57 @@ public String toString() { StringBuilder builder = new StringBuilder(); builder.append("{brokerId=") .append(brokerId) - .append(", brokerCapacity=["); + .append(", active=").append(active) + .append(", timestamp=").append(timestamp) + .append(", Capacities=["); for (int i = 0; i < brokerCapacity.length; i++) { builder.append(Resource.of(i).resourceString(brokerCapacity[i])); if (i != brokerCapacity.length - 1) { builder.append(", "); } } - builder.append("], brokerLoad=["); + builder.append("], Loads=["); for (int i = 0; i < brokerLoad.length; i++) { builder.append(Resource.of(i).resourceString(brokerLoad[i])); if (i != brokerLoad.length - 1) { builder.append(", "); } } - builder.append("], active=").append(active) - .append(", timestamp=").append(timestamp) - .append("}"); + builder.append("]"); + int i = 0; + for (Map.Entry entry : metricsMap.entrySet()) { + if (i == 0) { + builder.append(" Metrics={"); + } + builder.append(entry.getKey()) + .append("=") + .append(entry.getValue()); + if (i != metricsMap.size() - 1) { + builder.append(", "); + } + i++; + } + builder.append("}"); return builder.toString(); } } - public boolean update(AutoBalancerMetrics metrics) { - if (metrics.metricClassId() != AutoBalancerMetrics.MetricClassId.BROKER_METRIC) { - LOGGER.error("Mismatched metrics type {} for broker", metrics.metricClassId()); - return false; - } - - if (!MetricsUtils.sanityCheckBrokerMetricsCompleteness(metrics)) { - LOGGER.error("Broker metrics sanity check failed, metrics is incomplete {}", metrics); + public boolean update(Map metricsMap, long time) { + if (!metricsMap.keySet().containsAll(RawMetricType.brokerMetricTypes())) { + LOGGER.error("Broker metrics for broker={} sanity check failed, metrics is incomplete {}", broker.getBrokerId(), metricsMap.keySet()); return false; } lock.lock(); try { - if (metrics.time() < broker.getTimestamp()) { - LOGGER.warn("Outdated metrics at time {}, last updated time {}", metrics.time(), broker.getTimestamp()); + if (time < broker.getTimestamp()) { + LOGGER.warn("Outdated broker metrics at time {} for broker={}, last updated time {}", time, broker.getBrokerId(), broker.getTimestamp()); return false; } - for (Map.Entry entry : metrics.getMetricTypeValueMap().entrySet()) { + for (Map.Entry entry : metricsMap.entrySet()) { + if (entry.getKey().metricScope() != RawMetricType.MetricScope.BROKER) { + continue; + } switch (entry.getKey()) { case BROKER_CAPACITY_NW_IN: broker.setCapacity(Resource.NW_IN, entry.getValue()); @@ -201,11 +217,11 @@ public boolean update(AutoBalancerMetrics metrics) { broker.setLoad(Resource.CPU, entry.getValue()); break; default: - LOGGER.error("Unsupported broker metrics type {}", entry.getKey()); + broker.setMetricValue(entry.getKey(), entry.getValue()); break; } } - broker.setTimestamp(metrics.time()); + broker.setTimestamp(time); } finally { lock.unlock(); } @@ -247,10 +263,6 @@ public Broker get(long timeSince) { return broker; } - public int id() { - return this.broker.getBrokerId(); - } - public void setActive(boolean active) { lock.lock(); try { diff --git a/core/src/main/java/kafka/autobalancer/model/ClusterModel.java b/core/src/main/java/kafka/autobalancer/model/ClusterModel.java index 0f34070f20..1014479cdf 100644 --- a/core/src/main/java/kafka/autobalancer/model/ClusterModel.java +++ b/core/src/main/java/kafka/autobalancer/model/ClusterModel.java @@ -17,24 +17,12 @@ package kafka.autobalancer.model; -import kafka.autobalancer.config.AutoBalancerControllerConfig; -import kafka.autobalancer.listeners.BrokerStatusListener; -import kafka.autobalancer.listeners.TopicPartitionStatusListener; -import kafka.autobalancer.metricsreporter.metric.BrokerMetrics; -import kafka.autobalancer.metricsreporter.metric.TopicPartitionMetrics; -import org.apache.commons.lang3.StringUtils; +import com.automq.stream.utils.LogContext; +import kafka.autobalancer.common.AutoBalancerConstants; +import kafka.autobalancer.common.RawMetricType; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; -import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord; -import org.apache.kafka.common.metadata.PartitionChangeRecord; -import org.apache.kafka.common.metadata.PartitionRecord; -import org.apache.kafka.common.metadata.RegisterBrokerRecord; -import org.apache.kafka.common.metadata.RemoveTopicRecord; -import org.apache.kafka.common.metadata.TopicRecord; -import org.apache.kafka.common.metadata.UnregisterBrokerRecord; -import org.apache.kafka.common.utils.LogContext; -import org.apache.kafka.metadata.BrokerRegistrationFencingChange; -import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange; +import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import java.util.Collections; @@ -45,9 +33,11 @@ import java.util.concurrent.locks.ReentrantLock; -public class ClusterModel implements BrokerStatusListener, TopicPartitionStatusListener { - private final Logger logger; +public class ClusterModel { + protected final Logger logger; private static final String DEFAULT_RACK_ID = "rack_default"; + private static final long DEFAULT_MAX_TOLERATED_METRICS_DELAY_MS = 60000L; + private static final boolean DEFAULT_AGGREGATE_BROKER_LOAD = true; /* * Guard the change on cluster structure (add/remove for brokers, replicas) @@ -61,27 +51,23 @@ public class ClusterModel implements BrokerStatusListener, TopicPartitionStatusL private final Map idToTopicNameMap = new HashMap<>(); private final Map> topicPartitionReplicaMap = new HashMap<>(); - private final long maxToleratedMetricsDelay; - private final boolean aggregateBrokerLoad; - - public ClusterModel(AutoBalancerControllerConfig config) { - this(config, null); + public ClusterModel() { + this(null); } - public ClusterModel(AutoBalancerControllerConfig config, LogContext logContext) { + public ClusterModel(LogContext logContext) { if (logContext == null) { logContext = new LogContext("[ClusterModel]"); } - logger = logContext.logger(ClusterModel.class); - maxToleratedMetricsDelay = config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ACCEPTED_METRICS_DELAY_MS); - aggregateBrokerLoad = config.getBoolean(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_LOAD_AGGREGATION); + logger = logContext.logger(AutoBalancerConstants.AUTO_BALANCER_LOGGER_CLAZZ); } public ClusterModelSnapshot snapshot() { - return snapshot(Collections.emptySet(), Collections.emptySet()); + return snapshot(Collections.emptySet(), Collections.emptySet(), DEFAULT_MAX_TOLERATED_METRICS_DELAY_MS, DEFAULT_AGGREGATE_BROKER_LOAD); } - public ClusterModelSnapshot snapshot(Set excludedBrokerIds, Set excludedTopics) { + public ClusterModelSnapshot snapshot(Set excludedBrokerIds, Set excludedTopics, + long maxToleratedMetricsDelay, boolean aggregateBrokerLoad) { ClusterModelSnapshot snapshot = new ClusterModelSnapshot(); clusterLock.lock(); try { @@ -125,105 +111,97 @@ public ClusterModelSnapshot snapshot(Set excludedBrokerIds, Set return snapshot; } - public boolean updateBroker(BrokerMetrics brokerMetrics) { + public boolean updateBrokerMetrics(int brokerId, Map metricsMap, long time) { BrokerUpdater brokerUpdater = null; clusterLock.lock(); try { - brokerUpdater = brokerMap.get(brokerMetrics.brokerId()); + brokerUpdater = brokerMap.get(brokerId); } finally { clusterLock.unlock(); } if (brokerUpdater != null) { - return brokerUpdater.update(brokerMetrics); + return brokerUpdater.update(metricsMap, time); } return false; } - public boolean updateTopicPartition(TopicPartitionMetrics topicPartitionMetrics) { + public boolean updateTopicPartitionMetrics(int brokerId, TopicPartition tp, Map metricsMap, long time) { TopicPartitionReplicaUpdater replicaUpdater = null; clusterLock.lock(); try { - Map replicaMap = brokerReplicaMap.get(topicPartitionMetrics.brokerId()); + Map replicaMap = brokerReplicaMap.get(brokerId); if (replicaMap != null) { - replicaUpdater = replicaMap.get(new TopicPartition(topicPartitionMetrics.topic(), topicPartitionMetrics.partition())); + replicaUpdater = replicaMap.get(tp); } } finally { clusterLock.unlock(); } if (replicaUpdater != null) { - return replicaUpdater.update(topicPartitionMetrics); + return replicaUpdater.update(metricsMap, time); } return false; } - @Override - public void onBrokerRegister(RegisterBrokerRecord record) { + public void registerBroker(int brokerId, String rackId) { clusterLock.lock(); try { - if (brokerMap.containsKey(record.brokerId())) { + if (brokerMap.containsKey(brokerId)) { return; } - String rackId = StringUtils.isEmpty(record.rack()) ? DEFAULT_RACK_ID : record.rack(); - BrokerUpdater brokerUpdater = new BrokerUpdater(record.brokerId()); + BrokerUpdater brokerUpdater = new BrokerUpdater(brokerId); brokerUpdater.setActive(true); - brokerIdToRackMap.putIfAbsent(record.brokerId(), rackId); - brokerMap.putIfAbsent(record.brokerId(), brokerUpdater); - brokerReplicaMap.put(record.brokerId(), new HashMap<>()); + if (Utils.isBlank(rackId)) { + rackId = DEFAULT_RACK_ID; + } + brokerIdToRackMap.putIfAbsent(brokerId, rackId); + brokerMap.putIfAbsent(brokerId, brokerUpdater); + brokerReplicaMap.put(brokerId, new HashMap<>()); } finally { clusterLock.unlock(); } } - @Override - public void onBrokerUnregister(UnregisterBrokerRecord record) { + public void unregisterBroker(int brokerId) { clusterLock.lock(); try { - brokerIdToRackMap.remove(record.brokerId()); - brokerMap.remove(record.brokerId()); - brokerReplicaMap.remove(record.brokerId()); + brokerIdToRackMap.remove(brokerId); + brokerMap.remove(brokerId); + brokerReplicaMap.remove(brokerId); } finally { clusterLock.unlock(); } } - @Override - public void onBrokerRegistrationChanged(BrokerRegistrationChangeRecord record) { - BrokerUpdater brokerUpdater; + public void changeBrokerStatus(int brokerId, boolean active) { clusterLock.lock(); try { - if (!brokerMap.containsKey(record.brokerId())) { - return; - } - brokerUpdater = brokerMap.get(record.brokerId()); + brokerMap.computeIfPresent(brokerId, (id, brokerUpdater) -> { + brokerUpdater.setActive(active); + return brokerUpdater; + }); } finally { clusterLock.unlock(); } - if (brokerUpdater != null) { - brokerUpdater.setActive(record.fenced() != BrokerRegistrationFencingChange.FENCE.value() - && record.inControlledShutdown() != BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value()); - } } - @Override - public void onTopicCreate(TopicRecord record) { + public void createTopic(Uuid topicId, String topicName) { clusterLock.lock(); try { - idToTopicNameMap.putIfAbsent(record.topicId(), record.name()); - topicPartitionReplicaMap.putIfAbsent(record.name(), new HashMap<>()); + idToTopicNameMap.putIfAbsent(topicId, topicName); + topicPartitionReplicaMap.putIfAbsent(topicName, new HashMap<>()); } finally { clusterLock.unlock(); } } - @Override - public void onTopicDelete(RemoveTopicRecord record) { + public void deleteTopic(Uuid topicId) { clusterLock.lock(); try { - String topicName = idToTopicNameMap.get(record.topicId()); + String topicName = idToTopicNameMap.get(topicId); if (topicName == null) { return; } - idToTopicNameMap.remove(record.topicId()); + idToTopicNameMap.remove(topicId); for (Map.Entry entry : topicPartitionReplicaMap.get(topicName).entrySet()) { int partitionId = entry.getKey(); int brokerId = entry.getValue(); @@ -238,68 +216,58 @@ public void onTopicDelete(RemoveTopicRecord record) { } } - @Override - public void onPartitionCreate(PartitionRecord record) { + public void createPartition(Uuid topicId, int partitionId, int brokerId) { clusterLock.lock(); try { - String topicName = idToTopicNameMap.get(record.topicId()); + String topicName = idToTopicNameMap.get(topicId); if (topicName == null) { return; } - if (record.replicas().size() != 1) { - logger.error("Illegal replica size {} for {}-{}", record.replicas().size(), topicName, record.partitionId()); - return; - } if (!topicPartitionReplicaMap.containsKey(topicName)) { logger.error("Create partition on invalid topic {}", topicName); return; } - int brokerIdToCreateOn = record.replicas().iterator().next(); - if (!brokerMap.containsKey(brokerIdToCreateOn)) { - logger.error("Create partition for topic {} on invalid broker {}", topicName, brokerIdToCreateOn); + if (!brokerMap.containsKey(brokerId)) { + logger.error("Create partition for topic {} on invalid broker {}", topicName, brokerId); return; } - topicPartitionReplicaMap.get(topicName).put(record.partitionId(), brokerIdToCreateOn); - TopicPartition tp = new TopicPartition(topicName, record.partitionId()); - brokerReplicaMap.get(brokerIdToCreateOn).put(tp, new TopicPartitionReplicaUpdater(tp)); + topicPartitionReplicaMap.get(topicName).put(partitionId, brokerId); + TopicPartition tp = new TopicPartition(topicName, partitionId); + brokerReplicaMap.get(brokerId).put(tp, new TopicPartitionReplicaUpdater(tp)); } finally { clusterLock.unlock(); } } - @Override - public void onPartitionChange(PartitionChangeRecord record) { + public void reassignPartition(Uuid topicId, int partitionId, int brokerId) { clusterLock.lock(); try { - String topicName = idToTopicNameMap.get(record.topicId()); + String topicName = idToTopicNameMap.get(topicId); if (topicName == null) { return; } - if (record.replicas() == null || record.replicas().size() != 1) { - return; - } if (!topicPartitionReplicaMap.containsKey(topicName)) { - logger.error("Reassign partition {} on invalid topic {}", record.partitionId(), topicName); + logger.error("Reassign partition {} on invalid topic {}", partitionId, topicName); return; } - int brokerIdToReassign = record.replicas().iterator().next(); - if (!brokerMap.containsKey(brokerIdToReassign)) { - logger.error("Reassign partition {} for topic {} on invalid broker {}", record.partitionId(), topicName, brokerIdToReassign); + + if (!brokerMap.containsKey(brokerId)) { + logger.error("Reassign partition {} for topic {} on invalid broker {}", partitionId, topicName, brokerId); return; } - int oldBrokerId = topicPartitionReplicaMap.get(topicName).getOrDefault(record.partitionId(), -1); - if (oldBrokerId == brokerIdToReassign) { - logger.warn("Reassign partition {} for topic {} on same broker {}, {}", record.partitionId(), topicName, oldBrokerId, record); + int oldBrokerId = topicPartitionReplicaMap.get(topicName).getOrDefault(partitionId, -1); + if (oldBrokerId == brokerId) { + logger.warn("Reassign partition {} for topic {} on same broker {}", partitionId, topicName, oldBrokerId); return; } if (oldBrokerId != -1) { - TopicPartition tp = new TopicPartition(topicName, record.partitionId()); + TopicPartition tp = new TopicPartition(topicName, partitionId); TopicPartitionReplicaUpdater replicaUpdater = brokerReplicaMap.get(oldBrokerId).get(tp); - brokerReplicaMap.get(brokerIdToReassign).put(tp, replicaUpdater); + brokerReplicaMap.get(brokerId).put(tp, replicaUpdater); brokerReplicaMap.get(oldBrokerId).remove(tp); } - topicPartitionReplicaMap.get(topicName).put(record.partitionId(), brokerIdToReassign); + topicPartitionReplicaMap.get(topicName).put(partitionId, brokerId); } finally { clusterLock.unlock(); } diff --git a/core/src/main/java/kafka/autobalancer/model/RecordClusterModel.java b/core/src/main/java/kafka/autobalancer/model/RecordClusterModel.java new file mode 100644 index 0000000000..a552e9ace4 --- /dev/null +++ b/core/src/main/java/kafka/autobalancer/model/RecordClusterModel.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.autobalancer.model; + +import com.automq.stream.utils.LogContext; +import kafka.autobalancer.listeners.BrokerStatusListener; +import kafka.autobalancer.listeners.TopicPartitionStatusListener; +import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord; +import org.apache.kafka.common.metadata.PartitionChangeRecord; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.RegisterBrokerRecord; +import org.apache.kafka.common.metadata.RemoveTopicRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.metadata.UnregisterBrokerRecord; +import org.apache.kafka.metadata.BrokerRegistrationFencingChange; +import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange; + +public class RecordClusterModel extends ClusterModel implements BrokerStatusListener, TopicPartitionStatusListener { + + public RecordClusterModel() { + super(); + } + + public RecordClusterModel(LogContext logContext) { + super(logContext); + } + + @Override + public void onBrokerRegister(RegisterBrokerRecord record) { + registerBroker(record.brokerId(), record.rack()); + } + + @Override + public void onBrokerUnregister(UnregisterBrokerRecord record) { + unregisterBroker(record.brokerId()); + } + + @Override + public void onBrokerRegistrationChanged(BrokerRegistrationChangeRecord record) { + boolean isActive = record.fenced() != BrokerRegistrationFencingChange.FENCE.value() + && record.inControlledShutdown() != BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value(); + changeBrokerStatus(record.brokerId(), isActive); + } + + @Override + public void onTopicCreate(TopicRecord record) { + createTopic(record.topicId(), record.name()); + } + + @Override + public void onTopicDelete(RemoveTopicRecord record) { + deleteTopic(record.topicId()); + } + + @Override + public void onPartitionCreate(PartitionRecord record) { + if (record.replicas().size() != 1) { + logger.error("Illegal replica size {} for {}-{}", record.replicas().size(), record.topicId(), record.partitionId()); + return; + } + createPartition(record.topicId(), record.partitionId(), record.replicas().iterator().next()); + } + + @Override + public void onPartitionChange(PartitionChangeRecord record) { + if (record.replicas().size() != 1) { + logger.error("Illegal replica size {} for {}-{}", record.replicas().size(), record.topicId(), record.partitionId()); + return; + } + reassignPartition(record.topicId(), record.partitionId(), record.replicas().iterator().next()); + } +} diff --git a/core/src/main/java/kafka/autobalancer/model/TopicPartitionReplicaUpdater.java b/core/src/main/java/kafka/autobalancer/model/TopicPartitionReplicaUpdater.java index 3cbdaaaf32..e6b5c5546b 100644 --- a/core/src/main/java/kafka/autobalancer/model/TopicPartitionReplicaUpdater.java +++ b/core/src/main/java/kafka/autobalancer/model/TopicPartitionReplicaUpdater.java @@ -17,15 +17,13 @@ package kafka.autobalancer.model; +import kafka.autobalancer.common.RawMetricType; import kafka.autobalancer.common.Resource; -import kafka.autobalancer.metricsreporter.metric.AutoBalancerMetrics; -import kafka.autobalancer.metricsreporter.metric.MetricsUtils; -import kafka.autobalancer.metricsreporter.metric.RawMetricType; import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Objects; @@ -44,7 +42,8 @@ public TopicPartitionReplicaUpdater(TopicPartition tp) { public static class TopicPartitionReplica { private final TopicPartition tp; - private final double[] replicaLoad = new double[Resource.cachedValues().size()]; + private final double[] loads = new double[Resource.cachedValues().size()]; + private final Map metricsMap = new HashMap<>(); private final Set resources = new HashSet<>(); private long timestamp; @@ -54,7 +53,7 @@ public TopicPartitionReplica(TopicPartition tp) { public TopicPartitionReplica(TopicPartitionReplica other) { this.tp = new TopicPartition(other.tp.topic(), other.tp.partition()); - System.arraycopy(other.replicaLoad, 0, this.replicaLoad, 0, other.replicaLoad.length); + System.arraycopy(other.loads, 0, this.loads, 0, other.loads.length); this.resources.addAll(other.resources); this.timestamp = other.timestamp; } @@ -65,14 +64,14 @@ public Set getResources() { public void setLoad(Resource resource, double value) { this.resources.add(resource); - this.replicaLoad[resource.id()] = value; + this.loads[resource.id()] = value; } public double load(Resource resource) { if (!this.resources.contains(resource)) { return 0.0; } - return this.replicaLoad[resource.id()]; + return this.loads[resource.id()]; } public void setTimestamp(long timestamp) { @@ -102,31 +101,52 @@ public int hashCode() { @Override public String toString() { - return "TopicPartitionReplica{" + - "tp=" + tp + - ", replicaLoad=" + Arrays.toString(replicaLoad) + - '}'; + StringBuilder builder = new StringBuilder(); + builder.append("{TopicPartition=") + .append(tp) + .append(", timestamp=").append(timestamp) + .append(", Loads=["); + for (int i = 0; i < loads.length; i++) { + builder.append(Resource.of(i).resourceString(loads[i])); + if (i != loads.length - 1) { + builder.append(", "); + } + } + builder.append("]"); + int i = 0; + for (Map.Entry entry : metricsMap.entrySet()) { + if (i == 0) { + builder.append(" Metrics={"); + } + builder.append(entry.getKey()) + .append("=") + .append(entry.getValue()); + if (i != metricsMap.size() - 1) { + builder.append(", "); + } + i++; + } + builder.append("}"); + return builder.toString(); } } - public boolean update(AutoBalancerMetrics metrics) { - if (metrics.metricClassId() != AutoBalancerMetrics.MetricClassId.PARTITION_METRIC) { - LOGGER.error("Mismatched metrics type {} for broker", metrics.metricClassId()); - return false; - } - - if (!MetricsUtils.sanityCheckTopicPartitionMetricsCompleteness(metrics)) { - LOGGER.error("Topic partition metrics sanity check failed, metrics is incomplete {}", metrics); + public boolean update(Map metricsMap, long time) { + if (!metricsMap.keySet().containsAll(RawMetricType.partitionMetricTypes())) { + LOGGER.error("Topic partition {} metrics sanity check failed, metrics is incomplete {}", replica.getTopicPartition(), metricsMap.keySet()); return false; } lock.lock(); try { - if (metrics.time() < this.replica.getTimestamp()) { - LOGGER.warn("Outdated metrics at time {}, last updated time {}", metrics.time(), this.replica.getTimestamp()); + if (time < this.replica.getTimestamp()) { + LOGGER.warn("Outdated topic partition {} metrics at time {}, last updated time {}", replica.getTopicPartition(), time, this.replica.getTimestamp()); return false; } - for (Map.Entry entry : metrics.getMetricTypeValueMap().entrySet()) { + for (Map.Entry entry : metricsMap.entrySet()) { + if (entry.getKey().metricScope() != RawMetricType.MetricScope.PARTITION) { + continue; + } switch (entry.getKey()) { case TOPIC_PARTITION_BYTES_IN: this.replica.setLoad(Resource.NW_IN, entry.getValue()); @@ -134,14 +154,12 @@ public boolean update(AutoBalancerMetrics metrics) { case TOPIC_PARTITION_BYTES_OUT: this.replica.setLoad(Resource.NW_OUT, entry.getValue()); break; - case PARTITION_SIZE: - // simply update the timestamp - break; default: - LOGGER.error("Unsupported broker metrics type {}", entry.getKey()); + metricsMap.put(entry.getKey(), entry.getValue()); + break; } } - this.replica.setTimestamp(metrics.time()); + this.replica.setTimestamp(time); } finally { lock.unlock(); } diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index ade9abc58f..0fee5dcc84 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -110,6 +110,10 @@ class ControllerServer( var migrationSupport: Option[ControllerMigrationSupport] = None var autoBalancerManager: AutoBalancerManager = _ + def buildAutoBalancerManager: AutoBalancerManager = { + new AutoBalancerManager(time, config, controller.asInstanceOf[QuorumController], raftManager.client) + } + private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus): Boolean = { lock.lock() try { @@ -300,7 +304,7 @@ class ControllerServer( DataPlaneAcceptor.ThreadPrefix) if (config.getBoolean(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ENABLE)) { - autoBalancerManager = new AutoBalancerManager(time, config, controller.asInstanceOf[QuorumController], raftManager.client) + autoBalancerManager = buildAutoBalancerManager autoBalancerManager.start() } /** diff --git a/core/src/test/java/kafka/autobalancer/ExecutionManagerTest.java b/core/src/test/java/kafka/autobalancer/ControllerActionExecutorServiceTest.java similarity index 91% rename from core/src/test/java/kafka/autobalancer/ExecutionManagerTest.java rename to core/src/test/java/kafka/autobalancer/ControllerActionExecutorServiceTest.java index e69b26399c..4842382e31 100644 --- a/core/src/test/java/kafka/autobalancer/ExecutionManagerTest.java +++ b/core/src/test/java/kafka/autobalancer/ControllerActionExecutorServiceTest.java @@ -20,6 +20,7 @@ import kafka.autobalancer.common.Action; import kafka.autobalancer.common.ActionType; import kafka.autobalancer.config.AutoBalancerControllerConfig; +import kafka.autobalancer.executor.ControllerActionExecutorService; import kafka.test.MockController; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData; @@ -38,7 +39,7 @@ import java.util.concurrent.CompletableFuture; @Tag("S3Unit") -public class ExecutionManagerTest { +public class ControllerActionExecutorServiceTest { private boolean checkTopicPartition(AlterPartitionReassignmentsRequestData.ReassignableTopic topic, String name, int partitionId, int nodeId) { @@ -69,13 +70,13 @@ public void testExecuteActions() throws Exception { Mockito.doAnswer(answer -> CompletableFuture.completedFuture(new AlterPartitionReassignmentsResponseData())) .when(controller).alterPartitionReassignments(ctxCaptor.capture(), reqCaptor.capture()); - ExecutionManager executionManager = new ExecutionManager(config, controller); - executionManager.start(); + ControllerActionExecutorService controllerActionExecutorService = new ControllerActionExecutorService(config, controller); + controllerActionExecutorService.start(); List actionList = List.of( new Action(ActionType.MOVE, new TopicPartition("topic1", 0), 0, 1), new Action(ActionType.SWAP, new TopicPartition("topic2", 0), 0, 1, new TopicPartition("topic1", 1))); - executionManager.appendActions(actionList); + controllerActionExecutorService.execute(actionList); TestUtils.waitForCondition(() -> { List reqs = reqCaptor.getAllValues(); @@ -97,6 +98,6 @@ public void testExecuteActions() throws Exception { && checkTopicPartition(reqSwap.topics().get(1), "topic1", 1, 0); }, 5000L, 1000L, () -> "failed to meet reassign"); - executionManager.shutdown(); + controllerActionExecutorService.shutdown(); } } diff --git a/core/src/test/java/kafka/autobalancer/LoadRetrieverTest.java b/core/src/test/java/kafka/autobalancer/LoadRetrieverTest.java index 9760a2c5d9..331bddd8b9 100644 --- a/core/src/test/java/kafka/autobalancer/LoadRetrieverTest.java +++ b/core/src/test/java/kafka/autobalancer/LoadRetrieverTest.java @@ -24,6 +24,7 @@ import kafka.autobalancer.metricsreporter.AutoBalancerMetricsReporter; import kafka.autobalancer.model.ClusterModel; import kafka.autobalancer.model.ClusterModelSnapshot; +import kafka.autobalancer.model.RecordClusterModel; import kafka.autobalancer.model.TopicPartitionReplicaUpdater; import kafka.autobalancer.utils.AutoBalancerClientsIntegrationTestHarness; import kafka.cluster.EndPoint; @@ -86,8 +87,8 @@ public Map overridingBrokerProps() { return props; } - private boolean checkConsumeRecord(ClusterModel clusterModel, int brokerId) { - ClusterModelSnapshot snapshot = clusterModel.snapshot(); + private boolean checkConsumeRecord(ClusterModel clusterModel, int brokerId, long delay) { + ClusterModelSnapshot snapshot = clusterModel.snapshot(Collections.emptySet(), Collections.emptySet(), delay, true); if (snapshot.broker(brokerId) == null) { return false; } @@ -109,7 +110,7 @@ private boolean checkConsumeRecord(ClusterModel clusterModel, int brokerId) { public void testLoadRetrieverShutdown() { Map props = new HashMap<>(); AutoBalancerControllerConfig config = new AutoBalancerControllerConfig(props, false); - ClusterModel clusterModel = new ClusterModel(config); + ClusterModel clusterModel = new ClusterModel(); LoadRetriever loadRetriever = new LoadRetriever(config, cluster.controllers().values().iterator().next().controller(), clusterModel); loadRetriever.start(); @@ -138,10 +139,9 @@ public void testLoadRetrieverShutdown() { public void testConsume() throws InterruptedException { Map props = new HashMap<>(); props.put(AutoBalancerControllerConfig.AUTO_BALANCER_TOPIC_CONFIG, METRIC_TOPIC); - props.put(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ACCEPTED_METRICS_DELAY_MS, 3000L); AutoBalancerControllerConfig config = new AutoBalancerControllerConfig(props, false); - ClusterModel clusterModel = new ClusterModel(config); + RecordClusterModel clusterModel = new RecordClusterModel(); LoadRetriever loadRetriever = new LoadRetriever(config, cluster.controllers().values().iterator().next().controller(), clusterModel); loadRetriever.start(); @@ -178,7 +178,7 @@ public void testConsume() throws InterruptedException { .setPartitionId(0)); loadRetriever.onBrokerRegister(record); - TestUtils.waitForCondition(() -> checkConsumeRecord(clusterModel, brokerConfig.brokerId()), + TestUtils.waitForCondition(() -> checkConsumeRecord(clusterModel, brokerConfig.brokerId(), 3000L), 15000L, 1000L, () -> "cluster model failed to reach expected status"); UnregisterBrokerRecord unregisterRecord = new UnregisterBrokerRecord() @@ -186,7 +186,7 @@ public void testConsume() throws InterruptedException { loadRetriever.onBrokerUnregister(unregisterRecord); Thread.sleep(5000); Assertions.assertTrue(() -> { - ClusterModelSnapshot snapshot = clusterModel.snapshot(); + ClusterModelSnapshot snapshot = clusterModel.snapshot(Collections.emptySet(), Collections.emptySet(), 3000L, true); if (snapshot.broker(brokerConfig.brokerId()) != null) { return false; } @@ -199,7 +199,7 @@ public void testConsume() throws InterruptedException { clusterModel.onBrokerRegister(record); loadRetriever.onBrokerRegister(record); - TestUtils.waitForCondition(() -> checkConsumeRecord(clusterModel, brokerConfig.brokerId()), + TestUtils.waitForCondition(() -> checkConsumeRecord(clusterModel, brokerConfig.brokerId(), 3000L), 15000L, 1000L, () -> "cluster model failed to reach expected status"); Assertions.assertTimeout(Duration.ofMillis(5000), loadRetriever::shutdown); } diff --git a/core/src/test/java/kafka/autobalancer/goals/AbstractGoalTest.java b/core/src/test/java/kafka/autobalancer/goals/AbstractGoalTest.java index e22146e49e..6f7b739220 100644 --- a/core/src/test/java/kafka/autobalancer/goals/AbstractGoalTest.java +++ b/core/src/test/java/kafka/autobalancer/goals/AbstractGoalTest.java @@ -17,11 +17,14 @@ package kafka.autobalancer.goals; +import kafka.autobalancer.common.Action; +import kafka.autobalancer.common.ActionType; import kafka.autobalancer.common.Resource; import kafka.autobalancer.config.AutoBalancerControllerConfig; import kafka.autobalancer.model.BrokerUpdater; import kafka.autobalancer.model.ClusterModelSnapshot; import kafka.autobalancer.model.TopicPartitionReplicaUpdater; +import org.apache.kafka.common.TopicPartition; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; @@ -36,7 +39,7 @@ @Tag("S3Unit") public class AbstractGoalTest extends GoalTestBase { - private final Map goalMap = new HashMap<>(); + private final Map goalMap = new HashMap<>(); @BeforeEach public void setup() { @@ -60,6 +63,65 @@ public void setup() { } } + @Test + public void testCalculateAcceptanceScore() { + ClusterModelSnapshot cluster = new ClusterModelSnapshot(); + BrokerUpdater.Broker broker0 = createBroker(cluster, RACK, 0, true); + BrokerUpdater.Broker broker1 = createBroker(cluster, RACK, 1, true); + BrokerUpdater.Broker broker2 = createBroker(cluster, RACK, 2, true); + + broker0.setCapacity(Resource.NW_IN, 50); + broker0.setLoad(Resource.NW_IN, 40); + + broker1.setCapacity(Resource.NW_IN, 50); + broker1.setLoad(Resource.NW_IN, 80); + + broker2.setCapacity(Resource.NW_IN, 50); + broker2.setLoad(Resource.NW_IN, 120); + + TopicPartitionReplicaUpdater.TopicPartitionReplica replica0 = createTopicPartition(cluster, 0, TOPIC_0, 0); + replica0.setLoad(Resource.NW_IN, 40); + + TopicPartitionReplicaUpdater.TopicPartitionReplica replica1 = createTopicPartition(cluster, 1, TOPIC_0, 1); + TopicPartitionReplicaUpdater.TopicPartitionReplica replica2 = createTopicPartition(cluster, 1, TOPIC_0, 2); + replica1.setLoad(Resource.NW_IN, 40); + replica2.setLoad(Resource.NW_IN, 40); + + TopicPartitionReplicaUpdater.TopicPartitionReplica replica3 = createTopicPartition(cluster, 2, TOPIC_0, 3); + TopicPartitionReplicaUpdater.TopicPartitionReplica replica4 = createTopicPartition(cluster, 2, TOPIC_0, 4); + TopicPartitionReplicaUpdater.TopicPartitionReplica replica5 = createTopicPartition(cluster, 2, TOPIC_0, 5); + replica3.setLoad(Resource.NW_IN, 40); + replica4.setLoad(Resource.NW_IN, 40); + replica5.setLoad(Resource.NW_IN, 40); + + Goal capacityGoal = goalMap.get(NetworkInCapacityGoal.class.getSimpleName()); + + Assertions.assertEquals(0, capacityGoal.actionAcceptanceScore(new Action(ActionType.MOVE, new TopicPartition(TOPIC_0, 0), 0, 1), cluster)); + Assertions.assertEquals(0, capacityGoal.actionAcceptanceScore(new Action(ActionType.MOVE, new TopicPartition(TOPIC_0, 0), 0, 2), cluster)); + + Assertions.assertEquals(0, capacityGoal.actionAcceptanceScore(new Action(ActionType.MOVE, new TopicPartition(TOPIC_0, 2), 1, 0), cluster)); + Assertions.assertEquals(0, capacityGoal.actionAcceptanceScore(new Action(ActionType.MOVE, new TopicPartition(TOPIC_0, 2), 1, 2), cluster)); + + Assertions.assertEquals(0, capacityGoal.actionAcceptanceScore(new Action(ActionType.MOVE, new TopicPartition(TOPIC_0, 4), 2, 0), cluster)); + Assertions.assertEquals(0, capacityGoal.actionAcceptanceScore(new Action(ActionType.MOVE, new TopicPartition(TOPIC_0, 4), 2, 1), cluster)); + + Goal distributionGoal = goalMap.get(NetworkInDistributionGoal.class.getSimpleName()); + + Assertions.assertEquals(0.1, distributionGoal.actionAcceptanceScore(new Action(ActionType.MOVE, new TopicPartition(TOPIC_0, 0), 0, 1), cluster), 0.001); + Assertions.assertEquals(0.1, distributionGoal.actionAcceptanceScore(new Action(ActionType.MOVE, new TopicPartition(TOPIC_0, 0), 0, 2), cluster), 0.001); + + Assertions.assertEquals(0.5, distributionGoal.actionAcceptanceScore(new Action(ActionType.MOVE, new TopicPartition(TOPIC_0, 2), 1, 0), cluster), 0.001); + Assertions.assertEquals(0.1, distributionGoal.actionAcceptanceScore(new Action(ActionType.MOVE, new TopicPartition(TOPIC_0, 2), 1, 2), cluster), 0.001); + + Assertions.assertEquals(0.9, distributionGoal.actionAcceptanceScore(new Action(ActionType.MOVE, new TopicPartition(TOPIC_0, 4), 2, 0), cluster), 0.001); + Assertions.assertEquals(0.5, distributionGoal.actionAcceptanceScore(new Action(ActionType.MOVE, new TopicPartition(TOPIC_0, 4), 2, 1), cluster), 0.001); + + for (Goal goal : goalMap.values()) { + List actions = goal.optimize(cluster, goalMap.values()); + Assertions.assertTrue(actions.isEmpty()); + } + } + @Test public void testMultiGoalOptimization() { ClusterModelSnapshot cluster = new ClusterModelSnapshot(); @@ -136,7 +198,7 @@ public void testMultiGoalOptimization() { Assertions.assertEquals(60, cluster.replicasFor(3).stream().mapToDouble(e -> e.load(Resource.NW_IN)).sum()); Assertions.assertEquals(10, cluster.replicasFor(3).stream().mapToDouble(e -> e.load(Resource.NW_OUT)).sum()); - for (AbstractGoal goal : goalMap.values()) { + for (Goal goal : goalMap.values()) { goal.optimize(cluster, goalMap.values()); } for (BrokerUpdater.Broker broker : cluster.brokers()) { @@ -149,7 +211,7 @@ public void testMultiGoalOptimization() { Assertions.assertTrue(goalMap.get(NetworkOutDistributionGoal.class.getSimpleName()).isBrokerAcceptable(broker)); } } - for (AbstractGoal goal : goalMap.values()) { + for (Goal goal : goalMap.values()) { goal.optimize(cluster, goalMap.values()); } // all goals succeed in second iteration diff --git a/core/src/test/java/kafka/autobalancer/goals/AbstractResourceCapacityGoalTest.java b/core/src/test/java/kafka/autobalancer/goals/AbstractResourceCapacityGoalTest.java index 0e616e60f3..a8fdca4698 100644 --- a/core/src/test/java/kafka/autobalancer/goals/AbstractResourceCapacityGoalTest.java +++ b/core/src/test/java/kafka/autobalancer/goals/AbstractResourceCapacityGoalTest.java @@ -36,7 +36,7 @@ @Tag("S3Unit") public class AbstractResourceCapacityGoalTest extends GoalTestBase { - private final Map goalMap = new HashMap<>(); + private final Map goalMap = new HashMap<>(); @BeforeEach public void setup() { @@ -53,8 +53,8 @@ public void setup() { } } - private AbstractGoal getGoalByResource(Resource resource) { - AbstractGoal goal = null; + private Goal getGoalByResource(Resource resource) { + Goal goal = null; switch (resource) { case NW_IN: goal = goalMap.get(NetworkInCapacityGoal.class.getSimpleName()); @@ -69,7 +69,7 @@ private AbstractGoal getGoalByResource(Resource resource) { } private void testActionAcceptanceScore(Resource resource) { - AbstractGoal goal = getGoalByResource(resource); + Goal goal = getGoalByResource(resource); Assertions.assertNotNull(goal); ClusterModelSnapshot cluster = new ClusterModelSnapshot(); @@ -127,7 +127,7 @@ private void testActionAcceptanceScore(Resource resource) { } private void testSingleResourceCapacityOptimizeOneMove(Resource resource) { - AbstractGoal goal = getGoalByResource(resource); + Goal goal = getGoalByResource(resource); Assertions.assertNotNull(goal); ClusterModelSnapshot cluster = new ClusterModelSnapshot(); @@ -200,7 +200,7 @@ private void testSingleResourceCapacityOptimizeOneMove(Resource resource) { } private void testSingleResourceCapacityOptimizeMultiMove(Resource resource) { - AbstractGoal goal = getGoalByResource(resource); + Goal goal = getGoalByResource(resource); Assertions.assertNotNull(goal); ClusterModelSnapshot cluster = new ClusterModelSnapshot(); @@ -265,7 +265,7 @@ public void testSingleResourceCapacityOptimizeMultiMove() { @Test public void testMultiGoalOptimizeWithOneToOneReplicaSwap() { - AbstractGoal goal = getGoalByResource(Resource.NW_IN); + Goal goal = getGoalByResource(Resource.NW_IN); Assertions.assertNotNull(goal); ClusterModelSnapshot cluster = new ClusterModelSnapshot(); diff --git a/core/src/test/java/kafka/autobalancer/goals/AbstractResourceDistributionGoalTest.java b/core/src/test/java/kafka/autobalancer/goals/AbstractResourceDistributionGoalTest.java index 5f53387b75..36ff4672c9 100644 --- a/core/src/test/java/kafka/autobalancer/goals/AbstractResourceDistributionGoalTest.java +++ b/core/src/test/java/kafka/autobalancer/goals/AbstractResourceDistributionGoalTest.java @@ -36,7 +36,7 @@ @Tag("S3Unit") public class AbstractResourceDistributionGoalTest extends GoalTestBase { - private final Map goalMap = new HashMap<>(); + private final Map goalMap = new HashMap<>(); @BeforeEach public void setup() { @@ -55,8 +55,8 @@ public void setup() { } } - private AbstractGoal getGoalByResource(Resource resource) { - AbstractGoal goal = null; + private Goal getGoalByResource(Resource resource) { + Goal goal = null; switch (resource) { case NW_IN: goal = goalMap.get(NetworkInDistributionGoal.class.getSimpleName()); @@ -71,7 +71,7 @@ private AbstractGoal getGoalByResource(Resource resource) { } private void testActionAcceptanceScore(Resource resource) { - AbstractGoal goal = getGoalByResource(resource); + Goal goal = getGoalByResource(resource); Assertions.assertNotNull(goal); ClusterModelSnapshot cluster = new ClusterModelSnapshot(); @@ -104,7 +104,7 @@ private void testActionAcceptanceScore(Resource resource) { } private void testSingleResourceDistributionOptimizeOneMove(Resource resource) { - AbstractGoal goal = getGoalByResource(resource); + Goal goal = getGoalByResource(resource); Assertions.assertNotNull(goal); ClusterModelSnapshot cluster = new ClusterModelSnapshot(); @@ -144,7 +144,7 @@ private void testSingleResourceDistributionOptimizeOneMove(Resource resource) { } private void testSingleResourceDistributionOptimizeMultiMoveOut(Resource resource) { - AbstractGoal goal = getGoalByResource(resource); + Goal goal = getGoalByResource(resource); Assertions.assertNotNull(goal); ClusterModelSnapshot cluster = new ClusterModelSnapshot(); @@ -190,7 +190,7 @@ private void testSingleResourceDistributionOptimizeMultiMoveOut(Resource resourc } private void testSingleResourceDistributionOptimizeMultiMoveIn(Resource resource) { - AbstractGoal goal = getGoalByResource(resource); + Goal goal = getGoalByResource(resource); Assertions.assertNotNull(goal); ClusterModelSnapshot cluster = new ClusterModelSnapshot(); @@ -256,7 +256,7 @@ public void testSingleResourceDistributionOptimizeMultiMoveOut() { } private void testMultiGoalOptimizeWithOneToOneReplicaSwap(Resource resource) { - AbstractGoal goal = getGoalByResource(resource); + Goal goal = getGoalByResource(resource); Assertions.assertNotNull(goal); ClusterModelSnapshot cluster = new ClusterModelSnapshot(); diff --git a/core/src/test/java/kafka/autobalancer/metricsreporter/AutoBalancerMetricsReporterTest.java b/core/src/test/java/kafka/autobalancer/metricsreporter/AutoBalancerMetricsReporterTest.java index 340f81e109..05fd1115dd 100644 --- a/core/src/test/java/kafka/autobalancer/metricsreporter/AutoBalancerMetricsReporterTest.java +++ b/core/src/test/java/kafka/autobalancer/metricsreporter/AutoBalancerMetricsReporterTest.java @@ -17,11 +17,11 @@ package kafka.autobalancer.metricsreporter; +import kafka.autobalancer.common.RawMetricType; import kafka.autobalancer.config.AutoBalancerConfig; import kafka.autobalancer.config.AutoBalancerMetricsReporterConfig; import kafka.autobalancer.metricsreporter.metric.AutoBalancerMetrics; import kafka.autobalancer.metricsreporter.metric.MetricSerde; -import kafka.autobalancer.metricsreporter.metric.RawMetricType; import kafka.autobalancer.utils.AutoBalancerClientsIntegrationTestHarness; import kafka.server.KafkaConfig; import org.apache.kafka.clients.CommonClientConfigs; @@ -47,14 +47,14 @@ import java.util.Properties; import java.util.Set; -import static kafka.autobalancer.metricsreporter.metric.RawMetricType.ALL_TOPIC_BYTES_IN; -import static kafka.autobalancer.metricsreporter.metric.RawMetricType.ALL_TOPIC_BYTES_OUT; -import static kafka.autobalancer.metricsreporter.metric.RawMetricType.BROKER_CAPACITY_NW_IN; -import static kafka.autobalancer.metricsreporter.metric.RawMetricType.BROKER_CAPACITY_NW_OUT; -import static kafka.autobalancer.metricsreporter.metric.RawMetricType.BROKER_CPU_UTIL; -import static kafka.autobalancer.metricsreporter.metric.RawMetricType.PARTITION_SIZE; -import static kafka.autobalancer.metricsreporter.metric.RawMetricType.TOPIC_PARTITION_BYTES_IN; -import static kafka.autobalancer.metricsreporter.metric.RawMetricType.TOPIC_PARTITION_BYTES_OUT; +import static kafka.autobalancer.common.RawMetricType.ALL_TOPIC_BYTES_IN; +import static kafka.autobalancer.common.RawMetricType.ALL_TOPIC_BYTES_OUT; +import static kafka.autobalancer.common.RawMetricType.BROKER_CAPACITY_NW_IN; +import static kafka.autobalancer.common.RawMetricType.BROKER_CAPACITY_NW_OUT; +import static kafka.autobalancer.common.RawMetricType.BROKER_CPU_UTIL; +import static kafka.autobalancer.common.RawMetricType.PARTITION_SIZE; +import static kafka.autobalancer.common.RawMetricType.TOPIC_PARTITION_BYTES_IN; +import static kafka.autobalancer.common.RawMetricType.TOPIC_PARTITION_BYTES_OUT; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.doNothing; @@ -79,6 +79,7 @@ public void tearDown() { @Override protected Map overridingNodeProps() { Map props = new HashMap<>(); +// props.put(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ENABLE, "false"); props.put(AutoBalancerConfig.AUTO_BALANCER_TOPIC_CONFIG, METRIC_TOPIC); props.put(AutoBalancerConfig.AUTO_BALANCER_METRICS_TOPIC_NUM_PARTITIONS_CONFIG, "1"); props.put(KafkaConfig.LogFlushIntervalMessagesProp(), "1"); @@ -103,48 +104,44 @@ public void testReportingMetrics() { props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MetricSerde.class.getName()); props.put(ConsumerConfig.GROUP_ID_CONFIG, "testReportingMetrics"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - Consumer consumer = new KafkaConsumer<>(props); - - consumer.subscribe(Collections.singleton(METRIC_TOPIC)); - long startMs = System.currentTimeMillis(); - Set expectedBrokerMetricTypes = new HashSet<>(Arrays.asList( - (int) BROKER_CAPACITY_NW_IN.id(), - (int) BROKER_CAPACITY_NW_OUT.id(), - (int) ALL_TOPIC_BYTES_IN.id(), - (int) ALL_TOPIC_BYTES_OUT.id(), - (int) BROKER_CPU_UTIL.id())); - Set expectedTopicPartitionMetricTypes = new HashSet<>(Arrays.asList( - (int) TOPIC_PARTITION_BYTES_IN.id(), - (int) TOPIC_PARTITION_BYTES_OUT.id(), - (int) PARTITION_SIZE.id())); - Set expectedMetricTypes = new HashSet<>(expectedBrokerMetricTypes); - expectedMetricTypes.addAll(expectedTopicPartitionMetricTypes); - - Set metricTypes = new HashSet<>(); - ConsumerRecords records; - while (metricTypes.size() < (expectedBrokerMetricTypes.size() + expectedTopicPartitionMetricTypes.size()) - && System.currentTimeMillis() < startMs + 15000) { - records = consumer.poll(Duration.ofMillis(10L)); - for (ConsumerRecord record : records) { - AutoBalancerMetrics metrics = record.value(); - Set localMetricTypes = new HashSet<>(); - for (RawMetricType type : record.value().getMetricTypeValueMap().keySet()) { - int typeId = type.id(); - metricTypes.add(typeId); - localMetricTypes.add(typeId); + try (Consumer consumer = new KafkaConsumer<>(props)) { + consumer.subscribe(Collections.singleton(METRIC_TOPIC)); + long startMs = System.currentTimeMillis(); + Set expectedBrokerMetricTypes = new HashSet<>(Arrays.asList( + (int) BROKER_CAPACITY_NW_IN.id(), + (int) BROKER_CAPACITY_NW_OUT.id(), + (int) ALL_TOPIC_BYTES_IN.id(), + (int) ALL_TOPIC_BYTES_OUT.id(), + (int) BROKER_CPU_UTIL.id())); + Set expectedTopicPartitionMetricTypes = new HashSet<>(Arrays.asList( + (int) TOPIC_PARTITION_BYTES_IN.id(), + (int) TOPIC_PARTITION_BYTES_OUT.id(), + (int) PARTITION_SIZE.id())); + Set expectedMetricTypes = new HashSet<>(expectedBrokerMetricTypes); + expectedMetricTypes.addAll(expectedTopicPartitionMetricTypes); + + Set metricTypes = new HashSet<>(); + ConsumerRecords records; + while (metricTypes.size() < (expectedBrokerMetricTypes.size() + expectedTopicPartitionMetricTypes.size()) + && System.currentTimeMillis() < startMs + 15000) { + records = consumer.poll(Duration.ofMillis(10L)); + for (ConsumerRecord record : records) { + AutoBalancerMetrics metrics = record.value(); + Set localMetricTypes = new HashSet<>(); + for (RawMetricType type : record.value().getMetricTypeValueMap().keySet()) { + int typeId = type.id(); + metricTypes.add(typeId); + localMetricTypes.add(typeId); + } + Set expectedMap = metrics.metricClassId() == AutoBalancerMetrics.MetricClassId.BROKER_METRIC ? + expectedBrokerMetricTypes : expectedTopicPartitionMetricTypes; + Assertions.assertEquals(expectedMap, localMetricTypes, "Expected " + expectedMap + ", but saw " + localMetricTypes); } - Set expectedMap = metrics.metricClassId() == AutoBalancerMetrics.MetricClassId.BROKER_METRIC ? - expectedBrokerMetricTypes : expectedTopicPartitionMetricTypes; - Assertions.assertEquals(expectedMap, localMetricTypes, "Expected " + expectedMap + ", but saw " + localMetricTypes); } + Assertions.assertEquals(expectedMetricTypes, metricTypes, "Expected " + expectedMetricTypes + ", but saw " + metricTypes); } - Assertions.assertEquals(expectedMetricTypes, metricTypes, "Expected " + expectedMetricTypes + ", but saw " + metricTypes); } - - - - @Test public void testConfigureNwCapacity() throws NoSuchFieldException, IllegalAccessException { // Create a spy object for AutoBalancerMetricsReporter diff --git a/core/src/test/java/kafka/autobalancer/metricsreporter/metric/MetricSerdeTest.java b/core/src/test/java/kafka/autobalancer/metricsreporter/metric/MetricSerdeTest.java index 4d3d4c8dab..a51fbf206c 100644 --- a/core/src/test/java/kafka/autobalancer/metricsreporter/metric/MetricSerdeTest.java +++ b/core/src/test/java/kafka/autobalancer/metricsreporter/metric/MetricSerdeTest.java @@ -17,6 +17,7 @@ package kafka.autobalancer.metricsreporter.metric; +import kafka.autobalancer.common.RawMetricType; import kafka.autobalancer.metricsreporter.exception.UnknownVersionException; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; diff --git a/core/src/test/java/kafka/autobalancer/metricsreporter/metric/MetricsUtilsTest.java b/core/src/test/java/kafka/autobalancer/metricsreporter/metric/MetricsUtilsTest.java index d36a28bb0f..7cc556767f 100644 --- a/core/src/test/java/kafka/autobalancer/metricsreporter/metric/MetricsUtilsTest.java +++ b/core/src/test/java/kafka/autobalancer/metricsreporter/metric/MetricsUtilsTest.java @@ -17,6 +17,7 @@ package kafka.autobalancer.metricsreporter.metric; +import kafka.autobalancer.common.RawMetricType; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; diff --git a/core/src/test/java/kafka/autobalancer/model/ClusterModelTest.java b/core/src/test/java/kafka/autobalancer/model/ClusterModelTest.java index 2e2daa5e51..762d2d4030 100644 --- a/core/src/test/java/kafka/autobalancer/model/ClusterModelTest.java +++ b/core/src/test/java/kafka/autobalancer/model/ClusterModelTest.java @@ -17,9 +17,8 @@ package kafka.autobalancer.model; -import kafka.autobalancer.config.AutoBalancerControllerConfig; +import kafka.autobalancer.common.RawMetricType; import kafka.autobalancer.metricsreporter.metric.BrokerMetrics; -import kafka.autobalancer.metricsreporter.metric.RawMetricType; import kafka.autobalancer.metricsreporter.metric.TopicPartitionMetrics; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; @@ -33,7 +32,6 @@ import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; -import java.util.Collections; import java.util.List; @Tag("S3Unit") @@ -41,7 +39,7 @@ public class ClusterModelTest { @Test public void testRegisterBroker() { - ClusterModel clusterModel = new ClusterModel(new AutoBalancerControllerConfig(Collections.emptyMap(), false)); + RecordClusterModel clusterModel = new RecordClusterModel(); RegisterBrokerRecord record1 = new RegisterBrokerRecord() .setBrokerId(1); RegisterBrokerRecord record2 = new RegisterBrokerRecord() @@ -50,13 +48,13 @@ public void testRegisterBroker() { clusterModel.onBrokerRegister(record2); clusterModel.onBrokerRegister(record1); - Assertions.assertEquals(1, clusterModel.brokerUpdater(1).id()); - Assertions.assertEquals(2, clusterModel.brokerUpdater(2).id()); + Assertions.assertEquals(1, clusterModel.brokerUpdater(1).get().getBrokerId()); + Assertions.assertEquals(2, clusterModel.brokerUpdater(2).get().getBrokerId()); } @Test public void testUnregisterBroker() { - ClusterModel clusterModel = new ClusterModel(new AutoBalancerControllerConfig(Collections.emptyMap(), false)); + RecordClusterModel clusterModel = new RecordClusterModel(); RegisterBrokerRecord record1 = new RegisterBrokerRecord() .setBrokerId(1); RegisterBrokerRecord record2 = new RegisterBrokerRecord() @@ -65,20 +63,20 @@ public void testUnregisterBroker() { clusterModel.onBrokerRegister(record2); clusterModel.onBrokerRegister(record1); - Assertions.assertEquals(1, clusterModel.brokerUpdater(1).id()); - Assertions.assertEquals(2, clusterModel.brokerUpdater(2).id()); + Assertions.assertEquals(1, clusterModel.brokerUpdater(1).get().getBrokerId()); + Assertions.assertEquals(2, clusterModel.brokerUpdater(2).get().getBrokerId()); UnregisterBrokerRecord unregisterRecord = new UnregisterBrokerRecord() .setBrokerId(2); clusterModel.onBrokerUnregister(unregisterRecord); - Assertions.assertEquals(1, clusterModel.brokerUpdater(1).id()); + Assertions.assertEquals(1, clusterModel.brokerUpdater(1).get().getBrokerId()); Assertions.assertNull(clusterModel.brokerUpdater(2)); } @Test public void testCreateTopic() { - ClusterModel clusterModel = new ClusterModel(new AutoBalancerControllerConfig(Collections.emptyMap(), false)); + RecordClusterModel clusterModel = new RecordClusterModel(); String topicName = "testTopic"; Uuid topicId = Uuid.randomUuid(); TopicRecord record = new TopicRecord() @@ -91,7 +89,7 @@ public void testCreateTopic() { @Test public void testDeleteTopic() { - ClusterModel clusterModel = new ClusterModel(new AutoBalancerControllerConfig(Collections.emptyMap(), false)); + RecordClusterModel clusterModel = new RecordClusterModel(); String topicName = "testTopic"; Uuid topicId = Uuid.randomUuid(); TopicRecord record = new TopicRecord() @@ -110,7 +108,7 @@ public void testDeleteTopic() { @Test public void testCreatePartition() { - ClusterModel clusterModel = new ClusterModel(new AutoBalancerControllerConfig(Collections.emptyMap(), false)); + RecordClusterModel clusterModel = new RecordClusterModel(); String topicName = "testTopic"; Uuid topicId = Uuid.randomUuid(); int partition = 0; @@ -147,7 +145,7 @@ public void testCreatePartition() { @Test public void testChangePartition() { - ClusterModel clusterModel = new ClusterModel(new AutoBalancerControllerConfig(Collections.emptyMap(), false)); + RecordClusterModel clusterModel = new RecordClusterModel(); String topicName = "testTopic"; Uuid topicId = Uuid.randomUuid(); int partition = 0; @@ -196,7 +194,7 @@ public void testChangePartition() { @Test public void testUpdateBroker() { - ClusterModel clusterModel = new ClusterModel(new AutoBalancerControllerConfig(Collections.emptyMap(), false)); + RecordClusterModel clusterModel = new RecordClusterModel(); int brokerId = 1; // update on non-exist broker @@ -207,18 +205,18 @@ public void testUpdateBroker() { brokerMetrics.put(RawMetricType.ALL_TOPIC_BYTES_IN, 10); brokerMetrics.put(RawMetricType.ALL_TOPIC_BYTES_OUT, 10); brokerMetrics.put(RawMetricType.BROKER_CPU_UTIL, 10); - Assertions.assertFalse(clusterModel.updateBroker(brokerMetrics)); + Assertions.assertFalse(clusterModel.updateBrokerMetrics(brokerMetrics.brokerId(), brokerMetrics.getMetricTypeValueMap(), brokerMetrics.time())); RegisterBrokerRecord registerBrokerRecord = new RegisterBrokerRecord() .setBrokerId(brokerId); clusterModel.onBrokerRegister(registerBrokerRecord); - Assertions.assertEquals(brokerId, clusterModel.brokerUpdater(brokerId).id()); - Assertions.assertTrue(clusterModel.updateBroker(brokerMetrics)); + Assertions.assertEquals(brokerId, clusterModel.brokerUpdater(brokerId).get().getBrokerId()); + Assertions.assertTrue(clusterModel.updateBrokerMetrics(brokerMetrics.brokerId(), brokerMetrics.getMetricTypeValueMap(), brokerMetrics.time())); } @Test public void testUpdatePartition() { - ClusterModel clusterModel = new ClusterModel(new AutoBalancerControllerConfig(Collections.emptyMap(), false)); + RecordClusterModel clusterModel = new RecordClusterModel(); String topicName = "testTopic"; Uuid topicId = Uuid.randomUuid(); int partition = 0; @@ -230,7 +228,8 @@ public void testUpdatePartition() { topicPartitionMetrics.put(RawMetricType.TOPIC_PARTITION_BYTES_IN, 10); topicPartitionMetrics.put(RawMetricType.TOPIC_PARTITION_BYTES_OUT, 10); topicPartitionMetrics.put(RawMetricType.PARTITION_SIZE, 10); - Assertions.assertFalse(clusterModel.updateTopicPartition(topicPartitionMetrics)); + Assertions.assertFalse(clusterModel.updateTopicPartitionMetrics(topicPartitionMetrics.brokerId(), + new TopicPartition(topicName, partition), topicPartitionMetrics.getMetricTypeValueMap(), topicPartitionMetrics.time())); RegisterBrokerRecord registerBrokerRecord = new RegisterBrokerRecord() .setBrokerId(brokerId); @@ -244,7 +243,8 @@ public void testUpdatePartition() { .setTopicId(topicId) .setPartitionId(partition); clusterModel.onPartitionCreate(partitionRecord); - Assertions.assertTrue(clusterModel.updateTopicPartition(topicPartitionMetrics)); + Assertions.assertTrue(clusterModel.updateTopicPartitionMetrics(topicPartitionMetrics.brokerId(), + new TopicPartition(topicName, partition), topicPartitionMetrics.getMetricTypeValueMap(), topicPartitionMetrics.time())); } @Test diff --git a/core/src/test/java/kafka/autobalancer/utils/AutoBalancerIntegrationTestHarness.java b/core/src/test/java/kafka/autobalancer/utils/AutoBalancerIntegrationTestHarness.java index 19ff3fc80b..460528db99 100644 --- a/core/src/test/java/kafka/autobalancer/utils/AutoBalancerIntegrationTestHarness.java +++ b/core/src/test/java/kafka/autobalancer/utils/AutoBalancerIntegrationTestHarness.java @@ -67,9 +67,9 @@ public void setUp() { i++; } for (ControllerNode controller : nodes.controllerNodes().values()) { + controller.propertyOverrides().put(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ENABLE, "true"); controller.propertyOverrides().putAll(overridingControllerProps()); controller.propertyOverrides().putAll(overridingNodeProps()); - controller.propertyOverrides().put(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ENABLE, "true"); controller.propertyOverrides().put(KafkaConfig.ElasticStreamEnableProp(), "true"); controller.propertyOverrides().put(KafkaConfig.S3MockEnableProp(), "true"); }