Skip to content

Commit

Permalink
do timer and counter calculations prior to sending to backends
Browse files Browse the repository at this point in the history
  • Loading branch information
draco2003 committed Oct 13, 2012
1 parent 5516c1e commit ec9cc25
Show file tree
Hide file tree
Showing 6 changed files with 224 additions and 58 deletions.
2 changes: 2 additions & 0 deletions README.md
Expand Up @@ -272,6 +272,8 @@ metrics: {
gauges: gauges,
timers: timers,
sets: sets,
counter_rates: counter_rates,
timer_data: timer_data,
pctThreshold: pctThreshold
}
```
Expand Down
2 changes: 2 additions & 0 deletions backends/console.js
Expand Up @@ -34,6 +34,8 @@ ConsoleBackend.prototype.flush = function(timestamp, metrics) {
counter: this.statsCache.counters,
timers: this.statsCache.timers,
gauges: metrics.gauges,
timer_data: metrics.timer_data,
counter_rates: metrics.counter_rates,
sets: function (vals) {
var ret = {};
for (val in vals) {
Expand Down
67 changes: 9 additions & 58 deletions backends/graphite.js
Expand Up @@ -55,89 +55,40 @@ var flush_stats = function graphite_flush(ts, metrics) {
var statString = '';
var numStats = 0;
var key;

var timer_data_key;
var counters = metrics.counters;
var gauges = metrics.gauges;
var timers = metrics.timers;
var sets = metrics.sets;
var pctThreshold = metrics.pctThreshold;
var counter_rates = metrics.counter_rates;
var timer_data = metrics.timer_data;

for (key in counters) {
var value = counters[key];
var valuePerSecond = value / (flushInterval / 1000); // calculate "per second" rate

statString += 'stats.' + key + ' ' + valuePerSecond + ' ' + ts + "\n";
statString += 'stats_counts.' + key + ' ' + value + ' ' + ts + "\n";
statString += 'stats.' + key + ' ' + counter_rates[key] + ' ' + ts + "\n";
statString += 'stats_counts.' + key + ' ' + counters[key] + ' ' + ts + "\n";

numStats += 1;
}

for (key in timers) {
if (timers[key].length > 0) {
var values = timers[key].sort(function (a,b) { return a-b; });
var count = values.length;
var min = values[0];
var max = values[count - 1];

var cumulativeValues = [min];
for (var i = 1; i < count; i++) {
cumulativeValues.push(values[i] + cumulativeValues[i-1]);
}

var sum = min;
var mean = min;
var maxAtThreshold = max;

var message = "";

var key2;

for (key2 in pctThreshold) {
var pct = pctThreshold[key2];
if (count > 1) {
var thresholdIndex = Math.round(((100 - pct) / 100) * count);
var numInThreshold = count - thresholdIndex;

maxAtThreshold = values[numInThreshold - 1];
sum = cumulativeValues[numInThreshold - 1];
mean = sum / numInThreshold;
}

var clean_pct = '' + pct;
clean_pct.replace('.', '_');
message += 'stats.timers.' + key + '.mean_' + clean_pct + ' ' + mean + ' ' + ts + "\n";
message += 'stats.timers.' + key + '.upper_' + clean_pct + ' ' + maxAtThreshold + ' ' + ts + "\n";
message += 'stats.timers.' + key + '.sum_' + clean_pct + ' ' + sum + ' ' + ts + "\n";
}

sum = cumulativeValues[count-1];
mean = sum / count;

var sumOfDiffs = 0;
for (var i = 0; i < count; i++) {
sumOfDiffs += (values[i] - mean) * (values[i] - mean);
for (timer_data_key in timer_data[key]) {
statString += 'stats.timers.' + key + '.' + timer_data_key + ' ' + timer_data[key][timer_data_key] + ' ' + ts + "\n";
}
var stddev = Math.sqrt(sumOfDiffs / count);

message += 'stats.timers.' + key + '.std ' + stddev + ' ' + ts + "\n";
message += 'stats.timers.' + key + '.upper ' + max + ' ' + ts + "\n";
message += 'stats.timers.' + key + '.lower ' + min + ' ' + ts + "\n";
message += 'stats.timers.' + key + '.count ' + count + ' ' + ts + "\n";
message += 'stats.timers.' + key + '.sum ' + sum + ' ' + ts + "\n";
message += 'stats.timers.' + key + '.mean ' + mean + ' ' + ts + "\n";
statString += message;

numStats += 1;
}
}

for (key in gauges) {
statString += 'stats.gauges.' + key + ' ' + gauges[key] + ' ' + ts + "\n";

numStats += 1;
}

for (key in sets) {
statString += 'stats.sets.' + key + '.count ' + sets[key].values().length + ' ' + ts + "\n";

numStats += 1;
}

Expand Down
88 changes: 88 additions & 0 deletions lib/processedmetrics.js
@@ -0,0 +1,88 @@
var ProcessedMetrics = function (metrics, flushInterval) {
var starttime = Date.now();
var key;
var counter_rates = {};
var timer_data = {};

var counters = metrics.counters;
var timers = metrics.timers;
var pctThreshold = metrics.pctThreshold;

for (key in counters) {
var value = counters[key];

// calculate "per second" rate
var valuePerSecond = value / (flushInterval / 1000);
counter_rates[key] = valuePerSecond;
}

for (key in timers) {
if (timers[key].length > 0) {
timer_data[key] = {};
var current_timer_data = {};

var values = timers[key].sort(function (a,b) { return a-b; });
var count = values.length;
var min = values[0];
var max = values[count - 1];

var cumulativeValues = [min];
for (var i = 1; i < count; i++) {
cumulativeValues.push(values[i] + cumulativeValues[i-1]);
}

var sum = min;
var mean = min;
var maxAtThreshold = max;

var message = "";

var key2;

for (key2 in pctThreshold) {
var pct = pctThreshold[key2];
if (count > 1) {
var thresholdIndex = Math.round(((100 - pct) / 100) * count);
var numInThreshold = count - thresholdIndex;

maxAtThreshold = values[numInThreshold - 1];
sum = cumulativeValues[numInThreshold - 1];
mean = sum / numInThreshold;
}

var clean_pct = '' + pct;
clean_pct.replace('.', '_');
current_timer_data["mean_" + clean_pct] = mean;
current_timer_data["upper_" + clean_pct] = maxAtThreshold;
current_timer_data["sum_" + clean_pct] = sum;

}

sum = cumulativeValues[count-1];
mean = sum / count;

var sumOfDiffs = 0;
for (var i = 0; i < count; i++) {
sumOfDiffs += (values[i] - mean) * (values[i] - mean);
}
var stddev = Math.sqrt(sumOfDiffs / count);
current_timer_data["std"] = stddev;
current_timer_data["upper"] = max;
current_timer_data["lower"] = min;
current_timer_data["count"] = count;
current_timer_data["sum"] = sum;
current_timer_data["mean"] = mean;

timer_data[key] = current_timer_data;

}
}

//add processed metrics to the metrics_hash
metrics.counter_rates = counter_rates;
metrics.timer_data = timer_data;

return metrics;
}

exports.ProcessedMetrics = ProcessedMetrics
7 changes: 7 additions & 0 deletions stats.js
Expand Up @@ -6,6 +6,7 @@ var dgram = require('dgram')
, events = require('events')
, logger = require('./lib/logger')
, set = require('./lib/set')
, pm = require('./lib/processedmetrics')

// initialize data structures with defaults for statsd stats
var keyCounter = {};
Expand All @@ -16,6 +17,8 @@ var counters = {
var timers = {};
var gauges = {};
var sets = {};
var counter_rates = {};
var timer_data = {};
var pctThreshold = null;
var debugInt, flushInterval, keyFlushInt, server, mgmtServer;
var startup_time = Math.round(new Date().getTime() / 1000);
Expand Down Expand Up @@ -45,6 +48,8 @@ function flushMetrics() {
gauges: gauges,
timers: timers,
sets: sets,
counter_rates: counter_rates,
timer_data: timer_data,
pctThreshold: pctThreshold
}

Expand All @@ -66,6 +71,8 @@ function flushMetrics() {
}
});

metrics_hash = pm.ProcessedMetrics(metrics_hash, flushInterval)

// Flush metrics to each backend.
backendEvents.emit('flush', time_stamp, metrics_hash);
};
Expand Down
116 changes: 116 additions & 0 deletions test/processedmetrics_tests.js
@@ -0,0 +1,116 @@
var pm = require('../lib/processedmetrics')
var time_stamp = Math.round(new Date().getTime() / 1000);

var counters = {};
var gauges = {};
var timers = {};
var sets = {};
var pctThreshold = null;

var metrics = {
counters: counters,
gauges: gauges,
timers: timers,
sets: sets,
pctThreshold: pctThreshold
}

module.exports = {
counters_has_stats_count: function(test) {
test.expect(1);
metrics.counters['a'] = 2;
var processed_metrics = new pm.ProcessedMetrics(metrics, 1000);
test.equal(2, processed_metrics.counters['a']);
test.done();
},
counters_has_correct_rate: function(test) {
test.expect(1);
metrics.counters['a'] = 2;
var processed_metrics = new pm.ProcessedMetrics(metrics, 100);
test.equal(20, processed_metrics.counter_rates['a']);
test.done();
},
timers_handle_empty: function(test) {
test.expect(1);
metrics.timers['a'] = [];
var processed_metrics = new pm.ProcessedMetrics(metrics, 100);
test.equal(20, processed_metrics.counter_rates['a']);
test.done();
},
timers_single_time: function(test) {
test.expect(6);
metrics.timers['a'] = [100];
var processed_metrics = new pm.ProcessedMetrics(metrics, 100);
timer_data = processed_metrics.timer_data['a'];
test.equal(0, timer_data.std);
test.equal(100, timer_data.upper);
test.equal(100, timer_data.lower);
test.equal(1, timer_data.count);
test.equal(100, timer_data.sum);
test.equal(100, timer_data.mean);
test.done();
},
timers_multiple_times: function(test) {
test.expect(6);
metrics.timers['a'] = [100, 200, 300];
var processed_metrics = new pm.ProcessedMetrics(metrics, 100);
timer_data = processed_metrics.timer_data['a'];
test.equal(81.64965809277261, timer_data.std);
test.equal(300, timer_data.upper);
test.equal(100, timer_data.lower);
test.equal(3, timer_data.count);
test.equal(600, timer_data.sum);
test.equal(200, timer_data.mean);
test.done();
},
timers_single_time_single_percentile: function(test) {
test.expect(3);
metrics.timers['a'] = [100];
metrics.pctThreshold = [90];
var processed_metrics = new pm.ProcessedMetrics(metrics, 100);
timer_data = processed_metrics.timer_data['a'];
test.equal(100, timer_data.mean_90);
test.equal(100, timer_data.upper_90);
test.equal(100, timer_data.sum_90);
test.done();
},
timers_single_time_multiple_percentiles: function(test) {
test.expect(6);
metrics.timers['a'] = [100];
metrics.pctThreshold = [90, 80];
var processed_metrics = new pm.ProcessedMetrics(metrics, 100);
timer_data = processed_metrics.timer_data['a'];
test.equal(100, timer_data.mean_90);
test.equal(100, timer_data.upper_90);
test.equal(100, timer_data.sum_90);
test.equal(100, timer_data.mean_80);
test.equal(100, timer_data.upper_80);
test.equal(100, timer_data.sum_80);
test.done();
},
timers_multiple_times_single_percentiles: function(test) {
test.expect(3);
metrics.timers['a'] = [100, 200, 300];
metrics.pctThreshold = [90];
var processed_metrics = new pm.ProcessedMetrics(metrics, 100);
timer_data = processed_metrics.timer_data['a'];
test.equal(200, timer_data.mean_90);
test.equal(300, timer_data.upper_90);
test.equal(600, timer_data.sum_90);
test.done();
},
timers_multiple_times_multiple_percentiles: function(test) {
test.expect(6);
metrics.timers['a'] = [100, 200, 300];
metrics.pctThreshold = [90, 80];
var processed_metrics = new pm.ProcessedMetrics(metrics, 100);
timer_data = processed_metrics.timer_data['a'];
test.equal(200, timer_data.mean_90);
test.equal(300, timer_data.upper_90);
test.equal(600, timer_data.sum_90);
test.equal(150, timer_data.mean_80);
test.equal(200, timer_data.upper_80);
test.equal(300, timer_data.sum_80);
test.done();
}
}

0 comments on commit ec9cc25

Please sign in to comment.