Skip to content

Commit

Permalink
Fix queue prefix support (#309) (#312)
Browse files Browse the repository at this point in the history
  • Loading branch information
timbussmann committed Oct 13, 2023
1 parent 5c5e03e commit f3f9586
Show file tree
Hide file tree
Showing 9 changed files with 205 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,24 @@ class AwsLambdaSQSEndpointTestBase
protected const string DelayedDeliveryQueueSuffix = "-delay.fifo";
const int QueueDelayInSeconds = 900; // 15 * 60

protected string QueueName { get; set; }
protected string QueueName { get; private set; }

protected string DelayQueueName { get; set; }
protected string DelayQueueName { get; private set; }

protected string ErrorQueueName { get; set; }
protected string ErrorQueueName { get; private set; }

protected string QueueNamePrefix { get; set; }

protected string BucketName { get; } = Environment.GetEnvironmentVariable("NSERVICEBUS_AMAZONSQS_S3BUCKET");
protected string KeyPrefix { get; set; }
protected string KeyPrefix { get; private set; }


[SetUp]
public async Task Setup()
public virtual async Task Setup()
{
queueNames = new List<string>();

QueueNamePrefix = Path.GetFileNameWithoutExtension(Path.GetRandomFileName()).ToLowerInvariant();
QueueNamePrefix ??= Path.GetFileNameWithoutExtension(Path.GetRandomFileName()).ToLowerInvariant();

QueueName = $"{QueueNamePrefix}testqueue";
sqsClient = CreateSQSClient();
Expand Down Expand Up @@ -78,6 +78,9 @@ public async Task Setup()
[TearDown]
public async Task TearDown()
{
// clear queue name prefix after each test, otherwise classes with multiple tests in the same class will run into issues
QueueNamePrefix = null;

var queueUrls = queueNames.Select(name => sqsClient.GetQueueUrlAsync(name));
await Task.WhenAll(queueUrls);
var queueDeletions = queueUrls.Select(x => x.Result.QueueUrl).Select(url => sqsClient.DeleteQueueAsync(url));
Expand Down Expand Up @@ -106,7 +109,7 @@ protected void RegisterQueueNameToCleanup(string queueName)
queueNames.Add(queueName);
}

protected async Task<SQSEvent> GenerateAndReceiveSQSEvent<T>(int count) where T : new()
protected async Task<SQSEvent> GenerateAndReceiveSQSEvent<T>(int count = 1) where T : new()
{
var endpointConfiguration = new EndpointConfiguration($"{QueueNamePrefix}sender");
endpointConfiguration.SendOnly();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
namespace NServiceBus.AcceptanceTests
{
using Microsoft.Extensions.DependencyInjection;
using NUnit.Framework;
using System.IO;
using System.Threading.Tasks;

class When_configuring_queue_name_prefix : AwsLambdaSQSEndpointTestBase
{
public string Prefix { get; } = "test-";

public override Task Setup()
{
QueueNamePrefix = Prefix + Path.GetFileNameWithoutExtension(Path.GetRandomFileName()).ToLowerInvariant();
return base.Setup();
}

[Test]
public async Task The_message_should_be_received_from_prefixed_queue()
{
// the prefix will be configured using the transport's prefix configuration therefore we remove it for the endpoint name
var endpointName = QueueName.Substring(Prefix.Length);

var receivedMessages = await GenerateAndReceiveSQSEvent<SentMessage>();

var context = new TestContext();

var endpoint = new AwsLambdaSQSEndpoint(ctx =>
{
var configuration = new AwsLambdaSQSEndpointConfiguration(endpointName, CreateSQSClient(), CreateSNSClient());
configuration.Transport.QueueNamePrefix = Prefix;
var advanced = configuration.AdvancedConfiguration;
advanced.RegisterComponents(c => c.AddSingleton(typeof(TestContext), context));
// SQS will add the specified queue prefix to the configured error queue
advanced.SendFailedMessagesTo(ErrorQueueName.Substring(Prefix.Length));
return configuration;
});

await endpoint.Process(receivedMessages, null);

Assert.IsTrue(context.MessageHandled);
}

public class TestContext
{
public bool MessageHandled { get; set; }
}

public class SentMessage : ICommand
{
}

public class MessageHandlerThatReceivesSentMessage : IHandleMessages<SentMessage>
{
public MessageHandlerThatReceivesSentMessage(TestContext context) => testContext = context;

public Task Handle(SentMessage message, IMessageHandlerContext context)
{
testContext.MessageHandled = true;
return Task.CompletedTask;
}

readonly TestContext testContext;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
namespace NServiceBus.AcceptanceTests;

using System;
using Microsoft.Extensions.DependencyInjection;
using NUnit.Framework;
using System.IO;
using System.Linq;
using System.Text.Json;
using System.Threading.Tasks;

class When_message_fails_using_queue_prefix : AwsLambdaSQSEndpointTestBase
{
public string Prefix { get; } = "test-";

public override Task Setup()
{
QueueNamePrefix = Prefix + Path.GetFileNameWithoutExtension(Path.GetRandomFileName()).ToLowerInvariant();
return base.Setup();
}

[Test]
public async Task Should_move_message_to_prefixed_error_queue()
{
// the prefix will be configured using the transport's prefix configuration therefore we remove it for the endpoint name
var endpointName = QueueName.Substring(Prefix.Length);

var receivedMessages = await GenerateAndReceiveSQSEvent<TestMessage>();

var context = new TestContext();

var endpoint = new AwsLambdaSQSEndpoint(ctx =>
{
var configuration = new AwsLambdaSQSEndpointConfiguration(endpointName, CreateSQSClient(), CreateSNSClient());
configuration.Transport.QueueNamePrefix = Prefix;
var advanced = configuration.AdvancedConfiguration;
advanced.RegisterComponents(c => c.AddSingleton(typeof(TestContext), context));
advanced.Recoverability().Immediate(i => i.NumberOfRetries(0));
advanced.SendFailedMessagesTo(ErrorQueueName.Substring(Prefix.Length));
return configuration;
});

Assert.DoesNotThrowAsync(() => endpoint.Process(receivedMessages, null), "message should be moved to the error queue instead");

var errorMessages = await RetrieveMessagesInErrorQueue();
Assert.AreEqual(1, errorMessages.Records.Count);
JsonDocument errorMessage = JsonSerializer.Deserialize<JsonDocument>(errorMessages.Records.First().Body);
var errorMessageHeader = errorMessage.RootElement.GetProperty("Headers");
Assert.AreEqual("simulated exception", errorMessageHeader.GetProperty("NServiceBus.ExceptionInfo.Message").GetString());
Assert.AreEqual(endpointName, errorMessageHeader.GetProperty("NServiceBus.ProcessingEndpoint").GetString());
StringAssert.EndsWith(QueueName, errorMessageHeader.GetProperty("NServiceBus.FailedQ").GetString());
}

public class TestContext
{
}

public class TestMessage : ICommand
{
}

public class FailingMessageHandler : IHandleMessages<TestMessage>
{
public Task Handle(TestMessage message, IMessageHandlerContext context) => throw new Exception("simulated exception");
}
}
66 changes: 28 additions & 38 deletions src/NServiceBus.AwsLambda.SQS/AwsLambdaEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
using Amazon.SQS.Model;
using AwsLambda.SQS;
using AwsLambda.SQS.TransportWrapper;
using Configuration.AdvancedExtensibility;
using Extensibility;
using Logging;
using Transport;
Expand Down Expand Up @@ -49,7 +48,6 @@ await InitializeEndpointIfNecessary(lambdaContext, cancellationToken)
}

var processTasks = new List<Task>();

foreach (var receivedMessage in @event.Records)
{
processTasks.Add(ProcessMessage(receivedMessage, lambdaContext, cancellationToken));
Expand All @@ -69,16 +67,9 @@ await semaphoreLock.WaitAsync(token)
{
if (pipeline == null)
{
var configuration = configurationFactory(executionContext);

var serverlessTransport = await Initialize(configuration)
.ConfigureAwait(false);

endpoint = await Endpoint.Start(configuration.EndpointConfiguration, token)
.ConfigureAwait(false);
var transportInfrastructure = await Initialize(executionContext, token).ConfigureAwait(false);


pipeline = serverlessTransport.PipelineInvoker;
pipeline = transportInfrastructure.PipelineInvoker;
}
}
finally
Expand All @@ -88,6 +79,30 @@ await semaphoreLock.WaitAsync(token)
}
}

async Task<ServerlessTransportInfrastructure> Initialize(ILambdaContext executionContext, CancellationToken token)
{
var configuration = configurationFactory(executionContext);

sqsClient = configuration.Transport.SqsClient;
s3Settings = configuration.Transport.S3;

var serverlessTransport = new ServerlessTransport(configuration.Transport);
configuration.EndpointConfiguration.UseTransport(serverlessTransport);

endpoint = await Endpoint.Start(configuration.EndpointConfiguration, token).ConfigureAwait(false);

var transportInfrastructure = serverlessTransport.GetTransportInfrastructure(endpoint);
isSendOnly = transportInfrastructure.IsSendOnly;

if (!isSendOnly)
{
queueUrl = await GetQueueUrl(transportInfrastructure.PipelineInvoker.ReceiveAddress).ConfigureAwait(false);
errorQueueUrl = await GetQueueUrl(transportInfrastructure.ErrorQueueAddress).ConfigureAwait(false);
}

return transportInfrastructure;
}

/// <inheritdoc />
public async Task Send(object message, SendOptions options, ILambdaContext lambdaContext)
{
Expand Down Expand Up @@ -200,40 +215,15 @@ await endpoint.Unsubscribe(eventType)
.ConfigureAwait(false);
}

async Task<ServerlessTransport> Initialize(AwsLambdaSQSEndpointConfiguration configuration)
{
var settingsHolder = configuration.AdvancedConfiguration.GetSettings();

sqsClient = configuration.Transport.SqsClient;

isSendOnly = settingsHolder.GetOrDefault<bool>("Endpoint.SendOnly");

if (!isSendOnly)
{
queueUrl = await GetQueueUrl(settingsHolder.EndpointName())
.ConfigureAwait(false);
errorQueueUrl = await GetQueueUrl(settingsHolder.ErrorQueueAddress())
.ConfigureAwait(false);
}

s3Settings = configuration.Transport.S3;

var serverlessTransport = new ServerlessTransport(configuration.Transport);
configuration.EndpointConfiguration.UseTransport(serverlessTransport);

return serverlessTransport;
}

async Task<string> GetQueueUrl(string queueName)
{
var sanitizedQueueName = QueueNameHelper.GetSanitizedQueueName(queueName);
try
{
return (await sqsClient.GetQueueUrlAsync(sanitizedQueueName).ConfigureAwait(false)).QueueUrl;
return (await sqsClient.GetQueueUrlAsync(queueName).ConfigureAwait(false)).QueueUrl;
}
catch (Exception e)
{
Logger.Error($"Failed to obtain the queue URL for queue {sanitizedQueueName} (derived from configured name {queueName}).", e);
Logger.Error($"Failed to obtain the queue URL for queue {queueName} (derived from configured name {queueName}).", e);
throw;
}
}
Expand Down
34 changes: 0 additions & 34 deletions src/NServiceBus.AwsLambda.SQS/QueueNameHelper.cs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

interface IMessageProcessor
{
string ReceiveAddress { get; }
Task<ErrorHandleResult> PushFailedMessage(ErrorContext errorContext);
Task PushMessage(MessageContext messageContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

class SendOnlyMessageProcessor : IMessageProcessor
{
public string ReceiveAddress => string.Empty;

public Task<ErrorHandleResult> PushFailedMessage(ErrorContext errorContext) => throw new InvalidOperationException(
$"This endpoint cannot process messages because it is configured in send-only mode. Remove the '{nameof(EndpointConfiguration)}.{nameof(EndpointConfiguration.SendOnly)}' configuration.'"
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,35 +8,30 @@

sealed class ServerlessTransport : TransportDefinition
{
// HINT: This constant is defined in NServiceBus but is not exposed
const string MainReceiverId = "Main";
const string SendOnlyConfigKey = "Endpoint.SendOnly";

ServerlessTransportInfrastructure serverlessTransportInfrastructure;

public ServerlessTransport(TransportDefinition baseTransport)
: base(baseTransport.TransportTransactionMode, baseTransport.SupportsDelayedDelivery, baseTransport.SupportsPublishSubscribe, baseTransport.SupportsTTBR) =>
BaseTransport = baseTransport;

public TransportDefinition BaseTransport { get; }

public IMessageProcessor PipelineInvoker { get; private set; }

public override async Task<TransportInfrastructure> Initialize(HostSettings hostSettings, ReceiveSettings[] receivers, string[] sendingAddresses, CancellationToken cancellationToken = default)
{
var baseTransportInfrastructure = await BaseTransport.Initialize(hostSettings, receivers, sendingAddresses, cancellationToken)
.ConfigureAwait(false);

var serverlessTransportInfrastructure = new ServerlessTransportInfrastructure(baseTransportInfrastructure);

var isSendOnly = hostSettings.CoreSettings.GetOrDefault<bool>(SendOnlyConfigKey);

PipelineInvoker = isSendOnly
? new SendOnlyMessageProcessor()
: (PipelineInvoker)serverlessTransportInfrastructure.Receivers[MainReceiverId];
var errorQueueAddress = receivers.Length > 0
? baseTransportInfrastructure.ToTransportAddress(new QueueAddress(receivers[0].ErrorQueue)) // when using NSB, all receivers share the same error queue
: null;
serverlessTransportInfrastructure = new ServerlessTransportInfrastructure(baseTransportInfrastructure, errorQueueAddress);

return serverlessTransportInfrastructure;
}

public ServerlessTransportInfrastructure GetTransportInfrastructure(IEndpointInstance _) =>
// IEndpointInstance is only required to guarantee that GetTransportInfrastructure can't be called before NServiceBus called Initialize.
serverlessTransportInfrastructure;

#pragma warning disable CS0672 // Member overrides obsolete member
#pragma warning disable CS0618 // Type or member is obsolete

Expand Down
Loading

0 comments on commit f3f9586

Please sign in to comment.