Skip to content

Commit

Permalink
HWKALERTS-87 Actions are processed in separate threads
Browse files Browse the repository at this point in the history
  • Loading branch information
lucasponce committed Oct 1, 2015
1 parent b2a30c3 commit b3c89da
Showing 1 changed file with 57 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.ejb.Local;
import javax.ejb.Singleton;
import javax.ejb.TransactionAttribute;
Expand All @@ -45,27 +50,40 @@ public class MemActionsServiceImpl implements ActionsService {
private final MsgLogger msgLog = MsgLogger.LOGGER;
private final Logger log = Logger.getLogger(MemActionsServiceImpl.class);

private static final String ACTIONS_THREAD_POOL = "hawkular-alerts.actions-thread-pool";

ExecutorService actionsExecutor;
List<ActionListener> listeners = new CopyOnWriteArrayList<ActionListener>();

public MemActionsServiceImpl() {
log.debugf("Creating instance.");
}

@PostConstruct
public void initPool() {
int nThreads = Integer.parseInt(AlertProperties.getProperty(ACTIONS_THREAD_POOL, "10"));
ActionThreadFactory actionsThreadFactory = new ActionThreadFactory();
actionsExecutor = Executors.newFixedThreadPool(nThreads, actionsThreadFactory);
if (actionsExecutor == null) {
throw new IllegalStateException("Actions ThreadPool has not been initialized");
}
}

@PreDestroy
public void release() {
if (actionsExecutor != null) {
actionsExecutor.shutdown();
}
}

@Override
public void send(Action action) {
if (action == null || action.getActionId() == null || action.getActionId().isEmpty()) {
throw new IllegalArgumentException("Action must be not null");
}
/*
In this implementation we invoke listeners as soon as we receive an event.
This can be modified per implementation basis adding asynchronously behaviour at this level.
i.e. send(Action) can be responsible to enqueue message into a buffer.
This buffer can be processed by listeners on an async behaviour with additional logic (priorities by
severity or other criteria).
*/
for (ActionListener listener : listeners) {
listener.process(action);
ActionWorker worker = new ActionWorker(listener, action);
actionsExecutor.execute(worker);
}
}

Expand All @@ -77,4 +95,34 @@ public void addListener(ActionListener listener) {
listeners.add(listener);
msgLog.infoActionListenerRegistered(listener.toString());
}

public class ActionThreadFactory implements ThreadFactory {

private int numThread = 0;
private String prefix = "Alert-Action-Worker-";

@Override
public Thread newThread(Runnable r) {
Thread newThread = new Thread(r, prefix + numThread);
numThread++;
return newThread;
}
}

public class ActionWorker implements Runnable {

private ActionListener listener;
private Action action;

public ActionWorker(ActionListener listener, Action action) {
this.listener = listener;
this.action = action;
}

@Override
public void run() {
listener.process(action);
}
}

}

0 comments on commit b3c89da

Please sign in to comment.