Skip to content

Commit

Permalink
Add the remaining implementation of ConstraintBasedAlgorithmFactory (#…
Browse files Browse the repository at this point in the history
…478)

Implementation of ConstraintBasedAlgorithmFactory and the soft constraint weight model.
Remove SoftConstraintWeightModel class.
Get the rebalance preference and adjust the corresponding weight.
Pass the preference keys instead of cluster config.
  • Loading branch information
i3wangyi authored and Jiajun Wang committed Dec 12, 2019
1 parent fbfb893 commit 95a2e4e
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
*/

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -29,7 +28,6 @@
import java.util.Set;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixRebalanceException;
Expand All @@ -42,13 +40,17 @@
import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
import org.apache.helix.controller.rebalancer.waged.model.OptimalAssignment;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;

/**
* Weight-Aware Globally-Even Distribute Rebalancer.
* @see <a
Expand All @@ -62,10 +64,8 @@ public class WagedRebalancer {
// When any of the following change happens, the rebalancer needs to do a global rebalance which
// contains 1. baseline recalculate, 2. partial rebalance that is based on the new baseline.
private static final Set<HelixConstants.ChangeType> GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES =
Collections
.unmodifiableSet(new HashSet<>(Arrays.asList(HelixConstants.ChangeType.RESOURCE_CONFIG,
HelixConstants.ChangeType.CLUSTER_CONFIG,
HelixConstants.ChangeType.INSTANCE_CONFIG)));
ImmutableSet.of(HelixConstants.ChangeType.RESOURCE_CONFIG,
HelixConstants.ChangeType.CLUSTER_CONFIG, HelixConstants.ChangeType.INSTANCE_CONFIG);
// The cluster change detector is a stateful object.
// Make it static to avoid unnecessary reinitialization.
private static final ThreadLocal<ResourceChangeDetector> CHANGE_DETECTOR_THREAD_LOCAL =
Expand All @@ -79,13 +79,12 @@ public class WagedRebalancer {
private final RebalanceAlgorithm _rebalanceAlgorithm;
// ------------------------------------------------------------------------------------//

public WagedRebalancer(HelixManager helixManager) {
public WagedRebalancer(HelixManager helixManager,
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
this(
// TODO init the metadata store according to their requirement when integrate,
// or change to final static method if possible.
new AssignmentMetadataStore(helixManager),
// TODO parse the cluster setting
ConstraintBasedAlgorithmFactory.getInstance(),
new AssignmentMetadataStore(helixManager), ConstraintBasedAlgorithmFactory.getInstance(preferences),
// Use DelayedAutoRebalancer as the mapping calculator for the final assignment output.
// Mapping calculator will translate the best possible assignment into the applicable state
// mapping based on the current states.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,12 @@
class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
private static final Logger LOG = LoggerFactory.getLogger(ConstraintBasedAlgorithm.class);
private final List<HardConstraint> _hardConstraints;
private final List<SoftConstraint> _softConstraints;
private final SoftConstraintWeightModel _softConstraintsWeightModel;
private final Map<SoftConstraint, Float> _softConstraints;

ConstraintBasedAlgorithm(List<HardConstraint> hardConstraints,
List<SoftConstraint> softConstraints, SoftConstraintWeightModel softConstraintWeightModel) {
Map<SoftConstraint, Float> softConstraints) {
_hardConstraints = hardConstraints;
_softConstraints = softConstraints;
_softConstraintsWeightModel = softConstraintWeightModel;
}

@Override
Expand Down Expand Up @@ -115,13 +113,22 @@ private Optional<AssignableNode> getNodeWithHighestPoints(AssignableReplica repl
}

Function<AssignableNode, Float> calculatePoints =
(candidateNode) -> _softConstraintsWeightModel.getSumOfScores(_softConstraints.stream()
.collect(Collectors.toMap(Function.identity(), softConstraint -> softConstraint
.getAssignmentNormalizedScore(candidateNode, replica, clusterContext))));
(candidateNode) -> getAssignmentNormalizedScore(candidateNode, replica, clusterContext);

return candidateNodes.stream().max(Comparator.comparing(calculatePoints));
}

private float getAssignmentNormalizedScore(AssignableNode node, AssignableReplica replica,
ClusterContext clusterContext) {
float sum = 0;
for (Map.Entry<SoftConstraint, Float> softConstraintEntry : _softConstraints.entrySet()) {
SoftConstraint softConstraint = softConstraintEntry.getKey();
float weight = softConstraintEntry.getValue();
sum += weight * softConstraint.getAssignmentNormalizedScore(node, replica, clusterContext);
}
return sum;
}

private List<String> convertFailureReasons(List<HardConstraint> hardConstraints) {
return hardConstraints.stream().map(HardConstraint::getDescription)
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,41 @@
* under the License.
*/

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm;
import org.apache.helix.model.ClusterConfig;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

/**
* The factory class to create an instance of {@link ConstraintBasedAlgorithm}
*/
public class ConstraintBasedAlgorithmFactory {

// TODO: the parameter comes from cluster config, will tune how these 2 integers will change the
// soft constraint weight model
public static RebalanceAlgorithm getInstance() {
// TODO initialize constraints, depending on constraints implementations PRs
List<HardConstraint> hardConstraints = new ArrayList<>();
List<SoftConstraint> softConstraints = new ArrayList<>();
SoftConstraintWeightModel softConstraintWeightModel = new SoftConstraintWeightModel();
public static RebalanceAlgorithm getInstance(
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
List<HardConstraint> hardConstraints =
ImmutableList.of(new FaultZoneAwareConstraint(), new NodeCapacityConstraint(),
new ReplicaActivateConstraint(), new NodeMaxPartitionLimitConstraint(),
new ValidGroupTagConstraint(), new SamePartitionOnInstanceConstraint());

int evennessPreference =
preferences.getOrDefault(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, 1);
int movementPreference =
preferences.getOrDefault(ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT, 1);
float evennessRatio = (float) evennessPreference / (evennessPreference + movementPreference);
float movementRatio = (float) movementPreference / (evennessPreference + movementPreference);

Map<SoftConstraint, Float> softConstraints = ImmutableMap.<SoftConstraint, Float> builder()
.put(new PartitionMovementConstraint(), movementRatio)
.put(new InstancePartitionsCountConstraint(), 0.3f * evennessRatio)
.put(new ResourcePartitionAntiAffinityConstraint(), 0.1f * evennessRatio)
.put(new ResourceTopStateAntiAffinityConstraint(), 0.1f * evennessRatio)
.put(new MaxCapacityUsageInstanceConstraint(), 0.5f * evennessRatio).build();

return new ConstraintBasedAlgorithm(hardConstraints, softConstraints,
softConstraintWeightModel);
return new ConstraintBasedAlgorithm(hardConstraints, softConstraints);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.MaintenanceSignal;
Expand Down Expand Up @@ -121,7 +122,9 @@ private BestPossibleStateOutput compute(ClusterEvent event, Map<String, Resource
// configured to use the WAGED rebalancer.
// For the other resources, the legacy rebalancers will be triggered in the next step.
Map<String, IdealState> newIdealStates = new HashMap<>();
WagedRebalancer wagedRebalancer = new WagedRebalancer(helixManager);
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences = cache.getClusterConfig()
.getGlobalRebalancePreference();
WagedRebalancer wagedRebalancer = new WagedRebalancer(helixManager, preferences);
try {
newIdealStates
.putAll(wagedRebalancer.computeNewIdealStates(cache, resourceMap, currentStateOutput));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.testng.annotations.Test;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

public class TestConstraintBasedAlgorithm {
private ConstraintBasedAlgorithm _algorithm;
Expand All @@ -42,12 +43,11 @@ public class TestConstraintBasedAlgorithm {
public void beforeMethod() {
HardConstraint mockHardConstraint = mock(HardConstraint.class);
SoftConstraint mockSoftConstraint = mock(SoftConstraint.class);
SoftConstraintWeightModel mockSoftConstraintWeightModel = mock(SoftConstraintWeightModel.class);
when(mockHardConstraint.isAssignmentValid(any(), any(), any())).thenReturn(false);
when(mockSoftConstraint.getAssignmentNormalizedScore(any(), any(), any())).thenReturn(1.0f);

_algorithm = new ConstraintBasedAlgorithm(ImmutableList.of(mockHardConstraint),
ImmutableList.of(mockSoftConstraint), mockSoftConstraintWeightModel);
ImmutableMap.of(mockSoftConstraint, 1f));
}

@Test(expectedExceptions = HelixRebalanceException.class)
Expand All @@ -60,12 +60,10 @@ public void testCalculateNoValidAssignment() throws IOException, HelixRebalanceE
public void testCalculateWithValidAssignment() throws IOException, HelixRebalanceException {
HardConstraint mockHardConstraint = mock(HardConstraint.class);
SoftConstraint mockSoftConstraint = mock(SoftConstraint.class);
SoftConstraintWeightModel mockSoftConstraintWeightModel = mock(SoftConstraintWeightModel.class);
when(mockHardConstraint.isAssignmentValid(any(), any(), any())).thenReturn(true);
when(mockSoftConstraint.getAssignmentNormalizedScore(any(), any(), any())).thenReturn(1.0f);
when(mockSoftConstraintWeightModel.getSumOfScores(any())).thenReturn(1.0f);
_algorithm = new ConstraintBasedAlgorithm(ImmutableList.of(mockHardConstraint),
ImmutableList.of(mockSoftConstraint), mockSoftConstraintWeightModel);
ImmutableMap.of(mockSoftConstraint, 1f));
ClusterModel clusterModel = new ClusterModelTestHelper().getDefaultClusterModel();
OptimalAssignment optimalAssignment = _algorithm.calculate(clusterModel);

Expand Down

0 comments on commit 95a2e4e

Please sign in to comment.