Skip to content
Browse files
Implement java API and utils for virtual topology group (#1935)
Add comment to VirtualTopologyGroupService.
  • Loading branch information
qqu0127 authored and junkaixue committed Feb 22, 2022
1 parent 199d997 commit 90a3832a0f9811dac0007031ca33fbe4e0bd15ec
Showing 5 changed files with 407 additions and 5 deletions.
@@ -23,6 +23,7 @@
public class VirtualTopologyGroupConstants {
public static final String GROUP_NAME = "virtualTopologyGroupName";
public static final String GROUP_NUMBER = "virtualTopologyGroupNumber";
public static final String AUTO_MAINTENANCE_MODE_DISABLED = "autoMaintenanceModeDisabled";
public static final String GROUP_NAME_SPLITTER = "_";
public static final String PATH_NAME_SPLITTER = "/";
public static final String VIRTUAL_FAULT_ZONE_TYPE = "virtualZone";
@@ -19,7 +19,6 @@
* under the License.

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -63,6 +63,8 @@ public enum InstanceConfigProperty {
public static final int WEIGHT_NOT_SET = -1;
public static final int MAX_CONCURRENT_TASK_NOT_SET = -1;
private static final int TARGET_TASK_THREAD_POOL_SIZE_NOT_SET = -1;
private static final String DOMAIN_FIELD_SPLITTER = ",";
private static final String DOMAIN_VALUE_JOINER = "=";

private static final Logger _logger = LoggerFactory.getLogger(InstanceConfig.class.getName());

@@ -156,10 +158,9 @@ public Map<String, String> getDomainAsMap() {
if (domain == null || domain.isEmpty()) {
return domainAsMap;

String[] pathPairs = domain.trim().split(",");
String[] pathPairs = domain.trim().split(DOMAIN_FIELD_SPLITTER);
for (String pair : pathPairs) {
String[] values = pair.split("=");
String[] values = pair.split(DOMAIN_VALUE_JOINER);
if (values.length != 2 || values[0].isEmpty() || values[1].isEmpty()) {
throw new IllegalArgumentException(
String.format("Domain-Value pair %s is not valid.", pair));
@@ -173,12 +174,24 @@ public Map<String, String> getDomainAsMap() {
* Domain represents a hierarchy identifier for an instance.
* Example: "cluster=myCluster,zone=myZone1,rack=myRack,host=hostname,instance=instance001".
* @return
public void setDomain(String domain) {
_record.setSimpleField(, domain);

* Set domain from its map representation.
* @param domainMap domain as a map
public void setDomain(Map<String, String> domainMap) {
String domain = domainMap
.map(entry -> entry.getKey() + DOMAIN_VALUE_JOINER + entry.getValue())

public int getWeight() {
String w = _record.getSimpleField(;
if (w != null) {
@@ -0,0 +1,197 @@

* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.AccessOption;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.model.CloudConfig;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ClusterTopologyConfig;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.zkclient.DataUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

* Service for virtual topology group.
* It's a virtualization layer on top of physical fault domain and topology in cloud environments.
* The service computes the mapping from virtual group to instances based on the current cluster topology and update the
* information to cluster and all instances in the cluster.
public class VirtualTopologyGroupService {
private static final Logger LOG = LoggerFactory.getLogger(VirtualTopologyGroupService.class);

private final HelixAdmin _helixAdmin;
private final ClusterService _clusterService;
private final ConfigAccessor _configAccessor;
private final HelixDataAccessor _dataAccessor;
private final VirtualGroupAssignmentAlgorithm _assignmentAlgorithm;

public VirtualTopologyGroupService(HelixAdmin helixAdmin, ClusterService clusterService,
ConfigAccessor configAccessor, HelixDataAccessor dataAccessor) {
_helixAdmin = helixAdmin;
_clusterService = clusterService;
_configAccessor = configAccessor;
_dataAccessor = dataAccessor;
_assignmentAlgorithm = FifoVirtualGroupAssignmentAlgorithm.getInstance();

* Add virtual topology group for a cluster.
* This includes calculating the virtual group assignment for all instances in the cluster then update instance config
* and cluster config. We override {@link ClusterConfig.ClusterConfigProperty#TOPOLOGY} and
* {@link ClusterConfig.ClusterConfigProperty#FAULT_ZONE_TYPE} for cluster config, and add new field to
* {@link InstanceConfig.InstanceConfigProperty#DOMAIN} that contains virtual topology group information.
* This is only supported for cloud environments. Cluster is expected to be in maintenance mode during config change.
* @param clusterName the cluster name.
* @param customFields custom fields, {@link VirtualTopologyGroupConstants#GROUP_NAME}
* and {@link VirtualTopologyGroupConstants#GROUP_NUMBER} are required,
* {@link VirtualTopologyGroupConstants#AUTO_MAINTENANCE_MODE_DISABLED} is optional.
* -- if set ture, the cluster will NOT automatically enter/exit maintenance mode during this API call;
* -- if set false or not set, the cluster will automatically enter maintenance mode and exit after
* the call succeeds. It won't proceed if the cluster is already in maintenance mode.
* Either case, the cluster must be in maintenance mode before config change.
public void addVirtualTopologyGroup(String clusterName, Map<String, String> customFields) {
// validation
CloudConfig cloudConfig = _configAccessor.getCloudConfig(clusterName);
if (cloudConfig == null || !cloudConfig.isCloudEnabled()) {
throw new HelixException(
"Cloud is not enabled, addVirtualTopologyGroup is not allowed to run in non-cloud environment.");
ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName);
"Topology-aware rebalance is not enabled in cluster " + clusterName);
String groupName = customFields.get(VirtualTopologyGroupConstants.GROUP_NAME);
String groupNumberStr = customFields.get(VirtualTopologyGroupConstants.GROUP_NUMBER);
Preconditions.checkArgument(!StringUtils.isEmpty(groupName), "virtualTopologyGroupName cannot be empty!");
Preconditions.checkArgument(!StringUtils.isEmpty(groupNumberStr), "virtualTopologyGroupNumber cannot be empty!");
int numGroups = 0;
try {
numGroups = Integer.parseInt(groupNumberStr);
Preconditions.checkArgument(numGroups > 0, "Number of virtual groups should be positive.");
} catch (NumberFormatException ex) {
throw new IllegalArgumentException("virtualTopologyGroupNumber " + groupNumberStr + " is not an integer.", ex);
}"Computing virtual topology group for cluster {} with param {}", clusterName, customFields);

// compute group assignment
ClusterTopology clusterTopology = _clusterService.getClusterTopology(clusterName);
Preconditions.checkArgument(numGroups <= clusterTopology.getAllInstances().size(),
"Number of virtual groups cannot be greater than the number of instances.");
Map<String, Set<String>> assignment =
_assignmentAlgorithm.computeAssignment(numGroups, groupName, clusterTopology.toZoneMapping());

boolean autoMaintenanceModeDisabled = Boolean.parseBoolean(
customFields.getOrDefault(VirtualTopologyGroupConstants.AUTO_MAINTENANCE_MODE_DISABLED, "false"));
// if auto mode is NOT disabled, let service enter maintenance mode and exit after the API succeeds.
if (!autoMaintenanceModeDisabled) {
"This operation is not allowed if cluster is already in maintenance mode before the API call. "
+ "Please set autoMaintenanceModeDisabled=true if this is intended.");
_helixAdmin.manuallyEnableMaintenanceMode(clusterName, true,
"Enable maintenanceMode for virtual topology group change.", customFields);
"Cluster is not in maintenance mode. This is required for virtual topology group setting. "
+ "Please set autoMaintenanceModeDisabled=false (default) to let the cluster enter maintenance mode automatically, "
+ "or use autoMaintenanceModeDisabled=true and control cluster maintenance mode in client side.");

updateConfigs(clusterName, clusterConfig, assignment);
if (!autoMaintenanceModeDisabled) {
_helixAdmin.manuallyEnableMaintenanceMode(clusterName, false,
"Disable maintenanceMode after virtual topology group change.", customFields);

private void updateConfigs(String clusterName, ClusterConfig clusterConfig, Map<String, Set<String>> assignment) {
List<String> zkPaths = new ArrayList<>();
List<DataUpdater<ZNRecord>> updaters = new ArrayList<>();
createInstanceConfigUpdater(clusterName, assignment).forEach((zkPath, updater) -> {
// update instance config
boolean[] results = _dataAccessor.updateChildren(zkPaths, updaters, AccessOption.EPHEMERAL);
for (int i = 0; i < results.length; i++) {
if (!results[i]) {
throw new HelixException("Failed to update instance config for path " + zkPaths.get(i));
// update cluster config
String virtualTopologyString = computeVirtualTopologyString(clusterConfig);
_configAccessor.updateClusterConfig(clusterName, clusterConfig);"Successfully update instance and cluster config for {}", clusterName);

static String computeVirtualTopologyString(ClusterConfig clusterConfig) {
ClusterTopologyConfig clusterTopologyConfig = ClusterTopologyConfig.createFromClusterConfig(clusterConfig);
String endNodeType = clusterTopologyConfig.getEndNodeType();
String[] splits = new String[] {"", VirtualTopologyGroupConstants.VIRTUAL_FAULT_ZONE_TYPE, endNodeType};
return String.join(VirtualTopologyGroupConstants.PATH_NAME_SPLITTER, splits);

* Create updater for instance config for async update.
* @param clusterName cluster name of the instances.
* @param assignment virtual group assignment.
* @return a map from instance zkPath to its {@link DataUpdater} to update.
static Map<String, DataUpdater<ZNRecord>> createInstanceConfigUpdater(
String clusterName, Map<String, Set<String>> assignment) {
Map<String, DataUpdater<ZNRecord>> updaters = new HashMap<>();
for (Map.Entry<String, Set<String>> entry : assignment.entrySet()) {
String virtualGroup = entry.getKey();
for (String instanceName : entry.getValue()) {
String path = PropertyPathBuilder.instanceConfig(clusterName, instanceName);
updaters.put(path, currentData -> {
InstanceConfig instanceConfig = new InstanceConfig(currentData);
Map<String, String> domainMap = instanceConfig.getDomainAsMap();
domainMap.put(VirtualTopologyGroupConstants.VIRTUAL_FAULT_ZONE_TYPE, virtualGroup);
return instanceConfig.getRecord();
return updaters;

0 comments on commit 90a3832

Please sign in to comment.