Skip to content

Rabbit MQ

Demis Bellot edited this page Oct 25, 2016 · 28 revisions

This page has moved to docs.servicestack.net/rabbit-mq


A nice advantage of ServiceStack's message-based design is its ability to host its Services on a variety of different endpoints. This design makes it possible to host Services via MQ Servers, enable SOAP support in addition to ServiceStack's strong HTTP Web Services story. One MQ Server we support is the extremely popular and robust Open Source AMQP messaging broker: Rabbit MQ.

Getting Started

A great way to get started with Rabbit MQ on Windows is by following the Rabbit MQ Windows Installation guide which also includes sample source code for accessing Rabbit MQ Server using the .NET RabbitMQ.Client on NuGet.

ServiceStack.RabbitMq

ServiceStack builds on top of RabbitMQ.Client to provide concrete implementations for ServiceStack's high-level Messaging APIs enabling a number of messaging features including publishing and receiving messages as well as registering and processing message handlers. Like other ServiceStack providers, all MQ Servers are interchangeable, visible in the shared common MqServerIntroTests.cs and MqServerAppHostTests.cs.

Messaging API

ServiceStack's Rabbit MQ bindings is available on NuGet at:

PM> Install-Package ServiceStack.RabbitMq

RabbitMqServer

The package includes RabbitMqServer, the Rabbit MQ implementation of ServiceStack's MQ IMessageService Server API.

RabbitMqServer is basically a high-level POCO-based MQ server library that's de-coupled and can operate independently from the ServiceStack web framework. Given this, we're able to learn its functionality by exploring the library on its own. By default, RabbitMqServer looks for a Rabbit MQ Server instance on localhost at Rabbit MQ's default port 5672:

var mqServer = new RabbitMqServer();

Which is equivalent to these other configurations:

var mqServer = new RabbitMqServer("localhost");
var mqServer = new RabbitMqServer("localhost:5672");
var mqServer = new RabbitMqServer("amqp://localhost:5672");

More connection strings examples are available on Rabbit MQ's URI Specification page.

Run-able examples of these code-samples are available in the RabbitMqServerIntroTests.

Message Filters

There are optional PublishMessageFilter and GetMessageFilter callbacks which can be used to intercept outgoing and incoming messages. The Type name of the message body that was published is available in IBasicProperties.Type, e.g:

var mqServer = new RabbitMqServer("localhost") 
{
    PublishMessageFilter = (queueName, properties, msg) => {
        properties.AppId = "app:{0}".Fmt(queueName);
    },
    GetMessageFilter = (queueName, basicMsg) => {
        var props = basicMsg.BasicProperties;
        receivedMsgType = props.Type; //automatically added by RabbitMqProducer
        receivedMsgApp = props.AppId;
    }
};

using (var mqClient = mqServer.CreateMessageQueueClient())
{
    mqClient.Publish(new Hello { Name = "Bugs Bunny" });
}

receivedMsgApp.Print();   // app:mq:Hello.In
receivedMsgType.Print();  // Hello

POCO Messages

Just like the rest of ServiceStack, you can use any POCO for your messages (aka Request DTOs) which are serialized with ServiceStack's JSON Serializer and embedded as the body payload, a simple example is just:

public class Hello
{
    public string Name { get; set; }
}

Registering Message Handlers

Now that we have a message we can use, we can start listening to any of these messages sent via the broker by registering handlers for it. Here's how to register a simple handler that just prints out each message it receieves:

mqServer.RegisterHandler<Hello>(m => {
    Hello request = m.GetBody();
    "Hello, {0}!".Print(request.Name);
    return null;
});

Each handler receives an IMessage which is just the body of the message that was sent (i.e. T) wrapped inside an IMessage container containing the metadata of the received message. Inside your handler you can use IMessage.GetBody() to extract the typed body, and in this case we signify the service has no response by returning null.

Starting the Rabbit MQ Server

Once all your handlers are registered you can start listening to messages by starting the MQ Server:

mqServer.Start();

Starting the MQ Server spawns 2 threads for each handler, one to listen to the Message Inbox mq:Hello.inq and another to listen on the Priority Queue located at mq:Hello.priorityq.

Note: You can white-list which messages to enable Priority Queue's for with mqServer.PriortyQueuesWhitelist or disable them all by setting mqServer.DisablePriorityQueues = true.

Allocating multiple threads for specific operations

By default only 1 thread is allocated to handle each message type, but this is easily configurable at registration. E.g. you can spawn 4 threads to handle a CPU-intensive operation with:

mqServer.RegisterHandler<Hello>(m => { .. }, noOfThreads:4);

Publishing messages

With the mqServer started, you're now ready to start publishing messages, you can do with a message queue client that you can get from a new RabbitMqMessageFactory or the mqServer directly, e.g:

using (var mqClient = mqServer.CreateMessageQueueClient())
{
    mqClient.Publish(new Hello { Name = "World" });
}

The above shows the most common usage where you can publish POCO's directly, behind the scenes this gets serialized as JSON and embedded as the payload of a new persistent message that's sent using a routing key of the same name as the destination queue which by convention is mapped 1:1 to a queue of the same name, i.e: mq:Hello.inq. In effect, publishing messages are sent to a distinct Inbox Queue that's reserved for each message type, essentially behaving as a work queue.

Message Workflow

By default, RabbitMqServer will send a response message after it's processed each message, what the response is and which Queue (or HTTP url) the response is published to is dependent on the outcome of the message handler, i.e:

Rabbit MQ Flowchart

Messages with no responses are sent to '.outq' Topic

When a handler returns a null response, the incoming message is re-published as a "transient" message to the out queue, e.g: mq:Hello.outq via the Rabbit MQ "fanout" exchange mx.servicestack.topic having the effect of notifying any subscribers to mq:Hello.outq each time a message is processed.

We can use this behavior to block until a message gets processed with:

IMessage<Hello> msgCopy = mqClient.Get<Hello>(QueueNames<Hello>.Out);
mqClient.Ack(msgCopy);
msgCopy.GetBody().Name //= World

Also shown in this example is an explicit Ack (which should be done for each message you receive) to tell Rabbit MQ that you've taken responsibility of the message so it can safely remove it off the queue.

Messages with Responses are published to the Response .inq

Often message handlers will just return a POCO response after it processes a message, e.g:

mqServer.RegisterHandler<Hello>(m =>
    new HelloResponse { Result = "Hello, {0}!".Fmt(m.GetBody().Name) });

Whenever there's a response, then instead of the .outq the response message is sent to the .inq of the response message type, which for a HelloResponse type is just mq:HelloResponse.inq, e.g:

mqClient.Publish(new Hello { Name = "World" });

var responseMsg = mqClient.Get<HelloResponse>(QueueNames<HelloResponse>.In);
mqClient.Ack(responseMsg);
responseMsg.GetBody().Result //= Hello, World!

Note: this behavior can be limited to only publish responses for types in the mqServer.PublishResponsesWhitelist, otherwise all response messages can be disabled entirely by setting mqServer.DisablePublishingResponses = true.

Responses from Messages with ReplyTo are published to that address

Whilst for the most part you'll only need to publish POCO messages, you can also alter the default behavior by providing a customized IMessage<T> wrapper which ServiceStack will send instead, e.g. you can specify your own ReplyTo address to change the queue where the response gets published, e.g:

const string replyToMq = mqClient.GetTempQueueName();
mqClient.Publish(new Message<Hello>(new Hello { Name = "World" }) {
    ReplyTo = replyToMq
});

IMessage<HelloResponse> responseMsg = mqClient.Get<HelloResponse>(replyToMq);
mqClient.Ack(responseMsg);
responseMsg.GetBody().Result //= Hello, World!

A nice feature unique in ServiceStack is that the ReplyTo address can even be a HTTP Uri, in which case ServiceStack will attempt to POST the raw response at that address. This works nicely with ServiceStack Services which excel at accepting serialized DTO's.

Messages that generate exceptions can be re-tried, then published to the dead-letter-queue (.dlq)

By default Rabbit Mq Server lets you specify whether or not you want messages that cause an exception to be retried by specifying a RetryCount of 1 (default), or if you don't want any messages re-tried, specify a value of 0, e.g:

var mqServer = new RabbitMqServer { RetryCount = 1 };

To illustrate how this works we'll keep a counter of how many times a message handler is invoked, then throw an exception to force an error condition, e.g:

var called = 0;
mqServer.RegisterHandler<Hello>(m => {
    called++;
    throw new ArgumentException("Name");
});

Now when we publish a message the response instead gets published to the messages .dlq, after it's first transparently retried. We can verify this behavior by checking called=2:

mqClient.Publish(new Hello { Name = "World" });

IMessage<Hello> dlqMsg = mqClient.Get<Hello>(QueueNames<Hello>.Dlq);
mqClient.Ack(dlqMsg);

Assert.That(called, Is.EqualTo(2));

DLQ Messages retains the original message in their body as well as the last exception serialized in the IMessage.Error ResponseStatus metadata property, e.g:

dlqMsg.GetBody().Name   //= World
dlqMsg.Error.ErrorCode  //= typeof(ArgumentException).Name
dlqMsg.Error.Message    //= Name

Since the body of the original message is left in-tact, you're able to retry failed messages by removing them from the dead-letter-queue then re-publishing the original message, e.g:

IMessage<Hello> dlqMsg = mqClient.Get<Hello>(QueueNames<Hello>.Dlq);

mqClient.Publish(dlqMsg.GetBody());

mqClient.Ack(dlqMsg);

This is useful for recovering failed messages after identifying and fixing bugs that were previously causing exceptions, where you can replay and re-process DLQ messages and continue processing them as normal.

Adding Rabbit MQ support to ServiceStack

Whilst RabbitMqServer is useful on its own, it also has the distinct advantage of being able to directly Execute ServiceStack Services which you can do by just routing the handler for each Request DTO you want to process through to ServiceStack's AppHost ExecuteMessage, e.g:

public class AppHost : AppHostHttpListenerBase
{
    public AppHost() : base("Rabbit MQ Test Host", typeof(HelloService).Assembly) {}

    public override void Configure(Container container)
    {
        container.Register<IMessageService>(c => new RabbitMqServer());

        var mqServer = container.Resolve<IMessageService>();

        mqServer.RegisterHandler<Hello>(ExecuteMessage);
        mqServer.Start();
    }
}

Now each message will instead be executed by the best matching ServiceStack Service that handles the message with either a Post or Any fallback verb, e.g:

public class HelloService : Service
{
    public object Any(Hello request)
    {
        return new HelloResponse { Result = "Hello, {0}!".Fmt(request.Name) };
    }
}

In addition to executing your service implementation, all Services processed via a MQ Server goes through ServiceStack's standard MQ Request Pipeline.

With everything in place, initialize ServiceStack's AppHost to start the Rabbit MQ Server to listen for any messages published to the Request DTO's .inq:

var appHost = new AppHost().Init();

using (var mqClient = appHost.Resolve<IMessageService>().CreateMessageQueueClient())
{
    mqClient.Publish(new Hello { Name = "World" });

    var responseMsg = mqClient.Get<HelloResponse>(QueueNames<HelloResponse>.In);
    mqClient.Ack(responseMsg);
    responseMsg.GetBody().Result //= Hello, World!     
}

Process ServiceStack MQ Services without a HTTP host

Whilst it's unlikely to be a common use-case, you can even run a ServiceStack MQ Server without a HTTP host by configuring the mqServer inside the generic BasicAppHost, e.g:

var appHost = new BasicAppHost(typeof(HelloService).Assembly) {
    ConfigureAppHost = host => {
        host.Container.Register<IMessageService>(c => new RabbitMqServer());

        var mqServer = host.Container.Resolve<IMessageService>();

        mqServer.RegisterHandler<Hello>(host.ExecuteMessage);
        mqServer.Start();
    }
}.Init();

using (var mqClient = appHost.Resolve<IMessageService>().CreateMessageQueueClient())
{
    mqClient.Publish(new Hello { Name = "World" });

    var msg = mqClient.Get<HelloResponse>(QueueNames<HelloResponse>.In);
    mqClient.Ack(msg);
    Assert.That(msg.GetBody().Result, Is.EqualTo("Hello, World!"));
}

In this way, ServiceStack's AppHost is being used as a pure "logic server", hosting auto-wired services within its MQ Request Pipeline, whilst taking advantage of ServiceStack's flexibility and extensibility options and its plugin ecosystem.

Run-able examples of these code-samples are available in the RabbitMqServerIntroTests.

Rabbit MQ Features

The Rabbit MQ Server have some configuration options that are unique to Rabbit MQ:

  • ConnectionFactory ConnectionFactory - The RabbitMQ.Client Connection factory to introspect connection properties and create low-level connections
  • bool AutoReconnect - Whether Rabbit MQ should auto-retry connecting when a connection to Rabbit MQ Server instance is dropped
  • bool UsePolling - Whether to use polling for consuming messages instead of a long-term subscription
  • int RetryCount - How many times a message should be retried before sending to the DLQ. Valid range for Rabbit MQ: 0-1.

In addition to sharing a similar architecture to Redis MQ, it also shares a number of common features:

  • int? KeepAliveRetryAfterMs - Wait before Starting the MQ Server after a restart
  • IMessageFactory MessageFactory - The MQ Message Factory used by this MQ Server
  • Func<IMessage, IMessage> RequestFilter - Execute global transformation or custom logic before a request is processed. Must be thread-safe.
  • Func<object, object> ResponseFilter - Execute global transformation or custom logic on the response. Must be thread-safe.
  • Action<Exception> ErrorHandler - Execute global error handler logic. Must be thread-safe.
  • string[] PriortyQueuesWhitelist - If you only want to enable priority queue handlers (and threads) for specific msg types.
  • bool DisablePriorityQueues - Don't listen on any Priority Queues
  • string[] PublishResponsesWhitelist - Opt-in to only publish responses on this white list. Publishes all responses by default.
  • bool DisablePublishingResponses - Don't publish any response messages


  1. Getting Started

    1. Creating your first project
    2. Create Service from scratch
    3. Your first webservice explained
    4. Example Projects Overview
    5. Learning Resources
  2. Designing APIs

    1. ServiceStack API Design
    2. Designing a REST-ful service with ServiceStack
    3. Simple Customer REST Example
    4. How to design a Message-Based API
    5. Software complexity and role of DTOs
  3. Reference

    1. Order of Operations
    2. The IoC container
    3. Configuration and AppSettings
    4. Metadata page
    5. Rest, SOAP & default endpoints
    6. SOAP support
    7. Routing
    8. Service return types
    9. Customize HTTP Responses
    10. Customize JSON Responses
    11. Plugins
    12. Validation
    13. Error Handling
    14. Security
    15. Debugging
    16. JavaScript Client Library (ss-utils.js)
  4. Clients

    1. Overview
    2. C#/.NET client
      1. .NET Core Clients
    3. Add ServiceStack Reference
      1. C# Add Reference
      2. F# Add Reference
      3. VB.NET Add Reference
      4. Swift Add Reference
      5. Java Add Reference
    4. Silverlight client
    5. JavaScript client
      1. Add TypeScript Reference
    6. Dart Client
    7. MQ Clients
  5. Formats

    1. Overview
    2. JSON/JSV and XML
    3. HTML5 Report Format
    4. CSV Format
    5. MessagePack Format
    6. ProtoBuf Format
  6. View Engines 4. Razor & Markdown Razor

    1. Markdown Razor
  7. Hosts

    1. IIS
    2. Self-hosting
    3. Messaging
    4. Mono
  8. Security

    1. Authentication
    2. Sessions
    3. Restricting Services
    4. Encrypted Messaging
  9. Advanced

    1. Configuration options
    2. Access HTTP specific features in services
    3. Logging
    4. Serialization/deserialization
    5. Request/response filters
    6. Filter attributes
    7. Concurrency Model
    8. Built-in profiling
    9. Form Hijacking Prevention
    10. Auto-Mapping
    11. HTTP Utils
    12. Dump Utils
    13. Virtual File System
    14. Config API
    15. Physical Project Structure
    16. Modularizing Services
    17. MVC Integration
    18. ServiceStack Integration
    19. Embedded Native Desktop Apps
    20. Auto Batched Requests
    21. Versioning
    22. Multitenancy
  10. Caching

  11. Caching Providers

  12. HTTP Caching 1. CacheResponse Attribute 2. Cache Aware Clients

  13. Auto Query

  14. Overview

  15. Why Not OData

  16. AutoQuery RDBMS

  17. AutoQuery Data 1. AutoQuery Memory 2. AutoQuery Service 3. AutoQuery DynamoDB

  18. Server Events

    1. Overview
    2. JavaScript Client
    3. C# Server Events Client
    4. Redis Server Events
  19. Service Gateway

    1. Overview
    2. Service Discovery
  20. Encrypted Messaging

    1. Overview
    2. Encrypted Client
  21. Plugins

    1. Auto Query
    2. Server Sent Events
    3. Swagger API
    4. Postman
    5. Request logger
    6. Sitemaps
    7. Cancellable Requests
    8. CorsFeature
  22. Tests

    1. Testing
    2. HowTo write unit/integration tests
  23. ServiceStackVS

    1. Install ServiceStackVS
    2. Add ServiceStack Reference
    3. TypeScript React Template
    4. React, Redux Chat App
    5. AngularJS App Template
    6. React Desktop Apps
  24. Other Languages

    1. FSharp
      1. Add ServiceStack Reference
    2. VB.NET
      1. Add ServiceStack Reference
    3. Swift
    4. Swift Add Reference
    5. Java
      1. Add ServiceStack Reference
      2. Android Studio & IntelliJ
      3. Eclipse
  25. Amazon Web Services

  26. ServiceStack.Aws

  27. PocoDynamo

  28. AWS Live Demos

  29. Getting Started with AWS

  30. Deployment

    1. Deploy Multiple Sites to single AWS Instance
      1. Simple Deployments to AWS with WebDeploy
    2. Advanced Deployments with OctopusDeploy
  31. Install 3rd Party Products

    1. Redis on Windows
    2. RabbitMQ on Windows
  32. Use Cases

    1. Single Page Apps
    2. HTML, CSS and JS Minifiers
    3. Azure
    4. Connecting to Azure Redis via SSL
    5. Logging
    6. Bundling and Minification
    7. NHibernate
  33. Performance

    1. Real world performance
  34. Other Products

    1. ServiceStack.Redis
    2. ServiceStack.OrmLite
    3. ServiceStack.Text
  35. Future

    1. Roadmap
Clone this wiki locally
You can’t perform that action at this time.