Skip to content

Commit

Permalink
harden timing on racy Akka.Streams specs (#6397)
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaronontheweb committed Feb 10, 2023
1 parent 74ff4f2 commit c5a52cb
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 7 deletions.
3 changes: 2 additions & 1 deletion src/core/Akka.Streams.Tests/Dsl/AsyncEnumerableSpec.cs
Expand Up @@ -106,14 +106,15 @@ await foreach (var _ in task)
materializer.Shutdown();
}
});

//since we are collapsing the stream inside the read
//we want to send messages so we aren't just waiting forever.
await probe.SendNextAsync(1);
await probe.SendNextAsync(2);
var thrown = false;
try
{
await a.ShouldCompleteWithin(3.Seconds());
await a.ShouldCompleteWithin(10.Seconds());
}
catch (StreamDetachedException)
{
Expand Down
14 changes: 8 additions & 6 deletions src/core/Akka.Streams.Tests/Dsl/HubSpec.cs
Expand Up @@ -271,14 +271,16 @@ public async Task MergeHub_must_keep_working_even_if_one_of_the_producers_fail()
{
var (sink, task) = MergeHub.Source<int>(16).Take(10).ToMaterialized(Sink.Seq<int>(), Keep.Both).Run(Materializer);
await EventFilter.Error(contains: "Upstream producer failed with exception").ExpectOneAsync(async () =>
await WithinAsync(10.Seconds(), async () =>
{
Source.Failed<int>(new TestException("failing")).RunWith(sink, Materializer);
Source.From(Enumerable.Range(1, 10)).RunWith(sink, Materializer);
var result = await task.ShouldCompleteWithin(3.Seconds());
result.Should().BeEquivalentTo(Enumerable.Range(1, 10));
await EventFilter.Error(contains: "Upstream producer failed with exception").ExpectOneAsync(async () =>
{
Source.Failed<int>(new TestException("failing")).RunWith(sink, Materializer);
Source.From(Enumerable.Range(1, 10)).RunWith(sink, Materializer);
var result = await task.ShouldCompleteWithin(3.Seconds());
result.Should().BeEquivalentTo(Enumerable.Range(1, 10));
});
});
}, Materializer);
}

Expand Down

0 comments on commit c5a52cb

Please sign in to comment.