P47Phoenix/Rebus.AwsSnsAndSqs

Rebus.AwsSnsAndSqs

Implements an AWS SNS and SQS transport provider for Rebus. This provider is a port of the existing SQS provider located here, with some slight refactoring and the addition of SNS support.

Nuget

  • Get Rebus
Install-Package Rebus.AwsSnsAndSqs -Source http://proget.homenet.local/nuget/VINSolutions/
  • Set up some AWS credentials. See this for how to do that.
  • Create a bus
using(var workHandlerActivator = new BuiltinHandlerActivator())
{
    workHandlerActivator.Handle<string>(s =>
    {
        // handle the message
        Console.WriteLine(s);

        return Task.CompletedTask;
    });
    var queueName = Environment.MachineName;
    var worker = Configure
        .With(workHandlerActivator)
        .Logging(configurer => configurer.ColoredConsole())
        .Transport(t =>
        {
            // set the worker queue name
            t.UseAmazonSnsAndSqs(workerQueueAddress: queueName);
        })
        .Routing(r =>
        {
            // Map the message type to the queue
            r.TypeBased().Map<string>(queueName);
        })
        .Start();

    // subscribe to string topic
    await worker.Subscribe<string>();

    while(true)
    {        
        Console.Write("message:");
        string line = Console.ReadLine();

        // publish to a topic
        await worker.Publish($"topic {line}");

        // send to this bus queue
        await worker.Send($"{queueName}: {line}");
        
        // defer a message to this bus queue by 30 seconds
        await worker.Defer(TimeSpan.FromSeconds(30), $"defer {queueName}: {line}");

    }
}
  • Read the "Contract based pubsub" section below for more

For more information on Rebus, there are lots of examples in the wild.

Target Frameworks

Framework        What works
netstandard1.3   Everything except attribute-based topics
netstandard2.0   Everything
net45            Everything

Contract based pubsub

For example, the following contract

using System;

namespace Topic.Contracts
{
    public class MessengerMessage
    {
        public string Message { get; set; }

        public DateTime CreateDateTime { get; set; }
        public string Sender { get; set; }
    }
}

will create a topic like

Topic_Contracts_MessengerMessage--Topic_Contracts
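
The naming convention can be illustrated with a minimal sketch. It assumes the topic starts out as the type's simple assembly-qualified name ("Topic.Contracts.MessengerMessage, Topic.Contracts") and that the separator and dots are then replaced; this is inferred from the single example above, not taken from the provider's source, and ConventionTopicNameSketch is a hypothetical helper.

```csharp
using System;

// Hypothetical helper, not part of the package: reproduces the topic name
// shown above from a "Namespace.Type, Assembly" string.
public static class ConventionTopicNameSketch
{
    public static string FromSimpleAssemblyQualifiedName(string name)
    {
        return name
            .Replace(", ", "--") // type/assembly separator becomes "--"
            .Replace('.', '_');  // dots become underscores
    }
}

public static class ConventionTopicNameDemo
{
    public static void Main()
    {
        var topic = ConventionTopicNameSketch.FromSimpleAssemblyQualifiedName(
            "Topic.Contracts.MessengerMessage, Topic.Contracts");

        // prints "Topic_Contracts_MessengerMessage--Topic_Contracts"
        Console.WriteLine(topic);
    }
}
```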

In this pub/sub example, the client and the worker are the same process. Each instance creates a queue named after the machine name and the process ID.

var queueName = $"{Environment.MachineName}_{Process.GetCurrentProcess().Id}".ToLowerInvariant();

Next we should subscribe to the topic.

await bus.Subscribe<MessengerMessage>();

Finally, we publish each new message we read onto the topic.

var line = String.Empty;
do
{
    Console.Write("message:");
    line = Console.ReadLine();

    await bus.Publish(new MessengerMessage
    {
        CreateDateTime = DateTime.Now,
        Message = line,
        Sender = queueName
    });
}
while (string.IsNullOrWhiteSpace(line) == false);

The full example is located here.

Don't like the convention based topic approach?

Make your own topic formatter by implementing ITopicFormatter.

public interface ITopicFormatter
{
    string FormatTopic(string topic);
}

Topic formatters are set when configuring the transport. Below, a custom formatter is passed in through the topicFormatter parameter.

Configure
    .With(activator)
    .Transport(t => 
    { 
        t.UseAmazonSnsAndSqs(workerQueueAddress: queueName, topicFormatter: new YourCustomTopicFormatter()); 
    })
    .Routing(r => r.TypeBased().Map<string>(queueName))
    .Start();
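
YourCustomTopicFormatter above is a placeholder. One possible implementation is sketched below, assuming you want an environment prefix on every topic. The prefix parameter and its "dev" default are illustrative, and the exact string the transport passes to FormatTopic is not documented here, so the sketch replaces any character outside letters, digits, underscores, and hyphens (the characters SNS allows in topic names) with an underscore.

```csharp
using System.Text.RegularExpressions;

// Repeated from above so the sketch compiles on its own;
// remove this declaration when referencing the package.
public interface ITopicFormatter
{
    string FormatTopic(string topic);
}

// Hypothetical formatter: prefixes each topic and strips characters
// that SNS topic names do not allow.
public class YourCustomTopicFormatter : ITopicFormatter
{
    private readonly string _prefix;

    public YourCustomTopicFormatter(string prefix = "dev")
    {
        _prefix = prefix;
    }

    public string FormatTopic(string topic)
    {
        // Keep letters, digits, '_' and '-'; replace everything else with '_'.
        var sanitized = Regex.Replace(topic, "[^A-Za-z0-9_-]", "_");
        return $"{_prefix}-{sanitized}";
    }
}
```

With the default prefix, an input of "Orders.Created" would be formatted as "dev-Orders_Created".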

The attribute-based topic formatter lets you set the topic name with an attribute

using Rebus.AwsSnsAndSqs;

namespace Rebus.AwsSnsAndSqsTests
{
    [TopicName(nameof(SomeMessageTopic))]
    public class SomeMessageTopic
    {
        public string Message { get; set; }
    }
}

Set up the transport to use the attribute-based topic formatter

Configure
    .With(activator)
    .Transport(t => 
    { 
        t.UseAmazonSnsAndSqs(workerQueueAddress: queueName, topicFormatter: new AttributeBasedTopicFormatter()); 
    })
    .Routing(r => r.TypeBased().Map<string>(queueName))
    .Start();

Load and performance tests

Load test results are located here. Results may vary depending on network latency, region, CPU, etc.

Permissions

The permissions needed and example policy documents are here.

Contribute

Pull requests are welcomed from anyone. Here's how to contribute.
