Navigation Menu

Skip to content

Commit

Permalink
Added test and functionality to remove old subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
phatboyg committed Dec 22, 2010
1 parent 3c6710b commit 40422a8
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 17 deletions.
19 changes: 19 additions & 0 deletions src/MassTransit.TestFramework/ExtensionMethodsForSubscriptions.cs
Expand Up @@ -45,5 +45,24 @@ public static void ShouldHaveSubscriptionFor<TMessage>(this IServiceBus bus)

Assert.Fail("A subscription for " + typeof (TMessage).ToFriendlyName() + " was not found on " + bus.Endpoint.Uri);
}

public static void ShouldNotHaveSubscriptionFor<TMessage>(this IServiceBus bus)
{
DateTime giveUpAt = DateTime.Now + Timeout;

while (DateTime.Now < giveUpAt)
{
var inspector = new EndpointSinkLocator(typeof (TMessage));

bus.OutboundPipeline.Inspect(inspector);

if (inspector.DestinationAddress == null)
return;

Thread.Sleep(10);
}

Assert.Fail("A subscription for " + typeof (TMessage).ToFriendlyName() + " was found on " + bus.Endpoint.Uri);
}
}
}
Expand Up @@ -23,6 +23,31 @@ public void Should_not_remove_any_existing_subscriptions()
}


class A
{ }
}

[TestFixture]
public class Removing_a_subscription_client_and_readding_it :
SubscriptionServiceTestFixture<LoopbackEndpoint>
{
[Test]
public void Should_remove_any_previous_subscriptions()
{
RemoteBus.Subscribe<A>(x => { });

LocalBus.ShouldHaveSubscriptionFor<A>();

RemoteBus.Dispose();

ThreadUtil.Sleep(1.Seconds());

SetupRemoteBus();

LocalBus.ShouldNotHaveSubscriptionFor<A>();
}


class A
{ }
}
Expand Down
Expand Up @@ -57,16 +57,18 @@ protected override void EstablishContext()

SetupSubscriptionService(ObjectBuilder);

LocalControlBus = ControlBusConfigurator.New(x =>
{
x.ReceiveFrom(ClientControlUri);
SetupLocalBus();

x.PurgeBeforeStarting();
});
SetupRemoteBus();

RemoteControlBus = ControlBusConfigurator.New(x =>
Instances = new Dictionary<string, ServiceInstance>();
}

protected void SetupLocalBus()
{
LocalControlBus = ControlBusConfigurator.New(x =>
{
x.ReceiveFrom(ServerControlUri);
x.ReceiveFrom(ClientControlUri);
x.PurgeBeforeStarting();
});
Expand All @@ -84,6 +86,16 @@ protected override void EstablishContext()
ConfigureLocalBus(x);
});
}

protected void SetupRemoteBus()
{
RemoteControlBus = ControlBusConfigurator.New(x =>
{
x.ReceiveFrom(ServerControlUri);
x.PurgeBeforeStarting();
});

RemoteBus = ServiceBusConfigurator.New(x =>
{
Expand All @@ -95,8 +107,6 @@ protected override void EstablishContext()
x.ReceiveFrom(ServerUri);
x.UseControlBus(RemoteControlBus);
});

Instances = new Dictionary<string, ServiceInstance>();
}

protected Dictionary<string, ServiceInstance> Instances { get; private set; }
Expand Down
35 changes: 27 additions & 8 deletions src/MassTransit/Services/Subscriptions/Server/SubscriptionSaga.cs
Expand Up @@ -19,7 +19,7 @@ namespace MassTransit.Services.Subscriptions.Server
using Subscriptions.Messages;

/// <summary>
/// Manages the lifecycle of a subscription through the system
/// Manages the lifecycle of a subscription through the system
/// </summary>
public class SubscriptionSaga :
SagaStateMachine<SubscriptionSaga>,
Expand All @@ -29,26 +29,43 @@ static SubscriptionSaga()
{
Define(() =>
{
Correlate(ClientRemoved)
Correlate(DuplicateClientRemoved)
.By((saga, message) => saga.SubscriptionInfo.ClientId == message.CorrelationId && saga.CurrentState == Active);
Correlate(SubscriptionRemoved)
Correlate(ClientAdded)
.By((saga, message) => saga.SubscriptionInfo.EndpointUri == message.DataUri
&& saga.SubscriptionInfo.ClientId != message.ClientId
&& saga.CurrentState == Active);
Correlate(SubscriptionRemoved)
.By((saga, message) => saga.SubscriptionInfo.SubscriptionId == message.CorrelationId && saga.CurrentState == Active);
Correlate(DuplicateSubscriptionAdded)
.By((saga, message) => saga.SubscriptionInfo.MessageName == message.Subscription.MessageName
&& saga.SubscriptionInfo.EndpointUri == message.Subscription.EndpointUri
&& saga.SubscriptionInfo.ClientId != message.Subscription.ClientId
&& saga.CurrentState == Active);
Initially(
When(SubscriptionAdded)
.Then((saga, message) =>
{
saga.SubscriptionInfo = message.Subscription;
saga.NotifySubscriptionAdded();
}).TransitionTo(Active));
})
.TransitionTo(Active));
During(Active,
When(SubscriptionRemoved)
.Then((saga, message) => { saga.NotifySubscriptionRemoved(message.Subscription); })
.Then((saga, message) => saga.NotifySubscriptionRemoved(message.Subscription))
.Complete(),
When(DuplicateSubscriptionAdded)
.Then((saga, message) => saga.NotifySubscriptionRemoved(message.Subscription))
.Complete(),
When(DuplicateClientRemoved)
.Then((saga, message) => saga.NotifySubscriptionRemoved())
.Complete(),
When(ClientRemoved)
When(ClientAdded)
.Then((saga, message) => saga.NotifySubscriptionRemoved())
.Complete()
);
Expand All @@ -70,9 +87,11 @@ protected SubscriptionSaga()
public static State Completed { get; set; }

public static Event<AddSubscription> SubscriptionAdded { get; set; }
public static Event<AddSubscription> DuplicateSubscriptionAdded { get; set; }
public static Event<RemoveSubscription> SubscriptionRemoved { get; set; }

public static Event<DuplicateSubscriptionClientRemoved> ClientRemoved { get; set; }
public static Event<DuplicateSubscriptionClientRemoved> DuplicateClientRemoved { get; set; }
public static Event<SubscriptionClientAdded> ClientAdded { get; set; }

public virtual SubscriptionInformation SubscriptionInfo { get; set; }

Expand Down

0 comments on commit 40422a8

Please sign in to comment.