Rabbit MQ
Rabbit MQ is the latest MQ Server to be supported in ServiceStack. Like Redis MQ before it, it adds another option for executing your ServiceStack services, in this case via a durable Rabbit MQ broker.
A great way to get started with Rabbit MQ on Windows is by following the Rabbit MQ Windows Installation guide which also includes sample source code for accessing Rabbit MQ Server using the .NET RabbitMQ.Client on NuGet.
ServiceStack builds on top of RabbitMQ.Client to provide concrete implementations for ServiceStack's high-level Messaging APIs enabling a number of messaging features including publishing and receiving messages as well as registering and processing message handlers.
ServiceStack's Rabbit MQ bindings are available on NuGet at:
PM> Install-Package ServiceStack.RabbitMq
The package includes RabbitMqServer, the Rabbit MQ implementation of ServiceStack's MQ IMessageService Server API.
RabbitMqServer is basically a high-level POCO-based MQ server library that's de-coupled and can operate independently from the ServiceStack web framework. Given this, we're able to learn its functionality by playing with the library on its own. By default, RabbitMqServer looks for a Rabbit MQ Server instance on localhost at Rabbit MQ's default port 5672:
var mqServer = new RabbitMqServer();
This is equivalent to each of these other configurations:
var mqServer = new RabbitMqServer("localhost");
var mqServer = new RabbitMqServer("localhost:5672");
var mqServer = new RabbitMqServer("amqp://localhost:5672");
More connection string examples are available on Rabbit MQ's URI Specification page.
Run-able examples of these code-samples are available in the RabbitMqServerIntroTests.
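To make the equivalence of the connection strings above concrete, here is a minimal sketch of how the four forms could resolve to the same host and port. ConnectionStringSketch and Normalize are hypothetical names invented for this illustration. They are not part of ServiceStack or RabbitMQ.Client, and the real parsing may differ.

```csharp
using System;

// Hypothetical helper, not part of ServiceStack: it only illustrates how the
// connection strings above could resolve to the same host and port.
public static class ConnectionStringSketch
{
    private const string DefaultHost = "localhost";
    private const int DefaultPort = 5672; // Rabbit MQ's default port

    public static (string Host, int Port) Normalize(string connectionString = null)
    {
        if (string.IsNullOrWhiteSpace(connectionString))
            return (DefaultHost, DefaultPort);

        // Add the amqp:// scheme when only "host" or "host:port" was supplied.
        var text = connectionString.Contains("://")
            ? connectionString
            : "amqp://" + connectionString;

        var uri = new Uri(text);

        // Uri.Port is -1 when no port is given and the scheme has no known default.
        var port = uri.Port > 0 ? uri.Port : DefaultPort;
        return (uri.Host, port);
    }
}

public static class Program
{
    public static void Main()
    {
        string[] inputs = { null, "localhost", "localhost:5672", "amqp://localhost:5672" };
        foreach (var input in inputs)
        {
            var (host, port) = ConnectionStringSketch.Normalize(input);
            Console.WriteLine($"{input ?? "(default)"} -> {host}:{port}");
        }
    }
}
```

Each of the four inputs resolves to localhost:5672 in this sketch, which mirrors the defaults described above.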
Just like the rest of ServiceStack, you can use any POCO for your messages (aka Request DTOs), as long as it can be serialized with ServiceStack's JSON Serializer, which is used to serialize the payload body. A simple example is:
public class Hello
{
    public string Name { get; set; }
}
Now that we have a message type, we can start listening for messages of that type sent via the broker by registering a handler for it. Here's how to register a simple handler that just prints out each message it receives:
mqServer.RegisterHandler<Hello>(m => {
    Hello request = m.GetBody();
    "Hello, {0}!".Print(request.Name);
    return null;
});
Each handler receives an IMessage<T>, which is just the body of the message that was sent (i.e. T) wrapped inside an IMessage container that holds metadata about the received message.
Inside your handler you can then use IMessage<T>.GetBody() to extract the typed body. In this case we signify that the service has no response by returning null.
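The idea of a typed body wrapped in a metadata container can be sketched without the library. EnvelopeSketch<T> below is a hypothetical stand-in written for this illustration. Its Id and CreatedDate members are assumed examples of metadata and are not a description of ServiceStack's actual IMessage<T> members.

```csharp
using System;

public class Hello
{
    public string Name { get; set; }
}

// Hypothetical envelope type for illustration only; ServiceStack's own
// IMessage<T> has its own members and is not reproduced here.
public class EnvelopeSketch<T>
{
    private readonly T body;

    public EnvelopeSketch(T body)
    {
        this.body = body;
        Id = Guid.NewGuid();
        CreatedDate = DateTime.UtcNow;
    }

    public Guid Id { get; }              // assumed metadata: unique message id
    public DateTime CreatedDate { get; } // assumed metadata: creation time

    // Returns the typed body that was wrapped by the envelope.
    public T GetBody() => body;
}

public static class Program
{
    public static void Main()
    {
        // A handler takes the envelope and returns a response object,
        // or null to signal that there is no response.
        Func<EnvelopeSketch<Hello>, object> handler = m =>
        {
            Hello request = m.GetBody();
            Console.WriteLine("Hello, {0}!", request.Name);
            return null;
        };

        var message = new EnvelopeSketch<Hello>(new Hello { Name = "World" });
        object response = handler(message);
        Console.WriteLine(response == null); // True
    }
}
```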
Once all your handlers are registered you can start listening to messages by starting the MQ Server:
mqServer.Start();
This will spawn 2 new threads for each handler: one to listen to the Message Inbox (QueueNames<T>.In) and another to listen on the Priority Queue, whose name is given by QueueNames<T>.Priority.
Note: You can whitelist which messages you want to enable Priority Queues for with mqServer.PriortyQueuesWhitelist, or disable them all by setting mqServer.DisablePriorityQueues = true.
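As a rough illustration of how the whitelist and the disable flag affect the number of listener threads, the following sketch simply counts them. ListenerSketch, CountThreads and the "Goodbye" type name are hypothetical. The sketch also assumes whitelist entries are message type names. It models the description above rather than ServiceStack's actual implementation.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of the listener layout described above; it does not
// call into ServiceStack and only counts the threads that would be needed.
public static class ListenerSketch
{
    public static int CountThreads(
        IEnumerable<string> handlerTypeNames,
        ISet<string> priorityWhitelist = null, // assumption: entries are message type names
        bool disablePriorityQueues = false)
    {
        var threads = 0;
        foreach (var typeName in handlerTypeNames)
        {
            threads++; // one listener on the type's Inbox queue

            var priorityEnabled = !disablePriorityQueues
                && (priorityWhitelist == null || priorityWhitelist.Contains(typeName));
            if (priorityEnabled)
                threads++; // one listener on the type's Priority queue
        }
        return threads;
    }
}

public static class Program
{
    public static void Main()
    {
        var handlers = new[] { "Hello", "Goodbye" };

        Console.WriteLine(ListenerSketch.CountThreads(handlers)); // 4
        Console.WriteLine(ListenerSketch.CountThreads(
            handlers, new HashSet<string> { "Hello" }));          // 3
        Console.WriteLine(ListenerSketch.CountThreads(
            handlers, disablePriorityQueues: true));              // 2
    }
}
```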
With the mqServer started, you're now ready to start processing messages, which you can do with a message queue client obtained from a new RabbitMqMessageFactory or from the mqServer directly, e.g:
using (var mqClient = mqServer.CreateMessageQueueClient())
{
    mqClient.Publish(new Hello { Name = "World" });
}
The above shows the most common usage, where you publish POCOs directly. Behind the scenes the POCO gets serialized as JSON and embedded as the payload of a new persistent message. That message is sent using a routing key with the same name as the destination queue, which by convention is mapped 1:1 to a queue of the same name, i.e: mq:Hello.inq. In effect, published messages are sent to a distinct Inbox Queue that's reserved for each message type, essentially behaving as a work queue.
By default, RabbitMqServer will send a response message after it has processed each message.
When, as in this example, there is no response, it re-publishes the message it received to the Messages Out Queue: behind the scenes it sends a transient message to the non-durable mq:Hello.outq via the mx.servicestack.topic Rabbit MQ "fanout" exchange, notifying any subscribers to mq:Hello.outq each time a "Hello" message is processed.
We can use this behavior to block until a message gets processed with:
IMessage<Hello> msgCopy = mqClient.Get<Hello>(QueueNames<Hello>.Out);
mqClient.Ack(msgCopy);
msgCopy.GetBody().Name //= World
Also shown in this example is an explicit Ack, which you should send for each message you receive to tell Rabbit MQ that you've taken responsibility for the message so it can safely remove it from the queue.
Many times processed messages will return a response which just means a POCO is returned from the handler, e.g:
mqServer.RegisterHandler<Hello>(m =>
    new HelloResponse { Result = "Hello, {0}!".Fmt(m.GetBody().Name) });
When there is a response, then instead of the .outq, the response message is sent to the .inq of the response message type, namely mq:HelloResponse.inq for a HelloResponse type, e.g:
mqClient.Publish(new Hello { Name = "World" });
var responseMsg = mqClient.Get<HelloResponse>(QueueNames<HelloResponse>.In);
mqClient.Ack(responseMsg);
responseMsg.GetBody().Result //= Hello, World!
Note: this behavior can be limited to only publish responses for types in the mqServer.PublishResponsesWhitelist, or response messages can be disabled entirely by setting mqServer.DisablePublishingResponses = true.
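The two outcomes described above (no response goes to the request's .outq, a response goes to the response type's .inq) can be summarized in a small routing sketch. ResponseRoutingSketch and DestinationQueue are hypothetical names. The queue names are built only from the mq:{Type}.inq and mq:{Type}.outq convention shown in this page, and the sketch ignores the whitelist and disable options.

```csharp
using System;

public class Hello
{
    public string Name { get; set; }
}

public class HelloResponse
{
    public string Result { get; set; }
}

// Hypothetical illustration of the routing convention described above;
// this is not how ServiceStack itself is implemented.
public static class ResponseRoutingSketch
{
    public static string DestinationQueue<TRequest>(object response)
    {
        return response == null
            // No response: the request is re-published to its own Out queue.
            ? "mq:" + typeof(TRequest).Name + ".outq"
            // A response: it is published to the Inbox queue of the response type.
            : "mq:" + response.GetType().Name + ".inq";
    }
}

public static class Program
{
    public static void Main()
    {
        Console.WriteLine(ResponseRoutingSketch.DestinationQueue<Hello>(null));
        // mq:Hello.outq

        var response = new HelloResponse { Result = "Hello, World!" };
        Console.WriteLine(ResponseRoutingSketch.DestinationQueue<Hello>(response));
        // mq:HelloResponse.inq
    }
}
```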
RabbitMqServer lets you specify whether messages that cause an exception should be retried: use a RetryCount of 1 (the default) to retry them once, or 0 if you don't want messages retried, e.g:
var mqServer = new RabbitMqServer { RetryCount = 1 };
To illustrate how this works we'll keep a counter of how many times a message handler is invoked, then throw an exception to force an error, e.g:
var called = 0;
mqServer.RegisterHandler<Hello>(m => {
    called++;
    throw new ArgumentException("Name");
});
Now when we publish a message, it is transparently retried and then published to the message's .dlq instead of producing a response. We can verify the retry with called=2:
mqClient.Publish(new Hello { Name = "World" });
IMessage<Hello> dlqMsg = mqClient.Get<Hello>(QueueNames<Hello>.Dlq);
mqClient.Ack(dlqMsg);
Assert.That(called, Is.EqualTo(2));
DLQ Messages retain the original message as the body, as well as the last exception serialized in the IMessage.Error ResponseStatus metadata property, e.g:
dlqMsg.GetBody().Name //= World
dlqMsg.Error.ErrorCode //= typeof(ArgumentException).Name
dlqMsg.Error.Message //= Name
Since the body of the original message is left intact, you are able to retry failed messages by removing them from the dead-letter queue and re-publishing the original message, e.g:
IMessage<Hello> dlqMsg = mqClient.Get<Hello>(QueueNames<Hello>.Dlq);
mqClient.Publish(dlqMsg.GetBody());
mqClient.Ack(dlqMsg);
Whilst RabbitMqServer is useful on its own, it also has the distinct advantage of being able to directly execute ServiceStack Services. To do so, just route the handler for each Request DTO you want to be able to process via Rabbit MQ to ServiceStack's ServiceController.ExecuteMessage, e.g:
public class AppHost : AppHostHttpListenerBase
{
    public AppHost() : base("Rabbit MQ Test Host", typeof(HelloService).Assembly) {}
    public override void Configure(Container container)
    {
        container.Register<IMessageService>(c => new RabbitMqServer());
        var mqServer = container.Resolve<IMessageService>();
        mqServer.RegisterHandler<Hello>(ServiceController.ExecuteMessage);
        mqServer.Start();
    }
}
Now instead of executing custom handlers it will execute ServiceStack services matching either a Post or Any verb, e.g:
public class HelloService : Service
{
    public object Any(Hello request)
    {
        return new HelloResponse { Result = "Hello, {0}!".Fmt(request.Name) };
    }
}
In addition to executing your service implementation, all Services processed via an MQ Server go through ServiceStack's standard MQ Request Pipeline.
With everything in place, just initialize ServiceStack's AppHost. The Rabbit MQ Server then starts listening and is ready to process any messages published to the Request DTO's .inq:
var appHost = new AppHost().Init();
using (var mqClient = appHost.Resolve<IMessageService>().CreateMessageQueueClient())
{
    mqClient.Publish(new Hello { Name = "World" });
    var responseMsg = mqClient.Get<HelloResponse>(QueueNames<HelloResponse>.In);
    mqClient.Ack(responseMsg);
    responseMsg.GetBody().Result //= Hello, World!
}
Whilst it's unlikely to be a common use-case, you can even run a ServiceStack MQ Server without an HTTP host by configuring the mqServer inside a generic BasicAppHost, e.g:
var appHost = new BasicAppHost(typeof(HelloService).Assembly) {
    ConfigureAppHost = host => {
        host.Container.Register<IMessageService>(c => new RabbitMqServer());
        var mqServer = host.Container.Resolve<IMessageService>();
        mqServer.RegisterHandler<Hello>(host.ServiceController.ExecuteMessage);
        mqServer.Start();
    }
}.Init();
using (var mqClient = appHost.Resolve<IMessageService>().CreateMessageQueueClient())
{
    mqClient.Publish(new Hello { Name = "World" });
    var msg = mqClient.Get<HelloResponse>(QueueNames<HelloResponse>.In);
    mqClient.Ack(msg);
    Assert.That(msg.GetBody().Result, Is.EqualTo("Hello, World!"));
}
In this way, ServiceStack's AppHost is being used as a pure "logic server", hosting auto-wired services within its MQ Request Pipeline, whilst taking advantage of ServiceStack's flexibility, extensibility options and plugin ecosystem.
Run-able examples of these code-samples are available in the RabbitMqServerIntroTests.
The Rabbit MQ Server has a few features unique to Rabbit MQ:
- ConnectionFactory ConnectionFactory - The RabbitMQ.Client Connection factory to introspect connection properties and create low-level connections
- bool AutoReconnect - Whether Rabbit MQ should auto-retry connecting when a connection to Rabbit MQ Server instance is dropped
- bool UsePolling - Whether to use polling for consuming messages instead of a long-term subscription
- int RetryCount - How many times a message should be retried before sending to the DLQ. Valid range for Rabbit MQ: 0-1.
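The effect of RetryCount can be pictured with an in-memory sketch that needs no broker. RetrySketch and Process are hypothetical names, and the dead-letter list stands in for the .dlq. The sketch only models the behavior described on this page (a failing handler is invoked RetryCount + 1 times before the message is dead-lettered with its last exception) and is not ServiceStack's implementation.

```csharp
using System;
using System.Collections.Generic;

public class Hello
{
    public string Name { get; set; }
}

// Hypothetical in-memory model of retry-then-dead-letter behavior.
public static class RetrySketch
{
    // Invokes the handler up to retryCount + 1 times. Returns true on success;
    // otherwise stores the message with its last exception in deadLetters.
    public static bool Process<T>(
        T message,
        Action<T> handler,
        int retryCount,
        List<(T Message, Exception Error)> deadLetters)
    {
        Exception lastError = null;
        for (var attempt = 0; attempt <= retryCount; attempt++)
        {
            try
            {
                handler(message);
                return true;
            }
            catch (Exception ex)
            {
                lastError = ex; // remember the most recent failure
            }
        }

        deadLetters.Add((message, lastError));
        return false;
    }
}

public static class Program
{
    public static void Main()
    {
        var called = 0;
        var deadLetters = new List<(Hello Message, Exception Error)>();

        RetrySketch.Process(
            new Hello { Name = "World" },
            m => { called++; throw new ArgumentException("Name"); },
            retryCount: 1,
            deadLetters: deadLetters);

        Console.WriteLine(called);                            // 2
        Console.WriteLine(deadLetters[0].Message.Name);       // World
        Console.WriteLine(deadLetters[0].Error.GetType().Name); // ArgumentException
        Console.WriteLine(deadLetters[0].Error.Message);      // Name
    }
}
```

With retryCount set to 0 the same handler would be invoked once before the message is dead-lettered.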
In addition to sharing a similar architecture to Redis MQ, it also shares a number of common features:
- int? KeepAliveRetryAfterMs - Wait before Starting the MQ Server after a restart
- IMessageFactory MessageFactory - The MQ Message Factory used by this MQ Server
- Func<IMessage, IMessage> RequestFilter - Execute global transformation or custom logic before a request is processed. Must be thread-safe.
- Func<object, object> ResponseFilter - Execute global transformation or custom logic on the response. Must be thread-safe.
- Action<Exception> ErrorHandler - Execute global error handler logic. Must be thread-safe.
- string[] PriortyQueuesWhitelist - If you only want to enable priority queue handlers (and threads) for specific msg types.
- bool DisablePriorityQueues - Don't listen on any Priority Queues
- string[] PublishResponsesWhitelist - Opt-in to only publish responses on this white list. Publishes all responses by default.
- bool DisablePublishingResponses - Don't publish any response messages