diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java index f39d3cb97f..9a01688b65 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java @@ -30,7 +30,6 @@ import com.google.common.collect.ImmutableSet; import org.apache.helix.HelixConstants; -import org.apache.helix.HelixException; import org.apache.helix.HelixManager; import org.apache.helix.HelixRebalanceException; import org.apache.helix.controller.changedetector.ResourceChangeDetector; @@ -51,6 +50,7 @@ import org.apache.helix.model.ResourceConfig; import org.apache.helix.monitoring.metrics.MetricCollector; import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector; +import org.apache.helix.monitoring.metrics.model.CountMetric; import org.apache.helix.monitoring.metrics.model.LatencyMetric; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -174,9 +174,36 @@ public Map computeNewIdealStates(ResourceControllerDataProvi LOG.info("Start computing new ideal states for resources: {}", resourceMap.keySet().toString()); validateInput(clusterData, resourceMap); - // Calculate the target assignment based on the current cluster status. - Map newIdealStates = - computeBestPossibleStates(clusterData, resourceMap, currentStateOutput); + Map newIdealStates; + try { + // Calculate the target assignment based on the current cluster status. + newIdealStates = computeBestPossibleStates(clusterData, resourceMap, currentStateOutput); + } catch (HelixRebalanceException ex) { + LOG.error("Failed to calculate the new assignments.", ex); + // Record the failure in metrics. + CountMetric rebalanceFailureCount = _metricCollector.getMetric( + WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceFailureCounter.name(), + CountMetric.class); + rebalanceFailureCount.increaseCount(1L); + + HelixRebalanceException.Type failureType = ex.getFailureType(); + if (failureType.equals(HelixRebalanceException.Type.INVALID_REBALANCER_STATUS) || failureType + .equals(HelixRebalanceException.Type.UNKNOWN_FAILURE)) { + // If the failure is unknown or because of assignment store access failure, throw the + // rebalance exception. + throw ex; + } else { // return the previously calculated assignment. + LOG.warn( + "Returning the last known-good best possible assignment from metadata store due to " + + "rebalance failure of type: {}", failureType); + // Note that don't return an assignment based on the current state if there is no previously + // calculated result in this fallback logic. + Map assignmentRecord = + getBestPossibleAssignment(_assignmentMetadataStore, new CurrentStateOutput(), + resourceMap.keySet()); + newIdealStates = convertResourceAssignment(clusterData, assignmentRecord); + } + } // Construct the new best possible states according to the current state and target assignment. // Note that the new ideal state might be an intermediate state between the current state and @@ -203,7 +230,7 @@ public Map computeNewIdealStates(ResourceControllerDataProvi } // Coordinate baseline recalculation and partial rebalance according to the cluster changes. - private Map computeBestPossibleStates( + protected Map computeBestPossibleStates( ResourceControllerDataProvider clusterData, Map resourceMap, final CurrentStateOutput currentStateOutput) throws HelixRebalanceException { getChangeDetector().updateSnapshots(clusterData); @@ -243,36 +270,15 @@ private Map computeBestPossibleStates( Map newAssignment = partialRebalance(clusterData, clusterChanges, resourceMap, activeNodes, currentStateOutput); - // > - Map> resourceStatePriorityMap = new HashMap<>(); - // Convert the assignments into IdealState for the following state mapping calculation. - Map finalIdealStateMap = new HashMap<>(); - for (String resourceName : newAssignment.keySet()) { - IdealState newIdealState; - try { - IdealState currentIdealState = clusterData.getIdealState(resourceName); - Map statePriorityMap = clusterData - .getStateModelDef(currentIdealState.getStateModelDefRef()).getStatePriorityMap(); - // Keep the priority map for the rebalance overwrite logic later. - resourceStatePriorityMap.put(resourceName, statePriorityMap); - // Create a new IdealState instance contains the new calculated assignment in the preference - // list. - newIdealState = generateIdealStateWithAssignment(resourceName, currentIdealState, - newAssignment.get(resourceName), statePriorityMap); - } catch (Exception ex) { - throw new HelixRebalanceException( - "Fail to calculate the new IdealState for resource: " + resourceName, - HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex); - } - finalIdealStateMap.put(resourceName, newIdealState); - } + Map finalIdealStateMap = + convertResourceAssignment(clusterData, newAssignment); // The additional rebalance overwrite is required since the calculated mapping may contains // some delayed rebalanced assignments. if (!activeNodes.equals(clusterData.getEnabledLiveInstances())) { applyRebalanceOverwrite(finalIdealStateMap, clusterData, resourceMap, clusterChanges, - resourceStatePriorityMap, getBaselineAssignment(_assignmentMetadataStore, - currentStateOutput, resourceMap.keySet())); + getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, + resourceMap.keySet())); } // Replace the assignment if user-defined preference list is configured. // Note the user-defined list is intentionally applied to the final mapping after calculation. @@ -285,6 +291,40 @@ resourceStatePriorityMap, getBaselineAssignment(_assignmentMetadataStore, return finalIdealStateMap; } + /** + * Convert the resource assignment map into an IdealState map. + */ + private Map convertResourceAssignment( + ResourceControllerDataProvider clusterData, Map assignments) + throws HelixRebalanceException { + // Convert the assignments into IdealState for the following state mapping calculation. + Map finalIdealStateMap = new HashMap<>(); + for (String resourceName : assignments.keySet()) { + try { + IdealState currentIdealState = clusterData.getIdealState(resourceName); + Map statePriorityMap = + clusterData.getStateModelDef(currentIdealState.getStateModelDefRef()) + .getStatePriorityMap(); + // Create a new IdealState instance which contains the new calculated assignment in the + // preference list. + IdealState newIdealState = new IdealState(resourceName); + // Copy the simple fields + newIdealState.getRecord().setSimpleFields(currentIdealState.getRecord().getSimpleFields()); + // Sort the preference list according to state priority. + newIdealState.setPreferenceLists( + getPreferenceLists(assignments.get(resourceName), statePriorityMap)); + // Note the state mapping in the new assignment won't directly propagate to the map fields. + // The rebalancer will calculate for the final state mapping considering the current states. + finalIdealStateMap.put(resourceName, newIdealState); + } catch (Exception ex) { + throw new HelixRebalanceException( + "Failed to calculate the new IdealState for resource: " + resourceName, + HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex); + } + } + return finalIdealStateMap; + } + // TODO make the Baseline calculation async if complicated algorithm is used for the Baseline private void refreshBaseline(ResourceControllerDataProvider clusterData, Map> clusterChanges, Map resourceMap, @@ -414,23 +454,6 @@ private ResourceChangeDetector getChangeDetector() { return CHANGE_DETECTOR_THREAD_LOCAL.get(); } - // Generate a new IdealState based on the input newAssignment. - // The assignment will be propagate to the preference lists. - // Note that we will recalculate the states based on the current state, so there is no need to - // update the mapping fields in the IdealState output. - private IdealState generateIdealStateWithAssignment(String resourceName, - IdealState currentIdealState, ResourceAssignment newAssignment, - Map statePriorityMap) { - IdealState newIdealState = new IdealState(resourceName); - // Copy the simple fields - newIdealState.getRecord().setSimpleFields(currentIdealState.getRecord().getSimpleFields()); - // Sort the preference list according to state priority. - newIdealState.setPreferenceLists(getPreferenceLists(newAssignment, statePriorityMap)); - // Note the state mapping in the new assignment won't be directly propagate to the map fields. - // The rebalancer will calculate for the final state mapping considering the current states. - return newIdealState; - } - // Generate the preference lists from the state mapping based on state priority. private Map> getPreferenceLists(ResourceAssignment newAssignment, Map statePriorityMap) { @@ -488,9 +511,6 @@ private Map getBaselineAssignment( stateReadLatency.startMeasuringLatency(); currentBaseline = assignmentMetadataStore.getBaseline(); stateReadLatency.endMeasuringLatency(); - } catch (HelixException ex) { - // Report error. and use empty mapping instead. - LOG.error("Failed to get the current baseline assignment.", ex); } catch (Exception ex) { throw new HelixRebalanceException( "Failed to get the current baseline assignment because of unexpected error.", @@ -501,6 +521,7 @@ private Map getBaselineAssignment( LOG.warn("The current baseline assignment record is empty. Use the current states instead."); currentBaseline = getCurrentStateAssingment(currentStateOutput, resources); } + currentBaseline.keySet().retainAll(resources); return currentBaseline; } @@ -524,9 +545,6 @@ private Map getBestPossibleAssignment( stateReadLatency.startMeasuringLatency(); currentBestAssignment = assignmentMetadataStore.getBestPossibleAssignment(); stateReadLatency.endMeasuringLatency(); - } catch (HelixException ex) { - // Report error. and use empty mapping instead. - LOG.error("Failed to get the current best possible assignment.", ex); } catch (Exception ex) { throw new HelixRebalanceException( "Failed to get the current best possible assignment because of unexpected error.", @@ -538,6 +556,7 @@ private Map getBestPossibleAssignment( "The current best possible assignment record is empty. Use the current states instead."); currentBestAssignment = getCurrentStateAssingment(currentStateOutput, resources); } + currentBestAssignment.keySet().retainAll(resources); return currentBestAssignment; } @@ -593,39 +612,39 @@ private void delayedRebalanceSchedule(ResourceControllerDataProvider clusterData * @param clusterData the cluster data cache. * @param resourceMap the rebalanaced resource map. * @param clusterChanges the detected cluster changes that triggeres the rebalance. - * @param resourceStatePriorityMap the state priority map for each resource. * @param baseline the baseline assignment */ private void applyRebalanceOverwrite(Map idealStateMap, ResourceControllerDataProvider clusterData, Map resourceMap, Map> clusterChanges, - Map> resourceStatePriorityMap, Map baseline) throws HelixRebalanceException { Set enabledLiveInstances = clusterData.getEnabledLiveInstances(); // Note that the calculation used the baseline as the input only. This is for minimizing // unnecessary partition movement. - Map activeAssignment = calculateAssignment(clusterData, - clusterChanges, resourceMap, enabledLiveInstances, Collections.emptyMap(), baseline); + Map activeIdealStates = convertResourceAssignment(clusterData, + calculateAssignment(clusterData, clusterChanges, resourceMap, enabledLiveInstances, + Collections.emptyMap(), baseline)); for (String resourceName : idealStateMap.keySet()) { - IdealState is = idealStateMap.get(resourceName); - if (!activeAssignment.containsKey(resourceName)) { + // The new calculated ideal state before overwrite + IdealState newIdealState = idealStateMap.get(resourceName); + if (!activeIdealStates.containsKey(resourceName)) { throw new HelixRebalanceException( "Failed to calculate the complete partition assignment with all active nodes. Cannot find the resource assignment for " - + resourceName, - HelixRebalanceException.Type.FAILED_TO_CALCULATE); + + resourceName, HelixRebalanceException.Type.FAILED_TO_CALCULATE); } + // The ideal state that is calculated based on the real alive/enabled instances list + IdealState newActiveIdealState = activeIdealStates.get(resourceName); + // The current ideal state that exists in the IdealState znode IdealState currentIdealState = clusterData.getIdealState(resourceName); - IdealState newActiveIdealState = - generateIdealStateWithAssignment(resourceName, currentIdealState, - activeAssignment.get(resourceName), resourceStatePriorityMap.get(resourceName)); - - int numReplia = currentIdealState.getReplicaCount(enabledLiveInstances.size()); - int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(currentIdealState, numReplia); - Map> finalPreferenceLists = - DelayedRebalanceUtil.getFinalDelayedMapping(newActiveIdealState.getPreferenceLists(), - is.getPreferenceLists(), enabledLiveInstances, Math.min(minActiveReplica, numReplia)); - - is.setPreferenceLists(finalPreferenceLists); + int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size()); + int minActiveReplica = + DelayedRebalanceUtil.getMinActiveReplica(currentIdealState, numReplica); + Map> finalPreferenceLists = DelayedRebalanceUtil + .getFinalDelayedMapping(newActiveIdealState.getPreferenceLists(), + newIdealState.getPreferenceLists(), enabledLiveInstances, + Math.min(minActiveReplica, numReplica)); + + newIdealState.setPreferenceLists(finalPreferenceLists); } } @@ -641,4 +660,8 @@ private void applyUserDefinedPreferenceList(ResourceConfig resourceConfig, } } } + + protected MetricCollector getMetricCollector() { + return _metricCollector; + } } diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/SimpleDynamicMetric.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/SimpleDynamicMetric.java index 1be6a21423..2b0f1db3cc 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/SimpleDynamicMetric.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/SimpleDynamicMetric.java @@ -25,7 +25,7 @@ * @param the type of the metric value */ public class SimpleDynamicMetric extends DynamicMetric { - private final String _metricName; + protected final String _metricName; /** * Instantiates a new Simple dynamic metric. diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java index 04d804d4d6..e9494ffa1e 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java @@ -20,8 +20,11 @@ */ import javax.management.JMException; + import org.apache.helix.monitoring.mbeans.MonitorDomainNames; +import org.apache.helix.monitoring.metrics.implementation.RebalanceFailureCount; import org.apache.helix.monitoring.metrics.implementation.RebalanceLatencyGauge; +import org.apache.helix.monitoring.metrics.model.CountMetric; import org.apache.helix.monitoring.metrics.model.LatencyMetric; public class WagedRebalancerMetricCollector extends MetricCollector { @@ -38,7 +41,12 @@ public enum WagedRebalancerMetricNames { // The following latency metrics are related to AssignmentMetadataStore StateReadLatencyGauge, - StateWriteLatencyGauge + StateWriteLatencyGauge, + + // Count of any rebalance compute failure. + // Note the rebalancer may still be able to return the last known-good assignment on a rebalance + // compute failure. And this fallback logic won't impact this counting. + RebalanceFailureCounter } public WagedRebalancerMetricCollector(String clusterName) throws JMException { @@ -62,19 +70,26 @@ public WagedRebalancerMetricCollector() { */ private void createMetrics() { // Define all metrics - LatencyMetric globalBaselineCalcLatencyGauge = new RebalanceLatencyGauge( - WagedRebalancerMetricNames.GlobalBaselineCalcLatencyGauge.name(), getResetIntervalInMs()); - LatencyMetric partialRebalanceLatencyGauge = new RebalanceLatencyGauge( - WagedRebalancerMetricNames.PartialRebalanceLatencyGauge.name(), getResetIntervalInMs()); - LatencyMetric stateReadLatencyGauge = new RebalanceLatencyGauge( - WagedRebalancerMetricNames.StateReadLatencyGauge.name(), getResetIntervalInMs()); - LatencyMetric stateWriteLatencyGauge = new RebalanceLatencyGauge( - WagedRebalancerMetricNames.StateWriteLatencyGauge.name(), getResetIntervalInMs()); + LatencyMetric globalBaselineCalcLatencyGauge = + new RebalanceLatencyGauge(WagedRebalancerMetricNames.GlobalBaselineCalcLatencyGauge.name(), + getResetIntervalInMs()); + LatencyMetric partialRebalanceLatencyGauge = + new RebalanceLatencyGauge(WagedRebalancerMetricNames.PartialRebalanceLatencyGauge.name(), + getResetIntervalInMs()); + LatencyMetric stateReadLatencyGauge = + new RebalanceLatencyGauge(WagedRebalancerMetricNames.StateReadLatencyGauge.name(), + getResetIntervalInMs()); + LatencyMetric stateWriteLatencyGauge = + new RebalanceLatencyGauge(WagedRebalancerMetricNames.StateWriteLatencyGauge.name(), + getResetIntervalInMs()); + CountMetric calcFailureCount = + new RebalanceFailureCount(WagedRebalancerMetricNames.RebalanceFailureCounter.name()); // Add metrics to WagedRebalancerMetricCollector addMetric(globalBaselineCalcLatencyGauge); addMetric(partialRebalanceLatencyGauge); addMetric(stateReadLatencyGauge); addMetric(stateWriteLatencyGauge); + addMetric(calcFailureCount); } } diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceFailureCount.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceFailureCount.java new file mode 100644 index 0000000000..3764645563 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceFailureCount.java @@ -0,0 +1,19 @@ +package org.apache.helix.monitoring.metrics.implementation; + +import org.apache.helix.monitoring.metrics.model.CountMetric; + +public class RebalanceFailureCount extends CountMetric { + /** + * Instantiates a new Simple dynamic metric. + * + * @param metricName the metric name + */ + public RebalanceFailureCount(String metricName) { + super(metricName, 0L); + } + + @Override + public void increaseCount(long count) { + updateValue(getValue() + count); + } +} diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java index e96a5893bb..b6e58b4ab9 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java @@ -22,7 +22,6 @@ import com.codahale.metrics.Histogram; import com.codahale.metrics.SlidingTimeWindowArrayReservoir; import java.util.concurrent.TimeUnit; -import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric; import org.apache.helix.monitoring.metrics.model.LatencyMetric; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,22 +71,6 @@ public void endMeasuringLatency() { reset(); } - @Override - public String getMetricName() { - return _metricName; - } - - @Override - public void reset() { - _startTime = VALUE_NOT_SET; - _endTime = VALUE_NOT_SET; - } - - @Override - public String toString() { - return String.format("Metric %s's latency is %d", _metricName, getLastEmittedMetricValue()); - } - /** * Returns the most recently emitted metric value at the time of the call. * @return @@ -97,8 +80,11 @@ public long getLastEmittedMetricValue() { return _lastEmittedMetricValue; } - @Override - public DynamicMetric getDynamicMetric() { - return this; + /** + * Resets the internal state of this metric. + */ + private void reset() { + _startTime = VALUE_NOT_SET; + _endTime = VALUE_NOT_SET; } } diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java index 5a7f0caa33..424ac9e300 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java @@ -19,23 +19,49 @@ * under the License. */ +import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric; import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric; /** * Represents a count metric and defines methods to help with calculation. A count metric gives a * gauge value of a certain property. */ -public abstract class CountMetric extends SimpleDynamicMetric implements Metric { - protected V _count; +public abstract class CountMetric extends SimpleDynamicMetric implements Metric { /** - * Instantiates a new Simple dynamic metric. + * Instantiates a new count metric. + * * @param metricName the metric name - * @param metricObject the metric object + * @param initCount the initial count */ - public CountMetric(String metricName, V metricObject) { - super(metricName, metricObject); + public CountMetric(String metricName, long initCount) { + super(metricName, initCount); } - public abstract void setCount(Object count); + /** + * Increment the metric by the input count. + * + * @param count + */ + public abstract void increaseCount(long count); + + @Override + public String getMetricName() { + return _metricName; + } + + @Override + public String toString() { + return String.format("Metric %s's count is %d", getMetricName(), getValue()); + } + + @Override + public long getLastEmittedMetricValue() { + return getValue(); + } + + @Override + public DynamicMetric getDynamicMetric() { + return this; + } } diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java index c8ba5ae4b2..d60f245b6f 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java @@ -20,6 +20,7 @@ */ import com.codahale.metrics.Histogram; +import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric; import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric; /** @@ -38,6 +39,7 @@ public abstract class LatencyMetric extends HistogramDynamicMetric implements Me */ public LatencyMetric(String metricName, Histogram metricObject) { super(metricName, metricObject); + _metricName = metricName; } /** @@ -49,4 +51,19 @@ public LatencyMetric(String metricName, Histogram metricObject) { * Ends measuring the latency. */ public abstract void endMeasuringLatency(); + + @Override + public String getMetricName() { + return _metricName; + } + + @Override + public String toString() { + return String.format("Metric %s's latency is %d", getMetricName(), getLastEmittedMetricValue()); + } + + @Override + public DynamicMetric getDynamicMetric() { + return this; + } } diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java index ba59b4f6e5..22378dcff9 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java @@ -31,11 +31,6 @@ public interface Metric { */ String getMetricName(); - /** - * Resets the internal state of this metric. - */ - void reset(); - /** * Prints the metric along with its name. */ diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java index df368cbfce..dd0cc8c912 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; + import org.apache.helix.HelixConstants; import org.apache.helix.HelixRebalanceException; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; @@ -41,6 +42,8 @@ import org.apache.helix.model.Partition; import org.apache.helix.model.Resource; import org.apache.helix.model.ResourceAssignment; +import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector; +import org.apache.helix.monitoring.metrics.model.CountMetric; import org.mockito.Mockito; import org.mockito.stubbing.Answer; import org.testng.Assert; @@ -241,7 +244,7 @@ public void testNonCompatibleConfiguration() throws IOException, HelixRebalanceE // TODO test with invalid capacity configuration which will fail the cluster model constructing. @Test(dependsOnMethods = "testRebalance") - public void testInvalidClusterStatus() throws IOException { + public void testInvalidClusterStatus() throws IOException, HelixRebalanceException { _metadataStore.clearMetadataStore(); WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer()); @@ -254,13 +257,19 @@ public void testInvalidClusterStatus() throws IOException { Map resourceMap = clusterData.getIdealStates().keySet().stream().collect( Collectors.toMap(resourceName -> resourceName, resourceName -> new Resource(resourceName))); try { - rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput()); + rebalancer.computeBestPossibleStates(clusterData, resourceMap, new CurrentStateOutput()); Assert.fail("Rebalance shall fail."); } catch (HelixRebalanceException ex) { Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.INVALID_CLUSTER_STATUS); Assert.assertEquals(ex.getMessage(), "Failed to generate cluster model. Failure Type: INVALID_CLUSTER_STATUS"); } + + // The rebalance will be done with empty mapping result since there is no previously calculated + // assignment. + Assert.assertTrue( + rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput()) + .isEmpty()); } @Test(dependsOnMethods = "testRebalance") @@ -289,24 +298,45 @@ public void testInvalidRebalancerStatus() throws IOException { @Test(dependsOnMethods = "testRebalance") public void testAlgorithmException() throws IOException, HelixRebalanceException { - RebalanceAlgorithm badAlgorithm = Mockito.mock(RebalanceAlgorithm.class); - when(badAlgorithm.calculate(any())).thenThrow(new HelixRebalanceException("Algorithm fails.", - HelixRebalanceException.Type.FAILED_TO_CALCULATE)); - _metadataStore.clearMetadataStore(); WagedRebalancer rebalancer = - new WagedRebalancer(_metadataStore, badAlgorithm, new DelayedAutoRebalancer()); + new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer()); ResourceControllerDataProvider clusterData = setupClusterDataCache(); - Map resourceMap = clusterData.getIdealStates().keySet().stream().collect( - Collectors.toMap(resourceName -> resourceName, resourceName -> new Resource(resourceName))); + Map resourceMap = clusterData.getIdealStates().entrySet().stream() + .collect(Collectors.toMap(entry -> entry.getKey(), entry -> { + Resource resource = new Resource(entry.getKey()); + entry.getValue().getPartitionSet().stream() + .forEach(partition -> resource.addPartition(partition)); + return resource; + })); + // Rebalance with normal configuration. So the assignment will be persisted in the metadata store. + Map result = + rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput()); + + // Recreate a rebalance with the same metadata store but bad algorithm instance. + RebalanceAlgorithm badAlgorithm = Mockito.mock(RebalanceAlgorithm.class); + when(badAlgorithm.calculate(any())).thenThrow(new HelixRebalanceException("Algorithm fails.", + HelixRebalanceException.Type.FAILED_TO_CALCULATE)); + rebalancer = new WagedRebalancer(_metadataStore, badAlgorithm, new DelayedAutoRebalancer()); + + // Calculation will fail try { - rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput()); + rebalancer.computeBestPossibleStates(clusterData, resourceMap, new CurrentStateOutput()); Assert.fail("Rebalance shall fail."); } catch (HelixRebalanceException ex) { Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.FAILED_TO_CALCULATE); Assert.assertEquals(ex.getMessage(), "Algorithm fails. Failure Type: FAILED_TO_CALCULATE"); } + // But if call with the public method computeNewIdealStates(), the rebalance will return with + // the previous rebalance result. + Map newResult = + rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput()); + Assert.assertEquals(newResult, result); + // Ensure failure has been recorded + Assert.assertEquals(rebalancer.getMetricCollector().getMetric( + WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceFailureCounter.name(), + CountMetric.class).getValue().longValue(), 1l); } @Test(dependsOnMethods = "testRebalance") diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java index 713c095539..e49cc191b1 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Set; +import org.apache.helix.TestHelper; import org.apache.helix.integration.rebalancer.DelayedAutoRebalancer.TestDelayedAutoRebalance; import org.apache.helix.model.ExternalView; import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier; @@ -39,7 +40,7 @@ protected ZkHelixClusterVerifier getClusterVerifier() { Set dbNames = new HashSet<>(); int i = 0; for (String stateModel : TestStateModels) { - dbNames.add("Test-DB-" + i++); + dbNames.add("Test-DB-" + TestHelper.getTestMethodName() + i++); } return new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setResources(dbNames) .setDeactivatedNodeAwareness(true).setZkAddr(ZK_ADDR).build(); @@ -47,10 +48,10 @@ protected ZkHelixClusterVerifier getClusterVerifier() { // create test DBs, wait it converged and return externalviews protected Map createTestDBs(long delayTime) throws InterruptedException { - Map externalViews = new HashMap(); + Map externalViews = new HashMap<>(); int i = 0; for (String stateModel : TestStateModels) { - String db = "Test-DB-" + i++; + String db = "Test-DB-" + TestHelper.getTestMethodName() + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _minActiveReplica); _testDBs.add(db); diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java index bcb2260d88..3d4bd6a0b1 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Set; +import org.apache.helix.TestHelper; import org.apache.helix.integration.rebalancer.DelayedAutoRebalancer.TestDelayedAutoRebalanceWithDisabledInstance; import org.apache.helix.model.ExternalView; import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier; @@ -40,7 +41,7 @@ protected ZkHelixClusterVerifier getClusterVerifier() { Set dbNames = new HashSet<>(); int i = 0; for (String stateModel : TestStateModels) { - dbNames.add("Test-DB-" + i++); + dbNames.add("Test-DB-" + TestHelper.getTestMethodName() + i++); } return new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setResources(dbNames) .setDeactivatedNodeAwareness(true).setZkAddr(ZK_ADDR).build(); @@ -48,10 +49,10 @@ protected ZkHelixClusterVerifier getClusterVerifier() { // create test DBs, wait it converged and return externalviews protected Map createTestDBs(long delayTime) throws InterruptedException { - Map externalViews = new HashMap(); + Map externalViews = new HashMap<>(); int i = 0; for (String stateModel : TestStateModels) { - String db = "Test-DB-" + i++; + String db = "Test-DB-" + TestHelper.getTestMethodName() + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _minActiveReplica); _testDBs.add(db); diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithRackaware.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithRackaware.java index e0adf72f33..bb7c11a517 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithRackaware.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithRackaware.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Set; +import org.apache.helix.TestHelper; import org.apache.helix.integration.rebalancer.DelayedAutoRebalancer.TestDelayedAutoRebalanceWithRackaware; import org.apache.helix.model.ExternalView; import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier; @@ -39,7 +40,7 @@ protected ZkHelixClusterVerifier getClusterVerifier() { Set dbNames = new HashSet<>(); int i = 0; for (String stateModel : TestStateModels) { - dbNames.add("Test-DB-" + i++); + dbNames.add("Test-DB-" + TestHelper.getTestMethodName() + i++); } return new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setResources(dbNames) .setDeactivatedNodeAwareness(true).setZkAddr(ZK_ADDR).build(); @@ -47,10 +48,10 @@ protected ZkHelixClusterVerifier getClusterVerifier() { // create test DBs, wait it converged and return externalviews protected Map createTestDBs(long delayTime) throws InterruptedException { - Map externalViews = new HashMap(); + Map externalViews = new HashMap<>(); int i = 0; for (String stateModel : TestStateModels) { - String db = "Test-DB-" + i++; + String db = "Test-DB-" + TestHelper.getTestMethodName() + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _minActiveReplica); _testDBs.add(db); diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java index 4920414187..9790b929e5 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java @@ -108,7 +108,7 @@ protected void addInstanceConfig(String storageNodeName, int seqNo, int tagCount public void test() throws Exception { int i = 0; for (String stateModel : _testModels) { - String db = "Test-DB-" + i++; + String db = "Test-DB-" + TestHelper.getTestMethodName() + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); _allDBs.add(db); @@ -148,7 +148,7 @@ public void testWithInstanceTag() throws Exception { Set tags = new HashSet(_nodeToTagMap.values()); int i = 3; for (String tag : tags) { - String db = "Test-DB-" + i++; + String db = "Test-DB-" + TestHelper.getTestMethodName() + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica); IdealState is = @@ -164,7 +164,7 @@ public void testWithInstanceTag() throws Exception { @Test(dependsOnMethods = "test") public void testChangeIdealState() throws InterruptedException { - String dbName = "Test-DB"; + String dbName = "Test-DB-" + TestHelper.getTestMethodName(); createResourceWithWagedRebalance(CLUSTER_NAME, dbName, BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, _replica); @@ -198,7 +198,7 @@ public void testChangeIdealState() throws InterruptedException { @Test(dependsOnMethods = "test") public void testDisableInstance() throws InterruptedException { - String dbName = "Test-DB"; + String dbName = "Test-DB-" + TestHelper.getTestMethodName(); createResourceWithWagedRebalance(CLUSTER_NAME, dbName, BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, _replica); @@ -254,7 +254,7 @@ public void testLackEnoughLiveInstances() throws Exception { int j = 0; for (String stateModel : _testModels) { - String db = "Test-DB-" + j++; + String db = "Test-DB-" + TestHelper.getTestMethodName() + j++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); @@ -293,7 +293,7 @@ public void testLackEnoughInstances() throws Exception { int j = 0; for (String stateModel : _testModels) { - String db = "Test-DB-" + j++; + String db = "Test-DB-" + TestHelper.getTestMethodName() + j++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); @@ -324,7 +324,7 @@ public void testLackEnoughInstances() throws Exception { public void testMixedRebalancerUsage() throws InterruptedException { int i = 0; for (String stateModel : _testModels) { - String db = "Test-DB-" + i++; + String db = "Test-DB-" + TestHelper.getTestMethodName() + i++; if (i == 0) { _gSetupTool.addResourceToCluster(CLUSTER_NAME, db, PARTITIONS, stateModel, IdealState.RebalanceMode.FULL_AUTO + "", CrushRebalanceStrategy.class.getName()); @@ -354,12 +354,14 @@ public void testMaxPartitionLimitation() throws Exception { String limitedResourceName = null; int i = 0; for (String stateModel : _testModels) { - String db = "Test-DB-" + i++; + String db = "Test-DB-" + TestHelper.getTestMethodName() + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica); if (i == 1) { - // The limited resource has additional limitation, so even the other resources can be assigned - // later, this resource will still be blocked by the max partition limitation. + // The limited resource has additional limitation. + // The other resources could have been assigned in theory if the WAGED rebalancer were + // not used. + // However, with the WAGED rebalancer, this restricted resource will block the other ones. limitedResourceName = db; IdealState idealState = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); @@ -371,8 +373,9 @@ public void testMaxPartitionLimitation() throws Exception { } Thread.sleep(300); - // Since the WAGED rebalancer does not do partial rebalance, the initial assignment won't show. - Assert.assertFalse(TestHelper.verify(() -> _allDBs.stream().allMatch(db -> { + // Since the WAGED rebalancer need to finish rebalancing every resources, the initial + // assignment won't show. + Assert.assertFalse(TestHelper.verify(() -> _allDBs.stream().anyMatch(db -> { ExternalView ev = _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); return ev != null && !ev.getPartitionSet().isEmpty(); @@ -383,20 +386,13 @@ public void testMaxPartitionLimitation() throws Exception { configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); Thread.sleep(300); - // wait until any of the resources is rebalanced - TestHelper.verify(() -> { - for (String db : _allDBs) { - ExternalView ev = - _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); - if (ev != null && !ev.getPartitionSet().isEmpty()) { - return true; - } - } - return false; - }, 3000); - ExternalView ev = _gSetupTool.getClusterManagementTool() - .getResourceExternalView(CLUSTER_NAME, limitedResourceName); - Assert.assertFalse(ev != null && !ev.getPartitionSet().isEmpty()); + // Since the WAGED rebalancer need to finish rebalancing every resources, the assignment won't + // show even removed cluster level restriction + Assert.assertFalse(TestHelper.verify(() -> _allDBs.stream().anyMatch(db -> { + ExternalView ev = + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + return ev != null && !ev.getPartitionSet().isEmpty(); + }), 2000)); // Remove the resource level limitation IdealState idealState = _gSetupTool.getClusterManagementTool() diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java index 0a4c232cc6..831f77fbf6 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java @@ -110,7 +110,7 @@ protected void addInstanceConfig(String storageNodeName, int seqNo, int zoneCoun public void testZoneIsolation() throws Exception { int i = 0; for (String stateModel : _testModels) { - String db = "Test-DB-" + i++; + String db = "Test-DB-testZoneIsolation" + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); @@ -126,7 +126,7 @@ public void testZoneIsolationWithInstanceTag() throws Exception { Set tags = new HashSet(_nodeToTagMap.values()); int i = 0; for (String tag : tags) { - String db = "Test-DB-" + i++; + String db = "Test-DB-testZoneIsolationWithInstanceTag" + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica); IdealState is = @@ -154,7 +154,7 @@ public void testLackEnoughLiveRacks() throws Exception { int j = 0; for (String stateModel : _testModels) { - String db = "Test-DB-" + j++; + String db = "Test-DB-testLackEnoughLiveRacks" + j++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); @@ -196,7 +196,7 @@ public void testLackEnoughRacks() throws Exception { int j = 0; for (String stateModel : _testModels) { - String db = "Test-DB-" + j++; + String db = "Test-DB-testLackEnoughRacks" + j++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); @@ -228,7 +228,7 @@ public void testLackEnoughRacks() throws Exception { public void testAddZone() throws Exception { int i = 0; for (String stateModel : _testModels) { - String db = "Test-DB-" + i++; + String db = "Test-DB-testAddZone" + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);