Skip to content

Commit

Permalink
Add more Exception details to failed messages.
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelnoonan committed Feb 13, 2014
1 parent 996ffc3 commit 43c30b9
Show file tree
Hide file tree
Showing 13 changed files with 73 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,23 @@ internal class CommandMessagePumpsFactory : ICreateComponents
private readonly ICommandBroker _commandBroker;
private readonly DefaultBatchSizeSetting _defaultBatchSize;
private readonly INimbusMessagingFactory _messagingFactory;
private readonly IClock _clock;

private readonly GarbageMan _garbageMan = new GarbageMan();

public CommandMessagePumpsFactory(ILogger logger,
CommandHandlerTypesSetting commandHandlerTypes,
ICommandBroker commandBroker,
DefaultBatchSizeSetting defaultBatchSize,
INimbusMessagingFactory messagingFactory)
INimbusMessagingFactory messagingFactory,
IClock clock)
{
_logger = logger;
_commandHandlerTypes = commandHandlerTypes;
_commandBroker = commandBroker;
_defaultBatchSize = defaultBatchSize;
_messagingFactory = messagingFactory;
_clock = clock;
}

public IEnumerable<IMessagePump> CreateAll()
Expand All @@ -51,7 +54,7 @@ public IEnumerable<IMessagePump> CreateAll()
var dispatcher = new CommandMessageDispatcher(_commandBroker, commandType);
_garbageMan.Add(dispatcher);

var pump = new MessagePump(messageReceiver, dispatcher, _logger, _defaultBatchSize);
var pump = new MessagePump(messageReceiver, dispatcher, _logger, _defaultBatchSize, _clock);
_garbageMan.Add(pump);

yield return pump;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ internal class CompetingEventMessagePumpsFactory : ICreateComponents
private readonly ILogger _logger;
private readonly DefaultBatchSizeSetting _defaultBatchSize;
private readonly INimbusMessagingFactory _messagingFactory;
private readonly IClock _clock;

private readonly GarbageMan _garbageMan = new GarbageMan();

Expand All @@ -25,14 +26,16 @@ internal class CompetingEventMessagePumpsFactory : ICreateComponents
ICompetingEventBroker competingEventBroker,
ILogger logger,
DefaultBatchSizeSetting defaultBatchSize,
INimbusMessagingFactory messagingFactory)
INimbusMessagingFactory messagingFactory,
IClock clock)
{
_applicationName = applicationName;
_competingEventHandlerTypes = competingEventHandlerTypes;
_competingEventBroker = competingEventBroker;
_logger = logger;
_defaultBatchSize = defaultBatchSize;
_messagingFactory = messagingFactory;
_clock = clock;
}

public IEnumerable<IMessagePump> CreateAll()
Expand All @@ -56,7 +59,7 @@ public IEnumerable<IMessagePump> CreateAll()
var dispatcher = new CompetingEventMessageDispatcher(_competingEventBroker, eventType);
_garbageMan.Add(dispatcher);

var pump = new MessagePump(receiver, dispatcher, _logger, _defaultBatchSize);
var pump = new MessagePump(receiver, dispatcher, _logger, _defaultBatchSize, _clock);
_garbageMan.Add(pump);

yield return pump;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ internal class MulticastEventMessagePumpsFactory : ICreateComponents
private readonly ILogger _logger;
private readonly IMulticastEventBroker _multicastEventBroker;
private readonly DefaultBatchSizeSetting _defaultBatchSize;
private readonly IClock _clock;

private readonly GarbageMan _garbageMan = new GarbageMan();

Expand All @@ -28,7 +29,8 @@ internal class MulticastEventMessagePumpsFactory : ICreateComponents
MulticastEventHandlerTypesSetting multicastEventHandlerTypes,
ILogger logger,
IMulticastEventBroker multicastEventBroker,
DefaultBatchSizeSetting defaultBatchSize)
DefaultBatchSizeSetting defaultBatchSize,
IClock clock)
{
_queueManager = queueManager;
_applicationName = applicationName;
Expand All @@ -37,6 +39,7 @@ internal class MulticastEventMessagePumpsFactory : ICreateComponents
_logger = logger;
_multicastEventBroker = multicastEventBroker;
_defaultBatchSize = defaultBatchSize;
_clock = clock;
}

public IEnumerable<IMessagePump> CreateAll()
Expand All @@ -62,7 +65,7 @@ public IEnumerable<IMessagePump> CreateAll()
var dispatcher = new MulticastEventMessageDispatcher(_multicastEventBroker, eventType);
_garbageMan.Add(dispatcher);

var pump = new MessagePump(receiver, dispatcher, _logger, _defaultBatchSize);
var pump = new MessagePump(receiver, dispatcher, _logger, _defaultBatchSize, _clock);
_garbageMan.Add(pump);

yield return pump;
Expand Down
23 changes: 18 additions & 5 deletions src/Nimbus/Infrastructure/ExceptionExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,21 +1,34 @@
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Threading;

namespace Nimbus.Infrastructure
{
public static class ExceptionExtensions
{
public static Dictionary<string, object> ExceptionDetailsAsProperties(this Exception exception)
public static Dictionary<string, object> ExceptionDetailsAsProperties(this Exception exception, DateTimeOffset timestamp)
{
if (exception is TargetInvocationException || exception is AggregateException) return ExceptionDetailsAsProperties(exception.InnerException);
if (exception is TargetInvocationException || exception is AggregateException) return ExceptionDetailsAsProperties(exception.InnerException, timestamp);

return new Dictionary<string, object>
{
{MessagePropertyKeys.ExceptionTypeKey, exception.GetType().FullName},
{MessagePropertyKeys.ExceptionMessageKey, exception.Message},
{MessagePropertyKeys.ExceptionStackTraceKey, exception.StackTrace},
{MessagePropertyKeys.ExceptionType, exception.GetType().FullName},
{MessagePropertyKeys.ExceptionMessage, exception.Message},
{MessagePropertyKeys.ExceptionStackTrace, exception.StackTrace},
{MessagePropertyKeys.ExceptionTimestamp, timestamp.ToString()},
{MessagePropertyKeys.ExceptionMachineName, Environment.MachineName},
{MessagePropertyKeys.ExceptionIdentityName, GetCurrentIdentityName()},
};
}

private static string GetCurrentIdentityName()
{
return (Thread.CurrentPrincipal != null &&
Thread.CurrentPrincipal.Identity != null &&
Thread.CurrentPrincipal.Identity.IsAuthenticated)
? Thread.CurrentPrincipal.Identity.Name
: "";
}
}
}
13 changes: 8 additions & 5 deletions src/Nimbus/Infrastructure/MessagePropertyKeys.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@
{
public static class MessagePropertyKeys
{
public const string RequestTimeoutInMillisecondsKey = "RequestTimeoutInMilliseconds";
public const string RequestSuccessfulKey = "RequestSuccessful";
public const string ExceptionMessageKey = "ExceptionMessage";
public const string ExceptionTypeKey = "ExceptionType";
public const string ExceptionStackTraceKey = "ExceptionStackTrace";
public const string RequestTimeoutInMilliseconds = "RequestTimeoutInMilliseconds";
public const string RequestSuccessful = "RequestSuccessful";
public const string ExceptionMessage = "ExceptionMessage";
public const string ExceptionType = "ExceptionType";
public const string ExceptionStackTrace = "ExceptionStackTrace";
public const string ExceptionTimestamp = "ExceptionTimestamp";
public const string ExceptionMachineName = "ExceptionMachineName";
public const string ExceptionIdentityName = "ExceptionIdentityName";
public const string MessageType = "MessageType";
}
}
6 changes: 4 additions & 2 deletions src/Nimbus/Infrastructure/MessagePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,17 @@ internal class MessagePump : IMessagePump
private readonly IMessageDispatcher _dispatcher;
private readonly ILogger _logger;
private readonly DefaultBatchSizeSetting _defaultBatchSize;
private readonly IClock _clock;

private Task _internalMessagePump;

public MessagePump(INimbusMessageReceiver receiver, IMessageDispatcher dispatcher, ILogger logger, DefaultBatchSizeSetting defaultBatchSize)
public MessagePump(INimbusMessageReceiver receiver, IMessageDispatcher dispatcher, ILogger logger, DefaultBatchSizeSetting defaultBatchSize, IClock clock)
{
_receiver = receiver;
_dispatcher = dispatcher;
_logger = logger;
_defaultBatchSize = defaultBatchSize;
_clock = clock;
}

public async Task Start()
Expand Down Expand Up @@ -121,7 +123,7 @@ private async Task Dispatch(BrokeredMessage message)
_logger.Error(exception, "Message dispatch failed");

_logger.Debug("Abandoning message {0} from {1}", message, message.ReplyTo);
await message.AbandonAsync(exception.ExceptionDetailsAsProperties());
await message.AbandonAsync(exception.ExceptionDetailsAsProperties(_clock.UtcNow));
_logger.Debug("Abandoned message {0} from {1}", message, message.ReplyTo);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ internal class BusMulticastRequestSender : IMulticastRequestSender
ReplyTo = _replyQueueName,
};
message.Properties.Add(MessagePropertyKeys.MessageType, typeof (TRequest).FullName);
message.Properties.Add(MessagePropertyKeys.RequestTimeoutInMillisecondsKey, (int) timeout.TotalMilliseconds);
message.Properties.Add(MessagePropertyKeys.RequestTimeoutInMilliseconds, (int) timeout.TotalMilliseconds);
var expiresAfter = _clock.UtcNow.Add(timeout);
var responseCorrelationWrapper = _requestResponseCorrelator.RecordMulticastRequest<TResponse>(correlationId, expiresAfter);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ public async Task Dispatch(BrokeredMessage message)

var busRequest = requestMessage.GetBody(_requestType);

var requestTimeoutInMilliseconds = (int) requestMessage.Properties[MessagePropertyKeys.RequestTimeoutInMillisecondsKey];
var requestTimeoutInMilliseconds = (int) requestMessage.Properties[MessagePropertyKeys.RequestTimeoutInMilliseconds];
var timeout = TimeSpan.FromMilliseconds(requestTimeoutInMilliseconds);

var responses = InvokeGenericHandleMethod(_multicastRequestBroker, busRequest, timeout);
foreach (var response in responses)
{
var responseMessage = new BrokeredMessage(response);
responseMessage.Properties.Add(MessagePropertyKeys.RequestSuccessfulKey, true);
responseMessage.Properties.Add(MessagePropertyKeys.RequestSuccessful, true);
responseMessage.CorrelationId = requestMessage.CorrelationId;
await replyQueueClient.Send(responseMessage);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ internal class MulticastRequestMessagePumpsFactory : ICreateComponents
private readonly IMulticastRequestBroker _multicastRequestBroker;
private readonly DefaultBatchSizeSetting _defaultBatchSize;
private readonly INimbusMessagingFactory _messagingFactory;
private readonly IClock _clock;

private readonly GarbageMan _garbageMan = new GarbageMan();

Expand All @@ -27,7 +28,8 @@ internal class MulticastRequestMessagePumpsFactory : ICreateComponents
IQueueManager queueManager,
IMulticastRequestBroker multicastRequestBroker,
DefaultBatchSizeSetting defaultBatchSize,
INimbusMessagingFactory messagingFactory)
INimbusMessagingFactory messagingFactory,
IClock clock)
{
_logger = logger;
_requestHandlerTypes = requestHandlerTypes;
Expand All @@ -36,6 +38,7 @@ internal class MulticastRequestMessagePumpsFactory : ICreateComponents
_multicastRequestBroker = multicastRequestBroker;
_defaultBatchSize = defaultBatchSize;
_messagingFactory = messagingFactory;
_clock = clock;
}

public IEnumerable<IMessagePump> CreateAll()
Expand All @@ -61,7 +64,7 @@ public IEnumerable<IMessagePump> CreateAll()
var dispatcher = new MulticastRequestMessageDispatcher(_messagingFactory, _multicastRequestBroker, requestType);
_garbageMan.Add(dispatcher);

var pump = new MessagePump(messageReceiver, dispatcher, _logger, _defaultBatchSize);
var pump = new MessagePump(messageReceiver, dispatcher, _logger, _defaultBatchSize, _clock);
_garbageMan.Add(pump);

yield return pump;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ internal class RequestMessageDispatcher : IMessageDispatcher
private readonly INimbusMessagingFactory _messagingFactory;
private readonly Type _messageType;
private readonly IRequestBroker _requestBroker;
private readonly IClock _clock;

public RequestMessageDispatcher(INimbusMessagingFactory messagingFactory, Type messageType, IRequestBroker requestBroker)
public RequestMessageDispatcher(INimbusMessagingFactory messagingFactory, Type messageType, IRequestBroker requestBroker, IClock clock)
{
_messagingFactory = messagingFactory;
_messageType = messageType;
_requestBroker = requestBroker;
_clock = clock;
}

public async Task Dispatch(BrokeredMessage message)
Expand All @@ -34,14 +36,14 @@ public async Task Dispatch(BrokeredMessage message)
{
var response = InvokeGenericHandleMethod(_requestBroker, request);
responseMessage = new BrokeredMessage(response);
responseMessage.Properties.Add(MessagePropertyKeys.RequestSuccessfulKey, true);
responseMessage.Properties.Add(MessagePropertyKeys.RequestSuccessful, true);
responseMessage.Properties.Add(MessagePropertyKeys.MessageType, _messageType.FullName);
}
catch (Exception exc)
{
responseMessage = new BrokeredMessage();
responseMessage.Properties.Add(MessagePropertyKeys.RequestSuccessfulKey, false);
foreach (var prop in exc.ExceptionDetailsAsProperties()) responseMessage.Properties.Add(prop.Key, prop.Value);
responseMessage.Properties.Add(MessagePropertyKeys.RequestSuccessful, false);
foreach (var prop in exc.ExceptionDetailsAsProperties(_clock.UtcNow)) responseMessage.Properties.Add(prop.Key, prop.Value);
}

responseMessage.CorrelationId = message.CorrelationId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ internal class RequestMessagePumpsFactory : ICreateComponents
private readonly DefaultBatchSizeSetting _defaultBatchSize;
private readonly IRequestBroker _requestBroker;
private readonly INimbusMessagingFactory _messagingFactory;
private readonly IClock _clock;

private readonly GarbageMan _garbageMan = new GarbageMan();

Expand All @@ -25,14 +26,16 @@ internal class RequestMessagePumpsFactory : ICreateComponents
IQueueManager queueManager,
DefaultBatchSizeSetting defaultBatchSize,
IRequestBroker requestBroker,
INimbusMessagingFactory messagingFactory)
INimbusMessagingFactory messagingFactory,
IClock clock)
{
_logger = logger;
_requestHandlerTypes = requestHandlerTypes;
_queueManager = queueManager;
_defaultBatchSize = defaultBatchSize;
_requestBroker = requestBroker;
_messagingFactory = messagingFactory;
_clock = clock;
}

public IEnumerable<IMessagePump> CreateAll()
Expand All @@ -53,10 +56,10 @@ public IEnumerable<IMessagePump> CreateAll()
var messageReceiver = new NimbusQueueMessageReceiver(_queueManager, queuePath);
_garbageMan.Add(messageReceiver);

var dispatcher = new RequestMessageDispatcher(_messagingFactory, requestType, _requestBroker);
var dispatcher = new RequestMessageDispatcher(_messagingFactory, requestType, _requestBroker, _clock);
_garbageMan.Add(dispatcher);

var pump = new MessagePump(messageReceiver, dispatcher, _logger, _defaultBatchSize);
var pump = new MessagePump(messageReceiver, dispatcher, _logger, _defaultBatchSize, _clock);
_garbageMan.Add(pump);

yield return pump;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public async Task Dispatch(BrokeredMessage message)
var responseCorrelationWrapper = _requestResponseCorrelator.TryGetWrapper(correlationId);
if (responseCorrelationWrapper == null) return;

var success = (bool)message.Properties[MessagePropertyKeys.RequestSuccessfulKey];
var success = (bool)message.Properties[MessagePropertyKeys.RequestSuccessful];
if (success)
{
var responseType = responseCorrelationWrapper.ResponseType;
Expand All @@ -29,8 +29,8 @@ public async Task Dispatch(BrokeredMessage message)
}
else
{
var exceptionMessage = (string)message.Properties[MessagePropertyKeys.ExceptionMessageKey];
var exceptionStackTrace = (string)message.Properties[MessagePropertyKeys.ExceptionStackTraceKey];
var exceptionMessage = (string)message.Properties[MessagePropertyKeys.ExceptionMessage];
var exceptionStackTrace = (string)message.Properties[MessagePropertyKeys.ExceptionStackTrace];
responseCorrelationWrapper.Throw(exceptionMessage, exceptionStackTrace);
}
}
Expand Down
Loading

0 comments on commit 43c30b9

Please sign in to comment.