From 9e05912ca7c067ccfcd6b3d8492dc14d0a579a10 Mon Sep 17 00:00:00 2001
From: jkwpappin <96733604+jkwpappin@users.noreply.github.com>
Date: Wed, 26 Feb 2025 15:59:33 +0000
Subject: [PATCH 1/6] Allow pausing and resumption of message consumption
(cherry picked from commit 4a442fbf1c6cd8b0fba3d4a0a20b183b0381eea6)
---
sampleapps/SubscriberService/Program.cs | 8 ++
.../Configuration/IMessageConfiguration.cs | 6 ++
.../Configuration/MessageBusBuilder.cs | 8 ++
.../Configuration/MessageConfiguration.cs | 3 +
.../Configuration/PollingControlToken.cs | 33 +++++++
src/AWS.Messaging/SQS/SQSMessagePoller.cs | 15 ++-
.../SubscriberTests.cs | 72 ++++++++++++++
.../SQSMessagePollerTests.cs | 94 +++++++++++++++++--
8 files changed, 226 insertions(+), 13 deletions(-)
create mode 100644 src/AWS.Messaging/Configuration/PollingControlToken.cs
diff --git a/sampleapps/SubscriberService/Program.cs b/sampleapps/SubscriberService/Program.cs
index 25567658..0523e62e 100644
--- a/sampleapps/SubscriberService/Program.cs
+++ b/sampleapps/SubscriberService/Program.cs
@@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
using System.Text.Json;
+using AWS.Messaging.Configuration;
using AWS.Messaging.Telemetry.OpenTelemetry;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
@@ -49,6 +50,13 @@ await Host.CreateDefaultBuilder(args)
});
});
+ // Optional: Configure a PollingControlToken, you can call Start()/Stop() to start and stop message processing, by default it will be started
+ builder.ConfigurePollingControlToken(new PollingControlToken()
+ {
+ // Optional: Set how frequently it will check for changes to the state of the PollingControlToken
+ PollingWaitTime = TimeSpan.FromMilliseconds(200)
+ });
+
// Logging data messages is disabled by default to protect sensitive user data. If you want this enabled, uncomment the line below.
// builder.EnableMessageContentLogging();
})
diff --git a/src/AWS.Messaging/Configuration/IMessageConfiguration.cs b/src/AWS.Messaging/Configuration/IMessageConfiguration.cs
index b9416210..3e34389c 100644
--- a/src/AWS.Messaging/Configuration/IMessageConfiguration.cs
+++ b/src/AWS.Messaging/Configuration/IMessageConfiguration.cs
@@ -3,6 +3,7 @@
using AWS.Messaging.Configuration.Internal;
using AWS.Messaging.Serialization;
+using AWS.Messaging.Services;
using AWS.Messaging.Services.Backoff;
using AWS.Messaging.Services.Backoff.Policies;
using AWS.Messaging.Services.Backoff.Policies.Options;
@@ -93,4 +94,9 @@ public interface IMessageConfiguration
/// Holds an instance of to control the behavior of .
///
CappedExponentialBackoffOptions CappedExponentialBackoffOptions { get; }
+
+ ///
+ /// Holds an instance of to control behaviour of
+ ///
+ PollingControlToken PollingControlToken { get; }
}
diff --git a/src/AWS.Messaging/Configuration/MessageBusBuilder.cs b/src/AWS.Messaging/Configuration/MessageBusBuilder.cs
index 22a4dd23..f4d96168 100644
--- a/src/AWS.Messaging/Configuration/MessageBusBuilder.cs
+++ b/src/AWS.Messaging/Configuration/MessageBusBuilder.cs
@@ -172,6 +172,13 @@ public IMessageBusBuilder AddMessageSourceSuffix(string suffix)
return this;
}
+ ///
+ public IMessageBusBuilder ConfigurePollingControlToken(PollingControlToken pollingControlToken)
+ {
+ _messageConfiguration.PollingControlToken = pollingControlToken;
+ return this;
+ }
+
///
public IMessageBusBuilder LoadConfigurationFromSettings(IConfiguration configuration)
{
@@ -316,6 +323,7 @@ internal void Build()
_serviceCollection.TryAdd(ServiceDescriptor.Singleton());
_serviceCollection.TryAdd(ServiceDescriptor.Singleton(typeof(ILogger<>), typeof(NullLogger<>)));
+ _serviceCollection.TryAddSingleton(_messageConfiguration.PollingControlToken);
_serviceCollection.TryAddSingleton(_messageConfiguration);
_serviceCollection.TryAddSingleton();
_serviceCollection.TryAddSingleton();
diff --git a/src/AWS.Messaging/Configuration/MessageConfiguration.cs b/src/AWS.Messaging/Configuration/MessageConfiguration.cs
index 1947eddb..db93d17f 100644
--- a/src/AWS.Messaging/Configuration/MessageConfiguration.cs
+++ b/src/AWS.Messaging/Configuration/MessageConfiguration.cs
@@ -65,4 +65,7 @@ public class MessageConfiguration : IMessageConfiguration
///
public CappedExponentialBackoffOptions CappedExponentialBackoffOptions { get; set; } = new();
+
+ ///
+ public PollingControlToken PollingControlToken { get; set; } = new();
}
diff --git a/src/AWS.Messaging/Configuration/PollingControlToken.cs b/src/AWS.Messaging/Configuration/PollingControlToken.cs
new file mode 100644
index 00000000..757ff9c8
--- /dev/null
+++ b/src/AWS.Messaging/Configuration/PollingControlToken.cs
@@ -0,0 +1,33 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+namespace AWS.Messaging.Configuration
+{
+ ///
+ /// Control token to start and stop message polling for a service.
+ ///
+ public record PollingControlToken
+ {
+ ///
+ /// Indicates if polling is enabled.
+ ///
+ internal bool IsPollingEnabled { get; private set; } = true;
+
+ ///
+ /// Start polling of the SQS Queue.
+ ///
+ public void StartPolling() => IsPollingEnabled = true;
+
+ ///
+ /// Stop polling of the SQS Queue.
+ ///
+ public void StopPolling() => IsPollingEnabled = false;
+
+ ///
+ /// Configurable amount of time to wait between polling for a change in status
+ ///
+ public TimeSpan PollingWaitTime { get; set; } = TimeSpan.FromMilliseconds(200);
+ }
+
+
+}
diff --git a/src/AWS.Messaging/SQS/SQSMessagePoller.cs b/src/AWS.Messaging/SQS/SQSMessagePoller.cs
index c242af44..d758933b 100644
--- a/src/AWS.Messaging/SQS/SQSMessagePoller.cs
+++ b/src/AWS.Messaging/SQS/SQSMessagePoller.cs
@@ -22,6 +22,7 @@ internal class SQSMessagePoller : IMessagePoller, ISQSMessageCommunication
private readonly SQSMessagePollerConfiguration _configuration;
private readonly IEnvelopeSerializer _envelopeSerializer;
private readonly IBackoffHandler _backoffHandler;
+ private readonly PollingControlToken _pollingControlToken;
private readonly bool _isFifoEndpoint;
///
@@ -49,19 +50,22 @@ internal class SQSMessagePoller : IMessagePoller, ISQSMessageCommunication
/// The SQS message poller configuration.
/// Serializer used to deserialize the SQS messages
/// Backoff handler for performing back-offs if exceptions are thrown when polling SQS.
+ /// Control token to start and stop the poller.
public SQSMessagePoller(
ILogger logger,
IMessageManagerFactory messageManagerFactory,
IAWSClientProvider awsClientProvider,
SQSMessagePollerConfiguration configuration,
IEnvelopeSerializer envelopeSerializer,
- IBackoffHandler backoffHandler)
+ IBackoffHandler backoffHandler,
+ PollingControlToken pollingControlToken)
{
_logger = logger;
_sqsClient = awsClientProvider.GetServiceClient();
_configuration = configuration;
_envelopeSerializer = envelopeSerializer;
_backoffHandler = backoffHandler;
+ _pollingControlToken = pollingControlToken;
_isFifoEndpoint = configuration.SubscriberEndpoint.EndsWith(".fifo");
_messageManager = messageManagerFactory.CreateMessageManager(this, _configuration.ToMessageManagerConfiguration());
@@ -74,13 +78,18 @@ public async Task StartPollingAsync(CancellationToken token = default)
}
///
- /// Polls SQS indefinitely until cancelled
- ///
+ /// Polls SQS indefinitely until cancelled. Message receipt can be stopped and started using
/// Cancellation token to shutdown the poller.
private async Task PollQueue(CancellationToken token)
{
while (!token.IsCancellationRequested)
{
+ if (!_pollingControlToken.IsPollingEnabled)
+ {
+ await Task.Delay(_pollingControlToken.PollingWaitTime, token);
+ continue;
+ }
+
var numberOfMessagesToRead = _configuration.MaxNumberOfConcurrentMessages - _messageManager.ActiveMessageCount;
// If already processing the maximum number of messages, wait for at least one to complete and then try again
diff --git a/test/AWS.Messaging.IntegrationTests/SubscriberTests.cs b/test/AWS.Messaging.IntegrationTests/SubscriberTests.cs
index 2982c048..b09b5d8a 100644
--- a/test/AWS.Messaging.IntegrationTests/SubscriberTests.cs
+++ b/test/AWS.Messaging.IntegrationTests/SubscriberTests.cs
@@ -7,6 +7,7 @@
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;
+using AWS.Messaging.Configuration;
using AWS.Messaging.IntegrationTests.Handlers;
using AWS.Messaging.IntegrationTests.Models;
using AWS.Messaging.Tests.Common.Services;
@@ -146,6 +147,77 @@ await publisher.PublishAsync(new ChatMessage
}
}
+ [Fact]
+ public async Task ReceiveMultipleMessagesOnlyWhenPollingControlTokenStarted()
+ {
+ var pollingControlToken = new PollingControlToken();
+ _serviceCollection.AddSingleton>();
+ _serviceCollection.AddAWSMessageBus(builder =>
+ {
+ builder.ConfigurePollingControlToken(pollingControlToken);
+ builder.AddSQSPublisher(_sqsQueueUrl);
+ builder.AddSQSPoller(_sqsQueueUrl, options =>
+ {
+ options.VisibilityTimeoutExtensionThreshold = 3; // and a message is eligible for extension after it's been processing at least 3 seconds
+ options.MaxNumberOfConcurrentMessages = 10;
+ });
+ builder.AddMessageHandler();
+ builder.AddMessageSource("/aws/messaging");
+ });
+ var serviceProvider = _serviceCollection.BuildServiceProvider();
+
+ var publishStartTime = DateTime.UtcNow;
+ var publisher = serviceProvider.GetRequiredService();
+ for (int i = 0; i < 5; i++)
+ {
+ await publisher.PublishAsync(new ChatMessage
+ {
+ MessageDescription = $"Test{i + 1}"
+ });
+ }
+ var publishEndTime = DateTime.UtcNow;
+
+ var pump = serviceProvider.GetRequiredService() as MessagePumpService;
+ Assert.NotNull(pump);
+ var source = new CancellationTokenSource();
+
+ await pump.StartAsync(source.Token);
+
+ // Wait for the pump to shut down after processing the expected number of messages,
+ // with some padding to ensure messages aren't being processed more than once
+ source.CancelAfter(3000);
+
+ // Stop polling and wait for the polling cycle to complete with a buffer
+ pollingControlToken.StopPolling();
+ await Task.Delay(pollingControlToken.PollingWaitTime * 2);
+
+ // Publish the next 5 messages that should not be received due to stopping polling
+ for (int i = 5; i < 10; i++)
+ {
+ await publisher.PublishAsync(new ChatMessage
+ {
+ MessageDescription = $"Test{i + 1}"
+ });
+ }
+
+ while (!source.IsCancellationRequested) { }
+
+ var inMemoryLogger = serviceProvider.GetRequiredService();
+ var tempStorage = serviceProvider.GetRequiredService>();
+
+ Assert.Empty(inMemoryLogger.Logs.Where(x => x.Exception is AmazonSQSException ex && ex.ErrorCode.Equals("AWS.SimpleQueueService.TooManyEntriesInBatchRequest")));
+ Assert.Equal(5, tempStorage.Messages.Count);
+ for (int i = 0; i < 5; i++)
+ {
+ var message = tempStorage.Messages.FirstOrDefault(x => x.Message.MessageDescription.Equals($"Test{i + 1}"));
+ Assert.NotNull(message);
+ Assert.False(string.IsNullOrEmpty(message.Id));
+ Assert.Equal("/aws/messaging", message.Source.ToString());
+ Assert.True(message.TimeStamp > publishStartTime);
+ Assert.True(message.TimeStamp < publishEndTime);
+ }
+ }
+
[Theory]
[InlineData(20)]
public async Task SendMixOfMessageTypesToSameQueue(int numberOfMessages)
diff --git a/test/AWS.Messaging.UnitTests/SQSMessagePollerTests.cs b/test/AWS.Messaging.UnitTests/SQSMessagePollerTests.cs
index d5e9a5ad..e1c2c7c3 100644
--- a/test/AWS.Messaging.UnitTests/SQSMessagePollerTests.cs
+++ b/test/AWS.Messaging.UnitTests/SQSMessagePollerTests.cs
@@ -26,6 +26,12 @@ public class SQSMessagePollerTests
{
private const string TEST_QUEUE_URL = "queueUrl";
private InMemoryLogger? _inMemoryLogger;
+ private readonly ServiceCollection _serviceCollection;
+
+ public SQSMessagePollerTests()
+ {
+ _serviceCollection = new ServiceCollection();
+ }
///
/// Tests that starting an SQS poller with default settings begins polling SQS
@@ -42,6 +48,59 @@ public async Task SQSMessagePoller_Defaults_PollsSQS()
client.Verify(x => x.ReceiveMessageAsync(It.IsAny(), It.IsAny()), Times.AtLeastOnce());
}
+ ///
+ /// Tests that starting an SQS poller with
+ /// set to false, will not poll any messages.
+ ///
+ [Fact]
+ public async Task SQSMessagePoller_PollingControlStopped_DoesNotPollSQS()
+ {
+ var client = new Mock();
+ client.Setup(x => x.ReceiveMessageAsync(It.IsAny(), It.IsAny()))
+ .ReturnsAsync(new ReceiveMessageResponse(), TimeSpan.FromMilliseconds(50));
+ var pollingControlToken = new PollingControlToken
+ {
+ PollingWaitTime = TimeSpan.FromMilliseconds(25)
+ };
+ pollingControlToken.StopPolling();
+
+ await RunSQSMessagePollerTest(client, pollingControlToken: pollingControlToken);
+
+ client.Verify(x => x.ReceiveMessageAsync(It.IsAny(), It.IsAny()), Times.Never);
+ }
+
+ ///
+ /// Tests that starting an SQS poller with
+ /// set to false, will not poll any messages first. Then when changing the value to true
+ /// polling resumes and messages are received.
+ ///
+ [Fact]
+ public async Task SQSMessagePoller_PollingControlRestarted_PollsSQS()
+ {
+ var client = new Mock();
+ client.Setup(x => x.ReceiveMessageAsync(It.IsAny(), It.IsAny()))
+ .ReturnsAsync(new ReceiveMessageResponse(), TimeSpan.FromMilliseconds(50));
+ var pollingControlToken = new PollingControlToken
+ {
+ PollingWaitTime = TimeSpan.FromMilliseconds(25)
+ };
+ pollingControlToken.StopPolling();
+
+ var source = new CancellationTokenSource();
+ var pump = BuildMessagePumpService(client, pollingControlToken: pollingControlToken);
+ var task = pump.StartAsync(source.Token);
+
+ client.Verify(x => x.ReceiveMessageAsync(It.IsAny(), It.IsAny()), Times.Never);
+
+ pollingControlToken.StartPolling();
+ await Task.Delay(pollingControlToken.PollingWaitTime * 2);
+
+ client.Verify(x => x.ReceiveMessageAsync(It.IsAny(), It.IsAny()), Times.AtLeastOnce());
+
+ source.Cancel();
+ await task;
+ }
+
///
/// Tests that configuring a poller with
/// set to a value greater than SQS's current limit of 10 will only receive 10 messages at a time.
@@ -251,20 +310,38 @@ public async Task SQSMessagePoller_ContinuesForNonFatalException()
///
/// Mocked SQS client
/// SQS MessagePoller options
- private async Task RunSQSMessagePollerTest(Mock mockSqsClient, Action? options = null)
+ /// Polling control token to start or stop message receipt
+ private async Task RunSQSMessagePollerTest(Mock mockSqsClient, Action? options = null, PollingControlToken? pollingControlToken = null)
{
- var serviceCollection = new ServiceCollection();
- serviceCollection.AddLogging();
+ var pump = BuildMessagePumpService(mockSqsClient, options, pollingControlToken);
- serviceCollection.AddAWSMessageBus(builder =>
+ var source = new CancellationTokenSource();
+ source.CancelAfter(500);
+
+ await pump.StartAsync(source.Token);
+ }
+
+ ///
+ /// Helper function that initializes but does not start a with
+ /// a mocked SQS client
+ ///
+ /// Mocked SQS client
+ /// SQS MessagePoller options
+ /// Polling control token to start or stop message receipt
+ private MessagePumpService BuildMessagePumpService(Mock mockSqsClient, Action? options = null, PollingControlToken? pollingControlToken = null)
+ {
+ _serviceCollection.AddLogging();
+
+ _serviceCollection.AddAWSMessageBus(builder =>
{
+ if (pollingControlToken is not null) builder.ConfigurePollingControlToken(pollingControlToken);
builder.AddSQSPoller(TEST_QUEUE_URL, options);
builder.AddMessageHandler();
});
- serviceCollection.AddSingleton(mockSqsClient.Object);
+ _serviceCollection.AddSingleton(mockSqsClient.Object);
- var serviceProvider = serviceCollection.BuildServiceProvider();
+ var serviceProvider = _serviceCollection.BuildServiceProvider();
var pump = serviceProvider.GetService() as MessagePumpService;
@@ -273,10 +350,7 @@ private async Task RunSQSMessagePollerTest(Mock mockSqsClient, Actio
Assert.Fail($"Unable to get the {nameof(MessagePumpService)} from the service provider.");
}
- var source = new CancellationTokenSource();
- source.CancelAfter(500);
-
- await pump.StartAsync(source.Token);
+ return pump;
}
///
From f6608c82666a0ba7aeeefb459499832bd162fae6 Mon Sep 17 00:00:00 2001
From: jkwpappin <96733604+jkwpappin@users.noreply.github.com>
Date: Wed, 26 Feb 2025 16:06:26 +0000
Subject: [PATCH 2/6] Add changelog
(cherry picked from commit fba5d5b89e781da3f6a02fd5a095538515661c2b)
---
.../changes/276cbae4-0a14-4efa-9fd2-760cf4529b0a.json | 11 +++++++++++
1 file changed, 11 insertions(+)
create mode 100644 .autover/changes/276cbae4-0a14-4efa-9fd2-760cf4529b0a.json
diff --git a/.autover/changes/276cbae4-0a14-4efa-9fd2-760cf4529b0a.json b/.autover/changes/276cbae4-0a14-4efa-9fd2-760cf4529b0a.json
new file mode 100644
index 00000000..f91cbfd8
--- /dev/null
+++ b/.autover/changes/276cbae4-0a14-4efa-9fd2-760cf4529b0a.json
@@ -0,0 +1,11 @@
+{
+ "Projects": [
+ {
+ "Name": "AWS.Messaging",
+ "Type": "Minor",
+ "ChangelogMessages": [
+ "Implement a start/stop mechanism for message consumption. (ISSUE 147)"
+ ]
+ }
+ ]
+}
\ No newline at end of file
From 3c2d631e225e97ffc61657abbdeb81f30910d067 Mon Sep 17 00:00:00 2001
From: jkwpappin <96733604+jkwpappin@users.noreply.github.com>
Date: Wed, 26 Feb 2025 16:37:45 +0000
Subject: [PATCH 3/6] Remove redundant parenthesis
(cherry picked from commit 1946aecd804b93a76387d3d33d6a342f82bbdd73)
---
sampleapps/SubscriberService/Program.cs | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/sampleapps/SubscriberService/Program.cs b/sampleapps/SubscriberService/Program.cs
index 0523e62e..ef7cdf8f 100644
--- a/sampleapps/SubscriberService/Program.cs
+++ b/sampleapps/SubscriberService/Program.cs
@@ -51,7 +51,7 @@ await Host.CreateDefaultBuilder(args)
});
// Optional: Configure a PollingControlToken, you can call Start()/Stop() to start and stop message processing, by default it will be started
- builder.ConfigurePollingControlToken(new PollingControlToken()
+ builder.ConfigurePollingControlToken(new PollingControlToken
{
// Optional: Set how frequently it will check for changes to the state of the PollingControlToken
PollingWaitTime = TimeSpan.FromMilliseconds(200)
From 129320090a49b31b45f07f7b2b3a2085650aaa59 Mon Sep 17 00:00:00 2001
From: jkwpappin <96733604+jkwpappin@users.noreply.github.com>
Date: Thu, 13 Mar 2025 15:54:26 +0000
Subject: [PATCH 4/6] Fix whitespace
(cherry picked from commit b86353c64ce94fc47f9745724029720a5ee6cdef)
---
src/AWS.Messaging/Configuration/MessageBusBuilder.cs | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/AWS.Messaging/Configuration/MessageBusBuilder.cs b/src/AWS.Messaging/Configuration/MessageBusBuilder.cs
index f4d96168..59c445e2 100644
--- a/src/AWS.Messaging/Configuration/MessageBusBuilder.cs
+++ b/src/AWS.Messaging/Configuration/MessageBusBuilder.cs
@@ -323,7 +323,7 @@ internal void Build()
_serviceCollection.TryAdd(ServiceDescriptor.Singleton());
_serviceCollection.TryAdd(ServiceDescriptor.Singleton(typeof(ILogger<>), typeof(NullLogger<>)));
- _serviceCollection.TryAddSingleton(_messageConfiguration.PollingControlToken);
+ _serviceCollection.TryAddSingleton(_messageConfiguration.PollingControlToken);
_serviceCollection.TryAddSingleton(_messageConfiguration);
_serviceCollection.TryAddSingleton();
_serviceCollection.TryAddSingleton();
From f4ebf6e75a267f832c69a4cd89ad600c18749a2d Mon Sep 17 00:00:00 2001
From: jkwpappin <96733604+jkwpappin@users.noreply.github.com>
Date: Thu, 13 Mar 2025 16:55:58 +0000
Subject: [PATCH 5/6] Add a ConfigureAwait as we're trying to do something and
verify a Moq during the async execution
(cherry picked from commit 6c3f0ccdea56cc12704d34787dc1e72797472213)
---
test/AWS.Messaging.IntegrationTests/SubscriberTests.cs | 4 ++--
test/AWS.Messaging.UnitTests/MessagePublisherTests.cs | 8 ++++----
test/AWS.Messaging.UnitTests/SQSMessagePollerTests.cs | 3 ++-
3 files changed, 8 insertions(+), 7 deletions(-)
diff --git a/test/AWS.Messaging.IntegrationTests/SubscriberTests.cs b/test/AWS.Messaging.IntegrationTests/SubscriberTests.cs
index b09b5d8a..7a0e2527 100644
--- a/test/AWS.Messaging.IntegrationTests/SubscriberTests.cs
+++ b/test/AWS.Messaging.IntegrationTests/SubscriberTests.cs
@@ -181,7 +181,7 @@ await publisher.PublishAsync(new ChatMessage
Assert.NotNull(pump);
var source = new CancellationTokenSource();
- await pump.StartAsync(source.Token);
+ await pump.StartAsync(source.Token).ConfigureAwait(true);
// Wait for the pump to shut down after processing the expected number of messages,
// with some padding to ensure messages aren't being processed more than once
@@ -189,7 +189,7 @@ await publisher.PublishAsync(new ChatMessage
// Stop polling and wait for the polling cycle to complete with a buffer
pollingControlToken.StopPolling();
- await Task.Delay(pollingControlToken.PollingWaitTime * 2);
+ await Task.Delay(pollingControlToken.PollingWaitTime * 2, source.Token);
// Publish the next 5 messages that should not be received due to stopping polling
for (int i = 5; i < 10; i++)
diff --git a/test/AWS.Messaging.UnitTests/MessagePublisherTests.cs b/test/AWS.Messaging.UnitTests/MessagePublisherTests.cs
index ca9e9d6b..f0b365d2 100644
--- a/test/AWS.Messaging.UnitTests/MessagePublisherTests.cs
+++ b/test/AWS.Messaging.UnitTests/MessagePublisherTests.cs
@@ -764,7 +764,7 @@ public async Task EventBridgePublisher_UnhappyPath()
new DefaultTelemetryFactory(serviceProvider)
);
- var publishResponse = Assert.ThrowsAsync(async () => await messagePublisher.PublishAsync(_chatMessage));
+ var publishResponse = await Assert.ThrowsAsync(async () => await messagePublisher.PublishAsync(_chatMessage));
_eventBridgeClient.Verify(x =>
x.PutEventsAsync(
@@ -774,9 +774,9 @@ public async Task EventBridgePublisher_UnhappyPath()
It.IsAny()),
Times.Exactly(1));
- Assert.Equal("Message failed to publish.", publishResponse.Result.Message);
- Assert.Equal("ErrorMessage", publishResponse.Result.InnerException.Message);
- Assert.Equal("ErrorCode", ((EventBridgePutEventsException)publishResponse.Result.InnerException).ErrorCode);
+ Assert.Equal("Message failed to publish.", publishResponse.Message);
+ Assert.Equal("ErrorMessage", publishResponse.InnerException.Message);
+ Assert.Equal("ErrorCode", ((EventBridgePutEventsException)publishResponse.InnerException).ErrorCode);
}
[Fact]
diff --git a/test/AWS.Messaging.UnitTests/SQSMessagePollerTests.cs b/test/AWS.Messaging.UnitTests/SQSMessagePollerTests.cs
index e1c2c7c3..80313630 100644
--- a/test/AWS.Messaging.UnitTests/SQSMessagePollerTests.cs
+++ b/test/AWS.Messaging.UnitTests/SQSMessagePollerTests.cs
@@ -89,11 +89,12 @@ public async Task SQSMessagePoller_PollingControlRestarted_PollsSQS()
var source = new CancellationTokenSource();
var pump = BuildMessagePumpService(client, pollingControlToken: pollingControlToken);
var task = pump.StartAsync(source.Token);
+ await task.ConfigureAwait(true);
client.Verify(x => x.ReceiveMessageAsync(It.IsAny(), It.IsAny()), Times.Never);
pollingControlToken.StartPolling();
- await Task.Delay(pollingControlToken.PollingWaitTime * 2);
+ await Task.Delay(pollingControlToken.PollingWaitTime * 2, source.Token);
client.Verify(x => x.ReceiveMessageAsync(It.IsAny(), It.IsAny()), Times.AtLeastOnce());
From 3c6a6e266b7c00cb2a4c8d393a5cf122d90065e7 Mon Sep 17 00:00:00 2001
From: jkwpappin <96733604+jkwpappin@users.noreply.github.com>
Date: Fri, 14 Mar 2025 10:31:24 +0000
Subject: [PATCH 6/6] Use SpinWait to hold the thread
(cherry picked from commit 1b4ac7ac1bcc552c00f84a73e33689a63dcd704d)
---
test/AWS.Messaging.IntegrationTests/SubscriberTests.cs | 5 +++--
test/AWS.Messaging.UnitTests/SQSMessagePollerTests.cs | 4 ++--
2 files changed, 5 insertions(+), 4 deletions(-)
diff --git a/test/AWS.Messaging.IntegrationTests/SubscriberTests.cs b/test/AWS.Messaging.IntegrationTests/SubscriberTests.cs
index 7a0e2527..34bddbb7 100644
--- a/test/AWS.Messaging.IntegrationTests/SubscriberTests.cs
+++ b/test/AWS.Messaging.IntegrationTests/SubscriberTests.cs
@@ -189,7 +189,8 @@ await publisher.PublishAsync(new ChatMessage
// Stop polling and wait for the polling cycle to complete with a buffer
pollingControlToken.StopPolling();
- await Task.Delay(pollingControlToken.PollingWaitTime * 2, source.Token);
+
+ SpinWait.SpinUntil(() => false, pollingControlToken.PollingWaitTime * 3);
// Publish the next 5 messages that should not be received due to stopping polling
for (int i = 5; i < 10; i++)
@@ -200,7 +201,7 @@ await publisher.PublishAsync(new ChatMessage
});
}
- while (!source.IsCancellationRequested) { }
+ SpinWait.SpinUntil(() => source.IsCancellationRequested);
var inMemoryLogger = serviceProvider.GetRequiredService();
var tempStorage = serviceProvider.GetRequiredService>();
diff --git a/test/AWS.Messaging.UnitTests/SQSMessagePollerTests.cs b/test/AWS.Messaging.UnitTests/SQSMessagePollerTests.cs
index 80313630..ea0ee6c9 100644
--- a/test/AWS.Messaging.UnitTests/SQSMessagePollerTests.cs
+++ b/test/AWS.Messaging.UnitTests/SQSMessagePollerTests.cs
@@ -89,12 +89,12 @@ public async Task SQSMessagePoller_PollingControlRestarted_PollsSQS()
var source = new CancellationTokenSource();
var pump = BuildMessagePumpService(client, pollingControlToken: pollingControlToken);
var task = pump.StartAsync(source.Token);
- await task.ConfigureAwait(true);
client.Verify(x => x.ReceiveMessageAsync(It.IsAny(), It.IsAny()), Times.Never);
pollingControlToken.StartPolling();
- await Task.Delay(pollingControlToken.PollingWaitTime * 2, source.Token);
+
+ SpinWait.SpinUntil(() => false, pollingControlToken.PollingWaitTime * 5);
client.Verify(x => x.ReceiveMessageAsync(It.IsAny(), It.IsAny()), Times.AtLeastOnce());