Skip to content

Commit

Permalink
RhinoQueuesTransport waits for current message to finish processing b…
Browse files Browse the repository at this point in the history
…efore disposing the queue manager so messages can still be enqueued by consumers.
  • Loading branch information
RyanHauert committed Sep 14, 2012
1 parent 4ee8754 commit 74fa750
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 2 deletions.
11 changes: 9 additions & 2 deletions Rhino.ServiceBus.RhinoQueues/RhinoQueues/RhinoQueuesTransport.cs
Expand Up @@ -34,6 +34,7 @@ public class RhinoQueuesTransport : ITransport
private QueueManager queueManager;
private readonly Thread[] threads;
private readonly string queueName;
private volatile int currentlyProccessingCount;
private volatile bool shouldContinue;
private bool haveStarted;
private readonly IsolationLevel queueIsolationLevel;
Expand Down Expand Up @@ -84,17 +85,19 @@ public void Dispose()
shouldContinue = false;
logger.DebugFormat("Stopping transport for {0}", endpoint);

while (currentlyProccessingCount > 0)
Thread.Sleep(TimeSpan.FromSeconds(1));

if (timeout != null)
timeout.Dispose();
DisposeQueueManager();

if (!haveStarted)
return;

foreach (var thread in threads)
{
thread.Join();
}
}
}

private void DisposeQueueManager()
Expand Down Expand Up @@ -364,6 +367,8 @@ private void ProcessMessage(Message message, TransactionScope tx, Func<CurrentMe
try
{
//deserialization errors do not count for module events
#pragma warning disable 420
Interlocked.Increment(ref currentlyProccessingCount);
object[] messages = DeserializeMessages(message);
try
{
Expand Down Expand Up @@ -404,6 +409,8 @@ private void ProcessMessage(Message message, TransactionScope tx, Func<CurrentMe
MessageProcessingFailure, currentMessageInformation);
messageHandlingCompletion.HandleMessageCompletion();
currentMessageInformation = null;
Interlocked.Decrement(ref currentlyProccessingCount);
#pragma warning restore 420
}
}

Expand Down
Expand Up @@ -108,6 +108,7 @@ public void TimeToReachQueue_is_set_when_MaxAttempts_is_one()
{
using (var container = new WindsorContainer())
{
MaxAttemptCustomizer.MaxAttempts = 1;
container.Register(Component.For<ICustomizeOutgoingMessages>()
.ImplementedBy<MaxAttemptCustomizer>()
.LifeStyle.Is(LifestyleType.Transient));
Expand Down

0 comments on commit 74fa750

Please sign in to comment.