From bc0f970c1f447b703ce84a41445a929f387dd1d1 Mon Sep 17 00:00:00 2001 From: clebert suconic Date: Thu, 3 May 2012 22:54:05 -0500 Subject: [PATCH] speeding up MDB stop --- .../core/client/impl/ClientConsumerImpl.java | 51 +++++++++++++++++-- .../client/impl/ClientConsumerInternal.java | 7 +++ .../hornetq/ra/inflow/HornetQActivation.java | 4 ++ .../ra/inflow/HornetQMessageHandler.java | 22 ++++++-- .../client/impl/LargeMessageBufferTest.java | 7 +++ 5 files changed, 85 insertions(+), 6 deletions(-) diff --git a/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java b/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java index 16ece5fb848..2888da2c015 100644 --- a/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java +++ b/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java @@ -450,14 +450,40 @@ else if (handler == null && !noPreviousHandler) public void close() throws HornetQException { - doCleanUp(true); + doCleanUp(true, false); + } + + /** + * To be used by MDBs + * @param interruptConsumer it will send an interrupt to the thread + * @throws HornetQException + */ + public void interruptHandlers() throws HornetQException + { + closing = true; + + resetLargeMessageController(); + + Thread onThread = onMessageThread; + if (onThread != null) + { + try + { + // just trying to interrupt any ongoing messages + onThread.interrupt(); + } + catch (Throwable ignored) + { + // security exception probably.. we just ignore it, not big deal! + } + } } public void cleanUp() { try { - doCleanUp(false); + doCleanUp(false, false); } catch (HornetQException e) { @@ -1011,6 +1037,8 @@ public Object run() return null; } }); + + onMessageThread = null; } if (ClientConsumerImpl.trace) @@ -1051,7 +1079,7 @@ private void flowControlBeforeConsumption(final ClientMessageInternal message) t } } - private void doCleanUp(final boolean sendCloseMessage) throws HornetQException + private void doCleanUp(final boolean sendCloseMessage, final boolean interruptConsumer) throws HornetQException { try { @@ -1067,6 +1095,23 @@ private void doCleanUp(final boolean sendCloseMessage) throws HornetQException resetLargeMessageController(); + if (interruptConsumer) + { + Thread onThread = receiverThread; + if (onThread != null) + { + try + { + // just trying to interrupt any ongoing messages + onThread.interrupt(); + } + catch (Throwable ignored) + { + // security exception probably.. we just ignore it, not big deal! + } + } + } + // Now we wait for any current handler runners to run. waitForOnMessageToComplete(true); diff --git a/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java b/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java index 086545be6a1..42bffe0b1fb 100644 --- a/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java +++ b/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java @@ -47,6 +47,13 @@ public interface ClientConsumerInternal extends ClientConsumer void flowControl(final int messageBytes, final boolean discountSlowConsumer) throws HornetQException; void clear(boolean waitForOnMessage) throws HornetQException; + + /** + * To be called by things like MDBs during shutdown of the server + * @param interrupt + * @throws HornetQException + */ + void interruptHandlers() throws HornetQException; void clearAtFailover(); diff --git a/src/main/org/hornetq/ra/inflow/HornetQActivation.java b/src/main/org/hornetq/ra/inflow/HornetQActivation.java index e3937a1b11b..bc496582ea1 100644 --- a/src/main/org/hornetq/ra/inflow/HornetQActivation.java +++ b/src/main/org/hornetq/ra/inflow/HornetQActivation.java @@ -342,6 +342,10 @@ protected synchronized void teardown() ra.getRecoveryManager().unRegister(resourceRecovery); } + for (HornetQMessageHandler handler : handlers) + { + handler.interruptConsumer(); + } for (HornetQMessageHandler handler : handlers) { handler.teardown(); diff --git a/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java b/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java index 7902c8bf014..ba5b0a482ea 100644 --- a/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java +++ b/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java @@ -28,6 +28,7 @@ import org.hornetq.api.core.client.ClientMessage; import org.hornetq.api.core.client.ClientSession.QueueQuery; import org.hornetq.api.core.client.MessageHandler; +import org.hornetq.core.client.impl.ClientConsumerInternal; import org.hornetq.core.client.impl.ClientSessionInternal; import org.hornetq.core.logging.Logger; import org.hornetq.jms.client.HornetQDestination; @@ -58,7 +59,7 @@ public class HornetQMessageHandler implements MessageHandler */ private final ClientSessionInternal session; - private ClientConsumer consumer; + private ClientConsumerInternal consumer; /** * The endpoint @@ -151,7 +152,7 @@ public void setup() throws Exception session.createQueue(activation.getAddress(), queueName, selectorString, true); } } - consumer = session.createConsumer(queueName, null, false); + consumer = (ClientConsumerInternal)session.createConsumer(queueName, null, false); } else { @@ -186,7 +187,7 @@ public void setup() throws Exception session.createQueue(activation.getAddress(), queueName, selectorString, true); } } - consumer = session.createConsumer(queueName, selectorString); + consumer = (ClientConsumerInternal)session.createConsumer(queueName, selectorString); } // Create the endpoint, if we are transacted pass the sesion so it is enlisted, unless using Local TX @@ -205,6 +206,21 @@ public void setup() throws Exception } consumer.setMessageHandler(this); } + + public void interruptConsumer() + { + try + { + if (consumer != null) + { + consumer.interruptHandlers(); + } + } + catch (Throwable e) + { + log.warn("Error interrupting handler on endpoint " + endpoint + " handler=" + consumer); + } + } /** * Stop the handler diff --git a/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java b/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java index 741c815a5f2..488eccef58b 100644 --- a/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java +++ b/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java @@ -930,6 +930,13 @@ public ClientSessionInternal getSession() return null; } + /* (non-Javadoc) + * @see org.hornetq.core.client.impl.ClientConsumerInternal#interruptHandlers() + */ + public void interruptHandlers() throws HornetQException + { + } + } }