Skip to content

Commit

Permalink
A little better handling of backpressure.
Browse files Browse the repository at this point in the history
Don't accumulate infinite events if we can't connect.
Give up connecting after a while.
Don't thrash with attempts to reconnect: just handle close, not error.
  • Loading branch information
ceejbot committed Jun 30, 2015
1 parent 6ed8c8e commit 5aea955
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 46 deletions.
41 changes: 26 additions & 15 deletions example-tcp.js
Expand Up @@ -3,37 +3,48 @@
var Emitter = require('./index');

var emitter = new Emitter({
uri: 'tcp://localhost:4677',
node: 'tcp-emitter'
uri: 'tcp://localhost:4677',
node: 'tcp-emitter',
maxretries: 10,
maxbacklog: 5
});

console.log('--> start');
emitter.metric({ name: 'example.start', pid: process.pid });

emitter.on('failed', function()
{
console.log('failed, backlog len=' + emitter.backlog.length);
process.exit(1);
});
emitter.on('close', function() { console.log('closed', emitter.retries, emitter.backlog.length); });
emitter.on('ready', function() { console.log('ready'); });

function heartbeat()
{
console.log('heartbeat');
emitter.metric({ name: 'heartbeat', ttl: 16000 });
console.log('-> heartbeat');
emitter.metric({ name: 'heartbeat', ttl: 16000 });
}

function resources()
{
console.log('resources');
var mem = process.memoryUsage();
console.log('-> resources');
var mem = process.memoryUsage();

emitter.metric({ name: 'example.memory.rss', value: mem.rss });
emitter.metric({ name: 'example.memory.heapTotal', value: mem.heapTotal });
emitter.metric({ name: 'example.memory.heapUsed', value: mem.heapUsed });
emitter.metric({ name: 'example.memory.rss', value: mem.rss });
emitter.metric({ name: 'example.memory.heapTotal', value: mem.heapTotal });
emitter.metric({ name: 'example.memory.heapUsed', value: mem.heapUsed });
}

var heartbeatTimer = setInterval(heartbeat, 15000);
var resourcesTimer = setInterval(resources, 30000);

process.on('SIGINT', function()
{
console.log('Shutting down gracefully.');
clearInterval(heartbeatTimer);
clearInterval(resourcesTimer);
console.log('shutdown');
emitter.metric({ name: 'shutdown' });
setTimeout(process.exit, 500);
console.log('Shutting down gracefully.');
clearInterval(heartbeatTimer);
clearInterval(resourcesTimer);
console.log('shutdown');
emitter.metric({ name: 'shutdown' });
setTimeout(process.exit, 500);
});
27 changes: 20 additions & 7 deletions index.js
Expand Up @@ -18,6 +18,8 @@ var Emitter = module.exports = function Emitter(opts)
events.EventEmitter.call(this);

if (opts.uri) Emitter.parseURI(opts);
if (opts.maxretries) this.maxretries = parseInt(opts.maxretries, 10);
if (opts.maxbacklog) this.maxbacklog = parseInt(opts.maxbacklog, 10);

this.options = opts;
this.defaults = {};
Expand All @@ -32,9 +34,12 @@ util.inherits(Emitter, events.EventEmitter);

Emitter.prototype.defaults = null;
Emitter.prototype.backlog = null;
Emitter.prototype.maxbacklog = 1000;
Emitter.prototype.client = null;
Emitter.prototype.ready = false;
Emitter.prototype.retries = 0;
Emitter.prototype.maxretries = 100;
Emitter.prototype.maxbacklog = 1000;

Emitter.parseURI = function(options)
{
Expand Down Expand Up @@ -105,24 +110,25 @@ Emitter.prototype.onConnect = function onConnect()
{
while (this.backlog.length)
this._write(this.backlog.shift());
this.retries = 0;
this.ready = true;
this.emit('ready');
};

Emitter.prototype.onError = function onError(err)
{
this.ready = false;
this.emit('close');
this.retries++;
setTimeout(this.connect.bind(this), this.nextBackoff());
};

Emitter.prototype.onClose = function onClose()
{
this.ready = false;
this.emit('close');
this.retries++;
setTimeout(this.connect.bind(this), this.nextBackoff());
if (this.retries <= this.maxretries)
setTimeout(this.connect.bind(this), this.nextBackoff());
else
this.emit('failed', 'retried ' + this.retries + ' times; giving up');
this.emit('close');
};

Emitter.prototype.nextBackoff = function nextBackoff()
Expand All @@ -142,9 +148,13 @@ Emitter.prototype.makeEvent = function makeEvent(attrs)
return event;
};

Emitter.prototype._write = function _write(event)
Emitter.prototype._write = function _write(event, encoding, callback)
{
this.output.write(JSON.stringify(event) + '\n');
var payload = event;
if (_.isObject(event))
payload = JSON.stringify(event) + '\n';

this.output.write(payload);
};

Emitter.prototype.metric = function metric(attrs)
Expand All @@ -154,6 +164,9 @@ Emitter.prototype.metric = function metric(attrs)
this._write(event);
else
this.backlog.push(event);

while (this.backlog.length > this.maxbacklog)
this.backlog.shift();
};

function JSONOutputStream()
Expand Down
25 changes: 1 addition & 24 deletions test.js
Expand Up @@ -8,7 +8,7 @@ var
net = require('net'),
Emitter = require('./index'),
JSONStream = require('json-stream')
;
;

describe('numbat-emitter', function()
{
Expand Down Expand Up @@ -229,28 +229,6 @@ describe('numbat-emitter', function()

emitter.client.end();
});

it('reconnects on error', function(done)
{
var count = 0;
var emitter = new Emitter(mockOpts);
emitter.on('ready', function()
{
count++;
switch (count)
{
case 1:
emitter.client.emit('error', new Error('whee!'));
break;
case 2:
emitter.destroy();
done();
break;
}
});

emitter.connect();
});
});

describe('metric()', function()
Expand Down Expand Up @@ -302,7 +280,6 @@ describe('numbat-emitter', function()
mockServer.on('received', observer);
var emitter = new Emitter(mockOpts);
emitter.metric({ name: 'test', value: 4, time: new Date('2014-01-01') });

});

it('accumulates events in a backlog until connected', function(done)
Expand Down

0 comments on commit 5aea955

Please sign in to comment.