Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More strict partition weight validation while creating the cluster model. #511

Merged
merged 5 commits into from
Oct 18, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -349,17 +349,20 @@ private void updateCapacityAndUtilization(String capacityKey, int usage) {
*/
private Map<String, Integer> fetchInstanceCapacity(ClusterConfig clusterConfig,
InstanceConfig instanceConfig) {
// Fetch the capacity of instance from 2 possible sources according to the following priority.
// 1. The instance capacity that is configured in the instance config.
// 2. If the default instance capacity that is configured in the cluster config contains more capacity keys, fill the capacity map with those additional values.
Map<String, Integer> instanceCapacity =
new HashMap<>(clusterConfig.getDefaultInstanceCapacityMap());
instanceCapacity.putAll(instanceConfig.getInstanceCapacityMap());

List<String> requiredCapacityKeys = clusterConfig.getInstanceCapacityKeys();
Map<String, Integer> instanceCapacity = instanceConfig.getInstanceCapacityMap();
if (instanceCapacity.isEmpty()) {
instanceCapacity = clusterConfig.getDefaultInstanceCapacityMap();
}
// Remove all the non-required capacity items from the map.
instanceCapacity.keySet().retainAll(requiredCapacityKeys);
// All the required keys must exist in the instance config.
if (!instanceCapacity.keySet().containsAll(requiredCapacityKeys)) {
throw new HelixException(String.format(
"The required capacity keys %s are not fully configured in the instance %s capacity map %s.",
"The required capacity keys: %s are not fully configured in the instance: %s, capacity map: %s.",
requiredCapacityKeys.toString(), _instanceName, instanceCapacity.toString()));
}
return instanceCapacity;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;

import org.apache.helix.HelixException;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
Expand Down Expand Up @@ -149,24 +150,24 @@ private Map<String, Integer> fetchCapacityUsage(String partitionName,
.getResourceName(), ex);
}

Map<String, Integer> partitionCapacity = capacityMap.get(partitionName);
if (partitionCapacity == null) {
partitionCapacity = capacityMap.get(ResourceConfig.DEFAULT_PARTITION_KEY);
}
if (partitionCapacity == null) {
LOG.warn("The capacity usage of the specified partition {} is not configured in the Resource"
+ " Config {}. No default partition capacity is configured either. Will proceed with"
+ " empty capacity configuration.", partitionName, resourceConfig.getResourceName());
partitionCapacity = new HashMap<>();
}
// Fetch the capacity of partition from 3 possible sources according to the following priority.
// 1. The partition capacity that is explicitly configured in the resource config.
// 2. Or, the default partition capacity that is configured under partition name DEFAULT_PARTITION_KEY in the resource config.
// 3. If the default partition capacity that is configured in the cluster config contains more capacity keys, fill the capacity map with those additional values.
Map<String, Integer> partitionCapacity =
new HashMap<>(clusterConfig.getDefaultPartitionWeightMap());
partitionCapacity.putAll(capacityMap.getOrDefault(partitionName,
capacityMap.getOrDefault(ResourceConfig.DEFAULT_PARTITION_KEY, new HashMap<>())));

List<String> requiredCapacityKeys = clusterConfig.getInstanceCapacityKeys();
jiajunwang marked this conversation as resolved.
Show resolved Hide resolved
// Remove the non-required capacity items.
partitionCapacity.keySet().retainAll(requiredCapacityKeys);
// If any required capacity key is not configured in the resource config, fill the partition
// capacity map with 0 usage.
for (String capacityKey : requiredCapacityKeys) {
partitionCapacity.putIfAbsent(capacityKey, 0);
// If any required capacity key is not configured in the resource config, fail the model creating.
if (!partitionCapacity.keySet().containsAll(requiredCapacityKeys)) {
jiajunwang marked this conversation as resolved.
Show resolved Hide resolved
throw new HelixException(String.format(
"The required capacity keys: %s are not fully configured in the resource: %s, partition: %s, weight map: %s.",
requiredCapacityKeys.toString(), resourceConfig.getResourceName(), partitionName,
partitionCapacity.toString()));
}

return partitionCapacity;
Expand Down
88 changes: 59 additions & 29 deletions helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@
* under the License.
*/

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.apache.helix.HelixException;
Expand All @@ -28,14 +35,6 @@
import org.apache.helix.api.config.StateTransitionThrottleConfig;
import org.apache.helix.api.config.StateTransitionTimeoutConfig;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Cluster configurations
*/
Expand Down Expand Up @@ -90,6 +89,8 @@ public enum ClusterConfigProperty {
INSTANCE_CAPACITY_KEYS,
// The default instance capacity if no capacity is configured in the Instance Config node.
DEFAULT_INSTANCE_CAPACITY_MAP,
// The default partition weights if no weight is configured in the Resource Config node.
DEFAULT_PARTITION_WEIGHT_MAP,
// The preference of the rebalance result.
// EVENNESS - Evenness of the resource utilization, partition, and top state distribution.
// LESS_MOVEMENT - the tendency of keeping the current assignment instead of moving the partition for optimal assignment.
Expand Down Expand Up @@ -705,49 +706,78 @@ public List<String> getInstanceCapacityKeys() {

/**
* Get the default instance capacity information from the map fields.
*
* @return data map if it exists, or empty map
*/
public Map<String, Integer> getDefaultInstanceCapacityMap() {
Map<String, String> capacityData =
_record.getMapField(ClusterConfigProperty.DEFAULT_INSTANCE_CAPACITY_MAP.name());

if (capacityData != null) {
return capacityData.entrySet().stream().collect(
Collectors.toMap(entry -> entry.getKey(), entry -> Integer.parseInt(entry.getValue())));
}
return Collections.emptyMap();
return getDefaultCapacityMap(ClusterConfigProperty.DEFAULT_INSTANCE_CAPACITY_MAP);
}

/**
* Set the default instance capacity information with an Integer mapping.
* This information is required by the global rebalancer.
* @see <a href="Rebalance Algorithm">
* https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer#rebalance-algorithm-adapter
* </a>
* If the instance capacity is not configured in either Instance Config nor Cluster Config, the
* cluster topology is considered invalid. So the rebalancer may stop working.
jiajunwang marked this conversation as resolved.
Show resolved Hide resolved
* @param capacityDataMap - map of instance capacity data
* @throws IllegalArgumentException - when any of the data value is a negative number or when the map is empty
*/
public void setDefaultInstanceCapacityMap(Map<String, Integer> capacityDataMap)
throws IllegalArgumentException {
setDefaultCapacityMap(ClusterConfigProperty.DEFAULT_INSTANCE_CAPACITY_MAP, capacityDataMap);
}

/**
* Get the default partition weight information from the map fields.
*
* @return data map if it exists, or empty map
*/
public Map<String, Integer> getDefaultPartitionWeightMap() {
return getDefaultCapacityMap(ClusterConfigProperty.DEFAULT_PARTITION_WEIGHT_MAP);
}

/**
* Set the default partition weight information with an Integer mapping.
* This information is required by the global rebalancer.
* @see <a href="Rebalance Algorithm">
* https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer#rebalance-algorithm-adapter
* </a>
* If the instance capacity is not configured in neither Instance Config nor Cluster Config, the
* https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer#rebalance-algorithm-adapter
* </a>
* If the partition weight is not configured in either Resource Config nor Cluster Config, the
* cluster topology is considered invalid. So the rebalancer may stop working.
* @param weightDataMap - map of partition weight data
* @throws IllegalArgumentException - when any of the data value is a negative number or when the map is empty
*/
public void setDefaultInstanceCapacityMap(Map<String, Integer> capacityDataMap)
public void setDefaultPartitionWeightMap(Map<String, Integer> weightDataMap)
throws IllegalArgumentException {
if (capacityDataMap == null || capacityDataMap.size() == 0) {
throw new IllegalArgumentException("Default Instance Capacity Data is empty");
}
setDefaultCapacityMap(ClusterConfigProperty.DEFAULT_PARTITION_WEIGHT_MAP, weightDataMap);
}

Map<String, String> capacityData = new HashMap<>();
private Map<String, Integer> getDefaultCapacityMap(ClusterConfigProperty capacityPropertyType) {
Map<String, String> capacityData = _record.getMapField(capacityPropertyType.name());
if (capacityData != null) {
return capacityData.entrySet().stream().collect(
Collectors.toMap(entry -> entry.getKey(), entry -> Integer.parseInt(entry.getValue())));
}
return Collections.emptyMap();
}

public void setDefaultCapacityMap(ClusterConfigProperty capacityPropertyType,
Map<String, Integer> capacityDataMap) throws IllegalArgumentException {
if (capacityDataMap == null) {
throw new IllegalArgumentException("Default capacity data is null");
}
Map<String, String> data = new HashMap<>();
capacityDataMap.entrySet().stream().forEach(entry -> {
if (entry.getValue() < 0) {
throw new IllegalArgumentException(String
.format("Default Instance Capacity Data contains a negative value: %s = %d",
entry.getKey(), entry.getValue()));
.format("Default capacity data contains a negative value: %s = %d", entry.getKey(),
entry.getValue()));
}
capacityData.put(entry.getKey(), Integer.toString(entry.getValue()));
data.put(entry.getKey(), Integer.toString(entry.getValue()));
});

_record.setMapField(ClusterConfigProperty.DEFAULT_INSTANCE_CAPACITY_MAP.name(), capacityData);
_record.setMapField(capacityPropertyType.name(), data);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ public Map<String, Integer> getInstanceCapacityMap() {
/**
* Set the instance capacity information with an Integer mapping.
* @param capacityDataMap - map of instance capacity data
* @throws IllegalArgumentException - when any of the data value is a negative number or when the map is empty
* @throws IllegalArgumentException - when any of the data value is a negative number or when the map is incomplete
*
* This information is required by the global rebalancer.
* @see <a href="Rebalance Algorithm">
Expand All @@ -536,8 +536,8 @@ public Map<String, Integer> getInstanceCapacityMap() {
*/
public void setInstanceCapacityMap(Map<String, Integer> capacityDataMap)
throws IllegalArgumentException {
if (capacityDataMap == null || capacityDataMap.size() == 0) {
throw new IllegalArgumentException("Capacity Data is empty");
if (capacityDataMap == null) {
throw new IllegalArgumentException("Capacity Data is null");
}

Map<String, String> capacityData = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,13 +408,13 @@ public Map<String, Map<String, Integer>> getPartitionCapacityMap() throws IOExce
* Set the partition capacity information with a map <PartitionName or DEFAULT_PARTITION_KEY, <Capacity Key, Capacity Number>>
*
* @param partitionCapacityMap - map of partition capacity data
* @throws IllegalArgumentException - when any of the data value is a negative number or map is empty
* @throws IllegalArgumentException - when any of the data value is a negative number or map is incomplete
* @throws IOException - when JSON parsing fails
*/
public void setPartitionCapacityMap(Map<String, Map<String, Integer>> partitionCapacityMap)
throws IllegalArgumentException, IOException {
if (partitionCapacityMap == null || partitionCapacityMap.isEmpty()) {
throw new IllegalArgumentException("Capacity Map is empty");
if (partitionCapacityMap == null) {
throw new IllegalArgumentException("Capacity Map is null");
}
if (!partitionCapacityMap.containsKey(DEFAULT_PARTITION_KEY)) {
throw new IllegalArgumentException(String
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
Expand Down Expand Up @@ -103,6 +105,8 @@ protected ResourceControllerDataProvider setupClusterDataCache() throws IOExcept
testClusterConfig.setMaxPartitionsPerInstance(5);
testClusterConfig.setDisabledInstances(Collections.emptyMap());
testClusterConfig.setInstanceCapacityKeys(new ArrayList<>(_capacityDataMap.keySet()));
testClusterConfig.setDefaultPartitionWeightMap(
_capacityDataMap.keySet().stream().collect(Collectors.toMap(key -> key, key -> 0)));
testClusterConfig.setTopologyAwareEnabled(true);
when(testCache.getClusterConfig()).thenReturn(testClusterConfig);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ public void testDefaultInstanceCapacity() {
Assert.assertEquals(assignableNode.getMaxCapacity(), _capacityDataMap);
}

@Test(expectedExceptions = HelixException.class, expectedExceptionsMessageRegExp = "The required capacity keys \\[item2, item1, item3, AdditionalCapacityKey\\] are not fully configured in the instance testInstanceId capacity map \\{item2=40, item1=20, item3=30\\}.")
@Test(expectedExceptions = HelixException.class, expectedExceptionsMessageRegExp = "The required capacity keys: \\[item2, item1, item3, AdditionalCapacityKey\\] are not fully configured in the instance: testInstanceId, capacity map: \\{item2=40, item1=20, item3=30\\}.")
public void testIncompleteInstanceCapacity() {
ClusterConfig testClusterConfig = new ClusterConfig("testClusterConfigId");
List<String> requiredCapacityKeys = new ArrayList<>(_capacityDataMap.keySet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.List;
import java.util.Map;

import org.apache.helix.HelixException;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
Expand Down Expand Up @@ -102,6 +103,29 @@ public void testConstructReplicaWithResourceConfig() throws IOException {
Assert.assertEquals(replica.getResourceMaxPartitionsPerInstance(), maxPartition);
}

/**
* Tests that if default partition weight map is configured in ClusterConfig and NOT in
* ResourceConfig. AssignableReplica actually will get the default weight from ClusterConfig
* even though it's not set in ResourceConfig.
*/
@Test
jiajunwang marked this conversation as resolved.
Show resolved Hide resolved
public void testDefaultPartitionWeight() {
Map<String, Integer> defaultWeightDataMapResource = new HashMap<>();
defaultWeightDataMapResource.put("item1", 3);
defaultWeightDataMapResource.put("item2", 6);
ClusterConfig testClusterConfig = new ClusterConfig("testClusterConfigId");
testClusterConfig
.setInstanceCapacityKeys(new ArrayList<>(defaultWeightDataMapResource.keySet()));
testClusterConfig.setDefaultPartitionWeightMap(defaultWeightDataMapResource);

ResourceConfig testResourceConfigResource = new ResourceConfig(resourceName);
AssignableReplica replica = new AssignableReplica(testClusterConfig, testResourceConfigResource,
partitionNamePrefix + 1, masterState, masterPriority);

Assert.assertEquals(replica.getCapacity().size(), defaultWeightDataMapResource.size());
Assert.assertEquals(replica.getCapacity(), defaultWeightDataMapResource);
}

@Test
public void testIncompletePartitionWeightConfig() throws IOException {
// Init assignable replica with a basic config object
Expand All @@ -120,6 +144,20 @@ public void testIncompletePartitionWeightConfig() throws IOException {
requiredCapacityKeys.add(newCapacityKey);
testClusterConfig.setInstanceCapacityKeys(requiredCapacityKeys);

try {
new AssignableReplica(testClusterConfig, testResourceConfigResource,
partitionNamePrefix + 1, masterState, masterPriority);
Assert.fail("Creating new replica should fail because of incomplete partition weight.");
} catch (HelixException ex) {
// expected
}

Map<String, Integer> defaultCapacityDataMap = new HashMap<>();
for (String key : requiredCapacityKeys) {
defaultCapacityDataMap.put(key, 0);
}
testClusterConfig.setDefaultPartitionWeightMap(defaultCapacityDataMap);

AssignableReplica replica = new AssignableReplica(testClusterConfig, testResourceConfigResource,
partitionNamePrefix + 1, masterState, masterPriority);
Assert.assertTrue(replica.getCapacity().keySet().containsAll(requiredCapacityKeys));
Expand Down
Loading