Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

0.2.0 release

  • Loading branch information...
commit 4df5c5a67993c2d47c6630cdcf584392aa026e9c 1 parent 7ada77a
@chriso authored
View
11 HISTORY.md
@@ -1,3 +1,14 @@
+### v0.2.0-1
+ * Added new DOM element getters - innerHTML, rawtext and striptags
+ * Added the ability to specify a custom $ context - $(select, [context])
+ * Added odd() and even() traversal methods
+ * Added has() (see: api.jquery.com/has/)
+ * Added job.parseValues() and job.writeValues() to simplify reading & writing separated values (e.g. CSV / TSV)
+ * Major refactoring
+ * Improved commenting and internal documentation
+ * Speed improvements
+ * Added Makefile (test / test-cov)
+
### v0.1.1-17
* Fixed incorrect handling of large streams
* Better support for request timeouts
View
9 Makefile
@@ -0,0 +1,9 @@
+PREFIX ?= /usr/local
+
+test: $(BIN)
+ @expresso -I lib --growl $(TEST_FLAGS) test/*.test.js
+
+test-cov:
+ @expresso -I lib --cov $(TEST_FLAGS) test/*.test.js
+
+.PHONY: test test-cov
View
33 README.md
@@ -4,27 +4,27 @@ node.io is a data scraping and processing framework for [Node.js](http://nodejs.
node.io can streamline the process of:
-- Parsing / filtering / sanitizing large amounts of data
+- Parsing, filtering or sanitizing large amounts of data
- Scraping data from the web using familiar CSS selectors and traversal methods
- Map Reduce
-- Transforming data from one format to another, e.g. from CSV => a database
+- Transferring data, e.g. CSV => a database
- Distributing work across multiple processes, and multiple servers (soon)
-- Recursively traversing a directory and using each file as input
+- Working with continuous data & streams
## Why node.io?
-- Create modular and extensible jobs for scraping and processing data
+- Create modular and extendable jobs for scraping and processing data
- Jobs are written in Javascript or Coffeescript and run in Node.js - jobs are concise, asynchronous and _FAST_
- Seamlessly speed up execution by distributing work among child processes and other servers (soon)
-- Easily handle a variety of input / output situations - node.io does the heavy lifting
+- Easily handle a variety of input output situations - node.io does the heavy lifting
* Reading / writing lines to and from files
- * Traversing files in a directory
* Reading / writing rows to and from a database
- * STDIN / STDOUT / Custom streams
- * Piping data between multiple node.io jobs
- * Continuous input / output
- * Any combination of the above, or your write your own IO methods
-- Includes a robust framework for scraping, selecting and traversing web data
+ * Traversing files in a directory
+ * STDIN / STDOUT / custom streams
+ * Piping data to other processes or node.io jobs
+ * Continuous IO
+ * Any combination of the above, or write your own IO methods
+- Includes a robust framework for scraping, selecting and traversing data from the web
- Support for a variety of proxies when scraping web data
- Includes a data validation and sanitization framework
- Provides support for retries, timeouts, dynamically adding input, etc.
@@ -52,22 +52,23 @@ Check [@nodeio](http://twitter.com/nodeio) or [http://node.io/](http://node.io/)
## Roadmap
- Fix up the [http://node.io/](http://node.io/) site
-- `-d,--daemon` node.io switch
- Nested requests inherit referrer / cookies / user-agent if to the same domain
+- `-d,--daemon` node.io switch
- Add more DOM [selector](http://api.jquery.com/category/selectors/) / [traversal](http://api.jquery.com/category/traversing/) methods
- ..or attempt a full port of jQuery that's compatible with [htmlparser](https://github.com/tautologistics/node-htmlparser) (I know a port already exists, but it uses the far less forgiving [JSDOM](https://github.com/tmpvar/jsdom))
-- Test proxy callbacks and write proxy documentation
+- Test various proxies and write the proxy documentation
- Add distributed processing
- Installation without NPM (install.sh)
-- Refactoring
-- More tests / better test coverage
+- More tests / better coverage
- Speed improvements
+[history.md](https://github.com/chriso/node.io/blob/master/HISTORY.md) lists recent changes.
+
## Contributing
If you find a bug, please report the issue [here](https://github.com/chriso/node.io/issues).
-If you want to contribute / help with the Roadmap / add more tests, please [fork/pull](https://github.com/chriso/node.io/fork).
+If you want to contribute, please [fork/pull](https://github.com/chriso/node.io/fork).
## Credits
View
36 examples/resolve.js
@@ -10,6 +10,9 @@
// 3. To return domains that do resolve:
// $ cat domains.txt | node.io resolve found
//
+// 3. To return just the IPs:
+// $ cat domains.txt | node.io -s resolve ips | sort | uniq
+//
// To output the results to a file, use either:
// $ cat domains.txt | node.io -s resolve > result.txt
// $ node.io -i domains.txt -o result.txt resolve
@@ -23,25 +26,21 @@ var options = {
}
var methods = {
-
run: function(domain) {
- var self = this, type = this.options.args.length ? this.options.args[0] : 'default';
+ var self = this, type = this.options.arg1;
dns.lookup(domain, 4, function(err, ip) {
+
+ //The domain didn't resolve
if (err) {
- //The domain didn't resolve
switch(err.errno) {
- case 4: case 8: // == notfound
- if (type === 'notfound') {
- self.emit(domain);
- } else if (type === 'found') {
- self.skip();
- } else {
- self.emit(domain + ',failed');
- }
- break;
- default: self.retry();
+ case 4: case 8: // == notfound
+ self.fail(domain);
+ break;
+ default:
+ self.retry();
+ break;
}
} else {
@@ -51,6 +50,8 @@ var methods = {
self.skip();
} else if (type === 'found') {
self.emit(domain);
+ } else if (type === 'ips') {
+ self.emit(ip);
} else {
self.emit(domain + ',' + ip);
}
@@ -60,18 +61,17 @@ var methods = {
},
fail: function(domain, status) {
- var type = this.options.args;
-
+ var type = this.options.arg1;
+
//The domain either timed out or exceeded the max number of retries
if (type === 'notfound') {
this.emit(domain);
- } else if (type === 'found') {
+ } else if (type === 'found' || type === 'ips') {
this.skip();
} else {
this.emit(domain + ',failed');
}
- this.emit(domain+',failed');
- }
+ }
}
//Export the job
View
28 examples/word_count.js
@@ -4,36 +4,34 @@
var Job = require('node.io').Job;
//Take 30 lines at a time
-var options = {max:30, take:30};
+var options = {max:10, take:10};
var word_count = {};
var methods = {
run: function(lines) {
- var self = this, words = [];
- lines.forEach(function(line) {
- line.toLowerCase().replace(/[^a-z0-9\s]+/g, '').split(' ').forEach(function(word) {
- words.push(word);
- });
- });
+ var self = this, words = [], line, i, l, j, k;
+ for (i = 0, l = lines.length; i < l; i++) {
+ line = lines[i].split(' ');
+ //.toLowerCase().replace(/[^a-z0-9\s]+/g, '')
+ for (j = 0, k = line.length; j < k; j++) {
+ words.push(line[j]);
+ }
+ }
this.emit(words);
},
reduce: function(words) {
- words.forEach(function(word) {
- if (typeof word_count[word] === 'undefined') {
- word_count[word] = 1;
- } else {
- word_count[word]++;
- }
- });
+ for (var i = 0, l = words.length; i < l; i++) {
+ word_count[words[i]] = typeof word_count[words[i]] === 'undefined' ? 1 : word_count[words[i]] + 1;
+ };
},
complete: function() {
var out = [];
for (var word in word_count) {
- out.push(word + ',' + word_count[word]);
+ out.push(word_count[word] + ' ' + word);
}
//Now that we have the full list of words, output
this.output(out);
View
294 lib/node.io/cli.js
@@ -10,20 +10,8 @@ var fs = require('fs'),
path = require('path'),
utils = require('./utils');
-var addPath = function (file) {
- if (file.indexOf('/') === -1) {
- file = cwd + '/' + file;
- }
- return file;
-};
-
-var exit = function (msg) {
- console.log(msg);
- process.exit(1);
-};
-
-var error = function (msg) {
- utils.status.error(msg);
+var exit = function (msg, is_error) {
+ utils.status[is_error ? 'error' : 'info'](msg);
process.exit(1);
};
@@ -40,13 +28,21 @@ var usage = ''
+ ' -f, --fork [NUM] Fork NUM workers. If NUM isn\'t specified, a\n'
+ ' process is spawned for each CPU core\n'
+ ' -e, --eval [EXP] Evaluate an expression on each line of input\n'
- + ' e.g. "input.replace("\t", ",")"\n'
+ + ' e.g. "input.replace(\'\\t\', \',\')"\n'
+ ' -b, --benchmark Benchmark the operation\n'
+ ' -g, --debug Debug the operation\n'
+ ' -v, --version Display the current version\n'
+ ' -h, --help Display help information\n'
;
+/**
+ * exports.cli
+ *
+ * Start node.io with the specified arguments.
+ *
+ * @param {Array} args
+ * @api public
+ */
exports.cli = function (args) {
var job_path, job_modified = false,
@@ -56,19 +52,19 @@ exports.cli = function (args) {
if (!args.length) {
exit(usage);
}
-
+
while (args.length) {
arg = args.shift();
switch (arg) {
case '-i':
case '--input':
job_modified = true;
- input = addPath(args.shift());
+ input = args.shift();
break;
case '-o':
case '--output':
job_modified = true;
- output = addPath(args.shift());
+ output = args.shift();
break;
case '-s':
case '--silent':
@@ -84,14 +80,14 @@ exports.cli = function (args) {
break;
case '-t':
case '--timeout':
- options.timeout = args.shift();
+ options.global_timeout = args.shift();
break;
case '-f':
case '--fork':
if (args.length && args[0].match(/^[0-9]+$/)) {
- options.workers = args.shift();
+ options.fork = args.shift();
} else {
- options.workers = true;
+ options.fork = true;
}
fork = true;
job_modified = true;
@@ -120,161 +116,155 @@ exports.cli = function (args) {
}
}
+ var isMaster = !process.env._CHILD_ID_;
+
var start_processor = function (job_path) {
processor.start(job_path, options);
};
+
+ if (!job_modified) {
- //We apply the command line switches by creating a temporary job that extends the specified job
- if (job_modified) {
-
- //Only continue if we're the master process
- if (!process.env._CHILD_ID_) {
-
- var is_coffee = false;
-
- var create_temp_job = function (job_path, callback) {
+ start_processor(job_path);
+
+ } else {
- if (job_path) {
+ //We apply the command line switches by creating a temporary job that extends the specified job
- var temp_filename = cwd + '/temp_' + path.basename(job_path, '.js') + '.js',
- this_job = (job_path.indexOf('/') >= 0 ? job_path : './' + job_path)
+ var is_coffee = false;
+
+ var create_temp_job = function (job_path, callback) {
+
+ if (job_path) {
+
+ var temp_filename = cwd + '/temp_' + path.basename(job_path, '.js') + '.js',
+ this_job = (job_path.indexOf('/') >= 0 ? job_path : './' + job_path)
- var temp_job;
+ //If we're not the master, the file has already been created
+ if (!isMaster) {
+ start_processor(temp_filename);
+ return;
+ }
- if (!is_coffee) {
-
- temp_job = 'var job = require("' + this_job + '").job;\n\n' +
- 'exports.job = job.extend({';
+ var temp_job;
+
+ if (!is_coffee) {
- } else {
-
- //Compatability with CoffeeScript inheritance
- temp_job = 'var JobClass = require("' + this_job + '")[\'class\'];\n\n';
- temp_job += 'if (typeof JobClass === "undefined") { console.log("Please export @class when using CoffeeScript. See the documentation for more information");process.exit(1);}';
- temp_job += 'var __hasProp = Object.prototype.hasOwnProperty, __extends = function(child, parent) {\n';
- temp_job += 'for (var key in parent) { if (__hasProp.call(parent, key)) child[key] = parent[key]; }\n';
- temp_job += 'function ctor() { this.constructor = child; }\n';
- temp_job += 'ctor.prototype = parent.prototype; child.prototype = new ctor;\n';
- temp_job += 'child.__super__ = parent.prototype; return child; }\n';
- temp_job += 'function Job() { Job.__super__.constructor.apply(this, arguments); }\n';
- temp_job += '__extends(Job, JobClass);\n';
- temp_job += 'exports.job = new Job({';
- }
-
- } else if (eval) {
-
- //We don't necessarily need a job if the eval switch is used
- var temp_filename = cwd + '/temp_job.js',
- temp_job = 'var Job = require("node.io").Job;\n\n' +
- 'exports.job = new Job({';
+ temp_job = 'var job = require("' + this_job + '").job;\n\n' +
+ 'exports.job = job.extend({';
} else {
- exit('No job specified');
-
- }
-
- if (fork) {
- temp_job += 'fork:true,';
- }
-
- if (job_args.length) {
- temp_job += 'args: ' + JSON.stringify(job_args) + ',';
- }
-
- temp_job += '}, {\n';
-
- if (input) {
- temp_job += '\tinput: "' + (input.indexOf('/') >= 0 ? input : cwd + '/' + input) + '",\n';
+ //Compatability with CoffeeScript inheritance
+ temp_job = 'var JobClass = require("' + this_job + '")[\'class\'];\n\n';
+ temp_job += 'if (typeof JobClass === "undefined") { console.log("Please export @class when using CoffeeScript. See the documentation for more information");process.exit(1);}';
+ temp_job += 'var __hasProp = Object.prototype.hasOwnProperty, __extends = function(child, parent) {\n';
+ temp_job += 'for (var key in parent) { if (__hasProp.call(parent, key)) child[key] = parent[key]; }\n';
+ temp_job += 'function ctor() { this.constructor = child; }\n';
+ temp_job += 'ctor.prototype = parent.prototype; child.prototype = new ctor;\n';
+ temp_job += 'child.__super__ = parent.prototype; return child; }\n';
+ temp_job += 'function Job() { Job.__super__.constructor.apply(this, arguments); }\n';
+ temp_job += '__extends(Job, JobClass);\n';
+ temp_job += 'exports.job = new Job({';
}
- //Evaluate an expression on each line of input (return eval(expression))
- //E.g. "input" => passes through input
- // "null" OR "undefined" => skips input
- // "input.length > 3 ? input : null" => skips lines where length <= 3
- // "input.replace('\t', ',')" => converts TSV (tab separated values) to CSV
-
- if (eval) {
- temp_job += '\trun: function (input) {\n\t\t';
- temp_job += 'var result = (function(input,expression){return eval(expression);}(input,"'+eval.replace('"', '\"')+'"));\n\t\t';
- temp_job += 'return typeof result !== "undefined" && result !== null ? this.'+(job_path ? '__super__.run' : 'emit')+'(result) : this.skip();\n\t';
- temp_job += '}\n';
- }
+ } else if (eval) {
- if (output) {
- temp_job += '\toutput: "' + (output.indexOf('/') >= 0 ? output : cwd + '/' + output) + '",\n';
- }
+ //We don't necessarily need a job if the eval switch is used
+ var temp_filename = cwd + '/temp_job.js',
+ temp_job = 'var Job = require("node.io").Job;\n\n' +
+ 'exports.job = new Job({';
+
+ } else {
- temp_job += '});';
-
- //Remove the temp file on exit
- var removeTemp = function () {
- try {
- fs.unlinkSync(temp_filename);
- } catch (e) {}
- };
- process.on('exit', removeTemp);
- process.on('SIGTERM', removeTemp);
- process.on('SIGHUP', removeTemp);
+ exit('No job specified');
- fs.writeFile(temp_filename, temp_job, function(err) {
- if (err) {
- error(err);
- } else {
- callback(temp_filename);
- }
- });
}
- if (!job_path && eval) {
- create_temp_job(null, start_processor);
- return;
+ if (fork) {
+ temp_job += 'fork:true,';
+ }
+
+ if (job_args.length) {
+ temp_job += 'args: ' + JSON.stringify(job_args) + ',';
+ temp_job += 'arg1: ' + JSON.stringify(job_args[0]) + ',';
}
-
- //Normally we would compile .coffee files in ./processor.js, but since we're modifying the job using one or more
- //switches, we need to compile it here first
- if (path.extname(job_path) === '.coffee') {
- is_coffee = true;
+ temp_job += '}, {\n';
- var basename = path.basename(job_path, '.coffee');
- var job_compiled = cwd + '/' + basename + '_compiled.js';
-
- if (!options.silent) {
- utils.status.info('Compiling ' + job_path);
+ if (input) {
+ temp_job += '\tinput: "' + (input.indexOf('/') >= 0 ? input : cwd + '/' + input) + '",\n';
+ }
+
+ //Evaluate an expression on each line of input (return eval(expression))
+ //E.g. "input" => passes through input
+ // "null" OR "undefined" => skips input
+ // "input.length > 3 ? input : null" => skips lines where length <= 3
+ // "input.replace('\t', ',')" => converts TSV (tab separated values) to CSV
+
+ if (eval) {
+ temp_job += '\trun: function (input) {\n\t\t';
+ temp_job += 'var result = (function(input,expression){return eval(expression);}(input,"'+eval.replace('"', '\"')+'"));\n\t\t';
+ temp_job += 'return typeof result !== "undefined" && result !== null ? this.'+(job_path ? '__super__.run' : 'emit')+'(result) : this.skip();\n\t';
+ temp_job += '}\n';
+ }
+
+ if (output) {
+ temp_job += '\toutput: "' + (output.indexOf('/') >= 0 ? output : cwd + '/' + output) + '",\n';
+ }
+
+ temp_job += '});';
+
+ //Remove the temp file when the process exits
+ utils.removeOnExit(temp_filename);
+
+ fs.writeFile(temp_filename, temp_job, function(err) {
+ if (err) {
+ exit(err, true);
+ } else {
+ callback(temp_filename);
}
-
- utils.compileCoffee(job_path, job_compiled, function(err) {
- if (err) {
- error(err);
- } else {
-
- //Remove the compiled .coffee file on exit
- var removeCoffeeTemp = function() {
- try {
- fs.unlinkSync(compiled_js);
- } catch (e) {}
- };
- process.on('exit', removeCoffeeTemp);
- process.on('SIGTERM', removeCoffeeTemp);
- process.on('SIGHUP', removeCoffeeTemp);
-
- //Now we can extend the compiled file
- create_temp_job(job_compiled, start_processor);
- }
- });
-
- return;
-
- } else {
-
- create_temp_job(job_path, start_processor);
-
+ });
+ }
+
+ if (!job_path && eval) {
+ create_temp_job(null, start_processor);
+ return;
+ }
+
+ //Normally we would compile .coffee files in Processor.loadJob(), but
+ //since we're modifying the job using one or more switches, we need to
+ //compile it here first
+ if (path.extname(job_path) === '.coffee') {
+
+ is_coffee = true;
+
+ var basename = path.basename(job_path, '.coffee');
+ var job_compiled = cwd + '/' + basename + '_compiled.js';
+
+ //If we're not the master, the file is already compiled
+ if (!isMaster) {
+ start_processor(job_compiled);
return;
-
}
+
+ if (!options.silent) {
+ utils.status.info('Compiling ' + job_path);
+ }
+
+ utils.compileCoffee(job_path, job_compiled, function(err) {
+ if (err) {
+ exit(err, true);
+ } else {
+ //Now we can extend the compiled file
+ create_temp_job(job_compiled, start_processor);
+ }
+ });
+
+ return;
+
+ } else {
+ create_temp_job(job_path, start_processor);
+ return;
}
}
-
- start_processor(job_path);
-};
+};
View
237 lib/node.io/dom.js
@@ -8,24 +8,47 @@ var Job = require('./job').JobProto,
soupselect = require('soupselect').select,
htmlparser = require('htmlparser');
+/**
+ * A selector object similar to jQuery's. See the documentation for
+ * usage examples.
+ *
+ * @param {String} selector
+ * @param {Function} context
+ * @param {Object} headers (optional)
+ * @api public
+ */
+Job.prototype.$ = function (selector, context) {
+ var selected = soupselect(context, selector);
+ if (selected.length === 0) {
+ this.fail_with("No elements matching '" + selector + "'");
+ return;
+ } else if (selected.length === 1) {
+ selected = selected[0];
+ this.bindToDomElement(selected);
+ } else {
+ this.bindToDomCollection(selected);
+ }
+ return selected;
+}
+
+/**
+ * Parses the specified data using HtmlParser and SoupSelect. `callback` takes
+ * (err, $, data, headers) - $ is the selector object bound to the parsed DOM.
+ *
+ * @param {String} data
+ * @param {Function} callback
+ * @param {Object} headers (optional)
+ * @api public
+ */
Job.prototype.parseHtml = function (data, callback, headers) {
var self = this;
var handler = new htmlparser.DefaultHandler(function (err, dom) {
if (err) {
callback(err);
} else {
- var $ = function (select) {
- var selected = soupselect(dom, select);
- if (selected.length === 0) {
- self.fail(self.instance_input, "No elements matching '" + select + "'");
- return;
- } else if (selected.length === 1) {
- selected = selected[0];
- self.bindToDomElement(selected);
- } else {
- self.bindToDomCollection(selected);
- }
- return selected;
+ var $ = function (selector, context) {
+ //Allow the user to specify a custom context (thanks to github.com/jimbishopp)
+ return self.$(selector, context || dom);
};
callback(null, $, data, headers);
}
@@ -34,22 +57,53 @@ Job.prototype.parseHtml = function (data, callback, headers) {
parser.parseComplete(data);
};
+/**
+ * Augments a collection of DOM elements with some helpful methods.
+ *
+ * Methods:
+ * - filter(selector)
+ * - each(callback) -OR- each(attribute, callback)
+ * - first(), last(), even(), odd()
+ * - has(selector)
+ *
+ * @param {Array} collection
+ * @api public
+ */
Job.prototype.bindToDomCollection = function (collection) {
- var self = this;
- var last = collection.length - 1;
- collection.each = function (attrib, callback) {
+ var self = this, last = collection.length - 1, x;
+
+ var traverse = function (attrib, callback, condition) {
if (typeof attrib === 'function') {
callback = attrib;
- collection.forEach(function (elem) {
- self.bindToDomElement(elem);
- callback(elem);
- });
+ for (x = 0; x <= last; x++) {
+ if (!condition()) continue;
+ self.bindToDomElement(collection[x]);
+ callback(collection[x]);
+ }
} else {
- collection.forEach(function (elem) {
- callback(elem.attribs[attrib]);
- });
+ for (x = 0; x <= last; x++) {
+ if (!condition()) continue;
+ callback(collection[x].attribs[attrib]);
+ }
}
};
+
+ collection.each = function (attrib, callback) {
+ traverse(attrib, callback, function() { return true; });
+ };
+
+ //odd() includes the 1st, 3rd, etc..
+ collection.odd = function (attrib, callback) {
+ var i = 0;
+ traverse(attrib, callback, function() { return ++i % 2 === 1; });
+ };
+
+ //even() includes the 2nd, 4th, etc..
+ collection.even = function (attrib, callback) {
+ var i = 0;
+ traverse(attrib, callback, function() { return ++i % 2 === 0; });
+ };
+
collection.first = function (callback) {
var elem = collection[0];
self.bindToDomElement(elem);
@@ -58,6 +112,7 @@ Job.prototype.bindToDomCollection = function (collection) {
}
return elem;
};
+
collection.last = function (callback) {
var elem = collection[last];
self.bindToDomElement(elem);
@@ -66,59 +121,149 @@ Job.prototype.bindToDomCollection = function (collection) {
}
return elem;
};
+
collection.filter = function (select) {
- var selected = soupselect(collection, select);
- if (selected.length === 0) {
- self.fail(self.instance_input, "No elements matching '" + select + "'");
- return;
- } else if (selected.length === 1) {
- selected = selected[0];
- self.bindToDomElement(selected);
- } else {
- self.bindToDomCollection(selected);
- }
- return selected;
+ return self.$(select, collection);
+ };
+
+ //Filter out elements in the collection that do not have a descendant that
+ //matches `select`
+ collection.has = function (select) {
+ var has = [];
+ for (x = 0; x <= last; x++) {
+ if (soupselect(collection[x], select).length > 0) {
+ has.push(collection[x]);
+ }
+ };
+ self.bindToDomCollection(has);
+ return has;
};
};
+/**
+ * Augments a DOM element with some helpful methods / getters.
+ *
+ * Getters:
+ * - innerHTML
+ * - rawtext - returns text immediately inside the selected element
+ * - rawfulltext - same as rawtext, but also includes text inside nested elems
+ * - text - rawtext but trimmed, BR's replaced with \n, and entities decoded
+ * - fulltext
+ *
+ * Note: <br> and <br /> are replaced with \n when using text and fulltext
+ *
+ * @param {Object} elem
+ * @api public
+ */
Job.prototype.bindToDomElement = function (elem) {
- var hasChildren;
+ var self = this, hasChildren, x, last;
+
if (elem.children && elem.children.length > 0) {
this.bindToDomCollection(elem.children);
+ last = elem.children.length - 1;
hasChildren = true;
}
-
- elem.__defineGetter__('text', function () {
+
+ var rawtext = function () {
var text = '';
if (hasChildren) {
- for (var i in elem.children) {
- if (elem.children[i].type === 'text') {
- text += elem.children[i].data.trim();
- } else if (elem.children[i].type === 'tag' && elem.children[i].name === 'br') {
+ for (x = 0; x <= last; x++) {
+ if (elem.children[x].type === 'text') {
+ text += elem.children[x].data;
+ }
+ }
+ }
+ return text;
+ };
+
+ var text = function () {
+ var text = '';
+ if (hasChildren) {
+ for (x = 0; x <= last; x++) {
+ if (elem.children[x].type === 'text') {
+ text += elem.children[x].data.trim();
+ } else if (elem.children[x].name === 'br') {
text += '\n';
}
}
}
+
+ text = self.filter(text).entityDecode();
+
+ return text;
+ };
+
+ var rawfulltext = function (elem) {
+ var text = '';
+ if (elem.children && elem.children.length > 0) {
+ for (var i = 0, l = elem.children.length; i < l; i++) {
+ if (elem.children[i].type === 'text') {
+ text += elem.children[i].data;
+ } else if (elem.children[i].type === 'tag') {
+ text += rawfulltext(elem.children[i]);
+ }
+ }
+ }
return text;
- });
+ };
var fulltext = function (elem) {
var text = '';
if (elem.children && elem.children.length > 0) {
- for (var i in elem.children) {
+ for (var i = 0, l = elem.children.length; i < l; i++) {
if (elem.children[i].type === 'text') {
text += elem.children[i].data.trim();
+ } else if (elem.children[i].name === 'br') {
+ text += '\n';
} else if (elem.children[i].type === 'tag') {
- if (elem.children[i].name === 'br') {
- text += '\n';
- } else {
- text += fulltext(elem.children[i]);
+ text += fulltext(elem.children[i]);
+ }
+ }
+ }
+
+ text = self.filter(text).entityDecode();
+
+ return text;
+ };
+
+ var innerHTML = function (elem) {
+ var text;
+
+ switch (elem.type) {
+ case 'tag':
+ case 'script':
+ case 'style':
+ text = '<' + elem.raw + '>';
+ //Skip if the tag is <short />
+ if (elem.raw[elem.raw.length-1] !== '/') {
+ if (elem.children && elem.children.length > 0) {
+ for (var i = 0, l = elem.children.length; i < l; i++) {
+ text += innerHTML(elem.children[i]);
}
}
+ text += '</' + elem.name + '>';
}
+ break;
+
+ case 'comment':
+ text = '<!--' + elem.raw + '-->';
+ break;
+
+ case 'text':
+ text = elem.raw;
+ break;
+
+ default:
+ break;
}
+
return text;
};
+ //Define getters
+ elem.__defineGetter__('rawtext', rawtext);
+ elem.__defineGetter__('text', text);
+ elem.__defineGetter__('striptags', function () { return rawfulltext(elem); });
elem.__defineGetter__('fulltext', function () { return fulltext(elem); });
+ elem.__defineGetter__('innerHTML', function () { return innerHTML(elem); });
};
View
4 lib/node.io/index.js
@@ -1,4 +1,4 @@
-/*!
+/*!
* node.io
* Copyright(c) 2010 Chris O'Hara <cohara87@gmail.com>
* MIT Licensed
@@ -9,7 +9,7 @@ var processor = require('./processor'),
job = require('./job');
exports = module.exports = {
- version: '0.1.1-19',
+ version: '0.2.0-1',
Processor: processor.Processor,
JobProto: job.JobProto, //A reference to the underlying Job.prototype
JobClass: job.JobClass, //A reference to a new prototype identical to Job.prototype (so Job.prototype isn't modified)
View
341 lib/node.io/io.js
@@ -1,61 +1,62 @@
-/*!
+/*!
* node.io
* Copyright(c) 2010 Chris O'Hara <cohara87@gmail.com>
* MIT Licensed
*/
var fs = require('fs'),
- util = require('util'),
+ utils = require('./utils'),
Job = require('./job').JobProto;
//Used to ensure reads/writes are returned/performed in the same order as they are requested
-var write_request_id = 1,
- read_request_id = 1,
- last_write_id = 1,
- last_read_id = 1;
+var write_request_id = 1, read_request_id = 1,
+ last_write_id = 1, last_read_id = 1;
-var dataToString = function (data, newline, stringify_array) {
- var str = '';
- if (!stringify_array && data instanceof Array) {
- data.forEach(function (line) {
- str += dataToString(line, newline, true);
- });
- } else if (typeof data === 'object') {
- str = JSON.stringify(data) + newline;
- } else {
- str = data + newline;
- }
- return str;
-};
-
-//Default input behaviour is to read from STDIN
+/**
+ * The default job input method - read from STDIN.
+ *
+ * @param {Number} start
+ * @param {Number} num
+ * @param {Function} callback
+ * @api public
+ */
Job.prototype.input = function (start, num, callback) {
- var self = this;
-
- this.inputFromStdin();
-
- //Is there a better way to check for a lack of stdin data?
- setTimeout(function () {
- if (!self.input_stream.data) {
- self.input_stream.stream.destroy();
- self.error_hook('This module expects input. Override the job\'s input method or call `node.io -i [INPUT] module`');
- }
- }, 5000);
-
+ var stream = process.openStdin();
+ this.inputStream(stream);
this.input.apply(this, arguments);
};
-//Default output behaviour is to write to STDOUT
+/**
+ * The default job output method - write to STDOUT.
+ *
+ * @param {String} data
+ * @api public
+ */
Job.prototype.output = function (data) {
this.outputStream(process.stdout, 'stdout');
this.output.apply(this, arguments);
};
+/**
+ * Reads input from the specified stream. Replaces subsequent
+ * calls to job.input().
+ *
+ * @param {Object} stream
+ * @api public
+ */
Job.prototype.inputStream = function (stream) {
this.initInputStream(stream);
this.input = this.takeInputStreamLines;
};
+/**
+ * Writes output to the specified stream. Replaces subsequent
+ * calls to job.output().
+ *
+ * @param {Object} stream
+ * @param {String} name (optional)
+ * @api public
+ */
Job.prototype.outputStream = function (stream, name) {
name = name || Math.floor(Math.random() * 1000000);
this.output_streams[name] = stream;
@@ -64,19 +65,27 @@ Job.prototype.outputStream = function (stream, name) {
};
};
+/**
+ * Reads input from the specified file.
+ *
+ * @param {String} path
+ * @api public
+ */
Job.prototype.inputFromFile = function (path) {
var stream = fs.createReadStream(path, {bufferSize: this.options.read_buffer});
this.inputStream(stream);
};
-Job.prototype.inputFromStdin = function (path) {
- var stream = process.openStdin();
- this.inputStream(stream);
-};
-
+/**
+ * Finds all files in the specified directory and returns each file
+ * path as input. To recurse subdirectories, set the `recurse` op.
+ *
+ * @param {String} path
+ * @api public
+ */
Job.prototype.inputFromDirectory = function (path) {
var self = this, files = fs.readdirSync(path);
-
+
//Trim trailing slash
var trim_slash = function (path) {
if (path[path.length - 1] === '/') {
@@ -128,10 +137,10 @@ Job.prototype.inputFromDirectory = function (path) {
var dir_files = [];
path = trim_slash(path);
-
- files.forEach(function (file) {
- dir_files.push(path + '/' + file);
- });
+
+ for (var i = 0, l = files.length; i < l; i++) {
+ dir_files.push(path + '/' + files[i]);
+ }
//Use the addInput() hook rather than files.push(file) so that recursing
//plays nice with multiple processes (i.e. is shared evenly)
@@ -144,6 +153,12 @@ Job.prototype.inputFromDirectory = function (path) {
}
};
+/**
+ * Initialises the specified input stream and adds event handlers.
+ *
+ * @param {Object} stream
+ * @api public
+ */
Job.prototype.initInputStream = function (stream) {
var self = this;
@@ -158,47 +173,50 @@ Job.prototype.initInputStream = function (stream) {
stream: stream,
lines: [],
last_line: '',
- data: false,
+ //data: false,
end: false,
paused: false
};
- var end = function () {
+ this.input_stream.stream.on('data', function (data) {
+ if (self.input_stream.end) return;
+ //self.input_stream.data = true;
+ self.handleInputStream(data);
+ self.bytes_read += data.length;
+ });
+
+ this.input_stream.stream.on('end', function () {
self.input_stream.end = true;
if (self.input_stream.last_line.length) {
self.input_stream.lines.push(self.input_stream.last_line);
self.input_stream.last_line = '';
}
- };
-
- var data = function (data) {
- if (self.input_stream.end) {
- return;
- }
- self.input_stream.data = true;
- self.handleInputStream(data);
- self.bytes_read += data.length;
- };
-
- this.input_stream.stream.on('data', data);
- this.input_stream.stream.on('end', end);
+ });
- if (this.error_hook) {
- this.input_stream.stream.on('error', this.error_hook);
- }
+ this.input_stream.stream.on('error', this.exit);
};
+/**
+ * Handles chunks of data from the input stream.
+ *
+ * @param {String} data
+ * @api public
+ */
Job.prototype.handleInputStream = function (data) {
var self = this;
data = this.input_stream.last_line + data;
- data.split('\n').forEach(function (line) {
- if (line[line.length - 1] === '\r') {
- line = line.substr(0, line.length - 1);
+ var lines = data.split('\n'), line, line_length;
+
+ for (var i = 0, l = lines.length; i < l; i++) {
+ line = lines[i];
+ line_length = line.length;
+ if (line[line_length - 1] === '\r') {
+ line = line.substr(0, line_length - 1);
}
self.input_stream.lines.push(line);
- });
+ }
//Last line is incomplete
this.input_stream.last_line = this.input_stream.lines.pop();
@@ -209,6 +227,15 @@ Job.prototype.handleInputStream = function (data) {
}
};
+/**
+ * Returns lines (\n or \r\n terminated) from the input stream. Lines are returned
+ * in the same order as they are requested.
+ *
+ * @param {Number} start
+ * @param {Number} num
+ * @param {Function} callback
+ * @api public
+ */
Job.prototype.takeInputStreamLines = function (start, num, callback, read_id) {
var self = this;
@@ -217,7 +244,6 @@ Job.prototype.takeInputStreamLines = function (start, num, callback, read_id) {
this.input_stream.stream.resume();
}
- //Ensure lines are returned in the same order they are requested
read_id = read_id || read_request_id++;
if (this.input_stream.end || this.input_stream.lines.length >= num) {
@@ -240,9 +266,14 @@ Job.prototype.takeInputStreamLines = function (start, num, callback, read_id) {
}
};
+/**
+ * Handles special job method definitions.
+ *
+ * @api public
+ */
Job.prototype.handleSpecialIO = function () {
var self = this;
-
+
//If output is a string, assume it's a file and write to it when ouput is called
if (typeof this.output === 'string') {
var out_path = this.output;
@@ -309,11 +340,19 @@ Job.prototype.handleSpecialIO = function () {
this.inputFromDirectory(path);
} else {
- this.error_hook('Unknown input: ' + path);
+ this.exit('Unknown input: ' + path);
}
}
};
+/**
+ * Reads all data from the specified file. If `callback` isn't specified
+ * the operation is synchronous.
+ *
+ * @param {String} file
+ * @param {Function} callback
+ * @api public
+ */
Job.prototype.read = function (file, callback) {
if (callback) {
fs.readFile(file, this.options.encoding, callback);
@@ -322,18 +361,23 @@ Job.prototype.read = function (file, callback) {
}
};
-//Write data to file. If data is an array, output each value on a new line
-//Data is written in the same order as write() is called
+/**
+ * Writes data to the specified file. Writes are performed in the same order as
+ * write() is called.
+ *
+ * @param {String} file
+ * @param {String} data
+ * @param {Function} callback (optional)
+ * @api public
+ */
Job.prototype.write = function (file, data, callback) {
var self = this;
+ //Cache FD's
if (typeof this.output_streams[file] === 'undefined') {
var write_mode = {flags: 'w', mode: 666, encoding: this.options.encoding};
this.output_streams[file] = fs.createWriteStream(file, write_mode);
-
- this.output_streams[file].on('error', function (err) {
- self.error_hook(err);
- });
+ this.output_streams[file].on('error', self.exit);
}
//Ensure data is written in the same order as write() is called
@@ -348,7 +392,7 @@ Job.prototype.write = function (file, data, callback) {
} else {
last_write_id++;
- var str = dataToString(data, self.options.newline);
+ var str = utils.dataToString(data, self.options.newline);
var written = self.output_streams[file].write(str);
if (file !== 'stdout') {
@@ -367,25 +411,158 @@ Job.prototype.write = function (file, data, callback) {
write_lines();
};
-//Same as above, but the file pointer starts at the end of the file
+/**
+ * Appends data to the specified file. Writes are performed in the same order as
+ * append() is called.
+ *
+ * @param {String} file
+ * @param {String} data
+ * @param {Function} callback (optional)
+ * @api public
+ */
Job.prototype.append = function (file, data, callback) {
var self = this;
-
if (typeof this.output_streams[file] === 'undefined') {
var write_mode = {flags: 'a', mode: 666, encoding: this.options.encoding};
this.output_streams[file] = fs.createWriteStream(file, write_mode);
-
- this.output_streams[file].on('error', function (err) {
- self.error_hook(err);
- });
+ this.output_streams[file].on('error', self.exit);
}
this.write.apply(this, arguments);
};
+/**
+ * Waits for all output streams to drain before calling `callback`.
+ *
+ * @param {Function} callback
+ * @api public
+ */
+Job.prototype.waitForOutputStreamDrains = function (callback) {
+ var wait_for_drain = function () {
+ var keep_waiting = false;
+ for (var i in this.output_streams) {
+ if (this.output_streams.hasOwnProperty(i)) {
+ var stream = this.output_streams[i];
+
+ if (!stream._queue) {
+ continue;
+ }
+
+ if (stream._queue.length === 0) {
+ stream.end();
+ } else {
+ keep_waiting = true;
+ }
+ }
+ }
+ if (keep_waiting) {
+ setTimeout(wait_for_drain, 100);
+ } else {
+ //Done!
+ callback();
+ }
+ };
+ wait_for_drain();
+}
+
+/**
+ * Parses a line into values using the specified delimiter, quote and
+ * quote escape char. The second parameter can also be 'csv' or 'tsv'
+ * which parses the line using the default Comma/Tab Separated Values
+ * configuration.
+ *
+ * @param {String} line
+ * @param {String} delim
+ * @param {String} quote (optional)
+ * @param {String} quote_escape (optional)
+ * @api public
+ */
+Job.prototype.parseValues = function(line, delim, quote, quote_escape) {
+ if (typeof delim === 'undefined' || delim === 'csv') {
+ delim = ',';
+ } else if (delim === 'tsv') {
+ delim = '\t';
+ }
+
+ //Escape special regex chars
+ var escape = function (str) {
+ return str.replace(new RegExp('[.*+?|()\\[\\]{}]', 'g'), '\\$&');
+ }
+
+ var d = escape(delim),
+ e = escape(quote_escape || '"'),
+ q = escape(quote || '"');
+
+ var pattern = new RegExp(
+ '(' + d + '|\\r?\\n|\\r|^)' +
+ '(?:' + q + '([^' + q + ']*(?:' + e + q + '[^' + q + ']*)*)' + q + '|' +
+ '([^' + q + d + '\\r\\n]*))'
+ , 'gi');
+
+ var matches = null, value, csv = [];
+
+ while (matches = pattern.exec(line)){
+ if (matches[2]) {
+ value = matches[2].replace(new RegExp(e + q, 'g'), q);
+ } else {
+ value = matches[3];
+ }
+ csv.push(value);
+ }
+
+ return csv;
+}
+
+/**
+ * The opposite of parseValues. Writes values to a line using the specified
+ * separation characters or configuration (csv / tsv).
+ *
+ * @param {String} values
+ * @param {String} delim
+ * @param {String} quote (optional)
+ * @param {String} quote_escape (optional)
+ * @api public
+ */
+Job.prototype.writeValues = function(values, delim, quote, quote_escape) {
+ if (typeof delim === 'undefined' || delim === 'csv') {
+ delim = ',';
+ } else if (delim === 'tsv') {
+ delim = '\t';
+ }
+ quote = quote || '"';
+ quote_escape = quote_escape || '"';
+
+ if (values instanceof Array) {
+ var quoted_values = [], value;
+ for (var i = 0, l = values.length; i < l; i++) {
+ value = values[i];
+ if (value.indexOf(quote) > -1) {
+ value = quote + value.replace(quote, quote_escape + quote) + quote;
+ } else if (value.indexOf(delim) > -1) {
+ value = quote + value + quote;
+ }
+ quoted_values.push(value);
+ }
+ values = quoted_values.join(delim);
+ }
+ return values;
+}
+
+/**
+ * Returns the total bytes read by any read() calls or input streams.
+ *
+ * @return {Number} bytes_read
+ * @api public
+ */
Job.prototype.getBytesRead = function () {
return this.bytes_read;
};
+/**
+ * Returns the total bytes written by any write() or append() calls.
+ *
+ * @return {Number} bytes_written
+ * @api public
+ */
Job.prototype.getBytesWritten = function () {
return this.bytes_written;
};
View
287 lib/node.io/job.js
@@ -1,13 +1,16 @@
-/*!
+/*!
* node.io
* Copyright(c) 2010 Chris O'Hara <cohara87@gmail.com>
* MIT Licensed
*/
-var fs = require('fs'),
- v = require('validator'),
- status = require('./utils').status;
-
+var validator = require('validator'),
+ put = require('./utils').put,
+ put_default = require('./utils').put_default;
+
+/**
+ * Default job options
+ */
var default_options = {
max: 1,
take: 1,
@@ -18,7 +21,7 @@ var default_options = {
fork: false,
global_timeout: false,
input: false,
- worker_input_buffer: 1,
+ worker_input_mult: 1,
recurse: false,
read_buffer: 8096,
newline: '\n',
@@ -29,105 +32,72 @@ var default_options = {
args: []
};
-var Job = function (options) {
- var i, self = this;
-
- this.input_stream = {};
- this.output_streams = {};
- this.stage = 0;
- this.instance_input = [];
- this.js_timeout = null;
- this.isComplete = false;
-
- this.bytes_read = 0;
- this.bytes_written = 0;
+/**
+ * Creates a new Job with the specified options
+ *
+ * @param {Number} options (optional)
+ * @api public
+ */
+var Job = exports.JobProto = function (options) {
+ this.reset();
- //Set default options
- if (typeof this.options === 'undefined') {
- this.options = {};
- for (i in default_options) {
- if (default_options.hasOwnProperty(i)) {
- this.options[i] = default_options[i];
- }
- }
- }
+ //Set job options
+ this.options = put_default(options, default_options);
- //Extend/set options with the user specified ops
- if (typeof options === 'object') {
- for (i in options) {
- if (options.hasOwnProperty(i)) {
- this.options[i] = options[i];
- }
- }
- }
-
- //Handle special input / output cases (see io.js)
- this.handleSpecialIO();
-
- //Add data validation methods that link to the instance's fail method
- var validator = new v.Validator();
- validator.error = function (msg) {
+ //Add data validation methods
+ var val = new validator.Validator(), self = this;
+ val.error = function (msg) {
self.fail(self.instance_input, msg);
};
- this.assert = validator.check.bind(validator);
+ this.assert = val.check.bind(val);
};
-exports.JobProto = Job;
+Job.prototype.reset = function () {
+ this.input_stream = {};
+ this.output_streams = {};
+
+ this.assigned_input = [];
+ this.timeout = null;
+ this.isComplete = false;
+
+ this.bytes_read = 0;
+ this.bytes_written = 0;
+ this.bytes_received = 0;
+}
-//Each job creates a new class so that the original Job.prototype isn't modified
+//Each job creates a new class/prototype so that the underlying Job.prototype is untouched
exports.__defineGetter__('JobClass', function () {
var JobClass = function (options, methods) {
- if (typeof methods === 'object') {
- for (var i in methods) {
- if (methods.hasOwnProperty(i)) {
- JobClass.prototype[i] = methods[i];
- }
- }
- }
- Job.apply(this, arguments);
+ put(JobClass.prototype, methods);
+
+ Job.apply(this, [options]);
+
+ //Handle special input / output cases (see io.js)
+ this.handleSpecialIO();
};
- for (var i in Job.prototype) {
- if (Job.prototype.hasOwnProperty(i)) {
- JobClass.prototype[i] = Job.prototype[i];
- }
- }
+ //Extend job methods
+ put(JobClass.prototype, Job.prototype);
JobClass.prototype.__super__ = Job.prototype;
- var proto = JobClass.prototype;
-
//Compatability with CoffeeScript <= 0.9.4 inheritance
+ var JobPrototype = JobClass.prototype;
JobClass.extended = function (Child) {
- proto = Child.prototype;
+ JobPrototype = Child.prototype;
};
- //Allow this class to be extended
+ //Allow JobClass to be extended
JobClass.prototype.extend = function (options, methods) {
-
- var parent_options = this.options;
- var Child = function (options) {
+ var Child = function () {
JobClass.apply(this, arguments);
};
- for (var i in proto) {
- Child.prototype[i] = proto[i];
- }
- Child.prototype.__super__ = proto;
+ //Extend parent methods
+ put(Child.prototype, JobPrototype, methods);
+ Child.prototype.__super__ = JobPrototype;
- if (typeof methods === 'object') {
- for (i in methods) {
- if (methods.hasOwnProperty(i)) {
- Child.prototype[i] = methods[i];
- }
- }
- }
- for (i in parent_options) {
- if (typeof options[i] === 'undefined') {
- if (parent_options.hasOwnProperty(i)) {
- options[i] = parent_options[i];
- }
- }
- }
+ //Extend parent options
+ put_default(options, this.options);
return new Child(options);
};
@@ -135,139 +105,66 @@ exports.__defineGetter__('JobClass', function () {
return JobClass;
});
-//Calling nodeio.Job(options, methods) will instantiate a new JobClass
+//Instantiate a new JobClass
exports.Job = function (options, methods) {
var JobClass = exports.JobClass;
return new JobClass(options, methods);
};
-Job.prototype.cancelPreviousTimeouts = function () {
- if (this.js_timeout) {
- clearTimeout(this.js_timeout);
+Job.prototype.finish = function (callback) {
+ if (!this.isComplete) {
+ this.cancel_timeout();
+ this.isComplete = true;
+ if (callback) {
+ callback();
+ };
}
-};
+}
-var x = 0;
-
-Job.prototype.emit = function () {
- var self = this;
-
- //Prevent a thread from emitting more than once
- if (this.isComplete) {
- return;
- }
-
- this.cancelPreviousTimeouts();
-
- var process_order = ['take', 'run', 'output_hook'];
-
- //No methods left? we're done
- if (this.stage >= process_order.length) {
- return;
- }
-
- //Get the next method in the chain
- var next = process_order[this.stage++];
-
- //If the method doesn't exist, skip it
- if (typeof this[next] !== 'function') {
-
- this.emit.apply(this, arguments);
-
- } else {
-
- //Set a timeout for each method if the `timeout` op is set
- if (this.options.timeout && next === 'run') {
- this.js_timeout = setTimeout(function () {
- if (self.isComplete) {
- return;
- }
- self.fail(self.instance_input, 'timeout');
- self.isComplete = true;
- }, this.options.timeout * 1000);
- }
-
- //Call the next method - handle async and sync cases
- var ret = this[next].apply(this, arguments);
- if (typeof ret !== 'undefined') {
- this.emit(ret);
- }
+Job.prototype.cancel_timeout = function () {
+ if (this.timeout) {
+ clearTimeout(this.timeout);
}
};
-Job.prototype.reduce_hook = function (data, callback) {
- if (typeof this.reduce !== 'function') {
- if (callback) {
- callback.apply(this, [data]);
- }
- return;
- }
- this.emit = function (output) {
- if (callback) {
- callback.apply(this, [output]);
+Job.prototype.fail_with = function (err) {
+ var self = this;
+ this.finish(function () {
+ var ret = self.fail(this.instance_input, err);
+ if (typeof ret !== 'undefined' && ret !== null) {
+ self.emit(ret);
}
- };
-
- this.skip = function () {};
- this.finish = function () {};
- this.retry = function () {};
- this.timeout = function () {};
-
- //Call the reduce method - handle async and sync cases
- var ret = this.reduce(data);
- if (typeof ret !== 'undefined' && callback) {
- callback.apply(this, [ret]);
- }
-};
+ });
+}
+//By default, run() passes through input
Job.prototype.run = function () {
this.emit.apply(this, arguments);
};
-//Take a slice of input to handle
-Job.prototype.take = function (input) {
- var num = this.options.take, take = [];
- while (input && num-- && input.length > 0) {
- take.push(input.shift());
- }
- this.instance_input = take;
- if (this.options.take === 1 && take.length === 1) {
- return take[0];
- }
- return take;
+//processor.js overrides these methods to add some magic
+//-----------------------------------------------------------------------------
+Job.prototype.emit = function() {
+ if (this.is_complete) return;
+ this.output.apply(this, arguments);
+}
+Job.prototype.info = function () {};
+Job.prototype.debug = function () {};
+Job.prototype.skip = function () {
+ this.is_complete = true;
};
-
-Job.prototype.exit = function (err) {
- if (this.error_hook) {
- this.isComplete = true;
- this.error_hook(err);
- }
+Job.prototype.fail = function () {
+ this.is_complete = true;
};
-
-Job.prototype.retry = function () {
- this.cancelPreviousTimeouts();
-
- if (!this.isComplete) {
- this.retry_hook(this.instance_input);
- }
+Job.prototype.exit = function () {
+ this.is_complete = true;
};
-
-Job.prototype.skip = function () {
- this.cancelPreviousTimeouts();
-
- if (!this.isComplete) {
- this.isComplete = true;
- this.output_hook();
- }
-};
-
-Job.prototype.finish = Job.prototype.skip;
-Job.prototype.fail = Job.prototype.skip;
+Job.prototype.add = function () {};
+//-----------------------------------------------------------------------------
//Add filter / sanitisation methods to the prototype
-var filter = new v.Filter();
-Job.prototype.filter = filter.sanitize.bind(filter);
-Job.prototype.sanitize = Job.prototype.filter;
+var filter = new validator.Filter();
+Job.prototype.sanitize = Job.prototype.filter = filter.sanitize.bind(filter);
//Add some other helpful methods
require('./io');
View
80 lib/node.io/multi_node.js
@@ -1,45 +1,48 @@
//This is a slightly modified of kriszyp's multi-node (https://github.com/kriszyp/multi-node)
//Multi-node is released under the AFL / BSD License
-var net = require("net"),
- childProcess = require("child_process"),
- netBinding = process.binding("net");
+var net = require('net'),
+ childProcess = require('child_process'),
+ netBinding = process.binding('net');
+/**
+ * Spawns child processes and sets up communication channels.
+ *
+ * @param {Number} num
+ * @param {String} options (optional)
+ * @return EventEmitter
+ * @api public
+ */
exports.spawnWorkers = function (num, options) {
- var isMaster, emitter = new process.EventEmitter();
+ var emitter = new process.EventEmitter();
options = options || {};
-
+
if (process.env._CHILD_ID_) {
- emitter.id = process.env._CHILD_ID_;
var stdin = new net.Stream(0, 'unix');
var descriptorType;
stdin.addListener('data', function (message) {
descriptorType = message;
});
stdin.addListener('fd', function (fd) {
- if (descriptorType == "master") {
- var stream = new net.Stream(fd, "unix");
- emitter.emit("master", stream);
+ if (descriptorType == 'master') {
+ var stream = new net.Stream(fd, 'unix');
+ emitter.emit('master', stream);
stream.resume();
} else {
- throw new Error("Unknown file descriptor " + descriptorType);
+ throw new Error('Unknown file descriptor ' + descriptorType);
}
});
stdin.resume();
} else {
-
- isMaster = true;
- emitter.id = "master";
-
var children = [],
numChildren = num || 1,
priorArgs = process.argv;
- if (process.platform === "cygwin" && priorArgs) {
- priorArgs = ["/usr/bin/bash", "--login", "-c", "cd " + process.cwd() + " && " + priorArgs.join(" ")];
+ if (process.platform === 'cygwin' && priorArgs) {
+ priorArgs = ['/usr/bin/bash', '--login', '-c', 'cd ' + process.cwd() + ' && ' + priorArgs.join(' ')];
}
var env = {};
@@ -64,10 +67,10 @@ exports.spawnWorkers = function (num, options) {
(function (child) {
var masterChildConnection = netBinding.socketpair();
process.nextTick(function () {
- var stream = new net.Stream(masterChildConnection[0], "unix");
- emitter.emit("child", stream);
+ var stream = new net.Stream(masterChildConnection[0], 'unix');
+ emitter.emit('child', stream);
stream.resume();
- child.master.write("master", "ascii", masterChildConnection[1]);
+ child.master.write('master', 'ascii', masterChildConnection[1]);
});
}(child));
@@ -75,14 +78,12 @@ exports.spawnWorkers = function (num, options) {
for (i = 0; i < numChildren; i++) {
createChild(i);
}
- ["SIGINT", "SIGTERM", "SIGKILL", "SIGQUIT", "SIGHUP", "exit"].forEach(function (signal) {
+ ['SIGINT', 'SIGTERM', 'SIGKILL', 'SIGQUIT', 'SIGHUP', 'exit'].forEach(function (signal) {
process.addListener(signal, function () {
children.forEach(function (child) {
try {
child.kill();
- } catch (e) {
-
- }
+ } catch (e) {}
});
//We use SIGHUP to restart the children
if (signal !== 'exit' && signal !== 'SIGHUP') {
@@ -90,15 +91,23 @@ exports.spawnWorkers = function (num, options) {
}
});
});
-
}
- emitter.isMaster = isMaster;
+
return emitter;
};
-//Pass in a raw unframed binary stream, and returns a framed stream for sending and
-//receiving strings or other JSON data
-//The second parameter, trusted, indicates to use eval-based parsing which is much faster
+/**
+ * Pass in a raw unframed binary stream, and returns a framed stream for sending and
+ * receiving strings or other JSON data.
+ *
+ * `trusted` indicates to use eval-based parsing (much faster).
+ *
+ * @param {Object} stream
+ * @param {Boolean} trusted (optional)
+ * @api public
+ */
+//
+//The second parameter,
exports.frameStream = function (stream, trusted) {
var parse = trusted ? function (json) {
return eval('(' + json + ')');
@@ -121,7 +130,7 @@ exports.frameStream = function (stream, trusted) {
return buffer;
};
- stream.addListener("data", function (data) {
+ stream.addListener('data', function (data) {
start = 0;
for (var i = 0, l = data.length; i < l; i++) {
var b = data[i];
@@ -131,7 +140,7 @@ exports.frameStream = function (stream, trusted) {
if (start === i) {
//Handle the special case where the end frame is the first char received
- emitter.emit("message", parse(condense_buffered().toString("utf8")));
+ emitter.emit('message', parse(condense_buffered().toString('utf8')));
} else {
@@ -141,7 +150,7 @@ exports.frameStream = function (stream, trusted) {
buffer = condense_buffered();
}
- emitter.emit("message", parse(buffer.toString("utf8", 0, buffer.length)));
+ emitter.emit('message', parse(buffer.toString('utf8', 0, buffer.length)));
}
start = i + 1;
}
@@ -151,7 +160,7 @@ exports.frameStream = function (stream, trusted) {
}
});
emitter.send = function (message) {
- var buffer = new Buffer(JSON.stringify(message), "utf8");
+ var buffer = new Buffer(JSON.stringify(message), 'utf8');
var framedBuffer = new Buffer(buffer.length + 2);
framedBuffer[0] = 0;
buffer.copy(framedBuffer, 1, 0, buffer.length);
@@ -161,11 +170,10 @@ exports.frameStream = function (stream, trusted) {
emitter.on = emitter.addListener;
return emitter;
};
-
exports.frameStreamLengthEncoded = function (stream) {
var emitter = new process.EventEmitter();
var buffer, bufferIndex;
- stream.addListener("data", function (data) {
+ stream.addListener('data', function (data) {
while (data.length) {
if (buffer && (buffer.length - bufferIndex > data.length)) {
data.copy(buffer, bufferIndex, 0, data.length);
@@ -173,7 +181,7 @@ exports.frameStreamLengthEncoded = function (stream) {
} else {
if (buffer) {
data.copy(buffer, bufferIndex, 0, buffer.length - bufferIndex);
- emitter.emit("message", buffer.toString("utf8", 0, buffer.length));
+ emitter.emit('message', buffer.toString('utf8', 0, buffer.length));
data = data.slice(buffer.length - bufferIndex, data.length);
}
if (data.length) {
@@ -187,7 +195,7 @@ exports.frameStreamLengthEncoded = function (stream) {
}
});
emitter.send = function (message) {
- var buffer = new Buffer(message, "utf8");
+ var buffer = new Buffer(message, 'utf8');
stream.write(new Buffer([buffer.length >> 24, buffer.length >> 16 & 255, buffer.length >> 8 & 255, buffer.length & 255]));
};
return emitter;
View
280 lib/node.io/process_master.js
@@ -0,0 +1,280 @@
+/*!
+ * node.io
+ * Copyright(c) 2010 Chris O'Hara <cohara87@gmail.com>
+ * MIT Licensed
+ */
+
+var Processor = require('./processor').Processor;
+
+/**
+ * Routes messages received from slaves or child processes.
+ *
+ * Messages takes the form [message, job, worker_id, ...]
+ *
+ * @param {Object} data
+ * @api public
+ */
+Processor.prototype.handleWorkerMessage = function (data) {
+ var job = this.jobs[data[1]],
+ master = job.master;
+
+ switch (data[0]) {
+ case 'output': //['output', job, worker_id, output]
+ master.emit('output', data[3]);
+ break;
+
+ case 'pull': //['pull', job, worker_id]
+ job.pull_requests++;
+ if (!job.is_complete) {
+ master.emit('pullInput', data[2]);
+ }
+ break;
+
+ case 'err': //['err', job, err]
+ job.oncomplete(data[3]);
+ break;
+
+ case 'complete': //['complete', job, worker_id, output]
+ if (data[3]) {
+ master.emit('output', data[3]);
+ }
+ job.worker_complete[data[2]] = true;
+ break;
+
+ case 'add': //['add', job, worker_id, input, dont_flatten]
+ master.emit('addInput', data[3], data[4]);
+ break;
+ }
+}
+
+/**
+ * Sets up job events to be handled by the master process.
+ *
+ * @param {Object} job
+ * @param {Array} workers (optional)
+ * @api public
+ */
+Processor.prototype.setupMasterEvents = function (job, workers) {
+ var self = this,
+ master = job.master,
+ worker_count = workers.length;
+
+ //Provide a method to check if workers are complete
+ var areWorkersComplete = function () {
+ if (job.pull_requests < worker_count) {
+ return false;
+ }
+ for (var i = 0; i < worker_count; i++) {
+ if (!job.worker_complete[i]) {
+ return false;
+ }
+ }
+ return true;
+ };
+
+ master.on('start', function () {
+ if (worker_count > 0) {
+
+ self.status('Running ' + worker_count + ' workers..');
+
+ //Each worker is initially idle (complete) and requesting input
+ job.pull_requests = worker_count;
+
+ //Tell each worker to load the job - send i so that the worker
+ //can identify itself later
+ for (var i = 0; i < worker_count; i++) {
+ workers[i].send(['load', job.job_name, self.options, i]);
+ }
+
+ } else {
+ self.status('Running 1 worker..');
+ }
+
+ //Pull the initial input
+ master.emit('pullInput');
+ });
+
+ master.on('pullInput', function (for_worker) {
+
+ //Determine how much input we need to pull
+ var pull = job.options.max * job.options.take;
+ if (worker_count > 0) {
+ //Pull more if we need have workers
+ pull = pull * job.options.worker_input_mult;
+ if (typeof for_worker === 'undefined') {
+ pull = pull * worker_count;
+ }
+ }
+
+ //Handle input limits when the `input` op is set
+ if (job.options.input && (job.input_offset + pull) > job.options.input) {
+ pull = Math.max(job.options.input - job.input_offset, 0);
+ }
+
+ //Callback for when input is received from job.input()
+ var handle_input = function (input) {
+ if (typeof input !== 'undefined' && input !== null && input !== false) {
+ master.emit('input', input, for_worker);
+ } else {
+
+ //No input? We might be done..
+
+ var completeCheckInterval;
+
+ var isComplete = function () {
+ //Check if any input was added dynamically
+ if (job.input.length > 0) {
+ job.is_complete = false;
+ //self.spawnInstance(job_name);
+ if (completeCheckInterval) {
+ clearInterval(completeCheckInterval);
+ }
+ return false;
+ }
+
+ //Wait for workers or instances that are still working
+ return worker_count > 0 ? areWorkersComplete() : job.instances <= 0;
+ };
+
+ //If we're not complete, check periodically
+ if (isComplete()) {
+ master.emit('complete');
+ } else {
+ completeCheckInterval = setInterval(function () {
+ if (isComplete()) {
+ clearInterval(completeCheckInterval);
+ master.emit('complete');
+ }
+ }, 300);
+ }
+ }
+ };
+
+ if (pull > 0) {
+
+ //Incr the input offset
+ var offset = job.input_offset;
+ job.input_offset += pull;
+
+ //Allow job.input() to be sync and async
+ var input = job.obj.input(offset, pull, handle_input);
+ if (typeof input !== 'undefined') {
+ handle_input(input);
+ }
+
+ } else {
+ handle_input();
+ }
+ });
+