Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

Useful change for multiple data centers. #25

Closed
wants to merge 1 commit into from

3 participants

@joethecodhr

Hello Etsy!

Dealnews ran into an issue where our VPN could not handle the amount of traffic that was being sent to StatsD from other data centers. As an alternative to sampling, we came up with the simple idea that StatsD could flush to itself.

Your feedback is welcomed!

joethecodhr

@avleen
Owner

So you have multiple statsd collectors, which then flush to a central statsd?
Was the problem you were having at the VPN caused by the packet/sec rate being too high?

@joethecodhr

Correct. Our specific issue is a constraint that limits the number of open connections across our VPN.

@mrtazz
Owner

Hey there is now a backend which allows StatsD to flush to itself (https://github.com/dynmeth/statsd-backend). Which I take to be more or less the same behaviour?

@mrtazz
Owner

I think the now existing Statsd backend and repeater support supersedes this pull request. Sorry that it didn't make it in :(.

@mrtazz mrtazz closed this
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Jul 5, 2011
  1. Adding support to allow StatsD to flush to StatsD.

    joethecohr authored
This page is out of date. Refresh to see the latest.
Showing with 142 additions and 85 deletions.
  1. +142 −85 stats.js
View
227 stats.js
@@ -23,36 +23,47 @@ config.configFile(process.argv[2], function (config, oldConfig) {
if (server === undefined) {
server = dgram.createSocket('udp4', function (msg, rinfo) {
if (config.dumpMessages) { sys.log(msg.toString()); }
- var bits = msg.toString().split(':');
- var key = bits.shift()
- .replace(/\s+/g, '_')
- .replace(/\//g, '-')
- .replace(/[^a-zA-Z_\-0-9\.]/g, '');
-
- if (bits.length == 0) {
- bits.push("1");
+
+ var messages = [];
+
+ if (msg.toString().indexOf(",") != -1) {
+ messages = msg.toString().split(',');
+ } else {
+ messages.push(msg.toString());
}
- for (var i = 0; i < bits.length; i++) {
- var sampleRate = 1;
- var fields = bits[i].split("|");
- if (fields[1] === undefined) {
+ for (var j = 0; j < messages.length; j++) {
+ var bits = messages[j].split(':');
+ var key = bits.shift()
+ .replace(/\s+/g, '_')
+ .replace(/\//g, '-')
+ .replace(/[^a-zA-Z_\-0-9\.]/g, '');
+
+ if (bits.length == 0) {
+ bits.push("1");
+ }
+
+ for (var i = 0; i < bits.length; i++) {
+ var sampleRate = 1;
+ var fields = bits[i].split("|");
+ if (fields[1] === undefined) {
sys.log('Bad line: ' + fields);
continue;
- }
- if (fields[1].trim() == "ms") {
- if (! timers[key]) {
- timers[key] = [];
}
- timers[key].push(Number(fields[0] || 0));
- } else {
- if (fields[2] && fields[2].match(/^@([\d\.]+)/)) {
- sampleRate = Number(fields[2].match(/^@([\d\.]+)/)[1]);
- }
- if (! counters[key]) {
- counters[key] = 0;
+ if (fields[1].trim() == "ms") {
+ if (! timers[key]) {
+ timers[key] = [];
+ }
+ timers[key].push(Number(fields[0] || 0));
+ } else {
+ if (fields[2] && fields[2].match(/^@([\d\.]+)/)) {
+ sampleRate = Number(fields[2].match(/^@([\d\.]+)/)[1]);
+ }
+ if (! counters[key]) {
+ counters[key] = 0;
+ }
+ counters[key] += Number(fields[0] || 1) * (1 / sampleRate);
}
- counters[key] += Number(fields[0] || 1) * (1 / sampleRate);
}
}
});
@@ -61,82 +72,128 @@ config.configFile(process.argv[2], function (config, oldConfig) {
var flushInterval = Number(config.flushInterval || 10000);
- flushInt = setInterval(function () {
- var statString = '';
- var ts = Math.round(new Date().getTime() / 1000);
- var numStats = 0;
- var key;
+ if (config.flushToStatsD) {
+ flushInt = setInterval(function() {
+ var statString = '';
+ var messages = [];
- for (key in counters) {
- var value = counters[key] / (flushInterval / 1000);
- var message = 'stats.' + key + ' ' + value + ' ' + ts + "\n";
- message += 'stats_counts.' + key + ' ' + counters[key] + ' ' + ts + "\n";
- statString += message;
- counters[key] = 0;
+ for (key in counters) {
+ var value = counters[key];
- numStats += 1;
- }
+ if (value == 0) {
+ continue;
+ }
+
+ messages.push(key + ':' + value + "|c");
+ counters[key] = 0;
+ }
+
+ for (key in timers) {
+ if (timers[key].length > 0) {
+ for (var k = 0; k < timers[key].length; k++) {
+ var value = timers[key][k];
+
+ if (value == 0) {
+ continue;
+ }
- for (key in timers) {
- if (timers[key].length > 0) {
- var pctThreshold = config.percentThreshold || 90;
- var values = timers[key].sort(function (a,b) { return a-b; });
- var count = values.length;
- var min = values[0];
- var max = values[count - 1];
-
- var mean = min;
- var maxAtThreshold = max;
-
- if (count > 1) {
- var thresholdIndex = Math.round(((100 - pctThreshold) / 100) * count);
- var numInThreshold = count - thresholdIndex;
- values = values.slice(0, numInThreshold);
- maxAtThreshold = values[numInThreshold - 1];
-
- // average the remaining timings
- var sum = 0;
- for (var i = 0; i < numInThreshold; i++) {
- sum += values[i];
+ messages.push(key + ':' + value + "|ms");
}
- mean = sum / numInThreshold;
- }
+ timers[key] = [];
+ }
+ }
- timers[key] = [];
+ statString = messages.join(",");
- var message = "";
- message += 'stats.timers.' + key + '.mean ' + mean + ' ' + ts + "\n";
- message += 'stats.timers.' + key + '.upper ' + max + ' ' + ts + "\n";
- message += 'stats.timers.' + key + '.upper_' + pctThreshold + ' ' + maxAtThreshold + ' ' + ts + "\n";
- message += 'stats.timers.' + key + '.lower ' + min + ' ' + ts + "\n";
- message += 'stats.timers.' + key + '.count ' + count + ' ' + ts + "\n";
+ try {
+ var socket = dgram.createSocket('udp4');
+ var buf = new Buffer(statString);
+ socket.send(buf, 0, buf.length, config.statsdPort, config.statsdHost);
+ socket.close();
+ } catch(e) {
+ if (config.debug) {
+ sys.log(e);
+ }
+ }
+ }, flushInterval);
+ } else {
+ flushInt = setInterval(function () {
+ var statString = '';
+ var ts = Math.round(new Date().getTime() / 1000);
+ var numStats = 0;
+ var key;
+
+ for (key in counters) {
+ var value = counters[key] / (flushInterval / 1000);
+ var message = 'stats.' + key + ' ' + value + ' ' + ts + "\n";
+ message += 'stats_counts.' + key + ' ' + counters[key] + ' ' + ts + "\n";
statString += message;
+ counters[key] = 0;
numStats += 1;
}
- }
- statString += 'statsd.numStats ' + numStats + ' ' + ts + "\n";
-
- try {
- var graphite = net.createConnection(config.graphitePort, config.graphiteHost);
- graphite.addListener('error', function(connectionException){
- if (config.debug) {
- sys.log(connectionException);
+ for (key in timers) {
+ if (timers[key].length > 0) {
+ var pctThreshold = config.percentThreshold || 90;
+ var values = timers[key].sort(function (a,b) { return a-b; });
+ var count = values.length;
+ var min = values[0];
+ var max = values[count - 1];
+
+ var mean = min;
+ var maxAtThreshold = max;
+
+ if (count > 1) {
+ var thresholdIndex = Math.round(((100 - pctThreshold) / 100) * count);
+ var numInThreshold = count - thresholdIndex;
+ values = values.slice(0, numInThreshold);
+ maxAtThreshold = values[numInThreshold - 1];
+
+ // average the remaining timings
+ var sum = 0;
+ for (var i = 0; i < numInThreshold; i++) {
+ sum += values[i];
+ }
+
+ mean = sum / numInThreshold;
+ }
+
+ timers[key] = [];
+
+ var message = "";
+ message += 'stats.timers.' + key + '.mean ' + mean + ' ' + ts + "\n";
+ message += 'stats.timers.' + key + '.upper ' + max + ' ' + ts + "\n";
+ message += 'stats.timers.' + key + '.upper_' + pctThreshold + ' ' + maxAtThreshold + ' ' + ts + "\n";
+ message += 'stats.timers.' + key + '.lower ' + min + ' ' + ts + "\n";
+ message += 'stats.timers.' + key + '.count ' + count + ' ' + ts + "\n";
+ statString += message;
+
+ numStats += 1;
}
- });
- graphite.on('connect', function() {
- this.write(statString);
- this.end();
- });
- } catch(e){
- if (config.debug) {
- sys.log(e);
}
- }
- }, flushInterval);
+ statString += 'statsd.numStats ' + numStats + ' ' + ts + "\n";
+
+ try {
+ var graphite = net.createConnection(config.graphitePort, config.graphiteHost);
+ graphite.addListener('error', function(connectionException){
+ if (config.debug) {
+ sys.log(connectionException);
+ }
+ });
+ graphite.on('connect', function() {
+ this.write(statString);
+ this.end();
+ });
+ } catch(e){
+ if (config.debug) {
+ sys.log(e);
+ }
+ }
+ }, flushInterval);
+ }
}
});
Something went wrong with that request. Please try again.