Permalink
Browse files

Configurable, use less resources, Results reflect actions.

  • Loading branch information...
1 parent ad3fc82 commit eaaba635b8531981391ad216a9ed98420f957ed9 fraenkel committed Oct 10, 2011
@@ -15,7 +15,7 @@
public interface KeyOperatorResult<K extends Serializable> {
boolean isApplied();
- boolean isUnApplySuccesful();
+ boolean isUnapplied();
String getErrorString();
@@ -11,7 +11,8 @@
package com.devwebsphere.wxs.asyncserviceimpl;
import java.io.Serializable;
-import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -27,14 +28,19 @@
import com.ibm.websphere.objectgrid.ObjectGridException;
import com.ibm.websphere.objectgrid.ObjectMap;
import com.ibm.websphere.objectgrid.Session;
+import com.ibm.websphere.objectgrid.plugins.LogElement;
+import com.ibm.websphere.objectgrid.plugins.LogSequence;
import com.ibm.websphere.objectgrid.plugins.ObjectGridEventGroup;
+import com.ibm.websphere.objectgrid.plugins.ObjectGridEventListener;
-public class AsyncServiceProcessor<T> implements ObjectGridEventGroup.ShardEvents {
- static ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(3);
+public class AsyncServiceProcessor<T> implements ObjectGridEventListener, ObjectGridEventGroup.TransactionEvents, ObjectGridEventGroup.ShardEvents,
+ ObjectGridEventGroup.ShardLifecycle {
+ static Logger logger = Logger.getLogger(AsyncServiceProcessor.class.getName());
+ static ScheduledExecutorService executor;
- ArrayList<Listener> listeners;
+ Object[] listeners;
PerfReporter reporter;
- static Logger logger = Logger.getLogger(AsyncServiceProcessor.class.getName());
+
int partitionId;
volatile int numMessagesProcessed;
@@ -44,6 +50,20 @@
MessageProcessor<T> listenerInstance = null;
ScheduledFuture<?> perfTask;
ThreadLocalGridClient tlsOGClient;
+ private int poolSize = 3;
+
+ public void setThreadPoolSize(int tpSize) {
+ synchronized (AsyncServiceProcessor.class) {
+ if (executor == null) {
+ poolSize = tpSize;
+ executor = new ScheduledThreadPoolExecutor(tpSize);
+ }
+ }
+ }
+
+ public void setWorkers(int workers) {
+ numAlarms = workers;
+ }
/**
* This is called whenever a primary shard is place in a JVM. It starts the dispatcher threads at that point.
@@ -52,15 +72,15 @@ public void shardActivated(ObjectGrid shard) {
try {
tlsOGClient = new ThreadLocalGridClient(shard);
BackingMap queueMap = shard.getMap(MapNames.QUEUE_MAP);
- partitionId = queueMap.getPartitionId();
- logger.info("Activated " + partitionId);
- listeners = new ArrayList<Listener>(numAlarms);
+ setThreadPoolSize(poolSize);
+
+ listeners = new Object[numAlarms];
for (int i = 0; i < numAlarms; ++i) {
- Listener lis = new Listener(shard, queueMap, i);
- executor.schedule(lis, Listener.idleSleepTime, TimeUnit.MILLISECONDS);
- listeners.add(lis);
+ listeners[i] = new Listener(shard, queueMap, i);
}
+ partitionId = queueMap.getPartitionId();
+ logger.info("Activated " + partitionId);
reporter = new PerfReporter();
perfTask = executor.scheduleAtFixedRate(reporter, 10, 10, TimeUnit.SECONDS);
} catch (ObjectGridException e) {
@@ -73,11 +93,11 @@ public void shardActivated(ObjectGrid shard) {
* This is called whenever a primary shard is about to be moved to another JVM. It stops the dispatchers.
*/
public void shardDeactivate(ObjectGrid shard) {
- for (Listener l : listeners) {
- l.stopListening();
+ for (Object l : listeners) {
+ ((Listener) l).stopListening();
}
+ listeners = null;
- listeners.clear();
perfTask.cancel(false);
}
@@ -106,16 +126,16 @@ public void stopWorking() {
*/
class Listener implements Callable<Boolean> {
static final int busySleepTime = 1;
- static final int idleSleepTime = 250;
static final int maxCounter = 200;
ObjectGrid grid;
Session sess;
BackingMap queue;
ObjectMap map;
ObjectMap processedIdMap;
volatile boolean isActive;
- int counter;
- int sleepTime;
+
+ private ScheduledFuture<Boolean> future;
+ boolean scheduled = false;
boolean isActive() {
return this.isActive;
@@ -128,8 +148,6 @@ boolean isActive() {
isActive = true;
map = sess.getMap(queue.getName());
processedIdMap = sess.getMap(MapNames.HISTORY_MAP);
- counter = maxCounter;
- sleepTime = idleSleepTime;
}
void stopListening() {
@@ -138,10 +156,11 @@ void stopListening() {
public Boolean call() {
if (isActive()) {
+ boolean reschedule = true;
try {
sess.begin();
// get next unlocked/unprocessed message from queue
- RoutableKey nextMsgKey = (RoutableKey) map.getNextKey(sleepTime);
+ RoutableKey nextMsgKey = (RoutableKey) map.getNextKey(busySleepTime);
if (nextMsgKey != null) {
++numMessagesProcessed;
// we have a message
@@ -153,16 +172,8 @@ public Boolean call() {
// store result in history map for peekresult plus duplicate detection
processedIdMap.insert(nextMsgKey, result);
sess.commit();
- counter = maxCounter;
- // short timeouts when we're busy
- sleepTime = busySleepTime;
} else {
- // if no work then reset transaction every maxCounter tries
- if (--counter == 0) {
- counter = maxCounter;
- // we're idle to switch to long sleep times
- sleepTime = idleSleepTime;
- }
+ reschedule = false;
}
} catch (Exception e) {
logger.log(Level.SEVERE, "Exception processing message ", e);
@@ -174,10 +185,22 @@ public Boolean call() {
}
}
}
- executor.schedule(this, sleepTime, TimeUnit.MILLISECONDS);
+
+ synchronized (this) {
+ future = null;
+ schedule(reschedule);
+ }
}
return Boolean.TRUE;
}
+
+ synchronized void schedule(boolean reschedule) {
+ scheduled |= reschedule;
+ if (future == null && scheduled) {
+ future = executor.schedule(this, busySleepTime, TimeUnit.MILLISECONDS);
+ scheduled = false;
+ }
+ }
}
public int getNumThreads() {
@@ -218,4 +241,35 @@ public String getListenerClass() {
}
}
+ public void transactionBegin(String txId, boolean isWriteThroughEnabled) {
+ }
+
+ public void transactionEnd(String txId, boolean isWriteThroughEnabled, boolean committed, Collection changes) {
+ if (committed) {
+ for (Object o : changes) {
+ LogSequence lseq = (LogSequence) o;
+ if (lseq.getMapName().equals(MapNames.QUEUE_MAP)) {
+ for (Iterator<LogElement> lEntryItr = lseq.getAllChanges(); lEntryItr.hasNext();) {
+ LogElement lElem = lEntryItr.next();
+ if (LogElement.INSERT.equals(lElem.getType())) {
+ // make sure the workers/alarms are running with inserts
+ for (Object l : listeners) {
+ ((Listener) l).schedule(true);
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ public void destroy() {
+ }
+
+ public void initialize(Session sess) {
+ // disable on clients
+ if (sess.getObjectGrid().getObjectGridType() != ObjectGrid.SERVER) {
+ sess.getObjectGrid().removeEventListener(this);
+ }
+ }
}
@@ -1,61 +0,0 @@
-//
-//This sample program is provided AS IS and may be used, executed, copied and
-//modified without royalty payment by customer (a) for its own instruction and
-//study, (b) in order to develop applications designed to run with an IBM
-//WebSphere product, either for customer's own internal use or for redistribution
-//by customer, as part of such an application, in customer's own products. "
-//
-//5724-J34 (C) COPYRIGHT International Business Machines Corp. 2005
-//All Rights Reserved * Licensed Materials - Property of IBM
-//
-package com.devwebsphere.wxs.asyncserviceimpl;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import com.ibm.websphere.objectgrid.ObjectGrid;
-import com.ibm.websphere.objectgrid.Session;
-import com.ibm.websphere.objectgrid.plugins.ObjectGridEventGroup;
-import com.ibm.websphere.objectgrid.plugins.ObjectGridEventListener;
-
-public class ChainedShardEvents implements ObjectGridEventListener, ObjectGridEventGroup.ShardEvents {
- List<ObjectGridEventGroup.ShardEvents> listeners = new ArrayList<ObjectGridEventGroup.ShardEvents>();
-
- public ChainedShardEvents() {
- listeners.add(new AsyncServiceProcessor<Boolean>());
- }
-
- public List<ObjectGridEventGroup.ShardEvents> getListeners() {
- return listeners;
- }
-
- public void setListeners(List<ObjectGridEventGroup.ShardEvents> listeners) {
- this.listeners = listeners;
- }
-
- public void shardActivated(ObjectGrid arg0) {
- for (ObjectGridEventGroup.ShardEvents item : listeners) {
- item.shardActivated(arg0);
- }
- }
-
- public void shardDeactivate(ObjectGrid arg0) {
- for (ObjectGridEventGroup.ShardEvents item : listeners) {
- item.shardDeactivate(arg0);
- }
- }
-
- public void destroy() {
- }
-
- public void initialize(Session arg0) {
- }
-
- public void transactionBegin(String arg0, boolean arg1) {
- }
-
- public void transactionEnd(String arg0, boolean arg1, boolean arg2, Collection arg3) {
- }
-
-}
Oops, something went wrong.

0 comments on commit eaaba63

Please sign in to comment.