BSW is a Node.js framework for beanstalkd workers
Table of Contents
- Requirements
- Quick Start
- Handler Function
- BSW Consumer class
- BSW Producer class
- Full example
- Contributors
Node.js v10 or above is required.
Updated dependencies and dropped node 8 support.
Consumer
const {Consumer} = require('bsw');
(async () => {
const consumer = new Consumer({
host: '127.0.0.1',
port: 27017,
tube: 'example',
handler: async function (payload, job_info) {
console.log('processing job: ', payload);
return 'success';
}
});
// handling errors
consumer.on('error', (e) => {
console.log('error:', e);
});
await consumer.start();
})();
Producer
const {Producer} = require('bsw');
(async () => {
const producer = new Producer({
host: '127.0.0.1',
port: 27017,
tube: 'example'
});
// handling errors
consumer.on('error', (e) => {
console.log('error:', e);
});
await producer.start();
await producer.putJob({
payload: JSON.stringify({throw: true, result: 'success'}),
priority: 0,
delay: 0,
ttr: 60
});
producer.stop();
})();
In v2, handler must be an async function(https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/async_function)
handler
interface:
async handler(payload, job_info)
- payload: beanstalk job payload string or object (if job is a valid JSON)
- job_info: job details
- job_info.id: job id
- job_info.tube: job tube
handler
definition examples:
async handler(payload, job_info) {
console.log('Hello world!');
}
After job reservation, handler
would be called. Each job must get one of the following status after processing:
- success: job processed succesfully and must be deleted from beanstalkd
- bury: job processing failed, it must be marked as buried in beanstalkd
- release + delay: job must be reserved again after delay
To report the job status, it must be returned or thrown from handler function:
// delete job
return 'success';
throw 'success';
// bury job
return 'bury';
throw 'bury';
// reput job without delay
return 'release';
throw 'release';
// reput job with 10s delay
return ['release', 10];
throw ['release', 10];
Default statuses:
- success if
handler
returned with unknown keyword - bury if
handler
thrown with unknown keyword
For example:
async handler(payload, job_info) {
try {
await mayThrow();
} catch (e) {
return 'bury'
}
return 'success';
}
equals to
async handler(payload, job_info) {
await mayThrow();
}
You may add an optional post processing of jobs, to do this add final
function to the handler with the following interface:
NOTE: post processing apply after job status was sent to beanstalkd
async final(status, delay, result)
- status: job status (
success
,release
orbury
) - delay: job delay in seconds for
release
ornull
- result: a value returned/thowrn from
handler
BSW Consumer class is used to connect to beanstalkd server and subscribe to a tube
The Consumer constructor takes configuration object:
- host: beanstalkd host (default:
'127.0.0.1'
) - port: beanstalkd port (default:
11300
) - tube: beanstalkd tube (default:
'default'
) - enable_logging: enable logging to console (default value
false
) - reserve_timeout: timeout value(in seconds) of job reservation (default value
30
) - max_processing_jobs: max number of simultaneous jobs reserved (default:
1
) - auto_reconnect: flag for reconnection behavior when connection is accidentally closed, which means it's not closed by client side and fivebeans will fire a
close
event (default valuefalse
) - handler: handler async function (mandatory, MUST be an async function)
- final: final async function (optional, MUST be an async function)
Start the worker.
- NOTE async function can be called directly, or called inside another async function with
await
key word. - If call
consumer.start()
directly, it will return immediately and process the actual start action asynchonously - If call
await consumer.start()
inside an async function, it will wait until the start process finishes and then process the code at the back
Example:
const consumer = new Consumer({
host: '127.0.0.1',
port: 27017,
tube: 'example',
handler: async function (payload, job_info) {
console.log('processing job: ', payload);
return 'success';
}
});
// could be called directly without await
consumer.start();
// this line will be immediately called because start() is async function
console.log('do something');
// or could be called inside async function context
(async () => {
await consumer.start();
// this line will be called after start() returns
console.log('do something');
})();
Stop the consumer immediately, and any processing jobs won't report to beanstalk. Example
consumer.stop();
Stop the consumer in a more graceful way. Will wait until all the processing jobs are done and reported to beanstalk, or wait for a user-specific timeout value.
Example
// stop the consumer gracefully within 3s
await consumer.stopGracefully(3000);
BSW Producer class is used to connect to beanstalkd server and put jobs to a tube
The Producer constructor takes configuration object:
- host: beanstalkd host (default:
'127.0.0.1'
) - port: beanstalkd port (default:
11300
) - tube: beanstalkd tube (default:
'default'
) - enable_logging: enable logging to console (default value
false
)
Same as Consumer class.
Same as Consumer class.
Put jobs to the tube. Receives an job
object which has the following attributes:
- payload: job payload, type is String
- priority: job priority, 0 is highest, type is Integer
- delay: time(in seconds) for a job to transfer from Delayed state to Ready state, type is Integer
- ttr: time(in seconds) for a reserved job to become Ready state, type is Integer
Example:
await producer.putJob({
payload: JSON.stringify({key: 'value'}),
priority: 0,
delay: 0,
ttr: 60
});
Find the full example in example
directory:
To run, clone the project, then:
> npm install
(If you have `yarn` installed in your machine, we recommand you use `yarn install` instead)
> cd example
> node producer.js
> node consumer.js