npm install tortoise
A client library for interacting with AMQP.
- Basic Example
- Basic Setup
- Advanced Setup
- Publishing to a queue
- Publishing to an exchange
- Subscribing to a queue
- Accessing message data
- Handling Errors
- Auto retrying and throttling
- Automatic setup of dead letter exchange and queue
- Configuring without the need to subscribe or publish
- Handling connection or channel closure
// Basic example: subscribe to 'my-queue' (prefetch of 1) and publish a
// message to the same queue once every second.
var Tortoise = require('tortoise');
var tortoise = new Tortoise('amqp://localhost');

// Log each delivered message, then acknowledge it.
var logAndAck = function(msg, ack) {
  console.log(msg);
  ack();
};

tortoise
  .queue('my-queue')
  .prefetch(1)
  .subscribe(logAndAck);

// Publish a single { Hello: 'World' } message to 'my-queue'.
var publishHello = function() {
  tortoise
    .queue('my-queue')
    .publish({ Hello: 'World' });
};

// Fire the publisher every 1000 ms.
setInterval(publishHello, 1000);
// Basic setup: connect using only the AMQP URL.
var Tortoise = require('tortoise');
var tortoise = new Tortoise('amqp://localhost');

// Advanced setup: pass connection options as the second argument.
var Tortoise = require('tortoise');
var options = {};
options.connectRetries = -1;         // -1: keep attempting to connect forever
options.connectRetryInterval = 1000; // wait 1000 ms between connection attempts
var tortoise = new Tortoise('amqp://localhost', options);
`options` is optional. Current options are:
- `connectRetries`: `Number` value greater than or equal to `-1`. Defaults to `0`. Tortoise will attempt to connect up to this number of times. When set to `-1`, tortoise will attempt to connect forever. Note: This does not handle connections that have already been established and were lost.
- `connectRetryInterval`: `Number` value greater than or equal to `0`. Defaults to `1000`. This is the amount of time, in `ms`, that tortoise will wait before attempting to connect again.
// Publishing to a queue: obtain the queue, then publish the message to it.
var myQueue = tortoise.queue('my-queue', { durable: false });
myQueue.publish({ Hello: 'World' });

// Publishing to an exchange: the routing key is the first publish argument.
var myExchange = tortoise.exchange('my-exchange', 'direct', { durable: false });
myExchange.publish('routing.key', { Hello: 'World' });
// Subscribing to a queue: the handler receives the message plus ack/nack
// callbacks.
var handleMessage = function(msg, ack, nack) {
  // Handle
  ack(); // or nack();
};

tortoise
  .queue('my-queue', { durable: false })
  .prefetch(1)
  .subscribe(handleMessage);

// Subscribing to a queue that is bound to an exchange with a routing key.
var handleBoundMessage = function(msg, ack, nack) {
  // Handle
  ack(); // or nack();
};

tortoise
  .queue('my-queue', { durable: false })
  // Add as many bindings as needed
  .exchange('my-exchange', 'direct', 'routing.key', { durable: false })
  .prefetch(1)
  .subscribe(handleBoundMessage);
There is an optional setting, `.json()`, that will automatically attempt to parse messages from JSON (using `JSON.parse`) and, if the content is invalid, will `nack(requeue=false)` the message. To capture this event each time it occurs, you can subscribe to your tortoise instance for the event `Tortoise.ERRORS.PARSE`:
// Subscribe with JSON parsing enabled and listen for parse failures.
var Tortoise = require('tortoise');
var tortoise = new Tortoise('amqp://localhost');

var onParsedMessage = function(msg, ack, nack) {
  // Will be called if the msg content is valid JSON and can be parsed
  ack(); // or nack();
};

tortoise
  .queue('my-queue', { durable: false })
  .prefetch(1)
  .json()
  .subscribe(onParsedMessage);

var onParseError = function(err, msg) {
  // err is the error
  // msg is the message object returned from AMQP.
  // msg.content is the Buffer of the message
  console.log('An error occurred parsing the msg content');
};

// Parse failures are reported as events on the tortoise instance.
tortoise.on(Tortoise.ERRORS.PARSE, onParseError);
The callback function provided to the `subscribe` method will be scoped to the message, i.e. the `this` object will contain the properties of the message. The object would look similar to this:
{
fields: {
deliveryTag: <int>,
redelivered: <bool>,
routingKey: <string>,
...
},
properties: {
contentType: <string>,
headers: {
...
},
...
}
}
So, if I wanted to access the `routingKey` that was provided, I would access it by:
// Read the routing key of the delivered message from `this`, which the
// subscribe callback is scoped to.
var handleEvent = function(msg, ack, nack) {
  var fields = this.fields;
  var routingKey = fields.routingKey;
  // Handle
  ack(); // or nack();
};

tortoise
  .queue('my-queue', { durable: false })
  .exchange('my-exchange', 'topic', 'event.*', { durable: false })
  .subscribe(handleEvent);
This is useful if you subscribe to wildcard topics on an exchange but want to know what the actual topic (`routingKey`) was.
Tortoise will emit events when certain things occur. The following events are emitted:
{
PARSE: 'TORTOISE.PARSEERROR'
}
These error strings are accessed by the `ERRORS` property on the `Tortoise` library, and can be subscribed to on an individual tortoise instance. Here is an example of being notified when a parse error occurs:
// Register a listener for the parse error event on a tortoise instance.
var Tortoise = require('tortoise');
var tortoise = new Tortoise('amqp://localhost');

// Do your tortoise configuration

var onParseError = function() {
  // Called on parse error
};

tortoise.on(Tortoise.ERRORS.PARSE, onParseError);
There are a few methods available for controlling continuous failures; all are optional. `failSpan` and `retryTimeout` do nothing if `failThreshold` is not set. The default behavior (not setting `failThreshold`) is no failure handling.
// Auto retrying and throttling: configure failure handling before subscribing.
var Tortoise = require('tortoise');
var tortoise = new Tortoise('amqp://localhost');

var TEN_MINUTES_MS = 1000 * 60 * 10;
var TEN_SECONDS_MS = 1000 * 10;

// Log the message and reject it so the failure handling is exercised.
var logAndNack = function(msg, ack, nack) {
  console.log(msg);
  nack();
};

tortoise
  .queue('simple-queue', { durable: true })
  .failThreshold(3)             // 3 immediate attempts
  .failSpan(TEN_MINUTES_MS)     // 10 minutes, defaults to 1 minute
  .retryTimeout(TEN_SECONDS_MS) // 10 second timeout on each retry, defaults to 5 seconds
  .subscribe(logAndNack);
If you want to set up your (subscribe) queue to automatically set a dead letter exchange:
// Dead lettering: declare both a dead letter exchange and a queue for it.
var Tortoise = require('tortoise');
var tortoise = new Tortoise('amqp://localhost');

var rejectToDeadLetter = function(msg, ack, nack) {
  // Do not requeue, instead shove to dead letter exchange
  nack(false);
};

tortoise
  .queue('simple-queue')
  .dead('exchange.dead', 'queue.dead')
  .subscribe(rejectToDeadLetter);
Declaring the queue to bind to the dead letter exchange is optional. It is perfectly acceptable to set it up like this:
// Dead lettering with only the exchange named; no queue name is passed
// to .dead().
var Tortoise = require('tortoise');
var tortoise = new Tortoise('amqp://localhost');

var rejectToDeadLetter = function(msg, ack, nack) {
  // Do not requeue, instead shove to dead letter exchange
  nack(false);
};

tortoise
  .queue('simple-queue')
  .dead('exchange.dead')
  .subscribe(rejectToDeadLetter);
The `.setup` method will call all asserts and bindings, then close the channel:
// Configure a queue, its exchange binding and a dead letter exchange
// without subscribing or publishing.
var queueConfig = tortoise
  .queue('myQueue')
  .exchange('myExchange', 'topic', '#')
  .dead('myDeadExchange');
queueConfig.setup();

// Configure an exchange on its own.
var exchangeConfig = tortoise.exchange('myExchange', 'topic');
exchangeConfig.setup();
When subscribing, the promise returned from `.subscribe()` resolves with a channel object that can be listened on.
The following is an example of listening for `close` events and resubscribing.
// Wrap subscription inside function so it can be re-run when the channel closes
var subscribe = function() {
  // Acknowledge every delivered message.
  var acknowledge = function(msg, ack, nack) {
    ack();
  };

  // Receives the channel that the subscribe promise resolves with.
  var resubscribeOnClose = function(ch) {
    // Once connection is closed, immediately attempt to subscribe again
    ch.on('close', subscribe);
  };

  tortoise
    .queue('myQueue')
    .subscribe(acknowledge)
    .then(resubscribeOnClose);
};

// Start subscribing
subscribe();