Skip to content

Commit

Permalink
Asynchronously calculating the Baseline (apache#632)
Browse files Browse the repository at this point in the history
* Enable the Baseline calculation to be done asynchronously.

This will greatly speed up rebalancing. The WAGED rebalancer first runs a partial rebalance to recover invalid replica allocations (for example, replicas that are on a disabled instance). It then calculates the new baseline through a global rebalance.
  • Loading branch information
jiajunwang authored and huizhilu committed Aug 16, 2020
1 parent bbeb4a4 commit 451726f
Show file tree
Hide file tree
Showing 8 changed files with 229 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,17 @@ public Map<String, ResourceAssignment> getBestPossibleAssignment() {
return _bestPossibleAssignment;
}

public void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
/**
* @return true if a new baseline was persisted.
* @throws HelixException if the method failed to persist the baseline.
*/
// TODO: Enhance the return value so it is more intuitive to understand when the persist fails and
// TODO: when it is skipped.
public boolean persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
// TODO: Make the write async?
// If baseline hasn't changed, skip writing to metadata store
if (compareAssignments(_globalBaseline, globalBaseline)) {
return;
return false;
}
// Persist to ZK
HelixProperty combinedAssignments = combineAssignments(BASELINE_KEY, globalBaseline);
Expand All @@ -109,14 +115,21 @@ public void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {

// Update the in-memory reference
_globalBaseline = globalBaseline;
return true;
}

public void persistBestPossibleAssignment(
/**
* @return true if a new best possible assignment was persisted.
* @throws HelixException if the method failed to persist the best possible assignment.
*/
// TODO: Enhance the return value so it is more intuitive to understand when the persist fails and
// TODO: when it is skipped.
public boolean persistBestPossibleAssignment(
Map<String, ResourceAssignment> bestPossibleAssignment) {
// TODO: Make the write async?
// If bestPossibleAssignment hasn't changed, skip writing to metadata store
if (compareAssignments(_bestPossibleAssignment, bestPossibleAssignment)) {
return;
return false;
}
// Persist to ZK
HelixProperty combinedAssignments =
Expand All @@ -130,6 +143,7 @@ public void persistBestPossibleAssignment(

// Update the in-memory reference
_bestPossibleAssignment = bestPossibleAssignment;
return true;
}

protected void finalize() {
Expand Down Expand Up @@ -179,7 +193,7 @@ private Map<String, ResourceAssignment> splitAssignments(HelixProperty property)
* @param newAssignment
* @return true if they are the same. False otherwise or oldAssignment is null
*/
private boolean compareAssignments(Map<String, ResourceAssignment> oldAssignment,
protected boolean compareAssignments(Map<String, ResourceAssignment> oldAssignment,
Map<String, ResourceAssignment> newAssignment) {
// If oldAssignment is null, that means that we haven't read from/written to
// the metadata store yet. In that case, we return false so that we write to metadata store.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -54,6 +58,7 @@
import org.apache.helix.monitoring.metrics.implementation.BaselineDivergenceGauge;
import org.apache.helix.monitoring.metrics.model.CountMetric;
import org.apache.helix.monitoring.metrics.model.LatencyMetric;
import org.apache.helix.util.RebalanceUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -79,21 +84,25 @@ public class WagedRebalancer {
.of(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, -1,
ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT, -1);

// To calculate the baseline asynchronously
private final ExecutorService _baselineCalculateExecutor;
private final ResourceChangeDetector _changeDetector;
private final HelixManager _manager;
private final MappingCalculator<ResourceControllerDataProvider> _mappingCalculator;
private final AssignmentMetadataStore _assignmentMetadataStore;

private final MetricCollector _metricCollector;
private final CountMetric _rebalanceFailureCount;
private final CountMetric _globalBaselineCalcCounter;
private final LatencyMetric _globalBaselineCalcLatency;
private final CountMetric _baselineCalcCounter;
private final LatencyMetric _baselineCalcLatency;
private final LatencyMetric _writeLatency;
private final CountMetric _partialRebalanceCounter;
private final LatencyMetric _partialRebalanceLatency;
private final LatencyMetric _stateReadLatency;
private final BaselineDivergenceGauge _baselineDivergenceGauge;

private boolean _asyncGlobalRebalanceEnabled;

// Note, the rebalance algorithm field is mutable so it should not be directly referred except for
// the public method computeNewIdealStates.
private RebalanceAlgorithm _rebalanceAlgorithm;
Expand All @@ -109,7 +118,8 @@ private static AssignmentMetadataStore constructAssignmentStore(String metadataS
}

public WagedRebalancer(HelixManager helixManager,
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference) {
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference,
boolean isAsyncGlobalRebalanceEnabled) {
this(helixManager == null ? null
: constructAssignmentStore(helixManager.getMetadataStoreConnectionString(),
helixManager.getClusterName()), ConstraintBasedAlgorithmFactory.getInstance(preference),
Expand All @@ -127,7 +137,8 @@ public WagedRebalancer(HelixManager helixManager,
// CurrentState-based rebalancing. 2. Tests that require instrumenting the rebalancer for
// verifying whether the cluster has converged.
helixManager == null ? null
: new WagedRebalancerMetricCollector(helixManager.getClusterName()));
: new WagedRebalancerMetricCollector(helixManager.getClusterName()),
isAsyncGlobalRebalanceEnabled);
_preference = ImmutableMap.copyOf(preference);
}

Expand All @@ -140,7 +151,7 @@ public WagedRebalancer(HelixManager helixManager,
*/
protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
RebalanceAlgorithm algorithm) {
this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer(), null, null);
this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer(), null, null, false);
}

/**
Expand All @@ -152,12 +163,13 @@ protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
*/
protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
RebalanceAlgorithm algorithm, MetricCollector metricCollector) {
this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer(), null, metricCollector);
this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer(), null, metricCollector,
false);
}

private WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
RebalanceAlgorithm algorithm, MappingCalculator mappingCalculator, HelixManager manager,
MetricCollector metricCollector) {
MetricCollector metricCollector, boolean isAsyncGlobalRebalanceEnabled) {
if (assignmentMetadataStore == null) {
LOG.warn("Assignment Metadata Store is not configured properly."
+ " The rebalancer will not access the assignment store during the rebalance.");
Expand All @@ -174,10 +186,10 @@ private WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
_rebalanceFailureCount = _metricCollector.getMetric(
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceFailureCounter.name(),
CountMetric.class);
_globalBaselineCalcCounter = _metricCollector.getMetric(
_baselineCalcCounter = _metricCollector.getMetric(
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.GlobalBaselineCalcCounter.name(),
CountMetric.class);
_globalBaselineCalcLatency = _metricCollector.getMetric(
_baselineCalcLatency = _metricCollector.getMetric(
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.GlobalBaselineCalcLatencyGauge
.name(),
LatencyMetric.class);
Expand All @@ -199,23 +211,32 @@ private WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
BaselineDivergenceGauge.class);

_changeDetector = new ResourceChangeDetector(true);

_baselineCalculateExecutor = Executors.newSingleThreadExecutor();
_asyncGlobalRebalanceEnabled = isAsyncGlobalRebalanceEnabled;
}

/**
 * Updates whether the global rebalance (Baseline calculation) runs in asynchronous mode.
 * The stored flag is read by globalRebalance: when it is false, the caller waits for the
 * submitted Baseline calculation to finish; when it is true, the caller does not wait.
 *
 * @param isAsyncGlobalRebalanceEnabled true to enable the asynchronous mode, false to make the
 *                                      global rebalance synchronous
 */
public void setGlobalRebalanceAsyncMode(boolean isAsyncGlobalRebalanceEnabled) {
_asyncGlobalRebalanceEnabled = isAsyncGlobalRebalanceEnabled;
}

// Update the rebalancer preference configuration if the new preference is different from the
// current preference configuration.
public void updatePreference(
// Update the rebalancer preference if the new options are different from the current preference.
public synchronized void updateRebalancePreference(
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> newPreference) {
if (_preference.equals(NOT_CONFIGURED_PREFERENCE) || _preference.equals(newPreference)) {
// 1. if the preference was not configured during constructing, no need to update.
// 2. if the preference equals to the new preference, no need to update.
return;
// 1. if the preference was not configured during construction, no need to update.
// 2. if the preference equals the new preference, no need to update.
if (!_preference.equals(NOT_CONFIGURED_PREFERENCE) && !_preference.equals(newPreference)) {
_rebalanceAlgorithm = ConstraintBasedAlgorithmFactory.getInstance(newPreference);
_preference = ImmutableMap.copyOf(newPreference);
}
_rebalanceAlgorithm = ConstraintBasedAlgorithmFactory.getInstance(newPreference);
_preference = ImmutableMap.copyOf(newPreference);
}

// Release all the resources.
public void close() {
if (_baselineCalculateExecutor != null) {
_baselineCalculateExecutor.shutdownNow();
}
if (_assignmentMetadataStore != null) {
_assignmentMetadataStore.close();
}
Expand Down Expand Up @@ -295,7 +316,7 @@ public Map<String, IdealState> computeNewIdealStates(ResourceControllerDataProvi
return newIdealStates;
}

// Coordinate baseline recalculation and partial rebalance according to the cluster changes.
// Coordinate global rebalance and partial rebalance according to the cluster changes.
private Map<String, IdealState> computeBestPossibleStates(
ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
final CurrentStateOutput currentStateOutput, RebalanceAlgorithm algorithm)
Expand Down Expand Up @@ -413,28 +434,59 @@ private void globalRebalance(ResourceControllerDataProvider clusterData,
HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
}

calculateAndUpdateBaseline(clusterModel, algorithm);
final boolean waitForGlobalRebalance = !_asyncGlobalRebalanceEnabled;
final String clusterName = clusterData.getClusterName();
// Calculate the Baseline assignment for global rebalance.
Future<Boolean> result = _baselineCalculateExecutor.submit(() -> {
try {
// Note that we should schedule a new partial rebalance for a future rebalance pipeline if
// the planned partial rebalance in the current rebalance pipeline won't wait for the new
// baseline being calculated.
// So set shouldSchedulePartialRebalance to be !waitForGlobalRebalance
calculateAndUpdateBaseline(clusterModel, algorithm, !waitForGlobalRebalance, clusterName);
} catch (HelixRebalanceException e) {
LOG.error("Failed to calculate baseline assignment!", e);
return false;
}
return true;
});
if (waitForGlobalRebalance) {
try {
if (!result.get()) {
throw new HelixRebalanceException("Failed to calculate for the new Baseline.",
HelixRebalanceException.Type.FAILED_TO_CALCULATE);
}
} catch (InterruptedException | ExecutionException e) {
throw new HelixRebalanceException("Failed to execute new Baseline calculation.",
HelixRebalanceException.Type.FAILED_TO_CALCULATE, e);
}
}
}
}

/**
* Calculate and update the Baseline assignment
* @param clusterModel
* @param algorithm
* @param shouldSchedulePartialRebalance True if the call should trigger a following partial rebalance
* so the new Baseline could be applied to cluster.
* @param clusterName
* @throws HelixRebalanceException
*/
private void calculateAndUpdateBaseline(ClusterModel clusterModel, RebalanceAlgorithm algorithm)
private void calculateAndUpdateBaseline(ClusterModel clusterModel, RebalanceAlgorithm algorithm,
boolean shouldSchedulePartialRebalance, String clusterName)
throws HelixRebalanceException {
LOG.info("Start calculating the new baseline.");
_globalBaselineCalcCounter.increment(1L);
_globalBaselineCalcLatency.startMeasuringLatency();
_baselineCalcCounter.increment(1L);
_baselineCalcLatency.startMeasuringLatency();

boolean isBaselineChanged = false;
Map<String, ResourceAssignment> newBaseline = calculateAssignment(clusterModel, algorithm);
// Write the new baseline to metadata store
if (_assignmentMetadataStore != null) {
try {
_writeLatency.startMeasuringLatency();
_assignmentMetadataStore.persistBaseline(newBaseline);
isBaselineChanged = _assignmentMetadataStore.persistBaseline(newBaseline);
_writeLatency.endMeasuringLatency();
} catch (Exception ex) {
throw new HelixRebalanceException("Failed to persist the new baseline assignment.",
Expand All @@ -443,8 +495,13 @@ private void calculateAndUpdateBaseline(ClusterModel clusterModel, RebalanceAlgo
} else {
LOG.debug("Assignment Metadata Store is null. Skip persisting the baseline assignment.");
}
_globalBaselineCalcLatency.endMeasuringLatency();
LOG.info("Finish calculating the new baseline.");
_baselineCalcLatency.endMeasuringLatency();
LOG.info("Global baseline calculation completed and has been persisted into metadata store.");

if (isBaselineChanged && shouldSchedulePartialRebalance) {
LOG.info("Schedule a new rebalance after the new baseline calculation has finished.");
RebalanceUtil.scheduleOnDemandPipeline(clusterName, 0L, false);
}
}

private Map<String, ResourceAssignment> partialRebalance(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,17 @@ public BestPossibleStateCalcStage(WagedRebalancer wagedRebalancer) {
}

private WagedRebalancer getWagedRebalancer(HelixManager helixManager,
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences,
boolean isAsyncGlobalRebalanceEnabled) {
// Create WagedRebalancer instance if it hasn't been already initialized
if (_wagedRebalancer == null) {
_wagedRebalancer = new WagedRebalancer(helixManager, preferences);
_wagedRebalancer =
new WagedRebalancer(helixManager, preferences, isAsyncGlobalRebalanceEnabled);
} else {
// Since the preference can be updated at runtime, try to update the algorithm preference
// before returning the rebalancer.
_wagedRebalancer.updatePreference(preferences);
// Since the rebalance configuration can be updated at runtime, try to update the rebalancer
// before returning.
_wagedRebalancer.updateRebalancePreference(preferences);
_wagedRebalancer.setGlobalRebalanceAsyncMode(isAsyncGlobalRebalanceEnabled);
}
return _wagedRebalancer;
}
Expand Down Expand Up @@ -281,8 +284,10 @@ private Map<String, Resource> computeResourceBestPossibleStateWithWagedRebalance

Map<String, IdealState> newIdealStates = new HashMap<>();

ClusterConfig clusterConfig = cache.getClusterConfig();
WagedRebalancer wagedRebalancer =
getWagedRebalancer(helixManager, cache.getClusterConfig().getGlobalRebalancePreference());
getWagedRebalancer(helixManager, clusterConfig.getGlobalRebalancePreference(),
clusterConfig.isGlobalRebalanceAsyncModeEnabled());
try {
newIdealStates.putAll(wagedRebalancer.computeNewIdealStates(cache, wagedRebalancedResourceMap,
currentStateOutput));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,16 +443,26 @@ private class ReadOnlyAssignmentMetadataStore extends AssignmentMetadataStore {
}

@Override
public void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
public boolean persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
// If baseline hasn't changed, skip writing to metadata store
if (compareAssignments(_globalBaseline, globalBaseline)) {
return false;
}
// Update the in-memory reference only
_globalBaseline = globalBaseline;
return true;
}

@Override
public void persistBestPossibleAssignment(
public boolean persistBestPossibleAssignment(
Map<String, ResourceAssignment> bestPossibleAssignment) {
// If bestPossibleAssignment hasn't changed, skip writing to metadata store
if (compareAssignments(_bestPossibleAssignment, bestPossibleAssignment)) {
return false;
}
// Update the in-memory reference only
_bestPossibleAssignment = bestPossibleAssignment;
return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,19 @@ public Map<String, ResourceAssignment> getBaseline() {
return _persistGlobalBaseline;
}

public void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
public boolean persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
_persistGlobalBaseline = globalBaseline;
return true;
}

/**
 * @return the value currently held in _persistBestPossibleAssignment, i.e. the map most
 *         recently handed to persistBestPossibleAssignment (which assigns that field).
 */
public Map<String, ResourceAssignment> getBestPossibleAssignment() {
return _persistBestPossibleAssignment;
}

public void persistBestPossibleAssignment(
public boolean persistBestPossibleAssignment(
Map<String, ResourceAssignment> bestPossibleAssignment) {
_persistBestPossibleAssignment = bestPossibleAssignment;
return true;
}

public void close() {
Expand Down
Loading

0 comments on commit 451726f

Please sign in to comment.