Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: follow design patterns for publishers #101

Merged
merged 1 commit into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public class PublisherController : ControllerBase
}

// Publish the message to SQS using the injected ISQSPublisher, with SQS-specific options
await _sqsPublisher.PublishAsync(message, new SQSOptions
await _sqsPublisher.SendAsync(message, new SQSOptions
{
DelaySeconds = <delay-in-seconds>,
MessageAttributes = <message-attributes>,
Expand Down
2 changes: 1 addition & 1 deletion docs/docs/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public class PublisherController : ControllerBase
}

// Publish the message to SQS using the injected ISQSPublisher, with SQS-specific options
await _sqsPublisher.PublishAsync(message, new SQSOptions
await _sqsPublisher.SendAsync(message, new SQSOptions
{
DelaySeconds = <delay-in-seconds>,
MessageAttributes = <message-attributes>,
Expand Down
4 changes: 2 additions & 2 deletions sampleapps/PublisherAPI/Controllers/PublisherController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public async Task<IActionResult> PublishOrder([FromBody] OrderInfo message)

return Ok();
}

[HttpPost("fooditem", Name = "Food Item")]
public async Task<IActionResult> PublishFoodItem([FromBody] FoodItem message)
{
Expand Down Expand Up @@ -87,7 +87,7 @@ public async Task<IActionResult> PublishTransaction([FromBody] TransactionInfo t
return BadRequest("The TransactionId cannot be null or empty.");
}

await _sqsPublisher.PublishAsync(transactionInfo, new SQSOptions
await _sqsPublisher.SendAsync(transactionInfo, new SQSOptions
{
MessageGroupId = "group-123"
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

using AWS.Messaging.Services;

namespace AWS.Messaging.Publishers.EventBridge
{
/// <summary>
/// This interface allows publishing messages from application code to Amazon EventBridge.
/// It exposes the <see cref="PublishAsync{T}(T, EventBridgeOptions?, CancellationToken)"/> method which takes in a user-defined message, and <see cref="EventBridgeOptions"/> to set additonal parameters while publishing messages to EventBridge.
/// Using dependency injection, this interface is available to inject anywhere in the code.
/// </summary>
public interface IEventBridgePublisher
public interface IEventBridgePublisher : IEventPublisher
{
/// <summary>
/// Publishes the application message to SNS.
Expand Down
38 changes: 22 additions & 16 deletions src/AWS.Messaging/Publishers/MessageRoutingPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using AWS.Messaging.Publishers.EventBridge;
using AWS.Messaging.Publishers.SNS;
using AWS.Messaging.Publishers.SQS;
using AWS.Messaging.Services;
using AWS.Messaging.Telemetry;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -47,10 +48,16 @@ internal class MessageRoutingPublisher : IMessagePublisher
};

/// <summary>
/// This dictionary serves as a method to cache created instances of <see cref="IMessagePublisher"/>,
/// This dictionary serves as a method to cache created instances of <see cref="ICommandPublisher"/>,
/// to avoid having to create a new instance any time a message is sent.
/// </summary>
private readonly ConcurrentDictionary<Type, ICommandPublisher> _commandPublisherInstances = new();

/// <summary>
/// This dictionary serves as a method to cache created instances of <see cref="IEventPublisher"/>,
/// to avoid having to create a new instance any time a message is published.
/// </summary>
private readonly ConcurrentDictionary<Type, IMessagePublisher> _publisherInstances = new();
private readonly ConcurrentDictionary<Type, IEventPublisher> _eventPublisherInstances = new();

/// <summary>
/// Publishes a user-defined message to an AWS service based on the
Expand Down Expand Up @@ -80,16 +87,23 @@ public async Task PublishAsync<T>(T message, CancellationToken token = default)

if (_publisherTypeMapping.TryGetValue(mapping.PublishTargetType, out var publisherType))
{
if (!typeof(IMessagePublisher).IsAssignableFrom(publisherType))
if (typeof(ICommandPublisher).IsAssignableFrom(publisherType))
{
var publisher = _commandPublisherInstances.GetOrAdd(publisherType, _ => (ICommandPublisher) ActivatorUtilities.CreateInstance(_serviceProvider, publisherType));
await publisher.SendAsync(message, token);
}
else if (typeof(IEventPublisher).IsAssignableFrom(publisherType))
{
var publisher = _eventPublisherInstances.GetOrAdd(publisherType, _ => (IEventPublisher) ActivatorUtilities.CreateInstance(_serviceProvider, publisherType));
await publisher.PublishAsync(message, token);
}
else
{
_logger.LogError("The message publisher corresponding to the type '{PublishTargetType}' is invalid " +
"and does not implement the interface '{InterfaceType}'.", mapping.PublishTargetType, typeof(IMessagePublisher));
"and does not implement the interface '{CommandInterfaceType}' or '{EventInterfaceType}'.", mapping.PublishTargetType, typeof(ICommandPublisher), typeof(IEventPublisher));
throw new InvalidPublisherTypeException($"The message publisher corresponding to the type '{mapping.PublishTargetType}' is invalid " +
$"and does not implement the interface '{typeof(IMessagePublisher)}'.");
$"and does not implement the interface '{typeof(ICommandPublisher)}' or '{typeof(IEventPublisher)}'.");
}

var publisher = GetPublisherInstance(publisherType);
await publisher.PublishAsync(message, token);
}
else
{
Expand All @@ -104,12 +118,4 @@ public async Task PublishAsync<T>(T message, CancellationToken token = default)
}
}
}

private IMessagePublisher GetPublisherInstance(Type publisherType)
{
return _publisherInstances.GetOrAdd(publisherType, x =>
{
return (IMessagePublisher) ActivatorUtilities.CreateInstance(_serviceProvider, publisherType);
});
}
}
4 changes: 3 additions & 1 deletion src/AWS.Messaging/Publishers/SNS/ISNSPublisher.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

using AWS.Messaging.Services;

namespace AWS.Messaging.Publishers.SNS
{
/// <summary>
/// This interface allows publishing messages from application code to Amazon SNS.
/// It exposes the <see cref="PublishAsync{T}(T, SNSOptions?, CancellationToken)"/> method which takes in a user-defined message, and <see cref="SNSOptions"/> to set additonal parameters while publishing messages to SNS.
/// Using dependency injection, this interface is available to inject anywhere in the code.
/// </summary>
public interface ISNSPublisher
public interface ISNSPublisher : IEventPublisher
{
/// <summary>
/// Publishes the application message to SNS.
Expand Down
12 changes: 7 additions & 5 deletions src/AWS.Messaging/Publishers/SQS/ISQSPublisher.cs
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

using AWS.Messaging.Services;

namespace AWS.Messaging.Publishers.SQS
{
/// <summary>
/// This interface allows publishing messages from application code to Amazon SQS.
/// It exposes the <see cref="PublishAsync{T}(T, SQSOptions?, CancellationToken)"/> method which takes in a user-defined message, and <see cref="SQSOptions"/> to set additonal parameters while publishing messages to SQS.
/// This interface allows sending messages from application code to Amazon SQS.
/// It exposes the <see cref="SendAsync{T}(T, SQSOptions?, CancellationToken)"/> method which takes in a user-defined message, and <see cref="SQSOptions"/> to set additional parameters while sending messages to SQS.
/// Using dependency injection, this interface is available to inject anywhere in the code.
/// </summary>
public interface ISQSPublisher
public interface ISQSPublisher : ICommandPublisher
{
/// <summary>
/// Publishes the application message to SQS.
/// Sends the application message to SQS.
/// </summary>
/// <param name="message">The application message that will be serialized and sent to an SQS queue</param>
/// <param name="sqsOptions">Contains additional parameters that can be set while sending a message to an SQS queue</param>
/// <param name="token">The cancellation token used to cancel the request.</param>
Task PublishAsync<T>(T message, SQSOptions? sqsOptions, CancellationToken token = default);
Task SendAsync<T>(T message, SQSOptions? sqsOptions, CancellationToken token = default);
}
}
20 changes: 10 additions & 10 deletions src/AWS.Messaging/Publishers/SQS/SQSPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
namespace AWS.Messaging.Publishers.SQS;

/// <summary>
/// The SQS message publisher allows publishing messages to AWS SQS.
/// The SQS message publisher allows sending messages to AWS SQS.
/// </summary>
internal class SQSPublisher : IMessagePublisher, ISQSPublisher
internal class SQSPublisher : ISQSPublisher
{
private readonly IAWSClientProvider _awsClientProvider;
private readonly ILogger<IMessagePublisher> _logger;
private readonly ILogger<ISQSPublisher> _logger;
private readonly IMessageConfiguration _messageConfiguration;
private readonly IEnvelopeSerializer _envelopeSerializer;
private readonly ITelemetryFactory _telemetryFactory;
Expand All @@ -29,7 +29,7 @@ internal class SQSPublisher : IMessagePublisher, ISQSPublisher
/// </summary>
public SQSPublisher(
IAWSClientProvider awsClientProvider,
ILogger<IMessagePublisher> logger,
ILogger<ISQSPublisher> logger,
IMessageConfiguration messageConfiguration,
IEnvelopeSerializer envelopeSerializer,
ITelemetryFactory telemetryFactory)
Expand All @@ -48,9 +48,9 @@ internal class SQSPublisher : IMessagePublisher, ISQSPublisher
/// <param name="token">The cancellation token used to cancel the request.</param>
/// <exception cref="InvalidMessageException">If the message is null or invalid.</exception>
/// <exception cref="MissingMessageTypeConfigurationException">If cannot find the publisher configuration for the message type.</exception>
public async Task PublishAsync<T>(T message, CancellationToken token = default)
public async Task SendAsync<T>(T message, CancellationToken token = default)
{
await PublishAsync(message, null, token);
await SendAsync(message, null, token);
}

/// <summary>
Expand All @@ -61,7 +61,7 @@ public async Task PublishAsync<T>(T message, CancellationToken token = default)
/// <param name="token">The cancellation token used to cancel the request.</param>
/// <exception cref="InvalidMessageException">If the message is null or invalid.</exception>
/// <exception cref="MissingMessageTypeConfigurationException">If cannot find the publisher configuration for the message type.</exception>
public async Task PublishAsync<T>(T message, SQSOptions? sqsOptions, CancellationToken token = default)
public async Task SendAsync<T>(T message, SQSOptions? sqsOptions, CancellationToken token = default)
{
using (var trace = _telemetryFactory.Trace("Publish to AWS SQS"))
{
Expand Down Expand Up @@ -127,9 +127,9 @@ private SendMessageRequest CreateSendMessageRequest(string queueUrl, string mess
if (queueUrl.EndsWith(FIFO_SUFFIX) && string.IsNullOrEmpty(sqsOptions?.MessageGroupId))
{
var errorMessage =
$"You are attempting to publish to a FIFO SQS queue but the request does not include a message group ID. " +
$"Please use {nameof(ISQSPublisher)} from the service collection to publish to FIFO queues. " +
$"It exposes a {nameof(PublishAsync)} method that accepts {nameof(SQSOptions)} as a parameter. " +
$"You are attempting to send to a FIFO SQS queue but the request does not include a message group ID. " +
$"Please use {nameof(ISQSPublisher)} from the service collection to send to FIFO queues. " +
$"It exposes a {nameof(SendAsync)} method that accepts {nameof(SQSOptions)} as a parameter. " +
$"A message group ID must be specified via {nameof(SQSOptions.MessageGroupId)}. " +
$"Additionally, {nameof(SQSOptions.MessageDeduplicationId)} must also be specified if content based de-duplication is not enabled on the queue.";

Expand Down
18 changes: 18 additions & 0 deletions src/AWS.Messaging/Services/ICommandPublisher.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.\r
// SPDX-License-Identifier: Apache-2.0

namespace AWS.Messaging.Services;

/// <summary>
/// This interface allows sending messages from application code to recipient-specific Amazon services.
/// It exposes the <see cref="SendAsync{T}(T, CancellationToken)"/> method which takes in a user-defined message to send to a recipient-specific Amazon service.
/// </summary>
public interface ICommandPublisher
{
/// <summary>
/// Sends the application message to a recipient-specific Amazon service.
/// </summary>
/// <param name="message">The application message that will be serialized and sent.</param>
/// <param name="token">The cancellation token used to cancel the request.</param>
Task SendAsync<T>(T message, CancellationToken token = default);
}
18 changes: 18 additions & 0 deletions src/AWS.Messaging/Services/IEventPublisher.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.\r
// SPDX-License-Identifier: Apache-2.0

namespace AWS.Messaging.Services;

/// <summary>
/// This interface allows publishing messages from application code to event-based Amazon services.
/// It exposes the <see cref="PublishAsync{T}(T, CancellationToken)"/> method which takes in a user-defined message to publish to an event-based Amazon service.
/// </summary>
public interface IEventPublisher
{
/// <summary>
/// Publishes the application message to an event-based Amazon service.
/// </summary>
/// <param name="message">The application message that will be serialized and published.</param>
/// <param name="token">The cancellation token used to cancel the request.</param>
Task PublishAsync<T>(T message, CancellationToken token = default);
}
4 changes: 2 additions & 2 deletions test/AWS.Messaging.Benchmarks/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ static async Task<int> Main(string[] args)
{
DisplayData(benchmarkCollector.PublishTimes, publishElapsedTime, numberOfMessages, "Publishing");
DisplayData(benchmarkCollector.ReceptionTimes, handlingElapsedTime, numberOfMessages, "Receiving");
}
}

host.Dispose();
return benchmarkCollector;
Expand Down Expand Up @@ -218,7 +218,7 @@ private static async Task<TimeSpan> PublishMessages(ISQSPublisher publisher, IBe
await Parallel.ForEachAsync(Enumerable.Range(0, messageCount), options, async (messageNumber, token) =>
{
var start = stopwatch.Elapsed;
await publisher.PublishAsync(new BenchmarkMessage { SentTime = DateTime.UtcNow }, null, token);
await publisher.SendAsync(new BenchmarkMessage { SentTime = DateTime.UtcNow }, null, token);
var publishDuration = stopwatch.Elapsed - start;

benchmarkCollector.RecordMessagePublish(publishDuration);
Expand Down
2 changes: 1 addition & 1 deletion test/AWS.Messaging.IntegrationTests/FifoSubscriberTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ private async Task PublishTransactions(ISQSPublisher sqsPublisher, int numTransa
transactionInfo.ShouldFail = true;
}

await sqsPublisher.PublishAsync(transactionInfo, new SQSOptions
await sqsPublisher.SendAsync(transactionInfo, new SQSOptions
{
MessageGroupId = userId
});
Expand Down
2 changes: 1 addition & 1 deletion test/AWS.Messaging.IntegrationTests/LambdaTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ public async Task ProcessFifoLambdaEventsAsync_Success(int numberOfGroups, int n

expectedMessagesPerGroup[groupId].Add(transactionInfo);

await _publisher!.PublishAsync(transactionInfo, new SQSOptions
await _publisher!.SendAsync(transactionInfo, new SQSOptions
{
MessageGroupId = groupId
});
Expand Down
Loading
Loading