Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the remaining implementation of ConstraintBasedAlgorithmFactory #478

Merged
merged 6 commits into from
Sep 17, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
*/

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -29,7 +28,6 @@
import java.util.Set;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixRebalanceException;
Expand All @@ -42,13 +40,17 @@
import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
import org.apache.helix.controller.rebalancer.waged.model.OptimalAssignment;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;

/**
* Weight-Aware Globally-Even Distribute Rebalancer.
* @see <a
Expand All @@ -62,10 +64,8 @@ public class WagedRebalancer {
// When any of the following change happens, the rebalancer needs to do a global rebalance which
// contains 1. baseline recalculate, 2. partial rebalance that is based on the new baseline.
private static final Set<HelixConstants.ChangeType> GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES =
Collections
.unmodifiableSet(new HashSet<>(Arrays.asList(HelixConstants.ChangeType.RESOURCE_CONFIG,
HelixConstants.ChangeType.CLUSTER_CONFIG,
HelixConstants.ChangeType.INSTANCE_CONFIG)));
ImmutableSet.of(HelixConstants.ChangeType.RESOURCE_CONFIG,
HelixConstants.ChangeType.CLUSTER_CONFIG, HelixConstants.ChangeType.INSTANCE_CONFIG);
// The cluster change detector is a stateful object.
// Make it static to avoid unnecessary reinitialization.
private static final ThreadLocal<ResourceChangeDetector> CHANGE_DETECTOR_THREAD_LOCAL =
Expand All @@ -79,13 +79,12 @@ public class WagedRebalancer {
private final RebalanceAlgorithm _rebalanceAlgorithm;
// ------------------------------------------------------------------------------------//

public WagedRebalancer(HelixManager helixManager) {
public WagedRebalancer(HelixManager helixManager,
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
this(
// TODO init the metadata store according to their requirement when integrate,
// or change to final static method if possible.
new AssignmentMetadataStore(helixManager),
// TODO parse the cluster setting
ConstraintBasedAlgorithmFactory.getInstance(),
new AssignmentMetadataStore(helixManager), ConstraintBasedAlgorithmFactory.getInstance(preferences),
// Use DelayedAutoRebalancer as the mapping calculator for the final assignment output.
// Mapping calculator will translate the best possible assignment into the applicable state
// mapping based on the current states.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,12 @@
class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
private static final Logger LOG = LoggerFactory.getLogger(ConstraintBasedAlgorithm.class);
private final List<HardConstraint> _hardConstraints;
private final List<SoftConstraint> _softConstraints;
private final SoftConstraintWeightModel _softConstraintsWeightModel;
private final Map<SoftConstraint, Float> _softConstraints;

ConstraintBasedAlgorithm(List<HardConstraint> hardConstraints,
List<SoftConstraint> softConstraints, SoftConstraintWeightModel softConstraintWeightModel) {
Map<SoftConstraint, Float> softConstraints) {
_hardConstraints = hardConstraints;
_softConstraints = softConstraints;
_softConstraintsWeightModel = softConstraintWeightModel;
}

@Override
Expand Down Expand Up @@ -115,13 +113,22 @@ private Optional<AssignableNode> getNodeWithHighestPoints(AssignableReplica repl
}

Function<AssignableNode, Float> calculatePoints =
(candidateNode) -> _softConstraintsWeightModel.getSumOfScores(_softConstraints.stream()
.collect(Collectors.toMap(Function.identity(), softConstraint -> softConstraint
.getAssignmentNormalizedScore(candidateNode, replica, clusterContext))));
(candidateNode) -> getAssignmentNormalizedScore(candidateNode, replica, clusterContext);

return candidateNodes.stream().max(Comparator.comparing(calculatePoints));
}

private float getAssignmentNormalizedScore(AssignableNode node, AssignableReplica replica,
ClusterContext clusterContext) {
float sum = 0;
for (Map.Entry<SoftConstraint, Float> softConstraintEntry : _softConstraints.entrySet()) {
SoftConstraint softConstraint = softConstraintEntry.getKey();
float weight = softConstraintEntry.getValue();
sum += weight * softConstraint.getAssignmentNormalizedScore(node, replica, clusterContext);
}
return sum;
}

private List<String> convertFailureReasons(List<HardConstraint> hardConstraints) {
return hardConstraints.stream().map(HardConstraint::getDescription)
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,41 @@
* under the License.
*/

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm;
import org.apache.helix.model.ClusterConfig;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

/**
* The factory class to create an instance of {@link ConstraintBasedAlgorithm}
*/
public class ConstraintBasedAlgorithmFactory {

// TODO: the parameter comes from cluster config, will tune how these 2 integers will change the
// soft constraint weight model
public static RebalanceAlgorithm getInstance() {
// TODO initialize constraints, depending on constraints implementations PRs
List<HardConstraint> hardConstraints = new ArrayList<>();
List<SoftConstraint> softConstraints = new ArrayList<>();
SoftConstraintWeightModel softConstraintWeightModel = new SoftConstraintWeightModel();
public static RebalanceAlgorithm getInstance(
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
List<HardConstraint> hardConstraints =
ImmutableList.of(new FaultZoneAwareConstraint(), new NodeCapacityConstraint(),
new ReplicaActivateConstraint(), new NodeMaxPartitionLimitConstraint(),
new ValidGroupTagConstraint(), new SamePartitionOnInstanceConstraint());

int evennessPreference =
preferences.getOrDefault(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, 1);
int movementPreference =
preferences.getOrDefault(ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT, 1);
float evennessRatio = (float) evennessPreference / (evennessPreference + movementPreference);
float movementRatio = (float) movementPreference / (evennessPreference + movementPreference);

Map<SoftConstraint, Float> softConstraints = ImmutableMap.<SoftConstraint, Float> builder()
.put(new PartitionMovementConstraint(), movementRatio * 0.5f)
i3wangyi marked this conversation as resolved.
Show resolved Hide resolved
.put(new InstancePartitionsCountConstraint(), 0.5f * evennessRatio)
i3wangyi marked this conversation as resolved.
Show resolved Hide resolved
.put(new ResourcePartitionAntiAffinityConstraint(), 0.1f * evennessRatio)
.put(new ResourceTopStateAntiAffinityConstraint(), 0.1f * evennessRatio)
.put(new MaxCapacityUsageInstanceConstraint(), 0.25f * evennessRatio).build();

return new ConstraintBasedAlgorithm(hardConstraints, softConstraints,
softConstraintWeightModel);
return new ConstraintBasedAlgorithm(hardConstraints, softConstraints);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.MaintenanceSignal;
Expand Down Expand Up @@ -121,7 +122,9 @@ private BestPossibleStateOutput compute(ClusterEvent event, Map<String, Resource
// configured to use the WAGED rebalancer.
// For the other resources, the legacy rebalancers will be triggered in the next step.
Map<String, IdealState> newIdealStates = new HashMap<>();
WagedRebalancer wagedRebalancer = new WagedRebalancer(helixManager);
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences = cache.getClusterConfig()
.getGlobalRebalancePreference();
WagedRebalancer wagedRebalancer = new WagedRebalancer(helixManager, preferences);
try {
newIdealStates
.putAll(wagedRebalancer.computeNewIdealStates(cache, resourceMap, currentStateOutput));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.testng.annotations.Test;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

public class TestConstraintBasedAlgorithm {
private ConstraintBasedAlgorithm _algorithm;
Expand All @@ -42,12 +43,11 @@ public class TestConstraintBasedAlgorithm {
public void beforeMethod() {
HardConstraint mockHardConstraint = mock(HardConstraint.class);
SoftConstraint mockSoftConstraint = mock(SoftConstraint.class);
SoftConstraintWeightModel mockSoftConstraintWeightModel = mock(SoftConstraintWeightModel.class);
when(mockHardConstraint.isAssignmentValid(any(), any(), any())).thenReturn(false);
when(mockSoftConstraint.getAssignmentNormalizedScore(any(), any(), any())).thenReturn(1.0f);

_algorithm = new ConstraintBasedAlgorithm(ImmutableList.of(mockHardConstraint),
ImmutableList.of(mockSoftConstraint), mockSoftConstraintWeightModel);
ImmutableMap.of(mockSoftConstraint, 1f));
}

@Test(expectedExceptions = HelixRebalanceException.class)
Expand All @@ -60,12 +60,10 @@ public void testCalculateNoValidAssignment() throws IOException, HelixRebalanceE
public void testCalculateWithValidAssignment() throws IOException, HelixRebalanceException {
HardConstraint mockHardConstraint = mock(HardConstraint.class);
SoftConstraint mockSoftConstraint = mock(SoftConstraint.class);
SoftConstraintWeightModel mockSoftConstraintWeightModel = mock(SoftConstraintWeightModel.class);
when(mockHardConstraint.isAssignmentValid(any(), any(), any())).thenReturn(true);
when(mockSoftConstraint.getAssignmentNormalizedScore(any(), any(), any())).thenReturn(1.0f);
when(mockSoftConstraintWeightModel.getSumOfScores(any())).thenReturn(1.0f);
_algorithm = new ConstraintBasedAlgorithm(ImmutableList.of(mockHardConstraint),
ImmutableList.of(mockSoftConstraint), mockSoftConstraintWeightModel);
ImmutableMap.of(mockSoftConstraint, 1f));
ClusterModel clusterModel = new ClusterModelTestHelper().getDefaultClusterModel();
OptimalAssignment optimalAssignment = _algorithm.calculate(clusterModel);

Expand Down