From bf0d0a7498119dec404240b313f89133208dcb13 Mon Sep 17 00:00:00 2001 From: Jiajun Wang <1803880+jiajunwang@users.noreply.github.com> Date: Tue, 22 Oct 2019 15:08:02 -0700 Subject: [PATCH] The WAGED rebalancer returns the previously calculated assignment on calculation failure (#514) * The WAGED rebalancer returns the previously calculated assignment on calculation failure. This is to protect the cluster assignment on a rebalancing algorithm failure. For example, the cluster is out of capacity. In this case, the rebalancer will keep using the previously calculated mapping. Also, refine the new metric interface, and add the RebalanceFailureCount metric for recording the failures. Modify the test cases so that DBs from different test cases have a different name. This is to avoid previous test records to be returned by the rebalancer on calculation error. --- .../rebalancer/waged/WagedRebalancer.java | 167 ++++++++++-------- .../dynamicMBeans/SimpleDynamicMetric.java | 2 +- .../WagedRebalancerMetricCollector.java | 33 +++- .../implementation/RebalanceFailureCount.java | 19 ++ .../implementation/RebalanceLatencyGauge.java | 26 +-- .../monitoring/metrics/model/CountMetric.java | 40 ++++- .../metrics/model/LatencyMetric.java | 17 ++ .../monitoring/metrics/model/Metric.java | 5 - .../rebalancer/waged/TestWagedRebalancer.java | 50 ++++-- .../TestDelayedWagedRebalance.java | 7 +- ...yedWagedRebalanceWithDisabledInstance.java | 7 +- ...estDelayedWagedRebalanceWithRackaware.java | 7 +- .../WagedRebalancer/TestWagedRebalance.java | 48 +++-- .../TestWagedRebalanceFaultZone.java | 10 +- 14 files changed, 274 insertions(+), 164 deletions(-) create mode 100644 helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceFailureCount.java diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java index f39d3cb97f4..9a01688b65e 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java @@ -30,7 +30,6 @@ import com.google.common.collect.ImmutableSet; import org.apache.helix.HelixConstants; -import org.apache.helix.HelixException; import org.apache.helix.HelixManager; import org.apache.helix.HelixRebalanceException; import org.apache.helix.controller.changedetector.ResourceChangeDetector; @@ -51,6 +50,7 @@ import org.apache.helix.model.ResourceConfig; import org.apache.helix.monitoring.metrics.MetricCollector; import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector; +import org.apache.helix.monitoring.metrics.model.CountMetric; import org.apache.helix.monitoring.metrics.model.LatencyMetric; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -174,9 +174,36 @@ public Map computeNewIdealStates(ResourceControllerDataProvi LOG.info("Start computing new ideal states for resources: {}", resourceMap.keySet().toString()); validateInput(clusterData, resourceMap); - // Calculate the target assignment based on the current cluster status. - Map newIdealStates = - computeBestPossibleStates(clusterData, resourceMap, currentStateOutput); + Map newIdealStates; + try { + // Calculate the target assignment based on the current cluster status. + newIdealStates = computeBestPossibleStates(clusterData, resourceMap, currentStateOutput); + } catch (HelixRebalanceException ex) { + LOG.error("Failed to calculate the new assignments.", ex); + // Record the failure in metrics. + CountMetric rebalanceFailureCount = _metricCollector.getMetric( + WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceFailureCounter.name(), + CountMetric.class); + rebalanceFailureCount.increaseCount(1L); + + HelixRebalanceException.Type failureType = ex.getFailureType(); + if (failureType.equals(HelixRebalanceException.Type.INVALID_REBALANCER_STATUS) || failureType + .equals(HelixRebalanceException.Type.UNKNOWN_FAILURE)) { + // If the failure is unknown or because of assignment store access failure, throw the + // rebalance exception. + throw ex; + } else { // return the previously calculated assignment. + LOG.warn( + "Returning the last known-good best possible assignment from metadata store due to " + + "rebalance failure of type: {}", failureType); + // Note that don't return an assignment based on the current state if there is no previously + // calculated result in this fallback logic. + Map assignmentRecord = + getBestPossibleAssignment(_assignmentMetadataStore, new CurrentStateOutput(), + resourceMap.keySet()); + newIdealStates = convertResourceAssignment(clusterData, assignmentRecord); + } + } // Construct the new best possible states according to the current state and target assignment. // Note that the new ideal state might be an intermediate state between the current state and @@ -203,7 +230,7 @@ public Map computeNewIdealStates(ResourceControllerDataProvi } // Coordinate baseline recalculation and partial rebalance according to the cluster changes. - private Map computeBestPossibleStates( + protected Map computeBestPossibleStates( ResourceControllerDataProvider clusterData, Map resourceMap, final CurrentStateOutput currentStateOutput) throws HelixRebalanceException { getChangeDetector().updateSnapshots(clusterData); @@ -243,36 +270,15 @@ private Map computeBestPossibleStates( Map newAssignment = partialRebalance(clusterData, clusterChanges, resourceMap, activeNodes, currentStateOutput); - // > - Map> resourceStatePriorityMap = new HashMap<>(); - // Convert the assignments into IdealState for the following state mapping calculation. - Map finalIdealStateMap = new HashMap<>(); - for (String resourceName : newAssignment.keySet()) { - IdealState newIdealState; - try { - IdealState currentIdealState = clusterData.getIdealState(resourceName); - Map statePriorityMap = clusterData - .getStateModelDef(currentIdealState.getStateModelDefRef()).getStatePriorityMap(); - // Keep the priority map for the rebalance overwrite logic later. - resourceStatePriorityMap.put(resourceName, statePriorityMap); - // Create a new IdealState instance contains the new calculated assignment in the preference - // list. - newIdealState = generateIdealStateWithAssignment(resourceName, currentIdealState, - newAssignment.get(resourceName), statePriorityMap); - } catch (Exception ex) { - throw new HelixRebalanceException( - "Fail to calculate the new IdealState for resource: " + resourceName, - HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex); - } - finalIdealStateMap.put(resourceName, newIdealState); - } + Map finalIdealStateMap = + convertResourceAssignment(clusterData, newAssignment); // The additional rebalance overwrite is required since the calculated mapping may contains // some delayed rebalanced assignments. if (!activeNodes.equals(clusterData.getEnabledLiveInstances())) { applyRebalanceOverwrite(finalIdealStateMap, clusterData, resourceMap, clusterChanges, - resourceStatePriorityMap, getBaselineAssignment(_assignmentMetadataStore, - currentStateOutput, resourceMap.keySet())); + getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, + resourceMap.keySet())); } // Replace the assignment if user-defined preference list is configured. // Note the user-defined list is intentionally applied to the final mapping after calculation. @@ -285,6 +291,40 @@ resourceStatePriorityMap, getBaselineAssignment(_assignmentMetadataStore, return finalIdealStateMap; } + /** + * Convert the resource assignment map into an IdealState map. + */ + private Map convertResourceAssignment( + ResourceControllerDataProvider clusterData, Map assignments) + throws HelixRebalanceException { + // Convert the assignments into IdealState for the following state mapping calculation. + Map finalIdealStateMap = new HashMap<>(); + for (String resourceName : assignments.keySet()) { + try { + IdealState currentIdealState = clusterData.getIdealState(resourceName); + Map statePriorityMap = + clusterData.getStateModelDef(currentIdealState.getStateModelDefRef()) + .getStatePriorityMap(); + // Create a new IdealState instance which contains the new calculated assignment in the + // preference list. + IdealState newIdealState = new IdealState(resourceName); + // Copy the simple fields + newIdealState.getRecord().setSimpleFields(currentIdealState.getRecord().getSimpleFields()); + // Sort the preference list according to state priority. + newIdealState.setPreferenceLists( + getPreferenceLists(assignments.get(resourceName), statePriorityMap)); + // Note the state mapping in the new assignment won't directly propagate to the map fields. + // The rebalancer will calculate for the final state mapping considering the current states. + finalIdealStateMap.put(resourceName, newIdealState); + } catch (Exception ex) { + throw new HelixRebalanceException( + "Failed to calculate the new IdealState for resource: " + resourceName, + HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex); + } + } + return finalIdealStateMap; + } + // TODO make the Baseline calculation async if complicated algorithm is used for the Baseline private void refreshBaseline(ResourceControllerDataProvider clusterData, Map> clusterChanges, Map resourceMap, @@ -414,23 +454,6 @@ private ResourceChangeDetector getChangeDetector() { return CHANGE_DETECTOR_THREAD_LOCAL.get(); } - // Generate a new IdealState based on the input newAssignment. - // The assignment will be propagate to the preference lists. - // Note that we will recalculate the states based on the current state, so there is no need to - // update the mapping fields in the IdealState output. - private IdealState generateIdealStateWithAssignment(String resourceName, - IdealState currentIdealState, ResourceAssignment newAssignment, - Map statePriorityMap) { - IdealState newIdealState = new IdealState(resourceName); - // Copy the simple fields - newIdealState.getRecord().setSimpleFields(currentIdealState.getRecord().getSimpleFields()); - // Sort the preference list according to state priority. - newIdealState.setPreferenceLists(getPreferenceLists(newAssignment, statePriorityMap)); - // Note the state mapping in the new assignment won't be directly propagate to the map fields. - // The rebalancer will calculate for the final state mapping considering the current states. - return newIdealState; - } - // Generate the preference lists from the state mapping based on state priority. private Map> getPreferenceLists(ResourceAssignment newAssignment, Map statePriorityMap) { @@ -488,9 +511,6 @@ private Map getBaselineAssignment( stateReadLatency.startMeasuringLatency(); currentBaseline = assignmentMetadataStore.getBaseline(); stateReadLatency.endMeasuringLatency(); - } catch (HelixException ex) { - // Report error. and use empty mapping instead. - LOG.error("Failed to get the current baseline assignment.", ex); } catch (Exception ex) { throw new HelixRebalanceException( "Failed to get the current baseline assignment because of unexpected error.", @@ -501,6 +521,7 @@ private Map getBaselineAssignment( LOG.warn("The current baseline assignment record is empty. Use the current states instead."); currentBaseline = getCurrentStateAssingment(currentStateOutput, resources); } + currentBaseline.keySet().retainAll(resources); return currentBaseline; } @@ -524,9 +545,6 @@ private Map getBestPossibleAssignment( stateReadLatency.startMeasuringLatency(); currentBestAssignment = assignmentMetadataStore.getBestPossibleAssignment(); stateReadLatency.endMeasuringLatency(); - } catch (HelixException ex) { - // Report error. and use empty mapping instead. - LOG.error("Failed to get the current best possible assignment.", ex); } catch (Exception ex) { throw new HelixRebalanceException( "Failed to get the current best possible assignment because of unexpected error.", @@ -538,6 +556,7 @@ private Map getBestPossibleAssignment( "The current best possible assignment record is empty. Use the current states instead."); currentBestAssignment = getCurrentStateAssingment(currentStateOutput, resources); } + currentBestAssignment.keySet().retainAll(resources); return currentBestAssignment; } @@ -593,39 +612,39 @@ private void delayedRebalanceSchedule(ResourceControllerDataProvider clusterData * @param clusterData the cluster data cache. * @param resourceMap the rebalanaced resource map. * @param clusterChanges the detected cluster changes that triggeres the rebalance. - * @param resourceStatePriorityMap the state priority map for each resource. * @param baseline the baseline assignment */ private void applyRebalanceOverwrite(Map idealStateMap, ResourceControllerDataProvider clusterData, Map resourceMap, Map> clusterChanges, - Map> resourceStatePriorityMap, Map baseline) throws HelixRebalanceException { Set enabledLiveInstances = clusterData.getEnabledLiveInstances(); // Note that the calculation used the baseline as the input only. This is for minimizing // unnecessary partition movement. - Map activeAssignment = calculateAssignment(clusterData, - clusterChanges, resourceMap, enabledLiveInstances, Collections.emptyMap(), baseline); + Map activeIdealStates = convertResourceAssignment(clusterData, + calculateAssignment(clusterData, clusterChanges, resourceMap, enabledLiveInstances, + Collections.emptyMap(), baseline)); for (String resourceName : idealStateMap.keySet()) { - IdealState is = idealStateMap.get(resourceName); - if (!activeAssignment.containsKey(resourceName)) { + // The new calculated ideal state before overwrite + IdealState newIdealState = idealStateMap.get(resourceName); + if (!activeIdealStates.containsKey(resourceName)) { throw new HelixRebalanceException( "Failed to calculate the complete partition assignment with all active nodes. Cannot find the resource assignment for " - + resourceName, - HelixRebalanceException.Type.FAILED_TO_CALCULATE); + + resourceName, HelixRebalanceException.Type.FAILED_TO_CALCULATE); } + // The ideal state that is calculated based on the real alive/enabled instances list + IdealState newActiveIdealState = activeIdealStates.get(resourceName); + // The current ideal state that exists in the IdealState znode IdealState currentIdealState = clusterData.getIdealState(resourceName); - IdealState newActiveIdealState = - generateIdealStateWithAssignment(resourceName, currentIdealState, - activeAssignment.get(resourceName), resourceStatePriorityMap.get(resourceName)); - - int numReplia = currentIdealState.getReplicaCount(enabledLiveInstances.size()); - int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(currentIdealState, numReplia); - Map> finalPreferenceLists = - DelayedRebalanceUtil.getFinalDelayedMapping(newActiveIdealState.getPreferenceLists(), - is.getPreferenceLists(), enabledLiveInstances, Math.min(minActiveReplica, numReplia)); - - is.setPreferenceLists(finalPreferenceLists); + int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size()); + int minActiveReplica = + DelayedRebalanceUtil.getMinActiveReplica(currentIdealState, numReplica); + Map> finalPreferenceLists = DelayedRebalanceUtil + .getFinalDelayedMapping(newActiveIdealState.getPreferenceLists(), + newIdealState.getPreferenceLists(), enabledLiveInstances, + Math.min(minActiveReplica, numReplica)); + + newIdealState.setPreferenceLists(finalPreferenceLists); } } @@ -641,4 +660,8 @@ private void applyUserDefinedPreferenceList(ResourceConfig resourceConfig, } } } + + protected MetricCollector getMetricCollector() { + return _metricCollector; + } } diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/SimpleDynamicMetric.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/SimpleDynamicMetric.java index 1be6a21423a..2b0f1db3cc2 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/SimpleDynamicMetric.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/SimpleDynamicMetric.java @@ -25,7 +25,7 @@ * @param the type of the metric value */ public class SimpleDynamicMetric extends DynamicMetric { - private final String _metricName; + protected final String _metricName; /** * Instantiates a new Simple dynamic metric. diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java index 04d804d4d6f..e9494ffa1ef 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java @@ -20,8 +20,11 @@ */ import javax.management.JMException; + import org.apache.helix.monitoring.mbeans.MonitorDomainNames; +import org.apache.helix.monitoring.metrics.implementation.RebalanceFailureCount; import org.apache.helix.monitoring.metrics.implementation.RebalanceLatencyGauge; +import org.apache.helix.monitoring.metrics.model.CountMetric; import org.apache.helix.monitoring.metrics.model.LatencyMetric; public class WagedRebalancerMetricCollector extends MetricCollector { @@ -38,7 +41,12 @@ public enum WagedRebalancerMetricNames { // The following latency metrics are related to AssignmentMetadataStore StateReadLatencyGauge, - StateWriteLatencyGauge + StateWriteLatencyGauge, + + // Count of any rebalance compute failure. + // Note the rebalancer may still be able to return the last known-good assignment on a rebalance + // compute failure. And this fallback logic won't impact this counting. + RebalanceFailureCounter } public WagedRebalancerMetricCollector(String clusterName) throws JMException { @@ -62,19 +70,26 @@ public WagedRebalancerMetricCollector() { */ private void createMetrics() { // Define all metrics - LatencyMetric globalBaselineCalcLatencyGauge = new RebalanceLatencyGauge( - WagedRebalancerMetricNames.GlobalBaselineCalcLatencyGauge.name(), getResetIntervalInMs()); - LatencyMetric partialRebalanceLatencyGauge = new RebalanceLatencyGauge( - WagedRebalancerMetricNames.PartialRebalanceLatencyGauge.name(), getResetIntervalInMs()); - LatencyMetric stateReadLatencyGauge = new RebalanceLatencyGauge( - WagedRebalancerMetricNames.StateReadLatencyGauge.name(), getResetIntervalInMs()); - LatencyMetric stateWriteLatencyGauge = new RebalanceLatencyGauge( - WagedRebalancerMetricNames.StateWriteLatencyGauge.name(), getResetIntervalInMs()); + LatencyMetric globalBaselineCalcLatencyGauge = + new RebalanceLatencyGauge(WagedRebalancerMetricNames.GlobalBaselineCalcLatencyGauge.name(), + getResetIntervalInMs()); + LatencyMetric partialRebalanceLatencyGauge = + new RebalanceLatencyGauge(WagedRebalancerMetricNames.PartialRebalanceLatencyGauge.name(), + getResetIntervalInMs()); + LatencyMetric stateReadLatencyGauge = + new RebalanceLatencyGauge(WagedRebalancerMetricNames.StateReadLatencyGauge.name(), + getResetIntervalInMs()); + LatencyMetric stateWriteLatencyGauge = + new RebalanceLatencyGauge(WagedRebalancerMetricNames.StateWriteLatencyGauge.name(), + getResetIntervalInMs()); + CountMetric calcFailureCount = + new RebalanceFailureCount(WagedRebalancerMetricNames.RebalanceFailureCounter.name()); // Add metrics to WagedRebalancerMetricCollector addMetric(globalBaselineCalcLatencyGauge); addMetric(partialRebalanceLatencyGauge); addMetric(stateReadLatencyGauge); addMetric(stateWriteLatencyGauge); + addMetric(calcFailureCount); } } diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceFailureCount.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceFailureCount.java new file mode 100644 index 00000000000..3764645563b --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceFailureCount.java @@ -0,0 +1,19 @@ +package org.apache.helix.monitoring.metrics.implementation; + +import org.apache.helix.monitoring.metrics.model.CountMetric; + +public class RebalanceFailureCount extends CountMetric { + /** + * Instantiates a new Simple dynamic metric. + * + * @param metricName the metric name + */ + public RebalanceFailureCount(String metricName) { + super(metricName, 0L); + } + + @Override + public void increaseCount(long count) { + updateValue(getValue() + count); + } +} diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java index e96a5893bbe..b6e58b4ab93 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java @@ -22,7 +22,6 @@ import com.codahale.metrics.Histogram; import com.codahale.metrics.SlidingTimeWindowArrayReservoir; import java.util.concurrent.TimeUnit; -import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric; import org.apache.helix.monitoring.metrics.model.LatencyMetric; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,22 +71,6 @@ public void endMeasuringLatency() { reset(); } - @Override - public String getMetricName() { - return _metricName; - } - - @Override - public void reset() { - _startTime = VALUE_NOT_SET; - _endTime = VALUE_NOT_SET; - } - - @Override - public String toString() { - return String.format("Metric %s's latency is %d", _metricName, getLastEmittedMetricValue()); - } - /** * Returns the most recently emitted metric value at the time of the call. * @return @@ -97,8 +80,11 @@ public long getLastEmittedMetricValue() { return _lastEmittedMetricValue; } - @Override - public DynamicMetric getDynamicMetric() { - return this; + /** + * Resets the internal state of this metric. + */ + private void reset() { + _startTime = VALUE_NOT_SET; + _endTime = VALUE_NOT_SET; } } diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java index 5a7f0caa339..424ac9e3005 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java @@ -19,23 +19,49 @@ * under the License. */ +import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric; import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric; /** * Represents a count metric and defines methods to help with calculation. A count metric gives a * gauge value of a certain property. */ -public abstract class CountMetric extends SimpleDynamicMetric implements Metric { - protected V _count; +public abstract class CountMetric extends SimpleDynamicMetric implements Metric { /** - * Instantiates a new Simple dynamic metric. + * Instantiates a new count metric. + * * @param metricName the metric name - * @param metricObject the metric object + * @param initCount the initial count */ - public CountMetric(String metricName, V metricObject) { - super(metricName, metricObject); + public CountMetric(String metricName, long initCount) { + super(metricName, initCount); } - public abstract void setCount(Object count); + /** + * Increment the metric by the input count. + * + * @param count + */ + public abstract void increaseCount(long count); + + @Override + public String getMetricName() { + return _metricName; + } + + @Override + public String toString() { + return String.format("Metric %s's count is %d", getMetricName(), getValue()); + } + + @Override + public long getLastEmittedMetricValue() { + return getValue(); + } + + @Override + public DynamicMetric getDynamicMetric() { + return this; + } } diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java index c8ba5ae4b2b..d60f245b6fd 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java @@ -20,6 +20,7 @@ */ import com.codahale.metrics.Histogram; +import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric; import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric; /** @@ -38,6 +39,7 @@ public abstract class LatencyMetric extends HistogramDynamicMetric implements Me */ public LatencyMetric(String metricName, Histogram metricObject) { super(metricName, metricObject); + _metricName = metricName; } /** @@ -49,4 +51,19 @@ public LatencyMetric(String metricName, Histogram metricObject) { * Ends measuring the latency. */ public abstract void endMeasuringLatency(); + + @Override + public String getMetricName() { + return _metricName; + } + + @Override + public String toString() { + return String.format("Metric %s's latency is %d", getMetricName(), getLastEmittedMetricValue()); + } + + @Override + public DynamicMetric getDynamicMetric() { + return this; + } } diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java index ba59b4f6e53..22378dcff91 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java @@ -31,11 +31,6 @@ public interface Metric { */ String getMetricName(); - /** - * Resets the internal state of this metric. - */ - void reset(); - /** * Prints the metric along with its name. */ diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java index df368cbfce1..dd0cc8c9127 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; + import org.apache.helix.HelixConstants; import org.apache.helix.HelixRebalanceException; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; @@ -41,6 +42,8 @@ import org.apache.helix.model.Partition; import org.apache.helix.model.Resource; import org.apache.helix.model.ResourceAssignment; +import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector; +import org.apache.helix.monitoring.metrics.model.CountMetric; import org.mockito.Mockito; import org.mockito.stubbing.Answer; import org.testng.Assert; @@ -241,7 +244,7 @@ public void testNonCompatibleConfiguration() throws IOException, HelixRebalanceE // TODO test with invalid capacity configuration which will fail the cluster model constructing. @Test(dependsOnMethods = "testRebalance") - public void testInvalidClusterStatus() throws IOException { + public void testInvalidClusterStatus() throws IOException, HelixRebalanceException { _metadataStore.clearMetadataStore(); WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer()); @@ -254,13 +257,19 @@ public void testInvalidClusterStatus() throws IOException { Map resourceMap = clusterData.getIdealStates().keySet().stream().collect( Collectors.toMap(resourceName -> resourceName, resourceName -> new Resource(resourceName))); try { - rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput()); + rebalancer.computeBestPossibleStates(clusterData, resourceMap, new CurrentStateOutput()); Assert.fail("Rebalance shall fail."); } catch (HelixRebalanceException ex) { Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.INVALID_CLUSTER_STATUS); Assert.assertEquals(ex.getMessage(), "Failed to generate cluster model. Failure Type: INVALID_CLUSTER_STATUS"); } + + // The rebalance will be done with empty mapping result since there is no previously calculated + // assignment. + Assert.assertTrue( + rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput()) + .isEmpty()); } @Test(dependsOnMethods = "testRebalance") @@ -289,24 +298,45 @@ public void testInvalidRebalancerStatus() throws IOException { @Test(dependsOnMethods = "testRebalance") public void testAlgorithmException() throws IOException, HelixRebalanceException { - RebalanceAlgorithm badAlgorithm = Mockito.mock(RebalanceAlgorithm.class); - when(badAlgorithm.calculate(any())).thenThrow(new HelixRebalanceException("Algorithm fails.", - HelixRebalanceException.Type.FAILED_TO_CALCULATE)); - _metadataStore.clearMetadataStore(); WagedRebalancer rebalancer = - new WagedRebalancer(_metadataStore, badAlgorithm, new DelayedAutoRebalancer()); + new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer()); ResourceControllerDataProvider clusterData = setupClusterDataCache(); - Map resourceMap = clusterData.getIdealStates().keySet().stream().collect( - Collectors.toMap(resourceName -> resourceName, resourceName -> new Resource(resourceName))); + Map resourceMap = clusterData.getIdealStates().entrySet().stream() + .collect(Collectors.toMap(entry -> entry.getKey(), entry -> { + Resource resource = new Resource(entry.getKey()); + entry.getValue().getPartitionSet().stream() + .forEach(partition -> resource.addPartition(partition)); + return resource; + })); + // Rebalance with normal configuration. So the assignment will be persisted in the metadata store. + Map result = + rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput()); + + // Recreate a rebalance with the same metadata store but bad algorithm instance. + RebalanceAlgorithm badAlgorithm = Mockito.mock(RebalanceAlgorithm.class); + when(badAlgorithm.calculate(any())).thenThrow(new HelixRebalanceException("Algorithm fails.", + HelixRebalanceException.Type.FAILED_TO_CALCULATE)); + rebalancer = new WagedRebalancer(_metadataStore, badAlgorithm, new DelayedAutoRebalancer()); + + // Calculation will fail try { - rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput()); + rebalancer.computeBestPossibleStates(clusterData, resourceMap, new CurrentStateOutput()); Assert.fail("Rebalance shall fail."); } catch (HelixRebalanceException ex) { Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.FAILED_TO_CALCULATE); Assert.assertEquals(ex.getMessage(), "Algorithm fails. Failure Type: FAILED_TO_CALCULATE"); } + // But if call with the public method computeNewIdealStates(), the rebalance will return with + // the previous rebalance result. + Map newResult = + rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput()); + Assert.assertEquals(newResult, result); + // Ensure failure has been recorded + Assert.assertEquals(rebalancer.getMetricCollector().getMetric( + WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceFailureCounter.name(), + CountMetric.class).getValue().longValue(), 1l); } @Test(dependsOnMethods = "testRebalance") diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java index 713c095539f..e49cc191b1f 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Set; +import org.apache.helix.TestHelper; import org.apache.helix.integration.rebalancer.DelayedAutoRebalancer.TestDelayedAutoRebalance; import org.apache.helix.model.ExternalView; import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier; @@ -39,7 +40,7 @@ protected ZkHelixClusterVerifier getClusterVerifier() { Set dbNames = new HashSet<>(); int i = 0; for (String stateModel : TestStateModels) { - dbNames.add("Test-DB-" + i++); + dbNames.add("Test-DB-" + TestHelper.getTestMethodName() + i++); } return new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setResources(dbNames) .setDeactivatedNodeAwareness(true).setZkAddr(ZK_ADDR).build(); @@ -47,10 +48,10 @@ protected ZkHelixClusterVerifier getClusterVerifier() { // create test DBs, wait it converged and return externalviews protected Map createTestDBs(long delayTime) throws InterruptedException { - Map externalViews = new HashMap(); + Map externalViews = new HashMap<>(); int i = 0; for (String stateModel : TestStateModels) { - String db = "Test-DB-" + i++; + String db = "Test-DB-" + TestHelper.getTestMethodName() + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _minActiveReplica); _testDBs.add(db); diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java index bcb2260d88c..3d4bd6a0b17 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Set; +import org.apache.helix.TestHelper; import org.apache.helix.integration.rebalancer.DelayedAutoRebalancer.TestDelayedAutoRebalanceWithDisabledInstance; import org.apache.helix.model.ExternalView; import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier; @@ -40,7 +41,7 @@ protected ZkHelixClusterVerifier getClusterVerifier() { Set dbNames = new HashSet<>(); int i = 0; for (String stateModel : TestStateModels) { - dbNames.add("Test-DB-" + i++); + dbNames.add("Test-DB-" + TestHelper.getTestMethodName() + i++); } return new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setResources(dbNames) .setDeactivatedNodeAwareness(true).setZkAddr(ZK_ADDR).build(); @@ -48,10 +49,10 @@ protected ZkHelixClusterVerifier getClusterVerifier() { // create test DBs, wait it converged and return externalviews protected Map createTestDBs(long delayTime) throws InterruptedException { - Map externalViews = new HashMap(); + Map externalViews = new HashMap<>(); int i = 0; for (String stateModel : TestStateModels) { - String db = "Test-DB-" + i++; + String db = "Test-DB-" + TestHelper.getTestMethodName() + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _minActiveReplica); _testDBs.add(db); diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithRackaware.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithRackaware.java index e0adf72f33e..bb7c11a5170 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithRackaware.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithRackaware.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Set; +import org.apache.helix.TestHelper; import org.apache.helix.integration.rebalancer.DelayedAutoRebalancer.TestDelayedAutoRebalanceWithRackaware; import org.apache.helix.model.ExternalView; import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier; @@ -39,7 +40,7 @@ protected ZkHelixClusterVerifier getClusterVerifier() { Set dbNames = new HashSet<>(); int i = 0; for (String stateModel : TestStateModels) { - dbNames.add("Test-DB-" + i++); + dbNames.add("Test-DB-" + TestHelper.getTestMethodName() + i++); } return new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setResources(dbNames) .setDeactivatedNodeAwareness(true).setZkAddr(ZK_ADDR).build(); @@ -47,10 +48,10 @@ protected ZkHelixClusterVerifier getClusterVerifier() { // create test DBs, wait it converged and return externalviews protected Map createTestDBs(long delayTime) throws InterruptedException { - Map externalViews = new HashMap(); + Map externalViews = new HashMap<>(); int i = 0; for (String stateModel : TestStateModels) { - String db = "Test-DB-" + i++; + String db = "Test-DB-" + TestHelper.getTestMethodName() + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _minActiveReplica); _testDBs.add(db); diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java index 4920414187d..9790b929e59 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java @@ -108,7 +108,7 @@ protected void addInstanceConfig(String storageNodeName, int seqNo, int tagCount public void test() throws Exception { int i = 0; for (String stateModel : _testModels) { - String db = "Test-DB-" + i++; + String db = "Test-DB-" + TestHelper.getTestMethodName() + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); _allDBs.add(db); @@ -148,7 +148,7 @@ public void testWithInstanceTag() throws Exception { Set tags = new HashSet(_nodeToTagMap.values()); int i = 3; for (String tag : tags) { - String db = "Test-DB-" + i++; + String db = "Test-DB-" + TestHelper.getTestMethodName() + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica); IdealState is = @@ -164,7 +164,7 @@ public void testWithInstanceTag() throws Exception { @Test(dependsOnMethods = "test") public void testChangeIdealState() throws InterruptedException { - String dbName = "Test-DB"; + String dbName = "Test-DB-" + TestHelper.getTestMethodName(); createResourceWithWagedRebalance(CLUSTER_NAME, dbName, BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, _replica); @@ -198,7 +198,7 @@ public void testChangeIdealState() throws InterruptedException { @Test(dependsOnMethods = "test") public void testDisableInstance() throws InterruptedException { - String dbName = "Test-DB"; + String dbName = "Test-DB-" + TestHelper.getTestMethodName(); createResourceWithWagedRebalance(CLUSTER_NAME, dbName, BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, _replica); @@ -254,7 +254,7 @@ public void testLackEnoughLiveInstances() throws Exception { int j = 0; for (String stateModel : _testModels) { - String db = "Test-DB-" + j++; + String db = "Test-DB-" + TestHelper.getTestMethodName() + j++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); @@ -293,7 +293,7 @@ public void testLackEnoughInstances() throws Exception { int j = 0; for (String stateModel : _testModels) { - String db = "Test-DB-" + j++; + String db = "Test-DB-" + TestHelper.getTestMethodName() + j++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); @@ -324,7 +324,7 @@ public void testLackEnoughInstances() throws Exception { public void testMixedRebalancerUsage() throws InterruptedException { int i = 0; for (String stateModel : _testModels) { - String db = "Test-DB-" + i++; + String db = "Test-DB-" + TestHelper.getTestMethodName() + i++; if (i == 0) { _gSetupTool.addResourceToCluster(CLUSTER_NAME, db, PARTITIONS, stateModel, IdealState.RebalanceMode.FULL_AUTO + "", CrushRebalanceStrategy.class.getName()); @@ -354,12 +354,14 @@ public void testMaxPartitionLimitation() throws Exception { String limitedResourceName = null; int i = 0; for (String stateModel : _testModels) { - String db = "Test-DB-" + i++; + String db = "Test-DB-" + TestHelper.getTestMethodName() + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica); if (i == 1) { - // The limited resource has additional limitation, so even the other resources can be assigned - // later, this resource will still be blocked by the max partition limitation. + // The limited resource has additional limitation. + // The other resources could have been assigned in theory if the WAGED rebalancer were + // not used. + // However, with the WAGED rebalancer, this restricted resource will block the other ones. limitedResourceName = db; IdealState idealState = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); @@ -371,8 +373,9 @@ public void testMaxPartitionLimitation() throws Exception { } Thread.sleep(300); - // Since the WAGED rebalancer does not do partial rebalance, the initial assignment won't show. - Assert.assertFalse(TestHelper.verify(() -> _allDBs.stream().allMatch(db -> { + // Since the WAGED rebalancer need to finish rebalancing every resources, the initial + // assignment won't show. + Assert.assertFalse(TestHelper.verify(() -> _allDBs.stream().anyMatch(db -> { ExternalView ev = _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); return ev != null && !ev.getPartitionSet().isEmpty(); @@ -383,20 +386,13 @@ public void testMaxPartitionLimitation() throws Exception { configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); Thread.sleep(300); - // wait until any of the resources is rebalanced - TestHelper.verify(() -> { - for (String db : _allDBs) { - ExternalView ev = - _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); - if (ev != null && !ev.getPartitionSet().isEmpty()) { - return true; - } - } - return false; - }, 3000); - ExternalView ev = _gSetupTool.getClusterManagementTool() - .getResourceExternalView(CLUSTER_NAME, limitedResourceName); - Assert.assertFalse(ev != null && !ev.getPartitionSet().isEmpty()); + // Since the WAGED rebalancer need to finish rebalancing every resources, the assignment won't + // show even removed cluster level restriction + Assert.assertFalse(TestHelper.verify(() -> _allDBs.stream().anyMatch(db -> { + ExternalView ev = + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + return ev != null && !ev.getPartitionSet().isEmpty(); + }), 2000)); // Remove the resource level limitation IdealState idealState = _gSetupTool.getClusterManagementTool() diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java index 0a4c232cc6b..831f77fbf6e 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java @@ -110,7 +110,7 @@ protected void addInstanceConfig(String storageNodeName, int seqNo, int zoneCoun public void testZoneIsolation() throws Exception { int i = 0; for (String stateModel : _testModels) { - String db = "Test-DB-" + i++; + String db = "Test-DB-testZoneIsolation" + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); @@ -126,7 +126,7 @@ public void testZoneIsolationWithInstanceTag() throws Exception { Set tags = new HashSet(_nodeToTagMap.values()); int i = 0; for (String tag : tags) { - String db = "Test-DB-" + i++; + String db = "Test-DB-testZoneIsolationWithInstanceTag" + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica); IdealState is = @@ -154,7 +154,7 @@ public void testLackEnoughLiveRacks() throws Exception { int j = 0; for (String stateModel : _testModels) { - String db = "Test-DB-" + j++; + String db = "Test-DB-testLackEnoughLiveRacks" + j++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); @@ -196,7 +196,7 @@ public void testLackEnoughRacks() throws Exception { int j = 0; for (String stateModel : _testModels) { - String db = "Test-DB-" + j++; + String db = "Test-DB-testLackEnoughRacks" + j++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); @@ -228,7 +228,7 @@ public void testLackEnoughRacks() throws Exception { public void testAddZone() throws Exception { int i = 0; for (String stateModel : _testModels) { - String db = "Test-DB-" + i++; + String db = "Test-DB-testAddZone" + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);