Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions src/Atc.Azure.Messaging/ServiceBus/IServiceBusPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,53 @@ namespace Atc.Azure.Messaging.ServiceBus;
/// </summary>
public interface IServiceBusPublisher
{
/// <summary>
/// Publishes a message.
/// </summary>
/// <param name="topicOrQueue">The topic or queue name.</param>
/// <param name="message">The message to be published.</param>
/// <param name="sessionId">Optional id for appending the message to a known session. If not set, then defaults to a new session.</param>
/// <param name="properties">Optional custom metadata about the message.</param>
/// <param name="timeToLive">Optional <see cref="TimeSpan"/> for message to be consumed. If not set, then defaults to the value specified on queue or topic.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used.</param>
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
Task PublishAsync(
string topicOrQueue,
object message,
string? sessionId = null,
IDictionary<string, string>? properties = null,
TimeSpan? timeToLive = null,
CancellationToken cancellationToken = default);

/// <summary>
/// Schedules a message for publishing at a later time.
/// </summary>
/// <param name="topicOrQueue">The topic or queue name.</param>
/// <param name="message">The message to be published.</param>
/// <param name="scheduledEnqueueTime">The time for the message to be published.</param>
/// <param name="sessionId">Optional id for appending the message to a known session. If not set, then defaults to a new session.</param>
/// <param name="properties">Optional custom metadata about the message.</param>
/// <param name="timeToLive">Optional <see cref="TimeSpan"/> for message to be consumed. If not set, then defaults to the value specified on queue or topic.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used.</param>
/// <returns>A <see cref="Task"/> containing the sequence number of the scheduled message.</returns>
Task<long> SchedulePublishAsync(
string topicOrQueue,
object message,
DateTimeOffset scheduledEnqueueTime,
string? sessionId = null,
IDictionary<string, string>? properties = null,
TimeSpan? timeToLive = null,
CancellationToken cancellationToken = default);

/// <summary>
/// Cansels a scheduled publish of a message if it has not been published yet.
/// </summary>
/// <param name="topicOrQueue">The topic or queue name.</param>
/// <param name="sequenceNumber">The sequence number of the scheduled message to cancel.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used.</param>
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
Task CancelScheduledPublishAsync(
string topicOrQueue,
long sequenceNumber,
CancellationToken cancellationToken = default);
}
33 changes: 33 additions & 0 deletions src/Atc.Azure.Messaging/ServiceBus/ServiceBusPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,39 @@ public Task PublishAsync(
properties,
timeToLive),
cancellationToken);
}

public Task<long> SchedulePublishAsync(
string topicOrQueue,
object message,
DateTimeOffset scheduledEnqueueTime,
string? sessionId = null,
IDictionary<string, string>? properties = null,
TimeSpan? timeToLive = null,
CancellationToken cancellationToken = default)
{
return clientProvider
.GetSender(topicOrQueue)
.ScheduleMessageAsync(
CreateServiceBusMessage(
sessionId,
JsonSerializer.Serialize(message),
properties,
timeToLive),
scheduledEnqueueTime,
cancellationToken);
}

public Task CancelScheduledPublishAsync(
string topicOrQueue,
long sequenceNumber,
CancellationToken cancellationToken = default)
{
return clientProvider
.GetSender(topicOrQueue)
.CancelScheduledMessageAsync(
sequenceNumber,
cancellationToken);
}

private static ServiceBusMessage CreateServiceBusMessage(
Expand Down
119 changes: 102 additions & 17 deletions test/Atc.Azure.Messaging.Tests/ServiceBus/ServiceBusPublisherTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,21 +75,106 @@ await sut.PublishAsync(
.Should()
.BeEquivalentTo(properties);
message.TimeToLive
.Should()
.Be(timeToLive);
}

[Theory, AutoNSubstituteData]
internal async Task Should_Handle_Default_Parameters(
[Frozen] IServiceBusSenderProvider provider,
ServiceBusPublisher sut,
[Substitute] ServiceBusSender sender,
string topicName,
object messageBody,
string sessionId)
{
provider
.GetSender(topicName)
.Should()
.Be(timeToLive);
}

[Theory, AutoNSubstituteData]
internal async Task Should_Schedule_Message_On_ServiceBusSender(
[Frozen] IServiceBusSenderProvider provider,
ServiceBusPublisher sut,
[Substitute] ServiceBusSender sender,
long expectedSequenceNumber,
string topicName,
object messageBody,
DateTimeOffset scheduleTime,
IDictionary<string, string> properties,
TimeSpan timeToLive,
string sessionId,
CancellationToken cancellationToken)
{
provider
.GetSender(topicName)
.Returns(sender);

sender
.ScheduleMessageAsync(default!, default, default)
.ReturnsForAnyArgs(expectedSequenceNumber);

var actualSequenceNumber = await sut.SchedulePublishAsync(
topicName,
messageBody,
scheduleTime,
sessionId,
properties,
timeToLive,
cancellationToken);

_ = sender
.Received(1)
.ScheduleMessageAsync(
Arg.Any<ServiceBusMessage>(),
scheduleTime,
cancellationToken);

actualSequenceNumber
.Should()
.Be(expectedSequenceNumber);

var message = sender
.ReceivedCallWithArgument<ServiceBusMessage>();

message.MessageId
.Should()
.NotBeNullOrEmpty();
message.Body
.ToString()
.Should()
.BeEquivalentTo(JsonSerializer.Serialize(messageBody));
message.ApplicationProperties
.Should()
.BeEquivalentTo(properties);
message.TimeToLive
.Should()
.Be(timeToLive);
}

[Theory, AutoNSubstituteData]
internal async Task Should_Cancel_Scheduled_Message_On_ServiceBusSender(
[Frozen] IServiceBusSenderProvider provider,
ServiceBusPublisher sut,
[Substitute] ServiceBusSender sender,
long sequenceNumber,
string topicName,
CancellationToken cancellationToken)
{
provider
.GetSender(topicName)
.Returns(sender);

await sut.CancelScheduledPublishAsync(
topicName,
sequenceNumber,
cancellationToken);

_ = sender
.Received(1)
.CancelScheduledMessageAsync(
sequenceNumber,
cancellationToken);
}

[Theory, AutoNSubstituteData]
internal async Task Should_Handle_Default_Parameters(
[Frozen] IServiceBusSenderProvider provider,
ServiceBusPublisher sut,
[Substitute] ServiceBusSender sender,
string topicName,
object messageBody,
string sessionId)
{
provider
.GetSender(topicName)
.Returns(sender);

await sut.PublishAsync(
Expand Down Expand Up @@ -120,7 +205,7 @@ await sut.PublishAsync(
.Should()
.BeEmpty();
message.TimeToLive
.Should()
.Be(TimeSpan.MaxValue);
.Should()
.Be(TimeSpan.MaxValue);
}
}