Skip to content

Commit

Permalink
Add timeout to abort joining of seed nodes (#3863)
Browse files Browse the repository at this point in the history
  • Loading branch information
ismaelhamed authored and Aaronontheweb committed Dec 20, 2019
1 parent fc43334 commit 23c2206
Show file tree
Hide file tree
Showing 8 changed files with 188 additions and 60 deletions.
Expand Up @@ -208,6 +208,7 @@ namespace Akka.Cluster
public int SchedulerTicksPerWheel { get; }
public System.Collections.Immutable.ImmutableList<Akka.Actor.Address> SeedNodes { get; }
public System.TimeSpan SeedNodeTimeout { get; }
public System.Nullable<System.TimeSpan> ShutdownAfterUnsuccessfulJoinSeedNodes { get; }
public System.TimeSpan UnreachableNodesReaperInterval { get; }
public string UseDispatcher { get; }
public bool VerboseGossipReceivedLogging { get; }
Expand Down
4 changes: 4 additions & 0 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt
Expand Up @@ -522,6 +522,10 @@ namespace Akka.Actor
{
public static Akka.Actor.CoordinatedShutdown.Reason Instance;
}
public class ClusterJoinUnsuccessfulReason : Akka.Actor.CoordinatedShutdown.Reason
{
public static Akka.Actor.CoordinatedShutdown.Reason Instance;
}
public class ClusterLeavingReason : Akka.Actor.CoordinatedShutdown.Reason
{
public static Akka.Actor.CoordinatedShutdown.Reason Instance;
Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka.Cluster.Tests/ClusterConfigSpec.cs
Expand Up @@ -11,9 +11,8 @@
using Akka.Dispatch;
using Akka.Remote;
using Akka.TestKit;
using Xunit;
using Assert = Xunit.Assert;
using FluentAssertions;
using Xunit;

namespace Akka.Cluster.Tests
{
Expand All @@ -30,6 +29,7 @@ public void Clustering_must_be_able_to_parse_generic_cluster_config_elements()
settings.SeedNodes.Should().BeEquivalentTo(ImmutableList.Create<Address>());
settings.SeedNodeTimeout.Should().Be(5.Seconds());
settings.RetryUnsuccessfulJoinAfter.Should().Be(10.Seconds());
settings.ShutdownAfterUnsuccessfulJoinSeedNodes .Should().Be(null);
settings.PeriodicTasksInitialDelay.Should().Be(1.Seconds());
settings.GossipInterval.Should().Be(1.Seconds());
settings.GossipTimeToLive.Should().Be(2.Seconds());
Expand Down
64 changes: 64 additions & 0 deletions src/core/Akka.Cluster.Tests/ShutdownAfterJoinSeedNodesSpec.cs
@@ -0,0 +1,64 @@
//-----------------------------------------------------------------------
// <copyright file="SplitBrainResolverConfigSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2018 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2018 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Collections.Immutable;
using Akka.Actor;
using Akka.Configuration;
using Akka.TestKit;
using Xunit;

namespace Akka.Cluster.Tests
{
public class ShutdownAfterJoinSeedNodesSpec : AkkaSpec
{
public static readonly Config Config = ConfigurationFactory.ParseString(@"
akka.actor.provider = ""cluster""
akka.coordinated-shutdown.terminate-actor-system = on
akka.remote.dot-netty.tcp.port = 0
akka.cluster {
seed-node-timeout = 2s
retry-unsuccessful-join-after = 2s
shutdown-after-unsuccessful-join-seed-nodes = 5s
}");

private readonly ActorSystem _seed1;
private readonly ActorSystem _seed2;
private readonly ActorSystem _ordinary1;

public ShutdownAfterJoinSeedNodesSpec() : base(Config)
{
_seed1 = ActorSystem.Create(Sys.Name, Sys.Settings.Config);
_seed2 = ActorSystem.Create(Sys.Name, Sys.Settings.Config);
_ordinary1 = ActorSystem.Create(Sys.Name, Sys.Settings.Config);
}

protected override void AfterTermination()
{
base.AfterTermination();
Shutdown(_seed1);
Shutdown(_seed2);
Shutdown(_ordinary1);
}

[Fact]
public void Joining_seed_nodes_must_be_aborted_after_shutdown_after_unsuccessful_join_seed_nodes()
{
var seedNodes = ImmutableList.Create(
Cluster.Get(_seed1).SelfAddress,
Cluster.Get(_seed2).SelfAddress);

Shutdown(_seed1); // crash so that others will not be able to join

Cluster.Get(_seed2).JoinSeedNodes(seedNodes);
Cluster.Get(_ordinary1).JoinSeedNodes(seedNodes);

AwaitCondition(() => _seed2.WhenTerminated.IsCompleted, Cluster.Get(_seed2).Settings.ShutdownAfterUnsuccessfulJoinSeedNodes + TimeSpan.FromSeconds(10));
AwaitCondition(() => _ordinary1.WhenTerminated.IsCompleted, Cluster.Get(_ordinary1).Settings.ShutdownAfterUnsuccessfulJoinSeedNodes + TimeSpan.FromSeconds(10));
}
}
}
145 changes: 88 additions & 57 deletions src/core/Akka.Cluster/ClusterDaemon.cs
Expand Up @@ -13,7 +13,6 @@
using Akka.Actor;
using Akka.Dispatch;
using Akka.Event;
using Akka.Pattern;
using Akka.Remote;
using Akka.Util;
using Akka.Util.Internal;
Expand Down Expand Up @@ -998,6 +997,7 @@ internal static string VclockName(UniqueAddress node)
private ImmutableList<Address> _seedNodes;
private IActorRef _seedNodeProcess;
private int _seedNodeProcessCounter = 0; //for unique names
private Deadline _joinSeedNodesDeadline;

readonly IActorRef _publisher;
private int _leaderActionCounter = 0;
Expand Down Expand Up @@ -1209,72 +1209,103 @@ private bool ReceiveExitingCompleted(object message)

private void Uninitialized(object message)
{
if (message is InternalClusterAction.InitJoin)
switch (message)
{
_cluster.LogInfo("Received InitJoin message from [{0}], but this node is not initialized yet", Sender);
Sender.Tell(new InternalClusterAction.InitJoinNack(_cluster.SelfAddress));
}
else if (message is ClusterUserAction.JoinTo jt)
{
Join(jt.Address);
}
else if (message is InternalClusterAction.JoinSeedNodes js)
{
JoinSeedNodes(js.SeedNodes);
}
else if (message is InternalClusterAction.ISubscriptionMessage isub)
{
_publisher.Forward(isub);
}
else if (ReceiveExitingCompleted(message)) { }
else
{
Unhandled(message);
case InternalClusterAction.InitJoin _:
{
_cluster.LogInfo("Received InitJoin message from [{0}], but this node is not initialized yet", Sender);
Sender.Tell(new InternalClusterAction.InitJoinNack(_cluster.SelfAddress));
break;
}
case ClusterUserAction.JoinTo jt:
Join(jt.Address);
break;
case InternalClusterAction.JoinSeedNodes js:
{
ResetJoinSeedNodesDeadline();
JoinSeedNodes(js.SeedNodes);
break;
}
case InternalClusterAction.ISubscriptionMessage isub:
_publisher.Forward(isub);
break;
case InternalClusterAction.ITick _:
if (_joinSeedNodesDeadline != null && _joinSeedNodesDeadline.IsOverdue) JoinSeedNodesWasUnsuccessful();
break;
default:
if (!ReceiveExitingCompleted(message)) Unhandled(message);
break;
}
}

private void TryingToJoin(object message, Address joinWith, Deadline deadline)
{
if (message is InternalClusterAction.Welcome w)
{
Welcome(joinWith, w.From, w.Gossip);
}
else if (message is InternalClusterAction.InitJoin)
{
_cluster.LogInfo("Received InitJoin message from [{0}], but this node is not initialized yet", Sender);
Sender.Tell(new InternalClusterAction.InitJoinNack(_cluster.SelfAddress));
}
else if (message is ClusterUserAction.JoinTo jt)
{
BecomeUninitialized();
Join(jt.Address);
}
else if (message is InternalClusterAction.JoinSeedNodes js)
{
BecomeUninitialized();
JoinSeedNodes(js.SeedNodes);
}
else if (message is InternalClusterAction.ISubscriptionMessage isub)
{
_publisher.Forward(isub);
}
else if (message is InternalClusterAction.ITick)
switch (message)
{
if (deadline != null && deadline.IsOverdue)
{
// join attempt failed, retry
BecomeUninitialized();
if (!_seedNodes.IsEmpty) JoinSeedNodes(_seedNodes);
else Join(joinWith);
}
}
else if (ReceiveExitingCompleted(message)) { }
else
{
Unhandled(message);
case InternalClusterAction.Welcome w:
Welcome(joinWith, w.From, w.Gossip);
break;
case InternalClusterAction.InitJoin _:
{
_cluster.LogInfo("Received InitJoin message from [{0}], but this node is not initialized yet", Sender);
Sender.Tell(new InternalClusterAction.InitJoinNack(_cluster.SelfAddress));
break;
}

case ClusterUserAction.JoinTo jt:
{
BecomeUninitialized();
Join(jt.Address);
break;
}
case InternalClusterAction.JoinSeedNodes js:
{
ResetJoinSeedNodesDeadline();
BecomeUninitialized();
JoinSeedNodes(js.SeedNodes);
break;
}
case InternalClusterAction.ISubscriptionMessage isub:
_publisher.Forward(isub);
break;
case InternalClusterAction.ITick _:
{
if (_joinSeedNodesDeadline != null && _joinSeedNodesDeadline.IsOverdue)
{
JoinSeedNodesWasUnsuccessful();
}
else if (deadline != null && deadline.IsOverdue)
{
// join attempt failed, retry
BecomeUninitialized();
if (!_seedNodes.IsEmpty) JoinSeedNodes(_seedNodes);
else Join(joinWith);
}

break;
}
default:
if (!ReceiveExitingCompleted(message)) Unhandled(message);
break;
}
}

private void ResetJoinSeedNodesDeadline()
{
_joinSeedNodesDeadline = _cluster.Settings.ShutdownAfterUnsuccessfulJoinSeedNodes != null
? Deadline.Now + _cluster.Settings.ShutdownAfterUnsuccessfulJoinSeedNodes
: null;
}

private void JoinSeedNodesWasUnsuccessful()
{
_log.Warning("Joining of seed-nodes [{0}] was unsuccessful after configured shutdown-after-unsuccessful-join-seed-nodes [{1}]. Running CoordinatedShutdown.",
string.Join(", ", _seedNodes), _cluster.Settings.ShutdownAfterUnsuccessfulJoinSeedNodes);

_joinSeedNodesDeadline = null;
_coordShutdown.Run(CoordinatedShutdown.ClusterJoinUnsuccessfulReason.Instance);
}

private void BecomeUninitialized()
{
// make sure that join process is stopped
Expand Down
6 changes: 6 additions & 0 deletions src/core/Akka.Cluster/ClusterSettings.cs
Expand Up @@ -41,6 +41,7 @@ public ClusterSettings(Config config, string systemName)
SeedNodes = cc.GetStringList("seed-nodes").Select(Address.Parse).ToImmutableList();
SeedNodeTimeout = cc.GetTimeSpan("seed-node-timeout");
RetryUnsuccessfulJoinAfter = cc.GetTimeSpanWithOffSwitch("retry-unsuccessful-join-after");
ShutdownAfterUnsuccessfulJoinSeedNodes = cc.GetTimeSpanWithOffSwitch("shutdown-after-unsuccessful-join-seed-nodes");
PeriodicTasksInitialDelay = cc.GetTimeSpan("periodic-tasks-initial-delay");
GossipInterval = cc.GetTimeSpan("gossip-interval");
GossipTimeToLive = cc.GetTimeSpan("gossip-time-to-live");
Expand Down Expand Up @@ -128,6 +129,11 @@ public ClusterSettings(Config config, string systemName)
/// </summary>
public TimeSpan? RetryUnsuccessfulJoinAfter { get; }

/// <summary>
/// TBD
/// </summary>
public TimeSpan? ShutdownAfterUnsuccessfulJoinSeedNodes { get; }

/// <summary>
/// TBD
/// </summary>
Expand Down
15 changes: 14 additions & 1 deletion src/core/Akka.Cluster/Configuration/Cluster.conf
Expand Up @@ -15,13 +15,26 @@ akka {
# Leave as empty if the node is supposed to be joined manually.
seed-nodes = []

# how long to wait for one of the seed nodes to reply to initial join request
# How long to wait for one of the seed nodes to reply to initial join request.
# When this is the first seed node and there is no positive reply from the other
# seed nodes within this timeout it will join itself to bootstrap the cluster.
# When this is not the first seed node the join attempts will be performed with
# this interval.
seed-node-timeout = 5s

# If a join request fails it will be retried after this period.
# Disable join retry by specifying "off".
retry-unsuccessful-join-after = 10s

# The joining of given seed nodes will by default be retried indefinitely until
# a successful join. That process can be aborted if unsuccessful by defining this
# timeout. When aborted it will run CoordinatedShutdown, which by default will
# terminate the ActorSystem. CoordinatedShutdown can also be configured to exit
# the JVM. It is useful to define this timeout if the seed-nodes are assembled
# dynamically and a restart with new seed-nodes should be tried after unsuccessful
# attempts.
shutdown-after-unsuccessful-join-seed-nodes = off

# Should the 'leader' in the cluster be allowed to automatically mark
# unreachable nodes as DOWN after a configured time of unreachability?
# Using auto-down implies that two separate clusters will automatically be
Expand Down
9 changes: 9 additions & 0 deletions src/core/Akka/Actor/CoordinatedShutdown.cs
Expand Up @@ -222,6 +222,15 @@ private ClusterLeavingReason()
}
}

/// <summary>
/// The shutdown was initiated by a failure to join a seed node.
/// </summary>
public class ClusterJoinUnsuccessfulReason : Reason
{
public static Reason Instance = new ClusterJoinUnsuccessfulReason();
private ClusterJoinUnsuccessfulReason() { }
}

/// <summary>
/// The <see cref="ActorSystem"/>
/// </summary>
Expand Down

0 comments on commit 23c2206

Please sign in to comment.