Skip to content

Commit

Permalink
Messages now carry information about their creation time
Browse files Browse the repository at this point in the history
  • Loading branch information
Kralizek committed Oct 12, 2015
1 parent e88dd4b commit 21e59c1
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 4 deletions.
2 changes: 1 addition & 1 deletion src/Core/Configuration/ICommandContextFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public DefaultCommandContextFactory(IClock clock)

public CommandContext<TCommand> CreateContext<TCommand>(CommandMessage<TCommand> message, INybusOptions options) where TCommand : class, ICommand
{
return new CommandContext<TCommand>(message.Command, _clock.Now, message.CorrelationId);
return new CommandContext<TCommand>(message.Command, _clock.Now, message.CorrelationId) {InvokedOn = message.SentOn};
}
}
}
2 changes: 1 addition & 1 deletion src/Core/Configuration/IEventContextFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public DefaultEventContextFactory(IClock clock)

public EventContext<TEvent> CreateContext<TEvent>(EventMessage<TEvent> message, INybusOptions options) where TEvent : class, IEvent
{
return new EventContext<TEvent>(message.Event, _clock.Now, message.CorrelationId);
return new EventContext<TEvent>(message.Event, _clock.Now, message.CorrelationId) {RaisedOn = message.SentOn};
}
}
}
4 changes: 4 additions & 0 deletions src/Interfaces/IBusEngine.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Threading.Tasks;
using Nybus.Utils;

namespace Nybus
{
Expand Down Expand Up @@ -29,9 +30,12 @@ public abstract class Message
protected Message()
{
CorrelationId = Guid.NewGuid();
SentOn = Clock.Default.Now;
}

public Guid CorrelationId { get; set; }

public DateTimeOffset SentOn { get; set; }
}

public class CommandMessage<TCommand> : Message
Expand Down
4 changes: 4 additions & 0 deletions src/Interfaces/ICommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ public CommandContext(TCommand message, DateTimeOffset receivedOn, Guid correlat
}

public TCommand Message { get; private set; }

public DateTimeOffset ReceivedOn { get; private set; }

public Guid CorrelationId { get; set; }

public DateTimeOffset InvokedOn { get; set; }
}
}
2 changes: 2 additions & 0 deletions src/Interfaces/IEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public EventContext(TEvent eventMessage, DateTimeOffset receivedOn, Guid correla

public DateTimeOffset ReceivedOn { get; private set; }

public DateTimeOffset RaisedOn { get; set; }

public TEvent Message { get; private set; }
}
}
20 changes: 18 additions & 2 deletions src/MassTransit/Configuration/IContextManager.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using MassTransit;
using Nybus.Utils;

namespace Nybus.Configuration
{
Expand All @@ -21,33 +22,38 @@ void SetCommandMessageHeaders<TCommand>(CommandMessage<TCommand> message, IPubli
public class RabbitMqContextManager : IContextManager
{
public const string CorrelationIdKey = "nybus:CorrelationId";
public const string MessageSentKey = "nybus:MessageSent";

public EventMessage<TEvent> CreateEventMessage<TEvent>(IConsumeContext<TEvent> context) where TEvent : class, IEvent
{
return new EventMessage<TEvent>
{
CorrelationId = ExtractCorrelationId(context.Headers),
Event = context.Message
Event = context.Message,
SentOn = ExtractMessageSentTime(context.Headers)
};
}

public void SetEventMessageHeaders<TEvent>(EventMessage<TEvent> message, IPublishContext<TEvent> context) where TEvent : class, IEvent
{
PersistCorrelationId(context, message.CorrelationId);
PersistMessageSentTime(context, message.SentOn);
}

public CommandMessage<TCommand> CreateCommandMessage<TCommand>(IConsumeContext<TCommand> context) where TCommand : class, ICommand
{
return new CommandMessage<TCommand>
{
Command = context.Message,
CorrelationId = ExtractCorrelationId(context.Headers)
CorrelationId = ExtractCorrelationId(context.Headers),
SentOn = ExtractMessageSentTime(context.Headers)
};
}

public void SetCommandMessageHeaders<TCommand>(CommandMessage<TCommand> message, IPublishContext<TCommand> context) where TCommand : class, ICommand
{
PersistCorrelationId(context, message.CorrelationId);
PersistMessageSentTime(context, message.SentOn);
}

private Guid ExtractCorrelationId(IMessageHeaders messageHeaders)
Expand All @@ -59,5 +65,15 @@ private void PersistCorrelationId(ISendContext sendContext, Guid correlationId)
{
sendContext.SetHeader(CorrelationIdKey, correlationId.ToString("D"));
}

private DateTimeOffset ExtractMessageSentTime(IMessageHeaders messageHeaders)
{
return DateTimeOffset.Parse(messageHeaders[MessageSentKey]);
}

private void PersistMessageSentTime(ISendContext sendContext, DateTimeOffset sentTime)
{
sendContext.SetHeader(MessageSentKey, sentTime.ToString("O"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public class RabbitMqContextManagerTests

private Mock<IPublishContext<TestCommand>> mockCommandPublishContext;

private DateTimeOffset now;

[SetUp]
public void Initialize()
{
Expand All @@ -32,6 +34,8 @@ public void Initialize()
mockCommandConsumeContext = new Mock<IConsumeContext<TestCommand>>();

mockCommandPublishContext = new Mock<IPublishContext<TestCommand>>();

now = fixture.Create<DateTimeOffset>();
}

private RabbitMqContextManager CreateSystemUnderTest()
Expand All @@ -50,6 +54,7 @@ public void CreateEventMessage_retrieves_CorrelationId_from_context()

mockEventConsumeContext.SetupGet(p => p.Message).Returns(body);
mockEventConsumeContext.Setup(p => p.Headers[RabbitMqContextManager.CorrelationIdKey]).Returns(correlationId.ToString());
mockEventConsumeContext.Setup(p => p.Headers[RabbitMqContextManager.MessageSentKey]).Returns(now.ToString("O"));

var message = sut.CreateEventMessage(mockEventConsumeContext.Object);

Expand All @@ -67,6 +72,7 @@ public void CreateEventMessage_retrieves_Event_from_context()

mockEventConsumeContext.SetupGet(p => p.Message).Returns(body);
mockEventConsumeContext.Setup(p => p.Headers[RabbitMqContextManager.CorrelationIdKey]).Returns(correlationId.ToString());
mockEventConsumeContext.Setup(p => p.Headers[RabbitMqContextManager.MessageSentKey]).Returns(now.ToString("O"));

var message = sut.CreateEventMessage(mockEventConsumeContext.Object);

Expand All @@ -83,6 +89,7 @@ public void SetEventMessageHeaders_sets_correlationId_to_context()
sut.SetEventMessageHeaders(message, mockEventPublishContext.Object);

mockEventPublishContext.Verify(p => p.SetHeader(RabbitMqContextManager.CorrelationIdKey, message.CorrelationId.ToString("D")));
mockEventPublishContext.Verify(p => p.SetHeader(RabbitMqContextManager.MessageSentKey, message.SentOn.ToString("O")));
}

[Test]
Expand All @@ -96,10 +103,12 @@ public void CreateCommandMessage_retrieves_CorrelationId_from_context()

mockCommandConsumeContext.SetupGet(p => p.Message).Returns(body);
mockCommandConsumeContext.Setup(p => p.Headers[RabbitMqContextManager.CorrelationIdKey]).Returns(correlationId.ToString());
mockCommandConsumeContext.Setup(p => p.Headers[RabbitMqContextManager.MessageSentKey]).Returns(now.ToString("O"));

var message = sut.CreateCommandMessage(mockCommandConsumeContext.Object);

Assert.That(message.CorrelationId, Is.EqualTo(correlationId));
Assert.That(message.SentOn, Is.EqualTo(now));
}

[Test]
Expand All @@ -113,10 +122,12 @@ public void CreateCommandMessage_retrieves_Command_from_context()

mockCommandConsumeContext.SetupGet(p => p.Message).Returns(body);
mockCommandConsumeContext.Setup(p => p.Headers[RabbitMqContextManager.CorrelationIdKey]).Returns(correlationId.ToString());
mockCommandConsumeContext.Setup(p => p.Headers[RabbitMqContextManager.MessageSentKey]).Returns(now.ToString("O"));

var message = sut.CreateCommandMessage(mockCommandConsumeContext.Object);

Assert.That(message.Command, Is.SameAs(body));
Assert.That(message.SentOn, Is.EqualTo(now));
}

[Test]
Expand All @@ -129,6 +140,7 @@ public void SetCommandMessageHeaders_sets_correlationId_to_context()
sut.SetCommandMessageHeaders(message, mockCommandPublishContext.Object);

mockCommandPublishContext.Verify(p => p.SetHeader(RabbitMqContextManager.CorrelationIdKey, message.CorrelationId.ToString("D")));
mockCommandPublishContext.Verify(p => p.SetHeader(RabbitMqContextManager.MessageSentKey, message.SentOn.ToString("O")));
}


Expand Down

0 comments on commit 21e59c1

Please sign in to comment.