
A job takes the form of

var Job = require('').Job;
var options = {}, methods = {};
exports.job = new Job(options, methods);

Or in CoffeeScript

nodeio = require('');
class Job extends nodeio.JobClass
@job = new Job(options);



input

Default: read lines from STDIN (auto-detects newline)


input: [0,1,2]                  //Array input 
input: '/path/file.txt'         //Reads lines from a file (auto-detects newline)
input: '/path/to/dir/'          //Reads all files in a directory
input: false                    //Runs the job once
input: true                     //Runs the job indefinitely

To input from a stream

input: function() {

To write your own input function, use

input: function(start, num, callback) {
  • start = the offset. Starts at zero
  • num = the number of rows / lines to return
  • callback = in the format callback(err, input)
  • When there's no input left, call callback(null, false)
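The contract above can be sketched with an in-memory array. This is a minimal illustration rather than node.io's own code: `rows`, `makeInput` and `drain` are hypothetical names, and the driver loop only imitates how a job might request successive chunks, assuming the callback is invoked synchronously.

```javascript
// Hypothetical data source; any array-like store would do.
var rows = ['a', 'b', 'c', 'd', 'e'];

// Builds an input function following the (start, num, callback) contract.
function makeInput(source) {
    return function (start, num, callback) {
        if (start >= source.length) {
            // No input left: signal the end with false.
            return callback(null, false);
        }
        callback(null, source.slice(start, start + num));
    };
}

// Illustrative driver (an assumption, not part of the library):
// keeps requesting `num` rows until false is returned.
function drain(input, num) {
    var collected = [], offset = 0, done = false;
    while (!done) {
        input(offset, num, function (err, chunk) {
            if (err) throw err;
            if (chunk === false) { done = true; return; }
            collected.push(chunk);
            offset += chunk.length;
        });
    }
    return collected;
}

console.log(drain(makeInput(rows), 2));
```

The function returned by `makeInput(rows)` is what would be assigned to the `input` key of a job.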


output

Default: write lines to STDOUT

Note: output is called periodically rather than at the completion of a job so that very large or continuous IO can be handled

output: '/path/file.txt'    //Outputs lines to a file
output: false               //Ignores output

To output to a stream

output: function() {
    this.output.apply(this, arguments);

To write your own output function

output: function(output) {
    //Note: this method always receives an array
    output.forEach(function(line) {
        console.log(line);
    });
}


run

Default: passes through input to the next stage of processing

Takes some input to use or transform.

run: function(line) {

The following methods are available in run to control flow

this.emit(result)               //Emits a result to the next stage of processing
this.skip()  OR  this.finish()  //Cancels the thread and discards the input
this.exit(msg)                  //Exits the job with an optional error message
this.retry()                    //Retries the thread
this.add(input)                 //Dynamically add input to the queue


reduce

Default: passes through input to the next stage of processing

Called before output. Use emit as normal

reduce: function(lines) {
    //Note: this method always receives an array

    var emit = [];
    lines.forEach(function(line) {
        //indexOf returns 0 when the line starts with 'foo', so test for >= 0
        if (line.indexOf('foo') >= 0) emit.push(line);
    });
    this.emit(emit);
}


fail

Called if a thread fails. A thread can fail if it times out, exceeds the maximum number of retries, makes a bad request, spawns a bad command, etc.

fail: function(input, status) {
    console.log(input+' failed with status: '+status);
}


complete

Called once the job is complete

complete: function() {
    console.log('Job complete.');
}


max (default: 1)

The maximum number of run methods allowed to run concurrently, per process

take (default: 1)

How many lines / elements of input to send to each run method

Example when take = 2

input: [0,1,2,3,4],
run: function(input) {
    console.log(input);  //Outputs [0,1] \n [2,3] \n [4] \n
}
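The grouping that take describes can be imitated in plain JavaScript. This is only a sketch of the batching behaviour, assuming input is split into consecutive groups with a shorter final group; `chunk` is a hypothetical helper and not part of the library.

```javascript
// Splits `input` into consecutive groups of at most `take` elements.
function chunk(input, take) {
    var groups = [];
    for (var i = 0; i < input.length; i += take) {
        groups.push(input.slice(i, i + take));
    }
    return groups;
}

// Each group corresponds to what a single run call would receive.
chunk([0, 1, 2, 3, 4], 2).forEach(function (group) {
    console.log(group);
});
```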

retries (default: 2)

The maximum number of times some input can be retried before fail is called
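One way to picture the retry limit is the loop below. It is a sketch under stated assumptions, not the library's implementation: `runWithRetries` is a hypothetical helper, a failed attempt is modelled as a thrown error, the status text is made up, and `retries = 2` is taken to mean one initial attempt plus two retries.

```javascript
// Runs task(input); on failure retries up to `retries` times, then calls fail.
function runWithRetries(task, input, retries, fail) {
    for (var attempt = 0; attempt <= retries; attempt++) {
        try {
            return task(input);
        } catch (err) {
            // Swallow the error and try again while attempts remain.
        }
    }
    fail(input, 'retry limit reached'); // hypothetical status text
}

var calls = 0;
var failed = [];
runWithRetries(
    function () { calls++; throw new Error('boom'); }, // a task that always fails
    'row-1',
    2,
    function (input, status) { failed.push(input); }
);
console.log(calls, failed);
```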

timeout (default: false)

The maximum amount of time (in seconds) each thread has before fail is called

global_timeout (default: false)

The maximum amount of time (in seconds) the entire job has to complete before exiting with an error

flatten (default: true)

If run emits an array, this option determines whether each emitted array is flattened before being output

Example (when max = 3)

run: function() {
    this.emit([1,2,3]);
},
output: function(output) {
    console.log(output);
    //With flattening, outputs [1,2,3,1,2,3,1,2,3]
    //Without, outputs [[1,2,3],[1,2,3],[1,2,3]]
}

benchmark (default: false)

If this is true, outputs benchmark information on a job's completion: 1) completion time, 2) bytes read + speed, 3) bytes written + speed

fork (default: false)

Whether to use child processes to distribute processing. Set this to the number of desired workers

input (default: false)

This option sets a limit on how many lines/rows/elements are input before the job is forced to complete

Example when input = 2

input: [0,1,2],
run: function(num) {
    console.log(num); //Outputs 0 \n 1 \n
}

recurse (default: false)

If input is a directory, this option is used to recurse through each subdirectory

read_buffer (default: 8096)

The read buffer to use when reading files

newline (default: \n)

The char to use as newline when outputting data. Input newlines are automatically detected

encoding (default: 'utf8')

The encoding to use when reading / writing files

args (default: [])

Any extra arguments when calling from the command line are stored here.


$ myjob arg1 arg2
    => options.args = ['arg1','arg2']

Working with IO

To read or write to a file inside a job, use the following methods. Both methods are synchronous if no callback is provided.

this.read(file, [callback]);
this.write(file, data, [callback]);

Making requests

To make a request, use the following methods.

this.get(url, [headers], callback, [parse])

headers and parse are optional.

Makes a GET request to the URL and returns the result - callback takes err, data, headers

parse is an optional callback used to decode / pre-parse the returned data


this.get('', function(err, data, headers) {

this.getHtml(url, [headers], callback, [parse])

The same as above, except callback takes err, $, data, headers where $ is the DOM selector / traversal object (see DOM selection / traversal below).

this.post(url, body, [headers], callback, [parse])

this.postHtml(url, body, [headers], callback, [parse])

Makes a POST request. If body is an object, it is encoded using the builtin querystring module. postHtml returns the $ object.
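To see what an object body looks like once encoded, Node's builtin querystring module can be called directly. The field names below are hypothetical, and whether the library calls `querystring.stringify` with exactly these defaults is an assumption.

```javascript
var querystring = require('querystring');

// Hypothetical form fields used only for illustration.
var body = { user: 'alice', page: 2 };

// Produces a URL-encoded string suitable for a form POST body.
var encoded = querystring.stringify(body);
console.log(encoded); // user=alice&page=2
```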

this.doRequest(method, url, body, [headers], callback, [parse])

Makes a general request with the specified options.

Making proxied requests

Documentation coming soon. For now, see ./lib/

DOM selection / traversal

getHtml and postHtml return a special object $ that wraps node-soupselect and provides methods to aid in traversing the returned DOM.

$(selector) returns an element or collection of elements.

Some example selectors (see node-soupselect for more)

 $('a')                         //Select all A tags
 $('')                     //Select all A tags of the class 'foo'
 $('')                 //Select all A tags of the class 'foo' and the class 'bar'
 $('#container')                //Select the element with id = 'container'
 $('p a')                       //Select all A tags that have a parent P tag
 $('input[type=text]')          //Select all text inputs

Working with a collection of elements

$('a').first()                  //Returns the first A tag
$('a').last()                   //Returns the last A tag
$('a').each(callback)           //Calls `callback` with each A tag
$('a').each(attrib, callback)   //Calls `callback` with an attribute of each A tag, e.g. $('a').each('href', function(href){});

Working with an element

<a href="#">Hello <b>World!</b></a>

$('a').text                     //Outputs the text inside the tag
    // => outputs 'Hello'

$('a').fulltext                 //Outputs the text inside the tag including the text inside of each nested tag
    // => outputs 'Hello World!'

    // => #

Note: more select / traversal methods are coming soon

Executing commands

To execute a command, use the following methods. Callback takes the format of (err, stdout, stderr)

this.exec(cmd, callback);
this.spawn(cmd, stdin, callback);       //Same as exec, but can write to the command's STDIN

Data validation and sanitization

Data validation and sanitization methods are provided by node-validator. Validation methods are available through this.assert, while sanitization / filtering methods are available through this.filter.

this.assert('abc').isInt();                         //Fails - fail is called
this.assert('').len(3,64).isEmail();     //Ok
this.assert('abcdef').is(/^[a-z]+$/);               //Ok
var num = this.filter('00000001').ltrim(0).toInt(); //num = 1
var str = this.filter('&lt;a&gt;').entityDecode();  //str = '<a>'

The full list of validation / sanitization methods is available here.
