Permalink
Browse files

Fixed issue when unable to deserialize a message causing NullReferenc…

…eException.
  • Loading branch information...
1 parent 8ff525d commit b8bcb0108071c33cd06f8217c85aae272f0a8b47 @CoreyKaylor CoreyKaylor committed Jul 21, 2010
View
1 Rhino.ServiceBus.Tests/Rhino.ServiceBus.Tests.csproj
@@ -144,6 +144,7 @@
<Compile Include="RhinoQueues\PhtSubscriptionStorageFixture.cs" />
<Compile Include="RhinoQueues\UsingRhinoQueuesBus.cs" />
<Compile Include="RhinoQueues\UsingRhinoQueuesTransport.cs" />
+ <Compile Include="RhinoQueues\WhenSerializationErrorOccurs.cs" />
<Compile Include="RhinoQueues\WithDebugging.cs" />
<Compile Include="SagaTests.cs" />
<Compile Include="TestExtensions.cs" />
View
84 Rhino.ServiceBus.Tests/RhinoQueues/WhenSerializationErrorOccurs.cs
@@ -0,0 +1,84 @@
+using System;
+using System.IO;
+using System.Threading;
+using System.Transactions;
+using Castle.MicroKernel;
+using Rhino.ServiceBus.Impl;
+using Rhino.ServiceBus.Internal;
+using Rhino.ServiceBus.MessageModules;
+using Rhino.ServiceBus.RhinoQueues;
+using Rhino.ServiceBus.Serializers;
+using Xunit;
+
+namespace Rhino.ServiceBus.Tests.RhinoQueues
+{
+ public class WhenSerializationErrorOccurs : IDisposable
+ {
+ private readonly RhinoQueuesTransport transport;
+ private readonly ManualResetEvent wait = new ManualResetEvent(false);
+ private readonly IMessageSerializer messageSerializer;
+
+ public int FailedCount;
+
+ public WhenSerializationErrorOccurs()
+ {
+ if (Directory.Exists("test.esent"))
+ Directory.Delete("test.esent", true);
+
+ messageSerializer = new ThrowingSerializer(new XmlMessageSerializer(new DefaultReflection(), new DefaultKernel()));
+ transport = new RhinoQueuesTransport(
+ new Uri("rhino.queues://localhost:23456/q"),
+ new EndpointRouter(),
+ messageSerializer,
+ 1,
+ "test.esent",
+ IsolationLevel.Serializable,
+ 5,
+ new RhinoQueuesMessageBuilder(messageSerializer)
+ );
+ transport.Start();
+ }
+
+ [Fact]
+ public void Will_Retry_Number_Of_Times_Configured()
+ {
+ var count = 0;
+ transport.MessageProcessingFailure += (messageInfo, ex) =>
+ {
+ count++;
+ };
+ transport.Send(transport.Endpoint, new object[] { "test" });
+
+ wait.WaitOne(TimeSpan.FromSeconds(5));
+
+ Assert.Equal(5, count);
+ }
+
+ public void Dispose()
+ {
+ transport.Dispose();
+ }
+ }
+
+ public class ThrowingSerializer : IMessageSerializer
+ {
+ private readonly XmlMessageSerializer serializer;
+
+ public ThrowingSerializer(XmlMessageSerializer serializer)
+ {
+ this.serializer = serializer;
+ }
+
+ public void Serialize(object[] messages, Stream message)
+ {
+ serializer.Serialize(messages, message);
+ }
+
+ public object[] Deserialize(Stream message)
+ {
+ throw new NotImplementedException();
+ }
+ }
+
+
+}
View
56 Rhino.ServiceBus/RhinoQueues/ErrorAction.cs
@@ -30,35 +30,39 @@ public void Init(ITransport transport)
private bool Transport_OnMessageArrived(CurrentMessageInformation information)
{
var info = (RhinoQueueCurrentMessageInformation) information;
- ErrorCounter val = null;
- failureCounts.Read(reader => reader.TryGetValue(info.TransportMessageId, out val));
- if(val == null || val.FailureCount < numberOfRetries)
- return false;
+ ErrorCounter val = null;
+ failureCounts.Read(reader => reader.TryGetValue(info.TransportMessageId, out val));
+ if (val != null)
+ val.AtLeastOneMessageWasReceived = true;
+ return MoveToErrorSubqueueIfReachedMaximumRetry(info, val);
+ }
- var result = false;
- failureCounts.Write(writer =>
- {
- if (writer.TryGetValue(info.TransportMessageId, out val) == false)
- return;
+ private bool MoveToErrorSubqueueIfReachedMaximumRetry(RhinoQueueCurrentMessageInformation info, ErrorCounter errorCounter)
+ {
+ if(errorCounter == null || errorCounter.FailureCount < numberOfRetries)
+ return false;
- info.Queue.MoveTo(SubQueue.Errors.ToString(), info.TransportMessage);
- info.Queue.EnqueueDirectlyTo(SubQueue.Errors.ToString(), new MessagePayload
- {
- Data = val.ExceptionText == null ? null : Encoding.Unicode.GetBytes(val.ExceptionText),
- Headers =
- {
- {"correlation-id", info.TransportMessageId},
- {"retries", val.FailureCount.ToString()}
- }
- });
+ failureCounts.Write(writer =>
+ {
+ if (writer.TryGetValue(info.TransportMessageId, out errorCounter) == false)
+ return;
- result = true;
- });
+ info.Queue.MoveTo(SubQueue.Errors.ToString(), info.TransportMessage);
+ info.Queue.EnqueueDirectlyTo(SubQueue.Errors.ToString(), new MessagePayload
+ {
+ Data = errorCounter.ExceptionText == null ? null : Encoding.Unicode.GetBytes(errorCounter.ExceptionText),
+ Headers =
+ {
+ {"correlation-id", info.TransportMessageId},
+ {"retries", errorCounter.FailureCount.ToString()}
+ }
+ });
- return result;
- }
+ });
+ return true;
+ }
- private void Transport_OnMessageSerializationException(CurrentMessageInformation information, Exception exception)
+ private void Transport_OnMessageSerializationException(CurrentMessageInformation information, Exception exception)
{
var info = (RhinoQueueCurrentMessageInformation) information;
failureCounts.Write(writer => writer.Add(info.TransportMessageId, new ErrorCounter
@@ -94,6 +98,9 @@ private void Transport_OnMessageProcessingFailure(CurrentMessageInformation info
};
writer.Add(information.TransportMessageId, errorCounter);
}
+
+ if (errorCounter.AtLeastOneMessageWasReceived == false)
+ MoveToErrorSubqueueIfReachedMaximumRetry((RhinoQueueCurrentMessageInformation) information, errorCounter);
errorCounter.FailureCount += 1;
});
}
@@ -102,6 +109,7 @@ private class ErrorCounter
{
public string ExceptionText;
public int FailureCount;
+ public bool AtLeastOneMessageWasReceived;
}
}
View
6 Rhino.ServiceBus/RhinoQueues/RhinoQueuesTransport.cs
@@ -289,6 +289,12 @@ private void ReceiveMessage(object context)
Exception ex = null;
try
{
+ currentMessageInformation = new RhinoQueueCurrentMessageInformation
+ {
+ TransportMessageId = message.Id.ToString(),
+ TransportMessage = message,
+ Queue = queue,
+ };
//deserialization errors do not count for module events
object[] messages = DeserializeMessages(message);
try

0 comments on commit b8bcb01

Please sign in to comment.