RabbitMq client in Scala and Akka actors


This small library lets you use the RabbitMQ client via Akka actors. Its main goal is to survive losing the connection to the RabbitMQ server.

It gives you two actors, ConnectionActor and ChannelActor.

ConnectionActor

  • handles connection failures and notifies its children
  • keeps trying to reconnect if the connection is lost
  • provides its children with new channels when needed

ChannelActor

  • may store messages in memory if the channel is lost
  • sends stored messages as soon as a new channel is received
  • retrieves a new channel if the current one is broken
Since version 3.0.0 (sbt):

    libraryDependencies += "com.thenewmotion" %% "akka-rabbitmq" % "3.0.0"

To depend on a prior release, add a resolver for the New Motion public repository:

    resolvers += "New Motion Repository" at "http://nexus.thenewmotion.com/content/groups/public/"
    libraryDependencies += "com.thenewmotion" %% "akka-rabbitmq" % "2.3"


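Since version 3.0.0 (Maven): use group com.thenewmotion, artifact akka-rabbitmq (with the Scala binary version suffix that %% appends in sbt) and version 3.0.0.

For prior releases, also add the repository:

    <repository>
        <name>New Motion Repository</name>
        <url>http://nexus.thenewmotion.com/content/groups/public/</url>
    </repository>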

Tutorial in comparisons

Before you start, add the import statements:

    import akka.actor.{Actor, ActorRef, ActorSystem, Props}
    import com.thenewmotion.akka.rabbitmq._

Create connection

Default approach:

    val factory = new ConnectionFactory()
    val connection: Connection = factory.newConnection()

Actor style:

    val factory = new ConnectionFactory()
    val connectionActor: ActorRef = system.actorOf(ConnectionActor.props(factory))

Let's name it:

    system.actorOf(ConnectionActor.props(factory), "my-connection")

How often will it reconnect?

    import concurrent.duration._
    system.actorOf(ConnectionActor.props(factory, reconnectionDelay = 10.seconds), "my-connection")
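The reconnectionDelay parameter sets the pause between reconnection attempts. As a rough mental model only, retrying with a fixed delay can be sketched in plain Scala as below. This is not how ConnectionActor is implemented (an actor would schedule the next attempt rather than block a thread), and the names ReconnectSketch and connectWithRetry are hypothetical.

```scala
import scala.annotation.tailrec
import scala.concurrent.duration.FiniteDuration
import scala.util.{Failure, Success, Try}

// Hypothetical helper, not part of akka-rabbitmq.
object ReconnectSketch {
  // Calls `connect` until it succeeds, sleeping `delay` after each failed attempt.
  // Returns the connection together with the number of attempts it took.
  @tailrec
  def connectWithRetry[A](connect: () => A, delay: FiniteDuration, attempt: Int = 1): (A, Int) =
    Try(connect()) match {
      case Success(connection) => (connection, attempt)
      case Failure(_) =>
        Thread.sleep(delay.toMillis) // blocking pause; a simplification for the sketch
        connectWithRetry(connect, delay, attempt + 1)
    }
}
```

With the plain RabbitMQ client, connect would be something like () => factory.newConnection().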

Create channel

That's the plain option:

    val channel: Channel = connection.createChannel()

But we can do better. Asynchronously:

    connectionActor ! CreateChannel(ChannelActor.props())

Synchronously:

    val channelActor: ActorRef = connectionActor.createChannel(ChannelActor.props())

Maybe give it a name:

    connectionActor.createChannel(ChannelActor.props(), Some("my-channel"))

What about a custom actor:

    connectionActor.createChannel(Props(new Actor {
      def receive = {
        case channel: Channel => // use the new channel here
      }
    }))
Setup channel

    channel.queueDeclare("queue_name", false, false, false, null)

Actor style:

    // this function will be called each time a new channel is received
    def setupChannel(channel: Channel, self: ActorRef): Unit = {
      channel.queueDeclare("queue_name", false, false, false, null)
    }
    val channelActor: ActorRef = connectionActor.createChannel(ChannelActor.props(setupChannel))

Use channel

    channel.basicPublish("", "queue_name", null, "Hello world".getBytes)

Using our channelActor:

    def publish(channel: Channel): Unit = {
      channel.basicPublish("", "queue_name", null, "Hello world".getBytes)
    }
    channelActor ! ChannelMessage(publish)

But I don't want to lose messages when connection is lost:

    channelActor ! ChannelMessage(publish, dropIfNoChannel = false)
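To make the effect of dropIfNoChannel concrete, here is a minimal single-threaded model of the buffering idea in plain Scala. It is a sketch under assumptions, not the library's ChannelActor: FakeChannel, BufferingSender and their methods are hypothetical names, and the default of dropping messages is inferred from the snippets above.

```scala
import scala.collection.mutable

object BufferingModel {
  // Hypothetical stand-in for a RabbitMQ channel; it only records what was published.
  final class FakeChannel {
    val sent: mutable.ArrayBuffer[String] = mutable.ArrayBuffer.empty[String]
    def publish(body: String): Unit = { sent += body; () }
  }

  // Single-threaded model of the buffering idea, not the library's ChannelActor.
  final class BufferingSender {
    private var channel: Option[FakeChannel] = None
    private val pending = mutable.Queue.empty[FakeChannel => Unit]

    def channelLost(): Unit = { channel = None }

    // A new channel arrived: remember it and replay buffered operations in order.
    def channelReceived(c: FakeChannel): Unit = {
      channel = Some(c)
      while (pending.nonEmpty) pending.dequeue()(c)
    }

    // Runs `op` right away if a channel is available; otherwise drops or buffers it.
    def send(op: FakeChannel => Unit, dropIfNoChannel: Boolean = true): Unit =
      channel match {
        case Some(c) => op(c)
        case None    => if (!dropIfNoChannel) { pending.enqueue(op); () }
      }

    def pendingCount: Int = pending.size
  }
}
```

In this model a message sent with dropIfNoChannel = false while no channel is available waits in memory and is published, in order, once channelReceived is called.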

Close channel

    system stop channelActor

Close connection

    system stop connectionActor // will close all channels associated with this connection

Here is RabbitMQ Publish/Subscribe in actor style:

    import akka.actor.{ActorRef, ActorSystem}
    import com.thenewmotion.akka.rabbitmq._
    import scala.concurrent.Future

    object PublishSubscribe extends App {
      implicit val system: ActorSystem = ActorSystem()
      import system.dispatcher // execution context for the Future below

      val factory = new ConnectionFactory()
      val connection = system.actorOf(ConnectionActor.props(factory), "rabbitmq")
      val exchange = "amq.fanout"

      def setupPublisher(channel: Channel, self: ActorRef): Unit = {
        val queue = channel.queueDeclare().getQueue
        channel.queueBind(queue, exchange, "")
      }
      connection ! CreateChannel(ChannelActor.props(setupPublisher), Some("publisher"))

      def setupSubscriber(channel: Channel, self: ActorRef): Unit = {
        val queue = channel.queueDeclare().getQueue
        channel.queueBind(queue, exchange, "")
        val consumer = new DefaultConsumer(channel) {
          override def handleDelivery(consumerTag: String, envelope: Envelope, properties: BasicProperties, body: Array[Byte]): Unit = {
            println("received: " + fromBytes(body))
          }
        }
        channel.basicConsume(queue, true, consumer)
      }
      connection ! CreateChannel(ChannelActor.props(setupSubscriber), Some("subscriber"))

      Future {
        def loop(n: Long): Unit = {
          // actorSelection replaces the deprecated actorFor
          val publisher = system.actorSelection("/user/rabbitmq/publisher")

          def publish(channel: Channel): Unit = {
            channel.basicPublish(exchange, "", null, toBytes(n))
          }
          publisher ! ChannelMessage(publish, dropIfNoChannel = false)

          Thread.sleep(1000) // pause between messages; the interval is arbitrary
          loop(n + 1)
        }
        loop(0)
      }

      def fromBytes(x: Array[Byte]) = new String(x, "UTF-8")
      def toBytes(x: Long) = x.toString.getBytes("UTF-8")
    }
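The example sends the counter as UTF-8 text and the subscriber prints it back as a string. If the subscriber needs the number itself, a slightly more defensive pair of helpers might look like the sketch below. CounterCodec is a hypothetical name and uses only the standard library; returning an Option is one possible design choice so that a malformed payload does not throw inside the consumer callback.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.util.Try

// Hypothetical helper, not part of akka-rabbitmq.
object CounterCodec {
  // Encodes the number as its decimal text in UTF-8, as the example above does.
  def toBytes(x: Long): Array[Byte] = x.toString.getBytes(UTF_8)

  // Decodes the payload; yields None when it is not a decimal Long.
  def fromBytes(body: Array[Byte]): Option[Long] =
    Try(new String(body, UTF_8).toLong).toOption
}
```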

Other Libraries

Akka-RabbitMQ is a low-level library, and leaves it to the coder to manually wire consumers, serialize messages, etc. If you'd like a higher-level abstraction library, look at Op-Rabbit (which uses this library).