Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Reverted to a better solution for handling serialization errors when …

…dequeueing in Rhino Queues.
  • Loading branch information...
commit 23227543c8ada98b85ab4b925ecca837d9c29155 1 parent b8bcb01
@CoreyKaylor CoreyKaylor authored
View
2  Rhino.ServiceBus.Tests/Rhino.ServiceBus.Tests.csproj
@@ -144,7 +144,7 @@
<Compile Include="RhinoQueues\PhtSubscriptionStorageFixture.cs" />
<Compile Include="RhinoQueues\UsingRhinoQueuesBus.cs" />
<Compile Include="RhinoQueues\UsingRhinoQueuesTransport.cs" />
- <Compile Include="RhinoQueues\WhenSerializationErrorOccurs.cs" />
+ <Compile Include="RhinoQueues\WhenErrorOccurs.cs" />
<Compile Include="RhinoQueues\WithDebugging.cs" />
<Compile Include="SagaTests.cs" />
<Compile Include="TestExtensions.cs" />
View
5 Rhino.ServiceBus.Tests/Rhino.ServiceBus.Tests.csproj.user
@@ -1,5 +1,6 @@
-<Project xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+<?xml version="1.0" encoding="utf-8"?>
+<Project xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
- <ProjectView>ShowAllFiles</ProjectView>
+ <ProjectView>ProjectFiles</ProjectView>
</PropertyGroup>
</Project>
View
113 Rhino.ServiceBus.Tests/RhinoQueues/WhenErrorOccurs.cs
@@ -0,0 +1,113 @@
+using System;
+using System.IO;
+using System.Threading;
+using System.Transactions;
+using Castle.MicroKernel;
+using Rhino.ServiceBus.Impl;
+using Rhino.ServiceBus.Internal;
+using Rhino.ServiceBus.RhinoQueues;
+using Rhino.ServiceBus.Serializers;
+using Xunit;
+
+namespace Rhino.ServiceBus.Tests.RhinoQueues
+{
+ public class WhenErrorOccurs : IDisposable
+ {
+ private RhinoQueuesTransport transport;
+ private readonly ManualResetEvent wait = new ManualResetEvent(false);
+ private IMessageSerializer messageSerializer;
+
+ public int FailedCount;
+
+ public WhenErrorOccurs()
+ {
+ if (Directory.Exists("test.esent"))
+ Directory.Delete("test.esent", true);
+ }
+
+ [Fact]
+ public void Deserialization_Error_Will_Not_Retry()
+ {
+ messageSerializer = new ThrowingSerializer(new XmlMessageSerializer(new DefaultReflection(), new DefaultKernel()));
+ transport = new RhinoQueuesTransport(
+ new Uri("rhino.queues://localhost:23456/q"),
+ new EndpointRouter(),
+ messageSerializer,
+ 1,
+ "test.esent",
+ IsolationLevel.Serializable,
+ 5,
+ new RhinoQueuesMessageBuilder(messageSerializer)
+ );
+ transport.Start();
+ var count = 0;
+ transport.MessageProcessingFailure += (messageInfo, ex) =>
+ {
+ count++;
+ };
+ transport.Send(transport.Endpoint, new object[] { "test" });
+
+ wait.WaitOne(TimeSpan.FromSeconds(5));
+
+ Assert.Equal(1, count);
+ }
+
+ [Fact]
+ public void Arrived_Error_Will_Retry_Number_Of_Times_Configured()
+ {
+ messageSerializer = new XmlMessageSerializer(new DefaultReflection(), new DefaultKernel());
+ transport = new RhinoQueuesTransport(
+ new Uri("rhino.queues://localhost:23456/q"),
+ new EndpointRouter(),
+ messageSerializer,
+ 1,
+ "test.esent",
+ IsolationLevel.Serializable,
+ 5,
+ new RhinoQueuesMessageBuilder(messageSerializer)
+ );
+ transport.Start();
+ var count = 0;
+ transport.MessageArrived += info =>
+ {
+ throw new InvalidOperationException();
+ };
+ transport.MessageProcessingFailure += (messageInfo, ex) =>
+ {
+ count++;
+ };
+ transport.Send(transport.Endpoint, new object[] { "test" });
+
+ wait.WaitOne(TimeSpan.FromSeconds(5));
+
+ Assert.Equal(5, count);
+ }
+
+ public void Dispose()
+ {
+ transport.Dispose();
+ }
+ }
+
+ public class ThrowingSerializer : IMessageSerializer
+ {
+ private readonly XmlMessageSerializer serializer;
+
+ public ThrowingSerializer(XmlMessageSerializer serializer)
+ {
+ this.serializer = serializer;
+ }
+
+ public void Serialize(object[] messages, Stream message)
+ {
+ serializer.Serialize(messages, message);
+ }
+
+ public object[] Deserialize(Stream message)
+ {
+ throw new NotImplementedException();
+ }
+ }
+
+
+}
View
84 Rhino.ServiceBus.Tests/RhinoQueues/WhenSerializationErrorOccurs.cs
@@ -1,84 +0,0 @@
-using System;
-using System.IO;
-using System.Threading;
-using System.Transactions;
-using Castle.MicroKernel;
-using Rhino.ServiceBus.Impl;
-using Rhino.ServiceBus.Internal;
-using Rhino.ServiceBus.MessageModules;
-using Rhino.ServiceBus.RhinoQueues;
-using Rhino.ServiceBus.Serializers;
-using Xunit;
-
-namespace Rhino.ServiceBus.Tests.RhinoQueues
-{
- public class WhenSerializationErrorOccurs : IDisposable
- {
- private readonly RhinoQueuesTransport transport;
- private readonly ManualResetEvent wait = new ManualResetEvent(false);
- private readonly IMessageSerializer messageSerializer;
-
- public int FailedCount;
-
- public WhenSerializationErrorOccurs()
- {
- if (Directory.Exists("test.esent"))
- Directory.Delete("test.esent", true);
-
- messageSerializer = new ThrowingSerializer(new XmlMessageSerializer(new DefaultReflection(), new DefaultKernel()));
- transport = new RhinoQueuesTransport(
- new Uri("rhino.queues://localhost:23456/q"),
- new EndpointRouter(),
- messageSerializer,
- 1,
- "test.esent",
- IsolationLevel.Serializable,
- 5,
- new RhinoQueuesMessageBuilder(messageSerializer)
- );
- transport.Start();
- }
-
- [Fact]
- public void Will_Retry_Number_Of_Times_Configured()
- {
- var count = 0;
- transport.MessageProcessingFailure += (messageInfo, ex) =>
- {
- count++;
- };
- transport.Send(transport.Endpoint, new object[] { "test" });
-
- wait.WaitOne(TimeSpan.FromSeconds(5));
-
- Assert.Equal(5, count);
- }
-
- public void Dispose()
- {
- transport.Dispose();
- }
- }
-
- public class ThrowingSerializer : IMessageSerializer
- {
- private readonly XmlMessageSerializer serializer;
-
- public ThrowingSerializer(XmlMessageSerializer serializer)
- {
- this.serializer = serializer;
- }
-
- public void Serialize(object[] messages, Stream message)
- {
- serializer.Serialize(messages, message);
- }
-
- public object[] Deserialize(Stream message)
- {
- throw new NotImplementedException();
- }
- }
-
-
-}
View
76 Rhino.ServiceBus/RhinoQueues/ErrorAction.cs
@@ -1,5 +1,6 @@
using System;
using System.Text;
+using System.Transactions;
using Rhino.Queues;
using Rhino.Queues.Model;
using Rhino.ServiceBus.DataStructures;
@@ -29,47 +30,58 @@ public void Init(ITransport transport)
private bool Transport_OnMessageArrived(CurrentMessageInformation information)
{
- var info = (RhinoQueueCurrentMessageInformation) information;
- ErrorCounter val = null;
- failureCounts.Read(reader => reader.TryGetValue(info.TransportMessageId, out val));
- if (val != null)
- val.AtLeastOneMessageWasReceived = true;
- return MoveToErrorSubqueueIfReachedMaximumRetry(info, val);
- }
+ var info = (RhinoQueueCurrentMessageInformation)information;
+ ErrorCounter val = null;
+ failureCounts.Read(reader => reader.TryGetValue(info.TransportMessageId, out val));
+ if (val == null || val.FailureCount < numberOfRetries)
+ return false;
- private bool MoveToErrorSubqueueIfReachedMaximumRetry(RhinoQueueCurrentMessageInformation info, ErrorCounter errorCounter)
- {
- if(errorCounter == null || errorCounter.FailureCount < numberOfRetries)
- return false;
+ var result = false;
+ failureCounts.Write(writer =>
+ {
+ if (writer.TryGetValue(info.TransportMessageId, out val) == false)
+ return;
- failureCounts.Write(writer =>
- {
- if (writer.TryGetValue(info.TransportMessageId, out errorCounter) == false)
- return;
+ info.Queue.MoveTo(SubQueue.Errors.ToString(), info.TransportMessage);
+ info.Queue.EnqueueDirectlyTo(SubQueue.Errors.ToString(), new MessagePayload
+ {
+ Data = val.ExceptionText == null ? null : Encoding.Unicode.GetBytes(val.ExceptionText),
+ Headers =
+ {
+ {"correlation-id", info.TransportMessageId},
+ {"retries", val.FailureCount.ToString()}
+ }
+ });
- info.Queue.MoveTo(SubQueue.Errors.ToString(), info.TransportMessage);
- info.Queue.EnqueueDirectlyTo(SubQueue.Errors.ToString(), new MessagePayload
- {
- Data = errorCounter.ExceptionText == null ? null : Encoding.Unicode.GetBytes(errorCounter.ExceptionText),
- Headers =
- {
- {"correlation-id", info.TransportMessageId},
- {"retries", errorCounter.FailureCount.ToString()}
- }
- });
+ result = true;
+ });
- });
- return true;
- }
+ return result;
+ }
- private void Transport_OnMessageSerializationException(CurrentMessageInformation information, Exception exception)
+ private void Transport_OnMessageSerializationException(CurrentMessageInformation information, Exception exception)
{
- var info = (RhinoQueueCurrentMessageInformation) information;
+ var info = (RhinoQueueCurrentMessageInformation)information;
failureCounts.Write(writer => writer.Add(info.TransportMessageId, new ErrorCounter
{
ExceptionText = exception == null ? null : exception.ToString(),
FailureCount = numberOfRetries + 1
}));
+
+ using (var tx = new TransactionScope(TransactionScopeOption.RequiresNew))
+ {
+ info.Queue.MoveTo(SubQueue.Errors.ToString(), info.TransportMessage);
+ info.Queue.EnqueueDirectlyTo(SubQueue.Errors.ToString(), new MessagePayload
+ {
+ Data = exception == null ? null : Encoding.Unicode.GetBytes(exception.ToString()),
+ Headers =
+ {
+ {"correlation-id", info.TransportMessageId},
+ {"retries", "1"}
+ }
+ });
+ tx.Complete();
+ }
}
private void Transport_OnMessageProcessingCompleted(CurrentMessageInformation information, Exception ex)
@@ -98,9 +110,6 @@ private void Transport_OnMessageProcessingFailure(CurrentMessageInformation info
};
writer.Add(information.TransportMessageId, errorCounter);
}
-
- if (errorCounter.AtLeastOneMessageWasReceived == false)
- MoveToErrorSubqueueIfReachedMaximumRetry((RhinoQueueCurrentMessageInformation) information, errorCounter);
errorCounter.FailureCount += 1;
});
}
@@ -109,7 +118,6 @@ private class ErrorCounter
{
public string ExceptionText;
public int FailureCount;
- public bool AtLeastOneMessageWasReceived;
}
}
View
843 Rhino.ServiceBus/RhinoQueues/RhinoQueuesTransport.cs
@@ -1,6 +1,5 @@
using System;
using System.Collections.Specialized;
-using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.Net;
@@ -13,7 +12,6 @@
using Rhino.Queues.Model;
using Rhino.ServiceBus.Impl;
using Rhino.ServiceBus.Internal;
-using Rhino.ServiceBus.Messages;
using Rhino.ServiceBus.Transport;
using Rhino.ServiceBus.Util;
using Transaction = System.Transactions.Transaction;
@@ -21,386 +19,383 @@
namespace Rhino.ServiceBus.RhinoQueues
{
[CLSCompliant(false)]
- public class RhinoQueuesTransport : ITransport
- {
- private readonly Uri endpoint;
- private readonly IEndpointRouter endpointRouter;
- private readonly IMessageSerializer messageSerializer;
- private readonly int threadCount;
- private readonly string path;
- private QueueManager queueManager;
- private readonly Thread[] threads;
- private readonly string queueName;
- private volatile bool shouldContinue;
- private bool haveStarted;
- private readonly IsolationLevel queueIsolationLevel;
- private readonly int numberOfRetries;
- private readonly IMessageBuilder<MessagePayload> messageBuilder;
-
- [ThreadStatic]
- private static RhinoQueueCurrentMessageInformation currentMessageInformation;
-
- private readonly ILog logger = LogManager.GetLogger(typeof(RhinoQueuesTransport));
- private TimeoutAction timeout;
- private IQueue queue;
-
-
- public RhinoQueuesTransport(Uri endpoint,
- IEndpointRouter endpointRouter,
- IMessageSerializer messageSerializer,
- int threadCount,
- string path,
- IsolationLevel queueIsolationLevel,
- int numberOfRetries,
+ public class RhinoQueuesTransport : ITransport
+ {
+ private readonly Uri endpoint;
+ private readonly IEndpointRouter endpointRouter;
+ private readonly IMessageSerializer messageSerializer;
+ private readonly int threadCount;
+ private readonly string path;
+ private QueueManager queueManager;
+ private readonly Thread[] threads;
+ private readonly string queueName;
+ private volatile bool shouldContinue;
+ private bool haveStarted;
+ private readonly IsolationLevel queueIsolationLevel;
+ private readonly int numberOfRetries;
+ private readonly IMessageBuilder<MessagePayload> messageBuilder;
+
+ [ThreadStatic]
+ private static RhinoQueueCurrentMessageInformation currentMessageInformation;
+
+ private readonly ILog logger = LogManager.GetLogger(typeof(RhinoQueuesTransport));
+ private TimeoutAction timeout;
+ private IQueue queue;
+
+
+ public RhinoQueuesTransport(Uri endpoint,
+ IEndpointRouter endpointRouter,
+ IMessageSerializer messageSerializer,
+ int threadCount,
+ string path,
+ IsolationLevel queueIsolationLevel,
+ int numberOfRetries,
IMessageBuilder<MessagePayload> messageBuilder)
- {
- this.endpoint = endpoint;
- this.queueIsolationLevel = queueIsolationLevel;
- this.numberOfRetries = numberOfRetries;
- this.messageBuilder = messageBuilder;
- this.endpointRouter = endpointRouter;
- this.messageSerializer = messageSerializer;
- this.threadCount = threadCount;
- this.path = path;
-
- queueName = endpoint.GetQueueName();
-
- threads = new Thread[threadCount];
-
- // This has to be the first subscriber to the transport events
- // in order to successfuly handle the errors semantics
- new ErrorAction(numberOfRetries).Init(this);
+ {
+ this.endpoint = endpoint;
+ this.queueIsolationLevel = queueIsolationLevel;
+ this.numberOfRetries = numberOfRetries;
+ this.messageBuilder = messageBuilder;
+ this.endpointRouter = endpointRouter;
+ this.messageSerializer = messageSerializer;
+ this.threadCount = threadCount;
+ this.path = path;
+
+ queueName = endpoint.GetQueueName();
+
+ threads = new Thread[threadCount];
+
+ // This has to be the first subscriber to the transport events
+ // in order to successfuly handle the errors semantics
+ new ErrorAction(numberOfRetries).Init(this);
messageBuilder.Initialize(this.Endpoint);
- }
-
- public void Dispose()
- {
- shouldContinue = false;
- logger.DebugFormat("Stopping transport for {0}", endpoint);
-
- if (timeout != null)
- timeout.Dispose();
- DisposeQueueManager();
-
- if (!haveStarted)
- return;
-
- foreach (var thread in threads)
- {
- thread.Join();
- }
- }
-
- private void DisposeQueueManager()
- {
- if (queueManager != null)
- {
- const int retries = 5;
- int tries = 0;
- bool disposeRudely = false;
- while (true)
- {
- try
- {
- queueManager.Dispose();
- break;
- }
- catch (EsentErrorException e)
- {
- tries += 1;
- if (tries > retries)
- {
- disposeRudely = true;
- break;
- }
- if (e.Error != JET_err.TooManyActiveUsers)
- throw;
- // let the other threads a chance to complete their work
- Thread.Sleep(50);
- }
- }
- if (disposeRudely)
- queueManager.DisposeRudely();
- }
- }
-
- [CLSCompliant(false)]
- public IQueue Queue
- {
- get { return queue; }
- }
-
- public void Start()
- {
+ }
+
+ public void Dispose()
+ {
+ shouldContinue = false;
+ logger.DebugFormat("Stopping transport for {0}", endpoint);
+
+ if (timeout != null)
+ timeout.Dispose();
+ DisposeQueueManager();
+
+ if (!haveStarted)
+ return;
+
+ foreach (var thread in threads)
+ {
+ thread.Join();
+ }
+ }
+
+ private void DisposeQueueManager()
+ {
+ if (queueManager != null)
+ {
+ const int retries = 5;
+ int tries = 0;
+ bool disposeRudely = false;
+ while (true)
+ {
+ try
+ {
+ queueManager.Dispose();
+ break;
+ }
+ catch (EsentErrorException e)
+ {
+ tries += 1;
+ if (tries > retries)
+ {
+ disposeRudely = true;
+ break;
+ }
+ if (e.Error != JET_err.TooManyActiveUsers)
+ throw;
+ // let the other threads a chance to complete their work
+ Thread.Sleep(50);
+ }
+ }
+ if (disposeRudely)
+ queueManager.DisposeRudely();
+ }
+ }
+
+ [CLSCompliant(false)]
+ public IQueue Queue
+ {
+ get { return queue; }
+ }
+
+ public void Start()
+ {
if (haveStarted)
return;
- shouldContinue = true;
-
- var port = endpoint.Port;
- if (port == -1)
- port = 2200;
- queueManager = new QueueManager(new IPEndPoint(IPAddress.Any, port), path);
- queueManager.CreateQueues(queueName);
-
- queue = queueManager.GetQueue(queueName);
-
- timeout = new TimeoutAction(queue);
- logger.DebugFormat("Starting {0} threads to handle messages on {1}, number of retries: {2}",
- threadCount, endpoint, numberOfRetries);
- for (var i = 0; i < threadCount; i++)
- {
- threads[i] = new Thread(ReceiveMessage)
- {
- Name = "Rhino Service Bus Worker Thread #" + i,
- IsBackground = true
- };
- threads[i].Start(i);
- }
- haveStarted = true;
- var started = Started;
- if (started != null)
- started();
- }
-
- private void ReceiveMessage(object context)
- {
- while (shouldContinue)
- {
- try
- {
- queueManager.Peek(queueName);
- }
- catch (TimeoutException)
- {
- logger.DebugFormat("Could not find a message on {0} during the timeout period",
- endpoint);
- continue;
- }
- catch (ObjectDisposedException)
- {
- logger.DebugFormat("Shutting down the transport for {0} thread {1}", endpoint, context);
- return;
- }
- catch (EsentErrorException e)
- {
- // we were shut down, so just return
- if (e.Error == JET_err.TermInProgress)
- {
- logger.DebugFormat("Shutting down the transport for {0} thread {1}", endpoint, context);
- }
- else
- {
- logger.Error("An error occured while recieving a message, shutting down message processing thread", e);
- }
- return;
- }
-
- if (shouldContinue == false)
- return;
-
- var transactionOptions = GetTransactionOptions();
- using (var tx = new TransactionScope(TransactionScopeOption.Required, transactionOptions))
- {
- Message message;
- try
- {
- message = queueManager.Receive(queueName, TimeSpan.FromSeconds(1));
- }
- catch (TimeoutException)
- {
- logger.DebugFormat("Could not find a message on {0} during the timeout period",
- endpoint);
- continue;
- }
- catch (EsentErrorException e)
- {
- // we were shut down, so just return
- if (e.Error == JET_err.TermInProgress)
- return;
- logger.Error("An error occured while recieving a message, shutting down message processing thread", e);
- return;
- }
- catch (Exception e)
- {
- logger.Error("An error occured while recieving a message, shutting down message processing thread", e);
- return;
- }
-
- try
- {
- var msgType = (MessageType)Enum.Parse(typeof(MessageType), message.Headers["type"]);
- logger.DebugFormat("Starting to handle message {0} of type {1} on {2}",
- message.Id,
- msgType,
- endpoint);
- switch (msgType)
- {
- case MessageType.AdministrativeMessageMarker:
- ProcessMessage(message, tx,
- AdministrativeMessageArrived,
- AdministrativeMessageProcessingCompleted,
- null);
- break;
- case MessageType.ShutDownMessageMarker:
- //ignoring this one
- tx.Complete();
- break;
- case MessageType.TimeoutMessageMarker:
- var timeToSend = XmlConvert.ToDateTime(message.Headers["time-to-send"], XmlDateTimeSerializationMode.Utc);
- if (timeToSend > DateTime.Now)
- {
- timeout.Register(message);
- queue.MoveTo(SubQueue.Timeout.ToString(), message);
- tx.Complete();
- }
- else
- {
- ProcessMessage(message, tx,
- MessageArrived,
- MessageProcessingCompleted,
- null);
- }
- break;
- default:
- ProcessMessage(message, tx,
- MessageArrived,
- MessageProcessingCompleted,
- BeforeMessageTransactionCommit);
- break;
- }
- }
- catch (Exception exception)
- {
- logger.Debug("Could not process message", exception);
- }
- }
-
- }
- }
-
- private void ProcessMessage(
- Message message,
- TransactionScope tx,
- Func<CurrentMessageInformation, bool> messageRecieved,
- Action<CurrentMessageInformation, Exception> messageCompleted,
- Action<CurrentMessageInformation> beforeTransactionCommit)
- {
- Exception ex = null;
- try
- {
- currentMessageInformation = new RhinoQueueCurrentMessageInformation
- {
- TransportMessageId = message.Id.ToString(),
- TransportMessage = message,
- Queue = queue,
- };
- //deserialization errors do not count for module events
- object[] messages = DeserializeMessages(message);
- try
- {
- var messageId = new Guid(message.Headers["id"]);
- var source = new Uri(message.Headers["source"]);
- foreach (var msg in messages)
- {
- currentMessageInformation = new RhinoQueueCurrentMessageInformation
- {
- AllMessages = messages,
- Message = msg,
- Destination = endpoint,
- MessageId = messageId,
- Source = source,
- TransportMessageId = message.Id.ToString(),
- Queue = queue,
- TransportMessage = message
- };
-
- if (TransportUtil.ProcessSingleMessage(currentMessageInformation, messageRecieved) == false)
- Discard(currentMessageInformation.Message);
- }
- }
- catch (Exception e)
- {
- ex = e;
- logger.Error("Failed to process message", e);
- }
- }
- catch (Exception e)
- {
- ex = e;
- logger.Error("Failed to deserialize message", e);
- }
- finally
- {
- var messageHandlingCompletion = new MessageHandlingCompletion(tx, null, ex, messageCompleted, beforeTransactionCommit, logger,
- MessageProcessingFailure, currentMessageInformation);
- messageHandlingCompletion.HandleMessageCompletion();
- currentMessageInformation = null;
- }
- }
-
- private void Discard(object message)
- {
- logger.DebugFormat("Discarding message {0} ({1}) because there are no consumers for it.",
- message, currentMessageInformation.TransportMessageId);
- Send(new Endpoint { Uri = endpoint.AddSubQueue(SubQueue.Discarded) }, new[] { message });
- }
-
- private object[] DeserializeMessages(Message message)
- {
- try
- {
- return messageSerializer.Deserialize(new MemoryStream(message.Data));
- }
- catch (Exception e)
- {
- try
- {
- logger.Error("Error when serializing message", e);
- var serializationError = MessageSerializationException;
- if (serializationError != null)
- {
- var information = new RhinoQueueCurrentMessageInformation
- {
- Message = message,
- Source = new Uri(message.Headers["from"]),
- MessageId = new Guid(message.Headers["id"])
- };
- serializationError(information, e);
- }
- }
- catch (Exception moduleEx)
- {
- logger.Error("Error when notifying about serialization exception", moduleEx);
- }
- throw;
- }
- }
-
- public Endpoint Endpoint
- {
- get { return endpointRouter.GetRoutedEndpoint(endpoint); }
- }
-
- public int ThreadCount
- {
- get { return threadCount; }
- }
-
- public CurrentMessageInformation CurrentMessageInformation
- {
- get { return currentMessageInformation; }
- }
-
- public void Send(Endpoint destination, object[] msgs)
- {
- SendInternal(msgs, destination, nv => { });
- }
-
- private void SendInternal(object[] msgs, Endpoint destination, Action<NameValueCollection> customizeHeaders)
- {
- var messageId = Guid.NewGuid();
- var payload = messageBuilder.BuildFromMessageBatch(msgs);
+ shouldContinue = true;
+
+ var port = endpoint.Port;
+ if (port == -1)
+ port = 2200;
+ queueManager = new QueueManager(new IPEndPoint(IPAddress.Any, port), path);
+ queueManager.CreateQueues(queueName);
+
+ queue = queueManager.GetQueue(queueName);
+
+ timeout = new TimeoutAction(queue);
+ logger.DebugFormat("Starting {0} threads to handle messages on {1}, number of retries: {2}",
+ threadCount, endpoint, numberOfRetries);
+ for (var i = 0; i < threadCount; i++)
+ {
+ threads[i] = new Thread(ReceiveMessage)
+ {
+ Name = "Rhino Service Bus Worker Thread #" + i,
+ IsBackground = true
+ };
+ threads[i].Start(i);
+ }
+ haveStarted = true;
+ var started = Started;
+ if (started != null)
+ started();
+ }
+
+ private void ReceiveMessage(object context)
+ {
+ while (shouldContinue)
+ {
+ try
+ {
+ queueManager.Peek(queueName);
+ }
+ catch (TimeoutException)
+ {
+ logger.DebugFormat("Could not find a message on {0} during the timeout period",
+ endpoint);
+ continue;
+ }
+ catch (ObjectDisposedException)
+ {
+ logger.DebugFormat("Shutting down the transport for {0} thread {1}", endpoint, context);
+ return;
+ }
+ catch (EsentErrorException e)
+ {
+ // we were shut down, so just return
+ if (e.Error == JET_err.TermInProgress)
+ {
+ logger.DebugFormat("Shutting down the transport for {0} thread {1}", endpoint, context);
+ }
+ else
+ {
+ logger.Error("An error occured while recieving a message, shutting down message processing thread", e);
+ }
+ return;
+ }
+
+ if (shouldContinue == false)
+ return;
+
+ var transactionOptions = GetTransactionOptions();
+ using (var tx = new TransactionScope(TransactionScopeOption.Required, transactionOptions))
+ {
+ Message message;
+ try
+ {
+ message = queueManager.Receive(queueName, TimeSpan.FromSeconds(1));
+ }
+ catch (TimeoutException)
+ {
+ logger.DebugFormat("Could not find a message on {0} during the timeout period",
+ endpoint);
+ continue;
+ }
+ catch (EsentErrorException e)
+ {
+ // we were shut down, so just return
+ if (e.Error == JET_err.TermInProgress)
+ return;
+ logger.Error("An error occured while recieving a message, shutting down message processing thread", e);
+ return;
+ }
+ catch (Exception e)
+ {
+ logger.Error("An error occured while recieving a message, shutting down message processing thread", e);
+ return;
+ }
+
+ try
+ {
+ var msgType = (MessageType)Enum.Parse(typeof(MessageType), message.Headers["type"]);
+ logger.DebugFormat("Starting to handle message {0} of type {1} on {2}",
+ message.Id,
+ msgType,
+ endpoint);
+ switch (msgType)
+ {
+ case MessageType.AdministrativeMessageMarker:
+ ProcessMessage(message, tx,
+ AdministrativeMessageArrived,
+ AdministrativeMessageProcessingCompleted,
+ null);
+ break;
+ case MessageType.ShutDownMessageMarker:
+ //ignoring this one
+ tx.Complete();
+ break;
+ case MessageType.TimeoutMessageMarker:
+ var timeToSend = XmlConvert.ToDateTime(message.Headers["time-to-send"], XmlDateTimeSerializationMode.Utc);
+ if (timeToSend > DateTime.Now)
+ {
+ timeout.Register(message);
+ queue.MoveTo(SubQueue.Timeout.ToString(), message);
+ tx.Complete();
+ }
+ else
+ {
+ ProcessMessage(message, tx,
+ MessageArrived,
+ MessageProcessingCompleted,
+ null);
+ }
+ break;
+ default:
+ ProcessMessage(message, tx,
+ MessageArrived,
+ MessageProcessingCompleted,
+ BeforeMessageTransactionCommit);
+ break;
+ }
+ }
+ catch (Exception exception)
+ {
+ logger.Debug("Could not process message", exception);
+ }
+ }
+
+ }
+ }
+
+ private void ProcessMessage(
+ Message message,
+ TransactionScope tx,
+ Func<CurrentMessageInformation, bool> messageRecieved,
+ Action<CurrentMessageInformation, Exception> messageCompleted,
+ Action<CurrentMessageInformation> beforeTransactionCommit)
+ {
+ Exception ex = null;
+ try
+ {
+ //deserialization errors do not count for module events
+ object[] messages = DeserializeMessages(message);
+ try
+ {
+ var messageId = new Guid(message.Headers["id"]);
+ var source = new Uri(message.Headers["source"]);
+ foreach (var msg in messages)
+ {
+ currentMessageInformation = new RhinoQueueCurrentMessageInformation
+ {
+ AllMessages = messages,
+ Message = msg,
+ Destination = endpoint,
+ MessageId = messageId,
+ Source = source,
+ TransportMessageId = message.Id.ToString(),
+ Queue = queue,
+ TransportMessage = message
+ };
+
+ if (TransportUtil.ProcessSingleMessage(currentMessageInformation, messageRecieved) == false)
+ Discard(currentMessageInformation.Message);
+ }
+ }
+ catch (Exception e)
+ {
+ ex = e;
+ logger.Error("Failed to process message", e);
+ }
+ }
+ catch (Exception e)
+ {
+ ex = e;
+ logger.Error("Failed to deserialize message", e);
+ }
+ finally
+ {
+ var messageHandlingCompletion = new MessageHandlingCompletion(tx, null, ex, messageCompleted, beforeTransactionCommit, logger,
+ MessageProcessingFailure, currentMessageInformation);
+ messageHandlingCompletion.HandleMessageCompletion();
+ currentMessageInformation = null;
+ }
+ }
+
+ private void Discard(object message)
+ {
+ logger.DebugFormat("Discarding message {0} ({1}) because there are no consumers for it.",
+ message, currentMessageInformation.TransportMessageId);
+ Send(new Endpoint { Uri = endpoint.AddSubQueue(SubQueue.Discarded) }, new[] { message });
+ }
+
+ private object[] DeserializeMessages(Message message)
+ {
+ try
+ {
+ return messageSerializer.Deserialize(new MemoryStream(message.Data));
+ }
+ catch (Exception e)
+ {
+ try
+ {
+ logger.Error("Error when serializing message", e);
+ var serializationError = MessageSerializationException;
+ if (serializationError != null)
+ {
+ currentMessageInformation = new RhinoQueueCurrentMessageInformation
+ {
+ Message = message,
+ Source = new Uri(message.Headers["source"]),
+ MessageId = new Guid(message.Headers["id"]),
+ TransportMessageId = message.Id.ToString(),
+ TransportMessage = message,
+ Queue = queue,
+ };
+ serializationError(currentMessageInformation, e);
+ }
+ }
+ catch (Exception moduleEx)
+ {
+ logger.Error("Error when notifying about serialization exception", moduleEx);
+ }
+ throw;
+ }
+ }
+
+ public Endpoint Endpoint
+ {
+ get { return endpointRouter.GetRoutedEndpoint(endpoint); }
+ }
+
+ public int ThreadCount
+ {
+ get { return threadCount; }
+ }
+
+ public CurrentMessageInformation CurrentMessageInformation
+ {
+ get { return currentMessageInformation; }
+ }
+
+ public void Send(Endpoint destination, object[] msgs)
+ {
+ SendInternal(msgs, destination, nv => { });
+ }
+
+ private void SendInternal(object[] msgs, Endpoint destination, Action<NameValueCollection> customizeHeaders)
+ {
+ var messageId = Guid.NewGuid();
+ var payload = messageBuilder.BuildFromMessageBatch(msgs);
logger.DebugFormat("Sending a message with id '{0}' to '{1}'", messageId, destination.Uri);
customizeHeaders(payload.Headers);
var transactionOptions = GetTransactionOptions();
@@ -410,51 +405,51 @@ private void SendInternal(object[] msgs, Endpoint destination, Action<NameValueC
tx.Complete();
}
- var copy = MessageSent;
- if (copy == null)
- return;
-
- copy(new RhinoQueueCurrentMessageInformation
- {
- AllMessages = msgs,
- Source = Endpoint.Uri,
- Destination = destination.Uri,
- MessageId = messageId,
- });
- }
-
- private TransactionOptions GetTransactionOptions()
- {
- return new TransactionOptions
- {
- IsolationLevel = Transaction.Current == null ? queueIsolationLevel : Transaction.Current.IsolationLevel,
- Timeout = TransportUtil.GetTransactionTimeout(),
- };
- }
-
- public void Send(Endpoint endpoint, DateTime processAgainAt, object[] msgs)
- {
- SendInternal(msgs, endpoint,
- nv =>
- {
- nv["time-to-send"] = processAgainAt.ToString("yyyy-MM-ddTHH:mm:ss.fffffff", CultureInfo.InvariantCulture);
- nv["type"] = MessageType.TimeoutMessageMarker.ToString();
- });
- }
-
- public void Reply(params object[] messages)
- {
- Send(new Endpoint { Uri = currentMessageInformation.Source }, messages);
- }
-
- public event Action<CurrentMessageInformation> MessageSent;
- public event Func<CurrentMessageInformation, bool> AdministrativeMessageArrived;
- public event Func<CurrentMessageInformation, bool> MessageArrived;
- public event Action<CurrentMessageInformation, Exception> MessageSerializationException;
- public event Action<CurrentMessageInformation, Exception> MessageProcessingFailure;
- public event Action<CurrentMessageInformation, Exception> MessageProcessingCompleted;
- public event Action<CurrentMessageInformation> BeforeMessageTransactionCommit;
- public event Action<CurrentMessageInformation, Exception> AdministrativeMessageProcessingCompleted;
- public event Action Started;
- }
+ var copy = MessageSent;
+ if (copy == null)
+ return;
+
+ copy(new RhinoQueueCurrentMessageInformation
+ {
+ AllMessages = msgs,
+ Source = Endpoint.Uri,
+ Destination = destination.Uri,
+ MessageId = messageId,
+ });
+ }
+
+ private TransactionOptions GetTransactionOptions()
+ {
+ return new TransactionOptions
+ {
+ IsolationLevel = Transaction.Current == null ? queueIsolationLevel : Transaction.Current.IsolationLevel,
+ Timeout = TransportUtil.GetTransactionTimeout(),
+ };
+ }
+
+ public void Send(Endpoint endpoint, DateTime processAgainAt, object[] msgs)
+ {
+ SendInternal(msgs, endpoint,
+ nv =>
+ {
+ nv["time-to-send"] = processAgainAt.ToString("yyyy-MM-ddTHH:mm:ss.fffffff", CultureInfo.InvariantCulture);
+ nv["type"] = MessageType.TimeoutMessageMarker.ToString();
+ });
+ }
+
+ public void Reply(params object[] messages)
+ {
+ Send(new Endpoint { Uri = currentMessageInformation.Source }, messages);
+ }
+
+ public event Action<CurrentMessageInformation> MessageSent;
+ public event Func<CurrentMessageInformation, bool> AdministrativeMessageArrived;
+ public event Func<CurrentMessageInformation, bool> MessageArrived;
+ public event Action<CurrentMessageInformation, Exception> MessageSerializationException;
+ public event Action<CurrentMessageInformation, Exception> MessageProcessingFailure;
+ public event Action<CurrentMessageInformation, Exception> MessageProcessingCompleted;
+ public event Action<CurrentMessageInformation> BeforeMessageTransactionCommit;
+ public event Action<CurrentMessageInformation, Exception> AdministrativeMessageProcessingCompleted;
+ public event Action Started;
+ }
}

0 comments on commit 2322754

Please sign in to comment.
Something went wrong with that request. Please try again.