Skip to content

Commit

Permalink
Cluster updates to Akka JVM 2.4.16 (#2500)
Browse files Browse the repository at this point in the history
* suppress deadletter for the cluster joining messages

* harden NodeChurnSpec

* Add a retry for a check for stopped actors

* Reachability.remove didn't always remove all

* don't use Down member as leader
  • Loading branch information
alexvaluyskiy authored and Aaronontheweb committed Feb 4, 2017
1 parent 0c59136 commit 913b1ba
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 31 deletions.
9 changes: 5 additions & 4 deletions src/core/Akka.Cluster.Tests.MultiNode/NodeChurnSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public void Cluster_with_short_lived_members_must_join_and_remove_transient_node
}
}

AwaitRemoved(systems);
AwaitRemoved(systems, n);
EnterBarrier("members-removed-" + n);
foreach (var node in systems)
{
Expand Down Expand Up @@ -167,16 +167,17 @@ private void AwaitAllMembersUp(ImmutableList<ActorSystem> additionalSystems)
});
}

private void AwaitRemoved(ImmutableList<ActorSystem> additionaSystems)
private void AwaitRemoved(ImmutableList<ActorSystem> additionaSystems, int round)
{
AwaitMembersUp(Roles.Count, timeout: 40.Seconds());
Within(20.Seconds(), () =>
EnterBarrier("removed-" + round);
Within(3.Seconds(), () =>
{
AwaitAssert(() =>
{
additionaSystems.ForEach(s =>
{
Cluster.Get(s).IsTerminated.Should().BeTrue();
Cluster.Get(s).IsTerminated.Should().BeTrue($"{Cluster.Get(s).SelfAddress}");
});
});
});
Expand Down
6 changes: 6 additions & 0 deletions src/core/Akka.Cluster.Tests/GossipSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,12 @@ public void A_gossip_must_have_leader_as_first_member_based_on_ordering_except_e
new Gossip(ImmutableSortedSet.Create(c3)).Leader(c3.UniqueAddress).Should().Be(c3.UniqueAddress);
}

[Fact]
public void A_gossip_must_not_have_Down_member_as_leader()
{
new Gossip(ImmutableSortedSet.Create(e3)).Leader(e3.UniqueAddress).Should().BeNull();
}

[Fact]
public void A_gossip_must_merge_seen_table_correctly()
{
Expand Down
18 changes: 18 additions & 0 deletions src/core/Akka.Cluster.Tests/ReachabilitySpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -293,5 +293,23 @@ public void ReachabilityTable_must_remove_node()
r.Status(nodeB, nodeC).Should().Be(Reachability.ReachabilityStatus.Reachable);
r.Status(nodeB, nodeE).Should().Be(Reachability.ReachabilityStatus.Reachable);
}

[Fact]
public void ReachabilityTable_must_remove_correctly_after_pruning()
{
var r = Reachability.Empty.
Unreachable(nodeB, nodeA).
Unreachable(nodeB, nodeC).
Unreachable(nodeD, nodeC).
Reachable(nodeB, nodeA).
Reachable(nodeB, nodeC);

r.Records.Should().BeEquivalentTo(ImmutableList.Create(
new Reachability.Record(nodeD, nodeC, Reachability.ReachabilityStatus.Unreachable, 1L)));

var r2 = r.Remove(ImmutableList.Create(nodeB));
r2.AllObservers.Should().BeEquivalentTo(ImmutableList.Create(nodeD));
r2.Versions.Keys.Should().BeEquivalentTo(ImmutableList.Create(nodeD));
}
}
}
8 changes: 4 additions & 4 deletions src/core/Akka.Cluster/ClusterDaemon.cs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ public override int GetHashCode()
/// Command to initiate the process to join the specified
/// seed nodes.
/// </summary>
internal sealed class JoinSeedNodes
internal sealed class JoinSeedNodes : IDeadLetterSuppression
{
readonly ImmutableList<Address> _seedNodes;

Expand Down Expand Up @@ -303,7 +303,7 @@ internal class JoinSeenNode
/// <summary>
/// See JoinSeedNode
/// </summary>
internal class InitJoin : IClusterMessage
internal class InitJoin : IClusterMessage, IDeadLetterSuppression
{
/// <summary>
/// TBD
Expand All @@ -319,7 +319,7 @@ public override bool Equals(object obj)
/// <summary>
/// See JoinSeeNode
/// </summary>
internal sealed class InitJoinAck : IClusterMessage
internal sealed class InitJoinAck : IClusterMessage, IDeadLetterSuppression
{
readonly Address _address;

Expand Down Expand Up @@ -371,7 +371,7 @@ public override int GetHashCode()
/// <summary>
/// See JoinSeeNode
/// </summary>
internal sealed class InitJoinNack : IClusterMessage
internal sealed class InitJoinNack : IClusterMessage, IDeadLetterSuppression
{
readonly Address _address;

Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka.Cluster/Gossip.cs
Original file line number Diff line number Diff line change
Expand Up @@ -332,9 +332,9 @@ public UniqueAddress RoleLeader(string role, UniqueAddress selfUniqueAddress)
private UniqueAddress LeaderOf(ImmutableSortedSet<Member> mbrs, UniqueAddress selfUniqueAddress)
{
var reachableMembers = _overview.Reachability.IsAllReachable
? mbrs
? mbrs.Where(m => m.Status != MemberStatus.Down)
: mbrs
.Where(m => _overview.Reachability.IsReachable(m.UniqueAddress) || m.UniqueAddress == selfUniqueAddress)
.Where(m => m.Status != MemberStatus.Down && _overview.Reachability.IsReachable(m.UniqueAddress) || m.UniqueAddress == selfUniqueAddress)
.ToImmutableSortedSet();

if (!reachableMembers.Any()) return null;
Expand Down
22 changes: 4 additions & 18 deletions src/core/Akka.Cluster/Reachability.cs
Original file line number Diff line number Diff line change
Expand Up @@ -375,15 +375,8 @@ public Reachability Remove(IEnumerable<UniqueAddress> nodes)
{
var nodesSet = nodes.ToImmutableHashSet();
var newRecords = _records.FindAll(r => !nodesSet.Contains(r.Observer) && !nodesSet.Contains(r.Subject));
if (newRecords.Count == _records.Count)
{
return this;
}
else
{
var newVersions = _versions.RemoveRange(nodes);
return new Reachability(newRecords, newVersions);
}
var newVersions = _versions.RemoveRange(nodes);
return new Reachability(newRecords, newVersions);
}

/// <summary>
Expand All @@ -400,15 +393,8 @@ public Reachability RemoveObservers(ImmutableHashSet<UniqueAddress> nodes)
else
{
var newRecords = _records.FindAll(r => !nodes.Contains(r.Observer));
if (newRecords.Count == _records.Count)
{
return this;
}
else
{
var newVersions = _versions.RemoveRange(nodes);
return new Reachability(newRecords, newVersions);
}
var newVersions = _versions.RemoveRange(nodes);
return new Reachability(newRecords, newVersions);
}
}

Expand Down
8 changes: 5 additions & 3 deletions src/core/Akka.Remote.Tests/ActorsLeakSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
using Akka.TestKit.TestActors;
using Akka.Util.Internal;
using Xunit;
using FluentAssertions;

namespace Akka.Remote.Tests
{
Expand Down Expand Up @@ -222,9 +223,10 @@ public void Remoting_must_not_leak_actors()
*/
EventFilter.Exception<TimeoutException>().ExpectOne(() => { });

var finalActors = targets.SelectMany(CollectLiveActors).ToImmutableHashSet();

AssertActors(initialActors, finalActors);
AwaitAssert(() =>
{
AssertActors(initialActors, targets.SelectMany(CollectLiveActors).ToImmutableHashSet());
}, 5.Seconds());
}
}
}
Expand Down

0 comments on commit 913b1ba

Please sign in to comment.