42 changes: 37 additions & 5 deletions core/src/main/java/kafka/autobalancer/common/Resource.java
@@ -22,23 +22,28 @@

import java.util.List;


/**
* CPU: a host and broker-level resource.
* NW (in and out): a host-level resource.
* DISK: a broker-level resource.
*/
public enum Resource {
CPU("cpu", 0, 0.001),
NW_IN("networkInbound", 1, 10),
NW_OUT("networkOutbound", 2, 10);
CPU("CPU", 0, 0.001),
NW_IN("NWIn", 1, 10),
NW_OUT("NWOut", 2, 10),
UNKNOWN("UNKNOWN", 999, 0);

public static final Double IGNORED_CAPACITY_VALUE = -1.0;
// EPSILON_PERCENT defines the acceptable deviation when comparing resource utilization values.
// The deviation is caused by precision loss when summing up floating-point utilization values.
// In stress tests we found that, for a cluster of around 800,000 replicas, the accumulated
// deviation can exceed 0.1% of the total value.
private static final double EPSILON_PERCENT = 0.0008;
private static final List<Resource> CACHED_VALUES = List.of(values());
private static final List<Resource> CACHED_VALUES = List.of(
Resource.CPU,
Resource.NW_IN,
Resource.NW_OUT
);
private final String resource;
private final int id;
private final double epsilon;
@@ -49,6 +54,33 @@ public enum Resource {
this.epsilon = epsilon;
}

public static Resource of(int id) {
if (id < 0 || id >= CACHED_VALUES.size()) {
return UNKNOWN;
}
return CACHED_VALUES.get(id);
}

public String resourceString(double value) {
String valueStr = "";
if (value == IGNORED_CAPACITY_VALUE) {
valueStr = "ignored";
} else {
switch (this) {
case CPU:
valueStr = String.format("%.2f%%", value * 100);
break;
case NW_IN:
case NW_OUT:
valueStr = String.format("%.2fKB/s", value / 1024);
break;
default:
break;
}
}
return this.resource + "=" + valueStr;
}

/**
* Use this instead of values() because values() creates a new array each time.
*
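Note (not part of the diff): a minimal usage sketch of the new helpers, assuming the class is on the classpath and that capacities and loads are expressed in Bytes/s; the wrapper class name and the values are made up for illustration.

import kafka.autobalancer.common.Resource;

public class ResourceFormatSketch {
    public static void main(String[] args) {
        Resource r = Resource.of(1);                              // NW_IN; ids 0-2 map to CPU, NW_IN, NW_OUT
        System.out.println(r.resourceString(100 * 1024 * 1024));  // NWIn=102400.00KB/s (Bytes/s in, printed as KB/s)
        System.out.println(Resource.CPU.resourceString(0.75));    // CPU=75.00%
        System.out.println(Resource.NW_OUT.resourceString(-1.0)); // NWOut=ignored (IGNORED_CAPACITY_VALUE)
        System.out.println(Resource.of(42));                      // UNKNOWN for any id outside the cached range
    }
}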
@@ -45,8 +45,8 @@ public class AutoBalancerMetricsReporterConfig extends AutoBalancerConfig {
public static final String AUTO_BALANCER_METRICS_REPORTER_BATCH_SIZE_CONFIG = PREFIX + "producer.batch.size";
public static final String AUTO_BALANCER_METRICS_REPORTER_KUBERNETES_MODE_CONFIG = PREFIX + "kubernetes.mode";
/* Default values */
public static final double DEFAULT_AUTO_BALANCER_BROKER_NW_IN_CAPACITY = 100 * 1024; // 100MB/s
public static final double DEFAULT_AUTO_BALANCER_BROKER_NW_OUT_CAPACITY = 100 * 1024; // 100MB/s
public static final double DEFAULT_AUTO_BALANCER_BROKER_NW_IN_CAPACITY = 100 * 1024 * 1024; // 100MB/s
public static final double DEFAULT_AUTO_BALANCER_BROKER_NW_OUT_CAPACITY = 100 * 1024 * 1024; // 100MB/s
public static final String DEFAULT_AUTO_BALANCER_METRICS_REPORTER_PRODUCER_CLIENT_ID = "AutoBalancerMetricsReporterProducer";
public static final long DEFAULT_AUTO_BALANCER_METRICS_TOPIC_AUTO_CREATE_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10);
public static final Integer DEFAULT_AUTO_BALANCER_METRICS_TOPIC_AUTO_CREATE_RETRIES = 5;
@@ -57,8 +57,8 @@ public class AutoBalancerMetricsReporterConfig extends AutoBalancerConfig {
public static final boolean DEFAULT_AUTO_BALANCER_METRICS_REPORTER_KUBERNETES_MODE = false;
public static final int DEFAULT_AUTO_BALANCER_METRICS_REPORTER_CREATE_RETRIES = 2;
/* Documents */
public static final String AUTO_BALANCER_BROKER_NW_IN_CAPACITY_DOC = "Maximum network input bandwidth available for the broker in KB/s";
public static final String AUTO_BALANCER_BROKER_NW_OUT_CAPACITY_DOC = "Maximum network output bandwidth available for the broker in KB/s";
public static final String AUTO_BALANCER_BROKER_NW_IN_CAPACITY_DOC = "Maximum network input bandwidth available for the broker in Bytes/s";
public static final String AUTO_BALANCER_BROKER_NW_OUT_CAPACITY_DOC = "Maximum network output bandwidth available for the broker in Bytes/s";
private static final String AUTO_BALANCER_METRICS_TOPIC_AUTO_CREATE_TIMEOUT_MS_DOC = "Timeout on the Auto Balancer metrics topic creation";
private static final String AUTO_BALANCER_METRICS_TOPIC_AUTO_CREATE_RETRIES_DOC = "Number of retries of the Auto Balancer metrics reporter"
+ " for the topic creation";
@@ -190,14 +190,13 @@ public void configure(Map<String, ?> rawConfigs) {
}

// Fall back to the S3NetworkBaselineBandwidthProp config value for AUTO_BALANCER_BROKER_NW_IN/OUT_CAPACITY if they are not explicitly set
if (configs.get(KafkaConfig.S3NetworkBaselineBandwidthProp()) != null) {
Object s3NetworkBaselineBandwidth = configs.get(KafkaConfig.S3NetworkBaselineBandwidthProp());
if (s3NetworkBaselineBandwidth != null) {
if (!configs.containsKey(AutoBalancerMetricsReporterConfig.AUTO_BALANCER_BROKER_NW_IN_CAPACITY))
configs.put(AutoBalancerMetricsReporterConfig.AUTO_BALANCER_BROKER_NW_IN_CAPACITY,
configs.get(KafkaConfig.S3NetworkBaselineBandwidthProp()));
configs.put(AutoBalancerMetricsReporterConfig.AUTO_BALANCER_BROKER_NW_IN_CAPACITY, s3NetworkBaselineBandwidth);

if (!configs.containsKey(AutoBalancerMetricsReporterConfig.AUTO_BALANCER_BROKER_NW_OUT_CAPACITY))
configs.put(AutoBalancerMetricsReporterConfig.AUTO_BALANCER_BROKER_NW_OUT_CAPACITY,
configs.get(KafkaConfig.S3NetworkBaselineBandwidthProp()));
configs.put(AutoBalancerMetricsReporterConfig.AUTO_BALANCER_BROKER_NW_OUT_CAPACITY, s3NetworkBaselineBandwidth);
}

AutoBalancerMetricsReporterConfig reporterConfig = new AutoBalancerMetricsReporterConfig(configs, false);
@@ -188,12 +188,8 @@ private static AutoBalancerMetrics toAutoBalancerMetric(long nowMs,
}
switch (name) {
case BYTES_IN_PER_SEC:
// network inbound bandwidth capacity is in KB/s
value = value / 1024;
return bytesInToMetric(topic, partition, nowMs, brokerId, brokerRack, value);
case BYTES_OUT_PER_SEC:
// network outbound bandwidth capacity is in KB/s
value = value / 1024;
return bytesOutToMetric(topic, partition, nowMs, brokerId, brokerRack, value);
case SIZE:
if (partition == -1) {
32 changes: 23 additions & 9 deletions core/src/main/java/kafka/autobalancer/model/BrokerUpdater.java
@@ -34,7 +34,6 @@

public class BrokerUpdater {
private static final Logger LOGGER = LoggerFactory.getLogger(BrokerUpdater.class);
private static final Double IGNORED_CAPACITY_VALUE = -1.0;
private final Lock lock = new ReentrantLock();
private final Broker broker;

@@ -52,7 +51,7 @@ public static class Broker {

public Broker(int brokerId) {
this.brokerId = brokerId;
Arrays.fill(this.brokerCapacity, IGNORED_CAPACITY_VALUE);
Arrays.fill(this.brokerCapacity, Resource.IGNORED_CAPACITY_VALUE);
}

public Broker(Broker other) {
@@ -103,7 +102,7 @@ public void addLoad(Resource resource, double delta) {

public double utilizationFor(Resource resource) {
double capacity = capacity(resource);
if (capacity == IGNORED_CAPACITY_VALUE) {
if (capacity == Resource.IGNORED_CAPACITY_VALUE) {
return 0.0;
}
if (capacity == 0.0) {
@@ -143,12 +142,27 @@ public int hashCode() {

@Override
public String toString() {
return "Broker{" +
"brokerId=" + brokerId +
", brokerCapacity=" + Arrays.toString(brokerCapacity) +
", brokerLoad=" + Arrays.toString(brokerLoad) +
", active=" + active +
'}';
StringBuilder builder = new StringBuilder();
builder.append("{brokerId=")
.append(brokerId)
.append(", brokerCapacity=[");
for (int i = 0; i < brokerCapacity.length; i++) {
builder.append(Resource.of(i).resourceString(brokerCapacity[i]));
if (i != brokerCapacity.length - 1) {
builder.append(", ");
}
}
builder.append("], brokerLoad=[");
for (int i = 0; i < brokerLoad.length; i++) {
builder.append(Resource.of(i).resourceString(brokerLoad[i]));
if (i != brokerLoad.length - 1) {
builder.append(", ");
}
}
builder.append("], active=").append(active)
.append(", timestamp=").append(timestamp)
.append("}");
return builder.toString();
}
}
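Note (illustrative only, not from the PR): with the change above, Broker#toString() renders each capacity and load through Resource.resourceString. A minimal sketch, assuming addLoad adds the delta to the stored load; the broker id and load values are made up.

// Hypothetical sketch of the new output shape.
BrokerUpdater.Broker broker = new BrokerUpdater.Broker(1);
broker.addLoad(Resource.NW_IN, 50.0 * 1024 * 1024);  // 50 MB/s of inbound load, in Bytes/s
System.out.println(broker);
// e.g. {brokerId=1, brokerCapacity=[CPU=ignored, NWIn=ignored, NWOut=ignored],
//       brokerLoad=[CPU=0.00%, NWIn=51200.00KB/s, NWOut=0.00KB/s], active=..., timestamp=...}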
