Useful change for multiple data centers. #25

Closed
wants to merge 1 commit into
from
View
@@ -23,36 +23,47 @@ config.configFile(process.argv[2], function (config, oldConfig) {
if (server === undefined) {
server = dgram.createSocket('udp4', function (msg, rinfo) {
if (config.dumpMessages) { sys.log(msg.toString()); }
- var bits = msg.toString().split(':');
- var key = bits.shift()
- .replace(/\s+/g, '_')
- .replace(/\//g, '-')
- .replace(/[^a-zA-Z_\-0-9\.]/g, '');
-
- if (bits.length == 0) {
- bits.push("1");
+
+ var messages = [];
+
+ if (msg.toString().indexOf(",") != -1) {
+ messages = msg.toString().split(',');
+ } else {
+ messages.push(msg.toString());
}
- for (var i = 0; i < bits.length; i++) {
- var sampleRate = 1;
- var fields = bits[i].split("|");
- if (fields[1] === undefined) {
+ for (var j = 0; j < messages.length; j++) {
+ var bits = messages[j].split(':');
+ var key = bits.shift()
+ .replace(/\s+/g, '_')
+ .replace(/\//g, '-')
+ .replace(/[^a-zA-Z_\-0-9\.]/g, '');
+
+ if (bits.length == 0) {
+ bits.push("1");
+ }
+
+ for (var i = 0; i < bits.length; i++) {
+ var sampleRate = 1;
+ var fields = bits[i].split("|");
+ if (fields[1] === undefined) {
sys.log('Bad line: ' + fields);
continue;
- }
- if (fields[1].trim() == "ms") {
- if (! timers[key]) {
- timers[key] = [];
}
- timers[key].push(Number(fields[0] || 0));
- } else {
- if (fields[2] && fields[2].match(/^@([\d\.]+)/)) {
- sampleRate = Number(fields[2].match(/^@([\d\.]+)/)[1]);
- }
- if (! counters[key]) {
- counters[key] = 0;
+ if (fields[1].trim() == "ms") {
+ if (! timers[key]) {
+ timers[key] = [];
+ }
+ timers[key].push(Number(fields[0] || 0));
+ } else {
+ if (fields[2] && fields[2].match(/^@([\d\.]+)/)) {
+ sampleRate = Number(fields[2].match(/^@([\d\.]+)/)[1]);
+ }
+ if (! counters[key]) {
+ counters[key] = 0;
+ }
+ counters[key] += Number(fields[0] || 1) * (1 / sampleRate);
}
- counters[key] += Number(fields[0] || 1) * (1 / sampleRate);
}
}
});
@@ -61,82 +72,128 @@ config.configFile(process.argv[2], function (config, oldConfig) {
var flushInterval = Number(config.flushInterval || 10000);
- flushInt = setInterval(function () {
- var statString = '';
- var ts = Math.round(new Date().getTime() / 1000);
- var numStats = 0;
- var key;
+ if (config.flushToStatsD) {
+ flushInt = setInterval(function() {
+ var statString = '';
+ var messages = [];
- for (key in counters) {
- var value = counters[key] / (flushInterval / 1000);
- var message = 'stats.' + key + ' ' + value + ' ' + ts + "\n";
- message += 'stats_counts.' + key + ' ' + counters[key] + ' ' + ts + "\n";
- statString += message;
- counters[key] = 0;
+ for (key in counters) {
+ var value = counters[key];
- numStats += 1;
- }
+ if (value == 0) {
+ continue;
+ }
+
+ messages.push(key + ':' + value + "|c");
+ counters[key] = 0;
+ }
+
+ for (key in timers) {
+ if (timers[key].length > 0) {
+ for (var k = 0; k < timers[key].length; k++) {
+ var value = timers[key][k];
+
+ if (value == 0) {
+ continue;
+ }
- for (key in timers) {
- if (timers[key].length > 0) {
- var pctThreshold = config.percentThreshold || 90;
- var values = timers[key].sort(function (a,b) { return a-b; });
- var count = values.length;
- var min = values[0];
- var max = values[count - 1];
-
- var mean = min;
- var maxAtThreshold = max;
-
- if (count > 1) {
- var thresholdIndex = Math.round(((100 - pctThreshold) / 100) * count);
- var numInThreshold = count - thresholdIndex;
- values = values.slice(0, numInThreshold);
- maxAtThreshold = values[numInThreshold - 1];
-
- // average the remaining timings
- var sum = 0;
- for (var i = 0; i < numInThreshold; i++) {
- sum += values[i];
+ messages.push(key + ':' + value + "|ms");
}
- mean = sum / numInThreshold;
- }
+ timers[key] = [];
+ }
+ }
- timers[key] = [];
+ statString = messages.join(",");
- var message = "";
- message += 'stats.timers.' + key + '.mean ' + mean + ' ' + ts + "\n";
- message += 'stats.timers.' + key + '.upper ' + max + ' ' + ts + "\n";
- message += 'stats.timers.' + key + '.upper_' + pctThreshold + ' ' + maxAtThreshold + ' ' + ts + "\n";
- message += 'stats.timers.' + key + '.lower ' + min + ' ' + ts + "\n";
- message += 'stats.timers.' + key + '.count ' + count + ' ' + ts + "\n";
+ try {
+ var socket = dgram.createSocket('udp4');
+ var buf = new Buffer(statString);
+ socket.send(buf, 0, buf.length, config.statsdPort, config.statsdHost);
+ socket.close();
+ } catch(e) {
+ if (config.debug) {
+ sys.log(e);
+ }
+ }
+ }, flushInterval);
+ } else {
+ flushInt = setInterval(function () {
+ var statString = '';
+ var ts = Math.round(new Date().getTime() / 1000);
+ var numStats = 0;
+ var key;
+
+ for (key in counters) {
+ var value = counters[key] / (flushInterval / 1000);
+ var message = 'stats.' + key + ' ' + value + ' ' + ts + "\n";
+ message += 'stats_counts.' + key + ' ' + counters[key] + ' ' + ts + "\n";
statString += message;
+ counters[key] = 0;
numStats += 1;
}
- }
- statString += 'statsd.numStats ' + numStats + ' ' + ts + "\n";
-
- try {
- var graphite = net.createConnection(config.graphitePort, config.graphiteHost);
- graphite.addListener('error', function(connectionException){
- if (config.debug) {
- sys.log(connectionException);
+ for (key in timers) {
+ if (timers[key].length > 0) {
+ var pctThreshold = config.percentThreshold || 90;
+ var values = timers[key].sort(function (a,b) { return a-b; });
+ var count = values.length;
+ var min = values[0];
+ var max = values[count - 1];
+
+ var mean = min;
+ var maxAtThreshold = max;
+
+ if (count > 1) {
+ var thresholdIndex = Math.round(((100 - pctThreshold) / 100) * count);
+ var numInThreshold = count - thresholdIndex;
+ values = values.slice(0, numInThreshold);
+ maxAtThreshold = values[numInThreshold - 1];
+
+ // average the remaining timings
+ var sum = 0;
+ for (var i = 0; i < numInThreshold; i++) {
+ sum += values[i];
+ }
+
+ mean = sum / numInThreshold;
+ }
+
+ timers[key] = [];
+
+ var message = "";
+ message += 'stats.timers.' + key + '.mean ' + mean + ' ' + ts + "\n";
+ message += 'stats.timers.' + key + '.upper ' + max + ' ' + ts + "\n";
+ message += 'stats.timers.' + key + '.upper_' + pctThreshold + ' ' + maxAtThreshold + ' ' + ts + "\n";
+ message += 'stats.timers.' + key + '.lower ' + min + ' ' + ts + "\n";
+ message += 'stats.timers.' + key + '.count ' + count + ' ' + ts + "\n";
+ statString += message;
+
+ numStats += 1;
}
- });
- graphite.on('connect', function() {
- this.write(statString);
- this.end();
- });
- } catch(e){
- if (config.debug) {
- sys.log(e);
}
- }
- }, flushInterval);
+ statString += 'statsd.numStats ' + numStats + ' ' + ts + "\n";
+
+ try {
+ var graphite = net.createConnection(config.graphitePort, config.graphiteHost);
+ graphite.addListener('error', function(connectionException){
+ if (config.debug) {
+ sys.log(connectionException);
+ }
+ });
+ graphite.on('connect', function() {
+ this.write(statString);
+ this.end();
+ });
+ } catch(e){
+ if (config.debug) {
+ sys.log(e);
+ }
+ }
+ }, flushInterval);
+ }
}
});