Skip to content
Permalink
Browse files

Adding support for multiple exchange subscription

  • Loading branch information...
farajfarook committed Aug 19, 2019
1 parent c50818f commit f78c789334c9b29c70ca75910a878918e326194a
@@ -1,6 +1,6 @@
env:
global:
- PACKAGE_VERSION="1.1.30"
- PACKAGE_VERSION="1.2.0"
- PACKAGE_ICON="https://www.enbiso.com/logo.svg"
- PACKAGE_PROJECT="https://nlib.enbiso.com"
- PACKAGE_REPO="https://github.com/enbiso/Enbiso.NLib"
@@ -14,7 +14,8 @@ public interface IEventBus
/// Publish @event
/// </summary>
/// <param name="event"></param>
void Publish(IEvent @event);
/// <param name="exchange"></param>
void Publish(IEvent @event, string exchange = null);

/// <summary>
/// Subscribe to events
@@ -1,4 +1,5 @@
using System;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using Microsoft.Extensions.Logging;
@@ -32,21 +33,25 @@ public void Initialize()
{
if (!_persistentConnection.TryConnect()) return;

var conn = _persistentConnection.GetConnection();
conn.SubscribeAsync($"{_options.Exchange}.>", _options.Client, async (sender, args) =>
var conn = _persistentConnection.GetConnection();

foreach (var exchange in _options.Exchanges)
{
var subject = args.Message.Subject;
var eventName = subject.StartsWith(_options.Exchange)
? subject.Substring(_options.Exchange.Length + 1) : subject;
await _eventProcessor.ProcessEvent(eventName, args.Message.Data);
});
conn.SubscribeAsync($"{exchange}.>", _options.Client, async (sender, args) =>
{
var subject = args.Message.Subject;
var eventName = subject.StartsWith(exchange)
? subject.Substring(exchange.Length + 1) : subject;
await _eventProcessor.ProcessEvent(eventName, args.Message.Data);
});
}
}

public void Publish(IEvent @event)
public void Publish(IEvent @event, string exchange = null)
{
if (!_persistentConnection.IsConnected && !_persistentConnection.TryConnect()) return;
var conn = _persistentConnection.GetConnection();
var eventName = $"{_options.Exchange}.{@event.GetType().Name}";
var eventName = $"{exchange ?? _options.Exchanges.FirstOrDefault()}.{@event.GetType().Name}";
var message = JsonConvert.SerializeObject(@event);
var body = Encoding.UTF8.GetBytes(message);

@@ -24,9 +24,9 @@ public class NatsOptions
/// </summary>
public string Client { get; set; }
/// <summary>
/// Client broker name
/// </summary>
public string Exchange { get; set; }
/// Client brokers name
/// </summary>
public string[] Exchanges { get; set; }
/// <summary>
/// Connection token
/// </summary>
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;
@@ -21,7 +22,7 @@ namespace Enbiso.NLib.EventBus.RabbitMq
/// </summary>
public class RabbitMqEventBus : IEventBus
{
private readonly string _brokerName;
private readonly string[] _exchanges;

private readonly IRabbitMqPersistentConnection _persistentConnection;
private readonly ILogger<RabbitMqEventBus> _logger;
@@ -39,7 +40,7 @@ public class RabbitMqEventBus : IEventBus
{
var option = optionWrap.Value;
_queueName = option.Client ?? throw new ArgumentNullException(nameof(option.Client));
_brokerName = option.Exchange ?? throw new ArgumentNullException(nameof(option.Exchange));
_exchanges = option.Exchanges ?? throw new ArgumentNullException(nameof(option.Exchanges));
_persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_subscriptionsManager = subscriptionManager;
@@ -64,7 +65,7 @@ public void Initialize()
}

/// <inheritdoc />
public void Publish(IEvent @event)
public void Publish(IEvent @event, string exchange = null)
{
if (!_persistentConnection.IsConnected)
{
@@ -80,15 +81,16 @@ public void Publish(IEvent @event)

using (var channel = _persistentConnection.CreateModel())
{
var eventName = @event.GetType().Name;
channel.ExchangeDeclare(exchange: _brokerName, type: "direct");
channel.ExchangeDeclare(exchange: exchange, type: "direct");

var message = JsonConvert.SerializeObject(@event);
var body = Encoding.UTF8.GetBytes(message);

var eventName = @event.GetType().Name;
exchange = exchange ?? _exchanges.FirstOrDefault();
policy.Execute(() =>
{
channel.BasicPublish(exchange: _brokerName, routingKey: eventName, basicProperties: null, body: body);
channel.BasicPublish(exchange: exchange, routingKey: eventName, basicProperties: null, body: body);
});
}
}
@@ -152,7 +154,10 @@ private void DoInternalSubscription(string eventName)

using (var channel = _persistentConnection.CreateModel())
{
channel.QueueBind(queue: _queueName, exchange: _brokerName, routingKey: eventName);
foreach (var exchange in _exchanges)
{
channel.QueueBind(queue: _queueName, exchange: exchange, routingKey: eventName);
}
}
}

@@ -162,7 +167,12 @@ private IModel CreateConsumerChannel()
_persistentConnection.TryConnect();

var channel = _persistentConnection.CreateModel();
channel.ExchangeDeclare(exchange: _brokerName, type: "direct");

foreach (var exchange in _exchanges)
{
channel.ExchangeDeclare(exchange: exchange, type: "direct");
}

channel.QueueDeclare(queue: _queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

channel.CallbackException += (sender, ea) =>
@@ -181,7 +191,10 @@ private void SubscriptionManager_OnEventRemoved(object sender, string eventName)

using (var channel = _persistentConnection.CreateModel())
{
channel.QueueUnbind(_queueName, _brokerName, eventName);
foreach (var exchange in _exchanges)
{
channel.QueueUnbind(_queueName, exchange, eventName);
}
}
}

@@ -35,8 +35,8 @@ public class RabbitMqOption
/// </summary>
public string Client { get; set; }
/// <summary>
/// Client broker name
/// </summary>
public string Exchange { get; set; }
/// Client brokers name
/// </summary>
public string[] Exchanges { get; set; }
}
}
@@ -53,7 +53,7 @@ public void Initialize()
}

/// <inheritdoc />
public void Publish(IEvent @event)
public void Publish(IEvent @event, string exchange = null)
{
var eventName = @event.GetType().Name.Replace(IntegrationEventSuffix, "");
var jsonMessage = JsonConvert.SerializeObject(@event);
@@ -18,7 +18,7 @@ public void SamePublishSubscribeTests()
var opts = Options.Create(new NatsOptions
{
Servers = new [] { "nats://localhost:4222" },
Exchange = "testEx",
Exchanges = new [] {"testEx"},
Client = "tClient"
});
var pConnLogger = new Logger<NatsPersistentConnection>(new NullLoggerFactory());
@@ -38,14 +38,14 @@ public void SeperatePublishSubscribeTests()
var opts1 = Options.Create(new NatsOptions
{
Servers = new[] { "nats://localhost:4222" },
Exchange = "testEx",
Exchanges = new [] {"testEx"},
Client = "tClient1"
});

var opts2 = Options.Create(new NatsOptions
{
Servers = new[] { "nats://localhost:4222" },
Exchange = "testEx",
Exchanges = new [] {"testEx"},
Client = "tClient2"
});

0 comments on commit f78c789

Please sign in to comment.
You can’t perform that action at this time.