Skip to content
Permalink
Browse files
Code refactor and cleanup on instance validation (#2032)
Unify instance enabled/disabled checks by routing them through InstanceValidationUtil
  • Loading branch information
qqu0127 committed Apr 18, 2022
1 parent 5335b63 commit 30fc9cc70bfc56ff0fbcb59d787d3a06d277bfc1
Show file tree
Hide file tree
Showing 12 changed files with 54 additions and 51 deletions.
@@ -62,6 +62,7 @@
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.task.TaskConstants;
import org.apache.helix.util.HelixUtil;
import org.apache.helix.util.InstanceValidationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -798,25 +799,18 @@ private void updateDisabledInstances(Collection<InstanceConfig> instanceConfigs,
_disabledInstanceSet.clear();
for (InstanceConfig config : instanceConfigs) {
Map<String, List<String>> disabledPartitionMap = config.getDisabledPartitionsMap();
if (!config.getInstanceEnabled()) {
if (!InstanceValidationUtil.isInstanceEnabled(config, clusterConfig)) {
_disabledInstanceSet.add(config.getInstanceName());
}
for (String resource : disabledPartitionMap.keySet()) {
if (!_disabledInstanceForPartitionMap.containsKey(resource)) {
_disabledInstanceForPartitionMap.put(resource, new HashMap<>());
}
_disabledInstanceForPartitionMap.putIfAbsent(resource, new HashMap<>());
for (String partition : disabledPartitionMap.get(resource)) {
if (!_disabledInstanceForPartitionMap.get(resource).containsKey(partition)) {
_disabledInstanceForPartitionMap.get(resource).put(partition, new HashSet<>());
}
_disabledInstanceForPartitionMap.get(resource).get(partition)
_disabledInstanceForPartitionMap.get(resource)
.computeIfAbsent(partition, key -> new HashSet<>())
.add(config.getInstanceName());
}
}
}
if (clusterConfig != null && clusterConfig.getDisabledInstances() != null) {
_disabledInstanceSet.addAll(clusterConfig.getDisabledInstances().keySet());
}
}

/*
@@ -32,6 +32,7 @@
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ClusterTopologyConfig;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.util.InstanceValidationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -168,7 +169,7 @@ private Node createClusterTree(ClusterConfig clusterConfig) {
}
addEndNode(root, instanceName, instanceTopologyMap, weight, _liveInstances);
} catch (IllegalArgumentException e) {
if (isInstanceEnabled(clusterConfig, instanceName, insConfig)) {
if (InstanceValidationUtil.isInstanceEnabled(insConfig, clusterConfig)) {
throw e;
} else {
logger.warn("Topology setting {} for instance {} is unset or invalid, ignore the instance!",
@@ -179,12 +180,6 @@ private Node createClusterTree(ClusterConfig clusterConfig) {
return root;
}

/**
 * Tell whether an instance is enabled at both the instance level and the cluster level.
 *
 * @param clusterConfig cluster config whose disabled-instances map is consulted
 * @param instanceName name looked up in the cluster-level disabled-instances map
 * @param instanceConfig config carrying the instance-level enabled flag
 * @return true only if the instance config is enabled and the cluster config does not list
 *         the instance as disabled
 */
private static boolean isInstanceEnabled(ClusterConfig clusterConfig, String instanceName,
    InstanceConfig instanceConfig) {
  // The instance-level flag is checked first; the cluster config is not read when it is off.
  if (!instanceConfig.getInstanceEnabled()) {
    return false;
  }
  // A missing cluster-level map means nothing is disabled at cluster scope.
  if (clusterConfig.getDisabledInstances() == null) {
    return true;
  }
  return !clusterConfig.getDisabledInstances().containsKey(instanceName);
}

/**
* Construct the instance topology map for an instance.
* The mapping is the cluster topology path name to its corresponding value.
@@ -31,7 +31,7 @@
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.util.ConfigStringUtil;
import org.apache.helix.util.InstanceValidationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -137,15 +137,12 @@ private static long getInactiveTime(String instance, Set<String> liveInstances,
}

// check the time instance got disabled.
if (!instanceConfig.getInstanceEnabled() || (clusterConfig.getDisabledInstances() != null
&& clusterConfig.getDisabledInstances().containsKey(instance))) {
if (!InstanceValidationUtil.isInstanceEnabled(instanceConfig, clusterConfig)) {
long disabledTime = instanceConfig.getInstanceEnabledTime();
if (clusterConfig.getDisabledInstances() != null && clusterConfig.getDisabledInstances()
.containsKey(instance)) {
Map<String, String> disabledInstances = clusterConfig.getDisabledInstances();
if (disabledInstances.containsKey(instance)) {
// Update batch disable time
long batchDisableTime = Long.parseLong(ConfigStringUtil
.parseConcatenatedConfig(clusterConfig.getDisabledInstances().get(instance))
.get(ClusterConfig.ClusterConfigProperty.HELIX_ENABLED_DISABLE_TIMESTAMP.toString()));
long batchDisableTime = Long.parseLong(clusterConfig.getInstanceHelixDisabledTimeStamp(instance));
if (disabledTime == -1 || disabledTime > batchDisableTime) {
disabledTime = batchDisableTime;
}
@@ -41,6 +41,7 @@
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.apache.helix.util.InstanceValidationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -91,8 +92,7 @@ public void process(ClusterEvent event) throws Exception {
instanceMessageMap.put(instanceName,
Sets.newHashSet(dataProvider.getMessages(instanceName).values()));
}
if (!config.getInstanceEnabled() || (clusterConfig.getDisabledInstances() != null
&& clusterConfig.getDisabledInstances().containsKey(instanceName))) {
if (!InstanceValidationUtil.isInstanceEnabled(config, clusterConfig)) {
disabledInstanceSet.add(instanceName);
}

@@ -1928,10 +1928,7 @@ public ZNRecord update(ZNRecord currentData) {
}

ClusterConfig clusterConfig = new ClusterConfig(currentData);
Map<String, String> disabledInstances = new TreeMap<>();
if (clusterConfig.getDisabledInstances() != null) {
disabledInstances.putAll(clusterConfig.getDisabledInstances());
}
Map<String, String> disabledInstances = new TreeMap<>(clusterConfig.getDisabledInstances());
if (enabled) {
disabledInstances.keySet().removeAll(instances);
} else {
@@ -774,10 +774,11 @@ public void setDisabledInstances(Map<String, String> disabledInstances) {

/**
 * Get the current disabled instance map of &lt;instance, disabledTimeStamp&gt;.
 *
 * @return the disabled-instances map field of the cluster config record, or an empty map
 *         when the record has no such field; never null
 */
public Map<String, String> getDisabledInstances() {
  Map<String, String> disabledInstances =
      _record.getMapField(ClusterConfigProperty.DISABLED_INSTANCES.name());
  // Normalize a missing map field to an empty map so callers do not need a null check.
  return disabledInstances == null ? Collections.emptyMap() : disabledInstances;
}

/**
@@ -68,6 +68,7 @@
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
import org.apache.helix.util.HelixUtil;
import org.apache.helix.util.InstanceValidationUtil;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -308,8 +309,7 @@ public void dropInstanceFromCluster(String clusterName, String instanceId) {

ClusterConfig clusterConfig = accessor.getProperty(keyBuilder.clusterConfig());
// ensure node is disabled, otherwise fail
if (config.getInstanceEnabled() && (clusterConfig.getDisabledInstances() == null
|| !clusterConfig.getDisabledInstances().containsKey(instanceId))) {
if (InstanceValidationUtil.isInstanceEnabled(config, clusterConfig)) {
String error = "Node " + instanceId + " is enabled, cannot drop";
_logger.warn(error);
throw new HelixException(error);
@@ -398,9 +398,10 @@ public static Map<String, Map<String, String>> getIdealAssignmentForFullAuto(
idealState.getMaxPartitionsPerInstance());

// Remove all disabled instances so that Helix will not consider them live.
List<String> disabledInstance =
instanceConfigs.stream().filter(enabled -> !enabled.getInstanceEnabled())
.map(InstanceConfig::getInstanceName).collect(Collectors.toList());
List<String> disabledInstance = instanceConfigs.stream()
.filter(instanceConfig -> !InstanceValidationUtil.isInstanceEnabled(instanceConfig, clusterConfig))
.map(InstanceConfig::getInstanceName)
.collect(Collectors.toList());
liveInstances.removeAll(disabledInstance);

Map<String, List<String>> preferenceLists = strategy
@@ -114,11 +114,22 @@ public static String getInstanceHelixDisabledType(HelixDataAccessor dataAccessor
: instanceConfig.getInstanceDisabledType();
}

private static boolean isInstanceEnabled(InstanceConfig instanceConfig, ClusterConfig clusterConfig) {
/**
* Check if the instance is enabled by configuration
* @param instanceConfig
* @param clusterConfig
* @return
*/
public static boolean isInstanceEnabled(InstanceConfig instanceConfig, ClusterConfig clusterConfig) {
if (instanceConfig == null) {
throw new HelixException("InstanceConfig is NULL");
}
boolean enabledInInstanceConfig = instanceConfig.getInstanceEnabled();
Map<String, String> disabledInstances = clusterConfig.getDisabledInstances();
if (clusterConfig == null) {
return enabledInInstanceConfig;
}
boolean enabledInClusterConfig =
disabledInstances == null || !disabledInstances.keySet().contains(instanceConfig.getInstanceName());
!clusterConfig.getDisabledInstances().containsKey(instanceConfig.getInstanceName());
return enabledInClusterConfig && enabledInInstanceConfig;
}

@@ -134,16 +145,23 @@ public static boolean isAlive(HelixDataAccessor dataAccessor, String instanceNam
return liveInstance != null;
}

/**
 * Legacy entry point that delegates to {@link #isResourceAssigned} with the same data accessor
 * and instance name. The {@code clusterId} argument is not used.
 *
 * @param dataAccessor accessor passed through to {@link #isResourceAssigned}
 * @param clusterId ignored
 * @param instanceName name of the instance to check
 * @return the result of {@link #isResourceAssigned} for the given instance
 * @deprecated Please use {@link #isResourceAssigned} instead.
 */
@Deprecated
public static boolean hasResourceAssigned(HelixDataAccessor dataAccessor, String clusterId,
String instanceName) {
return isResourceAssigned(dataAccessor, instanceName);
}

/**
* Method to check if the instance is assigned at least 1 resource, i.e. it is not in an idle state;
* Independent of the instance alive/enabled status
* @param dataAccessor
* @param clusterId
* @param instanceName
* @return
*/
public static boolean hasResourceAssigned(HelixDataAccessor dataAccessor, String clusterId,
String instanceName) {
public static boolean isResourceAssigned(HelixDataAccessor dataAccessor, String instanceName) {
PropertyKey.Builder propertyKeyBuilder = dataAccessor.keyBuilder();
LiveInstance liveInstance = dataAccessor.getProperty(propertyKeyBuilder.liveInstance(instanceName));
if (liveInstance != null) {
@@ -136,7 +136,7 @@ public void TestHasResourceAssigned_success() {
.getProperty(argThat(new PropertyKeyArgument(PropertyType.CURRENTSTATES)));

Assert.assertTrue(
InstanceValidationUtil.hasResourceAssigned(mock.dataAccessor, TEST_CLUSTER, TEST_INSTANCE));
InstanceValidationUtil.isResourceAssigned(mock.dataAccessor, TEST_INSTANCE));
}

@Test
@@ -156,7 +156,7 @@ public void TestHasResourceAssigned_fail() {
.getProperty(argThat(new PropertyKeyArgument(PropertyType.CURRENTSTATES)));

Assert.assertFalse(
InstanceValidationUtil.hasResourceAssigned(mock.dataAccessor, TEST_CLUSTER, TEST_INSTANCE));
InstanceValidationUtil.isResourceAssigned(mock.dataAccessor, TEST_INSTANCE));
}

@Test
@@ -166,7 +166,7 @@ public void TestHasResourceAssigned_whenNotAlive() {
.getProperty(argThat(new PropertyKeyArgument(PropertyType.LIVEINSTANCES)));

Assert.assertFalse(
InstanceValidationUtil.hasResourceAssigned(mock.dataAccessor, TEST_CLUSTER, TEST_INSTANCE));
InstanceValidationUtil.isResourceAssigned(mock.dataAccessor, TEST_INSTANCE));
}

@Test
@@ -699,7 +699,7 @@ protected Map<String, Boolean> getInstanceHealthStatus(String clusterId, String
break;
case EMPTY_RESOURCE_ASSIGNMENT:
healthStatus.put(HealthCheck.EMPTY_RESOURCE_ASSIGNMENT.name(),
InstanceValidationUtil.hasResourceAssigned(_dataAccessor, clusterId, instanceName));
InstanceValidationUtil.isResourceAssigned(_dataAccessor, instanceName));
break;
case MIN_ACTIVE_REPLICA_CHECK_FAILED:
healthStatus.put(HealthCheck.MIN_ACTIVE_REPLICA_CHECK_FAILED.name(),
@@ -56,6 +56,7 @@
import org.apache.helix.rest.server.resources.exceptions.HelixHealthException;
import org.apache.helix.rest.server.service.ClusterService;
import org.apache.helix.rest.server.service.ClusterServiceImpl;
import org.apache.helix.util.InstanceValidationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -120,8 +121,7 @@ public Response getAllInstances(@PathParam("clusterId") String clusterId,
InstanceConfig instanceConfig =
accessor.getProperty(accessor.keyBuilder().instanceConfig(instanceName));
if (instanceConfig != null) {
if (!instanceConfig.getInstanceEnabled() || (clusterConfig.getDisabledInstances() != null
&& clusterConfig.getDisabledInstances().containsKey(instanceName))) {
if (!InstanceValidationUtil.isInstanceEnabled(instanceConfig, clusterConfig)) {
disabledNode.add(JsonNodeFactory.instance.textNode(instanceName));
}

0 comments on commit 30fc9cc

Please sign in to comment.