Skip to content

Commit

Permalink
APPNG-2477
Browse files Browse the repository at this point in the history
  • Loading branch information
madness-inc committed Apr 21, 2023
1 parent 04b8825 commit 1d281f8
Show file tree
Hide file tree
Showing 11 changed files with 66 additions and 275 deletions.
37 changes: 30 additions & 7 deletions appng-api/src/main/java/org/appng/api/messaging/Event.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@
*
* @author Matthias Müller
*
* @see Sender
* @see Receiver
* @see Site#sendEvent(Event)
* @see Sender
* @see Receiver
* @see Site#sendEvent(Event)
*/
public abstract class Event implements Serializable {

private final String siteName;
private String nodeId;
private final boolean async;

protected Event() {
this(null);
Expand All @@ -47,7 +48,29 @@ protected Event() {
* the name of the {@link Site} this event is for
*/
protected Event(String siteName) {
this(siteName, false);
}

/**
* Creates a new event
*
* @param siteName
* the name of the {@link Site} this event is for
* @param async
* should the event be processed asynchronously?
*/
protected Event(String siteName, boolean async) {
this.siteName = siteName;
this.async = async;
}

/**
* Should the event be processed asynchronously?
*
* @return {@code true} for async processing, {@code false} otherwise
*/
public boolean isAsync() {
return async;
}

/**
Expand Down Expand Up @@ -76,10 +99,10 @@ public String getSiteName() {
/**
* Performs the event
*
* @param environment
* then {@link Environment} to use
* @param site
* the {@link Site} where the event occurred
* @param environment
* then {@link Environment} to use
* @param site
* the {@link Site} where the event occurred
*
* @throws InvalidConfigurationException
* if there's a configuration error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ public void runWith(ExecutorService executor) {
this.executor = executor;
ITopic<byte[]> topic = getTopic();
this.listenerId = topic.addMessageListener(this);
//topic.addMessageListener(new ReloadMessageListener());
LOGGER.info("Listening to topic {} on {} with id {}", topic.getName(), instance, listenerId);
}

Expand All @@ -88,7 +87,7 @@ public void close() throws IOException {
}

public void onMessage(Message<byte[]> message) {
executor.submit(() -> Messaging.handleEvent(LOGGER, eventRegistry, serializer, message.getMessageObject()));
Messaging.handleEvent(LOGGER, eventRegistry, serializer, message.getMessageObject(), false, executor);
}

protected Logger logger() {
Expand All @@ -103,15 +102,4 @@ public void setDefaultHandler(EventHandler<?> defaultHandler) {
eventRegistry.setDefaultHandler(defaultHandler);
}

class ReloadMessageListener implements MessageListener<byte[]> {

@Override
public void onMessage(Message<byte[]> message) {
EventRegistry reloadRegistry = new EventRegistry();
ReloadSiteEvent.Handler handler = new ReloadSiteEvent.Handler(true);
reloadRegistry.register(handler);
Messaging.handleEvent(LOGGER, reloadRegistry, serializer, message.getMessageObject());
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void run() {
BinaryJedisPubSub pubSub = new BinaryJedisPubSub() {

public void onMessage(byte[] channel, byte[] message) {
Messaging.handleEvent(LOGGER, eventRegistry, eventSerializer, message);
Messaging.handleEvent(LOGGER, eventRegistry, eventSerializer, message, false, null);
}
};
jedis.subscribe(pubSub, this.channel.getBytes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package org.appng.core.controller.messaging;

import java.util.concurrent.ExecutorService;

import org.apache.commons.lang3.StringUtils;
import org.appng.api.messaging.Event;
import org.appng.api.messaging.EventHandler;
Expand All @@ -34,12 +36,8 @@ class Messaging {
Messaging() {
}

static void handleEvent(final Logger logger, EventRegistry registry, Serializer serializer, byte[] eventData) {
handleEvent(logger, registry, serializer, eventData, false);
}

static void handleEvent(final Logger logger, EventRegistry registry, Serializer serializer, byte[] eventData,
boolean alternativeCondition) {
boolean alternativeCondition, ExecutorService executor) {
Event event = serializer.deserialize(eventData);
if (null != event) {
try {
Expand All @@ -50,8 +48,13 @@ static void handleEvent(final Logger logger, EventRegistry registry, Serializer
boolean sameNode = StringUtils.equals(currentNode, originNode);
if (!sameNode || alternativeCondition) {
logger.info("about to execute {} ", event);
boolean isAsync = event.isAsync() && null != executor;
for (EventHandler<Event> eventHandler : registry.getHandlers(event)) {
eventHandler.onEvent(event, serializer.getEnvironment(), site);
if (isAsync) {
executor.submit(() -> processEvent(logger, serializer, event, site, eventHandler));
} else {
processEvent(logger, serializer, event, site, eventHandler);
}
}
} else {
logger.debug("event {} is from myself ({}) and can be ignored", event, currentNode);
Expand All @@ -65,4 +68,15 @@ static void handleEvent(final Logger logger, EventRegistry registry, Serializer
}
}

private static void processEvent(final Logger logger, Serializer serializer, Event event, Site site,
EventHandler<Event> eventHandler) {
try {
eventHandler.onEvent(event, serializer.getEnvironment(), site);
} catch (Exception e) {
String message = String.format("Error while executing event %s with %s", event,
eventHandler.getClass().getName());
logger.error(message, e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void run() {
void onEvent(byte[] data, List<String> nodeIps, String senderHost) throws IOException, InterruptedException {
if (nodeIps.isEmpty() || nodeIps.contains(senderHost)) {
boolean sameAddress = isSameAddress(senderHost);
Messaging.handleEvent(LOGGER, eventRegistry, eventSerializer, data, !sameAddress);
Messaging.handleEvent(LOGGER, eventRegistry, eventSerializer, data, !sameAddress, null);
} else {
LOGGER.debug("ignoring message from {}", senderHost);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public EventConsumer(Channel channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
Messaging.handleEvent(LOGGER, eventRegistry, eventSerializer, body);
Messaging.handleEvent(LOGGER, eventRegistry, eventSerializer, body, false, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;

import org.appng.api.BusinessException;
import org.appng.api.Environment;
import org.appng.api.FieldProcessor;
import org.appng.api.InvalidConfigurationException;
import org.appng.api.Platform;
import org.appng.api.Scope;
import org.appng.api.messaging.EventHandler;
import org.appng.api.messaging.Messaging;
import org.appng.api.model.Properties;
import org.appng.api.model.Site;
Expand All @@ -42,11 +40,11 @@ public class ReloadSiteEvent extends SiteEvent {
private static final long serialVersionUID = 8053808333634879840L;

public ReloadSiteEvent(String siteName) {
super(siteName);
super(siteName, true);
}

public ReloadSiteEvent(String siteName, String targetNode) {
super(siteName, targetNode);
super(siteName, targetNode, true);
}

public void perform(Environment env, Site site) throws InvalidConfigurationException {
Expand Down Expand Up @@ -107,7 +105,7 @@ public void waitForClusterState(Environment env, Site site, Logger logger) {
}
if (activeNodes < minActiveNodes) {
try {
logger.debug("Site {} is active on {} of {} nodes, waiting {}s for site to start on {} nodes.",
logger.info("Site {} is active on {} of {} nodes, waiting {}s for site to start on {} nodes.",
site.getName(), activeNodes, numNodes, waitTime, minActiveNodes - activeNodes);
waited += waitTime;
Thread.sleep(TimeUnit.SECONDS.toMillis(waitTime));
Expand All @@ -129,29 +127,4 @@ protected boolean delayed() {
return true;
}

/**
* Because a {@link ReloadSiteEvent} is a potentially long running, blocking operation that needs to check the state
* of the site on all other nodes, we need a separate {@link EventHandler} here.
*/
public static class Handler implements EventHandler<ReloadSiteEvent> {

private final boolean doPerform;

public Handler(boolean doPerform) {
this.doPerform = doPerform;
}

@Override
public void onEvent(ReloadSiteEvent event, Environment env, Site site)
throws InvalidConfigurationException, BusinessException {
if (doPerform) {
event.perform(env, site);
}
}

@Override
public Class<ReloadSiteEvent> getEventClass() {
return ReloadSiteEvent.class;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,14 @@ abstract class SiteEvent extends Event {
String currentNode;

SiteEvent(String siteName) {
this(siteName, null);
this(siteName, false);
}

SiteEvent(String siteName, String targetNode) {
SiteEvent(String siteName, boolean async) {
this(siteName, null, async);
}

SiteEvent(String siteName, String targetNode, boolean async) {
super(siteName);
this.targetNode = targetNode;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ public class StopSiteEvent extends SiteEvent {
private static final long serialVersionUID = 8053808333634879840L;

public StopSiteEvent(String siteName) {
super(siteName);
super(siteName, false);
}

public StopSiteEvent(String siteName, String targetNode) {
super(siteName, targetNode);
super(siteName, targetNode, false);
}

public void perform(Environment environment, Site site) throws InvalidConfigurationException {
Expand Down

0 comments on commit 1d281f8

Please sign in to comment.