diff --git a/.autover/changes/276cbae4-0a14-4efa-9fd2-760cf4529b0a.json b/.autover/changes/276cbae4-0a14-4efa-9fd2-760cf4529b0a.json new file mode 100644 index 00000000..f91cbfd8 --- /dev/null +++ b/.autover/changes/276cbae4-0a14-4efa-9fd2-760cf4529b0a.json @@ -0,0 +1,11 @@ +{ + "Projects": [ + { + "Name": "AWS.Messaging", + "Type": "Minor", + "ChangelogMessages": [ + "Implement a start/stop mechanism for message consumption. (ISSUE 147)" + ] + } + ] +} \ No newline at end of file diff --git a/sampleapps/SubscriberService/Program.cs b/sampleapps/SubscriberService/Program.cs index 25567658..ef7cdf8f 100644 --- a/sampleapps/SubscriberService/Program.cs +++ b/sampleapps/SubscriberService/Program.cs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 using System.Text.Json; +using AWS.Messaging.Configuration; using AWS.Messaging.Telemetry.OpenTelemetry; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; @@ -49,6 +50,13 @@ await Host.CreateDefaultBuilder(args) }); }); + // Optional: Configure a PollingControlToken, you can call Start()/Stop() to start and stop message processing, by default it will be started + builder.ConfigurePollingControlToken(new PollingControlToken + { + // Optional: Set how frequently it will check for changes to the state of the PollingControlToken + PollingWaitTime = TimeSpan.FromMilliseconds(200) + }); + // Logging data messages is disabled by default to protect sensitive user data. If you want this enabled, uncomment the line below. // builder.EnableMessageContentLogging(); }) diff --git a/src/AWS.Messaging/Configuration/IMessageBusBuilder.cs b/src/AWS.Messaging/Configuration/IMessageBusBuilder.cs index 06a3815d..b377f874 100644 --- a/src/AWS.Messaging/Configuration/IMessageBusBuilder.cs +++ b/src/AWS.Messaging/Configuration/IMessageBusBuilder.cs @@ -112,4 +112,9 @@ public interface IMessageBusBuilder /// Configures the backoff policy used by and its available options. /// IMessageBusBuilder ConfigureBackoffPolicy(Action configure); + + /// + /// Configures the , which can be used to start and stop the SQS Message Poller. + /// + IMessageBusBuilder ConfigurePollingControlToken(PollingControlToken pollingControlToken); } diff --git a/src/AWS.Messaging/Configuration/IMessageConfiguration.cs b/src/AWS.Messaging/Configuration/IMessageConfiguration.cs index b9416210..3e34389c 100644 --- a/src/AWS.Messaging/Configuration/IMessageConfiguration.cs +++ b/src/AWS.Messaging/Configuration/IMessageConfiguration.cs @@ -3,6 +3,7 @@ using AWS.Messaging.Configuration.Internal; using AWS.Messaging.Serialization; +using AWS.Messaging.Services; using AWS.Messaging.Services.Backoff; using AWS.Messaging.Services.Backoff.Policies; using AWS.Messaging.Services.Backoff.Policies.Options; @@ -93,4 +94,9 @@ public interface IMessageConfiguration /// Holds an instance of to control the behavior of . /// CappedExponentialBackoffOptions CappedExponentialBackoffOptions { get; } + + /// + /// Holds an instance of to control behaviour of + /// + PollingControlToken PollingControlToken { get; } } diff --git a/src/AWS.Messaging/Configuration/MessageBusBuilder.cs b/src/AWS.Messaging/Configuration/MessageBusBuilder.cs index 869419ed..98b72f05 100644 --- a/src/AWS.Messaging/Configuration/MessageBusBuilder.cs +++ b/src/AWS.Messaging/Configuration/MessageBusBuilder.cs @@ -1,6 +1,7 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 +using System.Collections.Concurrent; using AWS.Messaging.Configuration.Internal; using AWS.Messaging.Publishers; using AWS.Messaging.Publishers.EventBridge; @@ -26,7 +27,7 @@ namespace AWS.Messaging.Configuration; /// public class MessageBusBuilder : IMessageBusBuilder { - private static readonly Dictionary _messageConfigurations = new(); + private static readonly ConcurrentDictionary _messageConfigurations = new(); private readonly MessageConfiguration _messageConfiguration; private readonly IList _additionalServices = new List(); private readonly IServiceCollection _serviceCollection; @@ -164,6 +165,13 @@ public IMessageBusBuilder AddMessageSourceSuffix(string suffix) return this; } + /// + public IMessageBusBuilder ConfigurePollingControlToken(PollingControlToken pollingControlToken) + { + _messageConfiguration.PollingControlToken = pollingControlToken; + return this; + } + /// [RequiresDynamicCode("This method requires loading types dynamically as defined in the configuration system.")] [RequiresUnreferencedCode("This method requires loading types dynamically as defined in the configuration system.")] @@ -327,6 +335,7 @@ internal void Build() _serviceCollection.TryAdd(ServiceDescriptor.Singleton()); _serviceCollection.TryAdd(ServiceDescriptor.Singleton(typeof(ILogger<>), typeof(NullLogger<>))); + _serviceCollection.TryAddSingleton(_messageConfiguration.PollingControlToken); _serviceCollection.TryAddSingleton(_messageConfiguration); _serviceCollection.TryAddSingleton(); _serviceCollection.TryAddSingleton(); diff --git a/src/AWS.Messaging/Configuration/MessageConfiguration.cs b/src/AWS.Messaging/Configuration/MessageConfiguration.cs index 1947eddb..db93d17f 100644 --- a/src/AWS.Messaging/Configuration/MessageConfiguration.cs +++ b/src/AWS.Messaging/Configuration/MessageConfiguration.cs @@ -65,4 +65,7 @@ public class MessageConfiguration : IMessageConfiguration /// public CappedExponentialBackoffOptions CappedExponentialBackoffOptions { get; set; } = new(); + + /// + public PollingControlToken PollingControlToken { get; set; } = new(); } diff --git a/src/AWS.Messaging/Configuration/PollingControlToken.cs b/src/AWS.Messaging/Configuration/PollingControlToken.cs new file mode 100644 index 00000000..e9f870ec --- /dev/null +++ b/src/AWS.Messaging/Configuration/PollingControlToken.cs @@ -0,0 +1,31 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +namespace AWS.Messaging.Configuration +{ + /// + /// Control token to start and stop message polling for a service. + /// + public class PollingControlToken + { + /// + /// Indicates if polling is enabled. + /// + internal bool IsPollingEnabled { get; private set; } = true; + + /// + /// Start polling of the SQS Queue. + /// + public void StartPolling() => IsPollingEnabled = true; + + /// + /// Stop polling of the SQS Queue. + /// + public void StopPolling() => IsPollingEnabled = false; + + /// + /// Configurable amount of time to wait between polling for a change in status + /// + public TimeSpan PollingWaitTime { get; init; } = TimeSpan.FromMilliseconds(200); + } +} diff --git a/src/AWS.Messaging/SQS/SQSMessagePoller.cs b/src/AWS.Messaging/SQS/SQSMessagePoller.cs index 11ee4487..5453c744 100644 --- a/src/AWS.Messaging/SQS/SQSMessagePoller.cs +++ b/src/AWS.Messaging/SQS/SQSMessagePoller.cs @@ -22,6 +22,7 @@ internal class SQSMessagePoller : IMessagePoller, ISQSMessageCommunication private readonly SQSMessagePollerConfiguration _configuration; private readonly IEnvelopeSerializer _envelopeSerializer; private readonly IBackoffHandler _backoffHandler; + private readonly PollingControlToken _pollingControlToken; private readonly bool _isFifoEndpoint; /// @@ -49,19 +50,22 @@ internal class SQSMessagePoller : IMessagePoller, ISQSMessageCommunication /// The SQS message poller configuration. /// Serializer used to deserialize the SQS messages /// Backoff handler for performing back-offs if exceptions are thrown when polling SQS. + /// Control token to start and stop the poller. public SQSMessagePoller( ILogger logger, IMessageManagerFactory messageManagerFactory, IAWSClientProvider awsClientProvider, SQSMessagePollerConfiguration configuration, IEnvelopeSerializer envelopeSerializer, - IBackoffHandler backoffHandler) + IBackoffHandler backoffHandler, + PollingControlToken pollingControlToken) { _logger = logger; _sqsClient = awsClientProvider.GetServiceClient(); _configuration = configuration; _envelopeSerializer = envelopeSerializer; _backoffHandler = backoffHandler; + _pollingControlToken = pollingControlToken; _isFifoEndpoint = configuration.SubscriberEndpoint.EndsWith(".fifo"); _messageManager = messageManagerFactory.CreateMessageManager(this, _configuration.ToMessageManagerConfiguration()); @@ -74,13 +78,18 @@ public async Task StartPollingAsync(CancellationToken token = default) } /// - /// Polls SQS indefinitely until cancelled - /// + /// Polls SQS indefinitely until cancelled. Message receipt can be stopped and started using /// Cancellation token to shutdown the poller. private async Task PollQueue(CancellationToken token) { while (!token.IsCancellationRequested) { + if (!_pollingControlToken.IsPollingEnabled) + { + await Task.Delay(_pollingControlToken.PollingWaitTime, token); + continue; + } + var numberOfMessagesToRead = _configuration.MaxNumberOfConcurrentMessages - _messageManager.ActiveMessageCount; // If already processing the maximum number of messages, wait for at least one to complete and then try again diff --git a/test/AWS.Messaging.IntegrationTests/SubscriberTests.cs b/test/AWS.Messaging.IntegrationTests/SubscriberTests.cs index f62bc670..467d8b8a 100644 --- a/test/AWS.Messaging.IntegrationTests/SubscriberTests.cs +++ b/test/AWS.Messaging.IntegrationTests/SubscriberTests.cs @@ -7,6 +7,7 @@ using System.Threading.Tasks; using Amazon.SQS; using Amazon.SQS.Model; +using AWS.Messaging.Configuration; using AWS.Messaging.IntegrationTests.Handlers; using AWS.Messaging.IntegrationTests.Models; using AWS.Messaging.Tests.Common.Services; @@ -146,6 +147,88 @@ await publisher.PublishAsync(new ChatMessage } } + [Fact] + public async Task ReceiveMultipleMessagesOnlyWhenPollingControlTokenStarted() + { + var pollingControlToken = new PollingControlToken(); + _serviceCollection.AddSingleton>(); + _serviceCollection.AddAWSMessageBus(builder => + { + builder.ConfigurePollingControlToken(pollingControlToken); + builder.AddSQSPublisher(_sqsQueueUrl); + builder.AddSQSPoller(_sqsQueueUrl, options => + { + options.VisibilityTimeoutExtensionThreshold = 3; // and a message is eligible for extension after it's been processing at least 3 seconds + options.MaxNumberOfConcurrentMessages = 10; + options.WaitTimeSeconds = 2; + }); + builder.AddMessageHandler(); + builder.AddMessageSource("/aws/messaging"); + builder.ConfigureBackoffPolicy(policyBuilder => + { + policyBuilder.UseNoBackoff(); + }); + }); + var serviceProvider = _serviceCollection.BuildServiceProvider(); + + var publishStartTime = DateTime.UtcNow; + var publisher = serviceProvider.GetRequiredService(); + for (int i = 0; i < 5; i++) + { + await publisher.PublishAsync(new ChatMessage + { + MessageDescription = $"Test{i + 1}" + }); + } + var publishEndTime = DateTime.UtcNow; + + var pump = serviceProvider.GetRequiredService() as MessagePumpService; + Assert.NotNull(pump); + var source = new CancellationTokenSource(); + + await pump.StartAsync(source.Token); + + // Wait for the pump to shut down after processing the expected number of messages, + // with some padding to ensure messages aren't being processed more than once + source.CancelAfter(30_000); + + var tempStorage = serviceProvider.GetRequiredService>(); + while (tempStorage.Messages.Count < 5 && !source.IsCancellationRequested) + { + await Task.Delay(200, source.Token); + } + + // Stop polling and wait for the polling cycle to complete with a buffer + pollingControlToken.StopPolling(); + + await Task.Delay(5_000); + + // Publish the next 5 messages that should not be received due to stopping polling + for (int i = 5; i < 10; i++) + { + await publisher.PublishAsync(new ChatMessage + { + MessageDescription = $"Test{i + 1}" + }); + } + + SpinWait.SpinUntil(() => source.IsCancellationRequested); + + var inMemoryLogger = serviceProvider.GetRequiredService(); + + Assert.Empty(inMemoryLogger.Logs.Where(x => x.Exception is AmazonSQSException ex && ex.ErrorCode.Equals("AWS.SimpleQueueService.TooManyEntriesInBatchRequest"))); + Assert.Equal(5, tempStorage.Messages.Count); + for (int i = 0; i < 5; i++) + { + var message = tempStorage.Messages.FirstOrDefault(x => x.Message.MessageDescription.Equals($"Test{i + 1}")); + Assert.NotNull(message); + Assert.False(string.IsNullOrEmpty(message.Id)); + Assert.Equal("/aws/messaging", message.Source.ToString()); + Assert.True(message.TimeStamp > publishStartTime); + Assert.True(message.TimeStamp < publishEndTime); + } + } + [Theory] [InlineData(20)] public async Task SendMixOfMessageTypesToSameQueue(int numberOfMessages) diff --git a/test/AWS.Messaging.UnitTests/MessagePublisherTests.cs b/test/AWS.Messaging.UnitTests/MessagePublisherTests.cs index 27bff876..6130fcd2 100644 --- a/test/AWS.Messaging.UnitTests/MessagePublisherTests.cs +++ b/test/AWS.Messaging.UnitTests/MessagePublisherTests.cs @@ -764,7 +764,7 @@ public async Task EventBridgePublisher_UnhappyPath() new DefaultTelemetryFactory(serviceProvider) ); - var publishResponse = Assert.ThrowsAsync(async () => await messagePublisher.PublishAsync(_chatMessage)); + var publishResponse = await Assert.ThrowsAsync(async () => await messagePublisher.PublishAsync(_chatMessage)); _eventBridgeClient.Verify(x => x.PutEventsAsync( @@ -774,10 +774,9 @@ public async Task EventBridgePublisher_UnhappyPath() It.IsAny()), Times.Exactly(1)); - var publishResponseResult = await publishResponse; - Assert.Equal("Message failed to publish.", publishResponseResult.Message); - Assert.Equal("ErrorMessage", publishResponseResult.InnerException!.Message); - Assert.Equal("ErrorCode", ((EventBridgePutEventsException)publishResponseResult.InnerException).ErrorCode); + Assert.Equal("Message failed to publish.", publishResponse.Message); + Assert.Equal("ErrorMessage", publishResponse.InnerException!.Message); + Assert.Equal("ErrorCode", ((EventBridgePutEventsException)publishResponse.InnerException).ErrorCode); } [Fact] diff --git a/test/AWS.Messaging.UnitTests/SQSMessagePollerTests.cs b/test/AWS.Messaging.UnitTests/SQSMessagePollerTests.cs index d5e9a5ad..561df41e 100644 --- a/test/AWS.Messaging.UnitTests/SQSMessagePollerTests.cs +++ b/test/AWS.Messaging.UnitTests/SQSMessagePollerTests.cs @@ -26,6 +26,12 @@ public class SQSMessagePollerTests { private const string TEST_QUEUE_URL = "queueUrl"; private InMemoryLogger? _inMemoryLogger; + private readonly ServiceCollection _serviceCollection; + + public SQSMessagePollerTests() + { + _serviceCollection = new ServiceCollection(); + } /// /// Tests that starting an SQS poller with default settings begins polling SQS @@ -42,6 +48,60 @@ public async Task SQSMessagePoller_Defaults_PollsSQS() client.Verify(x => x.ReceiveMessageAsync(It.IsAny(), It.IsAny()), Times.AtLeastOnce()); } + /// + /// Tests that starting an SQS poller with + /// set to false, will not poll any messages. + /// + [Fact] + public async Task SQSMessagePoller_PollingControlStopped_DoesNotPollSQS() + { + var client = new Mock(); + client.Setup(x => x.ReceiveMessageAsync(It.IsAny(), It.IsAny())) + .ReturnsAsync(new ReceiveMessageResponse(), TimeSpan.FromMilliseconds(50)); + var pollingControlToken = new PollingControlToken + { + PollingWaitTime = TimeSpan.FromMilliseconds(25) + }; + pollingControlToken.StopPolling(); + + await RunSQSMessagePollerTest(client, pollingControlToken: pollingControlToken); + + client.Verify(x => x.ReceiveMessageAsync(It.IsAny(), It.IsAny()), Times.Never); + } + + /// + /// Tests that starting an SQS poller with + /// set to false, will not poll any messages first. Then when changing the value to true + /// polling resumes and messages are received. + /// + [Fact] + public async Task SQSMessagePoller_PollingControlRestarted_PollsSQS() + { + var client = new Mock(); + client.Setup(x => x.ReceiveMessageAsync(It.IsAny(), It.IsAny())) + .ReturnsAsync(new ReceiveMessageResponse(), TimeSpan.FromMilliseconds(50)); + var pollingControlToken = new PollingControlToken + { + PollingWaitTime = TimeSpan.FromMilliseconds(25) + }; + pollingControlToken.StopPolling(); + + var source = new CancellationTokenSource(); + var pump = BuildMessagePumpService(client, options => { options.WaitTimeSeconds = 1; }, pollingControlToken: pollingControlToken); + var task = pump.StartAsync(source.Token); + + client.Verify(x => x.ReceiveMessageAsync(It.IsAny(), It.IsAny()), Times.Never); + + pollingControlToken.StartPolling(); + + SpinWait.SpinUntil(() => false, pollingControlToken.PollingWaitTime * 5); + + client.Verify(x => x.ReceiveMessageAsync(It.IsAny(), It.IsAny()), Times.AtLeastOnce()); + + source.Cancel(); + await task; + } + /// /// Tests that configuring a poller with /// set to a value greater than SQS's current limit of 10 will only receive 10 messages at a time. @@ -251,20 +311,38 @@ public async Task SQSMessagePoller_ContinuesForNonFatalException() /// /// Mocked SQS client /// SQS MessagePoller options - private async Task RunSQSMessagePollerTest(Mock mockSqsClient, Action? options = null) + /// Polling control token to start or stop message receipt + private async Task RunSQSMessagePollerTest(Mock mockSqsClient, Action? options = null, PollingControlToken? pollingControlToken = null) { - var serviceCollection = new ServiceCollection(); - serviceCollection.AddLogging(); + var pump = BuildMessagePumpService(mockSqsClient, options, pollingControlToken); - serviceCollection.AddAWSMessageBus(builder => + var source = new CancellationTokenSource(); + source.CancelAfter(500); + + await pump.StartAsync(source.Token); + } + + /// + /// Helper function that initializes but does not start a with + /// a mocked SQS client + /// + /// Mocked SQS client + /// SQS MessagePoller options + /// Polling control token to start or stop message receipt + private MessagePumpService BuildMessagePumpService(Mock mockSqsClient, Action? options = null, PollingControlToken? pollingControlToken = null) + { + _serviceCollection.AddLogging(); + + _serviceCollection.AddAWSMessageBus(builder => { + if (pollingControlToken is not null) builder.ConfigurePollingControlToken(pollingControlToken); builder.AddSQSPoller(TEST_QUEUE_URL, options); builder.AddMessageHandler(); }); - serviceCollection.AddSingleton(mockSqsClient.Object); + _serviceCollection.AddSingleton(mockSqsClient.Object); - var serviceProvider = serviceCollection.BuildServiceProvider(); + var serviceProvider = _serviceCollection.BuildServiceProvider(); var pump = serviceProvider.GetService() as MessagePumpService; @@ -273,10 +351,7 @@ private async Task RunSQSMessagePollerTest(Mock mockSqsClient, Actio Assert.Fail($"Unable to get the {nameof(MessagePumpService)} from the service provider."); } - var source = new CancellationTokenSource(); - source.CancelAfter(500); - - await pump.StartAsync(source.Token); + return pump; } ///