Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More strict partition weight validation while creating the cluster model. #511

Merged
merged 5 commits into from
Oct 18, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;

import org.apache.helix.HelixException;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
Expand Down Expand Up @@ -151,22 +152,24 @@ private Map<String, Integer> fetchCapacityUsage(String partitionName,

Map<String, Integer> partitionCapacity = capacityMap.get(partitionName);
if (partitionCapacity == null) {
partitionCapacity = capacityMap.get(ResourceConfig.DEFAULT_PARTITION_KEY);
partitionCapacity =
jiajunwang marked this conversation as resolved.
Show resolved Hide resolved
capacityMap.getOrDefault(ResourceConfig.DEFAULT_PARTITION_KEY, new HashMap<>());
}
if (partitionCapacity == null) {
LOG.warn("The capacity usage of the specified partition {} is not configured in the Resource"
+ " Config {}. No default partition capacity is configured either. Will proceed with"
+ " empty capacity configuration.", partitionName, resourceConfig.getResourceName());
partitionCapacity = new HashMap<>();

for (Map.Entry<String, Integer> capacityEntry : clusterConfig.getDefaultPartitionWeightMap()
.entrySet()) {
partitionCapacity.putIfAbsent(capacityEntry.getKey(), capacityEntry.getValue());
}

List<String> requiredCapacityKeys = clusterConfig.getInstanceCapacityKeys();
jiajunwang marked this conversation as resolved.
Show resolved Hide resolved
// Remove the non-required capacity items.
partitionCapacity.keySet().retainAll(requiredCapacityKeys);
// If any required capacity key is not configured in the resource config, fill the partition
// capacity map with 0 usage.
for (String capacityKey : requiredCapacityKeys) {
partitionCapacity.putIfAbsent(capacityKey, 0);
// If any required capacity key is not configured in the resource config, fail the model creating.
if (!partitionCapacity.keySet().containsAll(requiredCapacityKeys)) {
jiajunwang marked this conversation as resolved.
Show resolved Hide resolved
throw new HelixException(String.format(
"The required capacity keys %s are not fully configured int the resource %s partition %s weight map %s.",
jiajunwang marked this conversation as resolved.
Show resolved Hide resolved
requiredCapacityKeys.toString(), resourceConfig.getResourceName(), partitionName,
partitionCapacity.toString()));
}

return partitionCapacity;
Expand Down
88 changes: 59 additions & 29 deletions helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@
* under the License.
*/

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.apache.helix.HelixException;
Expand All @@ -28,14 +35,6 @@
import org.apache.helix.api.config.StateTransitionThrottleConfig;
import org.apache.helix.api.config.StateTransitionTimeoutConfig;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Cluster configurations
*/
Expand Down Expand Up @@ -90,6 +89,8 @@ public enum ClusterConfigProperty {
INSTANCE_CAPACITY_KEYS,
// The default instance capacity if no capacity is configured in the Instance Config node.
DEFAULT_INSTANCE_CAPACITY_MAP,
// The default partition weights if no weight is configured in the Resource Config node.
DEFAULT_PARTITION_WEIGHT_MAP,
// The preference of the rebalance result.
// EVENNESS - Evenness of the resource utilization, partition, and top state distribution.
// LESS_MOVEMENT - the tendency of keeping the current assignment instead of moving the partition for optimal assignment.
Expand Down Expand Up @@ -705,49 +706,78 @@ public List<String> getInstanceCapacityKeys() {

/**
* Get the default instance capacity information from the map fields.
*
* @return data map if it exists, or empty map
*/
public Map<String, Integer> getDefaultInstanceCapacityMap() {
Map<String, String> capacityData =
_record.getMapField(ClusterConfigProperty.DEFAULT_INSTANCE_CAPACITY_MAP.name());

if (capacityData != null) {
return capacityData.entrySet().stream().collect(
Collectors.toMap(entry -> entry.getKey(), entry -> Integer.parseInt(entry.getValue())));
}
return Collections.emptyMap();
return getDefaultCapacityMap(ClusterConfigProperty.DEFAULT_INSTANCE_CAPACITY_MAP);
}

/**
* Set the default instance capacity information with an Integer mapping.
* This information is required by the global rebalancer.
* @see <a href="Rebalance Algorithm">
* https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer#rebalance-algorithm-adapter
* </a>
* If the instance capacity is not configured in either Instance Config nor Cluster Config, the
* cluster topology is considered invalid. So the rebalancer may stop working.
jiajunwang marked this conversation as resolved.
Show resolved Hide resolved
* @param capacityDataMap - map of instance capacity data
* @throws IllegalArgumentException - when any of the data value is a negative number or when the map is empty
*/
public void setDefaultInstanceCapacityMap(Map<String, Integer> capacityDataMap)
throws IllegalArgumentException {
setDefaultCapacityMap(ClusterConfigProperty.DEFAULT_INSTANCE_CAPACITY_MAP, capacityDataMap);
}

/**
* Get the default partition weight information from the map fields.
*
* @return data map if it exists, or empty map
*/
public Map<String, Integer> getDefaultPartitionWeightMap() {
return getDefaultCapacityMap(ClusterConfigProperty.DEFAULT_PARTITION_WEIGHT_MAP);
}

/**
* Set the default partition weight information with an Integer mapping.
* This information is required by the global rebalancer.
* @see <a href="Rebalance Algorithm">
* https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer#rebalance-algorithm-adapter
* </a>
* If the instance capacity is not configured in neither Instance Config nor Cluster Config, the
* https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer#rebalance-algorithm-adapter
* </a>
* If the partition weight is not configured in either Resource Config nor Cluster Config, the
* cluster topology is considered invalid. So the rebalancer may stop working.
* @param weightDataMap - map of partition weight data
* @throws IllegalArgumentException - when any of the data value is a negative number or when the map is empty
*/
public void setDefaultInstanceCapacityMap(Map<String, Integer> capacityDataMap)
public void setDefaultPartitionWeightMap(Map<String, Integer> weightDataMap)
throws IllegalArgumentException {
if (capacityDataMap == null || capacityDataMap.size() == 0) {
throw new IllegalArgumentException("Default Instance Capacity Data is empty");
}
setDefaultCapacityMap(ClusterConfigProperty.DEFAULT_PARTITION_WEIGHT_MAP, weightDataMap);
}

Map<String, String> capacityData = new HashMap<>();
private Map<String, Integer> getDefaultCapacityMap(ClusterConfigProperty capacityPropertyType) {
Map<String, String> capacityData = _record.getMapField(capacityPropertyType.name());
if (capacityData != null) {
return capacityData.entrySet().stream().collect(
Collectors.toMap(entry -> entry.getKey(), entry -> Integer.parseInt(entry.getValue())));
}
return Collections.emptyMap();
}

public void setDefaultCapacityMap(ClusterConfigProperty capacityPropertyType,
Map<String, Integer> capacityDataMap) throws IllegalArgumentException {
if (capacityDataMap == null || capacityDataMap.size() == 0) {
throw new IllegalArgumentException("Default capacity data is empty");
jiajunwang marked this conversation as resolved.
Show resolved Hide resolved
}
Map<String, String> data = new HashMap<>();
capacityDataMap.entrySet().stream().forEach(entry -> {
if (entry.getValue() < 0) {
throw new IllegalArgumentException(String
.format("Default Instance Capacity Data contains a negative value: %s = %d",
entry.getKey(), entry.getValue()));
.format("Default capacity data contains a negative value: %s = %d", entry.getKey(),
entry.getValue()));
}
capacityData.put(entry.getKey(), Integer.toString(entry.getValue()));
data.put(entry.getKey(), Integer.toString(entry.getValue()));
});

_record.setMapField(ClusterConfigProperty.DEFAULT_INSTANCE_CAPACITY_MAP.name(), capacityData);
_record.setMapField(capacityPropertyType.name(), data);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
Expand Down Expand Up @@ -103,6 +105,8 @@ protected ResourceControllerDataProvider setupClusterDataCache() throws IOExcept
testClusterConfig.setMaxPartitionsPerInstance(5);
testClusterConfig.setDisabledInstances(Collections.emptyMap());
testClusterConfig.setInstanceCapacityKeys(new ArrayList<>(_capacityDataMap.keySet()));
testClusterConfig.setDefaultPartitionWeightMap(
_capacityDataMap.keySet().stream().collect(Collectors.toMap(key -> key, key -> 0)));
testClusterConfig.setTopologyAwareEnabled(true);
when(testCache.getClusterConfig()).thenReturn(testClusterConfig);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.List;
import java.util.Map;

import org.apache.helix.HelixException;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
Expand Down Expand Up @@ -102,6 +103,24 @@ public void testConstructReplicaWithResourceConfig() throws IOException {
Assert.assertEquals(replica.getResourceMaxPartitionsPerInstance(), maxPartition);
}

@Test
jiajunwang marked this conversation as resolved.
Show resolved Hide resolved
public void testDefaultPartitionWeight() {
Map<String, Integer> defaultWeightDataMapResource = new HashMap<>();
defaultWeightDataMapResource.put("item1", 3);
defaultWeightDataMapResource.put("item2", 6);
ClusterConfig testClusterConfig = new ClusterConfig("testClusterConfigId");
testClusterConfig
.setInstanceCapacityKeys(new ArrayList<>(defaultWeightDataMapResource.keySet()));
testClusterConfig.setDefaultPartitionWeightMap(defaultWeightDataMapResource);

ResourceConfig testResourceConfigResource = new ResourceConfig(resourceName);
AssignableReplica replica = new AssignableReplica(testClusterConfig, testResourceConfigResource,
partitionNamePrefix + 1, masterState, masterPriority);

Assert.assertEquals(replica.getCapacity().size(), defaultWeightDataMapResource.size());
Assert.assertEquals(replica.getCapacity(), defaultWeightDataMapResource);
}

@Test
public void testIncompletePartitionWeightConfig() throws IOException {
// Init assignable replica with a basic config object
Expand All @@ -120,6 +139,20 @@ public void testIncompletePartitionWeightConfig() throws IOException {
requiredCapacityKeys.add(newCapacityKey);
testClusterConfig.setInstanceCapacityKeys(requiredCapacityKeys);

try {
new AssignableReplica(testClusterConfig, testResourceConfigResource,
partitionNamePrefix + 1, masterState, masterPriority);
Assert.fail("Creating new replica should fail because of incomplete partition weight.");
} catch (HelixException ex) {
// expected
}

Map<String, Integer> defaultCapacityDataMap = new HashMap<>();
for (String key : requiredCapacityKeys) {
defaultCapacityDataMap.put(key, 0);
}
testClusterConfig.setDefaultPartitionWeightMap(defaultCapacityDataMap);

AssignableReplica replica = new AssignableReplica(testClusterConfig, testResourceConfigResource,
partitionNamePrefix + 1, masterState, masterPriority);
Assert.assertTrue(replica.getCapacity().keySet().containsAll(requiredCapacityKeys));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,19 +166,71 @@ public void testSetInstanceCapacityMap() {
DEFAULT_INSTANCE_CAPACITY_MAP.name()), capacityDataMapString);
}

@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Default Instance Capacity Data is empty")
@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Default capacity data is empty")
public void testSetInstanceCapacityMapEmpty() {
Map<String, Integer> capacityDataMap = new HashMap<>();

ClusterConfig testConfig = new ClusterConfig("testConfig");
testConfig.setDefaultInstanceCapacityMap(capacityDataMap);
}

@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Default Instance Capacity Data contains a negative value: item3 = -3")
@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Default capacity data contains a negative value: item3 = -3")
public void testSetInstanceCapacityMapInvalid() {
Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1, "item2", 2, "item3", -3);

ClusterConfig testConfig = new ClusterConfig("testConfig");
testConfig.setDefaultInstanceCapacityMap(capacityDataMap);
}

@Test
public void testGetPartitionWeightMap() {
Map<String, Integer> weightDataMap = ImmutableMap.of("item1", 1, "item2", 2, "item3", 3);

Map<String, String> weightDataMapString =
ImmutableMap.of("item1", "1", "item2", "2", "item3", "3");

ZNRecord rec = new ZNRecord("testId");
rec.setMapField(ClusterConfig.ClusterConfigProperty.DEFAULT_PARTITION_WEIGHT_MAP.name(),
weightDataMapString);
ClusterConfig testConfig = new ClusterConfig(rec);

Assert.assertTrue(testConfig.getDefaultPartitionWeightMap().equals(weightDataMap));
}

@Test
public void testGetPartitionWeightMapEmpty() {
ClusterConfig testConfig = new ClusterConfig("testId");

Assert.assertTrue(testConfig.getDefaultPartitionWeightMap().equals(Collections.emptyMap()));
}

@Test
public void testSetPartitionWeightMap() {
Map<String, Integer> weightDataMap = ImmutableMap.of("item1", 1, "item2", 2, "item3", 3);

Map<String, String> weightDataMapString =
ImmutableMap.of("item1", "1", "item2", "2", "item3", "3");

ClusterConfig testConfig = new ClusterConfig("testConfig");
testConfig.setDefaultPartitionWeightMap(weightDataMap);

Assert.assertEquals(testConfig.getRecord().getMapField(ClusterConfig.ClusterConfigProperty.
DEFAULT_PARTITION_WEIGHT_MAP.name()), weightDataMapString);
}

@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Default capacity data is empty")
public void testSetPartitionWeightMapEmpty() {
Map<String, Integer> weightDataMap = new HashMap<>();

ClusterConfig testConfig = new ClusterConfig("testConfig");
testConfig.setDefaultPartitionWeightMap(weightDataMap);
}
jiajunwang marked this conversation as resolved.
Show resolved Hide resolved

@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Default capacity data contains a negative value: item3 = -3")
public void testSetPartitionWeightMapInvalid() {
Map<String, Integer> weightDataMap = ImmutableMap.of("item1", 1, "item2", 2, "item3", -3);

ClusterConfig testConfig = new ClusterConfig("testConfig");
testConfig.setDefaultPartitionWeightMap(weightDataMap);
}
}