diff --git a/src/Nybus.Abstractions/Configuration/INybusConfigurator.cs b/src/Nybus.Abstractions/Configuration/INybusConfigurator.cs index df5b3c4..f4c4693 100644 --- a/src/Nybus.Abstractions/Configuration/INybusConfigurator.cs +++ b/src/Nybus.Abstractions/Configuration/INybusConfigurator.cs @@ -1,7 +1,8 @@ using System; +using System.Collections.Generic; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; -using Nybus.Policies; +using Nybus.Filters; namespace Nybus.Configuration { @@ -20,7 +21,11 @@ public interface INybusConfigurator public interface INybusConfiguration { - IErrorPolicy ErrorPolicy { get; set; } + IErrorFilter FallbackErrorFilter { get; } + + IReadOnlyList CommandErrorFilters { get; set; } + + IReadOnlyList EventErrorFilters { get; set; } } public static class NybusConfiguratorExtensions @@ -28,7 +33,7 @@ public static class NybusConfiguratorExtensions public static void UseBusEngine(this INybusConfigurator configurator, Action configureServices = null) where TEngine : class, IBusEngine { - configurator.AddServiceConfiguration(svcs => svcs.AddSingleton()); + configurator.AddServiceConfiguration(services => services.AddSingleton()); if (configureServices != null) { diff --git a/src/Nybus.Abstractions/Configuration/ConfigurationException.cs b/src/Nybus.Abstractions/Exceptions.cs similarity index 55% rename from src/Nybus.Abstractions/Configuration/ConfigurationException.cs rename to src/Nybus.Abstractions/Exceptions.cs index 3f0e166..3ab6a04 100644 --- a/src/Nybus.Abstractions/Configuration/ConfigurationException.cs +++ b/src/Nybus.Abstractions/Exceptions.cs @@ -1,6 +1,6 @@ using System; -namespace Nybus.Configuration +namespace Nybus { public class ConfigurationException : Exception { @@ -14,4 +14,14 @@ public ConfigurationException(string message) : base (message) } } + + public class MissingHandlerException : Exception + { + public MissingHandlerException(Type handlerType, string message) : base(message) + { + HandlerType = handlerType; + } + + public Type HandlerType { get; } + } } \ No newline at end of file diff --git a/src/Nybus.Abstractions/Filters/IErrorFilter.cs b/src/Nybus.Abstractions/Filters/IErrorFilter.cs new file mode 100644 index 0000000..f6a7b16 --- /dev/null +++ b/src/Nybus.Abstractions/Filters/IErrorFilter.cs @@ -0,0 +1,24 @@ +using System; +using System.Threading.Tasks; +using Microsoft.Extensions.Configuration; + +namespace Nybus.Filters +{ + public interface IErrorFilter + { + Task HandleErrorAsync(ICommandContext context, Exception exception, CommandErrorDelegate next) where TCommand : class, ICommand; + + Task HandleErrorAsync(IEventContext context, Exception exception, EventErrorDelegate next) where TEvent : class, IEvent; + } + + public delegate Task CommandErrorDelegate(ICommandContext context, Exception exception) where TCommand : class, ICommand; + + public delegate Task EventErrorDelegate(IEventContext context, Exception exception) where TEvent : class, IEvent; + + public interface IErrorFilterProvider + { + string ProviderName { get; } + + IErrorFilter CreateErrorFilter(IConfigurationSection settings); + } +} diff --git a/src/Nybus.Abstractions/Policies/IErrorPolicy.cs b/src/Nybus.Abstractions/Policies/IErrorPolicy.cs deleted file mode 100644 index 07e04a3..0000000 --- a/src/Nybus.Abstractions/Policies/IErrorPolicy.cs +++ /dev/null @@ -1,23 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading.Tasks; -using Microsoft.Extensions.Configuration; - -namespace Nybus.Policies -{ - - public interface IErrorPolicy - { - Task HandleErrorAsync(IBusEngine engine, Exception exception, CommandMessage message) where TCommand : class, ICommand; - - Task HandleErrorAsync(IBusEngine engine, Exception exception, EventMessage message) where TEvent : class, IEvent; - } - - public interface IErrorPolicyProvider - { - string ProviderName { get; } - - IErrorPolicy CreatePolicy(IConfigurationSection configuration); - } -} diff --git a/src/Nybus.Abstractions/Policies/IPolicy.cs b/src/Nybus.Abstractions/Policies/IPolicy.cs deleted file mode 100644 index 62f0f7a..0000000 --- a/src/Nybus.Abstractions/Policies/IPolicy.cs +++ /dev/null @@ -1,7 +0,0 @@ -namespace Nybus.Policies -{ - public interface IPolicy - { - - } -} diff --git a/src/Nybus.Abstractions/Utils/EnumerableExtensions.cs b/src/Nybus.Abstractions/Utils/EnumerableExtensions.cs new file mode 100644 index 0000000..223b454 --- /dev/null +++ b/src/Nybus.Abstractions/Utils/EnumerableExtensions.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace Nybus.Utils +{ + public static class EnumerableExtensions + { + public static IEnumerable NotNull(this IEnumerable items) where T : class => items.Where(i => i != null); + + public static IEnumerable EmptyIfNull(this IEnumerable items) => items ?? Array.Empty(); + + public static IEnumerable IfNull(this IEnumerable items, IEnumerable alternative) => items ?? alternative; + } +} diff --git a/src/Nybus/Configuration/INybusHostConfigurationFactory.cs b/src/Nybus/Configuration/INybusHostConfigurationFactory.cs index 8537e46..d41fe09 100644 --- a/src/Nybus/Configuration/INybusHostConfigurationFactory.cs +++ b/src/Nybus/Configuration/INybusHostConfigurationFactory.cs @@ -1,33 +1,40 @@ using System; using System.Collections.Generic; +using System.Linq; using Microsoft.Extensions.Configuration; -using Nybus.Policies; +using Nybus.Filters; using Nybus.Utils; namespace Nybus.Configuration { public interface INybusHostConfigurationFactory { - INybusConfiguration CreateConfiguration(NybusHostOptions options); + NybusConfiguration CreateConfiguration(NybusHostOptions options); } public class NybusHostOptions { - public IConfigurationSection ErrorPolicy { get; set; } + public IConfigurationSection[] ErrorFilters { get; set; } + + public IConfigurationSection[] CommandErrorFilters { get; set; } + + public IConfigurationSection[] EventErrorFilters { get; set; } } public class NybusHostConfigurationFactory : INybusHostConfigurationFactory { - private readonly IReadOnlyDictionary _errorPolicyProviders; + private readonly IErrorFilter _fallbackErrorFilter; + private readonly IReadOnlyDictionary _errorFilterProvidersByName; - public NybusHostConfigurationFactory(IEnumerable errorPolicyProviders) + public NybusHostConfigurationFactory(IEnumerable errorFilterProviders, FallbackErrorFilter fallbackErrorFilter) { - _errorPolicyProviders = CreateDictionary(errorPolicyProviders ?? throw new ArgumentNullException(nameof(errorPolicyProviders))); + _fallbackErrorFilter = fallbackErrorFilter ?? throw new ArgumentNullException(nameof(fallbackErrorFilter)); + _errorFilterProvidersByName = CreateDictionary(errorFilterProviders ?? throw new ArgumentNullException(nameof(errorFilterProviders))); } - private IReadOnlyDictionary CreateDictionary(IEnumerable providers) + private static IReadOnlyDictionary CreateDictionary(IEnumerable providers) { - var result = new Dictionary(StringComparer.OrdinalIgnoreCase); + var result = new Dictionary(StringComparer.OrdinalIgnoreCase); foreach (var provider in providers) { @@ -40,30 +47,40 @@ public NybusHostConfigurationFactory(IEnumerable errorPoli return result; } - public INybusConfiguration CreateConfiguration(NybusHostOptions options) + public NybusConfiguration CreateConfiguration(NybusHostOptions options) { - var errorPolicy = GetErrorPolicy(options.ErrorPolicy); - return new NybusConfiguration { - ErrorPolicy = errorPolicy + FallbackErrorFilter = _fallbackErrorFilter, + CommandErrorFilters = GetErrorFilters(options.CommandErrorFilters), + EventErrorFilters = GetErrorFilters(options.EventErrorFilters) }; - IErrorPolicy GetErrorPolicy(IConfigurationSection section) + IReadOnlyList GetErrorFilters(IEnumerable sections) => sections + .IfNull(options.ErrorFilters) + .EmptyIfNull() + .Select(GetErrorFilter) + .NotNull() + .ToArray(); + + IErrorFilter GetErrorFilter(IConfigurationSection section) { - if (section != null && section.TryGetValue("ProviderName", out var providerName) && _errorPolicyProviders.TryGetValue(providerName, out var provider)) + if (section != null && section.TryGetValue("type", out var providerName) && _errorFilterProvidersByName.TryGetValue(providerName, out var provider)) { - return provider.CreatePolicy(section); + return provider.CreateErrorFilter(section); } - return new NoopErrorPolicy(); + return null; } } } public class NybusConfiguration : INybusConfiguration { - public IErrorPolicy ErrorPolicy { get; set; } - } + public IReadOnlyList CommandErrorFilters { get; set; } = new IErrorFilter[0]; + + public IReadOnlyList EventErrorFilters { get; set; } = new IErrorFilter[0]; + public IErrorFilter FallbackErrorFilter { get; set; } + } } \ No newline at end of file diff --git a/src/Nybus/Filters/DiscardErrorFilter.cs b/src/Nybus/Filters/DiscardErrorFilter.cs new file mode 100644 index 0000000..31b9951 --- /dev/null +++ b/src/Nybus/Filters/DiscardErrorFilter.cs @@ -0,0 +1,70 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; + +namespace Nybus.Filters +{ + public class DiscardErrorFilterProvider : IErrorFilterProvider + { + private readonly IServiceProvider _serviceProvider; + + public DiscardErrorFilterProvider(IServiceProvider serviceProvider) + { + _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider)); + } + + public string ProviderName { get; } = "discard"; + + public IErrorFilter CreateErrorFilter(IConfigurationSection settings) + { + var engine = _serviceProvider.GetRequiredService(); + var logger = _serviceProvider.GetRequiredService>(); + + return new DiscardErrorFilter(engine, logger); + } + } + + public class DiscardErrorFilter : IErrorFilter + { + private readonly IBusEngine _engine; + private readonly ILogger _logger; + + public DiscardErrorFilter(IBusEngine engine, ILogger logger) + { + _engine = engine ?? throw new ArgumentNullException(nameof(engine)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + public async Task HandleErrorAsync(ICommandContext context, Exception exception, CommandErrorDelegate next) + where TCommand : class, ICommand + { + try + { + await _engine.NotifyFailAsync(context.Message).ConfigureAwait(false); + } + catch (Exception discardException) + { + _logger.LogError(discardException, ex => $"Unable to discard message: {ex.Message}"); + await next(context, exception).ConfigureAwait(false); + } + } + + public async Task HandleErrorAsync(IEventContext context, Exception exception, EventErrorDelegate next) + where TEvent : class, IEvent + { + try + { + await _engine.NotifyFailAsync(context.Message).ConfigureAwait(false); + } + catch (Exception discardException) + { + _logger.LogError(discardException, ex => $"Unable to discard message: {ex.Message}"); + await next(context, exception).ConfigureAwait(false); + } + } + } +} diff --git a/src/Nybus/Filters/FallbackErrorFilter.cs b/src/Nybus/Filters/FallbackErrorFilter.cs new file mode 100644 index 0000000..24bedb4 --- /dev/null +++ b/src/Nybus/Filters/FallbackErrorFilter.cs @@ -0,0 +1,27 @@ +using System; +using System.Threading.Tasks; + +namespace Nybus.Filters +{ + public class FallbackErrorFilter : IErrorFilter + { + private readonly IBusEngine _engine; + + public FallbackErrorFilter(IBusEngine engine) + { + _engine = engine ?? throw new ArgumentNullException(nameof(engine)); + } + + public Task HandleErrorAsync(ICommandContext context, Exception exception, CommandErrorDelegate next) + where TCommand : class, ICommand + { + return _engine.NotifyFailAsync(context.Message); + } + + public Task HandleErrorAsync(IEventContext context, Exception exception, EventErrorDelegate next) + where TEvent : class, IEvent + { + return _engine.NotifyFailAsync(context.Message); + } + } +} \ No newline at end of file diff --git a/src/Nybus/Filters/RetryErrorFilter.cs b/src/Nybus/Filters/RetryErrorFilter.cs new file mode 100644 index 0000000..316f08c --- /dev/null +++ b/src/Nybus/Filters/RetryErrorFilter.cs @@ -0,0 +1,110 @@ +using System; +using System.Threading.Tasks; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Nybus.Utils; + +namespace Nybus.Filters +{ + public class RetryErrorFilterProvider : IErrorFilterProvider + { + private readonly IServiceProvider _serviceProvider; + + public RetryErrorFilterProvider(IServiceProvider serviceProvider) + { + _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider)); + } + + public string ProviderName { get; } = "retry"; + + public IErrorFilter CreateErrorFilter(IConfigurationSection settings) + { + var engine = _serviceProvider.GetRequiredService(); + var logger = _serviceProvider.GetRequiredService>(); + + var options = new RetryErrorFilterOptions(); + settings.Bind(options); + + return new RetryErrorFilter(engine, options, logger); + } + } + + public class RetryErrorFilter : IErrorFilter + { + private readonly IBusEngine _engine; + private readonly RetryErrorFilterOptions _options; + private readonly ILogger _logger; + + public RetryErrorFilter(IBusEngine engine, RetryErrorFilterOptions options, ILogger logger) + { + _engine = engine ?? throw new ArgumentNullException(nameof(engine)); + _options = options ?? throw new ArgumentNullException(nameof(options)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + + if (_options.MaxRetries < 0) + { + throw new ArgumentOutOfRangeException(nameof(_options.MaxRetries), "maxRetries must be greater or equal than 0"); + } + } + + private static int RetryCount(Message message) => message.Headers.TryGetValue(Headers.RetryCount, out var str) && int.TryParse(str, out var i) ? i : 0; + + public async Task HandleErrorAsync(ICommandContext context, Exception exception, CommandErrorDelegate next) + where TCommand : class, ICommand + { + if (context.Message is CommandMessage message) + { + var retryCount = RetryCount(message) + 1; + + if (retryCount < _options.MaxRetries) + { + _logger.LogTrace($"Error {retryCount}/{_options.MaxRetries}: will retry"); + + message.Headers[Headers.RetryCount] = retryCount.Stringfy(); + + await _engine.SendCommandAsync(message).ConfigureAwait(false); + } + else + { + _logger.LogTrace($"Error {retryCount}/{_options.MaxRetries}: will not retry"); + + await _engine.NotifyFailAsync(message).ConfigureAwait(false); + + await next(context, exception).ConfigureAwait(false); + } + } + } + + public async Task HandleErrorAsync(IEventContext context, Exception exception, EventErrorDelegate next) + where TEvent : class, IEvent + { + if (context.Message is EventMessage message) + { + var retryCount = RetryCount(message) + 1; + + if (retryCount < _options.MaxRetries) + { + _logger.LogTrace($"Error {retryCount}/{_options.MaxRetries}: will retry"); + + message.Headers[Headers.RetryCount] = retryCount.Stringfy(); + + await _engine.SendEventAsync(message).ConfigureAwait(false); + } + else + { + _logger.LogTrace($"Error {retryCount}/{_options.MaxRetries}: will not retry"); + + await _engine.NotifyFailAsync(message).ConfigureAwait(false); + + await next(context, exception).ConfigureAwait(false); + } + } + } + } + + public class RetryErrorFilterOptions + { + public int MaxRetries { get; set; } + } +} diff --git a/src/Nybus/NybusConfiguratorExtensions.cs b/src/Nybus/NybusConfiguratorExtensions.cs index faabd3d..6d9e449 100644 --- a/src/Nybus/NybusConfiguratorExtensions.cs +++ b/src/Nybus/NybusConfiguratorExtensions.cs @@ -1,7 +1,7 @@ using System; using Microsoft.Extensions.DependencyInjection; using Nybus.Configuration; -using Nybus.Policies; +using Nybus.Filters; namespace Nybus { @@ -75,12 +75,11 @@ public static void SubscribeToEvent(this INybusConfigurator configurator configurator.AddServiceConfiguration(services => services.AddSingleton(handler)); } - public static void RegisterErrorPolicyProvider(this INybusConfigurator configurator, Func factory = null) - where TProvider: class, IErrorPolicyProvider + public static void RegisterErrorFilterProvider(this INybusConfigurator configurator, Func factory = null) where TProvider : class, IErrorFilterProvider { if (factory == null) { - configurator.AddServiceConfiguration(sc => sc.AddSingleton()); + configurator.AddServiceConfiguration(sc => sc.AddSingleton()); } else { diff --git a/src/Nybus/NybusHost.cs b/src/Nybus/NybusHost.cs index e6ed86b..724842f 100644 --- a/src/Nybus/NybusHost.cs +++ b/src/Nybus/NybusHost.cs @@ -1,5 +1,8 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; +using System.ComponentModel; +using System.Linq; using System.Reactive.Linq; using System.Threading.Tasks; using Microsoft.Extensions.Logging; @@ -8,6 +11,7 @@ using System.Reactive.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using Nybus.Configuration; +using Nybus.Filters; namespace Nybus { @@ -28,7 +32,8 @@ public NybusHost(IBusEngine busEngine, INybusConfiguration configuration, IServi public IBus Bus => this; - public Task InvokeCommandAsync(TCommand command, Guid correlationId) where TCommand : class, ICommand + public Task InvokeCommandAsync(TCommand command, Guid correlationId) + where TCommand : class, ICommand { var message = new CommandMessage { @@ -45,7 +50,8 @@ public NybusHost(IBusEngine busEngine, INybusConfiguration configuration, IServi return _engine.SendCommandAsync(message); } - public Task RaiseEventAsync(TEvent @event, Guid correlationId) where TEvent : class, IEvent + public Task RaiseEventAsync(TEvent @event, Guid correlationId) + where TEvent : class, IEvent { var message = new EventMessage { @@ -76,7 +82,7 @@ public async Task StartAsync() from pipeline in _messagePipelines from execution in pipeline(message).ToObservable() select Unit.Default; - + _disposable = observable.Subscribe(); _logger.LogTrace("Bus started"); @@ -97,90 +103,42 @@ public async Task StopAsync() } } - private readonly List _messagePipelines = new List(); + private readonly IList _messagePipelines = new List(); - public void SubscribeToCommand(CommandReceived commandReceived) where TCommand : class, ICommand + public void SubscribeToCommand(CommandReceived commandReceived) + where TCommand : class, ICommand { _engine.SubscribeToCommand(); - _messagePipelines.Add(ProcessMessage); + _errorHandlers.AddOrUpdate(typeof(TCommand), CreateCommandErrorDelegate(), (key, item) => item); - async Task ProcessMessage(Message message) + _messagePipelines.Add(async message => { if (message is CommandMessage commandMessage) { + var dispatcher = new NybusDispatcher(this, commandMessage); var context = new NybusCommandContext(commandMessage); - - try - { - await ExecuteHandler(context).ConfigureAwait(false); - await NotifySuccess(commandMessage).ConfigureAwait(false); - } - catch (Exception ex) - { - await HandleError(ex, commandMessage, context).ConfigureAwait(false); - } + await commandReceived(dispatcher, context).ConfigureAwait(false); } - } - - async Task ExecuteHandler(ICommandContext context) - { - var dispatcher = new NybusDispatcher(this, context.Message); - await commandReceived(dispatcher, context).ConfigureAwait(false); - } - - async Task NotifySuccess(CommandMessage message) - { - await _engine.NotifySuccessAsync(message).ConfigureAwait(false); - } - - Task HandleError(Exception exception, CommandMessage message, ICommandContext context) - { - _logger.LogError(new { CorrelationId = context.CorrelationId, MessageId = message.MessageId, CommandType = typeof(TCommand).Name, Exception = exception, Message = message }, s => $"An error occurred while handling {s.CommandType}. {s.Exception.Message}"); - return _configuration.ErrorPolicy.HandleErrorAsync(_engine, exception, message); - } + }); } - public void SubscribeToEvent(EventReceived eventReceived) where TEvent : class, IEvent + public void SubscribeToEvent(EventReceived eventReceived) + where TEvent : class, IEvent { _engine.SubscribeToEvent(); - _messagePipelines.Add(ProcessMessage); + _errorHandlers.AddOrUpdate(typeof(TEvent), CreateEventErrorDelegate(), (key, item) => item); - async Task ProcessMessage(Message message) + _messagePipelines.Add(async message => { if (message is EventMessage eventMessage) { + var dispatcher = new NybusDispatcher(this, eventMessage); var context = new NybusEventContext(eventMessage); - - try - { - await ExecuteHandler(context).ConfigureAwait(false); - await NotifySuccess(eventMessage).ConfigureAwait(false); - } - catch (Exception ex) - { - await HandleError(ex, eventMessage, context).ConfigureAwait(false); - } + await eventReceived(dispatcher, context).ConfigureAwait(false); } - } - - async Task ExecuteHandler(IEventContext context) - { - var dispatcher = new NybusDispatcher(this, context.Message); - await eventReceived(dispatcher, context).ConfigureAwait(false); - } - - async Task NotifySuccess(EventMessage message) - { - await _engine.NotifySuccessAsync(message).ConfigureAwait(false); - } - - Task HandleError(Exception exception, EventMessage message, IEventContext context) - { - _logger.LogError(new { CorrelationId = context.CorrelationId, MessageId = message.MessageId, EventType = typeof(TEvent).Name, Exception = exception, Message = message }, s => $"An error occurred while handling {s.EventType}. {s.Exception.Message}"); - return _configuration.ErrorPolicy.HandleErrorAsync(_engine, exception, message); - } + }); } private delegate Task MessagePipeline(Message message); @@ -194,13 +152,27 @@ public async Task ExecuteCommandHandlerAsync(IDispatcher dispatcher, I { try { - var handler = (ICommandHandler)scope.ServiceProvider.GetRequiredService(handlerType); - await handler.HandleAsync(dispatcher, context).ConfigureAwait(false); + if (scope.ServiceProvider.GetService(handlerType) is ICommandHandler handler) + { + await handler.HandleAsync(dispatcher, context).ConfigureAwait(false); + await _engine.NotifySuccessAsync(context.Message).ConfigureAwait(false); + } + else + { + throw new MissingHandlerException(handlerType, $"No valid registration for {handlerType.FullName}"); + } } - catch (InvalidOperationException ex) + catch (MissingHandlerException ex) { - _logger.LogError(new { commandType = typeof(TCommand), handlerType}, s => $"No valid registration for {s.handlerType.FullName}"); - throw new ConfigurationException($"No valid registration for {handlerType.FullName}", ex); + _logger.LogError(new { eventType = typeof(TCommand), ex.HandlerType }, ex, (s, e) => $"No valid registration for {s.HandlerType.FullName}"); + throw; + } + catch (Exception ex) + { + var message = context.Message as CommandMessage; + _logger.LogError(new { CorrelationId = context.CorrelationId, MessageId = context.Message.MessageId, EventType = typeof(TCommand).Name, Message = message }, ex, (s, e) => $"An error occurred while handling {s.EventType}. {e.Message}"); + + await HandleCommandErrorAsync(context, ex).ConfigureAwait(false); } } } @@ -212,16 +184,96 @@ public async Task ExecuteEventHandlerAsync(IDispatcher dispatcher, IEven { try { - var handler = (IEventHandler)scope.ServiceProvider.GetRequiredService(handlerType); - await handler.HandleAsync(dispatcher, context).ConfigureAwait(false); + if (scope.ServiceProvider.GetService(handlerType) is IEventHandler handler) + { + await handler.HandleAsync(dispatcher, context).ConfigureAwait(false); + await _engine.NotifySuccessAsync(context.Message).ConfigureAwait(false); + } + else + { + throw new MissingHandlerException(handlerType, $"No valid registration for {handlerType.FullName}"); + } + } + catch (MissingHandlerException ex) + { + _logger.LogError(new { eventType = typeof(TEvent), ex.HandlerType }, ex, (s, e) => $"No valid registration for {s.HandlerType.FullName}"); + throw; + } + catch (Exception ex) + { + var message = context.Message as EventMessage; + _logger.LogError(new { CorrelationId = context.CorrelationId, MessageId = context.Message.MessageId, EventType = typeof(TEvent).Name, Message = message }, ex, (s,e) => $"An error occurred while handling {s.EventType}. {e.Message}"); + await HandleEventErrorAsync(context, ex).ConfigureAwait(false); } - catch (InvalidOperationException ex) + } + } + + private readonly ConcurrentDictionary> _errorHandlers = new ConcurrentDictionary>(); + + private Task HandleCommandErrorAsync(ICommandContext context, Exception error) + where TCommand : class, ICommand + { + var handler = _errorHandlers.GetOrAdd(typeof(TCommand), CreateCommandErrorDelegate()); + return handler(context, error); + } + + private Func CreateCommandErrorDelegate() + where TCommand : class, ICommand + { + var chain = new List> + { + (c, ex) => _configuration.FallbackErrorFilter.HandleErrorAsync(c, ex, null) + }; + + foreach (var filter in _configuration.CommandErrorFilters.Reverse()) + { + var latest = chain.Last(); + chain.Add((c, ex) => filter.HandleErrorAsync(c, ex, latest)); + } + + return (ctx, exception) => + { + if (ctx is ICommandContext context) { - _logger.LogError(new { eventType = typeof(TEvent), handlerType }, s => $"No valid registration for {s.handlerType.FullName}"); - throw new ConfigurationException($"No valid registration for {handlerType.FullName}", ex); + return chain.Last().Invoke(context, exception); } + + return Task.FromException(exception); + }; + } + + private Task HandleEventErrorAsync(IEventContext context, Exception error) + where TEvent : class, IEvent + { + var handler = _errorHandlers.GetOrAdd(typeof(TEvent), CreateEventErrorDelegate()); + return handler(context, error); + } + + private Func CreateEventErrorDelegate() + where TEvent : class, IEvent + { + var chain = new List> + { + (c, ex) => _configuration.FallbackErrorFilter.HandleErrorAsync(c, ex, null) + }; + + foreach (var filter in _configuration.EventErrorFilters.Reverse()) + { + var latest = chain.Last(); + chain.Add((c, ex) => filter.HandleErrorAsync(c, ex, latest)); } + + return (ctx, exception) => + { + if (ctx is IEventContext context) + { + return chain.Last().Invoke(context, exception); + } + + return Task.FromException(exception); + }; } + } -} +} \ No newline at end of file diff --git a/src/Nybus/Policies/NoopErrorPolicy.cs b/src/Nybus/Policies/NoopErrorPolicy.cs deleted file mode 100644 index ccea62d..0000000 --- a/src/Nybus/Policies/NoopErrorPolicy.cs +++ /dev/null @@ -1,30 +0,0 @@ -using System; -using System.Threading.Tasks; -using Microsoft.Extensions.Configuration; - -namespace Nybus.Policies -{ - public class NoopErrorPolicyProvider : IErrorPolicyProvider - { - public string ProviderName => "noop"; - public IErrorPolicy CreatePolicy(IConfigurationSection configuration) - { - return new NoopErrorPolicy(); - } - } - - public class NoopErrorPolicy : IErrorPolicy - { - public Task HandleErrorAsync(IBusEngine engine, Exception exception, CommandMessage message) - where TCommand : class, ICommand - { - return engine.NotifyFailAsync(message); - } - - public Task HandleErrorAsync(IBusEngine engine, Exception exception, EventMessage message) - where TEvent : class, IEvent - { - return engine.NotifyFailAsync(message); - } - } -} \ No newline at end of file diff --git a/src/Nybus/Policies/RetryErrorPolicy.cs b/src/Nybus/Policies/RetryErrorPolicy.cs deleted file mode 100644 index 24be11a..0000000 --- a/src/Nybus/Policies/RetryErrorPolicy.cs +++ /dev/null @@ -1,95 +0,0 @@ -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using System; -using System.Threading.Tasks; -using Microsoft.Extensions.Configuration; - -namespace Nybus.Policies -{ - public class RetryErrorPolicyProvider : IErrorPolicyProvider - { - private readonly ILoggerFactory _loggerFactory; - - public RetryErrorPolicyProvider(ILoggerFactory loggerFactory) - { - _loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory)); - } - - public string ProviderName => "retry"; - - public IErrorPolicy CreatePolicy(IConfigurationSection configuration) - { - var options = new RetryErrorPolicyOptions(); - configuration.Bind(options); - - var logger = _loggerFactory.CreateLogger(); - return new RetryErrorPolicy(options, logger); - } - } - - - public class RetryErrorPolicy : IErrorPolicy - { - private readonly ILogger _logger; - private readonly RetryErrorPolicyOptions _options; - - public RetryErrorPolicy(RetryErrorPolicyOptions options, ILogger logger) - { - _options = options ?? throw new ArgumentNullException(nameof(options)); - - if (_options.MaxRetries < 0) throw new ArgumentOutOfRangeException(nameof(_options.MaxRetries), "maxRetries must be greater or equal than 0"); - - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - } - - public async Task HandleErrorAsync(IBusEngine engine, Exception exception, CommandMessage message) - where TCommand : class, ICommand - { - var retryCount = message.Headers.TryGetValue(Headers.RetryCount, out var str) && int.TryParse(str, out var i) ? i : 0; - - retryCount++; - - if (retryCount < _options.MaxRetries) - { - _logger.LogTrace($"Error {retryCount}/{_options.MaxRetries}: will retry"); - - message.Headers[Headers.RetryCount] = retryCount.ToString(); - - await engine.SendCommandAsync(message).ConfigureAwait(false); - } - else - { - _logger.LogTrace($"Error {retryCount}/{_options.MaxRetries}: will not retry"); - - await engine.NotifyFailAsync(message).ConfigureAwait(false); - } - - } - - public async Task HandleErrorAsync(IBusEngine engine, Exception exception, EventMessage message) - where TEvent : class, IEvent - { - var retryCount = message.Headers.TryGetValue(Headers.RetryCount, out string str) && int.TryParse(str, out int i) ? i : 0; - - retryCount++; - - if (retryCount < _options.MaxRetries) - { - message.Headers[Headers.RetryCount] = retryCount.ToString(); - - await engine.SendEventAsync(message).ConfigureAwait(false); - } - else - { - await engine.NotifyFailAsync(message).ConfigureAwait(false); - } - } - - public int MaxRetries => _options.MaxRetries; - } - - public class RetryErrorPolicyOptions - { - public int MaxRetries { get; set; } - } -} diff --git a/src/Nybus/ServiceCollectionExtensions.cs b/src/Nybus/ServiceCollectionExtensions.cs index 0ea7a7d..0453d55 100644 --- a/src/Nybus/ServiceCollectionExtensions.cs +++ b/src/Nybus/ServiceCollectionExtensions.cs @@ -3,7 +3,7 @@ using System; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; -using Nybus.Policies; +using Nybus.Filters; using Nybus.Utils; namespace Nybus @@ -14,8 +14,10 @@ public static IServiceCollection AddNybus(this IServiceCollection services, Acti { var configurator = new NybusConfigurator(); - configurator.RegisterErrorPolicyProvider(); - configurator.RegisterErrorPolicyProvider(); + configurator.RegisterErrorFilterProvider(); + configurator.RegisterErrorFilterProvider(); + + services.AddSingleton(); configure(configurator); @@ -58,7 +60,7 @@ public static IServiceCollection AddNybus(this IServiceCollection services, Acti { var engine = sp.GetRequiredService(); var builder = sp.GetRequiredService(); - var configuration = sp.GetRequiredService(); + var configuration = sp.GetRequiredService(); configurator.ConfigureBuilder(builder); diff --git a/tests/Tests.Nybus.Abstractions/Configuration/ConfigurationExceptionTests.cs b/tests/Tests.Nybus.Abstractions/ConfigurationExceptionTests.cs similarity index 87% rename from tests/Tests.Nybus.Abstractions/Configuration/ConfigurationExceptionTests.cs rename to tests/Tests.Nybus.Abstractions/ConfigurationExceptionTests.cs index ea46518..7bc2bc1 100644 --- a/tests/Tests.Nybus.Abstractions/Configuration/ConfigurationExceptionTests.cs +++ b/tests/Tests.Nybus.Abstractions/ConfigurationExceptionTests.cs @@ -1,10 +1,8 @@ using System; -using System.Collections.Generic; -using System.Text; using NUnit.Framework; -using Nybus.Configuration; +using Nybus; -namespace Tests.Configuration +namespace Tests { [TestFixture] public class ConfigurationExceptionTests diff --git a/tests/Tests.Nybus.Abstractions/Utils/EnumerableExtensionsTests.cs b/tests/Tests.Nybus.Abstractions/Utils/EnumerableExtensionsTests.cs new file mode 100644 index 0000000..ba92d67 --- /dev/null +++ b/tests/Tests.Nybus.Abstractions/Utils/EnumerableExtensionsTests.cs @@ -0,0 +1,56 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using NUnit.Framework; +using Nybus.Utils; + +namespace Tests.Utils +{ + [TestFixture] + public class EnumerableExtensionsTests + { + [Test, AutoMoqData] + public void NotNull_filters_null_items_out(IEnumerable items) + { + items = items.Concat(new string[] { null }); + + Assume.That(items.All(i => i != null), Is.False); + + Assert.That(EnumerableExtensions.NotNull(items).All(i => i != null), Is.True); + } + + [Test, AutoMoqData] + public void EmptyIfNull_returns_same_set_if_not_null(IEnumerable items) + { + Assume.That(items, Is.Not.Null); + + Assert.That(items.EmptyIfNull(), Is.SameAs(items)); + } + + [Test, AutoMoqData] + public void EmptyIfNull_returns_empty_set_if_null() + { + IEnumerable items = null; + + Assume.That(items, Is.Null); + + Assert.That(items.EmptyIfNull(), Is.Not.Null); + Assert.That(items.EmptyIfNull(), Is.Empty); + } + + [Test, AutoMoqData] + public void IfNull_returns_set_if_not_null(IEnumerable items, IEnumerable alternative) + { + Assert.That(items.IfNull(alternative), Is.SameAs(items)); + } + + [Test, AutoMoqData] + public void IfNull_returns_alternative_if_not_null(IEnumerable alternative) + { + IEnumerable items = null; + + Assert.That(items.IfNull(alternative), Is.SameAs(alternative)); + } + } +} diff --git a/tests/Tests.Nybus.Engine.RabbitMq/Configuration/ConfigurationFactoryTests.cs b/tests/Tests.Nybus.Engine.RabbitMq/Configuration/ConfigurationFactoryTests.cs index 4a4da06..5109b49 100644 --- a/tests/Tests.Nybus.Engine.RabbitMq/Configuration/ConfigurationFactoryTests.cs +++ b/tests/Tests.Nybus.Engine.RabbitMq/Configuration/ConfigurationFactoryTests.cs @@ -7,6 +7,7 @@ using Microsoft.Extensions.Logging; using Moq; using NUnit.Framework; +using Nybus; using Nybus.Configuration; namespace Tests.Configuration diff --git a/tests/Tests.Nybus/Configuration/NybusHostConfigurationFactoryTests.cs b/tests/Tests.Nybus/Configuration/NybusHostConfigurationFactoryTests.cs index cbcdb05..36d88a2 100644 --- a/tests/Tests.Nybus/Configuration/NybusHostConfigurationFactoryTests.cs +++ b/tests/Tests.Nybus/Configuration/NybusHostConfigurationFactoryTests.cs @@ -2,41 +2,163 @@ using System.Collections.Generic; using System.Linq; using System.Text; +using AutoFixture.Idioms; using AutoFixture.NUnit3; using Microsoft.Extensions.Configuration; using Moq; using NUnit.Framework; using Nybus.Configuration; -using Nybus.Policies; +using Nybus.Filters; namespace Tests.Configuration { [TestFixture] public class NybusHostConfigurationFactoryTests { - [Test] - public void ErrorPolicyProviders_are_required() + [Test, AutoMoqData] + public void Constructor_is_guarded(GuardClauseAssertion assertion) { - Assert.Throws(() => new NybusHostConfigurationFactory(null)); + assertion.Verify(typeof(NybusHostConfigurationFactory).GetConstructors()); } [Test, CustomAutoMoqData] - public void CreateConfiguration_uses_NoopErrorPolicy_if_no_policy_is_specified(NybusHostConfigurationFactory sut, NybusHostOptions options) + public void CreateConfiguration_uses_selected_provider_to_create_Command_filters([Frozen] IEnumerable errorFilterProviders, NybusHostConfigurationFactory sut) + { + var providers = errorFilterProviders.ToArray(); + var selectedProviders = providers.Take(providers.Length - 1); + + var settings = new Dictionary(); + + var i = 0; + foreach (var provider in selectedProviders) + { + settings.Add($"CommandErrorFilters:{i}:type", provider.ProviderName); + i++; + } + + var config = new ConfigurationBuilder().AddInMemoryCollection(settings).Build(); + + var options = new NybusHostOptions(); + + config.Bind(options); + + var configuration = sut.CreateConfiguration(options); + + Assert.True(selectedProviders.All(p => + { + Mock.Get(p).Verify(o => o.CreateErrorFilter(It.IsAny())); + return true; + })); + } + + + [Test, CustomAutoMoqData] + public void CreateConfiguration_discards_unused_selected_provider_to_create_Command_filters([Frozen] IEnumerable errorFilterProviders, NybusHostConfigurationFactory sut) + { + var providers = errorFilterProviders.ToArray(); + var selectedProviders = providers.Take(providers.Length - 1); + var unusedProviders = providers.Skip(providers.Length - 1); + + var settings = new Dictionary(); + + var i = 0; + foreach (var provider in selectedProviders) + { + settings.Add($"CommandErrorFilters:{i}:type", provider.ProviderName); + i++; + } + + var config = new ConfigurationBuilder().AddInMemoryCollection(settings).Build(); + + var options = new NybusHostOptions(); + + config.Bind(options); + + var configuration = sut.CreateConfiguration(options); + + Assert.True(unusedProviders.All(p => + { + Mock.Get(p).Verify(o => o.CreateErrorFilter(It.IsAny()), Times.Never); + return true; + })); + } + + [Test, CustomAutoMoqData] + public void CreateConfiguration_ignores_unregistered_providers_when_creating_command_filters(NybusHostConfigurationFactory sut, NybusHostOptions options) { var configuration = sut.CreateConfiguration(options); - Assert.That(configuration.ErrorPolicy, Is.InstanceOf()); + Assert.That(configuration.CommandErrorFilters, Is.Empty); } [Test, CustomAutoMoqData] - public void CreateConfiguration_uses_selected_provider([Frozen] IEnumerable errorPolicyProviders, NybusHostConfigurationFactory sut, NybusHostOptions options) + public void CreateConfiguration_uses_selected_provider_to_create_Event_filters([Frozen] IEnumerable errorFilterProviders, NybusHostConfigurationFactory sut) { - Mock.Get(options.ErrorPolicy.GetSection("ProviderName")).SetupGet(p => p.Value).Returns(errorPolicyProviders.First().ProviderName); + var providers = errorFilterProviders.ToArray(); + var selectedProviders = providers.Take(providers.Length - 1); + var settings = new Dictionary(); + + var i = 0; + foreach (var provider in selectedProviders) + { + settings.Add($"EventErrorFilters:{i}:type", provider.ProviderName); + i++; + } + + var config = new ConfigurationBuilder().AddInMemoryCollection(settings).Build(); + + var options = new NybusHostOptions(); + + config.Bind(options); + + var configuration = sut.CreateConfiguration(options); + + Assert.True(selectedProviders.All(p => + { + Mock.Get(p).Verify(o => o.CreateErrorFilter(It.IsAny())); + return true; + })); + } + + + [Test, CustomAutoMoqData] + public void CreateConfiguration_discards_unused_selected_provider_to_create_Event_filters([Frozen] IEnumerable errorFilterProviders, NybusHostConfigurationFactory sut) + { + var providers = errorFilterProviders.ToArray(); + var selectedProviders = providers.Take(providers.Length - 1); + var unusedProviders = providers.Skip(providers.Length - 1); + + var settings = new Dictionary(); + + var i = 0; + foreach (var provider in selectedProviders) + { + settings.Add($"EventErrorFilters:{i}:type", provider.ProviderName); + i++; + } + + var config = new ConfigurationBuilder().AddInMemoryCollection(settings).Build(); + + var options = new NybusHostOptions(); + + config.Bind(options); + + var configuration = sut.CreateConfiguration(options); + + Assert.True(unusedProviders.All(p => + { + Mock.Get(p).Verify(o => o.CreateErrorFilter(It.IsAny()), Times.Never); + return true; + })); + } + + [Test, CustomAutoMoqData] + public void CreateConfiguration_ignores_unregistered_providers_when_creating_event_filters(NybusHostConfigurationFactory sut, NybusHostOptions options) + { var configuration = sut.CreateConfiguration(options); - Mock.Get(errorPolicyProviders.First()).Verify(p => p.CreatePolicy(options.ErrorPolicy), Times.Once); - Assert.That(configuration.ErrorPolicy, Is.SameAs(errorPolicyProviders.First().CreatePolicy(options.ErrorPolicy))); + Assert.That(configuration.EventErrorFilters, Is.Empty); } } } diff --git a/tests/Tests.Nybus/Filters/DiscardErrorFilterProviderTests.cs b/tests/Tests.Nybus/Filters/DiscardErrorFilterProviderTests.cs new file mode 100644 index 0000000..a98729a --- /dev/null +++ b/tests/Tests.Nybus/Filters/DiscardErrorFilterProviderTests.cs @@ -0,0 +1,39 @@ +using System; +using AutoFixture.Idioms; +using AutoFixture.NUnit3; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Logging; +using Moq; +using NUnit.Framework; +using Nybus; +using Nybus.Filters; + +namespace Tests.Filters +{ + [TestFixture] + public class DiscardErrorFilterProviderTests + { + [Test, AutoMoqData] + public void Constructor_is_guarded(GuardClauseAssertion assertion) + { + assertion.Verify(typeof(DiscardErrorFilterProvider).GetConstructors()); + } + + [Test, AutoMoqData] + public void ProviderName_is_correct(DiscardErrorFilterProvider sut) + { + Assert.That(sut.ProviderName, Is.EqualTo("discard")); + } + + [Test, AutoMoqData] + public void CreateErrorFilter_returns_filter([Frozen] IServiceProvider serviceProvider, DiscardErrorFilterProvider sut, IConfigurationSection settings, IBusEngine engine, ILogger logger) + { + Mock.Get(serviceProvider).Setup(p => p.GetService(typeof(IBusEngine))).Returns(engine); + Mock.Get(serviceProvider).Setup(p => p.GetService(typeof(ILogger))).Returns(logger); + + var filter = sut.CreateErrorFilter(settings); + + Assert.That(filter, Is.InstanceOf()); + } + } +} \ No newline at end of file diff --git a/tests/Tests.Nybus/Filters/DiscardErrorFilterTests.cs b/tests/Tests.Nybus/Filters/DiscardErrorFilterTests.cs new file mode 100644 index 0000000..dac7d8c --- /dev/null +++ b/tests/Tests.Nybus/Filters/DiscardErrorFilterTests.cs @@ -0,0 +1,65 @@ +using System; +using System.Threading.Tasks; +using AutoFixture.Idioms; +using AutoFixture.NUnit3; +using Moq; +using NUnit.Framework; +using Nybus; +using Nybus.Filters; + +namespace Tests.Filters +{ + [TestFixture] + public class DiscardErrorFilterTests + { + [Test, AutoMoqData] + public void Constructor_is_guarded(GuardClauseAssertion assertion) + { + assertion.Verify(typeof(DiscardErrorFilter).GetConstructors()); + } + + [Test, AutoMoqData] + public async Task HandleErrorAsync_notifies_engine_on_Command([Frozen] IBusEngine engine, DiscardErrorFilter sut, ICommandContext context, Exception exception) + { + var next = Mock.Of>(); + + await sut.HandleErrorAsync(context, exception, next); + + Mock.Get(engine).Verify(p => p.NotifyFailAsync(context.Message)); + } + + [Test, AutoMoqData] + public async Task HandleErrorAsync_notifies_engine_on_Event([Frozen] IBusEngine engine, DiscardErrorFilter sut, IEventContext context, Exception exception) + { + var next = Mock.Of>(); + + await sut.HandleErrorAsync(context, exception, next); + + Mock.Get(engine).Verify(p => p.NotifyFailAsync(context.Message)); + } + + [Test, AutoMoqData] + public async Task HandleErrorAsync_forwards_to_next_if_error_on_Command([Frozen] IBusEngine engine, DiscardErrorFilter sut, ICommandContext context, Exception exception, Exception discardException) + { + var next = Mock.Of>(); + + Mock.Get(engine).Setup(p => p.NotifyFailAsync(It.IsAny())).ThrowsAsync(discardException); + + await sut.HandleErrorAsync(context, exception, next); + + Mock.Get(next).Verify(p => p(context, exception)); + } + + [Test, AutoMoqData] + public async Task HandleErrorAsync_forwards_to_next_if_error_on_Event([Frozen] IBusEngine engine, DiscardErrorFilter sut, IEventContext context, Exception exception, Exception discardException) + { + var next = Mock.Of>(); + + Mock.Get(engine).Setup(p => p.NotifyFailAsync(It.IsAny())).ThrowsAsync(discardException); + + await sut.HandleErrorAsync(context, exception, next); + + Mock.Get(next).Verify(p => p(context, exception)); + } + } +} diff --git a/tests/Tests.Nybus/Filters/FallbackErrorFilterTests.cs b/tests/Tests.Nybus/Filters/FallbackErrorFilterTests.cs new file mode 100644 index 0000000..44a24ad --- /dev/null +++ b/tests/Tests.Nybus/Filters/FallbackErrorFilterTests.cs @@ -0,0 +1,41 @@ +using System; +using System.Threading.Tasks; +using AutoFixture.Idioms; +using AutoFixture.NUnit3; +using Moq; +using NUnit.Framework; +using Nybus; +using Nybus.Filters; + +namespace Tests.Filters +{ + [TestFixture] + public class FallbackErrorFilterTests + { + [Test, AutoMoqData] + public void Constructor_is_guarded(GuardClauseAssertion assertion) + { + assertion.Verify(typeof(FallbackErrorFilter).GetConstructors()); + } + + [Test, AutoMoqData] + public async Task HandleErrorAsync_notifies_engine_on_Command([Frozen] IBusEngine engine, FallbackErrorFilter sut, ICommandContext context, Exception exception) + { + var next = Mock.Of>(); + + await sut.HandleErrorAsync(context, exception, next); + + Mock.Get(engine).Verify(p => p.NotifyFailAsync(context.Message)); + } + + [Test, AutoMoqData] + public async Task HandleErrorAsync_notifies_engine_on_Event([Frozen] IBusEngine engine, FallbackErrorFilter sut, IEventContext context, Exception exception) + { + var next = Mock.Of>(); + + await sut.HandleErrorAsync(context, exception, next); + + Mock.Get(engine).Verify(p => p.NotifyFailAsync(context.Message)); + } + } +} \ No newline at end of file diff --git a/tests/Tests.Nybus/Filters/RetryErrorFilterProviderTests.cs b/tests/Tests.Nybus/Filters/RetryErrorFilterProviderTests.cs new file mode 100644 index 0000000..b11afa8 --- /dev/null +++ b/tests/Tests.Nybus/Filters/RetryErrorFilterProviderTests.cs @@ -0,0 +1,199 @@ +using System; +using System.Threading.Tasks; +using AutoFixture.Idioms; +using AutoFixture.NUnit3; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Logging; +using Moq; +using NUnit.Framework; +using Nybus; +using Nybus.Filters; +using Nybus.Utils; + +namespace Tests.Filters +{ + [TestFixture] + public class RetryErrorFilterProviderTests + { + [Test, AutoMoqData] + public void Constructor_is_guarded(GuardClauseAssertion assertion) + { + assertion.Verify(typeof(RetryErrorFilterProvider).GetConstructors()); + } + + [Test, AutoMoqData] + public void ProviderName_is_correct(RetryErrorFilterProvider sut) + { + Assert.That(sut.ProviderName, Is.EqualTo("retry")); + } + + [Test, AutoMoqData] + public void CreateErrorFilter_returns_filter([Frozen] IServiceProvider serviceProvider, RetryErrorFilterProvider sut, IConfigurationSection settings, IBusEngine engine, ILogger logger, int maxRetries) + { + Mock.Get(settings.GetSection("MaxRetries")).Setup(p => p.Value).Returns(maxRetries.ToString()); + + Mock.Get(serviceProvider).Setup(p => p.GetService(typeof(IBusEngine))).Returns(engine); + Mock.Get(serviceProvider).Setup(p => p.GetService(typeof(ILogger))).Returns(logger); + + var filter = sut.CreateErrorFilter(settings); + + Assert.That(filter, Is.InstanceOf()); + } + } + + [TestFixture] + public class RetryErrorFilterTests + { + [Test, AutoMoqData] + public void Constructor_is_guarded(GuardClauseAssertion assertion) + { + assertion.Verify(typeof(RetryErrorFilter).GetConstructors()); + } + + [Test, AutoMoqData] + public void MaxRetries_cant_be_negative(IBusEngine engine, ILogger logger, int retries) + { + var options = new RetryErrorFilterOptions { MaxRetries = -retries }; + + Assert.Throws(() => new RetryErrorFilter(engine, options, logger)); + } + + [Test, CustomAutoMoqData] + public async Task HandleError_notifies_engine_if_retry_count_equal_or_greater_than_maxRetries([Frozen] RetryErrorFilterOptions options, [Frozen] IBusEngine engine, RetryErrorFilter sut, Exception error, NybusCommandContext context) + { + var next = Mock.Of>(); + + context.Message.Headers[Headers.RetryCount] = options.MaxRetries.Stringfy(); + + await sut.HandleErrorAsync(context, error, next); + + Mock.Get(engine).Verify(p => p.NotifyFailAsync(context.Message)); + } + + [Test, CustomAutoMoqData] + public async Task HandleError_invokes_next_if_retry_count_equal_or_greater_than_maxRetries([Frozen] RetryErrorFilterOptions options, [Frozen] IBusEngine engine, RetryErrorFilter sut, Exception error, NybusCommandContext context) + { + var next = Mock.Of>(); + + context.Message.Headers[Headers.RetryCount] = options.MaxRetries.Stringfy(); + + await sut.HandleErrorAsync(context, error, next); + + Mock.Get(next).Verify(p => p(context, error)); + } + + [Test, CustomAutoMoqData] + public async Task HandleError_retries_if_retry_count_less_than_maxRetries([Frozen] RetryErrorFilterOptions options, [Frozen] IBusEngine engine, RetryErrorFilter sut, Exception error, NybusCommandContext context) + { + var next = Mock.Of>(); + + context.Message.Headers[Headers.RetryCount] = (options.MaxRetries - 2).Stringfy(); + + await sut.HandleErrorAsync(context, error, next); + + Mock.Get(engine).Verify(p => p.SendCommandAsync(context.CommandMessage)); + } + + [Test, CustomAutoMoqData] + public async Task HandleError_increments_retry_count_if_retry_count_present([Frozen] RetryErrorFilterOptions options, [Frozen] IBusEngine engine, RetryErrorFilter sut, Exception error, NybusCommandContext context) + { + var next = Mock.Of>(); + + context.Message.Headers[Headers.RetryCount] = (options.MaxRetries - 2).Stringfy(); + + await sut.HandleErrorAsync(context, error, next); + + Assert.That(context.Message.Headers.ContainsKey(Headers.RetryCount)); + Assert.That(context.Message.Headers[Headers.RetryCount], Is.EqualTo((options.MaxRetries - 1).Stringfy())); + } + + [Test, CustomAutoMoqData] + public async Task HandleError_retries_if_retry_count_not_present([Frozen] IBusEngine engine, RetryErrorFilter sut, Exception error, NybusCommandContext context) + { + var next = Mock.Of>(); + + await sut.HandleErrorAsync(context, error, next); + + Mock.Get(engine).Verify(p => p.SendCommandAsync(context.CommandMessage)); + } + + [Test, CustomAutoMoqData] + public async Task HandleError_adds_retry_count_if_retry_count_not_present([Frozen] IBusEngine engine, RetryErrorFilter sut, Exception error, NybusCommandContext context) + { + var next = Mock.Of>(); + + await sut.HandleErrorAsync(context, error, next); + + Assert.That(context.Message.Headers.ContainsKey(Headers.RetryCount)); + } + + [Test, CustomAutoMoqData] + public async Task HandleError_notifies_engine_if_retry_count_equal_or_greater_than_maxRetries([Frozen] RetryErrorFilterOptions options, [Frozen] IBusEngine engine, RetryErrorFilter sut, Exception error, NybusEventContext context) + { + var next = Mock.Of>(); + + context.Message.Headers[Headers.RetryCount] = options.MaxRetries.Stringfy(); + + await sut.HandleErrorAsync(context, error, next); + + Mock.Get(engine).Verify(p => p.NotifyFailAsync(context.Message)); + } + + [Test, CustomAutoMoqData] + public async Task HandleError_invokes_next_if_retry_count_equal_or_greater_than_maxRetries([Frozen] RetryErrorFilterOptions options, [Frozen] IBusEngine engine, RetryErrorFilter sut, Exception error, NybusEventContext context) + { + var next = Mock.Of>(); + + context.Message.Headers[Headers.RetryCount] = options.MaxRetries.Stringfy(); + + await sut.HandleErrorAsync(context, error, next); + + Mock.Get(next).Verify(p => p(context, error)); + } + + [Test, CustomAutoMoqData] + public async Task HandleError_retries_if_retry_count_less_than_maxRetries([Frozen] RetryErrorFilterOptions options, [Frozen] IBusEngine engine, RetryErrorFilter sut, Exception error, NybusEventContext context) + { + var next = Mock.Of>(); + + context.Message.Headers[Headers.RetryCount] = (options.MaxRetries - 2).Stringfy(); + + await sut.HandleErrorAsync(context, error, next); + + Mock.Get(engine).Verify(p => p.SendEventAsync(context.EventMessage)); + } + + [Test, CustomAutoMoqData] + public async Task HandleError_increments_retry_count_if_retry_count_present([Frozen] RetryErrorFilterOptions options, [Frozen] IBusEngine engine, RetryErrorFilter sut, Exception error, NybusEventContext context) + { + var next = Mock.Of>(); + + context.Message.Headers[Headers.RetryCount] = (options.MaxRetries - 2).Stringfy(); + + await sut.HandleErrorAsync(context, error, next); + + Assert.That(context.Message.Headers.ContainsKey(Headers.RetryCount)); + Assert.That(context.Message.Headers[Headers.RetryCount], Is.EqualTo((options.MaxRetries - 1).Stringfy())); + } + + [Test, CustomAutoMoqData] + public async Task HandleError_retries_if_retry_count_not_present([Frozen] IBusEngine engine, RetryErrorFilter sut, Exception error, NybusEventContext context) + { + var next = Mock.Of>(); + + await sut.HandleErrorAsync(context, error, next); + + Mock.Get(engine).Verify(p => p.SendEventAsync(context.EventMessage)); + } + + [Test, CustomAutoMoqData] + public async Task HandleError_adds_retry_count_if_retry_count_not_present([Frozen] IBusEngine engine, RetryErrorFilter sut, Exception error, NybusEventContext context) + { + var next = Mock.Of>(); + + await sut.HandleErrorAsync(context, error, next); + + Assert.That(context.Message.Headers.ContainsKey(Headers.RetryCount)); + } + } +} \ No newline at end of file diff --git a/tests/Tests.Nybus/NybusConfiguratorExtensionsTests.cs b/tests/Tests.Nybus/NybusConfiguratorExtensionsTests.cs index 8b159e4..ea44592 100644 --- a/tests/Tests.Nybus/NybusConfiguratorExtensionsTests.cs +++ b/tests/Tests.Nybus/NybusConfiguratorExtensionsTests.cs @@ -5,7 +5,7 @@ using NUnit.Framework; using Nybus; using Nybus.Configuration; -using Nybus.Policies; +using Nybus.Filters; using NybusConfiguratorExtensions = Nybus.NybusConfiguratorExtensions; // ReSharper disable InvokeAsExtensionMethod @@ -164,35 +164,36 @@ public void SubscribeToEvent_registers_handler_instance(TestNybusConfigurator ny } [Test, CustomAutoMoqData] - public void RegisterErrorPolicyProvider_adds_provider_with_default_setup(TestNybusConfigurator nybus, IServiceCollection services) + public void RegisterErrorFilterProvider_adds_provider_with_default_setup(TestNybusConfigurator nybus, IServiceCollection services) { - NybusConfiguratorExtensions.RegisterErrorPolicyProvider(nybus); + NybusConfiguratorExtensions.RegisterErrorFilterProvider(nybus); nybus.ApplyServiceConfigurations(services); - Mock.Get(services).Verify(p => p.Add(It.Is(sd => sd.ServiceType == typeof(IErrorPolicyProvider) && sd.ImplementationType == typeof(TestErrorPolicyProvider)))); + Mock.Get(services).Verify(p => p.Add(It.Is(sd => sd.ServiceType == typeof(IErrorFilterProvider) && sd.ImplementationType == typeof(TestErrorFilterProvider)))); } [Test, CustomAutoMoqData] - public void RegisterErrorPolicyProvider_adds_provider_with_custom_setup(TestNybusConfigurator nybus, IServiceCollection services) + public void RegisterErrorFilterProvider_adds_provider_with_custom_setup(TestNybusConfigurator nybus, IServiceCollection services) { - var providerFactory = Mock.Of>(); + var providerFactory = Mock.Of>(); - NybusConfiguratorExtensions.RegisterErrorPolicyProvider(nybus, providerFactory); + NybusConfiguratorExtensions.RegisterErrorFilterProvider(nybus, providerFactory); nybus.ApplyServiceConfigurations(services); - Mock.Get(services).Verify(p => p.Add(It.Is(sd => sd.ServiceType == typeof(IErrorPolicyProvider) && sd.ImplementationFactory == providerFactory))); + Mock.Get(services).Verify(p => p.Add(It.Is(sd => sd.ServiceType == typeof(IErrorFilterProvider) && sd.ImplementationFactory == providerFactory))); } } - public class TestErrorPolicyProvider : IErrorPolicyProvider + public class TestErrorFilterProvider : IErrorFilterProvider { public string ProviderName { get; } - public IErrorPolicy CreatePolicy(IConfigurationSection configuration) + + public IErrorFilter CreateErrorFilter(IConfigurationSection settings) { - throw new System.NotImplementedException(); + throw new NotImplementedException(); } } } diff --git a/tests/Tests.Nybus/NybusHostTests.cs b/tests/Tests.Nybus/NybusHostTests.cs index a9f4b5c..e212726 100644 --- a/tests/Tests.Nybus/NybusHostTests.cs +++ b/tests/Tests.Nybus/NybusHostTests.cs @@ -9,6 +9,7 @@ using NUnit.Framework; using Nybus; using Nybus.Configuration; +using Nybus.Filters; namespace Tests { @@ -35,7 +36,7 @@ public void ServiceProvider_is_required(IBusEngine engine, INybusConfiguration c [Test, CustomAutoMoqData] - public void Logger_is_required(IBusEngine engine, INybusConfiguration configuration, IServiceProvider serviceProvider) + public void Logger_is_required(IBusEngine engine, NybusConfiguration configuration, IServiceProvider serviceProvider) { Assert.Throws(() => new NybusHost(engine, configuration, serviceProvider, null)); } @@ -110,7 +111,7 @@ public async Task Handler_is_executed_when_commandMessages_are_processed([Frozen } [Test, CustomAutoMoqData] - public async Task Engine_is_notified_when_commandMessages_are_successfully_processed([Frozen] IBusEngine engine, NybusHost sut, CommandMessage testMessage) + public async Task Null_messages_delivered_from_engine_are_discarded([Frozen] IBusEngine engine, NybusHost sut, CommandMessage testMessage) { var subject = new Subject(); @@ -122,24 +123,23 @@ public async Task Engine_is_notified_when_commandMessages_are_successfully_proce await sut.StartAsync(); - subject.OnNext(testMessage); + subject.OnNext(null); await sut.StopAsync(); - Mock.Get(engine).Verify(p => p.NotifySuccessAsync(testMessage), Times.Once); + Mock.Get(receivedMessage).Verify(p => p(It.IsAny(), It.IsAny>()), Times.Never); } [Test, CustomAutoMoqData] - public async Task Error_policy_is_executed_when_commandMessages_are_processed_with_errors([Frozen] IBusEngine engine, [Frozen] INybusConfiguration configuration, NybusHost sut, CommandMessage testMessage, Exception error) + public async Task Handler_is_executed_when_eventMessages_are_processed([Frozen] IBusEngine engine, NybusHost sut, EventMessage testMessage) { var subject = new Subject(); Mock.Get(engine).Setup(p => p.StartAsync()).ReturnsAsync(subject); - var receivedMessage = Mock.Of>(); - Mock.Get(receivedMessage).Setup(p => p(It.IsAny(), It.IsAny>())).Throws(error); + var receivedMessage = Mock.Of>(); - sut.SubscribeToCommand(receivedMessage); + sut.SubscribeToEvent(receivedMessage); await sut.StartAsync(); @@ -147,19 +147,19 @@ public async Task Error_policy_is_executed_when_commandMessages_are_processed_wi await sut.StopAsync(); - Mock.Get(configuration.ErrorPolicy).Verify(p => p.HandleErrorAsync(engine, error, testMessage), Times.Once); + Mock.Get(receivedMessage).Verify(p => p(It.IsAny(), It.IsAny>()), Times.Once); } [Test, CustomAutoMoqData] - public async Task Null_messages_delivered_from_engine_are_discarded([Frozen] IBusEngine engine, NybusHost sut, CommandMessage testMessage) + public async Task Null_messages_delivered_from_engine_are_discarded([Frozen] IBusEngine engine, NybusHost sut, EventMessage testMessage) { var subject = new Subject(); Mock.Get(engine).Setup(p => p.StartAsync()).ReturnsAsync(subject); - var receivedMessage = Mock.Of>(); + var receivedMessage = Mock.Of>(); - sut.SubscribeToCommand(receivedMessage); + sut.SubscribeToEvent(receivedMessage); await sut.StartAsync(); @@ -167,98 +167,93 @@ public async Task Null_messages_delivered_from_engine_are_discarded([Frozen] IBu await sut.StopAsync(); - Mock.Get(receivedMessage).Verify(p => p(It.IsAny(), It.IsAny>()), Times.Never); + Mock.Get(receivedMessage).Verify(p => p(It.IsAny(), It.IsAny>()), Times.Never); } [Test, CustomAutoMoqData] - public async Task Handler_is_executed_when_eventMessages_are_processed([Frozen] IBusEngine engine, NybusHost sut, EventMessage testMessage) + public void ExecutionEnvironment_returns_self(NybusHost sut) { - var subject = new Subject(); - - Mock.Get(engine).Setup(p => p.StartAsync()).ReturnsAsync(subject); - - var receivedMessage = Mock.Of>(); + Assert.That(sut.ExecutionEnvironment, Is.SameAs(sut)); + } - sut.SubscribeToEvent(receivedMessage); + [Test, CustomAutoMoqData] + public async Task ExecuteCommandHandler_creates_new_scope_for_execution([Frozen] IServiceProvider serviceProvider, NybusHost sut, IDispatcher dispatcher, ICommandContext commandContext, IServiceScopeFactory scopeFactory, ICommandHandler handler) + { + var handlerType = handler.GetType(); - await sut.StartAsync(); + Mock.Get(serviceProvider).Setup(p => p.GetService(typeof(IServiceScopeFactory))).Returns(scopeFactory); - subject.OnNext(testMessage); + Mock.Get(serviceProvider).Setup(p => p.GetService(handlerType)).Returns(handler); - await sut.StopAsync(); + await sut.ExecuteCommandHandlerAsync(dispatcher, commandContext, handlerType); - Mock.Get(receivedMessage).Verify(p => p(It.IsAny(), It.IsAny>()), Times.Once); + Mock.Get(scopeFactory).Verify(p => p.CreateScope(), Times.Once); } [Test, CustomAutoMoqData] - public async Task Engine_is_notified_when_eventMessages_are_successfully_processed([Frozen] IBusEngine engine, NybusHost sut, EventMessage testMessage) + public async Task ExecuteCommandHandler_executes_handler([Frozen] IServiceProvider serviceProvider, NybusHost sut, IDispatcher dispatcher, ICommandContext commandContext, IServiceScopeFactory scopeFactory, ICommandHandler handler) { - var subject = new Subject(); + var handlerType = handler.GetType(); - Mock.Get(engine).Setup(p => p.StartAsync()).ReturnsAsync(subject); + Mock.Get(serviceProvider).Setup(p => p.GetService(typeof(IServiceScopeFactory))).Returns(scopeFactory); - var receivedMessage = Mock.Of>(); + Mock.Get(serviceProvider).Setup(p => p.GetService(handlerType)).Returns(handler); - sut.SubscribeToEvent(receivedMessage); + await sut.ExecuteCommandHandlerAsync(dispatcher, commandContext, handlerType); - await sut.StartAsync(); + Mock.Get(handler).Verify(p => p.HandleAsync(dispatcher, commandContext), Times.Once); + } - subject.OnNext(testMessage); + [Test, CustomAutoMoqData] + public void ExecuteCommandHandlerAsync_throws_if_handler_is_not_registered([Frozen] IServiceProvider serviceProvider, NybusHost sut, IDispatcher dispatcher, ICommandContext commandContext, IServiceScopeFactory scopeFactory) + { + var handlerType = typeof(FirstTestCommandHandler); - await sut.StopAsync(); + Mock.Get(serviceProvider).Setup(p => p.GetService(typeof(IServiceScopeFactory))).Returns(scopeFactory); + + Mock.Get(serviceProvider).Setup(p => p.GetService(handlerType)).Returns(null as FirstTestCommandHandler); - Mock.Get(engine).Verify(p => p.NotifySuccessAsync(testMessage), Times.Once); + Assert.ThrowsAsync(() => sut.ExecuteCommandHandlerAsync(dispatcher, commandContext, handlerType)); } [Test, CustomAutoMoqData] - public async Task Error_policy_is_executed_when_eventMessages_are_processed_with_errors([Frozen] IBusEngine engine, [Frozen] INybusConfiguration configuration, NybusHost sut, EventMessage testMessage, Exception error) + public async Task ExecuteCommandHandlerAsync_notifies_engine_on_success([Frozen] IServiceProvider serviceProvider, [Frozen] IBusEngine engine, NybusHost sut, IDispatcher dispatcher, CommandMessage commandMessage, IServiceScopeFactory scopeFactory, ICommandHandler handler) { - var subject = new Subject(); - - Mock.Get(engine).Setup(p => p.StartAsync()).ReturnsAsync(subject); - - var receivedMessage = Mock.Of>(); - Mock.Get(receivedMessage).Setup(p => p(It.IsAny(), It.IsAny>())).Throws(error); + var handlerType = handler.GetType(); - sut.SubscribeToEvent(receivedMessage); + var context = new NybusCommandContext(commandMessage); - await sut.StartAsync(); + Mock.Get(serviceProvider).Setup(p => p.GetService(typeof(IServiceScopeFactory))).Returns(scopeFactory); - subject.OnNext(testMessage); + Mock.Get(serviceProvider).Setup(p => p.GetService(handlerType)).Returns(handler); - await sut.StopAsync(); + await sut.ExecuteCommandHandlerAsync(dispatcher, context, handlerType); - Mock.Get(configuration.ErrorPolicy).Verify(p => p.HandleErrorAsync(engine, error, testMessage), Times.Once); + Mock.Get(engine).Verify(p => p.NotifySuccessAsync(context.Message)); } [Test, CustomAutoMoqData] - public async Task Null_messages_delivered_from_engine_are_discarded([Frozen] IBusEngine engine, NybusHost sut, EventMessage testMessage) + public async Task ExecuteCommandHandlerAsync_executed_error_filter_on_fail([Frozen] IServiceProvider serviceProvider, [Frozen] IBusEngine engine, [Frozen] INybusConfiguration configuration, NybusHost sut, IDispatcher dispatcher, CommandMessage commandMessage, IServiceScopeFactory scopeFactory, ICommandHandler handler, Exception error, IErrorFilter errorFilter) { - var subject = new Subject(); - - Mock.Get(engine).Setup(p => p.StartAsync()).ReturnsAsync(subject); + configuration.CommandErrorFilters = new[] { errorFilter }; - var receivedMessage = Mock.Of>(); + var handlerType = handler.GetType(); - sut.SubscribeToEvent(receivedMessage); + var context = new NybusCommandContext(commandMessage); - await sut.StartAsync(); + Mock.Get(serviceProvider).Setup(p => p.GetService(typeof(IServiceScopeFactory))).Returns(scopeFactory); - subject.OnNext(null); + Mock.Get(serviceProvider).Setup(p => p.GetService(handlerType)).Returns(handler); - await sut.StopAsync(); + Mock.Get(handler).Setup(p => p.HandleAsync(It.IsAny(), It.IsAny>())).Throws(error); - Mock.Get(receivedMessage).Verify(p => p(It.IsAny(), It.IsAny>()), Times.Never); - } + await sut.ExecuteCommandHandlerAsync(dispatcher, context, handlerType); - [Test, CustomAutoMoqData] - public void ExecutionEnvironment_returns_self(NybusHost sut) - { - Assert.That(sut.ExecutionEnvironment, Is.SameAs(sut)); + Mock.Get(errorFilter).Verify(p => p.HandleErrorAsync(context, error, It.IsAny>())); } [Test, CustomAutoMoqData] - public async Task ExecuteCommandHandler_creates_new_scope_for_execution([Frozen] IServiceProvider serviceProvider, NybusHost sut, IDispatcher dispatcher, ICommandContext commandContext, IServiceScopeFactory scopeFactory, ICommandHandler handler) + public async Task ExecuteEventHandler_creates_new_scope_for_execution([Frozen] IServiceProvider serviceProvider, NybusHost sut, IDispatcher dispatcher, IEventContext eventContext, IServiceScopeFactory scopeFactory, IEventHandler handler) { var handlerType = handler.GetType(); @@ -266,13 +261,13 @@ public async Task ExecuteCommandHandler_creates_new_scope_for_execution([Frozen] Mock.Get(serviceProvider).Setup(p => p.GetService(handlerType)).Returns(handler); - await sut.ExecuteCommandHandlerAsync(dispatcher, commandContext, handlerType); + await sut.ExecuteEventHandlerAsync(dispatcher, eventContext, handlerType); Mock.Get(scopeFactory).Verify(p => p.CreateScope(), Times.Once); } [Test, CustomAutoMoqData] - public async Task ExecuteCommandHandler_executes_handler([Frozen] IServiceProvider serviceProvider, NybusHost sut, IDispatcher dispatcher, ICommandContext commandContext, IServiceScopeFactory scopeFactory, ICommandHandler handler) + public async Task ExecuteEventHandler_executes_handler([Frozen] IServiceProvider serviceProvider, NybusHost sut, IDispatcher dispatcher, IEventContext eventContext, IServiceScopeFactory scopeFactory, IEventHandler handler) { var handlerType = handler.GetType(); @@ -280,61 +275,57 @@ public async Task ExecuteCommandHandler_executes_handler([Frozen] IServiceProvid Mock.Get(serviceProvider).Setup(p => p.GetService(handlerType)).Returns(handler); - await sut.ExecuteCommandHandlerAsync(dispatcher, commandContext, handlerType); + await sut.ExecuteEventHandlerAsync(dispatcher, eventContext, handlerType); - Mock.Get(handler).Verify(p => p.HandleAsync(dispatcher, commandContext), Times.Once); + Mock.Get(handler).Verify(p => p.HandleAsync(dispatcher, eventContext), Times.Once); } [Test, CustomAutoMoqData] - public void ExecuteCommandHandlerAsync_throws_if_handler_is_not_registered([Frozen] IServiceProvider serviceProvider, NybusHost sut, IDispatcher dispatcher, ICommandContext commandContext, IServiceScopeFactory scopeFactory) + public void ExecuteEventHandlerAsync_throws_if_handler_is_not_registered([Frozen] IServiceProvider serviceProvider, NybusHost sut, IDispatcher dispatcher, IEventContext eventContext, IServiceScopeFactory scopeFactory) { - var handlerType = typeof(FirstTestCommandHandler); + var handlerType = typeof(FirstTestEventHandler); Mock.Get(serviceProvider).Setup(p => p.GetService(typeof(IServiceScopeFactory))).Returns(scopeFactory); - Mock.Get(serviceProvider).Setup(p => p.GetService(handlerType)).Throws(); + Mock.Get(serviceProvider).Setup(p => p.GetService(handlerType)).Returns(null as FirstTestEventHandler); - Assert.ThrowsAsync(() => sut.ExecuteCommandHandlerAsync(dispatcher, commandContext, handlerType)); + Assert.ThrowsAsync(() => sut.ExecuteEventHandlerAsync(dispatcher, eventContext, handlerType)); } [Test, CustomAutoMoqData] - public async Task ExecuteEventHandler_creates_new_scope_for_execution([Frozen] IServiceProvider serviceProvider, NybusHost sut, IDispatcher dispatcher, IEventContext eventContext, IServiceScopeFactory scopeFactory, IEventHandler handler) + public async Task ExecuteEventHandlerAsync_notifies_engine_on_success([Frozen] IServiceProvider serviceProvider, [Frozen] IBusEngine engine, NybusHost sut, IDispatcher dispatcher, EventMessage eventMessage, IServiceScopeFactory scopeFactory, IEventHandler handler) { var handlerType = handler.GetType(); + var context = new NybusEventContext(eventMessage); + Mock.Get(serviceProvider).Setup(p => p.GetService(typeof(IServiceScopeFactory))).Returns(scopeFactory); Mock.Get(serviceProvider).Setup(p => p.GetService(handlerType)).Returns(handler); - await sut.ExecuteEventHandlerAsync(dispatcher, eventContext, handlerType); + await sut.ExecuteEventHandlerAsync(dispatcher, context, handlerType); - Mock.Get(scopeFactory).Verify(p => p.CreateScope(), Times.Once); + Mock.Get(engine).Verify(p => p.NotifySuccessAsync(context.Message)); } [Test, CustomAutoMoqData] - public async Task ExecuteEventHandler_executes_handler([Frozen] IServiceProvider serviceProvider, NybusHost sut, IDispatcher dispatcher, IEventContext eventContext, IServiceScopeFactory scopeFactory, IEventHandler handler) + public async Task ExecuteEventHandlerAsync_executed_error_filter_on_fail([Frozen] IServiceProvider serviceProvider, [Frozen] IBusEngine engine, [Frozen] INybusConfiguration configuration, NybusHost sut, IDispatcher dispatcher, EventMessage eventMessage, IServiceScopeFactory scopeFactory, IEventHandler handler, Exception error, IErrorFilter errorFilter) { + configuration.EventErrorFilters = new[] { errorFilter }; + var handlerType = handler.GetType(); + var context = new NybusEventContext(eventMessage); + Mock.Get(serviceProvider).Setup(p => p.GetService(typeof(IServiceScopeFactory))).Returns(scopeFactory); Mock.Get(serviceProvider).Setup(p => p.GetService(handlerType)).Returns(handler); - await sut.ExecuteEventHandlerAsync(dispatcher, eventContext, handlerType); - - Mock.Get(handler).Verify(p => p.HandleAsync(dispatcher, eventContext), Times.Once); - } - - [Test, CustomAutoMoqData] - public void ExecuteEventHandlerAsync_throws_if_handler_is_not_registered([Frozen] IServiceProvider serviceProvider, NybusHost sut, IDispatcher dispatcher, IEventContext eventContext, IServiceScopeFactory scopeFactory) - { - var handlerType = typeof(FirstTestEventHandler); - - Mock.Get(serviceProvider).Setup(p => p.GetService(typeof(IServiceScopeFactory))).Returns(scopeFactory); + Mock.Get(handler).Setup(p => p.HandleAsync(It.IsAny(), It.IsAny>())).Throws(error); - Mock.Get(serviceProvider).Setup(p => p.GetService(handlerType)).Throws(); + await sut.ExecuteEventHandlerAsync(dispatcher, context, handlerType); - Assert.ThrowsAsync(() => sut.ExecuteEventHandlerAsync(dispatcher, eventContext, handlerType)); + Mock.Get(errorFilter).Verify(p => p.HandleErrorAsync(context, error, It.IsAny>())); } } } \ No newline at end of file diff --git a/tests/Tests.Nybus/Policies/NoopErrorPolicyProviderTests.cs b/tests/Tests.Nybus/Policies/NoopErrorPolicyProviderTests.cs deleted file mode 100644 index 44579d4..0000000 --- a/tests/Tests.Nybus/Policies/NoopErrorPolicyProviderTests.cs +++ /dev/null @@ -1,24 +0,0 @@ -using Microsoft.Extensions.Configuration; -using NUnit.Framework; -using Nybus.Policies; - -namespace Tests.Policies -{ - [TestFixture] - public class NoopErrorPolicyProviderTests - { - [Test, CustomAutoMoqData] - public void ProvideName_is_noop(NoopErrorPolicyProvider sut) - { - Assert.That(sut.ProviderName, Is.EqualTo("noop")); - } - - [Test, CustomAutoMoqData] - public void CreatePolicy_returns_NoopErrorPolicy_instance(NoopErrorPolicyProvider sut, IConfigurationSection configurationSection) - { - var policy = sut.CreatePolicy(configurationSection) as NoopErrorPolicy; - - Assert.That(policy, Is.Not.Null); - } - } -} \ No newline at end of file diff --git a/tests/Tests.Nybus/Policies/NoopErrorPolicyTests.cs b/tests/Tests.Nybus/Policies/NoopErrorPolicyTests.cs deleted file mode 100644 index 2a63713..0000000 --- a/tests/Tests.Nybus/Policies/NoopErrorPolicyTests.cs +++ /dev/null @@ -1,29 +0,0 @@ -using System; -using System.Threading.Tasks; -using Moq; -using NUnit.Framework; -using Nybus; -using Nybus.Policies; - -namespace Tests.Policies -{ - [TestFixture] - public class NoopErrorPolicyTests - { - [Test, CustomAutoMoqData] - public async Task HandleError_notifies_engine(NoopErrorPolicy sut, IBusEngine engine, Exception exception, CommandMessage message) - { - await sut.HandleErrorAsync(engine, exception, message); - - Mock.Get(engine).Verify(p => p.NotifyFailAsync(message)); - } - - [Test, CustomAutoMoqData] - public async Task HandleError_notifies_engine(NoopErrorPolicy sut, IBusEngine engine, Exception exception, EventMessage message) - { - await sut.HandleErrorAsync(engine, exception, message); - - Mock.Get(engine).Verify(p => p.NotifyFailAsync(message)); - } - } -} \ No newline at end of file diff --git a/tests/Tests.Nybus/Policies/RetryErrorPolicyProviderTests.cs b/tests/Tests.Nybus/Policies/RetryErrorPolicyProviderTests.cs deleted file mode 100644 index 9477872..0000000 --- a/tests/Tests.Nybus/Policies/RetryErrorPolicyProviderTests.cs +++ /dev/null @@ -1,38 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; -using Castle.Core.Logging; -using Microsoft.Extensions.Configuration; -using Moq; -using NUnit.Framework; -using Nybus.Policies; - -namespace Tests.Policies -{ - [TestFixture] - public class RetryErrorPolicyProviderTests - { - [Test, CustomAutoMoqData] - public void LoggerFactory_is_required() - { - Assert.Throws(() => new RetryErrorPolicyProvider(null)); - } - - [Test, CustomAutoMoqData] - public void ProviderName_is_retry(RetryErrorPolicyProvider sut) - { - Assert.That(sut.ProviderName, Is.EqualTo("retry")); - } - - [Test, CustomAutoMoqData] - public void CreatePolicy_returns_RetryErrorPolicy_instance(RetryErrorPolicyProvider sut, IConfigurationSection configurationSection, int maxRetries) - { - Mock.Get(configurationSection.GetSection("MaxRetries")).Setup(p => p.Value).Returns(maxRetries.ToString()); - - var policy = sut.CreatePolicy(configurationSection) as RetryErrorPolicy; - - Assert.That(policy, Is.Not.Null); - Assert.That(policy.MaxRetries, Is.EqualTo(maxRetries)); - } - } -} diff --git a/tests/Tests.Nybus/Policies/RetryErrorPolicyTests.cs b/tests/Tests.Nybus/Policies/RetryErrorPolicyTests.cs deleted file mode 100644 index 79b6cf5..0000000 --- a/tests/Tests.Nybus/Policies/RetryErrorPolicyTests.cs +++ /dev/null @@ -1,130 +0,0 @@ -using System; -using System.Threading.Tasks; -using AutoFixture.NUnit3; -using Microsoft.Extensions.Logging; -using Moq; -using NUnit.Framework; -using Nybus; -using Nybus.Policies; -using Nybus.Utils; - -namespace Tests.Policies -{ - [TestFixture] - public class RetryErrorPolicyTests - { - [Test, CustomAutoMoqData] - public void Options_is_required(ILogger logger) - { - Assert.Throws(() => new RetryErrorPolicy(null, logger)); - } - - [Test, CustomAutoMoqData] - public void MaxRetries_cant_be_negative(ILogger logger, int retries) - { - var options = new RetryErrorPolicyOptions { MaxRetries = -retries }; - - Assert.Throws(() => new RetryErrorPolicy(options, logger)); - } - - [Test, CustomAutoMoqData] - public void Logger_is_required(RetryErrorPolicyOptions options) - { - Assert.Throws(() => new RetryErrorPolicy(options, null)); - } - - [Test, CustomAutoMoqData] - public async Task HandleError_notifies_engine_if_retry_count_equal_or_greater_than_maxRetries([Frozen] RetryErrorPolicyOptions options, RetryErrorPolicy sut, IBusEngine engine, Exception error, CommandMessage message) - { - message.Headers[Headers.RetryCount] = options.MaxRetries.Stringfy(); - - await sut.HandleErrorAsync(engine, error, message); - - Mock.Get(engine).Verify(p => p.NotifyFailAsync(message)); - } - - [Test, CustomAutoMoqData] - public async Task HandleError_retries_if_retry_count_less_than_maxRetries([Frozen] RetryErrorPolicyOptions options, RetryErrorPolicy sut, IBusEngine engine, Exception error, CommandMessage message) - { - message.Headers[Headers.RetryCount] = (options.MaxRetries - 2).Stringfy(); - - await sut.HandleErrorAsync(engine, error, message); - - Mock.Get(engine).Verify(p => p.SendCommandAsync(message)); - } - - [Test, CustomAutoMoqData] - public async Task HandleError_increments_retry_count_if_retry_count_present([Frozen] RetryErrorPolicyOptions options, RetryErrorPolicy sut, IBusEngine engine, Exception error, CommandMessage message) - { - message.Headers[Headers.RetryCount] = (options.MaxRetries - 2).Stringfy(); - - await sut.HandleErrorAsync(engine, error, message); - - Assert.That(message.Headers.ContainsKey(Headers.RetryCount)); - Assert.That(message.Headers[Headers.RetryCount], Is.EqualTo((options.MaxRetries - 1).Stringfy())); - } - - [Test, CustomAutoMoqData] - public async Task HandleError_retries_if_retry_count_not_present(RetryErrorPolicy sut, IBusEngine engine, Exception error, CommandMessage message) - { - await sut.HandleErrorAsync(engine, error, message); - - Mock.Get(engine).Verify(p => p.SendCommandAsync(message)); - } - - [Test, CustomAutoMoqData] - public async Task HandleError_adds_retry_count_if_retry_count_not_present(RetryErrorPolicy sut, IBusEngine engine, Exception error, CommandMessage message) - { - await sut.HandleErrorAsync(engine, error, message); - - Assert.That(message.Headers.ContainsKey(Headers.RetryCount)); - } - - [Test, CustomAutoMoqData] - public async Task HandleError_notifies_engine_if_retry_count_equal_or_greater_than_maxRetries([Frozen] RetryErrorPolicyOptions options, RetryErrorPolicy sut, IBusEngine engine, Exception error, EventMessage message) - { - message.Headers[Headers.RetryCount] = options.MaxRetries.Stringfy(); - - await sut.HandleErrorAsync(engine, error, message); - - Mock.Get(engine).Verify(p => p.NotifyFailAsync(message)); - } - - [Test, CustomAutoMoqData] - public async Task HandleError_retries_if_retry_count_less_than_maxRetries([Frozen] RetryErrorPolicyOptions options, RetryErrorPolicy sut, IBusEngine engine, Exception error, EventMessage message) - { - message.Headers[Headers.RetryCount] = (options.MaxRetries - 2).Stringfy(); - - await sut.HandleErrorAsync(engine, error, message); - - Mock.Get(engine).Verify(p => p.SendEventAsync(message)); - } - - [Test, CustomAutoMoqData] - public async Task HandleError_retries_if_retry_count_not_present(RetryErrorPolicy sut, IBusEngine engine, Exception error, EventMessage message) - { - await sut.HandleErrorAsync(engine, error, message); - - Mock.Get(engine).Verify(p => p.SendEventAsync(message)); - } - - [Test, CustomAutoMoqData] - public async Task HandleError_adds_retry_count_if_retry_count_not_present(RetryErrorPolicy sut, IBusEngine engine, Exception error, EventMessage message) - { - await sut.HandleErrorAsync(engine, error, message); - - Assert.That(message.Headers.ContainsKey(Headers.RetryCount)); - } - - [Test, CustomAutoMoqData] - public async Task HandleError_increments_retry_count_if_retry_count_present([Frozen] RetryErrorPolicyOptions options, RetryErrorPolicy sut, IBusEngine engine, Exception error, EventMessage message) - { - message.Headers[Headers.RetryCount] = (options.MaxRetries - 2).Stringfy(); - - await sut.HandleErrorAsync(engine, error, message); - - Assert.That(message.Headers.ContainsKey(Headers.RetryCount)); - Assert.That(message.Headers[Headers.RetryCount], Is.EqualTo((options.MaxRetries - 1).Stringfy())); - } - } -} \ No newline at end of file diff --git a/tests/Tests.Nybus/ServiceCollectionExtensionsTests.cs b/tests/Tests.Nybus/ServiceCollectionExtensionsTests.cs index 091dc84..409a038 100644 --- a/tests/Tests.Nybus/ServiceCollectionExtensionsTests.cs +++ b/tests/Tests.Nybus/ServiceCollectionExtensionsTests.cs @@ -38,7 +38,7 @@ public void AddNybus_invokes_configuratorDelegate(IServiceCollection services) [Test] [InlineAutoMoqData(typeof(NybusHostBuilder))] - [InlineAutoMoqData(typeof(INybusConfiguration))] + [InlineAutoMoqData(typeof(NybusConfiguration))] [InlineAutoMoqData(typeof(NybusHost))] [InlineAutoMoqData(typeof(IBusHost))] [InlineAutoMoqData(typeof(IBus))] diff --git a/tests/Tests.Nybus/SetupTests.cs b/tests/Tests.Nybus/SetupTests.cs new file mode 100644 index 0000000..dfdbd41 --- /dev/null +++ b/tests/Tests.Nybus/SetupTests.cs @@ -0,0 +1,70 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Moq; +using NUnit.Framework; +using Nybus; +using Nybus.Configuration; + +namespace Tests +{ + [TestFixture] + public class SetupTests + { + [Test, AutoMoqData] + public void UseBusEngine_is_required(ILoggerFactory loggerFactory) + { + var services = new ServiceCollection(); + services.AddSingleton(loggerFactory); + + services.AddNybus(nybus => + { + + }); + + var serviceProvider = services.BuildServiceProvider(); + + Assert.Throws(() => serviceProvider.GetRequiredService()); + } + + [Test, AutoMoqData] + public void Logging_is_required() + { + var services = new ServiceCollection(); + + services.AddNybus(nybus => + { + nybus.UseBusEngine(); + }); + + var serviceProvider = services.BuildServiceProvider(); + + Assert.Throws(() => serviceProvider.GetRequiredService()); + } + + [Test, AutoMoqData] + public void Configuration_delegate_is_invoked_when_assembling_the_host(ILoggerFactory loggerFactory) + { + var configurationDelegate = Mock.Of>(); + + var services = new ServiceCollection(); + services.AddSingleton(loggerFactory); + + services.AddNybus(nybus => + { + nybus.UseBusEngine(); + + nybus.Configure(configurationDelegate); + }); + + var serviceProvider = services.BuildServiceProvider(); + + var host = serviceProvider.GetRequiredService(); + + Mock.Get(configurationDelegate).Verify(p => p(It.IsAny())); + } + } +}