From f78c789334c9b29c70ca75910a878918e326194a Mon Sep 17 00:00:00 2001 From: Faraj Farook Date: Mon, 19 Aug 2019 21:15:22 +1000 Subject: [PATCH] Adding support for multiple exchange subscription --- .travis.yml | 2 +- .../IEventBus.cs | 3 +- Enbiso.NLib.EventBus.Nats/NatsEventBus.cs | 23 ++++++++------ Enbiso.NLib.EventBus.Nats/NatsOptions.cs | 6 ++-- .../RabbitMqEventBus.cs | 31 +++++++++++++------ .../RabbitMqOption.cs | 6 ++-- .../ServiceBusEventBus.cs | 2 +- .../NatsEventBusTests.cs | 6 ++-- 8 files changed, 49 insertions(+), 30 deletions(-) diff --git a/.travis.yml b/.travis.yml index 59dec91..297a148 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,6 @@ env: global: - - PACKAGE_VERSION="1.1.30" + - PACKAGE_VERSION="1.2.0" - PACKAGE_ICON="https://www.enbiso.com/logo.svg" - PACKAGE_PROJECT="https://nlib.enbiso.com" - PACKAGE_REPO="https://github.com/enbiso/Enbiso.NLib" diff --git a/Enbiso.NLib.EventBus.Abstractions/IEventBus.cs b/Enbiso.NLib.EventBus.Abstractions/IEventBus.cs index d9e2138..cb28fe8 100644 --- a/Enbiso.NLib.EventBus.Abstractions/IEventBus.cs +++ b/Enbiso.NLib.EventBus.Abstractions/IEventBus.cs @@ -14,7 +14,8 @@ public interface IEventBus /// Publish @event /// /// - void Publish(IEvent @event); + /// + void Publish(IEvent @event, string exchange = null); /// /// Subscribe to events diff --git a/Enbiso.NLib.EventBus.Nats/NatsEventBus.cs b/Enbiso.NLib.EventBus.Nats/NatsEventBus.cs index ddaba2b..d6045aa 100644 --- a/Enbiso.NLib.EventBus.Nats/NatsEventBus.cs +++ b/Enbiso.NLib.EventBus.Nats/NatsEventBus.cs @@ -1,4 +1,5 @@ using System; +using System.Linq; using System.Net.Sockets; using System.Text; using Microsoft.Extensions.Logging; @@ -32,21 +33,25 @@ public void Initialize() { if (!_persistentConnection.TryConnect()) return; - var conn = _persistentConnection.GetConnection(); - conn.SubscribeAsync($"{_options.Exchange}.>", _options.Client, async (sender, args) => + var conn = _persistentConnection.GetConnection(); + + foreach (var exchange in _options.Exchanges) { - var subject = args.Message.Subject; - var eventName = subject.StartsWith(_options.Exchange) - ? subject.Substring(_options.Exchange.Length + 1) : subject; - await _eventProcessor.ProcessEvent(eventName, args.Message.Data); - }); + conn.SubscribeAsync($"{exchange}.>", _options.Client, async (sender, args) => + { + var subject = args.Message.Subject; + var eventName = subject.StartsWith(exchange) + ? subject.Substring(exchange.Length + 1) : subject; + await _eventProcessor.ProcessEvent(eventName, args.Message.Data); + }); + } } - public void Publish(IEvent @event) + public void Publish(IEvent @event, string exchange = null) { if (!_persistentConnection.IsConnected && !_persistentConnection.TryConnect()) return; var conn = _persistentConnection.GetConnection(); - var eventName = $"{_options.Exchange}.{@event.GetType().Name}"; + var eventName = $"{exchange ?? _options.Exchanges.FirstOrDefault()}.{@event.GetType().Name}"; var message = JsonConvert.SerializeObject(@event); var body = Encoding.UTF8.GetBytes(message); diff --git a/Enbiso.NLib.EventBus.Nats/NatsOptions.cs b/Enbiso.NLib.EventBus.Nats/NatsOptions.cs index c9f55fc..d39319e 100644 --- a/Enbiso.NLib.EventBus.Nats/NatsOptions.cs +++ b/Enbiso.NLib.EventBus.Nats/NatsOptions.cs @@ -24,9 +24,9 @@ public class NatsOptions /// public string Client { get; set; } /// - /// Client broker name - /// - public string Exchange { get; set; } + /// Client brokers name + /// + public string[] Exchanges { get; set; } /// /// Connection token /// diff --git a/Enbiso.NLib.EventBus.RabbitMq/RabbitMqEventBus.cs b/Enbiso.NLib.EventBus.RabbitMq/RabbitMqEventBus.cs index 3091bcb..ca086bf 100644 --- a/Enbiso.NLib.EventBus.RabbitMq/RabbitMqEventBus.cs +++ b/Enbiso.NLib.EventBus.RabbitMq/RabbitMqEventBus.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Net.Sockets; using System.Text; using System.Threading.Tasks; @@ -21,7 +22,7 @@ namespace Enbiso.NLib.EventBus.RabbitMq /// public class RabbitMqEventBus : IEventBus { - private readonly string _brokerName; + private readonly string[] _exchanges; private readonly IRabbitMqPersistentConnection _persistentConnection; private readonly ILogger _logger; @@ -39,7 +40,7 @@ public class RabbitMqEventBus : IEventBus { var option = optionWrap.Value; _queueName = option.Client ?? throw new ArgumentNullException(nameof(option.Client)); - _brokerName = option.Exchange ?? throw new ArgumentNullException(nameof(option.Exchange)); + _exchanges = option.Exchanges ?? throw new ArgumentNullException(nameof(option.Exchanges)); _persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _subscriptionsManager = subscriptionManager; @@ -64,7 +65,7 @@ public void Initialize() } /// - public void Publish(IEvent @event) + public void Publish(IEvent @event, string exchange = null) { if (!_persistentConnection.IsConnected) { @@ -80,15 +81,16 @@ public void Publish(IEvent @event) using (var channel = _persistentConnection.CreateModel()) { - var eventName = @event.GetType().Name; - channel.ExchangeDeclare(exchange: _brokerName, type: "direct"); + channel.ExchangeDeclare(exchange: exchange, type: "direct"); var message = JsonConvert.SerializeObject(@event); var body = Encoding.UTF8.GetBytes(message); + var eventName = @event.GetType().Name; + exchange = exchange ?? _exchanges.FirstOrDefault(); policy.Execute(() => { - channel.BasicPublish(exchange: _brokerName, routingKey: eventName, basicProperties: null, body: body); + channel.BasicPublish(exchange: exchange, routingKey: eventName, basicProperties: null, body: body); }); } } @@ -152,7 +154,10 @@ private void DoInternalSubscription(string eventName) using (var channel = _persistentConnection.CreateModel()) { - channel.QueueBind(queue: _queueName, exchange: _brokerName, routingKey: eventName); + foreach (var exchange in _exchanges) + { + channel.QueueBind(queue: _queueName, exchange: exchange, routingKey: eventName); + } } } @@ -162,7 +167,12 @@ private IModel CreateConsumerChannel() _persistentConnection.TryConnect(); var channel = _persistentConnection.CreateModel(); - channel.ExchangeDeclare(exchange: _brokerName, type: "direct"); + + foreach (var exchange in _exchanges) + { + channel.ExchangeDeclare(exchange: exchange, type: "direct"); + } + channel.QueueDeclare(queue: _queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); channel.CallbackException += (sender, ea) => @@ -181,7 +191,10 @@ private void SubscriptionManager_OnEventRemoved(object sender, string eventName) using (var channel = _persistentConnection.CreateModel()) { - channel.QueueUnbind(_queueName, _brokerName, eventName); + foreach (var exchange in _exchanges) + { + channel.QueueUnbind(_queueName, exchange, eventName); + } } } diff --git a/Enbiso.NLib.EventBus.RabbitMq/RabbitMqOption.cs b/Enbiso.NLib.EventBus.RabbitMq/RabbitMqOption.cs index b642867..b613cf6 100644 --- a/Enbiso.NLib.EventBus.RabbitMq/RabbitMqOption.cs +++ b/Enbiso.NLib.EventBus.RabbitMq/RabbitMqOption.cs @@ -35,8 +35,8 @@ public class RabbitMqOption /// public string Client { get; set; } /// - /// Client broker name - /// - public string Exchange { get; set; } + /// Client brokers name + /// + public string[] Exchanges { get; set; } } } \ No newline at end of file diff --git a/Enbiso.NLib.EventBus.ServiceBus/ServiceBusEventBus.cs b/Enbiso.NLib.EventBus.ServiceBus/ServiceBusEventBus.cs index 7bba0ac..bc78ad6 100644 --- a/Enbiso.NLib.EventBus.ServiceBus/ServiceBusEventBus.cs +++ b/Enbiso.NLib.EventBus.ServiceBus/ServiceBusEventBus.cs @@ -53,7 +53,7 @@ public void Initialize() } /// - public void Publish(IEvent @event) + public void Publish(IEvent @event, string exchange = null) { var eventName = @event.GetType().Name.Replace(IntegrationEventSuffix, ""); var jsonMessage = JsonConvert.SerializeObject(@event); diff --git a/Enbiso.NLib.Tests/Enbiso.NLib.EventBus.Nats.Tests/NatsEventBusTests.cs b/Enbiso.NLib.Tests/Enbiso.NLib.EventBus.Nats.Tests/NatsEventBusTests.cs index 888133d..a024b2a 100644 --- a/Enbiso.NLib.Tests/Enbiso.NLib.EventBus.Nats.Tests/NatsEventBusTests.cs +++ b/Enbiso.NLib.Tests/Enbiso.NLib.EventBus.Nats.Tests/NatsEventBusTests.cs @@ -18,7 +18,7 @@ public void SamePublishSubscribeTests() var opts = Options.Create(new NatsOptions { Servers = new [] { "nats://localhost:4222" }, - Exchange = "testEx", + Exchanges = new [] {"testEx"}, Client = "tClient" }); var pConnLogger = new Logger(new NullLoggerFactory()); @@ -38,14 +38,14 @@ public void SeperatePublishSubscribeTests() var opts1 = Options.Create(new NatsOptions { Servers = new[] { "nats://localhost:4222" }, - Exchange = "testEx", + Exchanges = new [] {"testEx"}, Client = "tClient1" }); var opts2 = Options.Create(new NatsOptions { Servers = new[] { "nats://localhost:4222" }, - Exchange = "testEx", + Exchanges = new [] {"testEx"}, Client = "tClient2" });