Skip to content

Commit

Permalink
Add Java API for adding and validating resources for WAGED rebalancer (
Browse files Browse the repository at this point in the history
…#570)

Add Java API methods for adding and validating resources for WAGED rebalancer. This is a set of convenience APIs provided through HelixAdmin the user could use to more easily add resources and validate them for WAGED rebalance usage.
Changelist:
1. Add API methods in HelixAdmin
2. Implement the said methods
3. Add tests
  • Loading branch information
narendly authored and Jiajun Wang committed Jan 6, 2020
1 parent dc23170 commit c5122b3
Show file tree
Hide file tree
Showing 8 changed files with 535 additions and 77 deletions.
47 changes: 47 additions & 0 deletions helix-core/src/main/java/org/apache/helix/HelixAdmin.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.MaintenanceSignal;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;

/*
Expand Down Expand Up @@ -579,4 +580,50 @@ void rebalance(String clusterName, String resourceName, int replica, String keyP
default void close() {
System.out.println("Default close() was invoked! No operation was executed.");
}

/**
* Adds a resource with IdealState and ResourceConfig to be rebalanced by WAGED rebalancer with validation.
* Validation includes the following:
* 1. Check ResourceConfig has the WEIGHT field
* 2. Check that all capacity keys from ClusterConfig are set up in the WEIGHT field
* 3. Check that all ResourceConfig's weightMap fields have all of the capacity keys
* @param clusterName
* @param idealState
* @param resourceConfig
* @return true if the resource has been added successfully. False otherwise
*/
boolean addResourceWithWeight(String clusterName, IdealState idealState,
ResourceConfig resourceConfig);

/**
* Batch-enables Waged rebalance for the names of resources given.
* @param clusterName
* @param resourceNames
* @return
*/
boolean enableWagedRebalance(String clusterName, List<String> resourceNames);

/**
* Validates the resources to see if their weight configs have been set properly.
* Validation includes the following:
* 1. Check ResourceConfig has the WEIGHT field
* 2. Check that all capacity keys from ClusterConfig are set up in the WEIGHT field
* 3. Check that all ResourceConfig's weightMap fields have all of the capacity keys
* @param resourceNames
* @return for each resource, true if the weight configs have been set properly, false otherwise
*/
Map<String, Boolean> validateResourcesForWagedRebalance(String clusterName,
List<String> resourceNames);

/**
* Validates the instances to ensure their weights in InstanceConfigs have been set up properly.
* Validation includes the following:
* 1. If default instance capacity is not set, check that the InstanceConfigs have the CAPACITY field
* 2. Check that all capacity keys defined in ClusterConfig are present in the CAPACITY field
* @param clusterName
* @param instancesNames
* @return
*/
Map<String, Boolean> validateInstancesForWagedRebalance(String clusterName,
List<String> instancesNames);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package org.apache.helix.controller.rebalancer.util;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.helix.HelixException;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.ResourceConfig;


/**
* A util class that contains validation-related static methods for WAGED rebalancer.
*/
public class WagedValidationUtil {
/**
* Validates and returns instance capacities. The validation logic ensures that all required capacity keys (in ClusterConfig) are present in InstanceConfig.
* @param clusterConfig
* @param instanceConfig
* @return
*/
public static Map<String, Integer> validateAndGetInstanceCapacity(ClusterConfig clusterConfig,
InstanceConfig instanceConfig) {
// Fetch the capacity of instance from 2 possible sources according to the following priority.
// 1. The instance capacity that is configured in the instance config.
// 2. If the default instance capacity that is configured in the cluster config contains more capacity keys, fill the capacity map with those additional values.
Map<String, Integer> instanceCapacity =
new HashMap<>(clusterConfig.getDefaultInstanceCapacityMap());
instanceCapacity.putAll(instanceConfig.getInstanceCapacityMap());

List<String> requiredCapacityKeys = clusterConfig.getInstanceCapacityKeys();
// All the required keys must exist in the instance config.
if (!instanceCapacity.keySet().containsAll(requiredCapacityKeys)) {
throw new HelixException(String.format(
"The required capacity keys: %s are not fully configured in the instance: %s, capacity map: %s.",
requiredCapacityKeys.toString(), instanceConfig.getInstanceName(),
instanceCapacity.toString()));
}
return instanceCapacity;
}

/**
* Validates and returns partition capacities. The validation logic ensures that all required capacity keys (from ClusterConfig) are present in the ResourceConfig for the partition.
* @param partitionName
* @param resourceConfig
* @param clusterConfig
* @return
*/
public static Map<String, Integer> validateAndGetPartitionCapacity(String partitionName,
ResourceConfig resourceConfig, Map<String, Map<String, Integer>> capacityMap,
ClusterConfig clusterConfig) {
// Fetch the capacity of partition from 3 possible sources according to the following priority.
// 1. The partition capacity that is explicitly configured in the resource config.
// 2. Or, the default partition capacity that is configured under partition name DEFAULT_PARTITION_KEY in the resource config.
// 3. If the default partition capacity that is configured in the cluster config contains more capacity keys, fill the capacity map with those additional values.
Map<String, Integer> partitionCapacity =
new HashMap<>(clusterConfig.getDefaultPartitionWeightMap());
partitionCapacity.putAll(capacityMap.getOrDefault(partitionName,
capacityMap.getOrDefault(ResourceConfig.DEFAULT_PARTITION_KEY, new HashMap<>())));

List<String> requiredCapacityKeys = clusterConfig.getInstanceCapacityKeys();
// If any required capacity key is not configured in the resource config, fail the model creating.
if (!partitionCapacity.keySet().containsAll(requiredCapacityKeys)) {
throw new HelixException(String.format(
"The required capacity keys: %s are not fully configured in the resource: %s, partition: %s, weight map: %s.",
requiredCapacityKeys.toString(), resourceConfig.getResourceName(), partitionName,
partitionCapacity.toString()));
}
return partitionCapacity;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.helix.HelixException;
import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.InstanceConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* This class represents a possible allocation of the replication.
* Note that any usage updates to the AssignableNode are not thread safe.
Expand Down Expand Up @@ -113,15 +115,16 @@ void assignInitBatch(Collection<AssignableReplica> replicas) {
void assign(AssignableReplica assignableReplica) {
addToAssignmentRecord(assignableReplica);
assignableReplica.getCapacity().entrySet().stream()
.forEach(capacity -> updateCapacityAndUtilization(capacity.getKey(), capacity.getValue()));
.forEach(capacity -> updateCapacityAndUtilization(capacity.getKey(), capacity.getValue()));
}

/**
* Release a replica from the node.
* If the replication is not on this node, the assignable node is not updated.
* @param replica - the replica to be released
*/
void release(AssignableReplica replica) throws IllegalArgumentException {
void release(AssignableReplica replica)
throws IllegalArgumentException {
String resourceName = replica.getResourceName();
String partitionName = replica.getPartitionName();

Expand Down Expand Up @@ -320,12 +323,12 @@ private String computeFaultZone(ClusterConfig clusterConfig, InstanceConfig inst
private void addToAssignmentRecord(AssignableReplica replica) {
String resourceName = replica.getResourceName();
String partitionName = replica.getPartitionName();
if (_currentAssignedReplicaMap.containsKey(resourceName)
&& _currentAssignedReplicaMap.get(resourceName).containsKey(partitionName)) {
throw new HelixException(String.format(
"Resource %s already has a replica with state %s from partition %s on node %s",
replica.getResourceName(), replica.getReplicaState(), replica.getPartitionName(),
getInstanceName()));
if (_currentAssignedReplicaMap.containsKey(resourceName) && _currentAssignedReplicaMap
.get(resourceName).containsKey(partitionName)) {
throw new HelixException(String
.format("Resource %s already has a replica with state %s from partition %s on node %s",
replica.getResourceName(), replica.getReplicaState(), replica.getPartitionName(),
getInstanceName()));
} else {
_currentAssignedReplicaMap.computeIfAbsent(resourceName, key -> new HashMap<>())
.put(partitionName, replica);
Expand All @@ -348,23 +351,10 @@ private void updateCapacityAndUtilization(String capacityKey, int usage) {
*/
private Map<String, Integer> fetchInstanceCapacity(ClusterConfig clusterConfig,
InstanceConfig instanceConfig) {
// Fetch the capacity of instance from 2 possible sources according to the following priority.
// 1. The instance capacity that is configured in the instance config.
// 2. If the default instance capacity that is configured in the cluster config contains more capacity keys, fill the capacity map with those additional values.
Map<String, Integer> instanceCapacity =
new HashMap<>(clusterConfig.getDefaultInstanceCapacityMap());
instanceCapacity.putAll(instanceConfig.getInstanceCapacityMap());

List<String> requiredCapacityKeys = clusterConfig.getInstanceCapacityKeys();
// All the required keys must exist in the instance config.
if (!instanceCapacity.keySet().containsAll(requiredCapacityKeys)) {
throw new HelixException(String.format(
"The required capacity keys: %s are not fully configured in the instance: %s, capacity map: %s.",
requiredCapacityKeys.toString(), _instanceName, instanceCapacity.toString()));
}
WagedValidationUtil.validateAndGetInstanceCapacity(clusterConfig, instanceConfig);
// Remove all the non-required capacity items from the map.
instanceCapacity.keySet().retainAll(requiredCapacityKeys);

instanceCapacity.keySet().retainAll(clusterConfig.getInstanceCapacityKeys());
return instanceCapacity;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Map;

import org.apache.helix.HelixException;
import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
Expand Down Expand Up @@ -149,27 +150,10 @@ private Map<String, Integer> fetchCapacityUsage(String partitionName,
"Invalid partition capacity configuration of resource: " + resourceConfig
.getResourceName(), ex);
}

// Fetch the capacity of partition from 3 possible sources according to the following priority.
// 1. The partition capacity that is explicitly configured in the resource config.
// 2. Or, the default partition capacity that is configured under partition name DEFAULT_PARTITION_KEY in the resource config.
// 3. If the default partition capacity that is configured in the cluster config contains more capacity keys, fill the capacity map with those additional values.
Map<String, Integer> partitionCapacity =
new HashMap<>(clusterConfig.getDefaultPartitionWeightMap());
partitionCapacity.putAll(capacityMap.getOrDefault(partitionName,
capacityMap.getOrDefault(ResourceConfig.DEFAULT_PARTITION_KEY, new HashMap<>())));

List<String> requiredCapacityKeys = clusterConfig.getInstanceCapacityKeys();
// If any required capacity key is not configured in the resource config, fail the model creating.
if (!partitionCapacity.keySet().containsAll(requiredCapacityKeys)) {
throw new HelixException(String.format(
"The required capacity keys: %s are not fully configured in the resource: %s, partition: %s, weight map: %s.",
requiredCapacityKeys.toString(), resourceConfig.getResourceName(), partitionName,
partitionCapacity.toString()));
}
Map<String, Integer> partitionCapacity = WagedValidationUtil
.validateAndGetPartitionCapacity(partitionName, resourceConfig, capacityMap, clusterConfig);
// Remove the non-required capacity items.
partitionCapacity.keySet().retainAll(requiredCapacityKeys);

partitionCapacity.keySet().retainAll(clusterConfig.getInstanceCapacityKeys());
return partitionCapacity;
}
}
Loading

0 comments on commit c5122b3

Please sign in to comment.