Skip to content
Browse files

added a better caching mechanism

  • Loading branch information...
1 parent 899cdd5 commit 30946fed018a92cd01d657a249a3fa5ce9326550 @hbouvier committed
Showing with 179 additions and 80 deletions.
  1. +96 −35 modules/cache.js
  2. +14 −8 modules/funnel.js
  3. +47 −14 modules/tts.js
  4. +1 −1 package.json
  5. +10 −8 tests/testFunnel.js
  6. +11 −14 ttsserver.js
View
131 modules/cache.js
@@ -1,43 +1,104 @@
-// A General wrapper with cache and batch and timeouts
var util = require('util');
-module.exports = function cache(fn) {
- var requestBatches = {};
- var requestCache ={};
- wrapped.cacheLifetime = 1000 * 60 *60 * 24; // -1 == never expire
-
- function wrapped(key, callback) {
- if (requestCache.hasOwnProperty(key)) {
- var value = requestCache[key];
- process.nextTick(function () {
- callback(null, value);
- });
- return;
- }
- if (requestBatches.hasOwnProperty(key)) {
- requestBatches[key].push(callback);
- util.log('|tts|queued-file-from-cache='+key+'|queuesize='+requestBatches[key].length);
- return;
- }
- util.log('|tts|file-from-disk='+key);
- requestBatches[key] = [callback];
- fn(key, onDone);
- function onDone(err, result) {
- util.log('|tts|file-from-queue='+key+'|queuesize='+requestBatches[key].length);
- if(!err && wrapped.cacheLifetime) {
- requestCache[key] = result;
- if (wrapped.cacheLifetime !== -1) {
+module.exports = (function () {
+
+ ///////////////////////// PRIVATE METHODS /////////////////////////////////
+
+ var requestQueue = {};
+ var resourceCache = {};
+ var defaultCacheTTL = 1000;
+ var defaultCacheSize = 100;
+ var verbose = true;
+ var debug = true;
+
+ /// Stats
+ var nbCacheHit = 0;
+ var nbCacheMiss = 0;
+ var nbFetch = 0;
+ var nbFetchInProgress = 0;
+
+ function printStats() {
+ if (verbose) util.log('cache|hit=' + nbCacheHit + '|miss=' + nbCacheMiss + '|fetch='+nbFetch+'|fetching='+nbFetchInProgress+'|cacheSize='+resourceCache.length+'|requestQueueSize='+requestQueue.length);
+ }
+
+ function execute(task) {
+ var key = task.shift();
+ var obj = typeof(task[0]) === 'object' ? task.shift() : null;
+ var func = task.shift();
+ var args = task; // All the args
+ args.push(done);
+ ++nbFetch;
+ ++nbFetchInProgress;
+
+ func.apply(obj, args);
+
+ function done(err, resource) {
+ var $this = this;
+ --nbFetchInProgress;
+ if (verbose) util.log('cache|execute|done|err='+err+'|result='+(resource ? 'found':'null'));
+ if (!err && defaultCacheTTL) { // ttl === 0 --> expire imediatly.
+ resourceCache[key] = resource;
+ /*
+ if (resourceCache.length > defaultCacheSize) {
+ if (verbose) util.log('cache|expire|key='+key);
+ resourceCache.shift();
+ }
+ */
+ if (defaultCacheTTL !== -1) { // ttl === -1 --> never expire
setTimeout(function () {
- delete requestCache[key];
- }, wrapped.cacheLifetime);
+ if (verbose) util.log('cache|expire|key='+key);
+ delete resourceCache[key];
+ }, defaultCacheTTL);
}
}
- var batch = requestBatches[key];
- delete requestBatches[key];
- for (var i =0, l = batch.length; i < l; i++) {
- batch[i].apply(null, arguments);
+
+ var pendingRequests = requestQueue[key];
+ delete requestQueue[key];
+ for (var i = 0, size = pendingRequests.length ; i < size ; ++i) {
+ if (debug) util.log('cache|calling='+i+'|err='+err+'|resource='+(resource ? 'found':'null'));
+ pendingRequests[i].call($this, err, resource);
}
+ printStats();
}
}
- return wrapped;
-};
+
+ /////////////////////////// PUBLIC CLASS //////////////////////////////////
+
+ function Cache(size, ttl) {
+ defaultCacheSize = size || defaultCacheSize;
+ defaultCacheTTL = ttl || defaultCacheTTL;
+ if (verbose) util.log('Cache|defaultCacheSize='+defaultCacheSize+'|defaultCacheTTL='+defaultCacheTTL);
+ }
+
+ Cache.prototype.queue = function(key) {
+ var task = Array.prototype.slice.call(arguments);
+ if (task.length < 3)
+ throw new Error('Cache: The first parameter has to be the key for the resource and the second parameter the "function" to obtain the resource and the last is the callback');
+ var callback = task.pop();
+ // The resource is in the cache
+ if (resourceCache.hasOwnProperty(key)) {
+ var resource = resourceCache[key];
+ ++nbCacheHit;
+ process.nextTick(function () {
+ callback(null, resource);
+ });
+ printStats();
+ return;
+ }
+ ++nbCacheMiss;
+ if (requestQueue.hasOwnProperty(key)) {
+ requestQueue[key].push(callback);
+ if (verbose) util.log('cache|queued|key='+key+'|queueSize='+requestQueue[key].length);
+ printStats();
+ return;
+ }
+ if (verbose) util.log('cache|fetch|key='+key);
+ requestQueue[key] = [callback];
+ execute(task);
+ printStats();
+ };
+
+///////////////////////////////////////////////////////////////////////////////
+
+ return Cache;
+})();
View
22 modules/funnel.js
@@ -8,11 +8,13 @@ module.exports = (function () {
var executors = cpus; // # of parallel tasks executed at the same time
var queue = [];
var running = 0;
-
+ var debug = true;
+
function execute() {
- if (running >= executors || queue.length === 0) return;
+ if (running >= executors || queue.length === 0)
+ return;
++running;
- util.log('funnel|execute|running='+running);
+ if (debug) util.log('funnel|execute|running='+running+'|pending='+queue.length);
var task = queue.shift();
var obj = typeof(task[0]) === 'object' ? task.shift() : null;
@@ -24,14 +26,16 @@ module.exports = (function () {
func.apply(obj, args);
function done(err, result) {
+ var $this = this;
--running;
- util.log('funnel|execute|done|running='+running);
if (callback) {
process.nextTick(function () {
- callback.apply(obj, err, result);
+ callback.call($this, err, result);
});
}
- process.nextTick(execute);
+ if (running < executors && queue.length > 0)
+ process.nextTick(execute);
+ if (debug) util.log('funnel|done|running='+running+'|pending='+queue.length);
}
}
@@ -39,7 +43,7 @@ module.exports = (function () {
function Funnel(nbParallelExecutors) {
executors = nbParallelExecutors || executors;
- util.log('funnel|executors='+executors);
+ if (debug) util.log('funnel|executors='+executors);
}
// queue(function) or queue(function, param1,...,paramX) <- paramX must not be a function otherwise it will be taken as the callback
@@ -51,7 +55,9 @@ module.exports = (function () {
if (task.length < 1)
throw new Error('Funnel: the first parameter has to be the "function" to execute');
queue.push(task);
- process.nextTick(execute);
+ if (running < executors)
+ process.nextTick(execute);
+ if (debug) util.log('funnel|queue|running='+running+'|pending='+queue.length);
};
///////////////////////////////////////////////////////////////////////////////
View
61 modules/tts.js
@@ -1,35 +1,68 @@
-var util = require('util'),
- spawn = require('child_process').spawn,
- fs = require('fs'),
+var util = require('util'),
+ spawn = require('child_process').spawn,
+ fs = require('fs'),
+ Cache = require('./cache'),
Funnel = require('./funnel'),
- tts = {
+ tts = {
init: function () {
- this.cache = __dirname + '/cache';
+ this.cachePath = __dirname + '/cache';
this.format = '.m4a';
- this.regex = /\.m4a$/;
- this.cacheCallbacks = {};
+ //this.regex = /\.m4a$/;
+ //this.cacheCallbacks = {};
this.cacheFiles = {};
- this.ttl = 0; // in ms, 0 == never expire
- this.funnel = new Funnel();
+ //this.ttl = 0; // in ms, 0 == never expire
+ this.funnel = new Funnel(2); // One executor per CPU
+ this.cache = new Cache(100,1000*60*60);
- try { fs.mkdirSync(this.cache); } catch (e) { } // Ignore
+ try { fs.mkdirSync(this.cachePath); } catch (e) { } // Ignore
+ /*
var files = fs.readdirSync(this.cache + '/');
for (var index in files) {
if (files[index].match(this.regex)) {
this.cacheFiles[this.cache + '/' + files[index]] = this.cache + '/' +files[index];
}
}
+ */
+ },
+ loadFromDisk: function (filename, text, voice, callback) {
+ var $this = this;
+ fs.stat(filename, function (err) {
+ if (err) {
+ $this.funnel.queue($this, $this.generate, filename, text, voice, function(/*err, filename */) {
+ fs.readFile(filename, function (err, data) {
+ $this.cacheFiles[filename] = data;
+ if (callback) {
+ process.nextTick(function() {
+ callback(err, data);
+ });
+ }
+ });
+ });
+ } else {
+ fs.readFile(filename, function (err, data) {
+ $this.cacheFiles[filename] = data;
+ if (callback) {
+ process.nextTick(function() {
+ callback(err, data);
+ });
+ }
+ });
+ }
+ });
},
play : function (text, voice, callback) {
voice = voice || 'Alex';
var filename = voice + '_' + this.format + '_' + text;
+ filename = this.cachePath + '/' + filename.replace(/[^a-zA-Z0-9]/g, '_') + this.format;
+ util.log('tts|filename='+filename);
+ this.cache.queue(filename, this, this.loadFromDisk, filename, text, voice, callback);
- filename = this.cache + '/' + filename.replace(/[^a-zA-Z0-9]/g, '_') + this.format;
- this.getWaveFromCache(filename, text, voice, callback);
+
+ //this.getWaveFromCache(filename, text, voice, callback);
},
-
+ /*
getWaveFromCache : function (filename, text, voice, callback) {
// Already in cache
if (this.cacheFiles.hasOwnProperty(filename)) {
@@ -70,7 +103,7 @@ var util = require('util'),
}
}
},
-
+ */
generate : function (filename, text, voice, callback) {
var $this = this,
command = '/usr/bin/say',
View
2 package.json
@@ -1,6 +1,6 @@
{
"name": "node-tts",
- "version": "0.0.2",
+ "version": "0.0.3",
"description": "A tts server form Mac OS X",
"author": "Henri Bouvier",
"main": "ttsserver",
View
18 tests/testFunnel.js
@@ -19,7 +19,7 @@ function sleep(seconds, callback) {
util.log('sleep='+seconds);
var child = spawn(command, args, options);
child.on('exit', function (code, signal) {
- util.log('sleep|exit=' + code + '|signal=' + (signal ? signal : 'none'));
+ //util.log('sleep|exit=' + code + '|signal=' + (signal ? signal : 'none'));
if (code === 0) {
callback(null);
} else {
@@ -27,11 +27,11 @@ function sleep(seconds, callback) {
}
});
child.stdout.on('data', function (data) {
- util.log('sleep|stdout=' + data + '\n');
+ //util.log('sleep|stdout=' + data + '\n');
output += 'STDOUT:'+ data + '\n';
});
child.stderr.on('data', function (data) {
- util.log('sleep|stderr=' + data + '\n');
+ //util.log('sleep|stderr=' + data + '\n');
output += 'STDERR:' + data + '\n';
});
}
@@ -47,13 +47,16 @@ Sleep.prototype.doit = function(callback) {
};
Sleep.prototype.done = function(err, result) {
- util.log('Sleep of ' + this.time + ' seconds, done');
+ ++nbExecuted;
+ util.log('testFunnel: sleep(' + this.time + ' seconds) #' + nbExecuted + ' completed');
};
var funnel = new Funnel(4);
+var nbExecuted = 0;
function sleepDone(err, result) {
- util.log('testFunnel: sleep done');
+ ++nbExecuted;
+ util.log('testFunnel: sleep #' + nbExecuted + ' completed');
}
var sleep1 = new Sleep(1);
@@ -61,9 +64,8 @@ var sleep2 = new Sleep(2);
var sleep3 = new Sleep(3);
var sleep5 = new Sleep(5);
var sleep10 = new Sleep(10);
-//sleep5.doit(sleep5.done);
-/*
+
funnel.queue(sleep, 8, sleepDone);
funnel.queue(sleep, 7, sleepDone);
funnel.queue(sleep, 6, sleepDone);
@@ -73,7 +75,7 @@ funnel.queue(sleep, 4, sleepDone);
funnel.queue(sleep, 3, sleepDone);
funnel.queue(sleep, 2, sleepDone);
funnel.queue(sleep, 1, sleepDone);
-*/
+
funnel.queue(sleep5, sleep5.doit, sleep5.done);
View
25 ttsserver.js
@@ -1,9 +1,9 @@
var express = require('express'),
util = require('util'),
tts = require('./modules/tts').tts,
- cache = require('./modules/cache'),
+// cache = require('./modules/cache'),
fs = require('fs'),
- readFromCache = cache(fs.readFile),
+// readFromCache = cache(fs.readFile),
port = process.env.PORT || 8082,
app = express.createServer(),
voice = 'Alex',
@@ -44,18 +44,15 @@ app.configure('production', function () {
app.get('/ws/tts', function (req, res) {
tts.play(req.param('text', 'No text passed'),
req.param('voice', voice),
- function (err, filename) {
- util.log('|tts|get|err=' +err + '|filename=' + filename);
- readFromCache(filename, function(err, data) {
- if (err) {
- res.writeHead(404, {"Content-Type": "text/html"});
- res.end('<html><body><pre>Unable to generate tts <br/>\n' + err + '</pre></body></html>');
- } else {
- res.writeHead(200, {'Content-Type': 'audio/mp4'});
- res.end(data);
- //res.sendfile(filename);
- }
- });
+ function (err, data) {
+ util.log('|tts|get|err=' +err);
+ if (err) {
+ res.writeHead(404, {"Content-Type": "text/html"});
+ res.end('<html><body><pre>Unable to generate tts <br/>\n' + err + '</pre></body></html>');
+ } else {
+ res.writeHead(200, {'Content-Type': 'audio/mp4'});
+ res.end(data);
+ }
});
});

0 comments on commit 30946fe

Please sign in to comment.
Something went wrong with that request. Please try again.