Skip to content

Commit

Permalink
Add a preliminar C* implementation for ActionsService
Browse files Browse the repository at this point in the history
  • Loading branch information
lucasponce committed Oct 1, 2015
1 parent 45a1197 commit b9744a2
Show file tree
Hide file tree
Showing 11 changed files with 272 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,6 @@ public class PluginMessageDescription {
/** Context property "description". Supported at Condition.getContext() level with CompareCondition classes */
public static final String CONTEXT_PROPERTY_DESCRIPTION2 = "description2";

/** Shortcut for PluginMessage.getAction().message */
private String message;

/** Shortcut for PluginMessage.getAction().alert */
private Alert alert;

Expand Down Expand Up @@ -206,7 +203,6 @@ public PluginMessageDescription(ActionMessage pm) {
if (pm.getProperties() == null) {
throw new IllegalArgumentException("Properties cannot be null on PluginMessage");
}
message = pm.getAction().getMessage();
alert = pm.getAction().getAlert();
props = pm.getProperties();
if (alert != null && alert.getStatus() != null) {
Expand Down Expand Up @@ -608,14 +604,6 @@ public void setEmailSubject(String emailSubject) {
this.emailSubject = emailSubject;
}

public String getMessage() {
return message;
}

public void setMessage(String message) {
this.message = message;
}

public Alert getAlert() {
return alert;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,25 +46,22 @@ public class Action {
private String actionId;

@JsonInclude(Include.NON_NULL)
private String message;
private Alert alert;

@JsonInclude
private long ctime;

@JsonInclude(Include.NON_NULL)
private Alert alert;
private String result;

public Action() { }

public Action(String tenantId, String actionPlugin, String actionId, String message) {
this.tenantId = tenantId;
this.actionPlugin = actionPlugin;
this.actionId = actionId;
this.message = message;
}

public Action(String tenantId, String actionPlugin, String actionId, Alert alert) {
this.tenantId = tenantId;
this.actionPlugin = actionPlugin;
this.actionId = actionId;
this.alert = alert;
this.ctime = System.currentTimeMillis();
}

public String getTenantId() {
Expand All @@ -75,14 +72,6 @@ public void setTenantId(String tenantId) {
this.tenantId = tenantId;
}

public String getMessage() {
return message;
}

public void setMessage(String message) {
this.message = message;
}

public String getActionId() {
return actionId;
}
Expand All @@ -107,6 +96,22 @@ public void setAlert(Alert alert) {
this.alert = alert;
}

public long getCtime() {
return ctime;
}

public void setCtime(long ctime) {
this.ctime = ctime;
}

public String getResult() {
return result;
}

public void setResult(String result) {
this.result = result;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -129,13 +134,11 @@ public int hashCode() {
return result;
}

@Override
public String toString() {
@Override public String toString() {
return "Action{" +
"tenantId='" + tenantId + '\'' +
", actionPlugin='" + actionPlugin + '\'' +
", actionId='" + actionId + '\'' +
", message='" + message + '\'' +
", alert=" + alert +
'}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,19 @@
public interface ActionsService {

/**
* Send a action to an internal queue.
* Primary used by the alerts-engine implementation to send a action.
* Send an action to be processed by the plugins architecture.
*
* @param action Action to send
*/
void send(Action action);

/**
* Update the result of an action.
*
* @param action Action
*/
void updateResult(Action action);

/**
* Register a listener that will process asynchronously.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,13 @@ public void before() {
@Test
public void jsonActionTest() throws Exception {
String str = "{\"tenantId\":\"tenantTest\",\"actionPlugin\":\"plugin\"," +
"\"actionId\":\"test\",\"message\":\"test-msg\"}";
"\"actionId\":\"test\",\"ctime\":123}";
Action action = objectMapper.readValue(str, Action.class);

assertEquals("tenantTest", action.getTenantId());
assertEquals("plugin", action.getActionPlugin());
assertEquals("test", action.getActionId());
assertEquals("test-msg", action.getMessage());
assertEquals(123, action.getCtime());

String output = objectMapper.writeValueAsString(action);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
import org.hawkular.alerts.api.services.AlertsService;
import org.hawkular.alerts.api.services.DefinitionsService;
import org.hawkular.alerts.engine.impl.AlertsEngineImpl;
import org.hawkular.alerts.engine.impl.CassActionsServiceImpl;
import org.hawkular.alerts.engine.impl.CassAlertsServiceImpl;
import org.hawkular.alerts.engine.impl.CassDefinitionsServiceImpl;
import org.hawkular.alerts.engine.impl.DroolsRulesEngineImpl;
import org.hawkular.alerts.engine.impl.MemActionsServiceImpl;

/**
* Factory helper for standalone use cases.
Expand All @@ -34,14 +34,14 @@ public class StandaloneAlerts {

private static StandaloneAlerts instance = null;

private MemActionsServiceImpl actions = null;
private CassActionsServiceImpl actions = null;
private CassAlertsServiceImpl alerts = null;
private CassDefinitionsServiceImpl definitions = null;
private AlertsEngineImpl engine = null;
private DroolsRulesEngineImpl rules = null;

private StandaloneAlerts() {
actions = new MemActionsServiceImpl();
actions = new CassActionsServiceImpl();
rules = new DroolsRulesEngineImpl();
engine = new AlertsEngineImpl();
definitions = new CassDefinitionsServiceImpl();
Expand All @@ -53,7 +53,6 @@ private StandaloneAlerts() {
engine.setRules(rules);

definitions.init();
alerts.initServices();
}

public static synchronized DefinitionsService getDefinitionsService() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
/*
* Copyright 2015 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hawkular.alerts.engine.impl;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import javax.ejb.Asynchronous;
import javax.ejb.Local;
import javax.ejb.Singleton;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;

import org.hawkular.alerts.api.json.JsonUtil;
import org.hawkular.alerts.api.model.action.Action;
import org.hawkular.alerts.api.services.ActionListener;
import org.hawkular.alerts.api.services.ActionsService;
import org.hawkular.alerts.engine.log.MsgLogger;
import org.jboss.logging.Logger;

import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.google.common.util.concurrent.Futures;

/**
* Cassandra implementation of {@link org.hawkular.alerts.api.services.ActionsService}.
*
* @author Jay Shaughnessy
* @author Lucas Ponce
*/
@Local(ActionsService.class)
@Singleton
@TransactionAttribute(value= TransactionAttributeType.NOT_SUPPORTED)
public class CassActionsServiceImpl implements ActionsService {
private final MsgLogger msgLog = MsgLogger.LOGGER;
private final Logger log = Logger.getLogger(CassActionsServiceImpl.class);

private static final String WAITING_RESULT = "WAITING";
private static final String UNKNOWN_RESULT = "UNKWON";

List<ActionListener> listeners = new CopyOnWriteArrayList<ActionListener>();

private Session session;

public CassActionsServiceImpl() {
log.debugf("Creating instance.");
}

@Asynchronous
@Override
public void send(Action action) {
if (action == null || action.getActionPlugin() == null || action.getActionId() == null
|| action.getActionPlugin().isEmpty()
|| action.getActionId().isEmpty()) {
throw new IllegalArgumentException("Action must be not null");
}
if (action.getAlert() == null) {
throw new IllegalArgumentException("Action must have an alert");
}
for (ActionListener listener : listeners) {
listener.process(action);
}
insertActionHistory(action);
}

@Asynchronous
@Override
public void updateResult(Action action) {
if (action == null || action.getActionPlugin() == null || action.getActionId() == null
|| action.getActionPlugin().isEmpty()
|| action.getActionId().isEmpty()) {
throw new IllegalArgumentException("Action must be not null");
}
if (action.getAlert() == null) {
throw new IllegalArgumentException("Action must have an alert");
}
updateActionHistory(action);
}

private void insertActionHistory(Action action) {
String result = action.getResult() == null ? WAITING_RESULT : action.getResult();
try {
session = CassCluster.getSession();
PreparedStatement insertActionHistory = CassStatement.get(session,
CassStatement.INSERT_ACTION_HISTORY);
PreparedStatement insertActionHistoryResult = CassStatement.get(session,
CassStatement.INSERT_ACTION_HISTORY_RESULT);

List<ResultSetFuture> futures = new ArrayList<>();

futures.add(session.executeAsync(insertActionHistory.bind(action.getTenantId(), action.getActionPlugin(),
action.getActionId(), action.getAlert().getAlertId(), action.getCtime(), JsonUtil.toJson(action))));
futures.add(session.executeAsync(insertActionHistoryResult.bind(action.getTenantId(),
action.getActionPlugin(), action.getActionId(), action.getAlert().getAlertId(), action.getCtime(),
result)));

Futures.allAsList(futures).get();
} catch (Exception e) {
msgLog.errorDatabaseException(e.getMessage());
}
}

private Action selectActionHistory(String tenantId, String actionPlugin, String actionId, String alertId,
long ctime) {
Action actionHistory = null;
try {
session = CassCluster.getSession();
PreparedStatement selectActionHistory = CassStatement.get(session, CassStatement.SELECT_ACTION_HISTORY);
ResultSet rsActionHistory = session.execute(selectActionHistory.bind(tenantId, actionPlugin, actionId,
alertId, ctime));
Iterator<Row> itActionHistory = rsActionHistory.iterator();
if (itActionHistory.hasNext()) {
Row row = itActionHistory.next();
actionHistory = JsonUtil.fromJson(row.getString("payload"), Action.class);
}
} catch (Exception e) {
msgLog.errorDatabaseException(e.getMessage());
}
return actionHistory;
}

private void updateActionHistory(Action action) {
String result = action.getResult() == null ? UNKNOWN_RESULT : action.getResult();

try {
Action oldActionHistory = selectActionHistory(action.getTenantId(), action.getActionPlugin(),
action.getActionId(), action.getAlert().getAlertId(), action.getCtime());
if (oldActionHistory == null) {
insertActionHistory(action);
return;
}
String oldResult = oldActionHistory.getResult();
session = CassCluster.getSession();
PreparedStatement deleteActionHistoryResult = CassStatement.get(session,
CassStatement.DELETE_ACTION_HISTORY_RESULT);
PreparedStatement insertActionHistoryResult = CassStatement.get(session,
CassStatement.INSERT_ACTION_HISTORY_RESULT);
PreparedStatement updateActionHistory = CassStatement.get(session, CassStatement.UPDATE_ACTION_HISTORY);

List<ResultSetFuture> futures = new ArrayList<>();

futures.add(session.executeAsync(deleteActionHistoryResult.bind(action.getTenantId(), oldResult,
action.getActionPlugin(), action.getActionId(), action.getAlert().getAlertId(),
action.getCtime())));
futures.add(session.executeAsync(insertActionHistoryResult.bind(action.getTenantId(), result,
action.getActionPlugin(), action.getActionId(), action.getAlert().getAlertId(),
action.getCtime())));
futures.add(session.executeAsync(updateActionHistory.bind(JsonUtil.toJson(action), action.getTenantId(),
action.getActionPlugin(), action.getActionId(), action.getAlert().getAlertId(),
action.getCtime())));

Futures.allAsList(futures).get();
} catch (Exception e) {
msgLog.errorDatabaseException(e.getMessage());
}
}

@Override
public void addListener(ActionListener listener) {
if (listener == null) {
throw new IllegalArgumentException("ActionListener must not be null");
}
listeners.add(listener);
msgLog.infoActionListenerRegistered(listener.toString());
}

}

0 comments on commit b9744a2

Please sign in to comment.