Skip to content
Permalink
Browse files
implement util for cloud event (#2149)
This change adds the implementation for HelixCloudEventUtil.
  • Loading branch information
xyuanlu committed Jun 24, 2022
1 parent 174aaae commit dadaba13c3717ffbad619a9f4a064ed987ea2454
Show file tree
Hide file tree
Showing 4 changed files with 266 additions and 37 deletions.
@@ -19,51 +19,102 @@
* under the License.
*/

import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.util.InstanceValidationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* A default callback implementation class to be used in {@link HelixCloudEventListener}
*/
public class DefaultCloudEventCallbackImpl {
  private static final Logger LOG = LoggerFactory.getLogger(DefaultCloudEventCallbackImpl.class);

  // Format for the instance disable/enable reason; the single argument is a millisecond timestamp.
  private static final String INSTANCE_REASON_FORMAT =
      "Cloud event in DefaultCloudEventCallback at %s";
  // Format for the maintenance mode reason; arguments are instance name and millisecond timestamp.
  private static final String EMM_REASON_FORMAT =
      "Cloud event EMM in DefaultCloudEventCallback by %s at %s";

  /**
   * Disable the instance and track the cloud event in map field disabledInstancesWithInfo in
   * cluster config. Will not re-disable the instance if the instance is already disabled for
   * other reason. (So we will not overwrite the disabled reason and enable this instance when
   * on-unpause)
   * @param manager The helix manager associated with the listener
   * @param eventInfo Detailed information about the event (not read by this implementation)
   */
  public void disableInstance(HelixManager manager, Object eventInfo) {
    String clusterName = manager.getClusterName();
    String instanceName = manager.getInstanceName();
    HelixDataAccessor accessor = manager.getHelixDataAccessor();
    String message = String.format(INSTANCE_REASON_FORMAT, System.currentTimeMillis());

    LOG.info("DefaultCloudEventCallbackImpl disable Instance {}", instanceName);
    // Only disable an instance that is currently enabled, so an existing disabled type/reason
    // written by someone else is left untouched.
    if (InstanceValidationUtil.isEnabled(accessor, instanceName)) {
      manager.getClusterManagmentTool().enableInstance(clusterName, instanceName, false,
          InstanceConstants.InstanceDisabledType.CLOUD_EVENT, message);
    }
    // The cloud event is tracked in cluster config regardless of whether we disabled the instance.
    HelixEventHandlingUtil.updateCloudEventOperationInClusterConfig(clusterName, instanceName,
        accessor.getBaseDataAccessor(), false, message);
  }

  /**
   * Remove tracked cloud event in cluster config and enable the instance.
   * We only enable instance that is disabled because of cloud event.
   * @param manager The helix manager associated with the listener
   * @param eventInfo Detailed information about the event (not read by this implementation)
   */
  public void enableInstance(HelixManager manager, Object eventInfo) {
    String clusterName = manager.getClusterName();
    String instanceName = manager.getInstanceName();
    HelixDataAccessor accessor = manager.getHelixDataAccessor();
    String message = String.format(INSTANCE_REASON_FORMAT, System.currentTimeMillis());

    LOG.info("DefaultCloudEventCallbackImpl enable Instance {}", instanceName);
    // Stop tracking the event first; the tracked entry is removed even when the instance itself
    // stays disabled for a non cloud event reason.
    HelixEventHandlingUtil.updateCloudEventOperationInClusterConfig(clusterName, instanceName,
        accessor.getBaseDataAccessor(), true, message);
    if (HelixEventHandlingUtil.isInstanceDisabledForCloudEvent(instanceName, accessor)) {
      manager.getClusterManagmentTool().enableInstance(clusterName, instanceName, true,
          InstanceConstants.InstanceDisabledType.CLOUD_EVENT, message);
    }
  }

  /**
   * Will enter MM when the cluster is not in MM
   * TODO: we should add maintenance reason when EMM with cloud event
   * @param manager The helix manager associated with the listener
   * @param eventInfo Detailed information about the event (not read by this implementation)
   */
  public void enterMaintenanceMode(HelixManager manager, Object eventInfo) {
    String clusterName = manager.getClusterName();
    if (manager.getClusterManagmentTool().isInMaintenanceMode(clusterName)) {
      // Already in maintenance mode; nothing to do.
      return;
    }
    LOG.info("DefaultCloudEventCallbackImpl enterMaintenanceMode by {}",
        manager.getInstanceName());
    manager.getClusterManagmentTool().manuallyEnableMaintenanceMode(clusterName, true,
        String.format(EMM_REASON_FORMAT, manager.getInstanceName(), System.currentTimeMillis()),
        null);
  }

  /**
   * Will exit MM when cluster config tracks no ongoing cloud event being handled
   * TODO: we should also check the maintenance reason and only exit when EMM is caused by cloud event
   * @param manager The helix manager associated with the listener
   * @param eventInfo Detailed information about the event (not read by this implementation)
   */
  public void exitMaintenanceMode(HelixManager manager, Object eventInfo) {
    HelixDataAccessor accessor = manager.getHelixDataAccessor();
    ClusterConfig clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig());
    if (clusterConfig == null) {
      // Without the cluster config we cannot tell whether any cloud event is still tracked, so
      // stay in maintenance mode instead of failing with a NullPointerException.
      LOG.error(
          "DefaultCloudEventCallbackImpl will not exitMaintenanceMode as cluster config of {} is not found",
          manager.getClusterName());
      return;
    }

    if (HelixEventHandlingUtil.checkNoInstanceUnderCloudEvent(clusterConfig)) {
      LOG.info("DefaultCloudEventCallbackImpl exitMaintenanceMode by {}",
          manager.getInstanceName());
      manager.getClusterManagmentTool().manuallyEnableMaintenanceMode(manager.getClusterName(),
          false,
          String.format(EMM_REASON_FORMAT, manager.getInstanceName(), System.currentTimeMillis()),
          null);
    } else {
      LOG.info(
          "DefaultCloudEventCallbackImpl will not exitMaintenanceMode as there are {} instances under cloud event",
          clusterConfig.getDisabledInstancesWithInfo().size());
    }
  }
}
@@ -107,9 +107,9 @@ private DefaultCloudEventCallbackImpl loadCloudEventCallbackImplClass(String imp
try {
LOG.info("Loading class: " + implClassName);
implClass = (DefaultCloudEventCallbackImpl) HelixUtil.loadClass(getClass(), implClassName)
.newInstance();
.getConstructor().newInstance();
} catch (Exception e) {
implClass = DefaultCloudEventCallbackImpl.class.newInstance();
implClass = new DefaultCloudEventCallbackImpl();
LOG.error(
"No cloud event callback implementation class found for: {}. message: {}. Using default callback impl class instead.",
implClassName, e.getMessage());
@@ -19,40 +19,107 @@
* under the License.
*/

import java.util.Map;
import java.util.TreeMap;

import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.util.ConfigStringUtil;
import org.apache.helix.util.InstanceValidationUtil;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.zkclient.DataUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


class HelixEventHandlingUtil {
class HelixEventHandlingUtil {
private static Logger LOG = LoggerFactory.getLogger(HelixEventHandlingUtil.class);

/**
* Enable or disable an instance for cloud event.
* It will enable/disable Helix for that instance. Also add the instance cloud event info to
* clusterConfig Znode when enable.
* @param clusterName
* check if instance is disabled by cloud event.
* @param instanceName
* @param message
* @param isEnable
* @param dataAccessor
* @return return failure when either enable/disable failed or update cluster ZNode failed.
* @return return true only when instance is Helix disabled and the disabled reason in
* instanceConfig is cloudEvent
*/
static boolean enableInstanceForCloudEvent(String clusterName, String instanceName, String message,
boolean isEnable, BaseDataAccessor dataAccessor) {
// TODO add impl here
return true;
static boolean isInstanceDisabledForCloudEvent(String instanceName,
HelixDataAccessor dataAccessor) {
InstanceConfig instanceConfig =
dataAccessor.getProperty(dataAccessor.keyBuilder().instanceConfig(instanceName));
if (instanceConfig == null) {
throw new HelixException("Instance: " + instanceName
+ ", instance config does not exist");
}
return !InstanceValidationUtil.isEnabled(dataAccessor, instanceName) && instanceConfig
.getInstanceDisabledType()
.equals(InstanceConstants.InstanceDisabledType.CLOUD_EVENT.name());
}

/**
* check if instance is disabled by cloud event.
* @param clusterName
* @param instanceName
* @param dataAccessor
* @return return true only when instance is Helix disabled and has the cloud event info in
* clusterConfig ZNode.
* Update map field disabledInstancesWithInfo in clusterConfig with cloudEvent instance info
*/
static boolean IsInstanceDisabledForCloudEvent(String clusterName, String instanceName,
BaseDataAccessor dataAccessor) {
// TODO add impl here
return true;
static void updateCloudEventOperationInClusterConfig(String clusterName, String instanceName,
BaseDataAccessor baseAccessor, boolean enable, String message) {
String path = PropertyPathBuilder.clusterConfig(clusterName);

if (!baseAccessor.exists(path, 0)) {
throw new HelixException("Cluster " + clusterName + ": cluster config does not exist");
}

if (!baseAccessor.update(path, new DataUpdater<ZNRecord>() {
@Override
public ZNRecord update(ZNRecord currentData) {
if (currentData == null) {
throw new HelixException("Cluster: " + clusterName + ": cluster config is null");
}

ClusterConfig clusterConfig = new ClusterConfig(currentData);
Map<String, String> disabledInstancesWithInfo =
new TreeMap<>(clusterConfig.getDisabledInstancesWithInfo());
if (enable) {
disabledInstancesWithInfo.keySet().remove(instanceName);
} else {
// disabledInstancesWithInfo is only used for cloud event handling.
String timeStamp = String.valueOf(System.currentTimeMillis());
disabledInstancesWithInfo.put(instanceName, ZKHelixAdmin
.assembleInstanceBatchedDisabledInfo(
InstanceConstants.InstanceDisabledType.CLOUD_EVENT, message, timeStamp));
}
clusterConfig.setDisabledInstancesWithInfo(disabledInstancesWithInfo);

return clusterConfig.getRecord();
}
}, AccessOption.PERSISTENT)) {
LOG.error("Failed to update cluster config {} for {} instance {}. {}", clusterName,
enable ? "enable" : "disable", instanceName, message);
}
}

/**
* Return true if no instance is under cloud event handling
* @param clusterConfig
* @return
*/
static boolean checkNoInstanceUnderCloudEvent(ClusterConfig clusterConfig) {
Map<String, String> clusterConfigTrackedEvent = clusterConfig.getDisabledInstancesWithInfo();
if (clusterConfigTrackedEvent == null || clusterConfigTrackedEvent.isEmpty()) {
return true;
}

for (Map.Entry<String, String> entry : clusterConfigTrackedEvent.entrySet()) {
if (ConfigStringUtil.parseConcatenatedConfig(entry.getValue())
.get(ClusterConfig.ClusterConfigProperty.HELIX_DISABLED_TYPE.toString())
.equals(InstanceConstants.InstanceDisabledType.CLOUD_EVENT.name())) {
return false;
}
}
return true;
}
}
@@ -0,0 +1,111 @@
package org.apache.helix.cloud.event;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import org.apache.helix.HelixAdmin;
import org.apache.helix.cloud.event.helix.DefaultCloudEventCallbackImpl;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.util.InstanceValidationUtil;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

public class TestDefaultCloudEventCallbackImpl extends ZkStandAloneCMTestBase {
private final DefaultCloudEventCallbackImpl _impl = new DefaultCloudEventCallbackImpl();
private MockParticipantManager _instanceManager;
private HelixAdmin _admin;

public TestDefaultCloudEventCallbackImpl() throws IllegalAccessException, InstantiationException {
}

@BeforeClass
public void beforeClass() throws Exception {
super.beforeClass();
_instanceManager = _participants[0];
_admin = _instanceManager.getClusterManagmentTool();
}

@Test
public void testDisableInstance() {
Assert.assertTrue(InstanceValidationUtil
.isEnabled(_manager.getHelixDataAccessor(), _instanceManager.getInstanceName()));
_impl.disableInstance(_instanceManager, null);
Assert.assertFalse(InstanceValidationUtil
.isEnabled(_manager.getHelixDataAccessor(), _instanceManager.getInstanceName()));
Assert.assertEquals(_manager.getConfigAccessor()
.getInstanceConfig(CLUSTER_NAME, _instanceManager.getInstanceName())
.getInstanceDisabledType(), InstanceConstants.InstanceDisabledType.CLOUD_EVENT.name());

// Should not disable instance if it is already disabled due to other reasons
// And disabled type should remain unchanged
_admin.enableInstance(CLUSTER_NAME, _instanceManager.getInstanceName(), false);
_impl.disableInstance(_instanceManager, null);
Assert.assertFalse(InstanceValidationUtil
.isEnabled(_manager.getHelixDataAccessor(), _instanceManager.getInstanceName()));
Assert.assertEquals(_manager.getConfigAccessor()
.getInstanceConfig(CLUSTER_NAME, _instanceManager.getInstanceName())
.getInstanceDisabledType(),
InstanceConstants.InstanceDisabledType.DEFAULT_INSTANCE_DISABLE_TYPE.name());

_admin.enableInstance(CLUSTER_NAME, _instanceManager.getInstanceName(), false,
InstanceConstants.InstanceDisabledType.CLOUD_EVENT, null);
}

@Test (dependsOnMethods = "testDisableInstance")
public void testEnableInstance() {
Assert.assertFalse(InstanceValidationUtil
.isEnabled(_manager.getHelixDataAccessor(), _instanceManager.getInstanceName()));
// Should enable instance if the instance is disabled due to cloud event
_impl.enableInstance(_instanceManager, null);
Assert.assertTrue(InstanceValidationUtil
.isEnabled(_manager.getHelixDataAccessor(), _instanceManager.getInstanceName()));

// Should not enable instance if it is not disabled due to cloud event
_admin.enableInstance(CLUSTER_NAME, _instanceManager.getInstanceName(), false);
_impl.enableInstance(_instanceManager, null);
Assert.assertFalse(InstanceValidationUtil
.isEnabled(_manager.getHelixDataAccessor(), _instanceManager.getInstanceName()));
_admin.enableInstance(_instanceManager.getClusterName(), _instanceManager.getInstanceName(),
true);
}

@Test
public void testEnterMaintenanceMode() {
Assert.assertFalse(_admin.isInMaintenanceMode(CLUSTER_NAME));
_impl.enterMaintenanceMode(_instanceManager, null);
_impl.disableInstance(_instanceManager, null);
Assert.assertTrue(_admin.isInMaintenanceMode(CLUSTER_NAME));
}

@Test (dependsOnMethods = "testEnterMaintenanceMode")
public void testExitMaintenanceMode() {
Assert.assertTrue(_admin.isInMaintenanceMode(CLUSTER_NAME));
// Should not exit maintenance mode if there is remaining live instance that is disabled due to cloud event
_impl.exitMaintenanceMode(_instanceManager, null);
Assert.assertTrue(_admin.isInMaintenanceMode(CLUSTER_NAME));

// Should exit maintenance mode if there is no remaining live instance that is disabled due to cloud event
_impl.enableInstance(_instanceManager, null);
_impl.exitMaintenanceMode(_instanceManager, null);
Assert.assertFalse(_admin.isInMaintenanceMode(CLUSTER_NAME));
}
}

0 comments on commit dadaba1

Please sign in to comment.