Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Java API for adding and validating resources for WAGED rebalancer #570

Merged
merged 11 commits into from
Nov 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions helix-core/src/main/java/org/apache/helix/HelixAdmin.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
import org.apache.helix.model.ConstraintItem;
Expand All @@ -30,8 +31,10 @@
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.MaintenanceSignal;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;


/*
* Helix cluster management
*/
Expand Down Expand Up @@ -576,4 +579,50 @@ void rebalance(String clusterName, String resourceName, int replica, String keyP
* Release resources
*/
void close();

/**
* Adds a resource with IdealState and ResourceConfig to be rebalanced by WAGED rebalancer with validation.
narendly marked this conversation as resolved.
Show resolved Hide resolved
* Validation includes the following:
* 1. Check ResourceConfig has the WEIGHT field
* 2. Check that all capacity keys from ClusterConfig are set up in the WEIGHT field
* 3. Check that all ResourceConfig's weightMap fields have all of the capacity keys
* @param clusterName
* @param idealState
* @param resourceConfig
* @return true if the resource has been added successfully. False otherwise
*/
boolean addResourceWithWeight(String clusterName, IdealState idealState,
ResourceConfig resourceConfig);

/**
* Batch-enables Waged rebalance for the names of resources given.
* @param clusterName
* @param resourceNames
* @return
*/
boolean enableWagedRebalance(String clusterName, List<String> resourceNames);

/**
* Validates the resources to see if their weight configs have been set properly.
* Validation includes the following:
* 1. Check ResourceConfig has the WEIGHT field
* 2. Check that all capacity keys from ClusterConfig are set up in the WEIGHT field
* 3. Check that all ResourceConfig's weightMap fields have all of the capacity keys
* @param resourceNames
* @return for each resource, true if the weight configs have been set properly, false otherwise
*/
Map<String, Boolean> validateResourcesForWagedRebalance(String clusterName,
List<String> resourceNames);

/**
* Validates the instances to ensure their weights in InstanceConfigs have been set up properly.
* Validation includes the following:
* 1. If default instance capacity is not set, check that the InstanceConfigs have the CAPACITY field
* 2. Check that all capacity keys defined in ClusterConfig are present in the CAPACITY field
* @param clusterName
* @param instancesNames
* @return
*/
Map<String, Boolean> validateInstancesForWagedRebalance(String clusterName,
jiajunwang marked this conversation as resolved.
Show resolved Hide resolved
List<String> instancesNames);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package org.apache.helix.controller.rebalancer.util;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.helix.HelixException;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.ResourceConfig;


/**
* A util class that contains validation-related static methods for WAGED rebalancer.
*/
public class WagedValidationUtil {
/**
* Validates and returns instance capacities. The validation logic ensures that all required capacity keys (in ClusterConfig) are present in InstanceConfig.
* @param clusterConfig
* @param instanceConfig
* @return
*/
public static Map<String, Integer> validateAndGetInstanceCapacity(ClusterConfig clusterConfig,
InstanceConfig instanceConfig) {
// Fetch the capacity of instance from 2 possible sources according to the following priority.
// 1. The instance capacity that is configured in the instance config.
// 2. If the default instance capacity that is configured in the cluster config contains more capacity keys, fill the capacity map with those additional values.
Map<String, Integer> instanceCapacity =
new HashMap<>(clusterConfig.getDefaultInstanceCapacityMap());
instanceCapacity.putAll(instanceConfig.getInstanceCapacityMap());

List<String> requiredCapacityKeys = clusterConfig.getInstanceCapacityKeys();
// All the required keys must exist in the instance config.
if (!instanceCapacity.keySet().containsAll(requiredCapacityKeys)) {
throw new HelixException(String.format(
"The required capacity keys: %s are not fully configured in the instance: %s, capacity map: %s.",
requiredCapacityKeys.toString(), instanceConfig.getInstanceName(),
instanceCapacity.toString()));
}
return instanceCapacity;
}

/**
* Validates and returns partition capacities. The validation logic ensures that all required capacity keys (from ClusterConfig) are present in the ResourceConfig for the partition.
* @param partitionName
* @param resourceConfig
* @param clusterConfig
* @return
*/
public static Map<String, Integer> validateAndGetPartitionCapacity(String partitionName,
ResourceConfig resourceConfig, Map<String, Map<String, Integer>> capacityMap,
ClusterConfig clusterConfig) {
// Fetch the capacity of partition from 3 possible sources according to the following priority.
// 1. The partition capacity that is explicitly configured in the resource config.
// 2. Or, the default partition capacity that is configured under partition name DEFAULT_PARTITION_KEY in the resource config.
// 3. If the default partition capacity that is configured in the cluster config contains more capacity keys, fill the capacity map with those additional values.
Map<String, Integer> partitionCapacity =
new HashMap<>(clusterConfig.getDefaultPartitionWeightMap());
partitionCapacity.putAll(capacityMap.getOrDefault(partitionName,
capacityMap.getOrDefault(ResourceConfig.DEFAULT_PARTITION_KEY, new HashMap<>())));

List<String> requiredCapacityKeys = clusterConfig.getInstanceCapacityKeys();
// If any required capacity key is not configured in the resource config, fail the model creating.
if (!partitionCapacity.keySet().containsAll(requiredCapacityKeys)) {
throw new HelixException(String.format(
"The required capacity keys: %s are not fully configured in the resource: %s, partition: %s, weight map: %s.",
requiredCapacityKeys.toString(), resourceConfig.getResourceName(), partitionName,
partitionCapacity.toString()));
}
return partitionCapacity;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.helix.HelixException;
import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.InstanceConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* This class represents a possible allocation of the replication.
* Note that any usage updates to the AssignableNode are not thread safe.
Expand Down Expand Up @@ -113,15 +115,16 @@ void assignInitBatch(Collection<AssignableReplica> replicas) {
void assign(AssignableReplica assignableReplica) {
addToAssignmentRecord(assignableReplica);
assignableReplica.getCapacity().entrySet().stream()
.forEach(capacity -> updateCapacityAndUtilization(capacity.getKey(), capacity.getValue()));
.forEach(capacity -> updateCapacityAndUtilization(capacity.getKey(), capacity.getValue()));
}

/**
* Release a replica from the node.
* If the replication is not on this node, the assignable node is not updated.
* @param replica - the replica to be released
*/
void release(AssignableReplica replica) throws IllegalArgumentException {
void release(AssignableReplica replica)
throws IllegalArgumentException {
String resourceName = replica.getResourceName();
String partitionName = replica.getPartitionName();

Expand Down Expand Up @@ -320,12 +323,12 @@ private String computeFaultZone(ClusterConfig clusterConfig, InstanceConfig inst
private void addToAssignmentRecord(AssignableReplica replica) {
String resourceName = replica.getResourceName();
String partitionName = replica.getPartitionName();
if (_currentAssignedReplicaMap.containsKey(resourceName)
&& _currentAssignedReplicaMap.get(resourceName).containsKey(partitionName)) {
throw new HelixException(String.format(
"Resource %s already has a replica with state %s from partition %s on node %s",
replica.getResourceName(), replica.getReplicaState(), replica.getPartitionName(),
getInstanceName()));
if (_currentAssignedReplicaMap.containsKey(resourceName) && _currentAssignedReplicaMap
.get(resourceName).containsKey(partitionName)) {
throw new HelixException(String
.format("Resource %s already has a replica with state %s from partition %s on node %s",
replica.getResourceName(), replica.getReplicaState(), replica.getPartitionName(),
getInstanceName()));
} else {
_currentAssignedReplicaMap.computeIfAbsent(resourceName, key -> new HashMap<>())
.put(partitionName, replica);
Expand All @@ -348,23 +351,10 @@ private void updateCapacityAndUtilization(String capacityKey, int usage) {
*/
private Map<String, Integer> fetchInstanceCapacity(ClusterConfig clusterConfig,
InstanceConfig instanceConfig) {
// Fetch the capacity of instance from 2 possible sources according to the following priority.
// 1. The instance capacity that is configured in the instance config.
// 2. If the default instance capacity that is configured in the cluster config contains more capacity keys, fill the capacity map with those additional values.
Map<String, Integer> instanceCapacity =
new HashMap<>(clusterConfig.getDefaultInstanceCapacityMap());
instanceCapacity.putAll(instanceConfig.getInstanceCapacityMap());

List<String> requiredCapacityKeys = clusterConfig.getInstanceCapacityKeys();
// All the required keys must exist in the instance config.
if (!instanceCapacity.keySet().containsAll(requiredCapacityKeys)) {
throw new HelixException(String.format(
"The required capacity keys: %s are not fully configured in the instance: %s, capacity map: %s.",
requiredCapacityKeys.toString(), _instanceName, instanceCapacity.toString()));
}
WagedValidationUtil.validateAndGetInstanceCapacity(clusterConfig, instanceConfig);
// Remove all the non-required capacity items from the map.
instanceCapacity.keySet().retainAll(requiredCapacityKeys);

instanceCapacity.keySet().retainAll(clusterConfig.getInstanceCapacityKeys());
return instanceCapacity;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Map;

import org.apache.helix.HelixException;
import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
Expand Down Expand Up @@ -149,27 +150,10 @@ private Map<String, Integer> fetchCapacityUsage(String partitionName,
"Invalid partition capacity configuration of resource: " + resourceConfig
.getResourceName(), ex);
}

// Fetch the capacity of partition from 3 possible sources according to the following priority.
// 1. The partition capacity that is explicitly configured in the resource config.
// 2. Or, the default partition capacity that is configured under partition name DEFAULT_PARTITION_KEY in the resource config.
// 3. If the default partition capacity that is configured in the cluster config contains more capacity keys, fill the capacity map with those additional values.
Map<String, Integer> partitionCapacity =
new HashMap<>(clusterConfig.getDefaultPartitionWeightMap());
partitionCapacity.putAll(capacityMap.getOrDefault(partitionName,
capacityMap.getOrDefault(ResourceConfig.DEFAULT_PARTITION_KEY, new HashMap<>())));

List<String> requiredCapacityKeys = clusterConfig.getInstanceCapacityKeys();
// If any required capacity key is not configured in the resource config, fail the model creating.
if (!partitionCapacity.keySet().containsAll(requiredCapacityKeys)) {
throw new HelixException(String.format(
"The required capacity keys: %s are not fully configured in the resource: %s, partition: %s, weight map: %s.",
requiredCapacityKeys.toString(), resourceConfig.getResourceName(), partitionName,
partitionCapacity.toString()));
}
Map<String, Integer> partitionCapacity = WagedValidationUtil
.validateAndGetPartitionCapacity(partitionName, resourceConfig, capacityMap, clusterConfig);
// Remove the non-required capacity items.
partitionCapacity.keySet().retainAll(requiredCapacityKeys);

partitionCapacity.keySet().retainAll(clusterConfig.getInstanceCapacityKeys());
return partitionCapacity;
}
}
Loading