Skip to content

Commit

Permalink
More strict partition weight validation while creating the cluster mo…
Browse files Browse the repository at this point in the history
…del. (#511)

1. If any capacity key is not configured in the Resource Config (or default weight) as the partition weight, the config is invalid.
2. If any partition weight is configured with a negative number, the config is invalid.
Note that the rebalancer will not compute a new assignment if any capacity/weight config is invalid.
  • Loading branch information
jiajunwang authored and Jiajun Wang committed Jan 6, 2020
1 parent 511ff13 commit f1b6e7b
Show file tree
Hide file tree
Showing 10 changed files with 193 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -349,19 +349,23 @@ private void updateCapacityAndUtilization(String capacityKey, int usage) {
*/
private Map<String, Integer> fetchInstanceCapacity(ClusterConfig clusterConfig,
InstanceConfig instanceConfig) {
// Fetch the capacity of instance from 2 possible sources according to the following priority.
// 1. The instance capacity that is configured in the instance config.
// 2. If the default instance capacity that is configured in the cluster config contains more capacity keys, fill the capacity map with those additional values.
Map<String, Integer> instanceCapacity =
new HashMap<>(clusterConfig.getDefaultInstanceCapacityMap());
instanceCapacity.putAll(instanceConfig.getInstanceCapacityMap());

List<String> requiredCapacityKeys = clusterConfig.getInstanceCapacityKeys();
Map<String, Integer> instanceCapacity = instanceConfig.getInstanceCapacityMap();
if (instanceCapacity.isEmpty()) {
instanceCapacity = clusterConfig.getDefaultInstanceCapacityMap();
}
// Remove all the non-required capacity items from the map.
instanceCapacity.keySet().retainAll(requiredCapacityKeys);
// All the required keys must exist in the instance config.
if (!instanceCapacity.keySet().containsAll(requiredCapacityKeys)) {
throw new HelixException(String.format(
"The required capacity keys %s are not fully configured in the instance %s capacity map %s.",
"The required capacity keys: %s are not fully configured in the instance: %s, capacity map: %s.",
requiredCapacityKeys.toString(), _instanceName, instanceCapacity.toString()));
}
// Remove all the non-required capacity items from the map.
instanceCapacity.keySet().retainAll(requiredCapacityKeys);

return instanceCapacity;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;

import org.apache.helix.HelixException;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
Expand Down Expand Up @@ -149,25 +150,25 @@ private Map<String, Integer> fetchCapacityUsage(String partitionName,
.getResourceName(), ex);
}

Map<String, Integer> partitionCapacity = capacityMap.get(partitionName);
if (partitionCapacity == null) {
partitionCapacity = capacityMap.get(ResourceConfig.DEFAULT_PARTITION_KEY);
}
if (partitionCapacity == null) {
LOG.warn("The capacity usage of the specified partition {} is not configured in the Resource"
+ " Config {}. No default partition capacity is configured either. Will proceed with"
+ " empty capacity configuration.", partitionName, resourceConfig.getResourceName());
partitionCapacity = new HashMap<>();
}
// Fetch the capacity of partition from 3 possible sources according to the following priority.
// 1. The partition capacity that is explicitly configured in the resource config.
// 2. Or, the default partition capacity that is configured under partition name DEFAULT_PARTITION_KEY in the resource config.
// 3. If the default partition capacity that is configured in the cluster config contains more capacity keys, fill the capacity map with those additional values.
Map<String, Integer> partitionCapacity =
new HashMap<>(clusterConfig.getDefaultPartitionWeightMap());
partitionCapacity.putAll(capacityMap.getOrDefault(partitionName,
capacityMap.getOrDefault(ResourceConfig.DEFAULT_PARTITION_KEY, new HashMap<>())));

List<String> requiredCapacityKeys = clusterConfig.getInstanceCapacityKeys();
// If any required capacity key is not configured in the resource config, fail the model creation.
if (!partitionCapacity.keySet().containsAll(requiredCapacityKeys)) {
throw new HelixException(String.format(
"The required capacity keys: %s are not fully configured in the resource: %s, partition: %s, weight map: %s.",
requiredCapacityKeys.toString(), resourceConfig.getResourceName(), partitionName,
partitionCapacity.toString()));
}
// Remove the non-required capacity items.
partitionCapacity.keySet().retainAll(requiredCapacityKeys);
// If any required capacity key is not configured in the resource config, fill the partition
// capacity map with 0 usage.
for (String capacityKey : requiredCapacityKeys) {
partitionCapacity.putIfAbsent(capacityKey, 0);
}

return partitionCapacity;
}
Expand Down
73 changes: 52 additions & 21 deletions helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ public enum ClusterConfigProperty {
INSTANCE_CAPACITY_KEYS,
// The default instance capacity if no capacity is configured in the Instance Config node.
DEFAULT_INSTANCE_CAPACITY_MAP,
// The default partition weights if no weight is configured in the Resource Config node.
DEFAULT_PARTITION_WEIGHT_MAP,
// The preference of the rebalance result.
// EVENNESS - Evenness of the resource utilization, partition, and top state distribution.
// LESS_MOVEMENT - the tendency of keeping the current assignment instead of moving the partition for optimal assignment.
Expand Down Expand Up @@ -704,49 +706,78 @@ public List<String> getInstanceCapacityKeys() {

/**
* Get the default instance capacity information from the map fields.
*
* @return data map if it exists, or empty map
*/
public Map<String, Integer> getDefaultInstanceCapacityMap() {
Map<String, String> capacityData =
_record.getMapField(ClusterConfigProperty.DEFAULT_INSTANCE_CAPACITY_MAP.name());

if (capacityData != null) {
return capacityData.entrySet().stream().collect(
Collectors.toMap(entry -> entry.getKey(), entry -> Integer.parseInt(entry.getValue())));
}
return Collections.emptyMap();
return getDefaultCapacityMap(ClusterConfigProperty.DEFAULT_INSTANCE_CAPACITY_MAP);
}

/**
* Set the default instance capacity information with an Integer mapping.
* This information is required by the global rebalancer.
* If the instance capacity is not configured in either Instance Config or Cluster Config, the
* cluster topology is considered invalid. So the rebalancer may stop working.
* <p>
* Validation and storage are delegated to {@code setDefaultCapacityMap}, which writes the data to
* the {@code DEFAULT_INSTANCE_CAPACITY_MAP} map field with every value converted to a String.
* An empty map is accepted; only a null map or a negative value is rejected.
* @param capacityDataMap - map of instance capacity data
* @throws IllegalArgumentException - when the map is null or when any data value is a negative number
* @see <a href="https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer#rebalance-algorithm-adapter">Rebalance Algorithm</a>
*/
public void setDefaultInstanceCapacityMap(Map<String, Integer> capacityDataMap)
throws IllegalArgumentException {
setDefaultCapacityMap(ClusterConfigProperty.DEFAULT_INSTANCE_CAPACITY_MAP, capacityDataMap);
}

/**
* Get the default partition weight information from the map fields.
* <p>
* Reads the {@code DEFAULT_PARTITION_WEIGHT_MAP} map field through {@code getDefaultCapacityMap},
* which parses every stored String value with {@link Integer#parseInt(String)}.
*
* @return map of capacity key to default partition weight if the field exists, or an empty
* (immutable) map if it does not
* @throws NumberFormatException - when a stored value cannot be parsed as an integer
*/
public Map<String, Integer> getDefaultPartitionWeightMap() {
return getDefaultCapacityMap(ClusterConfigProperty.DEFAULT_PARTITION_WEIGHT_MAP);
}

/**
* Set the default partition weight information with an Integer mapping.
* This information is required by the global rebalancer.
* @see <a href="Rebalance Algorithm">
* https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer#rebalance-algorithm-adapter
* </a>
* If the instance capacity is not configured in neither Instance Config nor Cluster Config, the
* https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer#rebalance-algorithm-adapter
* </a>
* If the partition weight is not configured in either Resource Config or Cluster Config, the
* cluster topology is considered invalid. So the rebalancer may stop working.
* @param weightDataMap - map of partition weight data
* @throws IllegalArgumentException - when any data value is a negative number or when the map is null
*/
public void setDefaultInstanceCapacityMap(Map<String, Integer> capacityDataMap)
public void setDefaultPartitionWeightMap(Map<String, Integer> weightDataMap)
throws IllegalArgumentException {
if (capacityDataMap == null || capacityDataMap.size() == 0) {
throw new IllegalArgumentException("Default Instance Capacity Data is empty");
}
setDefaultCapacityMap(ClusterConfigProperty.DEFAULT_PARTITION_WEIGHT_MAP, weightDataMap);
}

Map<String, String> capacityData = new HashMap<>();
/**
 * Reads the map field named after the given property and converts its values to integers.
 *
 * @param capacityPropertyType the cluster config property whose map field is read
 * @return a new map of key to parsed integer value, or an empty immutable map when the map
 *         field does not exist
 * @throws NumberFormatException when a stored value cannot be parsed as an integer
 */
private Map<String, Integer> getDefaultCapacityMap(ClusterConfigProperty capacityPropertyType) {
  Map<String, String> rawValues = _record.getMapField(capacityPropertyType.name());
  // Guard clause: a missing map field is reported as an empty map rather than null.
  if (rawValues == null) {
    return Collections.emptyMap();
  }

  Map<String, Integer> parsedValues = new HashMap<>();
  for (Map.Entry<String, String> rawEntry : rawValues.entrySet()) {
    parsedValues.put(rawEntry.getKey(), Integer.parseInt(rawEntry.getValue()));
  }
  return parsedValues;
}

public void setDefaultCapacityMap(ClusterConfigProperty capacityPropertyType,
Map<String, Integer> capacityDataMap) throws IllegalArgumentException {
if (capacityDataMap == null) {
throw new IllegalArgumentException("Default capacity data is null");
}
Map<String, String> data = new HashMap<>();
capacityDataMap.entrySet().stream().forEach(entry -> {
if (entry.getValue() < 0) {
throw new IllegalArgumentException(String
.format("Default Instance Capacity Data contains a negative value: %s = %d",
entry.getKey(), entry.getValue()));
.format("Default capacity data contains a negative value: %s = %d", entry.getKey(),
entry.getValue()));
}
capacityData.put(entry.getKey(), Integer.toString(entry.getValue()));
data.put(entry.getKey(), Integer.toString(entry.getValue()));
});

_record.setMapField(ClusterConfigProperty.DEFAULT_INSTANCE_CAPACITY_MAP.name(), capacityData);
_record.setMapField(capacityPropertyType.name(), data);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ public Map<String, Integer> getInstanceCapacityMap() {
/**
* Set the instance capacity information with an Integer mapping.
* @param capacityDataMap - map of instance capacity data
* @throws IllegalArgumentException - when any of the data value is a negative number or when the map is empty
* @throws IllegalArgumentException - when any of the data value is a negative number or when the map is incomplete
*
* This information is required by the global rebalancer.
* @see <a href="Rebalance Algorithm">
Expand All @@ -536,8 +536,8 @@ public Map<String, Integer> getInstanceCapacityMap() {
*/
public void setInstanceCapacityMap(Map<String, Integer> capacityDataMap)
throws IllegalArgumentException {
if (capacityDataMap == null || capacityDataMap.size() == 0) {
throw new IllegalArgumentException("Capacity Data is empty");
if (capacityDataMap == null) {
throw new IllegalArgumentException("Capacity Data is null");
}

Map<String, String> capacityData = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,13 +408,13 @@ public Map<String, Map<String, Integer>> getPartitionCapacityMap() throws IOExce
* Set the partition capacity information with a map <PartitionName or DEFAULT_PARTITION_KEY, <Capacity Key, Capacity Number>>
*
* @param partitionCapacityMap - map of partition capacity data
* @throws IllegalArgumentException - when any of the data value is a negative number or map is empty
* @throws IllegalArgumentException - when any of the data value is a negative number or map is incomplete
* @throws IOException - when JSON parsing fails
*/
public void setPartitionCapacityMap(Map<String, Map<String, Integer>> partitionCapacityMap)
throws IllegalArgumentException, IOException {
if (partitionCapacityMap == null || partitionCapacityMap.isEmpty()) {
throw new IllegalArgumentException("Capacity Map is empty");
if (partitionCapacityMap == null) {
throw new IllegalArgumentException("Capacity Map is null");
}
if (!partitionCapacityMap.containsKey(DEFAULT_PARTITION_KEY)) {
throw new IllegalArgumentException(String
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
Expand Down Expand Up @@ -103,6 +105,8 @@ protected ResourceControllerDataProvider setupClusterDataCache() throws IOExcept
testClusterConfig.setMaxPartitionsPerInstance(5);
testClusterConfig.setDisabledInstances(Collections.emptyMap());
testClusterConfig.setInstanceCapacityKeys(new ArrayList<>(_capacityDataMap.keySet()));
testClusterConfig.setDefaultPartitionWeightMap(
_capacityDataMap.keySet().stream().collect(Collectors.toMap(key -> key, key -> 0)));
testClusterConfig.setTopologyAwareEnabled(true);
when(testCache.getClusterConfig()).thenReturn(testClusterConfig);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ public void testDefaultInstanceCapacity() {
Assert.assertEquals(assignableNode.getMaxCapacity(), _capacityDataMap);
}

@Test(expectedExceptions = HelixException.class, expectedExceptionsMessageRegExp = "The required capacity keys \\[item2, item1, item3, AdditionalCapacityKey\\] are not fully configured in the instance testInstanceId capacity map \\{item2=40, item1=20, item3=30\\}.")
@Test(expectedExceptions = HelixException.class, expectedExceptionsMessageRegExp = "The required capacity keys: \\[item2, item1, item3, AdditionalCapacityKey\\] are not fully configured in the instance: testInstanceId, capacity map: \\{item2=40, item1=20, item3=30\\}.")
public void testIncompleteInstanceCapacity() {
ClusterConfig testClusterConfig = new ClusterConfig("testClusterConfigId");
List<String> requiredCapacityKeys = new ArrayList<>(_capacityDataMap.keySet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.List;
import java.util.Map;

import org.apache.helix.HelixException;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
Expand Down Expand Up @@ -102,6 +103,29 @@ public void testConstructReplicaWithResourceConfig() throws IOException {
Assert.assertEquals(replica.getResourceMaxPartitionsPerInstance(), maxPartition);
}

/**
 * Verifies that when the default partition weight map is configured in the ClusterConfig but no
 * weight is configured in the ResourceConfig, the AssignableReplica still reports the default
 * weight taken from the ClusterConfig.
 */
@Test
public void testDefaultPartitionWeight() {
  // Default weights that exist only at the cluster level.
  Map<String, Integer> clusterDefaultWeights = new HashMap<>();
  clusterDefaultWeights.put("item1", 3);
  clusterDefaultWeights.put("item2", 6);

  // Every weight key is also declared as a required capacity key.
  List<String> capacityKeys = new ArrayList<>(clusterDefaultWeights.keySet());
  ClusterConfig clusterConfig = new ClusterConfig("testClusterConfigId");
  clusterConfig.setInstanceCapacityKeys(capacityKeys);
  clusterConfig.setDefaultPartitionWeightMap(clusterDefaultWeights);

  // The resource config deliberately carries no partition weight configuration.
  ResourceConfig weightlessResourceConfig = new ResourceConfig(resourceName);
  AssignableReplica replica =
      new AssignableReplica(clusterConfig, weightlessResourceConfig, partitionNamePrefix + 1,
          masterState, masterPriority);

  Assert.assertEquals(replica.getCapacity().size(), clusterDefaultWeights.size());
  Assert.assertEquals(replica.getCapacity(), clusterDefaultWeights);
}

@Test
public void testIncompletePartitionWeightConfig() throws IOException {
// Init assignable replica with a basic config object
Expand All @@ -120,6 +144,20 @@ public void testIncompletePartitionWeightConfig() throws IOException {
requiredCapacityKeys.add(newCapacityKey);
testClusterConfig.setInstanceCapacityKeys(requiredCapacityKeys);

try {
new AssignableReplica(testClusterConfig, testResourceConfigResource,
partitionNamePrefix + 1, masterState, masterPriority);
Assert.fail("Creating new replica should fail because of incomplete partition weight.");
} catch (HelixException ex) {
// expected
}

Map<String, Integer> defaultCapacityDataMap = new HashMap<>();
for (String key : requiredCapacityKeys) {
defaultCapacityDataMap.put(key, 0);
}
testClusterConfig.setDefaultPartitionWeightMap(defaultCapacityDataMap);

AssignableReplica replica = new AssignableReplica(testClusterConfig, testResourceConfigResource,
partitionNamePrefix + 1, masterState, masterPriority);
Assert.assertTrue(replica.getCapacity().keySet().containsAll(requiredCapacityKeys));
Expand Down
Loading

0 comments on commit f1b6e7b

Please sign in to comment.