Skip to content
Permalink
Browse files
Clean format for disabled instances in ClusterConfig (#2106)
We reverted enabling batch mode for instance enable/disable. This change hooks into the readData stage when the controller first starts and cleans up the batch-enabled field in the cluster config.
This temporary change should be reverted after the version following the next release (1.0.5.0 or later).
  • Loading branch information
xyuanlu committed Jun 6, 2022
1 parent 8860a16 commit efd4ce0a14a60da1c2546cfafa32135c10349dbc
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 14 deletions.
@@ -63,6 +63,8 @@
import org.apache.helix.task.TaskConstants;
import org.apache.helix.util.HelixUtil;
import org.apache.helix.util.InstanceValidationUtil;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.zkclient.DataUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -74,8 +76,7 @@
* This class will be moved to helix-common module in the future
*/
public class BaseControllerDataProvider implements ControlContextProvider {
private static final Logger logger =
LoggerFactory.getLogger(BaseControllerDataProvider.class);
private static final Logger logger = LoggerFactory.getLogger(BaseControllerDataProvider.class);

// We only refresh EV and TEV the very first time the cluster data cache is initialized
private static final List<HelixConstants.ChangeType> _noFullRefreshProperty = Arrays
@@ -129,8 +130,7 @@ public BaseControllerDataProvider(String clusterName, String pipelineName) {
_propertyDataChangedMap = new ConcurrentHashMap<>();
for (HelixConstants.ChangeType type : HelixConstants.ChangeType.values()) {
// refresh every type when it is initialized
_propertyDataChangedMap
.put(type, new AtomicBoolean(true));
_propertyDataChangedMap.put(type, new AtomicBoolean(true));
}

// initialize caches
@@ -241,21 +241,64 @@ private void refreshClusterConfig(final HelixDataAccessor accessor,
if (_propertyDataChangedMap.get(HelixConstants.ChangeType.CLUSTER_CONFIG).getAndSet(false)) {
_clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig());
refreshedType.add(HelixConstants.ChangeType.CLUSTER_CONFIG);
// TODO: This is a temp function to clean up incompatible batched disabled instances format.
// Remove in later version.
if (_clusterConfig!=null && needCleanUpBatchedDisabledInstance(_clusterConfig.getRecord())
&& cleanBatchDisableMapField(accessor)) {
LogUtil.logInfo(logger, getClusterEventId(), String
.format("Clean ClusterConfig mapField for cluster %s, pipeline %s", _clusterName,
getPipelineName()));
}
refreshAbnormalStateResolverMap(_clusterConfig);
} else {
LogUtil.logInfo(logger, getClusterEventId(), String.format(
"No ClusterConfig change for cluster %s, pipeline %s", _clusterName, getPipelineName()));
LogUtil.logDebug(logger, getClusterEventId(), String
.format("No ClusterConfig change for cluster %s, pipeline %s", _clusterName,
getPipelineName()));
}
}

/**
 * Temporary cleanup hook for the batched "DISABLED_INSTANCES" map field introduced in 1.0.3.0:
 * removes that map field from the ClusterConfig through {@code accessor.updateProperty}.
 * TODO: Revert this temporary change in 1.0.5.0 or a later version.
 *
 * @param accessor data accessor used to update the ClusterConfig property
 * @return the value returned by {@code updateProperty}; a {@code false} result is also logged
 *         as an error
 */
private boolean cleanBatchDisableMapField(final HelixDataAccessor accessor) {
  final DataUpdater<ZNRecord> mapFieldRemover = new DataUpdater<ZNRecord>() {
    @Override
    public ZNRecord update(ZNRecord currentData) {
      if (currentData == null) {
        throw new HelixException(
            "Cluster: " + _clusterConfig.getClusterName() + ": cluster config is null");
      }
      // Always hand back a copy; the map field is dropped only if the current data still has it.
      ZNRecord cleanedRecord = new ZNRecord(currentData);
      if (needCleanUpBatchedDisabledInstance(currentData)) {
        cleanedRecord.getMapFields()
            .remove(ClusterConfig.ClusterConfigProperty.DISABLED_INSTANCES.name());
      }
      return cleanedRecord;
    }
  };

  if (accessor.updateProperty(accessor.keyBuilder().clusterConfig(), mapFieldRemover, null)) {
    return true;
  }
  LogUtil.logError(logger, getClusterEventId(), String
      .format("Failed to clean ClusterConfig change for cluster %s, pipeline %s", _clusterName,
          getPipelineName()));
  return false;
}

/**
 * Tells whether the given record still carries the "DISABLED_INSTANCES" map field.
 *
 * @param record the ZNRecord to inspect; may be null
 * @return true only when the record and its map fields are non-null and the map fields contain
 *         the DISABLED_INSTANCES key
 */
private boolean needCleanUpBatchedDisabledInstance(ZNRecord record) {
  if (record == null || record.getMapFields() == null) {
    return false;
  }
  return record.getMapFields()
      .containsKey(ClusterConfig.ClusterConfigProperty.DISABLED_INSTANCES.name());
}

/**
 * Refreshes the ideal state cache when the IDEAL_STATE change flag is set, and records
 * IDEAL_STATE in the given set; otherwise only logs that there was no ideal state change.
 *
 * @param accessor data accessor passed to the ideal state cache refresh
 * @param refreshedType set that receives IDEAL_STATE when a refresh was performed
 */
private void refreshIdealState(final HelixDataAccessor accessor,
Set<HelixConstants.ChangeType> refreshedType) {
// getAndSet(false) reads the pending-change flag and clears it in the same call.
if (_propertyDataChangedMap.get(HelixConstants.ChangeType.IDEAL_STATE).getAndSet(false)) {
_idealStateCache.refresh(accessor);
refreshedType.add(HelixConstants.ChangeType.IDEAL_STATE);
} else {
// NOTE(review): the two logInfo openings below look like the old and new sides of a
// formatting-only diff — confirm which one belongs in the final source.
LogUtil.logInfo(logger, getClusterEventId(), String
.format("No ideal state change for %s cluster, %s pipeline", _clusterName,
LogUtil.logInfo(logger, getClusterEventId(),
String.format("No ideal state change for %s cluster, %s pipeline", _clusterName,
getPipelineName()));
}
}
@@ -557,7 +600,8 @@ public Set<String> getInstancesWithTag(String instanceTag) {
public Set<String> getDisabledInstancesForPartition(String resource, String partition) {
Set<String> disabledInstancesForPartition = new HashSet<>(_disabledInstanceSet);
if (_disabledInstanceForPartitionMap.containsKey(resource)
&& _disabledInstanceForPartitionMap.get(resource).containsKey(partition)) {
&& _disabledInstanceForPartitionMap
.get(resource).containsKey(partition)) {
disabledInstancesForPartition
.addAll(_disabledInstanceForPartitionMap.get(resource).get(partition));
}
@@ -767,8 +811,7 @@ private void updateOfflineInstanceHistory(HelixDataAccessor accessor) {
if (!_updateInstanceOfflineTime) {
return;
}
List<String> offlineNodes =
new ArrayList<>(_instanceConfigCache.getPropertyMap().keySet());
List<String> offlineNodes = new ArrayList<>(_instanceConfigCache.getPropertyMap().keySet());
offlineNodes.removeAll(_liveInstanceCache.getPropertyMap().keySet());
_instanceOfflineTimeMap = new HashMap<>();

@@ -809,8 +852,7 @@ private void updateDisabledInstances(Collection<InstanceConfig> instanceConfigs,
_disabledInstanceForPartitionMap.putIfAbsent(resource, new HashMap<>());
for (String partition : disabledPartitionMap.get(resource)) {
_disabledInstanceForPartitionMap.get(resource)
.computeIfAbsent(partition, key -> new HashSet<>())
.add(config.getInstanceName());
.computeIfAbsent(partition, key -> new HashSet<>()).add(config.getInstanceName());
}
}
}
@@ -23,7 +23,6 @@
import java.util.Collections;
import java.util.Map;

import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.integration.task.TaskTestBase;
import org.apache.helix.integration.task.WorkflowGenerator;
@@ -21,11 +21,14 @@

import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import javax.management.MBeanServer;
import javax.management.ObjectName;

import org.apache.helix.AccessOption;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
@@ -35,12 +38,17 @@
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.CallbackHandler;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.zkclient.DataUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -74,6 +82,51 @@ public void afterClass() {
}

/**
 * Verifies that a started controller removes the batched "DISABLED_INSTANCES" map field from
 * the ClusterConfig.
 *
 * The test disables an instance, writes a DISABLED_INSTANCES entry into the ClusterConfig,
 * starts a controller, waits for the cluster to converge, and then asserts that the map field
 * is no longer present.
 */
@Test
public void testControllerCleanUpClusterConfig() {
  // Parameterized (not raw) so update(...) and the ZKHelixDataAccessor constructor are type-checked.
  ZkBaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<>(_gZkClient);
  _gSetupTool.addInstanceToCluster(CLUSTER_NAME, "DISABLED_Instance");
  _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, "DISABLED_Instance", false);

  // Seed the ClusterConfig with a batched disabled-instance entry for the controller to clean.
  baseDataAccessor.update(PropertyPathBuilder.clusterConfig(CLUSTER_NAME),
      new DataUpdater<ZNRecord>() {
        @Override
        public ZNRecord update(ZNRecord currentData) {
          if (currentData == null) {
            throw new HelixException("Cluster: " + CLUSTER_NAME + ": cluster config is null");
          }

          ClusterConfig clusterConfig = new ClusterConfig(currentData);
          Map<String, String> disabledInstances =
              new TreeMap<>(clusterConfig.getDisabledInstances());
          disabledInstances
              .put("DISABLED_Instance", "HELIX_ENABLED_DISABLE_TIMESTAMP=1652338376608");
          clusterConfig.setDisabledInstances(disabledInstances);

          return clusterConfig.getRecord();
        }
      }, AccessOption.PERSISTENT);

  ClusterControllerManager controller =
      new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, "TestController");
  controller.syncStart();
  try {
    verifyControllerIsLeader(controller);

    // Create cluster verifier
    ZkHelixClusterVerifier clusterVerifier =
        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient)
            .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
            .build();

    // Wait for rebalanced
    Assert.assertTrue(clusterVerifier.verifyByPolling());
    ZKHelixDataAccessor helixDataAccessor =
        new ZKHelixDataAccessor(CLUSTER_NAME, baseDataAccessor);
    ClusterConfig cls =
        helixDataAccessor.getProperty(helixDataAccessor.keyBuilder().clusterConfig());
    Assert.assertFalse(cls.getRecord().getMapFields()
        .containsKey(ClusterConfig.ClusterConfigProperty.DISABLED_INSTANCES.name()));
  } finally {
    // Stop the controller even when an assertion above fails, so it cannot leak into later tests.
    controller.syncStop();
  }
  verifyControllerIsNotLeader(controller);
  verifyZKDisconnected(controller);
}

@Test
public void testControllerConnectThenDisconnect() {
ClusterControllerManager controller =
new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, "TestController");
@@ -182,6 +235,7 @@ private void verifyZKDisconnected(ClusterControllerManager controller) {
Assert.assertTrue(controller.getZkClient().isClosed());
}


@Test
public void testMissingTopStateDurationMonitoring() throws Exception {
String clusterName = "testCluster-TestControllerLeadershipChange";
@@ -403,6 +403,11 @@ public void updateInstance() throws IOException {
_configAccessor.getInstanceConfig(CLUSTER_NAME, INSTANCE_NAME).getInstanceDisabledReason(),
"");

// We should see no instance-disable-related field in clusterConfig
ClusterConfig cls = _configAccessor.getClusterConfig(CLUSTER_NAME);
Assert.assertFalse(cls.getRecord().getMapFields()
.containsKey(ClusterConfig.ClusterConfigProperty.DISABLED_INSTANCES.name()));

// disable instance with no reason input
new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=disable")
.format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity);
@@ -415,6 +420,11 @@ public void updateInstance() throws IOException {
Assert.assertTrue(
_configAccessor.getInstanceConfig(CLUSTER_NAME, INSTANCE_NAME).getInstanceEnabled());

// Disabling an instance should not write any field to clusterConfig
cls = _configAccessor.getClusterConfig(CLUSTER_NAME);
Assert.assertFalse(cls.getRecord().getMapFields()
.containsKey(ClusterConfig.ClusterConfigProperty.DISABLED_INSTANCES.name()));

// AddTags
List<String> tagList = ImmutableList.of("tag3", "tag1", "tag2");
entity = Entity.entity(

0 comments on commit efd4ce0

Please sign in to comment.