Skip to content
Permalink
Browse files
Support register user defined CloudEventHandler when participant con…
…nect. (#2121)

This change will load user defined CloudEventHandler dynamically when ZkHelixManager.connect(). User need to implement a EventController and pass the class name when initiating ZkHelixManager.
  • Loading branch information
xyuanlu committed Jun 2, 2022
1 parent 6524a34 commit ae0f50d719c645ed3202167caa7c807d355e01c7
Show file tree
Hide file tree
Showing 9 changed files with 183 additions and 26 deletions.
@@ -24,7 +24,9 @@
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.helix.cloud.constants.CloudProvider;
import org.apache.helix.cloud.event.CloudEventHandler;
import org.apache.helix.cloud.event.helix.CloudEventCallbackProperty;
import org.apache.helix.model.CloudConfig;
import org.slf4j.Logger;
@@ -159,6 +161,14 @@ public Properties getCustomizedCloudProperties() {
return _customizedCloudProperties;
}

public String getCloudEventHandlerClassName() {
String defaultHandler = CloudEventHandler.class.getName();
return getCloudEventCallbackProperty() == null ? defaultHandler
: getCloudEventCallbackProperty().getUserArgs().getOrDefault(
CloudEventCallbackProperty.UserArgsInputKey.CLOUD_EVENT_HANDLER_CLASS_NAME,
defaultHandler);
}

public void setCloudEnabled(boolean isCloudEnabled) {
_isCloudEnabled = isCloudEnabled;
}
@@ -0,0 +1,29 @@
package org.apache.helix.cloud.event;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/**
* This class is the the interface for singleton eventHandler.
* User may implement their own eventHandler or use the default CloudEventHandler
*/
public interface AbstractEventHandler {
void registerCloudEventListener(CloudEventListener listener);
void unregisterCloudEventListener(CloudEventListener listener);
}
@@ -33,7 +33,7 @@
* 3. PostEventHandlerCallback -> only one allowed
* to enable an easy management of event listeners and callbacks.
*/
public class CloudEventHandler {
public class CloudEventHandler implements AbstractEventHandler {
private static final Logger LOG = LoggerFactory.getLogger(CloudEventHandler.class.getName());
private List<CloudEventListener> _unorderedEventListenerList = new ArrayList<>();
private Optional<CloudEventListener> _preEventHandlerCallback = Optional.empty();
@@ -44,6 +44,7 @@ public class CloudEventHandler {
* If no listener type is specified, register as an unordered listener.
* @param listener
*/
@Override
public void registerCloudEventListener(CloudEventListener listener) {
if (listener != null) {
switch (listener.getListenerType()) {
@@ -65,6 +66,7 @@ public void registerCloudEventListener(CloudEventListener listener) {
* Unregister an event listener to the event handler.
* @param listener
*/
@Override
public void unregisterCloudEventListener(CloudEventListener listener) {
_unorderedEventListenerList.remove(listener);
}
@@ -19,27 +19,39 @@
* under the License.
*/

import java.util.HashMap;
import java.util.Map;

import org.apache.helix.util.HelixUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* This class is the factory for singleton class {@link CloudEventHandler}
* This class is the factory for singleton class {@link AbstractEventHandler}
*/
public class CloudEventHandlerFactory {
private static CloudEventHandler INSTANCE = null;
private static final Logger LOG = LoggerFactory.getLogger(CloudEventHandlerFactory.class);
private static Map<String, AbstractEventHandler> INSTANCE_MAP = new HashMap();

private CloudEventHandlerFactory() {
}

/**
* Get a CloudEventHandler instance
* Get an instance of AbstractEventHandler implementation.
* @return
*/
public static CloudEventHandler getInstance() {
if (INSTANCE == null) {
synchronized (CloudEventHandlerFactory.class) {
if (INSTANCE == null) {
INSTANCE = new CloudEventHandler();
}
public static AbstractEventHandler getInstance(String eventHandlerClassName)
throws ClassNotFoundException, IllegalAccessException, InstantiationException {
synchronized (CloudEventHandlerFactory.class) {
AbstractEventHandler instance = INSTANCE_MAP.get(eventHandlerClassName);
if (instance == null) {
LOG.info("Initiating an object of {}", eventHandlerClassName);
instance = (AbstractEventHandler) (HelixUtil
.loadClass(AbstractEventHandler.class, eventHandlerClassName)).newInstance();
INSTANCE_MAP.put(eventHandlerClassName, instance);
}
return instance;
}
return INSTANCE;
}
}
@@ -56,6 +56,7 @@ public CloudEventCallbackProperty(Map<String, String> userArgs) {
*/
public static class UserArgsInputKey {
public static final String CALLBACK_IMPL_CLASS_NAME = "callbackImplClassName";
public static final String CLOUD_EVENT_HANDLER_CLASS_NAME = "cloudEventHandlerClassName";
}

/**
@@ -832,7 +832,10 @@ public void connect() throws Exception {
if (helixCloudProperty != null && helixCloudProperty.isCloudEventCallbackEnabled()) {
_cloudEventListener =
new HelixCloudEventListener(helixCloudProperty.getCloudEventCallbackProperty(), this);
CloudEventHandlerFactory.getInstance().registerCloudEventListener(_cloudEventListener);
CloudEventHandlerFactory.getInstance(
_helixManagerProperty.getHelixCloudProperty().getCloudEventHandlerClassName())
.registerCloudEventListener(_cloudEventListener);
LOG.info("Using handler: " + helixCloudProperty.getCloudEventHandlerClassName());
}
}
}
@@ -881,7 +884,13 @@ public void disconnect() {
_helixPropertyStore = null;

if (_cloudEventListener != null) {
CloudEventHandlerFactory.getInstance().unregisterCloudEventListener(_cloudEventListener);
try {
CloudEventHandlerFactory.getInstance(
_helixManagerProperty.getHelixCloudProperty().getCloudEventHandlerClassName())
.unregisterCloudEventListener(_cloudEventListener);
} catch (Exception e) {
LOG.error("Failed to unregister cloudEventListener.", e);
}
_cloudEventListener = null;
}

@@ -0,0 +1,42 @@
package org.apache.helix.cloud.event;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class HelixTestCloudEventHandler extends CloudEventHandler {
private static final int TIMEOUT = 900; // second to timeout
private static ExecutorService executorService = Executors.newSingleThreadExecutor();
public static boolean anyListenerIsRegisterFlag = false;

@Override
public void registerCloudEventListener(CloudEventListener listener) {
super.registerCloudEventListener(listener);
anyListenerIsRegisterFlag = true;
}

@Override
public void unregisterCloudEventListener(CloudEventListener listener) {
super.unregisterCloudEventListener(listener);
anyListenerIsRegisterFlag = false;
}

}
@@ -75,21 +75,33 @@ public MockCloudEventAwareHelixManager(HelixManagerProperty helixManagerProperty
.build());
}

public void connect() throws IllegalAccessException, InstantiationException {
@Override
public void connect()
throws IllegalAccessException, InstantiationException, ClassNotFoundException {
if (_helixManagerProperty != null) {
HelixCloudProperty helixCloudProperty = _helixManagerProperty.getHelixCloudProperty();
if (helixCloudProperty != null && helixCloudProperty.isCloudEventCallbackEnabled()) {
_cloudEventListener =
new HelixCloudEventListener(helixCloudProperty.getCloudEventCallbackProperty(), this);
CloudEventHandlerFactory.getInstance().registerCloudEventListener(_cloudEventListener);
System.out.println("Using handler: " + helixCloudProperty.getCloudEventHandlerClassName());
CloudEventHandlerFactory.getInstance(
_helixManagerProperty.getHelixCloudProperty().getCloudEventHandlerClassName())
.registerCloudEventListener(_cloudEventListener);
}
}
}

@Override
public void disconnect() {
if (_cloudEventListener != null) {
CloudEventHandlerFactory.getInstance().unregisterCloudEventListener(_cloudEventListener);
_cloudEventListener = null;
try {
CloudEventHandlerFactory.getInstance(
_helixManagerProperty.getHelixCloudProperty().getCloudEventHandlerClassName())
.unregisterCloudEventListener(_cloudEventListener);
} catch (Exception e) {
System.out.println("Failed to unregister cloudEventListener." );
e.printStackTrace();
}
}
}

@@ -20,6 +20,8 @@
*/

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.helix.HelixCloudProperty;
import org.apache.helix.HelixManager;
@@ -76,16 +78,19 @@ public void afterTest() {
@Test
public void testOptionalHelixOperation() throws Exception {
// Cloud event callback property
CloudEventCallbackProperty property = new CloudEventCallbackProperty(Collections
.singletonMap(CloudEventCallbackProperty.UserArgsInputKey.CALLBACK_IMPL_CLASS_NAME,
MockCloudEventCallbackImpl.class.getCanonicalName()));
Map<String, String> paramMap = new HashMap<>();
paramMap.put(CloudEventCallbackProperty.UserArgsInputKey.CALLBACK_IMPL_CLASS_NAME,
MockCloudEventCallbackImpl.class.getCanonicalName());
paramMap.put(CloudEventCallbackProperty.UserArgsInputKey.CLOUD_EVENT_HANDLER_CLASS_NAME,
HelixTestCloudEventHandler.class.getCanonicalName());
CloudEventCallbackProperty property = new CloudEventCallbackProperty(paramMap);
property.setHelixOperationEnabled(HelixOperation.ENABLE_DISABLE_INSTANCE, true);
_cloudProperty.setCloudEventCallbackProperty(property);

_helixManager.connect();

// Manually trigger event
CloudEventHandlerFactory.getInstance()
((CloudEventHandler)CloudEventHandlerFactory.getInstance(HelixTestCloudEventHandler.class.getCanonicalName()))
.performAction(HelixCloudEventListener.EventType.ON_PAUSE, null);
Assert.assertTrue(
callbackTriggered(MockCloudEventCallbackImpl.OperationType.ON_PAUSE_DISABLE_INSTANCE));
@@ -101,7 +106,7 @@ public void testOptionalHelixOperation() throws Exception {
MockCloudEventCallbackImpl.triggeredOperation.clear();

// Manually trigger event
CloudEventHandlerFactory.getInstance()
((CloudEventHandler)CloudEventHandlerFactory.getInstance(HelixTestCloudEventHandler.class.getCanonicalName()))
.performAction(HelixCloudEventListener.EventType.ON_PAUSE, null);
Assert.assertTrue(
callbackTriggered(MockCloudEventCallbackImpl.OperationType.ON_PAUSE_DISABLE_INSTANCE));
@@ -115,7 +120,7 @@ public void testOptionalHelixOperation() throws Exception {
MockCloudEventCallbackImpl.triggeredOperation.clear();

// Manually trigger event
CloudEventHandlerFactory.getInstance()
((CloudEventHandler) CloudEventHandlerFactory.getInstance(HelixTestCloudEventHandler.class.getCanonicalName()))
.performAction(HelixCloudEventListener.EventType.ON_RESUME, null);
Assert.assertFalse(
callbackTriggered(MockCloudEventCallbackImpl.OperationType.ON_PAUSE_DISABLE_INSTANCE));
@@ -160,7 +165,7 @@ public void testUserDefinedCallback() throws Exception {
});

// Manually trigger event
CloudEventHandlerFactory.getInstance()
((CloudEventHandler) CloudEventHandlerFactory.getInstance(CloudEventHandler.class.getCanonicalName()))
.performAction(HelixCloudEventListener.EventType.ON_PAUSE, null);
Assert.assertTrue(callbackTriggered(MockCloudEventCallbackImpl.OperationType.PRE_ON_PAUSE));
Assert.assertTrue(callbackTriggered(MockCloudEventCallbackImpl.OperationType.POST_ON_PAUSE));
@@ -169,7 +174,7 @@ public void testUserDefinedCallback() throws Exception {

MockCloudEventCallbackImpl.triggeredOperation.clear();

CloudEventHandlerFactory.getInstance()
((CloudEventHandler) CloudEventHandlerFactory.getInstance(CloudEventHandler.class.getCanonicalName()))
.performAction(HelixCloudEventListener.EventType.ON_RESUME, null);
Assert.assertFalse(callbackTriggered(MockCloudEventCallbackImpl.OperationType.PRE_ON_PAUSE));
Assert.assertFalse(callbackTriggered(MockCloudEventCallbackImpl.OperationType.POST_ON_PAUSE));
@@ -186,9 +191,44 @@ public void testUsingInvalidImplClassName() throws Exception {
_cloudProperty.setCloudEventCallbackProperty(property);

_helixManager.connect();
}

@Test
public void testRegisterAndUnregister() throws Exception {
// Cloud event callback property
Map<String, String> paramMap = new HashMap<>();
paramMap.put(CloudEventCallbackProperty.UserArgsInputKey.CALLBACK_IMPL_CLASS_NAME,
MockCloudEventCallbackImpl.class.getCanonicalName());
paramMap.put(CloudEventCallbackProperty.UserArgsInputKey.CLOUD_EVENT_HANDLER_CLASS_NAME,
HelixTestCloudEventHandler.class.getCanonicalName());
CloudEventCallbackProperty property = new CloudEventCallbackProperty(paramMap);
property.setHelixOperationEnabled(HelixOperation.ENABLE_DISABLE_INSTANCE, true);
_cloudProperty.setCloudEventCallbackProperty(property);

_helixManager.connect();

Assert.assertTrue(HelixTestCloudEventHandler.anyListenerIsRegisterFlag);

_helixManager.disconnect();
Assert.assertFalse(HelixTestCloudEventHandler.anyListenerIsRegisterFlag);
}

@Test
public void testUsingInvalidHandlerClassName() throws Exception {
// Cloud event callback property
CloudEventCallbackProperty property = new CloudEventCallbackProperty(Collections
.singletonMap(CloudEventCallbackProperty.UserArgsInputKey.CLOUD_EVENT_HANDLER_CLASS_NAME,
"org.apache.helix.cloud.InvalidClassName"));
_cloudProperty.setCloudEventCallbackProperty(property);

try {
_helixManager.connect();}
catch (Exception ex){
Assert.assertEquals(ex.getClass(), java.lang.ClassNotFoundException.class);
}

// Manually trigger event
CloudEventHandlerFactory.getInstance()
((CloudEventHandler) CloudEventHandlerFactory.getInstance(CloudEventHandler.class.getCanonicalName()))
.performAction(HelixCloudEventListener.EventType.ON_PAUSE, null);
}

0 comments on commit ae0f50d

Please sign in to comment.