Permalink
Browse files

It's no longer possible to get a collision if failFast is false (bloc…

…k is true). It used to be that on the rare occation where eviction was in the process of taking place, and a message came in for the Mp, it could end up marked as a collision. Now it will block until a new one can be created.
  • Loading branch information...
1 parent 6dc5bf6 commit 86b90ca81fb84a26b661c870815ba88676ef9d2a Jim Carroll committed Nov 3, 2012
@@ -106,7 +106,7 @@
{
private Object instance;
private Semaphore lock = new Semaphore(1,true); // basically a mutex
- private AtomicBoolean evicted = new AtomicBoolean(false);
+ private boolean evicted = false;
/**
* DO NOT CALL THIS WITH NULL OR THE LOCKING LOGIC WONT WORK
@@ -151,30 +151,31 @@ public boolean tryLock()
{
return lock.tryAcquire();
}
-
- /**
- * This will set the instance reference to null. MAKE SURE
- * YOU OWN THE LOCK BEFORE CALLING.
- */
- public void markPassivated() { instance = null; }
-
- /**
- * This will tell you if the instance reference is null. MAKE SURE
- * YOU OWN THE LOCK BEFORE CALLING.
- */
- public boolean isPassivated() { return instance == null; }
+
+// /**
+// * This will set the instance reference to null. MAKE SURE
+// * YOU OWN THE LOCK BEFORE CALLING.
+// */
+// public void markPassivated() { instance = null; }
+//
+// /**
+// * This will tell you if the instance reference is null. MAKE SURE
+// * YOU OWN THE LOCK BEFORE CALLING.
+// */
+// public boolean isPassivated() { return instance == null; }
/**
* This will prevent further operations on this instance.
* MAKE SURE YOU OWN THE LOCK BEFORE CALLING.
*/
- public void markEvicted() { evicted.set(true); }
+ public void markEvicted() { evicted = true; }
/**
* Flag to indicate this instance has been evicted and no further operations should be enacted.
+ * THIS SHOULDN'T BE CALLED WITHOUT HOLDING THE LOCK.
*/
- public boolean isEvicted() { return evicted.get(); }
+ public boolean isEvicted() { return evicted; }
//----------------------------------------------------------------------------
// Test access
@@ -446,7 +447,7 @@ public Object call()
* <p>
* <em>This method exists for testing; don't do anything stupid</em>
*/
- public Object getMessageProcessor(Object key)
+ protected Object getMessageProcessor(Object key)
{
InstanceWrapper wrapper = instances.get(key);
return (wrapper != null) ? wrapper.getInstance() : null;
@@ -461,41 +462,58 @@ public Object getMessageProcessor(Object key)
protected boolean dispatch(Object message, boolean block) throws ContainerException {
if (message == null)
return false; // No. We didn't process the null message
-
- InstanceWrapper wrapper;
- wrapper = getInstanceForDispatch(message);
-
+
+ boolean evictedAndBlocking;
boolean messageDispatchSuccessful = false;
- // wrapper cannot be null ... look at the getInstanceForDispatch method
- Object instance = wrapper.getExclusive(block);
-
- if (instance != null) // null indicates we didn't get the lock
+ do
{
- try
+ evictedAndBlocking = false;
+ InstanceWrapper wrapper = getInstanceForDispatch(message);
+
+ // wrapper cannot be null ... look at the getInstanceForDispatch method
+ Object instance = wrapper.getExclusive(block);
+
+ if (instance != null) // null indicates we didn't get the lock
{
- if(wrapper.isEvicted())
+ try
{
- if (logger.isTraceEnabled())
- logger.trace("the container for " + clusterId + " failed handle message due to evicted Mp " + SafeString.valueOf(prototype));
- statCollector.messageDiscarded(message);
- messageDispatchSuccessful = false;
- }
- else
- {
- invokeOperation(wrapper.getInstance(), Operation.handle, message);
- messageDispatchSuccessful = true;
+ if(wrapper.isEvicted())
+ {
+ // if we're not blocking then we need to just return a failure. Otherwise we want to try again
+ // because eventually the current Mp will be passivated and removed from the container and
+ // a subsequent call to getInstanceForDispatch will create a new one.
+ if (block)
+ {
+ Thread.yield();
+ evictedAndBlocking = true; // we're going to try again.
+ }
+ else // otherwise it's just like we couldn't get the lock. The Mp is busy being killed off.
+ {
+ if (logger.isTraceEnabled())
+ logger.trace("the container for " + clusterId + " failed handle message due to evicted Mp " + SafeString.valueOf(prototype));
+
+ statCollector.messageDiscarded(message);
+ statCollector.messageCollision(message);
+ messageDispatchSuccessful = false;
+ }
+ }
+ else
+ {
+ invokeOperation(wrapper.getInstance(), Operation.handle, message);
+ messageDispatchSuccessful = true;
+ }
}
+ finally { wrapper.releaseLock(); }
+ }
+ else // ... we didn't get the lock
+ {
+ if (logger.isTraceEnabled())
+ logger.trace("the container for " + clusterId + " failed to obtain lock on " + SafeString.valueOf(prototype));
+ statCollector.messageDiscarded(message);
+ statCollector.messageCollision(message);
}
- finally { wrapper.releaseLock(); }
- }
- else // ... we didn't get the lock
- {
- if (logger.isTraceEnabled())
- logger.trace("the container for " + clusterId + " failed to obtain lock on " + SafeString.valueOf(prototype));
- statCollector.messageDiscarded(message);
- statCollector.messageCollision(message);
- }
+ } while (evictedAndBlocking);
return messageDispatchSuccessful;
}
@@ -561,7 +579,7 @@ public void evict() {
removeInstance = true;
wrapper.markEvicted();
prototype.passivate(wrapper.getInstance());
- wrapper.markPassivated();
+// wrapper.markPassivated();
}
}
catch (InvocationTargetException e)
@@ -50,6 +50,8 @@
private AtomicBoolean shutdown = new AtomicBoolean(false);
private BlockingQueueDestination destination = null;
private OverflowHandler overflowHandler = null;
+ private boolean failFast = false; // this has been the default - since it's equivalent to there being no overflowHandler
+ private boolean explicitFailFast = false;
/**
* <p>This method starts a background thread that reads messages from the queue and sends
@@ -68,6 +70,10 @@
@Override
public synchronized void start() throws MessageTransportException
{
+ // check to see that the overflowHandler and the failFast setting are consistent.
+ if (!failFast && overflowHandler != null)
+ logger.warn("BlockingQueueAdaptor is configured with an OverflowHandler that will never be used because it's also configured to NOT 'fail fast' so it will always block waiting for messages to be processed.");
+
running = name == null ? new Thread(this) : new Thread(this,name);
running.setDaemon(true);
running.start();
@@ -100,7 +106,7 @@ public void run()
byte[] val = destination.queue.take();
Listener curListener = listener.get();
- boolean messageSuccess = curListener == null ? false : curListener.onMessage(val, overflowHandler != null);
+ boolean messageSuccess = curListener == null ? false : curListener.onMessage(val, failFast);
if (overflowHandler != null && !messageSuccess)
overflowHandler.overflow(val);
}
@@ -142,7 +148,13 @@ public void setName(String name)
* When an overflow handler is set the Adaptor indicates that a 'failFast' should happen
* and any failed message deliveries end up passed to the overflow handler.
*/
- public void setOverflowHandler(OverflowHandler handler) { this.overflowHandler = handler; }
+ public void setOverflowHandler(OverflowHandler handler) { this.overflowHandler = handler; if (!explicitFailFast) failFast = (handler != null); }
+
+ public void setFailFast(boolean failFast)
+ {
+ explicitFailFast = true;
+ this.failFast = failFast;
+ }
public BlockingQueue<byte[]> getQueue() { return destination == null ? null : destination.queue; }
Oops, something went wrong.

0 comments on commit 86b90ca

Please sign in to comment.