Skip to content

Tazmainiandevil/Useful.Azure.ServiceBus.Abstractions

Repository files navigation

Useful.Azure.ServiceBus.Abstractions

The purpose of this project is to to provide a wrapper for Azure Service Bus to send messages as JSON and abstract some of the configuration away. This also aids in unit testing by providing interfaces for the Sender and Receiver.

Supported .NET Frameworks - .NET 6 and .NET 7

NuGet version

Configuration

Add the packages for your app

NOTE: Azure.Identity and Microsoft.Extensions.Azure are used for Managed Identity connections

dotnet add package Azure.Identity
dotnet add package Microsoft.Extensions.Azure
dotnet add package Useful.Azure.ServiceBus.Abstractions

Basic Console App Usage

Sending

Create a message structure e.g.

public record MyMessage
{
    public string Name { get; init; }
}

Create an instance of the ServiceBusFactory e.g.

var factory = new ServiceBusFactory();

Create a Sender with a connection string for the Service Bus e.g.

var sender = await factory.CreateSenderAsync<MyMessage>(ServiceBusConnectionString, "myTopic");

or using Managed Identity

const string fullyQualifiedNamespace = "<your namespace>.servicebus.windows.net";
var sender = await factory.CreateSenderAsync<MyMessage>(fullyQualifiedNamespace, new DefaultAzureCredential(), "myTopic");

Then use the SendAsJsonAsync method to send to the Service Bus

await sender.SendAsJsonAsync(new MyMessage { Name = "Bilbo Baggins" });

When creating the Sender there is also a number of options that can be configured e.g.

var sender = await factory.CreateSenderAsync<MyMessage>(fullyQualifiedNamespace, new DefaultAzureCredential(), "myTopic", new SenderOptions { ConnectionCanCreateTopicOrQueue = true } );

The defaults for the SenderOptions are:

public record SenderOptions
{
    public bool ConnectionCanCreateTopicOrQueue { get; set; } = false;
    public ServiceBusTransportType ServiceBusTransportType { get; set; } = ServiceBusTransportType.AmqpTcp;
    public TimeSpan Delay { get; set; } = TimeSpan.FromSeconds(0.8);
    public TimeSpan MaxDelay { get; set; } = TimeSpan.FromMinutes(1);
    public int MaxRetries { get; set; } = 3;
    public ServiceBusRetryMode Mode { get; set; } = ServiceBusRetryMode.Exponential;
}

Receiving

Create a Receiver with a connection string for the Service Bus e.g.

var receiver = await factory.CreateTopicReceiverAsync<MyMessage>(ServiceBusConnectionString, "myTopic", "mySub");

or using Managed Identity

var receiver = await factory.CreateTopicReceiverAsync<MyMessage>(fullyQualifiedNamespace, new DefaultAzureCredential(), "myTopic", "mySub");

Then to listen for incoming messages e.g.

NOTE : The receive method takes an exception func<> to provide feedback and returns an IObservable to get messages

var observer = receiver.Receive(args =>
{
    Console.WriteLine(args.Exception.Message);
    return Task.CompletedTask;
});

And finally subscribe to receive the messages e.g.

observer.Subscribe(x => Console.WriteLine($"From Topic {x}"));

TIP: Running in a console app you'll need to keep it open, so add a ReadKey at the bottom

Console.ReadKey();

When creating the Receiver there is also a number of options that can be configured e.g.

var receiver = await factory.CreateTopicReceiverAsync<MyMessage>(fullyQualifiedNamespace, new DefaultAzureCredential(), "myTopic", "mySub", new ReceiverOptions { ConnectionCanCreateTopicOrQueue = true });

The defaults for the ReceiverOptions are:

public record ReceiverOptions
{
    public bool ConnectionCanCreateTopicOrQueue { get; set; } = false;
    public int MaxConcurrentCalls { get; set; } = 10;
    public ServiceBusReceiveMode ReceiveMode { get; set; } = ServiceBusReceiveMode.PeekLock;
}

Basic Injection Usage

Inject the Factory in program.cs e.g.

var builder = WebApplication.CreateBuilder(args);

// Add services to the container.
builder.Services.AddSingleton<IServiceBusFactory, ServiceBusFactory>();

Example

await using var sender = await _serviceBusFactory.CreateSenderAsync<MyMessage>(SendConnectionString, "myTopic");
await sender.SendAsJsonAsync(new MyMessage { Name = "Bilbo Baggins" });

or a Sender e.g.

var builder = WebApplication.CreateBuilder(args);

// Add services to the container.
var factory = new ServiceBusFactory();
var sender = await factory.CreateSenderAsync<MyMessage>(builder.Configuration["ServiceBusSendConnectionString"], "myTopic");
builder.Services.AddSingleton(sender);

Multiple Senders to different topics

var builder = WebApplication.CreateBuilder(args);

// Add services to the container.
var factory = new ServiceBusFactory();
var sender = await factory.CreateSenderAsync<MyMessage>(builder.Configuration["ServiceBusSendConnectionString"], "myTopic");
var otherSender = await factory.CreateSenderAsync<MyOtherMessage>(builder.Configuration["ServiceBusSendConnectionString"], "myOtherTopic");
builder.Services.AddSingleton(sender);
builder.Services.AddSingleton(otherSender);

Example

await _sender.SendAsJsonAsync(new MyMessage { Name = "Bilbo Baggins" });

or a Receiver

var builder = WebApplication.CreateBuilder(args);

// Add services to the container.
var factory = new ServiceBusFactory();
var receiver = await factory.CreateTopicReceiverAsync<MyMessage>(builder.Configuration["ServiceBusReceiveConnectionString"], "myTopic", "mySub");
builder.Services.AddSingleton(receiver);

Multiple Receivers from different topics

var builder = WebApplication.CreateBuilder(args);

// Add services to the container.
var factory = new ServiceBusFactory();
var receiver = await factory.CreateTopicReceiverAsync<MyMessage>(builder.Configuration["ServiceBusReceiveConnectionString"], "myTopic", "mySub");
var otherReceiver = await factory.CreateTopicReceiverAsync<MyOtherMessage>(builder.Configuration["ServiceBusReceiveConnectionString"], "myOtherTopic", "mySub");
builder.Services.AddSingleton(receiver);
builder.Services.AddSingleton(otherReceiver);

Example

var observer = receiver.Receive(args =>
{
    Console.WriteLine(args.Exception.Message);
    return Task.CompletedTask;
});

observer.Subscribe(x => Console.WriteLine($"From Topic {x.Name}"));

Example with Methods

var observer = receiver.Receive(ExceptionHandler);
observer.Subscribe(IncomingMessage);

private void IncomingMessage(MyMessage message)
{
    Console.WriteLine($"From Topic {message.Name}")
}

private Task ExceptionHandler(ProcessErrorEventArgs args)
{
    Console.WriteLine(args.Exception.Message);
    return Task.CompletedTask;
}

About

Azure Service Bus Wrapper to make help manage and use the Service Bus

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages