diff --git a/lib/endpoint.js b/lib/endpoint.js index ad4d55f..d8736ea 100644 --- a/lib/endpoint.js +++ b/lib/endpoint.js @@ -30,6 +30,7 @@ module.exports = function (inherits, EventEmitter, Pinger) { this.healthy = true this.name = this.ip + ':' + this.port this.address = this.ip + this.keepAlive = options.keepAlive this.pinger = new Pinger(this.ping.bind(this)) this.pinger.on('pong', function () { @@ -38,7 +39,7 @@ module.exports = function (inherits, EventEmitter, Pinger) { this.pingPath = options.ping this.pingTimeout = options.pingTimeout || 2000 - if (options.keepAlive) { + if (this.keepAlive) { if (protocol === http) { this.agent = new KeepAlive(options.agentOptions) } @@ -72,6 +73,18 @@ module.exports = function (inherits, EventEmitter, Pinger) { } inherits(Endpoint, EventEmitter) + Endpoint.prototype.connected = function () { + return this.agent.sockets[this.name] && this.agent.sockets[this.name].length + } + + Endpoint.prototype.ready = function () { + return this.healthy + && (this.keepAlive ? + this.connected() > this.pending : + this.pending === 0 + ) + } + Endpoint.prototype.stats = function () { var socketNames = Object.keys(this.agent.sockets) var requestCounts = [] diff --git a/lib/pool.js b/lib/pool.js index 04eb82a..4d229bc 100644 --- a/lib/pool.js +++ b/lib/pool.js @@ -79,6 +79,7 @@ module.exports = function (inherits, EventEmitter, Endpoint, RequestSet) { } return healthy } + Pool.prototype.healthyNodes = Pool.prototype.healthy_nodes Pool.prototype.onRetry = function (err) { this.emit('retrying', err) @@ -171,34 +172,37 @@ module.exports = function (inherits, EventEmitter, Endpoint, RequestSet) { Pool.prototype.get_node = function () { var len = this.nodes.length - var h = 0 + var h = [] var sum = 0 var totalPending = 0 + var r = Math.floor(Math.random() * len) for (var i = 0; i < len; i++) { - var node = this.nodes[i] - if (node.healthy) { - h++ - sum += node.busyness() + r = (r + 1) % len + var node = this.nodes[r] + if (node.ready()) { + return node //fast path + } + else if (node.healthy) { + h.push(node) + sum += node.pending } totalPending += node.pending } if (totalPending >= this.maxPending) { return Endpoint.overloaded() } - if (h !== 0) { - var avg = sum / h - var r = Math.floor(Math.random() * len) - for (i = 0; i < len; i++) { - r = (r + 1) % len - node = this.nodes[r] - if (node.healthy && avg >= node.busyness()) { - return node - } + var avg = sum / h.length + while (h.length) { + var node = h.pop() + if (node.pending <= avg) { + return node } } return Endpoint.unhealthy() } + Pool.prototype.getNode = Pool.prototype.get_node //must keep the old _ api + Pool.prototype.pending = function () { return this.nodes.reduce(function (a, b) { return a + b.pending }, 0) } diff --git a/test/endpoint_test.js b/test/endpoint_test.js index d60210e..b83c67e 100644 --- a/test/endpoint_test.js +++ b/test/endpoint_test.js @@ -441,6 +441,48 @@ describe("Endpoint", function () { }) }) + // + // ready + // + ////////////////////////////////////////////////////////////////////////////// + + describe("ready()", function () { + + it('returns true when it is healthy and connected > pending with keepAlive on', + function () { + var e = new Endpoint(http, '127.0.0.1', 6969, {keepAlive: true}) + e.pending = 1 + e.agent.sockets[e.name] = [1,2] + assert(e.ready()) + } + ) + + it('returns false when it is healthy and connected = pending with keepAlive on', + function () { + var e = new Endpoint(http, '127.0.0.1', 6969, {keepAlive: true}) + e.pending = 1 + e.agent.sockets[e.name] = [1] + assert(!e.ready()) + } + ) + + it('returns true when it is healthy and pending = 0 with keepAlive off', + function () { + var e = new Endpoint(http, '127.0.0.1', 6969) + e.pending = 0 + assert(e.ready()) + } + ) + + it('returns false when it is healthy and pending > 0 with keepAlive off', + function () { + var e = new Endpoint(http, '127.0.0.1', 6969) + e.pending = 1 + assert(!e.ready()) + } + ) + }) + // // setHealthy // diff --git a/test/pool_test.js b/test/pool_test.js index 9558228..e33fb63 100644 --- a/test/pool_test.js +++ b/test/pool_test.js @@ -11,7 +11,10 @@ var http = { function FakeEndpoint() {} inherits(FakeEndpoint, EventEmitter) +FakeEndpoint.prototype.pending = 1 FakeEndpoint.prototype.busyness = function () { return 1 } +FakeEndpoint.prototype.connected = function () { return 0 } +FakeEndpoint.prototype.ready = function () { return false } var overloaded = new FakeEndpoint() FakeEndpoint.overloaded = function () { return overloaded } var unhealthy = new FakeEndpoint() @@ -106,6 +109,21 @@ describe('Pool', function () { p.nodes.forEach(function (n) { n.healthy = false }) assert.equal(p.get_node(), unhealthy) }) + + it('returns a "ready" node when one is available', function () { + var p = new Pool(http, ['127.0.0.1:8080', '127.0.0.1:8081', '127.0.0.1:8082']) + var n = p.nodes[0] + n.ready = function () { return true } + assert.equal(p.get_node(), n); + }) + + it('returns a healthy node when none are "ready"', function () { + var p = new Pool(http, ['127.0.0.1:8080', '127.0.0.1:8081', '127.0.0.1:8082']) + p.nodes[0].healthy = false + p.nodes[1].healthy = false + p.nodes[2].healthy = true + assert(p.get_node().healthy); + }) }) //