Skip to content

Commit

Permalink
Akka.Remote.Tests: harden ActorsLeakSpec (#7155)
Browse files Browse the repository at this point in the history
* working on fixes for `ActorsLeakSpec`

* hardened `ActorsLeakSpec`

* harden deathwatch and enable nullability for it

* added API approvals

* harden Akka.Cluster.Metrics `MetricsCollectorSpec`

* disable ActorsLeakSpec
  • Loading branch information
Aaronontheweb committed Apr 19, 2024
1 parent 522aad3 commit 329b2b3
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,43 +50,47 @@ public void Metric_should_merge_2_metrics_that_are_tracking_the_same_metric()
}

[Fact]
public void MetricsCollector_should_collector_accurate_metrics_for_node()
public async Task MetricsCollector_should_collector_accurate_metrics_for_node()
{
var sample = Collector.Sample();
var metrics = sample.Metrics.Select(m => (Name: m.Name, Value: m.Value)).ToList();
var used = metrics.First(m => m.Name == StandardMetrics.MemoryUsed);
var available = metrics.First(m => m.Name == StandardMetrics.MemoryAvailable);
metrics.ForEach(m =>
// await assert here in case there's no metrics available on the very first sample
await AwaitAssertAsync(() =>
{
switch (m.Name)
var sample = Collector.Sample();
var metrics = sample.Metrics.Select(m => (Name: m.Name, Value: m.Value)).ToList();
var used = metrics.First(m => m.Name == StandardMetrics.MemoryUsed);
var available = metrics.First(m => m.Name == StandardMetrics.MemoryAvailable);
metrics.ForEach(m =>
{
case StandardMetrics.Processors:
m.Value.DoubleValue.Should().BeGreaterOrEqualTo(0);
break;
case StandardMetrics.MemoryAvailable:
m.Value.LongValue.Should().BeGreaterThan(0);
break;
case StandardMetrics.MemoryUsed:
m.Value.LongValue.Should().BeGreaterOrEqualTo(0);
break;
case StandardMetrics.MaxMemoryRecommended:
m.Value.LongValue.Should().BeGreaterThan(0);
// Since setting is only a recommendation, we can ignore it
// See: https://stackoverflow.com/a/7729022/3094849
// used.Value.LongValue.Should().BeLessThan(m.Value.LongValue);
// available.Value.LongValue.Should().BeLessThan(m.Value.LongValue);
break;
case StandardMetrics.CpuProcessUsage:
m.Value.DoubleValue.Should().BeInRange(0, 1);
break;
case StandardMetrics.CpuTotalUsage:
m.Value.DoubleValue.Should().BeInRange(0, 1);
break;
default:
throw new ArgumentOutOfRangeException($"Unexpected metric type {m.Name}");
}
});
switch (m.Name)
{
case StandardMetrics.Processors:
m.Value.DoubleValue.Should().BeGreaterOrEqualTo(0);
break;
case StandardMetrics.MemoryAvailable:
m.Value.LongValue.Should().BeGreaterThan(0);
break;
case StandardMetrics.MemoryUsed:
m.Value.LongValue.Should().BeGreaterOrEqualTo(0);
break;
case StandardMetrics.MaxMemoryRecommended:
m.Value.LongValue.Should().BeGreaterThan(0);
// Since setting is only a recommendation, we can ignore it
// See: https://stackoverflow.com/a/7729022/3094849
// used.Value.LongValue.Should().BeLessThan(m.Value.LongValue);
// available.Value.LongValue.Should().BeLessThan(m.Value.LongValue);
break;
case StandardMetrics.CpuProcessUsage:
m.Value.DoubleValue.Should().BeInRange(0, 1);
break;
case StandardMetrics.CpuTotalUsage:
m.Value.DoubleValue.Should().BeInRange(0, 1);
break;
default:
throw new ArgumentOutOfRangeException($"Unexpected metric type {m.Name}");
}
});
}, interval:TimeSpan.FromMilliseconds(250));
}

[LocalFact(SkipLocal = "This performance really depends on current load - so while should work well with " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ namespace Akka.Actor
protected void StopFunctionRefs() { }
public void Suspend() { }
protected void TellWatchersWeDied() { }
public void TerminatedQueuedFor(Akka.Actor.IActorRef subject, Akka.Util.Option<object> customMessage) { }
public void TerminatedQueuedFor(Akka.Actor.IActorRef subject, [System.Runtime.CompilerServices.NullableAttribute(new byte[] {
0,
1})] Akka.Util.Option<object> customMessage) { }
public bool TryGetChildStatsByName(string name, out Akka.Actor.Internal.IChildStats child) { }
protected bool TryGetChildStatsByRef(Akka.Actor.IActorRef actor, out Akka.Actor.Internal.ChildRestartStats child) { }
public void UnbecomeStacked() { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ namespace Akka.Actor
protected void StopFunctionRefs() { }
public void Suspend() { }
protected void TellWatchersWeDied() { }
public void TerminatedQueuedFor(Akka.Actor.IActorRef subject, Akka.Util.Option<object> customMessage) { }
public void TerminatedQueuedFor(Akka.Actor.IActorRef subject, [System.Runtime.CompilerServices.NullableAttribute(new byte[] {
0,
1})] Akka.Util.Option<object> customMessage) { }
public bool TryGetChildStatsByName(string name, out Akka.Actor.Internal.IChildStats child) { }
protected bool TryGetChildStatsByRef(Akka.Actor.IActorRef actor, out Akka.Actor.Internal.ChildRestartStats child) { }
public void UnbecomeStacked() { }
Expand Down
14 changes: 11 additions & 3 deletions src/core/Akka.Remote.Tests/ActorsLeakSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
using Akka.Remote.Transport;
using Akka.TestKit;
using Akka.TestKit.Extensions;
using Akka.TestKit.Internal;
using Akka.TestKit.Internal.StringMatcher;
using Akka.TestKit.TestActors;
using Akka.TestKit.TestEvent;
using Xunit;
using FluentAssertions;
using FluentAssertions.Extensions;
Expand All @@ -39,6 +42,7 @@ public class ActorsLeakSpec : AkkaSpec
akka.test.filter-leeway = 12 s

""");
private static readonly string[] SourceArray = { "/system/endpointManager", "/system/transports" };

public ActorsLeakSpec(ITestOutputHelper output) : base(Config, output)
{
Expand Down Expand Up @@ -84,18 +88,18 @@ public StoppableActor()
}
}

private void AssertActors(ImmutableHashSet<IActorRef> expected, ImmutableHashSet<IActorRef> actual)
private static void AssertActors(ImmutableHashSet<IActorRef> expected, ImmutableHashSet<IActorRef> actual)
{
expected.Should().BeEquivalentTo(actual);
}

[Fact]
[Fact(Skip = "EventFilter can receive 1-2 notifications about nodes shutting down depending on timing, which makes this spec racy")]
public async Task Remoting_must_not_leak_actors()
{
var actorRef = Sys.ActorOf(EchoActor.Props(this, true), "echo");
var echoPath = new RootActorPath(RARP.For(Sys).Provider.DefaultAddress)/"user"/"echo";

var targets = await Task.WhenAll(new[] { "/system/endpointManager", "/system/transports" }.Select(
var targets = await Task.WhenAll(SourceArray.Select(
async x =>
{
Sys.ActorSelection(x).Tell(new Identify(0));
Expand Down Expand Up @@ -165,6 +169,10 @@ public async Task Remoting_must_not_leak_actors()
}
Assert.True(await remoteSystem.WhenTerminated.AwaitWithTimeout(TimeSpan.FromSeconds(10)));
}

// Bugfix: need to filter out the AssociationTermination messages for remote@127.0.0.1:2553 from the quarantine
// case, otherwise those logs might get picked up during the next text case
Sys.EventStream.Publish(new Mute(new WarningFilter( new ContainsString("Association with remote system akka.trttl.tcp://remote@127.0.0.1:2553 has failed"))));

// Missing SHUTDOWN case
for (var i = 1; i <= 3; i++)
Expand Down
25 changes: 15 additions & 10 deletions src/core/Akka/Actor/ActorCell.DeathWatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

#nullable enable
using System;
using System.Linq;
using Akka.Dispatch.SysMsg;
using Akka.Event;
using Akka.Util;
using Akka.Util.Internal;

namespace Akka.Actor
{
Expand All @@ -25,6 +24,7 @@ partial class ActorCell
/// <returns>TBD</returns>
public IActorRef Watch(IActorRef subject)
{
if(subject is null) throw new ArgumentNullException(nameof(subject), "subject must not be null");
var a = (IInternalActorRef)subject;

if (!a.Equals(Self) && !WatchingContains(a))
Expand All @@ -46,6 +46,7 @@ public IActorRef Watch(IActorRef subject)
/// <returns>TBD</returns>
public IActorRef WatchWith(IActorRef subject, object message)
{
if(subject is null) throw new ArgumentNullException(nameof(subject), "subject must not be null");
if (message == null)
throw new ArgumentNullException(nameof(message), "message must not be null");

Expand All @@ -69,6 +70,8 @@ public IActorRef WatchWith(IActorRef subject, object message)
/// <returns>TBD</returns>
public IActorRef Unwatch(IActorRef subject)
{
if(subject is null) throw new ArgumentNullException(nameof(subject), "subject must not be null");

var a = (IInternalActorRef)subject;
if (!a.Equals(Self) && WatchingContains(a))
{
Expand All @@ -91,8 +94,7 @@ protected void ReceivedTerminated(Terminated t)
if (!_state.ContainsTerminated(t.ActorRef))
return;

Option<object> customTerminatedMessage;
(_state, customTerminatedMessage) = _state.RemoveTerminated(t.ActorRef); // here we know that it is the SAME ref which was put in
(_state, var customTerminatedMessage) = _state.RemoveTerminated(t.ActorRef); // here we know that it is the SAME ref which was put in
ReceiveMessage(customTerminatedMessage.GetOrElse(t));
}

Expand Down Expand Up @@ -244,7 +246,8 @@ protected void AddWatcher(IActorRef watchee, IActorRef watcher)
}
else
{
Publish(new Warning(Self.Path.ToString(), Actor.GetType(), string.Format("BUG: illegal Watch({0},{1} for {2}", watchee, watcher, Self)));
Publish(new Warning(Self.Path.ToString(), Actor.GetType(),
$"BUG: illegal Watch({watchee},{watcher} for {Self}"));
}
}

Expand All @@ -255,6 +258,7 @@ protected void AddWatcher(IActorRef watchee, IActorRef watcher)
/// <param name="watcher">TBD</param>
protected void RemWatcher(IActorRef watchee, IActorRef watcher)
{
// assert that watchee and watcher are not null
var watcheeSelf = watchee.Equals(Self);
var watcherSelf = watcher.Equals(Self);

Expand All @@ -273,7 +277,8 @@ protected void RemWatcher(IActorRef watchee, IActorRef watcher)
}
else
{
Publish(new Warning(Self.Path.ToString(), Actor.GetType(), string.Format("BUG: illegal Unwatch({0},{1} for {2}", watchee, watcher, Self)));
Publish(new Warning(Self.Path.ToString(), Actor.GetType(),
$"BUG: illegal Unwatch({watchee},{watcher} for {Self}"));
}
}

Expand Down Expand Up @@ -314,7 +319,7 @@ protected void AddressTerminated(Address address)
/// </summary>
/// <param name="block">TBD</param>
/// <param name="change">TBD</param>
private void MaintainAddressTerminatedSubscription(Action block, IActorRef change = null)
private void MaintainAddressTerminatedSubscription(Action block, IActorRef? change = null)
{
if (IsNonLocal(change))
{
Expand All @@ -333,13 +338,13 @@ private void MaintainAddressTerminatedSubscription(Action block, IActorRef chang
}
}

private static bool IsNonLocal(IActorRef @ref)
private static bool IsNonLocal(IActorRef? @ref)
{
if (@ref == null)
return true;

var a = @ref as IInternalActorRef;
return a != null && !a.IsLocal;
return a is { IsLocal: false };
}

private bool HasNonLocalAddress()
Expand All @@ -359,4 +364,4 @@ private void SubscribeAddressTerminated()
AddressTerminatedTopic.Get(System).Subscribe(Self);
}
}
}
}
2 changes: 1 addition & 1 deletion src/core/Akka/Actor/ActorCell.DefaultMessages.cs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ private void SysMsgInvokeAll(EarliestFirstSystemMessageList messages, int curren
{
switch (message)
{
case SystemMessage sm when ShouldStash(sm, currentState):
case not null when ShouldStash(message, currentState):
Stash(message);
break;
case ActorTaskSchedulerMessage atsm:
Expand Down

0 comments on commit 329b2b3

Please sign in to comment.