-
Notifications
You must be signed in to change notification settings - Fork 749
/
QueueRetrieval.cs
executable file
·69 lines (59 loc) · 2.1 KB
/
QueueRetrieval.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
using EasyNetQ.Consumer;
using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;
namespace EasyNetQ.Hosepipe;
/// <summary>
/// Abstraction over reading messages from a queue for the Hosepipe tool.
/// </summary>
public interface IQueueRetrieval
{
/// <summary>
/// Produces the messages read from the queue described by <paramref name="parameters"/>.
/// </summary>
/// <param name="parameters">Settings describing which queue to read and how (see <see cref="QueueParameters"/>).</param>
/// <returns>A sequence of <see cref="HosepipeMessage"/> instances, one per message read.</returns>
IEnumerable<HosepipeMessage> GetMessagesFromQueue(QueueParameters parameters);
}
/// <summary>
/// <see cref="IQueueRetrieval"/> implementation that pulls messages one at a time
/// with <c>BasicGet</c> and wraps each of them in a <see cref="HosepipeMessage"/>.
/// </summary>
public class QueueRetrieval : IQueueRetrieval
{
    private readonly IErrorMessageSerializer errorMessageSerializer;

    /// <summary>
    /// Creates a retrieval service.
    /// </summary>
    /// <param name="errorMessageSerializer">Serializer applied to the raw body of every retrieved message.</param>
    public QueueRetrieval(IErrorMessageSerializer errorMessageSerializer)
        => this.errorMessageSerializer = errorMessageSerializer;

    /// <summary>
    /// Lazily reads up to <c>parameters.NumberOfMessagesToRetrieve</c> messages from
    /// <c>parameters.QueueName</c>.
    /// </summary>
    /// <remarks>
    /// This is an iterator: nothing is executed until the result is enumerated, and the
    /// connection and channel stay open until enumeration finishes or the enumerator is disposed.
    /// When <c>parameters.Purge</c> is set, each message is acknowledged right after it is fetched,
    /// before it is handed to the caller; otherwise it is never acknowledged by this method.
    /// </remarks>
    /// <param name="parameters">Connection settings, queue name, message limit and purge flag.</param>
    /// <returns>
    /// The retrieved messages. The sequence is empty when the passive queue declaration is
    /// interrupted; in that case the exception message is written to the console.
    /// </returns>
    /// <exception cref="Exception">
    /// Any failure while fetching or acknowledging a message is written to the console and rethrown.
    /// </exception>
    public IEnumerable<HosepipeMessage> GetMessagesFromQueue(QueueParameters parameters)
    {
        using var connection = HosepipeConnection.FromParameters(parameters);
        using var channel = connection.CreateModel();

        if (!CanDeclareQueue())
        {
            yield break;
        }

        for (var retrieved = 0; retrieved < parameters.NumberOfMessagesToRetrieve; retrieved++)
        {
            var fetched = FetchNext();
            if (fetched == null)
            {
                // no more messages on the queue
                yield break;
            }

            var receivedInfo = new MessageReceivedInfo(
                "hosepipe",
                fetched.DeliveryTag,
                fetched.Redelivered,
                fetched.Exchange,
                fetched.RoutingKey,
                parameters.QueueName
            );
            var messageProperties = new MessageProperties(fetched.BasicProperties);
            var serializedBody = errorMessageSerializer.Serialize(fetched.Body.ToArray());

            // 'yield return' may not appear inside a try block that has a catch clause,
            // which is why the broker calls live in the local functions below.
            yield return new HosepipeMessage(serializedBody, messageProperties, receivedInfo);
        }

        // Passively declares the queue; reports the failure and returns false
        // when the broker interrupts the operation.
        bool CanDeclareQueue()
        {
            try
            {
                channel.QueueDeclarePassive(parameters.QueueName);
                return true;
            }
            catch (OperationInterruptedException declareError)
            {
                Console.WriteLine(declareError.Message);
                return false;
            }
        }

        // Fetches the next message (null when nothing was returned) and acknowledges it
        // when purging was requested. Failures are logged to the console and rethrown.
        BasicGetResult FetchNext()
        {
            try
            {
                var next = channel.BasicGet(parameters.QueueName, false);
                if (next != null && parameters.Purge)
                {
                    channel.BasicAck(next.DeliveryTag, false);
                }

                return next;
            }
            catch (Exception fetchError)
            {
                Console.WriteLine(fetchError.Message);
                throw;
            }
        }
    }
}