Skip to content
Permalink
Browse files
Implement DefaultCloudEventCallbackImpl (#1995)
Implement a default callback implementation for Helix cloud event listeners.
  • Loading branch information
mgao0 authored and junkaixue committed Apr 7, 2022
1 parent 7669a05 commit 2d9fd3cc525edb251b6e41904bccfd78654947e7
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 11 deletions.
@@ -19,22 +19,35 @@
* under the License.
*/

import java.util.List;

import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.util.InstanceValidationUtil;

/**
* A default callback implementation class to be used in {@link HelixCloudEventListener}
*/
public class DefaultCloudEventCallbackImpl {
private final String _reason =
"Cloud event callback %s in class %s triggered in listener HelixManager %s, at time %s .";
protected final String _className = this.getClass().getSimpleName();

/**
* Disable the instance
* @param manager The helix manager associated with the listener
* @param eventInfo Detailed information about the event
*/
public void disableInstance(HelixManager manager, Object eventInfo) {
// To be implemented
throw new NotImplementedException();
if (InstanceValidationUtil
.isEnabled(manager.getHelixDataAccessor(), manager.getInstanceName())) {
manager.getClusterManagmentTool()
.enableInstance(manager.getClusterName(), manager.getInstanceName(), false,
InstanceConstants.InstanceDisabledType.CLOUD_EVENT, String
.format(_reason, "disableInstance", _className, manager,
System.currentTimeMillis()));
}
}

/**
@@ -43,27 +56,48 @@ public void disableInstance(HelixManager manager, Object eventInfo) {
* @param eventInfo Detailed information about the event
*/
public void enableInstance(HelixManager manager, Object eventInfo) {
// To be implemented
throw new NotImplementedException();
String instanceName = manager.getInstanceName();
HelixDataAccessor accessor = manager.getHelixDataAccessor();
if (InstanceValidationUtil.getInstanceHelixDisabledType(accessor, instanceName)
.equals(InstanceConstants.InstanceDisabledType.CLOUD_EVENT.name())) {
manager.getClusterManagmentTool()
.enableInstance(manager.getClusterName(), instanceName, true);
}
}

/**
*
* Put cluster into maintenance mode if the cluster is not currently in maintenance mode
* @param manager The helix manager associated with the listener
* @param eventInfo Detailed information about the event
*/
public void enterMaintenanceMode(HelixManager manager, Object eventInfo) {
// To be implemented
throw new NotImplementedException();
if (!manager.getClusterManagmentTool().isInMaintenanceMode(manager.getClusterName())) {
manager.getClusterManagmentTool()
.manuallyEnableMaintenanceMode(manager.getClusterName(), true, String
.format(_reason, "enterMaintenanceMode", _className, manager,
System.currentTimeMillis()), null);
}
}

/**
*
* Exit maintenance mode for the cluster, if there is no more live instances disabled for cloud event
* @param manager The helix manager associated with the listener
* @param eventInfo Detailed information about the event
*/
public void exitMaintenanceMode(HelixManager manager, Object eventInfo) {
// To be implemented
throw new NotImplementedException();
List<String> instances =
manager.getClusterManagmentTool().getInstancesInCluster(manager.getClusterName());
// Check if there is any disabled live instance that was disabled due to cloud event,
// if none left, exit maintenance mode
HelixDataAccessor accessor = manager.getHelixDataAccessor();
if (instances.stream().noneMatch(instance ->
InstanceValidationUtil.getInstanceHelixDisabledType(accessor, instance)
.equals(InstanceConstants.InstanceDisabledType.CLOUD_EVENT.name())
&& InstanceValidationUtil.isAlive(accessor, instance))) {
manager.getClusterManagmentTool()
.manuallyEnableMaintenanceMode(manager.getClusterName(), false, String
.format(_reason, "exitMaintenanceMode", _className, manager,
System.currentTimeMillis()), null);
}
}
}
@@ -0,0 +1,114 @@
package org.apache.helix.cloud.event;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import org.apache.helix.HelixAdmin;
import org.apache.helix.cloud.event.helix.DefaultCloudEventCallbackImpl;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.util.InstanceValidationUtil;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

public class TestDefaultCloudEventCallbackImpl extends ZkStandAloneCMTestBase {
private final DefaultCloudEventCallbackImpl _impl =
DefaultCloudEventCallbackImpl.class.newInstance();
private MockParticipantManager _instanceManager;
private HelixAdmin _admin;

public TestDefaultCloudEventCallbackImpl() throws IllegalAccessException, InstantiationException {
}

@BeforeClass
public void beforeClass() throws Exception {
super.beforeClass();
_instanceManager = _participants[0];
_admin = _instanceManager.getClusterManagmentTool();
}

@Test
public void testDisableInstance() {
Assert.assertTrue(InstanceValidationUtil
.isEnabled(_manager.getHelixDataAccessor(), _instanceManager.getInstanceName()));
_impl.disableInstance(_instanceManager, null);
Assert.assertFalse(InstanceValidationUtil
.isEnabled(_manager.getHelixDataAccessor(), _instanceManager.getInstanceName()));
Assert.assertEquals(_manager.getConfigAccessor()
.getInstanceConfig(CLUSTER_NAME, _instanceManager.getInstanceName())
.getInstanceDisabledType(), InstanceConstants.InstanceDisabledType.CLOUD_EVENT.name());

// Should not disable instance if it is already disabled due to other reasons
// And disabled type should remain unchanged
_admin.enableInstance(CLUSTER_NAME, _instanceManager.getInstanceName(), false);
_impl.disableInstance(_instanceManager, null);
Assert.assertFalse(InstanceValidationUtil
.isEnabled(_manager.getHelixDataAccessor(), _instanceManager.getInstanceName()));
Assert.assertEquals(InstanceValidationUtil
.getInstanceHelixDisabledType(_manager.getHelixDataAccessor(),
_instanceManager.getInstanceName()),
InstanceConstants.InstanceDisabledType.DEFAULT_INSTANCE_DISABLE_TYPE.name());

_admin.enableInstance(CLUSTER_NAME, _instanceManager.getInstanceName(), false,
InstanceConstants.InstanceDisabledType.CLOUD_EVENT, null);
}

@Test(dependsOnMethods = "testDisableInstance")
public void testEnableInstance() {
Assert.assertFalse(InstanceValidationUtil
.isEnabled(_manager.getHelixDataAccessor(), _instanceManager.getInstanceName()));
// Should enable instance if the instance is disabled due to cloud event
_impl.enableInstance(_instanceManager, null);
Assert.assertTrue(InstanceValidationUtil
.isEnabled(_manager.getHelixDataAccessor(), _instanceManager.getInstanceName()));

// Should not enable instance if it is not disabled due to cloud event
_admin.enableInstance(CLUSTER_NAME, _instanceManager.getInstanceName(), false);
_impl.enableInstance(_instanceManager, null);
Assert.assertFalse(InstanceValidationUtil
.isEnabled(_manager.getHelixDataAccessor(), _instanceManager.getInstanceName()));
_admin.enableInstance(_instanceManager.getClusterName(), _instanceManager.getInstanceName(),
true);
}

@Test
public void testEnterMaintenanceMode() {
Assert.assertFalse(_admin.isInMaintenanceMode(CLUSTER_NAME));
_impl.enterMaintenanceMode(_instanceManager, null);
Assert.assertTrue(_admin.isInMaintenanceMode(CLUSTER_NAME));
}

@Test(dependsOnMethods = "testEnterMaintenanceMode")
public void testExitMaintenanceMode() {
Assert.assertTrue(_admin.isInMaintenanceMode(CLUSTER_NAME));
// Should not exit maintenance mode if there is remaining live instance that is disabled due to cloud event
_admin.enableInstance(CLUSTER_NAME, _participants[1].getInstanceName(), false,
InstanceConstants.InstanceDisabledType.CLOUD_EVENT, null);
_impl.exitMaintenanceMode(_instanceManager, null);
Assert.assertTrue(_admin.isInMaintenanceMode(CLUSTER_NAME));

// Should exit maintenance mode if there is no remaining live instance that is disabled due to cloud event
_admin.enableInstance(CLUSTER_NAME, _participants[1].getInstanceName(), false,
InstanceConstants.InstanceDisabledType.USER_OPERATION, null);
_impl.exitMaintenanceMode(_instanceManager, null);
Assert.assertFalse(_admin.isInMaintenanceMode(CLUSTER_NAME));
}
}

0 comments on commit 2d9fd3c

Please sign in to comment.