From 6412e5162dd8fedb9566cb160a7d5ace53bd6f74 Mon Sep 17 00:00:00 2001 From: Shichao Nie Date: Fri, 29 Dec 2023 18:51:50 +0800 Subject: [PATCH] feat(core): change autobalancer capacity unit to bytes - change autobalancer capacity unit to bytes to be consistent with s3 network bandwidth configuration Signed-off-by: Shichao Nie --- .../kafka/autobalancer/common/Resource.java | 42 ++++++++++++++++--- .../AutoBalancerMetricsReporterConfig.java | 8 ++-- .../AutoBalancerMetricsReporter.java | 9 ++-- .../metricsreporter/metric/MetricsUtils.java | 4 -- .../autobalancer/model/BrokerUpdater.java | 32 ++++++++++---- 5 files changed, 68 insertions(+), 27 deletions(-) diff --git a/core/src/main/java/kafka/autobalancer/common/Resource.java b/core/src/main/java/kafka/autobalancer/common/Resource.java index 21c222ea2f..f025dd3f42 100644 --- a/core/src/main/java/kafka/autobalancer/common/Resource.java +++ b/core/src/main/java/kafka/autobalancer/common/Resource.java @@ -22,23 +22,28 @@ import java.util.List; - /** * CPU: a host and broker-level resource. * NW (in and out): a host-level resource. * DISK: a broker-level resource. */ public enum Resource { - CPU("cpu", 0, 0.001), - NW_IN("networkInbound", 1, 10), - NW_OUT("networkOutbound", 2, 10); + CPU("CPU", 0, 0.001), + NW_IN("NWIn", 1, 10), + NW_OUT("NWOut", 2, 10), + UNKNOWN("UNKNOWN", 999, 0); + public static final Double IGNORED_CAPACITY_VALUE = -1.0; // EPSILON_PERCENT defines the acceptable nuance when comparing the utilization of the resource. // This nuance is generated due to precision loss when summing up float type utilization value. // In stress test we find that for cluster of around 800,000 replicas, the summed up nuance can be // more than 0.1% of sum value. private static final double EPSILON_PERCENT = 0.0008; - private static final List CACHED_VALUES = List.of(values()); + private static final List CACHED_VALUES = List.of( + Resource.CPU, + Resource.NW_IN, + Resource.NW_OUT + ); private final String resource; private final int id; private final double epsilon; @@ -49,6 +54,33 @@ public enum Resource { this.epsilon = epsilon; } + public static Resource of(int id) { + if (id < 0 || id >= CACHED_VALUES.size()) { + return UNKNOWN; + } + return CACHED_VALUES.get(id); + } + + public String resourceString(double value) { + String valueStr = ""; + if (value == IGNORED_CAPACITY_VALUE) { + valueStr = "ignored"; + } else { + switch (this) { + case CPU: + valueStr = String.format("%.2f%%", value * 100); + break; + case NW_IN: + case NW_OUT: + valueStr = String.format("%.2fKB/s", value / 1024); + break; + default: + break; + } + } + return this.resource + "=" + valueStr; + } + /** * Use this instead of values() because values() creates a new array each time. * diff --git a/core/src/main/java/kafka/autobalancer/config/AutoBalancerMetricsReporterConfig.java b/core/src/main/java/kafka/autobalancer/config/AutoBalancerMetricsReporterConfig.java index 61bfd06bbc..acb19428bb 100644 --- a/core/src/main/java/kafka/autobalancer/config/AutoBalancerMetricsReporterConfig.java +++ b/core/src/main/java/kafka/autobalancer/config/AutoBalancerMetricsReporterConfig.java @@ -45,8 +45,8 @@ public class AutoBalancerMetricsReporterConfig extends AutoBalancerConfig { public static final String AUTO_BALANCER_METRICS_REPORTER_BATCH_SIZE_CONFIG = PREFIX + "producer.batch.size"; public static final String AUTO_BALANCER_METRICS_REPORTER_KUBERNETES_MODE_CONFIG = PREFIX + "kubernetes.mode"; /* Default values */ - public static final double DEFAULT_AUTO_BALANCER_BROKER_NW_IN_CAPACITY = 100 * 1024; // 100MB/s - public static final double DEFAULT_AUTO_BALANCER_BROKER_NW_OUT_CAPACITY = 100 * 1024; // 100MB/s + public static final double DEFAULT_AUTO_BALANCER_BROKER_NW_IN_CAPACITY = 100 * 1024 * 1024; // 100MB/s + public static final double DEFAULT_AUTO_BALANCER_BROKER_NW_OUT_CAPACITY = 100 * 1024 * 1024; // 100MB/s public static final String DEFAULT_AUTO_BALANCER_METRICS_REPORTER_PRODUCER_CLIENT_ID = "AutoBalancerMetricsReporterProducer"; public static final long DEFAULT_AUTO_BALANCER_METRICS_TOPIC_AUTO_CREATE_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10); public static final Integer DEFAULT_AUTO_BALANCER_METRICS_TOPIC_AUTO_CREATE_RETRIES = 5; @@ -57,8 +57,8 @@ public class AutoBalancerMetricsReporterConfig extends AutoBalancerConfig { public static final boolean DEFAULT_AUTO_BALANCER_METRICS_REPORTER_KUBERNETES_MODE = false; public static final int DEFAULT_AUTO_BALANCER_METRICS_REPORTER_CREATE_RETRIES = 2; /* Documents */ - public static final String AUTO_BALANCER_BROKER_NW_IN_CAPACITY_DOC = "Maximum network input bandwidth available for the broker in KB/s"; - public static final String AUTO_BALANCER_BROKER_NW_OUT_CAPACITY_DOC = "Maximum network output bandwidth available for the broker in KB/s"; + public static final String AUTO_BALANCER_BROKER_NW_IN_CAPACITY_DOC = "Maximum network input bandwidth available for the broker in Bytes/s"; + public static final String AUTO_BALANCER_BROKER_NW_OUT_CAPACITY_DOC = "Maximum network output bandwidth available for the broker in Bytes/s"; private static final String AUTO_BALANCER_METRICS_TOPIC_AUTO_CREATE_TIMEOUT_MS_DOC = "Timeout on the Auto Balancer metrics topic creation"; private static final String AUTO_BALANCER_METRICS_TOPIC_AUTO_CREATE_RETRIES_DOC = "Number of retries of the Auto Balancer metrics reporter" + " for the topic creation"; diff --git a/core/src/main/java/kafka/autobalancer/metricsreporter/AutoBalancerMetricsReporter.java b/core/src/main/java/kafka/autobalancer/metricsreporter/AutoBalancerMetricsReporter.java index cecd16c012..bf6022a3ae 100644 --- a/core/src/main/java/kafka/autobalancer/metricsreporter/AutoBalancerMetricsReporter.java +++ b/core/src/main/java/kafka/autobalancer/metricsreporter/AutoBalancerMetricsReporter.java @@ -190,14 +190,13 @@ public void configure(Map rawConfigs) { } //Add AUTO_BALANCER_BROKER_NW_IN/OUT_CAPACITY by S3NetworkBaselineBandwidthProp config value if not set - if (configs.get(KafkaConfig.S3NetworkBaselineBandwidthProp()) != null) { + Object s3NetworkBaselineBandwidth = configs.get(KafkaConfig.S3NetworkBaselineBandwidthProp()); + if (s3NetworkBaselineBandwidth != null) { if (!configs.containsKey(AutoBalancerMetricsReporterConfig.AUTO_BALANCER_BROKER_NW_IN_CAPACITY)) - configs.put(AutoBalancerMetricsReporterConfig.AUTO_BALANCER_BROKER_NW_IN_CAPACITY, - configs.get(KafkaConfig.S3NetworkBaselineBandwidthProp())); + configs.put(AutoBalancerMetricsReporterConfig.AUTO_BALANCER_BROKER_NW_IN_CAPACITY, s3NetworkBaselineBandwidth); if (!configs.containsKey(AutoBalancerMetricsReporterConfig.AUTO_BALANCER_BROKER_NW_OUT_CAPACITY)) - configs.put(AutoBalancerMetricsReporterConfig.AUTO_BALANCER_BROKER_NW_OUT_CAPACITY, - configs.get(KafkaConfig.S3NetworkBaselineBandwidthProp())); + configs.put(AutoBalancerMetricsReporterConfig.AUTO_BALANCER_BROKER_NW_OUT_CAPACITY, s3NetworkBaselineBandwidth); } AutoBalancerMetricsReporterConfig reporterConfig = new AutoBalancerMetricsReporterConfig(configs, false); diff --git a/core/src/main/java/kafka/autobalancer/metricsreporter/metric/MetricsUtils.java b/core/src/main/java/kafka/autobalancer/metricsreporter/metric/MetricsUtils.java index e03abbb711..8ced714c4b 100644 --- a/core/src/main/java/kafka/autobalancer/metricsreporter/metric/MetricsUtils.java +++ b/core/src/main/java/kafka/autobalancer/metricsreporter/metric/MetricsUtils.java @@ -188,12 +188,8 @@ private static AutoBalancerMetrics toAutoBalancerMetric(long nowMs, } switch (name) { case BYTES_IN_PER_SEC: - // network inbound bandwidth capacity is in KB/s - value = value / 1024; return bytesInToMetric(topic, partition, nowMs, brokerId, brokerRack, value); case BYTES_OUT_PER_SEC: - // network inbound bandwidth capacity is in KB/s - value = value / 1024; return bytesOutToMetric(topic, partition, nowMs, brokerId, brokerRack, value); case SIZE: if (partition == -1) { diff --git a/core/src/main/java/kafka/autobalancer/model/BrokerUpdater.java b/core/src/main/java/kafka/autobalancer/model/BrokerUpdater.java index 22b2c8de51..ce37e6c208 100644 --- a/core/src/main/java/kafka/autobalancer/model/BrokerUpdater.java +++ b/core/src/main/java/kafka/autobalancer/model/BrokerUpdater.java @@ -34,7 +34,6 @@ public class BrokerUpdater { private static final Logger LOGGER = LoggerFactory.getLogger(BrokerUpdater.class); - private static final Double IGNORED_CAPACITY_VALUE = -1.0; private final Lock lock = new ReentrantLock(); private final Broker broker; @@ -52,7 +51,7 @@ public static class Broker { public Broker(int brokerId) { this.brokerId = brokerId; - Arrays.fill(this.brokerCapacity, IGNORED_CAPACITY_VALUE); + Arrays.fill(this.brokerCapacity, Resource.IGNORED_CAPACITY_VALUE); } public Broker(Broker other) { @@ -103,7 +102,7 @@ public void addLoad(Resource resource, double delta) { public double utilizationFor(Resource resource) { double capacity = capacity(resource); - if (capacity == IGNORED_CAPACITY_VALUE) { + if (capacity == Resource.IGNORED_CAPACITY_VALUE) { return 0.0; } if (capacity == 0.0) { @@ -143,12 +142,27 @@ public int hashCode() { @Override public String toString() { - return "Broker{" + - "brokerId=" + brokerId + - ", brokerCapacity=" + Arrays.toString(brokerCapacity) + - ", brokerLoad=" + Arrays.toString(brokerLoad) + - ", active=" + active + - '}'; + StringBuilder builder = new StringBuilder(); + builder.append("{brokerId=") + .append(brokerId) + .append(", brokerCapacity=["); + for (int i = 0; i < brokerCapacity.length; i++) { + builder.append(Resource.of(i).resourceString(brokerCapacity[i])); + if (i != brokerCapacity.length - 1) { + builder.append(", "); + } + } + builder.append("], brokerLoad=["); + for (int i = 0; i < brokerLoad.length; i++) { + builder.append(Resource.of(i).resourceString(brokerLoad[i])); + if (i != brokerLoad.length - 1) { + builder.append(", "); + } + } + builder.append("], active=").append(active) + .append(", timestamp=").append(timestamp) + .append("}"); + return builder.toString(); } }