From 3a45ae9e7e98b1fb73b44ee309aa11935a098d09 Mon Sep 17 00:00:00 2001 From: Ryan Hauert Date: Mon, 30 Jul 2012 09:38:56 -0600 Subject: [PATCH] Refactored to use OutgoingMessageInformation when building messages. Added support for delivery options in Rhino.Queues. --- Rhino.ServiceBus.Spring/SpringBuilder.cs | 8 +- .../Bugs/FIeldProblem_Nick.cs | 25 +- .../CanCustomizeHeadersWithMsmq.cs | 10 +- ...anSendMsgsFromOneWayBusUsingRhinoQueues.cs | 23 +- .../CustomizingMessageConstruction.cs | 277 +++++++++++------- .../RhinoQueues/UsingRhinoQueuesTransport.cs | 6 +- .../RhinoQueues/WhenErrorOccurs.cs | 11 +- .../RhinoQueues/WhenReceivingTimedMessage.cs | 5 +- Rhino.ServiceBus/ICustomizeMessageHeaders.cs | 9 - .../ICustomizeOutgoingMessages.cs | 7 + .../Impl/MessageOwnersSelector.cs | 2 +- Rhino.ServiceBus/Internal/IMessageBuilder.cs | 2 +- .../LoadBalancer/MsmqSecondaryLoadBalancer.cs | 8 +- Rhino.ServiceBus/Msmq/AbstractMsmqListener.cs | 12 +- Rhino.ServiceBus/Msmq/MsmqMessageBuilder.cs | 21 +- Rhino.ServiceBus/Msmq/MsmqOnewayBus.cs | 11 +- Rhino.ServiceBus/Msmq/MsmqTransport.cs | 18 +- .../OutgoingMessageInformation.cs | 25 ++ Rhino.ServiceBus/Rhino.ServiceBus.csproj | 3 +- .../RhinoQueues/RhinoQueuesMessageBuilder.cs | 48 ++- .../RhinoQueues/RhinoQueuesTransport.cs | 8 +- 21 files changed, 339 insertions(+), 200 deletions(-) delete mode 100644 Rhino.ServiceBus/ICustomizeMessageHeaders.cs create mode 100644 Rhino.ServiceBus/ICustomizeOutgoingMessages.cs create mode 100644 Rhino.ServiceBus/OutgoingMessageInformation.cs diff --git a/Rhino.ServiceBus.Spring/SpringBuilder.cs b/Rhino.ServiceBus.Spring/SpringBuilder.cs index 3a873d3..3aeaaaa 100644 --- a/Rhino.ServiceBus.Spring/SpringBuilder.cs +++ b/Rhino.ServiceBus.Spring/SpringBuilder.cs @@ -220,7 +220,9 @@ public void RegisterRhinoQueuesTransport() busConfig.EnablePerformanceCounters, applicationContext.Get>())); - applicationContext.RegisterSingleton>(() => new RhinoQueuesMessageBuilder(applicationContext.Get())); + applicationContext.RegisterSingleton>(() => new RhinoQueuesMessageBuilder( + applicationContext.Get(), + applicationContext.Get())); } public void RegisterRhinoQueuesOneWay() @@ -228,7 +230,9 @@ public void RegisterRhinoQueuesOneWay() var oneWayConfig = (OnewayRhinoServiceBusConfiguration) config; var busConfig = config.ConfigurationSection.Bus; - applicationContext.RegisterSingleton>(() => new RhinoQueuesMessageBuilder(applicationContext.Get())); + applicationContext.RegisterSingleton>(() => new RhinoQueuesMessageBuilder( + applicationContext.Get(), + applicationContext.Get())); applicationContext.RegisterSingleton(() => new RhinoQueuesOneWayBus(oneWayConfig.MessageOwners, applicationContext.Get(), busConfig.QueuePath, busConfig.EnablePerformanceCounters, applicationContext.Get>())); } diff --git a/Rhino.ServiceBus.Tests/Bugs/FIeldProblem_Nick.cs b/Rhino.ServiceBus.Tests/Bugs/FIeldProblem_Nick.cs index 3530e76..b9439ab 100644 --- a/Rhino.ServiceBus.Tests/Bugs/FIeldProblem_Nick.cs +++ b/Rhino.ServiceBus.Tests/Bugs/FIeldProblem_Nick.cs @@ -42,24 +42,29 @@ public static class SubscriptionTest subscriptionStorage.HandleAdministrativeMessage; Message msg = new MsmqMessageBuilder - (serializer, new CastleServiceLocator(new WindsorContainer())).BuildFromMessageBatch(new - AddInstanceSubscription + (serializer, new CastleServiceLocator(new WindsorContainer())).BuildFromMessageBatch( + new OutgoingMessageInformation { - Endpoint = queueEndpoint.Uri.ToString(), - InstanceSubscriptionKey = id, - Type = typeof (TestMessage2).FullName, + Messages = new[] { new AddInstanceSubscription + { + Endpoint = queueEndpoint.Uri.ToString(), + InstanceSubscriptionKey = id, + Type = typeof (TestMessage2).FullName, + }} }); send(msg); wait.WaitOne(); msg = new MsmqMessageBuilder - (serializer, new CastleServiceLocator(new WindsorContainer())).BuildFromMessageBatch(new - RemoveInstanceSubscription + (serializer, new CastleServiceLocator(new WindsorContainer())).BuildFromMessageBatch(new OutgoingMessageInformation { - Endpoint = queueEndpoint.Uri.ToString(), - InstanceSubscriptionKey = id, - Type = typeof (TestMessage2).FullName, + Messages = new[] { new RemoveInstanceSubscription + { + Endpoint = queueEndpoint.Uri.ToString(), + InstanceSubscriptionKey = id, + Type = typeof (TestMessage2).FullName, + }} }); wait.Reset(); diff --git a/Rhino.ServiceBus.Tests/CanCustomizeHeadersWithMsmq.cs b/Rhino.ServiceBus.Tests/CanCustomizeHeadersWithMsmq.cs index d7ab53e..d1a5842 100644 --- a/Rhino.ServiceBus.Tests/CanCustomizeHeadersWithMsmq.cs +++ b/Rhino.ServiceBus.Tests/CanCustomizeHeadersWithMsmq.cs @@ -21,7 +21,7 @@ public void it_should_add_custom_header_to_headers_collection_for_normal_message { using (var container = new WindsorContainer()) { - container.Register(Component.For().ImplementedBy().LifeStyle.Is(LifestyleType.Transient)); + container.Register(Component.For().ImplementedBy().LifeStyle.Is(LifestyleType.Transient)); new RhinoServiceBusConfiguration() .UseCastleWindsor(container) .Configure(); @@ -45,7 +45,7 @@ public void it_should_add_custom_header_to_headers_collection_for_delayed_messag { using (var container = new WindsorContainer(new XmlInterpreter())) { - container.Register(Component.For().ImplementedBy().LifeStyle.Is(LifestyleType.Transient)); + container.Register(Component.For().ImplementedBy().LifeStyle.Is(LifestyleType.Transient)); new RhinoServiceBusConfiguration() .UseCastleWindsor(container) .Configure(); @@ -77,11 +77,11 @@ public void it_should_add_custom_header_to_headers_collection_for_delayed_messag } } - public class AppIdentityCustomizer : ICustomizeMessageHeaders + public class AppIdentityCustomizer : ICustomizeOutgoingMessages { - public void Customize(NameValueCollection headers) + public void Customize(OutgoingMessageInformation messageInformation) { - headers.Add("user-id", "corey"); + messageInformation.Headers.Add("user-id", "corey"); } } } diff --git a/Rhino.ServiceBus.Tests/CanSendMsgsFromOneWayBusUsingRhinoQueues.cs b/Rhino.ServiceBus.Tests/CanSendMsgsFromOneWayBusUsingRhinoQueues.cs index af2e128..22bd183 100644 --- a/Rhino.ServiceBus.Tests/CanSendMsgsFromOneWayBusUsingRhinoQueues.cs +++ b/Rhino.ServiceBus.Tests/CanSendMsgsFromOneWayBusUsingRhinoQueues.cs @@ -51,17 +51,18 @@ public void SendMessageToRemoteBus() bus.Start(); using (var oneWay = new RhinoQueuesOneWayBus( - new[]{ - new MessageOwner - { - Endpoint = bus.Endpoint.Uri, - Name = "System", - }, - }, - container.Resolve(), - Path.Combine(Path.GetFullPath(AppDomain.CurrentDomain.BaseDirectory), "one_way.esent"), - false, - new RhinoQueuesMessageBuilder(container.Resolve()))) + new[]{ + new MessageOwner + { + Endpoint = bus.Endpoint.Uri, + Name = "System", + }, + }, + container.Resolve(), + Path.Combine(Path.GetFullPath(AppDomain.CurrentDomain.BaseDirectory), "one_way.esent"), + false, + new RhinoQueuesMessageBuilder(container.Resolve(), + container.Resolve()))) { oneWay.Send("hello there, one way"); diff --git a/Rhino.ServiceBus.Tests/RhinoQueues/CustomizingMessageConstruction.cs b/Rhino.ServiceBus.Tests/RhinoQueues/CustomizingMessageConstruction.cs index ffcb87b..c1ff682 100644 --- a/Rhino.ServiceBus.Tests/RhinoQueues/CustomizingMessageConstruction.cs +++ b/Rhino.ServiceBus.Tests/RhinoQueues/CustomizingMessageConstruction.cs @@ -1,102 +1,177 @@ -using System; -using System.Collections.Specialized; -using Castle.Core; -using Castle.MicroKernel.Registration; -using Castle.Windsor; -using Rhino.Queues; -using Rhino.ServiceBus.Impl; -using Rhino.ServiceBus.Internal; -using Rhino.ServiceBus.RhinoQueues; -using Xunit; - -namespace Rhino.ServiceBus.Tests.RhinoQueues -{ - public class CustomizingMessageConstruction - { - - [Fact] - public void it_should_add_custom_header_to_headers_collection_using_builder() - { - using( var container = new WindsorContainer()) - { - container.Register(Component.For>().ImplementedBy());//before configuration - new RhinoServiceBusConfiguration() - .UseCastleWindsor(container) - .UseStandaloneConfigurationFile("RhinoQueues/RhinoQueues.config") - .Configure(); - - var builder = container.Resolve>(); - builder.Initialize(new Endpoint { Uri = RhinoQueuesOneWayBus.NullEndpoint }); - var msg = builder.BuildFromMessageBatch("somemsg"); - Assert.NotNull(msg); - Assert.NotEqual(0, msg.Data.Length); - Assert.Equal("mikey", msg.Headers["user-id"]); - } - - } - - [Fact] - public void it_should_add_custom_header_to_headers_collection_using_interface() - { - using (var container = new WindsorContainer()) - { - container.Register(Component.For().ImplementedBy().LifeStyle.Is(LifestyleType.Transient)); - new RhinoServiceBusConfiguration() - .UseCastleWindsor(container) - .UseStandaloneConfigurationFile("RhinoQueues/RhinoQueues.config") - .Configure(); - - var builder = container.Resolve>(); - builder.Initialize(new Endpoint { Uri = RhinoQueuesOneWayBus.NullEndpoint }); - var msg = builder.BuildFromMessageBatch("somemsg"); - Assert.NotNull(msg); - Assert.NotEqual(0, msg.Data.Length); - Assert.Equal("mikey", msg.Headers["user-id"]); - } - - } - - [CLSCompliant(false)] - public class CustomHeaderBuilder : IMessageBuilder - { - private IMessageBuilder inner; - - public CustomHeaderBuilder(IMessageBuilder inner) - { - this.inner = inner; - } - - public event Action MessageBuilt; - - public MessagePayload BuildFromMessageBatch(params object[] msgs) - { - var payload = inner.BuildFromMessageBatch(msgs); - Contextualize(payload); - - if (MessageBuilt != null) - MessageBuilt(payload); - return payload; - } - - public void Initialize(Endpoint source) - { - inner.Initialize(source); - } - - private static void Contextualize(MessagePayload message) - { - message.Headers.Add("user-id","mikey"); - } - } - public class AppIdentityCustomizer : ICustomizeMessageHeaders - { - public void Customize(NameValueCollection headers) - { - headers.Add("user-id","mikey"); - } - } - - } - - +using System; +using Castle.Core; +using Castle.MicroKernel.Registration; +using Castle.Windsor; +using Rhino.Queues; +using Rhino.ServiceBus.Impl; +using Rhino.ServiceBus.Internal; +using Rhino.ServiceBus.RhinoQueues; +using Xunit; + +namespace Rhino.ServiceBus.Tests.RhinoQueues +{ + public class CustomizingMessageConstruction + { + [Fact] + public void CanCustomizeMessageBasedOnDestination() + { + using (var container = new WindsorContainer()) + { + container.Register(Component.For().ImplementedBy()); + new RhinoServiceBusConfiguration() + .UseCastleWindsor(container) + .UseStandaloneConfigurationFile("RhinoQueues/RhinoQueues.config") + .Configure(); + + var builder = container.Resolve>(); + builder.Initialize(new Endpoint { Uri = RhinoQueuesOneWayBus.NullEndpoint }); + var messageInfo = new OutgoingMessageInformation + { + Destination = new Endpoint { Uri = new Uri("null://nowhere/queue?Volatile=true") }, + Messages = new[] { "somemsg" } + }; + var msg = builder.BuildFromMessageBatch(messageInfo); + Assert.NotNull(msg); + Assert.NotEqual(0, msg.Data.Length); + Assert.Equal(2, msg.MaxAttempts); + } + } + + [Fact] + public void CanCustomizeMessageBasedMessageType() + { + using (var container = new WindsorContainer()) + { + container.Register(Component.For().ImplementedBy()); + new RhinoServiceBusConfiguration() + .UseCastleWindsor(container) + .UseStandaloneConfigurationFile("RhinoQueues/RhinoQueues.config") + .Configure(); + + var builder = container.Resolve>(); + builder.Initialize(new Endpoint { Uri = RhinoQueuesOneWayBus.NullEndpoint }); + var messageInfo = new OutgoingMessageInformation { Messages = new[] { new CustomizedMessage() } }; + var msg = builder.BuildFromMessageBatch(messageInfo); + Assert.NotNull(msg); + Assert.NotEqual(0, msg.Data.Length); + Assert.Equal(1, msg.MaxAttempts); + } + } + + [Fact] + public void it_should_add_custom_header_to_headers_collection_using_builder() + { + using (var container = new WindsorContainer()) + { + container.Register(Component.For>().ImplementedBy());//before configuration + new RhinoServiceBusConfiguration() + .UseCastleWindsor(container) + .UseStandaloneConfigurationFile("RhinoQueues/RhinoQueues.config") + .Configure(); + + var builder = container.Resolve>(); + builder.Initialize(new Endpoint { Uri = RhinoQueuesOneWayBus.NullEndpoint }); + var messageInfo = new OutgoingMessageInformation { Messages = new[] { "somemsg" } }; + var msg = builder.BuildFromMessageBatch(messageInfo); + Assert.NotNull(msg); + Assert.NotEqual(0, msg.Data.Length); + Assert.Equal("mikey", msg.Headers["user-id"]); + } + + } + + [Fact] + public void it_should_add_custom_header_to_headers_collection_using_interface() + { + using (var container = new WindsorContainer()) + { + container.Register(Component.For().ImplementedBy().LifeStyle.Is(LifestyleType.Transient)); + new RhinoServiceBusConfiguration() + .UseCastleWindsor(container) + .UseStandaloneConfigurationFile("RhinoQueues/RhinoQueues.config") + .Configure(); + + var builder = container.Resolve>(); + builder.Initialize(new Endpoint { Uri = RhinoQueuesOneWayBus.NullEndpoint }); + var messageInfo = new OutgoingMessageInformation { Messages = new[] { "somemsg" } }; + var msg = builder.BuildFromMessageBatch(messageInfo); + Assert.NotNull(msg); + Assert.NotEqual(0, msg.Data.Length); + Assert.Equal("mikey", msg.Headers["user-id"]); + } + + } + + [CLSCompliant(false)] + public class CustomHeaderBuilder : IMessageBuilder + { + private IMessageBuilder inner; + + public CustomHeaderBuilder(IMessageBuilder inner) + { + this.inner = inner; + } + + public event Action MessageBuilt; + + public MessagePayload BuildFromMessageBatch(OutgoingMessageInformation messageInformation) + { + var payload = inner.BuildFromMessageBatch(messageInformation); + Contextualize(payload); + + if (MessageBuilt != null) + MessageBuilt(payload); + return payload; + } + + public void Initialize(Endpoint source) + { + inner.Initialize(source); + } + + private static void Contextualize(MessagePayload message) + { + message.Headers.Add("user-id", "mikey"); + } + } + + public class AppIdentityCustomizer : ICustomizeOutgoingMessages + { + public void Customize(OutgoingMessageInformation messageInformation) + { + messageInformation.Headers.Add("user-id", "mikey"); + } + } + + public class CustomizeByDestination : ICustomizeOutgoingMessages + { + public void Customize(OutgoingMessageInformation messageInformation) + { + if (messageInformation.Destination != null + && messageInformation.Destination.Uri.Query.Contains("Volatile")) + { + messageInformation.MaxAttempts = 2; + } + } + } + + public class CustomizeByMessageType : ICustomizeOutgoingMessages + { + public void Customize(OutgoingMessageInformation messageInformation) + { + if (messageInformation.Messages[0] is ICustomizeMessageByType) + { + messageInformation.MaxAttempts = 1; + } + } + } + + public interface ICustomizeMessageByType + { + } + + public class CustomizedMessage : ICustomizeMessageByType + { + } + } } \ No newline at end of file diff --git a/Rhino.ServiceBus.Tests/RhinoQueues/UsingRhinoQueuesTransport.cs b/Rhino.ServiceBus.Tests/RhinoQueues/UsingRhinoQueuesTransport.cs index e763409..5934c54 100644 --- a/Rhino.ServiceBus.Tests/RhinoQueues/UsingRhinoQueuesTransport.cs +++ b/Rhino.ServiceBus.Tests/RhinoQueues/UsingRhinoQueuesTransport.cs @@ -25,8 +25,8 @@ public UsingRhinoQueuesTransport() if (Directory.Exists("test.esent")) Directory.Delete("test.esent", true); - messageSerializer = new XmlMessageSerializer(new DefaultReflection(), - new CastleServiceLocator(new WindsorContainer())); + var serviceLocator = new CastleServiceLocator(new WindsorContainer()); + messageSerializer = new XmlMessageSerializer(new DefaultReflection(), serviceLocator); transport = new RhinoQueuesTransport( new Uri("rhino.queues://localhost:23456/q"), new EndpointRouter(), @@ -36,7 +36,7 @@ public UsingRhinoQueuesTransport() IsolationLevel.Serializable, 5, false, - new RhinoQueuesMessageBuilder(messageSerializer) + new RhinoQueuesMessageBuilder(messageSerializer, serviceLocator) ); transport.Start(); } diff --git a/Rhino.ServiceBus.Tests/RhinoQueues/WhenErrorOccurs.cs b/Rhino.ServiceBus.Tests/RhinoQueues/WhenErrorOccurs.cs index 9bd1353..cfa85b8 100644 --- a/Rhino.ServiceBus.Tests/RhinoQueues/WhenErrorOccurs.cs +++ b/Rhino.ServiceBus.Tests/RhinoQueues/WhenErrorOccurs.cs @@ -30,8 +30,9 @@ public WhenErrorOccurs() [Fact] public void Deserialization_Error_Will_Not_Retry() { + var serviceLocator = new CastleServiceLocator(new WindsorContainer()); messageSerializer = new ThrowingSerializer(new XmlMessageSerializer(new DefaultReflection(), - new CastleServiceLocator(new WindsorContainer()))); + serviceLocator)); transport = new RhinoQueuesTransport( new Uri("rhino.queues://localhost:23456/q"), new EndpointRouter(), @@ -41,7 +42,7 @@ public void Deserialization_Error_Will_Not_Retry() IsolationLevel.Serializable, 5, false, - new RhinoQueuesMessageBuilder(messageSerializer) + new RhinoQueuesMessageBuilder(messageSerializer, serviceLocator) ); transport.Start(); var count = 0; @@ -59,8 +60,8 @@ public void Deserialization_Error_Will_Not_Retry() [Fact] public void Arrived_Error_Will_Retry_Number_Of_Times_Configured() { - messageSerializer = new XmlMessageSerializer(new DefaultReflection(), - new CastleServiceLocator(new WindsorContainer())); + var serviceLocator = new CastleServiceLocator(new WindsorContainer()); + messageSerializer = new XmlMessageSerializer(new DefaultReflection(), serviceLocator); transport = new RhinoQueuesTransport( new Uri("rhino.queues://localhost:23456/q"), new EndpointRouter(), @@ -70,7 +71,7 @@ public void Arrived_Error_Will_Retry_Number_Of_Times_Configured() IsolationLevel.Serializable, 5, false, - new RhinoQueuesMessageBuilder(messageSerializer) + new RhinoQueuesMessageBuilder(messageSerializer, serviceLocator) ); transport.Start(); var count = 0; diff --git a/Rhino.ServiceBus.Tests/RhinoQueues/WhenReceivingTimedMessage.cs b/Rhino.ServiceBus.Tests/RhinoQueues/WhenReceivingTimedMessage.cs index ceb1820..2530fb5 100644 --- a/Rhino.ServiceBus.Tests/RhinoQueues/WhenReceivingTimedMessage.cs +++ b/Rhino.ServiceBus.Tests/RhinoQueues/WhenReceivingTimedMessage.cs @@ -22,8 +22,9 @@ public WhenReceivingTimedMessage() if (Directory.Exists("test.esent")) Directory.Delete("test.esent", true); + var serviceLocator = new CastleServiceLocator(new WindsorContainer()); messageSerializer = new XmlMessageSerializer(new DefaultReflection(), - new CastleServiceLocator(new WindsorContainer())); + serviceLocator); transport = new RhinoQueuesTransport( new Uri("rhino.queues://localhost:23456/q"), new EndpointRouter(), @@ -33,7 +34,7 @@ public WhenReceivingTimedMessage() IsolationLevel.Serializable, 5, false, - new RhinoQueuesMessageBuilder(messageSerializer) + new RhinoQueuesMessageBuilder(messageSerializer, serviceLocator) ); transport.Start(); } diff --git a/Rhino.ServiceBus/ICustomizeMessageHeaders.cs b/Rhino.ServiceBus/ICustomizeMessageHeaders.cs deleted file mode 100644 index 66f6295..0000000 --- a/Rhino.ServiceBus/ICustomizeMessageHeaders.cs +++ /dev/null @@ -1,9 +0,0 @@ -using System.Collections.Specialized; - -namespace Rhino.ServiceBus -{ - public interface ICustomizeMessageHeaders - { - void Customize(NameValueCollection headers); - } -} \ No newline at end of file diff --git a/Rhino.ServiceBus/ICustomizeOutgoingMessages.cs b/Rhino.ServiceBus/ICustomizeOutgoingMessages.cs new file mode 100644 index 0000000..8c4063a --- /dev/null +++ b/Rhino.ServiceBus/ICustomizeOutgoingMessages.cs @@ -0,0 +1,7 @@ +namespace Rhino.ServiceBus +{ + public interface ICustomizeOutgoingMessages + { + void Customize(OutgoingMessageInformation messageInformation); + } +} \ No newline at end of file diff --git a/Rhino.ServiceBus/Impl/MessageOwnersSelector.cs b/Rhino.ServiceBus/Impl/MessageOwnersSelector.cs index 380e140..41b710b 100644 --- a/Rhino.ServiceBus/Impl/MessageOwnersSelector.cs +++ b/Rhino.ServiceBus/Impl/MessageOwnersSelector.cs @@ -33,7 +33,7 @@ public Endpoint GetEndpointForMessageBatch(object[] messages) .FirstOrDefault(); if (messageOwner == null) - throw new MessagePublicationException("Could not find no message owner for " + messages[0]); + throw new MessagePublicationException("Could not find a message owner for " + messages[0]); var endpoint = endpointRouter.GetRoutedEndpoint(messageOwner.Endpoint); endpoint.Transactional = messageOwner.Transactional; diff --git a/Rhino.ServiceBus/Internal/IMessageBuilder.cs b/Rhino.ServiceBus/Internal/IMessageBuilder.cs index bf00415..d3d4e2b 100644 --- a/Rhino.ServiceBus/Internal/IMessageBuilder.cs +++ b/Rhino.ServiceBus/Internal/IMessageBuilder.cs @@ -5,7 +5,7 @@ namespace Rhino.ServiceBus.Internal public interface IMessageBuilder { event Action MessageBuilt; - T BuildFromMessageBatch(params object[] msgs); + T BuildFromMessageBatch(OutgoingMessageInformation messageInformation); void Initialize(Endpoint source); } } \ No newline at end of file diff --git a/Rhino.ServiceBus/LoadBalancer/MsmqSecondaryLoadBalancer.cs b/Rhino.ServiceBus/LoadBalancer/MsmqSecondaryLoadBalancer.cs index 3d98b49..d0f2d7c 100644 --- a/Rhino.ServiceBus/LoadBalancer/MsmqSecondaryLoadBalancer.cs +++ b/Rhino.ServiceBus/LoadBalancer/MsmqSecondaryLoadBalancer.cs @@ -89,9 +89,7 @@ private void OnCheckPrimaryHeartbeat(object state) { logger.InfoFormat("Notifying worker {0} that secondary load balancer {1} is accepting work on awating listenerQueue", queueUri, - newEndpoint, - originalEndPoint - ); + newEndpoint); SendToQueue(queueUri, new Reroute @@ -105,9 +103,7 @@ private void OnCheckPrimaryHeartbeat(object state) { logger.InfoFormat("Notifying worker {0} that secondary load balancer {1} is accepting work", queueUri, - Endpoint.Uri, - PrimaryLoadBalancer - ); + Endpoint.Uri); SendToQueue(queueUri, new AcceptingWork diff --git a/Rhino.ServiceBus/Msmq/AbstractMsmqListener.cs b/Rhino.ServiceBus/Msmq/AbstractMsmqListener.cs index ee91c3f..25067bc 100644 --- a/Rhino.ServiceBus/Msmq/AbstractMsmqListener.cs +++ b/Rhino.ServiceBus/Msmq/AbstractMsmqListener.cs @@ -273,7 +273,17 @@ protected static void Raise(Action action) protected Message GenerateMsmqMessageFromMessageBatch(params object[] msgs) { - return messageBuilder.BuildFromMessageBatch(msgs); + var messageInformation = new OutgoingMessageInformation + { + Messages = msgs, + Source = Endpoint + }; + return GenerateMsmqMessageFromMessageBatch(messageInformation); + } + + protected Message GenerateMsmqMessageFromMessageBatch(OutgoingMessageInformation messageInformation) + { + return messageBuilder.BuildFromMessageBatch(messageInformation); } protected object[] DeserializeMessages(OpenedQueue messageQueue, Message transportMessage, Action messageSerializationException) diff --git a/Rhino.ServiceBus/Msmq/MsmqMessageBuilder.cs b/Rhino.ServiceBus/Msmq/MsmqMessageBuilder.cs index f42c9a9..789bc61 100644 --- a/Rhino.ServiceBus/Msmq/MsmqMessageBuilder.cs +++ b/Rhino.ServiceBus/Msmq/MsmqMessageBuilder.cs @@ -14,20 +14,21 @@ public class MsmqMessageBuilder : IMessageBuilder { private readonly ILog logger = LogManager.GetLogger(typeof (MsmqMessageBuilder)); private readonly IMessageSerializer messageSerializer; - private readonly ICustomizeMessageHeaders[] customizeHeaders; + private readonly ICustomizeOutgoingMessages[] customizeHeaders; private Endpoint endpoint; public MsmqMessageBuilder(IMessageSerializer messageSerializer, IServiceLocator serviceLocator) { this.messageSerializer = messageSerializer; - customizeHeaders = serviceLocator.ResolveAll().ToArray(); + customizeHeaders = serviceLocator.ResolveAll().ToArray(); } - public event Action MessageBuilt; - - public Message BuildFromMessageBatch(params object[] msgs) - { + public event Action MessageBuilt; + + public Message BuildFromMessageBatch(OutgoingMessageInformation messageInformation) + { + var msgs = messageInformation.Messages; var message = new Message(); var isAdmin = msgs.Any(x => x is AdministrativeMessage); @@ -51,12 +52,12 @@ public Message BuildFromMessageBatch(params object[] msgs) if (customizeHeaders.Length > 0) { - var headers = new NameValueCollection(); + messageInformation.Headers = new NameValueCollection(); foreach (var customizeHeader in customizeHeaders) { - customizeHeader.Customize(headers); - } - var headerBytes = headers.SerializeHeaders(); + customizeHeader.Customize(messageInformation); + } + var headerBytes = messageInformation.Headers.SerializeHeaders(); //accounts for existing use of Extension for messageId and deferred messages extension = new byte[24 + headerBytes.Length]; Buffer.BlockCopy(messageId, 0, extension, 0, messageId.Length); diff --git a/Rhino.ServiceBus/Msmq/MsmqOnewayBus.cs b/Rhino.ServiceBus/Msmq/MsmqOnewayBus.cs index a64c206..d5c4311 100644 --- a/Rhino.ServiceBus/Msmq/MsmqOnewayBus.cs +++ b/Rhino.ServiceBus/Msmq/MsmqOnewayBus.cs @@ -17,10 +17,15 @@ public MsmqOnewayBus(MessageOwner[] messageOwners, IMessageBuilder mess public void Send(params object[] msgs) { - var endpoint = messageOwners.GetEndpointForMessageBatch(msgs); + var endpoint = messageOwners.GetEndpointForMessageBatch(msgs); + var messageInformation = new OutgoingMessageInformation + { + Destination = endpoint, + Messages = msgs, + }; using(var queue = endpoint.InitalizeQueue()) - { - var message = messageBuilder.BuildFromMessageBatch(msgs); + { + var message = messageBuilder.BuildFromMessageBatch(messageInformation); queue.SendInSingleTransaction(message); } } diff --git a/Rhino.ServiceBus/Msmq/MsmqTransport.cs b/Rhino.ServiceBus/Msmq/MsmqTransport.cs index 1466774..b138670 100644 --- a/Rhino.ServiceBus/Msmq/MsmqTransport.cs +++ b/Rhino.ServiceBus/Msmq/MsmqTransport.cs @@ -111,8 +111,14 @@ public void Send(Endpoint endpoint, DateTime processAgainAt, object[] msgs) { if (HaveStarted == false) throw new InvalidOperationException("Cannot send a message before transport is started"); - - var message = GenerateMsmqMessageFromMessageBatch(msgs); + + var messageInformation = new OutgoingMessageInformation + { + Destination = endpoint, + Messages = msgs, + Source = Endpoint + }; + var message = GenerateMsmqMessageFromMessageBatch(messageInformation); var processAgainBytes = BitConverter.GetBytes(processAgainAt.ToBinary()); if (message.Extension.Length == 16) { @@ -136,7 +142,13 @@ public void Send(Endpoint destination, object[] msgs) if(HaveStarted==false) throw new InvalidOperationException("Cannot send a message before transport is started"); - var message = GenerateMsmqMessageFromMessageBatch(msgs); + var messageInformation = new OutgoingMessageInformation + { + Destination = destination, + Messages = msgs, + Source = Endpoint + }; + var message = GenerateMsmqMessageFromMessageBatch(messageInformation); SendMessageToQueue(message, destination); diff --git a/Rhino.ServiceBus/OutgoingMessageInformation.cs b/Rhino.ServiceBus/OutgoingMessageInformation.cs new file mode 100644 index 0000000..ebcc998 --- /dev/null +++ b/Rhino.ServiceBus/OutgoingMessageInformation.cs @@ -0,0 +1,25 @@ +using System; +using System.Collections.Specialized; + +namespace Rhino.ServiceBus +{ + public class OutgoingMessageInformation + { + public DateTime? DeliverBy { get; set; } + + /// + /// The destination the messages will be sent to. This may be null if the + /// messages are being sent to multiple endpoints. + /// + public Endpoint Destination { get; set; } + + public NameValueCollection Headers { get; set; } + public int? MaxAttempts { get; set; } + public object[] Messages { get; set; } + + /// + /// The current endpoint. This may be null on a one-way bus. + /// + public Endpoint Source { get; set; } + } +} \ No newline at end of file diff --git a/Rhino.ServiceBus/Rhino.ServiceBus.csproj b/Rhino.ServiceBus/Rhino.ServiceBus.csproj index 5226393..4698e55 100644 --- a/Rhino.ServiceBus/Rhino.ServiceBus.csproj +++ b/Rhino.ServiceBus/Rhino.ServiceBus.csproj @@ -175,7 +175,7 @@ - + @@ -202,6 +202,7 @@ + diff --git a/Rhino.ServiceBus/RhinoQueues/RhinoQueuesMessageBuilder.cs b/Rhino.ServiceBus/RhinoQueues/RhinoQueuesMessageBuilder.cs index 6b163fd..ca633fd 100644 --- a/Rhino.ServiceBus/RhinoQueues/RhinoQueuesMessageBuilder.cs +++ b/Rhino.ServiceBus/RhinoQueues/RhinoQueuesMessageBuilder.cs @@ -1,6 +1,6 @@ using System; -using System.Collections.Specialized; -using System.IO; +using System.IO; +using System.Linq; using Rhino.Queues; using Rhino.ServiceBus.Internal; using Rhino.ServiceBus.Messages; @@ -8,20 +8,21 @@ namespace Rhino.ServiceBus.RhinoQueues { - public class RhinoQueuesMessageBuilder : IMessageBuilder { - private readonly IMessageSerializer messageSerializer; - private Endpoint endpoint; - public RhinoQueuesMessageBuilder(IMessageSerializer messageSerializer) + private readonly IMessageSerializer messageSerializer; + private readonly ICustomizeOutgoingMessages[] customizeHeaders; + private Endpoint endpoint; + public RhinoQueuesMessageBuilder(IMessageSerializer messageSerializer, IServiceLocator serviceLocator) { - this.messageSerializer = messageSerializer; + this.messageSerializer = messageSerializer; + customizeHeaders = serviceLocator.ResolveAll().ToArray(); } public event Action MessageBuilt; - [CLSCompliant(false)] - public MessagePayload BuildFromMessageBatch(params object[] msgs) + [CLSCompliant(false)] + public MessagePayload BuildFromMessageBatch(OutgoingMessageInformation messageInformation) { if (endpoint == null) throw new InvalidOperationException("A source endpoint is required for Rhino Queues transport, did you Initialize me? try providing a null Uri."); @@ -30,7 +31,7 @@ public MessagePayload BuildFromMessageBatch(params object[] msgs) byte[] data = new byte[0]; using (var memoryStream = new MemoryStream()) { - messageSerializer.Serialize(msgs, memoryStream); + messageSerializer.Serialize(messageInformation.Messages, memoryStream); data = memoryStream.ToArray(); } @@ -40,12 +41,19 @@ public MessagePayload BuildFromMessageBatch(params object[] msgs) Headers = { {"id", messageId.ToString()}, - {"type", GetAppSpecificMarker(msgs).ToString()}, + {"type", GetAppSpecificMarker(messageInformation.Messages).ToString()}, {"source", endpoint.Uri.ToString()}, } - }; - - TryCustomizeHeaders(payload.Headers); + }; + + messageInformation.Headers = payload.Headers; + foreach (var customizeHeader in customizeHeaders) + { + customizeHeader.Customize(messageInformation); + } + + payload.DeliverBy = messageInformation.DeliverBy; + payload.MaxAttempts = messageInformation.MaxAttempts; var copy = MessageBuilt; if (copy != null) @@ -53,22 +61,12 @@ public MessagePayload BuildFromMessageBatch(params object[] msgs) return payload; } - - private void TryCustomizeHeaders(NameValueCollection headers) - { - if (MessageHeaders == null) - return; - MessageHeaders.Customize(headers); - } - - public ICustomizeMessageHeaders MessageHeaders { get; set; } + public void Initialize(Endpoint source) { endpoint = source; } - - private static MessageType GetAppSpecificMarker(object[] msgs) { var msg = msgs[0]; diff --git a/Rhino.ServiceBus/RhinoQueues/RhinoQueuesTransport.cs b/Rhino.ServiceBus/RhinoQueues/RhinoQueuesTransport.cs index 04cbdd1..8b5ebcf 100644 --- a/Rhino.ServiceBus/RhinoQueues/RhinoQueuesTransport.cs +++ b/Rhino.ServiceBus/RhinoQueues/RhinoQueuesTransport.cs @@ -471,7 +471,13 @@ public void Send(Endpoint destination, object[] msgs) private void SendInternal(object[] msgs, Endpoint destination, Action customizeHeaders) { var messageId = Guid.NewGuid(); - var payload = messageBuilder.BuildFromMessageBatch(msgs); + var messageInformation = new OutgoingMessageInformation + { + Destination = destination, + Messages = msgs, + Source = Endpoint + }; + var payload = messageBuilder.BuildFromMessageBatch(messageInformation); logger.DebugFormat("Sending a message with id '{0}' to '{1}'", messageId, destination.Uri); customizeHeaders(payload.Headers); var transactionOptions = GetTransactionOptions();