/
EventBusQueue.cs
113 lines (95 loc) · 3.82 KB
/
EventBusQueue.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
using Amazon.SQS;
using Amazon.SQS.Model;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Olive.Aws
{
public class EventBusQueue : IEventBusQueue
{
internal string QueueUrl;
internal AmazonSQSClient Client;
internal bool IsFifo => QueueUrl.EndsWith(".fifo");
/// <summary>
/// Gets and sets the property MaxNumberOfMessages.
/// The maximum number of messages to return. Amazon SQS never returns more messages
/// than this value (however, fewer messages might be returned). Valid values: 1
/// to 10. Default: 1.
/// </summary>
public int MaxNumberOfMessages { get; set; } = Config.Get("Aws:EventBusQueue:MaxNumberOfMessages", 10);
/// <summary>
/// Gets and sets the property VisibilityTimeout.
/// The duration (in seconds) that the received messages are hidden from subsequent
/// retrieve requests after being retrieved by a ReceiveMessage request.
/// </summary>
public int VisibilityTimeout { get; set; } = Config.Get("Aws:EventBusQueue:VisibilityTimeout", 300);
public EventBusQueue(string queueUrl)
{
QueueUrl = queueUrl;
Client = new AmazonSQSClient();
}
public async Task<string> Publish(string message)
{
var request = new SendMessageRequest
{
QueueUrl = QueueUrl,
MessageBody = message,
};
if (IsFifo)
{
request.MessageDeduplicationId = JsonConvert.DeserializeObject<JObject>(message)["DeduplicationId"]?.ToString();
request.MessageGroupId = "Default";
}
var response = await Client.SendMessageAsync(request);
return response.MessageId;
}
public async Task<IEnumerable<string>> PublishBatch(IEnumerable<string> messages)
{
var request = new SendMessageBatchRequest
{
QueueUrl = QueueUrl,
};
messages.Do(message =>
request.Entries.Add(new SendMessageBatchRequestEntry
{
MessageBody = message,
}));
if (IsFifo)
{
request.Entries.ForEach(message =>
{
message.MessageDeduplicationId =
JsonConvert.DeserializeObject<JObject>(message.MessageBody)["DeduplicationId"]?.ToString();
message.MessageGroupId = "Default";
});
}
var response = await Client.SendMessageBatchAsync(request);
return response.Successful.Select(m => m.MessageId);
}
public void Subscribe(Func<string, Task> handler) => new Subscriber(this, handler).Start();
public async Task<QueueMessageHandle> Pull(int timeoutSeconds = 10)
{
var request = new ReceiveMessageRequest
{
QueueUrl = QueueUrl,
WaitTimeSeconds = timeoutSeconds,
MaxNumberOfMessages = MaxNumberOfMessages,
VisibilityTimeout = VisibilityTimeout,
};
var response = await Client.ReceiveMessageAsync(request);
foreach (var item in response.Messages)
{
var receipt = new DeleteMessageRequest { QueueUrl = QueueUrl, ReceiptHandle = item.ReceiptHandle };
return new QueueMessageHandle(item.Body, () => Client.DeleteMessageAsync(receipt));
}
return null;
}
public Task Purge()
{
return Client.PurgeQueueAsync(new PurgeQueueRequest { QueueUrl = QueueUrl });
}
}
}