Skip to content

Commit

Permalink
Getting closer, handler is registering on the distributor side
Browse files Browse the repository at this point in the history
  • Loading branch information
phatboyg committed Apr 24, 2012
1 parent 1962641 commit daf493e
Show file tree
Hide file tree
Showing 53 changed files with 1,741 additions and 909 deletions.
110 changes: 55 additions & 55 deletions src/MassTransit.Tests/Distributor/Default_Specs.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
// Copyright 2007-2011 Chris Patterson, Dru Sellers, Travis Smith, et. al.
// Copyright 2007-2012 Chris Patterson, Dru Sellers, Travis Smith, et. al.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
Expand All @@ -24,25 +24,12 @@ namespace MassTransit.Tests.Distributor
using MassTransit.Distributor.Messages;
using MassTransit.Pipeline.Inspectors;
using NUnit.Framework;
using Rhino.Mocks;
using TestFramework;
using Util;

[TestFixture]
public class Default_distributor_specifications :
LoopbackDistributorTestFixture
{
protected override void EstablishContext()
{
base.EstablishContext();

AddFirstCommandInstance("A", "loopback://localhost/a");
AddFirstCommandInstance("B", "loopback://localhost/b");
AddFirstCommandInstance("C", "loopback://localhost/c");

RemoteBus.ShouldHaveRemoteSubscriptionFor<Distributed<FirstCommand>>();
}

[Test]
public void Can_collect_iworkeravaiable_messages()
{
Expand All @@ -55,13 +42,8 @@ public void Can_collect_iworkeravaiable_messages()
messageRecieved.Set();
});

Instances.ToList().ForEach(x =>
{
x.Value.DataBus.ControlBus.Endpoint.Send(new PingWorker(), c =>
{
c.SendResponseTo(LocalBus);
});
});
Instances.ToList().ForEach(
x => { x.Value.DataBus.ControlBus.Endpoint.Send(new PingWorker(), c => { c.SendResponseTo(LocalBus); }); });

messageRecieved.WaitOne(8.Seconds());

Expand Down Expand Up @@ -101,8 +83,11 @@ public void Ensure_workers_will_respond_to_ping_request()
messageRecieved.Set();
});

Instances.ToList().ForEach(x => { x.Value.DataBus.ControlBus.Endpoint.Send(new PingWorker(),
y => y.SendResponseTo(LocalBus)); });
Instances.ToList().ForEach(x =>
{
x.Value.DataBus.ControlBus.Endpoint.Send(new PingWorker(),
y => y.SendResponseTo(LocalBus));
});

messageRecieved.WaitOne(8.Seconds());

Expand All @@ -125,22 +110,49 @@ public void Using_the_load_generator_should_share_the_load()
var generator = new LoadGenerator<FirstCommand, FirstResponse>();
const int count = 100;

generator.Run(RemoteBus, LocalBus.Endpoint, Instances.Values.Select(x => x.DataBus), count, x => new FirstCommand(x));
generator.Run(RemoteBus, LocalBus.Endpoint, Instances.Values.Select(x => x.DataBus), count,
x => new FirstCommand(x));

Dictionary<Uri, int> results = generator.GetWorkerLoad();

Assert.That(results.Sum(x => x.Value), Is.EqualTo(count));
results.ToList().ForEach(x =>
Assert.That(x.Value, Is.GreaterThan(0).And.LessThanOrEqualTo(count),
string.Format("{0} did not consume between 0 and {1}",
x.Key.ToString(), count)));
string.Format("{0} did not consume between 0 and {1}",
x.Key.ToString(), count)));
}

protected override void EstablishContext()
{
base.EstablishContext();

AddFirstCommandInstance("A", "loopback://localhost/a");
AddFirstCommandInstance("B", "loopback://localhost/b");
AddFirstCommandInstance("C", "loopback://localhost/c");

RemoteBus.ShouldHaveRemoteSubscriptionFor<Distributed<FirstCommand>>();
}
}

[TestFixture]
public class Distributor_with_custom_worker_selection_strategy :
LoopbackDistributorTestFixture
{
[Test, Explicit]
public void Node_a_should_recieve_all_the_work()
{
var generator = new LoadGenerator<FirstCommand, FirstResponse>();
const int count = 100;

generator.Run(RemoteBus, LocalBus.Endpoint, Instances.Values.Select(x => x.DataBus), count,
x => new FirstCommand(x));

Dictionary<Uri, int> results = generator.GetWorkerLoad();

Assert.That(results.Sum(x => x.Value), Is.EqualTo(count));
Assert.That(results[_nodes["A"]], Is.EqualTo(count));
}

Dictionary<String, Uri> _nodes = new Dictionary<string, Uri>
{
{"A", new Uri("loopback://localhost/a")},
Expand All @@ -157,38 +169,26 @@ protected override void EstablishContext()

protected override void ConfigureLocalBus(ServiceBusConfigurator configurator)
{
var mock = MockRepository.GenerateStub<IWorkerSelectionStrategy<FirstCommand>>();
mock.Stub(x => x.SelectWorker(null, null))
.IgnoreArguments()
.Return(new WorkerDetails
{
ControlUri = _nodes["A"].AppendToPath("_control"),
DataUri = _nodes["A"],
InProgress = 0,
InProgressLimit = 100,
LastUpdate = DateTime.UtcNow
}
);

mock.Stub(x => x.HasAvailableWorker(null, null))
.IgnoreArguments()
.Return(true);

configurator.UseDistributorFor(mock);
var strategy = new CustomWorkerSelectionStrategy(_nodes["A"]);

configurator.UseDistributorFor(strategy);
}

[Test, Explicit]
public void Node_a_should_recieve_all_the_work()
class CustomWorkerSelectionStrategy :
IWorkerSelector<FirstCommand>
{
var generator = new LoadGenerator<FirstCommand, FirstResponse>();
const int count = 100;
readonly Uri _node;

generator.Run(RemoteBus, LocalBus.Endpoint, Instances.Values.Select(x => x.DataBus), count, x => new FirstCommand(x));

Dictionary<Uri, int> results = generator.GetWorkerLoad();
public CustomWorkerSelectionStrategy(Uri node)
{
_node = node;
}

Assert.That(results.Sum(x => x.Value), Is.EqualTo(count));
Assert.That(results[_nodes["A"]], Is.EqualTo(count));
public IEnumerable<IWorker<FirstCommand>> SelectWorker(IEnumerable<IWorker<FirstCommand>> availableWorkers,
IConsumeContext<FirstCommand> context)
{
return availableWorkers.Where(x => x.DataUri == _node);
}
}
}
}
52 changes: 52 additions & 0 deletions src/MassTransit.Tests/Distributor/Handler_Specs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2007-2012 Chris Patterson, Dru Sellers, Travis Smith, et. al.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
namespace MassTransit.Tests.Distributor
{
using System.Linq;
using BusConfigurators;
using Magnum.TestFramework;
using MassTransit.Distributor.Messages;
using MassTransit.Testing;
using NUnit.Framework;
using TextFixtures;

[TestFixture]
public class Using_a_distributor_handler :
LoopbackLocalAndRemoteTestFixture
{
[Test]
public void Should_have_the_subscription_for_the_actual_message()
{
LocalBus.HasSubscription<A>().Any()
.ShouldBeTrue("Message subscription was not found");
}

[Test]
public void Should_have_the_subscription_for_the_worker_availability()
{
LocalBus.HasSubscription<WorkerAvailable<A>>().Any()
.ShouldBeTrue("Worker available subscription was not found.");
}

protected override void ConfigureLocalBus(ServiceBusConfigurator configurator)
{
base.ConfigureLocalBus(configurator);

configurator.Distributor(x => x.Handler<A>());
}

class A
{
}
}
}
20 changes: 9 additions & 11 deletions src/MassTransit.Tests/Distributor/NewConfiguration_Specs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,17 @@ public void Configuring_the_distributor_side()
{
x.ReceiveFrom("loopback://localhost/queue");
x.Distributor(d =>
{
d.Handler<A>()
.UseWorkerSelector(() => new LeastBusyWorkerSelectorFactory())
.UseWorkerSelector<LeastBusyWorkerSelectorFactory>();
x.Subscribe(s =>
{
s.Distributor(d =>
{
d.Handler<A>()
.UseWorkerSelector(() => new DefaultWorkerSelectionStrategy<A>());
d.Consumer<MyConsumer>();
d.Consumer<MyConsumer>();
d.Saga<MySaga>();
});
d.Saga<MySaga>();
});
});
});
}

Expand All @@ -62,7 +60,7 @@ public void Configuring_the_worker_side()
d.Consumer(() => new MyConsumer());
d.Consumer(typeof(MyConsumer), Activator.CreateInstance);
d.Saga<MySaga>();
d.Saga(new InMemorySagaRepository<MySaga>());
});
});
});
Expand Down
1 change: 1 addition & 0 deletions src/MassTransit.Tests/MassTransit.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
<ItemGroup>
<Compile Include="Configuration\Diagnostics_Specs.cs" />
<Compile Include="Diagnostics\Trace_Specs.cs" />
<Compile Include="Distributor\Handler_Specs.cs" />
<Compile Include="Distributor\NewConfiguration_Specs.cs" />
<Compile Include="Environment_Specs.cs" />
<Compile Include="InterfaceProxy_Specs.cs" />
Expand Down
22 changes: 11 additions & 11 deletions src/MassTransit.Tests/Saga/StateMachineInspector_Specs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,17 @@ public void FirstTestName()

configurator.UseSagaDistributorFor<TestSaga>();

IList<object[]> calls =
configurator.GetArgumentsForCallsMadeOn(x => x.AddService(BusServiceLayer.Presentation, () => new Distributor<InitiateSimpleSaga>()));

calls.Count.ShouldEqual(3, "Not enough calls were made to configure the saga");

calls.Any(x => x[0].GetType().Equals(typeof (DefaultBusServiceConfigurator<Distributor<InitiateSimpleSaga>>)))
.ShouldBeTrue("The event was not registered");
calls.Any(x => x[0].GetType().Equals(typeof (DefaultBusServiceConfigurator<Distributor<CompleteSimpleSaga>>)))
.ShouldBeTrue("The event was not registered");
calls.Any(x => x[0].GetType().Equals(typeof (DefaultBusServiceConfigurator<Distributor<ObservableSagaMessage>>)))
.ShouldBeTrue("The event was not registered");
// IList<object[]> calls =
// configurator.GetArgumentsForCallsMadeOn(x => x.AddService(BusServiceLayer.Presentation, () => new Distributor<InitiateSimpleSaga>()));
//
// calls.Count.ShouldEqual(3, "Not enough calls were made to configure the saga");
//
// calls.Any(x => x[0].GetType().Equals(typeof (DefaultBusServiceConfigurator<Distributor<InitiateSimpleSaga>>)))
// .ShouldBeTrue("The event was not registered");
// calls.Any(x => x[0].GetType().Equals(typeof (DefaultBusServiceConfigurator<Distributor<CompleteSimpleSaga>>)))
// .ShouldBeTrue("The event was not registered");
// calls.Any(x => x[0].GetType().Equals(typeof (DefaultBusServiceConfigurator<Distributor<ObservableSagaMessage>>)))
// .ShouldBeTrue("The event was not registered");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@ namespace MassTransit.BusConfigurators
using EndpointConfigurators;
using SubscriptionConfigurators;

/// <summary>
/// <para>The configurator to call methods on, as well as extension methods on,
/// in order to configure your service bus. The configuration
/// goes a lot by convention, but this interface allows you to configure
/// almost any aspect of the bus.</para>
///
/// <para>
/// Documentation is at http://readthedocs.org/docs/masstransit/en/latest/configuration/index.html
/// </para>
/// </summary>
/// <summary>
/// <para>The configurator to call methods on, as well as extension methods on,
/// in order to configure your service bus. The configuration
/// goes a lot by convention, but this interface allows you to configure
/// almost any aspect of the bus.</para>
///
/// <para>
/// Documentation is at http://readthedocs.org/docs/masstransit/en/latest/configuration/index.html
/// </para>
/// </summary>
public interface ServiceBusConfigurator :
EndpointFactoryConfigurator
{
Expand Down Expand Up @@ -73,9 +73,9 @@ public interface ServiceBusConfigurator :
void BeforeConsumingMessage(Action beforeConsume);

/// <summary>
/// Specifies an action to call after a message is consumed. Implementors
/// should take care to not remove previously set actions so that multiple
/// calls to this method generates calls to all those action parameters.
/// Specifies an action to call after a message is consumed. Implementors
/// should take care to not remove previously set actions so that multiple
/// calls to this method generates calls to all those action parameters.
/// </summary>
/// <param name="afterConsume">The action to run after consumption</param>
void AfterConsumingMessage(Action afterConsume);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ public ConsumerConnector()
|| interfaces.Implements(typeof (Observes<,>)))
throw new ConfigurationException("InitiatedBy, Orchestrates, and Observes can only be used with sagas");

if (interfaces.Implements(typeof (IDistributor<>))
|| interfaces.Implements(typeof (IWorker<>))
if (/*interfaces.Implements(typeof (IDistributor<>))
||*/ interfaces.Implements(typeof (IWorker<>))
|| interfaces.Implements(typeof (ISagaWorker<>)))
throw new ConfigurationException("Distributor classes can only be subscribed as instances");

Expand Down
Loading

0 comments on commit daf493e

Please sign in to comment.