From 4d260709369a603c9e20d916425d48503a0e5d88 Mon Sep 17 00:00:00 2001 From: Tim Bussmann Date: Fri, 13 Oct 2023 14:48:08 +0200 Subject: [PATCH] Provide correct queue address to MessageContext (#314) (#318) Co-authored-by: Szymon Pobiega --- .../When_message_fails.cs | 89 +++++++++++++++++++ .../When_message_fails_using_queue_prefix.cs | 2 +- .../AwsLambdaEndpoint.cs | 20 +++-- 3 files changed, 101 insertions(+), 10 deletions(-) create mode 100644 src/NServiceBus.AwsLambda.SQS.AcceptanceTests/When_message_fails.cs diff --git a/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/When_message_fails.cs b/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/When_message_fails.cs new file mode 100644 index 0000000..c8d202f --- /dev/null +++ b/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/When_message_fails.cs @@ -0,0 +1,89 @@ +namespace NServiceBus.AcceptanceTests; + +using Microsoft.Extensions.DependencyInjection; +using NUnit.Framework; +using System.Text.Json; +using System.Threading.Tasks; +using System; +using System.Linq; + +class When_message_fails : AwsLambdaSQSEndpointTestBase +{ + [Test] + public async Task Should_move_message_to_error_queue() + { + // the prefix will be configured using the transport's prefix configuration therefore we remove it for the endpoint name + var endpointName = QueueName; + + var receivedMessages = await GenerateAndReceiveSQSEvent(); + + var context = new TestContext(); + + var endpoint = new AwsLambdaSQSEndpoint(ctx => + { + var configuration = new AwsLambdaSQSEndpointConfiguration(endpointName, CreateSQSClient(), CreateSNSClient()); + + var advanced = configuration.AdvancedConfiguration; + advanced.RegisterComponents(c => c.AddSingleton(typeof(TestContext), context)); + advanced.Recoverability().Immediate(i => i.NumberOfRetries(0)); + + advanced.SendFailedMessagesTo(ErrorQueueName); + + return configuration; + }); + + Assert.DoesNotThrowAsync(() => endpoint.Process(receivedMessages, null), "message should be moved to the error queue instead"); + + var errorMessages = await RetrieveMessagesInErrorQueue(); + Assert.AreEqual(1, errorMessages.Records.Count); + JsonDocument errorMessage = JsonSerializer.Deserialize(errorMessages.Records.First().Body); + var errorMessageHeader = errorMessage.RootElement.GetProperty("Headers"); + Assert.AreEqual("simulated exception", errorMessageHeader.GetProperty("NServiceBus.ExceptionInfo.Message").GetString()); + Assert.AreEqual(endpointName, errorMessageHeader.GetProperty("NServiceBus.ProcessingEndpoint").GetString()); + Assert.AreEqual(QueueName, errorMessageHeader.GetProperty("NServiceBus.FailedQ").GetString()); + } + + [Test] + public async Task Should_rethrow_when_disabling_error_queue() + { + // the prefix will be configured using the transport's prefix configuration therefore we remove it for the endpoint name + var endpointName = QueueName; + + var receivedMessages = await GenerateAndReceiveSQSEvent(); + + var context = new TestContext(); + + var endpoint = new AwsLambdaSQSEndpoint(ctx => + { + var configuration = new AwsLambdaSQSEndpointConfiguration(endpointName, CreateSQSClient(), CreateSNSClient()); + + configuration.DoNotSendMessagesToErrorQueue(); + + var advanced = configuration.AdvancedConfiguration; + advanced.RegisterComponents(c => c.AddSingleton(typeof(TestContext), context)); + advanced.Recoverability().Immediate(i => i.NumberOfRetries(0)); + advanced.SendFailedMessagesTo(ErrorQueueName); + + return configuration; + }); + + var exception = Assert.ThrowsAsync(() => endpoint.Process(receivedMessages, null)); + + StringAssert.Contains("Failed to process message", exception.Message); + Assert.AreEqual("simulated exception", exception.InnerException.Message); + Assert.AreEqual(0, await CountMessagesInErrorQueue()); + } + + public class TestContext + { + } + + public class TestMessage : ICommand + { + } + + public class FailingMessageHandler : IHandleMessages + { + public Task Handle(TestMessage message, IMessageHandlerContext context) => throw new Exception("simulated exception"); + } +} \ No newline at end of file diff --git a/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/When_message_fails_using_queue_prefix.cs b/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/When_message_fails_using_queue_prefix.cs index d791812..a5a9195 100644 --- a/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/When_message_fails_using_queue_prefix.cs +++ b/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/When_message_fails_using_queue_prefix.cs @@ -50,7 +50,7 @@ public async Task Should_move_message_to_prefixed_error_queue() var errorMessageHeader = errorMessage.RootElement.GetProperty("Headers"); Assert.AreEqual("simulated exception", errorMessageHeader.GetProperty("NServiceBus.ExceptionInfo.Message").GetString()); Assert.AreEqual(endpointName, errorMessageHeader.GetProperty("NServiceBus.ProcessingEndpoint").GetString()); - StringAssert.EndsWith(QueueName, errorMessageHeader.GetProperty("NServiceBus.FailedQ").GetString()); + Assert.AreEqual(QueueName, errorMessageHeader.GetProperty("NServiceBus.FailedQ").GetString()); } public class TestContext diff --git a/src/NServiceBus.AwsLambda.SQS/AwsLambdaEndpoint.cs b/src/NServiceBus.AwsLambda.SQS/AwsLambdaEndpoint.cs index e9d5bf0..7101540 100644 --- a/src/NServiceBus.AwsLambda.SQS/AwsLambdaEndpoint.cs +++ b/src/NServiceBus.AwsLambda.SQS/AwsLambdaEndpoint.cs @@ -96,7 +96,8 @@ async Task Initialize(ILambdaContext executio if (!isSendOnly) { - queueUrl = await GetQueueUrl(transportInfrastructure.PipelineInvoker.ReceiveAddress).ConfigureAwait(false); + receiveQueueAddress = transportInfrastructure.PipelineInvoker.ReceiveAddress; + receiveQueueUrl = await GetQueueUrl(receiveQueueAddress).ConfigureAwait(false); errorQueueUrl = await GetQueueUrl(transportInfrastructure.ErrorQueueAddress).ConfigureAwait(false); } @@ -393,7 +394,7 @@ async Task ProcessMessageWithInMemoryRetries(Dictionary headers, new Dictionary(headers), body, transportTransaction, - queueUrl, + receiveQueueAddress, context); await Process(messageContext, lambdaContext, token).ConfigureAwait(false); @@ -415,7 +416,7 @@ async Task ProcessMessageWithInMemoryRetries(Dictionary headers, body, transportTransaction, immediateProcessingAttempts, - queueUrl, + receiveQueueAddress, context); errorHandlerResult = await ProcessFailedMessage(errorContext, lambdaContext).ConfigureAwait(false); @@ -453,7 +454,7 @@ async Task DeleteMessageAndBodyIfRequired(Message message, string messageS3BodyK try { // should not be cancelled - await sqsClient.DeleteMessageAsync(queueUrl, message.ReceiptHandle, CancellationToken.None) + await sqsClient.DeleteMessageAsync(receiveQueueUrl, message.ReceiptHandle, CancellationToken.None) .ConfigureAwait(false); } catch (ReceiptHandleIsInvalidException ex) @@ -494,7 +495,7 @@ await sqsClient.SendMessageAsync(new SendMessageRequest { await sqsClient.ChangeMessageVisibilityAsync(new ChangeMessageVisibilityRequest { - QueueUrl = queueUrl, + QueueUrl = receiveQueueUrl, ReceiptHandle = message.ReceiptHandle, VisibilityTimeout = 0 }, CancellationToken.None) @@ -502,7 +503,7 @@ await sqsClient.ChangeMessageVisibilityAsync(new ChangeMessageVisibilityRequest } catch (Exception changeMessageVisibilityEx) { - Logger.Warn($"Error returning poison message back to input queue at url {queueUrl}. Poison message will become available at the input queue again after the visibility timeout expires.", changeMessageVisibilityEx); + Logger.Warn($"Error returning poison message back to input queue at url {receiveQueueUrl}. Poison message will become available at the input queue again after the visibility timeout expires.", changeMessageVisibilityEx); } throw; @@ -512,14 +513,14 @@ await sqsClient.ChangeMessageVisibilityAsync(new ChangeMessageVisibilityRequest { await sqsClient.DeleteMessageAsync(new DeleteMessageRequest { - QueueUrl = queueUrl, + QueueUrl = receiveQueueUrl, ReceiptHandle = message.ReceiptHandle }, CancellationToken.None) .ConfigureAwait(false); } catch (Exception ex) { - Logger.Warn($"Error removing poison message from input queue {queueUrl}. This may cause duplicate poison messages in the error queue for this endpoint.", ex); + Logger.Warn($"Error removing poison message from input queue {receiveQueueAddress}. This may cause duplicate poison messages in the error queue for this endpoint.", ex); } // If there is a message body in S3, simply leave it there @@ -550,7 +551,8 @@ static void LogPoisonMessage(string messageId, Exception exception) IEndpointInstance endpoint; IAmazonSQS sqsClient; S3Settings s3Settings; - string queueUrl; + string receiveQueueAddress; + string receiveQueueUrl; string errorQueueUrl; static readonly ILog Logger = LogManager.GetLogger(typeof(AwsLambdaSQSEndpoint));