Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions helix-core/src/main/java/org/apache/helix/HelixAdmin.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,18 @@ public interface HelixAdmin {
void addResource(String clusterName, String resourceName, int numPartitions,
String stateModelRef, String rebalancerMode);

/**
* Add a resource to a cluster
* @param clusterName
* @param resourceName
* @param numPartitions
* @param stateModelRef
* @param rebalancerMode
* @param rebalanceStrategy
*/
void addResource(String clusterName, String resourceName, int numPartitions,
String stateModelRef, String rebalancerMode, String rebalanceStrategy);

/**
* Add a resource to a cluster, using a bucket size > 1
* @param clusterName
Expand All @@ -138,6 +150,22 @@ void addResource(String clusterName, String resourceName, int numPartitions,
void addResource(String clusterName, String resourceName, int numPartitions,
String stateModelRef, String rebalancerMode, int bucketSize, int maxPartitionsPerInstance);


/**
* Add a resource to a cluster, using a bucket size > 1
* @param clusterName
* @param resourceName
* @param numPartitions
* @param stateModelRef
* @param rebalancerMode
* @param rebalanceStrategy
* @param bucketSize
* @param maxPartitionsPerInstance
*/
void addResource(String clusterName, String resourceName, int numPartitions,
String stateModelRef, String rebalancerMode, String rebalanceStrategy, int bucketSize,
int maxPartitionsPerInstance);

/**
* Add an instance to a cluster
* @param clusterName
Expand Down Expand Up @@ -411,6 +439,8 @@ void rebalance(String clusterName, String resourceName, int replica, String keyP
*/
void removeInstanceTag(String clusterName, String instanceName, String tag);

void setInstanceZoneId(String clusterName, String instanceName, String zoneId);

/**
* Release resources
*/
Expand Down
4 changes: 4 additions & 0 deletions helix-core/src/main/java/org/apache/helix/HelixConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ enum StateModelToken {
ANY_LIVEINSTANCE
}

/**
* Replaced by ClusterConfig.ClusterConfigProperty.
*/
@Deprecated
enum ClusterConfigType {
HELIX_DISABLE_PIPELINE_TRIGGERS,
DISABLE_FULL_AUTO // override all resources in the cluster to use SEMI-AUTO instead of FULL-AUTO
Expand Down
3 changes: 2 additions & 1 deletion helix-core/src/main/java/org/apache/helix/PropertyKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

import java.util.Arrays;

import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Error;
Expand Down Expand Up @@ -186,7 +187,7 @@ public PropertyKey clusterConfigs() {
* @return {@link PropertyKey}
*/
public PropertyKey clusterConfig() {
return new PropertyKey(CONFIGS, ConfigScopeProperty.CLUSTER, HelixProperty.class,
return new PropertyKey(CONFIGS, ConfigScopeProperty.CLUSTER, ClusterConfig.class,
_clusterName, ConfigScopeProperty.CLUSTER.toString(), _clusterName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,23 @@
import java.util.Map;
import java.util.Set;

import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.ZNRecord;
import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.util.HelixUtil;
import org.apache.log4j.Logger;

/**
Expand All @@ -59,14 +60,14 @@
public class AutoRebalancer implements Rebalancer, MappingCalculator {
// These should be final, but are initialized in init rather than a constructor
private HelixManager _manager;
private AutoRebalanceStrategy _algorithm;
private RebalanceStrategy _rebalanceStrategy;

private static final Logger LOG = Logger.getLogger(AutoRebalancer.class);

@Override
public void init(HelixManager manager) {
this._manager = manager;
this._algorithm = null;
this._rebalanceStrategy = null;
}

@Override
Expand All @@ -78,10 +79,13 @@ public IdealState computeNewIdealState(String resourceName, IdealState currentId
Map<String, LiveInstance> liveInstance = clusterData.getLiveInstances();
String replicas = currentIdealState.getReplicas();

LinkedHashMap<String, Integer> stateCountMap = new LinkedHashMap<String, Integer>();
stateCountMap = stateCount(stateModelDef, liveInstance.size(), Integer.parseInt(replicas));
LinkedHashMap<String, Integer> stateCountMap =
stateCount(stateModelDef, liveInstance.size(), Integer.parseInt(replicas));
List<String> liveNodes = new ArrayList<String>(liveInstance.keySet());
List<String> allNodes = new ArrayList<String>(clusterData.getInstanceConfigMap().keySet());
allNodes.removeAll(clusterData.getDisabledInstances());
liveNodes.retainAll(allNodes);

Map<String, Map<String, String>> currentMapping =
currentMapping(currentStateOutput, resourceName, partitions, stateCountMap);

Expand Down Expand Up @@ -124,13 +128,33 @@ public IdealState computeNewIdealState(String resourceName, IdealState currentId

int maxPartition = currentIdealState.getMaxPartitionsPerInstance();

ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
placementScheme.init(_manager);
_algorithm =
new AutoRebalanceStrategy(resourceName, partitions, stateCountMap, maxPartition,
placementScheme);
ZNRecord newMapping =
_algorithm.computePartitionAssignment(liveNodes, currentMapping, allNodes);
String rebalanceStrategyName = currentIdealState.getRebalanceStrategy();
if (rebalanceStrategyName == null || rebalanceStrategyName
.equalsIgnoreCase(RebalanceStrategy.DEFAULT_REBALANCE_STRATEGY)) {
_rebalanceStrategy =
new AutoRebalanceStrategy(resourceName, partitions, stateCountMap, maxPartition);
} else {
try {
_rebalanceStrategy = RebalanceStrategy.class
.cast(HelixUtil.loadClass(getClass(), rebalanceStrategyName).newInstance());
_rebalanceStrategy.init(resourceName, partitions, stateCountMap, maxPartition);
} catch (ClassNotFoundException ex) {
throw new HelixException(
"Exception while invoking custom rebalance strategy class: " + rebalanceStrategyName,
ex);
} catch (InstantiationException ex) {
throw new HelixException(
"Exception while invoking custom rebalance strategy class: " + rebalanceStrategyName,
ex);
} catch (IllegalAccessException ex) {
throw new HelixException(
"Exception while invoking custom rebalance strategy class: " + rebalanceStrategyName,
ex);
}
}

ZNRecord newMapping = _rebalanceStrategy
.computePartitionAssignment(allNodes, liveNodes, currentMapping, clusterData);

if (LOG.isDebugEnabled()) {
LOG.debug("currentMapping: " + currentMapping);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,4 @@ public interface Rebalancer {
*/
IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
final CurrentStateOutput currentStateOutput, final ClusterDataCache clusterData);

}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.apache.helix.controller.strategy;
package org.apache.helix.controller.rebalancer.strategy;

/*
* Licensed to the Apache Software Foundation (ASF) under one
Expand Down Expand Up @@ -34,18 +34,18 @@

import org.apache.helix.HelixManager;
import org.apache.helix.ZNRecord;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.log4j.Logger;

public class AutoRebalanceStrategy {

public class AutoRebalanceStrategy implements RebalanceStrategy {
private static Logger logger = Logger.getLogger(AutoRebalanceStrategy.class);

private final String _resourceName;
private final List<String> _partitions;
private final LinkedHashMap<String, Integer> _states;
private final int _maximumPerNode;
private final ReplicaPlacementScheme _placementScheme;

private String _resourceName;
private List<String> _partitions;
private LinkedHashMap<String, Integer> _states;
private int _maximumPerNode;

private Map<String, Node> _nodeMap;
private List<Node> _liveNodesList;
private Map<Integer, String> _stateMap;
Expand All @@ -56,26 +56,28 @@ public class AutoRebalanceStrategy {
private Set<Replica> _orphaned;

public AutoRebalanceStrategy(String resourceName, final List<String> partitions,
final LinkedHashMap<String, Integer> states, int maximumPerNode,
ReplicaPlacementScheme placementScheme) {
_resourceName = resourceName;
_partitions = partitions;
_states = states;
_maximumPerNode = maximumPerNode;
if (placementScheme != null) {
_placementScheme = placementScheme;
} else {
_placementScheme = new DefaultPlacementScheme();
}
final LinkedHashMap<String, Integer> states, int maximumPerNode) {
init(resourceName, partitions, states, maximumPerNode);
_placementScheme = new DefaultPlacementScheme();
}

public AutoRebalanceStrategy(String resourceName, final List<String> partitions,
final LinkedHashMap<String, Integer> states) {
this(resourceName, partitions, states, Integer.MAX_VALUE, new DefaultPlacementScheme());
this(resourceName, partitions, states, Integer.MAX_VALUE);
}

@Override
public void init(String resourceName, final List<String> partitions,
final LinkedHashMap<String, Integer> states, int maximumPerNode) {
_resourceName = resourceName;
_partitions = partitions;
_states = states;
_maximumPerNode = maximumPerNode;
}

public ZNRecord computePartitionAssignment(final List<String> liveNodes,
final Map<String, Map<String, String>> currentMapping, final List<String> allNodes) {
@Override
public ZNRecord computePartitionAssignment(final List<String> allNodes, final List<String> liveNodes,
final Map<String, Map<String, String>> currentMapping, ClusterDataCache clusterData) {
int numReplicas = countStateReplicas();
ZNRecord znRecord = new ZNRecord(_resourceName);
if (liveNodes.size() == 0) {
Expand Down Expand Up @@ -546,7 +548,6 @@ private Map<Replica, Node> computePreferredPlacement(final List<String> allNodes

/**
* Counts the total number of replicas given a state-count mapping
* @param states
* @return
*/
private int countStateReplicas() {
Expand Down
Loading