Skip to content

Commit

Permalink
Make JoinAsync and JoinSeedNodesAsync more robust by checking cluster UP status (#6033)
Browse files Browse the repository at this point in the history

* Make JoinAsync and JoinSeedNodesAsync more robust by using an async state

* Update how join state is being handled

* Fix missing exception

* Update unit test

* Update API Verify list

* Update IsUp check

* Change IsUp implementation to check SelfMember instead

* Remove state handling code

* Remove spec

* Revert state changes

Co-authored-by: Aaron Stannard <aaron@petabridge.com>
  • Loading branch information
Arkatufus and Aaronontheweb committed Jul 27, 2022
1 parent 59cda86 commit b1eb688
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 11 deletions.
Expand Up @@ -29,6 +29,7 @@ namespace Akka.Cluster
public Akka.Cluster.IDowningProvider DowningProvider { get; }
public Akka.Remote.DefaultFailureDetectorRegistry<Akka.Actor.Address> FailureDetector { get; }
public bool IsTerminated { get; }
public bool IsUp { get; }
public Akka.Actor.Address SelfAddress { get; }
public Akka.Cluster.Member SelfMember { get; }
public System.Collections.Immutable.ImmutableHashSet<string> SelfRoles { get; }
Expand Down
Expand Up @@ -29,6 +29,7 @@ namespace Akka.Cluster
public Akka.Cluster.IDowningProvider DowningProvider { get; }
public Akka.Remote.DefaultFailureDetectorRegistry<Akka.Actor.Address> FailureDetector { get; }
public bool IsTerminated { get; }
public bool IsUp { get; }
public Akka.Actor.Address SelfAddress { get; }
public Akka.Cluster.Member SelfMember { get; }
public System.Collections.Immutable.ImmutableHashSet<string> SelfRoles { get; }
Expand Down
Expand Up @@ -29,6 +29,7 @@ namespace Akka.Cluster
public Akka.Cluster.IDowningProvider DowningProvider { get; }
public Akka.Remote.DefaultFailureDetectorRegistry<Akka.Actor.Address> FailureDetector { get; }
public bool IsTerminated { get; }
public bool IsUp { get; }
public Akka.Actor.Address SelfAddress { get; }
public Akka.Cluster.Member SelfMember { get; }
public System.Collections.Immutable.ImmutableHashSet<string> SelfRoles { get; }
Expand Down
1 change: 1 addition & 0 deletions src/core/Akka.Cluster.Tests/ClusterSpec.cs
Expand Up @@ -12,6 +12,7 @@
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using Akka.Event;
using Akka.TestKit;
using Akka.TestKit.Extensions;
using Akka.Util.Internal;
Expand Down
45 changes: 34 additions & 11 deletions src/core/Akka.Cluster/Cluster.cs
Expand Up @@ -264,11 +264,16 @@ public void Join(Address address)
/// <returns>Task which completes, once current cluster node reaches <see cref="MemberStatus.Up"/> state.</returns>
public Task JoinAsync(Address address, CancellationToken token = default)
{
if (_isTerminated)
throw new ClusterJoinFailedException("Cluster has already been terminated");

if (IsUp)
return Task.CompletedTask;

var completion = new TaskCompletionSource<NotUsed>(TaskCreationOptions.RunContinuationsAsynchronously);

var timeout = Settings.RetryUnsuccessfulJoinAfter ?? TimeSpan.FromSeconds(10);
var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(token);
timeoutCts.CancelAfter(timeout);
timeoutCts.CancelAfter(Settings.SeedNodeTimeout);
timeoutCts.Token.Register(() =>
{
timeoutCts.Dispose();
Expand All @@ -281,10 +286,10 @@ public Task JoinAsync(Address address, CancellationToken token = default)
timeoutCts.Dispose();
completion.TrySetResult(NotUsed.Instance);
});

Join(address);

return completion.Task.WithCancellation(token);
return completion.Task;
}

private Address FillLocal(Address address)
Expand Down Expand Up @@ -333,28 +338,35 @@ public void JoinSeedNodes(IEnumerable<Address> seedNodes)
/// </summary>
/// <param name="seedNodes">
/// Addresses of the seed nodes to join through. The sequence is enumerated exactly once, up front,
/// and that snapshot is used both for the join request and for the failure message.
/// </param>
/// <param name="token">
/// Optional token that aborts the wait. It is linked with a timer of <c>Settings.SeedNodeTimeout</c>;
/// whichever fires first faults the returned task with a <see cref="ClusterJoinFailedException"/>.
/// </param>
/// <returns>
/// An already-completed task when <see cref="IsUp"/> is true on entry; otherwise a task that completes
/// when the callback passed to <see cref="RegisterOnMemberUp"/> runs, or faults with
/// <see cref="ClusterJoinFailedException"/> on timeout or cancellation.
/// </returns>
/// <exception cref="ClusterJoinFailedException">
/// Thrown synchronously when the cluster has already been terminated.
/// </exception>
public Task JoinSeedNodesAsync(IEnumerable<Address> seedNodes, CancellationToken token = default)
{
    if (_isTerminated)
        throw new ClusterJoinFailedException("Cluster has already been terminated");

    // Nothing to wait for: this node is already up.
    if (IsUp)
        return Task.CompletedTask;

    var completion = new TaskCompletionSource<NotUsed>(TaskCreationOptions.RunContinuationsAsynchronously);

    // Materialize once so a lazy or one-shot sequence is not enumerated again
    // by the failure message below and by JoinSeedNodes.
    var nodes = seedNodes.ToList();

    // A single cancellation source covers both the caller's token and the seed-node timeout.
    var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(token);
    timeoutCts.CancelAfter(Settings.SeedNodeTimeout);
    timeoutCts.Token.Register(() =>
    {
        timeoutCts.Dispose();
        completion.TrySetException(new ClusterJoinFailedException(
            $"Node has not managed to join the cluster using provided seed node addresses: {string.Join(", ", nodes)}."));
    });

    // TrySet* on both paths: whichever of "member up" and "timeout/cancel" happens first wins,
    // and the loser becomes a no-op.
    RegisterOnMemberUp(() =>
    {
        timeoutCts.Dispose();
        completion.TrySetResult(NotUsed.Instance);
    });

    // Issue the join exactly once, using the snapshot taken above.
    JoinSeedNodes(nodes);

    return completion.Task;
}

/// <summary>
Expand Down Expand Up @@ -421,7 +433,10 @@ private Task LeaveSelf()
return leaveTask;

// Subscribe to MemberRemoved events
_clusterDaemons.Tell(new InternalClusterAction.AddOnMemberRemovedListener(() => tcs.TrySetResult(null)));
_clusterDaemons.Tell(new InternalClusterAction.AddOnMemberRemovedListener(() =>
{
tcs.TrySetResult(null);
}));

// Send leave message
ClusterCore.Tell(new ClusterUserAction.Leave(SelfAddress));
Expand Down Expand Up @@ -451,7 +466,10 @@ public void Down(Address address)
/// <param name="callback">The callback that is run whenever the current member achieves a status of <see cref="MemberStatus.Up"/></param>
public void RegisterOnMemberUp(Action callback)
{
    // When this node is already up, run the callback right away on the calling thread;
    // otherwise hand it to the cluster daemons as a member-up listener.
    // Exactly one of the two paths is taken, so the callback is never both
    // registered and invoked by this method.
    if (IsUp)
        callback();
    else
        _clusterDaemons.Tell(new InternalClusterAction.AddOnMemberUpListener(callback));
}

/// <summary>
Expand Down Expand Up @@ -524,6 +542,11 @@ public ImmutableHashSet<string> SelfRoles
/// </summary>
public bool IsTerminated { get { return _isTerminated.Value; } }

/// <summary>
/// Indicates whether this node's own member entry currently has a status of
/// <see cref="MemberStatus.Up"/> or <see cref="MemberStatus.WeaklyUp"/>.
/// </summary>
public bool IsUp
{
    get
    {
        // Read the status once so both comparisons are made against the same value.
        // NOTE(review): SelfMember presumably reflects live cluster state that can change
        // between two reads - confirm against its implementation.
        var status = SelfMember.Status;
        return status == MemberStatus.Up || status == MemberStatus.WeaklyUp;
    }
}

/// <summary>
/// The underlying <see cref="ActorSystem"/> supported by this plugin.
/// </summary>
Expand Down

0 comments on commit b1eb688

Please sign in to comment.