-
Notifications
You must be signed in to change notification settings - Fork 1k
/
BaseTwoStreamsSetup.cs
144 lines (127 loc) · 5.28 KB
/
BaseTwoStreamsSetup.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
//-----------------------------------------------------------------------
// <copyright file="BaseTwoStreamsSetup.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
using Akka.TestKit;
using FluentAssertions;
using Reactive.Streams;
using Xunit;
using Xunit.Abstractions;
namespace Akka.Streams.Tests
{
public abstract class BaseTwoStreamsSetup<TOutputs> : AkkaSpec
{
protected readonly ActorMaterializer Materializer;
protected BaseTwoStreamsSetup(ITestOutputHelper output = null) : base(output)
{
var settings = ActorMaterializerSettings.Create(Sys).WithInputBuffer(initialSize: 2, maxSize: 2);
Materializer = ActorMaterializer.Create(Sys, settings);
}
protected Exception TestException()
{
return new TestException("test");
}
protected virtual TestSubscriber.Probe<TOutputs> Setup(IPublisher<int> p1, IPublisher<int> p2)
{
return this.CreateSubscriberProbe<TOutputs>();
}
protected IPublisher<T> FailedPublisher<T>()
{
return TestPublisher.Error<T>(TestException());
}
protected IPublisher<T> CompletedPublisher<T>()
{
return TestPublisher.Empty<T>();
}
protected IPublisher<T> NonEmptyPublisher<T>(IEnumerable<T> elements)
{
return Source.From(elements).RunWith(Sink.AsPublisher<T>(false), Materializer);
}
protected IPublisher<T> SoonToFailPublisher<T>()
{
return TestPublisher.LazyError<T>(TestException());
}
protected IPublisher<T> SoonToCompletePublisher<T>()
{
return TestPublisher.LazyEmpty<T>();
}
[Fact]
public async Task Should_work_with_two_immediately_completed_publishers()
{
await this.AssertAllStagesStoppedAsync(async () =>
{
var subscriber = Setup(CompletedPublisher<int>(), CompletedPublisher<int>());
await subscriber.AsyncBuilder()
.ExpectSubscriptionAndComplete()
.ExecuteAsync();
}, Materializer);
}
[Fact]
public async Task Should_work_with_two_delayed_completed_publishers()
{
await this.AssertAllStagesStoppedAsync(async () =>
{
var subscriber = Setup(SoonToCompletePublisher<int>(), SoonToCompletePublisher<int>());
await subscriber.AsyncBuilder()
.ExpectSubscriptionAndComplete()
.ExecuteAsync();
}, Materializer);
}
[Fact]
public async Task Should_work_with_one_immediately_completed_and_one_delayed_completed_publisher()
{
await this.AssertAllStagesStoppedAsync(async () =>
{
var subscriber = Setup(CompletedPublisher<int>(), SoonToCompletePublisher<int>());
await subscriber.AsyncBuilder()
.ExpectSubscriptionAndComplete()
.ExecuteAsync();
}, Materializer);
}
[Fact]
public async Task Should_work_with_two_immediately_failed_publishers()
{
await this.AssertAllStagesStoppedAsync(async () =>
{
var subscriber = Setup(FailedPublisher<int>(), FailedPublisher<int>());
(await subscriber.ExpectSubscriptionAndErrorAsync()).Should().Be(TestException());
}, Materializer);
}
[Fact]
public async Task Should_work_with_two_delayed_failed_publishers()
{
await this.AssertAllStagesStoppedAsync(async () =>
{
var subscriber = Setup(SoonToFailPublisher<int>(), SoonToFailPublisher<int>());
(await subscriber.ExpectSubscriptionAndErrorAsync()).Should().Be(TestException());
}, Materializer);
}
// Warning: The two test cases below are somewhat implementation specific and might fail if the implementation
// is changed. They are here to be an early warning though.
[Fact]
public async Task Should_work_with_one_immediately_failed_and_one_delayed_failed_publisher_case_1()
{
await this.AssertAllStagesStoppedAsync(async () =>
{
var subscriber = Setup(SoonToFailPublisher<int>(), FailedPublisher<int>());
(await subscriber.ExpectSubscriptionAndErrorAsync()).Should().Be(TestException());
}, Materializer);
}
[Fact]
public async Task Should_work_with_one_immediately_failed_and_one_delayed_failed_publisher_case_2()
{
await this.AssertAllStagesStoppedAsync(async () =>
{
var subscriber = Setup(FailedPublisher<int>(), SoonToFailPublisher<int>());
(await subscriber.ExpectSubscriptionAndErrorAsync()).Should().Be(TestException());
}, Materializer);
}
}
}