Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide ability to return messages forwarded to Bridge error queue back to originating Bridge queue - 2.0 branch #275

Merged
merged 1 commit into from Jun 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/AcceptanceTesting/AcceptanceTesting.csproj
Expand Up @@ -2,6 +2,8 @@

<PropertyGroup>
<TargetFrameworks>net472;net6.0;net7.0</TargetFrameworks>
<SignAssembly>true</SignAssembly>
<AssemblyOriginatorKeyFile>..\NServiceBusTests.snk</AssemblyOriginatorKeyFile>
</PropertyGroup>

<ItemGroup>
Expand All @@ -13,4 +15,10 @@
<ProjectReference Include="..\NServiceBus.MessagingBridge\NServiceBus.MessagingBridge.csproj" />
</ItemGroup>

<ItemGroup>
<None Include="..\NServiceBusTests.snk">
<Link>NServiceBusTests.snk</Link>
</None>
</ItemGroup>

</Project>
6 changes: 6 additions & 0 deletions src/AcceptanceTesting/BridgeComponent.cs
@@ -1,7 +1,9 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using AcceptanceTesting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using NServiceBus;
Expand Down Expand Up @@ -40,6 +42,10 @@ public override async Task Start(CancellationToken cancellationToken = default)
.ConfigureServices((_, serviceCollection) =>
{
serviceCollection.AddSingleton(loggerFactory);
serviceCollection.RemoveAll(typeof(IMessageShovel));
serviceCollection.AddTransient<MessageShovel>();
serviceCollection.AddTransient<FakeShovel>();
serviceCollection.AddTransient<IMessageShovel, FakeShovel>();
});

host = await hostBuilder.StartAsync(cancellationToken).ConfigureAwait(false);
Expand Down
33 changes: 33 additions & 0 deletions src/AcceptanceTesting/FakeShovel.cs
@@ -0,0 +1,33 @@
namespace AcceptanceTesting
{
using System;
using System.Threading;
using System.Threading.Tasks;

public class FakeShovelHeader
{
public const string FailureHeader = "FakeShovelFailure";
}

class FakeShovel : IMessageShovel
{

readonly IMessageShovel messageShovel;

public FakeShovel(MessageShovel shovel)
{
messageShovel = shovel;
}

public Task TransferMessage(TransferContext transferContext, CancellationToken cancellationToken = default)
{
var messageContext = transferContext.MessageToTransfer;
if (messageContext.Headers.ContainsKey(FakeShovelHeader.FailureHeader))
{
throw new Exception("Incoming message has `FakeShovelFailure` header to test infrastructure failures");
}

return messageShovel.TransferMessage(transferContext, cancellationToken);
}
}
}
89 changes: 89 additions & 0 deletions src/AcceptanceTests/Shared/TransferFailureTests.cs
@@ -0,0 +1,89 @@
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Threading.Tasks;
using AcceptanceTesting;
using NServiceBus;
using NServiceBus.AcceptanceTesting;
using NUnit.Framework;

public class TransferFailureTests : BridgeAcceptanceTest
{
const string ReceiveDummyQueue = "TransferFailureTests_DummyQueue"; // Required because Bridge needs endpoints on both sides.
const string ErrorQueue = "TransferFailureTests_BridgeErrorQueue";
const string FailedQHeader = "NServiceBus.MessagingBridge.FailedQ";

[Test]
public async Task Should_add_failedq_header_when_transfer_fails()
{
var ctx = await Scenario.Define<Context>()
.WithEndpoint<ErrorSpy>()
.WithEndpoint<Sender>(b => b
.When(c => c.EndpointsStarted, async (session, c) =>
{
var opts = new SendOptions();
opts.SetHeader(FakeShovelHeader.FailureHeader, string.Empty);
await session.Send(new FaultyMessage(), opts);
}))
.WithBridge(bridgeConfiguration =>
{
var bridgeTransport = new TestableBridgeTransport(TransportBeingTested);
bridgeTransport.AddTestEndpoint<Sender>();
bridgeConfiguration.AddTransport(bridgeTransport);
bridgeTransport.ErrorQueue = ErrorQueue;

var subscriberEndpoint = new BridgeEndpoint(ReceiveDummyQueue);
bridgeConfiguration.AddTestTransportEndpoint(subscriberEndpoint);
})
.Done(c => c.MessageFailed)
.Run();

Assert.IsTrue(ctx.MessageFailed, "Message did not fail");
Assert.IsTrue(ctx.FailedMessageHeaders.ContainsKey(FailedQHeader),
$"Failed message headers does not contain {FailedQHeader}");
}

public class Sender : EndpointConfigurationBuilder
{
public Sender() =>
EndpointSetup<DefaultServer>(c =>
{
c.ConfigureRouting().RouteToEndpoint(typeof(FaultyMessage), ReceiveDummyQueue);
});
}

public class ErrorSpy : EndpointConfigurationBuilder
{
public ErrorSpy()
{
EndpointSetup<DefaultServer>(c =>
{
c.OverrideLocalAddress(ErrorQueue);
});
}

class FailedMessageHander : IHandleMessages<FaultyMessage>
{
public FailedMessageHander(Context context) => testContext = context;

public Task Handle(FaultyMessage message, IMessageHandlerContext context)
{
testContext.FailedMessageHeaders =
new ReadOnlyDictionary<string, string>((IDictionary<string, string>)context.MessageHeaders);
testContext.MessageFailed = true;
return Task.CompletedTask;
}

readonly Context testContext;
}
}

public class Context : ScenarioContext
{
public bool MessageFailed { get; set; }
public IReadOnlyDictionary<string, string> FailedMessageHeaders { get; set; }
}

public class FaultyMessage : ICommand
{
}
}
5 changes: 3 additions & 2 deletions src/NServiceBus.MessagingBridge/BridgeHeaders.cs
@@ -1,4 +1,5 @@
class BridgeHeaders
{
public static string Transfer = "NServiceBus.Bridge.Transfer";
}
public const string Transfer = "NServiceBus.Bridge.Transfer";
public const string FailedQ = "NServiceBus.MessagingBridge.FailedQ";
}
Expand Up @@ -55,7 +55,7 @@ public static class HostBuilderExtensions
serviceCollection.AddSingleton<SubscriptionManager>();
serviceCollection.AddSingleton<EndpointRegistry>();
serviceCollection.AddSingleton<IEndpointRegistry>(sp => sp.GetRequiredService<EndpointRegistry>());
serviceCollection.AddTransient<MessageShovel>();
serviceCollection.AddTransient<IMessageShovel, MessageShovel>();
});

return hostBuilder;
Expand Down
4 changes: 2 additions & 2 deletions src/NServiceBus.MessagingBridge/EndpointProxyFactory.cs
Expand Up @@ -47,8 +47,8 @@ public EndpointProxyFactory(IServiceProvider serviceProvider)
messageContext,
shouldPassTransportTransaction);

return serviceProvider.GetRequiredService<MessageShovel>()
.TransferMessage(transferContext, ct);
return serviceProvider.GetRequiredService<IMessageShovel>()
.TransferMessage(transferContext, cancellationToken: ct);
},
translatedErrorQueue);

Expand Down
7 changes: 7 additions & 0 deletions src/NServiceBus.MessagingBridge/IMessageShovel.cs
@@ -0,0 +1,7 @@
using System.Threading;
using System.Threading.Tasks;

interface IMessageShovel
{
Task TransferMessage(TransferContext transferContext, CancellationToken cancellationToken = default);
}
5 changes: 3 additions & 2 deletions src/NServiceBus.MessagingBridge/MessageShovel.cs
Expand Up @@ -6,7 +6,7 @@
using NServiceBus.Faults;
using NServiceBus.Transport;

class MessageShovel
sealed class MessageShovel : IMessageShovel
{
public MessageShovel(
ILogger<MessageShovel> logger,
Expand All @@ -26,6 +26,7 @@ public async Task TransferMessage(TransferContext transferContext, CancellationT
var messageContext = transferContext.MessageToTransfer;

var messageToSend = new OutgoingMessage(messageContext.NativeMessageId, messageContext.Headers, messageContext.Body);
messageToSend.Headers.Remove(BridgeHeaders.FailedQ);

var transferDetails = $"{transferContext.SourceTransport}->{targetEndpointDispatcher.TransportName}";

Expand Down Expand Up @@ -98,4 +99,4 @@ public async Task TransferMessage(TransferContext transferContext, CancellationT

readonly ILogger<MessageShovel> logger;
readonly IEndpointRegistry targetEndpointRegistry;
}
}
Expand Up @@ -18,5 +18,6 @@

<ItemGroup>
<InternalsVisibleTo Include="UnitTests" Key="$(NServiceBusTestsKey)" />
<InternalsVisibleTo Include="AcceptanceTesting" Key="$(NServiceBusTestsKey)" />
</ItemGroup>
</Project>
</Project>
Expand Up @@ -35,6 +35,7 @@ async Task<ErrorHandleResult> MoveToErrorQueue(ErrorContext errorContext, string
headers.Remove(Headers.DelayedRetries);
headers.Remove(Headers.ImmediateRetries);

headers.Add(BridgeHeaders.FailedQ, localAddress);
ExceptionHeaderHelper.SetExceptionHeaders(headers, errorContext.Exception);

var transportOperations = new TransportOperations(new TransportOperation(outgoingMessage, new UnicastAddressTag(errorQueue)));
Expand Down
@@ -1,3 +1,4 @@
[assembly: System.Runtime.CompilerServices.InternalsVisibleTo(@"AcceptanceTesting, PublicKey=00240000048000009400000006020000002400005253413100040000010001007f16e21368ff041183fab592d9e8ed37e7be355e93323147a1d29983d6e591b04282e4da0c9e18bd901e112c0033925eb7d7872c2f1706655891c5c9d57297994f707d16ee9a8f40d978f064ee1ffc73c0db3f4712691b23bf596f75130f4ec978cf78757ec034625a5f27e6bb50c618931ea49f6f628fd74271c32959efb1c5")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleTo(@"UnitTests, PublicKey=00240000048000009400000006020000002400005253413100040000010001007f16e21368ff041183fab592d9e8ed37e7be355e93323147a1d29983d6e591b04282e4da0c9e18bd901e112c0033925eb7d7872c2f1706655891c5c9d57297994f707d16ee9a8f40d978f064ee1ffc73c0db3f4712691b23bf596f75130f4ec978cf78757ec034625a5f27e6bb50c618931ea49f6f628fd74271c32959efb1c5")]
namespace NServiceBus
{
Expand Down