Skip to content
forked from getjackd/jackd

Modern, simple beanstalkd client for Node.js

License

Notifications You must be signed in to change notification settings

AQUAOSOTech/jackd

 
 

Repository files navigation

jackd

Build Status

/* producer.js */
const Jackd = require('jackd')
const beanstalkd = new Jackd()

await beanstalkd.put('Hello!')

/* consumer.js */
const Jackd = require('jackd')
const beanstalkd = new Jackd()

const job = await beanstalkd.reserve() // => { id: '1', payload: 'Hello!' }

// ...process the job... then:
await beanstalkd.delete(job.id)

Why

Most beanstalkd clients don't support promises (fivebeans, nodestalker) and the ones that do have too many dependencies (node-beanstalkd-client). This package has:

  • A concise and easy to use API
  • Native promise support
  • No dependencies

API

The author of beanstalkd has a good write-up of the beanstalkd protocol, which makes it incredibly easy to development against. If you're unsure how beanstalkd works, it may be worth reading the specs before the API docs.

Overview

beanstalkd is a simple and blazing fast work queue. Producers connected through TCP sockets (by default on port 11300) send in jobs to be processed at a later time by a consumer.

Connecting and disconnecting

const Jackd = require('jackd')
const beanstalkd = new Jackd()

await beanstalkd.connect() // Connects to localhost:11300
await beanstalkd.connect({ host, port })

await beanstalkd.disconnect() // You can also use beanstalkd.close; it's an alias

Producers

Adding jobs into a tube

Jobs are simply payloads with a job ID attached. All payloads are ASCII encoded strings. Please keep this in mind if you need to send in special UTF-8 characters in your payloads.

jackd will automatically convert an Object into a String for you using JSON.stringify.

const jobId = await beanstalkd.put({ foo: 'bar' })

You can also just pass in a String.

const jobId = await beanstalkd.put('my long running job')

All jobs sent to beanstalkd have a priority, a delay, and TTR (time-to-run) specification. By default, all jobs are published with 0 priority, 0 delay, and 60 TTR, which means consumers will have 60 seconds to finish the job after reservation. You can override these defaults:

await beanstalkd.put(
  { foo: 'bar' },
  {
    delay: 2, // Two second delay
    priority: new Date().getTime(),
    ttr: 600 // Ten minute delay
  }
)

Jobs with lower priorities are handled first. Refer to the protocol specs for more information on job options.

Using different tubes

All jobs are added to the default tube by default. You can change the tube you want to produce a job in by using use.

const tubeName = await beanstalkd.use('awesome-tube') // => 'awesome-tube'
await beanstalkd.put({ foo: 'bar' })

Consumers

Reserving a job

Consumers work by reserving jobs in a tube. Reserving is a blocking operation and execution will stop until a job has been reserved.

const { id, payload } = await beanstalkd.reserve() // wait until job incoming
console.log({ id, payload }) // => { id: '1', payload: '{"foo":"bar"}' }

jackd will return the payload as-is. This means you'll have to do JSON.parse yourself if you passed in an Object.

Performing job operations (delete/bury/touch/release)

Once you've reserved a job, there are several operations you can perform on it. The most common operation will be deleting the job after the consumer is finished processing it.

await beanstalkd.delete(id)

Consumers can also give up their reservation by releasing the job. You'll usually want to release the job if an error occurred on the consumer and you want to put it back in the queue immediately.

// Release immediately with high priority (0) and no delay (0)
await beanstalkd.release(id)

// You can also specify the priority and the delay
await beanstalkd.release(id,
  priority: 10
  delay: 10
})

However, you may want to bury the job to be processed later under certain conditions, such as a recurring error or a job that can't be processed. Buried jobs will not be processed until they are kicked.

await beanstalkd.bury(id)
// ... some time later ...
await beanstalkd.kickJob(id)

You'll notice that the kick operation is suffixed by Job. This is because there is a kick command in beanstalkd which will kick a certain number of jobs back into the tube.

await beanstalkd.kick(10) // 10 buried jobs will be moved to a ready state

Consumers will sometimes need additional time to run jobs. You can touch those jobs to let beanstalkd know you're still processing them.

await beanstalkd.touch(id)

Watching on multiple tubes

By default, all consumers will watch the default tube only. So naturally consumers can elect what tubes they want to watch.

const numberOfTubesWatched = await beanstalkd.watch('my-special-tube')
// => 2

If a consumer is watching a tube and it no longer needs it, you can choose to ignore that tube as well.

const numberOfTubesWatched = await beanstalkd.ignore('default')
// => 1

Please keep in mind that attempting to ignore the only tube being watched will return an error.

You can also bring back the current tubes watched using list-tubes-watched. However, there is no first-class support for this command because it returns YAML. This will be discussed in the next section.

Executing arbitrary commands

jackd only has first-class support for commands that do not return YAML. This was an intentional design decision to allow the developer using jackd the flexibility to specify what YAML parser they want to use.

To execute commands that return YAML, jackd exposes the executeMultiPartCommand function:

const stats = await beanstalkd.executeMultiPartCommand('stats\r\n')
/* =>
---
current-jobs-urgent: 0
current-jobs-ready: 0
current-jobs-reserved: 0
current-jobs-delayed: 0
current-jobs-buried: 0
*/

You can then pipe this result through a YAML parser to get the actual contents of the YAML file.

const YAML = require('yaml')
const stats = await beanstalkd.executeMultiPartCommand('stats\r\n')
const { 'total-jobs': totalJobs } = YAML.parse(stats)
console.log(totalJobs)
// => 0

There is also an executeCommand method which will allow you to execute arbitary commands on beanstalkd. Please keep in mind that support for this use-case is limited.

Worker pattern

You may be looking to design a process that does nothing else but consume jobs. You can accomplish this with one jackd client using async/await. Here's an example implementation.

/* consumer.js */
const Jackd = require('jackd')
const beanstalkd = new Jackd()

beanstalkd.connect()

while (true) {
  try {
    const { id, payload } = await beanstalkd.reserve()
    /* ... process job here ... */
    await beanstalkd.delete(id)
  } catch (err) {
    // Log error somehow
    console.error(err)
  }
}

If you need to both publish and consume messages within the same Node.js process you may find it useful to create two connections to beanstalkd using jackd. While performing beanstalkd push/pull operations are supported on a single client, opening two clients will give you the following benefits:

  • Likelihood of asynchronicity bugs is diminished (beanstalkd is synchronous in nature while Node.js is not)
  • Data will flow in one direction per client. One client will only put, the other will only reserve/delete.
  • Similarly, you'll have less tube confusion as you'll only need to use on one client and watch/ignore on the other.

License

MIT

About

Modern, simple beanstalkd client for Node.js

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages

  • JavaScript 98.5%
  • Dockerfile 1.5%