Skip to content

Commit

Permalink
[Async TestKit] Fix WithinAsync and Remaining logic (#6010)
Browse files Browse the repository at this point in the history
* Fix WithinAsync and Remaining logic

* Guard or warn against possible negative or infinite timeout scenarios

* Fix tests that uses negative durations

* Fix negative duration specs

* fix InactiveEntityPassivationSpec

Co-authored-by: Aaron Stannard <aaron@petabridge.com>
  • Loading branch information
Arkatufus and Aaronontheweb committed Jun 24, 2022
1 parent b56ea0b commit f3469d7
Show file tree
Hide file tree
Showing 10 changed files with 71 additions and 34 deletions.
Expand Up @@ -150,7 +150,8 @@ protected async Task<TimeSpan> TimeUntilPassivate(IActorRef region, TestProbe pr
probe.ExpectMsg<Entity.GotIt>().Id.ShouldBe("2");

var timeSinceOneSawAMessage = DateTime.Now.Ticks - timeOneSawMessage;
return settings.PassivateIdleEntityAfter - TimeSpan.FromTicks(timeSinceOneSawAMessage) + smallTolerance;
var time = settings.PassivateIdleEntityAfter - TimeSpan.FromTicks(timeSinceOneSawAMessage) + smallTolerance;
return time < smallTolerance ? smallTolerance : time;
}
}

Expand Down Expand Up @@ -197,7 +198,7 @@ public async Task Passivation_of_inactive_entities_must_not_passivate_when_passi
var probe = CreateTestProbe();
var region = Start(probe);
var time = await TimeUntilPassivate(region, probe);
probe.ExpectNoMsg(time);
await probe.ExpectNoMsgAsync(time);
}
}
}
1 change: 1 addition & 0 deletions src/core/Akka.Streams.Tests/Dsl/FlowFlattenMergeSpec.cs
Expand Up @@ -277,6 +277,7 @@ public async Task A_FlattenMerge_must_work_with_many_concurrently_queued_events(
{
elems.Add(await p.RequestNextAsync());
}
await p.ExpectCompleteAsync();
elems.Should().BeEquivalentTo(Enumerable.Range(0, noOfSources * 10));
}, Materializer);
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Streams.Tests/Dsl/LastSinkSpec.cs
Expand Up @@ -36,7 +36,7 @@ public async Task A_Flow_with_Sink_Last_must_yield_the_last_value()
{
var result = await Source.From(Enumerable.Range(1,42)).Select(x=>x)
.RunWith(Sink.Last<int>(), Materializer)
.ShouldCompleteWithin(1.Seconds());
.ShouldCompleteWithin(3.Seconds());
result.Should().Be(42);
}, Materializer);
}
Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka.Streams.Tests/IO/OutputStreamSourceSpec.cs
Expand Up @@ -94,7 +94,7 @@ await outputStream.WriteAsync(_bytesArray, 0, _bytesArray.Length)
var f = outputStream.FlushAsync();
await ExpectTimeout(f, Timeout);
await probe.ExpectNoMsgAsync(TimeSpan.MinValue);
await probe.ExpectNoMsgAsync(TimeSpan.Zero);
s.Request(1);
await f.ShouldCompleteWithin(Timeout);
Expand Down Expand Up @@ -155,7 +155,7 @@ await outputStream.WriteAsync(_bytesArray, 0, _byteString.Count)
var f = outputStream.WriteAsync(_bytesArray, 0, _byteString.Count);
await ExpectTimeout(f, Timeout);
await probe.ExpectNoMsgAsync(TimeSpan.MinValue);
await probe.ExpectNoMsgAsync(TimeSpan.Zero);
s.Request(17);
await f.ShouldCompleteWithin(Timeout);
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.TestKit/Internal/TimeSpanExtensions.cs
Expand Up @@ -130,7 +130,7 @@ public static bool IsInfiniteTimeout(this TimeSpan? timeSpan)
public static void EnsureIsPositiveFinite(this TimeSpan timeSpan, string parameterName)
{
if(!IsPositiveFinite(timeSpan))
throw new ArgumentException($"The timespan must be greater than zero. Actual value: {timeSpan}", nameof(parameterName));
throw new ArgumentException($"The timespan must be greater than zero. Actual value: {timeSpan}", parameterName);
}

/// <summary>
Expand Down
29 changes: 18 additions & 11 deletions src/core/Akka.TestKit/TestKitBase.cs
Expand Up @@ -413,9 +413,15 @@ public TimeSpan Remaining
{
get
{
// ReSharper disable once PossibleInvalidOperationException
if (_testState.End.IsPositiveFinite()) return _testState.End.Value - Now;
throw new InvalidOperationException(@"Remaining may not be called outside of ""within""");
if(_testState.End is null)
throw new InvalidOperationException(@"Remaining may not be called outside of ""within""");

if (_testState.End < TimeSpan.Zero)
throw new InvalidOperationException($"End can not be negative, was: {_testState.End}");

// Make sure that the returned value is a positive TimeSpan
var remaining = _testState.End.Value - Now;
return remaining < TimeSpan.Zero ? TimeSpan.Zero : remaining;
}
}

Expand All @@ -429,10 +435,12 @@ public TimeSpan Remaining
protected TimeSpan RemainingOr(TimeSpan duration)
{
if (!_testState.End.HasValue) return duration;
if (_testState.End.IsInfinite())
throw new ArgumentException("end cannot be infinite");
return _testState.End.Value - Now;
if (_testState.End < TimeSpan.Zero)
throw new InvalidOperationException($"End can not be negative, was: {_testState.End}");

// Make sure that the returned value is a positive TimeSpan
var remaining = _testState.End.Value - Now;
return remaining < TimeSpan.Zero ? TimeSpan.Zero : remaining;
}

/// <summary>
Expand All @@ -448,7 +456,7 @@ protected TimeSpan RemainingOr(TimeSpan duration)
public TimeSpan RemainingOrDilated(TimeSpan? duration)
{
if(!duration.HasValue) return RemainingOrDefault;
if(duration.IsInfinite()) throw new ArgumentException("max duration cannot be infinite");
if(duration < TimeSpan.Zero) throw new ArgumentException("Must be positive TimeSpan", nameof(duration));
return Dilated(duration.Value);
}

Expand All @@ -461,10 +469,9 @@ public TimeSpan RemainingOrDilated(TimeSpan? duration)
/// <returns>TBD</returns>
public TimeSpan Dilated(TimeSpan duration)
{
if(duration.IsPositiveFinite())
return new TimeSpan((long)(duration.Ticks * _testState.TestKitSettings.TestTimeFactor));
//Else: 0 or infinite (negative)
return duration;
if (duration < TimeSpan.Zero)
throw new ArgumentException("Must not be negative", nameof(duration));
return new TimeSpan((long)(duration.Ticks * _testState.TestKitSettings.TestTimeFactor));
}


Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka.TestKit/TestKitBase_Receive.cs
Expand Up @@ -267,7 +267,7 @@ public async ValueTask<object> ReceiveOneAsync(TimeSpan? max = null, Cancellatio
}
else if (maxDuration == Timeout.InfiniteTimeSpan)
{
ConditionalLog(shouldLog, "Trying to receive message from TestActor queue. Will wait indefinitely.");
Log.Warning("Trying to receive message from TestActor queue with infinite timeout! Will wait indefinitely!");
take = await _testState.Queue.TryTakeAsync(-1, cancellationToken)
.ConfigureAwait(false);
}
Expand Down Expand Up @@ -398,7 +398,7 @@ public async ValueTask<(bool success, MessageEnvelope envelope)> TryPeekOneAsync
}
else if (maxDuration == Timeout.InfiniteTimeSpan)
{
ConditionalLog(shouldLog, "Trying to peek message from TestActor queue. Will wait indefinitely.");
Log.Warning("Trying to peek message from TestActor queue with infinite timeout! Will wait indefinitely!");
peek = await _testState.Queue.TryPeekAsync(-1, cancellationToken)
.ConfigureAwait(false);
}
Expand Down
46 changes: 37 additions & 9 deletions src/core/Akka.TestKit/TestKitBase_Within.cs
Expand Up @@ -9,6 +9,7 @@
using System.Threading;
using System.Threading.Tasks;
using Akka.TestKit.Internal;
using FluentAssertions.Extensions;
using Nito.AsyncEx.Synchronous;

namespace Akka.TestKit
Expand Down Expand Up @@ -230,7 +231,16 @@ public abstract partial class TestKitBase
/// <para>`within` blocks may be nested. All methods in this class which take maximum wait times
/// are available in a version which implicitly uses the remaining time governed by
/// the innermost enclosing `within` block.</para>
/// <remarks>Note that the max duration is scaled using <see cref="Dilated(TimeSpan)"/> which uses the config value "akka.test.timefactor"</remarks>
/// <remarks>
/// <para>
/// Note that the max duration is scaled using <see cref="Dilated(TimeSpan)"/> which uses the config value "akka.test.timefactor".
/// </para>
/// <para>
/// Note that due to how asynchronous Task is executed in managed code, there is no way to stop a running Task.
/// If this assertion fails in any way, the <paramref name="function"/> Task might still be running in the
/// background and might not be stopped/disposed until the unit test is over.
/// </para>
/// </remarks>
/// </summary>
/// <typeparam name="T">TBD</typeparam>
/// <param name="min">TBD</param>
Expand All @@ -249,7 +259,7 @@ public abstract partial class TestKitBase
CancellationToken cancellationToken = default)
{
min.EnsureIsPositiveFinite("min");
min.EnsureIsPositiveFinite("max");
max.EnsureIsPositiveFinite("max");
max = Dilated(max);
var start = Now;
var rem = _testState.End.HasValue ? _testState.End.Value - start : Timeout.InfiniteTimeSpan;
Expand All @@ -261,14 +271,32 @@ public abstract partial class TestKitBase
var prevEnd = _testState.End;
_testState.End = start + maxDiff;

T ret;
try
{
ret = await function();
}
finally
T ret = default;
using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
{
_testState.End = prevEnd;
try
{
var executionTask = function();
// Limit the execution time block to the maximum allowed execution time.
// 200 milliseconds is added because Task.Delay() timer is not precise and can return prematurely.
var resultTask = await Task.WhenAny(executionTask, Task.Delay(max + 200.Milliseconds(), cts.Token));

if (resultTask == executionTask)
{
ret = executionTask.Result;
}
else
{
// Just throw if the calling code cancels the cancellation token
cancellationToken.ThrowIfCancellationRequested();
}
}
finally
{
// Make sure we stop the delay task
cts.Cancel();
_testState.End = prevEnd;
}
}

var elapsed = Now - start;
Expand Down
6 changes: 3 additions & 3 deletions src/core/Akka.Tests/IO/UdpConnectedIntegrationSpec.cs
Expand Up @@ -192,9 +192,9 @@ public async Task The_UDP_connection_oriented_implementation_must_not_leak_memor
for (var j = 0; j < batchSize; ++j)
client.Tell(UdpConnected.Send.Create(data));

var msgs = serverProbe.ReceiveNAsync(batchSize, TimeSpan.FromSeconds(10));
var cast = await msgs.Cast<Udp.Received>().ToListAsync();
cast.Sum(m => m.Data.Count).Should().Be(data.Count * batchSize);
var msgs = await serverProbe.ReceiveNAsync(batchSize, TimeSpan.FromSeconds(10))
.Cast<Udp.Received>().ToListAsync();
msgs.Sum(m => m.Data.Count).Should().Be(data.Count * batchSize);
}

// stop all connections so all receives are stopped and all pending SocketAsyncEventArgs are collected
Expand Down
6 changes: 3 additions & 3 deletions src/core/Akka.Tests/IO/UdpIntegrationSpec.cs
Expand Up @@ -262,9 +262,9 @@ public async Task The_UDP_Fire_and_Forget_SimpleSender_implementation_must_not_l
for (int i = 0; i < batchSize; i++)
sender.Tell(Udp.Send.Create(data, serverLocalEndpoint));

var msgs = await serverProbe.ReceiveNAsync(batchSize, default).ToListAsync();
var receives = msgs.Cast<Udp.Received>();
receives.Sum(r => r.Data.Count).Should().Be(data.Count * batchSize);
var msgs = await serverProbe.ReceiveNAsync(batchSize, 10.Seconds())
.Cast<Udp.Received>().ToListAsync();
msgs.Sum(r => r.Data.Count).Should().Be(data.Count * batchSize);
}

// stop all connections so all receives are stopped and all pending SocketAsyncEventArgs are collected
Expand Down

0 comments on commit f3469d7

Please sign in to comment.