Skip to content
Permalink
Browse files
Add rest endpoint for virtual topology group (#1958)
  • Loading branch information
qqu0127 authored and junkaixue committed Feb 22, 2022
1 parent 90a3832 commit de572a6192f4186cc1cabe8ee8ceb38a7d3dc615
Showing 3 changed files with 147 additions and 35 deletions.
@@ -59,6 +59,7 @@ public enum Properties {
public enum Command {
activate,
addInstanceTag,
addVirtualTopologyGroup,
expand,
enable,
disable,
@@ -70,6 +70,7 @@
import org.apache.helix.rest.server.json.cluster.ClusterTopology;
import org.apache.helix.rest.server.service.ClusterService;
import org.apache.helix.rest.server.service.ClusterServiceImpl;
import org.apache.helix.rest.server.service.VirtualTopologyGroupService;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -235,6 +236,21 @@ public Response updateCluster(@PathParam("clusterId") String clusterId,
}
break;

case addVirtualTopologyGroup:
try {
addVirtualTopologyGroup(clusterId, content);
} catch (JsonProcessingException ex) {
LOG.error("Failed to parse json string: {}", content, ex);
return badRequest("Invalid payload json body: " + content);
} catch (IllegalArgumentException ex) {
LOG.error("Illegal input {} for command {}.", content, command, ex);
return badRequest(String.format("Illegal input %s for command %s", content, command));
} catch (Exception ex) {
LOG.error("Failed to add virtual topology group to cluster {}", clusterId, ex);
return serverError(ex);
}
break;

case expand:
try {
clusterSetup.expandCluster(clusterId);
@@ -305,6 +321,15 @@ public Response updateCluster(@PathParam("clusterId") String clusterId,
return OK();
}

private void addVirtualTopologyGroup(String clusterId, String content) throws JsonProcessingException {
ClusterService clusterService = new ClusterServiceImpl(getDataAccssor(clusterId), getConfigAccessor());
VirtualTopologyGroupService service = new VirtualTopologyGroupService(
getHelixAdmin(), clusterService, getConfigAccessor(), getDataAccssor(clusterId));
Map<String, String> customFieldsMap =
OBJECT_MAPPER.readValue(content, new TypeReference<HashMap<String, String>>() { });
service.addVirtualTopologyGroup(clusterId, customFieldsMap);
}

@ResponseMetered(name = HttpConstants.READ_REQUEST)
@Timed(name = HttpConstants.READ_REQUEST)
@GET
@@ -19,6 +19,7 @@
* under the License.
*/

import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -45,6 +46,7 @@
import org.apache.helix.api.status.ClusterManagementModeRequest;
import org.apache.helix.cloud.azure.AzureConstants;
import org.apache.helix.cloud.constants.CloudProvider;
import org.apache.helix.cloud.constants.VirtualTopologyGroupConstants;
import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
import org.apache.helix.integration.manager.ClusterDistributedController;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
@@ -70,10 +72,13 @@
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class TestClusterAccessor extends AbstractTestClass {

private static final String VG_CLUSTER = "vgCluster";

@BeforeClass
public void beforeClass() {
for (String cluster : _clusters) {
@@ -167,10 +172,7 @@ public void testGetClusterTopologyAndFaultZoneMap() throws IOException {
updateClusterConfigFromRest(cluster, configDelta, Command.update);

//get valid cluster topology map
String topologyMapDef = get(topologyMapUrlBase, null, Response.Status.OK.getStatusCode(), true);
Map<String, Object> topologyMap =
OBJECT_MAPPER.readValue(topologyMapDef, new TypeReference<HashMap<String, Object>>() {
});
Map<String, Object> topologyMap = getMapResponseFromRest(topologyMapUrlBase);
Assert.assertEquals(topologyMap.size(), 2);
Assert.assertTrue(topologyMap.get("/helixZoneId:zone0") instanceof List);
List<String> instances = (List<String>) topologyMap.get("/helixZoneId:zone0");
@@ -197,10 +199,7 @@ public void testGetClusterTopologyAndFaultZoneMap() throws IOException {
updateClusterConfigFromRest(cluster, configDelta, Command.update);

//get valid cluster fault zone map
String faultZoneMapDef = get(faultZoneUrlBase, null, Response.Status.OK.getStatusCode(), true);
Map<String, Object> faultZoneMap =
OBJECT_MAPPER.readValue(faultZoneMapDef, new TypeReference<HashMap<String, Object>>() {
});
Map<String, Object> faultZoneMap = getMapResponseFromRest(faultZoneUrlBase);
Assert.assertEquals(faultZoneMap.size(), 2);
Assert.assertTrue(faultZoneMap.get("/helixZoneId:zone0") instanceof List);
instances = (List<String>) faultZoneMap.get("/helixZoneId:zone0");
@@ -223,6 +222,108 @@ public void testGetClusterTopologyAndFaultZoneMap() throws IOException {
"/instance:TestCluster_1localhost_12927"))));
}

@Test(dataProvider = "prepareVirtualTopologyTests", dependsOnMethods = "testGetClusters")
public void testAddVirtualTopologyGroup(String requestParam, int numGroups,
Map<String, String> instanceToGroup) throws IOException {
post("clusters/" + VG_CLUSTER,
ImmutableMap.of("command", "addVirtualTopologyGroup"),
Entity.entity(requestParam, MediaType.APPLICATION_JSON_TYPE),
Response.Status.OK.getStatusCode());
Map<String, Object> topology = getMapResponseFromRest(String.format("clusters/%s/topology", VG_CLUSTER));
Assert.assertTrue(topology.containsKey("zones"));
Assert.assertEquals(((List) topology.get("zones")).size(), numGroups);

ClusterConfig clusterConfig = getClusterConfigFromRest(VG_CLUSTER);
String expectedTopology = "/" + VirtualTopologyGroupConstants.VIRTUAL_FAULT_ZONE_TYPE + "/hostname";
Assert.assertEquals(clusterConfig.getTopology(), expectedTopology);
Assert.assertEquals(clusterConfig.getFaultZoneType(), VirtualTopologyGroupConstants.VIRTUAL_FAULT_ZONE_TYPE);

HelixDataAccessor helixDataAccessor = new ZKHelixDataAccessor(VG_CLUSTER, _baseAccessor);
for (Map.Entry<String, String> entry : instanceToGroup.entrySet()) {
InstanceConfig instanceConfig =
helixDataAccessor.getProperty(helixDataAccessor.keyBuilder().instanceConfig(entry.getKey()));
String expectedGroup = entry.getValue();
Assert.assertEquals(instanceConfig.getDomainAsMap().get(VirtualTopologyGroupConstants.VIRTUAL_FAULT_ZONE_TYPE),
expectedGroup);
}
}

@Test(dependsOnMethods = "testGetClusters")
public void testVirtualTopologyGroupMaintenanceMode() throws JsonProcessingException {
setupClusterForVirtualTopology(VG_CLUSTER);
String requestParam = "{\"virtualTopologyGroupNumber\":\"7\",\"virtualTopologyGroupName\":\"vgTest\","
+ "\"autoMaintenanceModeDisabled\":\"true\"}";
// expect failure as cluster is not in maintenance mode while autoMaintenanceModeDisabled=true
post("clusters/" + VG_CLUSTER,
ImmutableMap.of("command", "addVirtualTopologyGroup"),
Entity.entity(requestParam, MediaType.APPLICATION_JSON_TYPE),
Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
// enable maintenance mode and expect success
post("clusters/" + VG_CLUSTER,
ImmutableMap.of("command", "enableMaintenanceMode"),
Entity.entity("virtual group", MediaType.APPLICATION_JSON_TYPE),
Response.Status.OK.getStatusCode());
post("clusters/" + VG_CLUSTER,
ImmutableMap.of("command", "addVirtualTopologyGroup"),
Entity.entity(requestParam, MediaType.APPLICATION_JSON_TYPE),
Response.Status.OK.getStatusCode());

Assert.assertTrue(isMaintenanceModeEnabled(VG_CLUSTER));
}

private boolean isMaintenanceModeEnabled(String clusterName) throws JsonProcessingException {
String body =
get("clusters/" + clusterName + "/maintenance", null, Response.Status.OK.getStatusCode(), true);
return OBJECT_MAPPER.readTree(body).get(ClusterAccessor.ClusterProperties.maintenance.name()).booleanValue();
}

@DataProvider
public Object[][] prepareVirtualTopologyTests() {
setupClusterForVirtualTopology(VG_CLUSTER);
String test1 = "{\"virtualTopologyGroupNumber\":\"7\",\"virtualTopologyGroupName\":\"vgTest\"}";
String test2 = "{\"virtualTopologyGroupNumber\":\"9\",\"virtualTopologyGroupName\":\"vgTest\"}";
return new Object[][] {
{test1, 7, ImmutableMap.of(
"vgCluster_localhost_12918", "vgTest_0",
"vgCluster_localhost_12919", "vgTest_0",
"vgCluster_localhost_12925", "vgTest_4",
"vgCluster_localhost_12927", "vgTest_6")},
{test2, 9, ImmutableMap.of(
"vgCluster_localhost_12918", "vgTest_0",
"vgCluster_localhost_12919", "vgTest_0",
"vgCluster_localhost_12925", "vgTest_6",
"vgCluster_localhost_12927", "vgTest_8")},
// repeat test1 for deterministic and test for decreasing numGroups
{test1, 7, ImmutableMap.of(
"vgCluster_localhost_12918", "vgTest_0",
"vgCluster_localhost_12919", "vgTest_0",
"vgCluster_localhost_12925", "vgTest_4",
"vgCluster_localhost_12927", "vgTest_6")}
};
}

private void setupClusterForVirtualTopology(String clusterName) {
HelixDataAccessor helixDataAccessor = new ZKHelixDataAccessor(clusterName, _baseAccessor);
ZNRecord record = new ZNRecord("testZnode");
record.setBooleanField(CloudConfig.CloudConfigProperty.CLOUD_ENABLED.name(), true);
record.setSimpleField(CloudConfig.CloudConfigProperty.CLOUD_ID.name(), "TestCloudID");
record.setSimpleField(CloudConfig.CloudConfigProperty.CLOUD_PROVIDER.name(), CloudProvider.AZURE.name());
CloudConfig cloudConfig = new CloudConfig.Builder(record).build();
_gSetupTool.addCluster(clusterName, true, cloudConfig);

Set<String> instances = new HashSet<>();
for (int i = 0; i < 10; i++) {
String instanceName = clusterName + "_localhost_" + (12918 + i);
_gSetupTool.addInstanceToCluster(clusterName, instanceName);
InstanceConfig instanceConfig =
helixDataAccessor.getProperty(helixDataAccessor.keyBuilder().instanceConfig(instanceName));
instanceConfig.setDomain("faultDomain=" + i / 2 + ",hostname=" + instanceName);
helixDataAccessor.setProperty(helixDataAccessor.keyBuilder().instanceConfig(instanceName), instanceConfig);
instances.add(instanceName);
}
startInstances(clusterName, instances, 10);
}

@Test(dependsOnMethods = "testGetClusterTopologyAndFaultZoneMap")
public void testAddConfigFields() throws IOException {
System.out.println("Start test :" + TestHelper.getTestMethodName());
@@ -399,19 +500,11 @@ public void testEnableDisableMaintenanceMode() throws IOException {
Entity.entity(reason, MediaType.APPLICATION_JSON_TYPE), Response.Status.OK.getStatusCode());

// verify is in maintenance mode
String body =
get("clusters/" + cluster + "/maintenance", null, Response.Status.OK.getStatusCode(), true);
JsonNode node = OBJECT_MAPPER.readTree(body);
boolean maintenance =
node.get(ClusterAccessor.ClusterProperties.maintenance.name()).booleanValue();
Assert.assertTrue(maintenance);
Assert.assertTrue(isMaintenanceModeEnabled(cluster));

// Check that we could retrieve maintenance signal correctly
String signal = get("clusters/" + cluster + "/controller/maintenanceSignal", null,
Response.Status.OK.getStatusCode(), true);
Map<String, Object> maintenanceSignalMap =
OBJECT_MAPPER.readValue(signal, new TypeReference<HashMap<String, Object>>() {
});
getMapResponseFromRest("clusters/" + cluster + "/controller/maintenanceSignal");
Assert.assertEquals(maintenanceSignalMap.get("TRIGGERED_BY"), "USER");
Assert.assertEquals(maintenanceSignalMap.get("REASON"), reason);
Assert.assertNotNull(maintenanceSignalMap.get("TIMESTAMP"));
@@ -422,10 +515,7 @@ public void testEnableDisableMaintenanceMode() throws IOException {
Entity.entity("", MediaType.APPLICATION_JSON_TYPE), Response.Status.OK.getStatusCode());

// verify no longer in maintenance mode
body = get("clusters/" + cluster + "/maintenance", null, Response.Status.OK.getStatusCode(), true);
node = OBJECT_MAPPER.readTree(body);
Assert.assertFalse(
node.get(ClusterAccessor.ClusterProperties.maintenance.name()).booleanValue());
Assert.assertFalse(isMaintenanceModeEnabled(cluster));

get("clusters/" + cluster + "/controller/maintenanceSignal", null,
Response.Status.NOT_FOUND.getStatusCode(), false);
@@ -448,11 +538,8 @@ public void testGetControllerLeadershipHistory() throws IOException {
Assert.assertNotNull(leader, "Leader name cannot be null!");

// Get the controller leadership history JSON's last entry
String leadershipHistory = get("clusters/" + cluster + "/controller/history", null,
Response.Status.OK.getStatusCode(), true);
Map<String, Object> leadershipHistoryMap =
OBJECT_MAPPER.readValue(leadershipHistory, new TypeReference<HashMap<String, Object>>() {
});
Map<String, Object> leadershipHistoryMap = getMapResponseFromRest("clusters/" + cluster + "/controller/history");

Assert.assertNotNull(leadershipHistoryMap, "Leadership history cannot be null!");
Object leadershipHistoryList =
leadershipHistoryMap.get(AbstractResource.Properties.history.name());
@@ -477,11 +564,8 @@ public void testGetMaintenanceHistory() throws IOException {
Entity.entity(reason, MediaType.APPLICATION_JSON_TYPE), Response.Status.OK.getStatusCode());

// Get the maintenance history JSON's last entry
String maintenanceHistory = get("clusters/" + cluster + "/controller/maintenanceHistory", null,
Response.Status.OK.getStatusCode(), true);
Map<String, Object> maintenanceHistoryMap =
OBJECT_MAPPER.readValue(maintenanceHistory, new TypeReference<HashMap<String, Object>>() {
});
getMapResponseFromRest("clusters/" + cluster + "/controller/maintenanceHistory");
Object maintenanceHistoryList =
maintenanceHistoryMap.get(ClusterAccessor.ClusterProperties.maintenanceHistory.name());
Assert.assertNotNull(maintenanceHistoryList);
@@ -571,10 +655,7 @@ public void testGetStateModelDef() throws IOException {
System.out.println("Start test :" + TestHelper.getTestMethodName());
String cluster = "TestCluster_1";
String urlBase = "clusters/TestCluster_1/statemodeldefs/";
String stateModelDefs =
get(urlBase, null, Response.Status.OK.getStatusCode(), true);
Map<String, Object> defMap = OBJECT_MAPPER.readValue(stateModelDefs, new TypeReference<HashMap<String, Object>>() {
});
Map<String, Object> defMap = getMapResponseFromRest(urlBase);

Assert.assertTrue(defMap.size() == 2);
Assert.assertTrue(defMap.get("stateModelDefinitions") instanceof List);
@@ -1427,4 +1508,9 @@ private void validateAuditLog(AuditLog auditLog, String httpMethod, String reque
Assert.assertEquals(auditLog.getResponseCode(), statusCode);
Assert.assertEquals(auditLog.getResponseEntity(), responseEntity);
}

private Map<String, Object> getMapResponseFromRest(String uri) throws JsonProcessingException {
String response = get(uri, null, Response.Status.OK.getStatusCode(), true);
return OBJECT_MAPPER.readValue(response, new TypeReference<HashMap<String, Object>>() { });
}
}

0 comments on commit de572a6

Please sign in to comment.