Permalink
Browse files

Added Fifo.forEach() for parallel execution

  • Loading branch information...
cdauth committed Jun 11, 2013
1 parent 4317f4e commit 15df633ab8e21d8b3d32f651e0d38ec6477ddd62
Showing with 40 additions and 20 deletions.
  1. +40 −20 fifo.js
View
60 fifo.js
@@ -64,30 +64,50 @@ Fifo.prototype = {
this.__check();
},
- forEachSeries : function(iterator, callback) {
+ forEachLimit : function(limit, iterator, callback) {
var t = this;
- handleNextItem();
+ var running = 0;
+ var ended = false;
+ var endError = null;
+
+ function startOne() {
+ if(limit > 0 && running >= limit || ended)
+ return;
- function handleNextItem() {
+ running++;
t.next(function(err) {
- if(err === true)
- callback(null);
- else if(err)
- callback(err);
- else
- {
- var args = utils.toProperArray(arguments);
- args.shift();
- args.push(function(err) {
- if(err)
- callback(err);
- else
- setImmediate(handleNextItem);
- });
- iterator.apply(null, args);
- }
- });
+ if(err)
+ return finishOne(err);
+
+ var args = utils.toProperArray(arguments);
+ iterator.apply(null, args.slice(1).concat([ utils.callback(finishOne) ]));
+
+ setImmediate(startOne);
+ })
}
+
+ function finishOne(err) {
+ running--;
+ if(err && !ended) {
+ ended = true;
+ endError = (err === true ? null : err);
+ }
+
+ if(running == 0 && ended)
+ callback(endError);
+ else
+ setImmediate(startOne);
+ }
+
+ setImmediate(startOne);
+ },
+
+ forEachSeries : function(iterator, callback) {
+ this.forEachLimit(1, iterator, callback);
+ },
+
+ forEach : function(iterator, callback) {
+ this.forEachLimit(0, iterator, callback);
},
toArraySingle : function(callback) {

0 comments on commit 15df633

Please sign in to comment.