diff --git a/Rhino.ServiceBus.Tests/CanSendMsgsFromOneWayBusUsingRhinoQueues.cs b/Rhino.ServiceBus.Tests/CanSendMsgsFromOneWayBusUsingRhinoQueues.cs new file mode 100644 index 0000000..8204b8e --- /dev/null +++ b/Rhino.ServiceBus.Tests/CanSendMsgsFromOneWayBusUsingRhinoQueues.cs @@ -0,0 +1,98 @@ +using System; +using System.IO; +using System.Threading; +using System.Transactions; +using Castle.Windsor; +using Castle.Windsor.Configuration.Interpreters; +using Rhino.ServiceBus.Impl; +using Rhino.ServiceBus.Internal; +using Rhino.ServiceBus.RhinoQueues; +using Rhino.ServiceBus.Tests.RhinoQueues; +using Xunit; + +namespace Rhino.ServiceBus.Tests +{ + public class CanSendMsgsFromOneWayBusUsingRhinoQueues : WithDebugging,IDisposable + { + private WindsorContainer container; + + public CanSendMsgsFromOneWayBusUsingRhinoQueues() + { + if (Directory.Exists("one_way.esent")) + Directory.Delete("one_way.esent", true); + if (Directory.Exists("test_queue.esent")) + Directory.Delete("test_queue.esent", true); + if (Directory.Exists("test_queue_subscriptions.esent")) + Directory.Delete("test_queue_subscriptions.esent", true); + container = new WindsorContainer(new XmlInterpreter("OneWayBusRhinoQueues.config")); + container.Kernel.AddFacility("rhino.esb", new RhinoServiceBusFacility()); + container.AddComponent(); + StringConsumer.Value = null; + StringConsumer.Event = new ManualResetEvent(false); + } + + + + [Fact] + public void SendMessageToRemoteBus() + { + using (var bus = container.Resolve()) + { + bus.Start(); + var transport = new RhinoQueuesTransport(new Uri("null://nowhere:24689/middle"), + new EndpointRouter(), container.Resolve(), + 1, "one_way.esent", IsolationLevel.ReadCommitted, 5); + var oneWay = new RhinoQueuesOneWayBus(new[] + { + new MessageOwner + { + Endpoint = bus.Endpoint.Uri, + Name = "System", + }, + }, transport); + + oneWay.Send("hello there, one way"); + + StringConsumer.Event.WaitOne(); + + Assert.Equal("hello there, one way", StringConsumer.Value); + } + } + + [Fact] + public void SendMessageToRemoteBusFromConfigDrivenOneWayBus() + { + using (var bus = container.Resolve()) + { + bus.Start(); + + using (var c = new WindsorContainer(new XmlInterpreter("OneWayBusRhinoQueues.config"))) + { + c.Kernel.AddFacility("one.way.rhino.esb", new OnewayRhinoServiceBusFacility()); + c.Resolve().Send("hello there, one way"); + StringConsumer.Event.WaitOne(); + Assert.Equal("hello there, one way", StringConsumer.Value); + } + + + } + } + + public class StringConsumer : ConsumerOf + { + public static ManualResetEvent Event; + public static string Value; + + public void Consume(string pong) + { + Value = pong; + Event.Set(); + } + } + + public void Dispose() + { + container.Dispose(); + } + } +} \ No newline at end of file diff --git a/Rhino.ServiceBus.Tests/OneWayBusRhinoQueues.config b/Rhino.ServiceBus.Tests/OneWayBusRhinoQueues.config new file mode 100644 index 0000000..cfd6238 --- /dev/null +++ b/Rhino.ServiceBus.Tests/OneWayBusRhinoQueues.config @@ -0,0 +1,21 @@ + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/Rhino.ServiceBus.Tests/Rhino.ServiceBus.Tests.csproj b/Rhino.ServiceBus.Tests/Rhino.ServiceBus.Tests.csproj index 085e70c..9ffcac8 100644 --- a/Rhino.ServiceBus.Tests/Rhino.ServiceBus.Tests.csproj +++ b/Rhino.ServiceBus.Tests/Rhino.ServiceBus.Tests.csproj @@ -112,6 +112,7 @@ + @@ -178,6 +179,9 @@ PreserveNewest + + PreserveNewest + PreserveNewest diff --git a/Rhino.ServiceBus/IOnewayBus.cs b/Rhino.ServiceBus/IOnewayBus.cs index bdfa62c..6f7c1e4 100644 --- a/Rhino.ServiceBus/IOnewayBus.cs +++ b/Rhino.ServiceBus/IOnewayBus.cs @@ -1,3 +1,5 @@ +using System; + namespace Rhino.ServiceBus { public interface IOnewayBus diff --git a/Rhino.ServiceBus/Impl/MessageOwnersConfigReader.cs b/Rhino.ServiceBus/Impl/MessageOwnersConfigReader.cs index 576f420..b64154d 100644 --- a/Rhino.ServiceBus/Impl/MessageOwnersConfigReader.cs +++ b/Rhino.ServiceBus/Impl/MessageOwnersConfigReader.cs @@ -15,7 +15,7 @@ public MessageOwnersConfigReader(IConfiguration configuration, ICollection() protected override void Init() { - new MessageOwnersConfigReader(FacilityConfig, messageOwners).ReadMessageOwners(); - + var messageOwnersReader = new MessageOwnersConfigReader(FacilityConfig, messageOwners); + messageOwnersReader.ReadMessageOwners(); + if (IsRhinoQueues(messageOwnersReader.EndpointScheme)) + { + var path = Path.GetFullPath(AppDomain.CurrentDomain.BaseDirectory); + Kernel.Register( + Component.For() + .LifeStyle.Is(LifestyleType.Singleton) + .ImplementedBy(typeof (RhinoQueuesTransport)) + .DependsOn(new + { + threadCount = 1, + endpoint = new Uri("null://nowhere:24689/middle"), + queueIsolationLevel = IsolationLevel.ReadCommitted, + numberOfRetries = 5, + path = Path.Combine(path,"one_way.esent") + }), + Component.For() + .LifeStyle.Is(LifestyleType.Singleton) + .ImplementedBy() + .DependsOn(new {messageOwners = messageOwners.ToArray()}) + ); + } + else + { + Kernel.Register( + Component.For() + .LifeStyle.Is(LifestyleType.Singleton) + .ImplementedBy(), + Component.For() + .LifeStyle.Is(LifestyleType.Singleton) + .ImplementedBy() + .DependsOn(new {messageOwners = messageOwners.ToArray()})); + + } Kernel.Register( - Component.For() - .LifeStyle.Is(LifestyleType.Singleton) - .ImplementedBy(), - Component.For() - .LifeStyle.Is(LifestyleType.Singleton) - .ImplementedBy() - .DependsOn(new{messageOwners = messageOwners.ToArray()}), Component.For() - .LifeStyle.Is(LifestyleType.Singleton) - .ImplementedBy(), - + .LifeStyle.Is(LifestyleType.Singleton) + .ImplementedBy(), Component.For() .LifeStyle.Is(LifestyleType.Singleton) .ImplementedBy(serializerImpl), Component.For() - .ImplementedBy() - ); + .ImplementedBy() + ); + + } + + private static bool IsRhinoQueues(string endpointScheme) + { + return endpointScheme.Equals("rhino.queues", StringComparison.InvariantCultureIgnoreCase); } } } \ No newline at end of file diff --git a/Rhino.ServiceBus/Rhino.ServiceBus.csproj b/Rhino.ServiceBus/Rhino.ServiceBus.csproj index e039c11..90dfcc4 100644 --- a/Rhino.ServiceBus/Rhino.ServiceBus.csproj +++ b/Rhino.ServiceBus/Rhino.ServiceBus.csproj @@ -122,6 +122,7 @@ + diff --git a/Rhino.ServiceBus/RhinoQueues/RhinoQueuesOneWayBus.cs b/Rhino.ServiceBus/RhinoQueues/RhinoQueuesOneWayBus.cs new file mode 100644 index 0000000..e3680fa --- /dev/null +++ b/Rhino.ServiceBus/RhinoQueues/RhinoQueuesOneWayBus.cs @@ -0,0 +1,23 @@ +using Rhino.ServiceBus.Impl; +using Rhino.ServiceBus.Internal; + +namespace Rhino.ServiceBus.RhinoQueues +{ + public class RhinoQueuesOneWayBus : IOnewayBus + { + private MessageOwnersSelector messageOwners; + private ITransport transport; + + public RhinoQueuesOneWayBus(MessageOwner[] messageOwners, ITransport transport) + { + this.messageOwners = new MessageOwnersSelector(messageOwners, new EndpointRouter()); + this.transport = transport; + this.transport.Start(); + } + + public void Send(params object[] msgs) + { + transport.Send(messageOwners.GetEndpointForMessageBatch(msgs), msgs); + } + } +} \ No newline at end of file diff --git a/Rhino.ServiceBus/RhinoQueues/RhinoQueuesTransport.cs b/Rhino.ServiceBus/RhinoQueues/RhinoQueuesTransport.cs index c610d7e..25f1972 100644 --- a/Rhino.ServiceBus/RhinoQueues/RhinoQueuesTransport.cs +++ b/Rhino.ServiceBus/RhinoQueues/RhinoQueuesTransport.cs @@ -128,6 +128,9 @@ public IQueue Queue public void Start() { + if (haveStarted) + return; + shouldContinue = true; var port = endpoint.Port;