Move counter and timer calculations out of graphite backend and into separate module #167

Merged
merged 17 commits into from Nov 11, 2012
Jump to file or symbol
Failed to load files and symbols.
+236 −66
Diff settings

Always

Just for now

View
@@ -270,12 +270,15 @@ metrics: {
gauges: gauges,
timers: timers,
sets: sets,
+ counter_rates: counter_rates,
+ timer_data: timer_data,
pctThreshold: pctThreshold
}
```
- Each backend module is passed the same set of statistics, so a
- backend module should treat the metrics as immutable
+ The counter_rates and timer_data are precalculated statistics to simplify
+ the creation of backends. Each backend module is passed the same set of
+ statistics, so a backend module should treat the metrics as immutable
structures. StatsD will reset timers and counters after each
listener has handled the event.
View
@@ -31,9 +31,11 @@ ConsoleBackend.prototype.flush = function(timestamp, metrics) {
});
var out = {
- counter: this.statsCache.counters,
+ counters: this.statsCache.counters,
timers: this.statsCache.timers,
gauges: metrics.gauges,
+ timer_data: metrics.timer_data,
+ counter_rates: metrics.counter_rates,
sets: function (vals) {
var ret = {};
for (val in vals) {
View
@@ -55,88 +55,40 @@ var flush_stats = function graphite_flush(ts, metrics) {
var statString = '';
var numStats = 0;
var key;
-
+ var timer_data_key;
var counters = metrics.counters;
var gauges = metrics.gauges;
var timers = metrics.timers;
var sets = metrics.sets;
- var pctThreshold = metrics.pctThreshold;
+ var counter_rates = metrics.counter_rates;
+ var timer_data = metrics.timer_data;
for (key in counters) {
- var value = counters[key];
- var valuePerSecond = value / (flushInterval / 1000); // calculate "per second" rate
-
- statString += 'stats.' + key + ' ' + valuePerSecond + ' ' + ts + "\n";
- statString += 'stats_counts.' + key + ' ' + value + ' ' + ts + "\n";
+ statString += 'stats.' + key + ' ' + counter_rates[key] + ' ' + ts + "\n";
+ statString += 'stats_counts.' + key + ' ' + counters[key] + ' ' + ts + "\n";
numStats += 1;
}
- for (key in timers) {
- if (timers[key].length > 0) {
- var values = timers[key].sort(function (a,b) { return a-b; });
- var count = values.length;
- var min = values[0];
- var max = values[count - 1];
-
- var cumulativeValues = [min];
- for (var i = 1; i < count; i++) {
- cumulativeValues.push(values[i] + cumulativeValues[i-1]);
+ for (key in timer_data) {
+ if (Object.keys(timer_data).length > 0) {
+ for (timer_data_key in timer_data[key]) {
+ statString += 'stats.timers.' + key + '.' + timer_data_key + ' ' + timer_data[key][timer_data_key] + ' ' + ts + "\n";
}
- var sum = min;
- var mean = min;
- var maxAtThreshold = max;
-
- var message = "";
-
- var key2;
-
- for (key2 in pctThreshold) {
- var pct = pctThreshold[key2];
- if (count > 1) {
- var numInThreshold = Math.round(pct / 100 * count);
-
- maxAtThreshold = values[numInThreshold - 1];
- sum = cumulativeValues[numInThreshold - 1];
- mean = sum / numInThreshold;
- }
-
- var clean_pct = '' + pct;
- clean_pct.replace('.', '_');
- message += 'stats.timers.' + key + '.mean_' + clean_pct + ' ' + mean + ' ' + ts + "\n";
- message += 'stats.timers.' + key + '.upper_' + clean_pct + ' ' + maxAtThreshold + ' ' + ts + "\n";
- message += 'stats.timers.' + key + '.sum_' + clean_pct + ' ' + sum + ' ' + ts + "\n";
- }
-
- sum = cumulativeValues[count-1];
- mean = sum / count;
-
- var sumOfDiffs = 0;
- for (var i = 0; i < count; i++) {
- sumOfDiffs += (values[i] - mean) * (values[i] - mean);
- }
- var stddev = Math.sqrt(sumOfDiffs / count);
-
- message += 'stats.timers.' + key + '.std ' + stddev + ' ' + ts + "\n";
- message += 'stats.timers.' + key + '.upper ' + max + ' ' + ts + "\n";
- message += 'stats.timers.' + key + '.lower ' + min + ' ' + ts + "\n";
- message += 'stats.timers.' + key + '.count ' + count + ' ' + ts + "\n";
- message += 'stats.timers.' + key + '.sum ' + sum + ' ' + ts + "\n";
- message += 'stats.timers.' + key + '.mean ' + mean + ' ' + ts + "\n";
- statString += message;
-
numStats += 1;
}
}
for (key in gauges) {
statString += 'stats.gauges.' + key + ' ' + gauges[key] + ' ' + ts + "\n";
+
numStats += 1;
}
for (key in sets) {
statString += 'stats.sets.' + key + '.count ' + sets[key].values().length + ' ' + ts + "\n";
+
numStats += 1;
}
View
@@ -0,0 +1,85 @@
+var process_metrics = function (metrics, flushInterval, ts, flushCallback) {
+ var key;
+ var counter_rates = {};
+ var timer_data = {};
+ var counters = metrics.counters;
+ var timers = metrics.timers;
+ var pctThreshold = metrics.pctThreshold;
+
+ for (key in counters) {
+ var value = counters[key];
+
+ // calculate "per second" rate
+ var valuePerSecond = value / (flushInterval / 1000);
+ counter_rates[key] = valuePerSecond;
+ }
+
+ for (key in timers) {
+ if (timers[key].length > 0) {
+ timer_data[key] = {};
+ var current_timer_data = {};
+
+ var values = timers[key].sort(function (a,b) { return a-b; });
+ var count = values.length;
+ var min = values[0];
+ var max = values[count - 1];
+
+ var cumulativeValues = [min];
+ for (var i = 1; i < count; i++) {
+ cumulativeValues.push(values[i] + cumulativeValues[i-1]);
+ }
+
+ var sum = min;
+ var mean = min;
+ var maxAtThreshold = max;
+
+ var message = "";
+
+ var key2;
+
+ for (key2 in pctThreshold) {
+ var pct = pctThreshold[key2];
+ if (count > 1) {
+ var numInThreshold = Math.round(pct / 100 * count);
+
+ maxAtThreshold = values[numInThreshold - 1];
+ sum = cumulativeValues[numInThreshold - 1];
+ mean = sum / numInThreshold;
+ }
+
+ var clean_pct = '' + pct;
+ clean_pct.replace('.', '_');
+ current_timer_data["mean_" + clean_pct] = mean;
+ current_timer_data["upper_" + clean_pct] = maxAtThreshold;
+ current_timer_data["sum_" + clean_pct] = sum;
+
+ }
+
+ sum = cumulativeValues[count-1];
+ mean = sum / count;
+
+ var sumOfDiffs = 0;
+ for (var i = 0; i < count; i++) {
+ sumOfDiffs += (values[i] - mean) * (values[i] - mean);
+ }
+ var stddev = Math.sqrt(sumOfDiffs / count);
+ current_timer_data["std"] = stddev;
+ current_timer_data["upper"] = max;
+ current_timer_data["lower"] = min;
+ current_timer_data["count"] = count;
+ current_timer_data["sum"] = sum;
+ current_timer_data["mean"] = mean;
+
+ timer_data[key] = current_timer_data;
+
+ }
+ }
+
+ //add processed metrics to the metrics_hash
+ metrics.counter_rates = counter_rates;
+ metrics.timer_data = timer_data;
+
+ flushCallback(metrics);
+ }
+
+exports.process_metrics = process_metrics
View
@@ -6,6 +6,8 @@ var dgram = require('dgram')
, events = require('events')
, logger = require('./lib/logger')
, set = require('./lib/set')
+ , pm = require('./lib/process_metrics')
+
// initialize data structures with defaults for statsd stats
var keyCounter = {};
@@ -16,6 +18,8 @@ var counters = {
var timers = {};
var gauges = {};
var sets = {};
+var counter_rates = {};
+var timer_data = {};
var pctThreshold = null;
var debugInt, flushInterval, keyFlushInt, server, mgmtServer;
var startup_time = Math.round(new Date().getTime() / 1000);
@@ -45,6 +49,8 @@ function flushMetrics() {
gauges: gauges,
timers: timers,
sets: sets,
+ counter_rates: counter_rates,
+ timer_data: timer_data,
pctThreshold: pctThreshold
}
@@ -66,14 +72,16 @@ function flushMetrics() {
}
});
- // Flush metrics to each backend.
- backendEvents.emit('flush', time_stamp, metrics_hash);
+ pm.process_metrics(metrics_hash, flushInterval, time_stamp, function emitFlush(metrics) {
+ backendEvents.emit('flush', time_stamp, metrics);
+ });
+
};
var stats = {
messages: {
last_msg_seen: startup_time,
- bad_lines_seen: 0,
+ bad_lines_seen: 0
}
};
View
@@ -240,7 +240,7 @@ module.exports = {
var mykey = 'statsd.numStats';
return _.include(_.keys(post),mykey) && (post[mykey] == 3);
};
- test.ok(_.any(hashes,numstat_test), 'statsd.numStats should be 1');
+ test.ok(_.any(hashes,numstat_test), 'statsd.numStats should be 3');
var testavgvalue_test = function(post){
var mykey = 'stats.a_test_value';
@@ -0,0 +1,120 @@
+var pm = require('../lib/process_metrics')
+
+module.exports = {
+ setUp: function (callback) {
+ this.time_stamp = Math.round(new Date().getTime() / 1000);
+
+ var counters = {};
+ var gauges = {};
+ var timers = {};
+ var sets = {};
+ var pctThreshold = null;
+
+ this.metrics = {
+ counters: counters,
+ gauges: gauges,
+ timers: timers,
+ sets: sets,
+ pctThreshold: pctThreshold
+ }
+ callback();
+ },
+ counters_has_stats_count: function(test) {
+ test.expect(1);
+ this.metrics.counters['a'] = 2;
+ pm.process_metrics(this.metrics, 1000, this.time_stamp, function(){});
+ test.equal(2, this.metrics.counters['a']);
+ test.done();
+ },
+ counters_has_correct_rate: function(test) {
+ test.expect(1);
+ this.metrics.counters['a'] = 2;
+ pm.process_metrics(this.metrics, 100, this.time_stamp, function(){});
+ test.equal(20, this.metrics.counter_rates['a']);
+ test.done();
+ },
+ timers_handle_empty: function(test) {
+ test.expect(1);
+ this.metrics.timers['a'] = [];
+ pm.process_metrics(this.metrics, 100, this.time_stamp, function(){});
+ //potentially a cleaner way to check this
+ test.equal(undefined, this.metrics.counter_rates['a']);
+ test.done();
+ },
+ timers_single_time: function(test) {
+ test.expect(6);
+ this.metrics.timers['a'] = [100];
+ pm.process_metrics(this.metrics, 100, this.time_stamp, function(){});
+ timer_data = this.metrics.timer_data['a'];
+ test.equal(0, timer_data.std);
+ test.equal(100, timer_data.upper);
+ test.equal(100, timer_data.lower);
+ test.equal(1, timer_data.count);
+ test.equal(100, timer_data.sum);
+ test.equal(100, timer_data.mean);
+ test.done();
+ },
+ timers_multiple_times: function(test) {
+ test.expect(6);
+ this.metrics.timers['a'] = [100, 200, 300];
+ pm.process_metrics(this.metrics, 100, this.time_stamp, function(){});
+ timer_data = this.metrics.timer_data['a'];
+ test.equal(81.64965809277261, timer_data.std);
+ test.equal(300, timer_data.upper);
+ test.equal(100, timer_data.lower);
+ test.equal(3, timer_data.count);
+ test.equal(600, timer_data.sum);
+ test.equal(200, timer_data.mean);
+ test.done();
+ },
+ timers_single_time_single_percentile: function(test) {
+ test.expect(3);
+ this.metrics.timers['a'] = [100];
+ this.metrics.pctThreshold = [90];
+ pm.process_metrics(this.metrics, 100, this.time_stamp, function(){});
+ timer_data = this.metrics.timer_data['a'];
+ test.equal(100, timer_data.mean_90);
+ test.equal(100, timer_data.upper_90);
+ test.equal(100, timer_data.sum_90);
+ test.done();
+ },
+ timers_single_time_multiple_percentiles: function(test) {
+ test.expect(6);
+ this.metrics.timers['a'] = [100];
+ this.metrics.pctThreshold = [90, 80];
+ pm.process_metrics(this.metrics, 100, this.time_stamp, function(){});
+ timer_data = this.metrics.timer_data['a'];
+ test.equal(100, timer_data.mean_90);
+ test.equal(100, timer_data.upper_90);
+ test.equal(100, timer_data.sum_90);
+ test.equal(100, timer_data.mean_80);
+ test.equal(100, timer_data.upper_80);
+ test.equal(100, timer_data.sum_80);
+ test.done();
+ },
+ timers_multiple_times_single_percentiles: function(test) {
+ test.expect(3);
+ this.metrics.timers['a'] = [100, 200, 300];
+ this.metrics.pctThreshold = [90];
+ pm.process_metrics(this.metrics, 100, this.time_stamp, function(){});
+ timer_data = this.metrics.timer_data['a'];
+ test.equal(200, timer_data.mean_90);
+ test.equal(300, timer_data.upper_90);
+ test.equal(600, timer_data.sum_90);
+ test.done();
+ },
+ timers_multiple_times_multiple_percentiles: function(test) {
+ test.expect(6);
+ this.metrics.timers['a'] = [100, 200, 300];
+ this.metrics.pctThreshold = [90, 80];
+ pm.process_metrics(this.metrics, 100, this.time_stamp, function(){});
+ timer_data = this.metrics.timer_data['a'];
+ test.equal(200, timer_data.mean_90);
+ test.equal(300, timer_data.upper_90);
+ test.equal(600, timer_data.sum_90);
+ test.equal(150, timer_data.mean_80);
+ test.equal(200, timer_data.upper_80);
+ test.equal(300, timer_data.sum_80);
+ test.done();
+ }
+}