From 9c3aa678fe1e918ade5fbdb6d4dc4d5adeb99656 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Wed, 8 Jun 2022 04:10:47 +0700 Subject: [PATCH] [Async TestKit] Convert Akka.Cluster.Tests to async - ClusterSpec (#5984) * Convert Akka.Cluster.Tests to async - ClusterSpec * Change `JoinAsync()` timeout value to `akka.cluster.retry-unsuccessful-join-after`, defaulting to 10s if its null. * Fix timeout and serialization issues --- src/core/Akka.Cluster.Tests/ClusterSpec.cs | 292 +++++++++++---------- src/core/Akka.Cluster/Cluster.cs | 46 +++- 2 files changed, 192 insertions(+), 146 deletions(-) diff --git a/src/core/Akka.Cluster.Tests/ClusterSpec.cs b/src/core/Akka.Cluster.Tests/ClusterSpec.cs index 58dfce90905..d1af5561f74 100644 --- a/src/core/Akka.Cluster.Tests/ClusterSpec.cs +++ b/src/core/Akka.Cluster.Tests/ClusterSpec.cs @@ -13,12 +13,14 @@ using Akka.Actor; using Akka.Configuration; using Akka.TestKit; +using Akka.TestKit.Extensions; using Akka.Util.Internal; using Xunit; using FluentAssertions; using Xunit.Abstractions; using Akka.Util; using FluentAssertions.Extensions; +using static FluentAssertions.FluentActions; namespace Akka.Cluster.Tests { @@ -30,7 +32,8 @@ public class ClusterSpec : AkkaSpec * because the JVM test suite relies on side-effects from one test to another, whereas all of our tests are fully isolated. */ - const string Config = @" + private static readonly Config Config = ConfigurationFactory.ParseString(@" + akka.loglevel = DEBUG akka.cluster { auto-down-unreachable-after = 0s periodic-tasks-initial-delay = 120 s @@ -43,7 +46,7 @@ public class ClusterSpec : AkkaSpec akka.coordinated-shutdown.terminate-actor-system = off akka.coordinated-shutdown.run-by-actor-system-terminate = off akka.remote.log-remote-lifecycle-events = off - akka.remote.dot-netty.tcp.port = 0"; + akka.remote.dot-netty.tcp.port = 0"); public IActorRef Self { get { return TestActor; } } @@ -71,28 +74,28 @@ public void A_cluster_must_use_the_address_of_the_remote_transport() } [Fact] - public void A_cluster_must_initially_become_singleton_cluster_when_joining_itself_and_reach_convergence() + public async Task A_cluster_must_initially_become_singleton_cluster_when_joining_itself_and_reach_convergence() { ClusterView.Members.Count.Should().Be(0); _cluster.Join(_selfAddress); LeaderActions(); // Joining -> Up - AwaitCondition(() => ClusterView.IsSingletonCluster); + await AwaitConditionAsync(() => ClusterView.IsSingletonCluster); ClusterView.Self.Address.Should().Be(_selfAddress); ClusterView.Members.Select(m => m.Address).ToImmutableHashSet() .Should().BeEquivalentTo(ImmutableHashSet.Create(_selfAddress)); - AwaitAssert(() => ClusterView.Status.Should().Be(MemberStatus.Up)); + await AwaitAssertAsync(() => ClusterView.Status.Should().Be(MemberStatus.Up)); ClusterView.Self.AppVersion.Should().Be(AppVersion.Create("1.2.3")); - ClusterView.Members.FirstOrDefault(i => i.Address == _selfAddress).AppVersion.Should().Be(AppVersion.Create("1.2.3")); + ClusterView.Members.First(i => i.Address == _selfAddress).AppVersion.Should().Be(AppVersion.Create("1.2.3")); ClusterView.State.HasMoreThanOneAppVersion.Should().BeFalse(); } [Fact] - public void A_cluster_must_publish_initial_state_as_snapshot_to_subscribers() + public async Task A_cluster_must_publish_initial_state_as_snapshot_to_subscribers() { try { - _cluster.Subscribe(TestActor, ClusterEvent.InitialStateAsSnapshot, new[] { typeof(ClusterEvent.IMemberEvent) }); - ExpectMsg(); + _cluster.Subscribe(TestActor, ClusterEvent.InitialStateAsSnapshot, typeof(ClusterEvent.IMemberEvent)); + await ExpectMsgAsync(); } finally { @@ -101,15 +104,15 @@ public void A_cluster_must_publish_initial_state_as_snapshot_to_subscribers() } [Fact] - public void A_cluster_must_publish_initial_state_as_events_to_subscribers() + public async Task A_cluster_must_publish_initial_state_as_events_to_subscribers() { try { _cluster.Join(_selfAddress); LeaderActions(); // Joining -> Up - _cluster.Subscribe(TestActor, ClusterEvent.InitialStateAsEvents, new[] { typeof(ClusterEvent.IMemberEvent) }); - ExpectMsg(); + _cluster.Subscribe(TestActor, ClusterEvent.InitialStateAsEvents, typeof(ClusterEvent.IMemberEvent)); + await ExpectMsgAsync(); } finally { @@ -118,14 +121,14 @@ public void A_cluster_must_publish_initial_state_as_events_to_subscribers() } [Fact] - public void A_cluster_must_send_current_cluster_state_to_one_receiver_when_requested() + public async Task A_cluster_must_send_current_cluster_state_to_one_receiver_when_requested() { _cluster.SendCurrentClusterState(TestActor); - ExpectMsg(); + await ExpectMsgAsync(); } [Fact] - public void A_cluster_must_publish_member_removed_when_shutdown() + public async Task A_cluster_must_publish_member_removed_when_shutdown() { var callbackProbe = CreateTestProbe(); _cluster.RegisterOnMemberRemoved(() => @@ -140,42 +143,42 @@ public void A_cluster_must_publish_member_removed_when_shutdown() _cluster.Join(_selfAddress); LeaderActions(); // Joining -> Up - callbackProbe.ExpectMsg("OnMemberUp"); // verify that callback hooks are registered + await callbackProbe.ExpectMsgAsync("OnMemberUp"); // verify that callback hooks are registered _cluster.Subscribe(TestActor, new[] { typeof(ClusterEvent.MemberRemoved) }); // first, is in response to the subscription - ExpectMsg(); + await ExpectMsgAsync(); _cluster.Shutdown(); - ExpectMsg().Member.Address.Should().Be(_selfAddress); + (await ExpectMsgAsync()).Member.Address.Should().Be(_selfAddress); - callbackProbe.ExpectMsg("OnMemberRemoved"); + await callbackProbe.ExpectMsgAsync("OnMemberRemoved"); } /// /// https://github.com/akkadotnet/akka.net/issues/2442 /// [Fact] - public void BugFix_2442_RegisterOnMemberUp_should_fire_if_node_already_up() + public async Task BugFix_2442_RegisterOnMemberUp_should_fire_if_node_already_up() { _cluster.Join(_selfAddress); LeaderActions(); // Joining -> Up // Member should already be up _cluster.Subscribe(TestActor, ClusterEvent.InitialStateAsEvents, new[] { typeof(ClusterEvent.IMemberEvent) }); - ExpectMsg(); + await ExpectMsgAsync(); var callbackProbe = CreateTestProbe(); _cluster.RegisterOnMemberUp(() => { callbackProbe.Tell("RegisterOnMemberUp"); }); - callbackProbe.ExpectMsg("RegisterOnMemberUp"); + await callbackProbe.ExpectMsgAsync("RegisterOnMemberUp"); } [Fact] - public void A_cluster_must_complete_LeaveAsync_task_upon_being_removed() + public async Task A_cluster_must_complete_LeaveAsync_task_upon_being_removed() { var sys2 = ActorSystem.Create("ClusterSpec2", ConfigurationFactory.ParseString(@" akka.actor.provider = ""cluster"" @@ -184,31 +187,32 @@ public void A_cluster_must_complete_LeaveAsync_task_upon_being_removed() akka.coordinated-shutdown.terminate-actor-system = off akka.coordinated-shutdown.run-by-actor-system-terminate = off akka.cluster.run-coordinated-shutdown-when-down = off - ").WithFallback(Akka.TestKit.Configs.TestConfigs.DefaultConfig)); + ").WithFallback(TestKit.Configs.TestConfigs.DefaultConfig)); + InitializeLogger(sys2); var probe = CreateTestProbe(sys2); Cluster.Get(sys2).Subscribe(probe.Ref, typeof(ClusterEvent.IMemberEvent)); - probe.ExpectMsg(); + await probe.ExpectMsgAsync(); Cluster.Get(sys2).Join(Cluster.Get(sys2).SelfAddress); - probe.ExpectMsg(); + await probe.ExpectMsgAsync(); var leaveTask = Cluster.Get(sys2).LeaveAsync(); leaveTask.IsCompleted.Should().BeFalse(); - probe.ExpectMsg(); + await probe.ExpectMsgAsync(); // MemberExited might not be published before MemberRemoved - var removed = (ClusterEvent.MemberRemoved)probe.FishForMessage(m => m is ClusterEvent.MemberRemoved); + var removed = (ClusterEvent.MemberRemoved)await probe.FishForMessageAsync(m => m is ClusterEvent.MemberRemoved); removed.PreviousStatus.Should().BeEquivalentTo(MemberStatus.Exiting); - AwaitCondition(() => leaveTask.IsCompleted); + await leaveTask.ShouldCompleteWithin(RemainingOrDefault); // A second call for LeaveAsync should complete immediately (should be the same task as before) Cluster.Get(sys2).LeaveAsync().IsCompleted.Should().BeTrue(); } [Fact] - public void A_cluster_must_return_completed_LeaveAsync_task_if_member_already_removed() + public async Task A_cluster_must_return_completed_LeaveAsync_task_if_member_already_removed() { // Join cluster _cluster.Join(_selfAddress); @@ -216,26 +220,26 @@ public void A_cluster_must_return_completed_LeaveAsync_task_if_member_already_re // Subscribe to MemberRemoved and wait for confirmation _cluster.Subscribe(TestActor, typeof(ClusterEvent.MemberRemoved)); - ExpectMsg(); + await ExpectMsgAsync(); // Leave the cluster prior to calling LeaveAsync() _cluster.Leave(_selfAddress); - Within(TimeSpan.FromSeconds(10), () => + await WithinAsync(TimeSpan.FromSeconds(10), async () => { LeaderActions(); // Leaving --> Exiting LeaderActions(); // Exiting --> Removed // Member should leave - ExpectMsg().Member.Address.Should().Be(_selfAddress); + (await ExpectMsgAsync()).Member.Address.Should().Be(_selfAddress); }); // LeaveAsync() task expected to complete immediately - AwaitCondition(() => _cluster.LeaveAsync().IsCompleted); + await _cluster.LeaveAsync().ShouldCompleteWithin(RemainingOrDefault); } [Fact] - public void A_cluster_must_cancel_LeaveAsync_task_if_CancellationToken_fired_before_node_left() + public async Task A_cluster_must_cancel_LeaveAsync_task_if_CancellationToken_fired_before_node_left() { // Join cluster _cluster.Join(_selfAddress); @@ -243,20 +247,20 @@ public void A_cluster_must_cancel_LeaveAsync_task_if_CancellationToken_fired_bef // Subscribe to MemberRemoved and wait for confirmation _cluster.Subscribe(TestActor, typeof(ClusterEvent.MemberRemoved)); - ExpectMsg(); + await ExpectMsgAsync(); // Requesting leave with cancellation token var cts = new CancellationTokenSource(); var task1 = _cluster.LeaveAsync(cts.Token); // Requesting another leave without cancellation - var task2 = _cluster.LeaveAsync(new CancellationTokenSource().Token); + var task2 = _cluster.LeaveAsync(default); // Cancelling the first task cts.Cancel(); - AwaitCondition(() => task1.IsCanceled, null, "Task should be cancelled"); + await AwaitConditionAsync(() => task1.IsCanceled, null, "Task should be cancelled"); - Within(TimeSpan.FromSeconds(10), () => + await WithinAsync(TimeSpan.FromSeconds(10), async () => { // Second task should continue awaiting for cluster leave task2.IsCompleted.Should().BeFalse(); @@ -269,27 +273,28 @@ public void A_cluster_must_cancel_LeaveAsync_task_if_CancellationToken_fired_bef ExpectMsg().Member.Address.Should().Be(_selfAddress); // Second task should complete (not cancelled) - AwaitCondition(() => task2.IsCompleted && !task2.IsCanceled, null, "Task should be completed, but not cancelled."); - }); + await AwaitConditionAsync(() => task2.IsCompleted && !task2.IsCanceled, null, "Task should be completed, but not cancelled."); + }, cancellationToken: cts.Token); // Subsequent LeaveAsync() tasks expected to complete immediately (not cancelled) var task3 = _cluster.LeaveAsync(); - AwaitCondition(() => task3.IsCompleted && !task3.IsCanceled, null, "Task should be completed, but not cancelled."); + await AwaitConditionAsync(() => task3.IsCompleted && !task3.IsCanceled, null, "Task should be completed, but not cancelled."); } [Fact] - public void A_cluster_must_be_allowed_to_join_and_leave_with_local_address() + public async Task A_cluster_must_be_allowed_to_join_and_leave_with_local_address() { var sys2 = ActorSystem.Create("ClusterSpec2", ConfigurationFactory.ParseString(@"akka.actor.provider = ""Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"" akka.remote.dot-netty.tcp.port = 0")); + InitializeLogger(sys2); try { var @ref = sys2.ActorOf(Props.Empty); Cluster.Get(sys2).Join(@ref.Path.Address); // address doesn't contain full address information - Within(5.Seconds(), () => + await WithinAsync(5.Seconds(), async () => { - AwaitAssert(() => + await AwaitAssertAsync(() => { Cluster.Get(sys2).State.Members.Count.Should().Be(1); Cluster.Get(sys2).State.Members.First().Status.Should().Be(MemberStatus.Up); @@ -298,9 +303,9 @@ public void A_cluster_must_be_allowed_to_join_and_leave_with_local_address() Cluster.Get(sys2).Leave(@ref.Path.Address); - Within(5.Seconds(), () => + await WithinAsync(5.Seconds(), async () => { - AwaitAssert(() => + await AwaitAssertAsync(() => { Cluster.Get(sys2).IsTerminated.Should().BeTrue(); }); @@ -308,25 +313,25 @@ public void A_cluster_must_be_allowed_to_join_and_leave_with_local_address() } finally { - Shutdown(sys2); + await ShutdownAsync(sys2); } } [Fact] - public void A_cluster_must_be_able_to_JoinAsync() + public async Task A_cluster_must_be_able_to_JoinAsync() { - var timeout = TimeSpan.FromSeconds(10); + var timeout = TimeSpan.FromSeconds(15); try { - _cluster.JoinAsync(_selfAddress).Wait(timeout).Should().BeTrue(); + await _cluster.JoinAsync(_selfAddress).ShouldCompleteWithin(timeout); LeaderActions(); // Member should already be up - _cluster.Subscribe(TestActor, ClusterEvent.InitialStateAsEvents, new[] { typeof(ClusterEvent.IMemberEvent) }); - ExpectMsg(); + _cluster.Subscribe(TestActor, ClusterEvent.InitialStateAsEvents, typeof(ClusterEvent.IMemberEvent)); + await ExpectMsgAsync(); // join second time - response should be immediate success - _cluster.JoinAsync(_selfAddress).Wait(TimeSpan.FromMilliseconds(100)).Should().BeTrue(); + await _cluster.JoinAsync(_selfAddress).ShouldCompleteWithin(100.Milliseconds()); } finally { @@ -334,54 +339,59 @@ public void A_cluster_must_be_able_to_JoinAsync() } // JoinAsync should fail after cluster has been shutdown - a manual actor system restart is required - Assert.ThrowsAsync(async () => - { - await _cluster.JoinAsync(_selfAddress); - LeaderActions(); - ExpectMsg(); - }).Wait(timeout); + await Awaiting(async () => + { + var task = _cluster.JoinAsync(_selfAddress); + LeaderActions(); + await task; + }) + .Should().ThrowAsync() + .ShouldCompleteWithin(timeout); } [Fact] - public void A_cluster_JoinAsync_must_fail_if_could_not_connect_to_cluster() + public async Task A_cluster_JoinAsync_must_fail_if_could_not_connect_to_cluster() { - var timeout = TimeSpan.FromSeconds(10); - + var config = ConfigurationFactory.ParseString("akka.actor.serialize-messages = off") + .WithFallback(Config); + var sys = ActorSystem.Create(nameof(ClusterSpec), config); + InitializeLogger(sys); + var cluster = Cluster.Get(sys); + var selfAddress = sys.AsInstanceOf().Provider.DefaultAddress; + try { - _cluster.Subscribe(TestActor, ClusterEvent.InitialStateAsEvents, new[] { typeof(ClusterEvent.IMemberEvent) }); - - var nonexisting = Address.Parse($"akka.tcp://{_selfAddress.System}@127.0.0.1:9999/"); - Assert.ThrowsAsync(async () => - { - await _cluster.JoinAsync(nonexisting); - LeaderActions(); - - ExpectMsg(); - }).Wait(timeout); - + await Awaiting(async () => + { + var nonExisting = Address.Parse($"akka.tcp://{selfAddress.System}@127.0.0.1:9999/"); + var task = cluster.JoinAsync(nonExisting); + LeaderActions(); + await task; + }) + .Should().ThrowAsync() + .ShouldCompleteWithin(15.Seconds()); } finally { - _cluster.Shutdown(); + await ShutdownAsync(sys); } } [Fact] - public void A_cluster_must_be_able_to_join_async_to_seed_nodes() + public async Task A_cluster_must_be_able_to_join_async_to_seed_nodes() { var timeout = TimeSpan.FromSeconds(10); try { - _cluster.JoinSeedNodesAsync(new[] { _selfAddress }).Wait(timeout).Should().BeTrue(); + await _cluster.JoinSeedNodesAsync(new[] { _selfAddress }).ShouldCompleteWithin(timeout); LeaderActions(); // Member should already be up - _cluster.Subscribe(TestActor, ClusterEvent.InitialStateAsEvents, new[] { typeof(ClusterEvent.IMemberEvent) }); - ExpectMsg(); + _cluster.Subscribe(TestActor, ClusterEvent.InitialStateAsEvents, typeof(ClusterEvent.IMemberEvent)); + await ExpectMsgAsync(); // join second time - response should be immediate success - _cluster.JoinSeedNodesAsync(new[] { _selfAddress }).Wait(TimeSpan.FromMilliseconds(100)).Should().BeTrue(); + await _cluster.JoinSeedNodesAsync(new[] { _selfAddress }).ShouldCompleteWithin(100.Milliseconds()); } finally { @@ -389,36 +399,41 @@ public void A_cluster_must_be_able_to_join_async_to_seed_nodes() } // JoinSeedNodesAsync should fail after cluster has been shutdown - a manual actor system restart is required - Assert.ThrowsAsync(async () => - { - await _cluster.JoinSeedNodesAsync(new[] { _selfAddress }); - LeaderActions(); - ExpectMsg(); - }).Wait(timeout); + await Awaiting(async () => + { + await _cluster.JoinSeedNodesAsync(new[] { _selfAddress }); + LeaderActions(); + await ExpectMsgAsync(); + }) + .Should().ThrowAsync() + .ShouldCompleteWithin(timeout); } [Fact] - public void A_cluster_JoinSeedNodesAsync_must_fail_if_could_not_connect_to_cluster() + public async Task A_cluster_JoinSeedNodesAsync_must_fail_if_could_not_connect_to_cluster() { - var timeout = TimeSpan.FromSeconds(10); - + var config = ConfigurationFactory.ParseString("akka.actor.serialize-messages = off") + .WithFallback(Config); + var sys = ActorSystem.Create(nameof(ClusterSpec), config); + InitializeLogger(sys); + var cluster = Cluster.Get(sys); + var selfAddress = sys.AsInstanceOf().Provider.DefaultAddress; + try { - _cluster.Subscribe(TestActor, ClusterEvent.InitialStateAsEvents, new[] { typeof(ClusterEvent.IMemberEvent) }); - - var nonexisting = Address.Parse($"akka.tcp://{_selfAddress.System}@127.0.0.1:9999/"); - Assert.ThrowsAsync(async () => - { - await _cluster.JoinSeedNodesAsync(new[] { nonexisting }); - LeaderActions(); - - ExpectMsg(); - }).Wait(timeout); - + await Awaiting(async () => + { + var nonExisting = Address.Parse($"akka.tcp://{selfAddress.System}@127.0.0.1:9999/"); + var task = cluster.JoinSeedNodesAsync(new[] { nonExisting }); + LeaderActions(); + await task; + }) + .Should().ThrowAsync() + .ShouldCompleteWithin(15.Seconds()); } finally { - _cluster.Shutdown(); + await ShutdownAsync(sys); } } @@ -433,7 +448,7 @@ public void A_cluster_must_allow_to_resolve_RemotePathOf_any_actor() } [Fact] - public void A_cluster_must_leave_via_CoordinatedShutdownRun() + public async Task A_cluster_must_leave_via_CoordinatedShutdownRun() { var sys2 = ActorSystem.Create("ClusterSpec2", ConfigurationFactory.ParseString(@" akka.actor.provider = ""cluster"" @@ -442,33 +457,37 @@ public void A_cluster_must_leave_via_CoordinatedShutdownRun() akka.coordinated-shutdown.terminate-actor-system = off akka.coordinated-shutdown.run-by-actor-system-terminate = off akka.cluster.run-coordinated-shutdown-when-down = off - ").WithFallback(Akka.TestKit.Configs.TestConfigs.DefaultConfig)); + ").WithFallback(TestKit.Configs.TestConfigs.DefaultConfig)); + InitializeLogger(sys2); try { var probe = CreateTestProbe(sys2); Cluster.Get(sys2).Subscribe(probe.Ref, typeof(ClusterEvent.IMemberEvent)); - probe.ExpectMsg(); + await probe.ExpectMsgAsync(); Cluster.Get(sys2).Join(Cluster.Get(sys2).SelfAddress); - probe.ExpectMsg(); + await probe.ExpectMsgAsync(); - CoordinatedShutdown.Get(sys2).Run(CoordinatedShutdown.UnknownReason.Instance); + var task = CoordinatedShutdown.Get(sys2).Run(CoordinatedShutdown.UnknownReason.Instance); - probe.ExpectMsg(); + await probe.ExpectMsgAsync(); // MemberExited might not be published before MemberRemoved - var removed = (ClusterEvent.MemberRemoved)probe.FishForMessage(m => m is ClusterEvent.MemberRemoved); + var removed = (ClusterEvent.MemberRemoved)await probe.FishForMessageAsync(m => m is ClusterEvent.MemberRemoved); removed.PreviousStatus.Should().BeEquivalentTo(MemberStatus.Exiting); + + await task.ShouldCompleteWithin(3.Seconds()); } finally { - Shutdown(sys2); + await ShutdownAsync(sys2); } } [Fact] - public void A_cluster_must_leave_via_CoordinatedShutdownRun_when_member_status_is_Joining() + public async Task A_cluster_must_leave_via_CoordinatedShutdownRun_when_member_status_is_Joining() { var sys2 = ActorSystem.Create("ClusterSpec2", ConfigurationFactory.ParseString(@" + akka.loglevel = DEBUG akka.actor.provider = ""cluster"" akka.remote.dot-netty.tcp.port = 0 akka.coordinated-shutdown.run-by-clr-shutdown-hook = off @@ -476,64 +495,67 @@ public void A_cluster_must_leave_via_CoordinatedShutdownRun_when_member_status_i akka.coordinated-shutdown.run-by-actor-system-terminate = off akka.cluster.run-coordinated-shutdown-when-down = off akka.cluster.min-nr-of-members = 2 - ").WithFallback(Akka.TestKit.Configs.TestConfigs.DefaultConfig)); - + ").WithFallback(TestKit.Configs.TestConfigs.DefaultConfig)); + InitializeLogger(sys2); + try { var probe = CreateTestProbe(sys2); Cluster.Get(sys2).Subscribe(probe.Ref, typeof(ClusterEvent.IMemberEvent)); - probe.ExpectMsg(); + await probe.ExpectMsgAsync(); Cluster.Get(sys2).Join(Cluster.Get(sys2).SelfAddress); - probe.ExpectMsg(); + await probe.ExpectMsgAsync(); - CoordinatedShutdown.Get(sys2).Run(CoordinatedShutdown.UnknownReason.Instance); + // Intentional detached task + var task = CoordinatedShutdown.Get(sys2).Run(CoordinatedShutdown.UnknownReason.Instance); - probe.ExpectMsg(); + await probe.ExpectMsgAsync(); // MemberExited might not be published before MemberRemoved - var removed = (ClusterEvent.MemberRemoved)probe.FishForMessage(m => m is ClusterEvent.MemberRemoved); + var removed = (ClusterEvent.MemberRemoved)await probe.FishForMessageAsync(m => m is ClusterEvent.MemberRemoved); removed.PreviousStatus.Should().BeEquivalentTo(MemberStatus.Exiting); } finally { - Shutdown(sys2); + await ShutdownAsync(sys2); } } [Fact] - public void A_cluster_must_terminate_ActorSystem_via_leave_CoordinatedShutdown() + public async Task A_cluster_must_terminate_ActorSystem_via_leave_CoordinatedShutdown() { var sys2 = ActorSystem.Create("ClusterSpec2", ConfigurationFactory.ParseString(@" akka.actor.provider = ""cluster"" akka.remote.dot-netty.tcp.port = 0 akka.coordinated-shutdown.terminate-actor-system = on - ").WithFallback(Akka.TestKit.Configs.TestConfigs.DefaultConfig)); + ").WithFallback(TestKit.Configs.TestConfigs.DefaultConfig)); + InitializeLogger(sys2); try { var probe = CreateTestProbe(sys2); Cluster.Get(sys2).Subscribe(probe.Ref, typeof(ClusterEvent.IMemberEvent)); - probe.ExpectMsg(); - Cluster.Get(sys2).Join(Cluster.Get(sys2).SelfAddress); - probe.ExpectMsg(); + await probe.ExpectMsgAsync(); + await Cluster.Get(sys2).JoinAsync(Cluster.Get(sys2).SelfAddress); + await probe.ExpectMsgAsync(); Cluster.Get(sys2).Leave(Cluster.Get(sys2).SelfAddress); - probe.ExpectMsg(); + await probe.ExpectMsgAsync(); // MemberExited might not be published before MemberRemoved - var removed = (ClusterEvent.MemberRemoved)probe.FishForMessage(m => m is ClusterEvent.MemberRemoved); + var removed = (ClusterEvent.MemberRemoved)await probe.FishForMessageAsync(m => m is ClusterEvent.MemberRemoved); removed.PreviousStatus.Should().BeEquivalentTo(MemberStatus.Exiting); - AwaitCondition(() => sys2.WhenTerminated.IsCompleted, TimeSpan.FromSeconds(10)); + await sys2.WhenTerminated.ShouldCompleteWithin(10.Seconds()); Cluster.Get(sys2).IsTerminated.Should().BeTrue(); CoordinatedShutdown.Get(sys2).ShutdownReason.Should().BeOfType(); } finally { - Shutdown(sys2); + await ShutdownAsync(sys2); } } [Fact] - public void A_cluster_must_terminate_ActorSystem_via_Down_CoordinatedShutdown() + public async Task A_cluster_must_terminate_ActorSystem_via_Down_CoordinatedShutdown() { var sys3 = ActorSystem.Create("ClusterSpec3", ConfigurationFactory.ParseString(@" akka.actor.provider = ""cluster"" @@ -547,21 +569,21 @@ public void A_cluster_must_terminate_ActorSystem_via_Down_CoordinatedShutdown() { var probe = CreateTestProbe(sys3); Cluster.Get(sys3).Subscribe(probe.Ref, typeof(ClusterEvent.IMemberEvent)); - probe.ExpectMsg(); - Cluster.Get(sys3).Join(Cluster.Get(sys3).SelfAddress); - probe.ExpectMsg(); + await probe.ExpectMsgAsync(); + await Cluster.Get(sys3).JoinAsync(Cluster.Get(sys3).SelfAddress); + await probe.ExpectMsgAsync(); Cluster.Get(sys3).Down(Cluster.Get(sys3).SelfAddress); - probe.ExpectMsg(); - probe.ExpectMsg(); - AwaitCondition(() => sys3.WhenTerminated.IsCompleted, TimeSpan.FromSeconds(10)); + await probe.ExpectMsgAsync(); + await probe.ExpectMsgAsync(); + await sys3.WhenTerminated.ShouldCompleteWithin(10.Seconds()); Cluster.Get(sys3).IsTerminated.Should().BeTrue(); CoordinatedShutdown.Get(sys3).ShutdownReason.Should().BeOfType(); } finally { - Shutdown(sys3); + await ShutdownAsync(sys3); } } } diff --git a/src/core/Akka.Cluster/Cluster.cs b/src/core/Akka.Cluster/Cluster.cs index 966a6bac633..0da2a90b3c4 100644 --- a/src/core/Akka.Cluster/Cluster.cs +++ b/src/core/Akka.Cluster/Cluster.cs @@ -9,7 +9,6 @@ using System.Collections.Generic; using System.Collections.Immutable; using System.Linq; -using System.Reflection; using System.Runtime.Serialization; using System.Threading; using System.Threading.Tasks; @@ -263,12 +262,25 @@ public void Join(Address address) /// The address of the node we want to join. /// An optional cancellation token used to cancel returned task before it completes. /// Task which completes, once current cluster node reaches state. - public Task JoinAsync(Address address, CancellationToken token = default(CancellationToken)) - { - var completion = new TaskCompletionSource(); - this.RegisterOnMemberUp(() => completion.TrySetResult(NotUsed.Instance)); - this.RegisterOnMemberRemoved(() => completion.TrySetException( - new ClusterJoinFailedException($"Node has not managed to join the cluster using provided address: {address}"))); + public Task JoinAsync(Address address, CancellationToken token = default) + { + var completion = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + var timeout = Settings.RetryUnsuccessfulJoinAfter ?? TimeSpan.FromSeconds(10); + var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(token); + timeoutCts.CancelAfter(timeout); + timeoutCts.Token.Register(() => + { + timeoutCts.Dispose(); + completion.TrySetException(new ClusterJoinFailedException( + $"Node has not managed to join the cluster using provided address: {address}")); + }); + + RegisterOnMemberUp(() => + { + timeoutCts.Dispose(); + completion.TrySetResult(NotUsed.Instance); + }); Join(address); @@ -323,10 +335,22 @@ public void JoinSeedNodes(IEnumerable
seedNodes) /// TBD public Task JoinSeedNodesAsync(IEnumerable
seedNodes, CancellationToken token = default(CancellationToken)) { - var completion = new TaskCompletionSource(); - this.RegisterOnMemberUp(() => completion.TrySetResult(NotUsed.Instance)); - this.RegisterOnMemberRemoved(() => completion.TrySetException( - new ClusterJoinFailedException($"Node has not managed to join the cluster using provided seed node addresses: {string.Join(", ", seedNodes)}."))); + var completion = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(token); + timeoutCts.CancelAfter(Settings.SeedNodeTimeout); + timeoutCts.Token.Register(() => + { + timeoutCts.Dispose(); + completion.TrySetException(new ClusterJoinFailedException( + $"Node has not managed to join the cluster using provided seed node addresses: {string.Join(", ", seedNodes)}.")); + }); + + RegisterOnMemberUp(() => + { + timeoutCts.Dispose(); + completion.TrySetResult(NotUsed.Instance); + }); JoinSeedNodes(seedNodes);