diff --git a/app.js b/app.js index 97451b72..aae2c13d 100644 --- a/app.js +++ b/app.js @@ -76,8 +76,14 @@ function generateFeatures(url, client, clientid) { var db = {}; var server; +var tempPath = 'temp' + (cluster.isWorker ? '-' + cluster.worker.process.pid: ''); function run(keys) { + try { + fs.mkdirSync(tempPath); + } catch(e) { + console.log('work dir already exists'); + } app.use('/static', express.static(__dirname + '/static')); if (keys === undefined) { @@ -109,6 +115,7 @@ function run(keys) { url: referer, userAgent: ua }; + var tempStream = fs.createWriteStream(tempPath + '/' + clientid); var baseStats = {}; @@ -130,34 +137,52 @@ function run(keys) { break; default: obfuscate(data); - if (!db[referer][clientid].peerConnections[data[1]]) { - db[referer][clientid].peerConnections[data[1]] = []; - baseStats[data[1]] = {}; - } - if (data[0] === 'getstats') { // delta-compressed - data[2] = statsDecompressor(baseStats[data[1]], data[2]); - baseStats[data[1]] = JSON.parse(JSON.stringify(data[2])); - } - if (data[0] === 'getStats' || data[0] === 'getstats') { - data[2] = statsMangler(data[2]); - data[0] = 'getStats'; - } - db[referer][clientid].peerConnections[data[1]].push({ - time: new Date(), - type: data[0], - value: data[2] - }); + data.time = new Date().getTime(); + tempStream.write(JSON.stringify(data) + '\n'); break; } }); client.on('close', function() { - console.log('closed'); - - var client = db[referer][clientid]; - dump(referer, client, clientid); - generateFeatures(referer, client, clientid); - delete db[referer][clientid]; + tempStream.on('finish', function() { + fs.readFile(tempStream.path, {encoding: 'utf-8'}, function(err, data) { + if (!err) { + data.split('\n').forEach(function(line) { + if (line.length) { + var data = JSON.parse(line); + var time = new Date(data.time); + delete data.time; + if (!db[referer][clientid].peerConnections[data[1]]) { + db[referer][clientid].peerConnections[data[1]] = []; + baseStats[data[1]] = {}; + } + if (data[0] === 'getstats') { // delta-compressed + data[2] = statsDecompressor(baseStats[data[1]], data[2]); + baseStats[data[1]] = JSON.parse(JSON.stringify(data[2])); + } + if (data[0] === 'getStats' || data[0] === 'getstats') { + data[2] = statsMangler(data[2]); + data[0] = 'getStats'; + } + db[referer][clientid].peerConnections[data[1]].push({ + time: time, + type: data[0], + value: data[2] + }); + } + }); + } + // we proceed even if there was an error. + var client = db[referer][clientid]; + delete db[referer][clientid]; + fs.unlink(tempStream.path, function(err, data) { + // we're good... + }); + dump(referer, client, clientid); + generateFeatures(referer, client, clientid); + }); + }); + tempStream.end(); }); }); } @@ -175,6 +200,14 @@ if (require.main === module && cluster.isMaster) { cluster.on('exit', function(worker, code, signal) { console.log('worker', worker.process.pid, 'died, restarting'); cluster.fork(); + + // clean up after worker. + // TODO: Possibly recover data. For now: throw it away. + var path = 'temp-' + worker.process.pid; + fs.readdirSync(path).forEach(function(fname) { + fs.unlinkSync(path + '/' + fname); + }); + fs.rmdirSync(path); }); } else { run();