Skip to content
Browse files

Merge branch 'customize-headers'

  • Loading branch information...
2 parents 822c76a + d3b3a5a commit 7a1acba19832232ea66dbf9b899b8518ee2895c7 mnichols committed Jul 5, 2010
View
8 Rhino.ServiceBus.Tests/Bugs/FIeldProblem_Nick.cs
@@ -39,8 +39,8 @@ public static class SubscriptionTest
t.AdministrativeMessageArrived +=
subscriptionStorage.HandleAdministrativeMessage;
- Message msg = new MessageBuilder
- (serializer).GenerateMsmqMessageFromMessageBatch(new
+ Message msg = new MsmqMessageBuilder
+ (serializer).BuildFromMessageBatch(new
AddInstanceSubscription
{
Endpoint = queueEndpoint.Uri.ToString(),
@@ -51,8 +51,8 @@ public static class SubscriptionTest
wait.WaitOne();
- msg = new MessageBuilder
- (serializer).GenerateMsmqMessageFromMessageBatch(new
+ msg = new MsmqMessageBuilder
+ (serializer).BuildFromMessageBatch(new
RemoveInstanceSubscription
{
Endpoint = queueEndpoint.Uri.ToString(),
View
4 Rhino.ServiceBus.Tests/CanSendMsgsFromOneWayBus.cs
@@ -28,14 +28,14 @@ public void SendMessageToRemoteBus()
{
bus.Start();
- var oneWay = new OnewayBus(new[]
+ var oneWay = new MsmqOnewayBus(new[]
{
new MessageOwner
{
Endpoint = bus.Endpoint.Uri,
Name = "System",
},
- }, new MessageBuilder(container.Resolve<IMessageSerializer>(), null));
+ }, new MsmqMessageBuilder(container.Resolve<IMessageSerializer>()));
oneWay.Send("hello there, one way");
View
9 Rhino.ServiceBus.Tests/CanSendMsgsFromOneWayBusUsingRhinoQueues.cs
@@ -39,17 +39,15 @@ public void SendMessageToRemoteBus()
using (var bus = container.Resolve<IStartableServiceBus>())
{
bus.Start();
- var transport = new RhinoQueuesTransport(new Uri("null://nowhere:24689/middle"),
- new EndpointRouter(), container.Resolve<IMessageSerializer>(),
- 1, "one_way.esent", IsolationLevel.ReadCommitted, 5);
+
var oneWay = new RhinoQueuesOneWayBus(new[]
{
new MessageOwner
{
Endpoint = bus.Endpoint.Uri,
Name = "System",
},
- }, transport);
+ }, container.Resolve<IMessageSerializer>(),new RhinoQueuesMessageBuilder(container.Resolve<IMessageSerializer>()));
oneWay.Send("hello there, one way");
@@ -69,7 +67,8 @@ public void SendMessageToRemoteBusFromConfigDrivenOneWayBus()
using (var c = new WindsorContainer(new XmlInterpreter("OneWayBusRhinoQueues.config")))
{
c.Kernel.AddFacility("one.way.rhino.esb", new OnewayRhinoServiceBusFacility());
- c.Resolve<IOnewayBus>().Send("hello there, one way");
+ var oneway = c.Resolve<IOnewayBus>();
+ oneway.Send("hello there, one way");
StringConsumer.Event.WaitOne();
Assert.Equal("hello there, one way", StringConsumer.Value);
}
View
1 Rhino.ServiceBus.Tests/Rhino.ServiceBus.Tests.csproj
@@ -106,6 +106,7 @@
</Compile>
<Compile Include="BugWithLogTest.cs" />
<Compile Include="BusSubscriptionTests.cs" />
+ <Compile Include="RhinoQueues\CanAddHeadersToMessage.cs" />
<Compile Include="CanRouteMessageToConsumerThroughContainer.cs" />
<Compile Include="CanSendMsgsFromOneWayBus.cs" />
<Compile Include="CanSendMsgsFromOneWayBusUsingRhinoQueues.cs" />
View
70 Rhino.ServiceBus.Tests/RhinoQueues/CanAddHeadersToMessage.cs
@@ -0,0 +1,70 @@
+using System;
+using Castle.MicroKernel.Registration;
+using Castle.Windsor;
+using Rhino.Queues;
+using Rhino.ServiceBus.Impl;
+using Rhino.ServiceBus.RhinoQueues;
+using Xunit;
+
+namespace Rhino.ServiceBus.Tests.RhinoQueues
+{
+ public class CanAddHeadersToMessage:IDisposable
+ {
+ private WindsorContainer container;
+
+ public CanAddHeadersToMessage()
+ {
+ container = new WindsorContainer("RhinoQueues/RhinoQueues.config");
+
+ container.Register(Component.For<IMessageBuilder<MessagePayload>>().ImplementedBy<CustomHeaderBuilder>());
+ container.AddFacility("rhino.esb", new RhinoServiceBusFacility());
+
+
+ }
+
+
+ [Fact]
+ public void it_should_add_custom_header_to_headers_collection()
+ {
+ var builder = container.Resolve<IMessageBuilder<MessagePayload>>();
+ builder.Initialize(new Endpoint {Uri = RhinoQueuesOneWayBus.NullEndpoint});
+ var msg = builder.BuildFromMessageBatch("somemsg");
+ Assert.NotNull(msg);
+ Assert.NotEqual(0,msg.Data.Length);
+ Assert.Equal("mikey",msg.Headers["user-id"]);
+
+ }
+
+ public class CustomHeaderBuilder : IMessageBuilder<MessagePayload>
+ {
+ private IMessageBuilder<MessagePayload> inner;
+
+ public CustomHeaderBuilder(IMessageBuilder<MessagePayload> inner)
+ {
+ this.inner = inner;
+ }
+
+ public MessagePayload BuildFromMessageBatch(params object[] msgs)
+ {
+ var payload = inner.BuildFromMessageBatch(msgs);
+ Contextualize(payload);
+ return payload;
+ }
+
+ public void Initialize(Endpoint source)
+ {
+ inner.Initialize(source);
+ }
+
+ private static void Contextualize(MessagePayload message)
+ {
+ message.Headers.Add("user-id","mikey");
+ }
+ }
+
+ public void Dispose()
+ {
+ container.Dispose();
+ }
+ }
+}
View
3 Rhino.ServiceBus.Tests/RhinoQueues/UsingRhinoQueuesTransport.cs
@@ -31,7 +31,8 @@ public UsingRhinoQueuesTransport()
1,
"test.esent",
IsolationLevel.Serializable,
- 5
+ 5,
+ new RhinoQueuesMessageBuilder(messageSerializer)
);
transport.Start();
}
View
7 Rhino.ServiceBus/Config/RhinoQueuesConfigurationAware.cs
@@ -4,6 +4,7 @@
using Castle.Core;
using Castle.Core.Configuration;
using Castle.MicroKernel.Registration;
+using Rhino.Queues;
using Rhino.ServiceBus.Impl;
using Rhino.ServiceBus.Internal;
using Rhino.ServiceBus.RhinoQueues;
@@ -44,7 +45,11 @@ public void Configure(AbstractRhinoServiceBusFacility facility, IConfiguration c
queueIsolationLevel = facility.IsolationLevel,
numberOfRetries = facility.NumberOfRetries,
path = Path.Combine(path, name + ".esent")
- })
+ }),
+ Component.For<IMessageBuilder<MessagePayload>>()
+ .ImplementedBy<RhinoQueuesMessageBuilder>()
+ .LifeStyle.Is(LifestyleType.Singleton)
+
);
}
}
View
8 Rhino.ServiceBus/IMessageBuilder.cs
@@ -0,0 +1,8 @@
+namespace Rhino.ServiceBus
+{
+ public interface IMessageBuilder<T>
+ {
+ T BuildFromMessageBatch(params object[] msgs);
+ void Initialize(Endpoint source);
+ }
+}
View
31 Rhino.ServiceBus/Impl/OnewayRhinoServiceBusFacility.cs
@@ -1,10 +1,12 @@
-using System;
+using System;
using System.Collections.Generic;
using System.IO;
+using System.Messaging;
using System.Transactions;
using Castle.Core;
using Castle.MicroKernel.Facilities;
using Castle.MicroKernel.Registration;
+using Rhino.Queues;
using Rhino.ServiceBus.Internal;
using Rhino.ServiceBus.Msmq;
using Rhino.ServiceBus.RhinoQueues;
@@ -29,35 +31,28 @@ protected override void Init()
messageOwnersReader.ReadMessageOwners();
if (IsRhinoQueues(messageOwnersReader.EndpointScheme))
{
- var path = Path.GetFullPath(AppDomain.CurrentDomain.BaseDirectory);
Kernel.Register(
- Component.For<ITransport>()
- .LifeStyle.Is(LifestyleType.Singleton)
- .ImplementedBy(typeof (RhinoQueuesTransport))
- .DependsOn(new
- {
- threadCount = 1,
- endpoint = new Uri("null://nowhere:24689/middle"),
- queueIsolationLevel = IsolationLevel.ReadCommitted,
- numberOfRetries = 5,
- path = Path.Combine(path,"one_way.esent")
- }),
+ Component.For<IMessageBuilder<MessagePayload>>()
+ .ImplementedBy<RhinoQueuesMessageBuilder>()
+ .LifeStyle.Is(LifestyleType.Singleton),
Component.For<IOnewayBus>()
.LifeStyle.Is(LifestyleType.Singleton)
.ImplementedBy<RhinoQueuesOneWayBus>()
- .DependsOn(new {messageOwners = messageOwners.ToArray()})
+ .DependsOn(new
+ {
+ messageOwners = messageOwners.ToArray(),
+ })
);
-
}
else
{
Kernel.Register(
- Component.For<IMessageBuilder>()
+ Component.For<IMessageBuilder<Message>>()
.LifeStyle.Is(LifestyleType.Singleton)
- .ImplementedBy<MessageBuilder>(),
+ .ImplementedBy<MsmqMessageBuilder>(),
Component.For<IOnewayBus>()
.LifeStyle.Is(LifestyleType.Singleton)
- .ImplementedBy<OnewayBus>()
+ .ImplementedBy<MsmqOnewayBus>()
.DependsOn(new {messageOwners = messageOwners.ToArray()}));
}
View
7 Rhino.ServiceBus/Msmq/AbstractMsmqListener.cs
@@ -62,7 +62,8 @@ public abstract class AbstractMsmqListener : IDisposable
default:
throw new ArgumentOutOfRangeException("transactional");
}
- builder = new MessageBuilder(this.messageSerializer, Endpoint);
+ messageBuilder = new MsmqMessageBuilder(this.messageSerializer);
+ this.messageBuilder.Initialize(Endpoint);
}
public event Action Started;
@@ -239,7 +240,7 @@ protected static void Raise(Action action)
protected IEndpointRouter endpointRouter;
private readonly bool? transactional;
- private readonly MessageBuilder builder;
+ private readonly MsmqMessageBuilder messageBuilder;
public TransportState TransportState { get; set; }
@@ -273,7 +274,7 @@ protected static void Raise(Action action)
protected Message GenerateMsmqMessageFromMessageBatch(params object[] msgs)
{
- return builder.GenerateMsmqMessageFromMessageBatch(msgs);
+ return messageBuilder.BuildFromMessageBatch(msgs);
}
protected object[] DeserializeMessages(OpenedQueue messageQueue, Message transportMessage, Action<CurrentMessageInformation, Exception> messageSerializationException)
View
9 Rhino.ServiceBus/Msmq/IMessageBuilder.cs
@@ -1,9 +0,0 @@
-using System.Messaging;
-
-namespace Rhino.ServiceBus.Msmq
-{
- public interface IMessageBuilder
- {
- Message GenerateMsmqMessageFromMessageBatch(params object[] msgs);
- }
-}
View
149 Rhino.ServiceBus/Msmq/MessageBuilder.cs → Rhino.ServiceBus/Msmq/MsmqMessageBuilder.cs
@@ -1,75 +1,76 @@
-using System;
-using System.Linq;
-using System.Messaging;
-using System.Runtime.Serialization;
-using log4net;
-using Rhino.ServiceBus.Internal;
-using Rhino.ServiceBus.Messages;
-
-namespace Rhino.ServiceBus.Msmq
-{
- public class MessageBuilder : IMessageBuilder
- {
- private readonly ILog logger = LogManager.GetLogger(typeof (MessageBuilder));
- private readonly IMessageSerializer messageSerializer;
- private readonly Endpoint endpoint;
-
- public MessageBuilder(IMessageSerializer messageSerializer)
- :this(messageSerializer, null)
- {
- }
-
- public MessageBuilder(IMessageSerializer messageSerializer, Endpoint endpoint)
- {
- this.messageSerializer = messageSerializer;
- this.endpoint = endpoint;
- }
-
- public Message GenerateMsmqMessageFromMessageBatch(params object[] msgs)
- {
- var message = new Message();
-
- var isAdmin = msgs.Any(x => x is AdministrativeMessage);
- try
- {
- messageSerializer.Serialize(msgs, message.BodyStream);
- }
- catch (SerializationException ex)
- {
- logger.Error("Error when trying to serialize message.", ex);
- throw;
- }
- message.Priority = isAdmin ? MessagePriority.High : MessagePriority.Normal;
- if (endpoint != null)
- message.ResponseQueue = endpoint.InitalizeQueue().ToResponseQueue();
- else
- message.ResponseQueue = null;
-
- message.Extension = Guid.NewGuid().ToByteArray();
-
- message.AppSpecific = GetAppSpecificMarker(msgs);
-
- message.Label = msgs
- .Where(msg => msg != null)
- .Select(msg =>
- {
- string s = msg.ToString();
- if (s.Length > 249)
- return s.Substring(0, 246) + "...";
- return s;
- })
- .FirstOrDefault();
- return message;
- }
-
- protected static int GetAppSpecificMarker(object[] msgs)
- {
- var msg = msgs[0];
- if (msg is AdministrativeMessage)
- return (int)Transport.MessageType.AdministrativeMessageMarker;
- if (msg is LoadBalancerMessage)
- return (int)Transport.MessageType.LoadBalancerMessageMarker;
- return 0;
- }
- }
+using System;
+using System.Linq;
+using System.Messaging;
+using System.Runtime.Serialization;
+using log4net;
+using Rhino.ServiceBus.Internal;
+using Rhino.ServiceBus.Messages;
+
+namespace Rhino.ServiceBus.Msmq
+{
+ public class MsmqMessageBuilder : IMessageBuilder<Message>
+ {
+ private readonly ILog logger = LogManager.GetLogger(typeof (MsmqMessageBuilder));
+ private readonly IMessageSerializer messageSerializer;
+ private Endpoint endpoint;
+
+
+ public MsmqMessageBuilder(IMessageSerializer messageSerializer)
+ {
+ this.messageSerializer = messageSerializer;
+ }
+
+ public Message BuildFromMessageBatch(params object[] msgs)
+ {
+ var message = new Message();
+
+ var isAdmin = msgs.Any(x => x is AdministrativeMessage);
+ try
+ {
+ messageSerializer.Serialize(msgs, message.BodyStream);
+ }
+ catch (SerializationException ex)
+ {
+ logger.Error("Error when trying to serialize message.", ex);
+ throw;
+ }
+ message.Priority = isAdmin ? MessagePriority.High : MessagePriority.Normal;
+ if (endpoint != null)
+ message.ResponseQueue = endpoint.InitalizeQueue().ToResponseQueue();
+ else
+ message.ResponseQueue = null;
+
+ message.Extension = Guid.NewGuid().ToByteArray();
+
+ message.AppSpecific = GetAppSpecificMarker(msgs);
+
+ message.Label = msgs
+ .Where(msg => msg != null)
+ .Select(msg =>
+ {
+ string s = msg.ToString();
+ if (s.Length > 249)
+ return s.Substring(0, 246) + "...";
+ return s;
+ })
+ .FirstOrDefault();
+ return message;
+ }
+
+ public void Initialize(Endpoint source)
+ {
+ this.endpoint = source;
+ }
+
+
+ protected static int GetAppSpecificMarker(object[] msgs)
+ {
+ var msg = msgs[0];
+ if (msg is AdministrativeMessage)
+ return (int)Transport.MessageType.AdministrativeMessageMarker;
+ if (msg is LoadBalancerMessage)
+ return (int)Transport.MessageType.LoadBalancerMessageMarker;
+ return 0;
+ }
+ }
}
View
51 Rhino.ServiceBus/Impl/OnewayBus.cs → Rhino.ServiceBus/Msmq/MsmqOnewayBus.cs
@@ -1,26 +1,27 @@
-using Rhino.ServiceBus.Msmq;
-
-namespace Rhino.ServiceBus.Impl
-{
- public class OnewayBus : IOnewayBus
- {
- private readonly MessageOwnersSelector messageOwners;
- private readonly IMessageBuilder messageBuilder;
-
- public OnewayBus(MessageOwner[] messageOwners, IMessageBuilder messageBuilder)
- {
- this.messageOwners = new MessageOwnersSelector(messageOwners, new EndpointRouter());
- this.messageBuilder = messageBuilder;
- }
-
- public void Send(params object[] msgs)
- {
- var endpoint = messageOwners.GetEndpointForMessageBatch(msgs);
- using(var queue = endpoint.InitalizeQueue())
- {
- var message = messageBuilder.GenerateMsmqMessageFromMessageBatch(msgs);
- queue.SendInSingleTransaction(message);
- }
- }
- }
+using System.Messaging;
+using Rhino.ServiceBus.Impl;
+
+namespace Rhino.ServiceBus.Msmq
+{
+ public class MsmqOnewayBus : IOnewayBus
+ {
+ private readonly MessageOwnersSelector messageOwners;
+ private readonly IMessageBuilder<Message> messageBuilder;
+
+ public MsmqOnewayBus(MessageOwner[] messageOwners, IMessageBuilder<Message> messageBuilder)
+ {
+ this.messageOwners = new MessageOwnersSelector(messageOwners, new EndpointRouter());
+ this.messageBuilder = messageBuilder;
+ }
+
+ public void Send(params object[] msgs)
+ {
+ var endpoint = messageOwners.GetEndpointForMessageBatch(msgs);
+ using(var queue = endpoint.InitalizeQueue())
+ {
+ var message = messageBuilder.BuildFromMessageBatch(msgs);
+ queue.SendInSingleTransaction(message);
+ }
+ }
+ }
}
View
7 Rhino.ServiceBus/Rhino.ServiceBus.csproj
@@ -118,6 +118,7 @@
<Compile Include="Messages\ReadyToWork.cs" />
<Compile Include="Msmq\AbstractMsmqListener.cs" />
<Compile Include="Msmq\QueueCreationModule.cs" />
+ <Compile Include="RhinoQueues\RhinoQueuesMessageBuilder.cs" />
<Compile Include="RhinoQueues\RhinoQueuesOneWayBus.cs" />
<Compile Include="Transport\SubQueue.cs" />
<Compile Include="Msmq\TransportActions\ShutDownAction.cs" />
@@ -196,7 +197,7 @@
<Compile Include="Impl\AbstractRhinoServiceBusFacility.cs" />
<Compile Include="Impl\MessageOwnersConfigReader.cs" />
<Compile Include="Impl\MessageOwnersSelector.cs" />
- <Compile Include="Impl\OnewayBus.cs" />
+ <Compile Include="Msmq\MsmqOnewayBus.cs" />
<Compile Include="Impl\OnewayRhinoServiceBusFacility.cs" />
<Compile Include="Impl\RijndaelEncryptionService.cs" />
<Compile Include="Internal\ICustomElementSerializer.cs" />
@@ -216,8 +217,8 @@
<Compile Include="Messages\QueryReadyForWorkQueueUri.cs" />
<Compile Include="Messages\Reroute.cs" />
<Compile Include="Msmq\EndpointExtensions.cs" />
- <Compile Include="Msmq\IMessageBuilder.cs" />
- <Compile Include="Msmq\MessageBuilder.cs" />
+ <Compile Include="IMessageBuilder.cs" />
+ <Compile Include="Msmq\MsmqMessageBuilder.cs" />
<Compile Include="Transport\MessageHandlingCompletion.cs" />
<Compile Include="Msmq\OpenedQueue.cs" />
<Compile Include="Msmq\QueueInfo.cs" />
View
63 Rhino.ServiceBus/RhinoQueues/RhinoQueuesMessageBuilder.cs
@@ -0,0 +1,63 @@
+using System;
+using System.IO;
+using Rhino.Queues;
+using Rhino.ServiceBus.Internal;
+using Rhino.ServiceBus.Messages;
+using Rhino.ServiceBus.Transport;
+
+namespace Rhino.ServiceBus.RhinoQueues
+{
+
+ public class RhinoQueuesMessageBuilder : IMessageBuilder<MessagePayload>
+ {
+ private readonly IMessageSerializer messageSerializer;
+ private Endpoint endpoint;
+ public RhinoQueuesMessageBuilder(IMessageSerializer messageSerializer)
+ {
+ this.messageSerializer = messageSerializer;
+ }
+ [CLSCompliant(false)]
+ public MessagePayload BuildFromMessageBatch(params object[] msgs)
+ {
+ if (endpoint == null)
+ throw new InvalidOperationException("A source endpoint is required for Rhino Queues transport, did you Initialize me? try providing a null Uri.");
+
+ var messageId = Guid.NewGuid();
+ byte[] data = new byte[0];
+ using (var memoryStream = new MemoryStream())
+ {
+ messageSerializer.Serialize(msgs, memoryStream);
+ data = memoryStream.ToArray();
+
+ }
+ var payload=new MessagePayload
+ {
+ Data = data,
+ Headers =
+ {
+ {"id", messageId.ToString()},
+ {"type", GetAppSpecificMarker(msgs).ToString()},
+ {"source", endpoint.Uri.ToString()},
+ }
+ };
+ return payload;
+ }
+
+ public void Initialize(Endpoint source)
+ {
+ endpoint = source;
+ }
+
+
+
+ private static MessageType GetAppSpecificMarker(object[] msgs)
+ {
+ var msg = msgs[0];
+ if (msg is AdministrativeMessage)
+ return MessageType.AdministrativeMessageMarker;
+ if (msg is LoadBalancerMessage)
+ return MessageType.LoadBalancerMessageMarker;
+ return 0;
+ }
+ }
+}
View
19 Rhino.ServiceBus/RhinoQueues/RhinoQueuesOneWayBus.cs
@@ -1,23 +1,30 @@
+using System;
+using System.IO;
+using System.Transactions;
+using Rhino.Queues;
using Rhino.ServiceBus.Impl;
using Rhino.ServiceBus.Internal;
namespace Rhino.ServiceBus.RhinoQueues
{
- public class RhinoQueuesOneWayBus : IOnewayBus
+ [CLSCompliant(false)]
+ public class RhinoQueuesOneWayBus : RhinoQueuesTransport,IOnewayBus
{
private MessageOwnersSelector messageOwners;
- private ITransport transport;
+ public static readonly Uri NullEndpoint = new Uri("null://nowhere:24689/middle");
+ public RhinoQueuesOneWayBus(MessageOwner[] messageOwners, IMessageSerializer messageSerializer,IMessageBuilder<MessagePayload> messageBuilder)
+ : base(NullEndpoint, new EndpointRouter(), messageSerializer, 1, Path.Combine(Path.GetFullPath(AppDomain.CurrentDomain.BaseDirectory), "one_way.esent"), IsolationLevel.ReadCommitted,5,messageBuilder)
- public RhinoQueuesOneWayBus(MessageOwner[] messageOwners, ITransport transport)
{
this.messageOwners = new MessageOwnersSelector(messageOwners, new EndpointRouter());
- this.transport = transport;
- this.transport.Start();
+ Start();
}
public void Send(params object[] msgs)
{
- transport.Send(messageOwners.GetEndpointForMessageBatch(msgs), msgs);
+ base.Send(messageOwners.GetEndpointForMessageBatch(msgs), msgs);
}
+
+
}
}
View
72 Rhino.ServiceBus/RhinoQueues/RhinoQueuesTransport.cs
@@ -20,6 +20,7 @@
namespace Rhino.ServiceBus.RhinoQueues
{
+ [CLSCompliant(false)]
public class RhinoQueuesTransport : ITransport
{
private readonly Uri endpoint;
@@ -34,28 +35,30 @@ public class RhinoQueuesTransport : ITransport
private bool haveStarted;
private readonly IsolationLevel queueIsolationLevel;
private readonly int numberOfRetries;
+ private readonly IMessageBuilder<MessagePayload> messageBuilder;
- [ThreadStatic]
+ [ThreadStatic]
private static RhinoQueueCurrentMessageInformation currentMessageInformation;
private readonly ILog logger = LogManager.GetLogger(typeof(RhinoQueuesTransport));
private TimeoutAction timeout;
private IQueue queue;
-
-
- public RhinoQueuesTransport(
- Uri endpoint,
- IEndpointRouter endpointRouter,
- IMessageSerializer messageSerializer,
- int threadCount,
- string path,
- IsolationLevel queueIsolationLevel,
- int numberOfRetries)
+
+
+ public RhinoQueuesTransport(Uri endpoint,
+ IEndpointRouter endpointRouter,
+ IMessageSerializer messageSerializer,
+ int threadCount,
+ string path,
+ IsolationLevel queueIsolationLevel,
+ int numberOfRetries,
+ IMessageBuilder<MessagePayload> messageBuilder)
{
- this.endpoint = endpoint;
+ this.endpoint = endpoint;
this.queueIsolationLevel = queueIsolationLevel;
this.numberOfRetries = numberOfRetries;
- this.endpointRouter = endpointRouter;
+ this.messageBuilder = messageBuilder;
+ this.endpointRouter = endpointRouter;
this.messageSerializer = messageSerializer;
this.threadCount = threadCount;
this.path = path;
@@ -67,6 +70,7 @@ public class RhinoQueuesTransport : ITransport
// This has to be the first subscriber to the transport events
// in order to successfuly handle the errors semantics
new ErrorAction(numberOfRetries).Init(this);
+ messageBuilder.Initialize(this.Endpoint);
}
public void Dispose()
@@ -390,29 +394,15 @@ public void Send(Endpoint destination, object[] msgs)
private void SendInternal(object[] msgs, Endpoint destination, Action<NameValueCollection> customizeHeaders)
{
var messageId = Guid.NewGuid();
- using (var memoryStream = new MemoryStream())
- {
- messageSerializer.Serialize(msgs, memoryStream);
-
- var payload = new MessagePayload
- {
- Data = memoryStream.ToArray(),
- Headers =
- {
- {"id", messageId.ToString()},
- {"type", GetAppSpecificMarker(msgs).ToString()},
- {"source", Endpoint.Uri.ToString()},
- }
- };
- logger.DebugFormat("Sending a message with id '{0}' to '{1}'", messageId, destination.Uri);
- customizeHeaders(payload.Headers);
- var transactionOptions = GetTransactionOptions();
- using (var tx = new TransactionScope(TransactionScopeOption.Required, transactionOptions))
- {
- queueManager.Send(destination.Uri, payload);
- tx.Complete();
- }
- }
+ var payload = messageBuilder.BuildFromMessageBatch(msgs);
+ logger.DebugFormat("Sending a message with id '{0}' to '{1}'", messageId, destination.Uri);
+ customizeHeaders(payload.Headers);
+ var transactionOptions = GetTransactionOptions();
+ using (var tx = new TransactionScope(TransactionScopeOption.Required, transactionOptions))
+ {
+ queueManager.Send(destination.Uri, payload);
+ tx.Complete();
+ }
var copy = MessageSent;
if (copy == null)
@@ -436,16 +426,6 @@ private TransactionOptions GetTransactionOptions()
};
}
- protected static MessageType GetAppSpecificMarker(object[] msgs)
- {
- var msg = msgs[0];
- if (msg is AdministrativeMessage)
- return MessageType.AdministrativeMessageMarker;
- if (msg is LoadBalancerMessage)
- return MessageType.LoadBalancerMessageMarker;
- return 0;
- }
-
public void Send(Endpoint endpoint, DateTime processAgainAt, object[] msgs)
{
SendInternal(msgs, endpoint,

0 comments on commit 7a1acba

Please sign in to comment.
Something went wrong with that request. Please try again.