Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

bug fixes, docs, ready for point release.

  • Loading branch information...
commit da3083556e98a4e3c82121bc9b854a96d940af85 1 parent ae0c786
@coreyjewett authored
Showing with 189 additions and 54 deletions.
  1. +19 −0 LICENSE
  2. +13 −1 README.md
  3. +36 −31 lib/index.js
  4. +1 −1  package.json
  5. +120 −21 test/test.js
View
19 LICENSE
@@ -0,0 +1,19 @@
+Copyright (c) 2012 Corey Jewett
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
View
14 README.md
@@ -1,4 +1,16 @@
async-daisychain
================
-Easily create and manage daisychains of async queues.
+
+ var daisychain = require("async-daisychain");
+ // see test/test.js for a couple examples
+
+retry
+-----
+
+When an error is passed to the callback the second argument is the memo from the last successful worker and the third argument passed to the callback is the queue_id for the failed queue. These arguments, plus a callback, may be passed to #retry to reinvoke the failed queue and remaining queues on the memo. In a sense, this is reminiscent of ruby's *retry* statement and was inspired by it. This is particularly handy where a worker might fail due to intermittent IO issues. In most cases it also would make sense to delay for some interval before retrying, but this is not built in currently.
+
+Argument Against
+----------------
+Functionally, this is similar to an async *queue* whose worker spawns a *waterfall* for each task. That strategy is actually more flexible as it is not limited to a single memo. On the upside, the added complexity herein provides the retry mechanics and also has opportunity to be improved by having mixed concurrency and also an intellgent backpressure system that has visibility into the whole system, not just it's head.
View
67 lib/index.js
@@ -1,27 +1,21 @@
var async = require('async');
/** Create a queue for each worker. Pass successful output of each queue to the next worker.
- * A worker may only pass error or one object to the callback.
- * @param callback(err, task);
+ * A worker may only pass error or one object (memo) to the callback.
+ * @param callback(err, memo);
*/
function DaisyChain(workers, concurrency) {
var self = this
, queues = this.queues = new Array(workers.length);
- var first = this.first = queues[0] = async.queue(workers[0], concurrency);
- first.__daisy_id = 0;
- var next = first;
-
// create queues and inject unidirectional linked list marker.
- for (var i = 1; i < workers.length; i++) {
- var queue = this.queues[i] = async.queue(workers[1], concurrency);
- queue.__daisy_id = i;
- next.__forward_to = queue;
- next = queue;
+ var last;
+ for (var i = 0; i < workers.length; i++) {
+ var queue = queues[i] = async.queue(workers[i], concurrency);
+ last = queue;
}
// add hooks for empty and drain
- var last = this.last = next;
last.empty = function() {
if (self.empty) {
async.all(queues, function(q, cb){ cb(q.length() === 0); }, function(yes) { if (yes) self.empty(); });
@@ -36,31 +30,42 @@ function DaisyChain(workers, concurrency) {
}
/** Start a task(s) down the chain. callback is called once per-task; with either any error or the output of the final worker. */
-DaisyChain.prototype.push = function(input, callback) {
- var self = this;
-
+DaisyChain.prototype.push = function(memos, callback) {
// wrap single task as array for eash processing.
- if (!Array.prototype.isPrototypeOf(input)) {
- input = [input];
+ if (!Array.prototype.isPrototypeOf(memos)) {
+ memos = [memos];
}
- input.forEach(function(task) {
- var next = self.first;
+ var self = this;
+ memos.forEach(function(memo) {
+ (self._makeWalker(0, callback))(null, memo);
+ });
+}
- function forward(err, task) {
- if (err) return callback(err);
-
- if (!next) return callback(null, task);
+DaisyChain.prototype.retry = function(i, memo, callback) {
+ (this._makeWalker(i, callback))(null, memo);
+}
- var queue = next;
- // async.nextTick(function() {
- queue.push(task, forward);
- // });
- next = next.__forward_to;
- }
+DaisyChain.prototype._makeWalker = function(pos, callback) {
+ var i = pos+0
+ , self = this
+ , next
+ , last_memo
+ ;
- forward(null, task);
- });
+ function walker(err, result) {
+ if (err) return callback(err, last_memo, i-1);
+
+ next = self.queues[i++]
+ if (!next) return callback(null, result);
+
+ last_memo = result;
+ // process.nextTick(function(){
+ next.push(result, walker);
+ // });
+ }
+
+ return walker;
}
/** This is somewhat deceptive as it indicates the total quantity of tasks in all the queues, not just the first. */
View
2  package.json
@@ -1,6 +1,6 @@
{
"name": "async-daisychain",
- "version": "0.0.1",
+ "version": "0.1.1",
"description": "Easily create and manage daisychains of async queues.",
"main": "lib/index.js",
"scripts": {
View
141 test/test.js
@@ -1,31 +1,58 @@
var daisychain = require('../lib/index');
var assert = require('assert');
-function sq(n, callback) {
- if (n > Math.pow(3, 16)+1) return callback("Too Big");
- callback(null, n * n);
+//// A Harness of sorts ////
+var tests_remaining = 19;
+function completed() {
+ tests_remaining--;
}
-var tests = 9, incomplete_mon = setTimeout(function(){ assert.fail(0, tests, "Not all assertions exercised"); }, 1000);
-function completed(is_last) {
- tests--;
- if (tests == 0) {
- clearTimeout(incomplete_mon);
+process.on("exit", function() {
+ if (tests_remaining !== 0) {
+ assert.fail(tests_remaining, 0, "Exited, but " + tests_remaining + " test(s) did not call complete().");
+ } else if (tests_remaining < 0) {
+ assert.fail(0, tests_remaining, Math.abs(tests_remaining) + " tests too many ran.");
+ } else {
+ // All OK
}
+});
- if (tests < 0) {
- assert.fail(0, tests, "Test count is wrong.");
- }
- if (is_last && tests !== 0) {
- assert.fail("complete(true) called prematurely", tests);
- }
+//// Workers ////
+function sq(n, callback) {
+ callback(null, n * n);
}
+function plus1(n, callback) {
+ if (n == 5) return callback("Got 5");
+ callback(null, n + 1);
+}
+
+
+//// Tests ////
+var dc3 = daisychain([
+ function(memo, cb) { if (memo != "A") return cb("A Expected"); cb(null, "B"); }
+ ,function(memo, cb) { if (memo != "B") return cb("B Expected"); cb(null, "C"); }
+ ,function(memo, cb) { if (memo != "C") return cb("C Expected"); cb(null, "D"); }
+ ,function(memo, cb) { if (memo != "D") return cb("D Expected"); cb(null, "E"); }
+ ], 1);
+dc3.push("A", function(err, result) {
+ assert.ifError(err);
+ assert.equal(result, "E");
+ completed();
+})
+
+dc3.push("B", function(err, result) {
+ assert.equal(err, "A Expected");
+ completed();
+})
+
+
+
var dc = daisychain([sq, sq, sq, sq], 1);
dc.saturated = completed;
dc.empty = completed;
-dc.drain = completed.bind(null, true);
+dc.drain = completed;
dc.push(2, function(err, result) {
assert.ifError(err);
@@ -39,15 +66,87 @@ dc.push(3, function(err, result) {
completed();
});
-dc.push(256, function(err, result) {
- assert.equal(err, "Too Big");
- completed();
-});
-
dc.push([2,2,2], function(err, result) {
assert.ifError(err);
assert.equal(result, Math.pow(2, 16));
completed();
});
-assert.equal(dc.length(), 6);
+assert.equal(dc.length(), 5);
+
+
+
+var dc4 = daisychain(
+ [
+ // this function will only work once, every successive call will fail.
+ function(memo, cb) {
+ if (!this.toggle) {
+ this.toggle = true;
+ cb(null, "y");
+ } else {
+ cb(new Error("No, no, no!"));
+ }
+ }
+ // this function has a repeating pattern of fail, succeed, fail, etc.
+ ,function(memo, cb) {
+ this.toggle = !this.toggle;
+ if (this.toggle) {
+ cb(new Error("Uh, oh"));
+ } else {
+ cb(null, "z");
+ }
+ }
+ ], 1);
+dc4.push("x", function dc4_cb(err, result, queue_id) {
+ if (err) { // first time daisy chain tells us worker #2 failed.
+ assert.equal(result, "y");
+ assert.equal(queue_id, 1); // first worker succeeded, second failed, retry should be the second.
+ dc4.retry(queue_id, result, dc4_cb);
+ return;
+ }
+ // second time is a success.
+ assert.equal(result, "z");
+ completed();
+})
+
+
+//// Allowing three simulatenous task processors and a total of six processes, simulate a temporary failing io effort.
+var seeit = false;
+var dc5 = daisychain(
+ [
+ // this function just passes the memo through, but we can take a peek if we want. (seeit=true)
+ function(memo, cb) {
+ process.nextTick(function(){
+ setTimeout(function(){
+ seeit && console.log("HELLO:", memo);
+ cb(null, memo);
+ }, 10);
+ });
+ }
+ // this function fails 12 times then succeeds.
+ ,function(memo, cb) {
+ this.count = (this.count || 0) +1;
+ if (this.count > 12) {
+ seeit && console.log("SUCCESS["+this.count+"]:", memo);
+ cb(null, [memo, this.count]);
+ } else {
+ var delay = this.count + 10; // non-deterministic; 1-10ms random delay use: parseInt(Math.random()*10)+1;
+ seeit && console.log("DOWN["+this.count+"]:", memo);
+ // pretend server is down after a brief delay of trying to connect.
+ process.nextTick(function() {
+ setTimeout(function(){
+ cb(new Error("server down"));
+ }, delay);
+ });
+ }
+ }
+ ], 3);
+dc5.push(["X", "Y", "Z", "P", "D", "Q", "Jane", "John"], function dc5_cb(err, result, queue_id) {
+ if (err) {
+ assert.equal(queue_id, 1);
+ dc5.retry(queue_id, result, dc5_cb);
+ return;
+ }
+ completed();
+})
+
Please sign in to comment.
Something went wrong with that request. Please try again.