From 63d5f868e6bffd42add24165273cf084e3a0c6e3 Mon Sep 17 00:00:00 2001 From: jkwpappin <96733604+jkwpappin@users.noreply.github.com> Date: Wed, 26 Feb 2025 15:59:33 +0000 Subject: [PATCH 1/8] Allow pausing and resumption of message consumption (cherry picked from commit 4a442fbf1c6cd8b0fba3d4a0a20b183b0381eea6) --- sampleapps/SubscriberService/Program.cs | 8 ++ .../Configuration/IMessageConfiguration.cs | 6 ++ .../Configuration/MessageBusBuilder.cs | 8 ++ .../Configuration/MessageConfiguration.cs | 3 + .../Configuration/PollingControlToken.cs | 33 +++++++ src/AWS.Messaging/SQS/SQSMessagePoller.cs | 15 ++- .../SubscriberTests.cs | 72 ++++++++++++++ .../SQSMessagePollerTests.cs | 94 +++++++++++++++++-- 8 files changed, 226 insertions(+), 13 deletions(-) create mode 100644 src/AWS.Messaging/Configuration/PollingControlToken.cs diff --git a/sampleapps/SubscriberService/Program.cs b/sampleapps/SubscriberService/Program.cs index 25567658..0523e62e 100644 --- a/sampleapps/SubscriberService/Program.cs +++ b/sampleapps/SubscriberService/Program.cs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 using System.Text.Json; +using AWS.Messaging.Configuration; using AWS.Messaging.Telemetry.OpenTelemetry; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; @@ -49,6 +50,13 @@ await Host.CreateDefaultBuilder(args) }); }); + // Optional: Configure a PollingControlToken, you can call Start()/Stop() to start and stop message processing, by default it will be started + builder.ConfigurePollingControlToken(new PollingControlToken() + { + // Optional: Set how frequently it will check for changes to the state of the PollingControlToken + PollingWaitTime = TimeSpan.FromMilliseconds(200) + }); + // Logging data messages is disabled by default to protect sensitive user data. If you want this enabled, uncomment the line below. // builder.EnableMessageContentLogging(); }) diff --git a/src/AWS.Messaging/Configuration/IMessageConfiguration.cs b/src/AWS.Messaging/Configuration/IMessageConfiguration.cs index b9416210..3e34389c 100644 --- a/src/AWS.Messaging/Configuration/IMessageConfiguration.cs +++ b/src/AWS.Messaging/Configuration/IMessageConfiguration.cs @@ -3,6 +3,7 @@ using AWS.Messaging.Configuration.Internal; using AWS.Messaging.Serialization; +using AWS.Messaging.Services; using AWS.Messaging.Services.Backoff; using AWS.Messaging.Services.Backoff.Policies; using AWS.Messaging.Services.Backoff.Policies.Options; @@ -93,4 +94,9 @@ public interface IMessageConfiguration /// Holds an instance of to control the behavior of . /// CappedExponentialBackoffOptions CappedExponentialBackoffOptions { get; } + + /// + /// Holds an instance of to control behaviour of + /// + PollingControlToken PollingControlToken { get; } } diff --git a/src/AWS.Messaging/Configuration/MessageBusBuilder.cs b/src/AWS.Messaging/Configuration/MessageBusBuilder.cs index 869419ed..b1680880 100644 --- a/src/AWS.Messaging/Configuration/MessageBusBuilder.cs +++ b/src/AWS.Messaging/Configuration/MessageBusBuilder.cs @@ -164,6 +164,13 @@ public IMessageBusBuilder AddMessageSourceSuffix(string suffix) return this; } + /// + public IMessageBusBuilder ConfigurePollingControlToken(PollingControlToken pollingControlToken) + { + _messageConfiguration.PollingControlToken = pollingControlToken; + return this; + } + /// [RequiresDynamicCode("This method requires loading types dynamically as defined in the configuration system.")] [RequiresUnreferencedCode("This method requires loading types dynamically as defined in the configuration system.")] @@ -327,6 +334,7 @@ internal void Build() _serviceCollection.TryAdd(ServiceDescriptor.Singleton()); _serviceCollection.TryAdd(ServiceDescriptor.Singleton(typeof(ILogger<>), typeof(NullLogger<>))); + _serviceCollection.TryAddSingleton(_messageConfiguration.PollingControlToken); _serviceCollection.TryAddSingleton(_messageConfiguration); _serviceCollection.TryAddSingleton(); _serviceCollection.TryAddSingleton(); diff --git a/src/AWS.Messaging/Configuration/MessageConfiguration.cs b/src/AWS.Messaging/Configuration/MessageConfiguration.cs index 1947eddb..db93d17f 100644 --- a/src/AWS.Messaging/Configuration/MessageConfiguration.cs +++ b/src/AWS.Messaging/Configuration/MessageConfiguration.cs @@ -65,4 +65,7 @@ public class MessageConfiguration : IMessageConfiguration /// public CappedExponentialBackoffOptions CappedExponentialBackoffOptions { get; set; } = new(); + + /// + public PollingControlToken PollingControlToken { get; set; } = new(); } diff --git a/src/AWS.Messaging/Configuration/PollingControlToken.cs b/src/AWS.Messaging/Configuration/PollingControlToken.cs new file mode 100644 index 00000000..757ff9c8 --- /dev/null +++ b/src/AWS.Messaging/Configuration/PollingControlToken.cs @@ -0,0 +1,33 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +namespace AWS.Messaging.Configuration +{ + /// + /// Control token to start and stop message polling for a service. + /// + public record PollingControlToken + { + /// + /// Indicates if polling is enabled. + /// + internal bool IsPollingEnabled { get; private set; } = true; + + /// + /// Start polling of the SQS Queue. + /// + public void StartPolling() => IsPollingEnabled = true; + + /// + /// Stop polling of the SQS Queue. + /// + public void StopPolling() => IsPollingEnabled = false; + + /// + /// Configurable amount of time to wait between polling for a change in status + /// + public TimeSpan PollingWaitTime { get; set; } = TimeSpan.FromMilliseconds(200); + } + + +} diff --git a/src/AWS.Messaging/SQS/SQSMessagePoller.cs b/src/AWS.Messaging/SQS/SQSMessagePoller.cs index 11ee4487..5453c744 100644 --- a/src/AWS.Messaging/SQS/SQSMessagePoller.cs +++ b/src/AWS.Messaging/SQS/SQSMessagePoller.cs @@ -22,6 +22,7 @@ internal class SQSMessagePoller : IMessagePoller, ISQSMessageCommunication private readonly SQSMessagePollerConfiguration _configuration; private readonly IEnvelopeSerializer _envelopeSerializer; private readonly IBackoffHandler _backoffHandler; + private readonly PollingControlToken _pollingControlToken; private readonly bool _isFifoEndpoint; /// @@ -49,19 +50,22 @@ internal class SQSMessagePoller : IMessagePoller, ISQSMessageCommunication /// The SQS message poller configuration. /// Serializer used to deserialize the SQS messages /// Backoff handler for performing back-offs if exceptions are thrown when polling SQS. + /// Control token to start and stop the poller. public SQSMessagePoller( ILogger logger, IMessageManagerFactory messageManagerFactory, IAWSClientProvider awsClientProvider, SQSMessagePollerConfiguration configuration, IEnvelopeSerializer envelopeSerializer, - IBackoffHandler backoffHandler) + IBackoffHandler backoffHandler, + PollingControlToken pollingControlToken) { _logger = logger; _sqsClient = awsClientProvider.GetServiceClient(); _configuration = configuration; _envelopeSerializer = envelopeSerializer; _backoffHandler = backoffHandler; + _pollingControlToken = pollingControlToken; _isFifoEndpoint = configuration.SubscriberEndpoint.EndsWith(".fifo"); _messageManager = messageManagerFactory.CreateMessageManager(this, _configuration.ToMessageManagerConfiguration()); @@ -74,13 +78,18 @@ public async Task StartPollingAsync(CancellationToken token = default) } /// - /// Polls SQS indefinitely until cancelled - /// + /// Polls SQS indefinitely until cancelled. Message receipt can be stopped and started using /// Cancellation token to shutdown the poller. private async Task PollQueue(CancellationToken token) { while (!token.IsCancellationRequested) { + if (!_pollingControlToken.IsPollingEnabled) + { + await Task.Delay(_pollingControlToken.PollingWaitTime, token); + continue; + } + var numberOfMessagesToRead = _configuration.MaxNumberOfConcurrentMessages - _messageManager.ActiveMessageCount; // If already processing the maximum number of messages, wait for at least one to complete and then try again diff --git a/test/AWS.Messaging.IntegrationTests/SubscriberTests.cs b/test/AWS.Messaging.IntegrationTests/SubscriberTests.cs index f62bc670..05b8a9dc 100644 --- a/test/AWS.Messaging.IntegrationTests/SubscriberTests.cs +++ b/test/AWS.Messaging.IntegrationTests/SubscriberTests.cs @@ -7,6 +7,7 @@ using System.Threading.Tasks; using Amazon.SQS; using Amazon.SQS.Model; +using AWS.Messaging.Configuration; using AWS.Messaging.IntegrationTests.Handlers; using AWS.Messaging.IntegrationTests.Models; using AWS.Messaging.Tests.Common.Services; @@ -146,6 +147,77 @@ await publisher.PublishAsync(new ChatMessage } } + [Fact] + public async Task ReceiveMultipleMessagesOnlyWhenPollingControlTokenStarted() + { + var pollingControlToken = new PollingControlToken(); + _serviceCollection.AddSingleton>(); + _serviceCollection.AddAWSMessageBus(builder => + { + builder.ConfigurePollingControlToken(pollingControlToken); + builder.AddSQSPublisher(_sqsQueueUrl); + builder.AddSQSPoller(_sqsQueueUrl, options => + { + options.VisibilityTimeoutExtensionThreshold = 3; // and a message is eligible for extension after it's been processing at least 3 seconds + options.MaxNumberOfConcurrentMessages = 10; + }); + builder.AddMessageHandler(); + builder.AddMessageSource("/aws/messaging"); + }); + var serviceProvider = _serviceCollection.BuildServiceProvider(); + + var publishStartTime = DateTime.UtcNow; + var publisher = serviceProvider.GetRequiredService(); + for (int i = 0; i < 5; i++) + { + await publisher.PublishAsync(new ChatMessage + { + MessageDescription = $"Test{i + 1}" + }); + } + var publishEndTime = DateTime.UtcNow; + + var pump = serviceProvider.GetRequiredService() as MessagePumpService; + Assert.NotNull(pump); + var source = new CancellationTokenSource(); + + await pump.StartAsync(source.Token); + + // Wait for the pump to shut down after processing the expected number of messages, + // with some padding to ensure messages aren't being processed more than once + source.CancelAfter(3000); + + // Stop polling and wait for the polling cycle to complete with a buffer + pollingControlToken.StopPolling(); + await Task.Delay(pollingControlToken.PollingWaitTime * 2); + + // Publish the next 5 messages that should not be received due to stopping polling + for (int i = 5; i < 10; i++) + { + await publisher.PublishAsync(new ChatMessage + { + MessageDescription = $"Test{i + 1}" + }); + } + + while (!source.IsCancellationRequested) { } + + var inMemoryLogger = serviceProvider.GetRequiredService(); + var tempStorage = serviceProvider.GetRequiredService>(); + + Assert.Empty(inMemoryLogger.Logs.Where(x => x.Exception is AmazonSQSException ex && ex.ErrorCode.Equals("AWS.SimpleQueueService.TooManyEntriesInBatchRequest"))); + Assert.Equal(5, tempStorage.Messages.Count); + for (int i = 0; i < 5; i++) + { + var message = tempStorage.Messages.FirstOrDefault(x => x.Message.MessageDescription.Equals($"Test{i + 1}")); + Assert.NotNull(message); + Assert.False(string.IsNullOrEmpty(message.Id)); + Assert.Equal("/aws/messaging", message.Source.ToString()); + Assert.True(message.TimeStamp > publishStartTime); + Assert.True(message.TimeStamp < publishEndTime); + } + } + [Theory] [InlineData(20)] public async Task SendMixOfMessageTypesToSameQueue(int numberOfMessages) diff --git a/test/AWS.Messaging.UnitTests/SQSMessagePollerTests.cs b/test/AWS.Messaging.UnitTests/SQSMessagePollerTests.cs index d5e9a5ad..e1c2c7c3 100644 --- a/test/AWS.Messaging.UnitTests/SQSMessagePollerTests.cs +++ b/test/AWS.Messaging.UnitTests/SQSMessagePollerTests.cs @@ -26,6 +26,12 @@ public class SQSMessagePollerTests { private const string TEST_QUEUE_URL = "queueUrl"; private InMemoryLogger? _inMemoryLogger; + private readonly ServiceCollection _serviceCollection; + + public SQSMessagePollerTests() + { + _serviceCollection = new ServiceCollection(); + } /// /// Tests that starting an SQS poller with default settings begins polling SQS @@ -42,6 +48,59 @@ public async Task SQSMessagePoller_Defaults_PollsSQS() client.Verify(x => x.ReceiveMessageAsync(It.IsAny(), It.IsAny()), Times.AtLeastOnce()); } + /// + /// Tests that starting an SQS poller with + /// set to false, will not poll any messages. + /// + [Fact] + public async Task SQSMessagePoller_PollingControlStopped_DoesNotPollSQS() + { + var client = new Mock(); + client.Setup(x => x.ReceiveMessageAsync(It.IsAny(), It.IsAny())) + .ReturnsAsync(new ReceiveMessageResponse(), TimeSpan.FromMilliseconds(50)); + var pollingControlToken = new PollingControlToken + { + PollingWaitTime = TimeSpan.FromMilliseconds(25) + }; + pollingControlToken.StopPolling(); + + await RunSQSMessagePollerTest(client, pollingControlToken: pollingControlToken); + + client.Verify(x => x.ReceiveMessageAsync(It.IsAny(), It.IsAny()), Times.Never); + } + + /// + /// Tests that starting an SQS poller with + /// set to false, will not poll any messages first. Then when changing the value to true + /// polling resumes and messages are received. + /// + [Fact] + public async Task SQSMessagePoller_PollingControlRestarted_PollsSQS() + { + var client = new Mock(); + client.Setup(x => x.ReceiveMessageAsync(It.IsAny(), It.IsAny())) + .ReturnsAsync(new ReceiveMessageResponse(), TimeSpan.FromMilliseconds(50)); + var pollingControlToken = new PollingControlToken + { + PollingWaitTime = TimeSpan.FromMilliseconds(25) + }; + pollingControlToken.StopPolling(); + + var source = new CancellationTokenSource(); + var pump = BuildMessagePumpService(client, pollingControlToken: pollingControlToken); + var task = pump.StartAsync(source.Token); + + client.Verify(x => x.ReceiveMessageAsync(It.IsAny(), It.IsAny()), Times.Never); + + pollingControlToken.StartPolling(); + await Task.Delay(pollingControlToken.PollingWaitTime * 2); + + client.Verify(x => x.ReceiveMessageAsync(It.IsAny(), It.IsAny()), Times.AtLeastOnce()); + + source.Cancel(); + await task; + } + /// /// Tests that configuring a poller with /// set to a value greater than SQS's current limit of 10 will only receive 10 messages at a time. @@ -251,20 +310,38 @@ public async Task SQSMessagePoller_ContinuesForNonFatalException() /// /// Mocked SQS client /// SQS MessagePoller options - private async Task RunSQSMessagePollerTest(Mock mockSqsClient, Action? options = null) + /// Polling control token to start or stop message receipt + private async Task RunSQSMessagePollerTest(Mock mockSqsClient, Action? options = null, PollingControlToken? pollingControlToken = null) { - var serviceCollection = new ServiceCollection(); - serviceCollection.AddLogging(); + var pump = BuildMessagePumpService(mockSqsClient, options, pollingControlToken); - serviceCollection.AddAWSMessageBus(builder => + var source = new CancellationTokenSource(); + source.CancelAfter(500); + + await pump.StartAsync(source.Token); + } + + /// + /// Helper function that initializes but does not start a with + /// a mocked SQS client + /// + /// Mocked SQS client + /// SQS MessagePoller options + /// Polling control token to start or stop message receipt + private MessagePumpService BuildMessagePumpService(Mock mockSqsClient, Action? options = null, PollingControlToken? pollingControlToken = null) + { + _serviceCollection.AddLogging(); + + _serviceCollection.AddAWSMessageBus(builder => { + if (pollingControlToken is not null) builder.ConfigurePollingControlToken(pollingControlToken); builder.AddSQSPoller(TEST_QUEUE_URL, options); builder.AddMessageHandler(); }); - serviceCollection.AddSingleton(mockSqsClient.Object); + _serviceCollection.AddSingleton(mockSqsClient.Object); - var serviceProvider = serviceCollection.BuildServiceProvider(); + var serviceProvider = _serviceCollection.BuildServiceProvider(); var pump = serviceProvider.GetService() as MessagePumpService; @@ -273,10 +350,7 @@ private async Task RunSQSMessagePollerTest(Mock mockSqsClient, Actio Assert.Fail($"Unable to get the {nameof(MessagePumpService)} from the service provider."); } - var source = new CancellationTokenSource(); - source.CancelAfter(500); - - await pump.StartAsync(source.Token); + return pump; } /// From d8e03c16956a54b205fca3846f4e6103ee2dce59 Mon Sep 17 00:00:00 2001 From: jkwpappin <96733604+jkwpappin@users.noreply.github.com> Date: Wed, 26 Feb 2025 16:06:26 +0000 Subject: [PATCH 2/8] Add changelog (cherry picked from commit fba5d5b89e781da3f6a02fd5a095538515661c2b) --- .../changes/276cbae4-0a14-4efa-9fd2-760cf4529b0a.json | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 .autover/changes/276cbae4-0a14-4efa-9fd2-760cf4529b0a.json diff --git a/.autover/changes/276cbae4-0a14-4efa-9fd2-760cf4529b0a.json b/.autover/changes/276cbae4-0a14-4efa-9fd2-760cf4529b0a.json new file mode 100644 index 00000000..f91cbfd8 --- /dev/null +++ b/.autover/changes/276cbae4-0a14-4efa-9fd2-760cf4529b0a.json @@ -0,0 +1,11 @@ +{ + "Projects": [ + { + "Name": "AWS.Messaging", + "Type": "Minor", + "ChangelogMessages": [ + "Implement a start/stop mechanism for message consumption. (ISSUE 147)" + ] + } + ] +} \ No newline at end of file From 03ba1db158e84da3e18f0f9ddeda7f98f34f39dd Mon Sep 17 00:00:00 2001 From: jkwpappin <96733604+jkwpappin@users.noreply.github.com> Date: Wed, 26 Feb 2025 16:37:45 +0000 Subject: [PATCH 3/8] Remove redundant parenthesis (cherry picked from commit 1946aecd804b93a76387d3d33d6a342f82bbdd73) --- sampleapps/SubscriberService/Program.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sampleapps/SubscriberService/Program.cs b/sampleapps/SubscriberService/Program.cs index 0523e62e..ef7cdf8f 100644 --- a/sampleapps/SubscriberService/Program.cs +++ b/sampleapps/SubscriberService/Program.cs @@ -51,7 +51,7 @@ await Host.CreateDefaultBuilder(args) }); // Optional: Configure a PollingControlToken, you can call Start()/Stop() to start and stop message processing, by default it will be started - builder.ConfigurePollingControlToken(new PollingControlToken() + builder.ConfigurePollingControlToken(new PollingControlToken { // Optional: Set how frequently it will check for changes to the state of the PollingControlToken PollingWaitTime = TimeSpan.FromMilliseconds(200) From 21eea27d9ef5704d2bdcf3dbfc4ec89c63a8307d Mon Sep 17 00:00:00 2001 From: jkwpappin <96733604+jkwpappin@users.noreply.github.com> Date: Thu, 13 Mar 2025 15:54:26 +0000 Subject: [PATCH 4/8] Fix whitespace (cherry picked from commit b86353c64ce94fc47f9745724029720a5ee6cdef) --- src/AWS.Messaging/Configuration/MessageBusBuilder.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/AWS.Messaging/Configuration/MessageBusBuilder.cs b/src/AWS.Messaging/Configuration/MessageBusBuilder.cs index b1680880..bac8126b 100644 --- a/src/AWS.Messaging/Configuration/MessageBusBuilder.cs +++ b/src/AWS.Messaging/Configuration/MessageBusBuilder.cs @@ -334,7 +334,7 @@ internal void Build() _serviceCollection.TryAdd(ServiceDescriptor.Singleton()); _serviceCollection.TryAdd(ServiceDescriptor.Singleton(typeof(ILogger<>), typeof(NullLogger<>))); - _serviceCollection.TryAddSingleton(_messageConfiguration.PollingControlToken); + _serviceCollection.TryAddSingleton(_messageConfiguration.PollingControlToken); _serviceCollection.TryAddSingleton(_messageConfiguration); _serviceCollection.TryAddSingleton(); _serviceCollection.TryAddSingleton(); From 647e8497c4e21d2d80be2a77bdd6dd14c9b726b8 Mon Sep 17 00:00:00 2001 From: jkwpappin <96733604+jkwpappin@users.noreply.github.com> Date: Thu, 13 Mar 2025 16:55:58 +0000 Subject: [PATCH 5/8] Add a ConfigureAwait as we're trying to do something and verify a Moq during the async execution (cherry picked from commit 6c3f0ccdea56cc12704d34787dc1e72797472213) --- test/AWS.Messaging.IntegrationTests/SubscriberTests.cs | 4 ++-- test/AWS.Messaging.UnitTests/MessagePublisherTests.cs | 9 ++++----- test/AWS.Messaging.UnitTests/SQSMessagePollerTests.cs | 3 ++- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/test/AWS.Messaging.IntegrationTests/SubscriberTests.cs b/test/AWS.Messaging.IntegrationTests/SubscriberTests.cs index 05b8a9dc..ab5944d0 100644 --- a/test/AWS.Messaging.IntegrationTests/SubscriberTests.cs +++ b/test/AWS.Messaging.IntegrationTests/SubscriberTests.cs @@ -181,7 +181,7 @@ await publisher.PublishAsync(new ChatMessage Assert.NotNull(pump); var source = new CancellationTokenSource(); - await pump.StartAsync(source.Token); + await pump.StartAsync(source.Token).ConfigureAwait(true); // Wait for the pump to shut down after processing the expected number of messages, // with some padding to ensure messages aren't being processed more than once @@ -189,7 +189,7 @@ await publisher.PublishAsync(new ChatMessage // Stop polling and wait for the polling cycle to complete with a buffer pollingControlToken.StopPolling(); - await Task.Delay(pollingControlToken.PollingWaitTime * 2); + await Task.Delay(pollingControlToken.PollingWaitTime * 2, source.Token); // Publish the next 5 messages that should not be received due to stopping polling for (int i = 5; i < 10; i++) diff --git a/test/AWS.Messaging.UnitTests/MessagePublisherTests.cs b/test/AWS.Messaging.UnitTests/MessagePublisherTests.cs index 27bff876..6130fcd2 100644 --- a/test/AWS.Messaging.UnitTests/MessagePublisherTests.cs +++ b/test/AWS.Messaging.UnitTests/MessagePublisherTests.cs @@ -764,7 +764,7 @@ public async Task EventBridgePublisher_UnhappyPath() new DefaultTelemetryFactory(serviceProvider) ); - var publishResponse = Assert.ThrowsAsync(async () => await messagePublisher.PublishAsync(_chatMessage)); + var publishResponse = await Assert.ThrowsAsync(async () => await messagePublisher.PublishAsync(_chatMessage)); _eventBridgeClient.Verify(x => x.PutEventsAsync( @@ -774,10 +774,9 @@ public async Task EventBridgePublisher_UnhappyPath() It.IsAny()), Times.Exactly(1)); - var publishResponseResult = await publishResponse; - Assert.Equal("Message failed to publish.", publishResponseResult.Message); - Assert.Equal("ErrorMessage", publishResponseResult.InnerException!.Message); - Assert.Equal("ErrorCode", ((EventBridgePutEventsException)publishResponseResult.InnerException).ErrorCode); + Assert.Equal("Message failed to publish.", publishResponse.Message); + Assert.Equal("ErrorMessage", publishResponse.InnerException!.Message); + Assert.Equal("ErrorCode", ((EventBridgePutEventsException)publishResponse.InnerException).ErrorCode); } [Fact] diff --git a/test/AWS.Messaging.UnitTests/SQSMessagePollerTests.cs b/test/AWS.Messaging.UnitTests/SQSMessagePollerTests.cs index e1c2c7c3..80313630 100644 --- a/test/AWS.Messaging.UnitTests/SQSMessagePollerTests.cs +++ b/test/AWS.Messaging.UnitTests/SQSMessagePollerTests.cs @@ -89,11 +89,12 @@ public async Task SQSMessagePoller_PollingControlRestarted_PollsSQS() var source = new CancellationTokenSource(); var pump = BuildMessagePumpService(client, pollingControlToken: pollingControlToken); var task = pump.StartAsync(source.Token); + await task.ConfigureAwait(true); client.Verify(x => x.ReceiveMessageAsync(It.IsAny(), It.IsAny()), Times.Never); pollingControlToken.StartPolling(); - await Task.Delay(pollingControlToken.PollingWaitTime * 2); + await Task.Delay(pollingControlToken.PollingWaitTime * 2, source.Token); client.Verify(x => x.ReceiveMessageAsync(It.IsAny(), It.IsAny()), Times.AtLeastOnce()); From c9fa85d12aade52d0d4deb68e3a69a39cf7e91c5 Mon Sep 17 00:00:00 2001 From: jkwpappin <96733604+jkwpappin@users.noreply.github.com> Date: Fri, 14 Mar 2025 10:31:24 +0000 Subject: [PATCH 6/8] Use SpinWait to hold the thread (cherry picked from commit 1b4ac7ac1bcc552c00f84a73e33689a63dcd704d) --- test/AWS.Messaging.IntegrationTests/SubscriberTests.cs | 5 +++-- test/AWS.Messaging.UnitTests/SQSMessagePollerTests.cs | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/test/AWS.Messaging.IntegrationTests/SubscriberTests.cs b/test/AWS.Messaging.IntegrationTests/SubscriberTests.cs index ab5944d0..f782cbac 100644 --- a/test/AWS.Messaging.IntegrationTests/SubscriberTests.cs +++ b/test/AWS.Messaging.IntegrationTests/SubscriberTests.cs @@ -189,7 +189,8 @@ await publisher.PublishAsync(new ChatMessage // Stop polling and wait for the polling cycle to complete with a buffer pollingControlToken.StopPolling(); - await Task.Delay(pollingControlToken.PollingWaitTime * 2, source.Token); + + SpinWait.SpinUntil(() => false, pollingControlToken.PollingWaitTime * 3); // Publish the next 5 messages that should not be received due to stopping polling for (int i = 5; i < 10; i++) @@ -200,7 +201,7 @@ await publisher.PublishAsync(new ChatMessage }); } - while (!source.IsCancellationRequested) { } + SpinWait.SpinUntil(() => source.IsCancellationRequested); var inMemoryLogger = serviceProvider.GetRequiredService(); var tempStorage = serviceProvider.GetRequiredService>(); diff --git a/test/AWS.Messaging.UnitTests/SQSMessagePollerTests.cs b/test/AWS.Messaging.UnitTests/SQSMessagePollerTests.cs index 80313630..ea0ee6c9 100644 --- a/test/AWS.Messaging.UnitTests/SQSMessagePollerTests.cs +++ b/test/AWS.Messaging.UnitTests/SQSMessagePollerTests.cs @@ -89,12 +89,12 @@ public async Task SQSMessagePoller_PollingControlRestarted_PollsSQS() var source = new CancellationTokenSource(); var pump = BuildMessagePumpService(client, pollingControlToken: pollingControlToken); var task = pump.StartAsync(source.Token); - await task.ConfigureAwait(true); client.Verify(x => x.ReceiveMessageAsync(It.IsAny(), It.IsAny()), Times.Never); pollingControlToken.StartPolling(); - await Task.Delay(pollingControlToken.PollingWaitTime * 2, source.Token); + + SpinWait.SpinUntil(() => false, pollingControlToken.PollingWaitTime * 5); client.Verify(x => x.ReceiveMessageAsync(It.IsAny(), It.IsAny()), Times.AtLeastOnce()); From 8371856e5309448aa2628629edc013f08095894e Mon Sep 17 00:00:00 2001 From: Phil Asmar Date: Mon, 24 Mar 2025 12:30:12 -0400 Subject: [PATCH 7/8] fix unit and integ tests --- .../Configuration/MessageBusBuilder.cs | 3 ++- .../SubscriberTests.cs | 18 ++++++++++++++---- .../SQSMessagePollerTests.cs | 2 +- 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/src/AWS.Messaging/Configuration/MessageBusBuilder.cs b/src/AWS.Messaging/Configuration/MessageBusBuilder.cs index bac8126b..98b72f05 100644 --- a/src/AWS.Messaging/Configuration/MessageBusBuilder.cs +++ b/src/AWS.Messaging/Configuration/MessageBusBuilder.cs @@ -1,6 +1,7 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 +using System.Collections.Concurrent; using AWS.Messaging.Configuration.Internal; using AWS.Messaging.Publishers; using AWS.Messaging.Publishers.EventBridge; @@ -26,7 +27,7 @@ namespace AWS.Messaging.Configuration; /// public class MessageBusBuilder : IMessageBusBuilder { - private static readonly Dictionary _messageConfigurations = new(); + private static readonly ConcurrentDictionary _messageConfigurations = new(); private readonly MessageConfiguration _messageConfiguration; private readonly IList _additionalServices = new List(); private readonly IServiceCollection _serviceCollection; diff --git a/test/AWS.Messaging.IntegrationTests/SubscriberTests.cs b/test/AWS.Messaging.IntegrationTests/SubscriberTests.cs index f782cbac..467d8b8a 100644 --- a/test/AWS.Messaging.IntegrationTests/SubscriberTests.cs +++ b/test/AWS.Messaging.IntegrationTests/SubscriberTests.cs @@ -160,9 +160,14 @@ public async Task ReceiveMultipleMessagesOnlyWhenPollingControlTokenStarted() { options.VisibilityTimeoutExtensionThreshold = 3; // and a message is eligible for extension after it's been processing at least 3 seconds options.MaxNumberOfConcurrentMessages = 10; + options.WaitTimeSeconds = 2; }); builder.AddMessageHandler(); builder.AddMessageSource("/aws/messaging"); + builder.ConfigureBackoffPolicy(policyBuilder => + { + policyBuilder.UseNoBackoff(); + }); }); var serviceProvider = _serviceCollection.BuildServiceProvider(); @@ -181,16 +186,22 @@ await publisher.PublishAsync(new ChatMessage Assert.NotNull(pump); var source = new CancellationTokenSource(); - await pump.StartAsync(source.Token).ConfigureAwait(true); + await pump.StartAsync(source.Token); // Wait for the pump to shut down after processing the expected number of messages, // with some padding to ensure messages aren't being processed more than once - source.CancelAfter(3000); + source.CancelAfter(30_000); + + var tempStorage = serviceProvider.GetRequiredService>(); + while (tempStorage.Messages.Count < 5 && !source.IsCancellationRequested) + { + await Task.Delay(200, source.Token); + } // Stop polling and wait for the polling cycle to complete with a buffer pollingControlToken.StopPolling(); - SpinWait.SpinUntil(() => false, pollingControlToken.PollingWaitTime * 3); + await Task.Delay(5_000); // Publish the next 5 messages that should not be received due to stopping polling for (int i = 5; i < 10; i++) @@ -204,7 +215,6 @@ await publisher.PublishAsync(new ChatMessage SpinWait.SpinUntil(() => source.IsCancellationRequested); var inMemoryLogger = serviceProvider.GetRequiredService(); - var tempStorage = serviceProvider.GetRequiredService>(); Assert.Empty(inMemoryLogger.Logs.Where(x => x.Exception is AmazonSQSException ex && ex.ErrorCode.Equals("AWS.SimpleQueueService.TooManyEntriesInBatchRequest"))); Assert.Equal(5, tempStorage.Messages.Count); diff --git a/test/AWS.Messaging.UnitTests/SQSMessagePollerTests.cs b/test/AWS.Messaging.UnitTests/SQSMessagePollerTests.cs index ea0ee6c9..561df41e 100644 --- a/test/AWS.Messaging.UnitTests/SQSMessagePollerTests.cs +++ b/test/AWS.Messaging.UnitTests/SQSMessagePollerTests.cs @@ -87,7 +87,7 @@ public async Task SQSMessagePoller_PollingControlRestarted_PollsSQS() pollingControlToken.StopPolling(); var source = new CancellationTokenSource(); - var pump = BuildMessagePumpService(client, pollingControlToken: pollingControlToken); + var pump = BuildMessagePumpService(client, options => { options.WaitTimeSeconds = 1; }, pollingControlToken: pollingControlToken); var task = pump.StartAsync(source.Token); client.Verify(x => x.ReceiveMessageAsync(It.IsAny(), It.IsAny()), Times.Never); From 45687a3be09caf5ea665408760ba663d6b9d7438 Mon Sep 17 00:00:00 2001 From: Phil Asmar Date: Tue, 1 Apr 2025 09:49:08 -0400 Subject: [PATCH 8/8] address norm's comments --- src/AWS.Messaging/Configuration/IMessageBusBuilder.cs | 5 +++++ src/AWS.Messaging/Configuration/PollingControlToken.cs | 6 ++---- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/AWS.Messaging/Configuration/IMessageBusBuilder.cs b/src/AWS.Messaging/Configuration/IMessageBusBuilder.cs index 06a3815d..b377f874 100644 --- a/src/AWS.Messaging/Configuration/IMessageBusBuilder.cs +++ b/src/AWS.Messaging/Configuration/IMessageBusBuilder.cs @@ -112,4 +112,9 @@ public interface IMessageBusBuilder /// Configures the backoff policy used by and its available options. /// IMessageBusBuilder ConfigureBackoffPolicy(Action configure); + + /// + /// Configures the , which can be used to start and stop the SQS Message Poller. + /// + IMessageBusBuilder ConfigurePollingControlToken(PollingControlToken pollingControlToken); } diff --git a/src/AWS.Messaging/Configuration/PollingControlToken.cs b/src/AWS.Messaging/Configuration/PollingControlToken.cs index 757ff9c8..e9f870ec 100644 --- a/src/AWS.Messaging/Configuration/PollingControlToken.cs +++ b/src/AWS.Messaging/Configuration/PollingControlToken.cs @@ -6,7 +6,7 @@ namespace AWS.Messaging.Configuration /// /// Control token to start and stop message polling for a service. /// - public record PollingControlToken + public class PollingControlToken { /// /// Indicates if polling is enabled. @@ -26,8 +26,6 @@ public record PollingControlToken /// /// Configurable amount of time to wait between polling for a change in status /// - public TimeSpan PollingWaitTime { get; set; } = TimeSpan.FromMilliseconds(200); + public TimeSpan PollingWaitTime { get; init; } = TimeSpan.FromMilliseconds(200); } - - }