Permalink
Browse files

LifeChurch.tv improvements

  • Loading branch information...
1 parent 2c0a187 commit 0889f394031b6effdc3eb0ebe8a2b2955c4d77af @thinkjson thinkjson committed Apr 14, 2013
Showing with 1,300 additions and 131 deletions.
  1. +14 −0 Makefile
  2. +65 −0 app.js
  3. +66 −0 docs/deployment.md
  4. +65 −0 docs/sending_data.md
  5. +15 −0 helpers.js
  6. +69 −9 listeners/couchdb.js
  7. +62 −46 listeners/csv.js
  8. +104 −0 listeners/json.js
  9. +130 −0 listeners/mysql.js
  10. +170 −0 listeners/postgresql.js
  11. +5 −5 package.json
  12. +59 −15 readme.md
  13. +71 −52 server.js
  14. +189 −4 telemetry.js
  15. +189 −0 test/telemetry.js
  16. +27 −0 testconfig.js
View
@@ -0,0 +1,14 @@
+REPORTER = spec
+INTERFACE = tdd
+
+test: clean
+ @NODE_ENV=test ./node_modules/.bin/mocha \
+ --reporter $(REPORTER) \
+ --ui $(INTERFACE) \
+ --ignore-leaks \
+ --timeout 5000
+
+clean:
+ - rm *.csv
+
+.PHONY: test
View
65 app.js
@@ -0,0 +1,65 @@
+// Import libraries
+var express = require('express');
+var fs = require('fs');
+var config = require(process.env.TELEMETRY_CONFIG || __dirname + "/config.js");
+
+
+// Load listeners
+var listener_files = fs.readdirSync(__dirname + "/listeners");
+for (var i = 0; i < listener_files.length; i++) {
+ var listener = listener_files[i];
+ global[listener.replace('.js', '')] =
+ require(__dirname + "/listeners/" + listener);
+}
+
+//Load configuration and inputs
+// Create telemetry object
+global.telemetry = new (require(__dirname + '/telemetry.js'))(config.inputs);
+telemetry.status = 'operational';
+telemetry.errors = 0;
+
+// Log metadata on every transaction
+telemetry.log_metadata = config.log_metadata === undefined ?
+ true : config.log_metadata === true;
+
+// Determine how long to keep workers alive before killing them off
+if (! telemetry.shutdownTimeout) telemetry.shutdownTimeout = 3000;
+
+// Create the telemetry server
+var options;
+try {
+ options = {
+ key: fs.readFileSync(process.env.TELEMETRY_KEY || __dirname + '/telemetry-key.pem'),
+ cert: fs.readFileSync(process.env.TELEMETRY_CERT || __dirname + '/telemetry-cert.pem')
+ };
+} catch (e) {}
+var app = options === undefined ?
+ require('express').createServer() :
+ require('express').createServer(options);
+var bodyParser = express.bodyParser();
+
+// Properly handle errors
+app.error(function(err, req, res, next) {
+ if (err instanceof SyntaxError) {
+ res.send({ error: 'Could not parse data' }, 415);
+ } else {
+ res.send({ error: 'Internal Server Error' }, 500);
+ }
+});
+
+// Status page
+app.all('/', function(req, res, next) {
+ telemetry.get_status(req, res, next);
+});
+
+// Assign inputs for data collection
+app.post('/input/:input', bodyParser, function(req, res, next) {
+ telemetry.post_message(req, res, next);
+});
+
+// Create bulk load endpoint
+app.post('/input', bodyParser, function(req, res, next) {
+ telemetry.bulk_load(req, res, next);
+});
+
+module.exports = app;
View
@@ -0,0 +1,66 @@
+# Deployment
+
+This guide will describe how to deploy telemetry to a Linux server with nohup and monit.
+We will use Debian in the examples below. Adjust the process for your distribution of choice.
+
+Be sure you have installed Node.js 0.6 or later, as node-telemetry will not work
+with Node 0.4 and earlier. First, download the code to /opt/telemetry. Then create
+a script called /etc/init.d/telemetry marked as executable with the following contents:
+
+ #!/bin/bash
+ DIR=/opt/telemetry
+ PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin
+ NODE_PATH=/usr/local/lib/node_modules
+ NODE=/usr/local/bin/node
+
+ test -x $NODE || exit 0
+
+ function start_app {
+ NODE_ENV=production nohup "$NODE" "$DIR/server.js" 80 1>>"$DIR/telemetry.log" 2>&1 &
+ echo $! > "$DIR/telemetry.pid"
+ echo "Telemetry started"
+ }
+
+ function stop_app {
+ kill `cat $DIR/telemetry.pid` || echo "Could not stop process"
+ }
+
+ case $1 in
+ start)
+ start_app ;;
+ stop)
+ stop_app ;;
+ restart)
+ stop_app
+ start_app
+ ;;
+ *)
+ echo "usage: /etc/init.d/telemetry {start|stop|restart}" ;;
+ esac
+ exit 0
+
+If you installed telemetry with npm, just alter the script to access the
+telemetry on path (`"$NODE" "$DIR/server.js"` would simply become `telemetry`).
+Then install monit:
+
+ apt-get install monit
+
+Alter /etc/monit/monitrc to make the following additions, which will ensure that
+telemetry is always running.
+
+ set daemon 30
+ set logfile syslog facility log_daemon
+
+ check process telemetry with pidfile "/opt/telemetry/telemetry.pid"
+ start program = "/etc/init.d/telemetry start"
+ stop program = "/etc/init.d/telemetry stop"
+ if failed url http://localhost/
+ with timeout 10 seconds
+ then restart
+
+Also, ensure that monit is enabled by changing the startup variable in
+/etc/default/monit to 1. Then just start the monit daemon.
+
+ /etc/init.d/monit start
+
+That's it!
View
@@ -0,0 +1,65 @@
+# Sending data
+
+Events can be sent to node-telemetry by any HTTP client capable of sending a POST
+of valid JSON or urlencoded data.
+
+## Input-specific upload
+
+url: /input/[input]
+
+You can POST a document to a specific input by using the input-specific upload.
+The body of the request will be queued up for persistence as the document itself.
+There is no required structure except that it must be valid JSON. You may also send
+key=>value pairs in urlencoded format for simpler data, as long as you use the correct
+mimetype.
+
+## Bulk upload
+
+url: /input
+
+You can also push up more than one event using the bulk input. Specify an 'input' property
+in each event and wrap them into an array assigned to the 'data' property in your JSON POST.
+The example below shows JSON for three different events being pushed to three different endpoints
+all in one bulk input call.
+
+ { "data":
+ [
+ { "input":"errors",
+ "level":"log",
+ "message":"Invalid identifier: theCheat",
+ "lineNumber": 25,
+ "file": "strongbad.js"
+ },
+ { "input":"tracker",
+ "referrer":"http://example.com/",
+ "timestamp": 1326736316365,
+ "user": "HomeStar Runner"
+ },
+ { "input":"changes",
+ "setting":"emailpref",
+ "newsetting":"SpamMe",
+ "email": "junk@mail.com"
+ }
+ ]
+ }
+
+## Possible responses
+
+Successful POST to input-specific upload
+ HTTP/1.1 200
+
+Successful POST to bulk upload
+ HTTP/1.1 200
+ {"processed":1,"errors":0}
+
+Successful POST to bulk upload, with errors (see "Error handling" for more information)
+ HTTP/1.1 200
+ {"processed":0,"errors":1}
+
+Input-specific with unknown input
+ HTTP/1.1 404
+ {"error":"The input you specified could not be found."}
+
+Bulk upload with missing 'data' element in the JSON
+ HTTP/1.1 415
+ { error: 'Transactions must be in a JSON array called data in order to be processed.' }
View
@@ -0,0 +1,15 @@
+/**
+ * Utility function for creating ISO-compliant dates
+ */
+exports.ISODateString = function(d) {
+ function pad(n){
+ return n<10 ? '0'+n : n;
+ }
+ if (typeof d === "string") d = new Date(d);
+ return d.getUTCFullYear()+'-'
+ + pad(d.getUTCMonth()+1)+'-'
+ + pad(d.getUTCDate())+'T'
+ + pad(d.getUTCHours())+':'
+ + pad(d.getUTCMinutes())+':'
+ + pad(d.getUTCSeconds())+'+00:00';
+};
View
@@ -1,24 +1,84 @@
var nano = require('nano');
exports = module.exports = function(config) {
- this.url = function(config) {
- var url = "https://" + encodeURIComponent(config.username) + ":" +
- encodeURIComponent(config.password) +
- "@" + encodeURIComponent(config.host);
+ var self = this;
+ self.cache = [];
+ self.config = config;
+ self.bufferTime = config.bufferTime || 30000;
+ self.url = function(config) {
+ var url = config.protocol || "http";
+ url += "://";
+ if (config.username && config.password) {
+ url += encodeURIComponent(config.username) + ":" +
+ encodeURIComponent(config.password) + "@";
+ }
+ url += encodeURIComponent(config.host);
if (config.port) {
url += ":" + parseInt(config.port);
}
return url;
};
- this.db = nano(this.url(config)).use(config.input);
+ self.db = nano(self.url(config)).use(config.input);
+
+ /**
+ * Prepare for a graceful shutdown
+ */
+ self.shutdown = function() {
+ self.flush_cache();
+ };
+
+ /**
+ * Push a bulk update to CouchDB
+ */
+ self.flush_cache = function() {
+ // Remove all docs from the cache, but leave cache available to incoming requests
+ var batch = self.cache.splice(0, 1000);
+ if (batch.length === 0) return;
+
+ // Push
+ self.db.bulk({ docs: batch }, function(error, http_body, http_headers) {
+ if (error) {
+ self.cache = self.cache.concat(batch);
+ telemetry.errors++;
+ telemetry.last_error = {
+ time: new Date(),
+ message: "CouchDB bulk load failed for input `" + self.config.input + "` (" + error.message + ")"
+ };
+ telemetry.error({
+ type: "couchdb",
+ error: "Reverting due to error: " + error,
+ batch_size: batch.length
+ });
+ } else if (self.cache.length > 50) {
+ self.flush_cache();
+ }
+ });
+ };
/**
* Post a message to the telemetry server
* Required method
*/
- this.post = function(data) {
- this.db.insert(data, function(error, http_body, http_headers) {
- if (error)
- console.error(error);
+ self.post = function(data) {
+ // Remove underscore properties
+ for (var prop in data) {
+ if (data.hasOwnProperty(prop)) {
+ if (prop[0] === "_") {
+ delete data[prop];
+ }
+ }
+ }
+
+ // Generate a UUID to prevent duplicate docs
+ data._id = 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
+ var r = Math.random()*16|0, v = c == 'x' ? r : (r&0x3|0x8);
+ return v.toString(16);
});
+
+ // Push to cache
+ this.cache.push(data);
};
+
+ setInterval(function() {
+ self.flush_cache();
+ }, self.bufferTime);
};
Oops, something went wrong.

0 comments on commit 0889f39

Please sign in to comment.