Skip to content

Commit

Permalink
Implement BrokerHostUsage using java (#88)
Browse files Browse the repository at this point in the history
  • Loading branch information
sschepens authored and merlimat committed Nov 16, 2016
1 parent 5e9884c commit f999394
Show file tree
Hide file tree
Showing 13 changed files with 507 additions and 260 deletions.
5 changes: 1 addition & 4 deletions conf/broker.conf
Expand Up @@ -212,10 +212,7 @@ loadBalancerReportUpdateThresholdPercentage=10
# maximum interval to update load report # maximum interval to update load report
loadBalancerReportUpdateMaxIntervalMinutes=15 loadBalancerReportUpdateMaxIntervalMinutes=15


# Path for the script used to retrieve system usage # Frequency of report to collect
loadBalancerHostUsageScriptPath=

# Frequency of sar report to collect
loadBalancerHostUsageCheckIntervalMinutes=1 loadBalancerHostUsageCheckIntervalMinutes=1


# Load shedding interval. Broker periodically checks whether some traffic should be offload from # Load shedding interval. Broker periodically checks whether some traffic should be offload from
Expand Down
5 changes: 1 addition & 4 deletions conf/standalone.conf
Expand Up @@ -185,10 +185,7 @@ loadBalancerReportUpdateThresholdPercentage=10
# maximum interval to update load report # maximum interval to update load report
loadBalancerReportUpdateMaxIntervalMinutes=15 loadBalancerReportUpdateMaxIntervalMinutes=15


# Path for the script used to retrieve system usage # Frequency of report to collect
loadBalancerHostUsageScriptPath=

# Frequency of sar report to collect
loadBalancerHostUsageCheckIntervalMinutes=1 loadBalancerHostUsageCheckIntervalMinutes=1


# Load shedding interval. Broker periodically checks whether some traffic should be offload from # Load shedding interval. Broker periodically checks whether some traffic should be offload from
Expand Down
Expand Up @@ -186,10 +186,7 @@ public class ServiceConfiguration {
private int loadBalancerReportUpdateThresholdPercentage = 10; private int loadBalancerReportUpdateThresholdPercentage = 10;
// maximum interval to update load report // maximum interval to update load report
private int loadBalancerReportUpdateMaxIntervalMinutes = 15; private int loadBalancerReportUpdateMaxIntervalMinutes = 15;
// Path for the script used to retrieve system usage // Frequency of report to collect
@FieldContext(required = false)
private String loadBalancerHostUsageScriptPath;
// Frequency of sar report to collect
private int loadBalancerHostUsageCheckIntervalMinutes = 1; private int loadBalancerHostUsageCheckIntervalMinutes = 1;
// Load shedding interval. Broker periodically checks whether some traffic // Load shedding interval. Broker periodically checks whether some traffic
// should be offload from // should be offload from
Expand Down Expand Up @@ -715,14 +712,6 @@ public void setLoadBalancerReportUpdateMaxIntervalMinutes(int loadBalancerReport
this.loadBalancerReportUpdateMaxIntervalMinutes = loadBalancerReportUpdateMaxIntervalMinutes; this.loadBalancerReportUpdateMaxIntervalMinutes = loadBalancerReportUpdateMaxIntervalMinutes;
} }


public String getLoadBalancerHostUsageScriptPath() {
return loadBalancerHostUsageScriptPath;
}

public void setLoadBalancerHostUsageScriptPath(String loadBalancerHostUsageScriptPath) {
this.loadBalancerHostUsageScriptPath = loadBalancerHostUsageScriptPath;
}

public int getLoadBalancerHostUsageCheckIntervalMinutes() { public int getLoadBalancerHostUsageCheckIntervalMinutes() {
return loadBalancerHostUsageCheckIntervalMinutes; return loadBalancerHostUsageCheckIntervalMinutes;
} }
Expand Down
Expand Up @@ -128,6 +128,7 @@ public PulsarService(ServiceConfiguration config) {
this.brokerServiceUrlTls = brokerUrlTls(config); this.brokerServiceUrlTls = brokerUrlTls(config);
this.config = config; this.config = config;
this.shutdownService = new MessagingServiceShutdownHook(this); this.shutdownService = new MessagingServiceShutdownHook(this);
loadManagerExecutor = Executors.newSingleThreadScheduledExecutor();
} }


/** /**
Expand Down Expand Up @@ -233,11 +234,11 @@ public void start() throws PulsarServerException {
managedLedgerClientFactory = new ManagedLedgerClientFactory(config, getZkClient(), managedLedgerClientFactory = new ManagedLedgerClientFactory(config, getZkClient(),
getBookKeeperClientFactory()); getBookKeeperClientFactory());


this.brokerService = new BrokerService(this);

// Start load management service (even if load balancing is disabled) // Start load management service (even if load balancing is disabled)
this.loadManager = new SimpleLoadManagerImpl(this); this.loadManager = new SimpleLoadManagerImpl(this);


this.brokerService = new BrokerService(this);

this.startLoadManagementService(); this.startLoadManagementService();


// needs load management service // needs load management service
Expand Down Expand Up @@ -400,11 +401,6 @@ private void startLoadManagementService() throws PulsarServerException {
this.loadManager.start(); this.loadManager.start();


if (config.isLoadBalancerEnabled()) { if (config.isLoadBalancerEnabled()) {
if (loadManagerExecutor == null) {
loadManagerExecutor = Executors.newSingleThreadScheduledExecutor();
;
}

LOG.info("Starting load balancer"); LOG.info("Starting load balancer");
if (this.loadReportTask == null) { if (this.loadReportTask == null) {
long loadReportMinInterval = SimpleLoadManagerImpl.LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL; long loadReportMinInterval = SimpleLoadManagerImpl.LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL;
Expand Down Expand Up @@ -533,6 +529,10 @@ public ScheduledExecutorService getExecutor() {
return executor; return executor;
} }


public ScheduledExecutorService getLoadManagerExecutor() {
return loadManagerExecutor;
}

public OrderedSafeExecutor getOrderedExecutor() { public OrderedSafeExecutor getOrderedExecutor() {
return orderedExecutor; return orderedExecutor;
} }
Expand Down
Expand Up @@ -16,80 +16,18 @@
package com.yahoo.pulsar.broker.loadbalance; package com.yahoo.pulsar.broker.loadbalance;


import java.io.IOException; import java.io.IOException;
import java.io.StringWriter; import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.utils.CmdUtility;


/** /**
* Class that will return the broker host usage. * Class that will return the broker host usage.
* *
* *
*/ */
public class BrokerHostUsage { public interface BrokerHostUsage {
// The interval for host usage check command
private final int hostUsageCheckInterval;

// Path to the pulsar-broker-host-usage script
private final String usageScriptPath;

private static final Logger LOG = LoggerFactory.getLogger(BrokerHostUsage.class);

public BrokerHostUsage(PulsarService pulsar) {
this.usageScriptPath = pulsar.getConfiguration().getLoadBalancerHostUsageScriptPath();
this.hostUsageCheckInterval = pulsar.getConfiguration().getLoadBalancerHostUsageCheckIntervalMinutes();
}

/** /**
* Returns the host usage information in the following format - * Returns the host usage information in the following format -
* *
* <pre>
* {
* "bandwidthIn" : {
* "usage" : "100",
* "limit" : "1000",
* },
* "bandwidthOut" : {
* "usage" : "659",
* "limit" : "1000",
* },
* "memory" : {
* "usage" : "16.0",
* "limit" : "16070",
* }
* "cpu-utilization" : {
* "usage" : "160.0"
* "limit" : "1600"
* }
* }
* </pre>
*
* @return Broker host usage in the json string format * @return Broker host usage in the json string format
*
* @throws IOException
*/ */
public String getBrokerHostUsage() throws IOException { SystemResourceUsage getBrokerHostUsage();
StringWriter writer = new StringWriter();
try {
/**
* Spawns a python process and runs the usage exporter script. The script return the machine information in
* the json format.
*/

int exitCode = CmdUtility.exec(writer, usageScriptPath, "--host-usage-check-interval",
Integer.toString(hostUsageCheckInterval));
if (exitCode != 0) {
LOG.warn("Process exited with non-zero exit code - [{}], stderr - [{}] ", exitCode, writer.toString());
throw new IOException(writer.toString());
}
} catch (IOException e) {
e.printStackTrace();
LOG.warn("Error running the usage script {}", e.getMessage());
throw e;
}
return writer.toString();
}
} }
@@ -0,0 +1,96 @@
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.broker.loadbalance.impl;

import com.sun.management.OperatingSystemMXBean;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.loadbalance.BrokerHostUsage;
import com.yahoo.pulsar.common.policies.data.loadbalancer.ResourceUsage;
import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.concurrent.TimeUnit;

/**
* Class that will return the broker host usage.
*/
public class GenericBrokerHostUsageImpl implements BrokerHostUsage {
// The interval for host usage check command
private static final int CPU_CHECK_MILLIS = 1000;
private static final Logger LOG = LoggerFactory.getLogger(GenericBrokerHostUsageImpl.class);
private final int hostUsageCheckIntervalMin;
private long lastCollection;
private double totalCpuLimit;
private double cpuUsageSum = 0d;
private int cpuUsageCount = 0;
private OperatingSystemMXBean systemBean;
private SystemResourceUsage usage;

public GenericBrokerHostUsageImpl(PulsarService pulsar) {
this.hostUsageCheckIntervalMin = pulsar.getConfiguration().getLoadBalancerHostUsageCheckIntervalMinutes();
this.systemBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
this.lastCollection = 0L;
this.usage = new SystemResourceUsage();
this.totalCpuLimit = getTotalCpuLimit();
pulsar.getLoadManagerExecutor().scheduleAtFixedRate(this::checkCpuLoad, 0, CPU_CHECK_MILLIS, TimeUnit.MILLISECONDS);
pulsar.getLoadManagerExecutor().scheduleAtFixedRate(this::calculateBrokerHostUsage, 0, hostUsageCheckIntervalMin, TimeUnit.MINUTES);
}

@Override
public SystemResourceUsage getBrokerHostUsage() {
return usage;
}

private void checkCpuLoad() {
cpuUsageSum += systemBean.getSystemCpuLoad();
cpuUsageCount++;
}

private void calculateBrokerHostUsage() {
SystemResourceUsage usage = new SystemResourceUsage();
usage.setCpu(getCpuUsage());
usage.setMemory(getMemUsage());

this.usage = usage;
}

private double getTotalCpuLimit() {
return (double) (100 * Runtime.getRuntime().availableProcessors());
}

private double getTotalCpuUsage() {
double cpuUsage = cpuUsageSum / cpuUsageCount;
cpuUsageSum = 0d;
cpuUsageCount = 0;
return cpuUsage;
}

private ResourceUsage getCpuUsage() {
if (cpuUsageCount == 0) {
return new ResourceUsage(0, totalCpuLimit);
}
return new ResourceUsage(getTotalCpuUsage() * totalCpuLimit, totalCpuLimit);
}

private ResourceUsage getMemUsage() {
double total = ((double) systemBean.getTotalPhysicalMemorySize()) / (1024 * 1024);
double free = ((double) systemBean.getFreePhysicalMemorySize()) / (1024 * 1024);
return new ResourceUsage(total - free, total);
}
}

0 comments on commit f999394

Please sign in to comment.