diff --git a/src/AcceptanceTests/Shared/Error.cs b/src/AcceptanceTests/Shared/Error.cs new file mode 100644 index 00000000..bf95d8fd --- /dev/null +++ b/src/AcceptanceTests/Shared/Error.cs @@ -0,0 +1,122 @@ +using System; +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.Threading.Tasks; +using NServiceBus; +using NServiceBus.AcceptanceTesting; +using NServiceBus.AcceptanceTesting.Customization; +using NUnit.Framework; +using Conventions = NServiceBus.AcceptanceTesting.Customization.Conventions; + +public class Error : BridgeAcceptanceTest +{ + [Test] + public async Task Should_forward_error_messages_by_not_modify_message() + { + var ctx = await Scenario.Define() + .WithEndpoint(b => b + .When(c => TransportBeingTested.SupportsPublishSubscribe || c.SubscriberSubscribed, (session, c) => + { + return session.Publish(new FaultyMessage()); + })) + .WithEndpoint(builder => builder.DoNotFailOnErrorMessages()) + .WithEndpoint() + .WithBridge(bridgeConfiguration => + { + var bridgeTransport = new TestableBridgeTransport(TransportBeingTested); + bridgeTransport.AddTestEndpoint(); + bridgeTransport.AddTestEndpoint(); + bridgeConfiguration.AddTransport(bridgeTransport); + + var subscriberEndpoint = + new BridgeEndpoint(Conventions.EndpointNamingConvention(typeof(ProcessingEndpoint))); + subscriberEndpoint.RegisterPublisher( + Conventions.EndpointNamingConvention(typeof(PublishingEndpoint))); + bridgeConfiguration.AddTestTransportEndpoint(subscriberEndpoint); + + }) + .Done(c => c.MessageFailed) + .Run(); + + Assert.IsTrue(ctx.MessageFailed); + foreach (var header in ctx.FailedMessageHeaders) + { + if (ctx.ReceivedMessageHeaders.TryGetValue(header.Key, out var receivedHeaderValue)) + { + Assert.AreEqual(header.Value, receivedHeaderValue, + $"{header.Key} is not the same on processed message and audit message."); + } + } + } + + public class PublishingEndpoint : EndpointConfigurationBuilder + { + public PublishingEndpoint() => + EndpointSetup(c => + { + c.OnEndpointSubscribed((_, ctx) => + { + ctx.SubscriberSubscribed = true; + }); + c.ConfigureRouting().RouteToEndpoint(typeof(FaultyMessage), typeof(ProcessingEndpoint)); + }); + } + + public class ProcessingEndpoint : EndpointConfigurationBuilder + { + public ProcessingEndpoint() => EndpointSetup( + c => c.SendFailedMessagesTo("Error.ErrorSpy")); + + public class MessageHandler : IHandleMessages + { + readonly Context testContext; + + public MessageHandler(Context context) => testContext = context; + + public Task Handle(FaultyMessage message, IMessageHandlerContext context) + { + testContext.ReceivedMessageHeaders = + new ReadOnlyDictionary((IDictionary)context.MessageHeaders); + + throw new Exception("Simulated"); + } + } + } + + public class ErrorSpy : EndpointConfigurationBuilder + { + public ErrorSpy() + { + var endpoint = EndpointSetup(c => c.AutoSubscribe().DisableFor()); + } + + class FailedMessageHander : IHandleMessages + { + public FailedMessageHander(Context context) => testContext = context; + + public Task Handle(FaultyMessage message, IMessageHandlerContext context) + { + testContext.FailedMessageHeaders = + new ReadOnlyDictionary((IDictionary)context.MessageHeaders); + + testContext.MessageFailed = true; + + return Task.CompletedTask; + } + + readonly Context testContext; + } + } + + public class Context : ScenarioContext + { + public bool SubscriberSubscribed { get; set; } + public bool MessageFailed { get; set; } + public IReadOnlyDictionary ReceivedMessageHeaders { get; set; } + public IReadOnlyDictionary FailedMessageHeaders { get; set; } + } + + public class FaultyMessage : IEvent + { + } +} \ No newline at end of file diff --git a/src/AcceptanceTests/Shared/Retry.cs b/src/AcceptanceTests/Shared/Retry.cs new file mode 100644 index 00000000..98d92c7b --- /dev/null +++ b/src/AcceptanceTests/Shared/Retry.cs @@ -0,0 +1,157 @@ +using System; +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.Threading.Tasks; +using NServiceBus; +using NServiceBus.AcceptanceTesting; +using NServiceBus.AcceptanceTesting.Customization; +using NServiceBus.Faults; +using NUnit.Framework; +using Conventions = NServiceBus.AcceptanceTesting.Customization.Conventions; + +public class Retry : BridgeAcceptanceTest +{ + [Test] + public async Task Should_forward_retry_messages() + { + var ctx = await Scenario.Define() + .WithEndpoint(b => b + .When(c => c.EndpointsStarted, (session, c) => + { + return session.Publish(new FaultyMessage()); + })) + .WithEndpoint(builder => builder.DoNotFailOnErrorMessages()) + .WithEndpoint() + .WithBridge(bridgeConfiguration => + { + var bridgeTransport = new TestableBridgeTransport(DefaultTestServer.GetTestTransportDefinition()) + { + Name = "DefaultTestingTransport" + }; + bridgeTransport.AddTestEndpoint(); + bridgeTransport.AddTestEndpoint(); + bridgeConfiguration.AddTransport(bridgeTransport); + + var subscriberEndpoint = + new BridgeEndpoint(Conventions.EndpointNamingConvention(typeof(ProcessingEndpoint))); + subscriberEndpoint.RegisterPublisher( + Conventions.EndpointNamingConvention(typeof(PublishingEndpoint))); + + var theOtherTransport = new TestableBridgeTransport(TransportBeingTested); + theOtherTransport.HasEndpoint(subscriberEndpoint); + bridgeConfiguration.AddTransport(theOtherTransport); + }) + .Done(c => c.RetryDelivered) + .Run(); + + Assert.IsTrue(ctx.RetryDelivered); + Assert.IsTrue(ctx.MessageFailed); + + foreach (var header in ctx.FailedMessageHeaders) + { + if (ctx.ReceivedMessageHeaders.TryGetValue(header.Key, out var receivedHeaderValue)) + { + Assert.AreEqual(header.Value, receivedHeaderValue, + $"{header.Key} is not the same on processed message and audit message."); + } + } + } + + public class PublishingEndpoint : EndpointConfigurationBuilder + { + public PublishingEndpoint() => + EndpointSetup(c => + { + c.OnEndpointSubscribed((_, ctx) => + { + ctx.SubscriberSubscribed = true; + }); + c.ConfigureRouting().RouteToEndpoint(typeof(FaultyMessage), typeof(ProcessingEndpoint)); + }); + } + + public class ProcessingEndpoint : EndpointConfigurationBuilder + { + public ProcessingEndpoint() => EndpointSetup( + c => c.SendFailedMessagesTo("Retry.ErrorSpy")); + + public class MessageHandler : IHandleMessages + { + readonly Context testContext; + + public MessageHandler(Context context) => testContext = context; + + public Task Handle(FaultyMessage message, IMessageHandlerContext context) + { + testContext.ReceivedMessageHeaders = + new ReadOnlyDictionary((IDictionary)context.MessageHeaders); + + testContext.MessageFailed = true; + + throw new Exception("Simulated"); + } + } + + public class RetryMessageHandler : IHandleMessages + { + readonly Context testContext; + + public RetryMessageHandler(Context context) => testContext = context; + + public Task Handle(RetryMessage message, IMessageHandlerContext context) + { + testContext.RetryDelivered = true; + + return Task.CompletedTask; + } + } + } + + public class ErrorSpy : EndpointConfigurationBuilder + { + public ErrorSpy() + { + var endpoint = EndpointSetup(c => c.AutoSubscribe().DisableFor()); + } + + class FailedMessageHander : IHandleMessages + { + public FailedMessageHander(Context context) => testContext = context; + + public Task Handle(FaultyMessage message, IMessageHandlerContext context) + { + testContext.FailedMessageHeaders = + new ReadOnlyDictionary((IDictionary)context.MessageHeaders); + + var sendOptions = new SendOptions(); + + //Send the message to the FailedQ address + string destination = context.MessageHeaders[FaultsHeaderKeys.FailedQ]; + sendOptions.SetDestination(destination); + + //ServiceControl adds this header when retrying + sendOptions.SetHeader("ServiceControl.Retry.UniqueMessageId", "XYZ"); + return context.Send(new RetryMessage(), sendOptions); + } + + readonly Context testContext; + } + } + + public class Context : ScenarioContext + { + public bool SubscriberSubscribed { get; set; } + public bool MessageFailed { get; set; } + public IReadOnlyDictionary ReceivedMessageHeaders { get; set; } + public IReadOnlyDictionary FailedMessageHeaders { get; set; } + public bool RetryDelivered { get; set; } + } + + public class FaultyMessage : IEvent + { + } + + public class RetryMessage : IMessage + { + } +} \ No newline at end of file diff --git a/src/NServiceBus.Transport.Bridge/MessageShovel.cs b/src/NServiceBus.Transport.Bridge/MessageShovel.cs index e5d67b64..63db21a8 100644 --- a/src/NServiceBus.Transport.Bridge/MessageShovel.cs +++ b/src/NServiceBus.Transport.Bridge/MessageShovel.cs @@ -29,14 +29,29 @@ public async Task TransferMessage(TransferContext transferContext, CancellationT var transferDetails = $"{transferContext.SourceTransport}->{targetEndpointDispatcher.TransportName}"; - // Audit messages contain all the original fields. Transforming them would destroy this. - if (!IsAuditMessage(messageToSend)) + if (IsErrorMessage(messageToSend)) { - messageToSend.Headers[BridgeHeaders.Transfer] = transferDetails; + //This is a failed message forwarded to ServiceControl. We need to transform the FailedQ header so that ServiceControl returns the message + //to the correct queue/transport on the other side - TransformAddressHeader(messageToSend, targetEndpointRegistry, Headers.ReplyToAddress); + //We _do not_ transform the ReplyToAddress header TransformAddressHeader(messageToSend, targetEndpointRegistry, FaultsHeaderKeys.FailedQ); } + else if (IsRetryMessage(messageToSend)) + { + //This is a message retried from ServiceControl. Its ReplyToAddress header has been preserved (as stated above) so we don't need to transform it back + } + else if (IsAuditMessage(messageToSend)) + { + //This is a message sent to the audit queue. We _do not_ transform its ReplyToAddress header + } + else + { + // This is a regular message sent between the endpoints on different sides of the bridge. + // The ReplyToAddress is transformed to allow for replies to be delivered + messageToSend.Headers[BridgeHeaders.Transfer] = transferDetails; + TransformAddressHeader(messageToSend, targetEndpointRegistry, Headers.ReplyToAddress); + } await targetEndpointDispatcher.Dispatch( messageToSend, @@ -58,6 +73,10 @@ public async Task TransferMessage(TransferContext transferContext, CancellationT // Assuming that a message is an audit message if a ProcessingMachine is known static bool IsAuditMessage(OutgoingMessage messageToSend) => messageToSend.Headers.ContainsKey(Headers.ProcessingEnded); + static bool IsErrorMessage(OutgoingMessage messageToSend) => messageToSend.Headers.ContainsKey(FaultsHeaderKeys.FailedQ); + + static bool IsRetryMessage(OutgoingMessage messageToSend) => messageToSend.Headers.ContainsKey("ServiceControl.Retry.UniqueMessageId"); + void TransformAddressHeader( OutgoingMessage messageToSend, IEndpointRegistry targetEndpointRegistry,