Skip to content

Commit

Permalink
Fix error filters (#88)
Browse files Browse the repository at this point in the history
- `DiscardErrorFilter` is now the default strategy. Faulty messages will be removed from the queue unless a different strategy is selected.
- Fixed `RetryErrorFilter` to better handle faulty messages past the number of retries.
- Fixed `RabbitMqBusEngine` to better handle RabbitMq specific headers.
- Fixed `RabbitMqBusEngine` to better handle nacked messages.
- Added new sample with error filters configuration
  • Loading branch information
Kralizek committed Jun 10, 2019
1 parent b610f2b commit f1c3945
Show file tree
Hide file tree
Showing 12 changed files with 548 additions and 382 deletions.
748 changes: 383 additions & 365 deletions Nybus.sln

Large diffs are not rendered by default.

22 changes: 22 additions & 0 deletions samples/extras/ErrorFilters/ErrorFilters.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.1</TargetFramework>
<LangVersion>latest</LangVersion>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="McMaster.Extensions.CommandLineUtils" Version="2.2.5" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="2.2.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\src\Nybus\Nybus.csproj" />
<ProjectReference Include="..\..\..\src\engines\Nybus.Engine.RabbitMq\Nybus.Engine.RabbitMq.csproj" />
<ProjectReference Include="..\..\RabbitMQ\Types\Types.csproj" />
</ItemGroup>

</Project>
102 changes: 102 additions & 0 deletions samples/extras/ErrorFilters/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using McMaster.Extensions.CommandLineUtils;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Nybus;
using Types;

namespace ErrorFilters
{
class Program
{
static async Task Main(string[] args) => await CommandLineApplication.ExecuteAsync<Program>();

[Option(CommandOptionType.SingleValue, LongName = "hostName")]
public string HostName { get; set; } = "localhost";

[Option(CommandOptionType.SingleValue, LongName = "username", ShortName = "u")]
public string Username { get; set; } = "guest";

[Option(CommandOptionType.SingleValue, LongName = "password", ShortName = "p")]
public string Password { get; set; } = "guest";

[Option(CommandOptionType.SingleValue, LongName = "virtualHost", ShortName = "v")]
public string VirtualHost { get; set; } = "/";

[Option(CommandOptionType.SingleValue, LongName = "retries", ShortName = "r")]
public int Retries { get; set; } = 5;

public async Task OnExecuteAsync()
{
var settings = new Dictionary<string, string>
{
["Nybus:RabbitMq:Connection:HostName"] = HostName,
["Nybus:RabbitMq:Connection:Username"] = Username,
["Nybus:RabbitMq:Connection:Password"] = Password,
["Nybus:RabbitMq:Connection:VirtualHost"] = VirtualHost,
["Nybus:CommandErrorFilters:0:type"] = "retry",
["Nybus:CommandErrorFilters:0:maxRetries"] = Retries.ToString(),
["Nybus:EventErrorFilters:0:type"] = "retry",
["Nybus:EventErrorFilters:0:maxRetries"] = Retries.ToString(),
};

var configurationBuilder = new ConfigurationBuilder();
configurationBuilder.AddInMemoryCollection(settings);

var configuration = configurationBuilder.Build();

var services = new ServiceCollection();
services.AddLogging(builder => builder.AddConsole().SetMinimumLevel(LogLevel.Trace));

services.AddNybus(nybus =>
{
nybus.UseConfiguration(configuration, "Nybus");
nybus.UseRabbitMqBusEngine(rabbitMq =>
{
rabbitMq.UseConfiguration("RabbitMq");
});
nybus.SubscribeToCommand<TestCommand>(async (dispatcher, context) =>
{
Console.WriteLine($"Processed command {context.Command.Message}");
await dispatcher.RaiseEventAsync(new TestCommandReceived { Message = $@"Received message ""{context.Command.Message}""" });
await Task.Delay(TimeSpan.FromMilliseconds(100));
throw new Exception("Hello world");
});
nybus.SubscribeToEvent<TestEvent>((d, msg) =>
{
Console.WriteLine($"Processed event {msg.Event.Message}");
throw new Exception("Hello world");
});
});

var serviceProvider = services.BuildServiceProvider();

var host = serviceProvider.GetRequiredService<NybusHost>();

try
{
await host.StartAsync();

Console.WriteLine("Press <ENTER> to exit");
Console.ReadLine();

await host.StopAsync();
}
catch (Exception ex)
{
Console.WriteLine(ex);
}

Console.WriteLine("Hello World!");
}
}
}
2 changes: 1 addition & 1 deletion src/Nybus/Configuration/INybusHostConfigurationFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class NybusHostConfigurationFactory : INybusHostConfigurationFactory
private readonly IErrorFilter _fallbackErrorFilter;
private readonly IReadOnlyDictionary<string, IErrorFilterProvider> _errorFilterProvidersByName;

public NybusHostConfigurationFactory(IEnumerable<IErrorFilterProvider> errorFilterProviders, FallbackErrorFilter fallbackErrorFilter)
public NybusHostConfigurationFactory(IEnumerable<IErrorFilterProvider> errorFilterProviders, DiscardErrorFilter fallbackErrorFilter)
{
_fallbackErrorFilter = fallbackErrorFilter ?? throw new ArgumentNullException(nameof(fallbackErrorFilter));
_errorFilterProvidersByName = CreateDictionary(errorFilterProviders ?? throw new ArgumentNullException(nameof(errorFilterProviders)));
Expand Down
8 changes: 6 additions & 2 deletions src/Nybus/Filters/RetryErrorFilter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,15 @@ public async Task HandleErrorAsync<TCommand>(ICommandContext<TCommand> context,

message.Headers[Headers.RetryCount] = retryCount.Stringfy();

await _engine.NotifySuccessAsync(message).ConfigureAwait(false);

await _engine.SendMessageAsync(message).ConfigureAwait(false);
}
else
{
_logger.LogTrace($"Error {retryCount}/{_options.MaxRetries}: will not retry");

await _engine.NotifyFailAsync(message).ConfigureAwait(false);
//await _engine.NotifyFailAsync(message).ConfigureAwait(false);

await next(context, exception).ConfigureAwait(false);
}
Expand All @@ -89,13 +91,15 @@ public async Task HandleErrorAsync<TEvent>(IEventContext<TEvent> context, Except

message.Headers[Headers.RetryCount] = retryCount.Stringfy();

await _engine.NotifySuccessAsync(message).ConfigureAwait(false);

await _engine.SendMessageAsync(message).ConfigureAwait(false);
}
else
{
_logger.LogTrace($"Error {retryCount}/{_options.MaxRetries}: will not retry");

await _engine.NotifyFailAsync(message).ConfigureAwait(false);
//await _engine.NotifyFailAsync(message).ConfigureAwait(false);

await next(context, exception).ConfigureAwait(false);
}
Expand Down
2 changes: 1 addition & 1 deletion src/Nybus/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public static IServiceCollection AddNybus(this IServiceCollection services, Acti
configurator.RegisterErrorFilterProvider<RetryErrorFilterProvider>();
configurator.RegisterErrorFilterProvider<DiscardErrorFilterProvider>();

services.AddSingleton<FallbackErrorFilter>();
services.AddSingleton<DiscardErrorFilter>();

configure(configurator);

Expand Down
19 changes: 16 additions & 3 deletions src/engines/Nybus.Engine.RabbitMq/RabbitMq/RabbitMqBusEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,21 @@ public Task SendMessageAsync(Message message)

foreach (var header in message.Headers)
{
var headerKey = Headers.IsNybus(header.Key) ? Nybus(header.Key) : Custom(header.Key);
properties.Headers.Add(headerKey, header.Value);
if (Headers.IsNybus(header.Key))
{
properties.Headers.Add(Nybus(header.Key), header.Value);
}
else if (RabbitMqHeaders.IsRabbitMq(header.Key))
{
properties.Headers.Add(header.Key, header.Value);
}
else
{
properties.Headers.Add(Custom(header.Key), header.Value);
}

//var headerKey = Headers.IsNybus(header.Key) ? Nybus(header.Key) : Custom(header.Key);
//properties.Headers.Add(headerKey, header.Value);
}

var exchangeName = MessageDescriptor.CreateFromType(type);
Expand Down Expand Up @@ -299,7 +312,7 @@ public Task NotifyFailAsync(Message message)

private void NackMessage(ulong deliveryTag)
{
_channel.BasicNack(deliveryTag, false, true);
_channel.BasicNack(deliveryTag, false, false);
_processingMessages.TryRemoveItem(deliveryTag);
}

Expand Down
2 changes: 2 additions & 0 deletions src/engines/Nybus.Engine.RabbitMq/RabbitMq/RabbitMqHeaders.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,7 @@ public static class RabbitMqHeaders
{
public static readonly string MessageId = "RabbitMq:MessageId";
public static readonly string DeliveryTag = "RabbitMq:DeliveryTag";

public static bool IsRabbitMq(string key) => key.StartsWith("RabbitMq");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ public async Task NotifyFail_nacks_command_messages([Frozen] IRabbitMqConfigurat

await sut.NotifyFailAsync(incomingMessages.First());

Mock.Get(configuration.ConnectionFactory.CreateConnection().CreateModel()).Verify(p => p.BasicNack(deliveryTag, It.IsAny<bool>(), true));
Mock.Get(configuration.ConnectionFactory.CreateConnection().CreateModel()).Verify(p => p.BasicNack(deliveryTag, It.IsAny<bool>(), false));
}

[Test, CustomAutoMoqData]
Expand Down
12 changes: 8 additions & 4 deletions tests/Tests.Nybus/Filters/RetryErrorFilterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ public void MaxRetries_cant_be_negative(IBusEngine engine, ILogger<RetryErrorFil
}

[Test, CustomAutoMoqData]
public async Task HandleError_notifies_engine_if_retry_count_equal_or_greater_than_maxRetries([Frozen] RetryErrorFilterOptions options, [Frozen] IBusEngine engine, RetryErrorFilter sut, Exception error, NybusCommandContext<FirstTestCommand> context, CommandErrorDelegate<FirstTestCommand> next)
public async Task HandleError_forwards_to_next_if_retry_count_equal_or_greater_than_maxRetries([Frozen] RetryErrorFilterOptions options, [Frozen] IBusEngine engine, RetryErrorFilter sut, Exception error, NybusCommandContext<FirstTestCommand> context, CommandErrorDelegate<FirstTestCommand> next)
{
context.Message.Headers[Headers.RetryCount] = options.MaxRetries.Stringfy();

await sut.HandleErrorAsync(context, error, next);

Mock.Get(engine).Verify(p => p.NotifyFailAsync(context.Message));
Mock.Get(engine).VerifyNoOtherCalls();

Mock.Get(next).Verify(p => p(context, error));
}

[Test, CustomAutoMoqData]
Expand Down Expand Up @@ -85,13 +87,15 @@ public async Task HandleError_adds_retry_count_if_retry_count_not_present([Froze
}

[Test, CustomAutoMoqData]
public async Task HandleError_notifies_engine_if_retry_count_equal_or_greater_than_maxRetries([Frozen] RetryErrorFilterOptions options, [Frozen] IBusEngine engine, RetryErrorFilter sut, Exception error, NybusEventContext<FirstTestEvent> context, EventErrorDelegate<FirstTestEvent> next)
public async Task HandleError_forwards_to_next_if_retry_count_equal_or_greater_than_maxRetries([Frozen] RetryErrorFilterOptions options, [Frozen] IBusEngine engine, RetryErrorFilter sut, Exception error, NybusEventContext<FirstTestEvent> context, EventErrorDelegate<FirstTestEvent> next)
{
context.Message.Headers[Headers.RetryCount] = options.MaxRetries.Stringfy();

await sut.HandleErrorAsync(context, error, next);

Mock.Get(engine).Verify(p => p.NotifyFailAsync(context.Message));
Mock.Get(engine).VerifyNoOtherCalls();

Mock.Get(next).Verify(p => p(context, error));
}

[Test, CustomAutoMoqData]
Expand Down
8 changes: 4 additions & 4 deletions tests/Tests.Nybus/SetupTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ namespace Tests
public class SetupTests
{
[Test, AutoMoqData]
public void UseBusEngine_is_required(ILoggerFactory loggerFactory)
public void UseBusEngine_is_required()
{
var services = new ServiceCollection();
services.AddSingleton(loggerFactory);
services.AddLogging();

services.AddNybus(nybus =>
{
Expand Down Expand Up @@ -46,10 +46,10 @@ public void Logging_is_required()
}

[Test, AutoMoqData]
public void Configuration_delegate_is_invoked_when_assembling_the_host(ILoggerFactory loggerFactory, Action<INybusConfiguration> configurationDelegate)
public void Configuration_delegate_is_invoked_when_assembling_the_host(Action<INybusConfiguration> configurationDelegate)
{
var services = new ServiceCollection();
services.AddSingleton(loggerFactory);
services.AddLogging();

services.AddNybus(nybus =>
{
Expand Down
3 changes: 2 additions & 1 deletion tests/Tests.Nybus/Tests.Nybus.csproj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>netcoreapp2.2;net472</TargetFrameworks>
Expand All @@ -10,6 +10,7 @@
<PackageReference Include="nunit" Version="3.11.0" />
<PackageReference Include="NUnit3TestAdapter" Version="3.11.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.9.0" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.0.0" />
</ItemGroup>

<ItemGroup Condition=" '$(TargetFramework)'=='net472' ">
Expand Down

0 comments on commit f1c3945

Please sign in to comment.