Skip to content

Commit

Permalink
Merge pull request #109 from lucasponce/ActionResponseMessage
Browse files Browse the repository at this point in the history
Refactor OperationMessage into ActionResponseMessage
  • Loading branch information
jshaughn committed Oct 6, 2015
2 parents 5d12277 + 5365256 commit 24ad3f6
Show file tree
Hide file tree
Showing 19 changed files with 428 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,21 @@
public interface ActionPluginSender {

/**
* Factory to create new OperationMessage messages.
* Factory to create new ActionResponseMessage messages.
* There could be different implementation of messages depending on the context (bus, standalone) so
* new instances of OperationMessage should be created through this factory method.
* new instances of ActionResponseMessage should be created through this factory method.
*
* @param operation the type of operation of the message
* @return a new OperationMessage
* @param operation the type of operation
* @return a new ActionResponseMessage
*/
OperationMessage createMessage(OperationMessage.Operation operation);
ActionResponseMessage createMessage(ActionResponseMessage.Operation operation);

/**
* Send a message to the engine.
* Plugin should not have access to the implementation used.
*
* @param msg the operation message to be sent
* @param msg the response message to be sent
* @throws Exception any problem
*/
void send(OperationMessage msg) throws Exception;
void send(ActionResponseMessage msg) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
*
* @author Lucas Ponce
*/
public interface OperationMessage {
public interface ActionResponseMessage {

enum Operation {
RESULT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.hawkular.alerts.actions.bus;

import static org.hawkular.alerts.actions.api.OperationMessage.Operation;
import static org.hawkular.alerts.actions.api.ActionResponseMessage.Operation;

import java.io.IOException;

Expand All @@ -26,8 +26,8 @@
import javax.naming.NamingException;

import org.hawkular.alerts.actions.api.ActionPluginSender;
import org.hawkular.alerts.actions.api.OperationMessage;
import org.hawkular.alerts.bus.api.BusOperationMessage;
import org.hawkular.alerts.actions.api.ActionResponseMessage;
import org.hawkular.alerts.bus.api.BusActionResponseMessage;
import org.hawkular.bus.common.ConnectionContextFactory;
import org.hawkular.bus.common.Endpoint;
import org.hawkular.bus.common.MessageId;
Expand All @@ -46,7 +46,7 @@ public class BusActionPluginSender implements ActionPluginSender {
public static final int TIMEOUT = 2000;

private static final String CONNECTION_FACTORY = "java:/HawkularBusConnectionFactory";
private static final String ACTION_PLUGIN_REGISTER = "HawkularAlertsOperationsQueue";
private static final String ACTION_PLUGIN_REGISTER = "HawkularAlertsActionsResponseQueue";
private final MsgLogger msgLog = MsgLogger.LOGGER;
private final Logger log = Logger.getLogger(BusActionPluginRegister.class);

Expand Down Expand Up @@ -115,22 +115,23 @@ public void close() throws Exception {
}

@Override
public OperationMessage createMessage(Operation operation) {
public ActionResponseMessage createMessage(Operation operation) {
if (operation == null) {
return new BusOperationMessage();
return new BusActionResponseMessage();
}
return new BusOperationMessage(operation);
return new BusActionResponseMessage(operation);
}

@Override
public void send(OperationMessage msg) throws Exception {
if (!(msg instanceof BusOperationMessage)) {
throw new IllegalArgumentException("OperationMessage is not a BusOperationMessage instance");
public void send(ActionResponseMessage msg) throws Exception {
if (!(msg instanceof BusActionResponseMessage)) {
throw new IllegalArgumentException("ActionResponseMessage is not a BusActionResponseMessage " +
"instance");
}
init();
try {
MessageId mid = new MessageProcessor().send(pcc, (BusOperationMessage)msg);
log.debugf("Plugin [%s] has sent an operation message: [%s]", actionPlugin, mid.toString());
MessageId mid = new MessageProcessor().send(pcc, (BusActionResponseMessage)msg);
log.debugf("Plugin [%s] has sent a response message: [%s]", actionPlugin, mid.toString());
} catch (JMSException e) {
log.debug(e.getMessage(), e);
msgLog.errorCannotSendMessage(e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,13 @@

import org.hawkular.alerts.actions.api.ActionMessage;
import org.hawkular.alerts.actions.api.ActionPluginListener;
import org.hawkular.alerts.actions.api.ActionPluginSender;
import org.hawkular.alerts.actions.api.ActionResponseMessage;
import org.hawkular.alerts.actions.api.MsgLogger;
import org.hawkular.alerts.actions.api.Plugin;
import org.hawkular.alerts.actions.api.Sender;
import org.hawkular.alerts.api.json.JsonUtil;
import org.hawkular.alerts.api.model.action.Action;
import org.hawkular.alerts.api.model.condition.Alert;
import org.jboss.aerogear.unifiedpush.DefaultPushSender;
import org.jboss.aerogear.unifiedpush.PushSender;
Expand All @@ -49,6 +54,12 @@ public class AerogearPlugin implements ActionPluginListener {

PushSender pushSender;

@Sender
ActionPluginSender sender;

private static final String MESSAGE_PROCESSED = "PROCESSED";
private static final String MESSAGE_FAILED = "FAILED";

public AerogearPlugin() {
defaultProperties.put("alias", "Default aerogear alias");
defaultProperties.put("description", "Default aerogear description");
Expand All @@ -72,18 +83,27 @@ public void process(ActionMessage msg) throws Exception {
return;
}

UnifiedMessage.MessageBuilder alert = UnifiedMessage.withMessage().alert(prepareMessage(msg));
if (msg.getAction().getProperties() != null) {
String alias = msg.getAction().getProperties().get("alias");
if (!isBlank(alias)) {
alert.config().criteria().aliases(alias);
try {
UnifiedMessage.MessageBuilder alert = UnifiedMessage.withMessage().alert(prepareMessage(msg));
if (msg.getAction().getProperties() != null) {
String alias = msg.getAction().getProperties().get("alias");
if (!isBlank(alias)) {
alert.config().criteria().aliases(alias);
}
}
}

UnifiedMessage unifiedMessage = alert.build();
pushSender.send(unifiedMessage);

msgLog.infoActionReceived("aerogear", msg.toString());
UnifiedMessage unifiedMessage = alert.build();
pushSender.send(unifiedMessage);
msgLog.infoActionReceived("aerogear", msg.toString());
Action successAction = msg.getAction();
successAction.setResult(MESSAGE_PROCESSED);
sendResult(successAction);
} catch (Exception e) {
msgLog.errorCannotProcessMessage("aerogear", e.getMessage());
Action failedAction = msg.getAction();
failedAction.setResult(MESSAGE_FAILED);
sendResult(failedAction);
}
}

void setup() {
Expand Down Expand Up @@ -120,5 +140,20 @@ private String prepareMessage(ActionMessage msg) {
return preparedMsg;
}

private void sendResult(Action action) {
if (sender == null) {
throw new IllegalStateException("ActionPluginSender is not present in the plugin");
}
if (action == null) {
throw new IllegalStateException("Action to update result must be not null");
}
ActionResponseMessage newMessage = sender.createMessage(ActionResponseMessage.Operation.RESULT);
newMessage.getPayload().put("action", JsonUtil.toJson(action));
try {
sender.send(newMessage);
} catch (Exception e) {
msgLog.error("Error sending ActionResponseMessage", e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.util.Set;

import org.hawkular.alerts.actions.api.ActionMessage;
import org.hawkular.alerts.actions.api.ActionPluginSender;
import org.hawkular.alerts.actions.api.ActionResponseMessage;
import org.hawkular.alerts.api.model.Severity;
import org.hawkular.alerts.api.model.action.Action;
import org.hawkular.alerts.api.model.condition.Alert;
Expand Down Expand Up @@ -126,6 +128,7 @@ public static void configureListener() {
public void setup() {
pushSender = mock(PushSender.class);
aerogearPlugin = new AerogearPlugin();
aerogearPlugin.sender = new TestActionSender();
aerogearPlugin.pushSender = pushSender;
}

Expand All @@ -144,4 +147,44 @@ public void testBroadcast() throws Exception {

verify(pushSender, times(1)).send(argThat(UnifiedMessageMatcher.matchesUnifiedMessage(null, preparedMessage)));
}

public class TestActionResponseMessage implements ActionResponseMessage {

ActionResponseMessage.Operation operation;

Map<String, String> payload;

public TestActionResponseMessage() {
this.operation = ActionResponseMessage.Operation.RESULT;
this.payload = new HashMap<>();
}

public TestActionResponseMessage(ActionResponseMessage.Operation operation) {
this.operation = operation;
this.payload = new HashMap<>();
}

@Override
public Operation getOperation() {
return operation;
}

@Override
public Map<String, String> getPayload() {
return payload;
}
}

public class TestActionSender implements ActionPluginSender {

@Override
public ActionResponseMessage createMessage(ActionResponseMessage.Operation operation) {
return new TestActionResponseMessage(operation);
}

@Override
public void send(ActionResponseMessage msg) throws Exception {
// Nothing to do
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
import org.hawkular.alerts.actions.api.ActionMessage;
import org.hawkular.alerts.actions.api.ActionPluginListener;
import org.hawkular.alerts.actions.api.ActionPluginSender;
import org.hawkular.alerts.actions.api.ActionResponseMessage;
import org.hawkular.alerts.actions.api.ActionResponseMessage.Operation;
import org.hawkular.alerts.actions.api.MsgLogger;
import org.hawkular.alerts.actions.api.OperationMessage;
import org.hawkular.alerts.actions.api.OperationMessage.Operation;
import org.hawkular.alerts.actions.api.Plugin;
import org.hawkular.alerts.actions.api.Sender;
import org.hawkular.alerts.api.json.JsonUtil;
Expand Down Expand Up @@ -242,12 +242,12 @@ private void sendResult(Action action) {
if (action == null) {
throw new IllegalStateException("Action to update result must be not null");
}
OperationMessage newMessage = sender.createMessage(Operation.RESULT);
ActionResponseMessage newMessage = sender.createMessage(Operation.RESULT);
newMessage.getPayload().put("action", JsonUtil.toJson(action));
try {
sender.send(newMessage);
} catch (Exception e) {
log.error("Error sending OperationMessage", e);
log.error("Error sending ActionResponseMessage", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,13 @@

import org.hawkular.alerts.actions.api.ActionMessage;
import org.hawkular.alerts.actions.api.ActionPluginListener;
import org.hawkular.alerts.actions.api.ActionPluginSender;
import org.hawkular.alerts.actions.api.ActionResponseMessage;
import org.hawkular.alerts.actions.api.MsgLogger;
import org.hawkular.alerts.actions.api.Plugin;
import org.hawkular.alerts.actions.api.Sender;
import org.hawkular.alerts.api.json.JsonUtil;
import org.hawkular.alerts.api.model.action.Action;
import org.hawkular.alerts.api.model.condition.Alert;

import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -43,6 +48,12 @@ public class FilePlugin implements ActionPluginListener {
private Map<String, String> defaultProperties = new HashMap<>();
private ObjectMapper objectMapper;

@Sender
ActionPluginSender sender;

private static final String MESSAGE_PROCESSED = "PROCESSED";
private static final String MESSAGE_FAILED = "FAILED";

public FilePlugin() {
defaultProperties.put("path",
new File(System.getProperty("java.io.tmpdir"), "hawkular/alerts/actions/file").getAbsolutePath());
Expand Down Expand Up @@ -72,24 +83,50 @@ public void process(ActionMessage msg) throws Exception {
Alert alert = msg.getAction().getAlert();
String fileName = alert.getAlertId() + "-timestamp-" + System.currentTimeMillis() + ".txt";

File pathFile = new File(path);
if (!pathFile.exists()) {
pathFile.mkdirs();
}
File alertFile = new File(pathFile, fileName);
if (!alertFile.exists()) {
alertFile.createNewFile();
}
BufferedWriter writer = null;
try {
File pathFile = new File(path);
if (!pathFile.exists()) {
pathFile.mkdirs();
}
File alertFile = new File(pathFile, fileName);
if (!alertFile.exists()) {
alertFile.createNewFile();
}

writer = new BufferedWriter(new FileWriter(alertFile));
String jsonAlert = objectMapper.writeValueAsString(alert);
writer.write(jsonAlert);
msgLog.infoActionReceived("file", msg.toString());
Action successAction = msg.getAction();
successAction.setResult(MESSAGE_PROCESSED);
sendResult(successAction);
} catch (Exception e) {
msgLog.errorCannotProcessMessage("file", e.getMessage());
Action failedAction = msg.getAction();
failedAction.setResult(MESSAGE_FAILED);
sendResult(failedAction);
} finally {
if (writer != null) {
writer.close();
}
}
msgLog.infoActionReceived("file", msg.toString());
}

private void sendResult(Action action) {
if (sender == null) {
throw new IllegalStateException("ActionPluginSender is not present in the plugin");
}
if (action == null) {
throw new IllegalStateException("Action to update result must be not null");
}
ActionResponseMessage newMessage = sender.createMessage(ActionResponseMessage.Operation.RESULT);
newMessage.getPayload().put("action", JsonUtil.toJson(action));
try {
sender.send(newMessage);
} catch (Exception e) {
msgLog.error("Error sending ActionResponseMessage", e);
}
}

}

0 comments on commit 24ad3f6

Please sign in to comment.