Skip to content

Commit

Permalink
The WAGED rebalancer returns the previously calculated assignment on …
Browse files Browse the repository at this point in the history
…calculation failure (#514)

* The WAGED rebalancer returns the previously calculated assignment on calculation failure.

This is to protect the cluster assignment on a rebalancing algorithm failure. For example, the cluster is out of capacity. In this case, the rebalancer will keep using the previously calculated mapping.
Also, refine the new metric interface, and add the RebalanceFailureCount metric for recording the failures.

Modify the test cases so that DBs from different test cases have a different name. This is to avoid previous test records to be returned by the rebalancer on calculation error.
  • Loading branch information
jiajunwang authored and Jiajun Wang committed Dec 12, 2019
1 parent 8f9b3e2 commit eb78edc
Show file tree
Hide file tree
Showing 14 changed files with 274 additions and 164 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

import com.google.common.collect.ImmutableSet;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixRebalanceException;
import org.apache.helix.controller.changedetector.ResourceChangeDetector;
Expand All @@ -51,6 +50,7 @@
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.monitoring.metrics.MetricCollector;
import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
import org.apache.helix.monitoring.metrics.model.CountMetric;
import org.apache.helix.monitoring.metrics.model.LatencyMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -174,9 +174,36 @@ public Map<String, IdealState> computeNewIdealStates(ResourceControllerDataProvi
LOG.info("Start computing new ideal states for resources: {}", resourceMap.keySet().toString());
validateInput(clusterData, resourceMap);

// Calculate the target assignment based on the current cluster status.
Map<String, IdealState> newIdealStates =
computeBestPossibleStates(clusterData, resourceMap, currentStateOutput);
Map<String, IdealState> newIdealStates;
try {
// Calculate the target assignment based on the current cluster status.
newIdealStates = computeBestPossibleStates(clusterData, resourceMap, currentStateOutput);
} catch (HelixRebalanceException ex) {
LOG.error("Failed to calculate the new assignments.", ex);
// Record the failure in metrics.
CountMetric rebalanceFailureCount = _metricCollector.getMetric(
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceFailureCounter.name(),
CountMetric.class);
rebalanceFailureCount.increaseCount(1L);

HelixRebalanceException.Type failureType = ex.getFailureType();
if (failureType.equals(HelixRebalanceException.Type.INVALID_REBALANCER_STATUS) || failureType
.equals(HelixRebalanceException.Type.UNKNOWN_FAILURE)) {
// If the failure is unknown or because of assignment store access failure, throw the
// rebalance exception.
throw ex;
} else { // return the previously calculated assignment.
LOG.warn(
"Returning the last known-good best possible assignment from metadata store due to "
+ "rebalance failure of type: {}", failureType);
// Note that don't return an assignment based on the current state if there is no previously
// calculated result in this fallback logic.
Map<String, ResourceAssignment> assignmentRecord =
getBestPossibleAssignment(_assignmentMetadataStore, new CurrentStateOutput(),
resourceMap.keySet());
newIdealStates = convertResourceAssignment(clusterData, assignmentRecord);
}
}

// Construct the new best possible states according to the current state and target assignment.
// Note that the new ideal state might be an intermediate state between the current state and
Expand All @@ -203,7 +230,7 @@ public Map<String, IdealState> computeNewIdealStates(ResourceControllerDataProvi
}

// Coordinate baseline recalculation and partial rebalance according to the cluster changes.
private Map<String, IdealState> computeBestPossibleStates(
protected Map<String, IdealState> computeBestPossibleStates(
ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
final CurrentStateOutput currentStateOutput) throws HelixRebalanceException {
getChangeDetector().updateSnapshots(clusterData);
Expand Down Expand Up @@ -243,36 +270,15 @@ private Map<String, IdealState> computeBestPossibleStates(
Map<String, ResourceAssignment> newAssignment =
partialRebalance(clusterData, clusterChanges, resourceMap, activeNodes, currentStateOutput);

// <ResourceName, <State, Priority>>
Map<String, Map<String, Integer>> resourceStatePriorityMap = new HashMap<>();
// Convert the assignments into IdealState for the following state mapping calculation.
Map<String, IdealState> finalIdealStateMap = new HashMap<>();
for (String resourceName : newAssignment.keySet()) {
IdealState newIdealState;
try {
IdealState currentIdealState = clusterData.getIdealState(resourceName);
Map<String, Integer> statePriorityMap = clusterData
.getStateModelDef(currentIdealState.getStateModelDefRef()).getStatePriorityMap();
// Keep the priority map for the rebalance overwrite logic later.
resourceStatePriorityMap.put(resourceName, statePriorityMap);
// Create a new IdealState instance contains the new calculated assignment in the preference
// list.
newIdealState = generateIdealStateWithAssignment(resourceName, currentIdealState,
newAssignment.get(resourceName), statePriorityMap);
} catch (Exception ex) {
throw new HelixRebalanceException(
"Fail to calculate the new IdealState for resource: " + resourceName,
HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
}
finalIdealStateMap.put(resourceName, newIdealState);
}
Map<String, IdealState> finalIdealStateMap =
convertResourceAssignment(clusterData, newAssignment);

// The additional rebalance overwrite is required since the calculated mapping may contains
// some delayed rebalanced assignments.
if (!activeNodes.equals(clusterData.getEnabledLiveInstances())) {
applyRebalanceOverwrite(finalIdealStateMap, clusterData, resourceMap, clusterChanges,
resourceStatePriorityMap, getBaselineAssignment(_assignmentMetadataStore,
currentStateOutput, resourceMap.keySet()));
getBaselineAssignment(_assignmentMetadataStore, currentStateOutput,
resourceMap.keySet()));
}
// Replace the assignment if user-defined preference list is configured.
// Note the user-defined list is intentionally applied to the final mapping after calculation.
Expand All @@ -285,6 +291,40 @@ resourceStatePriorityMap, getBaselineAssignment(_assignmentMetadataStore,
return finalIdealStateMap;
}

/**
* Convert the resource assignment map into an IdealState map.
*/
private Map<String, IdealState> convertResourceAssignment(
ResourceControllerDataProvider clusterData, Map<String, ResourceAssignment> assignments)
throws HelixRebalanceException {
// Convert the assignments into IdealState for the following state mapping calculation.
Map<String, IdealState> finalIdealStateMap = new HashMap<>();
for (String resourceName : assignments.keySet()) {
try {
IdealState currentIdealState = clusterData.getIdealState(resourceName);
Map<String, Integer> statePriorityMap =
clusterData.getStateModelDef(currentIdealState.getStateModelDefRef())
.getStatePriorityMap();
// Create a new IdealState instance which contains the new calculated assignment in the
// preference list.
IdealState newIdealState = new IdealState(resourceName);
// Copy the simple fields
newIdealState.getRecord().setSimpleFields(currentIdealState.getRecord().getSimpleFields());
// Sort the preference list according to state priority.
newIdealState.setPreferenceLists(
getPreferenceLists(assignments.get(resourceName), statePriorityMap));
// Note the state mapping in the new assignment won't directly propagate to the map fields.
// The rebalancer will calculate for the final state mapping considering the current states.
finalIdealStateMap.put(resourceName, newIdealState);
} catch (Exception ex) {
throw new HelixRebalanceException(
"Failed to calculate the new IdealState for resource: " + resourceName,
HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
}
}
return finalIdealStateMap;
}

// TODO make the Baseline calculation async if complicated algorithm is used for the Baseline
private void refreshBaseline(ResourceControllerDataProvider clusterData,
Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String, Resource> resourceMap,
Expand Down Expand Up @@ -414,23 +454,6 @@ private ResourceChangeDetector getChangeDetector() {
return CHANGE_DETECTOR_THREAD_LOCAL.get();
}

// Generate a new IdealState based on the input newAssignment.
// The assignment will be propagate to the preference lists.
// Note that we will recalculate the states based on the current state, so there is no need to
// update the mapping fields in the IdealState output.
private IdealState generateIdealStateWithAssignment(String resourceName,
IdealState currentIdealState, ResourceAssignment newAssignment,
Map<String, Integer> statePriorityMap) {
IdealState newIdealState = new IdealState(resourceName);
// Copy the simple fields
newIdealState.getRecord().setSimpleFields(currentIdealState.getRecord().getSimpleFields());
// Sort the preference list according to state priority.
newIdealState.setPreferenceLists(getPreferenceLists(newAssignment, statePriorityMap));
// Note the state mapping in the new assignment won't be directly propagate to the map fields.
// The rebalancer will calculate for the final state mapping considering the current states.
return newIdealState;
}

// Generate the preference lists from the state mapping based on state priority.
private Map<String, List<String>> getPreferenceLists(ResourceAssignment newAssignment,
Map<String, Integer> statePriorityMap) {
Expand Down Expand Up @@ -488,9 +511,6 @@ private Map<String, ResourceAssignment> getBaselineAssignment(
stateReadLatency.startMeasuringLatency();
currentBaseline = assignmentMetadataStore.getBaseline();
stateReadLatency.endMeasuringLatency();
} catch (HelixException ex) {
// Report error. and use empty mapping instead.
LOG.error("Failed to get the current baseline assignment.", ex);
} catch (Exception ex) {
throw new HelixRebalanceException(
"Failed to get the current baseline assignment because of unexpected error.",
Expand All @@ -501,6 +521,7 @@ private Map<String, ResourceAssignment> getBaselineAssignment(
LOG.warn("The current baseline assignment record is empty. Use the current states instead.");
currentBaseline = getCurrentStateAssingment(currentStateOutput, resources);
}
currentBaseline.keySet().retainAll(resources);
return currentBaseline;
}

Expand All @@ -524,9 +545,6 @@ private Map<String, ResourceAssignment> getBestPossibleAssignment(
stateReadLatency.startMeasuringLatency();
currentBestAssignment = assignmentMetadataStore.getBestPossibleAssignment();
stateReadLatency.endMeasuringLatency();
} catch (HelixException ex) {
// Report error. and use empty mapping instead.
LOG.error("Failed to get the current best possible assignment.", ex);
} catch (Exception ex) {
throw new HelixRebalanceException(
"Failed to get the current best possible assignment because of unexpected error.",
Expand All @@ -538,6 +556,7 @@ private Map<String, ResourceAssignment> getBestPossibleAssignment(
"The current best possible assignment record is empty. Use the current states instead.");
currentBestAssignment = getCurrentStateAssingment(currentStateOutput, resources);
}
currentBestAssignment.keySet().retainAll(resources);
return currentBestAssignment;
}

Expand Down Expand Up @@ -593,39 +612,39 @@ private void delayedRebalanceSchedule(ResourceControllerDataProvider clusterData
* @param clusterData the cluster data cache.
* @param resourceMap the rebalanaced resource map.
* @param clusterChanges the detected cluster changes that triggeres the rebalance.
* @param resourceStatePriorityMap the state priority map for each resource.
* @param baseline the baseline assignment
*/
private void applyRebalanceOverwrite(Map<String, IdealState> idealStateMap,
ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
Map<HelixConstants.ChangeType, Set<String>> clusterChanges,
Map<String, Map<String, Integer>> resourceStatePriorityMap,
Map<String, ResourceAssignment> baseline) throws HelixRebalanceException {
Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
// Note that the calculation used the baseline as the input only. This is for minimizing
// unnecessary partition movement.
Map<String, ResourceAssignment> activeAssignment = calculateAssignment(clusterData,
clusterChanges, resourceMap, enabledLiveInstances, Collections.emptyMap(), baseline);
Map<String, IdealState> activeIdealStates = convertResourceAssignment(clusterData,
calculateAssignment(clusterData, clusterChanges, resourceMap, enabledLiveInstances,
Collections.emptyMap(), baseline));
for (String resourceName : idealStateMap.keySet()) {
IdealState is = idealStateMap.get(resourceName);
if (!activeAssignment.containsKey(resourceName)) {
// The new calculated ideal state before overwrite
IdealState newIdealState = idealStateMap.get(resourceName);
if (!activeIdealStates.containsKey(resourceName)) {
throw new HelixRebalanceException(
"Failed to calculate the complete partition assignment with all active nodes. Cannot find the resource assignment for "
+ resourceName,
HelixRebalanceException.Type.FAILED_TO_CALCULATE);
+ resourceName, HelixRebalanceException.Type.FAILED_TO_CALCULATE);
}
// The ideal state that is calculated based on the real alive/enabled instances list
IdealState newActiveIdealState = activeIdealStates.get(resourceName);
// The current ideal state that exists in the IdealState znode
IdealState currentIdealState = clusterData.getIdealState(resourceName);
IdealState newActiveIdealState =
generateIdealStateWithAssignment(resourceName, currentIdealState,
activeAssignment.get(resourceName), resourceStatePriorityMap.get(resourceName));

int numReplia = currentIdealState.getReplicaCount(enabledLiveInstances.size());
int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(currentIdealState, numReplia);
Map<String, List<String>> finalPreferenceLists =
DelayedRebalanceUtil.getFinalDelayedMapping(newActiveIdealState.getPreferenceLists(),
is.getPreferenceLists(), enabledLiveInstances, Math.min(minActiveReplica, numReplia));

is.setPreferenceLists(finalPreferenceLists);
int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size());
int minActiveReplica =
DelayedRebalanceUtil.getMinActiveReplica(currentIdealState, numReplica);
Map<String, List<String>> finalPreferenceLists = DelayedRebalanceUtil
.getFinalDelayedMapping(newActiveIdealState.getPreferenceLists(),
newIdealState.getPreferenceLists(), enabledLiveInstances,
Math.min(minActiveReplica, numReplica));

newIdealState.setPreferenceLists(finalPreferenceLists);
}
}

Expand All @@ -641,4 +660,8 @@ private void applyUserDefinedPreferenceList(ResourceConfig resourceConfig,
}
}
}

protected MetricCollector getMetricCollector() {
return _metricCollector;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
* @param <T> the type of the metric value
*/
public class SimpleDynamicMetric<T> extends DynamicMetric<T, T> {
private final String _metricName;
protected final String _metricName;

/**
* Instantiates a new Simple dynamic metric.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@
*/

import javax.management.JMException;

import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
import org.apache.helix.monitoring.metrics.implementation.RebalanceFailureCount;
import org.apache.helix.monitoring.metrics.implementation.RebalanceLatencyGauge;
import org.apache.helix.monitoring.metrics.model.CountMetric;
import org.apache.helix.monitoring.metrics.model.LatencyMetric;

public class WagedRebalancerMetricCollector extends MetricCollector {
Expand All @@ -38,7 +41,12 @@ public enum WagedRebalancerMetricNames {

// The following latency metrics are related to AssignmentMetadataStore
StateReadLatencyGauge,
StateWriteLatencyGauge
StateWriteLatencyGauge,

// Count of any rebalance compute failure.
// Note the rebalancer may still be able to return the last known-good assignment on a rebalance
// compute failure. And this fallback logic won't impact this counting.
RebalanceFailureCounter
}

public WagedRebalancerMetricCollector(String clusterName) throws JMException {
Expand All @@ -62,19 +70,26 @@ public WagedRebalancerMetricCollector() {
*/
private void createMetrics() {
// Define all metrics
LatencyMetric globalBaselineCalcLatencyGauge = new RebalanceLatencyGauge(
WagedRebalancerMetricNames.GlobalBaselineCalcLatencyGauge.name(), getResetIntervalInMs());
LatencyMetric partialRebalanceLatencyGauge = new RebalanceLatencyGauge(
WagedRebalancerMetricNames.PartialRebalanceLatencyGauge.name(), getResetIntervalInMs());
LatencyMetric stateReadLatencyGauge = new RebalanceLatencyGauge(
WagedRebalancerMetricNames.StateReadLatencyGauge.name(), getResetIntervalInMs());
LatencyMetric stateWriteLatencyGauge = new RebalanceLatencyGauge(
WagedRebalancerMetricNames.StateWriteLatencyGauge.name(), getResetIntervalInMs());
LatencyMetric globalBaselineCalcLatencyGauge =
new RebalanceLatencyGauge(WagedRebalancerMetricNames.GlobalBaselineCalcLatencyGauge.name(),
getResetIntervalInMs());
LatencyMetric partialRebalanceLatencyGauge =
new RebalanceLatencyGauge(WagedRebalancerMetricNames.PartialRebalanceLatencyGauge.name(),
getResetIntervalInMs());
LatencyMetric stateReadLatencyGauge =
new RebalanceLatencyGauge(WagedRebalancerMetricNames.StateReadLatencyGauge.name(),
getResetIntervalInMs());
LatencyMetric stateWriteLatencyGauge =
new RebalanceLatencyGauge(WagedRebalancerMetricNames.StateWriteLatencyGauge.name(),
getResetIntervalInMs());
CountMetric calcFailureCount =
new RebalanceFailureCount(WagedRebalancerMetricNames.RebalanceFailureCounter.name());

// Add metrics to WagedRebalancerMetricCollector
addMetric(globalBaselineCalcLatencyGauge);
addMetric(partialRebalanceLatencyGauge);
addMetric(stateReadLatencyGauge);
addMetric(stateWriteLatencyGauge);
addMetric(calcFailureCount);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.apache.helix.monitoring.metrics.implementation;

import org.apache.helix.monitoring.metrics.model.CountMetric;

public class RebalanceFailureCount extends CountMetric {
/**
* Instantiates a new Simple dynamic metric.
*
* @param metricName the metric name
*/
public RebalanceFailureCount(String metricName) {
super(metricName, 0L);
}

@Override
public void increaseCount(long count) {
updateValue(getValue() + count);
}
}

0 comments on commit eb78edc

Please sign in to comment.