Skip to content

Commit

Permalink
Dead letter queue support (#96)
Browse files Browse the repository at this point in the history
  • Loading branch information
elindanielsson committed Jan 20, 2022
1 parent 76d3ea1 commit acdc486
Show file tree
Hide file tree
Showing 22 changed files with 876 additions and 31 deletions.
2 changes: 2 additions & 0 deletions samples/extras/ErrorFilters/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@ public async Task OnExecuteAsync()
["Nybus:RabbitMq:Connection:VirtualHost"] = VirtualHost,
["Nybus:CommandErrorFilters:0:type"] = "retry",
["Nybus:CommandErrorFilters:0:maxRetries"] = Retries.ToString(),
["Nybus:CommandErrorFilters:1:type"] = "dead-letter-queue",
["Nybus:EventErrorFilters:0:type"] = "retry",
["Nybus:EventErrorFilters:0:maxRetries"] = Retries.ToString(),
["Nybus:EventErrorFilters:1:type"] = "dead-letter-queue",
};

var configurationBuilder = new ConfigurationBuilder();
Expand Down
2 changes: 2 additions & 0 deletions src/Nybus.Abstractions/IBusEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,7 @@ public interface IBusEngine
Task NotifySuccessAsync(Message message);

Task NotifyFailAsync(Message message);

Task SendMessageToErrorQueueAsync(Message message);
}
}
103 changes: 103 additions & 0 deletions src/Nybus/Filters/DeadLetterQueueErrorFilter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
using System;
using System.Diagnostics;
using System.Reflection;
using System.Threading.Tasks;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace Nybus.Filters
{
public class DeadLetterQueueErrorFilterProvider : IErrorFilterProvider
{
private readonly IServiceProvider _serviceProvider;

public DeadLetterQueueErrorFilterProvider(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
}

public string ProviderName { get; } = "dead-letter-queue";

public IErrorFilter CreateErrorFilter(IConfigurationSection settings)
{
var engine = _serviceProvider.GetRequiredService<IBusEngine>();
var logger = _serviceProvider.GetRequiredService<ILogger<DeadLetterQueueErrorFilter>>();

return new DeadLetterQueueErrorFilter(logger, engine);
}
}

public class DeadLetterQueueErrorFilter : IErrorFilter
{
private readonly ILogger<DeadLetterQueueErrorFilter> _logger;
private readonly IBusEngine _engine;

public DeadLetterQueueErrorFilter(ILogger<DeadLetterQueueErrorFilter> logger, IBusEngine engine)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_engine = engine ?? throw new ArgumentNullException(nameof(engine));
}

public async Task HandleErrorAsync<TCommand>(ICommandContext<TCommand> context, Exception exception, CommandErrorDelegate<TCommand> next) where TCommand : class, ICommand
{
try
{
_logger.LogTrace($"Sending command to dead letter queue");

await SendToErrorQueue(context.Message, exception);
}
catch (Exception dlqException)
{
_logger.LogError(new { }, dlqException, (s, ex) => $"Unable to send message to DLQ: {ex.Message}");
await next(context, exception).ConfigureAwait(false);
}
}

public async Task HandleErrorAsync<TEvent>(IEventContext<TEvent> context, Exception exception, EventErrorDelegate<TEvent> next) where TEvent : class, IEvent
{
try
{
_logger.LogTrace($"Sending event to dead letter queue");

await SendToErrorQueue(context.Message, exception);
}
catch (Exception dlqException)
{
_logger.LogError(new { }, dlqException, (s, ex) => $"Unable to send message to DLQ: {ex.Message}");
await next(context, exception).ConfigureAwait(false);
}
}

private async Task SendToErrorQueue(Message message, Exception exception)
{
if (exception != null)
{
message.Headers[DeadLetterQueueHeaders.FaultMessage] = exception.Message;
message.Headers[DeadLetterQueueHeaders.FaultStackTrace] = exception.StackTrace;
}

message.Headers[DeadLetterQueueHeaders.ErrorHost] = Environment.MachineName;
message.Headers[DeadLetterQueueHeaders.ErrorProcess] = Process.GetCurrentProcess().ProcessName;

var entryAssembly = Assembly.GetEntryAssembly();
if (entryAssembly != null)
{
message.Headers[DeadLetterQueueHeaders.ErrorAssembly] = entryAssembly.GetName().Name;
}

await _engine.NotifySuccessAsync(message).ConfigureAwait(false);

await _engine.SendMessageToErrorQueueAsync(message).ConfigureAwait(false);
}
}

public static class DeadLetterQueueHeaders
{
public static readonly string FaultMessage = "DLQ-Fault-Message";
public static readonly string FaultStackTrace = "DLQ-Fault-StackTrace";
public static readonly string ErrorHost = "DLQ-Error-Host";
public static readonly string ErrorProcess = "DLQ-Error-Process";
public static readonly string ErrorAssembly = "DLQ-Error-Assembly";
}
}
4 changes: 2 additions & 2 deletions src/Nybus/Filters/DiscardErrorFilter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public async Task HandleErrorAsync<TCommand>(ICommandContext<TCommand> context,
}
catch (Exception discardException)
{
_logger.LogError(discardException, ex => $"Unable to discard message: {ex.Message}");
_logger.LogError(new { }, discardException, (s, ex) => $"Unable to discard message: {ex.Message}");
await next(context, exception).ConfigureAwait(false);
}
}
Expand All @@ -62,7 +62,7 @@ public async Task HandleErrorAsync<TEvent>(IEventContext<TEvent> context, Except
}
catch (Exception discardException)
{
_logger.LogError(discardException, ex => $"Unable to discard message: {ex.Message}");
_logger.LogError(new { }, discardException, (s, ex) => $"Unable to discard message: {ex.Message}");
await next(context, exception).ConfigureAwait(false);
}
}
Expand Down
1 change: 1 addition & 0 deletions src/Nybus/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public static IServiceCollection AddNybus(this IServiceCollection services, Acti

configurator.RegisterErrorFilterProvider<RetryErrorFilterProvider>();
configurator.RegisterErrorFilterProvider<DiscardErrorFilterProvider>();
configurator.RegisterErrorFilterProvider<DeadLetterQueueErrorFilterProvider>();

services.AddSingleton<DiscardErrorFilter>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ public Task NotifyFailAsync(Message message)

public bool IsTypeAccepted(Type type) => _acceptedTypes.Contains(type);

public Task SendMessageToErrorQueueAsync(Message message)
{
return Task.CompletedTask;
}

public event EventHandler<MessageEventArgs> OnMessageNotifySuccess;

public event EventHandler<MessageEventArgs> OnMessageNotifyFail;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ public class RabbitMqOptions

public IConfigurationSection EventQueue { get; set; }

public IConfigurationSection ErrorQueue { get; set; }

public ExchangeOptions CommandExchange { get; set; }

public ExchangeOptions EventExchange { get; set; }
Expand Down Expand Up @@ -65,6 +67,7 @@ public IRabbitMqConfiguration Create(RabbitMqOptions options)
var outboundEncoding = GetOutboundEncoding();
var commandQueueFactory = GetQueueFactory(options.CommandQueue);
var eventQueueFactory = GetQueueFactory(options.EventQueue);
var errorQueueFactory = GetQueueFactory(options.ErrorQueue);
var connectionFactory = GetConnectionFactory();
var commandExchangeManager = GetExchangeManager(options.CommandExchange);
var eventExchangeManager = GetExchangeManager(options.EventExchange);
Expand All @@ -75,6 +78,7 @@ public IRabbitMqConfiguration Create(RabbitMqOptions options)
OutboundEncoding = outboundEncoding,
CommandQueueFactory = commandQueueFactory,
EventQueueFactory = eventQueueFactory,
ErrorQueueFactory = errorQueueFactory,
ConnectionFactory = connectionFactory,
CommandExchangeManager = commandExchangeManager,
EventExchangeManager = eventExchangeManager,
Expand Down
19 changes: 19 additions & 0 deletions src/engines/Nybus.Engine.RabbitMq/Configuration/IQueueFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,23 @@ public QueueDeclareOk CreateQueue(IModel model)
public static readonly IQueueFactory Instance = new TemporaryQueueFactory();
}

public class PrefixedTemporaryQueueFactory : IQueueFactory
{
public string Prefix { get; }

public PrefixedTemporaryQueueFactory(string prefix)
{
Prefix = prefix ?? throw new ArgumentNullException(nameof(prefix));
}

public QueueDeclareOk CreateQueue(IModel model)
{
if (model == null)
{
throw new ArgumentNullException(nameof(model));
}

return model.QueueDeclare($"{Prefix}-{Guid.NewGuid():N}", durable: true);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,20 @@ public class TemporaryQueueFactoryProvider : IQueueFactoryProvider

public IQueueFactory CreateFactory(IConfigurationSection settings) => TemporaryQueueFactory.Instance;
}

public class PrefixedTemporaryQueueFactoryProvider : IQueueFactoryProvider
{
public string ProviderName { get; } = "prefix";

public IQueueFactory CreateFactory(IConfigurationSection settings)
{
if (settings.TryGetValue("Prefix", out var prefix))
{
return new PrefixedTemporaryQueueFactory(prefix);
}

// ReSharper disable once NotResolvedInText
throw new ArgumentNullException("Prefix", "Prefix setting is required");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ public interface IRabbitMqConfiguration

IQueueFactory EventQueueFactory { get; set; }

IQueueFactory ErrorQueueFactory { get; set; }

Encoding OutboundEncoding { get; set; }

ISerializer Serializer { get; set; }
Expand All @@ -32,6 +34,8 @@ public class RabbitMqConfiguration : IRabbitMqConfiguration

public IQueueFactory EventQueueFactory { get; set; }

public IQueueFactory ErrorQueueFactory { get; set; }

public Encoding OutboundEncoding { get; set; }

public ISerializer Serializer { get; set; } = new JsonSerializer();
Expand Down
68 changes: 44 additions & 24 deletions src/engines/Nybus.Engine.RabbitMq/RabbitMq/RabbitMqBusEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -214,30 +214,7 @@ public Task SendMessageAsync(Message message)

var body = _configuration.Serializer.SerializeObject(message.Item, _configuration.OutboundEncoding);

var properties = _channel.CreateBasicProperties();
properties.ContentEncoding = _configuration.OutboundEncoding.WebName;

properties.Headers = new Dictionary<string, object>
{
[Nybus(Headers.MessageId)] = message.MessageId,
[Nybus(Headers.MessageType)] = message.Descriptor.ToString()
};

foreach (var header in message.Headers)
{
if (Headers.IsNybus(header.Key))
{
properties.Headers.Add(Nybus(header.Key), header.Value);
}
else if (RabbitMqHeaders.IsRabbitMq(header.Key))
{
properties.Headers.Add(header.Key, header.Value);
}
else
{
properties.Headers.Add(Custom(header.Key), header.Value);
}
}
var properties = GetBasicProperties(message);

var exchangeName = MessageDescriptor.CreateFromType(type);

Expand Down Expand Up @@ -316,6 +293,49 @@ public Task NotifyFailAsync(Message message)
return Task.CompletedTask;
}

public Task SendMessageToErrorQueueAsync(Message message)
{
var errorQueue = _configuration.ErrorQueueFactory.CreateQueue(_channel);

var body = _configuration.Serializer.SerializeObject(message.Item, _configuration.OutboundEncoding);

var properties = GetBasicProperties(message);

_channel.BasicPublish(exchange: string.Empty, routingKey: errorQueue.QueueName, body: body, basicProperties: properties);

return Task.CompletedTask;
}

private IBasicProperties GetBasicProperties(Message message)
{
var properties = _channel.CreateBasicProperties();
properties.ContentEncoding = _configuration.OutboundEncoding.WebName;

properties.Headers = new Dictionary<string, object>
{
[Nybus(Headers.MessageId)] = message.MessageId,
[Nybus(Headers.MessageType)] = message.Descriptor.ToString()
};

foreach (var header in message.Headers)
{
if (Headers.IsNybus(header.Key))
{
properties.Headers.Add(Nybus(header.Key), header.Value);
}
else if (RabbitMqHeaders.IsRabbitMq(header.Key))
{
properties.Headers.Add(header.Key, header.Value);
}
else
{
properties.Headers.Add(Custom(header.Key), header.Value);
}
}

return properties;
}

private void NackMessage(ulong deliveryTag)
{
_channel.BasicNack(deliveryTag, false, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ public static void UseRabbitMqBusEngine(this INybusConfigurator nybus, Action<IR

configurator.RegisterQueueFactoryProvider<TemporaryQueueFactoryProvider>();

configurator.RegisterQueueFactoryProvider<PrefixedTemporaryQueueFactoryProvider>();

configure?.Invoke(configurator);

configurator.Apply(nybus);
Expand Down
10 changes: 5 additions & 5 deletions tests/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="AutoFixture" Version="4.7.0" />
<PackageReference Include="AutoFixture.NUnit3" Version="4.7.0" />
<PackageReference Include="AutoFixture.AutoMoq" Version="4.7.0" />
<PackageReference Include="AutoFixture.Idioms" Version="4.7.0" />
<PackageReference Include="Moq" Version="4.10.1" />
<PackageReference Include="AutoFixture" Version="4.15.0" />
<PackageReference Include="AutoFixture.NUnit3" Version="4.15.0" />
<PackageReference Include="AutoFixture.AutoMoq" Version="4.15.0" />
<PackageReference Include="AutoFixture.Idioms" Version="4.15.0" />
<PackageReference Include="Moq" Version="4.16.1" />
</ItemGroup>

</Project>
5 changes: 5 additions & 0 deletions tests/TestUtils/TestBusEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ public Task NotifyFailAsync(Message message)
{
throw new NotImplementedException();
}

public Task SendMessageToErrorQueueAsync(Message message)
{
throw new NotImplementedException();
}
}

}
14 changes: 14 additions & 0 deletions tests/TestUtils/TestException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System;

namespace Tests
{
public class TestException : Exception
{
public TestException(string message, string stackTrace) : base(message)
{
StackTrace = stackTrace;
}

public override string StackTrace { get; }
}
}

7 comments on commit acdc486

@Kralizek
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@elindanielsson could you check the failed build?

@elindanielsson
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what happened there, looks like the latest build is green @Kralizek

@Kralizek
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup. It's some kind of flaky test.

@elindanielsson
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Kralizek I don't know if you saw, but looks like there was an error creating the release: https://ci.appveyor.com/project/Kralizek/nybus/builds/42275426

@Kralizek
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll give a look at it tomorrow

@elindanielsson
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @Kralizek , have you had a chance to look at the build error when creating the release?

@Kralizek
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed the 1.6.0 packages manually

Please sign in to comment.