-
Notifications
You must be signed in to change notification settings - Fork 218
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add latency metric components for WAGED rebalancer (#490)
Add WAGED rebalancer metric framework and latency metric implementation Changelist: 1. Add WAGED rebalancer metric interface 2. Implement latency-related metrics 3. Integrate latency metrics into WAGED rebalancer 4. Add tests
- Loading branch information
Showing
13 changed files
with
744 additions
and
98 deletions.
There are no files selected for viewing
192 changes: 126 additions & 66 deletions
192
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
100 changes: 100 additions & 0 deletions
100
helix-core/src/main/java/org/apache/helix/monitoring/metrics/MetricCollector.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,100 @@ | ||
package org.apache.helix.monitoring.metrics; | ||
|
||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
import java.util.Collection; | ||
import java.util.HashMap; | ||
import java.util.HashSet; | ||
import java.util.Map; | ||
import javax.management.JMException; | ||
import javax.management.ObjectName; | ||
import org.apache.helix.HelixException; | ||
import org.apache.helix.monitoring.metrics.model.Metric; | ||
import org.apache.helix.monitoring.mbeans.MonitorDomainNames; | ||
import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider; | ||
import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric; | ||
|
||
/** | ||
* Collects and manages all metrics that implement the {@link Metric} interface. | ||
*/ | ||
public abstract class MetricCollector extends DynamicMBeanProvider { | ||
private static final String CLUSTER_NAME_KEY = "ClusterName"; | ||
private static final String ENTITY_NAME_KEY = "EntityName"; | ||
private final String _monitorDomainName; | ||
private final String _clusterName; | ||
private final String _entityName; | ||
private Map<String, Metric> _metricMap; | ||
|
||
public MetricCollector(String monitorDomainName, String clusterName, String entityName) { | ||
_monitorDomainName = monitorDomainName; | ||
_clusterName = clusterName; | ||
_entityName = entityName; | ||
_metricMap = new HashMap<>(); | ||
} | ||
|
||
@Override | ||
public DynamicMBeanProvider register() throws JMException { | ||
// First cast all Metric objects to DynamicMetrics | ||
Collection<DynamicMetric<?, ?>> dynamicMetrics = new HashSet<>(); | ||
_metricMap.values().forEach(metric -> dynamicMetrics.add(metric.getDynamicMetric())); | ||
|
||
// Define MBeanName and ObjectName | ||
// MBean name has two key-value pairs: | ||
// ------ 1) ClusterName KV pair (first %s=%s) | ||
// ------ 2) EntityName KV pair (second %s=%s) | ||
String mbeanName = | ||
String.format("%s=%s, %s=%s", CLUSTER_NAME_KEY, _clusterName, ENTITY_NAME_KEY, _entityName); | ||
|
||
// ObjectName has one key-value pair: | ||
// ------ 1) Monitor domain name KV pair where value is the MBean name | ||
doRegister(dynamicMetrics, | ||
new ObjectName(String.format("%s:%s", _monitorDomainName, mbeanName))); | ||
return this; | ||
} | ||
|
||
@Override | ||
public String getSensorName() { | ||
return String.format("%s.%s.%s", MonitorDomainNames.Rebalancer.name(), _clusterName, | ||
_entityName); | ||
} | ||
|
||
void addMetric(Metric metric) { | ||
if (metric instanceof DynamicMetric) { | ||
_metricMap.putIfAbsent(metric.getMetricName(), metric); | ||
} else { | ||
throw new HelixException("MetricCollector only supports Metrics that are DynamicMetric!"); | ||
} | ||
} | ||
|
||
/** | ||
* Returns a desired type of the metric. | ||
* @param metricName | ||
* @param metricClass Desired type | ||
* @param <T> Casted result of the metric | ||
* @return | ||
*/ | ||
public <T extends DynamicMetric> T getMetric(String metricName, Class<T> metricClass) { | ||
return metricClass.cast(_metricMap.get(metricName)); | ||
} | ||
|
||
public Map<String, Metric> getMetricMap() { | ||
return _metricMap; | ||
} | ||
} |
80 changes: 80 additions & 0 deletions
80
...ore/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
package org.apache.helix.monitoring.metrics; | ||
|
||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
import javax.management.JMException; | ||
import org.apache.helix.monitoring.mbeans.MonitorDomainNames; | ||
import org.apache.helix.monitoring.metrics.implementation.RebalanceLatencyGauge; | ||
import org.apache.helix.monitoring.metrics.model.LatencyMetric; | ||
|
||
public class WagedRebalancerMetricCollector extends MetricCollector { | ||
private static final String WAGED_REBALANCER_ENTITY_NAME = "WagedRebalancer"; | ||
|
||
/** | ||
* This enum class contains all metric names defined for WagedRebalancer. Note that all enums are | ||
* in camel case for readability. | ||
*/ | ||
public enum WagedRebalancerMetricNames { | ||
// Per-stage latency metrics | ||
GlobalBaselineCalcLatencyGauge, | ||
PartialRebalanceLatencyGauge, | ||
|
||
// The following latency metrics are related to AssignmentMetadataStore | ||
StateReadLatencyGauge, | ||
StateWriteLatencyGauge | ||
} | ||
|
||
public WagedRebalancerMetricCollector(String clusterName) throws JMException { | ||
super(MonitorDomainNames.Rebalancer.name(), clusterName, WAGED_REBALANCER_ENTITY_NAME); | ||
createMetrics(); | ||
register(); | ||
} | ||
|
||
/** | ||
* This constructor will create but will not register metrics. This constructor will be used in | ||
* case of JMException so that the rebalancer could proceed without registering and emitting | ||
* metrics. | ||
*/ | ||
public WagedRebalancerMetricCollector() { | ||
super(MonitorDomainNames.Rebalancer.name(), null, null); | ||
createMetrics(); | ||
} | ||
|
||
/** | ||
* Creates and registers all metrics in MetricCollector for WagedRebalancer. | ||
*/ | ||
private void createMetrics() { | ||
// Define all metrics | ||
LatencyMetric globalBaselineCalcLatencyGauge = new RebalanceLatencyGauge( | ||
WagedRebalancerMetricNames.GlobalBaselineCalcLatencyGauge.name(), getResetIntervalInMs()); | ||
LatencyMetric partialRebalanceLatencyGauge = new RebalanceLatencyGauge( | ||
WagedRebalancerMetricNames.PartialRebalanceLatencyGauge.name(), getResetIntervalInMs()); | ||
LatencyMetric stateReadLatencyGauge = new RebalanceLatencyGauge( | ||
WagedRebalancerMetricNames.StateReadLatencyGauge.name(), getResetIntervalInMs()); | ||
LatencyMetric stateWriteLatencyGauge = new RebalanceLatencyGauge( | ||
WagedRebalancerMetricNames.StateWriteLatencyGauge.name(), getResetIntervalInMs()); | ||
|
||
// Add metrics to WagedRebalancerMetricCollector | ||
addMetric(globalBaselineCalcLatencyGauge); | ||
addMetric(partialRebalanceLatencyGauge); | ||
addMetric(stateReadLatencyGauge); | ||
addMetric(stateWriteLatencyGauge); | ||
} | ||
} |
104 changes: 104 additions & 0 deletions
104
...c/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
package org.apache.helix.monitoring.metrics.implementation; | ||
|
||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
import com.codahale.metrics.Histogram; | ||
import com.codahale.metrics.SlidingTimeWindowArrayReservoir; | ||
import java.util.concurrent.TimeUnit; | ||
import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric; | ||
import org.apache.helix.monitoring.metrics.model.LatencyMetric; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
public class RebalanceLatencyGauge extends LatencyMetric { | ||
private static final Logger LOG = LoggerFactory.getLogger(RebalanceLatencyGauge.class); | ||
private static final long VALUE_NOT_SET = -1; | ||
private long _lastEmittedMetricValue = VALUE_NOT_SET; | ||
|
||
/** | ||
* Instantiates a new Histogram dynamic metric. | ||
* @param metricName the metric name | ||
*/ | ||
public RebalanceLatencyGauge(String metricName, long slidingTimeWindow) { | ||
super(metricName, new Histogram( | ||
new SlidingTimeWindowArrayReservoir(slidingTimeWindow, TimeUnit.MILLISECONDS))); | ||
_metricName = metricName; | ||
} | ||
|
||
/** | ||
* WARNING: this method is not thread-safe. | ||
* Calling this method multiple times would simply overwrite the previous state. This is because | ||
* the rebalancer could fail at any point, and we want it to recover gracefully by resetting the | ||
* internal state of this metric. | ||
*/ | ||
@Override | ||
public void startMeasuringLatency() { | ||
reset(); | ||
_startTime = System.currentTimeMillis(); | ||
} | ||
|
||
/** | ||
* WARNING: this method is not thread-safe. | ||
*/ | ||
@Override | ||
public void endMeasuringLatency() { | ||
if (_startTime == VALUE_NOT_SET || _endTime != VALUE_NOT_SET) { | ||
LOG.error( | ||
"Needs to call startMeasuringLatency first! Ignoring and resetting the metric. Metric name: {}", | ||
_metricName); | ||
reset(); | ||
return; | ||
} | ||
_endTime = System.currentTimeMillis(); | ||
_lastEmittedMetricValue = _endTime - _startTime; | ||
updateValue(_lastEmittedMetricValue); | ||
reset(); | ||
} | ||
|
||
@Override | ||
public String getMetricName() { | ||
return _metricName; | ||
} | ||
|
||
@Override | ||
public void reset() { | ||
_startTime = VALUE_NOT_SET; | ||
_endTime = VALUE_NOT_SET; | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return String.format("Metric %s's latency is %d", _metricName, getLastEmittedMetricValue()); | ||
} | ||
|
||
/** | ||
* Returns the most recently emitted metric value at the time of the call. | ||
* @return | ||
*/ | ||
@Override | ||
public long getLastEmittedMetricValue() { | ||
return _lastEmittedMetricValue; | ||
} | ||
|
||
@Override | ||
public DynamicMetric getDynamicMetric() { | ||
return this; | ||
} | ||
} |
Oops, something went wrong.