Permalink
Browse files

fixing transport rq

  • Loading branch information...
1 parent e5b97bf commit 2f963258c7395b9abdc6889ee86e6d29929e7257 mnichols committed Jul 5, 2010
@@ -41,7 +41,7 @@ public void SendMessageToRemoteBus()
bus.Start();
var transport = new RhinoQueuesTransport(new Uri("null://nowhere:24689/middle"),
new EndpointRouter(), container.Resolve<IMessageSerializer>(),
- 1, "one_way.esent", IsolationLevel.ReadCommitted, 5, TODO);
+ 1, "one_way.esent", IsolationLevel.ReadCommitted, 5, new RhinoQueuesMessageBuilder(container.Resolve<IMessageSerializer>()));
var oneWay = new RhinoQueuesOneWayBus(new[]
{
new MessageOwner
@@ -4,6 +4,7 @@
using Castle.Core;
using Castle.Core.Configuration;
using Castle.MicroKernel.Registration;
+using Rhino.Queues;
using Rhino.ServiceBus.Impl;
using Rhino.ServiceBus.Internal;
using Rhino.ServiceBus.RhinoQueues;
@@ -44,7 +45,11 @@ public void Configure(AbstractRhinoServiceBusFacility facility, IConfiguration c
queueIsolationLevel = facility.IsolationLevel,
numberOfRetries = facility.NumberOfRetries,
path = Path.Combine(path, name + ".esent")
- })
+ }),
+ Component.For<IMessageBuilder<MessagePayload>>()
+ .ImplementedBy<RhinoQueuesMessageBuilder>()
+ .LifeStyle.Is(LifestyleType.Singleton)
+
);
}
}
@@ -6,6 +6,7 @@
using Castle.Core;
using Castle.MicroKernel.Facilities;
using Castle.MicroKernel.Registration;
+using Rhino.Queues;
using Rhino.ServiceBus.Internal;
using Rhino.ServiceBus.Msmq;
using Rhino.ServiceBus.RhinoQueues;
@@ -32,6 +33,9 @@ protected override void Init()
{
var path = Path.GetFullPath(AppDomain.CurrentDomain.BaseDirectory);
Kernel.Register(
+ Component.For<IMessageBuilder<MessagePayload>>()
+ .ImplementedBy<RhinoQueuesMessageBuilder>()
+ .LifeStyle.Is(LifestyleType.Singleton),
Component.For<ITransport>()
.LifeStyle.Is(LifestyleType.Singleton)
.ImplementedBy(typeof (RhinoQueuesTransport))
@@ -43,12 +47,12 @@ protected override void Init()
numberOfRetries = 5,
path = Path.Combine(path,"one_way.esent")
}),
+
Component.For<IOnewayBus>()
.LifeStyle.Is(LifestyleType.Singleton)
.ImplementedBy<RhinoQueuesOneWayBus>()
.DependsOn(new {messageOwners = messageOwners.ToArray()})
);
-
}
else
{
@@ -7,6 +7,7 @@
namespace Rhino.ServiceBus.RhinoQueues
{
+ [CLSCompliant(false)]
public class RhinoQueuesMessageBuilder : IMessageBuilder<MessagePayload>
{
private readonly IMessageSerializer messageSerializer;
@@ -20,6 +20,7 @@
namespace Rhino.ServiceBus.RhinoQueues
{
+ [CLSCompliant(false)]
public class RhinoQueuesTransport : ITransport
{
private readonly Uri endpoint;
@@ -34,22 +35,30 @@ public class RhinoQueuesTransport : ITransport
private bool haveStarted;
private readonly IsolationLevel queueIsolationLevel;
private readonly int numberOfRetries;
+ private readonly IMessageBuilder<MessagePayload> messageBuilder;
- [ThreadStatic]
+ [ThreadStatic]
private static RhinoQueueCurrentMessageInformation currentMessageInformation;
private readonly ILog logger = LogManager.GetLogger(typeof(RhinoQueuesTransport));
private TimeoutAction timeout;
private IQueue queue;
- private RhinoQueuesMessageBuilder messageBuilder;
private static readonly Uri NullEndpoint = new Uri("null://nowhere:24689/middle") ;
- public RhinoQueuesTransport(Uri endpoint, IEndpointRouter endpointRouter, IMessageSerializer messageSerializer, int threadCount, string path, IsolationLevel queueIsolationLevel, int numberOfRetries, IMessageBuilder<MessagePayload> messageBuilder1)
+ public RhinoQueuesTransport(Uri endpoint,
+ IEndpointRouter endpointRouter,
+ IMessageSerializer messageSerializer,
+ int threadCount,
+ string path,
+ IsolationLevel queueIsolationLevel,
+ int numberOfRetries,
+ IMessageBuilder<MessagePayload> messageBuilder)
{
this.endpoint = endpoint??NullEndpoint;
this.queueIsolationLevel = queueIsolationLevel;
this.numberOfRetries = numberOfRetries;
- this.endpointRouter = endpointRouter;
+ this.messageBuilder = messageBuilder;
+ this.endpointRouter = endpointRouter;
this.messageSerializer = messageSerializer;
this.threadCount = threadCount;
this.path = path;
@@ -61,8 +70,7 @@ public RhinoQueuesTransport(Uri endpoint, IEndpointRouter endpointRouter, IMessa
// This has to be the first subscriber to the transport events
// in order to successfuly handle the errors semantics
new ErrorAction(numberOfRetries).Init(this);
-
- messageBuilder = new RhinoQueuesMessageBuilder(this.messageSerializer);
+ messageBuilder.Initialize(this.Endpoint);
}
public void Dispose()
@@ -386,29 +394,15 @@ public void Send(Endpoint destination, object[] msgs)
private void SendInternal(object[] msgs, Endpoint destination, Action<NameValueCollection> customizeHeaders)
{
var messageId = Guid.NewGuid();
- using (var memoryStream = new MemoryStream())
- {
- messageSerializer.Serialize(msgs, memoryStream);
-
- var payload = new MessagePayload
- {
- Data = memoryStream.ToArray(),
- Headers =
- {
- {"id", messageId.ToString()},
- {"type", GetAppSpecificMarker(msgs).ToString()},
- {"source", Endpoint.Uri.ToString()},
- }
- };
- logger.DebugFormat("Sending a message with id '{0}' to '{1}'", messageId, destination.Uri);
- customizeHeaders(payload.Headers);
- var transactionOptions = GetTransactionOptions();
- using (var tx = new TransactionScope(TransactionScopeOption.Required, transactionOptions))
- {
- queueManager.Send(destination.Uri, payload);
- tx.Complete();
- }
- }
+ var payload = messageBuilder.BuildFromMessageBatch(msgs);
+ logger.DebugFormat("Sending a message with id '{0}' to '{1}'", messageId, destination.Uri);
+ customizeHeaders(payload.Headers);
+ var transactionOptions = GetTransactionOptions();
+ using (var tx = new TransactionScope(TransactionScopeOption.Required, transactionOptions))
+ {
+ queueManager.Send(destination.Uri, payload);
+ tx.Complete();
+ }
var copy = MessageSent;
if (copy == null)
@@ -432,16 +426,6 @@ private TransactionOptions GetTransactionOptions()
};
}
- protected static MessageType GetAppSpecificMarker(object[] msgs)
- {
- var msg = msgs[0];
- if (msg is AdministrativeMessage)
- return MessageType.AdministrativeMessageMarker;
- if (msg is LoadBalancerMessage)
- return MessageType.LoadBalancerMessageMarker;
- return 0;
- }
-
public void Send(Endpoint endpoint, DateTime processAgainAt, object[] msgs)
{
SendInternal(msgs, endpoint,

0 comments on commit 2f96325

Please sign in to comment.