Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

changed oneway bus in rq to BE a transport

  • Loading branch information...
commit 7b906b1631110d462a8e1054eb05329194de9157 1 parent 2f96325
mnichols authored
9 Rhino.ServiceBus.Tests/CanSendMsgsFromOneWayBusUsingRhinoQueues.cs
View
@@ -39,9 +39,7 @@ public void SendMessageToRemoteBus()
using (var bus = container.Resolve<IStartableServiceBus>())
{
bus.Start();
- var transport = new RhinoQueuesTransport(new Uri("null://nowhere:24689/middle"),
- new EndpointRouter(), container.Resolve<IMessageSerializer>(),
- 1, "one_way.esent", IsolationLevel.ReadCommitted, 5, new RhinoQueuesMessageBuilder(container.Resolve<IMessageSerializer>()));
+
var oneWay = new RhinoQueuesOneWayBus(new[]
{
new MessageOwner
@@ -49,7 +47,7 @@ public void SendMessageToRemoteBus()
Endpoint = bus.Endpoint.Uri,
Name = "System",
},
- }, transport);
+ }, container.Resolve<IMessageSerializer>(),new RhinoQueuesMessageBuilder(container.Resolve<IMessageSerializer>()));
oneWay.Send("hello there, one way");
@@ -69,7 +67,8 @@ public void SendMessageToRemoteBusFromConfigDrivenOneWayBus()
using (var c = new WindsorContainer(new XmlInterpreter("OneWayBusRhinoQueues.config")))
{
c.Kernel.AddFacility("one.way.rhino.esb", new OnewayRhinoServiceBusFacility());
- c.Resolve<IOnewayBus>().Send("hello there, one way");
+ var oneway = c.Resolve<IOnewayBus>();
+ oneway.Send("hello there, one way");
StringConsumer.Event.WaitOne();
Assert.Equal("hello there, one way", StringConsumer.Value);
}
18 Rhino.ServiceBus/Impl/OnewayRhinoServiceBusFacility.cs
View
@@ -31,27 +31,17 @@ protected override void Init()
messageOwnersReader.ReadMessageOwners();
if (IsRhinoQueues(messageOwnersReader.EndpointScheme))
{
- var path = Path.GetFullPath(AppDomain.CurrentDomain.BaseDirectory);
Kernel.Register(
Component.For<IMessageBuilder<MessagePayload>>()
.ImplementedBy<RhinoQueuesMessageBuilder>()
.LifeStyle.Is(LifestyleType.Singleton),
- Component.For<ITransport>()
- .LifeStyle.Is(LifestyleType.Singleton)
- .ImplementedBy(typeof (RhinoQueuesTransport))
- .DependsOn(new
- {
- threadCount = 1,
-// endpoint = new Uri("null://nowhere:24689/middle"),
- queueIsolationLevel = IsolationLevel.ReadCommitted,
- numberOfRetries = 5,
- path = Path.Combine(path,"one_way.esent")
- }),
-
Component.For<IOnewayBus>()
.LifeStyle.Is(LifestyleType.Singleton)
.ImplementedBy<RhinoQueuesOneWayBus>()
- .DependsOn(new {messageOwners = messageOwners.ToArray()})
+ .DependsOn(new
+ {
+ messageOwners = messageOwners.ToArray(),
+ })
);
}
else
9 Rhino.ServiceBus/RhinoQueues/RhinoQueuesMessageBuilder.cs
View
@@ -7,7 +7,7 @@
namespace Rhino.ServiceBus.RhinoQueues
{
- [CLSCompliant(false)]
+
public class RhinoQueuesMessageBuilder : IMessageBuilder<MessagePayload>
{
private readonly IMessageSerializer messageSerializer;
@@ -16,7 +16,7 @@ public RhinoQueuesMessageBuilder(IMessageSerializer messageSerializer)
{
this.messageSerializer = messageSerializer;
}
-
+ [CLSCompliant(false)]
public MessagePayload BuildFromMessageBatch(params object[] msgs)
{
if (endpoint == null)
@@ -48,10 +48,7 @@ public void Initialize(Endpoint source)
endpoint = source;
}
- public void Contextualize(MessagePayload message)
- {
- throw new NotImplementedException();
- }
+
private static MessageType GetAppSpecificMarker(object[] msgs)
{
19 Rhino.ServiceBus/RhinoQueues/RhinoQueuesOneWayBus.cs
View
@@ -1,23 +1,30 @@
+using System;
+using System.IO;
+using System.Transactions;
+using Rhino.Queues;
using Rhino.ServiceBus.Impl;
using Rhino.ServiceBus.Internal;
namespace Rhino.ServiceBus.RhinoQueues
{
- public class RhinoQueuesOneWayBus : IOnewayBus
+ [CLSCompliant(false)]
+ public class RhinoQueuesOneWayBus : RhinoQueuesTransport,IOnewayBus
{
private MessageOwnersSelector messageOwners;
- private ITransport transport;
+ private static readonly Uri NullEndpoint = new Uri("null://nowhere:24689/middle");
+ public RhinoQueuesOneWayBus(MessageOwner[] messageOwners, IMessageSerializer messageSerializer,IMessageBuilder<MessagePayload> messageBuilder)
+ : base(NullEndpoint, new EndpointRouter(), messageSerializer, 1, Path.Combine(Path.GetFullPath(AppDomain.CurrentDomain.BaseDirectory), "one_way.esent"), IsolationLevel.ReadCommitted,5,messageBuilder)
- public RhinoQueuesOneWayBus(MessageOwner[] messageOwners, ITransport transport)
{
this.messageOwners = new MessageOwnersSelector(messageOwners, new EndpointRouter());
- this.transport = transport;
- this.transport.Start();
+ base.Start();
}
public void Send(params object[] msgs)
{
- transport.Send(messageOwners.GetEndpointForMessageBatch(msgs), msgs);
+ base.Send(messageOwners.GetEndpointForMessageBatch(msgs), msgs);
}
+
+
}
}
4 Rhino.ServiceBus/RhinoQueues/RhinoQueuesTransport.cs
View
@@ -43,7 +43,7 @@ public class RhinoQueuesTransport : ITransport
private readonly ILog logger = LogManager.GetLogger(typeof(RhinoQueuesTransport));
private TimeoutAction timeout;
private IQueue queue;
- private static readonly Uri NullEndpoint = new Uri("null://nowhere:24689/middle") ;
+
public RhinoQueuesTransport(Uri endpoint,
IEndpointRouter endpointRouter,
@@ -54,7 +54,7 @@ public class RhinoQueuesTransport : ITransport
int numberOfRetries,
IMessageBuilder<MessagePayload> messageBuilder)
{
- this.endpoint = endpoint??NullEndpoint;
+ this.endpoint = endpoint;
this.queueIsolationLevel = queueIsolationLevel;
this.numberOfRetries = numberOfRetries;
this.messageBuilder = messageBuilder;
Please sign in to comment.
Something went wrong with that request. Please try again.