Reactive Streams driver for AMQP protocol. Powered by RabbitMQ library.
Scala Shell
Latest commit 3658602 Dec 15, 2016 Colin Breck committed with mkiedys - Only open the RabbitMQ channel when the Subscriber is subscribed to.
- Ensure open RabbitMQ channels are closed in the finalize method.

README.md

Reactive Streams: AMQP

Join the chat at https://gitter.im/ScalaConsultants/reactive-rabbit

Build Status

Reactive Streams driver for AMQP protocol. Powered by RabbitMQ library.

Available at Maven Central for Scala 2.11 and 2.12:

libraryDependencies += "io.scalac" %% "reactive-rabbit" % "1.1.4"

Example

Akka Streams - 2.4.12

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import io.scalac.amqp.Connection


// streaming invoices to Accounting Department
val connection = Connection()
// create org.reactivestreams.Publisher
val queue = connection.consume(queue = "invoices")
// create org.reactivestreams.Subscriber
val exchange = connection.publish(exchange = "accounting_department",
  routingKey = "invoices")

implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
// Run akka-streams with queue as Source and exchange as Sink
Source.fromPublisher(queue).map(_.message).runWith(Sink.fromSubscriber(exchange))

API Docs

Run sbt doc and open target/scala-2.12/index.html.

Settings

There are 3 options for passing AMQP settings:

  • Use default settings from reference.conf and application.conf. See Config library. See refrence.conf for settings layout.
Connection()
  • Use Config programatically.
Connection(config : Config)
  • Use ConnectionSettings programatically
Connection(settings: ConnectionSettings)

ConnectionSettings have following properties:

  • addresses: Seq[Address] broker addresses (hostname/port pairs) to try in order. A random one will be picked during recovery.
  • virtualHost: String virtual host to use when connecting to the broker.
  • username: String user name to use when connecting to the broker.
  • password: String password to use when connecting to the broker.
  • heartbeat: Option[FiniteDuration] requested heartbeat interval, at least 1 second.None to disable heartbeat.
  • timeout: Duration the default connection timeout, at least 1 millisecond.
  • automaticRecovery: Boolean enable automatic connection recovery. Subscriptions are not recovered.
  • recoveryInterval: FiniteDuration how long will automatic recovery wait before attempting to reconnect.
  • ssl: Option[String] allows to use SSL for connecting to the broker. Valid values depend on JRE, see possiblities. Recent RabbitMQ servers does not allow SSL3.

Connection

Connection trait API has two groups of methods: to manage AMQP infrastructure (ie. declare and delete exchanges, queues and bindings) and to create ReactiveStreams entities: Publisher and Subscriber.
consume(queue, prefetch) - creates Delivery stream Publisher for messages from queue.
publish(exchange, routingKey) - creates Subscription that takes stream of Message that will be sent to exchange with fixed routingKey.
publish(exchange) - creates Subscription for stream of Routed (tuple of Message and routing key).