Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asynchronously calculating the Baseline #632

Merged
merged 5 commits into from
Dec 21, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,14 @@ public Map<String, ResourceAssignment> getBestPossibleAssignment() {
return _bestPossibleAssignment;
}

public void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
/**
* @return true if a new baseline was persisted.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The java doc is incomplete.

  1. No mention of the input parameter
  2. parameter name is inconsistent with method name
  3. I feel a more generic persistAssignment(Map<String, ResourceAssignment> assignments) is sufficient. Keep the difference implemented elsewhere

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The parameter is very obvious, I think saying that I'm passing a global baseline assignment by a parameter named globalBaseline is too verbose.
It is debatable if we should name it baseline or global baseline. I'm fine with both. But it is not the key part of this PR, so I don't want to extend my change's scope. Please feel free to change it.

One generic method is not enough. Unless we add an enum to point which type of assignment it is. I'd prefer to keep this unchanged.

*/
public boolean persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
// TODO: Make the write async?
// If baseline hasn't changed, skip writing to metadata store
if (compareAssignments(_globalBaseline, globalBaseline)) {
jiajunwang marked this conversation as resolved.
Show resolved Hide resolved
return;
return false;
jiajunwang marked this conversation as resolved.
Show resolved Hide resolved
}
// Persist to ZK
HelixProperty combinedAssignments = combineAssignments(BASELINE_KEY, globalBaseline);
Expand All @@ -109,14 +112,18 @@ public void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {

// Update the in-memory reference
_globalBaseline = globalBaseline;
return true;
}

public void persistBestPossibleAssignment(
/**
* @return true if a new best possible assignment was persisted.
*/
public boolean persistBestPossibleAssignment(
Map<String, ResourceAssignment> bestPossibleAssignment) {
// TODO: Make the write async?
// If bestPossibleAssignment hasn't changed, skip writing to metadata store
if (compareAssignments(_bestPossibleAssignment, bestPossibleAssignment)) {
jiajunwang marked this conversation as resolved.
Show resolved Hide resolved
return;
return false;
}
// Persist to ZK
HelixProperty combinedAssignments =
Expand All @@ -130,6 +137,7 @@ public void persistBestPossibleAssignment(

// Update the in-memory reference
_bestPossibleAssignment = bestPossibleAssignment;
return true;
}

protected void finalize() {
Expand Down Expand Up @@ -179,7 +187,7 @@ private Map<String, ResourceAssignment> splitAssignments(HelixProperty property)
* @param newAssignment
* @return true if they are the same. False otherwise or oldAssignment is null
*/
private boolean compareAssignments(Map<String, ResourceAssignment> oldAssignment,
protected boolean compareAssignments(Map<String, ResourceAssignment> oldAssignment,
Map<String, ResourceAssignment> newAssignment) {
// If oldAssignment is null, that means that we haven't read from/written to
// the metadata store yet. In that case, we return false so that we write to metadata store.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -54,6 +58,7 @@
import org.apache.helix.monitoring.metrics.implementation.BaselineDivergenceGauge;
import org.apache.helix.monitoring.metrics.model.CountMetric;
import org.apache.helix.monitoring.metrics.model.LatencyMetric;
import org.apache.helix.util.RebalanceUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -79,6 +84,8 @@ public class WagedRebalancer {
.of(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, -1,
ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT, -1);

// To calculate the baseline asynchronously
private final ExecutorService _baselineCalculateExecutor;
private final ResourceChangeDetector _changeDetector;
private final HelixManager _manager;
private final MappingCalculator<ResourceControllerDataProvider> _mappingCalculator;
Expand All @@ -94,6 +101,8 @@ public class WagedRebalancer {
private final LatencyMetric _stateReadLatency;
private final BaselineDivergenceGauge _baselineDivergenceGauge;

private boolean _asyncBaselineCalculation;

// Note, the rebalance algorithm field is mutable so it should not be directly referred except for
// the public method computeNewIdealStates.
private RebalanceAlgorithm _rebalanceAlgorithm;
Expand All @@ -109,7 +118,8 @@ private static AssignmentMetadataStore constructAssignmentStore(String metadataS
}

public WagedRebalancer(HelixManager helixManager,
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference) {
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference,
boolean isAsyncGlobalRebalance) {
jiajunwang marked this conversation as resolved.
Show resolved Hide resolved
jiajunwang marked this conversation as resolved.
Show resolved Hide resolved
this(helixManager == null ? null
: constructAssignmentStore(helixManager.getMetadataStoreConnectionString(),
helixManager.getClusterName()), ConstraintBasedAlgorithmFactory.getInstance(preference),
Expand All @@ -127,7 +137,8 @@ public WagedRebalancer(HelixManager helixManager,
// CurrentState-based rebalancing. 2. Tests that require instrumenting the rebalancer for
// verifying whether the cluster has converged.
helixManager == null ? null
: new WagedRebalancerMetricCollector(helixManager.getClusterName()));
: new WagedRebalancerMetricCollector(helixManager.getClusterName()),
isAsyncGlobalRebalance);
_preference = ImmutableMap.copyOf(preference);
}

Expand All @@ -140,7 +151,7 @@ public WagedRebalancer(HelixManager helixManager,
*/
protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
RebalanceAlgorithm algorithm) {
this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer(), null, null);
this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer(), null, null, false);
}

/**
Expand All @@ -152,12 +163,13 @@ protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
*/
protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
RebalanceAlgorithm algorithm, MetricCollector metricCollector) {
this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer(), null, metricCollector);
this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer(), null, metricCollector,
false);
}

private WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
RebalanceAlgorithm algorithm, MappingCalculator mappingCalculator, HelixManager manager,
MetricCollector metricCollector) {
MetricCollector metricCollector, boolean asyncBaselineCalculation) {
if (assignmentMetadataStore == null) {
LOG.warn("Assignment Metadata Store is not configured properly."
+ " The rebalancer will not access the assignment store during the rebalance.");
Expand Down Expand Up @@ -199,23 +211,30 @@ private WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
BaselineDivergenceGauge.class);

_changeDetector = new ResourceChangeDetector(true);

_baselineCalculateExecutor = Executors.newSingleThreadExecutor();
_asyncBaselineCalculation = asyncBaselineCalculation;
}

// Update the rebalancer preference configuration if the new preference is different from the
// current preference configuration.
public void updatePreference(
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> newPreference) {
if (_preference.equals(NOT_CONFIGURED_PREFERENCE) || _preference.equals(newPreference)) {
// 1. if the preference was not configured during constructing, no need to update.
// 2. if the preference equals to the new preference, no need to update.
return;
// Update the rebalancer configuration if the new options are different from the current
// configuration.
public synchronized void updateRebalanceOptions(
jiajunwang marked this conversation as resolved.
Show resolved Hide resolved
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> newPreference,
boolean asyncBaselineCalculation) {
_asyncBaselineCalculation = asyncBaselineCalculation;
// 1. if the preference was not configured during constructing, no need to update.
// 2. if the preference equals to the new preference, no need to update.
if (!_preference.equals(NOT_CONFIGURED_PREFERENCE) && !_preference.equals(newPreference)) {
_rebalanceAlgorithm = ConstraintBasedAlgorithmFactory.getInstance(newPreference);
_preference = ImmutableMap.copyOf(newPreference);
}
_rebalanceAlgorithm = ConstraintBasedAlgorithmFactory.getInstance(newPreference);
_preference = ImmutableMap.copyOf(newPreference);
}

// Release all the resources.
public void close() {
if (_baselineCalculateExecutor != null) {
_baselineCalculateExecutor.shutdown();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will shut the pool down, but perhaps we should follow what Oracle recommends to shut down an ExecutorService?

 void shutdownAndAwaitTermination(ExecutorService pool) {
   pool.shutdown(); // Disable new tasks from being submitted
   try {
     // Wait a while for existing tasks to terminate
     if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
       pool.shutdownNow(); // Cancel currently executing tasks
       // Wait a while for tasks to respond to being cancelled
       if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
     }
   } catch (InterruptedException ie) {
     // (Re-)Cancel if current thread also interrupted
     pool.shutdownNow();
     // Preserve interrupt status
     Thread.currentThread().interrupt();
   }

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is to ensure the shutdown also cancel the ongoing runnable. My original thought is that we don't cancel the ongoing calculation. Since there are at most one in running state. So it should not take a long time. In addition, since the change detector already finds something so we started calculating a new baseline, it might be worthy to finish the process instead of just ignore those changes.
What do you think?

if (_assignmentMetadataStore != null) {
_assignmentMetadataStore.close();
}
Expand Down Expand Up @@ -413,28 +432,57 @@ private void globalRebalance(ResourceControllerDataProvider clusterData,
HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
}

calculateAndUpdateBaseline(clusterModel, algorithm);
final boolean waitForGlobalRebalance = !_asyncBaselineCalculation;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefer to use a consistent boolean variable, _waitForGlobalRebalance is sufficient without _asyncBaselineCalculation

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_waitForGlobalRebalance at the rebalancer level does not make much sense.
And we need to pass it into 2 methods with different values. To ensure these 2 methods get the same boolean value, we need to assign it to some final local variable.

The alternative way is localAsyncBoolean = _asyncBoolean;
But I feel changing the name a little bit would help itself explain.

final String clusterName = clusterData.getClusterName();
// Calculate the Baseline assignment for global rebalance.
jiajunwang marked this conversation as resolved.
Show resolved Hide resolved
Future<Boolean> result = _baselineCalculateExecutor.submit(() -> {
try {
// Note that we should schedule a new partial rebalance if the following calculation does
jiajunwang marked this conversation as resolved.
Show resolved Hide resolved
// not wait until the new baseline is calculated.
// So set doSchedulePartialRebalance to be !waitForGlobalRebalance
calculateAndUpdateBaseline(clusterModel, algorithm, !waitForGlobalRebalance, clusterName);
} catch (HelixRebalanceException e) {
LOG.error("Failed to calculate baseline assignment!", e);
return false;
jiajunwang marked this conversation as resolved.
Show resolved Hide resolved
}
return true;
jiajunwang marked this conversation as resolved.
Show resolved Hide resolved
});
if (waitForGlobalRebalance) {
try {
if (!result.get()) {
throw new HelixRebalanceException("Failed to calculate for the new Baseline.",
jiajunwang marked this conversation as resolved.
Show resolved Hide resolved
HelixRebalanceException.Type.FAILED_TO_CALCULATE);
}
} catch (InterruptedException | ExecutionException e) {
jiajunwang marked this conversation as resolved.
Show resolved Hide resolved
throw new HelixRebalanceException("Failed to execute new Baseline calculation.",
HelixRebalanceException.Type.FAILED_TO_CALCULATE, e);
}
}
}
}

/**
* Calculate and update the Baseline assignment
* @param clusterModel
* @param algorithm
* @param doSchedulePartialRebalance True if the call should trigger a following partial rebalance
jiajunwang marked this conversation as resolved.
Show resolved Hide resolved
* so the new Baseline could be applied to cluster.
* @param clusterName
* @throws HelixRebalanceException
*/
private void calculateAndUpdateBaseline(ClusterModel clusterModel, RebalanceAlgorithm algorithm)
private void calculateAndUpdateBaseline(ClusterModel clusterModel, RebalanceAlgorithm algorithm,
boolean doSchedulePartialRebalance, String clusterName)
throws HelixRebalanceException {
LOG.info("Start calculating the new baseline.");
_globalBaselineCalcCounter.increment(1L);
_globalBaselineCalcLatency.startMeasuringLatency();

boolean isbaselineUpdated = false;
jiajunwang marked this conversation as resolved.
Show resolved Hide resolved
Map<String, ResourceAssignment> newBaseline = calculateAssignment(clusterModel, algorithm);
// Write the new baseline to metadata store
if (_assignmentMetadataStore != null) {
try {
_writeLatency.startMeasuringLatency();
_assignmentMetadataStore.persistBaseline(newBaseline);
isbaselineUpdated = _assignmentMetadataStore.persistBaseline(newBaseline);
_writeLatency.endMeasuringLatency();
} catch (Exception ex) {
throw new HelixRebalanceException("Failed to persist the new baseline assignment.",
Expand All @@ -445,6 +493,11 @@ private void calculateAndUpdateBaseline(ClusterModel clusterModel, RebalanceAlgo
}
_globalBaselineCalcLatency.endMeasuringLatency();
LOG.info("Finish calculating the new baseline.");

if (isbaselineUpdated && doSchedulePartialRebalance) {
LOG.info("Schedule a new rebalance after the new baseline calculated.");
jiajunwang marked this conversation as resolved.
Show resolved Hide resolved
jiajunwang marked this conversation as resolved.
Show resolved Hide resolved
RebalanceUtil.scheduleOnDemandPipeline(clusterName, 0l, false);
jiajunwang marked this conversation as resolved.
Show resolved Hide resolved
}
}

private Map<String, ResourceAssignment> partialRebalance(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,15 @@ public BestPossibleStateCalcStage(WagedRebalancer wagedRebalancer) {
}

private WagedRebalancer getWagedRebalancer(HelixManager helixManager,
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences,
boolean isAsyncGlobalRebalance) {
jiajunwang marked this conversation as resolved.
Show resolved Hide resolved
// Create WagedRebalancer instance if it hasn't been already initialized
if (_wagedRebalancer == null) {
_wagedRebalancer = new WagedRebalancer(helixManager, preferences);
_wagedRebalancer = new WagedRebalancer(helixManager, preferences, isAsyncGlobalRebalance);
} else {
// Since the preference can be updated at runtime, try to update the algorithm preference
// before returning the rebalancer.
_wagedRebalancer.updatePreference(preferences);
// Since the rebalance configure can be updated at runtime, try to update the rebalancer
jiajunwang marked this conversation as resolved.
Show resolved Hide resolved
jiajunwang marked this conversation as resolved.
Show resolved Hide resolved
// before returning.
_wagedRebalancer.updateRebalanceOptions(preferences, isAsyncGlobalRebalance);
}
return _wagedRebalancer;
}
Expand Down Expand Up @@ -281,8 +282,10 @@ private Map<String, Resource> computeResourceBestPossibleStateWithWagedRebalance

Map<String, IdealState> newIdealStates = new HashMap<>();

ClusterConfig clusterConfig = cache.getClusterConfig();
WagedRebalancer wagedRebalancer =
getWagedRebalancer(helixManager, cache.getClusterConfig().getGlobalRebalancePreference());
getWagedRebalancer(helixManager, clusterConfig.getGlobalRebalancePreference(),
clusterConfig.isGlobalRebalanceAsyncModeEnabled());
jiajunwang marked this conversation as resolved.
Show resolved Hide resolved
try {
newIdealStates.putAll(wagedRebalancer.computeNewIdealStates(cache, wagedRebalancedResourceMap,
currentStateOutput));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,16 +443,26 @@ private class ReadOnlyAssignmentMetadataStore extends AssignmentMetadataStore {
}

@Override
public void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
public boolean persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
// If baseline hasn't changed, skip writing to metadata store
if (compareAssignments(_globalBaseline, globalBaseline)) {
return false;
}
// Update the in-memory reference only
_globalBaseline = globalBaseline;
return true;
}

@Override
public void persistBestPossibleAssignment(
public boolean persistBestPossibleAssignment(
Map<String, ResourceAssignment> bestPossibleAssignment) {
// If bestPossibleAssignment hasn't changed, skip writing to metadata store
if (compareAssignments(_bestPossibleAssignment, bestPossibleAssignment)) {
return false;
}
// Update the in-memory reference only
_bestPossibleAssignment = bestPossibleAssignment;
return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,19 @@ public Map<String, ResourceAssignment> getBaseline() {
return _persistGlobalBaseline;
}

public void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
public boolean persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
_persistGlobalBaseline = globalBaseline;
return true;
jiajunwang marked this conversation as resolved.
Show resolved Hide resolved
}

public Map<String, ResourceAssignment> getBestPossibleAssignment() {
return _persistBestPossibleAssignment;
}

public void persistBestPossibleAssignment(
public boolean persistBestPossibleAssignment(
Map<String, ResourceAssignment> bestPossibleAssignment) {
_persistBestPossibleAssignment = bestPossibleAssignment;
return true;
}

public void close() {
Expand Down
Loading