From 73024f0d4b0f1c1ec888a0009198f302ddcdaeaf Mon Sep 17 00:00:00 2001 From: Renato Golia Date: Mon, 7 Jan 2019 22:05:44 +0100 Subject: [PATCH] Improvements to async/await (#34) --- Nybus.sln.DotSettings | 2 +- src/Directory.Build.props | 2 +- src/Nybus.Abstractions/IBusEngine.cs | 8 +-- src/Nybus.Abstractions/IBusHost.cs | 4 +- .../Policies/IErrorPolicy.cs | 4 +- src/Nybus/InMemoryBusEngine.cs | 12 ++-- src/Nybus/NybusHost.cs | 28 ++++---- src/Nybus/NybusHostBuilder.cs | 4 +- src/Nybus/Policies/NoopErrorPolicy.cs | 8 +-- src/Nybus/Policies/RetryErrorPolicy.cs | 8 +-- .../RabbitMqBusEngine.cs | 24 ++++--- tests/TestUtils/TestBusEngine.cs | 8 +-- .../RabbitMqBusEngineTests.cs | 66 +++++++++---------- tests/Tests.Nybus/InMemoryBusEngineTests.cs | 18 ++--- tests/Tests.Nybus/NybusHostTests.cs | 38 +++++------ .../Policies/NoopErrorPolicyTests.cs | 8 +-- .../Policies/RetryErrorPolicyTests.cs | 24 +++---- 17 files changed, 136 insertions(+), 130 deletions(-) diff --git a/Nybus.sln.DotSettings b/Nybus.sln.DotSettings index 93407e1..0946073 100644 --- a/Nybus.sln.DotSettings +++ b/Nybus.sln.DotSettings @@ -1,4 +1,4 @@  <data /> - <data><IncludeFilters /><ExcludeFilters /></data> + <data><IncludeFilters><Filter ModuleMask="Nybus*" ModuleVersionMask="*" ClassMask="*" FunctionMask="*" IsEnabled="True" /></IncludeFilters><ExcludeFilters><Filter ModuleMask="*" ModuleVersionMask="*" ClassMask="*" FunctionMask="*" IsEnabled="True" /></ExcludeFilters></data> True \ No newline at end of file diff --git a/src/Directory.Build.props b/src/Directory.Build.props index 0889dda..b19f08a 100644 --- a/src/Directory.Build.props +++ b/src/Directory.Build.props @@ -1,7 +1,7 @@ Renato Golia - 1.0.0-alpha004 + 1.0.0-alpha005 Nybus Project Nybus https://github.com/Nybus-project/Nybus/blob/master/LICENSE diff --git a/src/Nybus.Abstractions/IBusEngine.cs b/src/Nybus.Abstractions/IBusEngine.cs index cc3453b..2c465f2 100644 --- a/src/Nybus.Abstractions/IBusEngine.cs +++ b/src/Nybus.Abstractions/IBusEngine.cs @@ -9,17 +9,17 @@ public interface IBusEngine Task SendEventAsync(EventMessage message) where TEvent : class, IEvent; - IObservable Start(); + Task> StartAsync(); - void Stop(); + Task StopAsync(); void SubscribeToCommand() where TCommand : class, ICommand; void SubscribeToEvent() where TEvent : class, IEvent; - Task NotifySuccess(Message message); + Task NotifySuccessAsync(Message message); - Task NotifyFail(Message message); + Task NotifyFailAsync(Message message); } public delegate Task MessageReceived(Message message); diff --git a/src/Nybus.Abstractions/IBusHost.cs b/src/Nybus.Abstractions/IBusHost.cs index 951b575..49b075d 100644 --- a/src/Nybus.Abstractions/IBusHost.cs +++ b/src/Nybus.Abstractions/IBusHost.cs @@ -38,10 +38,10 @@ public interface IDispatcher public interface IBusExecutionEnvironment { - Task ExecuteCommandHandler(IDispatcher dispatcher, ICommandContext context, Type handlerType) + Task ExecuteCommandHandlerAsync(IDispatcher dispatcher, ICommandContext context, Type handlerType) where TCommand : class, ICommand; - Task ExecuteEventHandler(IDispatcher dispatcher, IEventContext context, Type handlerType) + Task ExecuteEventHandlerAsync(IDispatcher dispatcher, IEventContext context, Type handlerType) where TEvent : class, IEvent; } } diff --git a/src/Nybus.Abstractions/Policies/IErrorPolicy.cs b/src/Nybus.Abstractions/Policies/IErrorPolicy.cs index 7a3c0e9..07e04a3 100644 --- a/src/Nybus.Abstractions/Policies/IErrorPolicy.cs +++ b/src/Nybus.Abstractions/Policies/IErrorPolicy.cs @@ -9,9 +9,9 @@ namespace Nybus.Policies public interface IErrorPolicy { - Task HandleError(IBusEngine engine, Exception exception, CommandMessage message) where TCommand : class, ICommand; + Task HandleErrorAsync(IBusEngine engine, Exception exception, CommandMessage message) where TCommand : class, ICommand; - Task HandleError(IBusEngine engine, Exception exception, EventMessage message) where TEvent : class, IEvent; + Task HandleErrorAsync(IBusEngine engine, Exception exception, EventMessage message) where TEvent : class, IEvent; } public interface IErrorPolicyProvider diff --git a/src/Nybus/InMemoryBusEngine.cs b/src/Nybus/InMemoryBusEngine.cs index 04f3b27..ef18b30 100644 --- a/src/Nybus/InMemoryBusEngine.cs +++ b/src/Nybus/InMemoryBusEngine.cs @@ -13,7 +13,7 @@ public class InMemoryBusEngine : IBusEngine private bool _isStarted; private readonly ISet _acceptedTypes = new HashSet(); - public IObservable Start() + public Task> StartAsync() { _sequenceOfMessages = new Subject(); @@ -33,16 +33,18 @@ public IObservable Start() _isStarted = true; - return Observable.Merge(commands, events); + return Task.FromResult(Observable.Merge(commands, events)); } - public void Stop() + public Task StopAsync() { if (_isStarted) { _sequenceOfMessages.OnCompleted(); _sequenceOfMessages = null; } + + return Task.CompletedTask; } public Task SendCommandAsync(CommandMessage message) where TCommand : class, ICommand @@ -75,12 +77,12 @@ public void Stop() _acceptedTypes.Add(typeof(TEvent)); } - public Task NotifySuccess(Message message) + public Task NotifySuccessAsync(Message message) { return Task.CompletedTask; } - public Task NotifyFail(Message message) + public Task NotifyFailAsync(Message message) { return Task.CompletedTask; } diff --git a/src/Nybus/NybusHost.cs b/src/Nybus/NybusHost.cs index ca6ecee..51f85a6 100644 --- a/src/Nybus/NybusHost.cs +++ b/src/Nybus/NybusHost.cs @@ -26,7 +26,7 @@ public NybusHost(IBusEngine busEngine, INybusConfiguration configuration, IServi _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider)); } - public async Task InvokeCommandAsync(TCommand command, Guid correlationId) where TCommand : class, ICommand + public Task InvokeCommandAsync(TCommand command, Guid correlationId) where TCommand : class, ICommand { var message = new CommandMessage { @@ -40,10 +40,10 @@ public NybusHost(IBusEngine busEngine, INybusConfiguration configuration, IServi }; _logger.LogTrace(new { type = typeof(TCommand).FullName, correlationId = correlationId, command }, arg => $"Invoking command of type {arg.type} with correlationId {arg.correlationId}. Command: {arg.command.ToString()}"); - await _engine.SendCommandAsync(message).ConfigureAwait(false); + return _engine.SendCommandAsync(message); } - public async Task RaiseEventAsync(TEvent @event, Guid correlationId) where TEvent : class, IEvent + public Task RaiseEventAsync(TEvent @event, Guid correlationId) where TEvent : class, IEvent { var message = new EventMessage { @@ -57,7 +57,7 @@ public NybusHost(IBusEngine busEngine, INybusConfiguration configuration, IServi }; _logger.LogTrace(new { type = typeof(TEvent).FullName, correlationId = correlationId, @event }, arg => $"Raising event of type {arg.type} with correlationId {arg.correlationId}. Event: {arg.@event.ToString()}"); - await _engine.SendEventAsync(message).ConfigureAwait(false); + return _engine.SendEventAsync(message); } private bool _isStarted; @@ -67,7 +67,7 @@ public Task StartAsync() { _logger.LogTrace("Bus starting"); - var incomingMessages = _engine.Start(); + var incomingMessages = _engine.StartAsync().Result; var observable = from message in incomingMessages where message != null @@ -90,7 +90,7 @@ public Task StopAsync() { _logger.LogTrace("Bus stopping"); - _engine.Stop(); + _engine.StopAsync().Wait(); _disposable.Dispose(); _logger.LogTrace("Bus stopped"); @@ -133,13 +133,13 @@ async Task ExecuteHandler(ICommandContext context) async Task NotifySuccess(CommandMessage message) { - await _engine.NotifySuccess(message).ConfigureAwait(false); + await _engine.NotifySuccessAsync(message).ConfigureAwait(false); } - async Task HandleError(Exception exception, CommandMessage message, ICommandContext context) + Task HandleError(Exception exception, CommandMessage message, ICommandContext context) { _logger.LogError(new { CorrelationId = context.CorrelationId, MessageId = message.MessageId, CommandType = typeof(TCommand).Name, Exception = exception, Message = message }, s => $"An error occurred while handling {s.CommandType}. {s.Exception.Message}"); - await _configuration.ErrorPolicy.HandleError(_engine, exception, message).ConfigureAwait(false); + return _configuration.ErrorPolicy.HandleErrorAsync(_engine, exception, message); } } @@ -175,13 +175,13 @@ async Task ExecuteHandler(IEventContext context) async Task NotifySuccess(EventMessage message) { - await _engine.NotifySuccess(message).ConfigureAwait(false); + await _engine.NotifySuccessAsync(message).ConfigureAwait(false); } - async Task HandleError(Exception exception, EventMessage message, IEventContext context) + Task HandleError(Exception exception, EventMessage message, IEventContext context) { _logger.LogError(new { CorrelationId = context.CorrelationId, MessageId = message.MessageId, EventType = typeof(TEvent).Name, Exception = exception, Message = message }, s => $"An error occurred while handling {s.EventType}. {s.Exception.Message}"); - await _configuration.ErrorPolicy.HandleError(_engine, exception, message).ConfigureAwait(false); + return _configuration.ErrorPolicy.HandleErrorAsync(_engine, exception, message); } } @@ -189,7 +189,7 @@ async Task HandleError(Exception exception, EventMessage message, IEvent public IBusExecutionEnvironment ExecutionEnvironment => this; - public async Task ExecuteCommandHandler(IDispatcher dispatcher, ICommandContext context, Type handlerType) + public async Task ExecuteCommandHandlerAsync(IDispatcher dispatcher, ICommandContext context, Type handlerType) where TCommand : class, ICommand { using (var scope = _serviceProvider.CreateScope()) @@ -199,7 +199,7 @@ public async Task ExecuteCommandHandler(IDispatcher dispatcher, IComma } } - public async Task ExecuteEventHandler(IDispatcher dispatcher, IEventContext context, Type handlerType) + public async Task ExecuteEventHandlerAsync(IDispatcher dispatcher, IEventContext context, Type handlerType) where TEvent : class, IEvent { using (var scope = _serviceProvider.CreateScope()) diff --git a/src/Nybus/NybusHostBuilder.cs b/src/Nybus/NybusHostBuilder.cs index a81a4ab..541128f 100644 --- a/src/Nybus/NybusHostBuilder.cs +++ b/src/Nybus/NybusHostBuilder.cs @@ -38,7 +38,7 @@ public void SubscribeToCommand(Type commandHandlerType) _subscriptions.Add(host => { - host.SubscribeToCommand((dispatcher, context) => host.ExecutionEnvironment.ExecuteCommandHandler(dispatcher,context,commandHandlerType)); + host.SubscribeToCommand((dispatcher, context) => host.ExecutionEnvironment.ExecuteCommandHandlerAsync(dispatcher,context,commandHandlerType)); }); } @@ -52,7 +52,7 @@ public void SubscribeToEvent(Type eventHandlerType) _subscriptions.Add(host => { - host.SubscribeToEvent((dispatcher, context) => host.ExecutionEnvironment.ExecuteEventHandler(dispatcher, context, eventHandlerType)); + host.SubscribeToEvent((dispatcher, context) => host.ExecutionEnvironment.ExecuteEventHandlerAsync(dispatcher, context, eventHandlerType)); }); } } diff --git a/src/Nybus/Policies/NoopErrorPolicy.cs b/src/Nybus/Policies/NoopErrorPolicy.cs index 2936963..ccea62d 100644 --- a/src/Nybus/Policies/NoopErrorPolicy.cs +++ b/src/Nybus/Policies/NoopErrorPolicy.cs @@ -15,16 +15,16 @@ public IErrorPolicy CreatePolicy(IConfigurationSection configuration) public class NoopErrorPolicy : IErrorPolicy { - public async Task HandleError(IBusEngine engine, Exception exception, CommandMessage message) + public Task HandleErrorAsync(IBusEngine engine, Exception exception, CommandMessage message) where TCommand : class, ICommand { - await engine.NotifyFail(message).ConfigureAwait(false); + return engine.NotifyFailAsync(message); } - public async Task HandleError(IBusEngine engine, Exception exception, EventMessage message) + public Task HandleErrorAsync(IBusEngine engine, Exception exception, EventMessage message) where TEvent : class, IEvent { - await engine.NotifyFail(message).ConfigureAwait(false); + return engine.NotifyFailAsync(message); } } } \ No newline at end of file diff --git a/src/Nybus/Policies/RetryErrorPolicy.cs b/src/Nybus/Policies/RetryErrorPolicy.cs index 92c1c79..24be11a 100644 --- a/src/Nybus/Policies/RetryErrorPolicy.cs +++ b/src/Nybus/Policies/RetryErrorPolicy.cs @@ -42,7 +42,7 @@ public RetryErrorPolicy(RetryErrorPolicyOptions options, ILogger(IBusEngine engine, Exception exception, CommandMessage message) + public async Task HandleErrorAsync(IBusEngine engine, Exception exception, CommandMessage message) where TCommand : class, ICommand { var retryCount = message.Headers.TryGetValue(Headers.RetryCount, out var str) && int.TryParse(str, out var i) ? i : 0; @@ -61,12 +61,12 @@ public async Task HandleError(IBusEngine engine, Exception exception, { _logger.LogTrace($"Error {retryCount}/{_options.MaxRetries}: will not retry"); - await engine.NotifyFail(message).ConfigureAwait(false); + await engine.NotifyFailAsync(message).ConfigureAwait(false); } } - public async Task HandleError(IBusEngine engine, Exception exception, EventMessage message) + public async Task HandleErrorAsync(IBusEngine engine, Exception exception, EventMessage message) where TEvent : class, IEvent { var retryCount = message.Headers.TryGetValue(Headers.RetryCount, out string str) && int.TryParse(str, out int i) ? i : 0; @@ -81,7 +81,7 @@ public async Task HandleError(IBusEngine engine, Exception exception, Ev } else { - await engine.NotifyFail(message).ConfigureAwait(false); + await engine.NotifyFailAsync(message).ConfigureAwait(false); } } diff --git a/src/engines/Nybus.Engine.RabbitMq/RabbitMqBusEngine.cs b/src/engines/Nybus.Engine.RabbitMq/RabbitMqBusEngine.cs index 0a8882d..7eb94a4 100644 --- a/src/engines/Nybus.Engine.RabbitMq/RabbitMqBusEngine.cs +++ b/src/engines/Nybus.Engine.RabbitMq/RabbitMqBusEngine.cs @@ -35,7 +35,7 @@ public RabbitMqBusEngine(IRabbitMqConfiguration configuration, ILogger Consumers => _consumers; - public IObservable Start() + public Task> StartAsync() { _connection = _configuration.ConnectionFactory.CreateConnection(); _channel = _connection.CreateModel(); @@ -45,7 +45,7 @@ public IObservable Start() if (!hasEvents && !hasCommands) { - return Observable.Never(); + return Task.FromResult(Observable.Never()); } var queueToConsume = new List(); @@ -83,11 +83,13 @@ public IObservable Start() } - return Observable.Defer(() => from queue in queueToConsume.ToObservable() - from args in SubscribeMessages(_channel, queue) - let message = GetMessage(args) - where message != null - select message); + var sequence = Observable.Defer(() => from queue in queueToConsume.ToObservable() + from args in SubscribeMessages(_channel, queue) + let message = GetMessage(args) + where message != null + select message); + + return Task.FromResult(sequence); IObservable SubscribeMessages(IModel channel, string queueName) { @@ -185,10 +187,12 @@ string GetHeader(IBasicProperties properties, string headerName, Encoding encodi } } - public void Stop() + public Task StopAsync() { _channel.Dispose(); _connection.Dispose(); + + return Task.CompletedTask; } public Task SendCommandAsync(CommandMessage message) where TCommand : class, ICommand @@ -239,7 +243,7 @@ public void SubscribeToEvent() AcceptedEventTypes.Add(typeof(TEvent)); } - public Task NotifySuccess(Message message) + public Task NotifySuccessAsync(Message message) { if (message.Headers.TryGetValue(RabbitMqHeaders.DeliveryTag, out var headerValue) && ulong.TryParse(headerValue, out var deliveryTag)) { @@ -263,7 +267,7 @@ public Task NotifySuccess(Message message) return Task.CompletedTask; } - public Task NotifyFail(Message message) + public Task NotifyFailAsync(Message message) { if (message.Headers.TryGetValue(RabbitMqHeaders.DeliveryTag, out var headerValue) && ulong.TryParse(headerValue, out var deliveryTag)) { diff --git a/tests/TestUtils/TestBusEngine.cs b/tests/TestUtils/TestBusEngine.cs index 01cd9f1..9e13a6a 100644 --- a/tests/TestUtils/TestBusEngine.cs +++ b/tests/TestUtils/TestBusEngine.cs @@ -20,12 +20,12 @@ public Task SendEventAsync(EventMessage message) throw new NotImplementedException(); } - public IObservable Start() + public Task> StartAsync() { throw new NotImplementedException(); } - public void Stop() + public Task StopAsync() { throw new NotImplementedException(); } @@ -42,12 +42,12 @@ public void SubscribeToEvent() throw new NotImplementedException(); } - public Task NotifySuccess(Message message) + public Task NotifySuccessAsync(Message message) { throw new NotImplementedException(); } - public Task NotifyFail(Message message) + public Task NotifyFailAsync(Message message) { throw new NotImplementedException(); } diff --git a/tests/Tests.Nybus.Engine.RabbitMq/RabbitMqBusEngineTests.cs b/tests/Tests.Nybus.Engine.RabbitMq/RabbitMqBusEngineTests.cs index 46866bb..b719b41 100644 --- a/tests/Tests.Nybus.Engine.RabbitMq/RabbitMqBusEngineTests.cs +++ b/tests/Tests.Nybus.Engine.RabbitMq/RabbitMqBusEngineTests.cs @@ -89,7 +89,7 @@ public void SubscribeToEvent_ignores_multiple_registrations_of_same_event(Rabbit [Test, AutoMoqData] public void Empty_sequence_is_returned_if_no_subscription(RabbitMqBusEngine sut) { - var sequence = sut.Start(); + var sequence = sut.StartAsync().Result; var incomingMessages = sequence.DumpInList(); @@ -101,7 +101,7 @@ public void Commands_can_be_subscribed(RabbitMqBusEngine sut) { sut.SubscribeToCommand(); - var sequence = sut.Start(); + var sequence = sut.StartAsync().Result; var incomingMessages = sequence.DumpInList(); @@ -113,7 +113,7 @@ public void Events_can_be_subscribed(RabbitMqBusEngine sut) { sut.SubscribeToEvent(); - var sequence = sut.Start(); + var sequence = sut.StartAsync().Result; var incomingMessages = sequence.DumpInList(); @@ -127,7 +127,7 @@ public void Commands_and_events_can_be_subscribed(RabbitMqBusEngine sut) sut.SubscribeToCommand(); - var sequence = sut.Start(); + var sequence = sut.StartAsync().Result; var incomingMessages = sequence.DumpInList(); @@ -139,7 +139,7 @@ public void QueueFactory_is_invoked_when_a_event_is_registered([Frozen] IRabbitM { sut.SubscribeToEvent(); - var sequence = sut.Start(); + var sequence = sut.StartAsync().Result; Mock.Get(configuration.EventQueueFactory).Verify(p => p.CreateQueue(It.IsAny())); @@ -150,7 +150,7 @@ public void QueueFactory_is_invoked_when_a_command_is_registered([Frozen] IRabbi { sut.SubscribeToCommand(); - var sequence = sut.Start(); + var sequence = sut.StartAsync().Result; Mock.Get(configuration.CommandQueueFactory).Verify(p => p.CreateQueue(It.IsAny())); @@ -161,7 +161,7 @@ public void Exchange_is_declared_when_a_event_is_registered([Frozen] IRabbitMqCo { sut.SubscribeToEvent(); - var sequence = sut.Start(); + var sequence = sut.StartAsync().Result; Mock.Get(configuration.ConnectionFactory.CreateConnection().CreateModel()).Verify(p => p.ExchangeDeclare(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny>())); } @@ -171,7 +171,7 @@ public void Exchange_is_declared_when_a_command_is_registered([Frozen] IRabbitMq { sut.SubscribeToCommand(); - var sequence = sut.Start(); + var sequence = sut.StartAsync().Result; Mock.Get(configuration.ConnectionFactory.CreateConnection().CreateModel()).Verify(p => p.ExchangeDeclare(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny>())); } @@ -181,7 +181,7 @@ public void Queue_is_bound_to_exchange_when_a_event_is_registered([Frozen] IRabb { sut.SubscribeToEvent(); - var sequence = sut.Start(); + var sequence = sut.StartAsync().Result; Mock.Get(configuration.ConnectionFactory.CreateConnection().CreateModel()).Verify(p => p.QueueBind(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny>())); } @@ -191,7 +191,7 @@ public void Queue_is_bound_to_exchange_when_a_command_is_registered([Frozen] IRa { sut.SubscribeToCommand(); - var sequence = sut.Start(); + var sequence = sut.StartAsync().Result; Mock.Get(configuration.ConnectionFactory.CreateConnection().CreateModel()).Verify(p => p.QueueBind(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny>())); } @@ -202,7 +202,7 @@ public void Event_consumer_is_exposed_when_sequence_is_subscribed(RabbitMqBusEng { sut.SubscribeToEvent(); - var sequence = sut.Start(); + var sequence = sut.StartAsync().Result; sequence.Subscribe(_ => { }); // subscribes to the sequence but takes no action when items are published @@ -214,7 +214,7 @@ public void Command_consumer_is_exposed_when_sequence_is_subscribed(RabbitMqBusE { sut.SubscribeToCommand(); - var sequence = sut.Start(); + var sequence = sut.StartAsync().Result; sequence.Subscribe(_ => { }); // subscribes to the sequence but takes no action when items are published @@ -226,7 +226,7 @@ public void Events_can_be_received([Frozen] IRabbitMqConfiguration configuration { sut.SubscribeToEvent(); - var sequence = sut.Start(); + var sequence = sut.StartAsync().Result; var encoding = Encoding.UTF8; @@ -264,7 +264,7 @@ public void Commands_can_be_received([Frozen] IRabbitMqConfiguration configurati { sut.SubscribeToCommand(); - var sequence = sut.Start(); + var sequence = sut.StartAsync().Result; var encoding = Encoding.UTF8; @@ -303,7 +303,7 @@ public void Invalid_events_are_discarded([Frozen] IRabbitMqConfiguration configu // At least one subscription is needed to inject invalid messages sut.SubscribeToEvent(); - var sequence = sut.Start(); + var sequence = sut.StartAsync().Result; var encoding = Encoding.UTF8; @@ -336,7 +336,7 @@ public void Invalid_commands_are_discarded([Frozen] IRabbitMqConfiguration confi // At least one subscription is needed to inject invalid messages sut.SubscribeToCommand(); - var sequence = sut.Start(); + var sequence = sut.StartAsync().Result; var encoding = Encoding.UTF8; @@ -364,11 +364,11 @@ public void Invalid_commands_are_discarded([Frozen] IRabbitMqConfiguration confi } [Test, AutoMoqData] - public void Engine_can_be_stopped([Frozen] IRabbitMqConfiguration configuration, RabbitMqBusEngine sut) + public async Task Engine_can_be_stopped([Frozen] IRabbitMqConfiguration configuration, RabbitMqBusEngine sut) { - sut.Start(); + await sut.StartAsync(); - sut.Stop(); + await sut.StopAsync(); Mock.Get(configuration.ConnectionFactory.CreateConnection()).Verify(p => p.Dispose()); Mock.Get(configuration.ConnectionFactory.CreateConnection().CreateModel()).Verify(p => p.Dispose()); @@ -377,7 +377,7 @@ public void Engine_can_be_stopped([Frozen] IRabbitMqConfiguration configuration, [Test, AutoMoqData] public async Task Commands_can_be_sent([Frozen] IRabbitMqConfiguration configuration, RabbitMqBusEngine sut, CommandMessage message) { - sut.Start(); + await sut.StartAsync().ConfigureAwait(false); await sut.SendCommandAsync(message); @@ -389,7 +389,7 @@ public async Task Arbitrary_headers_are_forwarded_when_sending_commands([Frozen] { message.Headers.Add(headerKey, headerValue); - sut.Start(); + await sut.StartAsync().ConfigureAwait(false); await sut.SendCommandAsync(message); @@ -399,7 +399,7 @@ public async Task Arbitrary_headers_are_forwarded_when_sending_commands([Frozen] [Test, AutoMoqData] public async Task Events_can_be_sent([Frozen] IRabbitMqConfiguration configuration, RabbitMqBusEngine sut, EventMessage message) { - sut.Start(); + await sut.StartAsync().ConfigureAwait(false); await sut.SendEventAsync(message); @@ -412,7 +412,7 @@ public async Task Arbitrary_headers_are_forwarded_when_sending_events([Frozen] I { message.Headers.Add(headerKey, headerValue); - sut.Start(); + await sut.StartAsync().ConfigureAwait(false); await sut.SendEventAsync(message); @@ -424,7 +424,7 @@ public async Task NotifySuccess_acks_command_messages([Frozen] IRabbitMqConfigur { sut.SubscribeToCommand(); - var sequence = sut.Start(); + var sequence = await sut.StartAsync().ConfigureAwait(false); var encoding = Encoding.UTF8; @@ -446,7 +446,7 @@ public async Task NotifySuccess_acks_command_messages([Frozen] IRabbitMqConfigur sut.Consumers.First().Value.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body); - await sut.NotifySuccess(incomingMessages.First()); + await sut.NotifySuccessAsync(incomingMessages.First()); Mock.Get(configuration.ConnectionFactory.CreateConnection().CreateModel()).Verify(p => p.BasicAck(deliveryTag, It.IsAny())); } @@ -456,7 +456,7 @@ public async Task NotifySuccess_acks_event_messages([Frozen] IRabbitMqConfigurat { sut.SubscribeToEvent(); - var sequence = sut.Start(); + var sequence = await sut.StartAsync().ConfigureAwait(false); var encoding = Encoding.UTF8; @@ -478,7 +478,7 @@ public async Task NotifySuccess_acks_event_messages([Frozen] IRabbitMqConfigurat sut.Consumers.First().Value.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body); - await sut.NotifySuccess(incomingMessages.First()); + await sut.NotifySuccessAsync(incomingMessages.First()); Mock.Get(configuration.ConnectionFactory.CreateConnection().CreateModel()).Verify(p => p.BasicAck(deliveryTag, It.IsAny())); } @@ -488,7 +488,7 @@ public async Task NotifySuccess_can_handle_closed_connections([Frozen] IRabbitMq { sut.SubscribeToCommand(); - var sequence = sut.Start(); + var sequence = await sut.StartAsync().ConfigureAwait(false); var encoding = Encoding.UTF8; @@ -512,7 +512,7 @@ public async Task NotifySuccess_can_handle_closed_connections([Frozen] IRabbitMq Mock.Get(configuration.ConnectionFactory.CreateConnection().CreateModel()).Setup(p => p.BasicAck(It.IsAny(), It.IsAny())).Throws(new AlreadyClosedException(shutdownEventArgs)); - await sut.NotifySuccess(incomingMessages.First()); + await sut.NotifySuccessAsync(incomingMessages.First()); Mock.Get(configuration.ConnectionFactory.CreateConnection().CreateModel()).Verify(p => p.BasicAck(deliveryTag, It.IsAny())); } @@ -522,7 +522,7 @@ public async Task NotifyFail_nacks_command_messages([Frozen] IRabbitMqConfigurat { sut.SubscribeToCommand(); - var sequence = sut.Start(); + var sequence = await sut.StartAsync().ConfigureAwait(false); var encoding = Encoding.UTF8; @@ -544,7 +544,7 @@ public async Task NotifyFail_nacks_command_messages([Frozen] IRabbitMqConfigurat sut.Consumers.First().Value.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body); - await sut.NotifyFail(incomingMessages.First()); + await sut.NotifyFailAsync(incomingMessages.First()); Mock.Get(configuration.ConnectionFactory.CreateConnection().CreateModel()).Verify(p => p.BasicNack(deliveryTag, It.IsAny(), true)); } @@ -554,7 +554,7 @@ public async Task NotifyFail_can_handle_closed_connections([Frozen] IRabbitMqCon { sut.SubscribeToCommand(); - var sequence = sut.Start(); + var sequence = await sut.StartAsync().ConfigureAwait(false); var encoding = Encoding.UTF8; @@ -578,7 +578,7 @@ public async Task NotifyFail_can_handle_closed_connections([Frozen] IRabbitMqCon Mock.Get(configuration.ConnectionFactory.CreateConnection().CreateModel()).Setup(p => p.BasicNack(It.IsAny(), It.IsAny(), It.IsAny())).Throws(new AlreadyClosedException(shutdownEventArgs)); - await sut.NotifyFail(incomingMessages.First()); + await sut.NotifyFailAsync(incomingMessages.First()); Mock.Get(configuration.ConnectionFactory.CreateConnection().CreateModel()).Verify(p => p.BasicNack(deliveryTag, It.IsAny(), It.IsAny())); } diff --git a/tests/Tests.Nybus/InMemoryBusEngineTests.cs b/tests/Tests.Nybus/InMemoryBusEngineTests.cs index b47040b..a07fb20 100644 --- a/tests/Tests.Nybus/InMemoryBusEngineTests.cs +++ b/tests/Tests.Nybus/InMemoryBusEngineTests.cs @@ -31,7 +31,7 @@ public async Task Sent_commands_are_received(InMemoryBusEngine sut, CommandMessa { sut.SubscribeToCommand(); - var sequence = sut.Start(); + var sequence = await sut.StartAsync().ConfigureAwait(false); var items = sequence.DumpInList(); @@ -45,7 +45,7 @@ public async Task Sent_events_are_received(InMemoryBusEngine sut, EventMessage(); - var sequence = sut.Start(); + var sequence = await sut.StartAsync().ConfigureAwait(false); var items = sequence.DumpInList(); @@ -57,7 +57,7 @@ public async Task Sent_events_are_received(InMemoryBusEngine sut, EventMessage isCompleted = true ); - sut.Stop(); + sut.StopAsync().Wait(); Assert.That(isCompleted, Is.True); } @@ -75,31 +75,31 @@ public void Stop_completes_the_sequence_if_started(InMemoryBusEngine sut) [Test, AutoMoqData] public void Stop_is_ignored_if_not_started(InMemoryBusEngine sut) { - sut.Stop(); + sut.StopAsync().Wait(); } [Test, AutoMoqData] public void NotifySuccess_returns_completed_task(InMemoryBusEngine sut, CommandMessage testMessage) { - Assert.That(sut.NotifySuccess(testMessage), Is.SameAs(Task.CompletedTask)); + Assert.That(sut.NotifySuccessAsync(testMessage), Is.SameAs(Task.CompletedTask)); } [Test, AutoMoqData] public void NotifySuccess_returns_completed_task(InMemoryBusEngine sut, EventMessage testMessage) { - Assert.That(sut.NotifySuccess(testMessage), Is.SameAs(Task.CompletedTask)); + Assert.That(sut.NotifySuccessAsync(testMessage), Is.SameAs(Task.CompletedTask)); } [Test, AutoMoqData] public void NotifyFail_returns_completed_task(InMemoryBusEngine sut, CommandMessage testMessage) { - Assert.That(sut.NotifyFail(testMessage), Is.SameAs(Task.CompletedTask)); + Assert.That(sut.NotifyFailAsync(testMessage), Is.SameAs(Task.CompletedTask)); } [Test, AutoMoqData] public void NotifyFail_returns_completed_task(InMemoryBusEngine sut, EventMessage testMessage) { - Assert.That(sut.NotifyFail(testMessage), Is.SameAs(Task.CompletedTask)); + Assert.That(sut.NotifyFailAsync(testMessage), Is.SameAs(Task.CompletedTask)); } } diff --git a/tests/Tests.Nybus/NybusHostTests.cs b/tests/Tests.Nybus/NybusHostTests.cs index 94b789f..a185569 100644 --- a/tests/Tests.Nybus/NybusHostTests.cs +++ b/tests/Tests.Nybus/NybusHostTests.cs @@ -60,7 +60,7 @@ public async Task StartAsync_starts_the_engine([Frozen] IBusEngine engine, Nybus { await sut.StartAsync(); - Mock.Get(engine).Verify(p => p.Start(), Times.Once); + Mock.Get(engine).Verify(p => p.StartAsync(), Times.Once); } [Test, AutoMoqData] @@ -68,7 +68,7 @@ public async Task StopAsync_is_ignored_if_not_started([Frozen] IBusEngine engine { await sut.StopAsync(); - Mock.Get(engine).Verify(p => p.Stop(), Times.Never); + Mock.Get(engine).Verify(p => p.StopAsync(), Times.Never); } @@ -79,7 +79,7 @@ public async Task StopAsync_stops_the_engine_if_started([Frozen] IBusEngine engi await sut.StopAsync(); - Mock.Get(engine).Verify(p => p.Stop(), Times.Once); + Mock.Get(engine).Verify(p => p.StopAsync(), Times.Once); } [Test, AutoMoqData] @@ -87,7 +87,7 @@ public async Task Handler_is_executed_when_commandMessages_are_processed([Frozen { var subject = new Subject(); - Mock.Get(engine).Setup(p => p.Start()).Returns(subject); + Mock.Get(engine).Setup(p => p.StartAsync()).ReturnsAsync(subject); var receivedMessage = Mock.Of>(); @@ -107,7 +107,7 @@ public async Task Engine_is_notified_when_commandMessages_are_successfully_proce { var subject = new Subject(); - Mock.Get(engine).Setup(p => p.Start()).Returns(subject); + Mock.Get(engine).Setup(p => p.StartAsync()).ReturnsAsync(subject); var receivedMessage = Mock.Of>(); @@ -119,7 +119,7 @@ public async Task Engine_is_notified_when_commandMessages_are_successfully_proce await sut.StopAsync(); - Mock.Get(engine).Verify(p => p.NotifySuccess(testMessage), Times.Once); + Mock.Get(engine).Verify(p => p.NotifySuccessAsync(testMessage), Times.Once); } [Test, AutoMoqData] @@ -127,7 +127,7 @@ public async Task Error_policy_is_executed_when_commandMessages_are_processed_wi { var subject = new Subject(); - Mock.Get(engine).Setup(p => p.Start()).Returns(subject); + Mock.Get(engine).Setup(p => p.StartAsync()).ReturnsAsync(subject); var receivedMessage = Mock.Of>(); Mock.Get(receivedMessage).Setup(p => p(It.IsAny(), It.IsAny>())).Throws(error); @@ -140,7 +140,7 @@ public async Task Error_policy_is_executed_when_commandMessages_are_processed_wi await sut.StopAsync(); - Mock.Get(configuration.ErrorPolicy).Verify(p => p.HandleError(engine, error, testMessage), Times.Once); + Mock.Get(configuration.ErrorPolicy).Verify(p => p.HandleErrorAsync(engine, error, testMessage), Times.Once); } [Test, AutoMoqData] @@ -148,7 +148,7 @@ public async Task Null_messages_delivered_from_engine_are_discarded([Frozen] IBu { var subject = new Subject(); - Mock.Get(engine).Setup(p => p.Start()).Returns(subject); + Mock.Get(engine).Setup(p => p.StartAsync()).ReturnsAsync(subject); var receivedMessage = Mock.Of>(); @@ -168,7 +168,7 @@ public async Task Handler_is_executed_when_eventMessages_are_processed([Frozen] { var subject = new Subject(); - Mock.Get(engine).Setup(p => p.Start()).Returns(subject); + Mock.Get(engine).Setup(p => p.StartAsync()).ReturnsAsync(subject); var receivedMessage = Mock.Of>(); @@ -188,7 +188,7 @@ public async Task Engine_is_notified_when_eventMessages_are_successfully_process { var subject = new Subject(); - Mock.Get(engine).Setup(p => p.Start()).Returns(subject); + Mock.Get(engine).Setup(p => p.StartAsync()).ReturnsAsync(subject); var receivedMessage = Mock.Of>(); @@ -200,7 +200,7 @@ public async Task Engine_is_notified_when_eventMessages_are_successfully_process await sut.StopAsync(); - Mock.Get(engine).Verify(p => p.NotifySuccess(testMessage), Times.Once); + Mock.Get(engine).Verify(p => p.NotifySuccessAsync(testMessage), Times.Once); } [Test, AutoMoqData] @@ -208,7 +208,7 @@ public async Task Error_policy_is_executed_when_eventMessages_are_processed_with { var subject = new Subject(); - Mock.Get(engine).Setup(p => p.Start()).Returns(subject); + Mock.Get(engine).Setup(p => p.StartAsync()).ReturnsAsync(subject); var receivedMessage = Mock.Of>(); Mock.Get(receivedMessage).Setup(p => p(It.IsAny(), It.IsAny>())).Throws(error); @@ -221,7 +221,7 @@ public async Task Error_policy_is_executed_when_eventMessages_are_processed_with await sut.StopAsync(); - Mock.Get(configuration.ErrorPolicy).Verify(p => p.HandleError(engine, error, testMessage), Times.Once); + Mock.Get(configuration.ErrorPolicy).Verify(p => p.HandleErrorAsync(engine, error, testMessage), Times.Once); } [Test, AutoMoqData] @@ -229,7 +229,7 @@ public async Task Null_messages_delivered_from_engine_are_discarded([Frozen] IBu { var subject = new Subject(); - Mock.Get(engine).Setup(p => p.Start()).Returns(subject); + Mock.Get(engine).Setup(p => p.StartAsync()).ReturnsAsync(subject); var receivedMessage = Mock.Of>(); @@ -259,7 +259,7 @@ public async Task ExecuteCommandHandler_creates_new_scope_for_execution([Frozen] Mock.Get(serviceProvider).Setup(p => p.GetService(handlerType)).Returns(handler); - await sut.ExecuteCommandHandler(dispatcher, commandContext, handlerType); + await sut.ExecuteCommandHandlerAsync(dispatcher, commandContext, handlerType); Mock.Get(scopeFactory).Verify(p => p.CreateScope(), Times.Once); } @@ -273,7 +273,7 @@ public async Task ExecuteCommandHandler_executes_handler([Frozen] IServiceProvid Mock.Get(serviceProvider).Setup(p => p.GetService(handlerType)).Returns(handler); - await sut.ExecuteCommandHandler(dispatcher, commandContext, handlerType); + await sut.ExecuteCommandHandlerAsync(dispatcher, commandContext, handlerType); Mock.Get(handler).Verify(p => p.HandleAsync(dispatcher, commandContext), Times.Once); } @@ -287,7 +287,7 @@ public async Task ExecuteEventHandler_creates_new_scope_for_execution([Frozen] I Mock.Get(serviceProvider).Setup(p => p.GetService(handlerType)).Returns(handler); - await sut.ExecuteEventHandler(dispatcher, eventContext, handlerType); + await sut.ExecuteEventHandlerAsync(dispatcher, eventContext, handlerType); Mock.Get(scopeFactory).Verify(p => p.CreateScope(), Times.Once); } @@ -301,7 +301,7 @@ public async Task ExecuteEventHandler_executes_handler([Frozen] IServiceProvider Mock.Get(serviceProvider).Setup(p => p.GetService(handlerType)).Returns(handler); - await sut.ExecuteEventHandler(dispatcher, eventContext, handlerType); + await sut.ExecuteEventHandlerAsync(dispatcher, eventContext, handlerType); Mock.Get(handler).Verify(p => p.HandleAsync(dispatcher, eventContext), Times.Once); } diff --git a/tests/Tests.Nybus/Policies/NoopErrorPolicyTests.cs b/tests/Tests.Nybus/Policies/NoopErrorPolicyTests.cs index d3e8a65..7879160 100644 --- a/tests/Tests.Nybus/Policies/NoopErrorPolicyTests.cs +++ b/tests/Tests.Nybus/Policies/NoopErrorPolicyTests.cs @@ -13,17 +13,17 @@ public class NoopErrorPolicyTests [Test, AutoMoqData] public async Task HandleError_notifies_engine(NoopErrorPolicy sut, IBusEngine engine, Exception exception, CommandMessage message) { - await sut.HandleError(engine, exception, message); + await sut.HandleErrorAsync(engine, exception, message); - Mock.Get(engine).Verify(p => p.NotifyFail(message)); + Mock.Get(engine).Verify(p => p.NotifyFailAsync(message)); } [Test, AutoMoqData] public async Task HandleError_notifies_engine(NoopErrorPolicy sut, IBusEngine engine, Exception exception, EventMessage message) { - await sut.HandleError(engine, exception, message); + await sut.HandleErrorAsync(engine, exception, message); - Mock.Get(engine).Verify(p => p.NotifyFail(message)); + Mock.Get(engine).Verify(p => p.NotifyFailAsync(message)); } } } \ No newline at end of file diff --git a/tests/Tests.Nybus/Policies/RetryErrorPolicyTests.cs b/tests/Tests.Nybus/Policies/RetryErrorPolicyTests.cs index 380939c..6ab4711 100644 --- a/tests/Tests.Nybus/Policies/RetryErrorPolicyTests.cs +++ b/tests/Tests.Nybus/Policies/RetryErrorPolicyTests.cs @@ -38,9 +38,9 @@ public async Task HandleError_notifies_engine_if_retry_count_equal_or_greater_th { message.Headers[Headers.RetryCount] = options.MaxRetries.Stringfy(); - await sut.HandleError(engine, error, message); + await sut.HandleErrorAsync(engine, error, message); - Mock.Get(engine).Verify(p => p.NotifyFail(message)); + Mock.Get(engine).Verify(p => p.NotifyFailAsync(message)); } [Test, AutoMoqData] @@ -48,7 +48,7 @@ public async Task HandleError_retries_if_retry_count_less_than_maxRetries([Froze { message.Headers[Headers.RetryCount] = (options.MaxRetries - 2).Stringfy(); - await sut.HandleError(engine, error, message); + await sut.HandleErrorAsync(engine, error, message); Mock.Get(engine).Verify(p => p.SendCommandAsync(message)); } @@ -58,7 +58,7 @@ public async Task HandleError_increments_retry_count_if_retry_count_present([Fro { message.Headers[Headers.RetryCount] = (options.MaxRetries - 2).Stringfy(); - await sut.HandleError(engine, error, message); + await sut.HandleErrorAsync(engine, error, message); Assert.That(message.Headers.ContainsKey(Headers.RetryCount)); Assert.That(message.Headers[Headers.RetryCount], Is.EqualTo((options.MaxRetries - 1).Stringfy())); @@ -67,7 +67,7 @@ public async Task HandleError_increments_retry_count_if_retry_count_present([Fro [Test, AutoMoqData] public async Task HandleError_retries_if_retry_count_not_present(RetryErrorPolicy sut, IBusEngine engine, Exception error, CommandMessage message) { - await sut.HandleError(engine, error, message); + await sut.HandleErrorAsync(engine, error, message); Mock.Get(engine).Verify(p => p.SendCommandAsync(message)); } @@ -75,7 +75,7 @@ public async Task HandleError_retries_if_retry_count_not_present(RetryErrorPolic [Test, AutoMoqData] public async Task HandleError_adds_retry_count_if_retry_count_not_present(RetryErrorPolicy sut, IBusEngine engine, Exception error, CommandMessage message) { - await sut.HandleError(engine, error, message); + await sut.HandleErrorAsync(engine, error, message); Assert.That(message.Headers.ContainsKey(Headers.RetryCount)); } @@ -85,9 +85,9 @@ public async Task HandleError_notifies_engine_if_retry_count_equal_or_greater_th { message.Headers[Headers.RetryCount] = options.MaxRetries.Stringfy(); - await sut.HandleError(engine, error, message); + await sut.HandleErrorAsync(engine, error, message); - Mock.Get(engine).Verify(p => p.NotifyFail(message)); + Mock.Get(engine).Verify(p => p.NotifyFailAsync(message)); } [Test, AutoMoqData] @@ -95,7 +95,7 @@ public async Task HandleError_retries_if_retry_count_less_than_maxRetries([Froze { message.Headers[Headers.RetryCount] = (options.MaxRetries - 2).Stringfy(); - await sut.HandleError(engine, error, message); + await sut.HandleErrorAsync(engine, error, message); Mock.Get(engine).Verify(p => p.SendEventAsync(message)); } @@ -103,7 +103,7 @@ public async Task HandleError_retries_if_retry_count_less_than_maxRetries([Froze [Test, AutoMoqData] public async Task HandleError_retries_if_retry_count_not_present(RetryErrorPolicy sut, IBusEngine engine, Exception error, EventMessage message) { - await sut.HandleError(engine, error, message); + await sut.HandleErrorAsync(engine, error, message); Mock.Get(engine).Verify(p => p.SendEventAsync(message)); } @@ -111,7 +111,7 @@ public async Task HandleError_retries_if_retry_count_not_present(RetryErrorPolic [Test, AutoMoqData] public async Task HandleError_adds_retry_count_if_retry_count_not_present(RetryErrorPolicy sut, IBusEngine engine, Exception error, EventMessage message) { - await sut.HandleError(engine, error, message); + await sut.HandleErrorAsync(engine, error, message); Assert.That(message.Headers.ContainsKey(Headers.RetryCount)); } @@ -121,7 +121,7 @@ public async Task HandleError_increments_retry_count_if_retry_count_present([Fro { message.Headers[Headers.RetryCount] = (options.MaxRetries - 2).Stringfy(); - await sut.HandleError(engine, error, message); + await sut.HandleErrorAsync(engine, error, message); Assert.That(message.Headers.ContainsKey(Headers.RetryCount)); Assert.That(message.Headers[Headers.RetryCount], Is.EqualTo((options.MaxRetries - 1).Stringfy()));