Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,27 @@ internal class BarPublisher
}
```

### Batch Publish

Multiple messages can also be published in batches to a topic or queue. Simply call the `PublishAsync` method with a list of messages. The messages will be added to a batch until the batch is full before it the batch is published and continue to work on the remaining messages. This process continues until all messages are consumed.

An `InvalidOperationException` is thrown if a single message cannot fit inside a batch by itself. In this case, any previous published batches will not be rolled back and any remaining messages will remain unpublished.

```csharp
internal class BarBatchPublisher
{
private readonly IServiceBusPublisher publisher;

public BarBatchPublisher(IServiceBusPublisher publisher)
{
this.publisher = publisher;
}

public Task Publish(object[] messages)
=> publisher.PublishAsync("[existing servicebus topic]", messages);
}
```

Here's a full example of how to use the publishers above using a Minimal API setup (SwaggerUI enabled) with a single endpoint called `POST /data` that accepts a simple request body `{ "a": "string", "b": "string", "c": "string" }` which publishes the request to an EventHub and a ServiceBus topic

```csharp
Expand Down
20 changes: 19 additions & 1 deletion src/Atc.Azure.Messaging/ServiceBus/IServiceBusPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,24 @@ Task PublishAsync(
TimeSpan? timeToLive = null,
CancellationToken cancellationToken = default);

/// <summary>
/// Publishes multiple messages in batches. The list of messages will be split in multiple batches if the messages exceeds a single batch size.
/// </summary>
/// <param name="topicOrQueue">The topic or queue name.</param>
/// <param name="messages">The messages to be published.</param>
/// <param name="sessionId">Optional id for appending the message to a known session. If not set, then defaults to a new session.</param>
/// <param name="properties">Optional custom metadata about the message.</param>
/// <param name="timeToLive">Optional <see cref="TimeSpan"/> for message to be consumed. If not set, then defaults to the value specified on queue or topic.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used.</param>
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
Task PublishAsync(
string topicOrQueue,
IReadOnlyCollection<object> messages,
string? sessionId = null,
IDictionary<string, string>? properties = null,
TimeSpan? timeToLive = null,
CancellationToken cancellationToken = default);

/// <summary>
/// Schedules a message for publishing at a later time.
/// </summary>
Expand All @@ -44,7 +62,7 @@ Task<long> SchedulePublishAsync(
CancellationToken cancellationToken = default);

/// <summary>
/// Cansels a scheduled publish of a message if it has not been published yet.
/// Cancels a scheduled publish of a message if it has not been published yet.
/// </summary>
/// <param name="topicOrQueue">The topic or queue name.</param>
/// <param name="sequenceNumber">The sequence number of the scheduled message to cancel.</param>
Expand Down
56 changes: 55 additions & 1 deletion src/Atc.Azure.Messaging/ServiceBus/ServiceBusPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,60 @@ public Task PublishAsync(
cancellationToken);
}

public async Task PublishAsync(
string topicOrQueue,
IReadOnlyCollection<object> messages,
string? sessionId = null,
IDictionary<string, string>? properties = null,
TimeSpan? timeToLive = null,
CancellationToken cancellationToken = default)
{
var sender = clientProvider.GetSender(topicOrQueue);

var batch = await sender
.CreateMessageBatchAsync(cancellationToken)
.ConfigureAwait(false);

foreach (var message in messages)
{
if (cancellationToken.IsCancellationRequested)
{
break;
}

var busMessage = CreateServiceBusMessage(
sessionId,
JsonSerializer.Serialize(message),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this pull request should go in but I also noticed that we are very opinionated about how the messages are serialized

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree and have intention to separate this in a later PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was also the main reason why I could not use this package initially as I have customer converters

properties,
timeToLive);

if (batch.TryAddMessage(busMessage))
{
continue;
}

await sender
.SendMessagesAsync(batch, cancellationToken)
.ConfigureAwait(false);

batch.Dispose();
batch = await sender
.CreateMessageBatchAsync(cancellationToken)
.ConfigureAwait(false);

if (!batch.TryAddMessage(busMessage))
{
throw new InvalidOperationException("Unable to add message to batch. The message size exceeds what can be send in a batch");
}
}

await sender
.SendMessagesAsync(batch, cancellationToken)
.ConfigureAwait(false);

batch.Dispose();
}

public Task<long> SchedulePublishAsync(
string topicOrQueue,
object message,
Expand Down Expand Up @@ -89,5 +143,5 @@ private static ServiceBusMessage CreateServiceBusMessage(
}

return message;
}
}
}
255 changes: 255 additions & 0 deletions test/Atc.Azure.Messaging.Tests/ServiceBus/ServiceBusPublisherTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -208,4 +208,259 @@ await sut.PublishAsync(
.Should()
.Be(TimeSpan.MaxValue);
}

[Theory, AutoNSubstituteData]
internal async Task Should_Get_ServiceBusSender_For_Topic_On_Batch(
[Frozen] IServiceBusSenderProvider provider,
ServiceBusPublisher sut,
[Substitute] ServiceBusSender sender,
string topicName,
object messageBody,
IDictionary<string, string> properties,
TimeSpan timeToLive,
string sessionId,
CancellationToken cancellationToken)
{
var messageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, new List<ServiceBusMessage>());

provider
.GetSender(default!)
.ReturnsForAnyArgs(sender);

sender
.CreateMessageBatchAsync(default!)
.ReturnsForAnyArgs(messageBatch);

await sut.PublishAsync(
topicName,
new object[] { messageBody },
sessionId,
properties,
timeToLive,
cancellationToken);

_ = provider
.Received(1)
.GetSender(topicName);
}

[Theory, AutoNSubstituteData]
internal async Task Should_Create_MessageBatch(
[Frozen] IServiceBusSenderProvider provider,
ServiceBusPublisher sut,
[Substitute] ServiceBusSender sender,
string topicName,
object messageBody,
IDictionary<string, string> properties,
TimeSpan timeToLive,
string sessionId,
CancellationToken cancellationToken)
{
var messageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, new List<ServiceBusMessage>());

provider
.GetSender(default!)
.ReturnsForAnyArgs(sender);

sender
.CreateMessageBatchAsync(default!)
.ReturnsForAnyArgs(messageBatch);

await sut.PublishAsync(
topicName,
new object[] { messageBody },
sessionId,
properties,
timeToLive,
cancellationToken);

_ = await sender
.Received(1)
.CreateMessageBatchAsync(cancellationToken);
}

[Theory, AutoNSubstituteData]
internal async Task Should_Send_Message_On_ServiceBusSender_On_Message_Batch(
[Frozen] IServiceBusSenderProvider provider,
ServiceBusPublisher sut,
[Substitute] ServiceBusSender sender,
string topicName,
object messageBody,
IDictionary<string, string> properties,
TimeSpan timeToLive,
string sessionId,
CancellationToken cancellationToken)
{
var batchList = new List<ServiceBusMessage>();
var messageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, batchList);

provider
.GetSender(default!)
.ReturnsForAnyArgs(sender);

sender
.CreateMessageBatchAsync(default!)
.ReturnsForAnyArgs(messageBatch);

await sut.PublishAsync(
topicName,
new object[] { messageBody },
sessionId,
properties,
timeToLive,
cancellationToken);

var sendMessageBatch = sender
.ReceivedCallWithArgument<ServiceBusMessageBatch>();

sendMessageBatch.Count.Should().Be(1);
batchList.Count.Should().Be(1);

batchList[0].MessageId
.Should()
.NotBeNullOrEmpty();
batchList[0].Body
.ToString()
.Should()
.BeEquivalentTo(JsonSerializer.Serialize(messageBody));
batchList[0].ApplicationProperties
.Should()
.BeEquivalentTo(properties);
batchList[0].TimeToLive
.Should()
.Be(timeToLive);
}

[Theory, AutoNSubstituteData]
internal async Task Should_Create_New_MessageBatch_When_First_Batch_Is_Full(
[Frozen] IServiceBusSenderProvider provider,
ServiceBusPublisher sut,
[Substitute] ServiceBusSender sender,
string topicName,
object messageBody,
IDictionary<string, string> properties,
TimeSpan timeToLive,
string sessionId,
CancellationToken cancellationToken)
{
var firstBatchList = new List<ServiceBusMessage>();
var secondBatchList = new List<ServiceBusMessage>();
var firstMessageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, firstBatchList, tryAddCallback: _ => false);
var secondMessageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, secondBatchList);

provider
.GetSender(default!)
.ReturnsForAnyArgs(sender);

sender
.CreateMessageBatchAsync(default!)
.ReturnsForAnyArgs(firstMessageBatch, secondMessageBatch);

await sut.PublishAsync(
topicName,
new object[] { messageBody },
sessionId,
properties,
timeToLive,
cancellationToken);

_ = await sender
.Received(2)
.CreateMessageBatchAsync(cancellationToken);
}

[Theory, AutoNSubstituteData]
internal async Task Should_Send_Multiple_Batches_If_When_Messages_Exceeds_Single_Batch(
[Frozen] IServiceBusSenderProvider provider,
ServiceBusPublisher sut,
[Substitute] ServiceBusSender sender,
string topicName,
object messageBody,
IDictionary<string, string> properties,
TimeSpan timeToLive,
string sessionId,
CancellationToken cancellationToken)
{
var firstBatchList = new List<ServiceBusMessage>();
var secondBatchList = new List<ServiceBusMessage>();
var firstMessageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, firstBatchList, tryAddCallback: _ => false);
var secondMessageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, secondBatchList);

provider
.GetSender(default!)
.ReturnsForAnyArgs(sender);

sender
.CreateMessageBatchAsync(default!)
.ReturnsForAnyArgs(firstMessageBatch, secondMessageBatch);

await sut.PublishAsync(
topicName,
new object[] { messageBody },
sessionId,
properties,
timeToLive,
cancellationToken);

_ = sender
.Received(1)
.SendMessagesAsync(firstMessageBatch, cancellationToken);

_ = sender
.Received(1)
.SendMessagesAsync(secondMessageBatch, cancellationToken);

firstBatchList.Should().BeEmpty();
secondBatchList.Count.Should().Be(1);

secondBatchList[0].MessageId
.Should()
.NotBeNullOrEmpty();
secondBatchList[0].Body
.ToString()
.Should()
.BeEquivalentTo(JsonSerializer.Serialize(messageBody));
secondBatchList[0].ApplicationProperties
.Should()
.BeEquivalentTo(properties);
secondBatchList[0].TimeToLive
.Should()
.Be(timeToLive);
}

[Theory, AutoNSubstituteData]
internal Task Should_Throw_If_Message_Is_Too_Large_To_Fit_In_New_Batch(
[Frozen] IServiceBusSenderProvider provider,
ServiceBusPublisher sut,
[Substitute] ServiceBusSender sender,
string topicName,
object messageBody,
IDictionary<string, string> properties,
TimeSpan timeToLive,
string sessionId,
CancellationToken cancellationToken)
{
var firstBatchList = new List<ServiceBusMessage>();
var secondBatchList = new List<ServiceBusMessage>();
var firstMessageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, firstBatchList, tryAddCallback: _ => false);
var secondMessageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, secondBatchList, tryAddCallback: _ => false);

provider
.GetSender(default!)
.ReturnsForAnyArgs(sender);

sender
.CreateMessageBatchAsync(default!)
.ReturnsForAnyArgs(firstMessageBatch, secondMessageBatch);

var act = async () => await sut.PublishAsync(
topicName,
new object[] { messageBody },
sessionId,
properties,
timeToLive,
cancellationToken);

return act.Should().ThrowAsync<InvalidOperationException>();
}
}