Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle shoveling failed messages to and from ServiceControl #172

Merged
merged 2 commits into from
Dec 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
122 changes: 122 additions & 0 deletions src/AcceptanceTests/Shared/Error.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.AcceptanceTesting;
using NServiceBus.AcceptanceTesting.Customization;
using NUnit.Framework;
using Conventions = NServiceBus.AcceptanceTesting.Customization.Conventions;

public class Error : BridgeAcceptanceTest
{
[Test]
public async Task Should_forward_error_messages_by_not_modify_message()
{
var ctx = await Scenario.Define<Context>()
.WithEndpoint<PublishingEndpoint>(b => b
.When(c => TransportBeingTested.SupportsPublishSubscribe || c.SubscriberSubscribed, (session, c) =>
{
return session.Publish(new FaultyMessage());
}))
.WithEndpoint<ProcessingEndpoint>(builder => builder.DoNotFailOnErrorMessages())
.WithEndpoint<ErrorSpy>()
.WithBridge(bridgeConfiguration =>
{
var bridgeTransport = new TestableBridgeTransport(TransportBeingTested);
bridgeTransport.AddTestEndpoint<PublishingEndpoint>();
bridgeTransport.AddTestEndpoint<ErrorSpy>();
bridgeConfiguration.AddTransport(bridgeTransport);

var subscriberEndpoint =
new BridgeEndpoint(Conventions.EndpointNamingConvention(typeof(ProcessingEndpoint)));
subscriberEndpoint.RegisterPublisher<FaultyMessage>(
Conventions.EndpointNamingConvention(typeof(PublishingEndpoint)));
bridgeConfiguration.AddTestTransportEndpoint(subscriberEndpoint);

})
.Done(c => c.MessageFailed)
.Run();

Assert.IsTrue(ctx.MessageFailed);
foreach (var header in ctx.FailedMessageHeaders)
{
if (ctx.ReceivedMessageHeaders.TryGetValue(header.Key, out var receivedHeaderValue))
{
Assert.AreEqual(header.Value, receivedHeaderValue,
$"{header.Key} is not the same on processed message and audit message.");
}
}
}

public class PublishingEndpoint : EndpointConfigurationBuilder
{
public PublishingEndpoint() =>
EndpointSetup<DefaultServer>(c =>
{
c.OnEndpointSubscribed<Context>((_, ctx) =>
{
ctx.SubscriberSubscribed = true;
});
c.ConfigureRouting().RouteToEndpoint(typeof(FaultyMessage), typeof(ProcessingEndpoint));
});
}

public class ProcessingEndpoint : EndpointConfigurationBuilder
{
public ProcessingEndpoint() => EndpointSetup<DefaultTestServer>(
c => c.SendFailedMessagesTo("Error.ErrorSpy"));

public class MessageHandler : IHandleMessages<FaultyMessage>
{
readonly Context testContext;

public MessageHandler(Context context) => testContext = context;

public Task Handle(FaultyMessage message, IMessageHandlerContext context)
{
testContext.ReceivedMessageHeaders =
new ReadOnlyDictionary<string, string>((IDictionary<string, string>)context.MessageHeaders);

throw new Exception("Simulated");
}
}
}

public class ErrorSpy : EndpointConfigurationBuilder
{
public ErrorSpy()
{
var endpoint = EndpointSetup<DefaultServer>(c => c.AutoSubscribe().DisableFor<FaultyMessage>());
}

class FailedMessageHander : IHandleMessages<FaultyMessage>
{
public FailedMessageHander(Context context) => testContext = context;

public Task Handle(FaultyMessage message, IMessageHandlerContext context)
{
testContext.FailedMessageHeaders =
new ReadOnlyDictionary<string, string>((IDictionary<string, string>)context.MessageHeaders);

testContext.MessageFailed = true;

return Task.CompletedTask;
}

readonly Context testContext;
}
}

public class Context : ScenarioContext
{
public bool SubscriberSubscribed { get; set; }
public bool MessageFailed { get; set; }
public IReadOnlyDictionary<string, string> ReceivedMessageHeaders { get; set; }
public IReadOnlyDictionary<string, string> FailedMessageHeaders { get; set; }
}

public class FaultyMessage : IEvent
{
}
}
157 changes: 157 additions & 0 deletions src/AcceptanceTests/Shared/Retry.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.AcceptanceTesting;
using NServiceBus.AcceptanceTesting.Customization;
using NServiceBus.Faults;
using NUnit.Framework;
using Conventions = NServiceBus.AcceptanceTesting.Customization.Conventions;

public class Retry : BridgeAcceptanceTest
{
[Test]
public async Task Should_forward_retry_messages()
{
var ctx = await Scenario.Define<Context>()
.WithEndpoint<PublishingEndpoint>(b => b
.When(c => c.EndpointsStarted, (session, c) =>
{
return session.Publish(new FaultyMessage());
}))
.WithEndpoint<ProcessingEndpoint>(builder => builder.DoNotFailOnErrorMessages())
.WithEndpoint<ErrorSpy>()
.WithBridge(bridgeConfiguration =>
{
var bridgeTransport = new TestableBridgeTransport(DefaultTestServer.GetTestTransportDefinition())
{
Name = "DefaultTestingTransport"
};
bridgeTransport.AddTestEndpoint<PublishingEndpoint>();
bridgeTransport.AddTestEndpoint<ErrorSpy>();
bridgeConfiguration.AddTransport(bridgeTransport);

var subscriberEndpoint =
new BridgeEndpoint(Conventions.EndpointNamingConvention(typeof(ProcessingEndpoint)));
subscriberEndpoint.RegisterPublisher<FaultyMessage>(
Conventions.EndpointNamingConvention(typeof(PublishingEndpoint)));

var theOtherTransport = new TestableBridgeTransport(TransportBeingTested);
theOtherTransport.HasEndpoint(subscriberEndpoint);
bridgeConfiguration.AddTransport(theOtherTransport);
})
.Done(c => c.RetryDelivered)
.Run();

Assert.IsTrue(ctx.RetryDelivered);
Assert.IsTrue(ctx.MessageFailed);

foreach (var header in ctx.FailedMessageHeaders)
{
if (ctx.ReceivedMessageHeaders.TryGetValue(header.Key, out var receivedHeaderValue))
{
Assert.AreEqual(header.Value, receivedHeaderValue,
$"{header.Key} is not the same on processed message and audit message.");
}
}
}

public class PublishingEndpoint : EndpointConfigurationBuilder
{
public PublishingEndpoint() =>
EndpointSetup<DefaultTestPublisher>(c =>
{
c.OnEndpointSubscribed<Context>((_, ctx) =>
{
ctx.SubscriberSubscribed = true;
});
c.ConfigureRouting().RouteToEndpoint(typeof(FaultyMessage), typeof(ProcessingEndpoint));
});
}

public class ProcessingEndpoint : EndpointConfigurationBuilder
{
public ProcessingEndpoint() => EndpointSetup<DefaultServer>(
c => c.SendFailedMessagesTo("Retry.ErrorSpy"));

public class MessageHandler : IHandleMessages<FaultyMessage>
{
readonly Context testContext;

public MessageHandler(Context context) => testContext = context;

public Task Handle(FaultyMessage message, IMessageHandlerContext context)
{
testContext.ReceivedMessageHeaders =
new ReadOnlyDictionary<string, string>((IDictionary<string, string>)context.MessageHeaders);

testContext.MessageFailed = true;

throw new Exception("Simulated");
}
}

public class RetryMessageHandler : IHandleMessages<RetryMessage>
{
readonly Context testContext;

public RetryMessageHandler(Context context) => testContext = context;

public Task Handle(RetryMessage message, IMessageHandlerContext context)
{
testContext.RetryDelivered = true;

return Task.CompletedTask;
}
}
}

public class ErrorSpy : EndpointConfigurationBuilder
{
public ErrorSpy()
{
var endpoint = EndpointSetup<DefaultTestServer>(c => c.AutoSubscribe().DisableFor<FaultyMessage>());
}

class FailedMessageHander : IHandleMessages<FaultyMessage>
{
public FailedMessageHander(Context context) => testContext = context;

public Task Handle(FaultyMessage message, IMessageHandlerContext context)
{
testContext.FailedMessageHeaders =
new ReadOnlyDictionary<string, string>((IDictionary<string, string>)context.MessageHeaders);

var sendOptions = new SendOptions();

//Send the message to the FailedQ address
string destination = context.MessageHeaders[FaultsHeaderKeys.FailedQ];
sendOptions.SetDestination(destination);

//ServiceControl adds this header when retrying
sendOptions.SetHeader("ServiceControl.Retry.UniqueMessageId", "XYZ");
return context.Send(new RetryMessage(), sendOptions);
}

readonly Context testContext;
}
}

public class Context : ScenarioContext
{
public bool SubscriberSubscribed { get; set; }
public bool MessageFailed { get; set; }
public IReadOnlyDictionary<string, string> ReceivedMessageHeaders { get; set; }
public IReadOnlyDictionary<string, string> FailedMessageHeaders { get; set; }
public bool RetryDelivered { get; set; }
}

public class FaultyMessage : IEvent
{
}

public class RetryMessage : IMessage
{
}
}
27 changes: 23 additions & 4 deletions src/NServiceBus.Transport.Bridge/MessageShovel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,29 @@ public async Task TransferMessage(TransferContext transferContext, CancellationT

var transferDetails = $"{transferContext.SourceTransport}->{targetEndpointDispatcher.TransportName}";

// Audit messages contain all the original fields. Transforming them would destroy this.
if (!IsAuditMessage(messageToSend))
if (IsErrorMessage(messageToSend))
{
messageToSend.Headers[BridgeHeaders.Transfer] = transferDetails;
//This is a failed message forwarded to ServiceControl. We need to transform the FailedQ header so that ServiceControl returns the message
//to the correct queue/transport on the other side

TransformAddressHeader(messageToSend, targetEndpointRegistry, Headers.ReplyToAddress);
//We _do not_ transform the ReplyToAddress header
TransformAddressHeader(messageToSend, targetEndpointRegistry, FaultsHeaderKeys.FailedQ);
SzymonPobiega marked this conversation as resolved.
Show resolved Hide resolved
}
else if (IsRetryMessage(messageToSend))
{
//This is a message retried from ServiceControl. Its ReplyToAddress header has been preserved (as stated above) so we don't need to transform it back
}
else if (IsAuditMessage(messageToSend))
{
//This is a message sent to the audit queue. We _do not_ transform its ReplyToAddress header
}
else
{
// This is a regular message sent between the endpoints on different sides of the bridge.
// The ReplyToAddress is transformed to allow for replies to be delivered
messageToSend.Headers[BridgeHeaders.Transfer] = transferDetails;
TransformAddressHeader(messageToSend, targetEndpointRegistry, Headers.ReplyToAddress);
}

await targetEndpointDispatcher.Dispatch(
messageToSend,
Expand All @@ -58,6 +73,10 @@ public async Task TransferMessage(TransferContext transferContext, CancellationT
// Assuming that a message is an audit message if a ProcessingMachine is known
static bool IsAuditMessage(OutgoingMessage messageToSend) => messageToSend.Headers.ContainsKey(Headers.ProcessingEnded);

static bool IsErrorMessage(OutgoingMessage messageToSend) => messageToSend.Headers.ContainsKey(FaultsHeaderKeys.FailedQ);

static bool IsRetryMessage(OutgoingMessage messageToSend) => messageToSend.Headers.ContainsKey("ServiceControl.Retry.UniqueMessageId");

void TransformAddressHeader(
OutgoingMessage messageToSend,
IEndpointRegistry targetEndpointRegistry,
Expand Down