Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce Dry-run Waged Rebalancer for the verifiers and tests. #573

Merged
merged 6 commits into from
Nov 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.HashMap;
import java.util.Map;

import com.google.common.annotations.VisibleForTesting;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.helix.BucketDataAccessor;
Expand All @@ -35,6 +34,7 @@
import org.apache.helix.manager.zk.ZkBucketDataAccessor;
import org.apache.helix.model.ResourceAssignment;


/**
* A placeholder before we have the real assignment metadata store.
*/
Expand All @@ -49,14 +49,14 @@ public class AssignmentMetadataStore {
private BucketDataAccessor _dataAccessor;
private String _baselinePath;
private String _bestPossiblePath;
private Map<String, ResourceAssignment> _globalBaseline;
private Map<String, ResourceAssignment> _bestPossibleAssignment;
protected Map<String, ResourceAssignment> _globalBaseline;
protected Map<String, ResourceAssignment> _bestPossibleAssignment;

AssignmentMetadataStore(String metadataStoreAddrs, String clusterName) {
this(new ZkBucketDataAccessor(metadataStoreAddrs), clusterName);
}

AssignmentMetadataStore(BucketDataAccessor bucketDataAccessor, String clusterName) {
protected AssignmentMetadataStore(BucketDataAccessor bucketDataAccessor, String clusterName) {
_dataAccessor = bucketDataAccessor;
_baselinePath = String.format(BASELINE_TEMPLATE, clusterName, ASSIGNMENT_METADATA_KEY);
_bestPossiblePath = String.format(BEST_POSSIBLE_TEMPLATE, clusterName, ASSIGNMENT_METADATA_KEY);
Expand Down Expand Up @@ -153,8 +153,8 @@ private HelixProperty combineAssignments(String name,
HelixProperty property = new HelixProperty(name);
// Add each resource's assignment as a simple field in one ZNRecord
// Note that Arrays.toString() must not be used for the record conversion; deserialization would fail.
assignmentMap.forEach((resource, assignment) -> property.getRecord().setSimpleField(resource,
new String(SERIALIZER.serialize(assignment.getRecord()))));
assignmentMap.forEach((resource, assignment) -> property.getRecord()
.setSimpleField(resource, new String(SERIALIZER.serialize(assignment.getRecord()))));
return property;
}

Expand All @@ -167,8 +167,8 @@ private Map<String, ResourceAssignment> splitAssignments(HelixProperty property)
Map<String, ResourceAssignment> assignmentMap = new HashMap<>();
// Convert each resource's assignment String into a ResourceAssignment object and put it in a
// map
property.getRecord().getSimpleFields()
.forEach((resource, assignmentStr) -> assignmentMap.put(resource,
property.getRecord().getSimpleFields().forEach((resource, assignmentStr) -> assignmentMap
.put(resource,
new ResourceAssignment((ZNRecord) SERIALIZER.deserialize(assignmentStr.getBytes()))));
return assignmentMap;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Set;
import java.util.stream.Collectors;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixManager;
Expand Down Expand Up @@ -70,58 +71,70 @@ public class WagedRebalancer {
// When any of the following changes happens, the rebalancer needs to do a global rebalance, which
// consists of 1. baseline recalculation, 2. partial rebalance that is based on the new baseline.
private static final Set<HelixConstants.ChangeType> GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES =
ImmutableSet.of(HelixConstants.ChangeType.RESOURCE_CONFIG,
HelixConstants.ChangeType.IDEAL_STATE,
HelixConstants.ChangeType.CLUSTER_CONFIG, HelixConstants.ChangeType.INSTANCE_CONFIG);
ImmutableSet
.of(HelixConstants.ChangeType.RESOURCE_CONFIG, HelixConstants.ChangeType.IDEAL_STATE,
HelixConstants.ChangeType.CLUSTER_CONFIG, HelixConstants.ChangeType.INSTANCE_CONFIG);
// To identify if the preference has been configured or not.
private static final Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer>
NOT_CONFIGURED_PREFERENCE = ImmutableMap
.of(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, -1,
ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT, -1);

private final ResourceChangeDetector _changeDetector;
private final HelixManager _manager;
private final MappingCalculator<ResourceControllerDataProvider> _mappingCalculator;
private final AssignmentMetadataStore _assignmentMetadataStore;
private final RebalanceAlgorithm _rebalanceAlgorithm;
private MetricCollector _metricCollector;

private static AssignmentMetadataStore constructAssignmentStore(HelixManager helixManager) {
AssignmentMetadataStore assignmentMetadataStore = null;
if (helixManager != null) {
String metadataStoreAddrs = helixManager.getMetadataStoreConnectionString();
String clusterName = helixManager.getClusterName();
if (metadataStoreAddrs != null && clusterName != null) {
assignmentMetadataStore = new AssignmentMetadataStore(metadataStoreAddrs, clusterName);
}
private final MetricCollector _metricCollector;
private RebalanceAlgorithm _rebalanceAlgorithm;
private Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> _preference =
NOT_CONFIGURED_PREFERENCE;

private static AssignmentMetadataStore constructAssignmentStore(String metadataStoreAddrs,
String clusterName) {
if (metadataStoreAddrs != null && clusterName != null) {
return new AssignmentMetadataStore(metadataStoreAddrs, clusterName);
}
return assignmentMetadataStore;
return null;
}

public WagedRebalancer(HelixManager helixManager,
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences,
MetricCollector metricCollector) {
this(constructAssignmentStore(helixManager),
ConstraintBasedAlgorithmFactory.getInstance(preferences),
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference) {
this(helixManager == null ? null
: constructAssignmentStore(helixManager.getMetadataStoreConnectionString(),
helixManager.getClusterName()), ConstraintBasedAlgorithmFactory.getInstance(preference),
// Use DelayedAutoRebalancer as the mapping calculator for the final assignment output.
// Mapping calculator will translate the best possible assignment into the applicable state
// mapping based on the current states.
// TODO abstract and separate the main mapping calculator logic from DelayedAutoRebalancer
new DelayedAutoRebalancer(),
// Helix Manager is required for the rebalancer scheduler
helixManager, metricCollector);
helixManager,
// If HelixManager is null, we just pass in null for MetricCollector so that a
// non-functioning WagedRebalancerMetricCollector would be created in WagedRebalancer's
// constructor. This is to handle two cases: 1. HelixManager is null for non-testing cases -
// in this case, WagedRebalancer will not read/write to metadata store and just use
// CurrentState-based rebalancing. 2. Tests that require instrumenting the rebalancer for
// verifying whether the cluster has converged.
helixManager == null ? null
: new WagedRebalancerMetricCollector(helixManager.getClusterName()));
_preference = ImmutableMap.copyOf(preference);
}

/**
* This constructor will use null for HelixManager and MetricCollector. With null HelixManager,
* the rebalancer will rebalance solely based on CurrentStates. With null MetricCollector, the
* the rebalancer will not schedule for a future delayed rebalance. With null MetricCollector, the
* rebalancer will not emit JMX metrics.
* @param assignmentMetadataStore
* @param algorithm
* @param mappingCalculator
*/
protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
RebalanceAlgorithm algorithm, MappingCalculator mappingCalculator) {
this(assignmentMetadataStore, algorithm, mappingCalculator, null, null);
RebalanceAlgorithm algorithm) {
this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer(), null, null);
}

/**
* This constructor will use null for HelixManager and MetricCollector. With null HelixManager,
* the rebalancer will rebalance solely based on CurrentStates.
* This constructor will use null for HelixManager. With null HelixManager, the rebalancer will
* not schedule for a future delayed rebalance.
* @param assignmentMetadataStore
* @param algorithm
* @param metricCollector
Expand Down Expand Up @@ -149,11 +162,25 @@ private WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
_changeDetector = new ResourceChangeDetector(true);
}

/**
 * Rebuilds the rebalance algorithm from {@code newPreference} and records it as the current
 * preference.
 * <p>
 * The call is a no-op when either of the following holds:
 * 1. the current preference still equals NOT_CONFIGURED_PREFERENCE, i.e. no preference was
 * configured while constructing this rebalancer;
 * 2. the current preference already equals {@code newPreference}.
 * @param newPreference the preference configuration to switch to
 */
public void updatePreference(
    Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> newPreference) {
  boolean configuredAtConstruction = !_preference.equals(NOT_CONFIGURED_PREFERENCE);
  if (configuredAtConstruction && !_preference.equals(newPreference)) {
    // The algorithm is replaced first; the preference is recorded only afterwards.
    _rebalanceAlgorithm = ConstraintBasedAlgorithmFactory.getInstance(newPreference);
    _preference = ImmutableMap.copyOf(newPreference);
  }
}

/**
 * Releases all the resources held by this rebalancer: closes the assignment metadata store when
 * one is present, then unregisters the metric collector.
 */
public void close() {
  final AssignmentMetadataStore store = _assignmentMetadataStore;
  // The store is optional, so only close it when it exists.
  if (store != null) {
    store.close();
  }
  _metricCollector.unregister();
}

/**
Expand Down Expand Up @@ -231,9 +258,43 @@ public Map<String, IdealState> computeNewIdealStates(ResourceControllerDataProvi
}

// Coordinate baseline recalculation and partial rebalance according to the cluster changes.
protected Map<String, IdealState> computeBestPossibleStates(
private Map<String, IdealState> computeBestPossibleStates(
ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
final CurrentStateOutput currentStateOutput) throws HelixRebalanceException {
final CurrentStateOutput currentStateOutput)
throws HelixRebalanceException {
Set<String> activeNodes = DelayedRebalanceUtil
.getActiveNodes(clusterData.getAllInstances(), clusterData.getEnabledLiveInstances(),
clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
clusterData.getInstanceConfigMap(), clusterData.getClusterConfig());

// Schedule (or unschedule) delayed rebalance according to the delayed rebalance config.
delayedRebalanceSchedule(clusterData, activeNodes, resourceMap.keySet());

Map<String, IdealState> newIdealStates = convertResourceAssignment(clusterData,
computeBestPossibleAssignment(clusterData, resourceMap, activeNodes, currentStateOutput));

// The additional rebalance overwrite is required since the calculated mapping may contain
// some delayed rebalanced assignments.
if (!activeNodes.equals(clusterData.getEnabledLiveInstances())) {
applyRebalanceOverwrite(newIdealStates, clusterData, resourceMap,
getBaselineAssignment(_assignmentMetadataStore, currentStateOutput,
resourceMap.keySet()));
}
// Replace the assignment if user-defined preference list is configured.
// Note the user-defined list is intentionally applied to the final mapping after calculation.
// This is to avoid persisting it into the assignment store, which impacts the long term
// assignment evenness and partition movements.
newIdealStates.entrySet().stream().forEach(idealStateEntry -> applyUserDefinedPreferenceList(
clusterData.getResourceConfig(idealStateEntry.getKey()), idealStateEntry.getValue()));

return newIdealStates;
}

// Coordinate baseline recalculation and partial rebalance according to the cluster changes.
protected Map<String, ResourceAssignment> computeBestPossibleAssignment(
ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
Set<String> activeNodes, final CurrentStateOutput currentStateOutput)
throws HelixRebalanceException {
getChangeDetector().updateSnapshots(clusterData);
// Get all the changed items' information
Map<HelixConstants.ChangeType, Set<String>> clusterChanges =
Expand All @@ -257,37 +318,11 @@ protected Map<String, IdealState> computeBestPossibleStates(
refreshBaseline(clusterData, clusterChanges, resourceMap, currentStateOutput);
}

Set<String> activeNodes = DelayedRebalanceUtil.getActiveNodes(clusterData.getAllInstances(),
clusterData.getEnabledLiveInstances(), clusterData.getInstanceOfflineTimeMap(),
clusterData.getLiveInstances().keySet(), clusterData.getInstanceConfigMap(),
clusterData.getClusterConfig());

// Schedule (or unschedule) delayed rebalance according to the delayed rebalance config.
delayedRebalanceSchedule(clusterData, activeNodes, resourceMap.keySet());

// Perform partial rebalance
Map<String, ResourceAssignment> newAssignment =
partialRebalance(clusterData, clusterChanges, resourceMap, activeNodes, currentStateOutput);

Map<String, IdealState> finalIdealStateMap =
convertResourceAssignment(clusterData, newAssignment);

// The additional rebalance overwrite is required since the calculated mapping may contains
// some delayed rebalanced assignments.
if (!activeNodes.equals(clusterData.getEnabledLiveInstances())) {
applyRebalanceOverwrite(finalIdealStateMap, clusterData, resourceMap, clusterChanges,
getBaselineAssignment(_assignmentMetadataStore, currentStateOutput,
resourceMap.keySet()));
}
// Replace the assignment if user-defined preference list is configured.
// Note the user-defined list is intentionally applied to the final mapping after calculation.
// This is to avoid persisting it into the assignment store, which impacts the long term
// assignment evenness and partition movements.
finalIdealStateMap.entrySet().stream()
.forEach(idealStateEntry -> applyUserDefinedPreferenceList(
clusterData.getResourceConfig(idealStateEntry.getKey()), idealStateEntry.getValue()));

return finalIdealStateMap;
return newAssignment;
}

/**
Expand Down Expand Up @@ -503,7 +538,7 @@ private void validateInput(ResourceControllerDataProvider clusterData,
Set<String> nonCompatibleResources = resourceMap.entrySet().stream().filter(resourceEntry -> {
IdealState is = clusterData.getIdealState(resourceEntry.getKey());
return is == null || !is.getRebalanceMode().equals(IdealState.RebalanceMode.FULL_AUTO)
|| !getClass().getName().equals(is.getRebalancerClassName());
|| !WagedRebalancer.class.getName().equals(is.getRebalancerClassName());
}).map(Map.Entry::getKey).collect(Collectors.toSet());
if (!nonCompatibleResources.isEmpty()) {
throw new HelixRebalanceException(String.format(
Expand Down Expand Up @@ -554,7 +589,7 @@ private Map<String, ResourceAssignment> getBaselineAssignment(
* assignmentMetadataStore, return the current state assignment.
* @throws HelixRebalanceException
*/
private Map<String, ResourceAssignment> getBestPossibleAssignment(
protected Map<String, ResourceAssignment> getBestPossibleAssignment(
AssignmentMetadataStore assignmentMetadataStore, CurrentStateOutput currentStateOutput,
Set<String> resources) throws HelixRebalanceException {
Map<String, ResourceAssignment> currentBestAssignment = Collections.emptyMap();
Expand Down Expand Up @@ -614,18 +649,16 @@ private void delayedRebalanceSchedule(ResourceControllerDataProvider clusterData
* @param idealStateMap the calculated ideal states.
* @param clusterData the cluster data cache.
* @param resourceMap the rebalanced resource map.
* @param clusterChanges the detected cluster changes that triggeres the rebalance.
* @param baseline the baseline assignment
*/
private void applyRebalanceOverwrite(Map<String, IdealState> idealStateMap,
ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
Map<HelixConstants.ChangeType, Set<String>> clusterChanges,
Map<String, ResourceAssignment> baseline) throws HelixRebalanceException {
Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
// Note that the calculation used the baseline as the input only. This is for minimizing
// unnecessary partition movement.
Map<String, IdealState> activeIdealStates = convertResourceAssignment(clusterData,
calculateAssignment(clusterData, clusterChanges, resourceMap, enabledLiveInstances,
calculateAssignment(clusterData, Collections.emptyMap(), resourceMap, enabledLiveInstances,
Collections.emptyMap(), baseline));
for (String resourceName : idealStateMap.keySet()) {
// The new calculated ideal state before overwrite
Expand Down Expand Up @@ -664,6 +697,10 @@ private void applyUserDefinedPreferenceList(ResourceConfig resourceConfig,
}
}

/**
 * @return the assignment metadata store held by this rebalancer; callers should be prepared for
 *         null, since close() also null-checks this field before using it.
 */
protected AssignmentMetadataStore getAssignmentMetadataStore() {
  return this._assignmentMetadataStore;
}

/**
 * @return the metric collector held by this rebalancer.
 */
protected MetricCollector getMetricCollector() {
  return this._metricCollector;
}
Expand Down
Loading