From cf332af25e3d1cf67e36b5dd893ede3377beee93 Mon Sep 17 00:00:00 2001 From: jiajunwang Date: Wed, 16 Oct 2019 14:02:25 -0700 Subject: [PATCH 1/7] The WAGED rebalancer returns the previously calculated assignment on calculation failure. This is to protect the cluster assignment on a rebalancing algorithm failure. For example, the cluster is out of capacity. In this case, the rebalancer will keep using the previously calculated mapping. Also, refine the new metric interface, and add the RebalanceFailureCount metric for recording the failures. --- .../rebalancer/waged/WagedRebalancer.java | 170 ++++++++++-------- .../dynamicMBeans/SimpleDynamicMetric.java | 2 +- .../WagedRebalancerMetricCollector.java | 22 ++- .../implementation/RebalanceFailureCount.java | 18 ++ .../implementation/RebalanceLatencyGauge.java | 28 +-- .../monitoring/metrics/model/CountMetric.java | 37 +++- .../metrics/model/LatencyMetric.java | 15 ++ .../monitoring/metrics/model/Metric.java | 5 - .../rebalancer/waged/TestWagedRebalancer.java | 48 +++-- .../WagedRebalancer/TestWagedRebalance.java | 42 ++--- 10 files changed, 235 insertions(+), 152 deletions(-) create mode 100644 helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceFailureCount.java diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java index f39d3cb97f..9839fad2fe 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java @@ -51,6 +51,7 @@ import org.apache.helix.model.ResourceConfig; import org.apache.helix.monitoring.metrics.MetricCollector; import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector; +import org.apache.helix.monitoring.metrics.model.CountMetric; import 
org.apache.helix.monitoring.metrics.model.LatencyMetric; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -174,9 +175,33 @@ public Map computeNewIdealStates(ResourceControllerDataProvi LOG.info("Start computing new ideal states for resources: {}", resourceMap.keySet().toString()); validateInput(clusterData, resourceMap); - // Calculate the target assignment based on the current cluster status. - Map newIdealStates = - computeBestPossibleStates(clusterData, resourceMap, currentStateOutput); + Map newIdealStates; + try { + // Calculate the target assignment based on the current cluster status. + newIdealStates = computeBestPossibleStates(clusterData, resourceMap, currentStateOutput); + } catch (HelixRebalanceException ex) { + LOG.error("The new assignment calculation fails.", ex); + // Record the failure in metrics. + CountMetric rebalanceFailureCount = _metricCollector.getMetric( + WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceFailureCount.name(), + CountMetric.class); + rebalanceFailureCount.increaseCount(1l); + + HelixRebalanceException.Type failureType = ex.getFailureType(); + if (failureType.equals(HelixRebalanceException.Type.INVALID_REBALANCER_STATUS) || failureType + .equals(HelixRebalanceException.Type.UNKNOWN_FAILURE)) { + throw ex; + } + + // If the failure is because of cluster status input or calculating failure, return the previously calculated assignment. + LOG.warn("Trying to return the previously calculated assignment."); + // Note that don't return an assignment based on the current state if there is no previously + // calculated result in this fallback logic. + Map assignmentRecord = + getBestPossibleAssignment(_assignmentMetadataStore, new CurrentStateOutput(), + resourceMap.keySet()); + newIdealStates = convertResourceAssignment(clusterData, assignmentRecord); + } // Construct the new best possible states according to the current state and target assignment. 
// Note that the new ideal state might be an intermediate state between the current state and @@ -203,7 +228,7 @@ public Map computeNewIdealStates(ResourceControllerDataProvi } // Coordinate baseline recalculation and partial rebalance according to the cluster changes. - private Map computeBestPossibleStates( + protected Map computeBestPossibleStates( ResourceControllerDataProvider clusterData, Map resourceMap, final CurrentStateOutput currentStateOutput) throws HelixRebalanceException { getChangeDetector().updateSnapshots(clusterData); @@ -243,48 +268,61 @@ private Map computeBestPossibleStates( Map newAssignment = partialRebalance(clusterData, clusterChanges, resourceMap, activeNodes, currentStateOutput); - // > - Map> resourceStatePriorityMap = new HashMap<>(); - // Convert the assignments into IdealState for the following state mapping calculation. - Map finalIdealStateMap = new HashMap<>(); - for (String resourceName : newAssignment.keySet()) { - IdealState newIdealState; - try { - IdealState currentIdealState = clusterData.getIdealState(resourceName); - Map statePriorityMap = clusterData - .getStateModelDef(currentIdealState.getStateModelDefRef()).getStatePriorityMap(); - // Keep the priority map for the rebalance overwrite logic later. - resourceStatePriorityMap.put(resourceName, statePriorityMap); - // Create a new IdealState instance contains the new calculated assignment in the preference - // list. 
- newIdealState = generateIdealStateWithAssignment(resourceName, currentIdealState, - newAssignment.get(resourceName), statePriorityMap); - } catch (Exception ex) { - throw new HelixRebalanceException( - "Fail to calculate the new IdealState for resource: " + resourceName, - HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex); - } - finalIdealStateMap.put(resourceName, newIdealState); - } + Map finalIdealStateMap = + convertResourceAssignment(clusterData, newAssignment); // The additional rebalance overwrite is required since the calculated mapping may contains // some delayed rebalanced assignments. if (!activeNodes.equals(clusterData.getEnabledLiveInstances())) { applyRebalanceOverwrite(finalIdealStateMap, clusterData, resourceMap, clusterChanges, - resourceStatePriorityMap, getBaselineAssignment(_assignmentMetadataStore, - currentStateOutput, resourceMap.keySet())); + getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, + resourceMap.keySet())); } // Replace the assignment if user-defined preference list is configured. // Note the user-defined list is intentionally applied to the final mapping after calculation. // This is to avoid persisting it into the assignment store, which impacts the long term // assignment evenness and partition movements. - finalIdealStateMap.entrySet().stream() - .forEach(idealStateEntry -> applyUserDefinedPreferenceList( + finalIdealStateMap.entrySet().stream().forEach( + idealStateEntry -> applyUserDefinedPreferenceList( clusterData.getResourceConfig(idealStateEntry.getKey()), idealStateEntry.getValue())); return finalIdealStateMap; } + /** + * Convert the resource assignment map into an IdealState map. + */ + private Map convertResourceAssignment( + ResourceControllerDataProvider clusterData, Map assignments) + throws HelixRebalanceException { + // Convert the assignments into IdealState for the following state mapping calculation. 
+ Map finalIdealStateMap = new HashMap<>(); + for (String resourceName : assignments.keySet()) { + try { + IdealState currentIdealState = clusterData.getIdealState(resourceName); + Map statePriorityMap = + clusterData.getStateModelDef(currentIdealState.getStateModelDefRef()) + .getStatePriorityMap(); + // Create a new IdealState instance contains the new calculated assignment in the preference + // list. + IdealState newIdealState = new IdealState(resourceName); + // Copy the simple fields + newIdealState.getRecord().setSimpleFields(currentIdealState.getRecord().getSimpleFields()); + // Sort the preference list according to state priority. + newIdealState.setPreferenceLists( + getPreferenceLists(assignments.get(resourceName), statePriorityMap)); + // Note the state mapping in the new assignment won't be directly propagate to the map fields. + // The rebalancer will calculate for the final state mapping considering the current states. + finalIdealStateMap.put(resourceName, newIdealState); + } catch (Exception ex) { + throw new HelixRebalanceException( + "Fail to calculate the new IdealState for resource: " + resourceName, + HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex); + } + } + return finalIdealStateMap; + } + // TODO make the Baseline calculation async if complicated algorithm is used for the Baseline private void refreshBaseline(ResourceControllerDataProvider clusterData, Map> clusterChanges, Map resourceMap, @@ -414,23 +452,6 @@ private ResourceChangeDetector getChangeDetector() { return CHANGE_DETECTOR_THREAD_LOCAL.get(); } - // Generate a new IdealState based on the input newAssignment. - // The assignment will be propagate to the preference lists. - // Note that we will recalculate the states based on the current state, so there is no need to - // update the mapping fields in the IdealState output. 
- private IdealState generateIdealStateWithAssignment(String resourceName, - IdealState currentIdealState, ResourceAssignment newAssignment, - Map statePriorityMap) { - IdealState newIdealState = new IdealState(resourceName); - // Copy the simple fields - newIdealState.getRecord().setSimpleFields(currentIdealState.getRecord().getSimpleFields()); - // Sort the preference list according to state priority. - newIdealState.setPreferenceLists(getPreferenceLists(newAssignment, statePriorityMap)); - // Note the state mapping in the new assignment won't be directly propagate to the map fields. - // The rebalancer will calculate for the final state mapping considering the current states. - return newIdealState; - } - // Generate the preference lists from the state mapping based on state priority. private Map> getPreferenceLists(ResourceAssignment newAssignment, Map statePriorityMap) { @@ -487,20 +508,18 @@ private Map getBaselineAssignment( LatencyMetric.class); stateReadLatency.startMeasuringLatency(); currentBaseline = assignmentMetadataStore.getBaseline(); + currentBaseline.keySet().retainAll(resources); stateReadLatency.endMeasuringLatency(); } catch (HelixException ex) { - // Report error. and use empty mapping instead. - LOG.error("Failed to get the current baseline assignment.", ex); + LOG.error("Failed to get the current baseline assignment. Use the current states instead.", + ex); + currentBaseline = getCurrentStateAssingment(currentStateOutput, resources); } catch (Exception ex) { throw new HelixRebalanceException( "Failed to get the current baseline assignment because of unexpected error.", HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex); } } - if (currentBaseline.isEmpty()) { - LOG.warn("The current baseline assignment record is empty. 
Use the current states instead."); - currentBaseline = getCurrentStateAssingment(currentStateOutput, resources); - } return currentBaseline; } @@ -523,21 +542,19 @@ private Map getBestPossibleAssignment( LatencyMetric.class); stateReadLatency.startMeasuringLatency(); currentBestAssignment = assignmentMetadataStore.getBestPossibleAssignment(); + currentBestAssignment.keySet().retainAll(resources); stateReadLatency.endMeasuringLatency(); } catch (HelixException ex) { - // Report error. and use empty mapping instead. - LOG.error("Failed to get the current best possible assignment.", ex); + LOG.error( + "Failed to get the current best possible assignment. Use the current states instead.", + ex); + currentBestAssignment = getCurrentStateAssingment(currentStateOutput, resources); } catch (Exception ex) { throw new HelixRebalanceException( "Failed to get the current best possible assignment because of unexpected error.", HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex); } } - if (currentBestAssignment.isEmpty()) { - LOG.warn( - "The current best possible assignment record is empty. Use the current states instead."); - currentBestAssignment = getCurrentStateAssingment(currentStateOutput, resources); - } return currentBestAssignment; } @@ -593,37 +610,32 @@ private void delayedRebalanceSchedule(ResourceControllerDataProvider clusterData * @param clusterData the cluster data cache. * @param resourceMap the rebalanaced resource map. * @param clusterChanges the detected cluster changes that triggeres the rebalance. - * @param resourceStatePriorityMap the state priority map for each resource. 
* @param baseline the baseline assignment */ private void applyRebalanceOverwrite(Map idealStateMap, ResourceControllerDataProvider clusterData, Map resourceMap, Map> clusterChanges, - Map> resourceStatePriorityMap, Map baseline) throws HelixRebalanceException { Set enabledLiveInstances = clusterData.getEnabledLiveInstances(); // Note that the calculation used the baseline as the input only. This is for minimizing // unnecessary partition movement. - Map activeAssignment = calculateAssignment(clusterData, - clusterChanges, resourceMap, enabledLiveInstances, Collections.emptyMap(), baseline); + Map activeIdealStates = convertResourceAssignment(clusterData, + calculateAssignment(clusterData, clusterChanges, resourceMap, enabledLiveInstances, + Collections.emptyMap(), baseline)); for (String resourceName : idealStateMap.keySet()) { IdealState is = idealStateMap.get(resourceName); - if (!activeAssignment.containsKey(resourceName)) { + if (!activeIdealStates.containsKey(resourceName)) { throw new HelixRebalanceException( "Failed to calculate the complete partition assignment with all active nodes. 
Cannot find the resource assignment for " - + resourceName, - HelixRebalanceException.Type.FAILED_TO_CALCULATE); + + resourceName, HelixRebalanceException.Type.FAILED_TO_CALCULATE); } + IdealState newActiveIdealState = activeIdealStates.get(resourceName); IdealState currentIdealState = clusterData.getIdealState(resourceName); - IdealState newActiveIdealState = - generateIdealStateWithAssignment(resourceName, currentIdealState, - activeAssignment.get(resourceName), resourceStatePriorityMap.get(resourceName)); - - int numReplia = currentIdealState.getReplicaCount(enabledLiveInstances.size()); - int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(currentIdealState, numReplia); - Map> finalPreferenceLists = - DelayedRebalanceUtil.getFinalDelayedMapping(newActiveIdealState.getPreferenceLists(), - is.getPreferenceLists(), enabledLiveInstances, Math.min(minActiveReplica, numReplia)); + int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size()); + int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(currentIdealState, numReplica); + Map> finalPreferenceLists = DelayedRebalanceUtil + .getFinalDelayedMapping(newActiveIdealState.getPreferenceLists(), is.getPreferenceLists(), + enabledLiveInstances, Math.min(minActiveReplica, numReplica)); is.setPreferenceLists(finalPreferenceLists); } @@ -641,4 +653,8 @@ private void applyUserDefinedPreferenceList(ResourceConfig resourceConfig, } } } + + protected MetricCollector getMetricCollector() { + return _metricCollector; + } } diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/SimpleDynamicMetric.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/SimpleDynamicMetric.java index 1be6a21423..2b0f1db3cc 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/SimpleDynamicMetric.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/SimpleDynamicMetric.java @@ -25,7 +25,7 
@@ * @param the type of the metric value */ public class SimpleDynamicMetric extends DynamicMetric { - private final String _metricName; + protected final String _metricName; /** * Instantiates a new Simple dynamic metric. diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java index 04d804d4d6..e93123298a 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java @@ -20,8 +20,11 @@ */ import javax.management.JMException; + import org.apache.helix.monitoring.mbeans.MonitorDomainNames; +import org.apache.helix.monitoring.metrics.implementation.RebalanceFailureCount; import org.apache.helix.monitoring.metrics.implementation.RebalanceLatencyGauge; +import org.apache.helix.monitoring.metrics.model.CountMetric; import org.apache.helix.monitoring.metrics.model.LatencyMetric; public class WagedRebalancerMetricCollector extends MetricCollector { @@ -38,7 +41,11 @@ public enum WagedRebalancerMetricNames { // The following latency metrics are related to AssignmentMetadataStore StateReadLatencyGauge, - StateWriteLatencyGauge + StateWriteLatencyGauge, + + // Count of any rebalance failure. + // Note the rebalancer may still be able to return an assignment based on the previous record on an error. 
+ RebalanceFailureCount } public WagedRebalancerMetricCollector(String clusterName) throws JMException { @@ -66,15 +73,20 @@ private void createMetrics() { WagedRebalancerMetricNames.GlobalBaselineCalcLatencyGauge.name(), getResetIntervalInMs()); LatencyMetric partialRebalanceLatencyGauge = new RebalanceLatencyGauge( WagedRebalancerMetricNames.PartialRebalanceLatencyGauge.name(), getResetIntervalInMs()); - LatencyMetric stateReadLatencyGauge = new RebalanceLatencyGauge( - WagedRebalancerMetricNames.StateReadLatencyGauge.name(), getResetIntervalInMs()); - LatencyMetric stateWriteLatencyGauge = new RebalanceLatencyGauge( - WagedRebalancerMetricNames.StateWriteLatencyGauge.name(), getResetIntervalInMs()); + LatencyMetric stateReadLatencyGauge = + new RebalanceLatencyGauge(WagedRebalancerMetricNames.StateReadLatencyGauge.name(), + getResetIntervalInMs()); + LatencyMetric stateWriteLatencyGauge = + new RebalanceLatencyGauge(WagedRebalancerMetricNames.StateWriteLatencyGauge.name(), + getResetIntervalInMs()); + CountMetric calcFailureCount = + new RebalanceFailureCount(WagedRebalancerMetricNames.RebalanceFailureCount.name()); // Add metrics to WagedRebalancerMetricCollector addMetric(globalBaselineCalcLatencyGauge); addMetric(partialRebalanceLatencyGauge); addMetric(stateReadLatencyGauge); addMetric(stateWriteLatencyGauge); + addMetric(calcFailureCount); } } diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceFailureCount.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceFailureCount.java new file mode 100644 index 0000000000..2a3723e160 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceFailureCount.java @@ -0,0 +1,18 @@ +package org.apache.helix.monitoring.metrics.implementation; + +import org.apache.helix.monitoring.metrics.model.CountMetric; + +public class RebalanceFailureCount extends CountMetric { + /** + * Instantiates a new 
Simple dynamic metric. + * + * @param metricName the metric name + */ + public RebalanceFailureCount(String metricName) { + super(metricName, 0l); + } + + public void increaseCount(long count) { + updateValue(getValue() + count); + } +} diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java index e96a5893bb..f1b292dfed 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java @@ -48,7 +48,6 @@ public RebalanceLatencyGauge(String metricName, long slidingTimeWindow) { * the rebalancer could fail at any point, and we want it to recover gracefully by resetting the * internal state of this metric. */ - @Override public void startMeasuringLatency() { reset(); _startTime = System.currentTimeMillis(); @@ -57,7 +56,6 @@ public void startMeasuringLatency() { /** * WARNING: this method is not thread-safe. */ - @Override public void endMeasuringLatency() { if (_startTime == VALUE_NOT_SET || _endTime != VALUE_NOT_SET) { LOG.error( @@ -72,33 +70,19 @@ public void endMeasuringLatency() { reset(); } - @Override - public String getMetricName() { - return _metricName; - } - - @Override - public void reset() { - _startTime = VALUE_NOT_SET; - _endTime = VALUE_NOT_SET; - } - - @Override - public String toString() { - return String.format("Metric %s's latency is %d", _metricName, getLastEmittedMetricValue()); - } - /** * Returns the most recently emitted metric value at the time of the call. * @return */ - @Override public long getLastEmittedMetricValue() { return _lastEmittedMetricValue; } - @Override - public DynamicMetric getDynamicMetric() { - return this; + /** + * Resets the internal state of this metric. 
+ */ + private void reset() { + _startTime = VALUE_NOT_SET; + _endTime = VALUE_NOT_SET; } } diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java index 5a7f0caa33..5132cd3627 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java @@ -19,23 +19,46 @@ * under the License. */ +import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric; import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric; /** * Represents a count metric and defines methods to help with calculation. A count metric gives a * gauge value of a certain property. */ -public abstract class CountMetric extends SimpleDynamicMetric implements Metric { - protected V _count; +public abstract class CountMetric extends SimpleDynamicMetric implements Metric { /** - * Instantiates a new Simple dynamic metric. + * Instantiates a new count metric. + * * @param metricName the metric name - * @param metricObject the metric object + * @param initCount the initial count */ - public CountMetric(String metricName, V metricObject) { - super(metricName, metricObject); + public CountMetric(String metricName, long initCount) { + super(metricName, initCount); } - public abstract void setCount(Object count); + /** + * Increment the metric by the input count. 
+ * + * @param count + */ + public abstract void increaseCount(long count); + + public String getMetricName() { + return _metricName; + } + + @Override + public String toString() { + return String.format("Metric %s's count is %d", getMetricName(), getValue()); + } + + public long getLastEmittedMetricValue() { + return getValue(); + } + + public DynamicMetric getDynamicMetric() { + return this; + } } diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java index c8ba5ae4b2..764b8613d2 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java @@ -20,6 +20,7 @@ */ import com.codahale.metrics.Histogram; +import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric; import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric; /** @@ -38,6 +39,7 @@ public abstract class LatencyMetric extends HistogramDynamicMetric implements Me */ public LatencyMetric(String metricName, Histogram metricObject) { super(metricName, metricObject); + _metricName = metricName; } /** @@ -49,4 +51,17 @@ public LatencyMetric(String metricName, Histogram metricObject) { * Ends measuring the latency. 
*/ public abstract void endMeasuringLatency(); + + public String getMetricName() { + return _metricName; + } + + @Override + public String toString() { + return String.format("Metric %s's latency is %d", getMetricName(), getLastEmittedMetricValue()); + } + + public DynamicMetric getDynamicMetric() { + return this; + } } diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java index ba59b4f6e5..22378dcff9 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java @@ -31,11 +31,6 @@ public interface Metric { */ String getMetricName(); - /** - * Resets the internal state of this metric. - */ - void reset(); - /** * Prints the metric along with its name. */ diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java index df368cbfce..438702229c 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; + import org.apache.helix.HelixConstants; import org.apache.helix.HelixRebalanceException; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; @@ -41,6 +42,8 @@ import org.apache.helix.model.Partition; import org.apache.helix.model.Resource; import org.apache.helix.model.ResourceAssignment; +import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector; +import org.apache.helix.monitoring.metrics.model.CountMetric; import org.mockito.Mockito; import org.mockito.stubbing.Answer; import org.testng.Assert; @@ -241,7 
+244,7 @@ public void testNonCompatibleConfiguration() throws IOException, HelixRebalanceE // TODO test with invalid capacity configuration which will fail the cluster model constructing. @Test(dependsOnMethods = "testRebalance") - public void testInvalidClusterStatus() throws IOException { + public void testInvalidClusterStatus() throws IOException, HelixRebalanceException { _metadataStore.clearMetadataStore(); WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer()); @@ -254,13 +257,18 @@ public void testInvalidClusterStatus() throws IOException { Map resourceMap = clusterData.getIdealStates().keySet().stream().collect( Collectors.toMap(resourceName -> resourceName, resourceName -> new Resource(resourceName))); try { - rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput()); + rebalancer.computeBestPossibleStates(clusterData, resourceMap, new CurrentStateOutput()); Assert.fail("Rebalance shall fail."); } catch (HelixRebalanceException ex) { Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.INVALID_CLUSTER_STATUS); Assert.assertEquals(ex.getMessage(), "Failed to generate cluster model. Failure Type: INVALID_CLUSTER_STATUS"); } + + // The rebalance will be done with empty mapping result since there is no previously calculated assignment. 
+ Assert.assertTrue( + rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput()) + .isEmpty()); } @Test(dependsOnMethods = "testRebalance") @@ -289,24 +297,44 @@ public void testInvalidRebalancerStatus() throws IOException { @Test(dependsOnMethods = "testRebalance") public void testAlgorithmException() throws IOException, HelixRebalanceException { - RebalanceAlgorithm badAlgorithm = Mockito.mock(RebalanceAlgorithm.class); - when(badAlgorithm.calculate(any())).thenThrow(new HelixRebalanceException("Algorithm fails.", - HelixRebalanceException.Type.FAILED_TO_CALCULATE)); - _metadataStore.clearMetadataStore(); WagedRebalancer rebalancer = - new WagedRebalancer(_metadataStore, badAlgorithm, new DelayedAutoRebalancer()); + new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer()); ResourceControllerDataProvider clusterData = setupClusterDataCache(); - Map resourceMap = clusterData.getIdealStates().keySet().stream().collect( - Collectors.toMap(resourceName -> resourceName, resourceName -> new Resource(resourceName))); + Map resourceMap = clusterData.getIdealStates().entrySet().stream() + .collect(Collectors.toMap(entry -> entry.getKey(), entry -> { + Resource resource = new Resource(entry.getKey()); + entry.getValue().getPartitionSet().stream() + .forEach(partition -> resource.addPartition(partition)); + return resource; + })); + // Rebalance with normal configuration. So the assignment will be persisted in the metadata store. + Map result = + rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput()); + + // Recreate a rebalance with the same metadata store but bad algorithm instance. 
+ RebalanceAlgorithm badAlgorithm = Mockito.mock(RebalanceAlgorithm.class); + when(badAlgorithm.calculate(any())).thenThrow(new HelixRebalanceException("Algorithm fails.", + HelixRebalanceException.Type.FAILED_TO_CALCULATE)); + rebalancer = new WagedRebalancer(_metadataStore, badAlgorithm, new DelayedAutoRebalancer()); + + // Calculation will fail try { - rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput()); + rebalancer.computeBestPossibleStates(clusterData, resourceMap, new CurrentStateOutput()); Assert.fail("Rebalance shall fail."); } catch (HelixRebalanceException ex) { Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.FAILED_TO_CALCULATE); Assert.assertEquals(ex.getMessage(), "Algorithm fails. Failure Type: FAILED_TO_CALCULATE"); } + // But if call with the public method computeNewIdealStates(), the rebalance will return with the previous rebalance result. + Map newResult = + rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput()); + Assert.assertEquals(newResult, result); + // Ensure failure has been recorded + Assert.assertEquals(rebalancer.getMetricCollector().getMetric( + WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceFailureCount.name(), + CountMetric.class).getValue().longValue(), 1l); } @Test(dependsOnMethods = "testRebalance") diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java index 4920414187..2203310426 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java @@ -108,7 +108,7 @@ protected void addInstanceConfig(String storageNodeName, int seqNo, int tagCount public void test() throws Exception { int i = 0; for 
(String stateModel : _testModels) { - String db = "Test-DB-" + i++; + String db = "Test-DB-test" + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); _allDBs.add(db); @@ -148,7 +148,7 @@ public void testWithInstanceTag() throws Exception { Set tags = new HashSet(_nodeToTagMap.values()); int i = 3; for (String tag : tags) { - String db = "Test-DB-" + i++; + String db = "Test-DB-testWithInstanceTag" + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica); IdealState is = @@ -164,7 +164,7 @@ public void testWithInstanceTag() throws Exception { @Test(dependsOnMethods = "test") public void testChangeIdealState() throws InterruptedException { - String dbName = "Test-DB"; + String dbName = "Test-DB-testChangeIdealState"; createResourceWithWagedRebalance(CLUSTER_NAME, dbName, BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, _replica); @@ -198,7 +198,7 @@ public void testChangeIdealState() throws InterruptedException { @Test(dependsOnMethods = "test") public void testDisableInstance() throws InterruptedException { - String dbName = "Test-DB"; + String dbName = "Test-DB-testDisableInstance"; createResourceWithWagedRebalance(CLUSTER_NAME, dbName, BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, _replica); @@ -254,7 +254,7 @@ public void testLackEnoughLiveInstances() throws Exception { int j = 0; for (String stateModel : _testModels) { - String db = "Test-DB-" + j++; + String db = "Test-DB-testLackEnoughLiveInstances" + j++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); @@ -293,7 +293,7 @@ 
public void testLackEnoughInstances() throws Exception { int j = 0; for (String stateModel : _testModels) { - String db = "Test-DB-" + j++; + String db = "Test-DB-testLackEnoughInstances" + j++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); @@ -324,7 +324,7 @@ public void testLackEnoughInstances() throws Exception { public void testMixedRebalancerUsage() throws InterruptedException { int i = 0; for (String stateModel : _testModels) { - String db = "Test-DB-" + i++; + String db = "Test-DB-testMixedRebalancerUsage" + i++; if (i == 0) { _gSetupTool.addResourceToCluster(CLUSTER_NAME, db, PARTITIONS, stateModel, IdealState.RebalanceMode.FULL_AUTO + "", CrushRebalanceStrategy.class.getName()); @@ -354,12 +354,12 @@ public void testMaxPartitionLimitation() throws Exception { String limitedResourceName = null; int i = 0; for (String stateModel : _testModels) { - String db = "Test-DB-" + i++; + String db = "Test-DB-testMaxPartitionLimitation-" + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica); if (i == 1) { // The limited resource has additional limitation, so even the other resources can be assigned - // later, this resource will still be blocked by the max partition limitation. + // later in theory, this resource will still be blocked by the max partition limitation. limitedResourceName = db; IdealState idealState = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); @@ -371,8 +371,8 @@ public void testMaxPartitionLimitation() throws Exception { } Thread.sleep(300); - // Since the WAGED rebalancer does not do partial rebalance, the initial assignment won't show. - Assert.assertFalse(TestHelper.verify(() -> _allDBs.stream().allMatch(db -> { + // Since the WAGED rebalancer does not partial rebalance, the initial assignment won't show. 
+ Assert.assertFalse(TestHelper.verify(() -> _allDBs.stream().anyMatch(db -> { ExternalView ev = _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); return ev != null && !ev.getPartitionSet().isEmpty(); @@ -383,20 +383,12 @@ public void testMaxPartitionLimitation() throws Exception { configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); Thread.sleep(300); - // wait until any of the resources is rebalanced - TestHelper.verify(() -> { - for (String db : _allDBs) { - ExternalView ev = - _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); - if (ev != null && !ev.getPartitionSet().isEmpty()) { - return true; - } - } - return false; - }, 3000); - ExternalView ev = _gSetupTool.getClusterManagementTool() - .getResourceExternalView(CLUSTER_NAME, limitedResourceName); - Assert.assertFalse(ev != null && !ev.getPartitionSet().isEmpty()); + // Since the WAGED rebalancer does not partial rebalance, the assignment won't show even removed cluster level restriction + Assert.assertFalse(TestHelper.verify(() -> _allDBs.stream().anyMatch(db -> { + ExternalView ev = + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + return ev != null && !ev.getPartitionSet().isEmpty(); + }), 2000)); // Remove the resource level limitation IdealState idealState = _gSetupTool.getClusterManagementTool() From 455def7bcc9826d3795bfb2df398dda1b2a0a00c Mon Sep 17 00:00:00 2001 From: jiajunwang Date: Fri, 18 Oct 2019 11:34:47 -0700 Subject: [PATCH 2/7] Address comments. Modify the test cases so that DBs from different test cases have different names. This is to avoid previous test records being returned by the rebalancer on calculation error.
--- .../rebalancer/waged/WagedRebalancer.java | 31 ++++++++++--------- .../implementation/RebalanceFailureCount.java | 3 +- .../implementation/RebalanceLatencyGauge.java | 4 ++- .../monitoring/metrics/model/CountMetric.java | 3 ++ .../metrics/model/LatencyMetric.java | 2 ++ .../TestDelayedWagedRebalance.java | 7 +++-- ...yedWagedRebalanceWithDisabledInstance.java | 7 +++-- ...estDelayedWagedRebalanceWithRackaware.java | 7 +++-- .../TestWagedRebalanceFaultZone.java | 10 +++--- 9 files changed, 43 insertions(+), 31 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java index 9839fad2fe..21b0161153 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java @@ -185,22 +185,23 @@ public Map computeNewIdealStates(ResourceControllerDataProvi CountMetric rebalanceFailureCount = _metricCollector.getMetric( WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceFailureCount.name(), CountMetric.class); - rebalanceFailureCount.increaseCount(1l); + rebalanceFailureCount.increaseCount(1L); HelixRebalanceException.Type failureType = ex.getFailureType(); if (failureType.equals(HelixRebalanceException.Type.INVALID_REBALANCER_STATUS) || failureType .equals(HelixRebalanceException.Type.UNKNOWN_FAILURE)) { + // If the failure is unknown or because of assignment store access failure, throw the + // rebalance exception. throw ex; + } else { // return the previously calculated assignment. + LOG.warn("Trying to return the previously calculated assignment."); + // Note that don't return an assignment based on the current state if there is no previously + // calculated result in this fallback logic. 
+ Map assignmentRecord = + getBestPossibleAssignment(_assignmentMetadataStore, new CurrentStateOutput(), + resourceMap.keySet()); + newIdealStates = convertResourceAssignment(clusterData, assignmentRecord); } - - // If the failure is because of cluster status input or calculating failure, return the previously calculated assignment. - LOG.warn("Trying to return the previously calculated assignment."); - // Note that don't return an assignment based on the current state if there is no previously - // calculated result in this fallback logic. - Map assignmentRecord = - getBestPossibleAssignment(_assignmentMetadataStore, new CurrentStateOutput(), - resourceMap.keySet()); - newIdealStates = convertResourceAssignment(clusterData, assignmentRecord); } // Construct the new best possible states according to the current state and target assignment. @@ -282,8 +283,8 @@ protected Map computeBestPossibleStates( // Note the user-defined list is intentionally applied to the final mapping after calculation. // This is to avoid persisting it into the assignment store, which impacts the long term // assignment evenness and partition movements. - finalIdealStateMap.entrySet().stream().forEach( - idealStateEntry -> applyUserDefinedPreferenceList( + finalIdealStateMap.entrySet().stream() + .forEach(idealStateEntry -> applyUserDefinedPreferenceList( clusterData.getResourceConfig(idealStateEntry.getKey()), idealStateEntry.getValue())); return finalIdealStateMap; @@ -303,15 +304,15 @@ private Map convertResourceAssignment( Map statePriorityMap = clusterData.getStateModelDef(currentIdealState.getStateModelDefRef()) .getStatePriorityMap(); - // Create a new IdealState instance contains the new calculated assignment in the preference - // list. + // Create a new IdealState instance which contains the new calculated assignment in the + // preference list. 
IdealState newIdealState = new IdealState(resourceName); // Copy the simple fields newIdealState.getRecord().setSimpleFields(currentIdealState.getRecord().getSimpleFields()); // Sort the preference list according to state priority. newIdealState.setPreferenceLists( getPreferenceLists(assignments.get(resourceName), statePriorityMap)); - // Note the state mapping in the new assignment won't be directly propagate to the map fields. + // Note the state mapping in the new assignment won't directly propagate to the map fields. // The rebalancer will calculate for the final state mapping considering the current states. finalIdealStateMap.put(resourceName, newIdealState); } catch (Exception ex) { diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceFailureCount.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceFailureCount.java index 2a3723e160..3764645563 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceFailureCount.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceFailureCount.java @@ -9,9 +9,10 @@ public class RebalanceFailureCount extends CountMetric { * @param metricName the metric name */ public RebalanceFailureCount(String metricName) { - super(metricName, 0l); + super(metricName, 0L); } + @Override public void increaseCount(long count) { updateValue(getValue() + count); } diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java index f1b292dfed..b6e58b4ab9 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java @@ -22,7 +22,6 @@ import 
com.codahale.metrics.Histogram; import com.codahale.metrics.SlidingTimeWindowArrayReservoir; import java.util.concurrent.TimeUnit; -import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric; import org.apache.helix.monitoring.metrics.model.LatencyMetric; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,6 +47,7 @@ public RebalanceLatencyGauge(String metricName, long slidingTimeWindow) { * the rebalancer could fail at any point, and we want it to recover gracefully by resetting the * internal state of this metric. */ + @Override public void startMeasuringLatency() { reset(); _startTime = System.currentTimeMillis(); @@ -56,6 +56,7 @@ public void startMeasuringLatency() { /** * WARNING: this method is not thread-safe. */ + @Override public void endMeasuringLatency() { if (_startTime == VALUE_NOT_SET || _endTime != VALUE_NOT_SET) { LOG.error( @@ -74,6 +75,7 @@ public void endMeasuringLatency() { * Returns the most recently emitted metric value at the time of the call. 
* @return */ + @Override public long getLastEmittedMetricValue() { return _lastEmittedMetricValue; } diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java index 5132cd3627..424ac9e300 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java @@ -45,6 +45,7 @@ public CountMetric(String metricName, long initCount) { */ public abstract void increaseCount(long count); + @Override public String getMetricName() { return _metricName; } @@ -54,10 +55,12 @@ public String toString() { return String.format("Metric %s's count is %d", getMetricName(), getValue()); } + @Override public long getLastEmittedMetricValue() { return getValue(); } + @Override public DynamicMetric getDynamicMetric() { return this; } diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java index 764b8613d2..d60f245b6f 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java @@ -52,6 +52,7 @@ public LatencyMetric(String metricName, Histogram metricObject) { */ public abstract void endMeasuringLatency(); + @Override public String getMetricName() { return _metricName; } @@ -61,6 +62,7 @@ public String toString() { return String.format("Metric %s's latency is %d", getMetricName(), getLastEmittedMetricValue()); } + @Override public DynamicMetric getDynamicMetric() { return this; } diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java index 713c095539..e49cc191b1 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Set; +import org.apache.helix.TestHelper; import org.apache.helix.integration.rebalancer.DelayedAutoRebalancer.TestDelayedAutoRebalance; import org.apache.helix.model.ExternalView; import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier; @@ -39,7 +40,7 @@ protected ZkHelixClusterVerifier getClusterVerifier() { Set dbNames = new HashSet<>(); int i = 0; for (String stateModel : TestStateModels) { - dbNames.add("Test-DB-" + i++); + dbNames.add("Test-DB-" + TestHelper.getTestMethodName() + i++); } return new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setResources(dbNames) .setDeactivatedNodeAwareness(true).setZkAddr(ZK_ADDR).build(); @@ -47,10 +48,10 @@ protected ZkHelixClusterVerifier getClusterVerifier() { // create test DBs, wait it converged and return externalviews protected Map createTestDBs(long delayTime) throws InterruptedException { - Map externalViews = new HashMap(); + Map externalViews = new HashMap<>(); int i = 0; for (String stateModel : TestStateModels) { - String db = "Test-DB-" + i++; + String db = "Test-DB-" + TestHelper.getTestMethodName() + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _minActiveReplica); _testDBs.add(db); diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java index bcb2260d88..3d4bd6a0b1 100644 --- 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Set; +import org.apache.helix.TestHelper; import org.apache.helix.integration.rebalancer.DelayedAutoRebalancer.TestDelayedAutoRebalanceWithDisabledInstance; import org.apache.helix.model.ExternalView; import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier; @@ -40,7 +41,7 @@ protected ZkHelixClusterVerifier getClusterVerifier() { Set dbNames = new HashSet<>(); int i = 0; for (String stateModel : TestStateModels) { - dbNames.add("Test-DB-" + i++); + dbNames.add("Test-DB-" + TestHelper.getTestMethodName() + i++); } return new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setResources(dbNames) .setDeactivatedNodeAwareness(true).setZkAddr(ZK_ADDR).build(); @@ -48,10 +49,10 @@ protected ZkHelixClusterVerifier getClusterVerifier() { // create test DBs, wait it converged and return externalviews protected Map createTestDBs(long delayTime) throws InterruptedException { - Map externalViews = new HashMap(); + Map externalViews = new HashMap<>(); int i = 0; for (String stateModel : TestStateModels) { - String db = "Test-DB-" + i++; + String db = "Test-DB-" + TestHelper.getTestMethodName() + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _minActiveReplica); _testDBs.add(db); diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithRackaware.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithRackaware.java index e0adf72f33..bb7c11a517 100644 --- 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithRackaware.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithRackaware.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Set; +import org.apache.helix.TestHelper; import org.apache.helix.integration.rebalancer.DelayedAutoRebalancer.TestDelayedAutoRebalanceWithRackaware; import org.apache.helix.model.ExternalView; import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier; @@ -39,7 +40,7 @@ protected ZkHelixClusterVerifier getClusterVerifier() { Set dbNames = new HashSet<>(); int i = 0; for (String stateModel : TestStateModels) { - dbNames.add("Test-DB-" + i++); + dbNames.add("Test-DB-" + TestHelper.getTestMethodName() + i++); } return new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setResources(dbNames) .setDeactivatedNodeAwareness(true).setZkAddr(ZK_ADDR).build(); @@ -47,10 +48,10 @@ protected ZkHelixClusterVerifier getClusterVerifier() { // create test DBs, wait it converged and return externalviews protected Map createTestDBs(long delayTime) throws InterruptedException { - Map externalViews = new HashMap(); + Map externalViews = new HashMap<>(); int i = 0; for (String stateModel : TestStateModels) { - String db = "Test-DB-" + i++; + String db = "Test-DB-" + TestHelper.getTestMethodName() + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _minActiveReplica); _testDBs.add(db); diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java index 0a4c232cc6..831f77fbf6 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java +++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java @@ -110,7 +110,7 @@ protected void addInstanceConfig(String storageNodeName, int seqNo, int zoneCoun public void testZoneIsolation() throws Exception { int i = 0; for (String stateModel : _testModels) { - String db = "Test-DB-" + i++; + String db = "Test-DB-testZoneIsolation" + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); @@ -126,7 +126,7 @@ public void testZoneIsolationWithInstanceTag() throws Exception { Set tags = new HashSet(_nodeToTagMap.values()); int i = 0; for (String tag : tags) { - String db = "Test-DB-" + i++; + String db = "Test-DB-testZoneIsolationWithInstanceTag" + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica); IdealState is = @@ -154,7 +154,7 @@ public void testLackEnoughLiveRacks() throws Exception { int j = 0; for (String stateModel : _testModels) { - String db = "Test-DB-" + j++; + String db = "Test-DB-testLackEnoughLiveRacks" + j++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); @@ -196,7 +196,7 @@ public void testLackEnoughRacks() throws Exception { int j = 0; for (String stateModel : _testModels) { - String db = "Test-DB-" + j++; + String db = "Test-DB-testLackEnoughRacks" + j++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); @@ -228,7 +228,7 @@ public void testLackEnoughRacks() throws Exception { public void testAddZone() throws Exception { int i = 0; for (String stateModel : _testModels) { - String db = "Test-DB-" + i++; + String db = "Test-DB-testAddZone" + i++; 
createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); From 5a05419da3b37e5a762b087c4427e6d4f6bd52c9 Mon Sep 17 00:00:00 2001 From: jiajunwang Date: Fri, 18 Oct 2019 16:28:52 -0700 Subject: [PATCH 3/7] Modify assignment datastore access. --- .../rebalancer/waged/WagedRebalancer.java | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java index 21b0161153..681e1b6593 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java @@ -509,18 +509,18 @@ private Map getBaselineAssignment( LatencyMetric.class); stateReadLatency.startMeasuringLatency(); currentBaseline = assignmentMetadataStore.getBaseline(); - currentBaseline.keySet().retainAll(resources); stateReadLatency.endMeasuringLatency(); - } catch (HelixException ex) { - LOG.error("Failed to get the current baseline assignment. Use the current states instead.", - ex); - currentBaseline = getCurrentStateAssingment(currentStateOutput, resources); } catch (Exception ex) { throw new HelixRebalanceException( "Failed to get the current baseline assignment because of unexpected error.", HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex); } } + if (currentBaseline.isEmpty()) { + LOG.warn("The current baseline assignment record is empty. 
Use the current states instead."); + currentBaseline = getCurrentStateAssingment(currentStateOutput, resources); + } + currentBaseline.keySet().retainAll(resources); return currentBaseline; } @@ -543,19 +543,19 @@ private Map getBestPossibleAssignment( LatencyMetric.class); stateReadLatency.startMeasuringLatency(); currentBestAssignment = assignmentMetadataStore.getBestPossibleAssignment(); - currentBestAssignment.keySet().retainAll(resources); stateReadLatency.endMeasuringLatency(); - } catch (HelixException ex) { - LOG.error( - "Failed to get the current best possible assignment. Use the current states instead.", - ex); - currentBestAssignment = getCurrentStateAssingment(currentStateOutput, resources); } catch (Exception ex) { throw new HelixRebalanceException( "Failed to get the current best possible assignment because of unexpected error.", HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex); } } + if (currentBestAssignment.isEmpty()) { + LOG.warn( + "The current best possible assignment record is empty. Use the current states instead."); + currentBestAssignment = getCurrentStateAssingment(currentStateOutput, resources); + } + currentBestAssignment.keySet().retainAll(resources); return currentBestAssignment; } From 91ba356dafaa45aeb3bc000cc2c821cc12f03f97 Mon Sep 17 00:00:00 2001 From: jiajunwang Date: Mon, 21 Oct 2019 12:18:14 -0700 Subject: [PATCH 4/7] Address comments. 
--- .../rebalancer/waged/WagedRebalancer.java | 3 +-- .../metrics/WagedRebalancerMetricCollector.java | 17 ++++++++++------- .../rebalancer/waged/TestWagedRebalancer.java | 8 +++++--- .../WagedRebalancer/TestWagedRebalance.java | 3 ++- 4 files changed, 18 insertions(+), 13 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java index 681e1b6593..651d20216c 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java @@ -30,7 +30,6 @@ import com.google.common.collect.ImmutableSet; import org.apache.helix.HelixConstants; -import org.apache.helix.HelixException; import org.apache.helix.HelixManager; import org.apache.helix.HelixRebalanceException; import org.apache.helix.controller.changedetector.ResourceChangeDetector; @@ -183,7 +182,7 @@ public Map computeNewIdealStates(ResourceControllerDataProvi LOG.error("The new assignment calculation fails.", ex); // Record the failure in metrics. 
CountMetric rebalanceFailureCount = _metricCollector.getMetric( - WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceFailureCount.name(), + WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceFailureCounter.name(), CountMetric.class); rebalanceFailureCount.increaseCount(1L); diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java index e93123298a..de34de3f27 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java @@ -44,8 +44,9 @@ public enum WagedRebalancerMetricNames { StateWriteLatencyGauge, // Count of any rebalance failure. - // Note the rebalancer may still be able to return an assignment based on the previous record on an error. - RebalanceFailureCount + // Note the rebalancer may still be able to return an assignment based on the previous record + // on an error. 
+ RebalanceFailureCounter } public WagedRebalancerMetricCollector(String clusterName) throws JMException { @@ -69,10 +70,12 @@ public WagedRebalancerMetricCollector() { */ private void createMetrics() { // Define all metrics - LatencyMetric globalBaselineCalcLatencyGauge = new RebalanceLatencyGauge( - WagedRebalancerMetricNames.GlobalBaselineCalcLatencyGauge.name(), getResetIntervalInMs()); - LatencyMetric partialRebalanceLatencyGauge = new RebalanceLatencyGauge( - WagedRebalancerMetricNames.PartialRebalanceLatencyGauge.name(), getResetIntervalInMs()); + LatencyMetric globalBaselineCalcLatencyGauge = + new RebalanceLatencyGauge(WagedRebalancerMetricNames.GlobalBaselineCalcLatencyGauge.name(), + getResetIntervalInMs()); + LatencyMetric partialRebalanceLatencyGauge = + new RebalanceLatencyGauge(WagedRebalancerMetricNames.PartialRebalanceLatencyGauge.name(), + getResetIntervalInMs()); LatencyMetric stateReadLatencyGauge = new RebalanceLatencyGauge(WagedRebalancerMetricNames.StateReadLatencyGauge.name(), getResetIntervalInMs()); @@ -80,7 +83,7 @@ private void createMetrics() { new RebalanceLatencyGauge(WagedRebalancerMetricNames.StateWriteLatencyGauge.name(), getResetIntervalInMs()); CountMetric calcFailureCount = - new RebalanceFailureCount(WagedRebalancerMetricNames.RebalanceFailureCount.name()); + new RebalanceFailureCount(WagedRebalancerMetricNames.RebalanceFailureCounter.name()); // Add metrics to WagedRebalancerMetricCollector addMetric(globalBaselineCalcLatencyGauge); diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java index 438702229c..dd0cc8c912 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java @@ -265,7 +265,8 @@ public void testInvalidClusterStatus() 
throws IOException, HelixRebalanceExcepti "Failed to generate cluster model. Failure Type: INVALID_CLUSTER_STATUS"); } - // The rebalance will be done with empty mapping result since there is no previously calculated assignment. + // The rebalance will be done with empty mapping result since there is no previously calculated + // assignment. Assert.assertTrue( rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput()) .isEmpty()); @@ -327,13 +328,14 @@ public void testAlgorithmException() throws IOException, HelixRebalanceException Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.FAILED_TO_CALCULATE); Assert.assertEquals(ex.getMessage(), "Algorithm fails. Failure Type: FAILED_TO_CALCULATE"); } - // But if call with the public method computeNewIdealStates(), the rebalance will return with the previous rebalance result. + // But if call with the public method computeNewIdealStates(), the rebalance will return with + // the previous rebalance result. 
Map newResult = rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput()); Assert.assertEquals(newResult, result); // Ensure failure has been recorded Assert.assertEquals(rebalancer.getMetricCollector().getMetric( - WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceFailureCount.name(), + WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceFailureCounter.name(), CountMetric.class).getValue().longValue(), 1l); } diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java index 2203310426..3b20ba9002 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java @@ -383,7 +383,8 @@ public void testMaxPartitionLimitation() throws Exception { configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); Thread.sleep(300); - // Since the WAGED rebalancer does not partial rebalance, the assignment won't show even removed cluster level restriction + // Since the WAGED rebalancer does not partial rebalance, the assignment won't show even + // removed cluster level restriction Assert.assertFalse(TestHelper.verify(() -> _allDBs.stream().anyMatch(db -> { ExternalView ev = _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); From caa07910c2b1e7eec795c25bd894f37c3c9fca9a Mon Sep 17 00:00:00 2001 From: Jiajun Wang Date: Mon, 21 Oct 2019 19:47:55 -0700 Subject: [PATCH 5/7] Address comments. 
--- .../rebalancer/waged/WagedRebalancer.java | 21 ++++++++++++------- .../WagedRebalancerMetricCollector.java | 6 +++--- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java index 651d20216c..1fdf6cc4b0 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java @@ -193,7 +193,9 @@ public Map computeNewIdealStates(ResourceControllerDataProvi // rebalance exception. throw ex; } else { // return the previously calculated assignment. - LOG.warn("Trying to return the previously calculated assignment."); + LOG.warn( + "Returning the last known-good best possible assignment from metadata store due to " + + "rebalance failure of type: {}", failureType); // Note that don't return an assignment based on the current state if there is no previously // calculated result in this fallback logic. 
Map assignmentRecord = @@ -316,7 +318,7 @@ private Map convertResourceAssignment( finalIdealStateMap.put(resourceName, newIdealState); } catch (Exception ex) { throw new HelixRebalanceException( - "Fail to calculate the new IdealState for resource: " + resourceName, + "Failed to calculate the new IdealState for resource: " + resourceName, HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex); } } @@ -623,21 +625,26 @@ private void applyRebalanceOverwrite(Map idealStateMap, calculateAssignment(clusterData, clusterChanges, resourceMap, enabledLiveInstances, Collections.emptyMap(), baseline)); for (String resourceName : idealStateMap.keySet()) { - IdealState is = idealStateMap.get(resourceName); + // The new calculated ideal state before overwrite + IdealState newIdealState = idealStateMap.get(resourceName); if (!activeIdealStates.containsKey(resourceName)) { throw new HelixRebalanceException( "Failed to calculate the complete partition assignment with all active nodes. Cannot find the resource assignment for " + resourceName, HelixRebalanceException.Type.FAILED_TO_CALCULATE); } + // The ideal state that is calculated based on the real alive/enabled instances list IdealState newActiveIdealState = activeIdealStates.get(resourceName); + // The current ideal state that exists in the IdealState znode IdealState currentIdealState = clusterData.getIdealState(resourceName); int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size()); - int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(currentIdealState, numReplica); + int minActiveReplica = + DelayedRebalanceUtil.getMinActiveReplica(currentIdealState, numReplica); Map> finalPreferenceLists = DelayedRebalanceUtil - .getFinalDelayedMapping(newActiveIdealState.getPreferenceLists(), is.getPreferenceLists(), - enabledLiveInstances, Math.min(minActiveReplica, numReplica)); + .getFinalDelayedMapping(newActiveIdealState.getPreferenceLists(), + newIdealState.getPreferenceLists(), 
enabledLiveInstances, + Math.min(minActiveReplica, numReplica)); - is.setPreferenceLists(finalPreferenceLists); + newIdealState.setPreferenceLists(finalPreferenceLists); } } diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java index de34de3f27..e9494ffa1e 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java @@ -43,9 +43,9 @@ public enum WagedRebalancerMetricNames { StateReadLatencyGauge, StateWriteLatencyGauge, - // Count of any rebalance failure. - // Note the rebalancer may still be able to return an assignment based on the previous record - // on an error. + // Count of any rebalance compute failure. + // Note the rebalancer may still be able to return the last known-good assignment on a rebalance + // compute failure. And this fallback logic won't impact this counting. RebalanceFailureCounter } From f3495fa20408ac9c7e933eaa8f7c12eb4dfffb3b Mon Sep 17 00:00:00 2001 From: Jiajun Wang Date: Mon, 21 Oct 2019 21:38:44 -0700 Subject: [PATCH 6/7] change error message. --- .../helix/controller/rebalancer/waged/WagedRebalancer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java index 1fdf6cc4b0..9a01688b65 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java @@ -179,7 +179,7 @@ public Map computeNewIdealStates(ResourceControllerDataProvi // Calculate the target assignment based on the current cluster status. 
newIdealStates = computeBestPossibleStates(clusterData, resourceMap, currentStateOutput); } catch (HelixRebalanceException ex) { - LOG.error("The new assignment calculation fails.", ex); + LOG.error("Failed to calculate the new assignments.", ex); // Record the failure in metrics. CountMetric rebalanceFailureCount = _metricCollector.getMetric( WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceFailureCounter.name(), From b3187fd01802d7136b19bc45ee1fd03c482881d3 Mon Sep 17 00:00:00 2001 From: jiajunwang Date: Tue, 22 Oct 2019 12:52:48 -0700 Subject: [PATCH 7/7] Address more comments. --- .../WagedRebalancer/TestWagedRebalance.java | 29 ++++++++++--------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java index 3b20ba9002..9790b929e5 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java @@ -108,7 +108,7 @@ protected void addInstanceConfig(String storageNodeName, int seqNo, int tagCount public void test() throws Exception { int i = 0; for (String stateModel : _testModels) { - String db = "Test-DB-test" + i++; + String db = "Test-DB-" + TestHelper.getTestMethodName() + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); _allDBs.add(db); @@ -148,7 +148,7 @@ public void testWithInstanceTag() throws Exception { Set tags = new HashSet(_nodeToTagMap.values()); int i = 3; for (String tag : tags) { - String db = "Test-DB-testWithInstanceTag" + i++; + String db = "Test-DB-" + TestHelper.getTestMethodName() + i++; 
createResourceWithWagedRebalance(CLUSTER_NAME, db, BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica); IdealState is = @@ -164,7 +164,7 @@ public void testWithInstanceTag() throws Exception { @Test(dependsOnMethods = "test") public void testChangeIdealState() throws InterruptedException { - String dbName = "Test-DB-testChangeIdealState"; + String dbName = "Test-DB-" + TestHelper.getTestMethodName(); createResourceWithWagedRebalance(CLUSTER_NAME, dbName, BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, _replica); @@ -198,7 +198,7 @@ public void testChangeIdealState() throws InterruptedException { @Test(dependsOnMethods = "test") public void testDisableInstance() throws InterruptedException { - String dbName = "Test-DB-testDisableInstance"; + String dbName = "Test-DB-" + TestHelper.getTestMethodName(); createResourceWithWagedRebalance(CLUSTER_NAME, dbName, BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, _replica); @@ -254,7 +254,7 @@ public void testLackEnoughLiveInstances() throws Exception { int j = 0; for (String stateModel : _testModels) { - String db = "Test-DB-testLackEnoughLiveInstances" + j++; + String db = "Test-DB-" + TestHelper.getTestMethodName() + j++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); @@ -293,7 +293,7 @@ public void testLackEnoughInstances() throws Exception { int j = 0; for (String stateModel : _testModels) { - String db = "Test-DB-testLackEnoughInstances" + j++; + String db = "Test-DB-" + TestHelper.getTestMethodName() + j++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); @@ -324,7 +324,7 @@ public void 
testLackEnoughInstances() throws Exception { public void testMixedRebalancerUsage() throws InterruptedException { int i = 0; for (String stateModel : _testModels) { - String db = "Test-DB-testMixedRebalancerUsage" + i++; + String db = "Test-DB-" + TestHelper.getTestMethodName() + i++; if (i == 0) { _gSetupTool.addResourceToCluster(CLUSTER_NAME, db, PARTITIONS, stateModel, IdealState.RebalanceMode.FULL_AUTO + "", CrushRebalanceStrategy.class.getName()); @@ -354,12 +354,14 @@ public void testMaxPartitionLimitation() throws Exception { String limitedResourceName = null; int i = 0; for (String stateModel : _testModels) { - String db = "Test-DB-testMaxPartitionLimitation-" + i++; + String db = "Test-DB-" + TestHelper.getTestMethodName() + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica); if (i == 1) { - // The limited resource has additional limitation, so even the other resources can be assigned - // later in theory, this resource will still be blocked by the max partition limitation. + // The limited resource has an additional limitation. + // The other resources could have been assigned in theory if the WAGED rebalancer were + // not used. + // However, with the WAGED rebalancer, this restricted resource will block the other ones. limitedResourceName = db; IdealState idealState = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); @@ -371,7 +373,8 @@ public void testMaxPartitionLimitation() throws Exception { } Thread.sleep(300); - // Since the WAGED rebalancer does not partial rebalance, the initial assignment won't show. + // Since the WAGED rebalancer needs to finish rebalancing every resource, the initial + // assignment won't show.
Assert.assertFalse(TestHelper.verify(() -> _allDBs.stream().anyMatch(db -> { ExternalView ev = _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); @@ -383,8 +386,8 @@ public void testMaxPartitionLimitation() throws Exception { configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); Thread.sleep(300); - // Since the WAGED rebalancer does not partial rebalance, the assignment won't show even - // removed cluster level restriction + // Since the WAGED rebalancer needs to finish rebalancing every resource, the assignment won't + // show even after the cluster-level restriction is removed. Assert.assertFalse(TestHelper.verify(() -> _allDBs.stream().anyMatch(db -> { ExternalView ev = _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);