improve Akka.Cluster / Akka.Remote DeadLetter logging #7149

Merged Apr 10, 2024 (11 commits)
@@ -44,7 +44,6 @@ public string ShardId(object message)
=> message switch
{
int i => (i % 10).ToString(),
ShardRegion.StartEntity se => (int.Parse(se.EntityId) % numberOfShards).ToString(),
Member Author

Removed per analyzer warning

_ => null
};

2 changes: 1 addition & 1 deletion src/core/Akka.Cluster.Tests/ClusterSpec.cs
@@ -476,7 +476,7 @@ public async Task A_cluster_must_leave_via_CoordinatedShutdownRun()
await probe.ExpectMsgAsync<ClusterEvent.MemberLeft>();
// MemberExited might not be published before MemberRemoved
var removed = (ClusterEvent.MemberRemoved)await probe.FishForMessageAsync(m => m is ClusterEvent.MemberRemoved);
removed.PreviousStatus.Should().BeEquivalentTo(MemberStatus.Exiting);
new [] {MemberStatus.Exiting, MemberStatus.Leaving}.Should().Contain(removed.PreviousStatus);
Member Author

Fix for racy spec - we'd already implemented this for one of the other "node leaving" specs.

The problem is that the PublishChanges message gets DeadLetter'd between the Exiting and the Removed statuses. In this case the Removed status is generated by the singleton cluster shutting itself down rather than by gossip messages. This is a timing issue that mostly affects the spec and has no real-world impact, so we accept either of these two "terminating" statuses instead.
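
The relaxed check can be sketched without the test framework. This is a minimal illustration only: `Status`, `RacyStatusCheck`, and `IsAcceptablePreviousStatus` are hypothetical names, not Akka.NET types, and the real spec uses the FluentAssertions `Contain` call shown in the diff.

```csharp
using System;
using System.Linq;

// Hypothetical stand-in for MemberStatus; not the Akka.NET enum.
public enum Status { Up, Leaving, Exiting, Removed }

public static class RacyStatusCheck
{
    // Both statuses mean the node was already on its way out of the cluster.
    private static readonly Status[] Terminating = { Status.Exiting, Status.Leaving };

    // Accept either terminating status instead of requiring exactly Exiting.
    public static bool IsAcceptablePreviousStatus(Status previous) =>
        Terminating.Contains(previous);

    public static void Main()
    {
        Console.WriteLine(IsAcceptablePreviousStatus(Status.Exiting)); // True
        Console.WriteLine(IsAcceptablePreviousStatus(Status.Leaving)); // True
        Console.WriteLine(IsAcceptablePreviousStatus(Status.Up));      // False
    }
}
```

The point of the design is that the assertion no longer depends on whether the Exiting transition was observed before removal.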


await task.ShouldCompleteWithin(3.Seconds());
}
4 changes: 2 additions & 2 deletions src/core/Akka.Cluster.Tests/GossipSpec.cs
@@ -257,11 +257,11 @@ public void A_gossip_must_find_two_oldest_per_role_as_targets_for_Exiting_change
Member a8 = TestMember.Create(new Address("akka.tcp", "sys", "a8", 2552), MemberStatus.Exiting, ImmutableHashSet<string>.Empty.Add("role1"), upNumber: 8);
Member a9 = TestMember.Create(new Address("akka.tcp", "sys", "a9", 2552), MemberStatus.Exiting, ImmutableHashSet<string>.Empty.Add("role2"), upNumber: 9);

IEnumerable<Member> theExiting = new Member[] { a5, a6 };
var theExiting = new Member[] { a5, a6 };
Member Author

Cleaned this up to just use an IReadOnlyCollection<Member> - to avoid some of the multiple enumeration problems GossipTargetsForExitingMembers was susceptible to.

var gossip = new Gossip(ImmutableSortedSet.Create(a1, a2, a3, a4, a5, a6, a7, a8, a9));

var r = ClusterCoreDaemon.GossipTargetsForExitingMembers(gossip, theExiting);
r.Should().BeEquivalentTo(new[] { a1, a2, a5, a6, a9 });
r.Should().BeEquivalentTo(a1, a2, a5, a6, a9);
}
}
}
17 changes: 10 additions & 7 deletions src/core/Akka.Cluster/ClusterDaemon.cs
@@ -1051,7 +1051,7 @@ private void AddCoordinatedLeave()
});
}

private ActorSelection ClusterCore(Address address)
private static ActorSelection ClusterCore(Address address)
{
return Context.ActorSelection(new RootActorPath(address) / "system" / "cluster" / "core" / "daemon");
}
@@ -2234,8 +2234,7 @@ public void LeaderActionsOnConvergence()
var localOverview = localGossip.Overview;
var localSeen = localOverview.Seen;

bool enoughMembers = IsMinNrOfMembersFulfilled();
bool IsJoiningUp(Member m) => m.Status is MemberStatus.Joining or MemberStatus.WeaklyUp && enoughMembers;
var enoughMembers = IsMinNrOfMembersFulfilled();

var removedUnreachable =
localOverview.Reachability.AllUnreachableOrTerminated.Select(localGossip.GetMember)
@@ -2339,15 +2338,19 @@ public void LeaderActionsOnConvergence()
}

PublishMembershipState();
GossipExitingMembersToOldest(changedMembers.Where(i => i.Status == MemberStatus.Exiting));
GossipExitingMembersToOldest(changedMembers.Where(i => i.Status == MemberStatus.Exiting).ToArray());
Member Author

Capture an array and pass it here to solve the multiple-enumeration problem. No real perf hit here: this only happens when leaders are converging on cluster membership status changes.
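
A minimal, self-contained sketch of the multiple-enumeration problem follows. It does not use Akka.NET types: `MultipleEnumerationDemo`, `LazyExiting`, and the `Evaluations` counter are hypothetical names for illustration, and the integers stand in for member statuses. A lazy LINQ query re-runs its predicate on every enumeration, while `ToArray()` runs it once per element and then serves `Count` from the array.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class MultipleEnumerationDemo
{
    // Counts how often the filter predicate runs (illustration only).
    public static int Evaluations;

    // Deferred query: nothing is evaluated until someone enumerates it.
    public static IEnumerable<int> LazyExiting(int[] statuses) =>
        statuses.Where(s => { Evaluations++; return s == 1; });

    public static void Main()
    {
        var statuses = new[] { 0, 1, 1, 0 };

        // Lazy: Any() and Count() each enumerate the query again.
        Evaluations = 0;
        var lazy = LazyExiting(statuses);
        _ = lazy.Any();
        _ = lazy.Count();
        Console.WriteLine($"lazy evaluations: {Evaluations}");

        // Materialized: the predicate runs once per element during ToArray().
        Evaluations = 0;
        IReadOnlyCollection<int> materialized = LazyExiting(statuses).ToArray();
        _ = materialized.Count > 0;
        _ = materialized.Count;
        Console.WriteLine($"materialized evaluations: {Evaluations}");
    }
}
```

With a side-effect-free predicate the extra passes only cost time, but if the underlying sequence changes between enumerations the two passes can also disagree.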

}

return;

bool IsJoiningUp(Member m) => m.Status is MemberStatus.Joining or MemberStatus.WeaklyUp && enoughMembers;
}

/// <summary>
/// Gossip the Exiting change to the two oldest nodes for quick dissemination to potential Singleton nodes
/// </summary>
/// <param name="exitingMembers"></param>
private void GossipExitingMembersToOldest(IEnumerable<Member> exitingMembers)
private void GossipExitingMembersToOldest(IReadOnlyCollection<Member> exitingMembers)
Member Author

Fix for the multi-enumeration problem.

{
var targets = GossipTargetsForExitingMembers(LatestGossip, exitingMembers);
if (targets != null && targets.Any())
@@ -2509,9 +2512,9 @@ public void GossipStatusTo(UniqueAddress node, IActorRef destination)
/// <param name="latestGossip"></param>
/// <param name="exitingMembers"></param>
/// <returns></returns>
public static IEnumerable<Member> GossipTargetsForExitingMembers(Gossip latestGossip, IEnumerable<Member> exitingMembers)
public static IReadOnlyCollection<Member> GossipTargetsForExitingMembers(Gossip latestGossip, IReadOnlyCollection<Member> exitingMembers)
{
if (exitingMembers.Any())
if (exitingMembers.Count > 0)
{
var roles = exitingMembers.SelectMany(m => m.Roles);
var membersSortedByAge = latestGossip.Members
30 changes: 12 additions & 18 deletions src/core/Akka.Cluster/ClusterEvent.cs
@@ -395,15 +395,10 @@ public MemberDowned(Member member)
/// </summary>
public sealed class MemberRemoved : MemberStatusChange
Member Author

Just some cleanup on this type.

{
readonly MemberStatus _previousStatus;

/// <summary>
/// The status of the node before the state change event.
/// </summary>
public MemberStatus PreviousStatus
{
get { return _previousStatus; }
}
public MemberStatus PreviousStatus { get; }

/// <summary>
/// Initializes a new instance of the <see cref="MemberRemoved"/> class.
@@ -418,15 +413,14 @@ public MemberRemoved(Member member, MemberStatus previousStatus)
{
if (member.Status != MemberStatus.Removed)
throw new ArgumentException($"Expected Removed status, got {member}");
_previousStatus = previousStatus;
PreviousStatus = previousStatus;
}

/// <inheritdoc/>
public override bool Equals(object obj)
{
var other = obj as MemberRemoved;
if (other == null) return false;
return _member.Equals(other._member) && _previousStatus == other._previousStatus;
if (obj is not MemberRemoved other) return false;
return _member.Equals(other._member) && PreviousStatus == other.PreviousStatus;
}

/// <inheritdoc/>
@@ -436,7 +430,7 @@ public override int GetHashCode()
{
var hash = 17;
hash = hash * +base.GetHashCode();
hash = hash * 23 + _previousStatus.GetHashCode();
hash = hash * 23 + PreviousStatus.GetHashCode();
return hash;
}
}
@@ -1056,17 +1050,17 @@ private void SendCurrentClusterState(IActorRef receiver)
receiver.Tell(state);
}

private void Subscribe(IActorRef subscriber, ClusterEvent.SubscriptionInitialStateMode initMode, IEnumerable<Type> to)
private void Subscribe(IActorRef subscriber, ClusterEvent.SubscriptionInitialStateMode initMode, ImmutableHashSet<Type> to)
Member Author

Multiple enumeration issue again - the only input for this private method is an ImmutableHashSet<Type> so there's no point in taking a more generic argument.
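
The filter inside `Subscribe` can be sketched in isolation. This is an illustration under assumptions: `IMemberEvent`, `MemberUp`, `Unrelated`, and `ShouldDeliver` are hypothetical names, not the Akka.NET types. Only the `to.Any(o => o.IsAssignableFrom(eventType))` expression is taken from the diff.

```csharp
using System;
using System.Collections.Immutable;
using System.Linq;

// Hypothetical event hierarchy for illustration.
public interface IMemberEvent { }
public sealed class MemberUp : IMemberEvent { }
public sealed class Unrelated { }

public static class SubscriptionFilterDemo
{
    // An event is delivered when any subscribed type is a base type or
    // interface of the event's runtime type.
    public static bool ShouldDeliver(ImmutableHashSet<Type> to, object @event)
    {
        var eventType = @event.GetType();
        return to.Any(o => o.IsAssignableFrom(eventType));
    }

    public static void Main()
    {
        var to = ImmutableHashSet.Create(typeof(IMemberEvent));
        Console.WriteLine(ShouldDeliver(to, new MemberUp()));  // True
        Console.WriteLine(ShouldDeliver(to, new Unrelated())); // False
    }
}
```

Because the parameter is already a materialized set, calling `Any` once per published event does not re-run any upstream query.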

{
if (initMode == ClusterEvent.SubscriptionInitialStateMode.InitialStateAsEvents)
{
Action<object> pub = @event =>
void Pub(object @event)
{
var eventType = @event.GetType();
if (to.Any(o => o.IsAssignableFrom(eventType)))
subscriber.Tell(@event);
};
PublishDiff(_emptyMembershipState, _membershipState, pub);
if (to.Any(o => o.IsAssignableFrom(eventType))) subscriber.Tell(@event);
}

PublishDiff(_emptyMembershipState, _membershipState, Pub);
}
else if (initMode == ClusterEvent.SubscriptionInitialStateMode.InitialStateAsSnapshot)
{
@@ -1090,7 +1084,7 @@ private void PublishChanges(MembershipState newState)
PublishDiff(oldState, newState, Publish);
}

private void PublishDiff(MembershipState oldState, MembershipState newState, Action<object> pub)
private static void PublishDiff(MembershipState oldState, MembershipState newState, Action<object> pub)
{
foreach (var @event in ClusterEvent.DiffMemberEvents(oldState, newState)) pub(@event);
foreach (var @event in ClusterEvent.DiffUnreachable(oldState, newState)) pub(@event);
39 changes: 18 additions & 21 deletions src/core/Akka.Cluster/Gossip.cs
@@ -442,43 +442,40 @@ public GossipOverview Copy(ImmutableHashSet<UniqueAddress> seen = null, Reachabi
/// the node with same host:port. The `uid` in the `UniqueAddress` is
/// different in that case.
/// </summary>
class GossipEnvelope : IClusterMessage
internal class GossipEnvelope : IClusterMessage
Member Author

Added a missing access modifier.

{
readonly UniqueAddress _from;
readonly UniqueAddress _to;

/// <summary>
/// TBD
/// </summary>
/// <param name="from">TBD</param>
/// <param name="to">TBD</param>
/// <param name="gossip">TBD</param>
/// <param name="deadline">TBD</param>
/// <returns>TBD</returns>
public GossipEnvelope(UniqueAddress from, UniqueAddress to, Gossip gossip, Deadline deadline = null)
{
_from = from;
_to = to;
From = from;
To = to;
Gossip = gossip;
Deadline = deadline;
}

/// <summary>
/// TBD
/// The sender of the gossip.
/// </summary>
public UniqueAddress From { get { return _from; } }
public UniqueAddress From { get; }

/// <summary>
/// TBD
/// The receiver of the gossip.
/// </summary>
public UniqueAddress To { get { return _to; } }
public UniqueAddress To { get; }

/// <summary>
/// TBD
/// The gossip content itself
/// </summary>
public Gossip Gossip { get; set; }
public Gossip Gossip { get; }

/// <summary>
/// TBD
/// The deadline for the gossip.
/// </summary>
public Deadline Deadline { get; set; }

public override string ToString()
{
return $"GossipEnvelope(from={From}, to={To}, gossip={Gossip}, deadline={Deadline})";
Member Author

This is the primary DeadLetter logging change I wanted to make here. It should make it much easier to understand which gossip messages were lost to DeadLetters when things are shutting down inside cluster nodes.
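
Why the override matters can be shown with a small sketch. It assumes the log line is built by formatting the undelivered message with `ToString()`. `PlainEnvelope`, `DescriptiveEnvelope`, and `Describe` are hypothetical stand-ins, not Akka.NET types or APIs.

```csharp
using System;

// Hypothetical envelope without a ToString override.
public sealed class PlainEnvelope
{
    public PlainEnvelope(string from, string to) { From = from; To = to; }
    public string From { get; }
    public string To { get; }
}

// Hypothetical envelope that describes its own contents.
public sealed class DescriptiveEnvelope
{
    public DescriptiveEnvelope(string from, string to) { From = from; To = to; }
    public string From { get; }
    public string To { get; }

    public override string ToString() => $"DescriptiveEnvelope(from={From}, to={To})";
}

public static class ToStringDemo
{
    // Mimics a logger that formats an undelivered message via ToString().
    public static string Describe(object message) => $"dead letter: {message}";

    public static void Main()
    {
        // Default Object.ToString() yields only the type name.
        Console.WriteLine(Describe(new PlainEnvelope("a1", "a2")));
        // The override exposes sender and receiver in the log line.
        Console.WriteLine(Describe(new DescriptiveEnvelope("a1", "a2")));
    }
}
```

Without the override, every lost envelope looks identical in the log. With it, each line identifies who sent what to whom.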

}
}

/// <summary>
26 changes: 14 additions & 12 deletions src/core/Akka.Remote.Tests/ActorsLeakSpec.cs
@@ -26,17 +26,19 @@ namespace Akka.Remote.Tests
{
public class ActorsLeakSpec : AkkaSpec
{
private static readonly Config Config = ConfigurationFactory.ParseString(@"
akka.actor.provider = remote
akka.loglevel = INFO
akka.remote.dot-netty.tcp.applied-adapters = [trttl]
akka.remote.dot-netty.tcp.hostname = 127.0.0.1
akka.remote.log-lifecycle-events = on
akka.remote.transport-failure-detector.heartbeat-interval = 1 s
akka.remote.transport-failure-detector.acceptable-heartbeat-pause = 3 s
akka.remote.quarantine-after-silence = 3 s
akka.test.filter-leeway = 12 s
");
private static readonly Config Config = ConfigurationFactory.ParseString("""

akka.actor.provider = remote
akka.loglevel = INFO
akka.remote.dot-netty.tcp.applied-adapters = [trttl]
akka.remote.dot-netty.tcp.hostname = 127.0.0.1
akka.remote.log-lifecycle-events = on
akka.remote.transport-failure-detector.heartbeat-interval = 1 s
akka.remote.transport-failure-detector.acceptable-heartbeat-pause = 3 s
akka.remote.quarantine-after-silence = 3 s
akka.test.filter-leeway = 12 s

""");

public ActorsLeakSpec(ITestOutputHelper output) : base(Config, output)
{
@@ -211,7 +213,7 @@ public async Task Remoting_must_not_leak_actors()
// Watch a remote actor - this results in system message traffic
Sys.ActorSelection(new RootActorPath(idleRemoteAddress) / "user" / "stoppable").Tell(new Identify(1));
var remoteActor = (await ExpectMsgAsync<ActorIdentity>()).Subject;
Watch(remoteActor);
await WatchAsync(remoteActor);
Member Author

Just some basic spec cleanup.

remoteActor.Tell("stop");
await ExpectTerminatedAsync(remoteActor);
// All system messages have been acked now on this side