Skip to content

Commit

Permalink
initial import of StatsD
Browse files Browse the repository at this point in the history
  • Loading branch information
kastner committed Dec 30, 2010
0 parents commit 122964c
Show file tree
Hide file tree
Showing 4 changed files with 269 additions and 0 deletions.
39 changes: 39 additions & 0 deletions config.js
@@ -0,0 +1,39 @@
var fs = require('fs')
, sys = require('sys')

var Configurator = function (file) {

var self = this;
var config = {};
var oldConfig = {};

this.updateConfig = function () {
sys.log('reading config file: ' + file);

fs.readFile(file, function (err, data) {
if (err) { throw err; }
old_config = self.config;

self.config = process.compile('config = ' + data, file);
self.emit('configChanged', self.config);
});
};

this.updateConfig();

fs.watchFile(file, function (curr, prev) {
if (curr.ino != prev.ino) { self.updateConfig(); }
});
};

sys.inherits(Configurator, process.EventEmitter);

exports.Configurator = Configurator;

exports.configFile = function(file, callbackFunc) {
var config = new Configurator(file);
config.on('configChanged', function() {
callbackFunc(config.config, config.oldConfig);
});
};

6 changes: 6 additions & 0 deletions exampleConfig.js
@@ -0,0 +1,6 @@
{
graphitePort: 2003
, graphiteHost: "graphite.host.com"
, port: 8125
}

96 changes: 96 additions & 0 deletions php-example.php
@@ -0,0 +1,96 @@
<?php

/**
* Sends statistics to the stats daemon over UDP
*
**/

class StatsD {

/**
* Log timing information
*
* @param string $stats The metric to in log timing info for.
* @param float $time The ellapsed time (ms) to log
* @param float|1 $sampleRate the rate (0-1) for sampling.
**/
public static function timing($stat, $time, $sampleRate=1) {
StatsD::send(array($stat => "$time|ms"), $sampleRate);
}

/**
* Increments one or more stats counters
*
* @param string|array $stats The metric(s) to increment.
* @param float|1 $sampleRate the rate (0-1) for sampling.
* @return boolean
**/
public static function increment($stats, $sampleRate=1) {
StatsD::updateStats($stats, 1, $sampleRate);
}

/**
* Decrements one or more stats counters.
*
* @param string|array $stats The metric(s) to decrement.
* @param float|1 $sampleRate the rate (0-1) for sampling.
* @return boolean
**/
public static function decrement($stats, $sampleRate=1) {
StatsD::updateStats($stats, -1, $sampleRate);
}

/**
* Updates one or more stats counters by arbitrary amounts.
*
* @param string|array $stats The metric(s) to update. Should be either a string or array of metrics.
* @param int|1 $delta The amount to increment/decrement each metric by.
* @param float|1 $sampleRate the rate (0-1) for sampling.
* @return boolean
**/
public static function updateStats($stats, $delta=1, $sampleRate=1) {
if (!is_array($stats)) { $stats = array($stats); }
$data = array();
foreach($stats as $stat) {
$data[$stat] = "$delta|c";
}

StatsD::send($data, $sampleRate);
}

/*
* Squirt the metrics over UDP
**/
public static function send($data, $sampleRate=1) {
$config = Config::getInstance();
if (! $config->isEnabled("statsd")) { return; }

// sampling
$sampledData = array();

if ($sampleRate < 1) {
foreach ($data as $stat => $value) {
if ((mt_rand() / mt_getrandmax()) <= $sampleRate) {
$sampledData[$stat] = "$value|@$sampleRate";
}
}
} else {
$sampledData = $data;
}

if (empty($sampledData)) { return; }

// Wrap this in a try/catch - failures in any of this should be silently ignored
try {
$host = $config->getConfig("statsd.host");
$port = $config->getConfig("statsd.port");
$fp = fsockopen("udp://$host", $port, $errno, $errstr);
if (! $fp) { return; }
foreach ($sampledData as $stat => $value) {
fwrite($fp, "$stat:$value");
}
fclose($fp);
} catch (Exception $e) {
}
}
}
128 changes: 128 additions & 0 deletions stats.js
@@ -0,0 +1,128 @@
var dgram = require('dgram')
, sys = require('sys')
, net = require('net')
, config = require('./config')

var counters = {};
var timers = {};
var debugInt, flushInt, server;

config.configFile(process.argv[2], function (config, oldConfig) {
if (! config.debug && debugInt) {
clearInterval(debugInt);
debugInt = false;
}

if (config.debug) {
if (debugInt !== undefined) { clearInterval(debugInt); }
debugInt = setInterval(function () {
sys.log("Counters:\n" + sys.inspect(counters) + "\nTimers:\n" + sys.inspect(timers));
}, config.debugInterval || 10000);
}

if (server === undefined) {
server = dgram.createSocket('udp4', function (msg, rinfo) {
if (config.dumpMessages) { sys.log(msg.toString()); }
var bits = msg.toString().split(':');
var key = bits.shift()
.replace(/\s+/g, '_')
.replace(/\//g, '-')
.replace(/[^a-zA-Z_\-0-9\.]/g, '');

if (bits.length == 0) {
bits.push("1");
}

for (var i = 0; i < bits.length; i++) {
var sampleRate = 1;
var fields = bits[i].split("|");
if (fields[1] == "ms") {
if (! timers[key]) {
timers[key] = [];
}
timers[key].push(Number(fields[0] || 0));
} else {
if (fields[2] && fields[2].match(/^@([\d\.]+)/)) {
sampleRate = Number(fields[2].match(/^@([\d\.]+)/)[1]);
}
if (! counters[key]) {
counters[key] = 0;
}
counters[key] += Number(fields[0] || 1) * (1 / sampleRate);
}
}
});

server.bind(config.port || 8125);

var flushInterval = Number(config.flushInterval || 10000);

flushInt = setInterval(function () {
var statString = '';
var ts = Math.round(new Date().getTime() / 1000);
var numStats = 0;
var key;

for (key in counters) {
var value = counters[key] / (flushInterval / 1000);
var message = 'stats.' + key + ' ' + value + ' ' + ts + "\n";
statString += message;
counters[key] = 0;

numStats += 1;
}

for (key in timers) {
if (timers[key].length > 0) {
var pctThreshold = config.percentThreshold || 90;
var values = timers[key].sort(function (a,b) { return a-b; });
var count = values.length;
var min = values[0];
var max = values[count - 1];

var mean = min;
var maxAtThreshold = max;

if (count > 1) {
var thresholdIndex = Math.round(((100 - pctThreshold) / 100) * count);
var numInThreshold = count - thresholdIndex;
values = values.slice(0, numInThreshold);
maxAtThreshold = values[numInThreshold - 1];

// average the remaining timings
var sum = 0;
for (var i = 0; i < numInThreshold; i++) {
sum += values[i];
}

mean = sum / numInThreshold;
}

timers[key] = [];

var message = "";
message += 'stats.timers.' + key + '.mean ' + mean + ' ' + ts + "\n";
message += 'stats.timers.' + key + '.upper ' + max + ' ' + ts + "\n";
message += 'stats.timers.' + key + '.upper_' + pctThreshold + ' ' + maxAtThreshold + ' ' + ts + "\n";
message += 'stats.timers.' + key + '.lower ' + min + ' ' + ts + "\n";
message += 'stats.timers.' + key + '.count ' + count + ' ' + ts + "\n";
statString += message;

numStats += 1;
}
}

statString += 'statsd.numStats ' + numStats + ' ' + ts + "\n";

var graphite = net.createConnection(config.graphitePort, config.graphiteHost);

graphite.on('connect', function() {
this.write(statString);
this.end();
});

}, flushInterval);
}

});

0 comments on commit 122964c

Please sign in to comment.