Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .autover/changes/276cbae4-0a14-4efa-9fd2-760cf4529b0a.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"Projects": [
{
"Name": "AWS.Messaging",
"Type": "Minor",
"ChangelogMessages": [
"Implement a start/stop mechanism for message consumption. (ISSUE 147)"
]
}
]
}
8 changes: 8 additions & 0 deletions sampleapps/SubscriberService/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

using System.Text.Json;
using AWS.Messaging.Configuration;
using AWS.Messaging.Telemetry.OpenTelemetry;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
Expand Down Expand Up @@ -49,6 +50,13 @@ await Host.CreateDefaultBuilder(args)
});
});

// Optional: Configure a PollingControlToken, you can call Start()/Stop() to start and stop message processing, by default it will be started
builder.ConfigurePollingControlToken(new PollingControlToken
{
// Optional: Set how frequently it will check for changes to the state of the PollingControlToken
PollingWaitTime = TimeSpan.FromMilliseconds(200)
});

// Logging data messages is disabled by default to protect sensitive user data. If you want this enabled, uncomment the line below.
// builder.EnableMessageContentLogging();
})
Expand Down
5 changes: 5 additions & 0 deletions src/AWS.Messaging/Configuration/IMessageBusBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,9 @@ public interface IMessageBusBuilder
/// Configures the backoff policy used by <see cref="BackoffHandler"/> and its available options.
/// </summary>
IMessageBusBuilder ConfigureBackoffPolicy(Action<BackoffPolicyBuilder> configure);

/// <summary>
/// Configures the <see cref="PollingControlToken"/>, which can be used to start and stop the SQS Message Poller.
/// </summary>
IMessageBusBuilder ConfigurePollingControlToken(PollingControlToken pollingControlToken);
}
6 changes: 6 additions & 0 deletions src/AWS.Messaging/Configuration/IMessageConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using AWS.Messaging.Configuration.Internal;
using AWS.Messaging.Serialization;
using AWS.Messaging.Services;
using AWS.Messaging.Services.Backoff;
using AWS.Messaging.Services.Backoff.Policies;
using AWS.Messaging.Services.Backoff.Policies.Options;
Expand Down Expand Up @@ -93,4 +94,9 @@ public interface IMessageConfiguration
/// Holds an instance of <see cref="CappedExponentialBackoffOptions"/> to control the behavior of <see cref="CappedExponentialBackoffPolicy"/>.
/// </summary>
CappedExponentialBackoffOptions CappedExponentialBackoffOptions { get; }

/// <summary>
/// Holds an instance of <see cref="PollingControlToken"/> to control behaviour of <see cref="IMessagePoller"/>
/// </summary>
PollingControlToken PollingControlToken { get; }
}
11 changes: 10 additions & 1 deletion src/AWS.Messaging/Configuration/MessageBusBuilder.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

using System.Collections.Concurrent;
using AWS.Messaging.Configuration.Internal;
using AWS.Messaging.Publishers;
using AWS.Messaging.Publishers.EventBridge;
Expand All @@ -26,7 +27,7 @@ namespace AWS.Messaging.Configuration;
/// </summary>
public class MessageBusBuilder : IMessageBusBuilder
{
private static readonly Dictionary<IServiceCollection, MessageConfiguration> _messageConfigurations = new();
private static readonly ConcurrentDictionary<IServiceCollection, MessageConfiguration> _messageConfigurations = new();
private readonly MessageConfiguration _messageConfiguration;
private readonly IList<ServiceDescriptor> _additionalServices = new List<ServiceDescriptor>();
private readonly IServiceCollection _serviceCollection;
Expand Down Expand Up @@ -164,6 +165,13 @@ public IMessageBusBuilder AddMessageSourceSuffix(string suffix)
return this;
}

/// <inheritdoc/>
public IMessageBusBuilder ConfigurePollingControlToken(PollingControlToken pollingControlToken)
{
_messageConfiguration.PollingControlToken = pollingControlToken;
return this;
}

/// <inheritdoc/>
[RequiresDynamicCode("This method requires loading types dynamically as defined in the configuration system.")]
[RequiresUnreferencedCode("This method requires loading types dynamically as defined in the configuration system.")]
Expand Down Expand Up @@ -327,6 +335,7 @@ internal void Build()
_serviceCollection.TryAdd(ServiceDescriptor.Singleton<ILoggerFactory, NullLoggerFactory>());
_serviceCollection.TryAdd(ServiceDescriptor.Singleton(typeof(ILogger<>), typeof(NullLogger<>)));

_serviceCollection.TryAddSingleton(_messageConfiguration.PollingControlToken);
_serviceCollection.TryAddSingleton<IMessageConfiguration>(_messageConfiguration);
_serviceCollection.TryAddSingleton<IMessageSerializer, MessageSerializer>();
_serviceCollection.TryAddSingleton<IEnvelopeSerializer, EnvelopeSerializer>();
Expand Down
3 changes: 3 additions & 0 deletions src/AWS.Messaging/Configuration/MessageConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,7 @@ public class MessageConfiguration : IMessageConfiguration

/// <inheritdoc/>
public CappedExponentialBackoffOptions CappedExponentialBackoffOptions { get; set; } = new();

/// <inheritdoc/>
public PollingControlToken PollingControlToken { get; set; } = new();
}
31 changes: 31 additions & 0 deletions src/AWS.Messaging/Configuration/PollingControlToken.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

namespace AWS.Messaging.Configuration
{
/// <summary>
/// Control token to start and stop message polling for a service.
/// </summary>
public class PollingControlToken
{
/// <summary>
/// Indicates if polling is enabled.
/// </summary>
internal bool IsPollingEnabled { get; private set; } = true;

/// <summary>
/// Start polling of the SQS Queue.
/// </summary>
public void StartPolling() => IsPollingEnabled = true;

/// <summary>
/// Stop polling of the SQS Queue.
/// </summary>
public void StopPolling() => IsPollingEnabled = false;

/// <summary>
/// Configurable amount of time to wait between polling for a change in status
/// </summary>
public TimeSpan PollingWaitTime { get; init; } = TimeSpan.FromMilliseconds(200);
}
}
15 changes: 12 additions & 3 deletions src/AWS.Messaging/SQS/SQSMessagePoller.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ internal class SQSMessagePoller : IMessagePoller, ISQSMessageCommunication
private readonly SQSMessagePollerConfiguration _configuration;
private readonly IEnvelopeSerializer _envelopeSerializer;
private readonly IBackoffHandler _backoffHandler;
private readonly PollingControlToken _pollingControlToken;
private readonly bool _isFifoEndpoint;

/// <summary>
Expand Down Expand Up @@ -49,19 +50,22 @@ internal class SQSMessagePoller : IMessagePoller, ISQSMessageCommunication
/// <param name="configuration">The SQS message poller configuration.</param>
/// <param name="envelopeSerializer">Serializer used to deserialize the SQS messages</param>
/// <param name="backoffHandler">Backoff handler for performing back-offs if exceptions are thrown when polling SQS.</param>
/// <param name="pollingControlToken">Control token to start and stop the poller.</param>
public SQSMessagePoller(
ILogger<SQSMessagePoller> logger,
IMessageManagerFactory messageManagerFactory,
IAWSClientProvider awsClientProvider,
SQSMessagePollerConfiguration configuration,
IEnvelopeSerializer envelopeSerializer,
IBackoffHandler backoffHandler)
IBackoffHandler backoffHandler,
PollingControlToken pollingControlToken)
{
_logger = logger;
_sqsClient = awsClientProvider.GetServiceClient<IAmazonSQS>();
_configuration = configuration;
_envelopeSerializer = envelopeSerializer;
_backoffHandler = backoffHandler;
_pollingControlToken = pollingControlToken;
_isFifoEndpoint = configuration.SubscriberEndpoint.EndsWith(".fifo");

_messageManager = messageManagerFactory.CreateMessageManager(this, _configuration.ToMessageManagerConfiguration());
Expand All @@ -74,13 +78,18 @@ public async Task StartPollingAsync(CancellationToken token = default)
}

/// <summary>
/// Polls SQS indefinitely until cancelled
/// </summary>
/// Polls SQS indefinitely until cancelled. Message receipt can be stopped and started using <see cref="PollingControlToken"/></summary>
/// <param name="token">Cancellation token to shutdown the poller.</param>
private async Task PollQueue(CancellationToken token)
{
while (!token.IsCancellationRequested)
{
if (!_pollingControlToken.IsPollingEnabled)
{
await Task.Delay(_pollingControlToken.PollingWaitTime, token);
continue;
}

var numberOfMessagesToRead = _configuration.MaxNumberOfConcurrentMessages - _messageManager.ActiveMessageCount;

// If already processing the maximum number of messages, wait for at least one to complete and then try again
Expand Down
83 changes: 83 additions & 0 deletions test/AWS.Messaging.IntegrationTests/SubscriberTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;
using AWS.Messaging.Configuration;
using AWS.Messaging.IntegrationTests.Handlers;
using AWS.Messaging.IntegrationTests.Models;
using AWS.Messaging.Tests.Common.Services;
Expand Down Expand Up @@ -146,6 +147,88 @@ await publisher.PublishAsync(new ChatMessage
}
}

[Fact]
public async Task ReceiveMultipleMessagesOnlyWhenPollingControlTokenStarted()
{
var pollingControlToken = new PollingControlToken();
_serviceCollection.AddSingleton<TempStorage<ChatMessage>>();
_serviceCollection.AddAWSMessageBus(builder =>
{
builder.ConfigurePollingControlToken(pollingControlToken);
builder.AddSQSPublisher<ChatMessage>(_sqsQueueUrl);
builder.AddSQSPoller(_sqsQueueUrl, options =>
{
options.VisibilityTimeoutExtensionThreshold = 3; // and a message is eligible for extension after it's been processing at least 3 seconds
options.MaxNumberOfConcurrentMessages = 10;
options.WaitTimeSeconds = 2;
});
builder.AddMessageHandler<ChatMessageHandler, ChatMessage>();
builder.AddMessageSource("/aws/messaging");
builder.ConfigureBackoffPolicy(policyBuilder =>
{
policyBuilder.UseNoBackoff();
});
});
var serviceProvider = _serviceCollection.BuildServiceProvider();

var publishStartTime = DateTime.UtcNow;
var publisher = serviceProvider.GetRequiredService<IMessagePublisher>();
for (int i = 0; i < 5; i++)
{
await publisher.PublishAsync(new ChatMessage
{
MessageDescription = $"Test{i + 1}"
});
}
var publishEndTime = DateTime.UtcNow;

var pump = serviceProvider.GetRequiredService<IHostedService>() as MessagePumpService;
Assert.NotNull(pump);
var source = new CancellationTokenSource();

await pump.StartAsync(source.Token);

// Wait for the pump to shut down after processing the expected number of messages,
// with some padding to ensure messages aren't being processed more than once
source.CancelAfter(30_000);

var tempStorage = serviceProvider.GetRequiredService<TempStorage<ChatMessage>>();
while (tempStorage.Messages.Count < 5 && !source.IsCancellationRequested)
{
await Task.Delay(200, source.Token);
}

// Stop polling and wait for the polling cycle to complete with a buffer
pollingControlToken.StopPolling();

await Task.Delay(5_000);

// Publish the next 5 messages that should not be received due to stopping polling
for (int i = 5; i < 10; i++)
{
await publisher.PublishAsync(new ChatMessage
{
MessageDescription = $"Test{i + 1}"
});
}

SpinWait.SpinUntil(() => source.IsCancellationRequested);

var inMemoryLogger = serviceProvider.GetRequiredService<InMemoryLogger>();

Assert.Empty(inMemoryLogger.Logs.Where(x => x.Exception is AmazonSQSException ex && ex.ErrorCode.Equals("AWS.SimpleQueueService.TooManyEntriesInBatchRequest")));
Assert.Equal(5, tempStorage.Messages.Count);
for (int i = 0; i < 5; i++)
{
var message = tempStorage.Messages.FirstOrDefault(x => x.Message.MessageDescription.Equals($"Test{i + 1}"));
Assert.NotNull(message);
Assert.False(string.IsNullOrEmpty(message.Id));
Assert.Equal("/aws/messaging", message.Source.ToString());
Assert.True(message.TimeStamp > publishStartTime);
Assert.True(message.TimeStamp < publishEndTime);
}
}

[Theory]
[InlineData(20)]
public async Task SendMixOfMessageTypesToSameQueue(int numberOfMessages)
Expand Down
9 changes: 4 additions & 5 deletions test/AWS.Messaging.UnitTests/MessagePublisherTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,7 @@ public async Task EventBridgePublisher_UnhappyPath()
new DefaultTelemetryFactory(serviceProvider)
);

var publishResponse = Assert.ThrowsAsync<FailedToPublishException>(async () => await messagePublisher.PublishAsync(_chatMessage));
var publishResponse = await Assert.ThrowsAsync<FailedToPublishException>(async () => await messagePublisher.PublishAsync(_chatMessage));

_eventBridgeClient.Verify(x =>
x.PutEventsAsync(
Expand All @@ -774,10 +774,9 @@ public async Task EventBridgePublisher_UnhappyPath()
It.IsAny<CancellationToken>()),
Times.Exactly(1));

var publishResponseResult = await publishResponse;
Assert.Equal("Message failed to publish.", publishResponseResult.Message);
Assert.Equal("ErrorMessage", publishResponseResult.InnerException!.Message);
Assert.Equal("ErrorCode", ((EventBridgePutEventsException)publishResponseResult.InnerException).ErrorCode);
Assert.Equal("Message failed to publish.", publishResponse.Message);
Assert.Equal("ErrorMessage", publishResponse.InnerException!.Message);
Assert.Equal("ErrorCode", ((EventBridgePutEventsException)publishResponse.InnerException).ErrorCode);
}

[Fact]
Expand Down
Loading
Loading