From 5aea9555eac8f39fd98f9767d6f017d5d9d77759 Mon Sep 17 00:00:00 2001 From: C J Silverio Date: Mon, 29 Jun 2015 17:09:46 -0700 Subject: [PATCH] A little better handling of backpressure. Don't accumulate infinite events if we can't connect. Give up connecting after a while. Don't thrash with attempts to reconnect: just handle close, not error. --- example-tcp.js | 41 ++++++++++++++++++++++++++--------------- index.js | 27 ++++++++++++++++++++------- test.js | 25 +------------------------ 3 files changed, 47 insertions(+), 46 deletions(-) diff --git a/example-tcp.js b/example-tcp.js index b0aa658..710fdb3 100755 --- a/example-tcp.js +++ b/example-tcp.js @@ -3,26 +3,37 @@ var Emitter = require('./index'); var emitter = new Emitter({ - uri: 'tcp://localhost:4677', - node: 'tcp-emitter' + uri: 'tcp://localhost:4677', + node: 'tcp-emitter', + maxretries: 10, + maxbacklog: 5 }); +console.log('--> start'); emitter.metric({ name: 'example.start', pid: process.pid }); +emitter.on('failed', function() +{ + console.log('failed, backlog len=' + emitter.backlog.length); + process.exit(1); +}); +emitter.on('close', function() { console.log('closed', emitter.retries, emitter.backlog.length); }); +emitter.on('ready', function() { console.log('ready'); }); + function heartbeat() { - console.log('heartbeat'); - emitter.metric({ name: 'heartbeat', ttl: 16000 }); + console.log('-> heartbeat'); + emitter.metric({ name: 'heartbeat', ttl: 16000 }); } function resources() { - console.log('resources'); - var mem = process.memoryUsage(); + console.log('-> resources'); + var mem = process.memoryUsage(); - emitter.metric({ name: 'example.memory.rss', value: mem.rss }); - emitter.metric({ name: 'example.memory.heapTotal', value: mem.heapTotal }); - emitter.metric({ name: 'example.memory.heapUsed', value: mem.heapUsed }); + emitter.metric({ name: 'example.memory.rss', value: mem.rss }); + emitter.metric({ name: 'example.memory.heapTotal', value: mem.heapTotal }); + emitter.metric({ name: 'example.memory.heapUsed', value: mem.heapUsed }); } var heartbeatTimer = setInterval(heartbeat, 15000); @@ -30,10 +41,10 @@ var resourcesTimer = setInterval(resources, 30000); process.on('SIGINT', function() { - console.log('Shutting down gracefully.'); - clearInterval(heartbeatTimer); - clearInterval(resourcesTimer); - console.log('shutdown'); - emitter.metric({ name: 'shutdown' }); - setTimeout(process.exit, 500); + console.log('Shutting down gracefully.'); + clearInterval(heartbeatTimer); + clearInterval(resourcesTimer); + console.log('shutdown'); + emitter.metric({ name: 'shutdown' }); + setTimeout(process.exit, 500); }); diff --git a/index.js b/index.js index f8777cb..7db1743 100644 --- a/index.js +++ b/index.js @@ -18,6 +18,8 @@ var Emitter = module.exports = function Emitter(opts) events.EventEmitter.call(this); if (opts.uri) Emitter.parseURI(opts); + if (opts.maxretries) this.maxretries = parseInt(opts.maxretries, 10); + if (opts.maxbacklog) this.maxbacklog = parseInt(opts.maxbacklog, 10); this.options = opts; this.defaults = {}; @@ -32,9 +34,12 @@ util.inherits(Emitter, events.EventEmitter); Emitter.prototype.defaults = null; Emitter.prototype.backlog = null; +Emitter.prototype.maxbacklog = 1000; Emitter.prototype.client = null; Emitter.prototype.ready = false; Emitter.prototype.retries = 0; +Emitter.prototype.maxretries = 100; +Emitter.prototype.maxbacklog = 1000; Emitter.parseURI = function(options) { @@ -105,6 +110,7 @@ Emitter.prototype.onConnect = function onConnect() { while (this.backlog.length) this._write(this.backlog.shift()); + this.retries = 0; this.ready = true; this.emit('ready'); }; @@ -112,17 +118,17 @@ Emitter.prototype.onConnect = function onConnect() Emitter.prototype.onError = function onError(err) { this.ready = false; - this.emit('close'); - this.retries++; - setTimeout(this.connect.bind(this), this.nextBackoff()); }; Emitter.prototype.onClose = function onClose() { this.ready = false; - this.emit('close'); this.retries++; - setTimeout(this.connect.bind(this), this.nextBackoff()); + if (this.retries <= this.maxretries) + setTimeout(this.connect.bind(this), this.nextBackoff()); + else + this.emit('failed', 'retried ' + this.retries + ' times; giving up'); + this.emit('close'); }; Emitter.prototype.nextBackoff = function nextBackoff() @@ -142,9 +148,13 @@ Emitter.prototype.makeEvent = function makeEvent(attrs) return event; }; -Emitter.prototype._write = function _write(event) +Emitter.prototype._write = function _write(event, encoding, callback) { - this.output.write(JSON.stringify(event) + '\n'); + var payload = event; + if (_.isObject(event)) + payload = JSON.stringify(event) + '\n'; + + this.output.write(payload); }; Emitter.prototype.metric = function metric(attrs) @@ -154,6 +164,9 @@ Emitter.prototype.metric = function metric(attrs) this._write(event); else this.backlog.push(event); + + while (this.backlog.length > this.maxbacklog) + this.backlog.shift(); }; function JSONOutputStream() diff --git a/test.js b/test.js index cadd6b0..e0dfc6f 100644 --- a/test.js +++ b/test.js @@ -8,7 +8,7 @@ var net = require('net'), Emitter = require('./index'), JSONStream = require('json-stream') - ; +; describe('numbat-emitter', function() { @@ -229,28 +229,6 @@ describe('numbat-emitter', function() emitter.client.end(); }); - - it('reconnects on error', function(done) - { - var count = 0; - var emitter = new Emitter(mockOpts); - emitter.on('ready', function() - { - count++; - switch (count) - { - case 1: - emitter.client.emit('error', new Error('whee!')); - break; - case 2: - emitter.destroy(); - done(); - break; - } - }); - - emitter.connect(); - }); }); describe('metric()', function() @@ -302,7 +280,6 @@ describe('numbat-emitter', function() mockServer.on('received', observer); var emitter = new Emitter(mockOpts); emitter.metric({ name: 'test', value: 4, time: new Date('2014-01-01') }); - }); it('accumulates events in a backlog until connected', function(done)