Skip to content
Browse files

initial

  • Loading branch information...
1 parent b891119 commit ae0c7862fa5fd0290ec79a33c05846cd9526545f @coreyjewett committed Nov 15, 2012
Showing with 177 additions and 0 deletions.
  1. +103 −0 lib/index.js
  2. +21 −0 package.json
  3. +53 −0 test/test.js
View
103 lib/index.js
@@ -0,0 +1,103 @@
+var async = require('async');
+
+/** Create a queue for each worker. Pass successful output of each queue to the next worker.
+ * A worker may only pass error or one object to the callback.
+ * @param callback(err, task);
+ */
+function DaisyChain(workers, concurrency) {
+ var self = this
+ , queues = this.queues = new Array(workers.length);
+
+ var first = this.first = queues[0] = async.queue(workers[0], concurrency);
+ first.__daisy_id = 0;
+ var next = first;
+
+ // create queues and inject unidirectional linked list marker.
+ for (var i = 1; i < workers.length; i++) {
+ var queue = this.queues[i] = async.queue(workers[1], concurrency);
+ queue.__daisy_id = i;
+ next.__forward_to = queue;
+ next = queue;
+ }
+
+ // add hooks for empty and drain
+ var last = this.last = next;
+ last.empty = function() {
+ if (self.empty) {
+ async.all(queues, function(q, cb){ cb(q.length() === 0); }, function(yes) { if (yes) self.empty(); });
+ }
+ }
+
+ last.drain = function() {
+ if (self.drain) {
+ async.every(queues, function(q, cb){ cb((q.length()+q.running()) === 0); }, function(yes) { if (yes) self.drain(); });
+ }
+ }
+}
+
+/** Start a task(s) down the chain. callback is called once per-task; with either any error or the output of the final worker. */
+DaisyChain.prototype.push = function(input, callback) {
+ var self = this;
+
+ // wrap single task as array for eash processing.
+ if (!Array.prototype.isPrototypeOf(input)) {
+ input = [input];
+ }
+
+ input.forEach(function(task) {
+ var next = self.first;
+
+ function forward(err, task) {
+ if (err) return callback(err);
+
+ if (!next) return callback(null, task);
+
+ var queue = next;
+ // async.nextTick(function() {
+ queue.push(task, forward);
+ // });
+ next = next.__forward_to;
+ }
+
+ forward(null, task);
+ });
+}
+
+/** This is somewhat deceptive as it indicates the total quantity of tasks in all the queues, not just the first. */
+DaisyChain.prototype.length = function() {
+ var len = 0;
+ for (var i = this.queues.length - 1; i >= 0; i--) {
+ len += this.queues[i].length();
+ };
+ return len;
+}
+
+// map other parameters to inner-queues as appropriate. Unless the engine lacks support for setters/getters in
+// which case these parameters will be assigned, but have no effect on the queues. :/
+if (Object.__defineSetter__) {
+ DaisyChain.prototype.__defineSetter__("concurrency", function(concurrency) {
+ for (var i = this.queues.length - 1; i >= 0; i--) {
+ this.queues[i].concurrency = concurrency;
+ };
+ });
+
+ DaisyChain.prototype.__defineGetter__("concurrency", function() {
+ return this.queues[0].concurrency;
+ });
+
+ DaisyChain.prototype.__defineSetter__("saturated", function(val) {
+ this.queues[0].saturated = val;
+ });
+
+ DaisyChain.prototype.__defineGetter__("saturated", function() {
+ return this.queues[0].saturated;
+ });
+}
+
+/**
+ * @param workers Array<Function>
+ * @param concurrency integer
+ */
+module.exports = function(workers, concurrency) {
+ return new DaisyChain(workers, concurrency)
+}
View
21 package.json
@@ -0,0 +1,21 @@
+{
+ "name": "async-daisychain",
+ "version": "0.0.1",
+ "description": "Easily create and manage daisychains of async queues.",
+ "main": "lib/index.js",
+ "scripts": {
+ "test": "echo \"Error: no test specified\" && exit 1"
+ },
+ "repository": {
+ "type": "git",
+ "url": "git://github.com/coreyjewett/async-daisychain.git"
+ },
+ "keywords": [
+ "async chain queue queuing SEDA"
+ ],
+ "devDependencies": {
+ "async": "~ 0.1"
+ },
+ "author": "Corey Jewett",
+ "license": "MIT"
+}
View
53 test/test.js
@@ -0,0 +1,53 @@
+var daisychain = require('../lib/index');
+var assert = require('assert');
+
+function sq(n, callback) {
+ if (n > Math.pow(3, 16)+1) return callback("Too Big");
+ callback(null, n * n);
+}
+
+var tests = 9, incomplete_mon = setTimeout(function(){ assert.fail(0, tests, "Not all assertions exercised"); }, 1000);
+function completed(is_last) {
+ tests--;
+ if (tests == 0) {
+ clearTimeout(incomplete_mon);
+ }
+
+ if (tests < 0) {
+ assert.fail(0, tests, "Test count is wrong.");
+ }
+
+ if (is_last && tests !== 0) {
+ assert.fail("complete(true) called prematurely", tests);
+ }
+}
+
+var dc = daisychain([sq, sq, sq, sq], 1);
+dc.saturated = completed;
+dc.empty = completed;
+dc.drain = completed.bind(null, true);
+
+dc.push(2, function(err, result) {
+ assert.ifError(err);
+ assert.equal(result, Math.pow(2, 16));
+ completed();
+});
+
+dc.push(3, function(err, result) {
+ assert.ifError(err);
+ assert.equal(result, Math.pow(3, 16));
+ completed();
+});
+
+dc.push(256, function(err, result) {
+ assert.equal(err, "Too Big");
+ completed();
+});
+
+dc.push([2,2,2], function(err, result) {
+ assert.ifError(err);
+ assert.equal(result, Math.pow(2, 16));
+ completed();
+});
+
+assert.equal(dc.length(), 6);

0 comments on commit ae0c786

Please sign in to comment.
Something went wrong with that request. Please try again.