From 75638e2dd6c91cb5eff0872cb0e2b1e9f4fac80d Mon Sep 17 00:00:00 2001 From: Yi Wang Date: Tue, 10 Sep 2019 15:37:15 -0700 Subject: [PATCH] Implement one of the soft constraints (#450) Implement the Instance Partitions Count soft constraint. Evaluate by the instance's current partition count versus the estimated max partition count. Intuitively, encourage the assignment if the instance's occupancy rate is below average; discourage the assignment if the instance's occupancy rate is above average. The final normalized score will be within [0, 1]. The implementation of the class depends on the cluster's current total partition count as the max score. --- .../constraints/ConstraintBasedAlgorithm.java | 2 +- .../InstancePartitionsCountConstraint.java | 47 +++++++++++++++ .../LeastPartitionCountConstraint.java | 53 ---------------- .../waged/constraints/SoftConstraint.java | 40 +++++++++---- .../SoftConstraintWeightModel.java | 12 ++-- .../waged/model/ClusterContext.java | 11 ++-- .../TestConstraintBasedAlgorithm.java | 4 +- ...TestInstancePartitionsCountConstraint.java | 60 +++++++++++++++++++ .../TestSoftConstraintNormalizeFunction.java | 47 +++++++++++++++ 9 files changed, 196 insertions(+), 80 deletions(-) create mode 100644 helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/InstancePartitionsCountConstraint.java delete mode 100644 helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/LeastPartitionCountConstraint.java create mode 100644 helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestInstancePartitionsCountConstraint.java create mode 100644 helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestSoftConstraintNormalizeFunction.java diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java index 99d8d2aa77..479fb78d1f 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java @@ -117,7 +117,7 @@ private Optional getNodeWithHighestPoints(AssignableReplica repl Function calculatePoints = (candidateNode) -> _softConstraintsWeightModel.getSumOfScores(_softConstraints.stream() .collect(Collectors.toMap(Function.identity(), softConstraint -> softConstraint - .getAssignmentOriginScore(candidateNode, replica, clusterContext)))); + .getAssignmentNormalizedScore(candidateNode, replica, clusterContext)))); return candidateNodes.stream().max(Comparator.comparing(calculatePoints)); } diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/InstancePartitionsCountConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/InstancePartitionsCountConstraint.java new file mode 100644 index 0000000000..ca05cf8429 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/InstancePartitionsCountConstraint.java @@ -0,0 +1,47 @@ +package org.apache.helix.controller.rebalancer.waged.constraints; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.controller.rebalancer.waged.model.AssignableNode; +import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica; +import org.apache.helix.controller.rebalancer.waged.model.ClusterContext; + +/** + * Evaluate by instance's current partition count versus estimated max partition count + * Intuitively, Encourage the assignment if the instance's occupancy rate is below average; + * Discourage the assignment if the instance's occupancy rate is above average + */ +class InstancePartitionsCountConstraint extends SoftConstraint { + private static final float MAX_SCORE = 1f; + private static final float MIN_SCORE = 0f; + + InstancePartitionsCountConstraint() { + super(MAX_SCORE, MIN_SCORE); + } + + @Override + protected float getAssignmentScore(AssignableNode node, AssignableReplica replica, + ClusterContext clusterContext) { + float doubleEstimatedMaxPartitionCount = 2 * clusterContext.getEstimatedMaxPartitionCount(); + float currentPartitionCount = node.getAssignedReplicaCount(); + return Math.max((doubleEstimatedMaxPartitionCount - currentPartitionCount) + / doubleEstimatedMaxPartitionCount, 0); + } +} diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/LeastPartitionCountConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/LeastPartitionCountConstraint.java deleted file mode 100644 index a8d36dbc28..0000000000 --- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/LeastPartitionCountConstraint.java +++ /dev/null @@ -1,53 +0,0 @@ -package org.apache.helix.controller.rebalancer.waged.constraints; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import org.apache.helix.controller.rebalancer.waged.model.AssignableNode; -import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica; -import org.apache.helix.controller.rebalancer.waged.model.ClusterContext; - -/** - * Evaluate the proposed assignment according to the instance's partition count. - */ -class LeastPartitionCountConstraint extends SoftConstraint { - static LeastPartitionCountConstraint INSTANCE = new LeastPartitionCountConstraint(); - - private LeastPartitionCountConstraint() { - } - - /** - * Returns a score depending on the number of assignments on this node. The score is scaled evenly - * between the minScore and maxScore. - * When the node is idle, return with the maxScore. - * When the node usage reaches the estimated max partition, return with (minScore + maxScore ) / - * 2. - * When the node usage reaches 2 * estimated_max or more, return with the minScore. 
- * If the estimated max partition count is not set, it defaults to Integer.MAX_VALUE in - * clusterContext. - */ - @Override - float getAssignmentOriginScore(AssignableNode node, AssignableReplica replica, - ClusterContext clusterContext) { - throw new UnsupportedOperationException("The POC implementation has a bug, will fix it as TODO"); -// float doubleMaxPartitionCount = 2.0f * clusterContext.getEstimatedMaxPartitionCount(); -// int curPartitionCount = node.getCurrentAssignmentCount(); -// return Math.max((doubleMaxPartitionCount - curPartitionCount) / doubleMaxPartitionCount, 0); - } -} diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/SoftConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/SoftConstraint.java index db145fedd4..0f2bdbcf37 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/SoftConstraint.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/SoftConstraint.java @@ -24,14 +24,15 @@ import org.apache.helix.controller.rebalancer.waged.model.ClusterContext; /** - * Evaluate a partition allocation proposal and return a score within the normalized range. - * A higher score means the proposal is more preferred. + * The "soft" constraint evaluates the optimality of an assignment by giving it a score on a scale of [minScore, maxScore]. + * The higher the score, the better the assignment; intuitively, the assignment is encouraged. + * The lower the score, the worse the assignment; intuitively, the assignment is penalized. */ abstract class SoftConstraint { private float _maxScore = 1000f; private float _minScore = -1000f; - interface ScalerFunction { + interface NormalizeFunction { /** * Scale the origin score to a normalized range (0, 1). * The purpose is to compare scores between different soft constraints. 
@@ -57,23 +58,38 @@ interface ScalerFunction { _minScore = minScore; } + float getMaxScore() { + return _maxScore; + } + + float getMinScore() { + return _minScore; + } + /** - * The scoring function returns a score between MINIMAL_SCORE and MAXIMUM_SCORE, which is then - * weighted by the - * individual normalized constraint weights. - * Each individual constraint will define the meaning of MINIMAL_SCORE to MAXIMUM_SCORE - * differently. - * @return float value representing the score + * Evaluate and give a score for a potential assignment partition -> instance + * Child class only needs to care about how the score is implemented + * @return The score of the assignment in float value */ - abstract float getAssignmentOriginScore(AssignableNode node, AssignableReplica replica, + protected abstract float getAssignmentScore(AssignableNode node, AssignableReplica replica, ClusterContext clusterContext); + /** + * Evaluate and give a score for a potential assignment partition -> instance + * It's the only method exposed to the caller + * @return The score rescaled by the normalize function; the default maps [MinScore, MaxScore] onto [0, 1] + */ + float getAssignmentNormalizedScore(AssignableNode node, AssignableReplica replica, + ClusterContext clusterContext) { + return getNormalizeFunction().scale(getAssignmentScore(node, replica, clusterContext)); + } + /** * The default scaler function that squashes any score within (min_score, max_score) to (0, 1); * Child class could override the method and customize the method on its own * @return The MinMaxScaler instance by default */ - ScalerFunction getScalerFunction() { - return (score) -> (score - _minScore) / (_maxScore - _minScore); + NormalizeFunction getNormalizeFunction() { + return (score) -> (score - getMinScore()) / (getMaxScore() - getMinScore()); } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/SoftConstraintWeightModel.java 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/SoftConstraintWeightModel.java index 41e4334a1e..a9619366c7 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/SoftConstraintWeightModel.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/SoftConstraintWeightModel.java @@ -27,7 +27,7 @@ * The class retrieves the offline model that defines the relative importance of soft constraints. */ class SoftConstraintWeightModel { - private static Map MODEL; + private static Map MODEL; // TODO either define the weights in property files or zookeeper node or static human input SoftConstraintWeightModel() { @@ -35,8 +35,9 @@ class SoftConstraintWeightModel { } static { - MODEL = ImmutableMap. builder() - .put(LeastPartitionCountConstraint.INSTANCE, 1.0f).build(); + //TODO update the weight + MODEL = ImmutableMap. builder().put(InstancePartitionsCountConstraint.class, 1.0f) + .build(); } /** @@ -48,9 +49,8 @@ float getSumOfScores(Map originScoresMap) { float sum = 0; for (Map.Entry softConstraintScoreEntry : originScoresMap.entrySet()) { SoftConstraint softConstraint = softConstraintScoreEntry.getKey(); - float score = softConstraint.getScalerFunction().scale(softConstraintScoreEntry.getValue()); - float weight = MODEL.get(softConstraint); - sum += score * weight; + float weight = MODEL.get(softConstraint.getClass()); + sum += softConstraintScoreEntry.getValue() * weight; } return sum; diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java index c163e4c78c..a0c841a4a6 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java @@ -19,8 +19,6 @@ * under the License. 
*/ -import org.apache.helix.HelixException; - import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -29,6 +27,8 @@ import java.util.Set; import java.util.stream.Collectors; +import org.apache.helix.HelixException; + /** * This class tracks the rebalance-related global cluster status. */ @@ -47,8 +47,7 @@ public class ClusterContext { /** * Construct the cluster context based on the current instance status. - * - * @param replicaSet All the partition replicas that are managed by the rebalancer + * @param replicaSet All the partition replicas that are managed by the rebalancer * @param instanceCount The count of all the active instances that can be used to host partitions. */ ClusterContext(Set replicaSet, int instanceCount) { @@ -95,8 +94,8 @@ public Set getPartitionsForResourceAndFaultZone(String resourceName, Str void addPartitionToFaultZone(String faultZoneId, String resourceName, String partition) { if (!_assignmentForFaultZoneMap.computeIfAbsent(faultZoneId, k -> new HashMap<>()) .computeIfAbsent(resourceName, k -> new HashSet<>()).add(partition)) { - throw new HelixException(String - .format("Resource %s already has a replica from partition %s in fault zone %s", + throw new HelixException( + String.format("Resource %s already has a replica from partition %s in fault zone %s", resourceName, partition, faultZoneId)); } } diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestConstraintBasedAlgorithm.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestConstraintBasedAlgorithm.java index d06cc5fac5..0e61eb3054 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestConstraintBasedAlgorithm.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestConstraintBasedAlgorithm.java @@ -44,7 +44,7 @@ public void beforeMethod() { SoftConstraint mockSoftConstraint = 
mock(SoftConstraint.class); SoftConstraintWeightModel mockSoftConstraintWeightModel = mock(SoftConstraintWeightModel.class); when(mockHardConstraint.isAssignmentValid(any(), any(), any())).thenReturn(false); - when(mockSoftConstraint.getAssignmentOriginScore(any(), any(), any())).thenReturn(1.0f); + when(mockSoftConstraint.getAssignmentNormalizedScore(any(), any(), any())).thenReturn(1.0f); _algorithm = new ConstraintBasedAlgorithm(ImmutableList.of(mockHardConstraint), ImmutableList.of(mockSoftConstraint), mockSoftConstraintWeightModel); @@ -62,7 +62,7 @@ public void testCalculateWithValidAssignment() throws IOException, HelixRebalanc SoftConstraint mockSoftConstraint = mock(SoftConstraint.class); SoftConstraintWeightModel mockSoftConstraintWeightModel = mock(SoftConstraintWeightModel.class); when(mockHardConstraint.isAssignmentValid(any(), any(), any())).thenReturn(true); - when(mockSoftConstraint.getAssignmentOriginScore(any(), any(), any())).thenReturn(1.0f); + when(mockSoftConstraint.getAssignmentNormalizedScore(any(), any(), any())).thenReturn(1.0f); when(mockSoftConstraintWeightModel.getSumOfScores(any())).thenReturn(1.0f); _algorithm = new ConstraintBasedAlgorithm(ImmutableList.of(mockHardConstraint), ImmutableList.of(mockSoftConstraint), mockSoftConstraintWeightModel); diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestInstancePartitionsCountConstraint.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestInstancePartitionsCountConstraint.java new file mode 100644 index 0000000000..7ffc40b3b1 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestInstancePartitionsCountConstraint.java @@ -0,0 +1,60 @@ +package org.apache.helix.controller.rebalancer.waged.constraints; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import static org.mockito.Mockito.when; + +import org.apache.helix.controller.rebalancer.waged.model.AssignableNode; +import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica; +import org.apache.helix.controller.rebalancer.waged.model.ClusterContext; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class TestInstancePartitionsCountConstraint { + private final AssignableReplica _testReplica = Mockito.mock(AssignableReplica.class); + private final AssignableNode _testNode = Mockito.mock(AssignableNode.class); + private final ClusterContext _clusterContext = Mockito.mock(ClusterContext.class); + + private final SoftConstraint _constraint = new InstancePartitionsCountConstraint(); + + @Test + public void testWhenInstanceIsIdle() { + when(_testNode.getAssignedReplicaCount()).thenReturn(0); + float score = _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext); + Assert.assertEquals(score, 1.0f); + } + + @Test + public void testWhenInstanceIsFull() { + when(_testNode.getAssignedReplicaCount()).thenReturn(10); + when(_clusterContext.getEstimatedMaxPartitionCount()).thenReturn(10); + float score = _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, 
_clusterContext); + Assert.assertEquals(score, 0.5f); + } + + @Test + public void testWhenInstanceHalfOccupied() { + when(_testNode.getAssignedReplicaCount()).thenReturn(10); + when(_clusterContext.getEstimatedMaxPartitionCount()).thenReturn(20); + float score = _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext); + Assert.assertEquals(score, 0.75f); + } +} diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestSoftConstraintNormalizeFunction.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestSoftConstraintNormalizeFunction.java new file mode 100644 index 0000000000..b5239595a2 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestSoftConstraintNormalizeFunction.java @@ -0,0 +1,47 @@ +package org.apache.helix.controller.rebalancer.waged.constraints; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.apache.helix.controller.rebalancer.waged.model.AssignableNode; +import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica; +import org.apache.helix.controller.rebalancer.waged.model.ClusterContext; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class TestSoftConstraintNormalizeFunction { + @Test + public void testDefaultNormalizeFunction() { + int maxScore = 100; + int minScore = 0; + SoftConstraint softConstraint = new SoftConstraint(maxScore, minScore) { + @Override + protected float getAssignmentScore(AssignableNode node, AssignableReplica replica, + ClusterContext clusterContext) { + return 0; + } + }; + + for (int i = minScore; i <= maxScore; i++) { + float normalized = softConstraint.getNormalizeFunction().scale(i); + Assert.assertTrue(normalized <= 1 && normalized >= 0, + String.format("input: %s, output: %s", i, normalized)); + } + } +}