diff --git a/src/Atc.Azure.Messaging/ServiceBus/IServiceBusPublisher.cs b/src/Atc.Azure.Messaging/ServiceBus/IServiceBusPublisher.cs index 8c199fe..1dc16c2 100644 --- a/src/Atc.Azure.Messaging/ServiceBus/IServiceBusPublisher.cs +++ b/src/Atc.Azure.Messaging/ServiceBus/IServiceBusPublisher.cs @@ -5,11 +5,53 @@ namespace Atc.Azure.Messaging.ServiceBus; /// public interface IServiceBusPublisher { + /// + /// Publishes a message. + /// + /// The topic or queue name. + /// The message to be published. + /// Optional id for appending the message to a known session. If not set, then defaults to a new session. + /// Optional custom metadata about the message. + /// Optional for message to be consumed. If not set, then defaults to the value specified on queue or topic. + /// The used. + /// A representing the asynchronous operation. Task PublishAsync( string topicOrQueue, object message, string? sessionId = null, IDictionary? properties = null, TimeSpan? timeToLive = null, + CancellationToken cancellationToken = default); + + /// + /// Schedules a message for publishing at a later time. + /// + /// The topic or queue name. + /// The message to be published. + /// The time for the message to be published. + /// Optional id for appending the message to a known session. If not set, then defaults to a new session. + /// Optional custom metadata about the message. + /// Optional for message to be consumed. If not set, then defaults to the value specified on queue or topic. + /// The used. + /// A containing the sequence number of the scheduled message. + Task SchedulePublishAsync( + string topicOrQueue, + object message, + DateTimeOffset scheduledEnqueueTime, + string? sessionId = null, + IDictionary? properties = null, + TimeSpan? timeToLive = null, + CancellationToken cancellationToken = default); + + /// + /// Cansels a scheduled publish of a message if it has not been published yet. + /// + /// The topic or queue name. + /// The sequence number of the scheduled message to cancel. + /// The used. + /// A representing the asynchronous operation. + Task CancelScheduledPublishAsync( + string topicOrQueue, + long sequenceNumber, CancellationToken cancellationToken = default); } \ No newline at end of file diff --git a/src/Atc.Azure.Messaging/ServiceBus/ServiceBusPublisher.cs b/src/Atc.Azure.Messaging/ServiceBus/ServiceBusPublisher.cs index 6e4e74f..d2002e5 100644 --- a/src/Atc.Azure.Messaging/ServiceBus/ServiceBusPublisher.cs +++ b/src/Atc.Azure.Messaging/ServiceBus/ServiceBusPublisher.cs @@ -26,6 +26,39 @@ public Task PublishAsync( properties, timeToLive), cancellationToken); + } + + public Task SchedulePublishAsync( + string topicOrQueue, + object message, + DateTimeOffset scheduledEnqueueTime, + string? sessionId = null, + IDictionary? properties = null, + TimeSpan? timeToLive = null, + CancellationToken cancellationToken = default) + { + return clientProvider + .GetSender(topicOrQueue) + .ScheduleMessageAsync( + CreateServiceBusMessage( + sessionId, + JsonSerializer.Serialize(message), + properties, + timeToLive), + scheduledEnqueueTime, + cancellationToken); + } + + public Task CancelScheduledPublishAsync( + string topicOrQueue, + long sequenceNumber, + CancellationToken cancellationToken = default) + { + return clientProvider + .GetSender(topicOrQueue) + .CancelScheduledMessageAsync( + sequenceNumber, + cancellationToken); } private static ServiceBusMessage CreateServiceBusMessage( diff --git a/test/Atc.Azure.Messaging.Tests/ServiceBus/ServiceBusPublisherTests.cs b/test/Atc.Azure.Messaging.Tests/ServiceBus/ServiceBusPublisherTests.cs index be3615e..83d012d 100644 --- a/test/Atc.Azure.Messaging.Tests/ServiceBus/ServiceBusPublisherTests.cs +++ b/test/Atc.Azure.Messaging.Tests/ServiceBus/ServiceBusPublisherTests.cs @@ -75,21 +75,106 @@ await sut.PublishAsync( .Should() .BeEquivalentTo(properties); message.TimeToLive - .Should() - .Be(timeToLive); - } - - [Theory, AutoNSubstituteData] - internal async Task Should_Handle_Default_Parameters( - [Frozen] IServiceBusSenderProvider provider, - ServiceBusPublisher sut, - [Substitute] ServiceBusSender sender, - string topicName, - object messageBody, - string sessionId) - { - provider - .GetSender(topicName) + .Should() + .Be(timeToLive); + } + + [Theory, AutoNSubstituteData] + internal async Task Should_Schedule_Message_On_ServiceBusSender( + [Frozen] IServiceBusSenderProvider provider, + ServiceBusPublisher sut, + [Substitute] ServiceBusSender sender, + long expectedSequenceNumber, + string topicName, + object messageBody, + DateTimeOffset scheduleTime, + IDictionary properties, + TimeSpan timeToLive, + string sessionId, + CancellationToken cancellationToken) + { + provider + .GetSender(topicName) + .Returns(sender); + + sender + .ScheduleMessageAsync(default!, default, default) + .ReturnsForAnyArgs(expectedSequenceNumber); + + var actualSequenceNumber = await sut.SchedulePublishAsync( + topicName, + messageBody, + scheduleTime, + sessionId, + properties, + timeToLive, + cancellationToken); + + _ = sender + .Received(1) + .ScheduleMessageAsync( + Arg.Any(), + scheduleTime, + cancellationToken); + + actualSequenceNumber + .Should() + .Be(expectedSequenceNumber); + + var message = sender + .ReceivedCallWithArgument(); + + message.MessageId + .Should() + .NotBeNullOrEmpty(); + message.Body + .ToString() + .Should() + .BeEquivalentTo(JsonSerializer.Serialize(messageBody)); + message.ApplicationProperties + .Should() + .BeEquivalentTo(properties); + message.TimeToLive + .Should() + .Be(timeToLive); + } + + [Theory, AutoNSubstituteData] + internal async Task Should_Cancel_Scheduled_Message_On_ServiceBusSender( + [Frozen] IServiceBusSenderProvider provider, + ServiceBusPublisher sut, + [Substitute] ServiceBusSender sender, + long sequenceNumber, + string topicName, + CancellationToken cancellationToken) + { + provider + .GetSender(topicName) + .Returns(sender); + + await sut.CancelScheduledPublishAsync( + topicName, + sequenceNumber, + cancellationToken); + + _ = sender + .Received(1) + .CancelScheduledMessageAsync( + sequenceNumber, + cancellationToken); + } + + [Theory, AutoNSubstituteData] + internal async Task Should_Handle_Default_Parameters( + [Frozen] IServiceBusSenderProvider provider, + ServiceBusPublisher sut, + [Substitute] ServiceBusSender sender, + string topicName, + object messageBody, + string sessionId) + { + provider + .GetSender(topicName) .Returns(sender); await sut.PublishAsync( @@ -120,7 +205,7 @@ await sut.PublishAsync( .Should() .BeEmpty(); message.TimeToLive - .Should() - .Be(TimeSpan.MaxValue); + .Should() + .Be(TimeSpan.MaxValue); } } \ No newline at end of file