From fe9137094213fb442bbcbddb96abca429302378f Mon Sep 17 00:00:00 2001 From: Mike Heffner Date: Wed, 4 Apr 2012 11:19:24 -0400 Subject: [PATCH] Switch the flush and status commands to an event listener model. --- README.md | 59 +++++++++++++++++++++++----------------- backends/graphite.js | 11 +++++--- stats.js | 64 +++++++++++++++++++++----------------------- 3 files changed, 72 insertions(+), 62 deletions(-) diff --git a/README.md b/README.md index 4c5947d1..6b1cd299 100644 --- a/README.md +++ b/README.md @@ -156,23 +156,31 @@ Tests can be executd with `./run_tests.sh`. Backend Interface ----------------- -Backend modules are Node.js [modules][nodemods] that export the -following methods: +Backend modules are Node.js [modules][nodemods] that listen for a +number of events emitted from StatsD. Each backend module should +export the following initialization function: -* `init(startup_time, config)`: This method is invoked from StatsD to - initialize the backend module. It accepts two parameters: - `startup_time` is the startup time of StatsD in epoch seconds, and - `config` is the parsed config file hash. +* `init(startup_time, config, events)`: This method is invoked from StatsD to + initialize the backend module. It accepts three parameters: + `startup_time` is the startup time of StatsD in epoch seconds, + `config` is the parsed config file hash, and `events` is the event + emitter that backends can use to listen for events. The backend module should return `true` from init() to indicate success. A return of `false` indicates a failure to load the module (missing configuration?) and will cause StatsD to exit. -* `flush(time_stamp, metrics)`: This function will be invoked by - StatsD on each flush interval to flush the current statistics to the - backend service. The function is passed two parameters: `time_stamp` - is the current time in epoch seconds and `metrics` is a hash - representing the StatsD statistics: +Backends can listen for the following events emitted by StatsD from +the `events` object: + +* Event: **'flush'** + + Parameters: `(time_stamp, metrics)` + + Emitted on each flush interval so that backends can push aggregate + metrics to their respective backend services. The event is passed + two parameters: `time_stamp` is the current time in epoch seconds + and `metrics` is a hash representing the StatsD statistics: ``` metrics: { @@ -184,23 +192,24 @@ metrics: { ``` Each backend module is passed the same set of statistics, so a - backend module should handle the metrics as immutable - structures. StatsD will reset timers and counters after flushing - them to each backend module. StatsD flushes statistics to each - backend sequentially, so backends should use the appropriate - asynchronous methods to flush statistics to upstream services. + backend module should treat the metrics as immutable + structures. StatsD will reset timers and counters after each + listener has handled the event. + +* Event: **'status'** + + Parameters: `(writeCb)` -* `stats(writeCb)`: This method is invoked from StatsD when a user - invokes a *stats* command on the management server port. It allows - the the backend module to dump backend-specific stats to the - management port. + Emitted when a user invokes a *stats* command on the management + server port. It allows each backend module to dump backend-specific + status statistics to the management port. The `writeCb` callback function has a signature of `f(error, - stat_name, stat_value)`. The backend module should invoke this - method with each tuple that should be sent to - the management port. StatsD will prefix the stat name with the - backend's module name. The backend should set `error` to *null*, - or, in the case of a failure, an appropriate error. + backend_name, stat_name, stat_value)`. The backend module should + invoke this method with each stat_name and stat_value that should be + sent to the management port. StatsD will prefix each stat name with + the `backend_name`. The backend should set `error` to *null*, or, in + the case of a failure, an appropriate error. Inspiration ----------- diff --git a/backends/graphite.js b/backends/graphite.js index 28192c74..69692fb5 100644 --- a/backends/graphite.js +++ b/backends/graphite.js @@ -45,7 +45,7 @@ var post_stats = function graphite_post_stats(statString) { } } -exports.flush = function graphite_flush(ts, metrics) { +var flush_stats = function graphite_flush(ts, metrics) { var statString = ''; var numStats = 0; var key; @@ -120,13 +120,13 @@ exports.flush = function graphite_flush(ts, metrics) { post_stats(statString); }; -exports.stats = function graphite_stats(writeCb) { +var backend_status = function graphite_status(writeCb) { for (stat in graphiteStats) { - writeCb(null, stat, graphiteStats[stat]); + writeCb(null, 'graphite', stat, graphiteStats[stat]); } }; -exports.init = function graphite_init(startup_time, config) { +exports.init = function graphite_init(startup_time, config, events) { debug = config.debug; graphiteHost = config.graphiteHost; graphitePort = config.graphitePort; @@ -136,5 +136,8 @@ exports.init = function graphite_init(startup_time, config) { flushInterval = config.flushInterval; + events.on('flush', flush_stats); + events.on('status', backend_status); + return true; }; diff --git a/stats.js b/stats.js index 2bb71124..adf67bc1 100644 --- a/stats.js +++ b/stats.js @@ -3,61 +3,58 @@ var dgram = require('dgram') , net = require('net') , config = require('./config') , fs = require('fs') + , events = require('events') var keyCounter = {}; var counters = {}; var timers = {}; var gauges = {}; var pctThreshold = null; -var backends = []; var debugInt, flushInterval, keyFlushInt, server, mgmtServer; var startup_time = Math.round(new Date().getTime() / 1000); +var backendEvents = new events.EventEmitter(); // Load and init the backend from the backends/ directory. function loadBackend(config, name) { var backendmod = require("./backends/" + name); - var backend = { - name: name, - mod: backendmod - }; if (config.debug) { util.log("Loading backend: " + name); } - var ret = backendmod.init(startup_time, config); + var ret = backendmod.init(startup_time, config, backendEvents); if (!ret) { util.log("Failed to load backend: " + name); process.exit(1); } - - backends.push(backend); }; // Flush metrics to each backend. function flushMetrics() { - var ts = Math.round(new Date().getTime() / 1000); + var time_stamp = Math.round(new Date().getTime() / 1000); - var metrics = { + var metrics_hash = { counters: counters, gauges: gauges, timers: timers, pctThreshold: pctThreshold } - for (var i = 0; i < backends.length; i++) { - backends[i].mod.flush(ts, metrics); - } + // After all listeners, reset the stats + backendEvents.once('flush', function clear_metrics(ts, metrics) { + // Clear the counters + for (key in metrics.counters) { + metrics.counters[key] = 0; + } - // Clear the counters - for (key in counters) { - counters[key] = 0; - } + // Clear the timers + for (key in metrics.timers) { + metrics.timers[key] = []; + } + }); - // Clear the timers - for (key in timers) { - timers[key] = []; - } + // Flush metrics to each backend. + backendEvents.emit('flush', time_stamp, metrics_hash); }; var stats = { @@ -173,19 +170,20 @@ config.configFile(process.argv[2], function (config, oldConfig) { } } - // Retrieve stats from each backend - for (var i = 0; i < backends.length; i++) { - backends[i].mod.stats(function(err, stat, val) { - if (err) { - util.log("Failed to read stats for backend " + - backends[i].name + ": " + err); - } else { - stat_writer(backends[i].name, stat, val); - } - }); - } + backendEvents.once('status', function(writeCb) { + stream.write("END\n\n"); + }); + + // Let each backend contribute its status + backendEvents.emit('status', function(err, name, stat, val) { + if (err) { + util.log("Failed to read stats for backend " + + name + ": " + err); + } else { + stat_writer(name, stat, val); + } + }); - stream.write("END\n\n"); break; case "counters":