Batched send, reliability fixes, multiple graphing backends, package.json #56

Closed
wants to merge 4 commits

6 participants

@wickedchicken

This is a rather large pull request, one we've split into multiple commits to ease integration into the mainline Etsy codebase. The test infrastructure we previously developed is intended to ensure these changes don't introduce regressions.

Note that these patches are rebased and distilled code from the librato-statsd fork. Some of the changes were authored by @adelcambre and @till and integrated into our history. Rebasing to create Etsy-friendly patchsets erased that authorship information, and we're not certain it's possible to edit the history to restore it.

The first two commits are cleanup patches from @till: they tidy the main directory and remove spurious warnings, respectively. The third commit restructures statsd's sending logic. Specifically, it enables syslog support if 'node-syslog' is installed and working, splits statsd's Graphite code into an 'output pipeline' that allows batched sends, and lets that pipeline submit to several graphing backends at once. Finally, the last patch adds Librato Metrics as a new graphing backend module.

The first two patches should be self-explanatory, but the third warrants some discussion of the engineering decisions behind its design. The current statsd code has a specific target in mind: a moderate number of metrics delivered to Graphite. Our graphing infrastructure has to handle tens of thousands of metrics simultaneously. Because our API is JSON based, we needed a way to automatically split large submissions into smaller, easier-to-handle chunks. This also provides a layer of fault tolerance and modularity for both Graphite and our API: smaller chunks can be processed and tracked independently of each other, both on the network and in the daemon.

With respect to modularity, stats now go through a pipeline: initial data collection, batching, and postprocessing. Postprocessing at the innermost level keeps graphing-backend-specific code localized while the outer maps remain common, keeping the codebase clean.
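To make the flow concrete, here is a minimal sketch of the chunking and pipeline shape. The function names loosely follow the stats.js changes below, but the bodies are simplified stand-ins rather than the real implementation (which also handles counters, timers, the async submission queue, and retries):

```javascript
var _ = require('underscore');

// Chunk the collected metric names into batches of at most `batchSize` keys.
function slicey(keys, batchSize) {
  var i = 0;
  var groups = _.groupBy(keys, function () { return Math.floor(i++ / batchSize); });
  return _.map(groups, function (group) { return group; });
}

// Backend-specific massaging of one chunk. Graphite needs none; librato-metrics
// folds counters into gauges and adds measure_time/source (omitted here).
function hash_postprocess(hash, service) {
  return hash;
}

// Serialize one chunk: JSON for librato-metrics, "name value timestamp" lines for graphite.
function build_string(hash, service, ts) {
  if (service === 'librato-metrics') { return JSON.stringify(hash); }
  return _.map(_.keys(hash.gauges), function (k) {
    return 'stats.' + k + ' ' + hash.gauges[k] + ' ' + ts;
  }).join('\n') + '\n';
}

// Outer loop: every chunk is postprocessed, serialized, and sent independently
// for each enabled graphing service.
function flush(gauges, services, batchSize, send) {
  var ts = Math.round(new Date().getTime() / 1000);
  _.each(slicey(_.keys(gauges), batchSize), function (group) {
    var hash = { gauges: {} };
    _.each(group, function (k) { hash.gauges[k] = gauges[k]; });
    _.each(services, function (service) {
      send(service, build_string(hash_postprocess(hash, service), service, ts));
    });
  });
}
```

Because every chunk is a self-contained payload, a failed or slow send only affects that chunk rather than the whole flush.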

Finally, the last patch exploits the modularity of the third patch to add Librato Metrics as a configurable graphing backend. Note that the graphing backends operate independently of one another and can run simultaneously; we have included a test which verifies that this is in fact the case. It is our hope that other graphing backends can be added and tested in a similar manner.
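As an example, a config along these lines (adapted from exampleConfig.js and the multi-backend test in this pull request; the host, user, and key are placeholders) drives both backends from a single daemon:

```javascript
{ port: 8125
, graphiteHost: "graphite.host.com"   // presence of Graphite settings enables the graphite backend
, graphitePort: 2003
, libratoUser: "user@example.com"     // presence of Librato credentials enables the librato-metrics backend
, libratoApiKey: "<librato api key>"
, flushInterval: 10000
, batch: 200                          // split large flushes into chunks of 200 metrics
}
```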

@josephruscio

Any thoughts on this? Counting the previous submission that we attempted to improve upon here, it's been almost 5 months now. We're definitely willing to help out in any way we can. Would you prefer we break this up into 4 separate pull requests? We'd just like to know that you're interested in merging this upstream before putting more time into it.

@mrtazz
Etsy, Inc. member

Thanks a lot for the Pull Request and sorry for the long delay. It looks really interesting, especially the support for Librato Metrics. One thing, though: we want to keep the core StatsD as simple as possible, as this is definitely one of its strengths. So instead of the switch/case, pluggable graphing backends (maybe as modules?) would be the way to go here. Would you be up for taking a stab at this? It might then make sense to split up the Pull Requests and at least have a distinct one for the pluggable backend.
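To sketch the idea (purely illustrative, not a final interface; the backends/ directory and the console backend below are made up): a backend would be a module that statsd `require()`s based on the config and calls on every flush, e.g.:

```javascript
// backends/console.js -- hypothetical example backend, illustrative only.
// A backend exposes init() and flush(); stats.js would require() whatever
// modules the config lists instead of branching on a switch/case.
exports.init = function (startupTime, config) {
  return true;
};

exports.flush = function (ts, metrics) {
  // a real backend would format `metrics` for its service and send it here
  console.log('flush @ ' + ts + ': ' + JSON.stringify(metrics));
};
```

That way adding a new graphing service never touches the core daemon.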

@josephruscio

@mrtazz thanks for getting back to us :-). That sounds like a workable plan, we'll get those separate requests set up and then close this request with links to them.

@mheffner

@mrtazz

Pluggable backend support has been moved to #69, with support for graphite as a backend. I appreciate any feedback you can provide on that.

@mrtazz
Etsy, Inc. member

Any interest in splitting up the remaining changes into separate Pull Requests so that they are easier to test and integrate?

@mheffner

@mrtazz Yeah, I'll take a look at extracting the remaining parts and will push them up as separate PRs.

@timbunce

@mheffner any news on extracting the remaining parts and pushing them up as separate PRs?

@mheffner

I think this can be closed, as the parts have been merged in separate pull requests.

@draco2003

Thanks!

@draco2003 closed this
44 README.md
@@ -1,7 +1,7 @@
StatsD
======
-A network daemon for aggregating statistics (counters and timers), rolling them up, then sending them to [graphite][graphite].
+A network daemon for aggregating statistics (counters and timers), rolling them up, then sending them to graphing services such as [graphite][graphite] or [Librato Metrics][metrics].
We ([Etsy][etsy]) [blogged][blog post] about how it works and why we created it.
@@ -25,7 +25,6 @@ Counting
This is a simple counter. Add 1 to the "gorets" bucket. It stays in memory until the flush interval `config.flushInterval`.
-
Timing
------
@@ -51,6 +50,12 @@ There are additional config variables available for debugging:
For more information, check the `exampleConfig.js`.
+Logging
+-------
+
+Normally the output on stdout is kept clean and all system events are logged through syslog.
+If `debug` is set, all output is redirected to stdout/stderr accordingly.
+
Guts
----
@@ -58,9 +63,18 @@ Guts
Client libraries use UDP to send information to the StatsD daemon.
* [NodeJS][node]
+
+Statsd can submit data to multiple graphing services. Currently these include:
+
* [Graphite][graphite]
-Graphite uses "schemas" to define the different round robin datasets it houses (analogous to RRAs in rrdtool). Here's what Etsy is using for the stats databases:
+* [Librato Metrics][metrics]
+
+Graphing Services
+-----------------
+
+**Graphite**
+Graphite is an open-source, highly scalable, real-time graphing system. Graphite uses "schemas" to define the different round robin datasets it houses (analogous to RRAs in rrdtool). Here's what Etsy is using for the stats databases:
[stats]
priority = 110
@@ -75,6 +89,9 @@ That translates to:
This has been a good tradeoff so far between size-of-file (round robin databases are fixed size) and data we care about. Each "stats" database is about 3.2 megs with these retentions.
+**Librato Metrics**
+Librato Metrics is a hosted graphing service that eliminates the need to set up and run your own graphing server. Submitting data to [Librato Metrics][metrics] requires a username and API token. You can find your API token in your [account settings](https://metrics.librato.com/account).
+
TCP Stats Interface
-------------------
@@ -95,19 +112,25 @@ The stats output currently will give you:
Installation and Configuration
------------------------------
- * Install node.js
- * Clone the project
- * Create a config file from exampleConfig.js and put it somewhere
+ * Clone the project and `cd` into it
+ * Install node.js, preferably using [nvm][nvm]. Versions >= 0.4.x are supported, but the latest node is preferred
+ * Ensure [npm][npm] is operational; recent node releases ship with it built in
+ * Run `npm install` to automatically install package dependencies
+ * `package.json` doesn't allow specification of engine-dependent dependencies. If you are using node 0.6.x or higher, the 'node-syslog' package can be installed manually for syslog output: run `npm install node-syslog`. A [workaround](https://gist.github.com/1632460) is available if 'syslog.h' errors are encountered.
+ * Duplicate 'exampleConfig.js' to a filename of your choice
+ * Add graphing service connection info to your config file (multiple services can be enabled simultaneously)
* Start the Daemon:
- node stats.js /path/to/config
+```
+node stats.js /path/to/config
+```
Tests
-----
-A test framework has been added using node-unit and some custom code to start and manipulate statsd. Please add tests under test/ for any new features or bug fixes encountered. Testing a live server can be tricky, attempts were made to eliminate race conditions but it may be possible to encounter a stuck state. If doing dev work, a `killall node` will kill any stray test servers in the background (don't do this on a production machine!).
+A test framework has been added using node-unit and some custom code to start and manipulate statsd. Please add tests under test/ for any new features or bug fixes encountered. Testing a live server can be tricky, attempts were made to eliminate race conditions but it may be possible to encounter a stuck state. If doing dev work, a `killall node` will kill any stray test servers in the background (don't do this on a production machine!).
-Tests can be executd with `./run_tests.sh`.
+Tests can be executed with `./run_tests.sh`.
Inspiration
-----------
@@ -133,10 +156,13 @@ fork StatsD from here: http://github.com/etsy/statsd
We'll do our best to get your changes in!
[graphite]: http://graphite.wikidot.com
+[metrics]: https://metrics.librato.com
[etsy]: http://www.etsy.com
[blog post]: http://codeascraft.etsy.com/2011/02/15/measure-anything-measure-everything/
[node]: http://nodejs.org
[udp]: http://enwp.org/udp
+[npm]: http://npmjs.org
+[nvm]: https://github.com/creationix/nvm
Contributors
2 config.js
@@ -1,5 +1,5 @@
var fs = require('fs')
- , sys = require('sys')
+ , sys = require('util')
var Configurator = function (file) {
37 exampleConfig.js
@@ -13,18 +13,43 @@ Graphite Required Variables:
graphiteHost: hostname or IP of Graphite server
graphitePort: port of Graphite server
+Librato Required Variables:
+
+(Leave these unset to avoid sending stats to Librato.)
+ libratoUser: email/user ID for a Metrics account
+ libratoApiKey: API key associated with account
+
+Librato Optional Variables:
+
+ libratoSource: provide a name for this data collection source
+ libratoSnap: snap timestamp to intervals of this many seconds to make graphs
+ stack better [s, default: 10]
+
Optional Variables:
debug: debug flag [default: false]
debugInterval: interval to print debug information [ms, default: 10000]
dumpMessages: log all incoming messages
- flushInterval: interval (in ms) to flush to Graphite
+ flushInterval: interval (in ms) to flush to Graphite or Librato
percentThreshold: for time information, calculate the Nth percentile
- [%, default: 90]
-
+ [%, default: 90]
+ batch: split up large updates to Graphite/Librato into batches
+ of size N [default: 200]
*/
+
{
- graphitePort: 2003
-, graphiteHost: "graphite.host.com"
-, port: 8125
+ port: 8125
+ , graphitePort: 2003
+ , graphiteHost: "graphite.host.com"
+//, libratoUser: "<librato email>"
+//, libratoApiKey: "<librato api key>"
+//, libratoSource: "loadbalancer-statsd" // optional source
+//, libratoSnap: 10 // snap measurements to this interval
+//, debug: 1
+//, debugInterval: 10000
+//, dumpMessages: 1
+//, mgmt_port: 8126
+//, flushInterval: 10000
+//, percentThreshold: 90
+//, batch: 200
}
0 StatsdClient.java → examples/StatsdClient.java
File renamed without changes.
0 php-example.php → examples/php-example.php
File renamed without changes.
0 python_example.py → examples/python_example.py
File renamed without changes.
31 package.json
@@ -0,0 +1,31 @@
+{
+ "name": "statsd",
+ "description": "A lightweight daemon to collect metrics",
+ "author": "Etsy",
+ "contributors": [
+ {
+ "name": "Librato",
+ "email": "metrics@librato.com",
+ "url": "http://librato.com"
+ }
+ ],
+ "scripts": {
+ "test": "./run_tests.sh"
+ },
+ "repository": {
+ "type": "git",
+ "url": "https://github.com/etsy/statsd.git"
+ },
+ "version": "1.5.0",
+ "dependencies": {
+ "async": "0.1.x",
+ "underscore": "1.2.x",
+ "temp": "0.4.x"
+ },
+ "devDependencies": {
+ "nodeunit": "0.6.x"
+ },
+ "engine": {
+ "node" : ">=0.4"
+ }
+}
0 check_statsd.pl → scripts/check_statsd.pl
File renamed without changes.
397 stats.js
@@ -1,14 +1,38 @@
-var dgram = require('dgram')
- , sys = require('sys')
- , net = require('net')
- , config = require('./config')
+var versionstring = "statsd/1.6";
+
+var dgram = require('dgram')
+ , sys = require('util')
+ , net = require('net')
+ , config = require('./config')
+ , _ = require('underscore')
+ , async = require('async')
+ , url_parse = require('url').parse
+ , https = require('https')
+ , http = require('http');
+
+var logger = function(message,severity){
+ switch(severity){
+ case "emerg":
+ case "alert":
+ case "crit":
+ case "err":
+ console.error(message);
+ break;
+ case "warn":
+ console.warn(message);
+ break;
+ default:
+ console.log(message);
+ break;
+ }
+}
var counters = {};
var timers = {};
var debugInt, flushInt, server, mgmtServer;
var startup_time = Math.round(new Date().getTime() / 1000);
-var stats = {
+var globalstats = {
graphite: {
last_flush: startup_time,
last_exception: startup_time
@@ -19,8 +43,66 @@ var stats = {
}
};
+process.on('uncaughtException', function (err) {
+ logger('Caught exception: ' + err,"err");
+});
+
config.configFile(process.argv[2], function (config, oldConfig) {
- if (! config.debug && debugInt) {
+ if (config.debug){
+ sys.log('logging to stdout/err');
+ } else {
+ try {
+ var syslog = require('node-syslog');
+ sys.log('logging to syslog');
+ syslog.init("statsd", syslog.LOG_PID | syslog.LOG_ODELAY, syslog.LOG_DAEMON);
+
+ logger = function(message,severity){
+ var severitycode = syslog.LOG_NOTICE;
+ switch(severity){
+ case "emerg":
+ severitycode = syslog.LOG_EMERG;
+ break;
+ case "alert":
+ severitycode = syslog.LOG_ALERT;
+ break;
+ case "crit":
+ severitycode = syslog.LOG_CRIT;
+ break;
+ case "err":
+ severitycode = syslog.LOG_ERR;
+ break;
+ case "warn":
+ severitycode = syslog.LOG_WARN;
+ break;
+ case "info":
+ severitycode = syslog.LOG_INFO;
+ break;
+ case "debug":
+ severitycode = syslog.LOG_DEBUG;
+ break;
+ }
+ syslog.log(severitycode,message);
+ };
+ } catch(err) {
+ sys.log('node-syslog not found, logging to stdout/err');
+ }
+ }
+
+ function graphServiceEnabled(name){
+ switch(name){
+ case "graphite":
+ return (config.graphiteHost && config.graphitePort);
+ break;
+ case "librato-metrics":
+ return (config.libratoUser && config.libratoApiKey);
+ break;
+ default:
+ throw ("'" + name + "' isn't a valid graphing service!");
+ break;
+ }
+ }
+
+ if (!config.debug && debugInt) {
clearInterval(debugInt);
debugInt = false;
}
@@ -28,29 +110,36 @@ config.configFile(process.argv[2], function (config, oldConfig) {
if (config.debug) {
if (debugInt !== undefined) { clearInterval(debugInt); }
debugInt = setInterval(function () {
- sys.log("Counters:\n" + sys.inspect(counters) + "\nTimers:\n" + sys.inspect(timers));
+ logger("Counters:\n" + sys.inspect(counters) + "\nTimers:\n" + sys.inspect(timers), "info");
}, config.debugInterval || 10000);
}
if (server === undefined) {
server = dgram.createSocket('udp4', function (msg, rinfo) {
- if (config.dumpMessages) { sys.log(msg.toString()); }
- var bits = msg.toString().split(':');
- var key = bits.shift()
- .replace(/\s+/g, '_')
- .replace(/\//g, '-')
- .replace(/[^a-zA-Z_\-0-9\.]/g, '');
+ var msgStr = msg.toString().replace(/^\s+|\s+$/g,"").replace(/\u0000/g, '');
+ if (msgStr.length == 0) {
+ if (config.debug) {
+ logger('No messages.',"debug");
+ }
+ return;
+ }
+ if (config.dumpMessages) {
+ logger('Messages: ' + msgStr,"notice");
+ }
+ var bits = msgStr.split(':');
+ var key = '';
+ key = bits.shift();
if (bits.length == 0) {
- bits.push("1");
+ return;
}
for (var i = 0; i < bits.length; i++) {
var sampleRate = 1;
var fields = bits[i].split("|");
if (fields[1] === undefined) {
- sys.log('Bad line: ' + fields);
- stats['messages']['bad_lines_seen']++;
+ logger('Bad line: ' + fields,"err");
+ globalstats['messages']['bad_lines_seen']++;
continue;
}
if (fields[1].trim() == "ms") {
@@ -69,7 +158,13 @@ config.configFile(process.argv[2], function (config, oldConfig) {
}
}
- stats['messages']['last_msg_seen'] = Math.round(new Date().getTime() / 1000);
+ globalstats['messages']['last_msg_seen'] = Math.round(new Date().getTime() / 1000);
+ });
+
+ server.on("listening", function () {
+ var address = server.address();
+ logger("statsd is running on " + address.address + ":" + address.port,"info");
+ sys.log("server is up");
});
mgmtServer = net.createServer(function(stream) {
@@ -131,21 +226,24 @@ config.configFile(process.argv[2], function (config, oldConfig) {
server.bind(config.port || 8125);
mgmtServer.listen(config.mgmt_port || 8126);
- sys.log("server is up");
-
var flushInterval = Number(config.flushInterval || 10000);
flushInt = setInterval(function () {
- var statString = '';
+ var stats = {};
+ stats["gauges"] = {};
+ stats["counters"] = {};
var ts = Math.round(new Date().getTime() / 1000);
var numStats = 0;
var key;
for (key in counters) {
- var value = counters[key] / (flushInterval / 1000);
- var message = 'stats.' + key + ' ' + value + ' ' + ts + "\n";
- message += 'stats_counts.' + key + ' ' + counters[key] + ' ' + ts + "\n";
- statString += message;
+ var stat;
+ stat = stats["counters"];
+
+ var value = counters[key];
+ stat[key] = {};
+ stat[key]["value"] = value;
+
counters[key] = 0;
numStats += 1;
@@ -165,57 +263,242 @@ config.configFile(process.argv[2], function (config, oldConfig) {
if (count > 1) {
var thresholdIndex = Math.round(((100 - pctThreshold) / 100) * count);
var numInThreshold = count - thresholdIndex;
- values = values.slice(0, numInThreshold);
- maxAtThreshold = values[numInThreshold - 1];
-
+ values_sliced = values.slice(0, numInThreshold);
+ maxAtThreshold = values_sliced[numInThreshold - 1];
// average the remaining timings
var sum = 0;
for (var i = 0; i < numInThreshold; i++) {
- sum += values[i];
+ sum += values_sliced[i];
}
-
mean = sum / numInThreshold;
}
- timers[key] = [];
+ var sum = 0;
+ var sumOfSquares = 0;
+ for (var i = 0; i < count; i++) {
+ sum += values[i];
+ sumOfSquares += values[i] * values[i];
+ }
- var message = "";
- message += 'stats.timers.' + key + '.mean ' + mean + ' ' + ts + "\n";
- message += 'stats.timers.' + key + '.upper ' + max + ' ' + ts + "\n";
- message += 'stats.timers.' + key + '.upper_' + pctThreshold + ' ' + maxAtThreshold + ' ' + ts + "\n";
- message += 'stats.timers.' + key + '.lower ' + min + ' ' + ts + "\n";
- message += 'stats.timers.' + key + '.count ' + count + ' ' + ts + "\n";
- statString += message;
+ timers[key] = [];
+ stats["gauges"][key] = {};
+ stats["gauges"][key]["count"] = count;
+ stats["gauges"][key]["sum"] = sum;
+ stats["gauges"][key]["sum_squares"] = sumOfSquares;
+ stats["gauges"][key]["min"] = min;
+ stats["gauges"][key]["max"] = max;
+ if (graphServiceEnabled("graphite")){
+ stats["gauges"][key]["upper_" + pctThreshold] = maxAtThreshold;
+ stats["gauges"][key]["mean"] = mean;
+ }
numStats += 1;
}
}
- statString += 'statsd.numStats ' + numStats + ' ' + ts + "\n";
-
- if (config.graphiteHost) {
- try {
- var graphite = net.createConnection(config.graphitePort, config.graphiteHost);
- graphite.addListener('error', function(connectionException){
- if (config.debug) {
- sys.log(connectionException);
- }
- });
- graphite.on('connect', function() {
- this.write(statString);
- this.end();
- stats['graphite']['last_flush'] = Math.round(new Date().getTime() / 1000);
+ stats["counters"]["numStats"] = {};
+ stats["counters"]["numStats"]["value"] = numStats;
+
+ var slicey = function(obj,slicelen){
+ var slicecounter = 0;
+ var groups = _.groupBy(obj,function (num){ var ret = Math.floor(slicecounter/slicelen); slicecounter += + 1; return ret;});
+ return _.map(groups,function(k,v){ return k; });
+ }
+
+ function build_hash(type){
+ return function(group){
+ var hash = {};
+ hash[type] = {};
+ _.each(group,function(metric){
+ hash[type][metric] = stats[type][metric];
});
- } catch(e){
- if (config.debug) {
- sys.log(e);
+ return hash;
+ };
+ }
+
+ function hash_postprocess(inhash,service){
+ switch(service){
+ case "librato-metrics":
+ var hash = {};
+ hash["gauges"] = inhash["gauges"] || {};
+ if (_.include(_.keys(inhash),"counters")) {
+ _.each(_.keys(inhash["counters"]),function(metric){
+ var cleaned = metric.replace(/[^-.:_\w]+/, '_').substr(0,255);
+ hash["gauges"][cleaned] = inhash["counters"][metric];
+ });
+ }
+ var snap = config.libratoSnap || 10;
+ hash["measure_time"] = (ts - (ts%snap));
+ if (config.libratoSource) { hash["source"] = config.libratoSource; }
+ return hash;
+ break;
+ default:
+ return inhash;
+ break;
+ }
+ }
+
+ function build_string(hash,type,service){
+ var stats_str ='';
+
+ switch(service){
+ case "librato-metrics":
+ stats_str = JSON.stringify(hash);
+ break;
+ case "graphite":
+ for (key in hash[type]){
+ var k = key
+ .replace(/\s+/g, '_')
+ .replace(/\//g, '-')
+ .replace(/[^a-zA-Z_\-0-9\.]/g, '');
+
+ var stat = hash[type][k];
+ if (type == "counters"){
+ if(k == 'numStats'){
+ stats_str += 'statsd.numStats ' + stat['value'] + ' ' + ts + "\n";
+ } else {
+ var per_interval_value = stat["value"] / (flushInterval / 1000);
+ stats_str += ('stats.' + k + ' ' + per_interval_value + ' ' + ts + "\n");
+ stats_str += ('stats_counts.' + k + ' ' + stat["value"] + ' ' + ts + "\n");
+ }
+ } else {
+ for (s in stat){
+ stats_str += ('stats.timers.' + k + '.' + s + ' ' + stat[s] + ' ' + ts + "\n");
+ }
+ }
+ }
+ break;
+ default:
+ throw ("'" + service + "' isn't a valid graphing service!");
+ break;
+ }
+ return stats_str;
+ };
+
+ var slice_length = config.batch || 200;
+ var ggroups = slicey(_.keys(stats["gauges"]),slice_length);
+ var cgroups = slicey(_.keys(stats["counters"]),slice_length);
+ var ghashes = _.map(ggroups,build_hash("gauges"));
+ var chashes = _.map(cgroups,build_hash("counters"));
+ var combined_hashes = ghashes.concat(chashes);
+
+ var logerror = function(e){
+ if (e){
+ globalstats['graphite']['last_exception'] = Math.round(new Date().getTime() / 1000);
+ if(config.debug) {
+ logger(e,"debug");
}
- stats['graphite']['last_exception'] = Math.round(new Date().getTime() / 1000);
}
}
+ var submit_to_librato = function(stats_str,retry){
+ var parsed_host = url_parse(config.libratoHost || 'https://metrics-api.librato.com');
+ var options = {
+ host: parsed_host["hostname"],
+ port: parsed_host["port"] || 443,
+ path: '/v1/metrics.json',
+ method: 'POST',
+ headers: {
+ "Authorization": 'Basic ' + new Buffer(config.libratoUser + ':' + config.libratoApiKey).toString('base64'),
+ "Content-Length": stats_str.length,
+ "Content-Type": "application/json",
+ "User-Agent" : versionstring
+ }
+ };
+
+ var proto = http;
+ if ((parsed_host["protocol"] || 'http:').match(/https/)){
+ proto = https;
+ }
+ var req = proto.request(options, function(res) {
+ if(res.statusCode != 204){
+ res.on('data', function(d){
+ var errdata = "HTTP " + res.statusCode + ": " + d;
+ if (retry){
+ if (config.debug) { console.log("received error " + res.statusCode + " connecting to Librato, retrying... "); }
+ setTimeout(function(){
+ submit_to_librato(stats_str,false);
+ }, Math.floor(flushInterval/2) + 100);
+ } else {
+ logger("Error connecting to Librato!\n" + errdata,"crit");
+ }
+ });
+ }
+ });
+ req.write(stats_str);
+ req.end();
+ globalstats['graphite']['last_flush'] = Math.round(new Date().getTime() / 1000);
+ req.on('error', function(errdata) {
+ if (retry){
+ setTimeout(function(){
+ submit_to_librato(stats_str,false);
+ }, Math.floor(flushInterval/2) + 100);
+ } else {
+ logger("Error connecting to Librato!\n" + errdata,"crit");
+ }
+ });
+ }
+
+ var concurrent_conns = config.maxConnections || 10;
+ var submissionq = async.queue(function (task,cb){
+ task();
+ cb();
+ },concurrent_conns);
+
+ // only send to what is enabled
+ var availableServices = ["graphite","librato-metrics"];
+ var enabledServices = _.select(availableServices,graphServiceEnabled);
+
+ _.each(combined_hashes,function(hash){
+ _.each(_.keys(hash), function(hashtype) { /* 'gauges' and 'counters' */
+ _.each(enabledServices,function(service) {
+
+ var stats_str = build_string(hash_postprocess(hash,service),hashtype,service);
+
+ switch(service){
+ case "librato-metrics":
+ submissionq.push(function(){
+ if (config.debug) {
+ logger(stats_str,"debug");
+ logger(stats_str.length,"debug");
+ }
+ submit_to_librato(stats_str,true);
+ }, logerror);
+ break;
+ case "graphite":
+ submissionq.push(function(){
+ if (config.debug) {
+ logger(stats_str,"debug");
+ logger(stats_str.length,"debug");
+ }
+
+ try {
+ var graphite = net.createConnection(config.graphitePort, config.graphiteHost);
+ graphite.addListener('error', function(connectionException){
+ if (config.debug) {
+ logger(connectionException,"crit");
+ }
+ });
+ graphite.on('connect', function() {
+ this.write(stats_str);
+ this.end();
+ globalstats['graphite']['last_flush'] = Math.round(new Date().getTime() / 1000);
+ });
+ } catch(e){
+ if (config.debug) {
+ logger(e,"debug");
+ }
+ }
+ }, logerror);
+ break;
+
+ default:
+ throw ("'" + service + "' isn't a valid graphing service!");
+ break;
+ }
+ });
+ });
+ });
}, flushInterval);
}
-
});
-
8 test/graphite_tests.js
@@ -2,7 +2,7 @@ var fs = require('fs'),
net = require('net'),
temp = require('temp'),
spawn = require('child_process').spawn,
- sys = require('sys'),
+ sys = require('util'),
urlparse = require('url').parse,
_ = require('underscore'),
dgram = require('dgram'),
@@ -71,12 +71,12 @@ module.exports = {
setUp: function (callback) {
this.testport = 31337;
this.myflush = 200;
- var configfile = "{graphService: \"graphite\"\n\
- , batch: 200 \n\
+ var configfile = "{\n\
+ batch: 200 \n\
, flushInterval: " + this.myflush + " \n\
, port: 8125\n\
, dumpMessages: false \n\
- , debug: false\n\
+ , debug: true\n\
, graphitePort: " + this.testport + "\n\
, graphiteHost: \"127.0.0.1\"}";
233 test/librato_tests.js
@@ -0,0 +1,233 @@
+var fs = require('fs'),
+ temp = require('temp'),
+ spawn = require('child_process').spawn,
+ urlparse = require('url').parse,
+ _ = require('underscore'),
+ dgram = require('dgram'),
+ qsparse = require('querystring').parse,
+ http = require('http');
+
+
+var writeconfig = function(text,worker,cb,obj){
+ temp.open({suffix: '-statsdconf.js'}, function(err, info) {
+ if (err) throw err;
+ fs.write(info.fd, text);
+ fs.close(info.fd, function(err) {
+ if (err) throw err;
+ worker(info.path,cb,obj);
+ });
+ });
+}
+
+var array_contents_are_equal = function(first,second){
+ var intlen = _.intersection(first,second).length;
+ var unlen = _.union(first,second).length;
+ return (intlen == unlen) && (intlen == first.length);
+}
+
+var statsd_send = function(data,sock,host,port,cb){
+ send_data = new Buffer(data);
+ sock.send(send_data,0,send_data.length,port,host,function(err,bytes){
+ if (err) {
+ throw err;
+ }
+ cb();
+ });
+}
+
+// keep collecting data until a specified timeout period has elapsed
+// this will let us capture all data chunks so we don't miss one
+var collect_for = function(server,timeout,cb){
+ var received = [];
+ var in_flight = 0;
+ var start_time = new Date().getTime();
+ var collector = function(req,res){
+ res.writeHead(204);
+ res.end();
+ in_flight += 1;
+ var body = '';
+ req.on('data',function(data){ body += data; });
+ req.on('end',function(){
+ received = received.concat(body);
+ in_flight -= 1;
+ if((in_flight < 1) && (new Date().getTime() > (start_time + timeout))){
+ server.removeListener('request',collector);
+ cb(received);
+ }
+ });
+ }
+
+ setTimeout(function (){
+ server.removeListener('request',collector);
+ if((in_flight < 1)){
+ cb(received);
+ }
+ },timeout);
+
+ server.on('request',collector);
+}
+
+module.exports = {
+ setUp: function (callback) {
+ this.testport = 31337;
+
+ this.myflush = 200;
+ var configfile = "{\n\
+ batch: 200 \n\
+ , flushInterval: " + this.myflush + " \n\
+ , port: 8125\n\
+ , dumpMessages: false \n\
+ , debug: true\n\
+ , libratoUser: \"test@librato.com\"\n\
+ , libratoSnap: 10\n\
+ , libratoApiKey: \"fakekey\"\n\
+ , libratoHost: \"http://127.0.0.1:" + this.testport + "\"}";
+
+ this.acceptor = http.createServer();
+ this.acceptor.listen(this.testport);
+ this.sock = dgram.createSocket('udp4');
+
+ this.server_up = true;
+ this.ok_to_die = false;
+ this.exit_callback_callback = process.exit;
+
+ writeconfig(configfile,function(path,cb,obj){
+ obj.path = path;
+ obj.server = spawn('node',['stats.js', path]);
+ obj.exit_callback = function (code) {
+ obj.server_up = false;
+ if(!obj.ok_to_die){
+ console.log('node server unexpectedly quit with code: ' + code);
+ process.exit(1);
+ }
+ else {
+ obj.exit_callback_callback();
+ }
+ };
+ obj.server.on('exit', obj.exit_callback);
+ obj.server.stderr.on('data', function (data) {
+ console.log('stderr: ' + data.toString().replace(/\n$/,''));
+ });
+ /*
+ obj.server.stdout.on('data', function (data) {
+ console.log('stdout: ' + data.toString().replace(/\n$/,''));
+ });
+ */
+ obj.server.stdout.on('data', function (data) {
+ // wait until server is up before we finish setUp
+ if (data.toString().match(/server is up/)) {
+ cb();
+ }
+ });
+
+ },callback,this);
+ },
+ tearDown: function (callback) {
+ this.sock.close();
+ this.acceptor.close();
+ this.ok_to_die = true;
+ if(this.server_up){
+ this.exit_callback_callback = callback;
+ this.server.kill();
+ } else {
+ callback();
+ }
+ },
+
+ send_well_formed_posts: function (test) {
+ test.expect(5);
+
+ // we should integrate a timeout into this
+ this.acceptor.once('request',function(req,res){
+ res.writeHead(204);
+ res.end();
+ test.equals(req.method,'POST');
+ var uri_parts = urlparse(req.url);
+ test.equals(uri_parts["pathname"],'/v1/metrics.json')
+ var body = '';
+ req.on('data',function(data){ body += data; });
+ req.on('end',function(){
+ try {
+ var post = JSON.parse(body);
+ } catch (e) {
+ test.ok(false,"string sent was not valid JSON: " + e);
+ test.done();
+ return;
+ }
+ test.ok(true);
+ var message_keys = ['measure_time','gauges'];
+ test.ok(array_contents_are_equal(_.keys(post),message_keys),"JSON must only have: [" + message_keys + "], received: [" + _.keys(post) + "]");
+
+ if(_.include(_.keys(post),'gauges') && _.include(_.keys(post['gauges']),'numStats') &&
+ _.include(_.keys(post['gauges']['numStats']),'value')){
+ test.equals(post['gauges']['numStats']['value'],0);
+ } else {
+ test.ok(false, 'API does not send numStats properly');
+ }
+ test.done();
+ });
+ });
+ },
+
+ timers_are_valid: function (test) {
+ test.expect(3);
+
+ var testvalue = 100;
+ var me = this;
+ this.acceptor.once('request',function(req,res){
+ res.writeHead(204);
+ res.end();
+ statsd_send('a_test_value:' + testvalue + '|ms',me.sock,'127.0.0.1',8125,function(){
+ collect_for(me.acceptor,me.myflush*2,function(strings){
+ test.ok(strings.length > 0,'should receive some data');
+ var hashes = _.map(strings,function(str){ return JSON.parse(str); });
+ var numstat_test = function(post){
+ return _.include(_.keys(post),'gauges') && _.include(_.keys(post['gauges']),'numStats') &&
+ _.include(_.keys(post['gauges']['numStats']),'value') &&
+ (post['gauges']['numStats']['value'] == 1);
+ };
+ test.ok(_.any(hashes,numstat_test), 'numStats should be 1');
+
+ var testvalue_test = function(post){
+ return _.include(_.keys(post),'gauges') && _.include(_.keys(post['gauges']),'a_test_value') &&
+ _.include(_.keys(post['gauges']['a_test_value']),'sum') &&
+ (post['gauges']['a_test_value']['sum'] == testvalue);
+ };
+ test.ok(_.any(hashes,testvalue_test), 'testvalue should be ' + testvalue);
+ test.done();
+ });
+ });
+ });
+ },
+
+ counts_are_valid: function (test) {
+ test.expect(3);
+
+ var testvalue = 100;
+ var me = this;
+ this.acceptor.once('request',function(req,res){
+ res.writeHead(204);
+ res.end();
+ statsd_send('a_test_value:' + testvalue + '|c',me.sock,'127.0.0.1',8125,function(){
+ collect_for(me.acceptor,me.myflush*2,function(strings){
+ test.ok(strings.length > 0,'should receive some data');
+ var hashes = _.map(strings,function(str){ return JSON.parse(str); });
+ var numstat_test = function(post){
+ return _.include(_.keys(post),'gauges') && _.include(_.keys(post['gauges']),'numStats') &&
+ _.include(_.keys(post['gauges']['numStats']),'value') &&
+ (post['gauges']['numStats']['value'] == 1);
+ };
+ test.ok(_.any(hashes,numstat_test), 'numStats should be 1');
+
+ var testvalue_test = function(post){
+ return _.include(_.keys(post),'gauges') && _.include(_.keys(post['gauges']),'a_test_value') &&
+ _.include(_.keys(post['gauges']['a_test_value']),'value') &&
+ (post['gauges']['a_test_value']['value'] == testvalue);
+ };
+ test.ok(_.any(hashes,testvalue_test), 'testvalue should be ' + testvalue);
+ test.done();
+ });
+ });
+ });
+ }
+}
205 test/multi_test.js
@@ -0,0 +1,205 @@
+// this file tests that graphite and librato work simultaneously
+
+var fs = require('fs'),
+ net = require('net'),
+ temp = require('temp'),
+ spawn = require('child_process').spawn,
+ sys = require('util'),
+ urlparse = require('url').parse,
+ _ = require('underscore'),
+ dgram = require('dgram'),
+ qsparse = require('querystring').parse,
+ http = require('http');
+
+
+var writeconfig = function(text,worker,cb,obj){
+ temp.open({suffix: '-statsdconf.js'}, function(err, info) {
+ if (err) throw err;
+ fs.write(info.fd, text);
+ fs.close(info.fd, function(err) {
+ if (err) throw err;
+ worker(info.path,cb,obj);
+ });
+ });
+}
+
+var array_contents_are_equal = function(first,second){
+ var intlen = _.intersection(first,second).length;
+ var unlen = _.union(first,second).length;
+ return (intlen == unlen) && (intlen == first.length);
+}
+
+var statsd_send = function(data,sock,host,port,cb){
+ send_data = new Buffer(data);
+ sock.send(send_data,0,send_data.length,port,host,function(err,bytes){
+ if (err) {
+ throw err;
+ }
+ cb();
+ });
+}
+
+// keep collecting data until a specified timeout period has elapsed
+// this will let us capture all data chunks so we don't miss one
+var collect_for = function(server,timeout,cb){
+ var received = [];
+ var in_flight = 0;
+ var start_time = new Date().getTime();
+ var collector = function(req,res){
+ in_flight += 1;
+ var body = '';
+ req.on('data',function(data){ body += data; });
+ req.on('end',function(){
+ received = received.concat(body.split("\n"));
+ in_flight -= 1;
+ if((in_flight < 1) && (new Date().getTime() > (start_time + timeout))){
+ server.removeListener('request',collector);
+ cb(received);
+ }
+ });
+ }
+
+ setTimeout(function (){
+ server.removeListener('connection',collector);
+ if((in_flight < 1)){
+ cb(received);
+ }
+ },timeout);
+
+ server.on('connection',collector);
+}
+
+module.exports = {
+ setUp: function (callback) {
+ this.graphiteTestport = 31336;
+ this.libratoTestport = 31337;
+
+ this.myflush = 200;
+ var configfile = "{\n\
+ batch: 200 \n\
+ , flushInterval: " + this.myflush + " \n\
+ , port: 8125\n\
+ , dumpMessages: false \n\
+ , debug: true\n\
+ , libratoUser: \"test@librato.com\"\n\
+ , libratoSnap: 10\n\
+ , libratoApiKey: \"fakekey\"\n\
+ , libratoHost: \"http://127.0.0.1:" + this.libratoTestport + "\" \n\
+ , graphitePort: " + this.graphiteTestport + "\n\
+ , graphiteHost: \"127.0.0.1\"}";
+
+ this.graphiteAcceptor = net.createServer();
+ this.graphiteAcceptor.listen(this.graphiteTestport);
+ this.libratoAcceptor = http.createServer();
+ this.libratoAcceptor.listen(this.libratoTestport);
+ this.sock = dgram.createSocket('udp4');
+
+ this.server_up = true;
+ this.ok_to_die = false;
+ this.exit_callback_callback = process.exit;
+
+ writeconfig(configfile,function(path,cb,obj){
+ obj.path = path;
+ obj.server = spawn('node',['stats.js', path]);
+ obj.exit_callback = function (code) {
+ obj.server_up = false;
+ if(!obj.ok_to_die){
+ console.log('node server unexpectedly quit with code: ' + code);
+ process.exit(1);
+ }
+ else {
+ obj.exit_callback_callback();
+ }
+ };
+ obj.server.on('exit', obj.exit_callback);
+ obj.server.stderr.on('data', function (data) {
+ console.log('stderr: ' + data.toString().replace(/\n$/,''));
+ });
+ /*
+ obj.server.stdout.on('data', function (data) {
+ console.log('stdout: ' + data.toString().replace(/\n$/,''));
+ });
+ */
+ obj.server.stdout.on('data', function (data) {
+ // wait until server is up before we finish setUp
+ if (data.toString().match(/server is up/)) {
+ cb();
+ }
+ });
+
+ },callback,this);
+ },
+ tearDown: function (callback) {
+ this.sock.close();
+ this.graphiteAcceptor.close();
+ this.libratoAcceptor.close();
+ this.ok_to_die = true;
+ if(this.server_up){
+ this.exit_callback_callback = callback;
+ this.server.kill();
+ } else {
+ callback();
+ }
+ },
+
+ send_well_formed_posts: function (test) {
+ test.expect(7);
+
+ // only call test.done() when both graphite and librato have finished
+ var protocols_hit = 0;
+ var finish_protocol = function(){
+ protocols_hit++;
+ if(protocols_hit >= 2){
+ test.done();
+ }
+ }
+
+ // we should integrate a timeout into this
+ this.graphiteAcceptor.once('connection',function(c){
+ var body = '';
+ c.on('data',function(d){ body += d; });
+ c.on('end',function(){
+ var rows = body.split("\n");
+ var entries = _.map(rows, function(x) {
+ var chunks = x.split(' ');
+ var data = {};
+ data[chunks[0]] = chunks[1];
+ return data;
+ });
+ test.ok(_.include(_.map(entries,function(x) { return _.keys(x)[0] }),'statsd.numStats'),'graphite output includes numStats');
+ test.equal(_.find(entries, function(x) { return _.keys(x)[0] == 'statsd.numStats' })['statsd.numStats'],0);
+ finish_protocol();
+ });
+ });
+
+ this.libratoAcceptor.once('request',function(req,res){
+ res.writeHead(204);
+ res.end();
+ test.equals(req.method,'POST');
+ var uri_parts = urlparse(req.url);
+ test.equals(uri_parts["pathname"],'/v1/metrics.json')
+ var body = '';
+ req.on('data',function(data){ body += data; });
+ req.on('end',function(){
+ try {
+ var post = JSON.parse(body);
+ } catch (e) {
+ test.ok(false,"string sent was not valid JSON: " + e);
+ test.done();
+ return;
+ }
+ test.ok(true);
+ var message_keys = ['measure_time','gauges'];
+ test.ok(array_contents_are_equal(_.keys(post),message_keys),"JSON must only have: [" + message_keys + "], received: [" + _.keys(post) + "]");
+
+ if(_.include(_.keys(post),'gauges') && _.include(_.keys(post['gauges']),'numStats') &&
+ _.include(_.keys(post['gauges']['numStats']),'value')){
+ test.equals(post['gauges']['numStats']['value'],0);
+ } else {
+ test.ok(false, 'API does not send numStats properly');
+ }
+ finish_protocol();
+ });
+ });
+ }
+}