Skip to content

Commit

Permalink
RabbitMQ headers overridden when also passed as header (#89)
Browse files Browse the repository at this point in the history
Delivery tag is not accepted by RabbitMQ and the retried message will be "stuck" and the consumer will not process any more messages in the queue. Solved by making sure to not set RabbitMq headers when it's already set
  • Loading branch information
simgu authored and Kralizek committed Jun 28, 2019
1 parent 291d103 commit 836ad6e
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ Message GetMessage(BasicDeliverEventArgs args)

message.Headers.Add(headerKey, value);
}
else if (header.Key.StartsWith("RabbitMq:"))
else if (header.Key.StartsWith("RabbitMq:") && !message.Headers.ContainsKey(header.Key))
{
var value = args.BasicProperties.GetHeader(header.Key, encoding);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -952,5 +952,45 @@ public async Task RabbitMq_headers_are_read_from_incoming_message([Frozen] IRabb
Assert.That(message.Headers, Contains.Key($"RabbitMq:{headerKey}"));
Assert.That(message.Headers[$"RabbitMq:{headerKey}"], Is.EqualTo(headerValue));
}

[Test, AutoMoqData]
[Description("https://github.com/Nybus-project/Nybus/issues/90")]
public async Task Issue90([Frozen] ISerializer serializer, [Frozen] IRabbitMqConfiguration configuration, RabbitMqBusEngine sut, string consumerTag, ulong headerDeliveryTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, string messageId, Guid correlationId, DateTimeOffset sentOn, FirstTestCommand testCommand)
{
Mock.Get(serializer).Setup(p => p.DeserializeObject(It.IsAny<byte[]>(), It.IsAny<Type>(), It.IsAny<Encoding>())).Returns(testCommand);

sut.SubscribeToCommand<FirstTestCommand>();

var sequence = await sut.StartAsync();

var encoding = Encoding.UTF8;

IBasicProperties properties = new BasicProperties
{
MessageId = messageId,
ContentEncoding = encoding.WebName,
Headers = new Dictionary<string, object>
{
["Nybus:MessageId"] = encoding.GetBytes(messageId),
["Nybus:MessageType"] = encoding.GetBytes(DescriptorName(testCommand.GetType())),
["Nybus:CorrelationId"] = correlationId.ToByteArray(),
["RabbitMq:DeliveryTag"] = headerDeliveryTag
}
};

var body = configuration.Serializer.SerializeObject(testCommand, encoding);

var incomingMessages = sequence.DumpInList();

sut.Consumers.First().Value.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);

Assert.That(incomingMessages, Has.Exactly(1).InstanceOf<CommandMessage<FirstTestCommand>>());

var message = incomingMessages[0] as CommandMessage<FirstTestCommand>;

Assert.That(message.Headers, Contains.Key("RabbitMq:DeliveryTag"));
Assert.That(message.Headers["RabbitMq:DeliveryTag"], Is.EqualTo(deliveryTag.ToString()));
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Moq;
using NUnit.Framework;
using Nybus;
using Nybus.Filters;
using Nybus.Utils;
using RabbitMQ.Client;

namespace Tests.External
{
[ExternalTestFixture]
public class IssueTests
{
[TearDown]
public void OnTestComplete()
{
var connectionFactory = new ConnectionFactory();
var connection = connectionFactory.CreateConnection();
var model = connection.CreateModel();

model.ExchangeDelete(ExchangeName(typeof(FirstTestCommand)));
model.ExchangeDelete(ExchangeName(typeof(SecondTestCommand)));
model.ExchangeDelete(ExchangeName(typeof(ThirdTestCommand)));
model.ExchangeDelete(ExchangeName(typeof(FirstTestEvent)));
model.ExchangeDelete(ExchangeName(typeof(SecondTestEvent)));
model.ExchangeDelete(ExchangeName(typeof(ThirdTestEvent)));

connection.Close();
}

private string ExchangeName(Type type) => $"{type.Namespace}:{type.Name}";

[Test, AutoMoqData]
[Description("https://github.com/Nybus-project/Nybus/issues/90")]
public async Task Issue90(CommandReceivedAsync<FirstTestCommand> commandReceived, Exception exception, FirstTestCommand testCommand)
{
const int maxRetries = 5;

Mock.Get(commandReceived)
.Setup(p => p(It.IsAny<IDispatcher>(), It.IsAny<ICommandContext<FirstTestCommand>>()))
.Throws(exception);

var builder = new ConfigurationBuilder();
builder.AddInMemoryCollection(new Dictionary<string, string>
{
["Nybus:CommandErrorFilters:0:type"] = "retry",
["Nybus:CommandErrorFilters:0:maxRetries"] = maxRetries.Stringfy()
});

var configuration = builder.Build();

var host = TestUtils.CreateNybusHost(nybus =>
{
nybus.SubscribeToCommand(commandReceived);
nybus.UseRabbitMqBusEngine(rabbitMq =>
{
rabbitMq.Configure(c => c.ConnectionFactory = new ConnectionFactory());
});
nybus.UseConfiguration(configuration);
});

await host.StartAsync();

await host.Bus.InvokeCommandAsync(testCommand);

await Task.Delay(TimeSpan.FromMilliseconds(50));

await host.StopAsync();

Mock.Get(commandReceived).Verify(p => p(It.IsAny<IDispatcher>(), It.IsAny<ICommandContext<FirstTestCommand>>()), Times.Exactly(maxRetries));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using FakeRabbitMQ;
using Microsoft.Extensions.Configuration;
using Moq;
using NUnit.Framework;
using Nybus;
using Nybus.Utils;
using static Tests.TestUtils;

namespace Tests
{
[TestFixture]
public class IssueTests
{
[Test, AutoMoqData]
[Description("https://github.com/Nybus-project/Nybus/issues/90")]
public async Task Issue90(FakeServer server, CommandReceivedAsync<FirstTestCommand> commandReceived, Exception exception, FirstTestCommand testCommand)
{
const int maxRetries = 5;

Mock.Get(commandReceived)
.Setup(p => p(It.IsAny<IDispatcher>(), It.IsAny<ICommandContext<FirstTestCommand>>()))
.Throws(exception);

var builder = new ConfigurationBuilder();
builder.AddInMemoryCollection(new Dictionary<string, string>
{
["Nybus:CommandErrorFilters:0:type"] = "retry",
["Nybus:CommandErrorFilters:0:maxRetries"] = maxRetries.Stringfy()
});

var configuration = builder.Build();

var host = CreateNybusHost(nybus =>
{
nybus.SubscribeToCommand(commandReceived);
nybus.UseRabbitMqBusEngine(rabbitMq =>
{
rabbitMq.Configure(c => c.ConnectionFactory = server.CreateConnectionFactory());
});
nybus.UseConfiguration(configuration);
});

await host.StartAsync();

await host.Bus.InvokeCommandAsync(testCommand);

await Task.Delay(TimeSpan.FromMilliseconds(50));

await host.StopAsync();

Mock.Get(commandReceived).Verify(p => p(It.IsAny<IDispatcher>(), It.IsAny<ICommandContext<FirstTestCommand>>()), Times.Exactly(maxRetries));

}
}
}

0 comments on commit 836ad6e

Please sign in to comment.