diff --git a/Nybus.sln.DotSettings b/Nybus.sln.DotSettings
index 899275e..f623bb0 100644
--- a/Nybus.sln.DotSettings
+++ b/Nybus.sln.DotSettings
@@ -1,4 +1,4 @@
<data />
- <data><IncludeFilters><Filter ModuleMask="Nybus*" ModuleVersionMask="*" ClassMask="*" FunctionMask="*" IsEnabled="False" /></IncludeFilters><ExcludeFilters><Filter ModuleMask="*" ModuleVersionMask="*" ClassMask="*" FunctionMask="*" IsEnabled="False" /><Filter ModuleMask="Test*" ModuleVersionMask="*" ClassMask="*" FunctionMask="*" IsEnabled="False" /></ExcludeFilters></data>
+ <data><IncludeFilters><Filter ModuleMask="Nybus*" ModuleVersionMask="*" ClassMask="*" FunctionMask="*" IsEnabled="True" /></IncludeFilters><ExcludeFilters><Filter ModuleMask="*" ModuleVersionMask="*" ClassMask="*" FunctionMask="*" IsEnabled="False" /></ExcludeFilters></data>
True
\ No newline at end of file
diff --git a/src/engines/Nybus.Engine.RabbitMq/Configuration/IConfigurationFactory.cs b/src/engines/Nybus.Engine.RabbitMq/Configuration/IConfigurationFactory.cs
index 6708ae3..b899855 100644
--- a/src/engines/Nybus.Engine.RabbitMq/Configuration/IConfigurationFactory.cs
+++ b/src/engines/Nybus.Engine.RabbitMq/Configuration/IConfigurationFactory.cs
@@ -24,6 +24,10 @@ public class RabbitMqOptions
public IConfigurationSection CommandQueue { get; set; }
public IConfigurationSection EventQueue { get; set; }
+
+ public ExchangeOptions CommandExchange { get; set; }
+
+ public ExchangeOptions EventExchange { get; set; }
}
public class ConfigurationFactory : IConfigurationFactory
@@ -60,15 +64,24 @@ public IRabbitMqConfiguration Create(RabbitMqOptions options)
var commandQueueFactory = GetQueueFactory(options.CommandQueue);
var eventQueueFactory = GetQueueFactory(options.EventQueue);
var connectionFactory = GetConnectionFactory();
+ var commandExchangeManager = GetExchangeManager(options.CommandExchange);
+ var eventExchangeManager = GetExchangeManager(options.EventExchange);
return new RabbitMqConfiguration
{
OutboundEncoding = outboundEncoding,
CommandQueueFactory = commandQueueFactory,
EventQueueFactory = eventQueueFactory,
- ConnectionFactory = connectionFactory
+ ConnectionFactory = connectionFactory,
+ CommandExchangeManager = commandExchangeManager,
+ EventExchangeManager = eventExchangeManager
};
+ IExchangeManager GetExchangeManager(ExchangeOptions exchangeOptions)
+ {
+ return new ExchangeManager(exchangeOptions ?? new ExchangeOptions());
+ }
+
IQueueFactory GetQueueFactory(IConfigurationSection section)
{
if (section != null && section.TryGetValue("ProviderName", out var providerName) && _queueFactoryProviders.TryGetValue(providerName, out var provider))
diff --git a/src/engines/Nybus.Engine.RabbitMq/Configuration/IExchangeManager.cs b/src/engines/Nybus.Engine.RabbitMq/Configuration/IExchangeManager.cs
new file mode 100644
index 0000000..f471f50
--- /dev/null
+++ b/src/engines/Nybus.Engine.RabbitMq/Configuration/IExchangeManager.cs
@@ -0,0 +1,35 @@
+using System;
+using System.Collections.Generic;
+using RabbitMQ.Client;
+
+namespace Nybus.Configuration
+{
+ public interface IExchangeManager
+ {
+ void EnsureExchangeExists(IModel model, string name, string exchangeType);
+ }
+
+ public class ExchangeManager : IExchangeManager
+ {
+ private readonly ExchangeOptions _options;
+
+ public ExchangeManager(ExchangeOptions options)
+ {
+ _options = options ?? throw new ArgumentNullException(nameof(options));
+ }
+
+ public void EnsureExchangeExists(IModel model, string name, string exchangeType)
+ {
+ model.ExchangeDeclare(name, exchangeType, _options.IsDurable, _options.IsAutoDelete, _options.Properties);
+ }
+ }
+
+ public class ExchangeOptions
+ {
+ public bool IsDurable { get; set; }
+
+ public bool IsAutoDelete { get; set; }
+
+ public IDictionary Properties { get; set; }
+ }
+}
\ No newline at end of file
diff --git a/src/engines/Nybus.Engine.RabbitMq/Configuration/IQueueFactory.cs b/src/engines/Nybus.Engine.RabbitMq/Configuration/IQueueFactory.cs
index 39e4f08..f501b47 100644
--- a/src/engines/Nybus.Engine.RabbitMq/Configuration/IQueueFactory.cs
+++ b/src/engines/Nybus.Engine.RabbitMq/Configuration/IQueueFactory.cs
@@ -1,5 +1,6 @@
using System;
using RabbitMQ.Client;
+using QueueDeclareOk = RabbitMQ.Client.QueueDeclareOk;
namespace Nybus.Configuration
{
diff --git a/src/engines/Nybus.Engine.RabbitMq/Configuration/IRabbitMqConfiguration.cs b/src/engines/Nybus.Engine.RabbitMq/Configuration/IRabbitMqConfiguration.cs
index a8c1c69..ebac33e 100644
--- a/src/engines/Nybus.Engine.RabbitMq/Configuration/IRabbitMqConfiguration.cs
+++ b/src/engines/Nybus.Engine.RabbitMq/Configuration/IRabbitMqConfiguration.cs
@@ -14,6 +14,8 @@ public interface IRabbitMqConfiguration
Encoding OutboundEncoding { get; set; }
ISerializer Serializer { get; set; }
+ IExchangeManager CommandExchangeManager { get; set; }
+ IExchangeManager EventExchangeManager { get; set; }
}
public class RabbitMqConfiguration : IRabbitMqConfiguration
@@ -27,5 +29,10 @@ public class RabbitMqConfiguration : IRabbitMqConfiguration
public Encoding OutboundEncoding { get; set; }
public ISerializer Serializer { get; set; } = new JsonSerializer();
+
+ public IExchangeManager CommandExchangeManager { get; set; }
+
+ public IExchangeManager EventExchangeManager { get; set; }
+
}
}
\ No newline at end of file
diff --git a/src/engines/Nybus.Engine.RabbitMq/RabbitMq/RabbitMqBusEngine.cs b/src/engines/Nybus.Engine.RabbitMq/RabbitMq/RabbitMqBusEngine.cs
index c4a78d5..c8f3c60 100644
--- a/src/engines/Nybus.Engine.RabbitMq/RabbitMq/RabbitMqBusEngine.cs
+++ b/src/engines/Nybus.Engine.RabbitMq/RabbitMq/RabbitMqBusEngine.cs
@@ -57,7 +57,7 @@ public Task> StartAsync()
{
var exchangeName = MessageDescriptor.CreateFromType(type);
- _channel.ExchangeDeclare(exchange: exchangeName, type: FanOutExchangeType);
+ _configuration.EventExchangeManager.EnsureExchangeExists(_channel, exchangeName, FanOutExchangeType);
_channel.QueueBind(queue: eventQueue.QueueName, exchange: exchangeName, routingKey: string.Empty);
}
@@ -73,7 +73,7 @@ public Task> StartAsync()
{
var exchangeName = MessageDescriptor.CreateFromType(type);
- _channel.ExchangeDeclare(exchange: exchangeName, type: FanOutExchangeType);
+ _configuration.CommandExchangeManager.EnsureExchangeExists(_channel, exchangeName, FanOutExchangeType);
_channel.QueueBind(queue: commandQueue.QueueName, exchange: exchangeName, routingKey: string.Empty);
}
@@ -217,12 +217,27 @@ public Task SendMessageAsync(Message message)
var exchangeName = MessageDescriptor.CreateFromType(type);
- _channel.ExchangeDeclare(exchange: exchangeName, type: FanOutExchangeType);
+ var exchangeManager = GetExchangeManager();
+ exchangeManager.EnsureExchangeExists(_channel, exchangeName, FanOutExchangeType);
_channel.BasicPublish(exchange: exchangeName, routingKey: string.Empty, body: body, basicProperties: properties);
return Task.CompletedTask;
+ IExchangeManager GetExchangeManager()
+ {
+ if (message.MessageType == MessageType.Command)
+ {
+ return _configuration.CommandExchangeManager;
+ }
+
+ if (message.MessageType == MessageType.Event)
+ {
+ return _configuration.EventExchangeManager;
+ }
+
+ throw new NotSupportedException();
+ }
}
public Task NotifySuccessAsync(Message message)
diff --git a/tests/Tests.Nybus.Engine.RabbitMq/Configuration/ExchangeManagerTests.cs b/tests/Tests.Nybus.Engine.RabbitMq/Configuration/ExchangeManagerTests.cs
new file mode 100644
index 0000000..045891f
--- /dev/null
+++ b/tests/Tests.Nybus.Engine.RabbitMq/Configuration/ExchangeManagerTests.cs
@@ -0,0 +1,27 @@
+using AutoFixture.Idioms;
+using AutoFixture.NUnit3;
+using Moq;
+using NUnit.Framework;
+using Nybus.Configuration;
+using RabbitMQ.Client;
+
+namespace Tests.Configuration
+{
+ [TestFixture]
+ public class ExchangeManagerTests
+ {
+ [Test, AutoMoqData]
+ public void Constructor_is_guarded(GuardClauseAssertion assertion)
+ {
+ assertion.Verify(typeof(ExchangeManager).GetConstructors());
+ }
+
+ [Test, AutoMoqData]
+ public void EnsureExchangeExists_forwards_settings([Frozen] ExchangeOptions options, ExchangeManager sut, IModel model, string exchangeName, string exchangeType)
+ {
+ sut.EnsureExchangeExists(model, exchangeName, exchangeType);
+
+ Mock.Get(model).Verify(p => p.ExchangeDeclare(exchangeName, exchangeType, options.IsDurable, options.IsAutoDelete, options.Properties));
+ }
+ }
+}
\ No newline at end of file
diff --git a/tests/Tests.Nybus.Engine.RabbitMq/Configuration/RabbitMqOptionsBindingTests.cs b/tests/Tests.Nybus.Engine.RabbitMq/Configuration/RabbitMqOptionsBindingTests.cs
new file mode 100644
index 0000000..88df85a
--- /dev/null
+++ b/tests/Tests.Nybus.Engine.RabbitMq/Configuration/RabbitMqOptionsBindingTests.cs
@@ -0,0 +1,121 @@
+using System.Collections.Generic;
+using Microsoft.Extensions.Configuration;
+using NUnit.Framework;
+using Nybus.Configuration;
+
+namespace Tests.Configuration
+{
+ [TestFixture]
+ public class RabbitMqOptionsBindingTests
+ {
+ private static IConfiguration CreateConfiguration(IDictionary settings)
+ {
+ var builder = new ConfigurationBuilder();
+ builder.AddInMemoryCollection(settings);
+
+ return builder.Build();
+ }
+
+ [Test]
+ public void CommandExchange_IsAutoDelete_is_correctly_bound([Values] bool value)
+ {
+ var settings = new Dictionary
+ {
+ [$"{nameof(RabbitMqOptions.CommandExchange)}:{nameof(ExchangeOptions.IsAutoDelete)}"] = value.ToString()
+ };
+
+ var configuration = CreateConfiguration(settings);
+
+ var sut = new RabbitMqOptions();
+
+ configuration.Bind(sut);
+
+ Assert.That(sut.CommandExchange.IsAutoDelete, Is.EqualTo(value));
+ }
+
+ [Test]
+ public void CommandExchange_IsDurable_is_correctly_bound([Values] bool value)
+ {
+ var settings = new Dictionary
+ {
+ [$"{nameof(RabbitMqOptions.CommandExchange)}:{nameof(ExchangeOptions.IsDurable)}"] = value.ToString()
+ };
+
+ var configuration = CreateConfiguration(settings);
+
+ var sut = new RabbitMqOptions();
+
+ configuration.Bind(sut);
+
+ Assert.That(sut.CommandExchange.IsDurable, Is.EqualTo(value));
+ }
+
+ [Test, AutoMoqData]
+ public void CommandExchange_Properties_is_correctly_bound(string key, string value)
+ {
+ var settings = new Dictionary
+ {
+ [$"{nameof(RabbitMqOptions.CommandExchange)}:{nameof(ExchangeOptions.Properties)}:{key}"] = value
+ };
+
+ var configuration = CreateConfiguration(settings);
+
+ var sut = new RabbitMqOptions();
+
+ configuration.Bind(sut);
+
+ Assert.That(sut.CommandExchange.Properties[key], Is.EqualTo(value));
+ }
+
+ [Test]
+ public void EventExchange_IsAutoDelete_is_correctly_bound([Values] bool value)
+ {
+ var settings = new Dictionary
+ {
+ [$"{nameof(RabbitMqOptions.EventExchange)}:{nameof(ExchangeOptions.IsAutoDelete)}"] = value.ToString()
+ };
+
+ var configuration = CreateConfiguration(settings);
+
+ var sut = new RabbitMqOptions();
+
+ configuration.Bind(sut);
+
+ Assert.That(sut.EventExchange.IsAutoDelete, Is.EqualTo(value));
+ }
+
+ [Test]
+ public void EventExchange_IsDurable_is_correctly_bound([Values] bool value)
+ {
+ var settings = new Dictionary
+ {
+ [$"{nameof(RabbitMqOptions.EventExchange)}:{nameof(ExchangeOptions.IsDurable)}"] = value.ToString()
+ };
+
+ var configuration = CreateConfiguration(settings);
+
+ var sut = new RabbitMqOptions();
+
+ configuration.Bind(sut);
+
+ Assert.That(sut.EventExchange.IsDurable, Is.EqualTo(value));
+ }
+
+ [Test, AutoMoqData]
+ public void EventExchange_Properties_is_correctly_bound(string key, string value)
+ {
+ var settings = new Dictionary
+ {
+ [$"{nameof(RabbitMqOptions.EventExchange)}:{nameof(ExchangeOptions.Properties)}:{key}"] = value
+ };
+
+ var configuration = CreateConfiguration(settings);
+
+ var sut = new RabbitMqOptions();
+
+ configuration.Bind(sut);
+
+ Assert.That(sut.EventExchange.Properties[key], Is.EqualTo(value));
+ }
+ }
+}
\ No newline at end of file
diff --git a/tests/Tests.Nybus.Engine.RabbitMq/RabbitMq/RabbitMqBusEngineTests.cs b/tests/Tests.Nybus.Engine.RabbitMq/RabbitMq/RabbitMqBusEngineTests.cs
index 7fab2b8..77cc7dd 100644
--- a/tests/Tests.Nybus.Engine.RabbitMq/RabbitMq/RabbitMqBusEngineTests.cs
+++ b/tests/Tests.Nybus.Engine.RabbitMq/RabbitMq/RabbitMqBusEngineTests.cs
@@ -145,7 +145,7 @@ public async Task Exchange_is_declared_when_a_event_is_registered([Frozen] IRabb
var sequence = await sut.StartAsync();
- Mock.Get(configuration.ConnectionFactory.CreateConnection().CreateModel()).Verify(p => p.ExchangeDeclare(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny>()));
+ Mock.Get(configuration.EventExchangeManager).Verify(p => p.EnsureExchangeExists(It.IsAny(), It.IsAny(), It.IsAny()));
}
[Test, CustomAutoMoqData]
@@ -155,7 +155,7 @@ public async Task Exchange_is_declared_when_a_command_is_registered([Frozen] IRa
var sequence = await sut.StartAsync();
- Mock.Get(configuration.ConnectionFactory.CreateConnection().CreateModel()).Verify(p => p.ExchangeDeclare(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny>()));
+ Mock.Get(configuration.CommandExchangeManager).Verify(p => p.EnsureExchangeExists(It.IsAny(), It.IsAny(), It.IsAny()));
}
[Test, CustomAutoMoqData]