Skip to content

Commit

Permalink
Fix incompatible issue for clusterConfig mapfields disabledInstances (#…
Browse files Browse the repository at this point in the history
…2100)

Fix incompatible issue for clusterConfig mapfields
  • Loading branch information
xyuanlu committed May 18, 2022
1 parent aadd57e commit c46a70f
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,10 @@ private static long getInactiveTime(String instance, Set<String> liveInstances,
// check the time instance got disabled.
if (!InstanceValidationUtil.isInstanceEnabled(instanceConfig, clusterConfig)) {
long disabledTime = instanceConfig.getInstanceEnabledTime();
Map<String, String> disabledInstances = clusterConfig.getDisabledInstances();
if (disabledInstances.containsKey(instance)) {
String batchedDisabledTime = clusterConfig.getInstanceHelixDisabledTimeStamp(instance);
if (batchedDisabledTime != null && !batchedDisabledTime.isEmpty()) {
// Update batch disable time
long batchDisableTime = Long.parseLong(clusterConfig.getInstanceHelixDisabledTimeStamp(instance));
long batchDisableTime = Long.parseLong(batchedDisabledTime);
if (disabledTime == -1 || disabledTime > batchDisableTime) {
disabledTime = batchDisableTime;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1933,28 +1933,35 @@ public ZNRecord update(ZNRecord currentData) {

ClusterConfig clusterConfig = new ClusterConfig(currentData);
Map<String, String> disabledInstances = new TreeMap<>(clusterConfig.getDisabledInstances());
Map<String, String> disabledInstancesWithInfo = new TreeMap<>(clusterConfig.getDisabledInstancesWithInfo());
if (enabled) {
disabledInstances.keySet().removeAll(instances);
disabledInstancesWithInfo.keySet().removeAll(instances);
} else {
for (String disabledInstance : instances) {
// We allow user to override disabledType and reason for an already disabled instance.
// TODO: we are updating both DISABLED_INSTANCES and DISABLED_INSTANCES_W_INFO for
// backward compatible. Deprecate DISABLED_INSTANCES in the future.
// TODO: update the history ZNode
disabledInstances
.put(disabledInstance, assembleInstanceBatchedDisabledInfo(disabledType, reason));
String timeStamp = String.valueOf(System.currentTimeMillis());
disabledInstances.put(disabledInstance, timeStamp);
disabledInstancesWithInfo
.put(disabledInstance, assembleInstanceBatchedDisabledInfo(disabledType, reason, timeStamp));
}
}
clusterConfig.setDisabledInstances(disabledInstances);
clusterConfig.setDisabledInstancesWithInfo(disabledInstancesWithInfo);

return clusterConfig.getRecord();
}
}, AccessOption.PERSISTENT);
}

public static String assembleInstanceBatchedDisabledInfo(
InstanceConstants.InstanceDisabledType disabledType, String reason) {
InstanceConstants.InstanceDisabledType disabledType, String reason, String timeStamp) {
Map<String, String> disableInfo = new TreeMap<>();
disableInfo.put(ClusterConfig.ClusterConfigProperty.HELIX_ENABLED_DISABLE_TIMESTAMP.toString(),
String.valueOf(System.currentTimeMillis()));
timeStamp);
if (disabledType != null) {
disableInfo.put(ClusterConfig.ClusterConfigProperty.HELIX_DISABLED_TYPE.toString(),
disabledType.toString());
Expand Down
55 changes: 45 additions & 10 deletions helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,10 @@
import org.apache.helix.api.config.HelixConfigProperty;
import org.apache.helix.api.config.StateTransitionThrottleConfig;
import org.apache.helix.api.config.StateTransitionTimeoutConfig;
import org.apache.helix.api.config.ViewClusterSourceConfig;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.util.ConfigStringUtil;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.api.config.ViewClusterSourceConfig;
import org.apache.helix.zookeeper.datamodel.ZNRecord;

/**
* Cluster configurations
Expand Down Expand Up @@ -86,6 +85,9 @@ public enum ClusterConfigProperty {
// partitons that need recovery or in
// error exceeds this limitation
DISABLED_INSTANCES,
DISABLED_INSTANCES_WITH_INFO,
// disabled instances and disabled instances with info are for storing batch disabled instances.
// disabled instances will write into both 2 fields for backward compatibility.

VIEW_CLUSTER, // Set to "true" to indicate this is a view cluster
VIEW_CLUSTER_SOURCES, // Map field, key is the name of source cluster, value is
Expand Down Expand Up @@ -772,12 +774,32 @@ public void setDisabledInstances(Map<String, String> disabledInstances) {
_record.setMapField(ClusterConfigProperty.DISABLED_INSTANCES.name(), disabledInstances);
}

/**
* Set the disabled instance list with concatenated Info
*/
public void setDisabledInstancesWithInfo(Map<String, String> disabledInstancesWithInfo) {
_record.setMapField(ClusterConfigProperty.DISABLED_INSTANCES_WITH_INFO.name(),
disabledInstancesWithInfo);
}

/**
* Get current disabled instance map of <instance, disabledTimeStamp>
* @return a non-null map of disabled instances in cluster config
*/
public Map<String, String> getDisabledInstances() {
Map<String, String> disabledInstances = _record.getMapField(ClusterConfigProperty.DISABLED_INSTANCES.name());
Map<String, String> disabledInstances =
_record.getMapField(ClusterConfigProperty.DISABLED_INSTANCES.name());
return disabledInstances == null ? Collections.emptyMap() : disabledInstances;
}

/**
* Get current disabled instance map of
* <instance, disabledReason = "res, disabledType = typ, disabledTimeStamp = time">
* @return a non-null map of disabled instances in cluster config
*/
public Map<String, String> getDisabledInstancesWithInfo() {
Map<String, String> disabledInstances =
_record.getMapField(ClusterConfigProperty.DISABLED_INSTANCES_WITH_INFO.name());
return disabledInstances == null ? Collections.emptyMap() : disabledInstances;
}

Expand Down Expand Up @@ -1103,7 +1125,6 @@ public Map<String, Map<String, String>> getIdealStateRules() {
}
return idealStateRuleMap;
}

@Override
public int hashCode() {
return getId().hashCode();
Expand All @@ -1118,26 +1139,40 @@ public String getClusterName() {
}

public String getPlainInstanceHelixDisabledType(String instanceName) {
return ConfigStringUtil.parseConcatenatedConfig(getDisabledInstances().get(instanceName))
return ConfigStringUtil.parseConcatenatedConfig(getDisabledInstancesWithInfo().get(instanceName))
.get(ClusterConfigProperty.HELIX_DISABLED_TYPE.toString());
}

public String getInstanceHelixDisabledType(String instanceName) {
if (!getDisabledInstances().containsKey(instanceName)) {
if (!getDisabledInstancesWithInfo().containsKey(instanceName) &&
!getDisabledInstances().containsKey(instanceName)) {
return InstanceConstants.INSTANCE_NOT_DISABLED;
}
return ConfigStringUtil.parseConcatenatedConfig(getDisabledInstances().get(instanceName))
return ConfigStringUtil.parseConcatenatedConfig(getDisabledInstancesWithInfo().get(instanceName))
.getOrDefault(ClusterConfigProperty.HELIX_DISABLED_TYPE.toString(),
InstanceConstants.InstanceDisabledType.DEFAULT_INSTANCE_DISABLE_TYPE.toString());
}

/**
* @return a String representing reason.
* null if instance is not disabled in batch mode or do not have disabled reason
*/
public String getInstanceHelixDisabledReason(String instanceName) {
return ConfigStringUtil.parseConcatenatedConfig(getDisabledInstances().get(instanceName))
return ConfigStringUtil.parseConcatenatedConfig(getDisabledInstancesWithInfo().get(instanceName))
.get(ClusterConfigProperty.HELIX_DISABLED_REASON.toString());
}

/**
* @param instanceName
* @return a String representation of unix time
* null if the instance is not disabled in batch mode.
*/
public String getInstanceHelixDisabledTimeStamp(String instanceName) {
return ConfigStringUtil.parseConcatenatedConfig(getDisabledInstances().get(instanceName))
.get(ClusterConfigProperty.HELIX_ENABLED_DISABLE_TIMESTAMP.toString());
if (getDisabledInstancesWithInfo().containsKey(instanceName)) {
return ConfigStringUtil
.parseConcatenatedConfig(getDisabledInstancesWithInfo().get(instanceName))
.get(ClusterConfigProperty.HELIX_ENABLED_DISABLE_TIMESTAMP.toString());
}
return getDisabledInstances().get(instanceName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ public static boolean isInstanceEnabled(InstanceConfig instanceConfig, ClusterCo
return enabledInInstanceConfig;
}
boolean enabledInClusterConfig =
!clusterConfig.getDisabledInstances().containsKey(instanceConfig.getInstanceName());
!clusterConfig.getDisabledInstances().containsKey(instanceConfig.getInstanceName())
&& !clusterConfig.getDisabledInstancesWithInfo().containsKey(instanceConfig.getInstanceName());
return enabledInClusterConfig && enabledInInstanceConfig;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ public void testOldEnableDisable() throws InterruptedException {
for (Map<String, String> stateMap : externalView.getRecord().getMapFields().values()) {
Assert.assertTrue(!stateMap.keySet().contains(_participants[0].getInstanceName()));
}
HelixDataAccessor dataAccessor =
new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<>(_gZkClient));
ClusterConfig clusterConfig = dataAccessor.getProperty(dataAccessor.keyBuilder().clusterConfig());
Assert.assertEquals(Long.parseLong(
clusterConfig.getInstanceHelixDisabledTimeStamp(_participants[0].getInstanceName())),
Long.parseLong(clusterConfig.getDisabledInstances().get(_participants[0].getInstanceName())));
_gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME,
_participants[0].getInstanceName(), true);
}
Expand All @@ -79,9 +85,18 @@ public void testBatchEnableDisable() throws InterruptedException {
Assert.assertTrue(!stateMap.keySet().contains(_participants[0].getInstanceName()));
Assert.assertTrue(!stateMap.keySet().contains(_participants[1].getInstanceName()));
}
HelixDataAccessor dataAccessor =
new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<>(_gZkClient));
ClusterConfig clusterConfig = dataAccessor.getProperty(dataAccessor.keyBuilder().clusterConfig());
Assert.assertEquals(Long.parseLong(
clusterConfig.getInstanceHelixDisabledTimeStamp(_participants[1].getInstanceName())),
Long.parseLong(clusterConfig.getDisabledInstances().get(_participants[1].getInstanceName())));
_gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME,
Arrays.asList(_participants[0].getInstanceName(), _participants[1].getInstanceName()),
true);
Assert.assertEquals(Long.parseLong(
clusterConfig.getInstanceHelixDisabledTimeStamp(_participants[0].getInstanceName())),
Long.parseLong(clusterConfig.getDisabledInstances().get(_participants[0].getInstanceName())));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,18 +319,23 @@ public void enableInstance(String clusterName, List<String> instances, boolean e
ClusterConfig clusterConfig = new ClusterConfig(record);

Map<String, String> disabledInstances = new TreeMap<>();
Map<String, String> disabledInstancesWithInfo = new TreeMap<>();
if (clusterConfig.getDisabledInstances() != null) {
disabledInstances.putAll(clusterConfig.getDisabledInstances());
disabledInstancesWithInfo.putAll(clusterConfig.getDisabledInstancesWithInfo());
}
if (enabled) {
disabledInstances.keySet().removeAll(instances);
} else {
for (String disabledInstance : instances) {
String timeStamp = String.valueOf(System.currentTimeMillis());
disabledInstances.put(disabledInstance, timeStamp);
disabledInstances
.put(disabledInstance, assembleInstanceBatchedDisabledInfo(disabledType, reason));
.put(disabledInstance, assembleInstanceBatchedDisabledInfo(disabledType, reason, timeStamp));
}
}
clusterConfig.setDisabledInstances(disabledInstances);
clusterConfig.setDisabledInstancesWithInfo(disabledInstancesWithInfo);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ public void testUpdateInstances() throws IOException {
ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
Assert.assertEquals(clusterConfig.getDisabledInstances().keySet(),
new HashSet<>(instancesToDisable));
Assert.assertEquals(clusterConfig.getDisabledInstancesWithInfo().keySet(),
new HashSet<>(instancesToDisable));
Assert
.assertEquals(clusterConfig.getInstanceHelixDisabledType(CLUSTER_NAME + "localhost_12918"),
"USER_OPERATION");
Expand All @@ -171,6 +173,11 @@ public void testUpdateInstances() throws IOException {
clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
Assert.assertEquals(clusterConfig.getDisabledInstances().keySet(),
new HashSet<>(Arrays.asList(CLUSTER_NAME + "localhost_12919")));
Assert.assertEquals(clusterConfig.getDisabledInstancesWithInfo().keySet(),
new HashSet<>(Arrays.asList(CLUSTER_NAME + "localhost_12919")));
Assert.assertEquals(Long.parseLong(
clusterConfig.getInstanceHelixDisabledTimeStamp(CLUSTER_NAME + "localhost_12919")),
Long.parseLong(clusterConfig.getDisabledInstances().get(CLUSTER_NAME + "localhost_12919")));
Assert
.assertEquals(clusterConfig.getInstanceHelixDisabledType(CLUSTER_NAME + "localhost_12918"),
"INSTANCE_NOT_DISABLED");
Expand Down

0 comments on commit c46a70f

Please sign in to comment.