Skip to content

Commit

Permalink
Merge remote branch 'philiphoy/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
CoreyKaylor committed Jun 9, 2010
2 parents e71c1c6 + 1430fd5 commit 841e975
Show file tree
Hide file tree
Showing 18 changed files with 544 additions and 39 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<castle>
<facilities>
<facility id="rhino.esb" >
<bus threadCount="1"
numberOfRetries="5"
loadBalancerEndpoint="msmq://localhost/test_queue.acceptingWork"
endpoint="msmq://localhost/test_queue"/>
<messages>
<add name="Rhino.ServiceBus.Tests"
endpoint="msmq://localhost/test_queue"/>
</messages>
</facility>
</facilities>
</castle>
226 changes: 226 additions & 0 deletions Rhino.ServiceBus.Tests/LoadBalancer/With_accepting_work_queue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
using System;
using System.Linq;
using System.Messaging;
using System.Threading;
using Castle.MicroKernel.Registration;
using Castle.Windsor;
using Castle.Windsor.Configuration.Interpreters;
using Rhino.ServiceBus.Impl;
using Rhino.ServiceBus.Internal;
using Rhino.ServiceBus.LoadBalancer;
using Rhino.ServiceBus.Messages;
using Rhino.ServiceBus.Msmq;
using Xunit;

namespace Rhino.ServiceBus.Tests.LoadBalancer
{
public class With_accepting_work_queue : LoadBalancingTestBase
{
private readonly IWindsorContainer container;

private const string acceptingWorkQueue = "msmq://localhost/test_queue.acceptingWork";
private readonly string acceptingWorkQueuePath = MsmqUtil.GetQueuePath(new Uri(acceptingWorkQueue).ToEndpoint()).QueuePath;

public With_accepting_work_queue()
{
if (MessageQueue.Exists(acceptingWorkQueuePath) == false)
MessageQueue.Create(acceptingWorkQueuePath);
var acceptingWork = new MessageQueue(acceptingWorkQueuePath);
acceptingWork.Purge();

var interpreter = new XmlInterpreter(@"LoadBalancer\BusWithAcceptingWorkLoadBalancer.config");
container = new WindsorContainer(interpreter);
container.Kernel.AddFacility("rhino.esb", new RhinoServiceBusFacility());

container.AddComponent<MyHandler>();

container.Register(
Component.For<MsmqLoadBalancer>()
.DependsOn(new
{
threadCount = 1,
endpoint = new Uri(loadBalancerQueue),
transactional = TransactionalOptions.FigureItOut,
secondaryLoadBalancer = TestQueueUri2.Uri
})
);

container.Register(
Component.For<MsmqReadyForWorkListener>()
.DependsOn(new
{
threadCount = 1,
endpoint = new Uri(acceptingWorkQueue),
transactional = TransactionalOptions.FigureItOut
})
);

container.Register(
Component.For<MsmqSecondaryLoadBalancer>()
.DependsOn(new
{
threadCount = 1,
endpoint = TestQueueUri2.Uri,
primaryLoadBalancer = new Uri(loadBalancerQueue),
transactional = TransactionalOptions.FigureItOut
})
);
}

[Fact]
public void Can_send_message_through_load_balancer()
{
MyHandler.ResetEvent = new ManualResetEvent(false);

using (var loadBalancer = container.Resolve<MsmqLoadBalancer>())
using (var bus = container.Resolve<IStartableServiceBus>())
{
loadBalancer.Start();
bus.Start();

bus.Send(loadBalancer.Endpoint, "abcdefg");

MyHandler.ResetEvent.WaitOne(TimeSpan.FromSeconds(30), false);
Assert.True(
MyHandler.Message.ResponseQueue.Path.Contains(@"private$\test_queue")
);

Assert.Equal("abcdefg", MyHandler.Value);

}
}

[Fact]
public void When_worker_tell_load_balancer_that_it_is_ready_the_worker_will_be_added_to_known_queues()
{
using (var loadBalancer = new MessageQueue(loadBalancerQueuePath + ";Workers", QueueAccessMode.SendAndReceive))
{
loadBalancer.Purge();
}

using (var loadBalancer = container.Resolve<MsmqLoadBalancer>())
using (var bus = container.Resolve<IStartableServiceBus>())
{
loadBalancer.Start();
bus.Start();

using (var workers = new MessageQueue(loadBalancerQueuePath + ";Workers", QueueAccessMode.SendAndReceive))
{
workers.Formatter = new XmlMessageFormatter(new[] { typeof(string) });
var knownWorker = workers.Peek(TimeSpan.FromSeconds(30));
Assert.Equal(bus.Endpoint.Uri.ToString(), knownWorker.Body.ToString());
}

Assert.True(loadBalancer.KnownWorkers.GetValues().Contains(TestQueueUri.Uri));
}
}

[Fact]
public void when_start_load_balancer_that_has_secondary_will_send_reroute_to_ready_for_work_queue_to_workers_to_relieve_secondary()
{
using (var loadBalancer = container.Resolve<MsmqLoadBalancer>())
{
loadBalancer.KnownWorkers.Add(TestQueueUri.Uri);
loadBalancer.Start();

var message = queue.Receive(TimeSpan.FromSeconds(5));
var serializer = container.Resolve<IMessageSerializer>();
var reroute = serializer.Deserialize(message.BodyStream)
.OfType<Reroute>().First();

Assert.Equal(loadBalancer.ReadyForWorkListener.Endpoint.Uri, reroute.NewEndPoint);
Assert.Equal(loadBalancer.ReadyForWorkListener.Endpoint.Uri, reroute.OriginalEndPoint);
}
}

[Fact]
public void When_secondary_starts_it_will_ask_primary_to_get_ready_to_work_uri()
{
using (var secondary = container.Resolve<MsmqSecondaryLoadBalancer>())
{
secondary.Start();

using (var loadBalancerMsmqQueue = new MessageQueue(loadBalancerQueuePath))
{
int tries = 5;
QueryReadyForWorkQueueUri query = null;
var serializer = container.Resolve<IMessageSerializer>();
while (query == null)
{
var message = loadBalancerMsmqQueue.Receive(TimeSpan.FromSeconds(30));
query = serializer.Deserialize(message.BodyStream)
.OfType<QueryReadyForWorkQueueUri>().FirstOrDefault();
Assert.True(tries > 0);
tries -= 1;
}

Assert.NotNull(query);
}
}
}

[Fact]
public void When_secondary_takes_over_it_will_let_workers_know_that_it_took_over_and_reroute_to_ready_to_work_queue()
{
using (var primary = container.Resolve<MsmqLoadBalancer>())
using (var secondary = container.Resolve<MsmqSecondaryLoadBalancer>())
{
primary.Start();
secondary.TimeoutForHeartBeatFromPrimary = TimeSpan.FromMilliseconds(900);
secondary.KnownWorkers.Add(TestQueueUri.Uri);
secondary.KnownEndpoints.Add(TestQueueUri.Uri);//any worker is also endpoint

var wait = new ManualResetEvent(false);
secondary.TookOverAsActiveLoadBalancer += () => wait.Set();
secondary.Start();

Assert.True(wait.WaitOne());

int tries = 5;
var serializer = container.Resolve<IMessageSerializer>();
Reroute reroute = null;
while (reroute == null)
{
var message = queue.Receive(TimeSpan.FromSeconds(30));
reroute = serializer.Deserialize(message.BodyStream)
.OfType<Reroute>().FirstOrDefault();
Assert.True(tries > 0);
tries -= 1;
}

Assert.Equal(secondary.PrimaryLoadBalancer, reroute.OriginalEndPoint);
Assert.Equal(secondary.Endpoint.Uri, reroute.NewEndPoint);

reroute = null;
while (reroute == null)
{
var message = queue.Receive(TimeSpan.FromSeconds(30));
reroute = serializer.Deserialize(message.BodyStream)
.OfType<Reroute>().FirstOrDefault();
Assert.True(tries > 0);
tries -= 1;
}

Assert.Equal(primary.ReadyForWorkListener.Endpoint.Uri, reroute.OriginalEndPoint);
Assert.Equal(secondary.ReadyForWorkListener.Endpoint.Uri, reroute.NewEndPoint);
}
}


public class MyHandler : ConsumerOf<string>
{
public static ManualResetEvent ResetEvent;
public static string Value;
public static Message Message;

public void Consume(string message)
{
Message = MsmqTransport.MsmqCurrentMessageInformation.MsmqMessage;
Value = message;
ResetEvent.Set();
}

}

}
}
6 changes: 4 additions & 2 deletions Rhino.ServiceBus.Tests/MsmqFlatQueueTestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ public ITransport Transport
DefaultTransportActions(testQueueEndPoint.Uri),
new EndpointRouter(),
IsolationLevel.Serializable,
TransactionalOptions.FigureItOut);
TransactionalOptions.FigureItOut,
true);
transport.Start();
}
return transport;
Expand Down Expand Up @@ -159,7 +160,8 @@ public ITransport TransactionalTransport
new FlatQueueStrategy(new EndpointRouter(),transactionalTestQueueEndpoint.Uri),
transactionalTestQueueEndpoint.Uri, 1, DefaultTransportActions(transactionalTestQueueEndpoint.Uri),
new EndpointRouter(),
IsolationLevel.Serializable, TransactionalOptions.FigureItOut);
IsolationLevel.Serializable, TransactionalOptions.FigureItOut,
true);
transactionalTransport.Start();
}
return transactionalTransport;
Expand Down
6 changes: 4 additions & 2 deletions Rhino.ServiceBus.Tests/MsmqTestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ public ITransport Transport
TestQueueUri.Uri, 1,
defaultTransportActions,
new EndpointRouter(),
IsolationLevel.Serializable, TransactionalOptions.FigureItOut);
IsolationLevel.Serializable, TransactionalOptions.FigureItOut,
true);
transport.Start();
}
return transport;
Expand All @@ -152,7 +153,8 @@ public ITransport TransactionalTransport
1,
defaultTransportActions,
new EndpointRouter(),
IsolationLevel.Serializable,TransactionalOptions.FigureItOut);
IsolationLevel.Serializable,TransactionalOptions.FigureItOut,
true);
transactionalTransport.Start();
}
return transactionalTransport;
Expand Down
4 changes: 4 additions & 0 deletions Rhino.ServiceBus.Tests/Rhino.ServiceBus.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@
<Compile Include="LoadBalancer\Full_test_of_load_balancer_and_failover.cs" />
<Compile Include="LoadBalancer\Full_test_of_load_balancer_and_failover_and_recovery.cs" />
<Compile Include="LoadBalancer\LoadBalancingTestBase.cs" />
<Compile Include="LoadBalancer\With_accepting_work_queue.cs" />
<Compile Include="LoadBalancer\With_fail_over.cs" />
<Compile Include="LoadBalancer\With_load_balancing.cs" />
<Compile Include="LoadBalancer\with_secondary_failover.cs" />
Expand Down Expand Up @@ -177,6 +178,9 @@
<None Include="BusOnTransactionalQueue.config">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Include="LoadBalancer\BusWithAcceptingWorkLoadBalancer.config">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Include="LoadBalancer\ReceivingBusWithLoadBalancer.config">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
Expand Down
38 changes: 38 additions & 0 deletions Rhino.ServiceBus/Actions/CreateReadyForWorkQueuesAction.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
using System.Messaging;
using Rhino.ServiceBus.LoadBalancer;
using Rhino.ServiceBus.Msmq;

namespace Rhino.ServiceBus.Actions
{
public class CreateReadyForWorkQueuesAction: IDeploymentAction
{
private IQueueStrategy queueStrategy;
private MsmqReadyForWorkListener readyForWorkListener;

public CreateReadyForWorkQueuesAction(IQueueStrategy queueStrategy, MsmqReadyForWorkListener readyForWorkListener)
{
this.queueStrategy = queueStrategy;
this.readyForWorkListener = readyForWorkListener;
}

public void Execute(string user)
{
// will create the queues if they are not already there
var queues = queueStrategy.InitializeQueue(readyForWorkListener.Endpoint, QueueType.Raw);
foreach (var queue in queues)
{
queue.SetPermissions(user,
MessageQueueAccessRights.DeleteMessage |
MessageQueueAccessRights.DeleteJournalMessage |
MessageQueueAccessRights.GenericRead |
MessageQueueAccessRights.GenericWrite |
MessageQueueAccessRights.GetQueuePermissions |
MessageQueueAccessRights.PeekMessage |
MessageQueueAccessRights.ReceiveJournalMessage |
MessageQueueAccessRights.ReceiveMessage |
MessageQueueAccessRights.WriteMessage,
AccessControlEntryType.Allow);
}
}
}
}
3 changes: 2 additions & 1 deletion Rhino.ServiceBus/Config/MsmqTransportConfigurationAware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ public void Configure(AbstractRhinoServiceBusFacility facility, IConfiguration c
endpoint = facility.Endpoint,
queueIsolationLevel = facility.IsolationLevel,
numberOfRetries = facility.NumberOfRetries,
transactional = facility.Transactional
transactional = facility.Transactional,
consumeInTransaction = facility.ConsumeInTransaction,
}),
AllTypes.Of<IMsmqTransportAction>()
.FromAssembly(typeof(IMsmqTransportAction).Assembly)
Expand Down
8 changes: 7 additions & 1 deletion Rhino.ServiceBus/Impl/AbstractRhinoServiceBusFacility.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ public abstract class AbstractRhinoServiceBusFacility : AbstractFacility
protected readonly List<Type> messageModules = new List<Type>();
private Type serializerImpl = typeof(XmlMessageSerializer);
protected IsolationLevel queueIsolationLevel = IsolationLevel.Serializable;
public bool consumeInTxn = true;

protected AbstractRhinoServiceBusFacility()
{
ThreadCount = 1;
NumberOfRetries = 5;
Transactional = TransactionalOptions.FigureItOut;
Transactional = TransactionalOptions.FigureItOut;
}

public Uri Endpoint { get; set; }
Expand All @@ -48,6 +49,11 @@ public IsolationLevel IsolationLevel
get { return queueIsolationLevel; }
}

public bool ConsumeInTransaction
{
get { return consumeInTxn; }
}

public AbstractRhinoServiceBusFacility AddMessageModule<TModule>()
where TModule : IMessageModule
{
Expand Down
6 changes: 5 additions & 1 deletion Rhino.ServiceBus/Impl/RhinoServiceBusFacility.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,11 @@ protected void ReadBusConfiguration()
string isolationLevel = busConfig.Attributes["queueIsolationLevel"];
if (!string.IsNullOrEmpty(isolationLevel))
queueIsolationLevel = (IsolationLevel)Enum.Parse(typeof(IsolationLevel), isolationLevel);


string inTransaction = busConfig.Attributes["consumeInTransaction"];
bool boolResult;
if (bool.TryParse(inTransaction, out boolResult))
consumeInTxn = boolResult;

string uriString = busConfig.Attributes["endpoint"];
Uri endpoint;
Expand Down
Loading

0 comments on commit 841e975

Please sign in to comment.