Namotion.Messaging

Storage | Messaging | Reflection

The Namotion.Messaging .NET libraries provide abstractions and implementations for message brokers, event queues and data ingestion services.

By programming against a messaging abstraction you enable the following scenarios:

  • Build multi-cloud capable applications by being able to change messaging technologies on demand.
  • Quickly switch to different messaging technologies to find the best technological fit for your applications.
  • Implement behavior-driven integration tests that run in-memory or against different technologies, which makes debugging and local execution easier.
  • Provide better local development experiences, e.g. replace Service Bus with a locally running RabbitMQ docker container or an in-memory implementation.

Usage

To use the IMessageReceiver in a simple command line application (.NET Generic Host), implement a new BackgroundService and start message processing in ExecuteAsync:

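using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Namotion.Messaging.Abstractions; // assumed namespace of IMessageReceiver

public class MyBackgroundService : BackgroundService
{
    private readonly IMessageReceiver _messageReceiver;
    private readonly ILogger _logger;

    // The Generic Host registers ILogger<T>; a non-generic ILogger cannot be resolved.
    public MyBackgroundService(IMessageReceiver messageReceiver, ILogger<MyBackgroundService> logger)
    {
        _messageReceiver = messageReceiver;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Listens until the host signals shutdown through stoppingToken.
        await _messageReceiver.ListenAsync(async (messages, ct) =>
        {
            foreach (var message in messages)
            {
                try
                {
                    // TODO: Process message

                    // Processing succeeded: remove the message from the queue.
                    await _messageReceiver.ConfirmAsync(message, ct);
                }
                catch (Exception e)
                {
                    // Processing failed: log and requeue the message for a later retry.
                    _logger.LogError(e, "Error while processing message.");
                    await _messageReceiver.RejectAsync(message, ct);
                }
            }
        }, stoppingToken);
    }
}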

In your program's Main method, create a new HostBuilder and add the background service as a hosted service:

public static async Task Main(string[] args)
{
    var host = new HostBuilder()
        .ConfigureServices(services => 
        {
            var receiver = ServiceBusMessageReceiver.Create("MyConnectionString", "myqueue");
            services.AddSingleton<IMessageReceiver>(receiver);
            services.AddHostedService<MyBackgroundService>();
        })
        .Build();

    await host.RunAsync();
}

Extensions

Behavior extensions, such as custom dead letter queues or large message handling, are implemented with interceptors that wrap publisher and receiver methods with custom code. Interceptors are added with the With* extension methods. Custom interceptors can be implemented with the MessagePublisher<T> and MessageReceiver<T> classes.
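
The interceptor idea can be sketched as a plain decorator. The example below is a minimal, self-contained illustration and does not use the library's MessagePublisher<T> base class: ISimplePublisher, RecordingPublisher, SizeLimitPublisher and WithSizeLimit are hypothetical names introduced only for this sketch, and the real With* methods may be shaped differently.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a publisher abstraction (not the library's interface).
public interface ISimplePublisher
{
    Task PublishAsync(IEnumerable<byte[]> messages, CancellationToken cancellationToken = default);
}

// Innermost publisher: records the messages it is asked to send.
public class RecordingPublisher : ISimplePublisher
{
    public List<byte[]> Sent { get; } = new List<byte[]>();

    public Task PublishAsync(IEnumerable<byte[]> messages, CancellationToken cancellationToken = default)
    {
        Sent.AddRange(messages);
        return Task.CompletedTask;
    }
}

// Interceptor: wraps another publisher and runs custom code before delegating to it.
public class SizeLimitPublisher : ISimplePublisher
{
    private readonly ISimplePublisher _inner;
    private readonly int _maxBytes;

    public SizeLimitPublisher(ISimplePublisher inner, int maxBytes)
    {
        _inner = inner;
        _maxBytes = maxBytes;
    }

    public async Task PublishAsync(IEnumerable<byte[]> messages, CancellationToken cancellationToken = default)
    {
        var batch = messages.ToList();
        if (batch.Any(m => m.Length > _maxBytes))
        {
            throw new InvalidOperationException($"Message exceeds {_maxBytes} bytes.");
        }

        await _inner.PublishAsync(batch, cancellationToken);
    }
}

// Extension method in the style of the With* methods described above.
public static class SimplePublisherExtensions
{
    public static ISimplePublisher WithSizeLimit(this ISimplePublisher publisher, int maxBytes)
        => new SizeLimitPublisher(publisher, maxBytes);
}
```

Because the wrapper implements the same interface as the publisher it wraps, callers keep programming against the abstraction and several wrappers can be chained.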

Core packages

Namotion.Messaging.Abstractions

Contains the messaging abstractions, mainly interfaces with a very small footprint and extremely stable contracts:

  • IMessagePublisher<T>
  • IMessagePublisher
    • PublishAsync(messages, cancellationToken): Sends a batch of messages to the queue.
  • IMessageReceiver<T>
  • IMessageReceiver
    • GetMessageCountAsync(cancellationToken): Gets the count of messages waiting to be processed.
    • ListenAsync(handleMessages, cancellationToken): Starts listening and processing messages with the handleMessages function until the cancellationToken signals a cancellation.
    • KeepAliveAsync(messages, timeToLive, cancellationToken): Extends the message lock timeout on the given messages.
    • ConfirmAsync(messages, cancellationToken): Confirms the processing of messages and removes them from the queue.
    • RejectAsync(messages, cancellationToken): Rejects messages and requeues them for later reprocessing.
    • DeadLetterAsync(messages, reason, errorDescription, cancellationToken): Removes the messages and moves them to the dead letter queue.
  • Message<T>
  • Message: A generic message implementation.

The generic interfaces exist to allow multiple instance registrations in a dependency injection container. For more information, read "Dependency Injection in .NET: A way to work around missing named registrations".
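
A minimal sketch of why a type parameter helps with multiple registrations. All types here (ISimpleReceiver, TypedReceiver<T>, Registry and the two message classes) are hypothetical stand-ins, and Registry is only a tiny type-keyed dictionary playing the role of a dependency injection container. The point is that the type argument carries no behavior and only makes each registration a distinct service type.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins; the library's real interfaces have more members.
public interface ISimpleReceiver { string QueueName { get; } }
public interface ISimpleReceiver<T> : ISimpleReceiver { }

public class QueueReceiver : ISimpleReceiver
{
    public QueueReceiver(string queueName) => QueueName = queueName;
    public string QueueName { get; }
}

// Typed wrapper: adds no behavior, only a distinct type to register and inject.
public class TypedReceiver<T> : ISimpleReceiver<T>
{
    private readonly ISimpleReceiver _inner;
    public TypedReceiver(ISimpleReceiver inner) => _inner = inner;
    public string QueueName => _inner.QueueName;
}

public class OrderCreatedMessage { }
public class InvoicePaidMessage { }

// Tiny type-keyed registry standing in for a dependency injection container.
public class Registry
{
    private readonly Dictionary<Type, object> _services = new Dictionary<Type, object>();
    public void AddSingleton<TService>(TService instance) => _services[typeof(TService)] = instance;
    public TService Get<TService>() => (TService)_services[typeof(TService)];
}

public static class MarkerTypeDemo
{
    public static Registry Build()
    {
        var registry = new Registry();

        // Two receivers of the same implementation type, told apart by the type argument.
        registry.AddSingleton<ISimpleReceiver<OrderCreatedMessage>>(
            new TypedReceiver<OrderCreatedMessage>(new QueueReceiver("orders")));
        registry.AddSingleton<ISimpleReceiver<InvoicePaidMessage>>(
            new TypedReceiver<InvoicePaidMessage>(new QueueReceiver("invoices")));

        return registry;
    }
}
```

A consumer that asks for ISimpleReceiver<OrderCreatedMessage> gets the "orders" receiver without any named registration.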

Namotion.Messaging.Json

Provides extension methods on IMessagePublisher<T> and IMessageReceiver<T> to enable JSON serialization for messages:

  • PublishAsJsonAsync(...): Sends messages of type T which are serialized to JSON to the queue.
  • ListenAndDeserializeJsonAsync(...): Receives messages and deserializes their content using the JSON serializer to the Message<T>.Object property. If the content could not be deserialized then Object is null.

Send a JSON encoded message:

var publisher = ServiceBusMessagePublisher
    .Create("MyConnectionString", "myqueue")
    .WithMessageType<OrderCreatedMessage>();

await publisher.PublishAsJsonAsync(new OrderCreatedMessage { ... });

Receive JSON encoded messages:

var receiver = ServiceBusMessageReceiver
    .Create("MyConnectionString", "myqueue")
    .WithMessageType<OrderCreatedMessage>();

await receiver.ListenAndDeserializeJsonAsync(async (messages, ct) => 
{
    foreach (OrderCreatedMessage message in messages.Select(m => m.Object))
    {
        ...
    }

    await receiver.ConfirmAsync(messages, ct);
});
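
The "Object is null when deserialization fails" behavior can be illustrated on its own. This sketch uses System.Text.Json purely as a stand-in; which serializer and settings the package actually uses is not stated in this README, and JsonContent and TryDeserialize are hypothetical names.

```csharp
using System;
using System.Text.Json;

public class OrderCreatedMessage
{
    public string OrderId { get; set; } = "";
}

public static class JsonContent
{
    // Serializes a value to the UTF-8 bytes that would form the message content.
    public static byte[] Serialize<T>(T value) => JsonSerializer.SerializeToUtf8Bytes(value);

    // Returns null instead of throwing when the content is not valid JSON for T.
    public static T TryDeserialize<T>(byte[] content) where T : class
    {
        try
        {
            return JsonSerializer.Deserialize<T>(new ReadOnlySpan<byte>(content));
        }
        catch (JsonException)
        {
            return null;
        }
    }
}
```

A handler that iterates over deserialized objects, as in the receive example above, should therefore expect null entries for messages whose content is malformed.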

Implementation packages

The following packages should only be used in the head project, i.e. directly in your application bootstrapping project where the dependency injection container is initialized.

Azure Service Bus | Azure Event Hub | Azure Storage Queue | RabbitMQ | Amazon SQS | InMemory
PublishAsync ✔️ ✔️ ✔️ ✔️ ✔️ ✔️
ListenAsync ✔️ ✔️ ✔️ ✔️ ✔️ ✔️
GetMessageCountAsync ✔️ ✔️ ✔️ ✔️
KeepAliveAsync ✔️ (1.) ✔️ ✔️
ConfirmAsync ✔️ (1.) ✔️ ✔️ ✔️
RejectAsync ✔️ (1.) ✔️ ✔️ ✔️ ✔️
DeadLetterAsync ✔️ (2.) (2.) (2.) (2.) ✔️
User properties ✔️ ✔️ (3.) ✔️ ✔️ ✔️
  1. Because Event Hub is stream based and not transactional, these method calls are just ignored.
  2. Use receiver.WithDeadLettering(publisher) to enable dead letter support.
  3. Use receiver.WithPropertiesInContent() to enable user properties support (not implemented yet).

= Noop/Ignored

Namotion.Messaging

Contains common helper methods and base implementations of the abstractions:

  • InMemoryMessagePublisherReceiver: In-memory message publisher and receiver for integration tests and dependency-free local development environments (for example, use this implementation when no connection strings are defined).

Extension methods to enhance or modify instances:

  • WithMessageType<T>(): Changes the type of the interface from IMessagePublisher/IMessageReceiver to IMessagePublisher<T>/IMessageReceiver<T>.
  • WithDeadLettering(messagePublisher): Adds support for a custom dead letter queue, i.e. a call to DeadLetterAsync() will confirm the message and publish it to the specified messagePublisher.
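
The dead lettering behavior described above (confirm on the source queue, publish to a separate publisher) can be sketched with in-memory stand-ins. InMemoryQueue and DeadLetteringReceiver are hypothetical types, not the library's classes. The order of the two steps is an assumption of this sketch: publishing first means a failed publish leaves the message on the source queue.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical in-memory queue used as both the source and the dead letter target.
public class InMemoryQueue
{
    public List<string> Pending { get; } = new List<string>();

    public Task PublishAsync(string message)
    {
        Pending.Add(message);
        return Task.CompletedTask;
    }

    public Task ConfirmAsync(string message)
    {
        Pending.Remove(message);
        return Task.CompletedTask;
    }
}

// Wraps a source queue and redirects dead-lettered messages to another queue.
public class DeadLetteringReceiver
{
    private readonly InMemoryQueue _source;
    private readonly InMemoryQueue _deadLetters;

    public DeadLetteringReceiver(InMemoryQueue source, InMemoryQueue deadLetters)
    {
        _source = source;
        _deadLetters = deadLetters;
    }

    public async Task DeadLetterAsync(string message, string reason)
    {
        // Assumed order: publish to the dead letter queue, then confirm on the source.
        await _deadLetters.PublishAsync($"{reason}: {message}");
        await _source.ConfirmAsync(message);
    }
}
```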

Namotion.Messaging.Azure.ServiceBus

Implementations:

  • ServiceBusMessagePublisher
  • ServiceBusMessageReceiver

Behavior:

  • Multiple queue receivers will process messages in parallel (competing consumers).
  • When handleMessages throws an exception, then the messages are abandoned and later reprocessed until they are moved to the dead letter queue.

Dependencies:

Namotion.Messaging.Azure.EventHub

Implementations:

  • EventHubMessagePublisher
  • EventHubMessageReceiver

Behavior:

  • Messages are processed in sequence per partition and can only be retried immediately or be ignored.
  • Exceptions from handleMessages are logged and then ignored, i.e. the processing moves forward in the partition.
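
The "log and move forward" behavior can be pictured as a simple loop over the events of one partition. This is only an illustration of the semantics described above, not the receiver's implementation. PartitionProcessor and its parameters are hypothetical.

```csharp
using System;
using System.Collections.Generic;

public static class PartitionProcessor
{
    // Processes events in order; a failing event is logged and skipped, never requeued.
    public static List<string> Process(
        IEnumerable<string> events, Action<string> handle, Action<Exception> log)
    {
        var processed = new List<string>();
        foreach (var item in events)
        {
            try
            {
                handle(item);
                processed.Add(item);
            }
            catch (Exception e)
            {
                // Log the error and continue with the next event in the partition.
                log(e);
            }
        }

        return processed;
    }
}
```

If a failed event must not be lost, the handler itself has to retry immediately or forward the event elsewhere before returning.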

Dependencies:

Namotion.Messaging.Azure.Storage.Queue

Implementations:

  • AzureStorageQueuePublisher
  • AzureStorageQueueReceiver

Behavior:

  • When handleMessages throws an exception, then the messages are rejected and later reprocessed.

Dependencies:

Namotion.Messaging.RabbitMQ

Implementations:

  • RabbitMessagePublisher
  • RabbitMessageReceiver

Behavior:

  • When handleMessages throws an exception, then the messages are rejected and later reprocessed.

Dependencies:

Namotion.Messaging.Amazon.SQS

Implementations:

  • AmazonSqsMessagePublisher
  • AmazonSqsMessageReceiver

Behavior:

  • When handleMessages throws an exception, then the messages are rejected and later reprocessed.
  • The message's Content bytes are serialized to Base64 because SQS can only handle string content.
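
A short sketch of the Base64 round trip, using only the standard Convert class. SqsContentEncoding, ToBody and FromBody are hypothetical names; how the package wires this into the SQS client is not shown here.

```csharp
using System;

public static class SqsContentEncoding
{
    // Encodes arbitrary content bytes as a string that is safe to send as an SQS body.
    public static string ToBody(byte[] content) => Convert.ToBase64String(content);

    // Restores the original content bytes from a received body.
    public static byte[] FromBody(string body) => Convert.FromBase64String(body);
}
```

For example, the bytes 0x00 0xFF 0x10 are sent as the string "AP8Q". Base64 output is roughly a third larger than the raw content, which matters when messages approach the queue's size limit.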

Dependencies:
