Skip to content

Commit

Permalink
Add possibility to customize Exchange creation in RabbitMQ (#79)
Browse files Browse the repository at this point in the history
  • Loading branch information
Kralizek committed Feb 22, 2019
1 parent 1e7b07a commit 9f0324c
Show file tree
Hide file tree
Showing 9 changed files with 226 additions and 7 deletions.
2 changes: 1 addition & 1 deletion Nybus.sln.DotSettings
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:String x:Key="/Default/FilterSettingsManager/AttributeFilterXml/@EntryValue">&lt;data /&gt;</s:String>
<s:String x:Key="/Default/FilterSettingsManager/CoverageFilterXml/@EntryValue">&lt;data&gt;&lt;IncludeFilters&gt;&lt;Filter ModuleMask="Nybus*" ModuleVersionMask="*" ClassMask="*" FunctionMask="*" IsEnabled="False" /&gt;&lt;/IncludeFilters&gt;&lt;ExcludeFilters&gt;&lt;Filter ModuleMask="*" ModuleVersionMask="*" ClassMask="*" FunctionMask="*" IsEnabled="False" /&gt;&lt;Filter ModuleMask="Test*" ModuleVersionMask="*" ClassMask="*" FunctionMask="*" IsEnabled="False" /&gt;&lt;/ExcludeFilters&gt;&lt;/data&gt;</s:String>
<s:String x:Key="/Default/FilterSettingsManager/CoverageFilterXml/@EntryValue">&lt;data&gt;&lt;IncludeFilters&gt;&lt;Filter ModuleMask="Nybus*" ModuleVersionMask="*" ClassMask="*" FunctionMask="*" IsEnabled="True" /&gt;&lt;/IncludeFilters&gt;&lt;ExcludeFilters&gt;&lt;Filter ModuleMask="*" ModuleVersionMask="*" ClassMask="*" FunctionMask="*" IsEnabled="False" /&gt;&lt;/ExcludeFilters&gt;&lt;/data&gt;</s:String>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Nybus/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ public class RabbitMqOptions
public IConfigurationSection CommandQueue { get; set; }

public IConfigurationSection EventQueue { get; set; }

public ExchangeOptions CommandExchange { get; set; }

public ExchangeOptions EventExchange { get; set; }
}

public class ConfigurationFactory : IConfigurationFactory
Expand Down Expand Up @@ -60,15 +64,24 @@ public IRabbitMqConfiguration Create(RabbitMqOptions options)
var commandQueueFactory = GetQueueFactory(options.CommandQueue);
var eventQueueFactory = GetQueueFactory(options.EventQueue);
var connectionFactory = GetConnectionFactory();
var commandExchangeManager = GetExchangeManager(options.CommandExchange);
var eventExchangeManager = GetExchangeManager(options.EventExchange);

return new RabbitMqConfiguration
{
OutboundEncoding = outboundEncoding,
CommandQueueFactory = commandQueueFactory,
EventQueueFactory = eventQueueFactory,
ConnectionFactory = connectionFactory
ConnectionFactory = connectionFactory,
CommandExchangeManager = commandExchangeManager,
EventExchangeManager = eventExchangeManager
};

IExchangeManager GetExchangeManager(ExchangeOptions exchangeOptions)
{
return new ExchangeManager(exchangeOptions ?? new ExchangeOptions());
}

IQueueFactory GetQueueFactory(IConfigurationSection section)
{
if (section != null && section.TryGetValue("ProviderName", out var providerName) && _queueFactoryProviders.TryGetValue(providerName, out var provider))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using System;
using System.Collections.Generic;
using RabbitMQ.Client;

namespace Nybus.Configuration
{
public interface IExchangeManager
{
void EnsureExchangeExists(IModel model, string name, string exchangeType);
}

public class ExchangeManager : IExchangeManager
{
private readonly ExchangeOptions _options;

public ExchangeManager(ExchangeOptions options)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
}

public void EnsureExchangeExists(IModel model, string name, string exchangeType)
{
model.ExchangeDeclare(name, exchangeType, _options.IsDurable, _options.IsAutoDelete, _options.Properties);
}
}

public class ExchangeOptions
{
public bool IsDurable { get; set; }

public bool IsAutoDelete { get; set; }

public IDictionary<string, object> Properties { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using RabbitMQ.Client;
using QueueDeclareOk = RabbitMQ.Client.QueueDeclareOk;

namespace Nybus.Configuration
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ public interface IRabbitMqConfiguration
Encoding OutboundEncoding { get; set; }

ISerializer Serializer { get; set; }
IExchangeManager CommandExchangeManager { get; set; }
IExchangeManager EventExchangeManager { get; set; }
}

public class RabbitMqConfiguration : IRabbitMqConfiguration
Expand All @@ -27,5 +29,10 @@ public class RabbitMqConfiguration : IRabbitMqConfiguration
public Encoding OutboundEncoding { get; set; }

public ISerializer Serializer { get; set; } = new JsonSerializer();

public IExchangeManager CommandExchangeManager { get; set; }

public IExchangeManager EventExchangeManager { get; set; }

}
}
21 changes: 18 additions & 3 deletions src/engines/Nybus.Engine.RabbitMq/RabbitMq/RabbitMqBusEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public Task<IObservable<Message>> StartAsync()
{
var exchangeName = MessageDescriptor.CreateFromType(type);

_channel.ExchangeDeclare(exchange: exchangeName, type: FanOutExchangeType);
_configuration.EventExchangeManager.EnsureExchangeExists(_channel, exchangeName, FanOutExchangeType);

_channel.QueueBind(queue: eventQueue.QueueName, exchange: exchangeName, routingKey: string.Empty);
}
Expand All @@ -73,7 +73,7 @@ public Task<IObservable<Message>> StartAsync()
{
var exchangeName = MessageDescriptor.CreateFromType(type);

_channel.ExchangeDeclare(exchange: exchangeName, type: FanOutExchangeType);
_configuration.CommandExchangeManager.EnsureExchangeExists(_channel, exchangeName, FanOutExchangeType);

_channel.QueueBind(queue: commandQueue.QueueName, exchange: exchangeName, routingKey: string.Empty);
}
Expand Down Expand Up @@ -217,12 +217,27 @@ public Task SendMessageAsync(Message message)

var exchangeName = MessageDescriptor.CreateFromType(type);

_channel.ExchangeDeclare(exchange: exchangeName, type: FanOutExchangeType);
var exchangeManager = GetExchangeManager();
exchangeManager.EnsureExchangeExists(_channel, exchangeName, FanOutExchangeType);

_channel.BasicPublish(exchange: exchangeName, routingKey: string.Empty, body: body, basicProperties: properties);

return Task.CompletedTask;

IExchangeManager GetExchangeManager()
{
if (message.MessageType == MessageType.Command)
{
return _configuration.CommandExchangeManager;
}

if (message.MessageType == MessageType.Event)
{
return _configuration.EventExchangeManager;
}

throw new NotSupportedException();
}
}

public Task NotifySuccessAsync(Message message)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using AutoFixture.Idioms;
using AutoFixture.NUnit3;
using Moq;
using NUnit.Framework;
using Nybus.Configuration;
using RabbitMQ.Client;

namespace Tests.Configuration
{
[TestFixture]
public class ExchangeManagerTests
{
[Test, AutoMoqData]
public void Constructor_is_guarded(GuardClauseAssertion assertion)
{
assertion.Verify(typeof(ExchangeManager).GetConstructors());
}

[Test, AutoMoqData]
public void EnsureExchangeExists_forwards_settings([Frozen] ExchangeOptions options, ExchangeManager sut, IModel model, string exchangeName, string exchangeType)
{
sut.EnsureExchangeExists(model, exchangeName, exchangeType);

Mock.Get(model).Verify(p => p.ExchangeDeclare(exchangeName, exchangeType, options.IsDurable, options.IsAutoDelete, options.Properties));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
using System.Collections.Generic;
using Microsoft.Extensions.Configuration;
using NUnit.Framework;
using Nybus.Configuration;

namespace Tests.Configuration
{
[TestFixture]
public class RabbitMqOptionsBindingTests
{
private static IConfiguration CreateConfiguration(IDictionary<string, string> settings)
{
var builder = new ConfigurationBuilder();
builder.AddInMemoryCollection(settings);

return builder.Build();
}

[Test]
public void CommandExchange_IsAutoDelete_is_correctly_bound([Values] bool value)
{
var settings = new Dictionary<string, string>
{
[$"{nameof(RabbitMqOptions.CommandExchange)}:{nameof(ExchangeOptions.IsAutoDelete)}"] = value.ToString()
};

var configuration = CreateConfiguration(settings);

var sut = new RabbitMqOptions();

configuration.Bind(sut);

Assert.That(sut.CommandExchange.IsAutoDelete, Is.EqualTo(value));
}

[Test]
public void CommandExchange_IsDurable_is_correctly_bound([Values] bool value)
{
var settings = new Dictionary<string, string>
{
[$"{nameof(RabbitMqOptions.CommandExchange)}:{nameof(ExchangeOptions.IsDurable)}"] = value.ToString()
};

var configuration = CreateConfiguration(settings);

var sut = new RabbitMqOptions();

configuration.Bind(sut);

Assert.That(sut.CommandExchange.IsDurable, Is.EqualTo(value));
}

[Test, AutoMoqData]
public void CommandExchange_Properties_is_correctly_bound(string key, string value)
{
var settings = new Dictionary<string, string>
{
[$"{nameof(RabbitMqOptions.CommandExchange)}:{nameof(ExchangeOptions.Properties)}:{key}"] = value
};

var configuration = CreateConfiguration(settings);

var sut = new RabbitMqOptions();

configuration.Bind(sut);

Assert.That(sut.CommandExchange.Properties[key], Is.EqualTo(value));
}

[Test]
public void EventExchange_IsAutoDelete_is_correctly_bound([Values] bool value)
{
var settings = new Dictionary<string, string>
{
[$"{nameof(RabbitMqOptions.EventExchange)}:{nameof(ExchangeOptions.IsAutoDelete)}"] = value.ToString()
};

var configuration = CreateConfiguration(settings);

var sut = new RabbitMqOptions();

configuration.Bind(sut);

Assert.That(sut.EventExchange.IsAutoDelete, Is.EqualTo(value));
}

[Test]
public void EventExchange_IsDurable_is_correctly_bound([Values] bool value)
{
var settings = new Dictionary<string, string>
{
[$"{nameof(RabbitMqOptions.EventExchange)}:{nameof(ExchangeOptions.IsDurable)}"] = value.ToString()
};

var configuration = CreateConfiguration(settings);

var sut = new RabbitMqOptions();

configuration.Bind(sut);

Assert.That(sut.EventExchange.IsDurable, Is.EqualTo(value));
}

[Test, AutoMoqData]
public void EventExchange_Properties_is_correctly_bound(string key, string value)
{
var settings = new Dictionary<string, string>
{
[$"{nameof(RabbitMqOptions.EventExchange)}:{nameof(ExchangeOptions.Properties)}:{key}"] = value
};

var configuration = CreateConfiguration(settings);

var sut = new RabbitMqOptions();

configuration.Bind(sut);

Assert.That(sut.EventExchange.Properties[key], Is.EqualTo(value));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public async Task Exchange_is_declared_when_a_event_is_registered([Frozen] IRabb

var sequence = await sut.StartAsync();

Mock.Get(configuration.ConnectionFactory.CreateConnection().CreateModel()).Verify(p => p.ExchangeDeclare(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<bool>(), It.IsAny<bool>(), It.IsAny<IDictionary<string, object>>()));
Mock.Get(configuration.EventExchangeManager).Verify(p => p.EnsureExchangeExists(It.IsAny<IModel>(), It.IsAny<string>(), It.IsAny<string>()));
}

[Test, CustomAutoMoqData]
Expand All @@ -155,7 +155,7 @@ public async Task Exchange_is_declared_when_a_command_is_registered([Frozen] IRa

var sequence = await sut.StartAsync();

Mock.Get(configuration.ConnectionFactory.CreateConnection().CreateModel()).Verify(p => p.ExchangeDeclare(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<bool>(), It.IsAny<bool>(), It.IsAny<IDictionary<string, object>>()));
Mock.Get(configuration.CommandExchangeManager).Verify(p => p.EnsureExchangeExists(It.IsAny<IModel>(), It.IsAny<string>(), It.IsAny<string>()));
}

[Test, CustomAutoMoqData]
Expand Down

0 comments on commit 9f0324c

Please sign in to comment.