Skip to content

Commit

Permalink
MsmqMessageBuilder supports DeliverBy and MaxAttempts (when it is 1).
Browse files Browse the repository at this point in the history
  • Loading branch information
RyanHauert committed Aug 10, 2012
1 parent 3bc5fbc commit 5b2191d
Show file tree
Hide file tree
Showing 4 changed files with 205 additions and 93 deletions.
88 changes: 0 additions & 88 deletions Rhino.ServiceBus.Tests/CanCustomizeHeadersWithMsmq.cs

This file was deleted.

185 changes: 185 additions & 0 deletions Rhino.ServiceBus.Tests/CanCustomizeOutgoingMessagesWithMsmq.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
using System;
using System.Messaging;
using System.Threading;
using Castle.Core;
using Castle.MicroKernel.Registration;
using Castle.Windsor;
using Castle.Windsor.Configuration.Interpreters;
using Rhino.ServiceBus.Exceptions;
using Rhino.ServiceBus.Impl;
using Rhino.ServiceBus.Internal;
using Rhino.ServiceBus.Msmq;
using Rhino.ServiceBus.Util;
using Xunit;

namespace Rhino.ServiceBus.Tests
{
public class CanCustomizeOutgoingMessagesWithMsmq : MsmqTestBase
{
[Fact]
public void it_should_add_custom_header_to_headers_collection_for_normal_messages()
{
using (var container = new WindsorContainer())
{
container.Register(Component.For<ICustomizeOutgoingMessages>().ImplementedBy<AppIdentityCustomizer>().LifeStyle.Is(LifestyleType.Transient));
new RhinoServiceBusConfiguration()
.UseCastleWindsor(container)
.Configure();
var builder = container.Resolve<IMessageBuilder<Message>>();
Message afterBuild = null;
builder.MessageBuilt += msg => afterBuild = msg;

using (var bus = container.Resolve<IStartableServiceBus>())
{
bus.Start();
bus.Send(bus.Endpoint, "testmessage");
}
Assert.NotNull(afterBuild);
var headers = afterBuild.Extension.DeserializeHeaders();
Assert.Equal("corey", headers["user-id"]);
}
}

[Fact]
public void it_should_add_custom_header_to_headers_collection_for_delayed_messages()
{
using (var container = new WindsorContainer(new XmlInterpreter()))
{
container.Register(Component.For<ICustomizeOutgoingMessages>().ImplementedBy<AppIdentityCustomizer>().LifeStyle.Is(LifestyleType.Transient));
new RhinoServiceBusConfiguration()
.UseCastleWindsor(container)
.Configure();
var transport = container.Resolve<ITransport>();
MsmqCurrentMessageInformation currentMessageInformation = null;
var waitHandle = new ManualResetEvent(false);
transport.MessageArrived += messageInfo =>
{
currentMessageInformation = (MsmqCurrentMessageInformation) messageInfo;
waitHandle.Set();
return true;
};
var builder = container.Resolve<IMessageBuilder<Message>>();
Message afterBuild = null;
builder.MessageBuilt += msg => afterBuild = msg;

using (var bus = container.Resolve<IStartableServiceBus>())
{
bus.Start();
DateTime beforeSend = DateTime.Now;
bus.DelaySend(bus.Endpoint, DateTime.Now.AddMilliseconds(250), "testmessage");
waitHandle.WaitOne(TimeSpan.FromSeconds(30));
Assert.True((DateTime.Now - beforeSend).TotalMilliseconds >= 250);
}
Assert.NotNull(afterBuild);
var headers = afterBuild.Extension.DeserializeHeaders();
Assert.Equal("corey", headers["user-id"]);
Assert.Equal("corey", currentMessageInformation.Headers["user-id"]);
}
}

[Fact]
public void TimeToReachQueue_is_set_when_DeliverBy_is_specified()
{
using (var container = new WindsorContainer())
{
container.Register(Component.For<ICustomizeOutgoingMessages>()
.ImplementedBy<DeliverByCustomizer>()
.LifeStyle.Is(LifestyleType.Transient));
new RhinoServiceBusConfiguration()
.UseCastleWindsor(container)
.Configure();
var builder = container.Resolve<IMessageBuilder<Message>>();
Message afterBuild = null;
builder.MessageBuilt += msg => afterBuild = msg;

using (var bus = container.Resolve<IStartableServiceBus>())
{
bus.Start();
bus.Send(bus.Endpoint, "testmessage");
}

Assert.NotNull(afterBuild);
Assert.NotEqual(Message.InfiniteTimeout, afterBuild.TimeToReachQueue);
}
}

[Fact]
public void TimeToReachQueue_is_set_when_MaxAttempts_is_one()
{
using (var container = new WindsorContainer())
{
container.Register(Component.For<ICustomizeOutgoingMessages>()
.ImplementedBy<MaxAttemptCustomizer>()
.LifeStyle.Is(LifestyleType.Transient));
new RhinoServiceBusConfiguration()
.UseCastleWindsor(container)
.Configure();
var builder = container.Resolve<IMessageBuilder<Message>>();
Message afterBuild = null;
builder.MessageBuilt += msg => afterBuild = msg;

using (var bus = container.Resolve<IStartableServiceBus>())
{
bus.Start();
bus.Send(bus.Endpoint, "testmessage");
}

Assert.NotNull(afterBuild);
Assert.Equal(TimeSpan.Zero, afterBuild.TimeToReachQueue);
}
}

[Fact]
public void Throws_when_MaxAttempts_is_greater_than_one()
{
using (var container = new WindsorContainer())
{
MaxAttemptCustomizer.MaxAttempts = 2;
container.Register(Component.For<ICustomizeOutgoingMessages>()
.ImplementedBy<MaxAttemptCustomizer>()
.LifeStyle.Is(LifestyleType.Transient));
new RhinoServiceBusConfiguration()
.UseCastleWindsor(container)
.Configure();
var builder = container.Resolve<IMessageBuilder<Message>>();
Message afterBuild = null;
builder.MessageBuilt += msg => afterBuild = msg;

using (var bus = container.Resolve<IStartableServiceBus>())
{
bus.Start();

Assert.Throws<InvalidUsageException>(() =>
{
bus.Send(bus.Endpoint, "testmessage");
});
}
}
}

public class AppIdentityCustomizer : ICustomizeOutgoingMessages
{
public void Customize(OutgoingMessageInformation messageInformation)
{
messageInformation.Headers.Add("user-id", "corey");
}
}

public class DeliverByCustomizer : ICustomizeOutgoingMessages
{
public void Customize(OutgoingMessageInformation messageInformation)
{
messageInformation.DeliverBy = DateTime.Now.AddMinutes(1);
}
}

public class MaxAttemptCustomizer : ICustomizeOutgoingMessages
{
public static int MaxAttempts = 1;
public void Customize(OutgoingMessageInformation messageInformation)
{
messageInformation.MaxAttempts = MaxAttempts;
}
}
}
}
2 changes: 1 addition & 1 deletion Rhino.ServiceBus.Tests/Rhino.ServiceBus.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@
</Compile>
<Compile Include="BugWithLogTest.cs" />
<Compile Include="BusSubscriptionTests.cs" />
<Compile Include="CanCustomizeHeadersWithMsmq.cs" />
<Compile Include="CanCustomizeOutgoingMessagesWithMsmq.cs" />
<Compile Include="Containers\Autofac\Can_host_in_another_app_domain.cs" />
<Compile Include="Containers\Autofac\ContainerTests.cs" />
<Compile Include="Containers\Autofac\QueueCreationTests.cs" />
Expand Down
23 changes: 19 additions & 4 deletions Rhino.ServiceBus/Msmq/MsmqMessageBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
using System.Linq;
using System.Messaging;
using System.Runtime.Serialization;
using Common.Logging;
using Common.Logging;
using Rhino.ServiceBus.Exceptions;
using Rhino.ServiceBus.Internal;
using Rhino.ServiceBus.Messages;
using Rhino.ServiceBus.Util;
Expand All @@ -29,8 +30,8 @@ public MsmqMessageBuilder(IMessageSerializer messageSerializer, IServiceLocator
public Message BuildFromMessageBatch(OutgoingMessageInformation messageInformation)
{
var msgs = messageInformation.Messages;
var message = new Message();

var message = new Message();

var isAdmin = msgs.Any(x => x is AdministrativeMessage);
try
{
Expand Down Expand Up @@ -65,7 +66,21 @@ public Message BuildFromMessageBatch(OutgoingMessageInformation messageInformati
}
else
{
extension = messageId;
extension = messageId;
}

if (messageInformation.DeliverBy != null)
{
var timeFromNow = messageInformation.DeliverBy.Value - DateTime.Now;
message.TimeToReachQueue = timeFromNow > TimeSpan.Zero ? timeFromNow : TimeSpan.Zero;
}

if (messageInformation.MaxAttempts != null)
{
if (messageInformation.MaxAttempts != 1)
throw new InvalidUsageException("MSMQ does not support a maximum number of attempts other than 1");

message.TimeToReachQueue = TimeSpan.Zero;
}

message.Extension = extension;
Expand Down

0 comments on commit 5b2191d

Please sign in to comment.