Permalink
Browse files

extending rq transport with message builder dependency

  • Loading branch information...
mnichols
mnichols committed Jul 5, 2010
1 parent 54743e6 commit e5b97bf39757c521f8ae7b2753eea28869a25e7a
@@ -28,7 +28,7 @@ public void SendMessageToRemoteBus()
{
bus.Start();
- var oneWay = new OnewayBus(new[]
+ var oneWay = new MsmqOnewayBus(new[]
{
new MessageOwner
{
@@ -41,7 +41,7 @@ public void SendMessageToRemoteBus()
bus.Start();
var transport = new RhinoQueuesTransport(new Uri("null://nowhere:24689/middle"),
new EndpointRouter(), container.Resolve<IMessageSerializer>(),
- 1, "one_way.esent", IsolationLevel.ReadCommitted, 5);
+ 1, "one_way.esent", IsolationLevel.ReadCommitted, 5, TODO);
var oneWay = new RhinoQueuesOneWayBus(new[]
{
new MessageOwner
@@ -31,7 +31,8 @@ public UsingRhinoQueuesTransport()
1,
"test.esent",
IsolationLevel.Serializable,
- 5
+ 5,
+ new RhinoQueuesMessageBuilder(messageSerializer)
);
transport.Start();
}
@@ -1,9 +1,8 @@
-using System.Messaging;
-
-namespace Rhino.ServiceBus
+namespace Rhino.ServiceBus
{
public interface IMessageBuilder<T>
{
T BuildFromMessageBatch(params object[] msgs);
+ void Initialize(Endpoint source);
}
}
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.IO;
+using System.Messaging;
using System.Transactions;
using Castle.Core;
using Castle.MicroKernel.Facilities;
@@ -37,7 +38,7 @@ protected override void Init()
.DependsOn(new
{
threadCount = 1,
- endpoint = new Uri("null://nowhere:24689/middle"),
+// endpoint = new Uri("null://nowhere:24689/middle"),
queueIsolationLevel = IsolationLevel.ReadCommitted,
numberOfRetries = 5,
path = Path.Combine(path,"one_way.esent")
@@ -52,12 +53,12 @@ protected override void Init()
else
{
Kernel.Register(
- Component.For<IMessageBuilder>()
+ Component.For<IMessageBuilder<Message>>()
.LifeStyle.Is(LifestyleType.Singleton)
.ImplementedBy<MsmqMessageBuilder>(),
Component.For<IOnewayBus>()
.LifeStyle.Is(LifestyleType.Singleton)
- .ImplementedBy<OnewayBus>()
+ .ImplementedBy<MsmqOnewayBus>()
.DependsOn(new {messageOwners = messageOwners.ToArray()}));
}
@@ -12,7 +12,7 @@ public class MsmqMessageBuilder : IMessageBuilder<Message>
{
private readonly ILog logger = LogManager.GetLogger(typeof (MsmqMessageBuilder));
private readonly IMessageSerializer messageSerializer;
- private readonly Endpoint endpoint;
+ private Endpoint endpoint;
public MsmqMessageBuilder(IMessageSerializer messageSerializer)
:this(messageSerializer, null)
@@ -62,6 +62,16 @@ public Message BuildFromMessageBatch(params object[] msgs)
return message;
}
+ public void Initialize(Endpoint source)
+ {
+ this.endpoint = source;
+ }
+
+ public void Contextualize(Message message)
+ {
+
+ }
+
protected static int GetAppSpecificMarker(object[] msgs)
{
var msg = msgs[0];
@@ -1,14 +1,14 @@
using System.Messaging;
-using Rhino.ServiceBus.Msmq;
+using Rhino.ServiceBus.Impl;
-namespace Rhino.ServiceBus.Impl
+namespace Rhino.ServiceBus.Msmq
{
- public class OnewayBus : IOnewayBus
+ public class MsmqOnewayBus : IOnewayBus
{
private readonly MessageOwnersSelector messageOwners;
- private readonly IMessageBuilder messageBuilder;
+ private readonly IMessageBuilder<Message> messageBuilder;
- public OnewayBus(MessageOwner[] messageOwners, IMessageBuilder messageBuilder)
+ public MsmqOnewayBus(MessageOwner[] messageOwners, IMessageBuilder<Message> messageBuilder)
{
this.messageOwners = new MessageOwnersSelector(messageOwners, new EndpointRouter());
this.messageBuilder = messageBuilder;
@@ -19,7 +19,7 @@ public void Send(params object[] msgs)
var endpoint = messageOwners.GetEndpointForMessageBatch(msgs);
using(var queue = endpoint.InitalizeQueue())
{
- var message = messageBuilder.BuildFromMessageBatch<Message>(msgs);
+ var message = messageBuilder.BuildFromMessageBatch(msgs);
queue.SendInSingleTransaction(message);
}
}
@@ -198,7 +198,7 @@
<Compile Include="Impl\AbstractRhinoServiceBusFacility.cs" />
<Compile Include="Impl\MessageOwnersConfigReader.cs" />
<Compile Include="Impl\MessageOwnersSelector.cs" />
- <Compile Include="Msmq\OnewayBus.cs" />
+ <Compile Include="Msmq\MsmqOnewayBus.cs" />
<Compile Include="Impl\OnewayRhinoServiceBusFacility.cs" />
<Compile Include="Impl\RijndaelEncryptionService.cs" />
<Compile Include="Internal\ICustomElementSerializer.cs" />
@@ -1,13 +1,65 @@
using System;
+using System.IO;
using Rhino.Queues;
+using Rhino.ServiceBus.Internal;
+using Rhino.ServiceBus.Messages;
+using Rhino.ServiceBus.Transport;
namespace Rhino.ServiceBus.RhinoQueues
{
public class RhinoQueuesMessageBuilder : IMessageBuilder<MessagePayload>
{
+ private readonly IMessageSerializer messageSerializer;
+ private Endpoint endpoint;
+ public RhinoQueuesMessageBuilder(IMessageSerializer messageSerializer)
+ {
+ this.messageSerializer = messageSerializer;
+ }
+
public MessagePayload BuildFromMessageBatch(params object[] msgs)
+ {
+ if (endpoint == null)
+ throw new InvalidOperationException("A source endpoint is required for Rhino Queues transport, did you Initialize me? try providing a null Uri.");
+
+ var messageId = Guid.NewGuid();
+ byte[] data = new byte[0];
+ using (var memoryStream = new MemoryStream())
+ {
+ messageSerializer.Serialize(msgs, memoryStream);
+ data = memoryStream.ToArray();
+
+ }
+
+ return new MessagePayload
+ {
+ Data = data,
+ Headers =
+ {
+ {"id", messageId.ToString()},
+ {"type", GetAppSpecificMarker(msgs).ToString()},
+ {"source", endpoint.Uri.ToString()},
+ }
+ };
+ }
+
+ public void Initialize(Endpoint source)
+ {
+ endpoint = source;
+ }
+
+ public void Contextualize(MessagePayload message)
{
throw new NotImplementedException();
}
+
+ private static MessageType GetAppSpecificMarker(object[] msgs)
+ {
+ var msg = msgs[0];
+ if (msg is AdministrativeMessage)
+ return MessageType.AdministrativeMessageMarker;
+ if (msg is LoadBalancerMessage)
+ return MessageType.LoadBalancerMessageMarker;
+ return 0;
+ }
}
}
@@ -41,18 +41,12 @@ public class RhinoQueuesTransport : ITransport
private readonly ILog logger = LogManager.GetLogger(typeof(RhinoQueuesTransport));
private TimeoutAction timeout;
private IQueue queue;
+ private RhinoQueuesMessageBuilder messageBuilder;
+ private static readonly Uri NullEndpoint = new Uri("null://nowhere:24689/middle") ;
-
- public RhinoQueuesTransport(
- Uri endpoint,
- IEndpointRouter endpointRouter,
- IMessageSerializer messageSerializer,
- int threadCount,
- string path,
- IsolationLevel queueIsolationLevel,
- int numberOfRetries)
+ public RhinoQueuesTransport(Uri endpoint, IEndpointRouter endpointRouter, IMessageSerializer messageSerializer, int threadCount, string path, IsolationLevel queueIsolationLevel, int numberOfRetries, IMessageBuilder<MessagePayload> messageBuilder1)
{
- this.endpoint = endpoint;
+ this.endpoint = endpoint??NullEndpoint;
this.queueIsolationLevel = queueIsolationLevel;
this.numberOfRetries = numberOfRetries;
this.endpointRouter = endpointRouter;
@@ -67,6 +61,8 @@ public class RhinoQueuesTransport : ITransport
// This has to be the first subscriber to the transport events
// in order to successfuly handle the errors semantics
new ErrorAction(numberOfRetries).Init(this);
+
+ messageBuilder = new RhinoQueuesMessageBuilder(this.messageSerializer);
}
public void Dispose()

0 comments on commit e5b97bf

Please sign in to comment.