Skip to content
Browse files

Added capture of transactional state of receive so we don't duplicate…

… error queue messages
  • Loading branch information...
1 parent 695a800 commit 1855ea3b8a4941d0153dbfd3baaa665bd0dd68dd @phatboyg phatboyg committed
View
5 src/MassTransit/Context/IReceiveContext.cs
@@ -101,5 +101,10 @@ void NotifyConsume<T>(IConsumeContext<T> consumeContext, string consumerType, st
IEnumerable<IReceived> Received { get; }
Guid Id { get; }
+
+ /// <summary>
+ /// True if the transport is transactional and will leave the message on the queue if an exception is thrown
+ /// </summary>
+ bool IsTransactional { get; }
}
}
View
26 src/MassTransit/Context/ReceiveContext.cs
@@ -1,12 +1,12 @@
-// Copyright 2007-2011 Chris Patterson, Dru Sellers, Travis Smith, et. al.
+// Copyright 2007-2012 Chris Patterson, Dru Sellers, Travis Smith, et. al.
//
-// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
-// Unless required by applicable law or agreed to in writing, software distributed
+// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
@@ -29,6 +29,7 @@ public class ReceiveContext :
readonly IList<IPublished> _published;
readonly IList<IReceived> _received;
readonly IList<ISent> _sent;
+ readonly bool _transactional;
Stream _bodyStream;
Stopwatch _timer;
IMessageTypeConverter _typeConverter;
@@ -42,10 +43,11 @@ public class ReceiveContext :
_received = new List<IReceived>();
}
- ReceiveContext(Stream bodyStream)
+ ReceiveContext(Stream bodyStream, bool transactional)
: this()
{
_bodyStream = bodyStream;
+ _transactional = transactional;
}
/// <summary>
@@ -139,6 +141,11 @@ public IEnumerable<IReceived> Received
public Guid Id { get; private set; }
+ public bool IsTransactional
+ {
+ get { return _transactional; }
+ }
+
public bool IsContextAvailable(Type messageType)
{
return _typeConverter.Contains(messageType);
@@ -203,11 +210,18 @@ public bool TryGetContext<T>(out IConsumeContext<T> context)
/// which in turn contains both payload and meta-data/out-of-band data.
/// </summary>
/// <param name="bodyStream">Body stream to create receive context from</param>
+ /// <param name="transactional">True if the transport is transactional and will roll back failed messages </param>
/// <returns>The receive context</returns>
[NotNull]
+ public static ReceiveContext FromBodyStream(Stream bodyStream, bool transactional)
+ {
+ return new ReceiveContext(bodyStream, transactional);
+ }
+
+ [NotNull]
public static ReceiveContext FromBodyStream(Stream bodyStream)
{
- return new ReceiveContext(bodyStream);
+ return new ReceiveContext(bodyStream, false);
}
/// <summary>
@@ -217,7 +231,7 @@ public static ReceiveContext FromBodyStream(Stream bodyStream)
[NotNull]
public static ReceiveContext Empty()
{
- return new ReceiveContext(null);
+ return new ReceiveContext(null, false);
}
}
}
View
16 src/MassTransit/EndpointLoggerExtensions.cs
@@ -42,6 +42,22 @@ public static void LogSkipped(this IEndpointAddress sourceAddress, string messag
}
}
+ /// <summary>
+ /// Log that a message was requeued to the transport after an exception occurred
+ /// </summary>
+ /// <param name="sourceAddress"></param>
+ /// <param name="destinationAddress"></param>
+ /// <param name="messageId"></param>
+ /// <param name="description"> </param>
+ public static void LogReQueued(this IEndpointAddress sourceAddress, IEndpointAddress destinationAddress,
+ string messageId, string description)
+ {
+ if (_messages.IsInfoEnabled)
+ {
+ _messages.InfoFormat("RQUE:{0}:{1}:{2}:{3}", sourceAddress, destinationAddress, messageId, description);
+ }
+ }
+
public static void LogReceived(this IEndpointAddress sourceAddress, string messageId, string description)
{
if (_messages.IsDebugEnabled)
View
5 src/MassTransit/Testing/TestDecorators/ReceiveContextTestDecorator.cs
@@ -280,5 +280,10 @@ public Guid Id
{
get { return _context.Id; }
}
+
+ public bool IsTransactional
+ {
+ get { return _context.IsTransactional; }
+ }
}
}
View
14 src/MassTransit/Transports/Endpoint.cs
@@ -288,7 +288,10 @@ public void Receive(Func<IReceiveContext, Action<IReceiveContext>> receiver, Tim
_log.Error("An exception was thrown by a message consumer", ex);
_tracker.IncrementRetryCount(receiveContext.MessageId, ex);
- MoveMessageToErrorTransport(receiveContext);
+ if(!receiveContext.IsTransactional)
+ {
+ SaveMessageToInboundTransport(receiveContext);
+ }
throw;
}
@@ -351,6 +354,15 @@ void MoveMessageToErrorTransport(IReceiveContext context)
Address.LogMoved(_errorTransport.Address, context.MessageId, "");
}
+ void SaveMessageToInboundTransport(IReceiveContext context)
+ {
+ var moveContext = new MoveMessageSendContext(context);
+
+ _transport.Send(moveContext);
+
+ Address.LogReQueued(_transport.Address, context.MessageId, "");
+ }
+
~Endpoint()
{
Dispose(false);
View
2 src/Transports/MassTransit.Transports.Msmq/InboundMsmqTransport.cs
@@ -79,7 +79,7 @@ protected void EnumerateQueue(Func<IReceiveContext, Action<IReceiveContext>> rec
Message peekMessage = enumerator.Current;
using (peekMessage)
{
- IReceiveContext context = ReceiveContext.FromBodyStream(peekMessage.BodyStream);
+ IReceiveContext context = ReceiveContext.FromBodyStream(peekMessage.BodyStream, _address.IsTransactional);
context.SetMessageId(peekMessage.Id);
context.SetInputAddress(_address);
View
2 src/Transports/MassTransit.Transports.RabbitMq/InboundRabbitMqTransport.cs
@@ -72,7 +72,7 @@ public void Receive(Func<IReceiveContext, Action<IReceiveContext>> lookupSinkCha
using (var body = new MemoryStream(result.Body, false))
{
- ReceiveContext context = ReceiveContext.FromBodyStream(body);
+ ReceiveContext context = ReceiveContext.FromBodyStream(body, true);
context.SetMessageId(result.BasicProperties.MessageId ?? result.DeliveryTag.ToString());
result.BasicProperties.MessageId = context.MessageId;
context.SetInputAddress(_address);

0 comments on commit 1855ea3

Please sign in to comment.
Something went wrong with that request. Please try again.