Skip to content

Commit

Permalink
[Async TestKit] Convert Akka.Streams.Tests Dsl.TickSourceSpec (#6024)
Browse files Browse the repository at this point in the history
* Convert Akka.Streams.Tests to async - Dsl.TickSourceSpec

* Cleanup

Co-authored-by: Aaron Stannard <aaron@petabridge.com>
  • Loading branch information
Arkatufus and Aaronontheweb committed Jun 27, 2022
1 parent 7da2aa7 commit 6acea81
Showing 1 changed file with 77 additions and 57 deletions.
134 changes: 77 additions & 57 deletions src/core/Akka.Streams.Tests/Dsl/TickSourceSpec.cs
Expand Up @@ -7,13 +7,13 @@

using System;
using System.Linq;
using System.Threading.Tasks;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
using Akka.TestKit;
using Akka.TestKit.Xunit2.Attributes;
using FluentAssertions;
using Xunit;
// ReSharper disable InvokeAsExtensionMethod

namespace Akka.Streams.Tests.Dsl
{
Expand All @@ -28,71 +28,84 @@ public TickSourceSpec()
}

[Fact]
public void A_Flow_based_on_a_tick_publisher_must_produce_ticks()
public async Task A_Flow_based_on_a_tick_publisher_must_produce_ticks()
{
this.AssertAllStagesStopped(() =>
await this.AssertAllStagesStoppedAsync(async () =>
{
var c = this.CreateManualSubscriberProbe<string>();
Source.Tick(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1), "tick")
.To(Sink.FromSubscriber(c))
.Run(Materializer);
var sub = c.ExpectSubscription();
var sub = await c.ExpectSubscriptionAsync();
sub.Request(2);
c.ExpectNext("tick");
c.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
c.ExpectNext("tick");
await c.ExpectNextAsync("tick");
await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));
await c.ExpectNextAsync("tick");
sub.Cancel();
c.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));
}, Materializer);
}

[Fact]
public void A_Flow_based_on_a_tick_publisher_must_drop_ticks_when_not_requested()
public async Task A_Flow_based_on_a_tick_publisher_must_drop_ticks_when_not_requested()
{
var c = this.CreateManualSubscriberProbe<string>();
Source.Tick(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1), "tick")
.To(Sink.FromSubscriber(c))
.Run(Materializer);
var sub = c.ExpectSubscription();
sub.Request(2);
c.ExpectNext("tick");
c.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
c.ExpectNext("tick");
c.ExpectNoMsg(TimeSpan.FromMilliseconds(1400));
sub.Request(2);
c.ExpectNext("tick");
c.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
c.ExpectNext("tick");
sub.Cancel();
c.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
await this.AssertAllStagesStoppedAsync(async () =>
{
var c = this.CreateManualSubscriberProbe<string>();
Source.Tick(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1), "tick")
.To(Sink.FromSubscriber(c))
.Run(Materializer);
var sub = await c.ExpectSubscriptionAsync();
sub.Request(2);
await c.ExpectNextAsync("tick");
await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));
await c.ExpectNextAsync("tick");
await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(1400));
sub.Request(2);
await c.ExpectNextAsync("tick");
await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));
await c.ExpectNextAsync("tick");
sub.Cancel();
await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));
}, Materializer);
}

[Fact]
public void A_Flow_based_on_a_tick_publisher_must_reject_multiple_subscribers_but_keep_the_firs()
public async Task A_Flow_based_on_a_tick_publisher_must_reject_multiple_subscribers_but_keep_the_first()
{
this.AssertAllStagesStopped(() =>
await this.AssertAllStagesStoppedAsync(async () =>
{
var p = Source.Tick(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1), "tick")
.RunWith(Sink.AsPublisher<string>(false), Materializer);
var c1 = this.CreateManualSubscriberProbe<string>();
var c2 = this.CreateManualSubscriberProbe<string>();
p.Subscribe(c1);
p.Subscribe(c2);
var sub1 = c1.ExpectSubscription();
c2.ExpectSubscriptionAndError();
var sub1 = await c1.ExpectSubscriptionAsync();
await c2.ExpectSubscriptionAndErrorAsync();
sub1.Request(1);
c1.ExpectNext("tick");
c1.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
await c1.ExpectNextAsync("tick");
await c1.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));
sub1.Request(2);
c1.ExpectNext("tick");
await c1.ExpectNextAsync("tick");
sub1.Cancel();
}, Materializer);
}

[LocalFact(SkipLocal = "Racy. See https://github.com/akkadotnet/akka.net/pull/4424#issuecomment-632284459")]
public void A_Flow_based_on_a_tick_publisher_must_be_usable_with_zip_for_a_simple_form_of_rate_limiting()
public async Task A_Flow_based_on_a_tick_publisher_must_be_usable_with_zip_for_a_simple_form_of_rate_limiting()
{
this.AssertAllStagesStopped(() =>
await this.AssertAllStagesStoppedAsync(async () =>
{
var c = this.CreateManualSubscriberProbe<int>();
RunnableGraph.FromGraph(GraphDsl.Create(b =>
Expand All @@ -106,67 +119,74 @@ public void A_Flow_based_on_a_tick_publisher_must_be_usable_with_zip_for_a_simpl
.To(Sink.FromSubscriber(c));
return ClosedShape.Instance;
})).Run(Materializer);
var sub = c.ExpectSubscription();
var sub = await c.ExpectSubscriptionAsync();
sub.Request(1000);
c.ExpectNext(1);
c.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
c.ExpectNext(2);
c.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
await c.ExpectNextAsync(1);
await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));
await c.ExpectNextAsync(2);
await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));
sub.Cancel();
}, Materializer);
}

[Fact]
public void A_Flow_based_on_a_tick_publisher_must_be_possible_to_cancel()
public async Task A_Flow_based_on_a_tick_publisher_must_be_possible_to_cancel()
{
this.AssertAllStagesStopped(() =>
await this.AssertAllStagesStoppedAsync(async () =>
{
var c = this.CreateManualSubscriberProbe<string>();
var tickSource = Source.Tick(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1), "tick");
var cancelable = tickSource.To(Sink.FromSubscriber(c)).Run(Materializer);
var sub = c.ExpectSubscription();
var sub = await c.ExpectSubscriptionAsync();
sub.Request(2);
c.ExpectNoMsg(TimeSpan.FromMilliseconds(600));
c.ExpectNext("tick");
c.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
c.ExpectNext("tick");
await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(600));
await c.ExpectNextAsync("tick");
await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));
await c.ExpectNextAsync("tick");
cancelable.Cancel();
AwaitCondition(() => cancelable.IsCancellationRequested);
await AwaitConditionAsync(async () => cancelable.IsCancellationRequested);
sub.Request(3);
c.ExpectComplete();
await c.ExpectCompleteAsync();
}, Materializer);
}

[Fact]
public void A_Flow_based_on_a_tick_publisher_must_have_IsCancelled_mirror_the_cancellation_state()
public async Task A_Flow_based_on_a_tick_publisher_must_have_IsCancelled_mirror_the_cancellation_state()
{
this.AssertAllStagesStopped(() =>
await this.AssertAllStagesStoppedAsync(async () =>
{
var c = this.CreateManualSubscriberProbe<string>();
var tickSource = Source.Tick(TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(500), "tick");
var cancelable = tickSource.To(Sink.FromSubscriber(c)).Run(Materializer);
var sub = c.ExpectSubscription();
var sub = await c.ExpectSubscriptionAsync();
sub.Request(2);
c.ExpectNext("tick");
await c.ExpectNextAsync("tick");
cancelable.IsCancellationRequested.Should().BeFalse();
cancelable.Cancel();
cancelable.IsCancellationRequested.Should().BeTrue();
c.ExpectComplete();
await c.ExpectCompleteAsync();
}, Materializer);
}

[Fact]
public void A_Flow_based_on_a_tick_publisher_must_support_being_cancelled_immediately_after_its_materialization()
public async Task A_Flow_based_on_a_tick_publisher_must_support_being_cancelled_immediately_after_its_materialization()
{
this.AssertAllStagesStopped(() =>
await this.AssertAllStagesStoppedAsync(async () =>
{
var c = this.CreateManualSubscriberProbe<string>();
var tickSource = Source.Tick(TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(500), "tick");
var cancelable = tickSource.To(Sink.FromSubscriber(c)).Run(Materializer);
cancelable.Cancel();
var sub = c.ExpectSubscription();
var sub = await c.ExpectSubscriptionAsync();
sub.Request(2);
c.ExpectComplete();
await c.ExpectCompleteAsync();
}, Materializer);
}
}
Expand Down

0 comments on commit 6acea81

Please sign in to comment.