Skip to content

Commit

Permalink
MassTransitBusEngine now leverages MassTransit headers to store Messa…
Browse files Browse the repository at this point in the history
…ge properties
  • Loading branch information
Kralizek committed Sep 15, 2015
1 parent e107fc4 commit b250c07
Show file tree
Hide file tree
Showing 7 changed files with 276 additions and 49 deletions.
63 changes: 63 additions & 0 deletions src/MassTransit/Configuration/IContextManager.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
using System;
using MassTransit;

namespace Nybus.Configuration
{
public interface IContextManager
{
EventMessage<TEvent> CreateEventMessage<TEvent>(IConsumeContext<TEvent> context)
where TEvent : class, IEvent;

void SetEventMessageHeaders<TEvent>(EventMessage<TEvent> message, IPublishContext<TEvent> context)
where TEvent : class, IEvent;

CommandMessage<TCommand> CreateCommandMessage<TCommand>(IConsumeContext<TCommand> context)
where TCommand : class, ICommand;

void SetCommandMessageHeaders<TCommand>(CommandMessage<TCommand> message, IPublishContext<TCommand> context)
where TCommand : class, ICommand;
}

public class RabbitMqContextManager : IContextManager
{
public const string CorrelationIdKey = "nybus:CorrelationId";

public EventMessage<TEvent> CreateEventMessage<TEvent>(IConsumeContext<TEvent> context) where TEvent : class, IEvent
{
return new EventMessage<TEvent>
{
CorrelationId = ExtractCorrelationId(context.Headers),
Event = context.Message
};
}

public void SetEventMessageHeaders<TEvent>(EventMessage<TEvent> message, IPublishContext<TEvent> context) where TEvent : class, IEvent
{
PersistCorrelationId(context, message.CorrelationId);
}

public CommandMessage<TCommand> CreateCommandMessage<TCommand>(IConsumeContext<TCommand> context) where TCommand : class, ICommand
{
return new CommandMessage<TCommand>
{
Command = context.Message,
CorrelationId = ExtractCorrelationId(context.Headers)
};
}

public void SetCommandMessageHeaders<TCommand>(CommandMessage<TCommand> message, IPublishContext<TCommand> context) where TCommand : class, ICommand
{
PersistCorrelationId(context, message.CorrelationId);
}

private Guid ExtractCorrelationId(IMessageHeaders messageHeaders)
{
return Guid.Parse(messageHeaders[CorrelationIdKey]);
}

private void PersistCorrelationId(ISendContext sendContext, Guid correlationId)
{
sendContext.SetHeader(CorrelationIdKey, correlationId.ToString("D"));
}
}
}
1 change: 1 addition & 0 deletions src/MassTransit/MassTransit.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
</ItemGroup>
<ItemGroup>
<Compile Include="Configuration\IErrorStrategy.cs" />
<Compile Include="Configuration\IContextManager.cs" />
<Compile Include="Configuration\IQueueStrategy.cs" />
<Compile Include="Configuration\IServiceBusFactory.cs" />
<Compile Include="MassTransit\MassTransitBusEngine.cs" />
Expand Down
52 changes: 25 additions & 27 deletions src/MassTransit/MassTransit/MassTransitBusEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,31 +35,22 @@ private void EnsureBusIsRunning()
}
}


private Task SendMessage<TMessage>(IServiceBus bus, TMessage message) where TMessage : Message
{
EnsureBusIsRunning();
bus.Publish(message);
return Task.CompletedTask;
}

public Task SendCommand<TCommand>(CommandMessage<TCommand> message) where TCommand : class, ICommand
{
//EnsureBusIsRunning();

//IServiceBus selectedServiceBus = _serviceBusses[0];
EnsureBusIsRunning();

//if (_serviceBusses.Count > 1)
//{
// selectedServiceBus = _serviceBusses[1];
//}
CommandServiceBus.Publish(message.Command, pc => _options.ContextManager.SetCommandMessageHeaders(message, pc));

return SendMessage(CommandServiceBus, message);
return Task.CompletedTask;
}

public Task SendEvent<TEvent>(EventMessage<TEvent> message) where TEvent : class, IEvent
{
return SendMessage(EventServiceBus, message);
EnsureBusIsRunning();

EventServiceBus.Publish(message.Event, pc => _options.ContextManager.SetEventMessageHeaders(message, pc));

return Task.CompletedTask;
}

#region SubscribeToCommand
Expand All @@ -68,19 +59,23 @@ private void EnsureBusIsRunning()

public void SubscribeToCommand<TCommand>(CommandReceived<TCommand> commandReceived) where TCommand : class, ICommand
{
_commandSubscriptions.Add(configurator => configurator.Handler<CommandMessage<TCommand>>((ctx, message) => HandleCommand(commandReceived, ctx).WaitAndUnwrapException()));
_commandSubscriptions.Add(configurator => configurator.Handler<TCommand>((ctx, body) => HandleCommand(commandReceived, ctx).WaitAndUnwrapException()));
}

private Task HandleCommand<TCommand>(CommandReceived<TCommand> commandHandler, IConsumeContext<CommandMessage<TCommand>> context) where TCommand : class, ICommand
private Task HandleCommand<TCommand>(CommandReceived<TCommand> commandHandler, IConsumeContext<TCommand> context)
where TCommand : class, ICommand
{
_options.Logger.Log(LogLevel.Trace, "Received command", new { commandType = typeof(TCommand).FullName, correlationId = context.Message.CorrelationId, retryCount = context.RetryCount });
CommandMessage<TCommand> message = _options.ContextManager.CreateCommandMessage(context);

_options.Logger.Log(LogLevel.Trace, "Received command", new { commandType = typeof(TCommand).FullName, correlationId = message.CorrelationId, retryCount = context.RetryCount });

try
{
return commandHandler?.Invoke(context.Message);
return commandHandler?.Invoke(message);
}
catch (Exception ex)
{
_options.Logger.Log(LogLevel.Error, "Error while processing a command", new { commandType = typeof(TCommand).FullName, correlationId = context.Message.CorrelationId, retryCount = context.RetryCount, exception = ex, command = context.Message.Command });
_options.Logger.Log(LogLevel.Error, "Error while processing a command", new { commandType = typeof(TCommand).FullName, correlationId = message.CorrelationId, retryCount = context.RetryCount, exception = ex, command = context.Message });
_options.CommandErrorStrategy.HandleError(context, ex);
}

Expand All @@ -95,19 +90,22 @@ private void EnsureBusIsRunning()

public void SubscribeToEvent<TEvent>(EventReceived<TEvent> eventReceived) where TEvent : class, IEvent
{
_eventSubscriptions.Add(configurator => configurator.Handler<EventMessage<TEvent>>((ctx, message) => HandleEvent(eventReceived, ctx).WaitAndUnwrapException()));
_eventSubscriptions.Add(configurator => configurator.Handler<TEvent>((ctx, body) => HandleEvent(eventReceived, ctx).WaitAndUnwrapException()));
}

private Task HandleEvent<TEvent>(EventReceived<TEvent> eventHandler, IConsumeContext<EventMessage<TEvent>> context) where TEvent : class, IEvent
private Task HandleEvent<TEvent>(EventReceived<TEvent> eventHandler, IConsumeContext<TEvent> context) where TEvent : class, IEvent
{
_options.Logger.Log(LogLevel.Trace, "Received event", new { eventType = typeof(TEvent).FullName, correlationId = context.Message.CorrelationId, retryCount = context.RetryCount});
EventMessage<TEvent> message = _options.ContextManager.CreateEventMessage(context);

_options.Logger.Log(LogLevel.Trace, "Received event", new { eventType = typeof(TEvent).FullName, correlationId = message.CorrelationId, retryCount = context.RetryCount });

try
{
return eventHandler?.Invoke(context.Message);
return eventHandler?.Invoke(message);
}
catch (Exception ex)
{
_options.Logger.Log(LogLevel.Error, "Error while processing an event", new { eventType = typeof(TEvent).FullName, correlationId = context.Message.CorrelationId, retryCount = context.RetryCount , exception = ex, @event = context.Message.Event });
_options.Logger.Log(LogLevel.Error, "Error while processing an event", new { eventType = typeof(TEvent).FullName, correlationId = message.CorrelationId, retryCount = context.RetryCount, exception = ex, @event = context.Message });
_options.EventErrorStrategy.HandleError(context, ex);
}

Expand Down
2 changes: 2 additions & 0 deletions src/MassTransit/MassTransit/MassTransitOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ public class MassTransitOptions

public IServiceBusFactory ServiceBusFactory { get; set; } = new RabbitMqServiceBusFactory();

public IContextManager ContextManager { get; set; } = new RabbitMqContextManager();

public ILogger Logger { get; set; } = new NopLogger();
}
}
136 changes: 136 additions & 0 deletions tests/Tests.MassTransit/Configuration/RabbitMqContextManagerTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
using System;
using MassTransit;
using Moq;
using NUnit.Framework;
using Nybus;
using Nybus.Configuration;
using Ploeh.AutoFixture;

namespace Tests.Configuration
{
public class RabbitMqContextManagerTests
{
private IFixture fixture;

private Mock<IConsumeContext<TestEvent>> mockEventConsumeContext;

private Mock<IPublishContext<TestEvent>> mockEventPublishContext;

private Mock<IConsumeContext<TestCommand>> mockCommandConsumeContext;

private Mock<IPublishContext<TestCommand>> mockCommandPublishContext;

[SetUp]
public void Initialize()
{
fixture = new Fixture();

mockEventConsumeContext = new Mock<IConsumeContext<TestEvent>>();

mockEventPublishContext = new Mock<IPublishContext<TestEvent>>();

mockCommandConsumeContext = new Mock<IConsumeContext<TestCommand>>();

mockCommandPublishContext = new Mock<IPublishContext<TestCommand>>();
}

private RabbitMqContextManager CreateSystemUnderTest()
{
return new RabbitMqContextManager();
}

[Test]
public void CreateEventMessage_retrieves_CorrelationId_from_context()
{
var sut = CreateSystemUnderTest();

var body = fixture.Create<TestEvent>();

var correlationId = fixture.Create<Guid>();

mockEventConsumeContext.SetupGet(p => p.Message).Returns(body);
mockEventConsumeContext.Setup(p => p.Headers[RabbitMqContextManager.CorrelationIdKey]).Returns(correlationId.ToString());

var message = sut.CreateEventMessage(mockEventConsumeContext.Object);

Assert.That(message.CorrelationId, Is.EqualTo(correlationId));
}

[Test]
public void CreateEventMessage_retrieves_Event_from_context()
{
var sut = CreateSystemUnderTest();

TestEvent body = fixture.Create<TestEvent>();

Guid correlationId = fixture.Create<Guid>();

mockEventConsumeContext.SetupGet(p => p.Message).Returns(body);
mockEventConsumeContext.Setup(p => p.Headers[RabbitMqContextManager.CorrelationIdKey]).Returns(correlationId.ToString());

var message = sut.CreateEventMessage(mockEventConsumeContext.Object);

Assert.That(message.Event, Is.SameAs(body));
}

[Test]
public void SetEventMessageHeaders_sets_correlationId_to_context()
{
var sut = CreateSystemUnderTest();

var message = fixture.Create<EventMessage<TestEvent>>();

sut.SetEventMessageHeaders(message, mockEventPublishContext.Object);

mockEventPublishContext.Verify(p => p.SetHeader(RabbitMqContextManager.CorrelationIdKey, message.CorrelationId.ToString("D")));
}

[Test]
public void CreateCommandMessage_retrieves_CorrelationId_from_context()
{
var sut = CreateSystemUnderTest();

var body = fixture.Create<TestCommand>();

var correlationId = fixture.Create<Guid>();

mockCommandConsumeContext.SetupGet(p => p.Message).Returns(body);
mockCommandConsumeContext.Setup(p => p.Headers[RabbitMqContextManager.CorrelationIdKey]).Returns(correlationId.ToString());

var message = sut.CreateCommandMessage(mockCommandConsumeContext.Object);

Assert.That(message.CorrelationId, Is.EqualTo(correlationId));
}

[Test]
public void CreateCommandMessage_retrieves_Command_from_context()
{
var sut = CreateSystemUnderTest();

var body = fixture.Create<TestCommand>();

var correlationId = fixture.Create<Guid>();

mockCommandConsumeContext.SetupGet(p => p.Message).Returns(body);
mockCommandConsumeContext.Setup(p => p.Headers[RabbitMqContextManager.CorrelationIdKey]).Returns(correlationId.ToString());

var message = sut.CreateCommandMessage(mockCommandConsumeContext.Object);

Assert.That(message.Command, Is.SameAs(body));
}

[Test]
public void SetCommandMessageHeaders_sets_correlationId_to_context()
{
var sut = CreateSystemUnderTest();

var message = fixture.Create<CommandMessage<TestCommand>>();

sut.SetCommandMessageHeaders(message, mockCommandPublishContext.Object);

mockCommandPublishContext.Verify(p => p.SetHeader(RabbitMqContextManager.CorrelationIdKey, message.CorrelationId.ToString("D")));
}


}
}
Loading

0 comments on commit b250c07

Please sign in to comment.