Skip to content

Commit

Permalink
Merge pull request #1975 from Aaronontheweb/1945-cluster-terminate-hooks
Browse files Browse the repository at this point in the history
added cluster termination hooks and added RegisterOnMemberRemoved
  • Loading branch information
Horusiath committed May 31, 2016
2 parents b2a5a70 + 536f281 commit 697e4cd
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 49 deletions.
47 changes: 47 additions & 0 deletions src/core/Akka.Cluster.Tests/ClusterSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Collections.Immutable;
using System.Linq;
using Akka.Actor;
using Akka.Configuration;
using Akka.TestKit;
using Akka.Util.Internal;
using Xunit;
Expand Down Expand Up @@ -105,13 +107,58 @@ public void A_cluster_must_publish_member_removed_when_shutdown()
{
_cluster.Join(_selfAddress);
LeaderActions(); // Joining -> Up

var callbackProbe = CreateTestProbe();
_cluster.RegisterOnMemberRemoved(() =>
{
callbackProbe.Tell("OnMemberRemoved");
});

_cluster.Subscribe(TestActor, new []{typeof(ClusterEvent.MemberRemoved)});
// first, is in response to the subscription
ExpectMsg<ClusterEvent.CurrentClusterState>();

_cluster.Shutdown();
var memberRemoved = ExpectMsg<ClusterEvent.MemberRemoved>();
Assert.Equal(_selfAddress, memberRemoved.Member.Address);

callbackProbe.ExpectMsg("OnMemberRemoved");
}

// TODO: https://github.com/akkadotnet/akka.net/issues/1983
[Fact(Skip = "fails for now - will need to implement https://github.com/akkadotnet/akka.net/issues/1983")]
public void A_cluster_must_be_allowed_to_join_and_leave_with_local_address()
{
var sys2 = ActorSystem.Create("ClusterSpec2", ConfigurationFactory.ParseString(@"akka.actor.provider = ""Akka.Cluster.ClusterActorRefProvider, Akka.Cluster""
akka.remote.helios.tcp.port = 0"));

try
{
var ref2 = sys2.ActorOf(Props.Empty);
Cluster.Get(sys2).Join(ref2.Path.Address); // address doesn't contain full address information
Within(TimeSpan.FromSeconds(5), () =>
{
AwaitAssert(() =>
{
Cluster.Get(sys2).State.Members.Count.ShouldBe(1);
Cluster.Get(sys2).State.Members.First().Status.ShouldBe(MemberStatus.Up);
});
});

Cluster.Get(sys2).Leave(ref2.Path.Address);

Within(TimeSpan.FromSeconds(5), () =>
{
AwaitAssert(() =>
{
Cluster.Get(sys2).IsTerminated.ShouldBe(true);
});
});
}
finally
{
Shutdown(sys2);
}
}
}
}
Expand Down
45 changes: 34 additions & 11 deletions src/core/Akka.Cluster/Cluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public override Cluster CreateExtension(ExtendedActorSystem system)
/// the cluster address of this actor system is [[#selfAddress]]. A member also has a status;
/// initially <see cref="Akka.Cluster.MemberStatus.Joining"/> followed by <see cref="Akka.Cluster.MemberStatus.Up"/>.
/// </summary>
public class Cluster :IExtension
public class Cluster : IExtension
{
//TODO: Issue with missing overrides for Get and Lookup

Expand All @@ -53,18 +53,18 @@ public static bool IsAssertInvariantsEnabled
readonly ClusterSettings _settings;
public ClusterSettings Settings { get { return _settings; } }
readonly UniqueAddress _selfUniqueAddress;
public UniqueAddress SelfUniqueAddress {get{return _selfUniqueAddress;}}
public UniqueAddress SelfUniqueAddress { get { return _selfUniqueAddress; } }

public Cluster(ActorSystemImpl system)
{
System = system;
_settings = new ClusterSettings(system.Settings.Config, system.Name);
_settings = new ClusterSettings(system.Settings.Config, system.Name);

var provider = system.Provider as ClusterActorRefProvider;
if(provider == null)
if (provider == null)
throw new ConfigurationException(
String.Format("ActorSystem {0} needs to have a 'ClusterActorRefProvider' enabled in the configuration, currently uses {1}",
system,
String.Format("ActorSystem {0} needs to have a 'ClusterActorRefProvider' enabled in the configuration, currently uses {1}",
system,
system.Provider.GetType().FullName));
_selfUniqueAddress = new UniqueAddress(provider.Transport.DefaultAddress, AddressUidExtension.Uid(system));

Expand Down Expand Up @@ -137,7 +137,7 @@ public void Subscribe(IActorRef subscriber, ClusterEvent.SubscriptionInitialStat
/// </summary>
public void Unsubscribe(IActorRef subscriber)
{
Unsubscribe(subscriber,null);
Unsubscribe(subscriber, null);
}

/// <summary>
Expand Down Expand Up @@ -220,12 +220,26 @@ public void Down(Address address)
/// Typically used together with configuration option 'akka.cluster.min-nr-of-members' to defer some action,
/// such as starting actors, until the cluster has reached a certain size.
/// </summary>
/// <param name="callback"></param>
/// <param name="callback">The callback that will be run whenever the current member achieves a status of <see cref="MemberStatus.Up"/></param>
public void RegisterOnMemberUp(Action callback)
{
_clusterDaemons.Tell(new InternalClusterAction.AddOnMemberUpListener(callback));
}

/// <summary>
/// The supplied callback will be run once when the current cluster member is <see cref="MemberStatus.Removed"/>.
///
/// Typically used in combination with <see cref="Leave"/> and <see cref="ActorSystem.Terminate"/>.
/// </summary>
/// <param name="callback">The callback that will be run whenever the current member achieves a status of <see cref="MemberStatus.Down"/></param>
public void RegisterOnMemberRemoved(Action callback)
{
if (IsTerminated)
callback();
else
_clusterDaemons.Tell(new InternalClusterAction.AddOnMemberRemovedListener(callback));
}

/// <summary>
/// The address of this cluster member.
/// </summary>
Expand All @@ -252,7 +266,7 @@ public ImmutableHashSet<string> SelfRoles

readonly ILoggingAdapter _log;
readonly ClusterReadView _readView;
public ClusterReadView ReadView {get { return _readView; }}
public ClusterReadView ReadView { get { return _readView; } }

readonly DefaultFailureDetectorRegistry<Address> _failureDetector;
public DefaultFailureDetectorRegistry<Address> FailureDetector { get { return _failureDetector; } }
Expand All @@ -270,7 +284,16 @@ private static IScheduler CreateScheduler(ActorSystem system)
return system.Scheduler;
}

public void Shutdown()
/// <summary>
/// INTERNAL API.
///
/// Shuts down all connections to other members, the cluster daemon and the periodic gossip and cleanup tasks.
/// Should not called by the user.
///
/// The user can issue a <see cref="Leave"/> command which will tell the node
/// to go through graceful handoff process `LEAVE -> EXITING -> REMOVED -> SHUTDOWN`.
/// </summary>
internal void Shutdown()
{
if (_isTerminated.CompareAndSet(false, true))
{
Expand Down
94 changes: 56 additions & 38 deletions src/core/Akka.Cluster/ClusterDaemon.cs
Original file line number Diff line number Diff line change
Expand Up @@ -431,21 +431,29 @@ public IActorRef Publisher

/// <summary>
/// Command to <see cref="Akka.Cluster.ClusterDaemon"/> to create a
/// <see cref="Akka.Cluster.OnMemberUpListener"/>
/// <see cref="OnMemberStatusChangedListener"/>
/// </summary>
public sealed class AddOnMemberUpListener : INoSerializationVerificationNeeded
{
readonly Action _callback;

public AddOnMemberUpListener(Action callback)
{
_callback = callback;
Callback = callback;
}

public Action Callback
public Action Callback { get; }
}

/// <summary>
/// Command to the <see cref="ClusterDaemon"/> to create a
/// </summary>
public sealed class AddOnMemberRemovedListener : INoSerializationVerificationNeeded
{
public AddOnMemberRemovedListener(Action callback)
{
get { return _callback; }
Callback = callback;
}

public Action Callback { get; }
}

public interface ISubscriptionMessage { }
Expand Down Expand Up @@ -554,7 +562,7 @@ public ClusterEvent.IClusterDomainEvent Event
/// <summary>
/// Supervisor managing the different Cluster daemons.
/// </summary>
internal sealed class ClusterDaemon : UntypedActor, IRequiresMessageQueue<IUnboundedMessageQueueSemantics>
internal sealed class ClusterDaemon : ReceiveActor
{
IActorRef _coreSupervisor;
readonly ClusterSettings _settings;
Expand All @@ -564,31 +572,31 @@ public ClusterDaemon(ClusterSettings settings)
// Important - don't use Cluster(context.system) in constructor because that would
// cause deadlock. The Cluster extension is currently being created and is waiting
// for response from GetClusterCoreRef in its constructor.
Receive<InternalClusterAction.GetClusterCoreRef>(msg =>
{
if(_coreSupervisor == null)
CreateChildren();
_coreSupervisor.Forward(msg);
});
Receive<InternalClusterAction.AddOnMemberUpListener>(msg =>
Context.ActorOf(Props.Create(() => new OnMemberStatusChangedListener(msg.Callback, MemberStatus.Up)).WithDispatcher(_settings.UseDispatcher).WithDeploy(Deploy.Local)));
Receive <InternalClusterAction.AddOnMemberRemovedListener>(msg =>
{
Context.ActorOf(
Props.Create(() => new OnMemberStatusChangedListener(msg.Callback, MemberStatus.Removed))
.WithDispatcher(_settings.UseDispatcher)
.WithDeploy(Deploy.Local));
});
Receive<InternalClusterAction.PublisherCreated>(msg =>
{
if (_settings.MetricsEnabled)
Context.ActorOf(
Props.Create<ClusterHeartbeatReceiver>().WithDispatcher(_settings.UseDispatcher),
"metrics");
});
_settings = settings;
}

protected override void OnReceive(object message)
{
message.Match()
.With<InternalClusterAction.GetClusterCoreRef>(msg =>
{
if(_coreSupervisor == null)
CreateChildren();
_coreSupervisor.Forward(msg);
})
.With<InternalClusterAction.AddOnMemberUpListener>(
msg =>
Context.ActorOf(Props.Create(() => new OnMemberUpListener(msg.Callback)).WithDispatcher(_settings.UseDispatcher).WithDeploy(Deploy.Local)))
.With<InternalClusterAction.PublisherCreated>(
msg =>
{
if (_settings.MetricsEnabled)
Context.ActorOf(
Props.Create<ClusterHeartbeatReceiver>().WithDispatcher(_settings.UseDispatcher),
"metrics");
});
}

private void CreateChildren()
{
_coreSupervisor = Context.ActorOf(Props.Create<ClusterCoreSupervisor>().WithDispatcher(_settings.UseDispatcher), "core");
Expand Down Expand Up @@ -2015,38 +2023,48 @@ public GossipStats Copy(long? receivedGossipCount = null,
/// <summary>
/// INTERNAL API
///
/// The supplied callback will be run once when the current cluster member is <see cref="MemberStatus.Up"/>
/// The supplied callback will be run once when the current cluster member has the same status.
/// </summary>
class OnMemberUpListener : ReceiveActor
internal class OnMemberStatusChangedListener : ReceiveActor
{
readonly Action _callback;
readonly ILoggingAdapter _log = Context.GetLogger();
readonly Cluster _cluster;
private readonly MemberStatus _targetStatus;

public OnMemberUpListener(Action callback)
public OnMemberStatusChangedListener(Action callback, MemberStatus targetStatus)
{
_targetStatus = targetStatus;
_callback = callback;
_cluster = Cluster.Get(Context.System);
Receive<ClusterEvent.CurrentClusterState>(state =>
{
if (state.Members.Any(IsSelfUp))
if (state.Members.Any(IsTriggered))
Done();
});

Receive<ClusterEvent.MemberUp>(up =>
{
if (IsSelfUp(up.Member))
if (IsTriggered(up.Member))
Done();
});

Receive<ClusterEvent.MemberRemoved>(removed =>
{
if (IsTriggered(removed.Member))
Done();
});
}

protected override void PreStart()
{
_cluster.Subscribe(Self, new[] { typeof(ClusterEvent.MemberUp) });
_cluster.Subscribe(Self, new[] { typeof(ClusterEvent.MemberUp), typeof(ClusterEvent.MemberRemoved) });
}

protected override void PostStop()
{
if (_targetStatus == MemberStatus.Removed)
Done();
_cluster.Unsubscribe(Self);
}

Expand All @@ -2058,17 +2076,17 @@ private void Done()
}
catch (Exception ex)
{
_log.Error(ex, "OnMemberUp callback failed with [{0}]", ex.Message);
_log.Error(ex, "{0} callback failed with [{1}]", _targetStatus, ex.Message);
}
finally
{
Context.Stop(Self);
}
}

private bool IsSelfUp(Member m)
private bool IsTriggered(Member m)
{
return m.UniqueAddress == _cluster.SelfUniqueAddress && m.Status == MemberStatus.Up;
return m.UniqueAddress == _cluster.SelfUniqueAddress && m.Status == _targetStatus;
}
}

Expand Down

0 comments on commit 697e4cd

Please sign in to comment.