From de8a494cbd3024c93fdd7a2a3827cffede94b5ce Mon Sep 17 00:00:00 2001 From: fennekin23 Date: Thu, 10 Apr 2025 21:23:30 +0300 Subject: [PATCH 1/4] feat: add support for optional SQS client region in event source test tool --- .../d0cfdbc8-e15b-49ae-b0b4-ec882dec15f2.json | 11 + .../SQSEventSourceBackgroundService.cs | 482 +++++++++--------- 2 files changed, 251 insertions(+), 242 deletions(-) create mode 100644 .autover/changes/d0cfdbc8-e15b-49ae-b0b4-ec882dec15f2.json diff --git a/.autover/changes/d0cfdbc8-e15b-49ae-b0b4-ec882dec15f2.json b/.autover/changes/d0cfdbc8-e15b-49ae-b0b4-ec882dec15f2.json new file mode 100644 index 000000000..f181a1185 --- /dev/null +++ b/.autover/changes/d0cfdbc8-e15b-49ae-b0b4-ec882dec15f2.json @@ -0,0 +1,11 @@ +{ + "Projects": [ + { + "Name": "Amazon.Lambda.TestTool", + "Type": "Minor", + "ChangelogMessages": [ + "Support optional SQS client region in event source test tool" + ] + } + ] +} diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/SQSEventSource/SQSEventSourceBackgroundService.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/SQSEventSource/SQSEventSourceBackgroundService.cs index 2125b6f9a..3f23c3597 100644 --- a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/SQSEventSource/SQSEventSourceBackgroundService.cs +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/SQSEventSource/SQSEventSourceBackgroundService.cs @@ -1,242 +1,240 @@ -// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 - -using Amazon.Lambda.Model; -using Amazon.Lambda.SQSEvents; -using Amazon.Runtime; -using Amazon.SQS.Model; -using Amazon.SQS; -using System.Text.Json; -using Amazon.Lambda.TestTool.Services; - -namespace Amazon.Lambda.TestTool.Processes.SQSEventSource; - -/// -/// IHostedService that will run continually polling the SQS queue for messages and invoking the connected -/// Lambda function with the polled messages. -/// -public class SQSEventSourceBackgroundService : BackgroundService -{ - private static readonly List DefaultAttributesToReceive = new List { "All" }; - private static readonly JsonSerializerOptions _jsonOptions = new JsonSerializerOptions - { - PropertyNamingPolicy = JsonNamingPolicy.CamelCase - }; - - private readonly ILogger _logger; - private readonly IAmazonSQS _sqsClient; - private readonly ILambdaClient _lambdaClient; - private readonly SQSEventSourceBackgroundServiceConfig _config; - - /// - /// Constructs instance of . - /// - /// The logger - /// The SQS client used to poll messages from a queue. - /// The config of the service - /// The Lambda client that can use a different endpoint for each invoke request. - public SQSEventSourceBackgroundService(ILogger logger, IAmazonSQS sqsClient, SQSEventSourceBackgroundServiceConfig config, ILambdaClient lambdaClient) - { - _logger = logger; - _sqsClient = sqsClient; - _config = config; - _lambdaClient = lambdaClient; - } - - private async Task GetQueueArn(CancellationToken stoppingToken) - { - var response = await _sqsClient.GetQueueAttributesAsync(new GetQueueAttributesRequest - { - QueueUrl = _config.QueueUrl, - AttributeNames = new List { "QueueArn" } - }, stoppingToken); - - return response.QueueARN; - } - - /// - /// Execute the SQSEventSourceBackgroundService. - /// - /// CancellationToken used to end the service. - /// Task for the background service. - protected override async Task ExecuteAsync(CancellationToken stoppingToken) - { - // The queue arn is needed for creating the Lambda event. - var queueArn = await GetQueueArn(stoppingToken); - _logger.LogInformation("Starting polling for messages on SQS queue: {queueArn}", queueArn); - while (!stoppingToken.IsCancellationRequested) - { - try - { - _logger.LogDebug("Polling {queueUrl} for messages", _config.QueueUrl); - // Read a message from the queue using the ExternalCommands console application. - var response = await _sqsClient.ReceiveMessageAsync(new ReceiveMessageRequest - { - QueueUrl = _config.QueueUrl, - WaitTimeSeconds = 20, - MessageAttributeNames = DefaultAttributesToReceive, - MessageSystemAttributeNames = DefaultAttributesToReceive, - MaxNumberOfMessages = _config.BatchSize, - VisibilityTimeout = _config.VisibilityTimeout, - }, stoppingToken); - - if (stoppingToken.IsCancellationRequested) - { - return; - } - if (response.Messages == null || response.Messages.Count == 0) - { - _logger.LogDebug("No messages received from while polling SQS"); - // Since there are no messages, sleep a bit to wait for messages to come. - await Task.Delay(200); - continue; - } - - var lambdaPayload = new - { - Records = ConvertToLambdaMessages(response.Messages, _sqsClient.Config.RegionEndpoint.SystemName, queueArn) - }; - - var invokeRequest = new InvokeRequest - { - InvocationType = InvocationType.RequestResponse, - FunctionName = _config.FunctionName, - Payload = JsonSerializer.Serialize(lambdaPayload, _jsonOptions) - }; - - _logger.LogInformation("Invoking Lambda function {functionName} function with {messageCount} messages", _config.FunctionName, lambdaPayload.Records.Count); - var lambdaResponse = await _lambdaClient.InvokeAsync(invokeRequest, _config.LambdaRuntimeApi); - - if (lambdaResponse.FunctionError != null) - { - _logger.LogError("Invoking Lambda {function} function with {messageCount} failed with error {errorMessage}", _config.FunctionName, response.Messages.Count, lambdaResponse.FunctionError); - continue; - } - - if (!_config.DisableMessageDelete) - { - List messagesToDelete; - if (lambdaResponse.Payload != null && lambdaResponse.Payload.Length > 0) - { - var partialResponse = JsonSerializer.Deserialize(lambdaResponse.Payload); - if (partialResponse == null) - { - lambdaResponse.Payload.Position = 0; - using var reader = new StreamReader(lambdaResponse.Payload); - var payloadString = reader.ReadToEnd(); - _logger.LogError("Failed to deserialize response from Lambda function into SQSBatchResponse. Response payload:\n{payload}", payloadString); - continue; - } - - if (partialResponse.BatchItemFailures == null || partialResponse.BatchItemFailures.Count == 0) - { - _logger.LogDebug("Partial SQS response received with no failures"); - messagesToDelete = response.Messages; - } - else - { - _logger.LogDebug("Partial SQS response received with {count} failures", partialResponse.BatchItemFailures.Count); - messagesToDelete = new List(); - foreach (var message in response.Messages) - { - if (!partialResponse.BatchItemFailures.Any(x => string.Equals(x.ItemIdentifier, message.MessageId))) - { - messagesToDelete.Add(message); - } - } - } - } - else - { - _logger.LogDebug("No partial response received. All messages eligible for deletion"); - messagesToDelete = response.Messages; - } - - if (messagesToDelete.Count > 0) - { - var deleteRequest = new DeleteMessageBatchRequest - { - QueueUrl = _config.QueueUrl, - Entries = messagesToDelete.Select(m => new DeleteMessageBatchRequestEntry { Id = m.MessageId, ReceiptHandle = m.ReceiptHandle }).ToList() - }; - - _logger.LogDebug("Deleting {messageCount} messages from queue", deleteRequest.Entries.Count); - await _sqsClient.DeleteMessageBatchAsync(deleteRequest, stoppingToken); - } - } - } - catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) - { - return; - } - catch (TaskCanceledException) when (stoppingToken.IsCancellationRequested) - { - return; - } - catch (Exception e) - { - _logger.LogWarning(e, "Exception occurred in SQS poller for {queueUrl}: {message}", _config.QueueUrl, e.Message); - - // Add a delay before restarting loop in case the exception was a transient error that needs a little time to reset. - await Task.Delay(3000); - } - } - } - - /// - /// Convert from the SDK's list of messages to the Lambda event's SQS message type. - /// - /// List of messages using the SDK's .NET type - /// The aws region the messages came from. - /// The SQS queue arn the messages came from. - /// List of messages using the Lambda event's .NET type. - internal static List ConvertToLambdaMessages(List messages, string awsRegion, string queueArn) - { - return messages.Select(m => ConvertToLambdaMessage(m, awsRegion, queueArn)).ToList(); - } - - /// - /// Convert from the SDK's SQS message to the Lambda event's SQS message type. - /// - /// Message using the SDK's .NET type - /// The aws region the message came from. - /// The SQS queue arn the message came from. - /// Messages using the Lambda event's .NET type. - internal static SQSEvent.SQSMessage ConvertToLambdaMessage(Message message, string awsRegion, string queueArn) - { - var lambdaMessage = new SQSEvent.SQSMessage - { - AwsRegion = awsRegion, - Body = message.Body, - EventSource = "aws:sqs", - EventSourceArn = queueArn, - Md5OfBody = message.MD5OfBody, - Md5OfMessageAttributes = message.MD5OfMessageAttributes, - MessageId = message.MessageId, - ReceiptHandle = message.ReceiptHandle, - }; - - if (message.MessageAttributes != null && message.MessageAttributes.Count > 0) - { - lambdaMessage.MessageAttributes = new Dictionary(); - foreach (var kvp in message.MessageAttributes) - { - var lambdaAttribute = new SQSEvent.MessageAttribute - { - DataType = kvp.Value.DataType, - StringValue = kvp.Value.StringValue, - BinaryValue = kvp.Value.BinaryValue - }; - - lambdaMessage.MessageAttributes.Add(kvp.Key, lambdaAttribute); - } - } - - if (message.Attributes != null && message.Attributes.Count > 0) - { - lambdaMessage.Attributes = message.Attributes; - } - - return lambdaMessage; - } -} +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +using Amazon.Lambda.Model; +using Amazon.Lambda.SQSEvents; +using Amazon.Runtime; +using Amazon.SQS.Model; +using Amazon.SQS; +using System.Text.Json; +using Amazon.Lambda.TestTool.Services; + +namespace Amazon.Lambda.TestTool.Processes.SQSEventSource; + +/// +/// IHostedService that will run continually polling the SQS queue for messages and invoking the connected +/// Lambda function with the polled messages. +/// +public class SQSEventSourceBackgroundService : BackgroundService +{ + private static readonly List DefaultAttributesToReceive = new List { "All" }; + private static readonly JsonSerializerOptions _jsonOptions = new JsonSerializerOptions + { + PropertyNamingPolicy = JsonNamingPolicy.CamelCase + }; + + private readonly ILogger _logger; + private readonly IAmazonSQS _sqsClient; + private readonly ILambdaClient _lambdaClient; + private readonly SQSEventSourceBackgroundServiceConfig _config; + + /// + /// Constructs instance of . + /// + /// The logger + /// The SQS client used to poll messages from a queue. + /// The config of the service + /// The Lambda client that can use a different endpoint for each invoke request. + public SQSEventSourceBackgroundService(ILogger logger, IAmazonSQS sqsClient, SQSEventSourceBackgroundServiceConfig config, ILambdaClient lambdaClient) + { + _logger = logger; + _sqsClient = sqsClient; + _config = config; + _lambdaClient = lambdaClient; + } + + private async Task GetQueueArn(CancellationToken stoppingToken) + { + var response = await _sqsClient.GetQueueAttributesAsync(new GetQueueAttributesRequest + { + QueueUrl = _config.QueueUrl, + AttributeNames = new List { "QueueArn" } + }, stoppingToken); + + return Arn.Parse(response.QueueARN); + } + + /// + /// Execute the SQSEventSourceBackgroundService. + /// + /// CancellationToken used to end the service. + /// Task for the background service. + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + // The queue arn is needed for creating the Lambda event. + var queueArn = await GetQueueArn(stoppingToken); + _logger.LogInformation("Starting polling for messages on SQS queue: {queueArn}", queueArn); + while (!stoppingToken.IsCancellationRequested) + { + try + { + _logger.LogDebug("Polling {queueUrl} for messages", _config.QueueUrl); + // Read a message from the queue using the ExternalCommands console application. + var response = await _sqsClient.ReceiveMessageAsync(new ReceiveMessageRequest + { + QueueUrl = _config.QueueUrl, + WaitTimeSeconds = 20, + MessageAttributeNames = DefaultAttributesToReceive, + MessageSystemAttributeNames = DefaultAttributesToReceive, + MaxNumberOfMessages = _config.BatchSize, + VisibilityTimeout = _config.VisibilityTimeout, + }, stoppingToken); + + if (stoppingToken.IsCancellationRequested) + { + return; + } + if (response.Messages == null || response.Messages.Count == 0) + { + _logger.LogDebug("No messages received from while polling SQS"); + // Since there are no messages, sleep a bit to wait for messages to come. + await Task.Delay(200); + continue; + } + + var lambdaPayload = new + { + Records = ConvertToLambdaMessages(response.Messages, queueArn) + }; + + var invokeRequest = new InvokeRequest + { + InvocationType = InvocationType.RequestResponse, + FunctionName = _config.FunctionName, + Payload = JsonSerializer.Serialize(lambdaPayload, _jsonOptions) + }; + + _logger.LogInformation("Invoking Lambda function {functionName} function with {messageCount} messages", _config.FunctionName, lambdaPayload.Records.Count); + var lambdaResponse = await _lambdaClient.InvokeAsync(invokeRequest, _config.LambdaRuntimeApi); + + if (lambdaResponse.FunctionError != null) + { + _logger.LogError("Invoking Lambda {function} function with {messageCount} failed with error {errorMessage}", _config.FunctionName, response.Messages.Count, lambdaResponse.FunctionError); + continue; + } + + if (!_config.DisableMessageDelete) + { + List messagesToDelete; + if (lambdaResponse.Payload != null && lambdaResponse.Payload.Length > 0) + { + var partialResponse = JsonSerializer.Deserialize(lambdaResponse.Payload); + if (partialResponse == null) + { + lambdaResponse.Payload.Position = 0; + using var reader = new StreamReader(lambdaResponse.Payload); + var payloadString = reader.ReadToEnd(); + _logger.LogError("Failed to deserialize response from Lambda function into SQSBatchResponse. Response payload:\n{payload}", payloadString); + continue; + } + + if (partialResponse.BatchItemFailures == null || partialResponse.BatchItemFailures.Count == 0) + { + _logger.LogDebug("Partial SQS response received with no failures"); + messagesToDelete = response.Messages; + } + else + { + _logger.LogDebug("Partial SQS response received with {count} failures", partialResponse.BatchItemFailures.Count); + messagesToDelete = new List(); + foreach (var message in response.Messages) + { + if (!partialResponse.BatchItemFailures.Any(x => string.Equals(x.ItemIdentifier, message.MessageId))) + { + messagesToDelete.Add(message); + } + } + } + } + else + { + _logger.LogDebug("No partial response received. All messages eligible for deletion"); + messagesToDelete = response.Messages; + } + + if (messagesToDelete.Count > 0) + { + var deleteRequest = new DeleteMessageBatchRequest + { + QueueUrl = _config.QueueUrl, + Entries = messagesToDelete.Select(m => new DeleteMessageBatchRequestEntry { Id = m.MessageId, ReceiptHandle = m.ReceiptHandle }).ToList() + }; + + _logger.LogDebug("Deleting {messageCount} messages from queue", deleteRequest.Entries.Count); + await _sqsClient.DeleteMessageBatchAsync(deleteRequest, stoppingToken); + } + } + } + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) + { + return; + } + catch (TaskCanceledException) when (stoppingToken.IsCancellationRequested) + { + return; + } + catch (Exception e) + { + _logger.LogWarning(e, "Exception occurred in SQS poller for {queueUrl}: {message}", _config.QueueUrl, e.Message); + + // Add a delay before restarting loop in case the exception was a transient error that needs a little time to reset. + await Task.Delay(3000); + } + } + } + + /// + /// Convert from the SDK's list of messages to the Lambda event's SQS message type. + /// + /// List of messages using the SDK's .NET type + /// The SQS queue arn the messages came from. + /// List of messages using the Lambda event's .NET type. + internal static List ConvertToLambdaMessages(List messages, Arn queueArn) + { + return messages.Select(m => ConvertToLambdaMessage(m, queueArn)).ToList(); + } + + /// + /// Convert from the SDK's SQS message to the Lambda event's SQS message type. + /// + /// Message using the SDK's .NET type + /// The SQS queue arn the message came from. + /// Messages using the Lambda event's .NET type. + internal static SQSEvent.SQSMessage ConvertToLambdaMessage(Message message, Arn queueArn) + { + var lambdaMessage = new SQSEvent.SQSMessage + { + AwsRegion = queueArn.Region, + Body = message.Body, + EventSource = "aws:sqs", + EventSourceArn = queueArn.ToString(), + Md5OfBody = message.MD5OfBody, + Md5OfMessageAttributes = message.MD5OfMessageAttributes, + MessageId = message.MessageId, + ReceiptHandle = message.ReceiptHandle, + }; + + if (message.MessageAttributes != null && message.MessageAttributes.Count > 0) + { + lambdaMessage.MessageAttributes = new Dictionary(); + foreach (var kvp in message.MessageAttributes) + { + var lambdaAttribute = new SQSEvent.MessageAttribute + { + DataType = kvp.Value.DataType, + StringValue = kvp.Value.StringValue, + BinaryValue = kvp.Value.BinaryValue + }; + + lambdaMessage.MessageAttributes.Add(kvp.Key, lambdaAttribute); + } + } + + if (message.Attributes != null && message.Attributes.Count > 0) + { + lambdaMessage.Attributes = message.Attributes; + } + + return lambdaMessage; + } +} From 2b81dac2458aad022997f9102ea2ad4e94b30631 Mon Sep 17 00:00:00 2001 From: fennekin23 Date: Thu, 10 Apr 2025 22:25:53 +0300 Subject: [PATCH 2/4] fix: fix line endings, set LF. --- .../SQSEventSourceBackgroundService.cs | 480 +++++++++--------- 1 file changed, 240 insertions(+), 240 deletions(-) diff --git a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/SQSEventSource/SQSEventSourceBackgroundService.cs b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/SQSEventSource/SQSEventSourceBackgroundService.cs index 3f23c3597..62e214ae8 100644 --- a/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/SQSEventSource/SQSEventSourceBackgroundService.cs +++ b/Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/SQSEventSource/SQSEventSourceBackgroundService.cs @@ -1,240 +1,240 @@ -// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 - -using Amazon.Lambda.Model; -using Amazon.Lambda.SQSEvents; -using Amazon.Runtime; -using Amazon.SQS.Model; -using Amazon.SQS; -using System.Text.Json; -using Amazon.Lambda.TestTool.Services; - -namespace Amazon.Lambda.TestTool.Processes.SQSEventSource; - -/// -/// IHostedService that will run continually polling the SQS queue for messages and invoking the connected -/// Lambda function with the polled messages. -/// -public class SQSEventSourceBackgroundService : BackgroundService -{ - private static readonly List DefaultAttributesToReceive = new List { "All" }; - private static readonly JsonSerializerOptions _jsonOptions = new JsonSerializerOptions - { - PropertyNamingPolicy = JsonNamingPolicy.CamelCase - }; - - private readonly ILogger _logger; - private readonly IAmazonSQS _sqsClient; - private readonly ILambdaClient _lambdaClient; - private readonly SQSEventSourceBackgroundServiceConfig _config; - - /// - /// Constructs instance of . - /// - /// The logger - /// The SQS client used to poll messages from a queue. - /// The config of the service - /// The Lambda client that can use a different endpoint for each invoke request. - public SQSEventSourceBackgroundService(ILogger logger, IAmazonSQS sqsClient, SQSEventSourceBackgroundServiceConfig config, ILambdaClient lambdaClient) - { - _logger = logger; - _sqsClient = sqsClient; - _config = config; - _lambdaClient = lambdaClient; - } - - private async Task GetQueueArn(CancellationToken stoppingToken) - { - var response = await _sqsClient.GetQueueAttributesAsync(new GetQueueAttributesRequest - { - QueueUrl = _config.QueueUrl, - AttributeNames = new List { "QueueArn" } - }, stoppingToken); - - return Arn.Parse(response.QueueARN); - } - - /// - /// Execute the SQSEventSourceBackgroundService. - /// - /// CancellationToken used to end the service. - /// Task for the background service. - protected override async Task ExecuteAsync(CancellationToken stoppingToken) - { - // The queue arn is needed for creating the Lambda event. - var queueArn = await GetQueueArn(stoppingToken); - _logger.LogInformation("Starting polling for messages on SQS queue: {queueArn}", queueArn); - while (!stoppingToken.IsCancellationRequested) - { - try - { - _logger.LogDebug("Polling {queueUrl} for messages", _config.QueueUrl); - // Read a message from the queue using the ExternalCommands console application. - var response = await _sqsClient.ReceiveMessageAsync(new ReceiveMessageRequest - { - QueueUrl = _config.QueueUrl, - WaitTimeSeconds = 20, - MessageAttributeNames = DefaultAttributesToReceive, - MessageSystemAttributeNames = DefaultAttributesToReceive, - MaxNumberOfMessages = _config.BatchSize, - VisibilityTimeout = _config.VisibilityTimeout, - }, stoppingToken); - - if (stoppingToken.IsCancellationRequested) - { - return; - } - if (response.Messages == null || response.Messages.Count == 0) - { - _logger.LogDebug("No messages received from while polling SQS"); - // Since there are no messages, sleep a bit to wait for messages to come. - await Task.Delay(200); - continue; - } - - var lambdaPayload = new - { - Records = ConvertToLambdaMessages(response.Messages, queueArn) - }; - - var invokeRequest = new InvokeRequest - { - InvocationType = InvocationType.RequestResponse, - FunctionName = _config.FunctionName, - Payload = JsonSerializer.Serialize(lambdaPayload, _jsonOptions) - }; - - _logger.LogInformation("Invoking Lambda function {functionName} function with {messageCount} messages", _config.FunctionName, lambdaPayload.Records.Count); - var lambdaResponse = await _lambdaClient.InvokeAsync(invokeRequest, _config.LambdaRuntimeApi); - - if (lambdaResponse.FunctionError != null) - { - _logger.LogError("Invoking Lambda {function} function with {messageCount} failed with error {errorMessage}", _config.FunctionName, response.Messages.Count, lambdaResponse.FunctionError); - continue; - } - - if (!_config.DisableMessageDelete) - { - List messagesToDelete; - if (lambdaResponse.Payload != null && lambdaResponse.Payload.Length > 0) - { - var partialResponse = JsonSerializer.Deserialize(lambdaResponse.Payload); - if (partialResponse == null) - { - lambdaResponse.Payload.Position = 0; - using var reader = new StreamReader(lambdaResponse.Payload); - var payloadString = reader.ReadToEnd(); - _logger.LogError("Failed to deserialize response from Lambda function into SQSBatchResponse. Response payload:\n{payload}", payloadString); - continue; - } - - if (partialResponse.BatchItemFailures == null || partialResponse.BatchItemFailures.Count == 0) - { - _logger.LogDebug("Partial SQS response received with no failures"); - messagesToDelete = response.Messages; - } - else - { - _logger.LogDebug("Partial SQS response received with {count} failures", partialResponse.BatchItemFailures.Count); - messagesToDelete = new List(); - foreach (var message in response.Messages) - { - if (!partialResponse.BatchItemFailures.Any(x => string.Equals(x.ItemIdentifier, message.MessageId))) - { - messagesToDelete.Add(message); - } - } - } - } - else - { - _logger.LogDebug("No partial response received. All messages eligible for deletion"); - messagesToDelete = response.Messages; - } - - if (messagesToDelete.Count > 0) - { - var deleteRequest = new DeleteMessageBatchRequest - { - QueueUrl = _config.QueueUrl, - Entries = messagesToDelete.Select(m => new DeleteMessageBatchRequestEntry { Id = m.MessageId, ReceiptHandle = m.ReceiptHandle }).ToList() - }; - - _logger.LogDebug("Deleting {messageCount} messages from queue", deleteRequest.Entries.Count); - await _sqsClient.DeleteMessageBatchAsync(deleteRequest, stoppingToken); - } - } - } - catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) - { - return; - } - catch (TaskCanceledException) when (stoppingToken.IsCancellationRequested) - { - return; - } - catch (Exception e) - { - _logger.LogWarning(e, "Exception occurred in SQS poller for {queueUrl}: {message}", _config.QueueUrl, e.Message); - - // Add a delay before restarting loop in case the exception was a transient error that needs a little time to reset. - await Task.Delay(3000); - } - } - } - - /// - /// Convert from the SDK's list of messages to the Lambda event's SQS message type. - /// - /// List of messages using the SDK's .NET type - /// The SQS queue arn the messages came from. - /// List of messages using the Lambda event's .NET type. - internal static List ConvertToLambdaMessages(List messages, Arn queueArn) - { - return messages.Select(m => ConvertToLambdaMessage(m, queueArn)).ToList(); - } - - /// - /// Convert from the SDK's SQS message to the Lambda event's SQS message type. - /// - /// Message using the SDK's .NET type - /// The SQS queue arn the message came from. - /// Messages using the Lambda event's .NET type. - internal static SQSEvent.SQSMessage ConvertToLambdaMessage(Message message, Arn queueArn) - { - var lambdaMessage = new SQSEvent.SQSMessage - { - AwsRegion = queueArn.Region, - Body = message.Body, - EventSource = "aws:sqs", - EventSourceArn = queueArn.ToString(), - Md5OfBody = message.MD5OfBody, - Md5OfMessageAttributes = message.MD5OfMessageAttributes, - MessageId = message.MessageId, - ReceiptHandle = message.ReceiptHandle, - }; - - if (message.MessageAttributes != null && message.MessageAttributes.Count > 0) - { - lambdaMessage.MessageAttributes = new Dictionary(); - foreach (var kvp in message.MessageAttributes) - { - var lambdaAttribute = new SQSEvent.MessageAttribute - { - DataType = kvp.Value.DataType, - StringValue = kvp.Value.StringValue, - BinaryValue = kvp.Value.BinaryValue - }; - - lambdaMessage.MessageAttributes.Add(kvp.Key, lambdaAttribute); - } - } - - if (message.Attributes != null && message.Attributes.Count > 0) - { - lambdaMessage.Attributes = message.Attributes; - } - - return lambdaMessage; - } -} +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +using Amazon.Lambda.Model; +using Amazon.Lambda.SQSEvents; +using Amazon.Runtime; +using Amazon.SQS.Model; +using Amazon.SQS; +using System.Text.Json; +using Amazon.Lambda.TestTool.Services; + +namespace Amazon.Lambda.TestTool.Processes.SQSEventSource; + +/// +/// IHostedService that will run continually polling the SQS queue for messages and invoking the connected +/// Lambda function with the polled messages. +/// +public class SQSEventSourceBackgroundService : BackgroundService +{ + private static readonly List DefaultAttributesToReceive = new List { "All" }; + private static readonly JsonSerializerOptions _jsonOptions = new JsonSerializerOptions + { + PropertyNamingPolicy = JsonNamingPolicy.CamelCase + }; + + private readonly ILogger _logger; + private readonly IAmazonSQS _sqsClient; + private readonly ILambdaClient _lambdaClient; + private readonly SQSEventSourceBackgroundServiceConfig _config; + + /// + /// Constructs instance of . + /// + /// The logger + /// The SQS client used to poll messages from a queue. + /// The config of the service + /// The Lambda client that can use a different endpoint for each invoke request. + public SQSEventSourceBackgroundService(ILogger logger, IAmazonSQS sqsClient, SQSEventSourceBackgroundServiceConfig config, ILambdaClient lambdaClient) + { + _logger = logger; + _sqsClient = sqsClient; + _config = config; + _lambdaClient = lambdaClient; + } + + private async Task GetQueueArn(CancellationToken stoppingToken) + { + var response = await _sqsClient.GetQueueAttributesAsync(new GetQueueAttributesRequest + { + QueueUrl = _config.QueueUrl, + AttributeNames = new List { "QueueArn" } + }, stoppingToken); + + return Arn.Parse(response.QueueARN); + } + + /// + /// Execute the SQSEventSourceBackgroundService. + /// + /// CancellationToken used to end the service. + /// Task for the background service. + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + // The queue arn is needed for creating the Lambda event. + var queueArn = await GetQueueArn(stoppingToken); + _logger.LogInformation("Starting polling for messages on SQS queue: {queueArn}", queueArn); + while (!stoppingToken.IsCancellationRequested) + { + try + { + _logger.LogDebug("Polling {queueUrl} for messages", _config.QueueUrl); + // Read a message from the queue using the ExternalCommands console application. + var response = await _sqsClient.ReceiveMessageAsync(new ReceiveMessageRequest + { + QueueUrl = _config.QueueUrl, + WaitTimeSeconds = 20, + MessageAttributeNames = DefaultAttributesToReceive, + MessageSystemAttributeNames = DefaultAttributesToReceive, + MaxNumberOfMessages = _config.BatchSize, + VisibilityTimeout = _config.VisibilityTimeout, + }, stoppingToken); + + if (stoppingToken.IsCancellationRequested) + { + return; + } + if (response.Messages == null || response.Messages.Count == 0) + { + _logger.LogDebug("No messages received from while polling SQS"); + // Since there are no messages, sleep a bit to wait for messages to come. + await Task.Delay(200); + continue; + } + + var lambdaPayload = new + { + Records = ConvertToLambdaMessages(response.Messages, queueArn) + }; + + var invokeRequest = new InvokeRequest + { + InvocationType = InvocationType.RequestResponse, + FunctionName = _config.FunctionName, + Payload = JsonSerializer.Serialize(lambdaPayload, _jsonOptions) + }; + + _logger.LogInformation("Invoking Lambda function {functionName} function with {messageCount} messages", _config.FunctionName, lambdaPayload.Records.Count); + var lambdaResponse = await _lambdaClient.InvokeAsync(invokeRequest, _config.LambdaRuntimeApi); + + if (lambdaResponse.FunctionError != null) + { + _logger.LogError("Invoking Lambda {function} function with {messageCount} failed with error {errorMessage}", _config.FunctionName, response.Messages.Count, lambdaResponse.FunctionError); + continue; + } + + if (!_config.DisableMessageDelete) + { + List messagesToDelete; + if (lambdaResponse.Payload != null && lambdaResponse.Payload.Length > 0) + { + var partialResponse = JsonSerializer.Deserialize(lambdaResponse.Payload); + if (partialResponse == null) + { + lambdaResponse.Payload.Position = 0; + using var reader = new StreamReader(lambdaResponse.Payload); + var payloadString = reader.ReadToEnd(); + _logger.LogError("Failed to deserialize response from Lambda function into SQSBatchResponse. Response payload:\n{payload}", payloadString); + continue; + } + + if (partialResponse.BatchItemFailures == null || partialResponse.BatchItemFailures.Count == 0) + { + _logger.LogDebug("Partial SQS response received with no failures"); + messagesToDelete = response.Messages; + } + else + { + _logger.LogDebug("Partial SQS response received with {count} failures", partialResponse.BatchItemFailures.Count); + messagesToDelete = new List(); + foreach (var message in response.Messages) + { + if (!partialResponse.BatchItemFailures.Any(x => string.Equals(x.ItemIdentifier, message.MessageId))) + { + messagesToDelete.Add(message); + } + } + } + } + else + { + _logger.LogDebug("No partial response received. All messages eligible for deletion"); + messagesToDelete = response.Messages; + } + + if (messagesToDelete.Count > 0) + { + var deleteRequest = new DeleteMessageBatchRequest + { + QueueUrl = _config.QueueUrl, + Entries = messagesToDelete.Select(m => new DeleteMessageBatchRequestEntry { Id = m.MessageId, ReceiptHandle = m.ReceiptHandle }).ToList() + }; + + _logger.LogDebug("Deleting {messageCount} messages from queue", deleteRequest.Entries.Count); + await _sqsClient.DeleteMessageBatchAsync(deleteRequest, stoppingToken); + } + } + } + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) + { + return; + } + catch (TaskCanceledException) when (stoppingToken.IsCancellationRequested) + { + return; + } + catch (Exception e) + { + _logger.LogWarning(e, "Exception occurred in SQS poller for {queueUrl}: {message}", _config.QueueUrl, e.Message); + + // Add a delay before restarting loop in case the exception was a transient error that needs a little time to reset. + await Task.Delay(3000); + } + } + } + + /// + /// Convert from the SDK's list of messages to the Lambda event's SQS message type. + /// + /// List of messages using the SDK's .NET type + /// The SQS queue arn the messages came from. + /// List of messages using the Lambda event's .NET type. + internal static List ConvertToLambdaMessages(List messages, Arn queueArn) + { + return messages.Select(m => ConvertToLambdaMessage(m, queueArn)).ToList(); + } + + /// + /// Convert from the SDK's SQS message to the Lambda event's SQS message type. + /// + /// Message using the SDK's .NET type + /// The SQS queue arn the message came from. + /// Messages using the Lambda event's .NET type. + internal static SQSEvent.SQSMessage ConvertToLambdaMessage(Message message, Arn queueArn) + { + var lambdaMessage = new SQSEvent.SQSMessage + { + AwsRegion = queueArn.Region, + Body = message.Body, + EventSource = "aws:sqs", + EventSourceArn = queueArn.ToString(), + Md5OfBody = message.MD5OfBody, + Md5OfMessageAttributes = message.MD5OfMessageAttributes, + MessageId = message.MessageId, + ReceiptHandle = message.ReceiptHandle, + }; + + if (message.MessageAttributes != null && message.MessageAttributes.Count > 0) + { + lambdaMessage.MessageAttributes = new Dictionary(); + foreach (var kvp in message.MessageAttributes) + { + var lambdaAttribute = new SQSEvent.MessageAttribute + { + DataType = kvp.Value.DataType, + StringValue = kvp.Value.StringValue, + BinaryValue = kvp.Value.BinaryValue + }; + + lambdaMessage.MessageAttributes.Add(kvp.Key, lambdaAttribute); + } + } + + if (message.Attributes != null && message.Attributes.Count > 0) + { + lambdaMessage.Attributes = message.Attributes; + } + + return lambdaMessage; + } +} From f8e3e4ad6ca39452c9eba613e3a448342fce87d0 Mon Sep 17 00:00:00 2001 From: fennekin23 Date: Thu, 10 Apr 2025 22:37:49 +0300 Subject: [PATCH 3/4] fix: update change type from Minor to Patch --- .autover/changes/d0cfdbc8-e15b-49ae-b0b4-ec882dec15f2.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.autover/changes/d0cfdbc8-e15b-49ae-b0b4-ec882dec15f2.json b/.autover/changes/d0cfdbc8-e15b-49ae-b0b4-ec882dec15f2.json index f181a1185..807ffaf08 100644 --- a/.autover/changes/d0cfdbc8-e15b-49ae-b0b4-ec882dec15f2.json +++ b/.autover/changes/d0cfdbc8-e15b-49ae-b0b4-ec882dec15f2.json @@ -2,7 +2,7 @@ "Projects": [ { "Name": "Amazon.Lambda.TestTool", - "Type": "Minor", + "Type": "Patch", "ChangelogMessages": [ "Support optional SQS client region in event source test tool" ] From bff544468431ce84075bd51577a2f35eba31d48a Mon Sep 17 00:00:00 2001 From: fennekin23 Date: Thu, 10 Apr 2025 23:05:41 +0300 Subject: [PATCH 4/4] fix: update ConvertToLambdaMessage calls to use parsed queue ARN --- .../SQSEventSource/ConvertSDKToLambdaEventTests.cs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/SQSEventSource/ConvertSDKToLambdaEventTests.cs b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/SQSEventSource/ConvertSDKToLambdaEventTests.cs index c8722551d..31a08b5ae 100644 --- a/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/SQSEventSource/ConvertSDKToLambdaEventTests.cs +++ b/Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/SQSEventSource/ConvertSDKToLambdaEventTests.cs @@ -24,9 +24,10 @@ public void ConvertSDKMessageFull() ReceiptHandle = "receiptHandle" }; - var eventMessage = SQSEventSourceBackgroundService.ConvertToLambdaMessage(sdkMessage, "us-west-2", "queueArn"); + var queueArn = Arn.Parse("arn:aws:sqs:us-west-2:123456789012:queueName"); + var eventMessage = SQSEventSourceBackgroundService.ConvertToLambdaMessage(sdkMessage, queueArn); Assert.Equal("us-west-2", eventMessage.AwsRegion); - Assert.Equal("queueArn", eventMessage.EventSourceArn); + Assert.Equal("arn:aws:sqs:us-west-2:123456789012:queueName", eventMessage.EventSourceArn); Assert.Equal("aws:sqs", eventMessage.EventSource); Assert.Equal(sdkMessage.Attributes, eventMessage.Attributes); @@ -61,9 +62,10 @@ public void ConvertSDKMessageWithNullCollections() ReceiptHandle = "receiptHandle" }; - var eventMessage = SQSEventSourceBackgroundService.ConvertToLambdaMessage(sdkMessage, "us-west-2", "queueArn"); + var queueArn = Arn.Parse("arn:aws:sqs:us-west-2:123456789012:queueName"); + var eventMessage = SQSEventSourceBackgroundService.ConvertToLambdaMessage(sdkMessage, queueArn); Assert.Equal("us-west-2", eventMessage.AwsRegion); - Assert.Equal("queueArn", eventMessage.EventSourceArn); + Assert.Equal("arn:aws:sqs:us-west-2:123456789012:queueName", eventMessage.EventSourceArn); Assert.Equal("aws:sqs", eventMessage.EventSource); Assert.Equal("theBody", eventMessage.Body);