diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCluster.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCluster.approved.txt index f6b124bb75b..b294584b393 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCluster.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCluster.approved.txt @@ -208,6 +208,7 @@ namespace Akka.Cluster public int SchedulerTicksPerWheel { get; } public System.Collections.Immutable.ImmutableList SeedNodes { get; } public System.TimeSpan SeedNodeTimeout { get; } + public System.Nullable ShutdownAfterUnsuccessfulJoinSeedNodes { get; } public System.TimeSpan UnreachableNodesReaperInterval { get; } public string UseDispatcher { get; } public bool VerboseGossipReceivedLogging { get; } diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt index 3b8067a340e..8a2ed400204 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt @@ -522,6 +522,10 @@ namespace Akka.Actor { public static Akka.Actor.CoordinatedShutdown.Reason Instance; } + public class ClusterJoinUnsuccessfulReason : Akka.Actor.CoordinatedShutdown.Reason + { + public static Akka.Actor.CoordinatedShutdown.Reason Instance; + } public class ClusterLeavingReason : Akka.Actor.CoordinatedShutdown.Reason { public static Akka.Actor.CoordinatedShutdown.Reason Instance; diff --git a/src/core/Akka.Cluster.Tests/ClusterConfigSpec.cs b/src/core/Akka.Cluster.Tests/ClusterConfigSpec.cs index 1d1582187c0..2b01afab9e6 100644 --- a/src/core/Akka.Cluster.Tests/ClusterConfigSpec.cs +++ b/src/core/Akka.Cluster.Tests/ClusterConfigSpec.cs @@ -11,9 +11,8 @@ using Akka.Dispatch; using Akka.Remote; using Akka.TestKit; -using Xunit; -using Assert = Xunit.Assert; using FluentAssertions; +using Xunit; namespace Akka.Cluster.Tests { @@ -30,6 +29,7 @@ public void Clustering_must_be_able_to_parse_generic_cluster_config_elements() settings.SeedNodes.Should().BeEquivalentTo(ImmutableList.Create
()); settings.SeedNodeTimeout.Should().Be(5.Seconds()); settings.RetryUnsuccessfulJoinAfter.Should().Be(10.Seconds()); + settings.ShutdownAfterUnsuccessfulJoinSeedNodes .Should().Be(null); settings.PeriodicTasksInitialDelay.Should().Be(1.Seconds()); settings.GossipInterval.Should().Be(1.Seconds()); settings.GossipTimeToLive.Should().Be(2.Seconds()); diff --git a/src/core/Akka.Cluster.Tests/ShutdownAfterJoinSeedNodesSpec.cs b/src/core/Akka.Cluster.Tests/ShutdownAfterJoinSeedNodesSpec.cs new file mode 100644 index 00000000000..e4461abfac1 --- /dev/null +++ b/src/core/Akka.Cluster.Tests/ShutdownAfterJoinSeedNodesSpec.cs @@ -0,0 +1,64 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2018 Lightbend Inc. +// Copyright (C) 2013-2018 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Collections.Immutable; +using Akka.Actor; +using Akka.Configuration; +using Akka.TestKit; +using Xunit; + +namespace Akka.Cluster.Tests +{ + public class ShutdownAfterJoinSeedNodesSpec : AkkaSpec + { + public static readonly Config Config = ConfigurationFactory.ParseString(@" + akka.actor.provider = ""cluster"" + akka.coordinated-shutdown.terminate-actor-system = on + akka.remote.dot-netty.tcp.port = 0 + akka.cluster { + seed-node-timeout = 2s + retry-unsuccessful-join-after = 2s + shutdown-after-unsuccessful-join-seed-nodes = 5s + }"); + + private readonly ActorSystem _seed1; + private readonly ActorSystem _seed2; + private readonly ActorSystem _ordinary1; + + public ShutdownAfterJoinSeedNodesSpec() : base(Config) + { + _seed1 = ActorSystem.Create(Sys.Name, Sys.Settings.Config); + _seed2 = ActorSystem.Create(Sys.Name, Sys.Settings.Config); + _ordinary1 = ActorSystem.Create(Sys.Name, Sys.Settings.Config); + } + + protected override void AfterTermination() + { + base.AfterTermination(); + Shutdown(_seed1); + Shutdown(_seed2); + Shutdown(_ordinary1); + } + + [Fact] + public void Joining_seed_nodes_must_be_aborted_after_shutdown_after_unsuccessful_join_seed_nodes() + { + var seedNodes = ImmutableList.Create( + Cluster.Get(_seed1).SelfAddress, + Cluster.Get(_seed2).SelfAddress); + + Shutdown(_seed1); // crash so that others will not be able to join + + Cluster.Get(_seed2).JoinSeedNodes(seedNodes); + Cluster.Get(_ordinary1).JoinSeedNodes(seedNodes); + + AwaitCondition(() => _seed2.WhenTerminated.IsCompleted, Cluster.Get(_seed2).Settings.ShutdownAfterUnsuccessfulJoinSeedNodes + TimeSpan.FromSeconds(10)); + AwaitCondition(() => _ordinary1.WhenTerminated.IsCompleted, Cluster.Get(_ordinary1).Settings.ShutdownAfterUnsuccessfulJoinSeedNodes + TimeSpan.FromSeconds(10)); + } + } +} diff --git a/src/core/Akka.Cluster/ClusterDaemon.cs b/src/core/Akka.Cluster/ClusterDaemon.cs index a41cbac29c6..dabe7ab46ff 100644 --- a/src/core/Akka.Cluster/ClusterDaemon.cs +++ b/src/core/Akka.Cluster/ClusterDaemon.cs @@ -13,7 +13,6 @@ using Akka.Actor; using Akka.Dispatch; using Akka.Event; -using Akka.Pattern; using Akka.Remote; using Akka.Util; using Akka.Util.Internal; @@ -998,6 +997,7 @@ internal static string VclockName(UniqueAddress node) private ImmutableList
_seedNodes; private IActorRef _seedNodeProcess; private int _seedNodeProcessCounter = 0; //for unique names + private Deadline _joinSeedNodesDeadline; readonly IActorRef _publisher; private int _leaderActionCounter = 0; @@ -1209,72 +1209,103 @@ private bool ReceiveExitingCompleted(object message) private void Uninitialized(object message) { - if (message is InternalClusterAction.InitJoin) + switch (message) { - _cluster.LogInfo("Received InitJoin message from [{0}], but this node is not initialized yet", Sender); - Sender.Tell(new InternalClusterAction.InitJoinNack(_cluster.SelfAddress)); - } - else if (message is ClusterUserAction.JoinTo jt) - { - Join(jt.Address); - } - else if (message is InternalClusterAction.JoinSeedNodes js) - { - JoinSeedNodes(js.SeedNodes); - } - else if (message is InternalClusterAction.ISubscriptionMessage isub) - { - _publisher.Forward(isub); - } - else if (ReceiveExitingCompleted(message)) { } - else - { - Unhandled(message); + case InternalClusterAction.InitJoin _: + { + _cluster.LogInfo("Received InitJoin message from [{0}], but this node is not initialized yet", Sender); + Sender.Tell(new InternalClusterAction.InitJoinNack(_cluster.SelfAddress)); + break; + } + case ClusterUserAction.JoinTo jt: + Join(jt.Address); + break; + case InternalClusterAction.JoinSeedNodes js: + { + ResetJoinSeedNodesDeadline(); + JoinSeedNodes(js.SeedNodes); + break; + } + case InternalClusterAction.ISubscriptionMessage isub: + _publisher.Forward(isub); + break; + case InternalClusterAction.ITick _: + if (_joinSeedNodesDeadline != null && _joinSeedNodesDeadline.IsOverdue) JoinSeedNodesWasUnsuccessful(); + break; + default: + if (!ReceiveExitingCompleted(message)) Unhandled(message); + break; } } private void TryingToJoin(object message, Address joinWith, Deadline deadline) { - if (message is InternalClusterAction.Welcome w) - { - Welcome(joinWith, w.From, w.Gossip); - } - else if (message is InternalClusterAction.InitJoin) - { - _cluster.LogInfo("Received InitJoin message from [{0}], but this node is not initialized yet", Sender); - Sender.Tell(new InternalClusterAction.InitJoinNack(_cluster.SelfAddress)); - } - else if (message is ClusterUserAction.JoinTo jt) - { - BecomeUninitialized(); - Join(jt.Address); - } - else if (message is InternalClusterAction.JoinSeedNodes js) - { - BecomeUninitialized(); - JoinSeedNodes(js.SeedNodes); - } - else if (message is InternalClusterAction.ISubscriptionMessage isub) - { - _publisher.Forward(isub); - } - else if (message is InternalClusterAction.ITick) + switch (message) { - if (deadline != null && deadline.IsOverdue) - { - // join attempt failed, retry - BecomeUninitialized(); - if (!_seedNodes.IsEmpty) JoinSeedNodes(_seedNodes); - else Join(joinWith); - } - } - else if (ReceiveExitingCompleted(message)) { } - else - { - Unhandled(message); + case InternalClusterAction.Welcome w: + Welcome(joinWith, w.From, w.Gossip); + break; + case InternalClusterAction.InitJoin _: + { + _cluster.LogInfo("Received InitJoin message from [{0}], but this node is not initialized yet", Sender); + Sender.Tell(new InternalClusterAction.InitJoinNack(_cluster.SelfAddress)); + break; + } + + case ClusterUserAction.JoinTo jt: + { + BecomeUninitialized(); + Join(jt.Address); + break; + } + case InternalClusterAction.JoinSeedNodes js: + { + ResetJoinSeedNodesDeadline(); + BecomeUninitialized(); + JoinSeedNodes(js.SeedNodes); + break; + } + case InternalClusterAction.ISubscriptionMessage isub: + _publisher.Forward(isub); + break; + case InternalClusterAction.ITick _: + { + if (_joinSeedNodesDeadline != null && _joinSeedNodesDeadline.IsOverdue) + { + JoinSeedNodesWasUnsuccessful(); + } + else if (deadline != null && deadline.IsOverdue) + { + // join attempt failed, retry + BecomeUninitialized(); + if (!_seedNodes.IsEmpty) JoinSeedNodes(_seedNodes); + else Join(joinWith); + } + + break; + } + default: + if (!ReceiveExitingCompleted(message)) Unhandled(message); + break; } } + private void ResetJoinSeedNodesDeadline() + { + _joinSeedNodesDeadline = _cluster.Settings.ShutdownAfterUnsuccessfulJoinSeedNodes != null + ? Deadline.Now + _cluster.Settings.ShutdownAfterUnsuccessfulJoinSeedNodes + : null; + } + + private void JoinSeedNodesWasUnsuccessful() + { + _log.Warning("Joining of seed-nodes [{0}] was unsuccessful after configured shutdown-after-unsuccessful-join-seed-nodes [{1}]. Running CoordinatedShutdown.", + string.Join(", ", _seedNodes), _cluster.Settings.ShutdownAfterUnsuccessfulJoinSeedNodes); + + _joinSeedNodesDeadline = null; + _coordShutdown.Run(CoordinatedShutdown.ClusterJoinUnsuccessfulReason.Instance); + } + private void BecomeUninitialized() { // make sure that join process is stopped diff --git a/src/core/Akka.Cluster/ClusterSettings.cs b/src/core/Akka.Cluster/ClusterSettings.cs index e8f7e9c05e6..69dafc65209 100644 --- a/src/core/Akka.Cluster/ClusterSettings.cs +++ b/src/core/Akka.Cluster/ClusterSettings.cs @@ -41,6 +41,7 @@ public ClusterSettings(Config config, string systemName) SeedNodes = cc.GetStringList("seed-nodes").Select(Address.Parse).ToImmutableList(); SeedNodeTimeout = cc.GetTimeSpan("seed-node-timeout"); RetryUnsuccessfulJoinAfter = cc.GetTimeSpanWithOffSwitch("retry-unsuccessful-join-after"); + ShutdownAfterUnsuccessfulJoinSeedNodes = cc.GetTimeSpanWithOffSwitch("shutdown-after-unsuccessful-join-seed-nodes"); PeriodicTasksInitialDelay = cc.GetTimeSpan("periodic-tasks-initial-delay"); GossipInterval = cc.GetTimeSpan("gossip-interval"); GossipTimeToLive = cc.GetTimeSpan("gossip-time-to-live"); @@ -128,6 +129,11 @@ public ClusterSettings(Config config, string systemName) /// public TimeSpan? RetryUnsuccessfulJoinAfter { get; } + /// + /// TBD + /// + public TimeSpan? ShutdownAfterUnsuccessfulJoinSeedNodes { get; } + /// /// TBD /// diff --git a/src/core/Akka.Cluster/Configuration/Cluster.conf b/src/core/Akka.Cluster/Configuration/Cluster.conf index 6673b0965ee..436ee880f8b 100644 --- a/src/core/Akka.Cluster/Configuration/Cluster.conf +++ b/src/core/Akka.Cluster/Configuration/Cluster.conf @@ -15,13 +15,26 @@ akka { # Leave as empty if the node is supposed to be joined manually. seed-nodes = [] - # how long to wait for one of the seed nodes to reply to initial join request + # How long to wait for one of the seed nodes to reply to initial join request. + # When this is the first seed node and there is no positive reply from the other + # seed nodes within this timeout it will join itself to bootstrap the cluster. + # When this is not the first seed node the join attempts will be performed with + # this interval. seed-node-timeout = 5s # If a join request fails it will be retried after this period. # Disable join retry by specifying "off". retry-unsuccessful-join-after = 10s + # The joining of given seed nodes will by default be retried indefinitely until + # a successful join. That process can be aborted if unsuccessful by defining this + # timeout. When aborted it will run CoordinatedShutdown, which by default will + # terminate the ActorSystem. CoordinatedShutdown can also be configured to exit + # the JVM. It is useful to define this timeout if the seed-nodes are assembled + # dynamically and a restart with new seed-nodes should be tried after unsuccessful + # attempts. + shutdown-after-unsuccessful-join-seed-nodes = off + # Should the 'leader' in the cluster be allowed to automatically mark # unreachable nodes as DOWN after a configured time of unreachability? # Using auto-down implies that two separate clusters will automatically be diff --git a/src/core/Akka/Actor/CoordinatedShutdown.cs b/src/core/Akka/Actor/CoordinatedShutdown.cs index 176aac71e15..aaf11d2afcb 100644 --- a/src/core/Akka/Actor/CoordinatedShutdown.cs +++ b/src/core/Akka/Actor/CoordinatedShutdown.cs @@ -222,6 +222,15 @@ private ClusterLeavingReason() } } + /// + /// The shutdown was initiated by a failure to join a seed node. + /// + public class ClusterJoinUnsuccessfulReason : Reason + { + public static Reason Instance = new ClusterJoinUnsuccessfulReason(); + private ClusterJoinUnsuccessfulReason() { } + } + /// /// The ///