Permalink
Browse files

Clustered server and properly buffered writes to CSV file

  • Loading branch information...
1 parent add4a45 commit 0aa95a28b958ba5055c0a8cc8db05ea8eee128d5 @thinkjson thinkjson committed Oct 20, 2011
Showing with 20 additions and 9 deletions.
  1. +9 −7 listeners/csv.js
  2. +11 −2 server.js
View
@@ -4,29 +4,31 @@ exports = module.exports = function(config) {
* Rotate the log
*/
this.rotateLog = function() {
- var timestamp = Math.floor((new Date()).getTime() / 1000);
+ var timestamp = (Math.floor((new Date()).getTime() / 1000) + "").substring(5);
+ var seed = Math.floor(Math.random() * 99999);
self.oldLogfile = this.logfile;
self.oldLog = self.log;
- self.logfile = self.config.buffer + self.config.input + timestamp + ".csv.part";
+ self.logfile = self.config.buffer + self.config.input + timestamp + seed + ".csv";
self.log = fs.createWriteStream(self.logfile);
+ self.log.write(null);
// Push log file to archive
if (self.oldLog) {
self.oldLog.on('close', function() {
- fs.rename(self.oldLogfile, self.config.archive + self.config.input + timestamp + ".csv", function(err) {
+ fs.rename(self.oldLogfile, self.config.archive + self.config.input + timestamp + seed + ".csv", function(err) {
if (! err) fs.unlink(self.oldLogfile);
});
});
- self.oldLog.end();
+ self.oldLog.destroySoon();
}
};
if (! config.fields) throw { message: "You must specify the fields to extract from the incoming data" };
var self = this;
self.config = config;
self.rotateLog();
- this.interval = config.interval || 3600000;
+ self.interval = config.interval || 3600000;
setInterval(function() {
self.rotateLog();
}, self.interval);
@@ -36,12 +38,12 @@ exports = module.exports = function(config) {
* Required method
*/
this.post = function(data) {
- // FIXME - pull selected fields out of data and write as CSV row
+ // Pull selected fields out of data and write as CSV row
var row = [];
for (field in self.config.fields) {
row.push(data[self.config.fields[field]]);
}
- var raw_data = '"' + row.join('","') + '"\n';
+ var raw_data = '"' + row.join('","') + '","' + self.count + '"\n';
self.log.write(raw_data, "utf8");
};
};
View
@@ -2,6 +2,7 @@
var express = require('express');
var yaml = require('yaml');
var fs = require('fs');
+var cluster = require('cluster');
// Load listeners
var listener_files = fs.readdirSync(__dirname + "/listeners");
@@ -19,6 +20,9 @@ global.telemetry = new (require(__dirname + '/telemetry.js'))(config);
// Create the telemetry server and assign inputs
var app = express.createServer();
app.use(express.bodyParser());
+app.get('/', function(req, res, next) {
+ res.end("Telemetry server operational");
+});
app.post('/input/:input', function(req, res, next) {
var input = req.params.input;
if (telemetry.inputs[input] === undefined) {
@@ -45,6 +49,11 @@ app.post('/input/:input', function(req, res, next) {
});
// Start the telemetry server
-var port = process.argv[2] || 7000;
-app.listen(port);
+var port = process.argv[2] ? parseInt(process.argv[2]) : 7000;
+cluster(app)
+ .use(cluster.logger('logs'))
+ .use(cluster.stats())
+ .use(cluster.pidfiles('pids'))
+ .use(cluster.repl(8888))
+ .listen(port);
console.log("Listening on port", port);

0 comments on commit 0aa95a2

Please sign in to comment.