Skip to content

Commit

Permalink
New timeout handling that uses a single active timer
Browse files Browse the repository at this point in the history
We move to a new (yet again!) timeout handling system where we only use
one active timer at a time, and track future timeouts / deadlines in our
own queue to re-arm the timer.

This fixes issue #52.

We could use `setMaxListeners` to instead increase the timeout handlers
allowed and keep with the existing design, but this approach is O(1)
scalable.
  • Loading branch information
dterei committed Apr 30, 2016
1 parent 4edaa5d commit d956f85
Show file tree
Hide file tree
Showing 7 changed files with 194 additions and 70 deletions.
49 changes: 26 additions & 23 deletions bench/memjs.js
@@ -1,9 +1,21 @@
var memjs = require("memjs")
var memjs = require("memjs");
var header = require('header');
var b = require("benchmark")
var b = require("benchmark");

function makeString(n) {
var possible = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
var text = "";
var i;

for(i=0; i < n; i++ ) {
text += possible.charAt(Math.floor(Math.random() * possible.length));
}

return text;
}

var x = (function() {
suite = new b.Suite;
var suite = new b.Suite();

var headerBuf = new Buffer([0x81, 1, 7, 0, 4, 0, 0, 1, 0, 0, 0, 9, 0, 0, 0, 0, 0x0a, 0, 0, 0, 0, 0, 0, 0]);
var parsedHeader = header.fromBuffer(headerBuf);
Expand All @@ -20,10 +32,10 @@ var x = (function() {
})
// run async
.run({ 'async': true });
})();
}());

x = (function() {
suite = new b.Suite;
var suite = new b.Suite();
var responseHeader = {
magic: 0x81,
opcode: 1,
Expand All @@ -34,7 +46,7 @@ x = (function() {
totalBodyLength: 1024 * 10 + 15,
opaque: 0,
cas: new Buffer([0x0a, 0, 0, 0, 0, 0, 0, 0])
}
};
var buf = new Buffer(24 + 15 + 1024 * 10);
header.toBuffer(responseHeader).copy(buf);
buf.write(makeString(55));
Expand All @@ -46,7 +58,8 @@ x = (function() {
return arg;
};

for (var i = 0; i < 10; i++) {
var i;
for (i = 0; i < 10; i++) {
server.onResponse(dummyFunc);
}

Expand All @@ -62,35 +75,25 @@ x = (function() {
})
// run async
.run({ 'async': true });
})();

function makeString(n) {
var text = "";
var possible = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";

for( var i=0; i < n; i++ )
text += possible.charAt(Math.floor(Math.random() * possible.length));

return text;
}
}());

x = (function() {
suite = new b.Suite;
client = memjs.Client.create();
var suite = new b.Suite();
var client = memjs.Client.create();

suite.cycles = 0;
suite.add('Client#get', function() {
client.get("hello", function(err, val) {
client.get("hello", function(/* err, val */) {
suite.cycles++;
});
})
// add listeners
.on('cycle', function(event) {
console.log(String(event.target) + " " + suite.cycles);
});
client.set("hello", makeString(10240), function(err, val) {
client.set("hello", makeString(10240), function(/* err, val */) {
// run async
suite.run({ 'async': true });
});
})();
}());

46 changes: 46 additions & 0 deletions bench/timers.js
@@ -0,0 +1,46 @@
/*jshint node: true */
/*jslint unparam: true*/
'use strict';

/**
* Check how fast various timers are in node.
*/

var Benchmark = require('benchmark');
var Microtime = require('microtime');

var suite = new Benchmark.Suite();

// add tests
suite.add('Date.now()', function() {
// system time, not-monotonic, ms
Date.now();
})
.add('Microtime.now()', function() {
// system time, not-monotonic, us (POSIX: gettimeofday)
Microtime.now();
})
.add('process.hrtime()', function() {
// monotonic, ns (returns: [seconds, nanoseconds])
process.hrtime();
})
.add('process.hrtime() ms-round', function() {
// monotonic, ns (returns: [seconds, nanoseconds])
var time = process.hrtime();
return (time[0] * 1000) + Math.round(time[1] / 1000000);
})
.add('process.hrtime() ms-floor', function() {
// monotonic, ns (returns: [seconds, nanoseconds])
var time = process.hrtime();
return (time[0] * 1000) + Math.floor(time[1] / 1000000);
})
// add listeners
.on('cycle', function(event) {
console.log(String(event.target));
})
.on('complete', function() {
console.log('Fastest is ' + this.filter('fastest').map('name'));
})
// run async
.run({ 'async': true });

15 changes: 9 additions & 6 deletions lib/memjs/memjs.js
Expand Up @@ -450,7 +450,7 @@ Client.prototype.flush = function(callback) {
callback(lastErr, result);
}
});
serv.write(seq, request);
serv.write(request);
};

for (i = 0; i < this.servers.length; i++) {
Expand Down Expand Up @@ -501,7 +501,7 @@ Client.prototype.statsWithKey = function(key, callback) {
serv.onError(seq, function(err) {
if (callback) { callback(err, serv.host + ':' + serv.port, null); }
});
serv.write(seq, request);
serv.write(request);
};

for (i = 0; i < this.servers.length; i++) {
Expand Down Expand Up @@ -540,7 +540,10 @@ Client.prototype.resetStats = function(callback) {

// QUIT
//
// Closes the connection to each server, notifying them of this intention.
// Closes the connection to each server, notifying them of this intention. Note
// that quit can race against already outstanding requests when those requests
// fail and are retried, leading to the quit command winning and closing the
// connection before the retries complete.
Client.prototype.quit = function() {
this.seq++;
// TODO: Nicer perhaps to do QUITQ (0x17) but need a new callback for when
Expand All @@ -556,7 +559,7 @@ Client.prototype.quit = function() {
serv.onError(seq, function(/* err */) {
serv.close();
});
serv.write(seq, request);
serv.write(request);
};

for (i = 0; i < this.servers.length; i++) {
Expand Down Expand Up @@ -602,7 +605,7 @@ Client.prototype.perform = function(key, request, callback, retries) {
if (--retries > 0) {
serv.onResponse(seq, responseHandler);
serv.onError(seq, errorHandler);
serv.write(seq, request);
serv.write(request);
} else {
logger.log('MemJS: Server <' + serv.host + ':' + serv.port +
'> failed after (' + origRetries +
Expand All @@ -618,7 +621,7 @@ Client.prototype.perform = function(key, request, callback, retries) {

serv.onResponse(seq, responseHandler);
serv.onError(seq, errorHandler);
serv.write(seq, request);
serv.write(request);
};

exports.Client = Client;
Expand Down
84 changes: 74 additions & 10 deletions lib/memjs/server.js
Expand Up @@ -4,15 +4,18 @@ var util = require('util');
var makeRequestBuffer = require('./utils').makeRequestBuffer;
var parseMessage = require('./utils').parseMessage;
var merge = require('./utils').merge;
var timestamp = require('./utils').timestamp;

var Server = function(host, port, username, password, options) {
events.EventEmitter.call(this);
this.responseBuffer = new Buffer([]);
this.host = host;
this.port = port;
this.connected = false;
this.timeoutSet = false;
this.connectCallbacks = [];
this.responseCallbacks = {};
this.requestTimeouts = [];
this.errorCallbacks = {};
this.options = merge(options || {}, {timeout: 0.5, keepAlive: false, keepAliveDelay: 30});
if (this.options.conntimeout === undefined || this.options.conntimeout === null) {
Expand Down Expand Up @@ -42,6 +45,7 @@ Server.prototype.respond = function(response) {
callback(response);
if (!callback.quiet || response.header.totalBodyLength === 0) {
delete(this.responseCallbacks[response.header.opaque]);
this.requestTimeouts.shift();
delete(this.errorCallbacks[response.header.opaque]);
}
};
Expand All @@ -54,7 +58,9 @@ Server.prototype.error = function(err) {
var errcalls = this.errorCallbacks;
this.connectCallbacks = [];
this.responseCallbacks = {};
this.requestTimeouts = [];
this.errorCallbacks = {};
this.timeoutSet = false;
if (this._socket) {
this._socket.destroy();
delete(this._socket);
Expand Down Expand Up @@ -107,60 +113,118 @@ Server.prototype.responseHandler = function(dataBuf) {

Server.prototype.sock = function(sasl, go) {
var self = this;

if (!self._socket) {
// CASE 1: completely new socket
self.connected = false;
self._socket = net.connect(this.port, this.host, function() {

// SASL authentication handler
self.once('authenticated', function() {
if (self._socket) {
self.connected = true;
// cancel connection timeout
self._socket.setTimeout(0);
self.timeoutSet = false;
// run actual request(s)
go(self._socket);
self.connectCallbacks.forEach(function(cb) {
cb(self._socket);
});
self.connectCallbacks = [];
}
});

// setup response handler
this.on('data', function(dataBuf) {
self.responseHandler(dataBuf);
});

// kick of SASL if needed
if (self.username && self.password) {
self.listSasl();
} else {
self.emit('authenticated');
}
});

// setup error handler
self._socket.on('error', function(error) {
self.connected = false;
if (self.timeoutSet) {
self._socket.setTimeout(0);
self.timeoutSet = false;
}
self._socket = undefined;
self.error(error);
});

// setup connection timeout handler
self.timeoutSet = true;
self._socket.setTimeout(self.options.conntimeout * 1000, function() {
self.timeoutSet = false;
if (!self.connected) {
this.end();
self._socket = undefined;
self.error(new Error('socket timed out.'));
self.error(new Error('socket timed out connecting to server.'));
}
});

// use TCP keep-alive
self._socket.setKeepAlive(self.options.keepAlive, self.options.keepAliveDelay * 1000);

} else if (!self.connected && !sasl) {
// CASE 2: socket exists, but still connecting / authenticating
self.onConnect(go);

} else {
// CASE 3: socket exists and connected / ready to use
go(self._socket);
}
};

Server.prototype.write = function(seq, blob) {
// We handle tracking timeouts with an array of deadlines (requestTimeouts), as
// node doesn't like us setting up lots of timers, and using just one is more
// efficient anyway.
var timeoutHandler = function(server, sock) {
if (server.requestTimeouts.length === 0) {
// nothing active
server.timeoutSet = false;
return;
}

// some requests outstanding, check if any have timed-out
var now = timestamp();
var soonestTimeout = server.requestTimeouts[0];

if (soonestTimeout <= now) {
// timeout occurred!
sock.end();
server.connected = false;
server._socket = undefined;
server.timeoutSet = false;
server.error(new Error('socket timed out waiting on response.'));
} else {
// no timeout! Setup next one.
var deadline = soonestTimeout - now;
sock.setTimeout(deadline, function() {
timeoutHandler(server, sock);
});
}
};

Server.prototype.write = function(blob) {
var self = this;
var deadline = Math.round(self.options.timeout * 1000);
this.sock(false, function(s) {
s.write(blob);
s.setTimeout(self.options.timeout * 1000, function() {
if (self.responseCallbacks[seq]) {
this.end();
self.connected = false;
self._socket = undefined;
self.error(new Error('socket timed out.'));
}
});
self.requestTimeouts.push(timestamp() + deadline);
if (!self.timeoutSet) {
self.timeoutSet = true;
s.setTimeout(deadline, function() {
timeoutHandler(self, this);
});
}
});
};

Expand Down
7 changes: 7 additions & 0 deletions lib/memjs/utils.js
Expand Up @@ -86,6 +86,13 @@ exports.merge = function(original, deflt) {
return original;
};

// timestamp provides a monotonic timestamp with millisecond accuracy, useful
// for timers.
exports.timestamp = function() {
var times = process.hrtime();
return (times[0] * 1000) + Math.round((times[1] / 1000000));
};

if(!Buffer.concat) {
Buffer.concat = function(list, length) {
if (!Array.isArray(list)) {
Expand Down
3 changes: 2 additions & 1 deletion package.json
Expand Up @@ -17,7 +17,8 @@
},
"scripts": {
"test": "eslint ./lib/memjs/ ./test/ && tap -R spec ./test/*.js",
"bench": "NODE_PATH=lib/memjs/ node bench/memjs.js"
"bench": "NODE_PATH=lib/memjs/ node bench/memjs.js",
"bench-timers": "NODE_PATH=lib/memjs/ node bench/timers.js"
},
"dependencies": {},
"devDependencies": {"eslint":"1.10.3",
Expand Down

0 comments on commit d956f85

Please sign in to comment.