Skip to content

Commit

Permalink
Switch the flush and status commands to an event listener model.
Browse files Browse the repository at this point in the history
  • Loading branch information
mheffner committed Apr 26, 2012
1 parent 784f07a commit fe91370
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 62 deletions.
59 changes: 34 additions & 25 deletions README.md
Expand Up @@ -156,23 +156,31 @@ Tests can be executed with `./run_tests.sh`.
Backend Interface
-----------------

Backend modules are Node.js [modules][nodemods] that export the
following methods:
Backend modules are Node.js [modules][nodemods] that listen for a
number of events emitted from StatsD. Each backend module should
export the following initialization function:

* `init(startup_time, config)`: This method is invoked from StatsD to
initialize the backend module. It accepts two parameters:
`startup_time` is the startup time of StatsD in epoch seconds, and
`config` is the parsed config file hash.
* `init(startup_time, config, events)`: This method is invoked from StatsD to
initialize the backend module. It accepts three parameters:
`startup_time` is the startup time of StatsD in epoch seconds,
`config` is the parsed config file hash, and `events` is the event
emitter that backends can use to listen for events.

The backend module should return `true` from init() to indicate
success. A return of `false` indicates a failure to load the module
(missing configuration?) and will cause StatsD to exit.

* `flush(time_stamp, metrics)`: This function will be invoked by
StatsD on each flush interval to flush the current statistics to the
backend service. The function is passed two parameters: `time_stamp`
is the current time in epoch seconds and `metrics` is a hash
representing the StatsD statistics:
Backends can listen for the following events emitted by StatsD from
the `events` object:

* Event: **'flush'**

Parameters: `(time_stamp, metrics)`

Emitted on each flush interval so that backends can push aggregate
metrics to their respective backend services. The event is passed
two parameters: `time_stamp` is the current time in epoch seconds
and `metrics` is a hash representing the StatsD statistics:

```
metrics: {
Expand All @@ -184,23 +192,24 @@ metrics: {
```

Each backend module is passed the same set of statistics, so a
backend module should handle the metrics as immutable
structures. StatsD will reset timers and counters after flushing
them to each backend module. StatsD flushes statistics to each
backend sequentially, so backends should use the appropriate
asynchronous methods to flush statistics to upstream services.
backend module should treat the metrics as immutable
structures. StatsD will reset timers and counters after each
listener has handled the event.

* Event: **'status'**

Parameters: `(writeCb)`

* `stats(writeCb)`: This method is invoked from StatsD when a user
invokes a *stats* command on the management server port. It allows
the backend module to dump backend-specific stats to the
management port.
Emitted when a user invokes a *stats* command on the management
server port. It allows each backend module to dump backend-specific
status statistics to the management port.

The `writeCb` callback function has a signature of `f(error,
stat_name, stat_value)`. The backend module should invoke this
method with each <stat_name, stat_value> tuple that should be sent to
the management port. StatsD will prefix the stat name with the
backend's module name. The backend should set `error` to *null*,
or, in the case of a failure, an appropriate error.
backend_name, stat_name, stat_value)`. The backend module should
invoke this method with each stat_name and stat_value that should be
sent to the management port. StatsD will prefix each stat name with
the `backend_name`. The backend should set `error` to *null*, or, in
the case of a failure, an appropriate error.

Inspiration
-----------
Expand Down
11 changes: 7 additions & 4 deletions backends/graphite.js
Expand Up @@ -45,7 +45,7 @@ var post_stats = function graphite_post_stats(statString) {
}
}

exports.flush = function graphite_flush(ts, metrics) {
var flush_stats = function graphite_flush(ts, metrics) {
var statString = '';
var numStats = 0;
var key;
Expand Down Expand Up @@ -120,13 +120,13 @@ exports.flush = function graphite_flush(ts, metrics) {
post_stats(statString);
};

// Report backend-internal status counters over the management port.
// Registered as a 'status' event listener in init(). The writeCb
// callback has signature f(error, backend_name, stat_name, stat_value);
// StatsD prefixes each stat name with the backend name it is given.
var backend_status = function graphite_status(writeCb) {
  // var keeps `stat` out of the global scope (implicit-global in original)
  for (var stat in graphiteStats) {
    writeCb(null, 'graphite', stat, graphiteStats[stat]);
  }
};

exports.init = function graphite_init(startup_time, config) {
exports.init = function graphite_init(startup_time, config, events) {
debug = config.debug;
graphiteHost = config.graphiteHost;
graphitePort = config.graphitePort;
Expand All @@ -136,5 +136,8 @@ exports.init = function graphite_init(startup_time, config) {

flushInterval = config.flushInterval;

events.on('flush', flush_stats);
events.on('status', backend_status);

return true;
};
64 changes: 31 additions & 33 deletions stats.js
Expand Up @@ -3,61 +3,58 @@ var dgram = require('dgram')
, net = require('net')
, config = require('./config')
, fs = require('fs')
, events = require('events')

var keyCounter = {};
var counters = {};
var timers = {};
var gauges = {};
var pctThreshold = null;
var backends = [];
var debugInt, flushInterval, keyFlushInt, server, mgmtServer;
var startup_time = Math.round(new Date().getTime() / 1000);
var backendEvents = new events.EventEmitter();

// Load and init the backend from the backends/ directory.
// Load and initialize a backend module from the backends/ directory.
// The module's init() receives the StatsD startup time (epoch seconds),
// the parsed config hash, and the shared event emitter that the backend
// uses to subscribe to 'flush' and 'status' events. A falsy return from
// init() is fatal: the backend is considered misconfigured and StatsD
// exits. With the event-listener model there is no longer any need to
// keep a registry of loaded backend modules here.
function loadBackend(config, name) {
  var backendmod = require("./backends/" + name);

  if (config.debug) {
    util.log("Loading backend: " + name);
  }

  var ret = backendmod.init(startup_time, config, backendEvents);
  if (!ret) {
    util.log("Failed to load backend: " + name);
    process.exit(1);
  }
};

// Flush metrics to each backend.
// Flush the current stats to every registered backend by emitting a
// 'flush' event, then reset counters and timers. The reset runs in a
// one-shot listener registered immediately before the emit, so it fires
// after every backend listener (EventEmitter calls listeners in
// registration order) has seen the un-reset values.
function flushMetrics() {
  var time_stamp = Math.round(new Date().getTime() / 1000);

  // Backends must treat this hash as immutable; the same object is
  // handed to every listener.
  var metrics_hash = {
    counters: counters,
    gauges: gauges,
    timers: timers,
    pctThreshold: pctThreshold
  }

  // After all listeners, reset the stats
  backendEvents.once('flush', function clear_metrics(ts, metrics) {
    // Clear the counters (var keeps `key` out of the global scope)
    for (var key in metrics.counters) {
      metrics.counters[key] = 0;
    }

    // Clear the timers
    for (var key in metrics.timers) {
      metrics.timers[key] = [];
    }
  });

  // Flush metrics to each backend.
  backendEvents.emit('flush', time_stamp, metrics_hash);
};

var stats = {
Expand Down Expand Up @@ -173,19 +170,20 @@ config.configFile(process.argv[2], function (config, oldConfig) {
}
}

// Retrieve stats from each backend
for (var i = 0; i < backends.length; i++) {
backends[i].mod.stats(function(err, stat, val) {
if (err) {
util.log("Failed to read stats for backend " +
backends[i].name + ": " + err);
} else {
stat_writer(backends[i].name, stat, val);
}
});
}
backendEvents.once('status', function(writeCb) {
stream.write("END\n\n");
});

// Let each backend contribute its status
backendEvents.emit('status', function(err, name, stat, val) {
if (err) {
util.log("Failed to read stats for backend " +
name + ": " + err);
} else {
stat_writer(name, stat, val);
}
});

stream.write("END\n\n");
break;

case "counters":
Expand Down

0 comments on commit fe91370

Please sign in to comment.