diff --git a/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/AwsLambdaSQSEndpointTestBase.cs b/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/AwsLambdaSQSEndpointTestBase.cs index 6bf35a9..f2c1cb3 100644 --- a/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/AwsLambdaSQSEndpointTestBase.cs +++ b/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/AwsLambdaSQSEndpointTestBase.cs @@ -22,24 +22,24 @@ class AwsLambdaSQSEndpointTestBase protected const string DelayedDeliveryQueueSuffix = "-delay.fifo"; const int QueueDelayInSeconds = 900; // 15 * 60 - protected string QueueName { get; set; } + protected string QueueName { get; private set; } - protected string DelayQueueName { get; set; } + protected string DelayQueueName { get; private set; } - protected string ErrorQueueName { get; set; } + protected string ErrorQueueName { get; private set; } protected string QueueNamePrefix { get; set; } protected string BucketName { get; } = Environment.GetEnvironmentVariable("NSERVICEBUS_AMAZONSQS_S3BUCKET"); - protected string KeyPrefix { get; set; } + protected string KeyPrefix { get; private set; } [SetUp] - public async Task Setup() + public virtual async Task Setup() { queueNames = new List(); - QueueNamePrefix = Path.GetFileNameWithoutExtension(Path.GetRandomFileName()).ToLowerInvariant(); + QueueNamePrefix ??= Path.GetFileNameWithoutExtension(Path.GetRandomFileName()).ToLowerInvariant(); QueueName = $"{QueueNamePrefix}testqueue"; sqsClient = CreateSQSClient(); @@ -78,6 +78,9 @@ public async Task Setup() [TearDown] public async Task TearDown() { + // clear queue name prefix after each test, otherwise classes with multiple tests in the same class will run into issues + QueueNamePrefix = null; + var queueUrls = queueNames.Select(name => sqsClient.GetQueueUrlAsync(name)); await Task.WhenAll(queueUrls); var queueDeletions = queueUrls.Select(x => x.Result.QueueUrl).Select(url => sqsClient.DeleteQueueAsync(url)); @@ -106,7 +109,7 @@ protected void RegisterQueueNameToCleanup(string queueName) queueNames.Add(queueName); } - protected async Task GenerateAndReceiveSQSEvent(int count) where T : new() + protected async Task GenerateAndReceiveSQSEvent(int count = 1) where T : new() { var endpointConfiguration = new EndpointConfiguration($"{QueueNamePrefix}sender"); endpointConfiguration.SendOnly(); diff --git a/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/When_configuring_queue_name_prefix.cs b/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/When_configuring_queue_name_prefix.cs new file mode 100644 index 0000000..63e71b5 --- /dev/null +++ b/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/When_configuring_queue_name_prefix.cs @@ -0,0 +1,70 @@ +namespace NServiceBus.AcceptanceTests +{ + using Microsoft.Extensions.DependencyInjection; + using NUnit.Framework; + using System.IO; + using System.Threading.Tasks; + + class When_configuring_queue_name_prefix : AwsLambdaSQSEndpointTestBase + { + public string Prefix { get; } = "test-"; + + public override Task Setup() + { + QueueNamePrefix = Prefix + Path.GetFileNameWithoutExtension(Path.GetRandomFileName()).ToLowerInvariant(); + return base.Setup(); + } + + [Test] + public async Task The_message_should_be_received_from_prefixed_queue() + { + // the prefix will be configured using the transport's prefix configuration therefore we remove it for the endpoint name + var endpointName = QueueName.Substring(Prefix.Length); + + var receivedMessages = await GenerateAndReceiveSQSEvent(); + + var context = new TestContext(); + + var endpoint = new AwsLambdaSQSEndpoint(ctx => + { + var configuration = new AwsLambdaSQSEndpointConfiguration(endpointName, CreateSQSClient(), CreateSNSClient()); + + configuration.Transport.QueueNamePrefix = Prefix; + + var advanced = configuration.AdvancedConfiguration; + advanced.RegisterComponents(c => c.AddSingleton(typeof(TestContext), context)); + + // SQS will add the specified queue prefix to the configured error queue + advanced.SendFailedMessagesTo(ErrorQueueName.Substring(Prefix.Length)); + + return configuration; + }); + + await endpoint.Process(receivedMessages, null); + + Assert.IsTrue(context.MessageHandled); + } + + public class TestContext + { + public bool MessageHandled { get; set; } + } + + public class SentMessage : ICommand + { + } + + public class MessageHandlerThatReceivesSentMessage : IHandleMessages + { + public MessageHandlerThatReceivesSentMessage(TestContext context) => testContext = context; + + public Task Handle(SentMessage message, IMessageHandlerContext context) + { + testContext.MessageHandled = true; + return Task.CompletedTask; + } + + readonly TestContext testContext; + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/When_message_fails_using_queue_prefix.cs b/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/When_message_fails_using_queue_prefix.cs new file mode 100644 index 0000000..d791812 --- /dev/null +++ b/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/When_message_fails_using_queue_prefix.cs @@ -0,0 +1,68 @@ +namespace NServiceBus.AcceptanceTests; + +using System; +using Microsoft.Extensions.DependencyInjection; +using NUnit.Framework; +using System.IO; +using System.Linq; +using System.Text.Json; +using System.Threading.Tasks; + +class When_message_fails_using_queue_prefix : AwsLambdaSQSEndpointTestBase +{ + public string Prefix { get; } = "test-"; + + public override Task Setup() + { + QueueNamePrefix = Prefix + Path.GetFileNameWithoutExtension(Path.GetRandomFileName()).ToLowerInvariant(); + return base.Setup(); + } + + [Test] + public async Task Should_move_message_to_prefixed_error_queue() + { + // the prefix will be configured using the transport's prefix configuration therefore we remove it for the endpoint name + var endpointName = QueueName.Substring(Prefix.Length); + + var receivedMessages = await GenerateAndReceiveSQSEvent(); + + var context = new TestContext(); + + var endpoint = new AwsLambdaSQSEndpoint(ctx => + { + var configuration = new AwsLambdaSQSEndpointConfiguration(endpointName, CreateSQSClient(), CreateSNSClient()); + configuration.Transport.QueueNamePrefix = Prefix; + + var advanced = configuration.AdvancedConfiguration; + advanced.RegisterComponents(c => c.AddSingleton(typeof(TestContext), context)); + advanced.Recoverability().Immediate(i => i.NumberOfRetries(0)); + + advanced.SendFailedMessagesTo(ErrorQueueName.Substring(Prefix.Length)); + + return configuration; + }); + + Assert.DoesNotThrowAsync(() => endpoint.Process(receivedMessages, null), "message should be moved to the error queue instead"); + + var errorMessages = await RetrieveMessagesInErrorQueue(); + Assert.AreEqual(1, errorMessages.Records.Count); + JsonDocument errorMessage = JsonSerializer.Deserialize(errorMessages.Records.First().Body); + var errorMessageHeader = errorMessage.RootElement.GetProperty("Headers"); + Assert.AreEqual("simulated exception", errorMessageHeader.GetProperty("NServiceBus.ExceptionInfo.Message").GetString()); + Assert.AreEqual(endpointName, errorMessageHeader.GetProperty("NServiceBus.ProcessingEndpoint").GetString()); + StringAssert.EndsWith(QueueName, errorMessageHeader.GetProperty("NServiceBus.FailedQ").GetString()); + } + + public class TestContext + { + } + + public class TestMessage : ICommand + { + } + + public class FailingMessageHandler : IHandleMessages + { + public Task Handle(TestMessage message, IMessageHandlerContext context) => throw new Exception("simulated exception"); + } +} \ No newline at end of file diff --git a/src/NServiceBus.AwsLambda.SQS/AwsLambdaEndpoint.cs b/src/NServiceBus.AwsLambda.SQS/AwsLambdaEndpoint.cs index 1fb2460..02008ea 100644 --- a/src/NServiceBus.AwsLambda.SQS/AwsLambdaEndpoint.cs +++ b/src/NServiceBus.AwsLambda.SQS/AwsLambdaEndpoint.cs @@ -13,7 +13,6 @@ using Amazon.SQS.Model; using AwsLambda.SQS; using AwsLambda.SQS.TransportWrapper; - using Configuration.AdvancedExtensibility; using Extensibility; using Logging; using Transport; @@ -42,7 +41,6 @@ await InitializeEndpointIfNecessary(lambdaContext, cancellationToken) .ConfigureAwait(false); var processTasks = new List(); - foreach (var receivedMessage in @event.Records) { processTasks.Add(ProcessMessage(receivedMessage, lambdaContext, cancellationToken)); @@ -62,15 +60,8 @@ await semaphoreLock.WaitAsync(token) { if (pipeline == null) { - var configuration = configurationFactory(executionContext); - - var serverlessTransport = await Initialize(configuration) - .ConfigureAwait(false); - - endpoint = await Endpoint.Start(configuration.EndpointConfiguration, token) - .ConfigureAwait(false); - - pipeline = serverlessTransport.PipelineInvoker; + var transportInfrastructure = await Initialize(executionContext, token).ConfigureAwait(false); + pipeline = transportInfrastructure.PipelineInvoker; } } finally @@ -80,6 +71,26 @@ await semaphoreLock.WaitAsync(token) } } + async Task Initialize(ILambdaContext executionContext, CancellationToken token) + { + var configuration = configurationFactory(executionContext); + + sqsClient = configuration.Transport.SqsClient; + s3Settings = configuration.Transport.S3; + + var serverlessTransport = new ServerlessTransport(configuration.Transport); + configuration.EndpointConfiguration.UseTransport(serverlessTransport); + + endpoint = await Endpoint.Start(configuration.EndpointConfiguration, token).ConfigureAwait(false); + + var transportInfrastructure = serverlessTransport.GetTransportInfrastructure(endpoint); + + queueUrl = await GetQueueUrl(transportInfrastructure.PipelineInvoker.ReceiveAddress).ConfigureAwait(false); + errorQueueUrl = await GetQueueUrl(transportInfrastructure.ErrorQueueAddress).ConfigureAwait(false); + + return transportInfrastructure; + } + /// public async Task Send(object message, SendOptions options, ILambdaContext lambdaContext) { @@ -192,35 +203,16 @@ await endpoint.Unsubscribe(eventType) .ConfigureAwait(false); } - async Task Initialize(AwsLambdaSQSEndpointConfiguration configuration) - { - var settingsHolder = configuration.AdvancedConfiguration.GetSettings(); - - sqsClient = configuration.Transport.SqsClient; - - queueUrl = await GetQueueUrl(settingsHolder.EndpointName()) - .ConfigureAwait(false); - errorQueueUrl = await GetQueueUrl(settingsHolder.ErrorQueueAddress()) - .ConfigureAwait(false); - - s3Settings = configuration.Transport.S3; - - var serverlessTransport = new ServerlessTransport(configuration.Transport); - configuration.EndpointConfiguration.UseTransport(serverlessTransport); - - return serverlessTransport; - } async Task GetQueueUrl(string queueName) { - var sanitizedQueueName = QueueNameHelper.GetSanitizedQueueName(queueName); try { - return (await sqsClient.GetQueueUrlAsync(sanitizedQueueName).ConfigureAwait(false)).QueueUrl; + return (await sqsClient.GetQueueUrlAsync(queueName).ConfigureAwait(false)).QueueUrl; } catch (Exception e) { - Logger.Error($"Failed to obtain the queue URL for queue {sanitizedQueueName} (derived from configured name {queueName}).", e); + Logger.Error($"Failed to obtain the queue URL for queue {queueName}.", e); throw; } } diff --git a/src/NServiceBus.AwsLambda.SQS/QueueNameHelper.cs b/src/NServiceBus.AwsLambda.SQS/QueueNameHelper.cs deleted file mode 100644 index 1d189ea..0000000 --- a/src/NServiceBus.AwsLambda.SQS/QueueNameHelper.cs +++ /dev/null @@ -1,34 +0,0 @@ -namespace NServiceBus.AwsLambda.SQS -{ - using System; - using System.Text; - - static class QueueNameHelper - { - public static string GetSanitizedQueueName(string queueName) - { - if (queueName.Length > 80) - { - throw new Exception($"Address {queueName} is longer than 80 characters and therefore cannot be used to create an SQS queue. Use a shorter queue name."); - } - - var queueNameBuilder = new StringBuilder(queueName); - - var skipCharacters = queueName.EndsWith(".fifo") ? 5 : 0; - // SQS queue names can only have alphanumeric characters, hyphens and underscores. - // Any other characters will be replaced with a hyphen. - for (var i = 0; i < queueNameBuilder.Length - skipCharacters; ++i) - { - var c = queueNameBuilder[i]; - if (!char.IsLetterOrDigit(c) - && c != '-' - && c != '_') - { - queueNameBuilder[i] = '-'; - } - } - - return queueNameBuilder.ToString(); - } - } -} \ No newline at end of file diff --git a/src/NServiceBus.AwsLambda.SQS/TransportWrapper/ServerlessTransport.cs b/src/NServiceBus.AwsLambda.SQS/TransportWrapper/ServerlessTransport.cs index d347c02..6c7f08f 100644 --- a/src/NServiceBus.AwsLambda.SQS/TransportWrapper/ServerlessTransport.cs +++ b/src/NServiceBus.AwsLambda.SQS/TransportWrapper/ServerlessTransport.cs @@ -1,5 +1,6 @@ namespace NServiceBus.AwsLambda.SQS.TransportWrapper { + using System; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; @@ -8,8 +9,7 @@ sealed class ServerlessTransport : TransportDefinition { - // HINT: This constant is defined in NServiceBus but is not exposed - const string MainReceiverId = "Main"; + ServerlessTransportInfrastructure serverlessTransportInfrastructure; public ServerlessTransport(TransportDefinition baseTransport) : base(baseTransport.TransportTransactionMode, baseTransport.SupportsDelayedDelivery, baseTransport.SupportsPublishSubscribe, baseTransport.SupportsTTBR) => @@ -17,20 +17,28 @@ public ServerlessTransport(TransportDefinition baseTransport) public TransportDefinition BaseTransport { get; } - public PipelineInvoker PipelineInvoker { get; private set; } - public override async Task Initialize(HostSettings hostSettings, ReceiveSettings[] receivers, string[] sendingAddresses, CancellationToken cancellationToken = default) { + if (receivers.Length == 0) + { + throw new Exception( + "SendOnly endpoints are not supported in this version. Upgrade to a newer version of the NServiceBus.AwsLambda.SQS package to use SendOnly endpoints"); + } + var baseTransportInfrastructure = await BaseTransport.Initialize(hostSettings, receivers, sendingAddresses, cancellationToken) .ConfigureAwait(false); + var errorQueueAddress = baseTransportInfrastructure.ToTransportAddress(new QueueAddress(receivers[0].ErrorQueue)); - var serverlessTransportInfrastructure = new ServerlessTransportInfrastructure(baseTransportInfrastructure); - PipelineInvoker = (PipelineInvoker)serverlessTransportInfrastructure.Receivers[MainReceiverId]; + serverlessTransportInfrastructure = new ServerlessTransportInfrastructure(baseTransportInfrastructure, errorQueueAddress); return serverlessTransportInfrastructure; } + public ServerlessTransportInfrastructure GetTransportInfrastructure(IEndpointInstance _) => + // IEndpointInstance is only required to guarantee that GetTransportInfrastructure can't be called before NServiceBus called Initialize. + serverlessTransportInfrastructure; + #pragma warning disable CS0672 // Member overrides obsolete member #pragma warning disable CS0618 // Type or member is obsolete diff --git a/src/NServiceBus.AwsLambda.SQS/TransportWrapper/ServerlessTransportInfrastructure.cs b/src/NServiceBus.AwsLambda.SQS/TransportWrapper/ServerlessTransportInfrastructure.cs index 131db87..26292e8 100644 --- a/src/NServiceBus.AwsLambda.SQS/TransportWrapper/ServerlessTransportInfrastructure.cs +++ b/src/NServiceBus.AwsLambda.SQS/TransportWrapper/ServerlessTransportInfrastructure.cs @@ -7,11 +7,25 @@ sealed class ServerlessTransportInfrastructure : TransportInfrastructure { - public ServerlessTransportInfrastructure(TransportInfrastructure baseTransportInfrastructure) + // HINT: This constant is defined in NServiceBus but is not exposed + const string MainReceiverId = "Main"; + + public PipelineInvoker PipelineInvoker { get; private set; } + + public string ErrorQueueAddress { get; set; } + + public bool IsSendOnly { get; private set; } + + public ServerlessTransportInfrastructure(TransportInfrastructure baseTransportInfrastructure, string errorQueueAddress) { this.baseTransportInfrastructure = baseTransportInfrastructure; Dispatcher = baseTransportInfrastructure.Dispatcher; Receivers = baseTransportInfrastructure.Receivers.ToDictionary(r => r.Key, r => (IMessageReceiver)new PipelineInvoker(r.Value)); + + IsSendOnly = Receivers.Count == 0; + PipelineInvoker = (PipelineInvoker)Receivers[MainReceiverId]; + + ErrorQueueAddress = errorQueueAddress; } public override Task Shutdown(CancellationToken cancellationToken = default) => baseTransportInfrastructure.Shutdown(cancellationToken);