A RabbitMQ client for R

longears

longears is a fast and fully-featured RabbitMQ client for R, built on top of the reference C library, librabbitmq.

RabbitMQ itself is a highly popular, performant, and robust open-source message broker used to build distributed systems.

longears implements a large portion of the Advanced Message Queuing Protocol (AMQP) used by RabbitMQ, and the API largely reflects the protocol itself. However, this package is not a true low-level interface: it abstracts away many details of AMQP for end users. See Limitations for details.

This package may be of interest to you if you wish to have R speak to your organization’s existing RabbitMQ servers. If you simply need a message queue library, you may be better off with txtq, liteq, or the ZeroMQ package rzmq.

Installation

To install longears as a source package, you will need its system dependency, librabbitmq. For example, on Debian-based systems (including Ubuntu) you can get this library by running the following from the command line:

$ apt install librabbitmq-dev

For other platforms:

  • macOS (via Homebrew): brew install rabbitmq-c
  • Fedora-based: yum install librabbitmq-devel
  • Arch-based: pacman -S librabbitmq-c

You can then install longears from CRAN with

install.packages("longears")

or from GitHub with

# install.packages("remotes")
remotes::install_github("atheriel/longears")

Basic Usage

If you are not already familiar with RabbitMQ and its message/queue/binding/exchange terminology, I suggest reading the RabbitMQ project's excellent conceptual overview.

To follow this guide, you will also need a local RabbitMQ server running with the default settings:

$ # apt install rabbitmq-server
$ systemctl start rabbitmq-server
$ rabbitmqctl status

First, connect to the server (with the default settings):

conn <- amqp_connect()
conn
#> AMQP Connection:
#>   status:  connected
#>   address: localhost:5672
#>   vhost:   '/'

Create an exchange to route messages and a couple of queues to store them:

amqp_declare_exchange(conn, "my.exchange", type = "fanout")
amqp_declare_queue(conn, "my.queue1")
#> AMQP queue 'my.queue1'
#>   messages:  0
#>   consumers: 0
amqp_declare_queue(conn, "my.queue2")
#> AMQP queue 'my.queue2'
#>   messages:  0
#>   consumers: 0
amqp_bind_queue(conn, "my.queue1", "my.exchange", routing_key = "#")
amqp_bind_queue(conn, "my.queue2", "my.exchange", routing_key = "#")

You can also set up a consumer for one of these queues with a callback:

received <- 0
consumer <- amqp_consume(conn, "my.queue2", function(msg) {
  received <<- received + 1
})
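
A callback can also inspect the message it receives. The following is a minimal sketch, assuming the message object exposes its payload as a raw vector in a `body` field; that field name is an assumption, not something this README confirms. The names `collect_body`, `bodies`, and `fake_msg` are hypothetical, and a stand-in message is used so the sketch runs without a broker.

```r
# Accumulates decoded message bodies across callback invocations.
bodies <- character(0)

# Hypothetical callback: ASSUMES msg$body is a raw vector holding the payload.
collect_body <- function(msg) {
  bodies <<- c(bodies, rawToChar(msg$body))
}

# Stand-in for a delivered message, so the callback can be tried offline.
fake_msg <- list(body = charToRaw("hello"))
collect_body(fake_msg)
collect_body(list(body = charToRaw("world")))

print(bodies)
```

If the assumption about `body` holds, `collect_body` could be passed to amqp_consume() in place of the anonymous function above.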

Now, send a few messages to this exchange:

amqp_publish(conn, "first", exchange = "my.exchange", routing_key = "#")
amqp_publish(conn, "second", exchange = "my.exchange", routing_key = "#")

Check if your messages are going into the queues:

$ rabbitmqctl list_queues

You can use amqp_get() to pull individual messages back into R:

amqp_get(conn, "my.queue1")
#> Delivery Tag:    1
#> Redelivered: FALSE
#> Exchange:    my.exchange
#> Routing Key: #
#> Message Count:   1
#> 66 69 72 73 74
amqp_get(conn, "my.queue1")
#> Delivery Tag:    2
#> Redelivered: FALSE
#> Exchange:    my.exchange
#> Routing Key: #
#> Message Count:   0
#> 73 65 63 6f 6e 64
amqp_get(conn, "my.queue1")
#> character(0)
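
The last line of each delivery printed above is the message body, shown as hexadecimal bytes. As a small illustration in base R (no broker needed), those bytes can be turned back into text with rawToChar(). The variable names are illustrative only, and how the body is extracted from the object returned by amqp_get() is not shown here because this README does not document it.

```r
# Bytes copied from the two deliveries printed above.
first_bytes  <- as.raw(c(0x66, 0x69, 0x72, 0x73, 0x74))
second_bytes <- as.raw(c(0x73, 0x65, 0x63, 0x6f, 0x6e, 0x64))

# rawToChar() converts a raw vector into a single character string.
first_text  <- rawToChar(first_bytes)
second_text <- rawToChar(second_bytes)

print(first_text)
print(second_text)

# charToRaw() goes the other way, from text to bytes.
round_trip <- charToRaw(first_text)
```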

Or you can use amqp_listen() to run consumer callbacks:

amqp_listen(conn, timeout = 1)
received
#> [1] 2

To clean things up, delete the queues and the exchange, then disconnect from the server:

amqp_delete_queue(conn, "my.queue1")
amqp_delete_queue(conn, "my.queue2")
amqp_delete_exchange(conn, "my.exchange")
amqp_disconnect(conn)
conn
#> AMQP Connection:
#>   status:  disconnected
#>   address: localhost:5672
#>   vhost:   '/'

And check that the connection is closed:

$ rabbitmqctl list_connections
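
In scripts, it can be convenient to make sure the disconnect step runs even when the work in between fails. The following is a sketch of one way to do that with base R's on.exit(); `with_amqp` is a hypothetical helper, not part of longears, and its `connect` and `disconnect` arguments default to amqp_connect() and amqp_disconnect() as used above.

```r
# Hypothetical helper (not part of longears): opens a connection, runs
# f(conn), and always disconnects afterwards, even if f() signals an error.
with_amqp <- function(f,
                      connect = longears::amqp_connect,
                      disconnect = longears::amqp_disconnect) {
  conn <- connect()
  on.exit(disconnect(conn), add = TRUE)
  f(conn)
}

# Offline demonstration with stand-in connect/disconnect functions,
# so the control flow can be checked without a running broker.
events <- character(0)
result <- with_amqp(
  function(conn) {
    events <<- c(events, "work")
    42
  },
  connect = function() {
    events <<- c(events, "open")
    "fake-connection"
  },
  disconnect = function(conn) {
    events <<- c(events, "close")
  }
)
print(events)
```

With a real server, the same helper would be called with only the first argument, for example `with_amqp(function(conn) amqp_get(conn, "my.queue1"))`.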

Limitations

Some AMQP features are not present, notably transaction support (which there are no plans to implement). Others are not exposed through the API but are handled internally according to best practices: channels, message acknowledgements, and prefetch counts, among others. The current design goal of the package is to keep these details from the eyes of users, insofar as that is possible.

If you need an AMQP feature (or RabbitMQ extension) that is not currently available or accessible, please consider filing an issue explaining the use case you have in mind.

License

The package is licensed under the GPL, version 2 or later.
