Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add latency metric components for WAGED rebalancer #490

Merged
merged 5 commits into from
Oct 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,7 @@ public class SystemPropertyKeys {

// Controller
public static final String CONTROLLER_MESSAGE_PURGE_DELAY = "helix.controller.stages.MessageGenerationPhase.messagePurgeDelay";

// MBean monitor for helix.
public static final String HELIX_MONITOR_TIME_WINDOW_LENGTH_MS = "helix.monitor.slidingTimeWindow.ms";
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class ConstraintBasedAlgorithmFactory {
// enlarge the overall weight of the evenness constraints compared with the movement constraint.
// TODO: Tune or make the following factor configurable.
private static final int EVENNESS_PREFERENCE_NORMALIZE_FACTOR = 50;
private static final Map<String, Float> MODEL = new HashMap<>() {
private static final Map<String, Float> MODEL = new HashMap<String, Float>() {
narendly marked this conversation as resolved.
Show resolved Hide resolved
{
// The default setting
put(PartitionMovementConstraint.class.getSimpleName(), 1f);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.Callable;
import java.util.stream.Collectors;

import javax.management.JMException;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixRebalanceException;
Expand All @@ -42,6 +43,8 @@
import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
import org.apache.helix.monitoring.metrics.MetricCollector;
import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
Expand All @@ -64,6 +67,8 @@
public class BestPossibleStateCalcStage extends AbstractBaseStage {
private static final Logger logger =
LoggerFactory.getLogger(BestPossibleStateCalcStage.class.getName());
// Create a ThreadLocal of MetricCollector. Metrics can be updated by the controller thread only.
private static final ThreadLocal<MetricCollector> METRIC_COLLECTOR_THREAD_LOCAL = new ThreadLocal<>();

@Override
public void process(ClusterEvent event) throws Exception {
Expand Down Expand Up @@ -253,20 +258,41 @@ private Map<String, Resource> computeResourceBestPossibleStateWithWagedRebalance
Map<String, IdealState> newIdealStates = new HashMap<>();

// Init rebalancer with the rebalance preferences.
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences = cache.getClusterConfig()
.getGlobalRebalancePreference();
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences =
cache.getClusterConfig().getGlobalRebalancePreference();

if (METRIC_COLLECTOR_THREAD_LOCAL.get() == null) {
try {
// If HelixManager is null, we just pass in null for MetricCollector so that a
// non-functioning WagedRebalancerMetricCollector would be created in WagedRebalancer's
// constructor. This is to handle two cases: 1. HelixManager is null for non-testing cases -
// in this case, WagedRebalancer will not read/write to metadata store and just use
// CurrentState-based rebalancing. 2. Tests that require instrumenting the rebalancer for
// verifying whether the cluster has converged.
METRIC_COLLECTOR_THREAD_LOCAL.set(helixManager == null ? null
: new WagedRebalancerMetricCollector(helixManager.getClusterName()));
} catch (JMException e) {
LogUtil.logWarn(logger, _eventId, String.format(
"MetricCollector instantiation failed! WagedRebalancer will not emit metrics due to JMException %s",
e));
}
}

// TODO avoid creating the rebalancer on every rebalance call for performance enhancement
WagedRebalancer wagedRebalancer = new WagedRebalancer(helixManager, preferences);
WagedRebalancer wagedRebalancer =
new WagedRebalancer(helixManager, preferences, METRIC_COLLECTOR_THREAD_LOCAL.get());
narendly marked this conversation as resolved.
Show resolved Hide resolved
try {
newIdealStates.putAll(wagedRebalancer
.computeNewIdealStates(cache, wagedRebalancedResourceMap, currentStateOutput));
newIdealStates.putAll(wagedRebalancer.computeNewIdealStates(cache, wagedRebalancedResourceMap,
currentStateOutput));
} catch (HelixRebalanceException ex) {
// Note that unlike the legacy rebalancer, the WAGED rebalance won't return partial result.
// Since it calculates for all the eligible resources globally, a partial result is invalid.
// TODO propagate the rebalancer failure information to updateRebalanceStatus for monitoring.
LogUtil.logError(logger, _eventId, String
.format("Failed to calculate the new Ideal States using the rebalancer %s due to %s",
wagedRebalancer.getClass().getSimpleName(), ex.getFailureType()), ex);
LogUtil.logError(logger, _eventId,
String.format(
"Failed to calculate the new Ideal States using the rebalancer %s due to %s",
wagedRebalancer.getClass().getSimpleName(), ex.getFailureType()),
ex);
} finally {
wagedRebalancer.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,6 @@ public enum MonitorDomainNames {
HelixThreadPoolExecutor,
HelixCallback,
RoutingTableProvider,
CLMParticipantReport
CLMParticipantReport,
Rebalancer
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@
import java.util.List;
import java.util.Map;

import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.monitoring.SensorNameProvider;
import org.apache.helix.monitoring.mbeans.MBeanRegistrar;
import org.apache.helix.util.HelixUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -223,4 +225,13 @@ protected void incrementSimpleDynamicMetric(SimpleDynamicMetric<Long> metric) {
/**
 * Adds {@code value} to the metric's current value and stores the sum back into the metric.
 * @param metric the metric whose value is read and then updated
 * @param value the amount to add to the current value
 */
protected void incrementSimpleDynamicMetric(SimpleDynamicMetric<Long> metric, long value) {
  long incremented = metric.getValue() + value;
  metric.updateValue(incremented);
}

/**
* Return the interval length for the underlying reservoir used by the MBean metric configured
* in the system env variables. If not found, use default value.
*/
protected Long getResetIntervalInMs() {
narendly marked this conversation as resolved.
Show resolved Hide resolved
return HelixUtil.getSystemPropertyAsLong(SystemPropertyKeys.HELIX_MONITOR_TIME_WINDOW_LENGTH_MS,
DEFAULT_RESET_INTERVAL_MS);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package org.apache.helix.monitoring.metrics;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import javax.management.JMException;
import javax.management.ObjectName;
import org.apache.helix.HelixException;
import org.apache.helix.monitoring.metrics.model.Metric;
import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;

/**
 * Collects and manages all metrics that implement the {@link Metric} interface, and registers
 * them together under a single JMX {@link ObjectName}.
 */
public abstract class MetricCollector extends DynamicMBeanProvider {
  private static final String CLUSTER_NAME_KEY = "ClusterName";
  private static final String ENTITY_NAME_KEY = "EntityName";
  private final String _monitorDomainName;
  private final String _clusterName;
  private final String _entityName;
  // Metrics keyed by Metric#getMetricName(); populated through addMetric().
  private final Map<String, Metric> _metricMap;

  /**
   * @param monitorDomainName domain part of the ObjectName used in {@link #register()}
   * @param clusterName value of the "ClusterName" key in the MBean name
   * @param entityName value of the "EntityName" key in the MBean name
   */
  public MetricCollector(String monitorDomainName, String clusterName, String entityName) {
    _monitorDomainName = monitorDomainName;
    _clusterName = clusterName;
    _entityName = entityName;
    _metricMap = new HashMap<>();
  }

  /**
   * Registers every metric added so far under one ObjectName of the form
   * {@code <monitorDomainName>:ClusterName=<clusterName>,EntityName=<entityName>}.
   * @return this collector
   * @throws JMException if the ObjectName is malformed or the registration fails
   */
  @Override
  public DynamicMBeanProvider register() throws JMException {
    // Gather the DynamicMetric backing each Metric.
    Collection<DynamicMetric<?, ?>> dynamicMetrics = new HashSet<>();
    for (Metric metric : _metricMap.values()) {
      dynamicMetrics.add(metric.getDynamicMetric());
    }

    // MBean name has two key-value pairs:
    // ------ 1) ClusterName KV pair (first %s=%s)
    // ------ 2) EntityName KV pair (second %s=%s)
    // No whitespace after the comma: spaces are significant in an ObjectName, so ", EntityName"
    // would register the key as " EntityName" (with a leading space).
    String mbeanName =
        String.format("%s=%s,%s=%s", CLUSTER_NAME_KEY, _clusterName, ENTITY_NAME_KEY, _entityName);

    // ObjectName is "<monitor domain name>:<MBean name>"
    doRegister(dynamicMetrics,
        new ObjectName(String.format("%s:%s", _monitorDomainName, mbeanName)));
    return this;
  }

  /**
   * @return the sensor name in the form {@code Rebalancer.<clusterName>.<entityName>}
   */
  @Override
  public String getSensorName() {
    return String.format("%s.%s.%s", MonitorDomainNames.Rebalancer.name(), _clusterName,
        _entityName);
  }

  /**
   * Adds a metric to this collector, keyed by its metric name. If a metric with the same name
   * has already been added, the existing one is kept and the new one is ignored.
   * @param metric the metric to add; must also be a {@link DynamicMetric}
   * @throws HelixException if the metric is not a {@link DynamicMetric}
   */
  void addMetric(Metric metric) {
    if (!(metric instanceof DynamicMetric)) {
      throw new HelixException("MetricCollector only supports Metrics that are DynamicMetric!");
    }
    _metricMap.putIfAbsent(metric.getMetricName(), metric);
  }

  /**
   * Returns the metric stored under the given name, cast to the desired type.
   * @param metricName name the metric was added under
   * @param metricClass Desired type
   * @param <T> Casted result of the metric
   * @return the metric cast to {@code metricClass}, or null if no metric has that name
   * @throws ClassCastException if the stored metric is not an instance of {@code metricClass}
   */
  public <T extends DynamicMetric> T getMetric(String metricName, Class<T> metricClass) {
    return metricClass.cast(_metricMap.get(metricName));
  }

  /**
   * @return the internal name-to-metric map itself (not a copy)
   */
  public Map<String, Metric> getMetricMap() {
    return _metricMap;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package org.apache.helix.monitoring.metrics;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import javax.management.JMException;
import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
import org.apache.helix.monitoring.metrics.implementation.RebalanceLatencyGauge;
import org.apache.helix.monitoring.metrics.model.LatencyMetric;

/**
 * A {@link MetricCollector} that holds the latency gauges defined for WagedRebalancer.
 */
public class WagedRebalancerMetricCollector extends MetricCollector {
  private static final String WAGED_REBALANCER_ENTITY_NAME = "WagedRebalancer";

  /**
   * Names of all metrics defined for WagedRebalancer. The constants are intentionally written in
   * upper camel case (rather than upper snake case) for readability.
   */
  public enum WagedRebalancerMetricNames {
    // Per-stage latency metrics
    GlobalBaselineCalcLatencyGauge,
    PartialRebalanceLatencyGauge,

    // The following latency metrics are related to AssignmentMetadataStore
    StateReadLatencyGauge,
    StateWriteLatencyGauge
  }

  /**
   * Creates all WagedRebalancer metrics and registers them for the given cluster.
   * @param clusterName name of the cluster the metrics are reported for
   * @throws JMException if the registration fails
   */
  public WagedRebalancerMetricCollector(String clusterName) throws JMException {
    super(MonitorDomainNames.Rebalancer.name(), clusterName, WAGED_REBALANCER_ENTITY_NAME);
    createMetrics();
    register();
  }

  /**
   * Creates the metrics without registering them. Intended for the case of a JMException, so
   * that the rebalancer can proceed without registering and emitting metrics.
   */
  public WagedRebalancerMetricCollector() {
    super(MonitorDomainNames.Rebalancer.name(), null, null);
    createMetrics();
  }

  /**
   * Creates one latency gauge per WagedRebalancer metric name and adds each of them to this
   * collector. Registration is done separately by the constructor that calls register().
   */
  private void createMetrics() {
    // Every metric currently defined for WagedRebalancer is a latency gauge.
    WagedRebalancerMetricNames[] latencyGaugeNames = {
        WagedRebalancerMetricNames.GlobalBaselineCalcLatencyGauge,
        WagedRebalancerMetricNames.PartialRebalanceLatencyGauge,
        WagedRebalancerMetricNames.StateReadLatencyGauge,
        WagedRebalancerMetricNames.StateWriteLatencyGauge
    };

    for (WagedRebalancerMetricNames gaugeName : latencyGaugeNames) {
      LatencyMetric latencyGauge =
          new RebalanceLatencyGauge(gaugeName.name(), getResetIntervalInMs());
      addMetric(latencyGauge);
    }
  }
}
Loading