Demis Bellot edited this page Feb 1, 2014 · 21 revisions

A nice advantage of ServiceStack's message-based design is its ability to host its Services on a variety of different endpoints. This design makes it possible to easily enable SOAP support and to host Services via a Redis MQ Server, in addition to ServiceStack's strong HTTP Web Services story. This release extends these existing options with a new MQ Server for the extremely popular and robust Open Source AMQP messaging broker: Rabbit MQ.

Getting Started

A great way to get started with Rabbit MQ on Windows is by following the Rabbit MQ Windows Installation guide which also includes sample source code for accessing Rabbit MQ Server using the .NET RabbitMQ.Client on NuGet.

ServiceStack.RabbitMq

ServiceStack builds on top of RabbitMQ.Client to provide concrete implementations for ServiceStack's high-level Messaging APIs enabling a number of messaging features including publishing and receiving messages as well as registering and processing message handlers.

ServiceStack's Rabbit MQ bindings are available on NuGet at:

PM> Install-Package ServiceStack.RabbitMq

RabbitMqServer

The package includes RabbitMqServer, the Rabbit MQ implementation of ServiceStack's MQ IMessageService Server API.

RabbitMqServer is basically a high-level POCO-based MQ server library that's de-coupled and can operate independently from the ServiceStack web framework. Given this, we're able to learn its functionality by playing with the library on its own. By default, RabbitMqServer looks for a Rabbit MQ Server instance on localhost at Rabbit MQ's default port 5672:

var mqServer = new RabbitMqServer();

Which is equivalent to these other configurations:

var mqServer = new RabbitMqServer("localhost");
var mqServer = new RabbitMqServer("localhost:5672");
var mqServer = new RabbitMqServer("amqp://localhost:5672");

More connection string examples are available on Rabbit MQ's URI Specification page.

Run-able examples of these code-samples are available in the RabbitMqServerIntroTests.

POCO Messages

Just like the rest of ServiceStack, you can use any POCO for your messages (aka Request DTOs), provided it can be serialized with ServiceStack's JSON Serializer, which is used to serialize the payload body. A simple example is just:

public class Hello
{
    public string Name { get; set; }
}

Registering Message Handlers

Now that we have a message we can use, we can start listening to any of these messages sent via the broker by registering handlers for them. Here's how to register a simple handler that just prints out each message it receives:

mqServer.RegisterHandler<Hello>(m => {
    Hello request = m.GetBody();
    "Hello, {0}!".Print(request.Name);
    return null;
});

Each handler receives an IMessage<T>, which is just the body of the message that was sent (i.e. T) wrapped inside a container that holds metadata about the message that was received. Inside your handler you can then use GetBody() to extract the typed body. In this case we signify that the service has no response by returning null.

Starting the Rabbit MQ Server

Once all your handlers are registered you can start listening to messages by starting the MQ Server:

mqServer.Start();

This will spawn 2 new threads for each handler, one to listen to the Message Inbox (QueueNames<T>.In) and another to listen on the Priority Queue, whose name is identified by the string QueueNames<T>.Priority.

Note: You can whitelist which messages you want to enable Priority Queues for with mqServer.PriortyQueuesWhitelist, or disable them all by setting mqServer.DisablePriorityQueues = true.

Publishing messages

With the mqServer started, you're now ready to start processing messages, which you can do with a message queue client obtained from a new RabbitMqMessageFactory or from the mqServer directly, e.g:

using (var mqClient = mqServer.CreateMessageQueueClient())
{
    mqClient.Publish(new Hello { Name = "World" });
}

The above shows the most common usage, where you publish POCOs directly. Behind the scenes the POCO gets serialized as JSON and embedded as the payload of a new persistent message that's sent using a routing key of the same name as the destination queue, which by convention is mapped 1:1 to a queue of the same name, i.e: mq:Hello.inq. In effect, published messages are sent to a distinct Inbox Queue that's reserved for each message type, essentially behaving as a work queue.
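
To make the naming convention concrete, here is a minimal, dependency-free sketch of how a queue name can be derived from a message type. QueueNameSketch is a hypothetical helper written only for illustration. It is not part of ServiceStack, which exposes the real names through QueueNames<T>. Only the mq:Hello.inq, .outq and .dlq names are taken from this page.

```csharp
using System;

public class Hello
{
    public string Name { get; set; }
}

// Hypothetical helper (not a ServiceStack API): mirrors the
// "mq:{TypeName}.{suffix}" convention described on this page.
public static class QueueNameSketch
{
    // Inbox queue that new messages of type T are published to
    public static string In<T>()  { return For<T>("inq"); }

    // Out queue that processed messages with no response are re-published to
    public static string Out<T>() { return For<T>("outq"); }

    // Dead-letter queue for messages whose handlers kept failing
    public static string Dlq<T>() { return For<T>("dlq"); }

    private static string For<T>(string suffix)
    {
        return "mq:" + typeof(T).Name + "." + suffix;
    }
}
```

In real code you would use QueueNames<Hello>.In, QueueNames<Hello>.Out and QueueNames<Hello>.Dlq rather than building the strings yourself.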

Messages with no responses are sent to '.outq' Topic

By default, RabbitMqServer will send a response message after it has processed each message. What the response is, and which Queue (or HTTP URL) it is published to, depends on the outcome of the message handler, i.e:

Rabbit MQ Flowchart

When, like in this example, there is no response, it will re-publish the message it receives to the Messages Out Queue. Behind the scenes it sends a transient message to the non-durable mq:Hello.outq via the mx.servicestack.topic Rabbit MQ "topic" exchange, notifying any subscribers to mq:Hello.outq each time a "Hello" message is processed.

We can use this behavior to block until a message gets processed with:

IMessage<Hello> msgCopy = mqClient.Get<Hello>(QueueNames<Hello>.Out);
mqClient.Ack(msgCopy);
msgCopy.GetBody().Name //= World

Also shown in this example is an explicit Ack, which you should send for each message you receive to tell Rabbit MQ that you've taken responsibility for the message so it can safely remove it from the queue.

Messages with Responses are published to the Response .inq

Many times processed messages will return a response which just means a POCO is returned from the handler, e.g:

mqServer.RegisterHandler<Hello>(m =>
    new HelloResponse { Result = "Hello, {0}!".Fmt(m.GetBody().Name) });

When there is a response, then instead of the .outq, the response message is sent to the .inq of the response message type, namely mq:HelloResponse.inq for a HelloResponse type, e.g:

mqClient.Publish(new Hello { Name = "World" });

var responseMsg = mqClient.Get<HelloResponse>(QueueNames<HelloResponse>.In);
mqClient.Ack(responseMsg);
responseMsg.GetBody().Result //= Hello, World!

Note: this behavior can be controlled to only publish responses for types in the mqServer.PublishResponsesWhitelist, otherwise response messages can be disabled entirely by setting mqServer.DisablePublishingResponses = true.

Responses from Messages with ReplyTo are published to that address

Whilst for the most part you'll only need to publish POCO messages, you can also alter the default behavior by providing a customized IMessage<T> wrapper, which ServiceStack will send instead. For example, to change the queue where the response gets published, specify your own ReplyTo address:

const string replyToMq = "mq:Hello.replyto";
mqClient.Publish(new Message<Hello>(new Hello { Name = "World" }) {
    ReplyTo = replyToMq
});

IMessage<HelloResponse> responseMsg = mqClient.Get<HelloResponse>(replyToMq);
mqClient.Ack(responseMsg);
responseMsg.GetBody().Result //= Hello, World!

A nice feature exclusive to ServiceStack is that the ReplyTo address can even be an HTTP URI, in which case ServiceStack will attempt to POST the raw response to that address. This works nicely with ServiceStack Services as they excel at accepting serialized DTOs.
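
The three routing rules described so far (no response, a response, and a response with ReplyTo) can be summarized in a small, dependency-free sketch. ResponseRoutingSketch and ResolveDestination are hypothetical names invented for this illustration and are not ServiceStack APIs. The order in which the rules are checked is an assumption, and handler exceptions are deliberately not modelled here.

```csharp
using System;

public class Hello { public string Name { get; set; } }
public class HelloResponse { public string Result { get; set; } }

// Hypothetical model for illustration only; none of this is a ServiceStack API.
public static class ResponseRoutingSketch
{
    // Returns the destination the outcome of a handled message is published to.
    //   requestType: the type of the message that was handled
    //   response:    what the handler returned (null means "no response")
    //   replyTo:     the optional ReplyTo address set by the publisher
    public static string ResolveDestination(Type requestType, object response, string replyTo)
    {
        // No response: the request is re-published to its own .outq
        if (response == null)
            return "mq:" + requestType.Name + ".outq";

        // An explicit ReplyTo (a queue name or an HTTP URL) overrides the default
        if (!string.IsNullOrEmpty(replyTo))
            return replyTo;

        // Default: the .inq of the response type
        return "mq:" + response.GetType().Name + ".inq";
    }
}
```

For a Hello handler that returns a HelloResponse and a message published without ReplyTo, this resolves to mq:HelloResponse.inq, matching the example in the previous section.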

Messages that generate exceptions can be re-tried, then published to the dead-letter-queue (.dlq)

RabbitMqServer lets you specify whether messages that cause an exception should be retried: a RetryCount of 1 (the default) retries a failed message once, whilst a value of 0 disables retries, e.g:

var mqServer = new RabbitMqServer { RetryCount = 1 };

To illustrate how this works we'll keep a counter of how many times a message handler is invoked, then throw an exception to force an error, e.g:

var called = 0;
mqServer.RegisterHandler<Hello>(m => {
    called++;
    throw new ArgumentException("Name");
});

Now when we publish a message, it is transparently retried and then, instead of a response being published, the message is sent to its .dlq. We can verify the retry with called=2:

mqClient.Publish(new Hello { Name = "World" });

IMessage<Hello> dlqMsg = mqClient.Get<Hello>(QueueNames<Hello>.Dlq);
mqClient.Ack(dlqMsg);

Assert.That(called, Is.EqualTo(2));

DLQ Messages retain the original message as the body, as well as the last exception serialized in the IMessage.Error ResponseStatus metadata property, e.g:

dlqMsg.GetBody().Name   //= World
dlqMsg.Error.ErrorCode  //= typeof(ArgumentException).Name
dlqMsg.Error.Message    //= Name

Since the body of the original message is left intact, you are able to retry failed messages by removing them from the dead-letter-queue and re-publishing the original message, e.g:

IMessage<Hello> dlqMsg = mqClient.Get<Hello>(QueueNames<Hello>.Dlq);

mqClient.Publish(dlqMsg.GetBody());

mqClient.Ack(dlqMsg);
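
The retry-then-dead-letter behavior can also be modelled without a broker. The following is a rough in-memory sketch under the assumption that a message is attempted once and then retried up to RetryCount more times. RetrySketch and its Process method are hypothetical names used only here, and the list stands in for the .dlq.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory model for illustration; it uses neither Rabbit MQ nor ServiceStack.
public static class RetrySketch
{
    // Invokes the handler once, then up to retryCount more times if it throws.
    // Returns true if the handler eventually succeeded. Otherwise the original body
    // is added to deadLetters together with the message of the last exception.
    public static bool Process<T>(T body, Action<T> handler, int retryCount,
                                  List<KeyValuePair<T, string>> deadLetters)
    {
        if (retryCount < 0)
            throw new ArgumentOutOfRangeException("retryCount");

        Exception lastError = null;
        for (var attempt = 0; attempt <= retryCount; attempt++)
        {
            try
            {
                handler(body);
                return true;
            }
            catch (Exception ex)
            {
                lastError = ex; // remember the most recent failure
            }
        }

        // All attempts failed: keep the original body so it can be re-published later
        deadLetters.Add(new KeyValuePair<T, string>(body, lastError.Message));
        return false;
    }
}
```

With a retryCount of 1 and a handler that always throws, the handler is invoked twice before the body lands in the dead-letter list, which mirrors the called=2 assertion above.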

Adding Rabbit MQ support to ServiceStack

Whilst RabbitMqServer is useful on its own, it also has the distinct advantage of being able to directly execute ServiceStack Services. To get it to execute ServiceStack Services instead, just route the handler for each Request DTO you want to be able to process via Rabbit MQ to ServiceStack's ServiceController.ExecuteMessage, e.g:

public class AppHost : AppHostHttpListenerBase
{
    public AppHost() : base("Rabbit MQ Test Host", typeof(HelloService).Assembly) {}

    public override void Configure(Container container)
    {
        container.Register<IMessageService>(c => new RabbitMqServer());

        var mqServer = container.Resolve<IMessageService>();

        mqServer.RegisterHandler<Hello>(ServiceController.ExecuteMessage);
        mqServer.Start();
    }
}

Now instead of executing custom handlers it will execute ServiceStack services matching either a Post or Any verb, e.g:

public class HelloService : Service
{
    public object Any(Hello request)
    {
        return new HelloResponse { Result = "Hello, {0}!".Fmt(request.Name) };
    }
}

In addition to executing your service implementation, all Services processed via an MQ Server go through ServiceStack's standard MQ Request Pipeline.

With everything in place, just initialize ServiceStack's AppHost. The Rabbit MQ Server then starts listening and is ready to process any messages published to the Request DTO's .inq:

var appHost = new AppHost().Init();

using (var mqClient = appHost.Resolve<IMessageService>().CreateMessageQueueClient())
{
    mqClient.Publish(new Hello { Name = "World" });

    var responseMsg = mqClient.Get<HelloResponse>(QueueNames<HelloResponse>.In);
    mqClient.Ack(responseMsg);
    responseMsg.GetBody().Result //= Hello, World!     
}

Process ServiceStack MQ Services without a HTTP host

Whilst it's unlikely to be a common use-case, you can even run a ServiceStack MQ Server without an HTTP host by configuring the mqServer inside a generic BasicAppHost, e.g:

var appHost = new BasicAppHost(typeof(HelloService).Assembly) {
    ConfigureAppHost = host => {
        host.Container.Register<IMessageService>(c => new RabbitMqServer());

        var mqServer = host.Container.Resolve<IMessageService>();

        mqServer.RegisterHandler<Hello>(host.ServiceController.ExecuteMessage);
        mqServer.Start();
    }
}.Init();

using (var mqClient = appHost.Resolve<IMessageService>().CreateMessageQueueClient())
{
    mqClient.Publish(new Hello { Name = "World" });

    var msg = mqClient.Get<HelloResponse>(QueueNames<HelloResponse>.In);
    mqClient.Ack(msg);
    Assert.That(msg.GetBody().Result, Is.EqualTo("Hello, World!"));
}

In this way, ServiceStack's AppHost is being used as a pure "logic server", hosting auto-wired services within its MQ Request Pipeline, whilst taking advantage of ServiceStack's flexibility, extensibility options and plugin ecosystem.

Run-able examples of these code-samples are available in the RabbitMqServerIntroTests.

Rabbit MQ Features

The Rabbit MQ Server has a few features unique to Rabbit MQ:

  • ConnectionFactory ConnectionFactory - The RabbitMQ.Client Connection factory to introspect connection properties and create low-level connections
  • bool AutoReconnect - Whether Rabbit MQ should auto-retry connecting when a connection to Rabbit MQ Server instance is dropped
  • bool UsePolling - Whether to use polling for consuming messages instead of a long-term subscription
  • int RetryCount - How many times a message should be retried before sending to the DLQ. Valid range for Rabbit MQ: 0-1.

In addition to sharing a similar architecture to Redis MQ, it also shares a number of common features:

  • int? KeepAliveRetryAfterMs - Wait before Starting the MQ Server after a restart
  • IMessageFactory MessageFactory - The MQ Message Factory used by this MQ Server
  • Func<IMessage, IMessage> RequestFilter - Execute global transformation or custom logic before a request is processed. Must be thread-safe.
  • Func<object, object> ResponseFilter - Execute global transformation or custom logic on the response. Must be thread-safe.
  • Action<Exception> ErrorHandler - Execute global error handler logic. Must be thread-safe.
  • string[] PriortyQueuesWhitelist - If you only want to enable priority queue handlers (and threads) for specific msg types.
  • bool DisablePriorityQueues - Don't listen on any Priority Queues
  • string[] PublishResponsesWhitelist - Opt-in to only publish responses on this white list. Publishes all responses by default.
  • bool DisablePublishingResponses - Don't publish any response messages

