Skip to content

Commit

Permalink
Use Amazon.SQS.Model.Message model internally (#257)
Browse files Browse the repository at this point in the history
* Add tests to show lost attributes

* Change internals to use Message type from SQS library

* Make native message available in handler context

* Move TransportTransaction into message loop for native message id

* Preserve all poison message attributes in error queue

* Use extension methods instead of MessageTypeAdapter

* Added too many if statements to check for null

* Add lambda native message model to the context

* Remove big list of if statements

* Update src/NServiceBus.AwsLambda.SQS.AcceptanceTests/NativeIntegration/When_receiving_a_native_message_with_encoding.cs

Co-authored-by: Tim Bussmann <timbussmann@users.noreply.github.com>

* Update src/NServiceBus.AwsLambda.SQS.AcceptanceTests/NativeIntegration/When_receiving_a_native_message_with_encoding.cs

Co-authored-by: Tim Bussmann <timbussmann@users.noreply.github.com>

* Remove extra parameter for message type

* Apply suggestions from code review

* Use null coalescing operator in SQSMessage map

* Fix bug in SQSMessage map

* Do not redundantly count messages in eerror queue

---------

Co-authored-by: Tim Bussmann <timbussmann@users.noreply.github.com>
Co-authored-by: Daniel Marbach <daniel.marbach@nservicebus.com>
  • Loading branch information
3 people committed Apr 21, 2023
1 parent ebe6844 commit 63a1356
Show file tree
Hide file tree
Showing 7 changed files with 343 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,21 @@ protected async Task<int> CountMessagesInErrorQueue()
return response.ApproximateNumberOfMessages;
}

protected async Task<SQSEvent> RetrieveMessagesInErrorQueue(int maxMessageCount = 10)
{
var receiveRequest = new ReceiveMessageRequest(createdErrorQueue.QueueUrl)
{
MaxNumberOfMessages = maxMessageCount,
WaitTimeSeconds = 20,
AttributeNames = new List<string> { "SentTimestamp" },
MessageAttributeNames = new List<string> { "*" }
};

var receivedMessages = await sqsClient.ReceiveMessageAsync(receiveRequest);

return receivedMessages.ToSQSEvent();
}

public static IAmazonSQS CreateSQSClient()
{
var credentials = new EnvironmentVariablesAWSCredentials();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ namespace NServiceBus.AcceptanceTests.NativeIntegration
{
using System;
using System.Collections.Generic;
using System.Text.Json.Nodes;
using System.Threading.Tasks;
using System.Xml.Linq;
using Amazon.SQS.Model;
Expand All @@ -12,6 +13,7 @@ namespace NServiceBus.AcceptanceTests.NativeIntegration
class When_receiving_a_native_message_with_encoding : AwsLambdaSQSEndpointTestBase
{
static readonly string MessageToSend = new XDocument(new XElement("Message", new XElement("ThisIsTheMessage", "Hello!"))).ToString();
static readonly string FailingMessageToSend = new XDocument(new XElement("FailingMessage", new XElement("ThisIsTheMessage", "Hello!"))).ToString();

[Test]
public async Task Should_be_processed_when_messagetypefullname_present()
Expand Down Expand Up @@ -51,7 +53,6 @@ public async Task Should_fail_when_messagetypefullname_not_present()
var messageId = Guid.NewGuid();
var receivedMessages = await GenerateAndReceiveNativeSQSEvent(new Dictionary<string, MessageAttributeValue>
{
// unfortunately only the message id attribute is preserved when moving to the poison queue
{
Headers.MessageId, new MessageAttributeValue {DataType = "String", StringValue = messageId.ToString() }
}
Expand Down Expand Up @@ -79,6 +80,90 @@ public async Task Should_fail_when_messagetypefullname_not_present()
Assert.AreEqual(1, messagesInErrorQueueCount);
}

[Test]
public async Task Should_preserve_poison_message_attributes_in_error_queue()
{
var messageId = Guid.NewGuid();
var s3Key = Guid.NewGuid().ToString();

var receivedMessages = await GenerateAndReceiveNativeSQSEvent(new Dictionary<string, MessageAttributeValue>
{
{ Headers.MessageId, new MessageAttributeValue {DataType = "String", StringValue = messageId.ToString() }},
{"S3BodyKey", new MessageAttributeValue {DataType = "String", StringValue = s3Key}},
{"MessageTypeFullName", new MessageAttributeValue {DataType = "String", StringValue = typeof(Message).FullName}},
{"CustomAttribute", new MessageAttributeValue {DataType="String", StringValue = "TestAttribute" } },

}, "Invalid XML");

var context = new TestContext();

var endpoint = new AwsLambdaSQSEndpoint(ctx =>
{
var configuration = new AwsLambdaSQSEndpointConfiguration(QueueName, CreateSQSClient(), CreateSNSClient());
var transport = configuration.Transport;
transport.S3 = new S3Settings(BucketName, KeyPrefix, CreateS3Client());
var advanced = configuration.AdvancedConfiguration;
advanced.SendFailedMessagesTo(ErrorQueueName);
advanced.RegisterComponents(c => c.AddSingleton(typeof(TestContext), context));
return configuration;
});

await endpoint.Process(receivedMessages, null);
var poisonMessages = await RetrieveMessagesInErrorQueue();

Assert.AreEqual(1, poisonMessages.Records.Count);
var message = poisonMessages.Records[0];

Assert.IsNotNull(message);
Assert.That(message.MessageAttributes.ContainsKey(Headers.MessageId), "Message ID message attribute is missing.");
Assert.That(message.MessageAttributes.ContainsKey("S3BodyKey"), "S3BodyKey message attribute is missing.");
Assert.That(message.MessageAttributes.ContainsKey("MessageTypeFullName"), "MessageTypeFullName message attribute is missing.");
Assert.That(message.MessageAttributes.ContainsKey("CustomAttribute"), "CustomAttribute message attribute is missing.");
}

[Test]
public async Task Should_preserve_message_attributes_in_error_queue()
{
var messageId = Guid.NewGuid();
var messageType = typeof(FailingNativeMessage).FullName;

var receivedMessages = await GenerateAndReceiveNativeSQSEvent(new Dictionary<string, MessageAttributeValue>
{
{ Headers.MessageId, new MessageAttributeValue {DataType = "String", StringValue = messageId.ToString() }},
{"MessageTypeFullName", new MessageAttributeValue {DataType = "String", StringValue = messageType }},
{"CustomAttribute", new MessageAttributeValue {DataType="String", StringValue = "TestAttribute" } },

}, FailingMessageToSend);

var endpoint = new AwsLambdaSQSEndpoint(ctx =>
{
var configuration = new AwsLambdaSQSEndpointConfiguration(QueueName, CreateSQSClient(), CreateSNSClient());
var transport = configuration.Transport;
transport.S3 = new S3Settings(BucketName, KeyPrefix, CreateS3Client());
var advanced = configuration.AdvancedConfiguration;
advanced.SendFailedMessagesTo(ErrorQueueName);
advanced.Recoverability().Immediate(s => s.NumberOfRetries(0));
return configuration;
});

await endpoint.Process(receivedMessages, null);
var poisonMessages = await RetrieveMessagesInErrorQueue();
var message = poisonMessages.Records[0];

Assert.AreEqual(1, poisonMessages.Records.Count);
Assert.IsNotNull(message);

var messageNode = JsonNode.Parse(message.Body);

Assert.AreEqual(messageId.ToString(), messageNode["Headers"]["NServiceBus.MessageId"].GetValue<string>());
Assert.AreEqual(messageType, messageNode["Headers"]["NServiceBus.EnclosedMessageTypes"].GetValue<string>());
Assert.That(message.MessageAttributes.ContainsKey("CustomAttribute"), "CustomAttribute message attribute is missing.");
}

[Test]
public async Task Should_support_loading_body_from_s3()
{
Expand Down Expand Up @@ -126,6 +211,11 @@ public class Message : IMessage
public string ThisIsTheMessage { get; set; }
}

public class FailingMessage : IMessage
{
public string ThisIsTheMessage { get; set; }
}

public class WithEncodingHandler : IHandleMessages<Message>
{
public WithEncodingHandler(TestContext context) => testContext = context;
Expand All @@ -138,5 +228,13 @@ public Task Handle(Message message, IMessageHandlerContext context)

TestContext testContext;
}

public class FailingWithEncodingHandler : IHandleMessages<FailingMessage>
{
public Task Handle(FailingMessage message, IMessageHandlerContext context)
{
throw new Exception();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ namespace NServiceBus.AcceptanceTests.NativeIntegration
class When_receiving_a_native_message_without_wrapper : AwsLambdaSQSEndpointTestBase
{
static readonly string MessageToSend = new XDocument(new XElement("NServiceBus.AcceptanceTests.NativeIntegration.NativeMessage", new XElement("ThisIsTheMessage", "Hello!"))).ToString();
static readonly string FailingMessageToSend = new XDocument(new XElement("NServiceBus.AcceptanceTests.NativeIntegration.FailingNativeMessage", new XElement("ThisIsTheMessage", "Hello!"))).ToString();

[Test]
public async Task Should_be_processed_when_nsbheaders_present_with_messageid()
Expand Down Expand Up @@ -132,6 +133,95 @@ public async Task Should_support_loading_body_from_s3()
Assert.AreEqual(0, messagesInErrorQueueCount);
}

[Test]
public async Task Should_preserve_poison_message_attributes_in_error_queue()
{
var s3Key = Guid.NewGuid().ToString();

var receivedMessages = await GenerateAndReceiveNativeSQSEvent(new Dictionary<string, MessageAttributeValue>
{
{
"NServiceBus.AmazonSQS.Headers",
new MessageAttributeValue
{
DataType = "String", StringValue = GetHeaders(messageId: Guid.NewGuid().ToString())
}
},
{"S3BodyKey", new MessageAttributeValue {DataType = "String", StringValue = s3Key}},
{"CustomAttribute", new MessageAttributeValue {DataType="String", StringValue = "TestAttribute" } },
}, "Bad XML", base64Encode: false);

var endpoint = new AwsLambdaSQSEndpoint(ctx =>
{
var configuration = new AwsLambdaSQSEndpointConfiguration(QueueName, CreateSQSClient(), CreateSNSClient());
var transport = configuration.Transport;
transport.S3 = new S3Settings(BucketName, KeyPrefix, CreateS3Client());
var advanced = configuration.AdvancedConfiguration;
advanced.SendFailedMessagesTo(ErrorQueueName);
return configuration;
});

await endpoint.Process(receivedMessages, null);

var messagesInErrorQueueCount = await CountMessagesInErrorQueue();

Assert.AreEqual(1, messagesInErrorQueueCount);

var errorMessages = await RetrieveMessagesInErrorQueue();

var message = errorMessages?.Records[0];

Assert.NotNull(message);
Assert.That(message.MessageAttributes.ContainsKey("NServiceBus.AmazonSQS.Headers"), $"NServiceBus.AmazonSQS.Headers message attribute is missing.");
Assert.That(message.MessageAttributes.ContainsKey("S3BodyKey"), "S3BodyKey message attribute is missing.");
Assert.That(message.MessageAttributes.ContainsKey("CustomAttribute"), "CustomAttribute message attribute is missing.");
}

[Test]
public async Task Should_preserve_message_attributes_in_error_queue()
{
var s3Key = Guid.NewGuid().ToString();

var receivedMessages = await GenerateAndReceiveNativeSQSEvent(new Dictionary<string, MessageAttributeValue>
{
{
"NServiceBus.AmazonSQS.Headers",
new MessageAttributeValue
{
DataType = "String", StringValue = GetHeaders(messageId: Guid.NewGuid().ToString())
}
},
{"CustomAttribute", new MessageAttributeValue {DataType="String", StringValue = "TestAttribute" } },
}, FailingMessageToSend, base64Encode: false);

var endpoint = new AwsLambdaSQSEndpoint(ctx =>
{
var configuration = new AwsLambdaSQSEndpointConfiguration(QueueName, CreateSQSClient(), CreateSNSClient());
var transport = configuration.Transport;
transport.S3 = new S3Settings(BucketName, KeyPrefix, CreateS3Client());
var advanced = configuration.AdvancedConfiguration;
advanced.Recoverability().Immediate(s => s.NumberOfRetries(0));
advanced.SendFailedMessagesTo(ErrorQueueName);
return configuration;
});

await endpoint.Process(receivedMessages, null);

var messagesInErrorQueueCount = await CountMessagesInErrorQueue();

Assert.AreEqual(1, messagesInErrorQueueCount);

var errorMessages = await RetrieveMessagesInErrorQueue();

var message = errorMessages?.Records[0];

Assert.That(message.MessageAttributes.ContainsKey("CustomAttribute"), "CustomAttribute message attribute is missing.");
}

string GetHeaders(string s3Key = null, string messageId = null)
{
var nsbHeaders = new Dictionary<string, string>();
Expand Down Expand Up @@ -166,10 +256,23 @@ public Task Handle(NativeMessage message, IMessageHandlerContext context)

readonly TestContext testContext;
}

public class FailingNativeHandler : IHandleMessages<FailingNativeMessage>
{
public Task Handle(FailingNativeMessage message, IMessageHandlerContext context)
{
throw new Exception();
}
}
}

public class NativeMessage : IMessage
{
public string ThisIsTheMessage { get; set; }
}

public class FailingNativeMessage : IMessage
{
public string ThisIsTheMessage { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
namespace NServiceBus.AcceptanceTests
{
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Amazon.Lambda.SQSEvents;
using Microsoft.Extensions.DependencyInjection;
using NUnit.Framework;

using Message = Amazon.SQS.Model.Message;

class When_a_SQSEvent_is_processed : AwsLambdaSQSEndpointTestBase
{
[Test]
Expand Down Expand Up @@ -33,12 +37,41 @@ public async Task The_handlers_should_be_invoked_and_process_successfully()
Assert.AreEqual(0, messagesInErrorQueueCount);
}

[Test]
public async Task The_native_message_should_be_available_on_the_context()
{
var receivedMessages = await GenerateAndReceiveSQSEvent<SuccessMessage>(3);

var context = new TestContext();

var endpoint = new AwsLambdaSQSEndpoint(ctx =>
{
var configuration = new AwsLambdaSQSEndpointConfiguration(QueueName, CreateSQSClient(), CreateSNSClient());
var advanced = configuration.AdvancedConfiguration;
advanced.SendFailedMessagesTo(ErrorQueueName);
advanced.RegisterComponents(c => c.AddSingleton(typeof(TestContext), context));
return configuration;
});

await endpoint.Process(receivedMessages, null);

Assert.IsNotNull(context.NativeMessage, "SQS native message not found");
Assert.IsNotNull(context.LambdaNativeMessage, "Lambda native message not found");
Assert.That(receivedMessages.Records.Any(r => r.MessageId == context.NativeMessage.MessageId));
Assert.That(receivedMessages.Records.Any(r => r.MessageId == context.LambdaNativeMessage.MessageId));
}

public class TestContext
{
public int HandlerInvokationCount => count;

public void HandlerInvoked() => Interlocked.Increment(ref count);
int count;

public Message NativeMessage { get; set; }

public SQSEvent.SQSMessage LambdaNativeMessage { get; set; }
}

public class SuccessMessage : ICommand
Expand All @@ -54,6 +87,12 @@ public SuccessMessageHandler(TestContext context)

public Task Handle(SuccessMessage message, IMessageHandlerContext context)
{
var nativeMessage = context.Extensions.Get<Message>();
var lambdaNativeMessage = context.Extensions.Get<SQSEvent.SQSMessage>();

testContext.NativeMessage = nativeMessage;
testContext.LambdaNativeMessage = lambdaNativeMessage;

testContext.HandlerInvoked();
return Task.CompletedTask;
}
Expand Down

0 comments on commit 63a1356

Please sign in to comment.