An opinionated, declarative wrapper around the Node.js amqplib library. We use it to simplify building event-driven microservices that use RabbitMQ as a message broker.
npm install sugarmq --save
# or
yarn add sugarmq
This library is still under active development. We encourage you to try it and give us feedback in the issues. Below are some of the things we intend to work on over the next couple of weeks:
- Topic Producer and Consumer
- Scheduled Producers (publish to a delayed messages exchange)
- Direct Queue Producer and Consumer
- Create a Docker image for RabbitMQ bundled with the plugins needed by the library
RabbitMQ allows multiple programs to receive messages about a topic on an exchange. It also allows multiple programs to publish messages about any topic to an exchange. In this section, we'll see how to consume from and publish to different topics on an exchange.
// consume.js
const {TopicConsumer} = require('sugarmq');

TopicConsumer.create()
  .setExchange('x')
  .setQueue('q')
  .subscribe('lazy.#', (topic, message) => {
    // Do interesting stuff to lazy.# messages here
    // ...
    console.log(topic, message);
  })
  .subscribe('*.*.rabbit', (topic, message) => {
    // Do interesting stuff to *.*.rabbit messages here
    // ...
    console.log(topic, message);
  })
  .start('amqp://localhost');
The above code snippet does five things:
- Ensures a topic exchange named "x" exists on our RabbitMQ instance. If it doesn't, it will be created. If it does but does not match your declaration, an exception will be thrown and our code will fail.
- Ensures a queue named "q" exists on the instance. Again, if it doesn't, it will be created, and if it exists with a different declaration, our code fails with an exception.
- Subscribes to all messages sent to exchange "x" whose topic starts with "lazy".
- Subscribes to all messages sent to exchange "x" whose topic matches the expression "*.*.rabbit".
- Starts the topic consumer. In the background, this establishes an AMQP connection, asserts the exchange and queue, and binds the subscribed topics to the queue. When a message is received from the queue for a given topic, the respective callback is called. It is important to note that only one callback is invoked even if more than one subscription matches the topic. It is therefore best to ensure you subscribe to mutually exclusive topic globs.
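To check whether two topic globs overlap before subscribing, it can help to evaluate them by hand. The sketch below is not part of sugarmq and not how the library or RabbitMQ implements matching; it is a minimal illustration of the standard AMQP topic rules, where "*" stands for exactly one dot-separated word and "#" for zero or more words. The function names topicMatches and matchingPatterns are hypothetical helpers introduced here.

```javascript
// Minimal sketch of AMQP topic matching (hypothetical helpers, not sugarmq API).
// '*' matches exactly one word, '#' matches zero or more words.
function matchWords(patternWords, keyWords) {
  if (patternWords.length === 0) return keyWords.length === 0;
  const [head, ...rest] = patternWords;
  if (head === '#') {
    // '#' may absorb zero or more words: try every possible split point.
    for (let i = 0; i <= keyWords.length; i++) {
      if (matchWords(rest, keyWords.slice(i))) return true;
    }
    return false;
  }
  if (keyWords.length === 0) return false;
  return (head === '*' || head === keyWords[0]) &&
    matchWords(rest, keyWords.slice(1));
}

function topicMatches(pattern, routingKey) {
  return matchWords(pattern.split('.'), routingKey.split('.'));
}

// Returns every pattern that matches the topic, so overlaps become visible.
function matchingPatterns(patterns, routingKey) {
  return patterns.filter(pattern => topicMatches(pattern, routingKey));
}

// 'lazy.orange.rabbit' matches both globs used in consume.js.
console.log(matchingPatterns(['lazy.#', '*.*.rabbit'], 'lazy.orange.rabbit'));
// 'quick.orange.rabbit' matches only '*.*.rabbit'.
console.log(matchingPatterns(['lazy.#', '*.*.rabbit'], 'quick.orange.rabbit'));
```

A topic such as "lazy.orange.rabbit" matches both subscriptions in consume.js, so only one of the two callbacks would run for it. Running candidate topics through a check like this is one way to spot such overlaps early.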
To run this snippet, copy and paste the code into a file named "consume.js" and run it with Node:
node consume.js
To see logs from the library:
DEBUG=sugarmq node consume.js
// produce.js
const {TopicProducer} = require('sugarmq');

TopicProducer.create()
  .setExchange('x')
  .start('amqp://localhost')
  .then(producer => {
    producer.publish("quick.orange.rabbit", "the quick orange rabbit");
    // Stop the producer (close the connection) after half a second
    setTimeout(() => producer.stop(), 500);
  });
In the above snippet, our script:
- Declares an exchange "x".
- Starts the producer. In the background, this establishes a connection, asserts the exchange, and returns our producer ready to publish messages. This is an asynchronous method that returns a promise, so we have to wait for it to resolve (here with .then) before using the producer.
- Publishes to the "quick.orange.rabbit" topic on exchange "x" and stops the producer (closes the connection) after half a second.
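The same flow can also be written with async/await instead of .then. The sketch below is only an illustration: publishAndStop is a hypothetical helper, not part of sugarmq, and it assumes nothing beyond what the snippet above shows, namely that start() resolves to an object with publish(topic, message) and stop().

```javascript
// Hypothetical helper (not sugarmq API). `startProducer` is any function that
// returns a promise for an object with publish(topic, message) and stop().
async function publishAndStop(startProducer, topic, message, delayMs = 500) {
  const producer = await startProducer();
  try {
    producer.publish(topic, message);
    // Wait briefly before closing, mirroring the half-second delay above.
    await new Promise(resolve => setTimeout(resolve, delayMs));
  } finally {
    // Always close the connection, even if publish throws.
    producer.stop();
  }
}

// Usage with sugarmq (requires a running RabbitMQ instance):
// const {TopicProducer} = require('sugarmq');
// publishAndStop(
//   () => TopicProducer.create().setExchange('x').start('amqp://localhost'),
//   'quick.orange.rabbit',
//   'the quick orange rabbit'
// );
```

Passing the start call in as a function keeps the helper independent of how the producer is configured, and the finally block ensures the producer is stopped even when publishing fails.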
To run this snippet, copy and paste the code into a file named "produce.js" and run it with Node:
node produce.js
To see logs from the library:
DEBUG=sugarmq node produce.js