From 40422a84640e720d27fcd19daebad5f1c2199283 Mon Sep 17 00:00:00 2001 From: Chris Patterson Date: Wed, 22 Dec 2010 14:32:22 -0600 Subject: [PATCH] Added test and functionality to remove old subscriptions --- .../ExtensionMethodsForSubscriptions.cs | 19 ++++++++++ .../Removing_a_subscription_client.cs | 25 +++++++++++++ .../SubscriptionServiceTestFixture.cs | 28 ++++++++++----- .../Subscriptions/Server/SubscriptionSaga.cs | 35 ++++++++++++++----- 4 files changed, 90 insertions(+), 17 deletions(-) diff --git a/src/MassTransit.TestFramework/ExtensionMethodsForSubscriptions.cs b/src/MassTransit.TestFramework/ExtensionMethodsForSubscriptions.cs index c946c68832..867c22ae2f 100644 --- a/src/MassTransit.TestFramework/ExtensionMethodsForSubscriptions.cs +++ b/src/MassTransit.TestFramework/ExtensionMethodsForSubscriptions.cs @@ -45,5 +45,24 @@ public static void ShouldHaveSubscriptionFor(this IServiceBus bus) Assert.Fail("A subscription for " + typeof (TMessage).ToFriendlyName() + " was not found on " + bus.Endpoint.Uri); } + + public static void ShouldNotHaveSubscriptionFor(this IServiceBus bus) + { + DateTime giveUpAt = DateTime.Now + Timeout; + + while (DateTime.Now < giveUpAt) + { + var inspector = new EndpointSinkLocator(typeof (TMessage)); + + bus.OutboundPipeline.Inspect(inspector); + + if (inspector.DestinationAddress == null) + return; + + Thread.Sleep(10); + } + + Assert.Fail("A subscription for " + typeof (TMessage).ToFriendlyName() + " was found on " + bus.Endpoint.Uri); + } } } \ No newline at end of file diff --git a/src/MassTransit.Tests/Subscriptions/Removing_a_subscription_client.cs b/src/MassTransit.Tests/Subscriptions/Removing_a_subscription_client.cs index 1fb023592f..5ac2d66f38 100644 --- a/src/MassTransit.Tests/Subscriptions/Removing_a_subscription_client.cs +++ b/src/MassTransit.Tests/Subscriptions/Removing_a_subscription_client.cs @@ -23,6 +23,31 @@ public void Should_not_remove_any_existing_subscriptions() } + class A + { } + } + + [TestFixture] + public class Removing_a_subscription_client_and_readding_it : + SubscriptionServiceTestFixture + { + [Test] + public void Should_remove_any_previous_subscriptions() + { + RemoteBus.Subscribe(x => { }); + + LocalBus.ShouldHaveSubscriptionFor(); + + RemoteBus.Dispose(); + + ThreadUtil.Sleep(1.Seconds()); + + SetupRemoteBus(); + + LocalBus.ShouldNotHaveSubscriptionFor(); + } + + class A { } } diff --git a/src/MassTransit.Tests/TextFixtures/SubscriptionServiceTestFixture.cs b/src/MassTransit.Tests/TextFixtures/SubscriptionServiceTestFixture.cs index 44e05d1c0a..068285dcbc 100644 --- a/src/MassTransit.Tests/TextFixtures/SubscriptionServiceTestFixture.cs +++ b/src/MassTransit.Tests/TextFixtures/SubscriptionServiceTestFixture.cs @@ -57,16 +57,18 @@ protected override void EstablishContext() SetupSubscriptionService(ObjectBuilder); - LocalControlBus = ControlBusConfigurator.New(x => - { - x.ReceiveFrom(ClientControlUri); + SetupLocalBus(); - x.PurgeBeforeStarting(); - }); + SetupRemoteBus(); - RemoteControlBus = ControlBusConfigurator.New(x => + Instances = new Dictionary(); + } + + protected void SetupLocalBus() + { + LocalControlBus = ControlBusConfigurator.New(x => { - x.ReceiveFrom(ServerControlUri); + x.ReceiveFrom(ClientControlUri); x.PurgeBeforeStarting(); }); @@ -84,6 +86,16 @@ protected override void EstablishContext() ConfigureLocalBus(x); }); + } + + protected void SetupRemoteBus() + { + RemoteControlBus = ControlBusConfigurator.New(x => + { + x.ReceiveFrom(ServerControlUri); + + x.PurgeBeforeStarting(); + }); RemoteBus = ServiceBusConfigurator.New(x => { @@ -95,8 +107,6 @@ protected override void EstablishContext() x.ReceiveFrom(ServerUri); x.UseControlBus(RemoteControlBus); }); - - Instances = new Dictionary(); } protected Dictionary Instances { get; private set; } diff --git a/src/MassTransit/Services/Subscriptions/Server/SubscriptionSaga.cs b/src/MassTransit/Services/Subscriptions/Server/SubscriptionSaga.cs index c461b876c8..65757713c7 100644 --- a/src/MassTransit/Services/Subscriptions/Server/SubscriptionSaga.cs +++ b/src/MassTransit/Services/Subscriptions/Server/SubscriptionSaga.cs @@ -19,7 +19,7 @@ namespace MassTransit.Services.Subscriptions.Server using Subscriptions.Messages; /// - /// Manages the lifecycle of a subscription through the system + /// Manages the lifecycle of a subscription through the system /// public class SubscriptionSaga : SagaStateMachine, @@ -29,26 +29,43 @@ static SubscriptionSaga() { Define(() => { - Correlate(ClientRemoved) + Correlate(DuplicateClientRemoved) .By((saga, message) => saga.SubscriptionInfo.ClientId == message.CorrelationId && saga.CurrentState == Active); - Correlate(SubscriptionRemoved) + Correlate(ClientAdded) + .By((saga, message) => saga.SubscriptionInfo.EndpointUri == message.DataUri + && saga.SubscriptionInfo.ClientId != message.ClientId + && saga.CurrentState == Active); + + Correlate(SubscriptionRemoved) .By((saga, message) => saga.SubscriptionInfo.SubscriptionId == message.CorrelationId && saga.CurrentState == Active); + Correlate(DuplicateSubscriptionAdded) + .By((saga, message) => saga.SubscriptionInfo.MessageName == message.Subscription.MessageName + && saga.SubscriptionInfo.EndpointUri == message.Subscription.EndpointUri + && saga.SubscriptionInfo.ClientId != message.Subscription.ClientId + && saga.CurrentState == Active); + Initially( When(SubscriptionAdded) .Then((saga, message) => { saga.SubscriptionInfo = message.Subscription; - saga.NotifySubscriptionAdded(); - }).TransitionTo(Active)); + }) + .TransitionTo(Active)); During(Active, When(SubscriptionRemoved) - .Then((saga, message) => { saga.NotifySubscriptionRemoved(message.Subscription); }) + .Then((saga, message) => saga.NotifySubscriptionRemoved(message.Subscription)) + .Complete(), + When(DuplicateSubscriptionAdded) + .Then((saga, message) => saga.NotifySubscriptionRemoved(message.Subscription)) + .Complete(), + When(DuplicateClientRemoved) + .Then((saga, message) => saga.NotifySubscriptionRemoved()) .Complete(), - When(ClientRemoved) + When(ClientAdded) .Then((saga, message) => saga.NotifySubscriptionRemoved()) .Complete() ); @@ -70,9 +87,11 @@ protected SubscriptionSaga() public static State Completed { get; set; } public static Event SubscriptionAdded { get; set; } + public static Event DuplicateSubscriptionAdded { get; set; } public static Event SubscriptionRemoved { get; set; } - public static Event ClientRemoved { get; set; } + public static Event DuplicateClientRemoved { get; set; } + public static Event ClientAdded { get; set; } public virtual SubscriptionInformation SubscriptionInfo { get; set; }