Scala library for reliable reliable message processing using RabbitMQ and Akka.
Scala
Switch branches/tags
Nothing to show
Clone or download
Fetching latest commit…
Cannot retrieve the latest commit at this time.
Permalink
Failed to load latest commit information.
src
.gitignore
CHANGELOG.md
INSTALL
LICENSE
README.md
VERSION
build.sbt

README.md

Rabbit-MQ HA Library

This library contains various helpers classes for interacting with RabbitMQ in a standard, reliable way.

There's a standalone example in ConfirmedPublishingExample that uses a number of the classes in this library, this runs against a local RabbitMQ broker and aims to illustrate how failures are dealt with etc.

Standard configuration for RabbitMQ

The library defines the class RabbitMqConfig which has values for the various settings needed to connect to RabbitMQ. The companion object also has a factory method that creates such configuration objects from standard Config objects as defined in the common-config library, using specific parameters listed on the Service configuration guidelines page.

Reliable broker connections

The class RabbitMq has a method reliableConnection, which will return a connection to the RabbitMQ broker that will automatically reconnect after failures, and restore the state of any Channels created on this connection.

This method takes the RabbitMqConfig object mentioned above as argument. Typically, applications will use this line of code to create a RabbitMq connection:

val connection = RabbitMq.reliableConnection(RabbitMqConfig(config))

where config is the configuration returned by the standard Configuration trait.

Recovered broker connections

The RabbitMq class also has a method recoveredConnection, which will return a connection to the RabbitMQ broker that will automatically reconnect after failures, but that will not retry operations such as creating channels or publishing messages, which would then become blocking. This makes this connection suitable for use in Actors that for example publish messages on short-lived channels.

Typically, applications will create such RabbitMq connections as:

val connection = RabbitMq.recoveredConnection(RabbitMqConfig(config))

where config is the configuration returned by the standard Configuration trait.

Common RabbitMQ message consumer

Use the RabbitMqConsumer actor class to retrieve messages from RabbitMQ, where the messages comply with our standard messaging guidelines.

Each instance of these is configured with a QueueConfiguration object, which can be constructed from standard Config. This takes the following values:

Property name Type Comment
queueName String Name of queue.
exchangeName String Name of exchange queue is bound to.
routingKeys List[String] Optional list of routing keys used to bind queues to exchange, one binding per routing key. Ignored for fanout and header exchanges.
bindingArguments List[Map] Optional value with a list of key-value pairs used for headers binding.
prefetchCount Int The maximum number of messages that can be in-process at once.
exchangeType String The exchange type (topic, headers, etc).

The consumer will declare exchanges, queues and bindings as needed, so that no manual setup is needed for RabbitMQ.

General RabbitMQ message consumer

Use the AmqpConsumerActor class to retrieve messages from RabbitMQ. The messages produced by this actor directly pass on the content of the RabbitMQ message.

This class may be deprecated once all services publish messages in the standard format.

Reliable message publisher

Use the RabbitMqReliablePublisher class to send message to RabbitMQ where it's critical that published messages are not lost. This actor will publish messages as persistent messages and using publisher confirms, and it will attempt to re-deliver messages in case of failures.

This actor does not currently give notifications back to senders whether message publishing succeeds or not.

Confirmed message publisher

Use the RabbitMqConfirmedPublisher class to send messages to RabbitMQ in the most reliable way available (as persistent messages and using Publisher Confirms), but reporting failures back to the sender instead of retrying. This is suitable for use where you need to send messages reliably, but want to handle failures yourself instead of having some other code retrying the publishing for you. One example of such cases is if the message you're processing is the result of an incoming RabbitMQ messages that's stored in a persistent queue - in such cases it's often better to just retry that message later instead of ACKing it then having to make sure it's not lost down the line.

Each instance of this actor is configured with a PublisherConfiguration object, which can be constructed from standard Config. This takes the following values:

Property name Type Comment
messageTimeout Duration How long it will try before reporting a failure to publish a message.
exchangeName String Name of exchange messages are published to. If a service wants to publish direct to a queue (not recommended practice) then this value can be omitted, in which case the routingKey parameter identifies the name of the queue (this is using the RabbitMQ "Default Exchange" feature).
routingKey String Routing key used to route messages at the exchange published to. This value is ignored for some exchanges, e.g. fanout exchanges, in which case an empty value may be given.
prefetchCount Int The maximum number of messages that can be in-process at once.
exchangeType String The exchange type (topic, headers, etc).
bindingArguments Map key-value pairs used as arguments when publishing a message

The producer will declare exchanges, queues and bindings as needed, so that no manual setup is needed for RabbitMQ.