A NodeJS persistent job and message queue based on Redis
JavaScript Lua
Latest commit 6f58c02 May 24, 2017 @manast manast 3.0.0-rc.1

README.md




The fastest, most reliable, Redis-based queue for Node.
Carefully written for rock solid stability and atomicity.


Sponsors · Features · UIs · Install · Quickstart · Documentation




Follow @manast for Bull news and updates!


Sponsors

Are you developing bull sponsored by a company? Please, let us now!


Features

  • Minimal CPU usage due to a polling-free design.
  • Robust design based on Redis.
  • Delayed jobs.
  • Schedule and repeat jobs according to a cron specification.
  • Retries.
  • Priority.
  • Concurrency.
  • Pause/resume—globally or locally.
  • Multiple job types per queue.
  • Automatic recovery from process crashes.

And coming up on the roadmap...

  • Job completion acknowledgement.
  • Rate limiter for jobs.
  • Parent-child jobs relationships.

UIs

There are a few third-party UIs that you can use for monitoring:


Feature Comparison

Since there are a few job queue solutions, here a table comparing them to help you use the one that better suits your needs.

Feature Bull Kue Bee Agenda
Backend redis redis redis mongo
Priorities
Concurrency
Delayed jobs
Pause/Resume
Repeatable jobs
Atomic ops
Persistence
Optimized for Jobs / Messages Jobs Messages Jobs

Install

npm install bull --save
yarn add bull

Requirements: Bull requires a Redis version greater than or equal to 2.8.11.


Quick Guide

var Queue = require('bull');

var videoQueue = new Queue('video transcoding', 'redis://127.0.0.1:6379');
var audioQueue = new Queue('audio transcoding', {redis: {port: 6379, host: '127.0.0.1'}}); // Specify Redis connection using object
var imageQueue = new Queue('image transcoding');
var pdfQueue = new Queue('pdf transcoding');

videoQueue.process(function(job, done){

  // job.data contains the custom data passed when the job was created
  // job.jobId contains id of this job.

  // transcode video asynchronously and report progress
  job.progress(42);

  // call done when finished
  done();

  // or give a error if error
  done(new Error('error transcoding'));

  // or pass it a result
  done(null, { framerate: 29.5 /* etc... */ });

  // If the job throws an unhandled exception it is also handled correctly
  throw new Error('some unexpected error');
});

audioQueue.process(function(job, done){
  // transcode audio asynchronously and report progress
  job.progress(42);

  // call done when finished
  done();

  // or give a error if error
  done(new Error('error transcoding'));

  // or pass it a result
  done(null, { samplerate: 48000 /* etc... */ });

  // If the job throws an unhandled exception it is also handled correctly
  throw new Error('some unexpected error');
});

imageQueue.process(function(job, done){
  // transcode image asynchronously and report progress
  job.progress(42);

  // call done when finished
  done();

  // or give a error if error
  done(new Error('error transcoding'));

  // or pass it a result
  done(null, { width: 1280, height: 720 /* etc... */ });

  // If the job throws an unhandled exception it is also handled correctly
  throw new Error('some unexpected error');
});

pdfQueue.process(function(job){
  // Processors can also return promises instead of using the done callback
  return pdfAsyncProcessor();
});

videoQueue.add({video: 'http://example.com/video1.mov'});
audioQueue.add({audio: 'http://example.com/audio1.mp3'});
imageQueue.add({image: 'http://example.com/image1.tiff'});

Alternatively, you can use return promises instead of using the done callback:

videoQueue.process(function(job){ // don't forget to remove the done callback!
  // Simply return a promise
  return fetchVideo(job.data.url).then(transcodeVideo);

  // Handles promise rejection
  return Promise.reject(new Error('error transcoding'));

  // Passes the value the promise is resolved with to the "completed" event
  return Promise.resolve({ framerate: 29.5 /* etc... */ });

  // If the job throws an unhandled exception it is also handled correctly
  throw new Error('some unexpected error');
  // same as
  return Promise.reject(new Error('some unexpected error'));
});

A job can be added to a queue and processed repeatedly according to a cron specification:

  paymentsQueue.process(function(job){
    // Check payments
  });

  // Repeat payment job once every day at 3:15 (am)
  paymentsQueue.add(paymentsData, {repeat: '15 3 * * *'});

A queue can be paused and resumed globally (pass true to pause processing for just this worker):

queue.pause().then(function(){
  // queue is paused now
});

queue.resume().then(function(){
  // queue is resumed now
})

A queue emits also some useful events, for example...

.on('completed', function(job, result){
  // Job completed with output result!
})

For more information on events, including the full list of events that are fired, check out the Events reference

Queues are cheap, so if you need many of them just create new ones with different names:

var userJohn = new Queue('john');
var userLisa = new Queue('lisa');
.
.
.

Queues are robust and can be run in parallel in several threads or processes without any risk of hazards or queue corruption. Check this simple example using cluster to parallelize jobs across processes:

var
  Queue = require('bull'),
  cluster = require('cluster');

var numWorkers = 8;
var queue = new Queue("test concurrent queue");

if(cluster.isMaster){
  for (var i = 0; i < numWorkers; i++) {
    cluster.fork();
  }

  cluster.on('online', function(worker) {
    // Lets create a few jobs for the queue workers
    for(var i=0; i<500; i++){
      queue.add({foo: 'bar'});
    };
  });

  cluster.on('exit', function(worker, code, signal) {
    console.log('worker ' + worker.process.pid + ' died');
  });
}else{
  queue.process(function(job, jobDone){
    console.log("Job done by worker", cluster.worker.id, job.jobId);
    jobDone();
  });
}

Documentation

For the full documentation, check out the reference and common patterns:

  • Reference — the full reference material for Bull.
  • Patterns — a set of examples for common patterns.
  • License — the Bull license—it's MIT.

If you see anything that could use more docs, please submit a pull request!


Important Notes

The queue aims for "at most once" working strategy. When a worker is processing a job it will keep the job "locked" so other workers can't process it.

It's important to understand how locking works to prevent your jobs from losing their lock - becoming stalled - and being restarted as a result. Locking is implemented internally by creating a lock for lockDuration on interval lockRenewTime (which is usually half lockDuration). If lockDuration elapses before the lock can be renewed, the job will be considered stalled and is automatically restarted; it will be double processed. This can happen when:

  1. The Node process running your job processor unexpectedly terminates.
  2. Your job processor was too CPU-intensive and stalled the Node event loop, and as a result, Bull couldn't renew the job lock (see #488 for how we might better detect this). You can fix this by breaking your job processor into smaller parts so that no single part can block the Node event loop. Alternatively, you can pass a larger value for the lockDuration setting (with the tradeoff being that it will take longer to recognize a real stalled job).

As such, you should always listen for the stalled event and log this to your error monitoring system, as this means your jobs are likely getting double-processed.

As a safeguard so problematic jobs won't get restarted indefinitely (e.g. if the job processor aways crashes its Node process), jobs will be recovered from a stalled state a maximum of maxStalledCount times (default: 1).