Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Queue Prefix setting is not being considered in the AwsLambdaSQSEndpoint #313

Merged
merged 2 commits into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,24 @@ class AwsLambdaSQSEndpointTestBase
protected const string DelayedDeliveryQueueSuffix = "-delay.fifo";
const int QueueDelayInSeconds = 900; // 15 * 60

protected string QueueName { get; set; }
protected string QueueName { get; private set; }

protected string DelayQueueName { get; set; }
protected string DelayQueueName { get; private set; }

protected string ErrorQueueName { get; set; }
protected string ErrorQueueName { get; private set; }

protected string QueueNamePrefix { get; set; }

protected string BucketName { get; } = Environment.GetEnvironmentVariable("NSERVICEBUS_AMAZONSQS_S3BUCKET");
protected string KeyPrefix { get; set; }
protected string KeyPrefix { get; private set; }


[SetUp]
public async Task Setup()
public virtual async Task Setup()
{
queueNames = new List<string>();

QueueNamePrefix = Path.GetFileNameWithoutExtension(Path.GetRandomFileName()).ToLowerInvariant();
QueueNamePrefix ??= Path.GetFileNameWithoutExtension(Path.GetRandomFileName()).ToLowerInvariant();

QueueName = $"{QueueNamePrefix}testqueue";
sqsClient = CreateSQSClient();
Expand Down Expand Up @@ -78,6 +78,9 @@ public async Task Setup()
[TearDown]
public async Task TearDown()
{
// clear queue name prefix after each test, otherwise classes with multiple tests in the same class will run into issues
QueueNamePrefix = null;

var queueUrls = queueNames.Select(name => sqsClient.GetQueueUrlAsync(name));
await Task.WhenAll(queueUrls);
var queueDeletions = queueUrls.Select(x => x.Result.QueueUrl).Select(url => sqsClient.DeleteQueueAsync(url));
Expand Down Expand Up @@ -106,7 +109,7 @@ protected void RegisterQueueNameToCleanup(string queueName)
queueNames.Add(queueName);
}

protected async Task<SQSEvent> GenerateAndReceiveSQSEvent<T>(int count) where T : new()
protected async Task<SQSEvent> GenerateAndReceiveSQSEvent<T>(int count = 1) where T : new()
{
var endpointConfiguration = new EndpointConfiguration($"{QueueNamePrefix}sender");
endpointConfiguration.SendOnly();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
namespace NServiceBus.AcceptanceTests
{
using Microsoft.Extensions.DependencyInjection;
using NUnit.Framework;
using System.IO;
using System.Threading.Tasks;

class When_configuring_queue_name_prefix : AwsLambdaSQSEndpointTestBase
{
public string Prefix { get; } = "test-";

public override Task Setup()
{
QueueNamePrefix = Prefix + Path.GetFileNameWithoutExtension(Path.GetRandomFileName()).ToLowerInvariant();
return base.Setup();
}

[Test]
public async Task The_message_should_be_received_from_prefixed_queue()
{
// the prefix will be configured using the transport's prefix configuration therefore we remove it for the endpoint name
var endpointName = QueueName.Substring(Prefix.Length);

var receivedMessages = await GenerateAndReceiveSQSEvent<SentMessage>();

var context = new TestContext();

var endpoint = new AwsLambdaSQSEndpoint(ctx =>
{
var configuration = new AwsLambdaSQSEndpointConfiguration(endpointName, CreateSQSClient(), CreateSNSClient());

configuration.Transport.QueueNamePrefix = Prefix;

var advanced = configuration.AdvancedConfiguration;
advanced.RegisterComponents(c => c.AddSingleton(typeof(TestContext), context));

// SQS will add the specified queue prefix to the configured error queue
advanced.SendFailedMessagesTo(ErrorQueueName.Substring(Prefix.Length));

return configuration;
});

await endpoint.Process(receivedMessages, null);

Assert.IsTrue(context.MessageHandled);
}

public class TestContext
{
public bool MessageHandled { get; set; }
}

public class SentMessage : ICommand
{
}

public class MessageHandlerThatReceivesSentMessage : IHandleMessages<SentMessage>
{
public MessageHandlerThatReceivesSentMessage(TestContext context) => testContext = context;

public Task Handle(SentMessage message, IMessageHandlerContext context)
{
testContext.MessageHandled = true;
return Task.CompletedTask;
}

readonly TestContext testContext;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
namespace NServiceBus.AcceptanceTests;

using System;
using Microsoft.Extensions.DependencyInjection;
using NUnit.Framework;
using System.IO;
using System.Linq;
using System.Text.Json;
using System.Threading.Tasks;

class When_message_fails_using_queue_prefix : AwsLambdaSQSEndpointTestBase
{
public string Prefix { get; } = "test-";

public override Task Setup()
{
QueueNamePrefix = Prefix + Path.GetFileNameWithoutExtension(Path.GetRandomFileName()).ToLowerInvariant();
return base.Setup();
}

[Test]
public async Task Should_move_message_to_prefixed_error_queue()
{
// the prefix will be configured using the transport's prefix configuration therefore we remove it for the endpoint name
var endpointName = QueueName.Substring(Prefix.Length);

var receivedMessages = await GenerateAndReceiveSQSEvent<TestMessage>();

var context = new TestContext();

var endpoint = new AwsLambdaSQSEndpoint(ctx =>
{
var configuration = new AwsLambdaSQSEndpointConfiguration(endpointName, CreateSQSClient(), CreateSNSClient());
configuration.Transport.QueueNamePrefix = Prefix;

var advanced = configuration.AdvancedConfiguration;
advanced.RegisterComponents(c => c.AddSingleton(typeof(TestContext), context));
advanced.Recoverability().Immediate(i => i.NumberOfRetries(0));

advanced.SendFailedMessagesTo(ErrorQueueName.Substring(Prefix.Length));

return configuration;
});

Assert.DoesNotThrowAsync(() => endpoint.Process(receivedMessages, null), "message should be moved to the error queue instead");

var errorMessages = await RetrieveMessagesInErrorQueue();
Assert.AreEqual(1, errorMessages.Records.Count);
JsonDocument errorMessage = JsonSerializer.Deserialize<JsonDocument>(errorMessages.Records.First().Body);
var errorMessageHeader = errorMessage.RootElement.GetProperty("Headers");
Assert.AreEqual("simulated exception", errorMessageHeader.GetProperty("NServiceBus.ExceptionInfo.Message").GetString());
Assert.AreEqual(endpointName, errorMessageHeader.GetProperty("NServiceBus.ProcessingEndpoint").GetString());
StringAssert.EndsWith(QueueName, errorMessageHeader.GetProperty("NServiceBus.FailedQ").GetString());
}

public class TestContext
{
}

public class TestMessage : ICommand
{
}

public class FailingMessageHandler : IHandleMessages<TestMessage>
{
public Task Handle(TestMessage message, IMessageHandlerContext context) => throw new Exception("simulated exception");
}
}
56 changes: 24 additions & 32 deletions src/NServiceBus.AwsLambda.SQS/AwsLambdaEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
using Amazon.SQS.Model;
using AwsLambda.SQS;
using AwsLambda.SQS.TransportWrapper;
using Configuration.AdvancedExtensibility;
using Extensibility;
using Logging;
using Transport;
Expand Down Expand Up @@ -42,7 +41,6 @@ await InitializeEndpointIfNecessary(lambdaContext, cancellationToken)
.ConfigureAwait(false);

var processTasks = new List<Task>();

foreach (var receivedMessage in @event.Records)
{
processTasks.Add(ProcessMessage(receivedMessage, lambdaContext, cancellationToken));
Expand All @@ -62,15 +60,8 @@ await semaphoreLock.WaitAsync(token)
{
if (pipeline == null)
{
var configuration = configurationFactory(executionContext);

var serverlessTransport = await Initialize(configuration)
.ConfigureAwait(false);

endpoint = await Endpoint.Start(configuration.EndpointConfiguration, token)
.ConfigureAwait(false);

pipeline = serverlessTransport.PipelineInvoker;
var transportInfrastructure = await Initialize(executionContext, token).ConfigureAwait(false);
pipeline = transportInfrastructure.PipelineInvoker;
}
}
finally
Expand All @@ -80,6 +71,26 @@ await semaphoreLock.WaitAsync(token)
}
}

async Task<ServerlessTransportInfrastructure> Initialize(ILambdaContext executionContext, CancellationToken token)
{
var configuration = configurationFactory(executionContext);

sqsClient = configuration.Transport.SqsClient;
s3Settings = configuration.Transport.S3;

var serverlessTransport = new ServerlessTransport(configuration.Transport);
configuration.EndpointConfiguration.UseTransport(serverlessTransport);

endpoint = await Endpoint.Start(configuration.EndpointConfiguration, token).ConfigureAwait(false);

var transportInfrastructure = serverlessTransport.GetTransportInfrastructure(endpoint);

queueUrl = await GetQueueUrl(transportInfrastructure.PipelineInvoker.ReceiveAddress).ConfigureAwait(false);
errorQueueUrl = await GetQueueUrl(transportInfrastructure.ErrorQueueAddress).ConfigureAwait(false);

return transportInfrastructure;
}

/// <inheritdoc />
public async Task Send(object message, SendOptions options, ILambdaContext lambdaContext)
{
Expand Down Expand Up @@ -192,35 +203,16 @@ await endpoint.Unsubscribe(eventType)
.ConfigureAwait(false);
}

async Task<ServerlessTransport> Initialize(AwsLambdaSQSEndpointConfiguration configuration)
{
var settingsHolder = configuration.AdvancedConfiguration.GetSettings();

sqsClient = configuration.Transport.SqsClient;

queueUrl = await GetQueueUrl(settingsHolder.EndpointName())
.ConfigureAwait(false);
errorQueueUrl = await GetQueueUrl(settingsHolder.ErrorQueueAddress())
.ConfigureAwait(false);

s3Settings = configuration.Transport.S3;

var serverlessTransport = new ServerlessTransport(configuration.Transport);
configuration.EndpointConfiguration.UseTransport(serverlessTransport);

return serverlessTransport;
}

async Task<string> GetQueueUrl(string queueName)
{
var sanitizedQueueName = QueueNameHelper.GetSanitizedQueueName(queueName);
try
{
return (await sqsClient.GetQueueUrlAsync(sanitizedQueueName).ConfigureAwait(false)).QueueUrl;
return (await sqsClient.GetQueueUrlAsync(queueName).ConfigureAwait(false)).QueueUrl;
}
catch (Exception e)
{
Logger.Error($"Failed to obtain the queue URL for queue {sanitizedQueueName} (derived from configured name {queueName}).", e);
Logger.Error($"Failed to obtain the queue URL for queue {queueName}.", e);
throw;
}
}
Expand Down
34 changes: 0 additions & 34 deletions src/NServiceBus.AwsLambda.SQS/QueueNameHelper.cs

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace NServiceBus.AwsLambda.SQS.TransportWrapper
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -8,29 +9,36 @@

sealed class ServerlessTransport : TransportDefinition
{
// HINT: This constant is defined in NServiceBus but is not exposed
const string MainReceiverId = "Main";
ServerlessTransportInfrastructure serverlessTransportInfrastructure;

public ServerlessTransport(TransportDefinition baseTransport)
: base(baseTransport.TransportTransactionMode, baseTransport.SupportsDelayedDelivery, baseTransport.SupportsPublishSubscribe, baseTransport.SupportsTTBR) =>
BaseTransport = baseTransport;

public TransportDefinition BaseTransport { get; }

public PipelineInvoker PipelineInvoker { get; private set; }

public override async Task<TransportInfrastructure> Initialize(HostSettings hostSettings, ReceiveSettings[] receivers, string[] sendingAddresses, CancellationToken cancellationToken = default)
{
if (receivers.Length == 0)
{
throw new Exception(
"SendOnly endpoints are not supported in this version. Upgrade to a newer version of the NServiceBus.AwsLambda.SQS package to use SendOnly endpoints");
}

var baseTransportInfrastructure = await BaseTransport.Initialize(hostSettings, receivers, sendingAddresses, cancellationToken)
.ConfigureAwait(false);
var errorQueueAddress = baseTransportInfrastructure.ToTransportAddress(new QueueAddress(receivers[0].ErrorQueue));

var serverlessTransportInfrastructure = new ServerlessTransportInfrastructure(baseTransportInfrastructure);
PipelineInvoker = (PipelineInvoker)serverlessTransportInfrastructure.Receivers[MainReceiverId];
serverlessTransportInfrastructure = new ServerlessTransportInfrastructure(baseTransportInfrastructure, errorQueueAddress);

return serverlessTransportInfrastructure;

}

public ServerlessTransportInfrastructure GetTransportInfrastructure(IEndpointInstance _) =>
// IEndpointInstance is only required to guarantee that GetTransportInfrastructure can't be called before NServiceBus called Initialize.
serverlessTransportInfrastructure;

#pragma warning disable CS0672 // Member overrides obsolete member
#pragma warning disable CS0618 // Type or member is obsolete

Expand Down