Skip to content
This repository has been archived by the owner on Dec 27, 2022. It is now read-only.

Commit

Permalink
changed the get_node strategy when keep-alive is enabled
Browse files Browse the repository at this point in the history
  • Loading branch information
dannycoates committed Jan 11, 2013
1 parent 13a5a88 commit 418b3bc
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 15 deletions.
15 changes: 14 additions & 1 deletion lib/endpoint.js
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ module.exports = function (inherits, EventEmitter, Pinger) {
this.healthy = true this.healthy = true
this.name = this.ip + ':' + this.port this.name = this.ip + ':' + this.port
this.address = this.ip this.address = this.ip
this.keepAlive = options.keepAlive


this.pinger = new Pinger(this.ping.bind(this)) this.pinger = new Pinger(this.ping.bind(this))
this.pinger.on('pong', function () { this.pinger.on('pong', function () {
Expand All @@ -38,7 +39,7 @@ module.exports = function (inherits, EventEmitter, Pinger) {


this.pingPath = options.ping this.pingPath = options.ping
this.pingTimeout = options.pingTimeout || 2000 this.pingTimeout = options.pingTimeout || 2000
if (options.keepAlive) { if (this.keepAlive) {
if (protocol === http) { if (protocol === http) {
this.agent = new KeepAlive(options.agentOptions) this.agent = new KeepAlive(options.agentOptions)
} }
Expand Down Expand Up @@ -72,6 +73,18 @@ module.exports = function (inherits, EventEmitter, Pinger) {
} }
inherits(Endpoint, EventEmitter) inherits(Endpoint, EventEmitter)


Endpoint.prototype.connected = function () {
return this.agent.sockets[this.name] && this.agent.sockets[this.name].length
}

Endpoint.prototype.ready = function () {
return this.healthy
&& (this.keepAlive ?
this.connected() > this.pending :
this.pending === 0
)
}

Endpoint.prototype.stats = function () { Endpoint.prototype.stats = function () {
var socketNames = Object.keys(this.agent.sockets) var socketNames = Object.keys(this.agent.sockets)
var requestCounts = [] var requestCounts = []
Expand Down
32 changes: 18 additions & 14 deletions lib/pool.js
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ module.exports = function (inherits, EventEmitter, Endpoint, RequestSet) {
} }
return healthy return healthy
} }
Pool.prototype.healthyNodes = Pool.prototype.healthy_nodes


Pool.prototype.onRetry = function (err) { Pool.prototype.onRetry = function (err) {
this.emit('retrying', err) this.emit('retrying', err)
Expand Down Expand Up @@ -171,34 +172,37 @@ module.exports = function (inherits, EventEmitter, Endpoint, RequestSet) {


Pool.prototype.get_node = function () { Pool.prototype.get_node = function () {
var len = this.nodes.length var len = this.nodes.length
var h = 0 var h = []
var sum = 0 var sum = 0
var totalPending = 0 var totalPending = 0
var r = Math.floor(Math.random() * len)
for (var i = 0; i < len; i++) { for (var i = 0; i < len; i++) {
var node = this.nodes[i] r = (r + 1) % len
if (node.healthy) { var node = this.nodes[r]
h++ if (node.ready()) {
sum += node.busyness() return node //fast path
}
else if (node.healthy) {
h.push(node)
sum += node.pending
} }
totalPending += node.pending totalPending += node.pending
} }
if (totalPending >= this.maxPending) { if (totalPending >= this.maxPending) {
return Endpoint.overloaded() return Endpoint.overloaded()
} }
if (h !== 0) { var avg = sum / h.length
var avg = sum / h while (h.length) {
var r = Math.floor(Math.random() * len) var node = h.pop()
for (i = 0; i < len; i++) { if (node.pending <= avg) {
r = (r + 1) % len return node
node = this.nodes[r]
if (node.healthy && avg >= node.busyness()) {
return node
}
} }
} }
return Endpoint.unhealthy() return Endpoint.unhealthy()
} }


Pool.prototype.getNode = Pool.prototype.get_node //must keep the old _ api

Pool.prototype.pending = function () { Pool.prototype.pending = function () {
return this.nodes.reduce(function (a, b) { return a + b.pending }, 0) return this.nodes.reduce(function (a, b) { return a + b.pending }, 0)
} }
Expand Down
42 changes: 42 additions & 0 deletions test/endpoint_test.js
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -441,6 +441,48 @@ describe("Endpoint", function () {
}) })
}) })


//
// ready
//
//////////////////////////////////////////////////////////////////////////////

describe("ready()", function () {

it('returns true when it is healthy and connected > pending with keepAlive on',
function () {
var e = new Endpoint(http, '127.0.0.1', 6969, {keepAlive: true})
e.pending = 1
e.agent.sockets[e.name] = [1,2]
assert(e.ready())
}
)

it('returns false when it is healthy and connected = pending with keepAlive on',
function () {
var e = new Endpoint(http, '127.0.0.1', 6969, {keepAlive: true})
e.pending = 1
e.agent.sockets[e.name] = [1]
assert(!e.ready())
}
)

it('returns true when it is healthy and pending = 0 with keepAlive off',
function () {
var e = new Endpoint(http, '127.0.0.1', 6969)
e.pending = 0
assert(e.ready())
}
)

it('returns false when it is healthy and pending > 0 with keepAlive off',
function () {
var e = new Endpoint(http, '127.0.0.1', 6969)
e.pending = 1
assert(!e.ready())
}
)
})

// //
// setHealthy // setHealthy
// //
Expand Down
18 changes: 18 additions & 0 deletions test/pool_test.js
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ var http = {


function FakeEndpoint() {} function FakeEndpoint() {}
inherits(FakeEndpoint, EventEmitter) inherits(FakeEndpoint, EventEmitter)
FakeEndpoint.prototype.pending = 1
FakeEndpoint.prototype.busyness = function () { return 1 } FakeEndpoint.prototype.busyness = function () { return 1 }
FakeEndpoint.prototype.connected = function () { return 0 }
FakeEndpoint.prototype.ready = function () { return false }
var overloaded = new FakeEndpoint() var overloaded = new FakeEndpoint()
FakeEndpoint.overloaded = function () { return overloaded } FakeEndpoint.overloaded = function () { return overloaded }
var unhealthy = new FakeEndpoint() var unhealthy = new FakeEndpoint()
Expand Down Expand Up @@ -106,6 +109,21 @@ describe('Pool', function () {
p.nodes.forEach(function (n) { n.healthy = false }) p.nodes.forEach(function (n) { n.healthy = false })
assert.equal(p.get_node(), unhealthy) assert.equal(p.get_node(), unhealthy)
}) })

it('returns a "ready" node when one is available', function () {
var p = new Pool(http, ['127.0.0.1:8080', '127.0.0.1:8081', '127.0.0.1:8082'])
var n = p.nodes[0]
n.ready = function () { return true }
assert.equal(p.get_node(), n);
})

it('returns a healthy node when none are "ready"', function () {
var p = new Pool(http, ['127.0.0.1:8080', '127.0.0.1:8081', '127.0.0.1:8082'])
p.nodes[0].healthy = false
p.nodes[1].healthy = false
p.nodes[2].healthy = true
assert(p.get_node().healthy);
})
}) })


// //
Expand Down

0 comments on commit 418b3bc

Please sign in to comment.