diff --git a/src/engines/Nybus.Engine.RabbitMq/Configuration/IConfigurationFactory.cs b/src/engines/Nybus.Engine.RabbitMq/Configuration/IConfigurationFactory.cs index b899855..b2f9c5a 100644 --- a/src/engines/Nybus.Engine.RabbitMq/Configuration/IConfigurationFactory.cs +++ b/src/engines/Nybus.Engine.RabbitMq/Configuration/IConfigurationFactory.cs @@ -28,6 +28,8 @@ public class RabbitMqOptions public ExchangeOptions CommandExchange { get; set; } public ExchangeOptions EventExchange { get; set; } + + public ushort? UnackedMessageCountLimit { get; set; } } public class ConfigurationFactory : IConfigurationFactory @@ -69,12 +71,14 @@ public IRabbitMqConfiguration Create(RabbitMqOptions options) return new RabbitMqConfiguration { + Options = options, OutboundEncoding = outboundEncoding, CommandQueueFactory = commandQueueFactory, EventQueueFactory = eventQueueFactory, ConnectionFactory = connectionFactory, CommandExchangeManager = commandExchangeManager, - EventExchangeManager = eventExchangeManager + EventExchangeManager = eventExchangeManager, + UnackedMessageCountLimit = options.UnackedMessageCountLimit }; IExchangeManager GetExchangeManager(ExchangeOptions exchangeOptions) diff --git a/src/engines/Nybus.Engine.RabbitMq/Configuration/IRabbitMqConfiguration.cs b/src/engines/Nybus.Engine.RabbitMq/Configuration/IRabbitMqConfiguration.cs index ebac33e..6b5bce0 100644 --- a/src/engines/Nybus.Engine.RabbitMq/Configuration/IRabbitMqConfiguration.cs +++ b/src/engines/Nybus.Engine.RabbitMq/Configuration/IRabbitMqConfiguration.cs @@ -14,12 +14,18 @@ public interface IRabbitMqConfiguration Encoding OutboundEncoding { get; set; } ISerializer Serializer { get; set; } + IExchangeManager CommandExchangeManager { get; set; } + IExchangeManager EventExchangeManager { get; set; } + + ushort? UnackedMessageCountLimit { get; set; } } public class RabbitMqConfiguration : IRabbitMqConfiguration { + public RabbitMqOptions Options { get; set; } + public IConnectionFactory ConnectionFactory { get; set; } public IQueueFactory CommandQueueFactory { get; set; } @@ -34,5 +40,7 @@ public class RabbitMqConfiguration : IRabbitMqConfiguration public IExchangeManager EventExchangeManager { get; set; } + public ushort? UnackedMessageCountLimit { get; set; } + } } \ No newline at end of file diff --git a/src/engines/Nybus.Engine.RabbitMq/RabbitMq/RabbitMqBusEngine.cs b/src/engines/Nybus.Engine.RabbitMq/RabbitMq/RabbitMqBusEngine.cs index c8f3c60..502841d 100644 --- a/src/engines/Nybus.Engine.RabbitMq/RabbitMq/RabbitMqBusEngine.cs +++ b/src/engines/Nybus.Engine.RabbitMq/RabbitMq/RabbitMqBusEngine.cs @@ -39,6 +39,11 @@ public Task> StartAsync() _connection = _configuration.ConnectionFactory.CreateConnection(); _channel = _connection.CreateModel(); + if (_configuration.UnackedMessageCountLimit.HasValue) + { + _channel.BasicQos(0, _configuration.UnackedMessageCountLimit.Value, true); + } + var hasEvents = _messageDescriptorStore.HasEvents(); var hasCommands = _messageDescriptorStore.HasCommands(); diff --git a/tests/Tests.Nybus.Engine.RabbitMq/Configuration/RabbitMqOptionsBindingTests.cs b/tests/Tests.Nybus.Engine.RabbitMq/Configuration/RabbitMqOptionsBindingTests.cs index 2a00dad..8198023 100644 --- a/tests/Tests.Nybus.Engine.RabbitMq/Configuration/RabbitMqOptionsBindingTests.cs +++ b/tests/Tests.Nybus.Engine.RabbitMq/Configuration/RabbitMqOptionsBindingTests.cs @@ -145,5 +145,39 @@ public void ConnectionNode_is_correctly_bound(string userName, string password, Assert.That(sut.Connection["HostName"], Is.EqualTo(hostName)); Assert.That(sut.Connection["VirtualHost"], Is.EqualTo(vhost)); } + + [Test, AutoMoqData] + public void UnackedMessageCountLimit_is_correctly_bound_when_value_is_provided(ushort limit) + { + var settings = new Dictionary + { + [$"{nameof(RabbitMqOptions.UnackedMessageCountLimit)}"] = limit.ToString() + }; + + var configuration = CreateConfiguration(settings); + + var sut = new RabbitMqOptions(); + + configuration.Bind(sut); + + Assert.That(sut.UnackedMessageCountLimit, Is.EqualTo(limit)); + } + + [Test, AutoMoqData] + public void UnackedMessageCountLimit_is_correctly_bound_when_no_value_is_provided() + { + var settings = new Dictionary + { + [$"{nameof(RabbitMqOptions.UnackedMessageCountLimit)}"] = null + }; + + var configuration = CreateConfiguration(settings); + + var sut = new RabbitMqOptions(); + + configuration.Bind(sut); + + Assert.That(sut.UnackedMessageCountLimit, Is.Null); + } } } \ No newline at end of file diff --git a/tests/Tests.Nybus.Engine.RabbitMq/RabbitMq/RabbitMqBusEngineTests.cs b/tests/Tests.Nybus.Engine.RabbitMq/RabbitMq/RabbitMqBusEngineTests.cs index 77cc7dd..dc0f6ce 100644 --- a/tests/Tests.Nybus.Engine.RabbitMq/RabbitMq/RabbitMqBusEngineTests.cs +++ b/tests/Tests.Nybus.Engine.RabbitMq/RabbitMq/RabbitMqBusEngineTests.cs @@ -5,7 +5,6 @@ using System.Threading.Tasks; using AutoFixture.Idioms; using AutoFixture.NUnit3; -using Microsoft.Extensions.Logging; using Moq; using NUnit.Framework; using Nybus; @@ -737,5 +736,29 @@ public async Task NotifyFail_can_handle_closed_connections([Frozen] IRabbitMqCon Mock.Get(configuration.ConnectionFactory.CreateConnection().CreateModel()).Verify(p => p.BasicNack(deliveryTag, It.IsAny(), It.IsAny())); } + + [Test, CustomAutoMoqData] + public async Task Global_QoS_is_sent_if_value_is_set([Frozen] IRabbitMqConfiguration configuration, RabbitMqBusEngine sut, ushort limit) + { + Mock.Get(configuration).SetupGet(p => p.UnackedMessageCountLimit).Returns(limit); + + await sut.StartAsync(); + + await sut.StopAsync(); + + Mock.Get(configuration.ConnectionFactory.CreateConnection().CreateModel()).Verify(p => p.BasicQos(0, limit, true)); + } + + [Test, CustomAutoMoqData] + public async Task No_QoS_is_sent_if_no_value_is_set([Frozen] IRabbitMqConfiguration configuration, RabbitMqBusEngine sut) + { + Mock.Get(configuration).SetupGet(p => p.UnackedMessageCountLimit).Returns(null as ushort?); + + await sut.StartAsync(); + + await sut.StopAsync(); + + Mock.Get(configuration.ConnectionFactory.CreateConnection().CreateModel()).Verify(p => p.BasicQos(It.IsAny(), It.IsAny(), It.IsAny()), Times.Never); + } } } \ No newline at end of file