Permalink
Browse files

Waiting pending ACKs to return before stopping the bridge, also avoid…

…ing IO exceptions and interrupt exceptions
  • Loading branch information...
clebertsuconic committed Nov 12, 2012
1 parent 17a7f95 commit 51bfc29cee60f3b5be50f2801ef87aa8eee663e4
@@ -1247,4 +1247,10 @@
value = "Can't find queue {0} while reloading PAGE_CURSOR_COMPLETE, deleting record now",
format = Message.Format.MESSAGE_FORMAT)
void cantFindQueueOnPageComplete(long queueID);
+
+ @LogMessage(level = Logger.Level.INFO)
+ @Message(id = 224084,
+ value = "Bridge {0} timed out waiting for the completion of {1} messages, we will just shutdown the bridge after 10 seconds wait",
+ format = Message.Format.MESSAGE_FORMAT)
+ void timedOutWaitingCompletions(String bridgeName, long numberOfMessages);
}
@@ -49,6 +49,7 @@
import org.hornetq.core.server.management.NotificationService;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.utils.FutureLatch;
+import org.hornetq.utils.ReusableLatch;
import org.hornetq.utils.TypedProperties;
import org.hornetq.utils.UUID;
@@ -78,6 +79,8 @@
protected final ServerLocatorInternal serverLocator;
+ private final ReusableLatch pendingAcks = new ReusableLatch(0);
+
private final UUID nodeUUID;
private final SimpleString name;
@@ -329,7 +332,6 @@ public void stop() throws Exception
{
HornetQServerLogger.LOGGER.debug("Bridge " + this.name + " being stopped");
}
- cleanUpSessionFactory(csf);
if (futureScheduledReconnection != null)
{
@@ -451,22 +453,26 @@ public void setupRetry(final int currentCount, final int maxRetry)
public void sendAcknowledged(final Message message)
{
- try
+ if (active)
{
- final MessageReference ref = refs.poll();
-
- if (ref != null)
+ try
{
- if (isTrace)
+ final MessageReference ref = refs.poll();
+
+ if (ref != null)
{
- HornetQServerLogger.LOGGER.trace(this + " Acking " + ref + " on queue " + ref.getQueue());
+ if (isTrace)
+ {
+ HornetQServerLogger.LOGGER.trace(this + " Acking " + ref + " on queue " + ref.getQueue());
+ }
+ ref.getQueue().acknowledge(ref);
+ pendingAcks.countDown();
}
- ref.getQueue().acknowledge(ref);
}
- }
- catch (Exception e)
- {
- HornetQServerLogger.LOGGER.bridgeFailedToAck(e);
+ catch (Exception e)
+ {
+ HornetQServerLogger.LOGGER.bridgeFailedToAck(e);
+ }
}
}
@@ -563,15 +569,26 @@ public HandleStatus handle(final MessageReference ref) throws Exception
dest = message.getAddress();
}
- if (message.isLargeMessage())
+ pendingAcks.countUp();
+
+ try
{
- deliveringLargeMessage = true;
- deliverLargeMessage(dest, ref, (LargeServerMessage)message);
- return HandleStatus.HANDLED;
+ if (message.isLargeMessage())
+ {
+ deliveringLargeMessage = true;
+ deliverLargeMessage(dest, ref, (LargeServerMessage)message);
+ return HandleStatus.HANDLED;
+ }
+ else
+ {
+ return deliverStandardMessage(dest, ref, message);
+ }
}
- else
+ catch (Exception e)
{
- return deliverStandardMessage(dest, ref, message);
+ // If an exception happened, we must count down immediately
+ pendingAcks.countDown();
+ throw e;
}
}
}
@@ -831,6 +848,7 @@ protected void connect()
producer = session.createProducer();
session.addFailureListener(BridgeImpl.this);
+
session.setSendAcknowledgementHandler(BridgeImpl.this);
afterConnect();
@@ -952,6 +970,26 @@ public void run()
try
{
HornetQServerLogger.LOGGER.debug("stopping bridge " + BridgeImpl.this);
+ queue.removeConsumer(BridgeImpl.this);
+
+ if (!pendingAcks.await(10, TimeUnit.SECONDS))
+ {
+ HornetQServerLogger.LOGGER.timedOutWaitingCompletions(BridgeImpl.this.toString(),
+ pendingAcks.getCount());
+ }
+
+ synchronized (BridgeImpl.this)
+ {
+ HornetQServerLogger.LOGGER.debug("Closing Session for bridge " + BridgeImpl.this.name);
+
+ started = false;
+
+ active = false;
+
+ }
+
+
+ internalCancelReferences();
if (session != null)
{
@@ -972,20 +1010,6 @@ public void run()
csf.cleanup();
}
- queue.removeConsumer(BridgeImpl.this);
-
- internalCancelReferences();
-
- synchronized (BridgeImpl.this)
- {
- HornetQServerLogger.LOGGER.debug("Closing Session for bridge " + BridgeImpl.this.name);
-
- started = false;
-
- active = false;
-
- }
-
if (isTrace)
{
HornetQServerLogger.LOGGER.trace("Removing consumer on stopRunnable " + this + " from queue " + queue);
@@ -1005,14 +1029,20 @@ public void run()
{
try
{
+ queue.removeConsumer(BridgeImpl.this);
+
+ if (!pendingAcks.await(60, TimeUnit.SECONDS))
+ {
+ HornetQServerLogger.LOGGER.timedOutWaitingCompletions(BridgeImpl.this.toString(),
+ pendingAcks.getCount());
+ }
+
synchronized (BridgeImpl.this)
{
started = false;
active = false;
}
- queue.removeConsumer(BridgeImpl.this);
-
internalCancelReferences();
HornetQServerLogger.LOGGER.bridgePaused(name);
Oops, something went wrong.

0 comments on commit 51bfc29

Please sign in to comment.