Skip to content

Commit

Permalink
Added the Funnel moduel
Browse files Browse the repository at this point in the history
  • Loading branch information
hbouvier committed Mar 15, 2012
1 parent 90b6d39 commit 899cdd5
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 1 deletion.
60 changes: 60 additions & 0 deletions modules/funnel.js
@@ -0,0 +1,60 @@
var util = require('util'),
cpus = require('os').cpus().length;

module.exports = (function () {

///////////////////////// PRIVATE METHODS /////////////////////////////////

var executors = cpus; // # of parallel tasks executed at the same time
var queue = [];
var running = 0;

function execute() {
if (running >= executors || queue.length === 0) return;
++running;
util.log('funnel|execute|running='+running);

var task = queue.shift();
var obj = typeof(task[0]) === 'object' ? task.shift() : null;
var func = task.shift();
var callback = (task.length > 0 && typeof(task[task.length -1]) === 'function') ? task.pop() : null;
var args = task; // task.slice(1, task.length -1);
args.push(done);
//console.log('obj:',obj,'args:'+args+', callback:' + callback);
func.apply(obj, args);

function done(err, result) {
--running;
util.log('funnel|execute|done|running='+running);
if (callback) {
process.nextTick(function () {
callback.apply(obj, err, result);
});
}
process.nextTick(execute);
}
}

/////////////////////////// PUBLIC CLASS //////////////////////////////////

function Funnel(nbParallelExecutors) {
executors = nbParallelExecutors || executors;
util.log('funnel|executors='+executors);
}

// queue(function) or queue(function, param1,...,paramX) <- paramX must not be a function otherwise it will be taken as the callback
// queue(function, callback) or queue(function, param1,...,paramX, callback)
// queue(this, method) or queue(this, method, param1,...,paramX) <- paramX must not be a function otherwise it will be taken as the callback
// queue(this, method, callback) or queue(this, method, param1,...,paramX, callback)
Funnel.prototype.queue = function() {
var task = Array.prototype.slice.call(arguments);
if (task.length < 1)
throw new Error('Funnel: the first parameter has to be the "function" to execute');
queue.push(task);
process.nextTick(execute);
};

///////////////////////////////////////////////////////////////////////////////

return Funnel;
})();
5 changes: 4 additions & 1 deletion modules/tts.js
@@ -1,6 +1,7 @@
var util = require('util'), var util = require('util'),
spawn = require('child_process').spawn, spawn = require('child_process').spawn,
fs = require('fs'), fs = require('fs'),
Funnel = require('./funnel'),
tts = { tts = {
init: function () { init: function () {
this.cache = __dirname + '/cache'; this.cache = __dirname + '/cache';
Expand All @@ -9,6 +10,7 @@ var util = require('util'),
this.cacheCallbacks = {}; this.cacheCallbacks = {};
this.cacheFiles = {}; this.cacheFiles = {};
this.ttl = 0; // in ms, 0 == never expire this.ttl = 0; // in ms, 0 == never expire
this.funnel = new Funnel();


try { fs.mkdirSync(this.cache); } catch (e) { } // Ignore try { fs.mkdirSync(this.cache); } catch (e) { } // Ignore


Expand Down Expand Up @@ -46,7 +48,8 @@ var util = require('util'),
} }
// Generate it // Generate it
this.cacheCallbacks[filename] = [callback]; this.cacheCallbacks[filename] = [callback];
this.generate(filename, text, voice, generatedCallback); this.funnel.queue(this, this.generate, filename, text, voice, generatedCallback);
//this.generate(filename, text, voice, generatedCallback);


function generatedCallback(err, result) { function generatedCallback(err, result) {
var $this = this; var $this = this;
Expand Down
91 changes: 91 additions & 0 deletions tests/testFunnel.js
@@ -0,0 +1,91 @@
var Funnel = require('../modules/funnel.js'),
util = require('util'),
spawn = require('child_process').spawn;

function sleep(seconds, callback) {
var command = '/bin/sleep',
options = {
"cwd": "/tmp/",
"env": {
"ENV":"development"
},
"customFds":[-1, -1, -1]
},
args = [
seconds || 1
],
output = '';

util.log('sleep='+seconds);
var child = spawn(command, args, options);
child.on('exit', function (code, signal) {
util.log('sleep|exit=' + code + '|signal=' + (signal ? signal : 'none'));
if (code === 0) {
callback(null);
} else {
callback(new Error('sleep returned ' + code + ' or signal=' + (signal ? signal : 'none') + '\n' + output + '\n'));
}
});
child.stdout.on('data', function (data) {
util.log('sleep|stdout=' + data + '\n');
output += 'STDOUT:'+ data + '\n';
});
child.stderr.on('data', function (data) {
util.log('sleep|stderr=' + data + '\n');
output += 'STDERR:' + data + '\n';
});
}


function Sleep(time) {
this.time = time;
}

Sleep.prototype.doit = function(callback) {
util.log('going to Sleep ' + this.time + ' seconds');
sleep(this.time, callback);
};

Sleep.prototype.done = function(err, result) {
util.log('Sleep of ' + this.time + ' seconds, done');
};

var funnel = new Funnel(4);

function sleepDone(err, result) {
util.log('testFunnel: sleep done');
}

var sleep1 = new Sleep(1);
var sleep2 = new Sleep(2);
var sleep3 = new Sleep(3);
var sleep5 = new Sleep(5);
var sleep10 = new Sleep(10);
//sleep5.doit(sleep5.done);

/*
funnel.queue(sleep, 8, sleepDone);
funnel.queue(sleep, 7, sleepDone);
funnel.queue(sleep, 6, sleepDone);
funnel.queue(sleep, 5, sleepDone);
funnel.queue(sleep, 4, sleepDone);
funnel.queue(sleep, 3, sleepDone);
funnel.queue(sleep, 2, sleepDone);
funnel.queue(sleep, 1, sleepDone);
*/


funnel.queue(sleep5, sleep5.doit, sleep5.done);
funnel.queue(sleep10, sleep10.doit, sleep10.done);
funnel.queue(sleep5, sleep5.doit, sleep5.done);
funnel.queue(sleep10, sleep10.doit, sleep10.done);
funnel.queue(sleep5, sleep5.doit, sleep5.done);
funnel.queue(sleep1, sleep1.doit, sleep1.done);
funnel.queue(sleep2, sleep2.doit, sleep2.done);
funnel.queue(sleep3, sleep3.doit, sleep3.done);


util.log('testFunnel: all done');


0 comments on commit 899cdd5

Please sign in to comment.