From bc0aa76a9de6243928e53e1a1d01e7502ff8267c Mon Sep 17 00:00:00 2001 From: Lei Xia Date: Tue, 31 May 2016 19:17:39 -0700 Subject: [PATCH 1/3] [HELIX-633] AutoRebalancer should ignore disabled instance and all partitions on disabled instances should be dropped in FULL_AUTO rebalance mode --- .../controller/rebalancer/AutoRebalancer.java | 3 + .../util/ConstraintBasedAssignment.java | 22 +-- .../controller/stages/ClusterDataCache.java | 16 ++ ...TestAutoRebalanceWithDisabledInstance.java | 142 ++++++++++++++++++ .../TestStateTransitionTimeout.java | 28 ---- .../integration/ZkStandAloneCMTestBase.java | 2 + .../mock/participant/MockMSStateModel.java | 65 +------- 7 files changed, 180 insertions(+), 98 deletions(-) create mode 100644 helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalanceWithDisabledInstance.java diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java index a8d83a20bf..e47297fda4 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java @@ -82,6 +82,9 @@ public IdealState computeNewIdealState(String resourceName, IdealState currentId stateCountMap = stateCount(stateModelDef, liveInstance.size(), Integer.parseInt(replicas)); List liveNodes = new ArrayList(liveInstance.keySet()); List allNodes = new ArrayList(clusterData.getInstanceConfigMap().keySet()); + allNodes.removeAll(clusterData.getDisabledInstances()); + liveNodes.retainAll(allNodes); + Map> currentMapping = currentMapping(currentStateOutput, resourceName, partitions, stateCountMap); diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java index a520803ae6..9366bcf13e 
100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java @@ -75,24 +75,26 @@ public static Map computeAutoBestStateForPartition(ClusterDataCa boolean isResourceEnabled) { Map instanceStateMap = new HashMap(); - // if the ideal state is deleted, instancePreferenceList will be empty and - // we should drop all resources. if (currentStateMap != null) { for (String instance : currentStateMap.keySet()) { - if ((instancePreferenceList == null || !instancePreferenceList.contains(instance)) - && !disabledInstancesForPartition.contains(instance)) { - // if dropped (whether disabled or not), transit to DROPPED + if (instancePreferenceList == null || !instancePreferenceList.contains(instance)) { + // The partition is dropped from preference list. + // Transit to DROPPED no matter the instance is disabled or not. instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString()); - } else if ((currentStateMap.get(instance) == null || !currentStateMap.get(instance).equals( - HelixDefinedState.ERROR.name())) - && (disabledInstancesForPartition.contains(instance) || !isResourceEnabled)) { + } else { // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE) - instanceStateMap.put(instance, stateModelDef.getInitialState()); + if (disabledInstancesForPartition.contains(instance) || !isResourceEnabled) { + if (currentStateMap.get(instance) == null || !currentStateMap.get(instance) + .equals(HelixDefinedState.ERROR.name())) { + instanceStateMap.put(instance, stateModelDef.getInitialState()); + } + } } } } - // ideal state is deleted + // if the ideal state is deleted, instancePreferenceList will be empty and + // we should drop all resources. 
if (instancePreferenceList == null) { return instanceStateMap; } diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java index b77ce0d9cd..cb5bda8fa6 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java @@ -390,6 +390,22 @@ public Set getDisabledInstancesForPartition(String partition) { return disabledInstancesSet; } + + /** + * This method allows one to fetch the set of nodes that are disabled + * @return + */ + public Set getDisabledInstances() { + Set disabledInstancesSet = new HashSet(); + for (String instance : _instanceConfigMap.keySet()) { + InstanceConfig config = _instanceConfigMap.get(instance); + if (config.getInstanceEnabled() == false) { + disabledInstancesSet.add(instance); + } + } + return disabledInstancesSet; + } + /** * Returns the number of replicas for a given resource. * @param resourceName diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalanceWithDisabledInstance.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalanceWithDisabledInstance.java new file mode 100644 index 0000000000..84eca6b338 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalanceWithDisabledInstance.java @@ -0,0 +1,142 @@ +package org.apache.helix.integration; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.HelixAdmin; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.IdealState.RebalanceMode; +import org.apache.helix.tools.ClusterStateVerifier; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class TestAutoRebalanceWithDisabledInstance extends ZkStandAloneCMTestBase { + private static String TEST_DB_2 = "TestDB2"; + + @BeforeClass + @Override + public void beforeClass() throws Exception { + super.beforeClass(); + _setupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB_2, _PARTITIONS, STATE_MODEL, + RebalanceMode.FULL_AUTO + ""); + _setupTool.rebalanceResource(CLUSTER_NAME, TEST_DB_2, _replica); + + Thread.sleep(200); + + boolean result = + ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, + CLUSTER_NAME)); + Assert.assertTrue(result); + } + + @Test() + public void testDisableEnableInstanceAutoRebalance() throws Exception { + String disabledInstance = _participants[0].getInstanceName(); + + Set assignedPartitions = getPartitionsAssignedtoInstance(CLUSTER_NAME, TEST_DB_2, + disabledInstance); + Assert.assertFalse(assignedPartitions.isEmpty()); + Set currentPartitions = getCurrentPartitionsOnInstance(CLUSTER_NAME, TEST_DB_2, + disabledInstance); + 
Assert.assertFalse(currentPartitions.isEmpty()); + + // disable instance + _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, disabledInstance, false); + Thread.sleep(400); + assignedPartitions = getPartitionsAssignedtoInstance(CLUSTER_NAME, TEST_DB_2, disabledInstance); + Assert.assertTrue(assignedPartitions.isEmpty()); + currentPartitions = getCurrentPartitionsOnInstance(CLUSTER_NAME, TEST_DB_2, disabledInstance); + Assert.assertTrue(currentPartitions.isEmpty()); + + //enable instance + _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, disabledInstance, true); + Thread.sleep(400); + assignedPartitions = getPartitionsAssignedtoInstance(CLUSTER_NAME, TEST_DB_2, disabledInstance); + Assert.assertFalse(assignedPartitions.isEmpty()); + currentPartitions = getCurrentPartitionsOnInstance(CLUSTER_NAME, TEST_DB_2, disabledInstance); + Assert.assertFalse(currentPartitions.isEmpty()); + } + + @Test() + public void testAddDisabledInstanceAutoRebalance() throws Exception { + // add disabled instance. 
+ String nodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + NODE_NR); + _setupTool.addInstanceToCluster(CLUSTER_NAME, nodeName); + MockParticipantManager participant = + new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, nodeName); + _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, nodeName, false); + + participant.syncStart(); + + Thread.sleep(400); + Set assignedPartitions = getPartitionsAssignedtoInstance(CLUSTER_NAME, TEST_DB_2, nodeName); + Assert.assertTrue(assignedPartitions.isEmpty()); + Set currentPartitions = getCurrentPartitionsOnInstance(CLUSTER_NAME, TEST_DB_2, + nodeName); + Assert.assertTrue(currentPartitions.isEmpty()); + + //enable instance + _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, nodeName, true); + Thread.sleep(400); + assignedPartitions = getPartitionsAssignedtoInstance(CLUSTER_NAME, TEST_DB_2, nodeName); + Assert.assertFalse(assignedPartitions.isEmpty()); + currentPartitions = getCurrentPartitionsOnInstance(CLUSTER_NAME, TEST_DB_2, nodeName); + Assert.assertFalse(currentPartitions.isEmpty()); + } + + private Set getPartitionsAssignedtoInstance(String cluster, String dbName, String instance) { + HelixAdmin admin = _setupTool.getClusterManagementTool(); + Set partitionSet = new HashSet(); + IdealState is = admin.getResourceIdealState(cluster, dbName); + for (String partition : is.getRecord().getListFields().keySet()) { + List assignments = is.getRecord().getListField(partition); + for (String ins : assignments) { + if (ins.equals(instance)) { + partitionSet.add(partition); + } + } + } + + return partitionSet; + } + + private Set getCurrentPartitionsOnInstance(String cluster, String dbName, String instance) { + HelixAdmin admin = _setupTool.getClusterManagementTool(); + Set partitionSet = new HashSet(); + + ExternalView ev = admin.getResourceExternalView(cluster, dbName); + for (String partition : ev.getRecord().getMapFields().keySet()) { + Map assignments = ev.getRecord().getMapField(partition); + for 
(String ins : assignments.keySet()) { + if (ins.equals(instance)) { + partitionSet.add(partition); + } + } + } + return partitionSet; + } +} diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java index 443d484e53..fb534fd8c3 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java @@ -99,14 +99,6 @@ public TimeOutStateModel(MockTransition transition, boolean sleep) { _sleep = sleep; } - @Override - @Transition(to = "SLAVE", from = "OFFLINE") - public void onBecomeSlaveFromOffline(Message message, NotificationContext context) { - LOG.info("Become SLAVE from OFFLINE"); - - } - - @Override @Transition(to = "MASTER", from = "SLAVE") public void onBecomeMasterFromSlave(Message message, NotificationContext context) throws InterruptedException { @@ -116,26 +108,6 @@ public void onBecomeMasterFromSlave(Message message, NotificationContext context } } - @Override - @Transition(to = "SLAVE", from = "MASTER") - public void onBecomeSlaveFromMaster(Message message, NotificationContext context) { - LOG.info("Become SLAVE from MASTER"); - } - - @Override - @Transition(to = "OFFLINE", from = "SLAVE") - public void onBecomeOfflineFromSlave(Message message, NotificationContext context) { - LOG.info("Become OFFLINE from SLAVE"); - - } - - @Override - @Transition(to = "DROPPED", from = "OFFLINE") - public void onBecomeDroppedFromOffline(Message message, NotificationContext context) { - LOG.info("Become DROPPED from OFFLINE"); - - } - @Override public void rollbackOnError(Message message, NotificationContext context, StateTransitionError error) { diff --git a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java 
index 5d169d5021..f6946182ed 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java +++ b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java @@ -91,6 +91,8 @@ public void beforeClass() throws Exception { ClusterStateVerifier .verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME)); + Assert.assertTrue(result); + result = ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME)); diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSStateModel.java b/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSStateModel.java index 61733ba26a..7d90063e04 100644 --- a/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSStateModel.java +++ b/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSStateModel.java @@ -43,67 +43,12 @@ public void setTransition(MockTransition transition) { _transition = transition; } - // overwrite default error->dropped transition - @Transition(to = "DROPPED", from = "ERROR") - public void onBecomeDroppedFromError(Message message, NotificationContext context) + @Transition(to = "*", from = "*") + public void generalTransitionHandle(Message message, NotificationContext context) throws InterruptedException { - LOG.info("Become DROPPED from ERROR"); - if (_transition != null) { - _transition.doTransition(message, context); - } - } - - @Transition(to = "SLAVE", from = "OFFLINE") - public void onBecomeSlaveFromOffline(Message message, NotificationContext context) - throws InterruptedException { - LOG.info("Become SLAVE from OFFLINE"); - if (_transition != null) { - _transition.doTransition(message, context); - - } - } - - @Transition(to = "MASTER", from = "SLAVE") - public void onBecomeMasterFromSlave(Message message, NotificationContext context) - throws InterruptedException { - LOG.info("Become MASTER from SLAVE"); - if (_transition != null) { - 
_transition.doTransition(message, context); - } - } - - @Transition(to = "SLAVE", from = "MASTER") - public void onBecomeSlaveFromMaster(Message message, NotificationContext context) - throws InterruptedException { - LOG.info("Become SLAVE from MASTER"); - if (_transition != null) { - _transition.doTransition(message, context); - } - } - - @Transition(to = "OFFLINE", from = "SLAVE") - public void onBecomeOfflineFromSlave(Message message, NotificationContext context) - throws InterruptedException { - LOG.info("Become OFFLINE from SLAVE"); - if (_transition != null) { - _transition.doTransition(message, context); - } - } - - @Transition(to = "DROPPED", from = "OFFLINE") - public void onBecomeDroppedFromOffline(Message message, NotificationContext context) - throws InterruptedException { - LOG.info("Become DROPPED from OFFLINE"); - if (_transition != null) { - _transition.doTransition(message, context); - } - } - - @Transition(to = "OFFLINE", from = "ERROR") - public void onBecomeOfflineFromError(Message message, NotificationContext context) - throws InterruptedException { - LOG.info("Become OFFLINE from ERROR"); - // System.err.println("Become OFFLINE from ERROR"); + LOG.info(String + .format("Resource %s partition %s becomes %s from %s", message.getResourceName(), + message.getPartitionName(), message.getToState(), message.getFromState())); if (_transition != null) { _transition.doTransition(message, context); } From ea0fbbbce302974b88a2b8253bf06616fd91aa5b Mon Sep 17 00:00:00 2001 From: Lei Xia Date: Tue, 7 Jun 2016 14:42:43 -0700 Subject: [PATCH 2/3] [HELIX-634] Refactor AutoRebalancer to allow configuable placement strategy. 
--- .../controller/rebalancer/AutoRebalancer.java | 40 +++++++--- .../strategy/AutoRebalanceStrategy.java | 40 +++++----- .../strategy/RebalanceStrategy.java | 52 ++++++++++++ .../org/apache/helix/model/IdealState.java | 23 +++++- .../model/builder/IdealStateBuilder.java | 80 +++++++++++++++++++ .../task/GenericTaskAssignmentCalculator.java | 5 +- .../strategy/TestAutoRebalanceStrategy.java | 25 +++--- 7 files changed, 217 insertions(+), 48 deletions(-) create mode 100644 helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java index e47297fda4..6682426340 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Set; +import org.apache.helix.HelixException; import org.apache.helix.HelixManager; import org.apache.helix.ZNRecord; import org.apache.helix.controller.rebalancer.internal.MappingCalculator; @@ -35,8 +36,7 @@ import org.apache.helix.controller.stages.ClusterDataCache; import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.controller.strategy.AutoRebalanceStrategy; -import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme; -import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme; +import org.apache.helix.controller.strategy.RebalanceStrategy; import org.apache.helix.model.IdealState; import org.apache.helix.model.IdealState.RebalanceMode; import org.apache.helix.model.LiveInstance; @@ -44,6 +44,7 @@ import org.apache.helix.model.Resource; import org.apache.helix.model.ResourceAssignment; import org.apache.helix.model.StateModelDefinition; +import 
org.apache.helix.util.HelixUtil; import org.apache.log4j.Logger; /** @@ -59,14 +60,14 @@ public class AutoRebalancer implements Rebalancer, MappingCalculator { // These should be final, but are initialized in init rather than a constructor private HelixManager _manager; - private AutoRebalanceStrategy _algorithm; + private RebalanceStrategy _rebalanceStrategy; private static final Logger LOG = Logger.getLogger(AutoRebalancer.class); @Override public void init(HelixManager manager) { this._manager = manager; - this._algorithm = null; + this._rebalanceStrategy = null; } @Override @@ -127,13 +128,32 @@ public IdealState computeNewIdealState(String resourceName, IdealState currentId int maxPartition = currentIdealState.getMaxPartitionsPerInstance(); - ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme(); - placementScheme.init(_manager); - _algorithm = - new AutoRebalanceStrategy(resourceName, partitions, stateCountMap, maxPartition, - placementScheme); + String rebalanceStrategyName = currentIdealState.getRebalanceStrategy(); + if (rebalanceStrategyName == null || rebalanceStrategyName.equalsIgnoreCase("default")) { + _rebalanceStrategy = + new AutoRebalanceStrategy(resourceName, partitions, stateCountMap, maxPartition); + } else { + try { + _rebalanceStrategy = RebalanceStrategy.class + .cast(HelixUtil.loadClass(getClass(), rebalanceStrategyName).newInstance()); + _rebalanceStrategy.init(resourceName, partitions, stateCountMap, maxPartition); + } catch (ClassNotFoundException ex) { + throw new HelixException( + "Exception while invoking custom rebalance strategy class: " + rebalanceStrategyName, + ex); + } catch (InstantiationException ex) { + throw new HelixException( + "Exception while invoking custom rebalance strategy class: " + rebalanceStrategyName, + ex); + } catch (IllegalAccessException ex) { + throw new HelixException( + "Exception while invoking custom rebalance strategy class: " + rebalanceStrategyName, + ex); + } + } + ZNRecord 
newMapping = - _algorithm.computePartitionAssignment(liveNodes, currentMapping, allNodes); + _rebalanceStrategy.computePartitionAssignment(liveNodes, currentMapping, allNodes); if (LOG.isDebugEnabled()) { LOG.debug("currentMapping: " + currentMapping); diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java index 11b5b0d4a1..959609f497 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java +++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java @@ -36,16 +36,15 @@ import org.apache.helix.ZNRecord; import org.apache.log4j.Logger; -public class AutoRebalanceStrategy { - +public class AutoRebalanceStrategy implements RebalanceStrategy { private static Logger logger = Logger.getLogger(AutoRebalanceStrategy.class); - - private final String _resourceName; - private final List _partitions; - private final LinkedHashMap _states; - private final int _maximumPerNode; private final ReplicaPlacementScheme _placementScheme; + private String _resourceName; + private List _partitions; + private LinkedHashMap _states; + private int _maximumPerNode; + private Map _nodeMap; private List _liveNodesList; private Map _stateMap; @@ -56,24 +55,26 @@ public class AutoRebalanceStrategy { private Set _orphaned; public AutoRebalanceStrategy(String resourceName, final List partitions, - final LinkedHashMap states, int maximumPerNode, - ReplicaPlacementScheme placementScheme) { - _resourceName = resourceName; - _partitions = partitions; - _states = states; - _maximumPerNode = maximumPerNode; - if (placementScheme != null) { - _placementScheme = placementScheme; - } else { - _placementScheme = new DefaultPlacementScheme(); - } + final LinkedHashMap states, int maximumPerNode) { + init(resourceName, partitions, states, maximumPerNode); + _placementScheme = new 
DefaultPlacementScheme(); } public AutoRebalanceStrategy(String resourceName, final List partitions, final LinkedHashMap states) { - this(resourceName, partitions, states, Integer.MAX_VALUE, new DefaultPlacementScheme()); + this(resourceName, partitions, states, Integer.MAX_VALUE); + } + + @Override + public void init(String resourceName, final List partitions, + final LinkedHashMap states, int maximumPerNode) { + _resourceName = resourceName; + _partitions = partitions; + _states = states; + _maximumPerNode = maximumPerNode; } + @Override public ZNRecord computePartitionAssignment(final List liveNodes, final Map> currentMapping, final List allNodes) { int numReplicas = countStateReplicas(); @@ -546,7 +547,6 @@ private Map computePreferredPlacement(final List allNodes /** * Counts the total number of replicas given a state-count mapping - * @param states * @return */ private int countStateReplicas() { diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java new file mode 100644 index 0000000000..4daae8208b --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java @@ -0,0 +1,52 @@ +package org.apache.helix.controller.strategy; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.ZNRecord; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * Assignment strategy interface that computes the assignment of partition->instance. + */ +public interface RebalanceStrategy { + /** + * Perform the necessary initialization for the rebalance strategy object. + * @param resourceName + * @param partitions + * @param states + * @param maximumPerNode + */ + void init(String resourceName, final List partitions, + final LinkedHashMap states, int maximumPerNode); + + /** + * Compute the preference lists and (optional partition-state mapping) for the given resource. 
+ * + * @param liveNodes + * @param currentMapping + * @param allNodes + * @return + */ + ZNRecord computePartitionAssignment(final List liveNodes, + final Map> currentMapping, final List allNodes); +} diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java index 44f4219230..7c4cf546fe 100644 --- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java +++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java @@ -53,10 +53,11 @@ public enum IdealStateProperty { @Deprecated IDEAL_STATE_MODE, REBALANCE_MODE, + REBALANCER_CLASS_NAME, REBALANCE_TIMER_PERIOD, + REBALANCE_STRATEGY, MAX_PARTITIONS_PER_INSTANCE, INSTANCE_GROUP_TAG, - REBALANCER_CLASS_NAME, HELIX_ENABLED, RESOURCE_GROUP_NAME, GROUP_ROUTING_ENABLED, @@ -164,6 +165,26 @@ public String getRebalancerClassName() { return _record.getSimpleField(IdealStateProperty.REBALANCER_CLASS_NAME.toString()); } + /** + * Specify the strategy for Helix to use to compute the partition-instance assignment, + * i,e, the custom rebalance strategy that implements {@link org.apache.helix.controller.strategy.RebalanceStrategy} + * + * @param rebalanceStrategy + * @return + */ + public void setRebalanceStrategy(String rebalanceStrategy) { + _record.setSimpleField(IdealStateProperty.REBALANCE_STRATEGY.name(), rebalanceStrategy); + } + + /** + * Get the rebalance strategy for this resource. + * + * @return rebalance strategy, or null if not specified. 
+ */ + public String getRebalanceStrategy() { + return _record.getSimpleField(IdealStateProperty.REBALANCE_STRATEGY.name()); + } + /** * Set the resource group name * @param resourceGroupName diff --git a/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java b/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java index d3bc3f2480..9ad3023683 100644 --- a/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java +++ b/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java @@ -52,6 +52,17 @@ public abstract class IdealStateBuilder { * Helix rebalancer strategies. AUTO, SEMI_AUTO, CUSTOMIZED */ protected IdealState.RebalanceMode rebalancerMode; + + /** + * Customized rebalancer class. + */ + private String rebalancerClassName; + + /** + * Custom rebalance strategy + */ + private String rebalanceStrategy; + /** * A constraint that limits the maximum number of partitions per Node. */ @@ -68,6 +79,16 @@ public abstract class IdealStateBuilder { */ private Boolean disableExternalView = null; + /** + * Resource group name. + */ + private String resourceGroupName; + + /** + * Whether the resource group routing should be enabled in routingProvider. + */ + private Boolean enableGroupRouting; + protected ZNRecord _record; /** @@ -143,6 +164,44 @@ public IdealStateBuilder setRebalancerMode(IdealState.RebalanceMode rebalancerMo return this; } + /** + * Set custom rebalancer class name. + * @return IdealStateBuilder + */ + public IdealStateBuilder setRebalancerClass(String rebalancerClassName) { + this.rebalancerClassName = rebalancerClassName; + return this; + } + + /** + * Set custom rebalance strategy name. 
+ * @param rebalanceStrategy + * @return + */ + public IdealStateBuilder setRebalanceStrategy(String rebalanceStrategy) { + this.rebalanceStrategy = rebalanceStrategy; + return this; + } + + /** + * + * @param resourceGroupName + * @return + */ + public IdealStateBuilder setResourceGroupName(String resourceGroupName) { + this.resourceGroupName = resourceGroupName; + return this; + } + + /** + * Enable Group Routing for this resource. + * @return + */ + public IdealStateBuilder enableGroupRouting() { + this.enableGroupRouting = true; + return this; + } + /** * @return */ @@ -154,10 +213,31 @@ public IdealState build() { idealstate.setStateModelFactoryName(stateModelFactoryName); idealstate.setRebalanceMode(rebalancerMode); idealstate.setReplicas("" + numReplica); + + if (rebalancerClassName != null) { + idealstate.setRebalancerClassName(rebalancerClassName); + } + + if (rebalanceStrategy != null) { + idealstate.setRebalanceStrategy(rebalanceStrategy); + } + + if (maxPartitionsPerNode > 0) { + idealstate.setMaxPartitionsPerInstance(maxPartitionsPerNode); + } + if (disableExternalView != null) { idealstate.setDisableExternalView(disableExternalView); } + if (resourceGroupName != null) { + idealstate.setResourceGroupName(resourceGroupName); + } + + if (enableGroupRouting != null) { + idealstate.enableGroupRouting(enableGroupRouting); + } + if (!idealstate.isValid()) { throw new HelixException("invalid ideal-state: " + idealstate); } diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java index b0a1a3339e..623357f052 100644 --- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java +++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java @@ -33,6 +33,7 @@ import org.apache.helix.controller.stages.ClusterDataCache; import org.apache.helix.controller.stages.CurrentStateOutput; import 
org.apache.helix.controller.strategy.AutoRebalanceStrategy; +import org.apache.helix.controller.strategy.RebalanceStrategy; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.Partition; @@ -121,9 +122,7 @@ public String apply(Integer partitionNum) { } // Get the assignment keyed on partition - AutoRebalanceStrategy strategy = - new AutoRebalanceStrategy(resourceId, partitions, states, Integer.MAX_VALUE, - new AutoRebalanceStrategy.DefaultPlacementScheme()); + RebalanceStrategy strategy = new AutoRebalanceStrategy(resourceId, partitions, states); List allNodes = Lists.newArrayList(getEligibleInstances(jobCfg, currStateOutput, instances, cache)); Collections.sort(allNodes); diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java index 985d0c8746..adc92d6d05 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java +++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java @@ -116,8 +116,7 @@ private void runTest(String name, int numIterations, int numPartitions, int numL StateModelDefinition stateModelDef = getIncompleteStateModelDef(name, stateNames[0], states); new AutoRebalanceTester(partitions, states, liveNodes, currentMapping, allNodes, maxPerNode, - stateModelDef, new AutoRebalanceStrategy.DefaultPlacementScheme()) - .runRepeatedly(numIterations); + stateModelDef).runRepeatedly(numIterations); } /** @@ -157,13 +156,11 @@ class AutoRebalanceTester { private List _allNodes; private int _maxPerNode; private StateModelDefinition _stateModelDef; - private ReplicaPlacementScheme _placementScheme; private Random _random; public AutoRebalanceTester(List partitions, LinkedHashMap states, List liveNodes, Map> currentMapping, - List allNodes, int maxPerNode, 
StateModelDefinition stateModelDef, - ReplicaPlacementScheme placementScheme) { + List allNodes, int maxPerNode, StateModelDefinition stateModelDef) { _partitions = partitions; _states = states; _liveNodes = liveNodes; @@ -182,7 +179,6 @@ public AutoRebalanceTester(List partitions, LinkedHashMap partitions, LinkedHashMap T getRandomSetElement(Set source) { From 7147ec874e912f27905c299fefe0d09ca31ebd42 Mon Sep 17 00:00:00 2001 From: Lei Xia Date: Thu, 16 Jun 2016 12:06:34 -0700 Subject: [PATCH 3/3] [HELIX-568] Add new topology aware (rack-aware) rebalance strategy based on CRUSH algorithm. Design doc is available at: https://cwiki.apache.org/confluence/display/HELIX/Helix+Topology-aware+Rebalance+Strategy --- .../java/org/apache/helix/HelixAdmin.java | 30 ++ .../java/org/apache/helix/HelixConstants.java | 4 + .../java/org/apache/helix/PropertyKey.java | 3 +- .../controller/rebalancer/AutoRebalancer.java | 15 +- .../controller/rebalancer/Rebalancer.java | 1 - .../strategy/AutoRebalanceStrategy.java | 7 +- .../strategy/CrushRebalanceStrategy.java | 174 ++++++++++ .../strategy/RebalanceStrategy.java | 11 +- .../crushMapping/CRUSHPlacementAlgorithm.java | 316 ++++++++++++++++++ .../strategy/crushMapping/JenkinsHash.java | 140 ++++++++ .../controller/rebalancer/topology/Node.java | 208 ++++++++++++ .../rebalancer/topology/Topology.java | 295 ++++++++++++++++ .../controller/stages/ClusterDataCache.java | 14 +- .../apache/helix/manager/zk/ZKHelixAdmin.java | 43 ++- .../org/apache/helix/model/ClusterConfig.java | 92 +++++ .../org/apache/helix/model/IdealState.java | 2 +- .../apache/helix/model/InstanceConfig.java | 50 ++- .../task/GenericTaskAssignmentCalculator.java | 7 +- .../org/apache/helix/tools/ClusterSetup.java | 6 + .../TestAutoRebalanceStrategy.java | 27 +- .../controller/strategy/TestTopology.java | 172 ++++++++++ .../integration/TestCrushAutoRebalance.java | 221 ++++++++++++ .../manager/MockParticipantManager.java | 10 +- 23 files changed, 1804 insertions(+), 44 
deletions(-) rename helix-core/src/main/java/org/apache/helix/controller/{ => rebalancer}/strategy/AutoRebalanceStrategy.java (99%) create mode 100644 helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java rename helix-core/src/main/java/org/apache/helix/controller/{ => rebalancer}/strategy/RebalanceStrategy.java (81%) create mode 100644 helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CRUSHPlacementAlgorithm.java create mode 100644 helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/JenkinsHash.java create mode 100644 helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Node.java create mode 100644 helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java create mode 100644 helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java rename helix-core/src/test/java/org/apache/helix/controller/{strategy => Strategy}/TestAutoRebalanceStrategy.java (96%) create mode 100644 helix-core/src/test/java/org/apache/helix/controller/strategy/TestTopology.java create mode 100644 helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalance.java diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java index fbfab26401..aeacd4bfb8 100644 --- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java @@ -113,6 +113,18 @@ public interface HelixAdmin { void addResource(String clusterName, String resourceName, int numPartitions, String stateModelRef, String rebalancerMode); + /** + * Add a resource to a cluster + * @param clusterName + * @param resourceName + * @param numPartitions + * @param stateModelRef + * @param rebalancerMode + * @param rebalanceStrategy + */ + void addResource(String clusterName, String resourceName, int numPartitions, + String 
stateModelRef, String rebalancerMode, String rebalanceStrategy); + /** * Add a resource to a cluster, using a bucket size > 1 * @param clusterName @@ -138,6 +150,22 @@ void addResource(String clusterName, String resourceName, int numPartitions, void addResource(String clusterName, String resourceName, int numPartitions, String stateModelRef, String rebalancerMode, int bucketSize, int maxPartitionsPerInstance); + + /** + * Add a resource to a cluster, using a bucket size > 1 + * @param clusterName + * @param resourceName + * @param numPartitions + * @param stateModelRef + * @param rebalancerMode + * @param rebalanceStrategy + * @param bucketSize + * @param maxPartitionsPerInstance + */ + void addResource(String clusterName, String resourceName, int numPartitions, + String stateModelRef, String rebalancerMode, String rebalanceStrategy, int bucketSize, + int maxPartitionsPerInstance); + /** * Add an instance to a cluster * @param clusterName @@ -411,6 +439,8 @@ void rebalance(String clusterName, String resourceName, int replica, String keyP */ void removeInstanceTag(String clusterName, String instanceName, String tag); + void setInstanceZoneId(String clusterName, String instanceName, String zoneId); + /** * Release resources */ diff --git a/helix-core/src/main/java/org/apache/helix/HelixConstants.java b/helix-core/src/main/java/org/apache/helix/HelixConstants.java index 5318fa958a..6de0ff14d2 100644 --- a/helix-core/src/main/java/org/apache/helix/HelixConstants.java +++ b/helix-core/src/main/java/org/apache/helix/HelixConstants.java @@ -43,6 +43,10 @@ enum StateModelToken { ANY_LIVEINSTANCE } + /** + * Replaced by ClusterConfig.ClusterConfigProperty. 
+ */ + @Deprecated enum ClusterConfigType { HELIX_DISABLE_PIPELINE_TRIGGERS, DISABLE_FULL_AUTO // override all resources in the cluster to use SEMI-AUTO instead of FULL-AUTO diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java b/helix-core/src/main/java/org/apache/helix/PropertyKey.java index 33355f12b0..01259029a1 100644 --- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java +++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java @@ -38,6 +38,7 @@ import java.util.Arrays; +import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.ClusterConstraints; import org.apache.helix.model.CurrentState; import org.apache.helix.model.Error; @@ -186,7 +187,7 @@ public PropertyKey clusterConfigs() { * @return {@link PropertyKey} */ public PropertyKey clusterConfig() { - return new PropertyKey(CONFIGS, ConfigScopeProperty.CLUSTER, HelixProperty.class, + return new PropertyKey(CONFIGS, ConfigScopeProperty.CLUSTER, ClusterConfig.class, _clusterName, ConfigScopeProperty.CLUSTER.toString(), _clusterName); } diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java index 6682426340..ba237b1ac9 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java @@ -35,8 +35,8 @@ import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment; import org.apache.helix.controller.stages.ClusterDataCache; import org.apache.helix.controller.stages.CurrentStateOutput; -import org.apache.helix.controller.strategy.AutoRebalanceStrategy; -import org.apache.helix.controller.strategy.RebalanceStrategy; +import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy; +import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy; import 
org.apache.helix.model.IdealState; import org.apache.helix.model.IdealState.RebalanceMode; import org.apache.helix.model.LiveInstance; @@ -79,8 +79,8 @@ public IdealState computeNewIdealState(String resourceName, IdealState currentId Map liveInstance = clusterData.getLiveInstances(); String replicas = currentIdealState.getReplicas(); - LinkedHashMap stateCountMap = new LinkedHashMap(); - stateCountMap = stateCount(stateModelDef, liveInstance.size(), Integer.parseInt(replicas)); + LinkedHashMap stateCountMap = + stateCount(stateModelDef, liveInstance.size(), Integer.parseInt(replicas)); List liveNodes = new ArrayList(liveInstance.keySet()); List allNodes = new ArrayList(clusterData.getInstanceConfigMap().keySet()); allNodes.removeAll(clusterData.getDisabledInstances()); @@ -129,7 +129,8 @@ public IdealState computeNewIdealState(String resourceName, IdealState currentId int maxPartition = currentIdealState.getMaxPartitionsPerInstance(); String rebalanceStrategyName = currentIdealState.getRebalanceStrategy(); - if (rebalanceStrategyName == null || rebalanceStrategyName.equalsIgnoreCase("default")) { + if (rebalanceStrategyName == null || rebalanceStrategyName + .equalsIgnoreCase(RebalanceStrategy.DEFAULT_REBALANCE_STRATEGY)) { _rebalanceStrategy = new AutoRebalanceStrategy(resourceName, partitions, stateCountMap, maxPartition); } else { @@ -152,8 +153,8 @@ public IdealState computeNewIdealState(String resourceName, IdealState currentId } } - ZNRecord newMapping = - _rebalanceStrategy.computePartitionAssignment(liveNodes, currentMapping, allNodes); + ZNRecord newMapping = _rebalanceStrategy + .computePartitionAssignment(allNodes, liveNodes, currentMapping, clusterData); if (LOG.isDebugEnabled()) { LOG.debug("currentMapping: " + currentMapping); diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java index f5a4ae8f64..6935378b7a 100644 --- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java @@ -46,5 +46,4 @@ public interface Rebalancer { */ IdealState computeNewIdealState(String resourceName, IdealState currentIdealState, final CurrentStateOutput currentStateOutput, final ClusterDataCache clusterData); - } diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java similarity index 99% rename from helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java rename to helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java index 959609f497..868d20728a 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java @@ -1,4 +1,4 @@ -package org.apache.helix.controller.strategy; +package org.apache.helix.controller.rebalancer.strategy; /* * Licensed to the Apache Software Foundation (ASF) under one @@ -34,6 +34,7 @@ import org.apache.helix.HelixManager; import org.apache.helix.ZNRecord; +import org.apache.helix.controller.stages.ClusterDataCache; import org.apache.log4j.Logger; public class AutoRebalanceStrategy implements RebalanceStrategy { @@ -75,8 +76,8 @@ public void init(String resourceName, final List partitions, } @Override - public ZNRecord computePartitionAssignment(final List liveNodes, - final Map> currentMapping, final List allNodes) { + public ZNRecord computePartitionAssignment(final List allNodes, final List liveNodes, + final Map> currentMapping, ClusterDataCache clusterData) { int numReplicas = countStateReplicas(); ZNRecord znRecord = new ZNRecord(_resourceName); if (liveNodes.size() == 0) { diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java new file mode 100644 index 0000000000..a8fe107f0e --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java @@ -0,0 +1,174 @@ +package org.apache.helix.controller.rebalancer.strategy; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import org.apache.helix.HelixException; +import org.apache.helix.ZNRecord; +import org.apache.helix.controller.rebalancer.strategy.crushMapping.CRUSHPlacementAlgorithm; +import org.apache.helix.controller.rebalancer.strategy.crushMapping.JenkinsHash; +import org.apache.helix.controller.rebalancer.topology.Node; +import org.apache.helix.controller.rebalancer.topology.Topology; +import org.apache.helix.controller.stages.ClusterDataCache; +import org.apache.helix.model.InstanceConfig; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * CRUSH-based partition mapping strategy. + */ +public class CrushRebalanceStrategy implements RebalanceStrategy { + private String _resourceName; + private List _partitions; + private Topology _clusterTopo; + private int _replicas; + + @Override + public void init(String resourceName, final List partitions, + final LinkedHashMap states, int maximumPerNode) { + _resourceName = resourceName; + _partitions = partitions; + _replicas = countStateReplicas(states); + } + + /** + * Compute the preference lists and (optional partition-state mapping) for the given resource. 
+ * + * @param allNodes All instances + * @param liveNodes List of live instances + * @param currentMapping current replica mapping + * @param clusterData cluster data + * @return + * @throws HelixException if a map can not be found + */ + @Override + public ZNRecord computePartitionAssignment(final List allNodes, + final List liveNodes, final Map> currentMapping, + ClusterDataCache clusterData) throws HelixException { + Map instanceConfigMap = clusterData.getInstanceConfigMap(); + _clusterTopo = + new Topology(allNodes, liveNodes, instanceConfigMap, clusterData.getClusterConfig()); + Node topNode = _clusterTopo.getRootNode(); + + Map> newPreferences = new HashMap>(); + for (int i = 0; i < _partitions.size(); i++) { + String partitionName = _partitions.get(i); + long data = partitionName.hashCode(); + + // apply the placement rules + List selected = select(topNode, data, _replicas); + + List nodeList = new ArrayList(); + for (int j = 0; j < selected.size(); j++) { + nodeList.add(selected.get(j).getName()); + } + + newPreferences.put(partitionName, nodeList); + } + + ZNRecord result = new ZNRecord(_resourceName); + result.setListFields(newPreferences); + + return result; + } + + /** + * Number of retries for finding an appropriate instance for a replica. + */ + private static final int MAX_RETRY = 100; + private final JenkinsHash hashFun = new JenkinsHash(); + private CRUSHPlacementAlgorithm placementAlgorithm = new CRUSHPlacementAlgorithm(); + + /** + * Enforce isolation on the specified fault zone. + * The caller will either get the expected number of selected nodes as a result, or an exception will be thrown. 
+ */ + private List select(Node topNode, long data, int rf) + throws HelixException { + List nodes = new ArrayList(rf); + Set selectedZones = new HashSet(); + long input = data; + int count = rf; + int tries = 0; + while (nodes.size() < rf) { + doSelect(topNode, input, count, nodes, selectedZones); + count = rf - nodes.size(); + if (count > 0) { + input = hashFun.hash(input); // create a different hash value for retrying + tries++; + if (tries >= MAX_RETRY) { + throw new HelixException( + String.format("could not find all mappings after %d tries", tries)); + } + } + } + return nodes; + } + + private void doSelect(Node topNode, long input, int rf, List selectedNodes, + Set selectedZones) { + String zoneType = _clusterTopo.getFaultZoneType(); + String endNodeType = _clusterTopo.getEndNodeType(); + + if (!zoneType.equals(endNodeType)) { + // pick fault zones first + List zones = placementAlgorithm + .select(topNode, input, rf, zoneType, nodeAlreadySelected(selectedZones)); + // add the racks to the selected racks + selectedZones.addAll(zones); + // pick one end node from each fault zone. + for (Node zone : zones) { + List endNode = placementAlgorithm.select(zone, input, 1, endNodeType); + selectedNodes.addAll(endNode); + } + } else { + // pick end node directly + List nodes = placementAlgorithm.select(topNode, input, rf, endNodeType, + nodeAlreadySelected(new HashSet(selectedNodes))); + selectedNodes.addAll(nodes); + } + } + + /** + * Use the predicate to reject already selected zones or nodes. 
+ */ + private Predicate nodeAlreadySelected(Set selectedNodes) { + return Predicates.not(Predicates.in(selectedNodes)); + } + + /** + * Counts the total number of replicas given a state-count mapping + * @return + */ + private int countStateReplicas(Map stateCountMap) { + int total = 0; + for (Integer count : stateCountMap.values()) { + total += count; + } + return total; + } +} diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/RebalanceStrategy.java similarity index 81% rename from helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java rename to helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/RebalanceStrategy.java index 4daae8208b..a3c7e94cff 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/RebalanceStrategy.java @@ -1,4 +1,4 @@ -package org.apache.helix.controller.strategy; +package org.apache.helix.controller.rebalancer.strategy; /* * Licensed to the Apache Software Foundation (ASF) under one @@ -20,6 +20,7 @@ */ import org.apache.helix.ZNRecord; +import org.apache.helix.controller.stages.ClusterDataCache; import java.util.LinkedHashMap; import java.util.List; @@ -29,8 +30,11 @@ * Assignment strategy interface that computes the assignment of partition->instance. */ public interface RebalanceStrategy { + String DEFAULT_REBALANCE_STRATEGY = "DEFAULT"; + /** * Perform the necessary initialization for the rebalance strategy object. 
+ * * @param resourceName * @param partitions * @param states @@ -47,6 +51,7 @@ void init(String resourceName, final List partitions, * @param allNodes * @return */ - ZNRecord computePartitionAssignment(final List liveNodes, - final Map> currentMapping, final List allNodes); + ZNRecord computePartitionAssignment(final List allNodes, + final List liveNodes, final Map> currentMapping, + ClusterDataCache clusterData); } diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CRUSHPlacementAlgorithm.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CRUSHPlacementAlgorithm.java new file mode 100644 index 0000000000..870656cf3e --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CRUSHPlacementAlgorithm.java @@ -0,0 +1,316 @@ +/** + * Copyright 2013 Twitter, Inc. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.helix.controller.rebalancer.strategy.crushMapping; + +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.helix.controller.rebalancer.topology.Node; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * The transcription of the CRUSH placement algorithm from the Weil paper. This is a fairly simple + * adaptation, but a couple of important changes have been made to work with the crunch mapping. + */ +public class CRUSHPlacementAlgorithm { + /** + * In case the select() method fails to select after looping back to the origin of selection after + * so many tries, we stop the search. This constant denotes the maximum number of retries after + * looping back to the origin. It is expected that in most cases the selection will either succeed + * with a small number of tries, or it will never succeed. So a reasonably large number to + * distinguish these two cases should be sufficient. + */ + private static final int MAX_LOOPBACK_COUNT = 50; + private static final Logger logger = LoggerFactory.getLogger(CRUSHPlacementAlgorithm.class); + + private final boolean keepOffset; + private final Map roundOffset; + + /** + * Creates the crush placement object. + */ + public CRUSHPlacementAlgorithm() { + this(false); + } + + /** + * Creates the crush placement algorithm with the indication whether the round offset should be + * kept for the duration of this object for successive selection of the same input. + */ + public CRUSHPlacementAlgorithm(boolean keepOffset) { + this.keepOffset = keepOffset; + roundOffset = keepOffset ? new HashMap() : null; + } + + /** + * Returns a list of (count) nodes of the desired type. 
If the count is more than the number of + * available nodes, an exception is thrown. Note that it is possible for this method to return a + * list whose size is smaller than the requested size (count) if it is unable to select all the + * nodes for any reason. Callers should check the size of the returned list and take action if + * needed. + */ + public List select(Node parent, long input, int count, String type) { + return select(parent, input, count, type, Predicates.alwaysTrue()); + } + + public List select(Node parent, long input, int count, String type, + Predicate nodePredicate) { + int childCount = parent.getChildrenCount(type); + if (childCount < count) { + throw new IllegalArgumentException(count + " nodes of type " + type + + " were requested but the tree has only " + childCount + " nodes!"); + } + + List selected = new ArrayList(count); + // use the index stored in the map + Integer offset; + if (keepOffset) { + offset = roundOffset.get(input); + if (offset == null) { + offset = 0; + roundOffset.put(input, offset); + } + } else { + offset = 0; + } + + int rPrime = 0; + for (int r = 1; r <= count; r++) { + int failure = 0; + // number of times we had to loop back to the origin + int loopbackCount = 0; + boolean escape = false; + boolean retryOrigin; + Node out = null; + do { + retryOrigin = false; // initialize at the outset + Node in = parent; + Set rejected = new HashSet(); + boolean retryNode; + do { + retryNode = false; // initialize at the outset + rPrime = r + offset + failure; + logger.trace("{}.select({}, {})", new Object[] {in, input, rPrime}); + Selector selector = new Selector(in); + out = selector.select(input, rPrime); + if (!out.getType().equalsIgnoreCase(type)) { + logger.trace("selected output {} for data {} didn't match the type {}: walking down " + + "the hierarchy...", new Object[] {out, input, type}); + in = out; // walk down the hierarchy + retryNode = true; // stay within the node and walk down the tree + } else { // type matches + 
boolean predicateRejected = !nodePredicate.apply(out); + if (selected.contains(out) || predicateRejected) { + if (predicateRejected) { + logger.trace("{} was rejected by the node predicate for data {}: rejecting and " + + "increasing rPrime", out, input); + rejected.add(out); + } else { // already selected + logger.trace("{} was already selected for data {}: rejecting and increasing rPrime", + out, input); + } + + // we need to see if we have selected all possible nodes from this parent, in which + // case we should loop back to the origin and start over + if (allChildNodesEliminated(in, selected, rejected)) { + logger.trace("all child nodes of {} have been eliminated", in); + if (loopbackCount == MAX_LOOPBACK_COUNT) { + // we looped back the maximum times we specified; we give up search, and exit + escape = true; + break; + } + loopbackCount++; + logger.trace("looping back to the original parent node ({})", parent); + retryOrigin = true; + } else { + retryNode = true; // go back and reselect on the same parent + } + failure++; + } else if (nodeIsOut(out)) { + logger.trace("{} is marked as out (failed or over the maximum assignment) for data " + + "{}! 
looping back to the original parent node", out, input); + failure++; + if (loopbackCount == MAX_LOOPBACK_COUNT) { + // we looped back the maximum times we specified; we give up search, and exit + escape = true; + break; + } + loopbackCount++; + // re-selection on the same parent is detrimental in case of node failure: loop back + // to the origin + retryOrigin = true; + } else { + // we got a successful selection + break; + } + } + } while (retryNode); + } while (retryOrigin); + + if (escape) { + // cannot find a node under this parent; return a smaller set than was intended + logger.debug("we could not select a node for data {} under parent {}; a smaller data set " + + "than is requested will be returned", input, parent); + continue; + } + + logger.trace("{} was selected for data {}", out, input); + selected.add(out); + } + if (keepOffset) { + roundOffset.put(input, rPrime); + } + return selected; + } + + + private boolean nodeIsOut(Node node) { + if (node.isLeaf() && node.isFailed()) { + return true; + } + return false; + } + + /** + * Examines the immediate child nodes of the given parent node, and sees if all of the children + * that can be selected (i.e. not failed) are already selected. This is used to determine whether + * this parent node should no longer be used in the selection. + */ + private boolean allChildNodesEliminated(Node parent, List selected, Set rejected) { + List children = parent.getChildren(); + if (children != null) { + for (Node child: children) { + if (!nodeIsOut(child) && !selected.contains(child) && !rejected.contains(child)) { + return false; + } + } + } + return true; + } + + /** + * Selection algorithm based on the "straw" bucket type as described in the CRUSH algorithm. 
+ */ + private class Selector { + private final Map straws = new HashMap(); + private final JenkinsHash hashFunction; + + public Selector(Node node) { + if (!node.isLeaf()) { + // create a map from the nodes to their values + List sortedNodes = sortNodes(node.getChildren()); // do a reverse sort by weight + + int numLeft = sortedNodes.size(); + float straw = 1.0f; + float wbelow = 0.0f; + float lastw = 0.0f; + int i = 0; + final int length = sortedNodes.size(); + while (i < length) { + Node current = sortedNodes.get(i); + if (current.getWeight() == 0) { + straws.put(current, 0L); + i++; + continue; + } + straws.put(current, (long)(straw*0x10000)); + i++; + if (i == length) { + break; + } + + current = sortedNodes.get(i); + Node previous = sortedNodes.get(i-1); + if (current.getWeight() == previous.getWeight()) { + continue; + } + wbelow += (float)(previous.getWeight() - lastw)*numLeft; + for (int j = i; j < length; j++) { + if (sortedNodes.get(j).getWeight() == current.getWeight()) { + numLeft--; + } else { + break; + } + } + float wnext = (float)(numLeft * (current.getWeight() - previous.getWeight())); + float pbelow = wbelow/(wbelow + wnext); + straw *= Math.pow(1.0/pbelow, 1.0/numLeft); + lastw = previous.getWeight(); + } + } + hashFunction = new JenkinsHash(); + } + + /** + * Returns a new list that's sorted in the reverse order of the weight. + */ + private List sortNodes(List nodes) { + List ret = new ArrayList(nodes); + sortNodesInPlace(ret); + return ret; + } + + /** + * Sorts the list in place in the reverse order of the weight. + */ + private void sortNodesInPlace(List nodes) { + Collections.sort(nodes, new Comparator() { + public int compare(Node n1, Node n2) { + if (n2.getWeight() == n1.getWeight()) { + return 0; + } + return (n2.getWeight() - n1.getWeight() > 0) ? 
1 : -1; + // sort by weight only in the reverse order + } + }); + } + + public Node select(long input, long round) { + Node selected = null; + long hiScore = -1; + for (Map.Entry e: straws.entrySet()) { + Node child = e.getKey(); + long straw = e.getValue(); + long score = weightedScore(child, straw, input, round); + if (score > hiScore) { + selected = child; + hiScore = score; + } + } + if (selected == null) { + throw new IllegalStateException(); + } + return selected; + } + + private long weightedScore(Node child, long straw, long input, long round) { + long hash = hashFunction.hash(input, child.getId(), round); + hash = hash&0xffff; + long weightedScore = hash*straw; + return weightedScore; + } + } +} diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/JenkinsHash.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/JenkinsHash.java new file mode 100644 index 0000000000..66566f8b32 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/JenkinsHash.java @@ -0,0 +1,140 @@ +/** + * Copyright 2013 Twitter, Inc. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.helix.controller.rebalancer.strategy.crushMapping; + +public class JenkinsHash { + // max value to limit it to 4 bytes + private static final long MAX_VALUE = 0xFFFFFFFFL; + private static final long CRUSH_HASH_SEED = 1315423911L; + + /** + * Convert a byte into a long value without making it negative. + */ + private static long byteToLong(byte b) { + long val = b & 0x7F; + if ((b & 0x80) != 0) { + val += 128; + } + return val; + } + + /** + * Do addition and turn into 4 bytes. + */ + private static long add(long val, long add) { + return (val + add) & MAX_VALUE; + } + + /** + * Do subtraction and turn into 4 bytes. + */ + private static long subtract(long val, long subtract) { + return (val - subtract) & MAX_VALUE; + } + + /** + * Left shift val by shift bits and turn in 4 bytes. + */ + private static long xor(long val, long xor) { + return (val ^ xor) & MAX_VALUE; + } + + /** + * Left shift val by shift bits. Cut down to 4 bytes. + */ + private static long leftShift(long val, int shift) { + return (val << shift) & MAX_VALUE; + } + + /** + * Convert 4 bytes from the buffer at offset into a long value. + */ + private static long fourByteToLong(byte[] bytes, int offset) { + return (byteToLong(bytes[offset + 0]) + + (byteToLong(bytes[offset + 1]) << 8) + + (byteToLong(bytes[offset + 2]) << 16) + + (byteToLong(bytes[offset + 3]) << 24)); + } + + /** + * Mix up the values in the hash function. 
+ */ + private static Triple hashMix(Triple t) { + long a = t.a; long b = t.b; long c = t.c; + a = subtract(a, b); a = subtract(a, c); a = xor(a, c >> 13); + b = subtract(b, c); b = subtract(b, a); b = xor(b, leftShift(a, 8)); + c = subtract(c, a); c = subtract(c, b); c = xor(c, (b >> 13)); + a = subtract(a, b); a = subtract(a, c); a = xor(a, (c >> 12)); + b = subtract(b, c); b = subtract(b, a); b = xor(b, leftShift(a, 16)); + c = subtract(c, a); c = subtract(c, b); c = xor(c, (b >> 5)); + a = subtract(a, b); a = subtract(a, c); a = xor(a, (c >> 3)); + b = subtract(b, c); b = subtract(b, a); b = xor(b, leftShift(a, 10)); + c = subtract(c, a); c = subtract(c, b); c = xor(c, (b >> 15)); + return new Triple(a, b, c); + } + + private static class Triple { + long a; + long b; + long c; + + public Triple(long a, long b, long c) { + this.a = a; this.b = b; this.c = c; + } + } + + public long hash(long a) { + long hash = xor(CRUSH_HASH_SEED, a); + long b = a; + long x = 231232L; + long y = 1232L; + Triple val = hashMix(new Triple(b, x, hash)); + b = val.a; x = val.b; hash = val.c; + val = hashMix(new Triple(y, a, hash)); + hash = val.c; + return hash; + } + + public long hash(long a, long b) { + long hash = xor(xor(CRUSH_HASH_SEED, a), b); + long x = 231232L; + long y = 1232L; + Triple val = hashMix(new Triple(a, b, hash)); + a = val.a; b = val.b; hash = val.c; + val = hashMix(new Triple(x, a, hash)); + x = val.a; a = val.b; hash = val.c; + val = hashMix(new Triple(b, y, hash)); + hash = val.c; + return hash; + } + + public long hash(long a, long b, long c) { + long hash = xor(xor(xor(CRUSH_HASH_SEED, a), b), c); + long x = 231232L; + long y = 1232L; + Triple val = hashMix(new Triple(a, b, hash)); + a = val.a; b = val.b; hash = val.c; + val = hashMix(new Triple(c, x, hash)); + c = val.a; x = val.b; hash = val.c; + val = hashMix(new Triple(y, a, hash)); + y = val.a; a = val.b; hash = val.c; + val = hashMix(new Triple(b, x, hash)); + b = val.a; x = val.b; hash = val.c; + val 
= hashMix(new Triple(y, c, hash)); + hash = val.c; + return hash; + } +} diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Node.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Node.java new file mode 100644 index 0000000000..3a52a21a6e --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Node.java @@ -0,0 +1,208 @@ +package org.apache.helix.controller.rebalancer.topology; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; + +public class Node implements Comparable { + private String _name; + private String _type; + private long _id; + private long _weight; + + private LinkedHashMap _children = new LinkedHashMap(); + private Node _parent; + + private boolean _failed; + + public Node() { + + } + + public Node(Node node) { + _name = node.getName(); + _type = node.getType(); + _id = node.getId(); + _weight = node.getWeight(); + _failed = node.isFailed(); + } + + public String getName() { + return _name; + } + + public void setName(String name) { + _name = name; + } + + public String getType() { + return _type; + } + + public void setType(String type) { + _type = type; + } + + public long getId() { + return _id; + } + + public void setId(long id) { + _id = id; + } + + public long getWeight() { + return _weight; + } + + public void setWeight(long weight) { + _weight = weight; + } + + public void addWeight(long weight) { _weight += weight; } + + public boolean isFailed() { + return _failed; + } + + public void setFailed(boolean failed) { + if (!isLeaf()) { + throw new UnsupportedOperationException("you cannot set failed on a non-leaf!"); + } + _failed = failed; + } + + public List getChildren() { + return new ArrayList(_children.values()); + } + + /** + * Add a child, if there exists a child with the same name, will replace it. + * + * @param child + */ + public void addChild(Node child) { + _children.put(child.getName(), child); + } + + /** + * Has child with given name. + * @param name + * @return + */ + public boolean hasChild(String name) { + return _children.containsKey(name); + } + + /** + * Get child node with given name. 
+ * + * @param name + * @return + */ + public Node getChild(String name) { + return _children.get(name); + } + + public boolean isLeaf() { + return _children == null || _children.isEmpty(); + } + + public Node getParent() { + return _parent; + } + + public void setParent(Node parent) { + _parent = parent; + } + + /** + * Returns all child nodes that match the type. Returns itself if this node matches it. If no + * child matches the type, an empty list is returned. + */ + protected List findChildren(String type) { + List nodes = new ArrayList(); + if (_type.equalsIgnoreCase(type)) { + nodes.add(this); + } else if (!isLeaf()) { + for (Node child: _children.values()) { + nodes.addAll(child.findChildren(type)); + } + } + return nodes; + } + + /** + * Returns the number of all child nodes that match the type. Returns 1 if this node matches it. + * Returns 0 if no child matches the type. + */ + public int getChildrenCount(String type) { + int count = 0; + if (_type.equalsIgnoreCase(type)) { + count++; + } else if (!isLeaf()) { + for (Node child: _children.values()) { + count += child.getChildrenCount(type); + } + } + return count; + } + + /** + * Returns the top-most ("root") node from this node. If this node itself does not have a parent, + * returns itself. 
+ */ + public Node getRoot() { + Node node = this; + while (node.getParent() != null) { + node = node.getParent(); + } + return node; + } + + @Override + public String toString() { + return _name + ":" + _id; + } + + @Override + public int hashCode() { + return _name.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof Node)) { + return false; + } + Node that = (Node)obj; + return _name.equals(that.getName()); + } + + @Override + public int compareTo(Node o) { + return _name.compareTo(o.getName()); + } +} diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java new file mode 100644 index 0000000000..1057fad5f2 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java @@ -0,0 +1,295 @@ +package org.apache.helix.controller.rebalancer.topology; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; + +import org.apache.helix.HelixException; +import org.apache.helix.HelixProperty; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.InstanceConfig; +import org.apache.log4j.Logger; + + +/** + * Topology represents the structure of a cluster (the hierarchy of the nodes, its fault boundary, etc). + * This class is intended for topology-aware partition placement. + */ +public class Topology { + private static Logger logger = Logger.getLogger(Topology.class); + public enum Types { + ROOT, + ZONE, + INSTANCE + } + private static final int DEFAULT_NODE_WEIGHT = 1000; + + private final MessageDigest _md; + private Node _root; // root of the tree structure of all nodes; + private List _allInstances; + private List _liveInstances; + private Map _instanceConfigMap; + private HelixProperty _clusterConfig; + private String _faultZoneType; + private String _endNodeType; + private boolean _useDefaultTopologyDef; + private LinkedHashSet _types; + + /* default names for domain paths, if value is not specified for a domain path, the default one is used */ + // TODO: default values can be defined in clusterConfig. 
+ private Map _defaultDomainPathValues = new HashMap(); + + public Topology(final List allNodes, final List liveNodes, + final Map instanceConfigMap, ClusterConfig clusterConfig) { + try { + _md = MessageDigest.getInstance("SHA-1"); + _allInstances = allNodes; + _liveInstances = liveNodes; + _instanceConfigMap = instanceConfigMap; + _clusterConfig = clusterConfig; + _types = new LinkedHashSet(); + + String topologyDef = _clusterConfig.getRecord() + .getSimpleField(ClusterConfig.ClusterConfigProperty.TOPOLOGY.name()); + if (topologyDef != null) { + // Customized cluster topology definition is configured. + String[] types = topologyDef.trim().split("/"); + for (int i = 0; i < types.length; i++) { + if (types[i].length() != 0) { + _types.add(types[i]); + } + } + if (_types.size() == 0) { + logger.error("Invalid cluster topology definition " + topologyDef); + throw new HelixException("Invalid cluster topology definition " + topologyDef); + } else { + String lastType = null; + for (String type : _types) { + _defaultDomainPathValues.put(type, "Helix_default_" + type); + lastType = type; + } + _endNodeType = lastType; + _faultZoneType = _clusterConfig.getRecord() + .getStringField(ClusterConfig.ClusterConfigProperty.FAULT_ZONE_TYPE.name(), + _endNodeType); + if (!_types.contains(_faultZoneType)) { + throw new HelixException(String + .format("Invalid fault zone type %s, not present in topology definition %s.", + _faultZoneType, topologyDef)); + } + _useDefaultTopologyDef = false; + } + } else { + // Use default cluster topology definition, i,e. 
/root/zone/instance + _types.add(Types.ZONE.name()); + _types.add(Types.INSTANCE.name()); + _endNodeType = Types.INSTANCE.name(); + _faultZoneType = Types.ZONE.name(); + _useDefaultTopologyDef = true; + } + } catch (NoSuchAlgorithmException ex) { + throw new IllegalArgumentException(ex); + } + if (_useDefaultTopologyDef) { + _root = createClusterTreeWithDefaultTopologyDef(); + } else { + _root = createClusterTreeWithCustomizedTopology(); + } + } + + public String getEndNodeType() { + return _endNodeType; + } + + public String getFaultZoneType() { + return _faultZoneType; + } + + public Node getRootNode() { + return _root; + } + + public List getFaultZones() { + if (_root != null) { + return _root.findChildren(getFaultZoneType()); + } + return Collections.emptyList(); + } + + /** + * Creates a tree representing the cluster structure using default cluster topology definition + * (i,e no topology definition given and no domain id set). + */ + private Node createClusterTreeWithDefaultTopologyDef() { + // root + Node root = new Node(); + root.setName("root"); + root.setId(computeId("root")); + root.setType(Types.ROOT.name()); + + for (String ins : _allInstances) { + InstanceConfig config = _instanceConfigMap.get(ins); + if (config == null) { + throw new HelixException(String.format("Config for instance %s is not found!", ins)); + } + String zone = config.getZoneId(); + if (zone == null) { + //TODO: we should allow non-rack cluster for back-compatible. This should be solved once + // we have the hierarchy style of domain id for instance. 
+ throw new HelixException(String + .format("ZONE_ID for instance %s is not set, failed the topology-aware placement!", + ins)); + } + Map pathValueMap = new HashMap(); + pathValueMap.put(Types.ZONE.name(), zone); + pathValueMap.put(Types.INSTANCE.name(), ins); + + int weight = config.getWeight(); + if (weight < 0 || weight == InstanceConfig.WEIGHT_NOT_SET) { + weight = DEFAULT_NODE_WEIGHT; + } + addEndNode(root, ins, pathValueMap, weight, _liveInstances); + } + + return root; + } + + /** + * Creates a tree representing the cluster structure using the customized cluster topology + * definition (i.e. a TOPOLOGY definition is configured and instance domains are set). + */ + private Node createClusterTreeWithCustomizedTopology() { + // root + Node root = new Node(); + root.setName("root"); + root.setId(computeId("root")); + root.setType(Types.ROOT.name()); + + for (String ins : _allInstances) { + InstanceConfig insConfig = _instanceConfigMap.get(ins); + if (insConfig == null) { + throw new HelixException(String.format("Config for instance %s is not found!", ins)); + } + String domain = insConfig.getDomain(); + if (domain == null) { + throw new HelixException(String + .format("Domain for instance %s is not set, failed the topology-aware placement!", + ins)); + } + + String[] pathPairs = domain.trim().split(","); + Map pathValueMap = new HashMap(); + for (String pair : pathPairs) { + String[] values = pair.trim().split("="); + if (values.length != 2 || values[0].isEmpty() || values[1].isEmpty()) { + throw new HelixException(String.format( + "Domain-Value pair %s for instance %s is not valid, failed the topology-aware placement!", + pair, ins)); + } + String type = values[0]; + String value = values[1]; + + if (!_types.contains(type)) { + logger.warn(String + .format("Path %s defined in domain of instance %s not recognized, ignored!", pair, + ins)); + continue; + } + pathValueMap.put(type, value); + } + + int weight = insConfig.getWeight(); + if (weight < 0 || weight == 
InstanceConfig.WEIGHT_NOT_SET) { + weight = DEFAULT_NODE_WEIGHT; + } + + root = addEndNode(root, ins, pathValueMap, weight, _liveInstances); + } + + return root; + } + + + /** + * Add an end node to the tree, create all the paths to the leaf node if not present. + */ + private Node addEndNode(Node root, String instanceName, Map pathNameMap, + int instanceWeight, List liveInstances) { + Node current = root; + List pathNodes = new ArrayList(); + for (String path : _types) { + String pathValue = pathNameMap.get(path); + if (pathValue == null || pathValue.isEmpty()) { + pathValue = _defaultDomainPathValues.get(path); + } + pathNodes.add(current); + if (!current.hasChild(pathValue)) { + Node n = new Node(); + n.setName(pathValue); + n.setId(computeId(pathValue)); + n.setType(path); + n.setParent(current); + + // if it is leaf node. + if (path.equals(_endNodeType)) { + if (liveInstances.contains(instanceName)) { + // node is alive + n.setWeight(instanceWeight); + // add instance weight to all of its parent nodes. 
+ for (Node node : pathNodes) { + node.addWeight(instanceWeight); + } + } else { + n.setFailed(true); + n.setWeight(0); + } + } + current.addChild(n); + } + current = current.getChild(pathValue); + } + return root; + } + + private long computeId(String name) { + byte[] h = _md.digest(name.getBytes()); + return bstrTo32bit(h); + } + + private long bstrTo32bit(byte[] bstr) { + if (bstr.length < 4) { + throw new IllegalArgumentException("hashed is less than 4 bytes!"); + } + // need to "simulate" unsigned int + return (long) (((ord(bstr[0]) << 24) | (ord(bstr[1]) << 16) | (ord(bstr[2]) << 8) | (ord( + bstr[3])))) & 0xffffffffL; + } + + private int ord(byte b) { + return b & 0xff; + } +} diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java index cb5bda8fa6..dacf98dd16 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java @@ -32,6 +32,7 @@ import org.apache.helix.HelixProperty; import org.apache.helix.PropertyKey; import org.apache.helix.PropertyKey.Builder; +import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.ClusterConstraints; import org.apache.helix.model.ClusterConstraints.ConstraintType; import org.apache.helix.model.CurrentState; @@ -56,6 +57,7 @@ public class ClusterDataCache { private static final String IDEAL_STATE_RULE_PREFIX = "IdealStateRule!"; + private ClusterConfig _clusterConfig; Map _liveInstanceMap; Map _liveInstanceCacheMap; Map _idealStateMap; @@ -200,11 +202,11 @@ public synchronized boolean refresh(HelixDataAccessor accessor) { _currentStateMap = Collections.unmodifiableMap(allCurStateMap); _idealStateRuleMap = Maps.newHashMap(); - HelixProperty clusterConfig = accessor.getProperty(keyBuilder.clusterConfig()); - if (clusterConfig != null) { - for (String simpleKey : 
clusterConfig.getRecord().getSimpleFields().keySet()) { + _clusterConfig = accessor.getProperty(keyBuilder.clusterConfig()); + if (_clusterConfig != null) { + for (String simpleKey : _clusterConfig.getRecord().getSimpleFields().keySet()) { if (simpleKey.startsWith(IDEAL_STATE_RULE_PREFIX)) { - String simpleValue = clusterConfig.getRecord().getSimpleField(simpleKey); + String simpleValue = _clusterConfig.getRecord().getSimpleField(simpleKey); String[] rules = simpleValue.split("(? singleRule = Maps.newHashMap(); for (String rule : rules) { @@ -232,6 +234,10 @@ public synchronized boolean refresh(HelixDataAccessor accessor) { return true; } + public ClusterConfig getClusterConfig() { + return _clusterConfig; + } + /** * Retrieves the idealstates for all resources * @return diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java index e97ac9b44f..73f2cbbc74 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java @@ -52,6 +52,7 @@ import org.apache.helix.PropertyPathConfig; import org.apache.helix.PropertyType; import org.apache.helix.ZNRecord; +import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy; import org.apache.helix.model.ClusterConstraints; import org.apache.helix.model.ClusterConstraints.ConstraintType; import org.apache.helix.model.ConstraintItem; @@ -606,6 +607,13 @@ public void addResource(String clusterName, String resourceName, int partitions, addResource(clusterName, resourceName, partitions, stateModelRef, rebalancerMode, 0); } + @Override + public void addResource(String clusterName, String resourceName, int partitions, + String stateModelRef, String rebalancerMode, String rebalanceStrategy) { + addResource(clusterName, resourceName, partitions, stateModelRef, rebalancerMode, + rebalanceStrategy, 0, -1); + } + @Override public void 
addResource(String clusterName, String resourceName, IdealState idealstate) { String stateModelRef = idealstate.getStateModelDefRef(); @@ -629,14 +637,21 @@ public void addResource(String clusterName, String resourceName, IdealState idea @Override public void addResource(String clusterName, String resourceName, int partitions, String stateModelRef, String rebalancerMode, int bucketSize) { - addResource(clusterName, resourceName, partitions, stateModelRef, rebalancerMode, bucketSize, - -1); + addResource(clusterName, resourceName, partitions, stateModelRef, rebalancerMode, bucketSize, -1); } @Override public void addResource(String clusterName, String resourceName, int partitions, String stateModelRef, String rebalancerMode, int bucketSize, int maxPartitionsPerInstance) { + addResource(clusterName, resourceName, partitions, stateModelRef, rebalancerMode, + RebalanceStrategy.DEFAULT_REBALANCE_STRATEGY, bucketSize, maxPartitionsPerInstance); + } + + @Override + public void addResource(String clusterName, String resourceName, int partitions, + String stateModelRef, String rebalancerMode, String rebalanceStrategy, int bucketSize, + int maxPartitionsPerInstance) { if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) { throw new HelixException("cluster " + clusterName + " is not setup yet"); } @@ -647,6 +662,7 @@ public void addResource(String clusterName, String resourceName, int partitions, RebalanceMode mode = idealState.rebalanceModeFromString(rebalancerMode, RebalanceMode.SEMI_AUTO); idealState.setRebalanceMode(mode); + idealState.setRebalanceStrategy(rebalanceStrategy); idealState.setReplicas("" + 0); idealState.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY); if (maxPartitionsPerInstance > 0 && maxPartitionsPerInstance < Integer.MAX_VALUE) { @@ -1014,8 +1030,7 @@ public void setConstraint(String clusterName, final ConstraintType constraintTyp @Override public ZNRecord update(ZNRecord currentData) { ClusterConstraints constraints = - currentData 
== null ? new ClusterConstraints(constraintType) : new ClusterConstraints( - currentData); + currentData == null ? new ClusterConstraints(constraintType) : new ClusterConstraints(currentData); constraints.addConstraintItem(constraintId, constraintItem); return constraints.getRecord(); @@ -1152,6 +1167,26 @@ public void removeInstanceTag(String clusterName, String instanceName, String ta accessor.setProperty(keyBuilder.instanceConfig(instanceName), config); } + @Override + public void setInstanceZoneId(String clusterName, String instanceName, String zoneId) { + if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) { + throw new HelixException("cluster " + clusterName + " is not setup yet"); + } + + if (!ZKUtil.isInstanceSetup(_zkClient, clusterName, instanceName, InstanceType.PARTICIPANT)) { + throw new HelixException("cluster " + clusterName + " instance " + instanceName + + " is not setup yet"); + } + HelixDataAccessor accessor = + new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient)); + Builder keyBuilder = accessor.keyBuilder(); + + PropertyKey configKey = keyBuilder.instanceConfig(instanceName); + InstanceConfig config = accessor.getProperty(configKey); + config.setZoneId(zoneId); + accessor.setProperty(configKey, config); + } + @Override public void close() { if (_zkClient != null) { diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java new file mode 100644 index 0000000000..25a16d1e51 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java @@ -0,0 +1,92 @@ +package org.apache.helix.model; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.HelixProperty; +import org.apache.helix.ZNRecord; + +/** + * Cluster configurations + */ +public class ClusterConfig extends HelixProperty { + /** + * Configurable characteristics of a cluster + */ + public enum ClusterConfigProperty { + HELIX_DISABLE_PIPELINE_TRIGGERS, + TOPOLOGY, // cluster topology definition, for example, "/zone/rack/host/instance" + FAULT_ZONE_TYPE // the type in which isolation should be applied on when Helix places the replicas from same partition. + } + + /** + * Instantiate for a specific cluster + * + * @param cluster the cluster identifier + */ + public ClusterConfig(String cluster) { + super(cluster); + } + + /** + * Instantiate with a pre-populated record + * + * @param record a ZNRecord corresponding to a cluster configuration + */ + public ClusterConfig(ZNRecord record) { + super(record); + } + + /** + * Whether the controller pipeline triggers are disabled for this cluster (defaults to false). 
+ * + * @return + */ + public Boolean isPipelineTriggersDisabled() { + return _record + .getBooleanField(ClusterConfigProperty.HELIX_DISABLE_PIPELINE_TRIGGERS.toString(), false); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof ClusterConfig) { + ClusterConfig that = (ClusterConfig) obj; + + if (this.getId().equals(that.getId())) { + return true; + } + } + return false; + } + + @Override + public int hashCode() { + return getId().hashCode(); + } + + /** + * Get the name of this cluster + * + * @return the cluster name + */ + public String getClusterName() { + return _record.getId(); + } +} + diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java index 7c4cf546fe..55d4734d08 100644 --- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java +++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java @@ -167,7 +167,7 @@ public String getRebalancerClassName() { /** * Specify the strategy for Helix to use to compute the partition-instance assignment, - * i,e, the custom rebalance strategy that implements {@link org.apache.helix.controller.strategy.RebalanceStrategy} + * i.e. the custom rebalance strategy that implements {@link org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy} * * @param rebalanceStrategy * @return diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java index eb1c652f94..ecf290044d 100644 --- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java @@ -39,10 +39,14 @@ public class InstanceConfig extends HelixProperty { public enum InstanceConfigProperty { HELIX_HOST, HELIX_PORT, + HELIX_ZONE_ID, HELIX_ENABLED, HELIX_DISABLED_PARTITION, - TAG_LIST + TAG_LIST, + INSTANCE_WEIGHT, + DOMAIN } + public static final int 
WEIGHT_NOT_SET = -1; private static final Logger _logger = Logger.getLogger(InstanceConfig.class.getName()); @@ -94,6 +98,50 @@ public void setPort(String port) { _record.setSimpleField(InstanceConfigProperty.HELIX_PORT.toString(), port); } + public String getZoneId() { + return _record.getSimpleField(InstanceConfigProperty.HELIX_ZONE_ID.name()); + } + + public void setZoneId(String zoneId) { + _record.setSimpleField(InstanceConfigProperty.HELIX_ZONE_ID.name(), zoneId); + } + + /** + * Domain represents a hierarchy identifier for an instance. + * @return + */ + public String getDomain() { + return _record.getSimpleField(InstanceConfigProperty.DOMAIN.name()); + } + + /** + * Domain represents a hierarchy identifier for an instance. + * Example: "cluster=myCluster,zone=myZone1,rack=myRack,host=hostname,instance=instance001". + * @return + */ + public void setDomain(String domain) { + _record.setSimpleField(InstanceConfigProperty.DOMAIN.name(), domain); + } + + public int getWeight() { + String w = _record.getSimpleField(InstanceConfigProperty.INSTANCE_WEIGHT.name()); + if (w != null) { + try { + int weight = Integer.valueOf(w); + return weight; + } catch (NumberFormatException e) { + } + } + return WEIGHT_NOT_SET; + } + + public void setWeight(int weight) { + if (weight <= 0) { + throw new IllegalArgumentException("Instance weight can not be equal or less than 0!"); + } + _record.setSimpleField(InstanceConfigProperty.INSTANCE_WEIGHT.name(), String.valueOf(weight)); + } + /** * Get arbitrary tags associated with the instance * @return a list of tags diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java index 623357f052..ac96768afb 100644 --- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java +++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java @@ -32,8 +32,8 @@ import 
org.apache.helix.ZNRecord; import org.apache.helix.controller.stages.ClusterDataCache; import org.apache.helix.controller.stages.CurrentStateOutput; -import org.apache.helix.controller.strategy.AutoRebalanceStrategy; -import org.apache.helix.controller.strategy.RebalanceStrategy; +import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy; +import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.Partition; @@ -126,7 +126,8 @@ public String apply(Integer partitionNum) { List allNodes = Lists.newArrayList(getEligibleInstances(jobCfg, currStateOutput, instances, cache)); Collections.sort(allNodes); - ZNRecord record = strategy.computePartitionAssignment(allNodes, currentMapping, allNodes); + ZNRecord record = + strategy.computePartitionAssignment(allNodes, allNodes, currentMapping, cache); Map> preferenceLists = record.getListFields(); // Convert to an assignment keyed on participant diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java index 9d411bbc58..08ccbdc8cc 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java +++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java @@ -361,6 +361,12 @@ public void addResourceToCluster(String clusterName, String resourceName, int nu _admin.addResource(clusterName, resourceName, numResources, stateModelRef, rebalancerMode); } + public void addResourceToCluster(String clusterName, String resourceName, int numResources, + String stateModelRef, String rebalancerMode, String rebalanceStrategy) { + _admin.addResource(clusterName, resourceName, numResources, stateModelRef, rebalancerMode, + rebalanceStrategy); + } + public void addResourceToCluster(String clusterName, String resourceName, int numResources, String stateModelRef, String rebalancerMode, 
int bucketSize) { _admin.addResource(clusterName, resourceName, numResources, stateModelRef, rebalancerMode, diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java index adc92d6d05..a4e38a1cba 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java +++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java @@ -38,9 +38,10 @@ import org.apache.helix.PropertyKey.Builder; import org.apache.helix.ZNRecord; import org.apache.helix.controller.rebalancer.AutoRebalancer; +import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy; +import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy; import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment; import org.apache.helix.controller.stages.ClusterDataCache; -import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme; import org.apache.helix.model.LiveInstance; import org.apache.helix.model.StateModelDefinition; import org.apache.helix.tools.StateModelConfigGenerator; @@ -192,7 +193,7 @@ public void runRepeatedly(int numIterations) { RebalanceStrategy strategy = new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode); ZNRecord initialResult = - strategy.computePartitionAssignment(_liveNodes, _currentMapping, _allNodes); + strategy.computePartitionAssignment(_allNodes, _liveNodes, _currentMapping,
null); _currentMapping = getMapping(initialResult.getListFields()); logger.info(_currentMapping); getRunResult(_currentMapping, initialResult.getListFields()); @@ -498,7 +499,7 @@ public ZNRecord addSingleNode(String node) { _nonLiveSet.remove(node); return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode). - computePartitionAssignment(_liveNodes, _currentMapping, _allNodes); + computePartitionAssignment(_allNodes, _liveNodes, _currentMapping, null); } /** @@ -532,7 +533,7 @@ public ZNRecord removeSingleNode(String node) { } return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode) - .computePartitionAssignment(_liveNodes, _currentMapping, _allNodes); + .computePartitionAssignment(_allNodes, _liveNodes, _currentMapping, null); } /** @@ -558,7 +559,7 @@ public ZNRecord resurrectSingleNode(String node) { _liveSet.add(node); return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode) - .computePartitionAssignment(_liveNodes, _currentMapping, _allNodes); + .computePartitionAssignment(_allNodes, _liveNodes, _currentMapping, null); } private T getRandomSetElement(Set source) { @@ -606,7 +607,7 @@ public void testOrphansNotPreferred() { AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT); ZNRecord znRecord = new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount) - .computePartitionAssignment(liveNodes, currentMapping, allNodes); + .computePartitionAssignment(allNodes, liveNodes, currentMapping, null); Map> preferenceLists = znRecord.getListFields(); for (String partition : currentMapping.keySet()) { // make sure these are all MASTER @@ -624,7 +625,7 @@ public void testOrphansNotPreferred() { } znRecord = new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount) - .computePartitionAssignment(liveNodes, currentMapping, allNodes); + .computePartitionAssignment(allNodes, liveNodes, currentMapping, null); preferenceLists = znRecord.getListFields(); for (String 
partition : currentMapping.keySet()) { List preferenceList = preferenceLists.get(partition); @@ -642,7 +643,7 @@ public void testOrphansNotPreferred() { } znRecord = new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount) - .computePartitionAssignment(liveNodes, currentMapping, allNodes); + .computePartitionAssignment(allNodes, liveNodes, currentMapping, null); preferenceLists = znRecord.getListFields(); Set firstNodes = Sets.newHashSet(); for (String partition : currentMapping.keySet()) { @@ -664,7 +665,7 @@ public void testOrphansNotPreferred() { currentMapping.get(PARTITIONS[1]).put(NODES[1], "MASTER"); znRecord = new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount) - .computePartitionAssignment(liveNodes, currentMapping, allNodes); + .computePartitionAssignment(allNodes, liveNodes, currentMapping, null); preferenceLists = znRecord.getListFields(); boolean newNodeUsed = false; for (String partition : currentMapping.keySet()) { @@ -692,7 +693,7 @@ public void testOrphansNotPreferred() { currentMapping.get(PARTITIONS[2]).put(NODES[2], "SLAVE"); znRecord = new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount) - .computePartitionAssignment(liveNodes, currentMapping, allNodes); + .computePartitionAssignment(allNodes, liveNodes, currentMapping, null); preferenceLists = znRecord.getListFields(); firstNodes.clear(); Set secondNodes = Sets.newHashSet(); @@ -720,7 +721,7 @@ public void testOrphansNotPreferred() { currentMapping.get(PARTITIONS[2]).put(NODES[2], "MASTER"); znRecord = new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount) - .computePartitionAssignment(liveNodes, currentMapping, allNodes); + .computePartitionAssignment(allNodes, liveNodes, currentMapping, null); preferenceLists = znRecord.getListFields(); for (String partition : currentMapping.keySet()) { List preferenceList = preferenceLists.get(partition); @@ -751,7 +752,7 @@ public void testOrphansNotPreferred() { currentMapping.get(PARTITIONS[2]).put(NODES[2], 
"MASTER"); znRecord = new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount) - .computePartitionAssignment(liveNodes, currentMapping, allNodes); + .computePartitionAssignment(allNodes, liveNodes, currentMapping, null); preferenceLists = znRecord.getListFields(); firstNodes.clear(); for (String partition : currentMapping.keySet()) { diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestTopology.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestTopology.java new file mode 100644 index 0000000000..5169edd54f --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestTopology.java @@ -0,0 +1,172 @@ +package org.apache.helix.controller.Strategy; + +import org.apache.helix.HelixProperty; +import org.apache.helix.controller.rebalancer.topology.Node; +import org.apache.helix.controller.rebalancer.topology.Topology; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.InstanceConfig; +import org.apache.log4j.Logger; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +public class TestTopology { + private static Logger logger = Logger.getLogger(TestTopology.class); + + @Test + public void testCreateClusterTopology() { + ClusterConfig clusterConfig = new ClusterConfig("Test_Cluster"); + + String topology = "/Rack/Sub-Rack/Host/Instance"; + clusterConfig.getRecord().getSimpleFields() + .put(ClusterConfig.ClusterConfigProperty.TOPOLOGY.name(), topology); + clusterConfig.getRecord().getSimpleFields() + .put(ClusterConfig.ClusterConfigProperty.FAULT_ZONE_TYPE.name(), "Sub-Rack"); + + List allNodes = new ArrayList(); + List liveNodes = new ArrayList(); + Map instanceConfigMap = new HashMap(); + + Map nodeToWeightMap = new HashMap(); + + for (int i = 0; i < 100; i++) { + String instance = "localhost_" + i; + InstanceConfig config = new InstanceConfig(instance); + String rack_id = "rack_" + i/25; + String sub_rack_id = "subrack-" + i/5; + + String domain = + String.format("Rack=%s, Sub-Rack=%s, Host=%s", rack_id, sub_rack_id, instance); + config.setDomain(domain); + config.setHostName(instance); + config.setPort("9000"); + allNodes.add(instance); + + int weight = 0; + if (i % 10 != 0) { + liveNodes.add(instance); + weight = 1000; + if (i % 3 == 0) { + // set random instance weight. 
+ weight = (i+1) * 100; + config.setWeight(weight); + } + } + + instanceConfigMap.put(instance, config); + + if (!nodeToWeightMap.containsKey(rack_id)) { + nodeToWeightMap.put(rack_id, 0); + } + nodeToWeightMap.put(rack_id, nodeToWeightMap.get(rack_id) + weight); + if (!nodeToWeightMap.containsKey(sub_rack_id)) { + nodeToWeightMap.put(sub_rack_id, 0); + } + nodeToWeightMap.put(sub_rack_id, nodeToWeightMap.get(sub_rack_id) + weight); + } + + Topology topo = new Topology(allNodes, liveNodes, instanceConfigMap, clusterConfig); + + Assert.assertTrue(topo.getEndNodeType().equals("Instance")); + Assert.assertTrue(topo.getFaultZoneType().equals("Sub-Rack")); + + List faultZones = topo.getFaultZones(); + Assert.assertEquals(faultZones.size(), 20); + + Node root = topo.getRootNode(); + + Assert.assertEquals(root.getChildrenCount("Rack"), 4); + Assert.assertEquals(root.getChildrenCount("Sub-Rack"), 20); + Assert.assertEquals(root.getChildrenCount("Host"), 100); + Assert.assertEquals(root.getChildrenCount("Instance"), 100); + + + // validate weights. 
+ for (Node rack : root.getChildren()) { + Assert.assertEquals(rack.getWeight(), (long)nodeToWeightMap.get(rack.getName())); + for (Node subRack : rack.getChildren()) { + Assert.assertEquals(subRack.getWeight(), (long)nodeToWeightMap.get(subRack.getName())); + } + } + } + + @Test + public void testCreateClusterTopologyWithDefaultTopology() { + ClusterConfig clusterConfig = new ClusterConfig("Test_Cluster"); + + List allNodes = new ArrayList(); + List liveNodes = new ArrayList(); + Map instanceConfigMap = new HashMap(); + + Map nodeToWeightMap = new HashMap(); + + for (int i = 0; i < 100; i++) { + String instance = "localhost_" + i; + InstanceConfig config = new InstanceConfig(instance); + String zoneId = "rack_" + i / 10; + config.setZoneId(zoneId); + config.setHostName(instance); + config.setPort("9000"); + allNodes.add(instance); + + int weight = 0; + if (i % 10 != 0) { + liveNodes.add(instance); + weight = 1000; + if (i % 3 == 0) { + // set random instance weight. + weight = (i + 1) * 100; + config.setWeight(weight); + } + } + + instanceConfigMap.put(instance, config); + + if (!nodeToWeightMap.containsKey(zoneId)) { + nodeToWeightMap.put(zoneId, 0); + } + nodeToWeightMap.put(zoneId, nodeToWeightMap.get(zoneId) + weight); + } + + Topology topo = new Topology(allNodes, liveNodes, instanceConfigMap, clusterConfig); + + Assert.assertTrue(topo.getEndNodeType().equals(Topology.Types.INSTANCE.name())); + Assert.assertTrue(topo.getFaultZoneType().equals(Topology.Types.ZONE.name())); + + List faultZones = topo.getFaultZones(); + Assert.assertEquals(faultZones.size(), 10); + + Node root = topo.getRootNode(); + + Assert.assertEquals(root.getChildrenCount(Topology.Types.ZONE.name()), 10); + Assert.assertEquals(root.getChildrenCount(topo.getEndNodeType()), 100); + + // validate weights. 
+ for (Node rack : root.getChildren()) { + Assert.assertEquals(rack.getWeight(), (long) nodeToWeightMap.get(rack.getName())); + } + } +} diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalance.java new file mode 100644 index 0000000000..5c347922f8 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalance.java @@ -0,0 +1,221 @@ +package org.apache.helix.integration; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.model.BuiltInStateModelDefinitions; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.IdealState.RebalanceMode; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.tools.ClusterSetup; +import org.apache.helix.tools.ClusterStateVerifier; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class TestCrushAutoRebalance extends ZkIntegrationTestBase { + final int NUM_NODE = 6; + protected static final int START_PORT = 12918; + protected static final int _PARTITIONS = 20; + + protected final String CLASS_NAME = getShortClassName(); + protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; + protected ClusterControllerManager _controller; + + protected ClusterSetup _setupTool = null; + List _participants = new ArrayList(); + Map _nodeToZoneMap = new HashMap(); + Map _nodeToTagMap = new HashMap(); + List _nodes = new ArrayList(); + List _allDBs = new ArrayList(); + int _replica = 3; + + @BeforeClass + public void beforeClass() throws Exception { + System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); + + String namespace = "/" + CLUSTER_NAME; + if (_gZkClient.exists(namespace)) { + _gZkClient.deleteRecursive(namespace); + } + _setupTool = new ClusterSetup(_gZkClient); + _setupTool.addCluster(CLUSTER_NAME, true); + + for (int i = 0; i < NUM_NODE; i++) { + 
String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); + String zone = "zone-" + i % 3; + String tag = "tag-" + i % 2; + _setupTool.getClusterManagementTool().setInstanceZoneId(CLUSTER_NAME, storageNodeName, zone); + _setupTool.getClusterManagementTool().addInstanceTag(CLUSTER_NAME, storageNodeName, tag); + _nodeToZoneMap.put(storageNodeName, zone); + _nodeToTagMap.put(storageNodeName, tag); + _nodes.add(storageNodeName); + } + + // start dummy participants + for (String node : _nodes) { + MockParticipantManager participant = + new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, node); + participant.syncStart(); + _participants.add(participant); + } + + // start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + } + + @DataProvider(name = "rebalanceStrategies") + public static String [][] rebalanceStrategies() { + return new String[][] { {"CrushRebalanceStrategy", CrushRebalanceStrategy.class.getName()}}; + } + + @Test(dataProvider = "rebalanceStrategies") + public void testZoneIsolation(String rebalanceStrategyName, String rebalanceStrategyClass) + throws Exception { + System.out.println("Test " + rebalanceStrategyName); + List testDBs = new ArrayList(); + String[] testModels = { BuiltInStateModelDefinitions.OnlineOffline.name(), + BuiltInStateModelDefinitions.MasterSlave.name(), + BuiltInStateModelDefinitions.LeaderStandby.name() + }; + int i = 0; + for (String stateModel : testModels) { + String db = "Test-DB-" + rebalanceStrategyName + "-" + i++; + _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS, stateModel, + RebalanceMode.FULL_AUTO + "", rebalanceStrategyClass); + _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); + testDBs.add(db); + _allDBs.add(db); + } + Thread.sleep(300); + + boolean result = 
ClusterStateVerifier.verifyByZkCallback( + new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME)); + Assert.assertTrue(result); + + for (String db : testDBs) { + IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + ExternalView ev = + _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + validateZoneAndTagIsolation(is, ev); + } + } + + @Test(dataProvider = "rebalanceStrategies") + public void testZoneIsolationWithInstanceTag( + String rebalanceStrategyName, String rebalanceStrategyClass) throws Exception { + List testDBs = new ArrayList(); + Set tags = new HashSet(_nodeToTagMap.values()); + int i = 0; + for (String tag : tags) { + String db = "Test-DB-Tag-" + rebalanceStrategyName + "-" + i++; + _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS, + BuiltInStateModelDefinitions.MasterSlave.name(), RebalanceMode.FULL_AUTO + "", + rebalanceStrategyClass); + IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + is.setInstanceGroupTag(tag); + _setupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, db, is); + _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); + testDBs.add(db); + _allDBs.add(db); + } + Thread.sleep(300); + + boolean result = ClusterStateVerifier.verifyByZkCallback( + new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME)); + Assert.assertTrue(result); + + for (String db : testDBs) { + IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + ExternalView ev = + _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + validateZoneAndTagIsolation(is, ev); + } + } + + /** + * Validate instances for each partition is on different zone and with necessary tagged instances. 
+ */ + private void validateZoneAndTagIsolation(IdealState is, ExternalView ev) { + int replica = Integer.valueOf(is.getReplicas()); + String tag = is.getInstanceGroupTag(); + + for (String partition : is.getPartitionSet()) { + Set assignedZones = new HashSet(); + + Set instancesInIs = new HashSet(is.getRecord().getListField(partition)); + Map assignmentMap = ev.getRecord().getMapField(partition); + Set instancesInEV = assignmentMap.keySet(); + Assert.assertEquals(instancesInEV, instancesInIs); + for (String instance : instancesInEV) { + assignedZones.add(_nodeToZoneMap.get(instance)); + if (tag != null) { + InstanceConfig config = + _setupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instance); + Assert.assertTrue(config.containsTag(tag)); + } + } + Assert.assertEquals(assignedZones.size(), replica); + } + } + + @Test() + public void testAddZone() throws Exception { + //TODO + } + + @Test() + public void testAddNodes() throws Exception { + //TODO + } + + @Test() + public void testNodeFailure() throws Exception { + //TODO + } + + @AfterClass + public void afterClass() throws Exception { + /** + * shutdown order: 1) disconnect the controller 2) disconnect participants + */ + _controller.syncStop(); + for (MockParticipantManager participant : _participants) { + participant.syncStop(); + } + _setupTool.deleteCluster(CLUSTER_NAME); + System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); + } +} diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java index 917be1741a..51dd19d006 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java +++ b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java @@ -31,6 +31,7 @@ import org.apache.helix.mock.participant.MockMSModelFactory; import 
org.apache.helix.mock.participant.MockSchemataModelFactory; import org.apache.helix.mock.participant.MockTransition; +import org.apache.helix.model.BuiltInStateModelDefinitions; import org.apache.helix.participant.StateMachineEngine; import org.apache.log4j.Logger; @@ -73,14 +74,17 @@ public void syncStart() { public void run() { try { StateMachineEngine stateMach = getStateMachineEngine(); - stateMach.registerStateModelFactory("MasterSlave", _msModelFactory); + stateMach.registerStateModelFactory(BuiltInStateModelDefinitions.MasterSlave.name(), + _msModelFactory); DummyLeaderStandbyStateModelFactory lsModelFactory = new DummyLeaderStandbyStateModelFactory(10); DummyOnlineOfflineStateModelFactory ofModelFactory = new DummyOnlineOfflineStateModelFactory(10); - stateMach.registerStateModelFactory("LeaderStandby", lsModelFactory); - stateMach.registerStateModelFactory("OnlineOffline", ofModelFactory); + stateMach.registerStateModelFactory(BuiltInStateModelDefinitions.LeaderStandby.name(), + lsModelFactory); + stateMach.registerStateModelFactory(BuiltInStateModelDefinitions.OnlineOffline.name(), + ofModelFactory); MockSchemataModelFactory schemataFactory = new MockSchemataModelFactory(); stateMach.registerStateModelFactory("STORAGE_DEFAULT_SM_SCHEMATA", schemataFactory);