diff --git a/README.md b/README.md
index b73f98b..cc058e1 100644
--- a/README.md
+++ b/README.md
@@ -115,6 +115,27 @@ internal class BarPublisher
}
```
+### Batch Publish
+
+Multiple messages can also be published in batches to a topic or queue. Simply call the `PublishAsync` method with a list of messages. The messages will be added to a batch until the batch is full before it the batch is published and continue to work on the remaining messages. This process continues until all messages are consumed.
+
+An `InvalidOperationException` is thrown if a single message cannot fit inside a batch by itself. In this case, any previous published batches will not be rolled back and any remaining messages will remain unpublished.
+
+```csharp
+internal class BarBatchPublisher
+{
+ private readonly IServiceBusPublisher publisher;
+
+ public BarBatchPublisher(IServiceBusPublisher publisher)
+ {
+ this.publisher = publisher;
+ }
+
+ public Task Publish(object[] messages)
+ => publisher.PublishAsync("[existing servicebus topic]", messages);
+}
+```
+
Here's a full example of how to use the publishers above using a Minimal API setup (SwaggerUI enabled) with a single endpoint called `POST /data` that accepts a simple request body `{ "a": "string", "b": "string", "c": "string" }` which publishes the request to an EventHub and a ServiceBus topic
```csharp
diff --git a/src/Atc.Azure.Messaging/ServiceBus/IServiceBusPublisher.cs b/src/Atc.Azure.Messaging/ServiceBus/IServiceBusPublisher.cs
index 1dc16c2..c8f340f 100644
--- a/src/Atc.Azure.Messaging/ServiceBus/IServiceBusPublisher.cs
+++ b/src/Atc.Azure.Messaging/ServiceBus/IServiceBusPublisher.cs
@@ -23,6 +23,24 @@ Task PublishAsync(
TimeSpan? timeToLive = null,
CancellationToken cancellationToken = default);
+ ///
+ /// Publishes multiple messages in batches. The list of messages will be split in multiple batches if the messages exceeds a single batch size.
+ ///
+ /// The topic or queue name.
+ /// The messages to be published.
+ /// Optional id for appending the message to a known session. If not set, then defaults to a new session.
+ /// Optional custom metadata about the message.
+ /// Optional for message to be consumed. If not set, then defaults to the value specified on queue or topic.
+ /// The used.
+ /// A representing the asynchronous operation.
+ Task PublishAsync(
+ string topicOrQueue,
+ IReadOnlyCollection messages,
+ string? sessionId = null,
+ IDictionary? properties = null,
+ TimeSpan? timeToLive = null,
+ CancellationToken cancellationToken = default);
+
///
/// Schedules a message for publishing at a later time.
///
@@ -44,7 +62,7 @@ Task SchedulePublishAsync(
CancellationToken cancellationToken = default);
///
- /// Cansels a scheduled publish of a message if it has not been published yet.
+ /// Cancels a scheduled publish of a message if it has not been published yet.
///
/// The topic or queue name.
/// The sequence number of the scheduled message to cancel.
diff --git a/src/Atc.Azure.Messaging/ServiceBus/ServiceBusPublisher.cs b/src/Atc.Azure.Messaging/ServiceBus/ServiceBusPublisher.cs
index d2002e5..314d244 100644
--- a/src/Atc.Azure.Messaging/ServiceBus/ServiceBusPublisher.cs
+++ b/src/Atc.Azure.Messaging/ServiceBus/ServiceBusPublisher.cs
@@ -28,6 +28,60 @@ public Task PublishAsync(
cancellationToken);
}
+ public async Task PublishAsync(
+ string topicOrQueue,
+ IReadOnlyCollection messages,
+ string? sessionId = null,
+ IDictionary? properties = null,
+ TimeSpan? timeToLive = null,
+ CancellationToken cancellationToken = default)
+ {
+ var sender = clientProvider.GetSender(topicOrQueue);
+
+ var batch = await sender
+ .CreateMessageBatchAsync(cancellationToken)
+ .ConfigureAwait(false);
+
+ foreach (var message in messages)
+ {
+ if (cancellationToken.IsCancellationRequested)
+ {
+ break;
+ }
+
+ var busMessage = CreateServiceBusMessage(
+ sessionId,
+ JsonSerializer.Serialize(message),
+ properties,
+ timeToLive);
+
+ if (batch.TryAddMessage(busMessage))
+ {
+ continue;
+ }
+
+ await sender
+ .SendMessagesAsync(batch, cancellationToken)
+ .ConfigureAwait(false);
+
+ batch.Dispose();
+ batch = await sender
+ .CreateMessageBatchAsync(cancellationToken)
+ .ConfigureAwait(false);
+
+ if (!batch.TryAddMessage(busMessage))
+ {
+ throw new InvalidOperationException("Unable to add message to batch. The message size exceeds what can be send in a batch");
+ }
+ }
+
+ await sender
+ .SendMessagesAsync(batch, cancellationToken)
+ .ConfigureAwait(false);
+
+ batch.Dispose();
+ }
+
public Task SchedulePublishAsync(
string topicOrQueue,
object message,
@@ -89,5 +143,5 @@ private static ServiceBusMessage CreateServiceBusMessage(
}
return message;
- }
+ }
}
\ No newline at end of file
diff --git a/test/Atc.Azure.Messaging.Tests/ServiceBus/ServiceBusPublisherTests.cs b/test/Atc.Azure.Messaging.Tests/ServiceBus/ServiceBusPublisherTests.cs
index 83d012d..8d1c1cd 100644
--- a/test/Atc.Azure.Messaging.Tests/ServiceBus/ServiceBusPublisherTests.cs
+++ b/test/Atc.Azure.Messaging.Tests/ServiceBus/ServiceBusPublisherTests.cs
@@ -208,4 +208,259 @@ await sut.PublishAsync(
.Should()
.Be(TimeSpan.MaxValue);
}
+
+ [Theory, AutoNSubstituteData]
+ internal async Task Should_Get_ServiceBusSender_For_Topic_On_Batch(
+ [Frozen] IServiceBusSenderProvider provider,
+ ServiceBusPublisher sut,
+ [Substitute] ServiceBusSender sender,
+ string topicName,
+ object messageBody,
+ IDictionary properties,
+ TimeSpan timeToLive,
+ string sessionId,
+ CancellationToken cancellationToken)
+ {
+ var messageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, new List());
+
+ provider
+ .GetSender(default!)
+ .ReturnsForAnyArgs(sender);
+
+ sender
+ .CreateMessageBatchAsync(default!)
+ .ReturnsForAnyArgs(messageBatch);
+
+ await sut.PublishAsync(
+ topicName,
+ new object[] { messageBody },
+ sessionId,
+ properties,
+ timeToLive,
+ cancellationToken);
+
+ _ = provider
+ .Received(1)
+ .GetSender(topicName);
+ }
+
+ [Theory, AutoNSubstituteData]
+ internal async Task Should_Create_MessageBatch(
+ [Frozen] IServiceBusSenderProvider provider,
+ ServiceBusPublisher sut,
+ [Substitute] ServiceBusSender sender,
+ string topicName,
+ object messageBody,
+ IDictionary properties,
+ TimeSpan timeToLive,
+ string sessionId,
+ CancellationToken cancellationToken)
+ {
+ var messageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, new List());
+
+ provider
+ .GetSender(default!)
+ .ReturnsForAnyArgs(sender);
+
+ sender
+ .CreateMessageBatchAsync(default!)
+ .ReturnsForAnyArgs(messageBatch);
+
+ await sut.PublishAsync(
+ topicName,
+ new object[] { messageBody },
+ sessionId,
+ properties,
+ timeToLive,
+ cancellationToken);
+
+ _ = await sender
+ .Received(1)
+ .CreateMessageBatchAsync(cancellationToken);
+ }
+
+ [Theory, AutoNSubstituteData]
+ internal async Task Should_Send_Message_On_ServiceBusSender_On_Message_Batch(
+ [Frozen] IServiceBusSenderProvider provider,
+ ServiceBusPublisher sut,
+ [Substitute] ServiceBusSender sender,
+ string topicName,
+ object messageBody,
+ IDictionary properties,
+ TimeSpan timeToLive,
+ string sessionId,
+ CancellationToken cancellationToken)
+ {
+ var batchList = new List();
+ var messageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, batchList);
+
+ provider
+ .GetSender(default!)
+ .ReturnsForAnyArgs(sender);
+
+ sender
+ .CreateMessageBatchAsync(default!)
+ .ReturnsForAnyArgs(messageBatch);
+
+ await sut.PublishAsync(
+ topicName,
+ new object[] { messageBody },
+ sessionId,
+ properties,
+ timeToLive,
+ cancellationToken);
+
+ var sendMessageBatch = sender
+ .ReceivedCallWithArgument();
+
+ sendMessageBatch.Count.Should().Be(1);
+ batchList.Count.Should().Be(1);
+
+ batchList[0].MessageId
+ .Should()
+ .NotBeNullOrEmpty();
+ batchList[0].Body
+ .ToString()
+ .Should()
+ .BeEquivalentTo(JsonSerializer.Serialize(messageBody));
+ batchList[0].ApplicationProperties
+ .Should()
+ .BeEquivalentTo(properties);
+ batchList[0].TimeToLive
+ .Should()
+ .Be(timeToLive);
+ }
+
+ [Theory, AutoNSubstituteData]
+ internal async Task Should_Create_New_MessageBatch_When_First_Batch_Is_Full(
+ [Frozen] IServiceBusSenderProvider provider,
+ ServiceBusPublisher sut,
+ [Substitute] ServiceBusSender sender,
+ string topicName,
+ object messageBody,
+ IDictionary properties,
+ TimeSpan timeToLive,
+ string sessionId,
+ CancellationToken cancellationToken)
+ {
+ var firstBatchList = new List();
+ var secondBatchList = new List();
+ var firstMessageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, firstBatchList, tryAddCallback: _ => false);
+ var secondMessageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, secondBatchList);
+
+ provider
+ .GetSender(default!)
+ .ReturnsForAnyArgs(sender);
+
+ sender
+ .CreateMessageBatchAsync(default!)
+ .ReturnsForAnyArgs(firstMessageBatch, secondMessageBatch);
+
+ await sut.PublishAsync(
+ topicName,
+ new object[] { messageBody },
+ sessionId,
+ properties,
+ timeToLive,
+ cancellationToken);
+
+ _ = await sender
+ .Received(2)
+ .CreateMessageBatchAsync(cancellationToken);
+ }
+
+ [Theory, AutoNSubstituteData]
+ internal async Task Should_Send_Multiple_Batches_If_When_Messages_Exceeds_Single_Batch(
+ [Frozen] IServiceBusSenderProvider provider,
+ ServiceBusPublisher sut,
+ [Substitute] ServiceBusSender sender,
+ string topicName,
+ object messageBody,
+ IDictionary properties,
+ TimeSpan timeToLive,
+ string sessionId,
+ CancellationToken cancellationToken)
+ {
+ var firstBatchList = new List();
+ var secondBatchList = new List();
+ var firstMessageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, firstBatchList, tryAddCallback: _ => false);
+ var secondMessageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, secondBatchList);
+
+ provider
+ .GetSender(default!)
+ .ReturnsForAnyArgs(sender);
+
+ sender
+ .CreateMessageBatchAsync(default!)
+ .ReturnsForAnyArgs(firstMessageBatch, secondMessageBatch);
+
+ await sut.PublishAsync(
+ topicName,
+ new object[] { messageBody },
+ sessionId,
+ properties,
+ timeToLive,
+ cancellationToken);
+
+ _ = sender
+ .Received(1)
+ .SendMessagesAsync(firstMessageBatch, cancellationToken);
+
+ _ = sender
+ .Received(1)
+ .SendMessagesAsync(secondMessageBatch, cancellationToken);
+
+ firstBatchList.Should().BeEmpty();
+ secondBatchList.Count.Should().Be(1);
+
+ secondBatchList[0].MessageId
+ .Should()
+ .NotBeNullOrEmpty();
+ secondBatchList[0].Body
+ .ToString()
+ .Should()
+ .BeEquivalentTo(JsonSerializer.Serialize(messageBody));
+ secondBatchList[0].ApplicationProperties
+ .Should()
+ .BeEquivalentTo(properties);
+ secondBatchList[0].TimeToLive
+ .Should()
+ .Be(timeToLive);
+ }
+
+ [Theory, AutoNSubstituteData]
+ internal Task Should_Throw_If_Message_Is_Too_Large_To_Fit_In_New_Batch(
+ [Frozen] IServiceBusSenderProvider provider,
+ ServiceBusPublisher sut,
+ [Substitute] ServiceBusSender sender,
+ string topicName,
+ object messageBody,
+ IDictionary properties,
+ TimeSpan timeToLive,
+ string sessionId,
+ CancellationToken cancellationToken)
+ {
+ var firstBatchList = new List();
+ var secondBatchList = new List();
+ var firstMessageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, firstBatchList, tryAddCallback: _ => false);
+ var secondMessageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, secondBatchList, tryAddCallback: _ => false);
+
+ provider
+ .GetSender(default!)
+ .ReturnsForAnyArgs(sender);
+
+ sender
+ .CreateMessageBatchAsync(default!)
+ .ReturnsForAnyArgs(firstMessageBatch, secondMessageBatch);
+
+ var act = async () => await sut.PublishAsync(
+ topicName,
+ new object[] { messageBody },
+ sessionId,
+ properties,
+ timeToLive,
+ cancellationToken);
+
+ return act.Should().ThrowAsync();
+ }
}
\ No newline at end of file