Skip to content

Commit

Permalink
Add partition weight gauge.
Browse files Browse the repository at this point in the history
  • Loading branch information
huizhilu committed Jan 17, 2020
1 parent 7dda679 commit df2f078
Show file tree
Hide file tree
Showing 6 changed files with 327 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,97 @@ public static double measureBaselineDivergence(Map<String, ResourceAssignment> b
return numTotalBestPossibleReplicas == 0 ? 1.0d
: (1.0d - (double) numMatchedReplicas / (double) numTotalBestPossibleReplicas);
}

/**
 * Calculates average partition weight per capacity key for a resource config. Example as below:
 * Input =
 * {
 *   "partition1": {
 *     "capacity1": 20,
 *     "capacity2": 40
 *   },
 *   "partition2": {
 *     "capacity1": 30,
 *     "capacity2": 50
 *   },
 *   "partition3": {
 *     "capacity1": 16,
 *     "capacity2": 30
 *   }
 * }
 *
 * Total weight for key "capacity1" = 20 + 30 + 16 = 66;
 * Total weight for key "capacity2" = 40 + 50 + 30 = 120;
 * Total partitions = 3;
 * Average partition weight for "capacity1" = 66 / 3 = 22;
 * Average partition weight for "capacity2" = 120 / 3 = 40;
 *
 * Output =
 * {
 *   "capacity1": 22,
 *   "capacity2": 40
 * }
 *
 * Each capacity key is averaged over the partitions that actually define that key, and the
 * division is an integer division (the fractional part is discarded). Weights are summed in a
 * {@code long}, so the result stays correct even when the total exceeds the {@code int} range.
 *
 * @param partitionCapacityMap A map of partition capacity:
 *        <PartitionName or DEFAULT_PARTITION_KEY, <Capacity Key, Capacity Number>>
 * @return A map of partition weight: capacity key -> average partition weight
 */
public static Map<String, Integer> calculateAveragePartitionWeight(
    Map<String, Map<String, Integer>> partitionCapacityMap) {
  Map<String, PartitionWeightCounterEntry> countPartitionWeightMap =
      aggregatePartitionWeight(partitionCapacityMap);

  // capacity key -> average partition weight
  Map<String, Integer> averagePartitionWeightMap = new HashMap<>();

  // Calculate average partition weight for each capacity key.
  // Per capacity key level:
  // average partition weight = (total partition weight) / (number of partitions)
  for (Map.Entry<String, PartitionWeightCounterEntry> entry
      : countPartitionWeightMap.entrySet()) {
    PartitionWeightCounterEntry weightEntry = entry.getValue();
    // Divide in long arithmetic. The average of int values always fits in an int, so the
    // narrowing cast is lossless. An entry only exists after at least one increase(1, ...),
    // therefore the partition count is never zero here.
    int averageWeight = (int) (weightEntry.getWeight() / weightEntry.getPartitions());
    averagePartitionWeightMap.put(entry.getKey(), averageWeight);
  }

  return averagePartitionWeightMap;
}

/*
 * Aggregates partition weight for each capacity key: counts how many partitions define the key
 * and sums up their weights.
 */
private static Map<String, PartitionWeightCounterEntry> aggregatePartitionWeight(
    Map<String, Map<String, Integer>> partitionCapacityMap) {
  // capacity key -> [number of partitions, total weight per capacity key]
  Map<String, PartitionWeightCounterEntry> countPartitionWeightMap = new HashMap<>();

  partitionCapacityMap.values().forEach(partitionCapacityEntry ->
      partitionCapacityEntry.forEach((capacityKey, weight) -> countPartitionWeightMap
          .computeIfAbsent(capacityKey, counterEntry -> new PartitionWeightCounterEntry())
          .increase(1, weight)));

  return countPartitionWeightMap;
}

/*
 * Represents total number of partitions and total partition weight for a capacity key.
 */
private static class PartitionWeightCounterEntry {
  private int partitions;
  // Kept as long: the sum of many int weights can exceed Integer.MAX_VALUE.
  private long weight;

  private int getPartitions() {
    return partitions;
  }

  private long getWeight() {
    return weight;
  }

  private void increase(int partitions, int weight) {
    this.partitions += partitions;
    this.weight += weight;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.controller.rebalancer.util.ResourceUsageCalculator;
import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
Expand All @@ -41,6 +43,7 @@
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -89,8 +92,11 @@ public void process(ClusterEvent event) throws Exception {
final ClusterStatusMonitor clusterStatusMonitor =
event.getAttribute(AttributeName.clusterStatusMonitor.name());
if (clusterStatusMonitor != null && cache instanceof ResourceControllerDataProvider) {
reportInstanceCapacityMetrics(clusterStatusMonitor, (ResourceControllerDataProvider) cache,
resourceToRebalance, currentStateOutput);
final ResourceControllerDataProvider dataProvider = (ResourceControllerDataProvider) cache;
reportInstanceCapacityMetrics(clusterStatusMonitor, dataProvider, resourceToRebalance,
currentStateOutput);
reportResourcePartitionCapacityMetrics(dataProvider.getAsyncTasksThreadPool(),
clusterStatusMonitor, dataProvider.getResourceConfigMap().values());
}
}

Expand Down Expand Up @@ -271,4 +277,22 @@ private void reportInstanceCapacityMetrics(ClusterStatusMonitor clusterStatusMon
return null;
});
}

/**
 * Submits a task through asyncExecute that computes the average partition weight of each given
 * resource config and pushes it to the cluster status monitor.
 *
 * Failures are handled per resource: an exception raised for one resource config is logged
 * (with its stack trace) and the remaining resource configs are still reported.
 *
 * @param executorService The executor service handed to asyncExecute
 * @param clusterStatusMonitor The monitor that receives the partition weight updates
 * @param resourceConfigs The resource configs whose partition capacity maps are averaged
 */
private void reportResourcePartitionCapacityMetrics(ExecutorService executorService,
    ClusterStatusMonitor clusterStatusMonitor, Collection<ResourceConfig> resourceConfigs) {
  asyncExecute(executorService, () -> {
    for (ResourceConfig config : resourceConfigs) {
      try {
        Map<String, Integer> averageWeight = ResourceUsageCalculator
            .calculateAveragePartitionWeight(config.getPartitionCapacityMap());
        clusterStatusMonitor.updatePartitionWeight(config.getResourceName(), averageWeight);
      } catch (Exception ex) {
        // Catch inside the loop so one bad resource config does not block the metrics of all
        // the others. The trailing throwable argument makes SLF4J log the stack trace.
        LOG.error("Failed to report resource partition capacity metrics for resource: {}."
            + " Exception message: {}", config.getResourceName(), ex.getMessage(), ex);
      }
    }

    return null;
  });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,25 @@ public void setResourceStatus(ExternalView externalView, IdealState idealState,
}
}

/**
 * Updates metrics of average partition weight per capacity key for a resource. If a resource
 * monitor does not yet exist for this resource, a new resource monitor will be created for this
 * resource. If no monitor can be obtained, a warning is logged and nothing is updated.
 *
 * @param resourceName The resource name for which partition weight is updated
 * @param averageWeightMap A map of average partition weight of each capacity key:
 *                         capacity key -> average partition weight
 */
public void updatePartitionWeight(String resourceName, Map<String, Integer> averageWeightMap) {
  // Use the get-or-create helper (as updateMissingTopStateDurationStats does) so that the
  // documented "create if missing" contract actually holds; a plain map lookup would silently
  // drop the metric for any resource whose monitor has not been created yet.
  ResourceMonitor monitor = getOrCreateResourceMonitor(resourceName);
  if (monitor == null) {
    // NOTE(review): presumably the helper returns null when monitor creation fails -- confirm.
    LOG.warn("Failed to update partition weight metric for resource: {} because resource monitor"
        + " is not created.", resourceName);
    return;
  }
  monitor.updatePartitionWeightStats(averageWeightMap);
}

public void updateMissingTopStateDurationStats(String resourceName, long totalDuration,
long helixLatency, boolean isGraceful, boolean succeeded) {
ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resourceName);
Expand All @@ -519,7 +538,7 @@ public void updateRebalancerStats(String resourceName, long numPendingRecoveryRe
}
}

private ResourceMonitor getOrCreateResourceMonitor(String resourceName) {
ResourceMonitor getOrCreateResourceMonitor(String resourceName) {
try {
if (!_resourceMonitorMap.containsKey(resourceName)) {
synchronized (_resourceMonitorMap) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,19 @@
* under the License.
*/

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import javax.management.JMException;
import javax.management.ObjectName;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.SlidingTimeWindowArrayReservoir;
import com.google.common.collect.Lists;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
Expand Down Expand Up @@ -83,31 +84,13 @@ public enum RebalanceStatus {
private final String _clusterName;
private final ObjectName _initObjectName;

// A map of dynamic capacity Gauges. The map's keys could change.
private final Map<String, SimpleDynamicMetric<Long>> _dynamicCapacityMetricsMap;

@Override
public ResourceMonitor register() throws JMException {
List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
attributeList.add(_numOfPartitions);
attributeList.add(_numOfPartitionsInExternalView);
attributeList.add(_numOfErrorPartitions);
attributeList.add(_numNonTopStatePartitions);
attributeList.add(_numLessMinActiveReplicaPartitions);
attributeList.add(_numLessReplicaPartitions);
attributeList.add(_numPendingRecoveryRebalancePartitions);
attributeList.add(_numPendingLoadRebalancePartitions);
attributeList.add(_numRecoveryRebalanceThrottledPartitions);
attributeList.add(_numLoadRebalanceThrottledPartitions);
attributeList.add(_externalViewIdealStateDiff);
attributeList.add(_successfulTopStateHandoffDurationCounter);
attributeList.add(_successTopStateHandoffCounter);
attributeList.add(_failedTopStateHandoffCounter);
attributeList.add(_maxSinglePartitionTopStateHandoffDuration);
attributeList.add(_partitionTopStateHandoffDurationGauge);
attributeList.add(_partitionTopStateHandoffHelixLatencyGauge);
attributeList.add(_partitionTopStateNonGracefulHandoffDurationGauge);
attributeList.add(_totalMessageReceived);
attributeList.add(_numPendingStateTransitions);
attributeList.add(_rebalanceState);
doRegister(attributeList, _initObjectName);
public DynamicMBeanProvider register() throws JMException {
doRegister(buildAttributeList(), _initObjectName);

return this;
}

Expand All @@ -116,10 +99,12 @@ public enum MonitorState {
}

@SuppressWarnings("unchecked")
public ResourceMonitor(String clusterName, String resourceName, ObjectName objectName) {
public ResourceMonitor(String clusterName, String resourceName, ObjectName objectName)
throws JMException {
_clusterName = clusterName;
_resourceName = resourceName;
_initObjectName = objectName;
_dynamicCapacityMetricsMap = new ConcurrentHashMap<>();

_externalViewIdealStateDiff = new SimpleDynamicMetric("DifferenceWithIdealStateGauge", 0L);
_numLoadRebalanceThrottledPartitions =
Expand Down Expand Up @@ -382,6 +367,37 @@ public void updateRebalancerStats(long numPendingRecoveryRebalancePartitions,
_numLoadRebalanceThrottledPartitions.updateValue(numLoadRebalanceThrottledPartitions);
}

/**
 * Refreshes the partition weight gauges of this resource monitor.
 *
 * When the incoming capacity keys are the same as the ones currently tracked, only the gauge
 * values are updated. Otherwise the tracked gauges are dropped, one new gauge is created per
 * incoming capacity key, and the MBean attribute info is rebuilt so that the old capacity keys
 * are replaced with the new ones.
 *
 * @param partitionWeightMap A map of partition weight: capacity key -> partition weight
 */
void updatePartitionWeightStats(Map<String, Integer> partitionWeightMap) {
  synchronized (_dynamicCapacityMetricsMap) {
    boolean sameCapacityKeys =
        _dynamicCapacityMetricsMap.keySet().equals(partitionWeightMap.keySet());

    if (sameCapacityKeys) {
      // Key set unchanged: update the existing gauges in place, no attribute rebuild needed.
      partitionWeightMap.forEach((capacityKey, weight) ->
          _dynamicCapacityMetricsMap.get(capacityKey).updateValue((long) weight));
      return;
    }

    // Key set changed: start over with a fresh gauge for every capacity key.
    _dynamicCapacityMetricsMap.clear();
    partitionWeightMap.forEach((capacityKey, weight) ->
        _dynamicCapacityMetricsMap.put(capacityKey,
            new SimpleDynamicMetric<>(capacityKey + "Gauge", (long) weight)));
  }

  // The attribute set changed, so publish the rebuilt attribute list.
  String description = "Resource monitor for resource: " + getResourceName();
  updateAttributesInfo(buildAttributeList(), description);
}

public void setRebalanceState(RebalanceStatus state) {
_rebalanceState.updateValue(state.name());
}
Expand Down Expand Up @@ -428,4 +444,34 @@ public void resetMaxTopStateHandoffGauge() {
_lastResetTime = System.currentTimeMillis();
}
}

/*
 * Builds the full list of metrics exposed by this monitor: the fixed built-in metrics first,
 * followed by whatever dynamic capacity gauges are currently tracked.
 */
private List<DynamicMetric<?, ?>> buildAttributeList() {
  List<DynamicMetric<?, ?>> metrics = Lists.newArrayList(
      // Partition counts
      _numOfPartitions, _numOfPartitionsInExternalView, _numOfErrorPartitions,
      _numNonTopStatePartitions, _numLessMinActiveReplicaPartitions, _numLessReplicaPartitions,
      // Rebalance progress and throttling
      _numPendingRecoveryRebalancePartitions, _numPendingLoadRebalancePartitions,
      _numRecoveryRebalanceThrottledPartitions, _numLoadRebalanceThrottledPartitions,
      _externalViewIdealStateDiff,
      // Top state handoff
      _successfulTopStateHandoffDurationCounter, _successTopStateHandoffCounter,
      _failedTopStateHandoffCounter, _maxSinglePartitionTopStateHandoffDuration,
      _partitionTopStateHandoffDurationGauge, _partitionTopStateHandoffHelixLatencyGauge,
      _partitionTopStateNonGracefulHandoffDurationGauge,
      // Messages, state transitions and rebalance state
      _totalMessageReceived, _numPendingStateTransitions, _rebalanceState);

  // Dynamic capacity gauges go last; their key set can change between calls.
  metrics.addAll(_dynamicCapacityMetricsMap.values());
  return metrics;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,22 @@ public void testMeasureBaselineDivergence(Map<String, Map<String, Map<String, St
0.0d);
}

@Test
public void testCalculateAveragePartitionWeight() {
  // Three partitions, each defining the same two capacity keys.
  Map<String, Integer> firstPartition = ImmutableMap.of("capacity1", 20, "capacity2", 40);
  Map<String, Integer> secondPartition = ImmutableMap.of("capacity1", 30, "capacity2", 50);
  Map<String, Integer> thirdPartition = ImmutableMap.of("capacity1", 16, "capacity2", 30);
  Map<String, Map<String, Integer>> input = ImmutableMap.of(
      "partition1", firstPartition,
      "partition2", secondPartition,
      "partition3", thirdPartition);

  // capacity1: (20 + 30 + 16) / 3 = 22; capacity2: (40 + 50 + 30) / 3 = 40
  Map<String, Integer> expected = ImmutableMap.of("capacity1", 22, "capacity2", 40);

  Map<String, Integer> actual = ResourceUsageCalculator.calculateAveragePartitionWeight(input);

  Assert.assertNotNull(actual);
  Assert.assertEquals(actual, expected);
}

private Map<String, ResourceAssignment> buildResourceAssignment(
Map<String, Map<String, Map<String, String>>> resourceMap) {
Map<String, ResourceAssignment> assignment = new HashMap<>();
Expand All @@ -78,7 +94,7 @@ private Map<String, ResourceAssignment> buildResourceAssignment(
}

@DataProvider(name = "TestMeasureBaselineDivergenceInput")
public Object[][] loadTestMeasureBaselineDivergenceInput() {
private Object[][] loadTestMeasureBaselineDivergenceInput() {
final String[] params =
new String[]{"baseline", "someMatchBestPossible", "noMatchBestPossible"};
return TestInputLoader
Expand Down
Loading

0 comments on commit df2f078

Please sign in to comment.