From 16c9c1123b4fc9c09b61cc15b4ff5b797beb3922 Mon Sep 17 00:00:00 2001 From: Tim Bussmann Date: Fri, 13 Oct 2023 12:37:44 +0200 Subject: [PATCH] Provide correct queue address to MessageContext (#314) Co-authored-by: Szymon Pobiega # Conflicts: # src/NServiceBus.AwsLambda.SQS/AwsLambdaEndpoint.cs --- .../When_message_fails.cs | 89 +++++++++++++++++++ .../When_message_fails_using_queue_prefix.cs | 2 +- .../AwsLambdaEndpoint.cs | 20 +++-- 3 files changed, 101 insertions(+), 10 deletions(-) create mode 100644 src/NServiceBus.AwsLambda.SQS.AcceptanceTests/When_message_fails.cs diff --git a/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/When_message_fails.cs b/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/When_message_fails.cs new file mode 100644 index 0000000..c8d202f --- /dev/null +++ b/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/When_message_fails.cs @@ -0,0 +1,89 @@ +namespace NServiceBus.AcceptanceTests; + +using Microsoft.Extensions.DependencyInjection; +using NUnit.Framework; +using System.Text.Json; +using System.Threading.Tasks; +using System; +using System.Linq; + +class When_message_fails : AwsLambdaSQSEndpointTestBase +{ + [Test] + public async Task Should_move_message_to_error_queue() + { + // the prefix will be configured using the transport's prefix configuration therefore we remove it for the endpoint name + var endpointName = QueueName; + + var receivedMessages = await GenerateAndReceiveSQSEvent(); + + var context = new TestContext(); + + var endpoint = new AwsLambdaSQSEndpoint(ctx => + { + var configuration = new AwsLambdaSQSEndpointConfiguration(endpointName, CreateSQSClient(), CreateSNSClient()); + + var advanced = configuration.AdvancedConfiguration; + advanced.RegisterComponents(c => c.AddSingleton(typeof(TestContext), context)); + advanced.Recoverability().Immediate(i => i.NumberOfRetries(0)); + + advanced.SendFailedMessagesTo(ErrorQueueName); + + return configuration; + }); + + Assert.DoesNotThrowAsync(() => endpoint.Process(receivedMessages, null), "message should be moved to the error queue instead"); + + var errorMessages = await RetrieveMessagesInErrorQueue(); + Assert.AreEqual(1, errorMessages.Records.Count); + JsonDocument errorMessage = JsonSerializer.Deserialize(errorMessages.Records.First().Body); + var errorMessageHeader = errorMessage.RootElement.GetProperty("Headers"); + Assert.AreEqual("simulated exception", errorMessageHeader.GetProperty("NServiceBus.ExceptionInfo.Message").GetString()); + Assert.AreEqual(endpointName, errorMessageHeader.GetProperty("NServiceBus.ProcessingEndpoint").GetString()); + Assert.AreEqual(QueueName, errorMessageHeader.GetProperty("NServiceBus.FailedQ").GetString()); + } + + [Test] + public async Task Should_rethrow_when_disabling_error_queue() + { + // the prefix will be configured using the transport's prefix configuration therefore we remove it for the endpoint name + var endpointName = QueueName; + + var receivedMessages = await GenerateAndReceiveSQSEvent(); + + var context = new TestContext(); + + var endpoint = new AwsLambdaSQSEndpoint(ctx => + { + var configuration = new AwsLambdaSQSEndpointConfiguration(endpointName, CreateSQSClient(), CreateSNSClient()); + + configuration.DoNotSendMessagesToErrorQueue(); + + var advanced = configuration.AdvancedConfiguration; + advanced.RegisterComponents(c => c.AddSingleton(typeof(TestContext), context)); + advanced.Recoverability().Immediate(i => i.NumberOfRetries(0)); + advanced.SendFailedMessagesTo(ErrorQueueName); + + return configuration; + }); + + var exception = Assert.ThrowsAsync(() => endpoint.Process(receivedMessages, null)); + + StringAssert.Contains("Failed to process message", exception.Message); + Assert.AreEqual("simulated exception", exception.InnerException.Message); + Assert.AreEqual(0, await CountMessagesInErrorQueue()); + } + + public class TestContext + { + } + + public class TestMessage : ICommand + { + } + + public class FailingMessageHandler : IHandleMessages + { + public Task Handle(TestMessage message, IMessageHandlerContext context) => throw new Exception("simulated exception"); + } +} \ No newline at end of file diff --git a/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/When_message_fails_using_queue_prefix.cs b/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/When_message_fails_using_queue_prefix.cs index d791812..a5a9195 100644 --- a/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/When_message_fails_using_queue_prefix.cs +++ b/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/When_message_fails_using_queue_prefix.cs @@ -50,7 +50,7 @@ public async Task Should_move_message_to_prefixed_error_queue() var errorMessageHeader = errorMessage.RootElement.GetProperty("Headers"); Assert.AreEqual("simulated exception", errorMessageHeader.GetProperty("NServiceBus.ExceptionInfo.Message").GetString()); Assert.AreEqual(endpointName, errorMessageHeader.GetProperty("NServiceBus.ProcessingEndpoint").GetString()); - StringAssert.EndsWith(QueueName, errorMessageHeader.GetProperty("NServiceBus.FailedQ").GetString()); + Assert.AreEqual(QueueName, errorMessageHeader.GetProperty("NServiceBus.FailedQ").GetString()); } public class TestContext diff --git a/src/NServiceBus.AwsLambda.SQS/AwsLambdaEndpoint.cs b/src/NServiceBus.AwsLambda.SQS/AwsLambdaEndpoint.cs index 02008ea..b430809 100644 --- a/src/NServiceBus.AwsLambda.SQS/AwsLambdaEndpoint.cs +++ b/src/NServiceBus.AwsLambda.SQS/AwsLambdaEndpoint.cs @@ -85,7 +85,8 @@ async Task Initialize(ILambdaContext executio var transportInfrastructure = serverlessTransport.GetTransportInfrastructure(endpoint); - queueUrl = await GetQueueUrl(transportInfrastructure.PipelineInvoker.ReceiveAddress).ConfigureAwait(false); + receiveQueueAddress = transportInfrastructure.PipelineInvoker.ReceiveAddress; + receiveQueueUrl = await GetQueueUrl(receiveQueueAddress).ConfigureAwait(false); errorQueueUrl = await GetQueueUrl(transportInfrastructure.ErrorQueueAddress).ConfigureAwait(false); return transportInfrastructure; @@ -382,7 +383,7 @@ async Task ProcessMessageWithInMemoryRetries(Dictionary headers, new Dictionary(headers), body, transportTransaction, - queueUrl, + receiveQueueAddress, context); await Process(messageContext, lambdaContext, token).ConfigureAwait(false); @@ -404,7 +405,7 @@ async Task ProcessMessageWithInMemoryRetries(Dictionary headers, body, transportTransaction, immediateProcessingAttempts, - queueUrl, + receiveQueueAddress, context); errorHandlerResult = await ProcessFailedMessage(errorContext, lambdaContext).ConfigureAwait(false); @@ -442,7 +443,7 @@ async Task DeleteMessageAndBodyIfRequired(Message message, string messageS3BodyK try { // should not be cancelled - await sqsClient.DeleteMessageAsync(queueUrl, message.ReceiptHandle, CancellationToken.None) + await sqsClient.DeleteMessageAsync(receiveQueueUrl, message.ReceiptHandle, CancellationToken.None) .ConfigureAwait(false); } catch (ReceiptHandleIsInvalidException ex) @@ -483,7 +484,7 @@ async Task MovePoisonMessageToErrorQueue(Message message, CancellationToken canc { await sqsClient.ChangeMessageVisibilityAsync(new ChangeMessageVisibilityRequest { - QueueUrl = queueUrl, + QueueUrl = receiveQueueUrl, ReceiptHandle = message.ReceiptHandle, VisibilityTimeout = 0 }, CancellationToken.None) @@ -491,7 +492,7 @@ async Task MovePoisonMessageToErrorQueue(Message message, CancellationToken canc } catch (Exception changeMessageVisibilityEx) { - Logger.Warn($"Error returning poison message back to input queue at url {queueUrl}. Poison message will become available at the input queue again after the visibility timeout expires.", changeMessageVisibilityEx); + Logger.Warn($"Error returning poison message back to input queue at url {receiveQueueUrl}. Poison message will become available at the input queue again after the visibility timeout expires.", changeMessageVisibilityEx); } throw; @@ -501,14 +502,14 @@ async Task MovePoisonMessageToErrorQueue(Message message, CancellationToken canc { await sqsClient.DeleteMessageAsync(new DeleteMessageRequest { - QueueUrl = queueUrl, + QueueUrl = receiveQueueUrl, ReceiptHandle = message.ReceiptHandle }, CancellationToken.None) .ConfigureAwait(false); } catch (Exception ex) { - Logger.Warn($"Error removing poison message from input queue {queueUrl}. This may cause duplicate poison messages in the error queue for this endpoint.", ex); + Logger.Warn($"Error removing poison message from input queue {receiveQueueAddress}. This may cause duplicate poison messages in the error queue for this endpoint.", ex); } // If there is a message body in S3, simply leave it there @@ -538,7 +539,8 @@ static void LogPoisonMessage(string messageId, Exception exception) IEndpointInstance endpoint; IAmazonSQS sqsClient; S3Settings s3Settings; - string queueUrl; + string receiveQueueAddress; + string receiveQueueUrl; string errorQueueUrl; static readonly ILog Logger = LogManager.GetLogger(typeof(AwsLambdaSQSEndpoint));