From a4d6f8e2cf814e1686e01d7357bae257fdddc8a4 Mon Sep 17 00:00:00 2001 From: Geoff Flarity Date: Sun, 5 Feb 2012 18:44:40 -0500 Subject: [PATCH] axons are now EventEmitters --- lib/graphite_axon.js | 62 +++++++++++++++++------------- lib/stdout_axon.js | 16 ++++++-- plugins/filesystem_usage/index.js | 47 ++++++++++++---------- plugins/memcached_nervous/index.js | 8 ++-- 4 files changed, 78 insertions(+), 55 deletions(-) diff --git a/lib/graphite_axon.js b/lib/graphite_axon.js index 2469726..17ae596 100644 --- a/lib/graphite_axon.js +++ b/lib/graphite_axon.js @@ -1,9 +1,17 @@ var net = require('net'); +var events = require("events"); var GraphiteAxon = function ( namespace, host, port) { var this_axon = this; - + + //call super constructor + events.EventEmitter.call(this_axon); + + //setup event listeners + this_axon.on( 'data', this_axon.fire.bind( this_axon ) ); + this_axon.on( 'error', this_axon.misfire.bind( this_axon ) ); + this_axon.namespace = namespace; this_axon.host = host; this_axon.port = port; @@ -21,44 +29,45 @@ var GraphiteAxon = function ( namespace, host, port) { //connect message var on_connect = function() { - var connected_message_metric_path = axon.namespace + '.nervous.connected'; - var connected_mesage_value = '1'; - var connected_message_timestamp = Math.floor( new Date / 1000.0 ); - this_axon.tcp_connection.write( connected_message_metric_path - + ' ' + connected_message_value - + ' ' + connected_message_timestamp + '\n' ); - - - //TODO eventually this could get big if we can't send them out for a while - while( this.message_buffer.length > 0) { - - var message = this_axon.message_buffer.shift(); - this_axon.write( message ); - } + var connected_message_metric_path = axon.namespace + '.nervous.connected'; + var connected_mesage_value = '1'; + var connected_message_timestamp = Math.floor( new Date / 1000.0 ); + this_axon.tcp_connection.write( connected_message_metric_path + + ' ' + connected_message_value + + ' ' + connected_message_timestamp + '\n' ); + + + //TODO eventually this could get big if we can't send them out for a while + while( this.message_buffer.length > 0) { + + var message = this_axon.message_buffer.shift(); + this_axon.write( message ); + } }; this_axon.tcp_connection.on( 'connect', on_connect ); var on_error = function( exception ){ - //TODO create an error message and enque it - //log error + //TODO create an error message and enque it + //log error }; this_axon.tcp_connection.on( 'error', on_error ); var on_end = function( ) { - //TODO create an end message - //log that we ended + //TODO create an end message + //log that we ended - //reconnect - this_axon.tcp_connection.connect( this_axon.port, this_axon.port ); + //reconnect + this_axon.tcp_connection.connect( this_axon.port, this_axon.port ); }; this_axon.tcp_connection.on( 'end', on_end ); }; +utils.inherits( GraphiteAxon, events.EventEmitter ); module.exports.GraphiteAxon = GraphiteAxon; GraphiteAxon.prototype.fire = function( name, value, timestamp ) { @@ -75,16 +84,15 @@ GraphiteAxon.prototype.fire = function( name, value, timestamp ) { var line = metric_path + ' ' + metric_value + ' ' + metric_timestamp + '\n'; try { - this_axon.tcp_connection.write( line ); + this_axon.tcp_connection.write( line ); } catch ( e ) { - console.log( line ); - - //TODO need to resend these - this.message_buffer.push ( line ); - } + console.log( line ); + //TODO need to resend these + this.message_buffer.push ( line ); + } }; diff --git a/lib/stdout_axon.js b/lib/stdout_axon.js index 18e6c2a..0ca9e13 100644 --- a/lib/stdout_axon.js +++ b/lib/stdout_axon.js @@ -1,18 +1,29 @@ +//deps var net = require('net'); +var util = require('util'); +var events = require("events"); +//code var STDOUTAxon = function ( namespace) { var this_axon = this; + //setup parent + events.EventEmitter.call(this_axon); + this_axon.namespace = namespace; + //setup our events, bind functions + this_axon.on('data', this_axon.fire.bind( this_axon ) ); + this_axon.on('error', this_axon.misfire.bind( this_axon ) ); + }; +util.inherits( STDOUTAxon, events.EventEmitter ); module.exports.STDOUTAxon = STDOUTAxon; STDOUTAxon.prototype.fire = function( name, value, timestamp ) { - this_axon = this; - + this_axon = this; //by default we generate the timestamp on fire, but it can be //overridden @@ -31,7 +42,6 @@ STDOUTAxon.prototype.misfire = function( err ) { this_axon = this; console.log(err); - }; diff --git a/plugins/filesystem_usage/index.js b/plugins/filesystem_usage/index.js index 4f6f659..3d20611 100644 --- a/plugins/filesystem_usage/index.js +++ b/plugins/filesystem_usage/index.js @@ -1,33 +1,38 @@ // configuration -var interval = 10*1000; +var interval = 1*1000; var filesystem = "/"; +var filesystem_name = "root"; //deps var child_process = require('child_process'); //code -var on_exec_complete = function( err, stdout, stderr ) { - - var lines = stdout.split('\n'); - var payload = lines[1]; - - var matches = payload.match(/(\d+)\%/ ); - - console.log(matches[1]); - - - -}; - +//our plugin main function +module.exports = function( axon ) { + + var on_exec_complete = function( err, stdout, stderr ) { + + var lines = stdout.split('\n'); + var payload = lines[1]; + + var matches = payload.match(/(\d+)\%/ ); + + if ( matches && matches[1] ) { + capacity = matches[1]; + axon.emit( 'data', 'filesystem.capacity.' + filesystem_name, capacity ); + } + else { + axon.emit( 'error', 'match failed' ); + } + }; + + //this checks it + var check_filesystem_usage = function() { + child_process.exec( 'df ' + filesystem, on_exec_complete ); + }; -//this checks it -var check_filesystem_usage = function() { - child_process.exec( 'df ' + filesystem, on_exec_complete ); + setInterval( check_filesystem_usage, interval ); }; -//our plugin main function -module.exports = function( config ) { - setInterval( check_filesystem_usage, interval ); -}; diff --git a/plugins/memcached_nervous/index.js b/plugins/memcached_nervous/index.js index ad9f0ad..1b3f161 100644 --- a/plugins/memcached_nervous/index.js +++ b/plugins/memcached_nervous/index.js @@ -53,7 +53,7 @@ module.exports = function( axon ) { var on_connect = function () { client.stats( function( err, result ){ - if( err ) { axon.misfire( err ) }; + if( err ) { axon.emit( 'error', err ) }; //console.log( utils.inspect( result, true, null, true ) ); @@ -63,12 +63,12 @@ module.exports = function( axon ) { var value = result[name]; var timestamp = result.time; - axon.fire( name, value, timestamp ); + axon.emit( 'data', name, value, timestamp ); } //reset and then close the client - debugger; + //debugger; client.stats( 'reset', function( err ) { console.log(err); client.close(); @@ -80,7 +80,7 @@ module.exports = function( axon ) { var on_error = function( err ) { - axon.misfire( e ); + axon.emit( 'error', e ); }; client.on( 'error', on_error );