Skip to content
Permalink
Browse files
Introduce VirtualTopologyGroup and its assignment logic with benchmar…
…k. (#1948)



* Cleanup unused assignment schemes and minor change.

* Further refactor and code cleanup.
  • Loading branch information
qqu0127 authored and junkaixue committed Feb 22, 2022
1 parent 47ee384 commit 199d997a18f7ca3f911a0a8cc219dae5009d5aa2
Showing 6 changed files with 270 additions and 0 deletions.
@@ -0,0 +1,29 @@
package org.apache.helix.cloud.constants;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/


public class VirtualTopologyGroupConstants {
public static final String GROUP_NAME = "virtualTopologyGroupName";
public static final String GROUP_NUMBER = "virtualTopologyGroupNumber";
public static final String GROUP_NAME_SPLITTER = "_";
public static final String PATH_NAME_SPLITTER = "/";
public static final String VIRTUAL_FAULT_ZONE_TYPE = "virtualZone";
}
@@ -0,0 +1,79 @@
package org.apache.helix.cloud.topology;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.helix.cloud.constants.VirtualTopologyGroupConstants;
import org.apache.helix.util.HelixUtil;


/**
* A strategy that densely assign virtual groups with input instance list, it doesn't move to the next one until
* the current one is filled.
* Given that instances.size = instancesPerGroup * numGroups + residuals,
* we break [residuals] into the first few groups, as a result each virtual group will have
* either [instancesPerGroup] or [instancesPerGroup + 1] instances.
*/
public class FifoVirtualGroupAssignmentAlgorithm implements VirtualGroupAssignmentAlgorithm {
private static final FifoVirtualGroupAssignmentAlgorithm _instance = new FifoVirtualGroupAssignmentAlgorithm();

private FifoVirtualGroupAssignmentAlgorithm() { }

public static FifoVirtualGroupAssignmentAlgorithm getInstance() {
return _instance;
}

@Override
public Map<String, Set<String>> computeAssignment(int numGroups, String virtualGroupName,
Map<String, Set<String>> zoneMapping) {
List<String> sortedInstances = HelixUtil.sortAndFlattenZoneMapping(zoneMapping);
Map<String, Set<String>> assignment = new HashMap<>();
// #instances = instancesPerGroupBase * numGroups + residuals
int instancesPerGroupBase = sortedInstances.size() / numGroups;
int residuals = sortedInstances.size() % numGroups; // assign across the first #residuals groups
List<Integer> numInstances = new ArrayList<>();
int instanceInd = 0;
for (int groupInd = 0; groupInd < numGroups; groupInd++) {
int num = groupInd < residuals
? instancesPerGroupBase + 1
: instancesPerGroupBase;
String groupId = computeVirtualGroupId(groupInd, virtualGroupName);
assignment.put(groupId, new HashSet<>());
for (int i = 0; i < num; i++) {
assignment.get(groupId).add(sortedInstances.get(instanceInd));
instanceInd++;
}
numInstances.add(num);
}
Preconditions.checkState(numInstances.stream().mapToInt(Integer::intValue).sum() == sortedInstances.size());
return ImmutableMap.copyOf(assignment);
}

private static String computeVirtualGroupId(int groupIndex, String virtualGroupName) {
return virtualGroupName + VirtualTopologyGroupConstants.GROUP_NAME_SPLITTER + groupIndex;
}
}
@@ -0,0 +1,38 @@
package org.apache.helix.cloud.topology;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import java.util.Map;
import java.util.Set;


public interface VirtualGroupAssignmentAlgorithm {

/**
* Compute the assignment for each virtual topology group.
*
* @param numGroups number of the virtual groups
* @param virtualGroupName virtual group name
* @param zoneMapping current zone mapping from zoneId to instanceIds
* @return the assignment as mapping from virtual group ID to instanceIds
*/
Map<String, Set<String>> computeAssignment(int numGroups, String virtualGroupName,
Map<String, Set<String>> zoneMapping);
}
@@ -580,4 +580,18 @@ private static boolean isInstanceInManagementMode(String instance,
|| (instancesMessages.getOrDefault(instance, Collections.emptyList()).stream()
.anyMatch(Message::isParticipantStatusChangeType));
}

/**
* Sort zoneMapping for each virtual group and flatten to a list.
* @param zoneMapping virtual group mapping.
* @return a list of instances sorted and flattened.
*/
public static List<String> sortAndFlattenZoneMapping(Map<String, Set<String>> zoneMapping) {
return zoneMapping
.entrySet()
.stream()
.sorted(Map.Entry.comparingByKey())
.flatMap(entry -> entry.getValue().stream().sorted())
.collect(Collectors.toList());
}
}
@@ -0,0 +1,94 @@
package org.apache.helix.cloud.virtualTopologyGroup;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import com.google.common.collect.Sets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.helix.cloud.constants.VirtualTopologyGroupConstants;
import org.apache.helix.cloud.topology.FifoVirtualGroupAssignmentAlgorithm;
import org.apache.helix.cloud.topology.VirtualGroupAssignmentAlgorithm;
import org.apache.helix.util.HelixUtil;
import org.testng.Assert;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class TestVirtualTopologyGroupAssignment {

private static final String GROUP_NAME = "test_virtual_group";
private final List<String> _flattenExpected = Arrays.asList(
"1", "2", "3",
"4", "5", "6",
"7", "8", "9",
"a", "b", "c", "d");
private Map<String, Set<String>> _zoneMapping = new HashMap<>();

@BeforeTest
public void prepare() {
_zoneMapping = new HashMap<>();
_zoneMapping.put("c", Sets.newHashSet("9", "8", "7"));
_zoneMapping.put("a", Sets.newHashSet("2", "3", "1"));
_zoneMapping.put("z", Sets.newHashSet("b", "c", "d", "a"));
_zoneMapping.put("b", Sets.newHashSet("5", "4", "6"));
}

@Test
public void testFlattenZoneMapping() {
Assert.assertEquals(HelixUtil.sortAndFlattenZoneMapping(_zoneMapping), _flattenExpected);
}

@Test(dataProvider = "getMappingTests")
public void testAssignmentScheme(int numGroups, Map<String, Set<String>> expected,
VirtualGroupAssignmentAlgorithm algorithm) {
Assert.assertEquals(algorithm.computeAssignment(numGroups, GROUP_NAME, _zoneMapping), expected);
}

@DataProvider
public Object[][] getMappingTests() {
Map<String, Set<String>> virtualMapping = new HashMap<>();
VirtualGroupAssignmentAlgorithm algorithm = FifoVirtualGroupAssignmentAlgorithm.getInstance();
virtualMapping.put(computeVirtualGroupId(0), Sets.newHashSet("1", "2", "3", "4", "5"));
virtualMapping.put(computeVirtualGroupId(1), Sets.newHashSet("6", "7", "8", "9"));
virtualMapping.put(computeVirtualGroupId(2), Sets.newHashSet("a", "b", "c", "d"));
Assert.assertEquals(algorithm.computeAssignment(3, GROUP_NAME, _zoneMapping),
virtualMapping);
Map<String, Set<String>> virtualMapping2 = new HashMap<>();
virtualMapping2.put(computeVirtualGroupId(0), Sets.newHashSet("1", "2"));
virtualMapping2.put(computeVirtualGroupId(1), Sets.newHashSet("3", "4"));
virtualMapping2.put(computeVirtualGroupId(2), Sets.newHashSet("5", "6"));
virtualMapping2.put(computeVirtualGroupId(3), Sets.newHashSet("7", "8"));
virtualMapping2.put(computeVirtualGroupId(4), Sets.newHashSet("9", "a"));
virtualMapping2.put(computeVirtualGroupId(5), Sets.newHashSet("b"));
virtualMapping2.put(computeVirtualGroupId(6), Sets.newHashSet("c"));
virtualMapping2.put(computeVirtualGroupId(7), Sets.newHashSet("d"));
return new Object[][] {
{3, virtualMapping, algorithm},
{8, virtualMapping2, algorithm}
};
}

private static String computeVirtualGroupId(int groupIndex) {
return GROUP_NAME + VirtualTopologyGroupConstants.GROUP_NAME_SPLITTER + groupIndex;
}
}
@@ -257,6 +257,22 @@ public void testParseFaultZone() throws IOException {
testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);

Assert.assertEquals(assignableNode.getFaultZone(), "2/testInstance");

// test fault zone not in top of topology
testClusterConfig = new ClusterConfig("testClusterConfigId");
testClusterConfig.setFaultZoneType("zone");
testClusterConfig.setTopologyAwareEnabled(true);
testClusterConfig.setTopology("/rack/zone/instance");

testInstanceConfig = new InstanceConfig("testInstanceConfigId");
testInstanceConfig.setDomain("rack=3, zone=2, instance=testInstanceConfigId");
instanceConfigMap = new HashMap<>();
instanceConfigMap.put(_testInstanceId, testInstanceConfig);
when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
when(testCache.getClusterConfig()).thenReturn(testClusterConfig);
assignableNode = new AssignableNode(testCache.getClusterConfig(),
testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
Assert.assertEquals(assignableNode.getFaultZone(), "3/2");
}

@Test

0 comments on commit 199d997

Please sign in to comment.