Skip to content

Commit

Permalink
Merge branch 'rq-one-way'
Browse files Browse the repository at this point in the history
  • Loading branch information
mnichols committed Jul 1, 2010
2 parents 841e975 + 5aa7066 commit 0629bba
Show file tree
Hide file tree
Showing 9 changed files with 205 additions and 15 deletions.
98 changes: 98 additions & 0 deletions Rhino.ServiceBus.Tests/CanSendMsgsFromOneWayBusUsingRhinoQueues.cs
@@ -0,0 +1,98 @@
using System;
using System.IO;
using System.Threading;
using System.Transactions;
using Castle.Windsor;
using Castle.Windsor.Configuration.Interpreters;
using Rhino.ServiceBus.Impl;
using Rhino.ServiceBus.Internal;
using Rhino.ServiceBus.RhinoQueues;
using Rhino.ServiceBus.Tests.RhinoQueues;
using Xunit;

namespace Rhino.ServiceBus.Tests
{
public class CanSendMsgsFromOneWayBusUsingRhinoQueues : WithDebugging,IDisposable
{
private WindsorContainer container;

public CanSendMsgsFromOneWayBusUsingRhinoQueues()
{
if (Directory.Exists("one_way.esent"))
Directory.Delete("one_way.esent", true);
if (Directory.Exists("test_queue.esent"))
Directory.Delete("test_queue.esent", true);
if (Directory.Exists("test_queue_subscriptions.esent"))
Directory.Delete("test_queue_subscriptions.esent", true);
container = new WindsorContainer(new XmlInterpreter("OneWayBusRhinoQueues.config"));
container.Kernel.AddFacility("rhino.esb", new RhinoServiceBusFacility());
container.AddComponent<StringConsumer>();
StringConsumer.Value = null;
StringConsumer.Event = new ManualResetEvent(false);
}



[Fact]
public void SendMessageToRemoteBus()
{
using (var bus = container.Resolve<IStartableServiceBus>())
{
bus.Start();
var transport = new RhinoQueuesTransport(new Uri("null://nowhere:24689/middle"),
new EndpointRouter(), container.Resolve<IMessageSerializer>(),
1, "one_way.esent", IsolationLevel.ReadCommitted, 5);
var oneWay = new RhinoQueuesOneWayBus(new[]
{
new MessageOwner
{
Endpoint = bus.Endpoint.Uri,
Name = "System",
},
}, transport);

oneWay.Send("hello there, one way");

StringConsumer.Event.WaitOne();

Assert.Equal("hello there, one way", StringConsumer.Value);
}
}

[Fact]
public void SendMessageToRemoteBusFromConfigDrivenOneWayBus()
{
using (var bus = container.Resolve<IStartableServiceBus>())
{
bus.Start();

using (var c = new WindsorContainer(new XmlInterpreter("OneWayBusRhinoQueues.config")))
{
c.Kernel.AddFacility("one.way.rhino.esb", new OnewayRhinoServiceBusFacility());
c.Resolve<IOnewayBus>().Send("hello there, one way");
StringConsumer.Event.WaitOne();
Assert.Equal("hello there, one way", StringConsumer.Value);
}


}
}

public class StringConsumer : ConsumerOf<string>
{
public static ManualResetEvent Event;
public static string Value;

public void Consume(string pong)
{
Value = pong;
Event.Set();
}
}

public void Dispose()
{
container.Dispose();
}
}
}
21 changes: 21 additions & 0 deletions Rhino.ServiceBus.Tests/OneWayBusRhinoQueues.config
@@ -0,0 +1,21 @@
<castle>
<facilities>
<facility id="rhino.esb" >
<bus name="test_queue"
threadCount="1"
numberOfRetries="5"
endpoint="rhino.queues://localhost/test_queue"
queueIsolationLevel="ReadCommitted" />
<messages>
<add name="Rhino.ServiceBus.Tests"
endpoint="rhino.queues://localhost/test_queue"/>
</messages>
</facility>
<facility id="one.way.rhino.esb" >
<messages>
<add name="System.String"
endpoint="rhino.queues://localhost/test_queue"/>
</messages>
</facility>
</facilities>
</castle>
4 changes: 4 additions & 0 deletions Rhino.ServiceBus.Tests/Rhino.ServiceBus.Tests.csproj
Expand Up @@ -112,6 +112,7 @@
<Compile Include="BusSubscriptionTests.cs" />
<Compile Include="CanRouteMessageToConsumerThroughContainer.cs" />
<Compile Include="CanSendMsgsFromOneWayBus.cs" />
<Compile Include="CanSendMsgsFromOneWayBusUsingRhinoQueues.cs" />
<Compile Include="DelayedMessages.cs" />
<Compile Include="DataStructures\LRUSetTest.cs" />
<Compile Include="DataStructures\HashtableTest.cs" />
Expand Down Expand Up @@ -178,6 +179,9 @@
<None Include="BusOnTransactionalQueue.config">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Include="OneWayBusRhinoQueues.config">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Include="LoadBalancer\BusWithAcceptingWorkLoadBalancer.config">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
Expand Down
2 changes: 2 additions & 0 deletions Rhino.ServiceBus/IOnewayBus.cs
@@ -1,3 +1,5 @@
using System;

namespace Rhino.ServiceBus
{
public interface IOnewayBus
Expand Down
6 changes: 5 additions & 1 deletion Rhino.ServiceBus/Impl/MessageOwnersConfigReader.cs
Expand Up @@ -15,7 +15,7 @@ public MessageOwnersConfigReader(IConfiguration configuration, ICollection<Messa
this.configuration = configuration;
this.messageOwners = messageOwners;
}

public string EndpointScheme { get; private set; }
public void ReadMessageOwners()
{
IConfiguration messageConfig = configuration.Children["messages"];
Expand All @@ -36,6 +36,10 @@ public void ReadMessageOwners()
try
{
ownerEndpoint = new Uri(uriString);
if(EndpointScheme==null)
{
EndpointScheme = ownerEndpoint.Scheme;
}
}
catch (Exception e)
{
Expand Down
62 changes: 48 additions & 14 deletions Rhino.ServiceBus/Impl/OnewayRhinoServiceBusFacility.cs
@@ -1,10 +1,13 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Transactions;
using Castle.Core;
using Castle.MicroKernel.Facilities;
using Castle.MicroKernel.Registration;
using Rhino.ServiceBus.Internal;
using Rhino.ServiceBus.Msmq;
using Rhino.ServiceBus.RhinoQueues;
using Rhino.ServiceBus.Serializers;
using System.Linq;

Expand All @@ -22,27 +25,58 @@ public void UseMessageSerializer<TMessageSerializer>()

protected override void Init()
{
new MessageOwnersConfigReader(FacilityConfig, messageOwners).ReadMessageOwners();

var messageOwnersReader = new MessageOwnersConfigReader(FacilityConfig, messageOwners);
messageOwnersReader.ReadMessageOwners();
if (IsRhinoQueues(messageOwnersReader.EndpointScheme))
{
var path = Path.GetFullPath(AppDomain.CurrentDomain.BaseDirectory);
Kernel.Register(
Component.For<ITransport>()
.LifeStyle.Is(LifestyleType.Singleton)
.ImplementedBy(typeof (RhinoQueuesTransport))
.DependsOn(new
{
threadCount = 1,
endpoint = new Uri("null://nowhere:24689/middle"),
queueIsolationLevel = IsolationLevel.ReadCommitted,
numberOfRetries = 5,
path = Path.Combine(path,"one_way.esent")
}),
Component.For<IOnewayBus>()
.LifeStyle.Is(LifestyleType.Singleton)
.ImplementedBy<RhinoQueuesOneWayBus>()
.DependsOn(new {messageOwners = messageOwners.ToArray()})
);

}
else
{
Kernel.Register(
Component.For<IMessageBuilder>()
.LifeStyle.Is(LifestyleType.Singleton)
.ImplementedBy<MessageBuilder>(),
Component.For<IOnewayBus>()
.LifeStyle.Is(LifestyleType.Singleton)
.ImplementedBy<OnewayBus>()
.DependsOn(new {messageOwners = messageOwners.ToArray()}));

}
Kernel.Register(
Component.For<IMessageBuilder>()
.LifeStyle.Is(LifestyleType.Singleton)
.ImplementedBy<MessageBuilder>(),
Component.For<IOnewayBus>()
.LifeStyle.Is(LifestyleType.Singleton)
.ImplementedBy<OnewayBus>()
.DependsOn(new{messageOwners = messageOwners.ToArray()}),
Component.For<IReflection>()
.LifeStyle.Is(LifestyleType.Singleton)
.ImplementedBy<DefaultReflection>(),

.LifeStyle.Is(LifestyleType.Singleton)
.ImplementedBy<DefaultReflection>(),
Component.For<IMessageSerializer>()
.LifeStyle.Is(LifestyleType.Singleton)
.ImplementedBy(serializerImpl),
Component.For<IEndpointRouter>()
.ImplementedBy<EndpointRouter>()
);
.ImplementedBy<EndpointRouter>()
);

}

private static bool IsRhinoQueues(string endpointScheme)
{
return endpointScheme.Equals("rhino.queues", StringComparison.InvariantCultureIgnoreCase);
}
}
}
1 change: 1 addition & 0 deletions Rhino.ServiceBus/Rhino.ServiceBus.csproj
Expand Up @@ -122,6 +122,7 @@
<Compile Include="Messages\ReadyToWork.cs" />
<Compile Include="Msmq\AbstractMsmqListener.cs" />
<Compile Include="Msmq\QueueCreationModule.cs" />
<Compile Include="RhinoQueues\RhinoQueuesOneWayBus.cs" />
<Compile Include="Transport\SubQueue.cs" />
<Compile Include="Msmq\TransportActions\ShutDownAction.cs" />
<Compile Include="Msmq\TransportState.cs" />
Expand Down
23 changes: 23 additions & 0 deletions Rhino.ServiceBus/RhinoQueues/RhinoQueuesOneWayBus.cs
@@ -0,0 +1,23 @@
using Rhino.ServiceBus.Impl;
using Rhino.ServiceBus.Internal;

namespace Rhino.ServiceBus.RhinoQueues
{
public class RhinoQueuesOneWayBus : IOnewayBus
{
private MessageOwnersSelector messageOwners;
private ITransport transport;

public RhinoQueuesOneWayBus(MessageOwner[] messageOwners, ITransport transport)
{
this.messageOwners = new MessageOwnersSelector(messageOwners, new EndpointRouter());
this.transport = transport;
this.transport.Start();
}

public void Send(params object[] msgs)
{
transport.Send(messageOwners.GetEndpointForMessageBatch(msgs), msgs);
}
}
}
3 changes: 3 additions & 0 deletions Rhino.ServiceBus/RhinoQueues/RhinoQueuesTransport.cs
Expand Up @@ -128,6 +128,9 @@ public IQueue Queue

public void Start()
{
if (haveStarted)
return;

shouldContinue = true;

var port = endpoint.Port;
Expand Down

0 comments on commit 0629bba

Please sign in to comment.