Permalink
Browse files

first commit, not working yet but all layed out

  • Loading branch information...
Geoff Flarity
Geoff Flarity committed Feb 12, 2012
1 parent 59dea80 commit d6db86487cf661c6112ec37e239873cea9f86f7b
View
No changes.

Some generated files are not rendered by default. Learn more.

Oops, something went wrong.

Some generated files are not rendered by default. Learn more.

Oops, something went wrong.

Some generated files are not rendered by default. Learn more.

Oops, something went wrong.

Some generated files are not rendered by default. Learn more.

Oops, something went wrong.
View
@@ -0,0 +1,147 @@
+var net = require('net');
+var EventEmitter2 = require('eventemitter2').EventEmitter2;
+
+var GraphiteEventEmitter = function( listen_host, listen_port ) {
+
+ var that = this;
+
+ //call super
+ EventEmitter2.call(this, {
+ wildcard: true,
+ maxListeners: 0, //infinite listeners
+ });
+
+ var read_buffers_by_connection = {};
+
+ var on_connect = function(tcp_connection) {
+
+ read_buffers_by_connection[tcp_connection] = '';
+
+ //setup our handlers for this connection
+
+ //data
+ var on_data = function( data ) {
+
+ var read_buffer = read_buffers_by_connection[tcp_connection];
+ read_buffer += data;
+
+ //split by new lines,
+ var messages = read_buffer.split('\n');
+
+ //last element is either a partial message, or ''
+ read_buffer = messages.pop();
+
+ for ( var i = 0; i < messages.length; i++ ) {
+
+ //TODO error handling
+ var message = messages[i];
+ var components = message.split( ' ' );
+ var path = componets[0];
+ var value = componets[1];
+ var timestamp = components[2];
+
+ that.emit( path, value, timestamp );
+ }
+ };
+ tcp_connection.on( 'data', on_data );
+
+
+ //end event
+ var on_end = function( ) {
+ tcp_connection.destroy();
+ };
+ tcp_connection.on( 'end', on_end );
+
+ //error event
+ var on_error = function() {
+ tcp_connection.destroy();
+ };
+ tcp_connection.on( 'error', on_error );
+
+ //close
+ var on_close = function() {
+
+ //clean up
+ delete read_buffers_by_connection[tcp_connection];
+ };
+ tcp_connection.on( 'close', on_close );
+ };
+
+ this.server = net.createServer( on_connect );
+ this.server.listen( listen_port, listen_host );
+};
+module.exports.GraphiteEventEmitter = GraphiteEventEmitter
+util.inherts( GraphiteEventEmitter, EventEmitter2 );
+
+
+var GraphiteEventProxy = function( graphite_event_emitter, graphite_host, graphite_port ) {
+
+ var that = this;
+
+ that.host = graphite_host;
+ that.port = graphite_port;
+
+ that.connected = false;
+ that.tcp_connection = new net.Socket();
+
+ //TODO eventually this could get big if we can't send them out for a while
+ that.message_buffer = [];
+
+ //on connect we need to check if we have any messages in the
+ //the message buffer, send them if so but first send a new
+ //connect message
+ var on_connect = function() {
+
+ that.connected = true;
+ that.tcp_connection.setKeepAlive( true );
+
+ if ( that.message_buffer.length ) {
+ while( that.message_buffer.length ) {
+ that.tcp_connection.write( that.message_buffer.shift() );
+ }
+ }
+ };
+ that.tcp_connection.on( 'connect', on_connect );
+
+ var on_error = function( error ){
+
+ //TODO figure out logging
+ //log.error( 'graphite connection error: ' + error );
+
+ //destroy the connection so that reconnect fires
+ that.tcp_connection.destroy();
+ that.connected = false;
+ };
+ that.tcp_connection.on( 'error', on_error );
+
+ var on_close = function( ) {
+
+ that.connected = false;
+ //TODO create an end message
+ //log that we ended
+ //log.info( 'disconnected from graphite, reconnecting in 10000 ms' );
+
+ //reconnect after 10000 ms
+ setTimeout( function () {
+ //need to reset the listeners?
+ that.tcp_connection.connect( that.port, that.host );
+ }, 10000 );
+
+ };
+ that.tcp_connection.on( 'close', on_close );
+
+ var on_any = function( path, value, timestamp ) {
+
+ var message = path + ' ' + value + ' ' + timestamp + '\n';
+
+ if ( that.connected ) {
+ tcp_connection.write( message );
+ }
+ else {
+ that.message_buffer.push( message );
+ }
+ };
+ graphite_event_emitter.onAny( on_any );
+
+};
+
View
@@ -0,0 +1,62 @@
+var path = require('path');
+var graphite = require('./graphite);
+
+var Response = function( config ) {
+
+ var that = this;
+
+ //create response dispatcher, plugins can communicate through this
+ this.dispatch_emitter = new EventEmitter2({
+ wildcard: true,
+ maxListeners: 0, //infinite listeners
+ });
+
+ var listen_host = config.graphite_proxy.listen_host;
+ var listen_port = config.graphite_proxy.listen_port;
+
+ var graphite_host = config.graphite_proxy.graphite_host;
+ var graphite_port = config.graphite_proxy.graphite_port;
+
+ //this pretends to be a graphite server, emitting events for each message
+ //it inherits from EventEmitter2 and namespaces use periods just like
+ //graphite
+ this.graphite_emitter = new graphite.GraphiteEventEmitter( listen_host, listen_port );
+
+ //this takes a GraphiteEventEmitter, subscribes to all events and sends sends them off to Graphite
+ //it can also buffer messages should graphite go down
+ this.proxy = new graphite.GraphiteEventProxy( this.graphite_emitter, graphite_host, graphite_port );
+
+ this.plugin_path = config.plugin_path;
+
+ //plugins could except, but we still want to full our proxy duties
+ try {
+ this.load_plugins();
+ } catch( e ) {
+
+ };
+
+};
+module.exports.Response = Response;
+
+Response.prototype.load_plugins = function () {
+
+ var full_plugins_path = path.join( this.plugin_path, node_modules );
+
+ fs.readdir( full_plugins_path, function( err, entries ) {
+ var dirs = [];
+ entries = entries || [];
+ entries.forEach( function( entry ) {
+
+ var plugin = full_plugins_path + '/' + entry;
+ fs.stat( plugin, function( err, stats ) {
+ if ( stats.isDirectory() ) {
+ require( plugin )( this.graphite_emitter, this.dispatch_emitter );
+ }
+ } );
+ } );
+
+
+ }
+
+ );
+};
View
No changes.

0 comments on commit d6db864

Please sign in to comment.