-
Notifications
You must be signed in to change notification settings - Fork 40
/
QueueSourceSpecs.cs
117 lines (94 loc) · 4.02 KB
/
QueueSourceSpecs.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
using System;
using System.Threading.Tasks;
using Akka.Streams.Dsl;
using Akka.Streams.Supervision;
using Akka.Streams.TestKit;
using FluentAssertions;
using Azure.Storage.Queues;
using Azure.Storage.Queues.Models;
using Xunit;
using Xunit.Abstractions;
namespace Akka.Streams.Azure.StorageQueue.Tests
{
public class QueueSourceSpecs : QueueSpecBase
{
public QueueSourceSpecs(ITestOutputHelper output) : base(output)
{
}
[Fact]
public async Task A_QueueSource_should_push_available_messages()
{
await Queue.SendMessageAsync("Test1");
await Queue.SendMessageAsync("Test2");
await Queue.SendMessageAsync("Test3");
var probe = QueueSource.Create(Queue)
.Take(3)
.Select(x => x.MessageText)
.RunWith(this.SinkProbe<string>(), Materializer);
probe.Request(3)
.ExpectNext("Test1", "Test2", "Test3")
.ExpectComplete();
}
[Fact]
public async Task A_QueueSource_should_poll_for_messages_if_the_queue_is_empty()
{
await Queue.SendMessageAsync("Test1");
var probe = QueueSource.Create(Queue, pollInterval: TimeSpan.FromSeconds(1))
.Take(3)
.Select(x => x.MessageText)
.RunWith(this.SinkProbe<string>(), Materializer);
probe.Request(2)
.ExpectNext("Test1")
.ExpectNoMsg(TimeSpan.FromSeconds(3));
await Queue.SendMessageAsync("Test2");
await Queue.SendMessageAsync("Test3");
probe.ExpectNext("Test2", TimeSpan.FromSeconds(2));
probe.Request(1).ExpectNext("Test3").ExpectComplete();
}
[Fact]
public async Task A_QueueSource_should_only_poll_if_demand_is_available()
{
await Queue.SendMessageAsync("Test1");
var probe = QueueSource.Create(Queue, pollInterval: TimeSpan.FromSeconds(1))
.Select(x =>
{
Queue.DeleteMessage(x.MessageId, x.PopReceipt);
return x.MessageText;
})
.RunWith(this.SinkProbe<string>(), Materializer);
probe.Request(1).ExpectNext("Test1");
await Queue.SendMessageAsync("Test2");
probe.ExpectNoMsg(TimeSpan.FromSeconds(3));
//Message wouldn't be visible if the source has called GetMessages even if the message wasn't pushed to the stream
(await Queue.PeekMessagesAsync(1)).Value[0].MessageText.Should().Be("Test2");
probe.Request(1).ExpectNext("Test2");
}
[Fact]
public async Task A_QueueSource_should_fail_when_an_error_occurs()
{
await Queue.DeleteIfExistsAsync();
var probe = QueueSource.Create(Queue, pollInterval: TimeSpan.FromSeconds(1))
.Take(3)
.Select(x => x.MessageText)
.RunWith(this.SinkProbe<string>(), Materializer);
Output.WriteLine(probe.Request(1).ExpectError().Message);
}
[Fact]
public async Task A_QueueSource_should_not_fail_if_the_supervision_strategy_is_not_stop_when_an_error_occurs()
{
await Queue.DeleteIfExistsAsync();
var probe = QueueSource.Create(Queue, pollInterval: TimeSpan.FromSeconds(1))
.Take(3)
.Select(x => x.MessageText)
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
.RunWith(this.SinkProbe<string>(), Materializer);
probe.Request(3).ExpectNoMsg();
await Queue.CreateIfNotExistsAsync();
await Queue.SendMessageAsync("Test1", TimeSpan.Zero);
await Queue.SendMessageAsync("Test2", TimeSpan.Zero);
await Queue.SendMessageAsync("Test3", TimeSpan.Zero);
probe.ExpectNext("Test1", "Test2", "Test3")
.ExpectComplete();
}
}
}