Generalize topology domain to support graceful node swap.
There was a constraint that a topology domain had to contain the instance name as its final path value. This constraint prevented configuring a flexible topology for a dynamic cluster: in the case of a node swap, the topology had to be completely re-calculated.
This change decouples the domain from the instance name, so even with some nodes swapped, an admin can keep the same topology configuration. This ensures a stable resource partition assignment (see the configuration sketch below the file summary).
Jiajun Wang authored and junkaixue committed Nov 14, 2018
1 parent 3cf2901 commit d5bf3ad
Showing 14 changed files with 636 additions and 456 deletions.
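
For illustration only, here is a minimal configuration sketch of what this decoupling allows. It assumes the leaf level of the topology can now carry a logical id rather than the physical instance name; the cluster name, topology path, and domain values are hypothetical and not taken from this commit, and persisting the configs through a config accessor is omitted.

import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.InstanceConfig;

public class TopologyDomainSketch {
  public static void main(String[] args) {
    // Topology whose leaf level is a logical node id, not the instance name.
    ClusterConfig clusterConfig = new ClusterConfig("MyCluster");
    clusterConfig.setTopologyAwareEnabled(true);
    clusterConfig.setTopology("/zone/rack/logicalNode");
    clusterConfig.setFaultZoneType("zone");

    // A swapped-in host declares the same domain as the host it replaces, so the
    // topology, and therefore the partition assignment, stays stable.
    InstanceConfig replacement = new InstanceConfig("newHost_12918");
    replacement.setDomain("zone=z1,rack=r1,logicalNode=ln_5");
  }
}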
@@ -76,8 +76,8 @@ public IdealState computeNewIdealState(String resourceName,

LinkedHashMap<String, Integer> stateCountMap = stateModelDef
.getStateCountMap(liveInstance.size(), replicas);
List<String> liveNodes = new ArrayList<String>(liveInstance.keySet());
List<String> allNodes = new ArrayList<String>(clusterData.getInstanceConfigMap().keySet());
List<String> liveNodes = new ArrayList<>(liveInstance.keySet());
List<String> allNodes = new ArrayList<>(clusterData.getAllInstances());
allNodes.removeAll(clusterData.getDisabledInstances());
liveNodes.retainAll(allNodes);

@@ -19,19 +19,29 @@
* under the License.
*/

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import org.apache.helix.HelixException;
import org.apache.helix.ZNRecord;
import org.apache.helix.controller.rebalancer.strategy.crushMapping.CardDealer;
import org.apache.helix.controller.rebalancer.strategy.crushMapping.CardDealingAdjustmentAlgorithm;
import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.rebalancer.strategy.crushMapping.CardDealingAdjustmentAlgorithmV2;
import org.apache.helix.controller.rebalancer.strategy.crushMapping.ConsistentHashingAdjustmentAlgorithm;
import org.apache.helix.controller.rebalancer.topology.InstanceNode;
import org.apache.helix.controller.rebalancer.topology.Node;
import org.apache.helix.controller.rebalancer.topology.Topology;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.ExecutionException;

/**
* Abstract class of Forced Even Assignment Patched Algorithm.
* This class contains common logic that re-calculates the assignment based on a result calculated by the base algorithm.
@@ -45,9 +55,10 @@ public abstract class AbstractEvenDistributionRebalanceStrategy implements Rebal

protected abstract RebalanceStrategy getBaseRebalanceStrategy();

protected CardDealer getCardDealingAlgorithm(Topology topology) {
protected CardDealingAdjustmentAlgorithmV2 getCardDealingAlgorithm(Topology topology) {
// by default, minimize the movement when calculating for evenness.
return new CardDealingAdjustmentAlgorithm(topology, _replica);
return new CardDealingAdjustmentAlgorithmV2(topology, _replica,
CardDealingAdjustmentAlgorithmV2.Mode.MINIMIZE_MOVEMENT);
}

@Override
@@ -78,18 +89,19 @@ public ZNRecord computePartitionAssignment(final List<String> allNodes,
.computePartitionAssignment(allNodes, allNodes, currentMapping, clusterData);
Map<String, List<String>> origPartitionMap = origAssignment.getListFields();

// For logging only
String eventId = clusterData.getEventId();

// Try to re-assign if the original map is not empty
if (!origPartitionMap.isEmpty()) {
// Transform current assignment to instance->partitions map, and get total partitions
Map<String, List<String>> nodeToPartitionMap = convertMap(origPartitionMap);

Map<String, List<String>> finalPartitionMap = null;

// Round 2: Rebalance mapping using card dealing algorithm. For ensuring evenness distribution.
Map<String, List<Node>> finalPartitionMap = null;
Topology allNodeTopo = new Topology(allNodes, allNodes, clusterData.getInstanceConfigMap(),
clusterData.getClusterConfig());
CardDealer cardDealer = getCardDealingAlgorithm(allNodeTopo);

// Transform current assignment to instance->partitions map, and get total partitions
Map<Node, List<String>> nodeToPartitionMap =
convertPartitionMap(origPartitionMap, allNodeTopo);
// Round 2: Rebalance mapping using card dealing algorithm. For ensuring evenness distribution.
CardDealingAdjustmentAlgorithmV2 cardDealer = getCardDealingAlgorithm(allNodeTopo);
if (cardDealer.computeMapping(nodeToPartitionMap, _resourceName.hashCode())) {
// Round 3: Reorder preference Lists to ensure participants' orders (so as the states) are uniform.
finalPartitionMap = shufflePreferenceList(nodeToPartitionMap);
@@ -100,20 +112,22 @@ public ZNRecord computePartitionAssignment(final List<String> allNodes,
new ConsistentHashingAdjustmentAlgorithm(allNodeTopo, liveNodes);
if (hashPlacement.computeMapping(nodeToPartitionMap, _resourceName.hashCode())) {
// Since mapping is changed by hashPlacement, need to adjust nodes order.
Map<String, List<String>> adjustedPartitionMap = convertMap(nodeToPartitionMap);
Map<String, List<Node>> adjustedPartitionMap =
convertAssignment(nodeToPartitionMap);
for (String partition : adjustedPartitionMap.keySet()) {
List<String> preSelectedList = finalPartitionMap.get(partition);
Set<String> adjustedNodeList = new HashSet<>(adjustedPartitionMap.get(partition));
List<String> finalNodeList = adjustedPartitionMap.get(partition);
List<Node> preSelectedList = finalPartitionMap.get(partition);
Set<Node> adjustedNodeList =
new HashSet<>(adjustedPartitionMap.get(partition));
List<Node> finalNodeList = adjustedPartitionMap.get(partition);
int index = 0;
// 1. Add the ones in pre-selected node list first, in order
for (String node : preSelectedList) {
for (Node node : preSelectedList) {
if (adjustedNodeList.remove(node)) {
finalNodeList.set(index++, node);
}
}
// 2. Add the rest of nodes to the map
for (String node : adjustedNodeList) {
for (Node node : adjustedNodeList) {
finalNodeList.set(index++, node);
}
}
@@ -123,24 +137,40 @@ public ZNRecord computePartitionAssignment(final List<String> allNodes,
finalPartitionMap = null;
}
} catch (ExecutionException e) {
_logger.error("Failed to perform consistent hashing partition assigner.", e);
LogUtil.logError(_logger, eventId,
"Failed to perform consistent hashing partition assigner.", e);
finalPartitionMap = null;
}
}
}

if (null != finalPartitionMap) {
ZNRecord result = new ZNRecord(_resourceName);
result.setListFields(finalPartitionMap);
Map<String, List<String>> resultPartitionMap = new HashMap<>();
for (String partitionName : finalPartitionMap.keySet()) {
List<String> instanceNames = new ArrayList<>();
for (Node node : finalPartitionMap.get(partitionName)) {
if (node instanceof InstanceNode) {
instanceNames.add(((InstanceNode) node).getInstanceName());
} else {
LogUtil.logError(_logger, eventId,
String.format("Selected node is not associated with an instance: %s", node));
}
}
resultPartitionMap.put(partitionName, instanceNames);
}
result.setListFields(resultPartitionMap);
return result;
}
}

// Force even is not possible, fallback to use default strategy
if (_logger.isDebugEnabled()) {
_logger.debug("Force even distribution is not possible, using the default strategy: "
+ getBaseRebalanceStrategy().getClass().getSimpleName());
LogUtil.logDebug(_logger, eventId,
"Force even distribution is not possible, using the default strategy: "
+ getBaseRebalanceStrategy().getClass().getSimpleName());
}

if (liveNodes.equals(allNodes)) {
return origAssignment;
} else {
@@ -151,51 +181,78 @@ public ZNRecord computePartitionAssignment(final List<String> allNodes,
}

// Best effort to shuffle preference lists for all partitions for uniform distribution regarding the top state.
private Map<String, List<String>> shufflePreferenceList(
Map<String, List<String>> nodeToPartitionMap) {
final Map<String, List<String>> partitionMap = convertMap(nodeToPartitionMap);
private Map<String, List<Node>> shufflePreferenceList(
Map<Node, List<String>> nodeToPartitionMap) {
final Map<String, List<Node>> partitionMap = convertAssignment(nodeToPartitionMap);
// evaluate node's order according to:
// 1. their potential top state replicas count (less count, higher priority)
// 2. their assigned top state replicas (less top state replica, higher priority)
final Map<String, Integer> nodeScores = new HashMap<>();
for (String node : nodeToPartitionMap.keySet()) {
final Map<Node, Integer> nodeScores = new HashMap<>();
for (Node node : nodeToPartitionMap.keySet()) {
// Init with the potential replicas count
nodeScores.put(node, nodeToPartitionMap.get(node).size());
}
for (final String partition : partitionMap.keySet()) {
List<String> nodes = partitionMap.get(partition);
List<Node> nodes = partitionMap.get(partition);
// order according to score
Collections.sort(nodes, new Comparator<String>() {
Collections.sort(nodes, new Comparator<Node>() {
@Override
public int compare(String o1, String o2) {
public int compare(Node o1, Node o2) {
int o1Score = nodeScores.get(o1);
int o2Score = nodeScores.get(o2);
if (o1Score == o2Score) {
return new Integer((partition + o1).hashCode()).compareTo((partition + o2).hashCode());
return new Integer((partition + o1.getName()).hashCode())
.compareTo((partition + o2.getName()).hashCode());
} else {
return o1Score - o2Score;
}
}
});
// After assignment, the nodes has less potential top states
for (int i = 0; i < nodes.size(); i++) {
String nodeName = nodes.get(i);
nodeScores.put(nodeName,
nodeScores.get(nodeName) - 1 + (i == 0 ? (int) Math.pow(_replica, 2) : 0));
Node node = nodes.get(i);
nodeScores.put(node, nodeScores.get(node) - 1 + (i == 0 ? (int) Math.pow(_replica, 2) : 0));
}
}
return partitionMap;
}

// Convert the map from <key, list of values> to a new map <original value, list of related keys>
private Map<String, List<String>> convertMap(Map<String, List<String>> originalMap) {
Map<String, List<String>> resultMap = new HashMap<>();
for (String originalKey : originalMap.keySet()) {
for (String originalValue : originalMap.get(originalKey)) {
if (!resultMap.containsKey(originalValue)) {
resultMap.put(originalValue, new ArrayList<String>());
private Map<String, List<Node>> convertAssignment(
Map<Node, List<String>> assignment) {
Map<String, List<Node>> resultMap = new HashMap<>();
for (Node instance : assignment.keySet()) {
for (String partitionName : assignment.get(instance)) {
if (!resultMap.containsKey(partitionName)) {
resultMap.put(partitionName, new ArrayList<Node>());
}
resultMap.get(partitionName).add(instance);
}
}
return resultMap;
}

// Convert the map from <Partition Name, List<instance names>> to a new map <InstanceNode, List<Partition Name>>
private Map<Node, List<String>> convertPartitionMap(Map<String, List<String>> originalMap,
Topology topology) {
Map<Node, List<String>> resultMap = new HashMap<>();
Map<String, Node> instanceMap = new HashMap<>();
for (Node node : Topology.getAllLeafNodes(topology.getRootNode())) {
if (node instanceof InstanceNode) {
InstanceNode insNode = (InstanceNode) node;
instanceMap.put(insNode.getInstanceName(), insNode);
}
}

for (String partition : originalMap.keySet()) {
for (String instanceName : originalMap.get(partition)) {
Node insNode = instanceMap.get(instanceName);
if (insNode != null) {
if (!resultMap.containsKey(insNode)) {
resultMap.put(insNode, new ArrayList<String>());
}
resultMap.get(insNode).add(partition);
}
resultMap.get(originalValue).add(originalKey);
}
}
return resultMap;
@@ -19,15 +19,23 @@
* under the License.
*/

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

import org.apache.helix.HelixException;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.rebalancer.constraint.AbstractRebalanceHardConstraint;
import org.apache.helix.api.rebalancer.constraint.AbstractRebalanceSoftConstraint;
import org.apache.helix.api.rebalancer.constraint.dataprovider.CapacityProvider;
import org.apache.helix.api.rebalancer.constraint.dataprovider.PartitionWeightProvider;
import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.common.ResourcesStateMap;
import org.apache.helix.controller.rebalancer.constraint.PartitionWeightAwareEvennessConstraint;
import org.apache.helix.controller.rebalancer.strategy.crushMapping.CardDealer;
import org.apache.helix.controller.rebalancer.strategy.crushMapping.CardDealingAdjustmentAlgorithmV2;
import org.apache.helix.controller.rebalancer.topology.Topology;
import org.apache.helix.controller.stages.ClusterDataCache;
@@ -36,8 +44,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

/**
* Constraints based rebalance strategy.
* Assignment is calculated according to the specified constraints.
@@ -108,7 +114,7 @@ public int getParticipantUsage(String participant) {
_softConstraints.add(defaultConstraint);
}

protected CardDealer getCardDealingAlgorithm(Topology topology) {
protected CardDealingAdjustmentAlgorithmV2 getCardDealingAlgorithm(Topology topology) {
// For constraint based strategy, need more fine-grained assignment for each partition.
// So evenness is more important.
return new CardDealingAdjustmentAlgorithmV2(topology, _replica,
@@ -156,7 +162,7 @@ public ZNRecord computePartitionAssignment(final List<String> allNodes,
List<String> candidates = new ArrayList<>(allNodes);
// Only calculate for configured nodes.
// Remove all non-configured nodes.
candidates.retainAll(clusterData.getInstanceConfigMap().keySet());
candidates.retainAll(clusterData.getAllInstances());

// For generating the IdealState ZNRecord
Map<String, List<String>> preferenceList = new HashMap<>();
@@ -168,11 +174,11 @@ public ZNRecord computePartitionAssignment(final List<String> allNodes,
// check for the preferred assignment
partitionMapping = validateStateMap(partitionMapping);
if (partitionMapping != null) {
_logger.debug(
LogUtil.logDebug(_logger, clusterData.getEventId(),
"The provided preferred partition assignment meets state model requirements. Skip rebalance.");
preferenceList.put(partition, new ArrayList<>(partitionMapping.keySet()));
idealStateMap.put(partition, partitionMapping);
updateConstraints(partition, partitionMapping);
updateConstraints(partition, partitionMapping, clusterData.getEventId());
continue;
}
} // else, recalculate the assignment
@@ -195,7 +201,7 @@ public ZNRecord computePartitionAssignment(final List<String> allNodes,
idealStateMap.put(partition, stateMap);
preferenceList.put(partition, assignment);
// Note, only update with the new pending assignment
updateConstraints(partition, stateMap);
updateConstraints(partition, stateMap, clusterData.getEventId());
}

// recover the original weight
@@ -303,15 +309,18 @@ private List<String> computeSinglePartitionAssignment(String partitionName,
return partitionAssignment.getListFields().get(partitionName);
}

private void updateConstraints(String partition, Map<String, String> pendingAssignment) {
private void updateConstraints(String partition, Map<String, String> pendingAssignment,
String eventId) {
if (pendingAssignment.isEmpty()) {
_logger.warn("No pending assignment needs to update. Skip constraint update.");
LogUtil.logWarn(_logger, eventId,
"No pending assignment needs to update. Skip constraint update.");
return;
}

ResourcesStateMap tempStateMap = new ResourcesStateMap();
tempStateMap.setState(_resourceName, new Partition(partition), pendingAssignment);
_logger.debug("Update constraints with pending assignment: " + tempStateMap.toString());
LogUtil.logDebug(_logger, eventId,
"Update constraints with pending assignment: " + tempStateMap.toString());

for (AbstractRebalanceHardConstraint hardConstraint : _hardConstraints) {
hardConstraint.updateAssignment(tempStateMap);
