Skip to content

Commit

Permalink
Adding support for multiple exchange subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
farajfarook committed Aug 19, 2019
1 parent c50818f commit f78c789
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 30 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -1,6 +1,6 @@
env:
global:
- PACKAGE_VERSION="1.1.30"
- PACKAGE_VERSION="1.2.0"
- PACKAGE_ICON="https://www.enbiso.com/logo.svg"
- PACKAGE_PROJECT="https://nlib.enbiso.com"
- PACKAGE_REPO="https://github.com/enbiso/Enbiso.NLib"
Expand Down
3 changes: 2 additions & 1 deletion Enbiso.NLib.EventBus.Abstractions/IEventBus.cs
Expand Up @@ -14,7 +14,8 @@ public interface IEventBus
/// Publish @event
/// </summary>
/// <param name="event"></param>
void Publish(IEvent @event);
/// <param name="exchange"></param>
void Publish(IEvent @event, string exchange = null);

/// <summary>
/// Subscribe to events
Expand Down
23 changes: 14 additions & 9 deletions Enbiso.NLib.EventBus.Nats/NatsEventBus.cs
@@ -1,4 +1,5 @@
using System;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -32,21 +33,25 @@ public void Initialize()
{
if (!_persistentConnection.TryConnect()) return;

var conn = _persistentConnection.GetConnection();
conn.SubscribeAsync($"{_options.Exchange}.>", _options.Client, async (sender, args) =>
var conn = _persistentConnection.GetConnection();

foreach (var exchange in _options.Exchanges)
{
var subject = args.Message.Subject;
var eventName = subject.StartsWith(_options.Exchange)
? subject.Substring(_options.Exchange.Length + 1) : subject;
await _eventProcessor.ProcessEvent(eventName, args.Message.Data);
});
conn.SubscribeAsync($"{exchange}.>", _options.Client, async (sender, args) =>
{
var subject = args.Message.Subject;
var eventName = subject.StartsWith(exchange)
? subject.Substring(exchange.Length + 1) : subject;
await _eventProcessor.ProcessEvent(eventName, args.Message.Data);
});
}
}

public void Publish(IEvent @event)
public void Publish(IEvent @event, string exchange = null)
{
if (!_persistentConnection.IsConnected && !_persistentConnection.TryConnect()) return;
var conn = _persistentConnection.GetConnection();
var eventName = $"{_options.Exchange}.{@event.GetType().Name}";
var eventName = $"{exchange ?? _options.Exchanges.FirstOrDefault()}.{@event.GetType().Name}";
var message = JsonConvert.SerializeObject(@event);
var body = Encoding.UTF8.GetBytes(message);

Expand Down
6 changes: 3 additions & 3 deletions Enbiso.NLib.EventBus.Nats/NatsOptions.cs
Expand Up @@ -24,9 +24,9 @@ public class NatsOptions
/// </summary>
public string Client { get; set; }
/// <summary>
/// Client broker name
/// </summary>
public string Exchange { get; set; }
/// Client brokers name
/// </summary>
public string[] Exchanges { get; set; }
/// <summary>
/// Connection token
/// </summary>
Expand Down
31 changes: 22 additions & 9 deletions Enbiso.NLib.EventBus.RabbitMq/RabbitMqEventBus.cs
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;
Expand All @@ -21,7 +22,7 @@ namespace Enbiso.NLib.EventBus.RabbitMq
/// </summary>
public class RabbitMqEventBus : IEventBus
{
private readonly string _brokerName;
private readonly string[] _exchanges;

private readonly IRabbitMqPersistentConnection _persistentConnection;
private readonly ILogger<RabbitMqEventBus> _logger;
Expand All @@ -39,7 +40,7 @@ public class RabbitMqEventBus : IEventBus
{
var option = optionWrap.Value;
_queueName = option.Client ?? throw new ArgumentNullException(nameof(option.Client));
_brokerName = option.Exchange ?? throw new ArgumentNullException(nameof(option.Exchange));
_exchanges = option.Exchanges ?? throw new ArgumentNullException(nameof(option.Exchanges));
_persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_subscriptionsManager = subscriptionManager;
Expand All @@ -64,7 +65,7 @@ public void Initialize()
}

/// <inheritdoc />
public void Publish(IEvent @event)
public void Publish(IEvent @event, string exchange = null)
{
if (!_persistentConnection.IsConnected)
{
Expand All @@ -80,15 +81,16 @@ public void Publish(IEvent @event)

using (var channel = _persistentConnection.CreateModel())
{
var eventName = @event.GetType().Name;
channel.ExchangeDeclare(exchange: _brokerName, type: "direct");
channel.ExchangeDeclare(exchange: exchange, type: "direct");

var message = JsonConvert.SerializeObject(@event);
var body = Encoding.UTF8.GetBytes(message);

var eventName = @event.GetType().Name;
exchange = exchange ?? _exchanges.FirstOrDefault();
policy.Execute(() =>
{
channel.BasicPublish(exchange: _brokerName, routingKey: eventName, basicProperties: null, body: body);
channel.BasicPublish(exchange: exchange, routingKey: eventName, basicProperties: null, body: body);
});
}
}
Expand Down Expand Up @@ -152,7 +154,10 @@ private void DoInternalSubscription(string eventName)

using (var channel = _persistentConnection.CreateModel())
{
channel.QueueBind(queue: _queueName, exchange: _brokerName, routingKey: eventName);
foreach (var exchange in _exchanges)
{
channel.QueueBind(queue: _queueName, exchange: exchange, routingKey: eventName);
}
}
}

Expand All @@ -162,7 +167,12 @@ private IModel CreateConsumerChannel()
_persistentConnection.TryConnect();

var channel = _persistentConnection.CreateModel();
channel.ExchangeDeclare(exchange: _brokerName, type: "direct");

foreach (var exchange in _exchanges)
{
channel.ExchangeDeclare(exchange: exchange, type: "direct");
}

channel.QueueDeclare(queue: _queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

channel.CallbackException += (sender, ea) =>
Expand All @@ -181,7 +191,10 @@ private void SubscriptionManager_OnEventRemoved(object sender, string eventName)

using (var channel = _persistentConnection.CreateModel())
{
channel.QueueUnbind(_queueName, _brokerName, eventName);
foreach (var exchange in _exchanges)
{
channel.QueueUnbind(_queueName, exchange, eventName);
}
}
}

Expand Down
6 changes: 3 additions & 3 deletions Enbiso.NLib.EventBus.RabbitMq/RabbitMqOption.cs
Expand Up @@ -35,8 +35,8 @@ public class RabbitMqOption
/// </summary>
public string Client { get; set; }
/// <summary>
/// Client broker name
/// </summary>
public string Exchange { get; set; }
/// Client brokers name
/// </summary>
public string[] Exchanges { get; set; }
}
}
2 changes: 1 addition & 1 deletion Enbiso.NLib.EventBus.ServiceBus/ServiceBusEventBus.cs
Expand Up @@ -53,7 +53,7 @@ public void Initialize()
}

/// <inheritdoc />
public void Publish(IEvent @event)
public void Publish(IEvent @event, string exchange = null)
{
var eventName = @event.GetType().Name.Replace(IntegrationEventSuffix, "");
var jsonMessage = JsonConvert.SerializeObject(@event);
Expand Down
Expand Up @@ -18,7 +18,7 @@ public void SamePublishSubscribeTests()
var opts = Options.Create(new NatsOptions
{
Servers = new [] { "nats://localhost:4222" },
Exchange = "testEx",
Exchanges = new [] {"testEx"},
Client = "tClient"
});
var pConnLogger = new Logger<NatsPersistentConnection>(new NullLoggerFactory());
Expand All @@ -38,14 +38,14 @@ public void SeperatePublishSubscribeTests()
var opts1 = Options.Create(new NatsOptions
{
Servers = new[] { "nats://localhost:4222" },
Exchange = "testEx",
Exchanges = new [] {"testEx"},
Client = "tClient1"
});

var opts2 = Options.Create(new NatsOptions
{
Servers = new[] { "nats://localhost:4222" },
Exchange = "testEx",
Exchanges = new [] {"testEx"},
Client = "tClient2"
});

Expand Down

0 comments on commit f78c789

Please sign in to comment.