-
Notifications
You must be signed in to change notification settings - Fork 8
/
FanoutPublishStrategyWithConfirmations.cs
39 lines (34 loc) · 1.39 KB
/
FanoutPublishStrategyWithConfirmations.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
using System;
using Lykke.RabbitMqBroker.Subscriber;
using RabbitMQ.Client;
namespace Lykke.RabbitMqBroker.Publisher
{
    /// <summary>
    /// Publish strategy for fanout exchange with publisher confirmation.
    /// </summary>
    /// <remarks>
    /// <see cref="Configure"/> declares the fanout exchange and switches the channel into
    /// confirm mode; <see cref="Publish"/> then waits for the broker's confirmation after
    /// every single message, so each call blocks until the confirmation arrives or the
    /// timeout elapses.
    /// </remarks>
    public sealed class FanoutPublishStrategyWithConfirmations : IRabbitMqPublishStrategy
    {
        /// <summary>
        /// Confirmation timeout used when the settings do not provide
        /// <c>PublisherConfirmationTimeout</c>.
        /// </summary>
        private static readonly TimeSpan DefaultConfirmationTimeout = TimeSpan.FromSeconds(5);

        // Durability flag captured once at construction time. Configure uses this value
        // rather than the IsDurable of the settings instance it receives.
        private readonly bool _durable;

        /// <summary>
        /// Creates the strategy, taking the exchange durability from <paramref name="settings"/>.
        /// </summary>
        /// <param name="settings">Settings whose <c>IsDurable</c> value is used when the exchange is declared.</param>
        /// <exception cref="ArgumentNullException"><paramref name="settings"/> is <c>null</c>.</exception>
        public FanoutPublishStrategyWithConfirmations(RabbitMqSubscriptionSettings settings)
        {
            if (settings == null)
                throw new ArgumentNullException(nameof(settings));

            _durable = settings.IsDurable;
        }

        /// <summary>
        /// Declares the fanout exchange named by <paramref name="settings"/> and enables
        /// publisher confirmations on <paramref name="channel"/>.
        /// </summary>
        /// <param name="settings">Settings providing the exchange name.</param>
        /// <param name="channel">Channel on which the exchange is declared and confirm mode is selected.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="settings"/> or <paramref name="channel"/> is <c>null</c>.
        /// </exception>
        public void Configure(RabbitMqSubscriptionSettings settings, IModel channel)
        {
            if (settings == null)
                throw new ArgumentNullException(nameof(settings));
            if (channel == null)
                throw new ArgumentNullException(nameof(channel));

            channel.ExchangeDeclare(exchange: settings.ExchangeName, type: "fanout", durable: _durable);

            // Confirm mode must be selected on the channel before WaitForConfirmsOrDie
            // can be used in Publish.
            channel.ConfirmSelect();
        }

        /// <summary>
        /// Publishes <paramref name="message"/> to the exchange with an empty routing key and
        /// no basic properties, then waits for the broker to confirm it.
        /// </summary>
        /// <param name="settings">
        /// Settings providing the exchange name and, optionally, the confirmation timeout
        /// (a 5 second default is used when <c>PublisherConfirmationTimeout</c> is <c>null</c>).
        /// </param>
        /// <param name="channel">Channel used to publish; expected to have been passed to <see cref="Configure"/> first.</param>
        /// <param name="message">Message whose <c>Body</c> is sent as the payload.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="settings"/> or <paramref name="channel"/> is <c>null</c>.
        /// </exception>
        public void Publish(RabbitMqSubscriptionSettings settings, IModel channel, RawMessage message)
        {
            if (settings == null)
                throw new ArgumentNullException(nameof(settings));
            if (channel == null)
                throw new ArgumentNullException(nameof(channel));

            channel.BasicPublish(
                exchange: settings.ExchangeName,
                routingKey: string.Empty,
                basicProperties: null,
                body: message.Body);

            // Block until the broker confirms the message; the client library raises an
            // error if the message is rejected or the timeout elapses.
            channel.WaitForConfirmsOrDie(settings.PublisherConfirmationTimeout ?? DefaultConfirmationTimeout);
        }
    }
}