Skip to content

Commit

Permalink
Add instance capacity gauge metrics.
Browse files Browse the repository at this point in the history
  • Loading branch information
huizhilu committed Jan 14, 2020
1 parent 4948bc6 commit ff1f38e
Show file tree
Hide file tree
Showing 5 changed files with 209 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -249,24 +248,24 @@ private void reportInstanceCapacityMetrics(ClusterStatusMonitor clusterStatusMon
// Only use the resources in ideal states to parse all replicas.
Map<String, IdealState> idealStateMap = dataProvider.getIdealStates();
Map<String, Resource> resourceToMonitorMap = resourceMap.entrySet().stream()
.filter(resourceName -> idealStateMap.containsKey(resourceName))
.filter(idealStateMap::containsKey)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

Map<String, ResourceAssignment> currentStateAssignment =
currentStateOutput.getAssignment(resourceToMonitorMap.keySet());
ClusterModel clusterModel = ClusterModelProvider.generateClusterModelFromExistingAssignment(
dataProvider, resourceToMonitorMap, currentStateAssignment);

Map<String, Double> maxUsageMap = new HashMap<>();
for (AssignableNode node : clusterModel.getAssignableNodes().values()) {
String instanceName = node.getInstanceName();
// There is no new usage adding to this node, so an empty map is passed in.
double usage = node.getProjectedHighestUtilization(Collections.emptyMap());
maxUsageMap.put(instanceName, usage);
clusterStatusMonitor
.updateInstanceCapacityStatus(instanceName, usage, node.getMaxCapacity());
}

clusterStatusMonitor.updateInstanceMaxUsage(maxUsageMap);
} catch (Exception ex) {
LOG.error("Failed to report instance capacity metrics.", ex);
LOG.error("Failed to report instance capacity metrics. Exception message: {}",
ex.getMessage());
}

return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,24 +367,25 @@ public void increaseMessageReceived(List<Message> messages) {
}

/**
* Update max capacity usage for per instance. Before calling this API, we assume the instance
* monitors are already registered in ReadClusterDataStage. If the monitor is not registered, this
* max usage update will fail.
* Updates the capacity status of a single instance, including its max usage and the capacity of
* each capacity key. Before calling this API, we assume the instance monitor is already
* registered in ReadClusterDataStage. If the monitor is not registered, the update is skipped
* and a warning is logged.
*
* @param maxUsageMap a map of max capacity usage, {instance: maxCapacityUsage}
* @param instanceName This instance name
* @param maxUsage Max capacity usage of this instance
* @param capacityMap A map of this instance capacity, {capacity key: capacity value}
*/
public void updateInstanceMaxUsage(Map<String, Double> maxUsageMap) {
synchronized (_instanceMonitorMap) {
for (Map.Entry<String, Double> entry : maxUsageMap.entrySet()) {
InstanceMonitor monitor = _instanceMonitorMap.get(entry.getKey());
if (monitor == null) {
LOG.warn("Failed to update max usage because instance monitor is not found, instance: {}.",
entry.getKey());
continue;
}
monitor.updateMaxCapacityUsage(entry.getValue());
}
public void updateInstanceCapacityStatus(String instanceName, double maxUsage,
    Map<String, Integer> capacityMap) {
  final InstanceMonitor instanceMonitor = _instanceMonitorMap.get(instanceName);
  if (instanceMonitor != null) {
    // Monitor is registered: push both the max usage and the per-key capacity values.
    instanceMonitor.updateMaxCapacityUsage(maxUsage);
    instanceMonitor.updateCapacity(capacityMap);
  } else {
    // No monitor for this instance: nothing to update, only report it.
    LOG.warn("Failed to update instance capacity status because instance monitor is not found, "
        + "instance: {}.", instanceName);
  }
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.management.JMException;
import javax.management.ObjectName;

Expand All @@ -41,17 +42,17 @@ public class InstanceMonitor extends DynamicMBeanProvider {
/**
* Metric names for instance capacity.
*/
public enum InstanceMonitorMetrics {
public enum InstanceMonitorMetric {
// TODO: change the metric names with Counter and Gauge suffix and deprecate old names.
TOTAL_MESSAGE_RECEIVED_COUNTER("TotalMessageReceived"),
ENABLED_STATUS_GAUGE("Enabled"),
ONLINE_STATUS_GAUGE("Online"),
DISABLED_PARTITIONS_GAUGE("DisabledPartitions"),
MAX_CAPACITY_USAGE_GAUGE("MaxCapacityUsageGauge");

private String metricName;
private final String metricName;

InstanceMonitorMetrics(String name) {
InstanceMonitorMetric(String name) {
metricName = name;
}

Expand All @@ -75,6 +76,9 @@ public String metricName() {
private SimpleDynamicMetric<Long> _onlineStatusGauge;
private SimpleDynamicMetric<Double> _maxCapacityUsageGauge;

// A map of dynamic capacity Gauges. The map's keys could change.
private final Map<String, SimpleDynamicMetric<Long>> _dynamicCapacityMetricsMap;

/**
* Initialize the bean
* @param clusterName the cluster to monitor
Expand All @@ -85,26 +89,41 @@ public InstanceMonitor(String clusterName, String participantName, ObjectName ob
_participantName = participantName;
_tags = ImmutableList.of(ClusterStatusMonitor.DEFAULT_TAG);
_initObjectName = objectName;
_dynamicCapacityMetricsMap = new ConcurrentHashMap<>();

createMetrics();
}

/**
* Creates the five fixed metrics of this monitor (total-message counter, disabled-partitions
* gauge, enabled and online status gauges, and max-capacity-usage gauge), each with an initial
* value of zero. Metric names are taken from the metric enum's metricName().
* NOTE(review): the argument lines below come in pairs that differ only in the enum name
* (InstanceMonitorMetrics vs. InstanceMonitorMetric) - presumably the old and new side of the
* rename as rendered by the diff view; confirm against the actual file.
*/
private void createMetrics() {
_totalMessagedReceivedCounter = new SimpleDynamicMetric<>(
InstanceMonitorMetrics.TOTAL_MESSAGE_RECEIVED_COUNTER.metricName(), 0L);
InstanceMonitorMetric.TOTAL_MESSAGE_RECEIVED_COUNTER.metricName(), 0L);

_disabledPartitionsGauge =
new SimpleDynamicMetric<>(InstanceMonitorMetrics.DISABLED_PARTITIONS_GAUGE.metricName(),
new SimpleDynamicMetric<>(InstanceMonitorMetric.DISABLED_PARTITIONS_GAUGE.metricName(),
0L);
_enabledStatusGauge =
new SimpleDynamicMetric<>(InstanceMonitorMetrics.ENABLED_STATUS_GAUGE.metricName(), 0L);
new SimpleDynamicMetric<>(InstanceMonitorMetric.ENABLED_STATUS_GAUGE.metricName(), 0L);
_onlineStatusGauge =
new SimpleDynamicMetric<>(InstanceMonitorMetrics.ONLINE_STATUS_GAUGE.metricName(), 0L);
new SimpleDynamicMetric<>(InstanceMonitorMetric.ONLINE_STATUS_GAUGE.metricName(), 0L);
_maxCapacityUsageGauge =
new SimpleDynamicMetric<>(InstanceMonitorMetrics.MAX_CAPACITY_USAGE_GAUGE.metricName(),
new SimpleDynamicMetric<>(InstanceMonitorMetric.MAX_CAPACITY_USAGE_GAUGE.metricName(),
0.0d);
}

private List<DynamicMetric<?, ?>> buildAttributeList() {
  // Fixed metrics first, in their declared order.
  List<DynamicMetric<?, ?>> metrics = Lists.newArrayList();
  metrics.add(_totalMessagedReceivedCounter);
  metrics.add(_disabledPartitionsGauge);
  metrics.add(_enabledStatusGauge);
  metrics.add(_onlineStatusGauge);
  metrics.add(_maxCapacityUsageGauge);

  // Then whatever capacity gauges are currently tracked; this set can change over time.
  metrics.addAll(_dynamicCapacityMetricsMap.values());
  return metrics;
}

@Override
public String getSensorName() {
return String.format("%s.%s.%s.%s", ClusterStatusMonitor.PARTICIPANT_STATUS_KEY, _clusterName,
Expand Down Expand Up @@ -183,33 +202,58 @@ public synchronized void increaseMessageCount(long messageReceived) {
}

/**
* Update max capacity usage for this instance.
* Updates max capacity usage for this instance.
* @param maxUsage max capacity usage of this instance
*/
public synchronized void updateMaxCapacityUsage(double maxUsage) {
  // Holds this monitor's lock, the same lock getMaxCapacityUsageGauge() takes when reading.
  this._maxCapacityUsageGauge.updateValue(maxUsage);
}

/**
* Get max capacity usage of this instance.
* Gets max capacity usage of this instance.
* @return Max capacity usage of this instance.
*/
protected synchronized double getMaxCapacityUsageGauge() {
  // Read under this monitor's lock; the gauge's boxed value is unboxed on assignment.
  final double currentMaxUsage = this._maxCapacityUsageGauge.getValue();
  return currentMaxUsage;
}

@Override
public DynamicMBeanProvider register()
throws JMException {
List<DynamicMetric<?, ?>> attributeList = ImmutableList.of(
_totalMessagedReceivedCounter,
_disabledPartitionsGauge,
_enabledStatusGauge,
_onlineStatusGauge,
_maxCapacityUsageGauge
);
/**
* Updates instance capacity metrics.
* If the given capacity keys equal the keys already tracked, only the gauge values are updated
* and the method returns. Otherwise gauges whose key is absent from {@code capacity} are removed,
* gauges for new keys are created (named capacity key + "Gauge"), existing ones are updated, and
* the MBean attribute info is refreshed from the rebuilt attribute list.
* @param capacity A map of instance capacity, keyed by capacity name.
*/
public void updateCapacity(Map<String, Integer> capacity) {
// Lock on the map itself so the key comparison and the mutations below run as one unit
// with respect to other threads locking on the same map.
synchronized (_dynamicCapacityMetricsMap) {
// If capacity keys don't have any change, we just update the metric values.
if (_dynamicCapacityMetricsMap.keySet().equals(capacity.keySet())) {
for (Map.Entry<String, Integer> entry : capacity.entrySet()) {
// Input values are Integer while the gauges hold Long, hence the cast.
_dynamicCapacityMetricsMap.get(entry.getKey()).updateValue((long) entry.getValue());
}
// Key set unchanged: the attribute info refresh at the end of the method is skipped.
return;
}

// NOTE(review): the next line references attributeList, which is not defined in this method;
// it looks like a removed line of the old register() left in place by the diff view - confirm.
doRegister(attributeList, _initObjectName);
// The capacity keys changed: bring the metrics map in line with the new key set.
// First drop gauges whose key is no longer present, then update the gauges that remain
// and create gauges for keys seen for the first time.
_dynamicCapacityMetricsMap.keySet().retainAll(capacity.keySet());
for (Map.Entry<String, Integer> entry : capacity.entrySet()) {
String capacityName = entry.getKey();
if (_dynamicCapacityMetricsMap.containsKey(capacityName)) {
_dynamicCapacityMetricsMap.get(capacityName).updateValue((long) entry.getValue());
} else {
// New capacity key: the metric name is the key with a "Gauge" suffix.
_dynamicCapacityMetricsMap.put(capacityName,
new SimpleDynamicMetric<>(capacityName + "Gauge", (long) entry.getValue()));
}
}
}

// Refresh the MBean's attribute info with the rebuilt attribute list. Only reached when the
// key set changed, and executed after the lock on the map has been released.
updateAttributesInfo(buildAttributeList(),
"Instance monitor for instance: " + getInstanceName());
}

@Override
public DynamicMBeanProvider register() throws JMException {
  // Register the fixed metrics plus the currently tracked capacity gauges under the
  // object name supplied at construction time.
  final List<DynamicMetric<?, ?>> attributes = buildAttributeList();
  doRegister(attributes, _initObjectName);
  return this;
}
Expand Down
Loading

0 comments on commit ff1f38e

Please sign in to comment.