Skip to content

Commit

Permalink
speeding up MDB stop
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed May 4, 2012
1 parent f88e838 commit bc0f970
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 6 deletions.
51 changes: 48 additions & 3 deletions src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
Expand Up @@ -450,14 +450,40 @@ else if (handler == null && !noPreviousHandler)

public void close() throws HornetQException
{
doCleanUp(true);
doCleanUp(true, false);
}

/**
* To be used by MDBs
* @param interruptConsumer it will send an interrupt to the thread
* @throws HornetQException
*/
public void interruptHandlers() throws HornetQException
{
closing = true;

resetLargeMessageController();

Thread onThread = onMessageThread;
if (onThread != null)
{
try
{
// just trying to interrupt any ongoing messages
onThread.interrupt();
}
catch (Throwable ignored)
{
// security exception probably.. we just ignore it, not big deal!
}
}
}

public void cleanUp()
{
try
{
doCleanUp(false);
doCleanUp(false, false);
}
catch (HornetQException e)
{
Expand Down Expand Up @@ -1011,6 +1037,8 @@ public Object run()
return null;
}
});

onMessageThread = null;
}

if (ClientConsumerImpl.trace)
Expand Down Expand Up @@ -1051,7 +1079,7 @@ private void flowControlBeforeConsumption(final ClientMessageInternal message) t
}
}

private void doCleanUp(final boolean sendCloseMessage) throws HornetQException
private void doCleanUp(final boolean sendCloseMessage, final boolean interruptConsumer) throws HornetQException
{
try
{
Expand All @@ -1067,6 +1095,23 @@ private void doCleanUp(final boolean sendCloseMessage) throws HornetQException

resetLargeMessageController();

if (interruptConsumer)
{
Thread onThread = receiverThread;
if (onThread != null)
{
try
{
// just trying to interrupt any ongoing messages
onThread.interrupt();
}
catch (Throwable ignored)
{
// security exception probably.. we just ignore it, not big deal!
}
}
}

// Now we wait for any current handler runners to run.
waitForOnMessageToComplete(true);

Expand Down
Expand Up @@ -47,6 +47,13 @@ public interface ClientConsumerInternal extends ClientConsumer
void flowControl(final int messageBytes, final boolean discountSlowConsumer) throws HornetQException;

void clear(boolean waitForOnMessage) throws HornetQException;

/**
* To be called by things like MDBs during shutdown of the server
* @param interrupt
* @throws HornetQException
*/
void interruptHandlers() throws HornetQException;

void clearAtFailover();

Expand Down
4 changes: 4 additions & 0 deletions src/main/org/hornetq/ra/inflow/HornetQActivation.java
Expand Up @@ -342,6 +342,10 @@ protected synchronized void teardown()
ra.getRecoveryManager().unRegister(resourceRecovery);
}

for (HornetQMessageHandler handler : handlers)
{
handler.interruptConsumer();
}
for (HornetQMessageHandler handler : handlers)
{
handler.teardown();
Expand Down
22 changes: 19 additions & 3 deletions src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
Expand Up @@ -28,6 +28,7 @@
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientSession.QueueQuery;
import org.hornetq.api.core.client.MessageHandler;
import org.hornetq.core.client.impl.ClientConsumerInternal;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.logging.Logger;
import org.hornetq.jms.client.HornetQDestination;
Expand Down Expand Up @@ -58,7 +59,7 @@ public class HornetQMessageHandler implements MessageHandler
*/
private final ClientSessionInternal session;

private ClientConsumer consumer;
private ClientConsumerInternal consumer;

/**
* The endpoint
Expand Down Expand Up @@ -151,7 +152,7 @@ public void setup() throws Exception
session.createQueue(activation.getAddress(), queueName, selectorString, true);
}
}
consumer = session.createConsumer(queueName, null, false);
consumer = (ClientConsumerInternal)session.createConsumer(queueName, null, false);
}
else
{
Expand Down Expand Up @@ -186,7 +187,7 @@ public void setup() throws Exception
session.createQueue(activation.getAddress(), queueName, selectorString, true);
}
}
consumer = session.createConsumer(queueName, selectorString);
consumer = (ClientConsumerInternal)session.createConsumer(queueName, selectorString);
}

// Create the endpoint, if we are transacted pass the sesion so it is enlisted, unless using Local TX
Expand All @@ -205,6 +206,21 @@ public void setup() throws Exception
}
consumer.setMessageHandler(this);
}

public void interruptConsumer()
{
try
{
if (consumer != null)
{
consumer.interruptHandlers();
}
}
catch (Throwable e)
{
log.warn("Error interrupting handler on endpoint " + endpoint + " handler=" + consumer);
}
}

/**
* Stop the handler
Expand Down
Expand Up @@ -930,6 +930,13 @@ public ClientSessionInternal getSession()
return null;
}

/* (non-Javadoc)
* @see org.hornetq.core.client.impl.ClientConsumerInternal#interruptHandlers()
*/
public void interruptHandlers() throws HornetQException
{
}

}

}

0 comments on commit bc0f970

Please sign in to comment.