diff --git a/index.js b/index.js index 4ecada2..2368235 100644 --- a/index.js +++ b/index.js @@ -1,6 +1,7 @@ var inherits = require('util').inherits, EventEmitter = require('events').EventEmitter, - Endpoint = require('./lib/endpoint')(inherits, EventEmitter), + Pinger = require('./lib/pinger')(inherits, EventEmitter), + Endpoint = require('./lib/endpoint')(inherits, EventEmitter, Pinger), RequestSet = require('./lib/request_set') module.exports = require('./lib/pool')(inherits, EventEmitter, Endpoint, RequestSet) diff --git a/lib/endpoint.js b/lib/endpoint.js index cf54ac7..0797f43 100644 --- a/lib/endpoint.js +++ b/lib/endpoint.js @@ -2,7 +2,7 @@ var Stream = require('stream') var http = require('http') var KeepAlive = require('agentkeepalive') -module.exports = function (inherits, EventEmitter) { +module.exports = function (inherits, EventEmitter, Pinger) { var MAX_COUNT = Math.pow(2, 52) var clock = Date.now() var clockInterval = null @@ -29,6 +29,12 @@ module.exports = function (inherits, EventEmitter) { this.healthy = true this.name = this.ip + ':' + this.port this.address = this.ip + + this.pinger = new Pinger(this.ping.bind(this)) + this.pinger.on('pong', function () { + this.setHealthy(true) + }.bind(this)) + this.pingPath = options.ping this.pingTimeout = options.pingTimeout || 2000 if (options.keepAlive) { @@ -51,6 +57,7 @@ module.exports = function (inherits, EventEmitter) { this.pending = 0 this.successes = 0 this.failures = 0 + this.filtered = 0 this.timeout = (options.timeout === 0) ? 0 : options.timeout || (60 * 1000) this.resolution = (options.resolution === 0) ? 0 : options.resolution || 1000 @@ -81,6 +88,7 @@ module.exports = function (inherits, EventEmitter) { pending: this.pending, successes: this.successes, failures: this.failures, + filtered: this.filtered, healthy: this.healthy, socketRequestCounts: requestCounts } @@ -97,9 +105,6 @@ module.exports = function (inherits, EventEmitter) { r.abort() } } - if (!this.healthy) { - this.ping() - } this.requestRate = this.requestCount - this.requestsLastCheck this.requestsLastCheck = this.requestCount } @@ -132,6 +137,12 @@ module.exports = function (inherits, EventEmitter) { Endpoint.prototype.failed = function (error, request) { this.failures++ + this.setHealthy(false) + this.complete(error, request) + } + + Endpoint.prototype.filterRejected = function (error, request) { + this.filtered++ this.complete(error, request) } @@ -192,17 +203,15 @@ module.exports = function (inherits, EventEmitter) { } Endpoint.prototype.setHealthy = function (newState) { + if (!this.pingPath) { + return // an endpoint with no pingPath can never be made unhealthy + } + if (!newState) { + this.pinger.start() + } if (this.healthy !== newState) { this.healthy = newState - if (!this.healthy) { - this.ping() // ping may set this back to healthy - if (!this.healthy) { - this.emit('health', this) - } - } - else { - this.emit('health', this) - } + this.emit('health', this) } } @@ -210,22 +219,14 @@ module.exports = function (inherits, EventEmitter) { delete this.requests[id] } - function gotPingResponse(error, response, body) { - this.node.setHealthy(!error && response.statusCode === 200) - } - - Endpoint.prototype.ping = function () { - if (this.pingPath) { - this.request( - { path: this.pingPath - , method: 'GET' - , timeout: this.pingTimeout - } - , gotPingResponse) - } - else { - this.setHealthy(true) - } + Endpoint.prototype.ping = function (cb) { + return this.request( + { path: this.pingPath + , method: 'GET' + , timeout: this.pingTimeout + } + , cb + ) } // this = request @@ -251,7 +252,6 @@ module.exports = function (inherits, EventEmitter) { , message: msg } , this) - this.node.setHealthy(false) } // this = response @@ -268,7 +268,6 @@ module.exports = function (inherits, EventEmitter) { var node = req.node if (req.callback === null) { return } - node.setHealthy(true) var buffer = new Buffer(this.bodyLength) var offset = 0 @@ -282,7 +281,7 @@ module.exports = function (inherits, EventEmitter) { var delay = opt.retryFilter(opt, this, body) if (delay !== false) { // delay may be 0 - return node.failed( + return node.filterRejected( { delay: delay , reason: 'filter' , attempt: req diff --git a/lib/pinger.js b/lib/pinger.js new file mode 100644 index 0000000..c6a863b --- /dev/null +++ b/lib/pinger.js @@ -0,0 +1,47 @@ +module.exports = function (inherits, EventEmitter) { + + function Pinger(request) { + this.onPingResponse = pingResponse.bind(this) + this.request = request.bind(this, this.onPingResponse) + this.running = false + this.attempts = 0 + EventEmitter.call(this) + } + inherits(Pinger, EventEmitter) + + function pingResponse(error, response, body) { + if (!error && response.statusCode === 200) { + this.emit('pong') + this.running = false + } + else { + this.attempts++ + this.ping() + } + } + + function exponentialBackoff(attempt) { + return Math.min( + Math.floor(Math.random() * Math.pow(2, attempt) + 10), + 10000) + } + + Pinger.prototype.ping = function () { + if (this.attempts) { + setTimeout(this.request, exponentialBackoff(this.attempts)) + } + else { + this.request() + } + } + + Pinger.prototype.start = function () { + if (!this.running) { + this.running = true + this.attempts = 0 + this.ping() + } + } + + return Pinger +} diff --git a/test/endpoint_test.js b/test/endpoint_test.js index 5a15440..607e1d7 100644 --- a/test/endpoint_test.js +++ b/test/endpoint_test.js @@ -6,7 +6,8 @@ var Stream = require('stream') var noop = function () {} -var Endpoint = require("../lib/endpoint")(inherits, EventEmitter) +var Pinger = require('../lib/pinger')(inherits, EventEmitter) +var Endpoint = require("../lib/endpoint")(inherits, EventEmitter, Pinger) describe("Endpoint", function () { @@ -333,9 +334,9 @@ describe("Endpoint", function () { it("maintains the correct pending count when requestCount 'overflows'", function () { var e = new Endpoint(http, '127.0.0.1', 6969) - e.successes = (Math.pow(2, 31) / 2) - 250 - e.failures = (Math.pow(2, 31) / 2) - 250 - e.requestCount = Math.pow(2, 31) + e.successes = (Math.pow(2, 52) / 2) - 250 + e.failures = (Math.pow(2, 52) / 2) - 250 + e.requestCount = Math.pow(2, 52) e.setPending() assert.equal(e.pending, 500) assert.equal(e.requestCount, 500) @@ -345,7 +346,7 @@ describe("Endpoint", function () { var e = new Endpoint(http, '127.0.0.1', 6969) e.pending = 500 e.requestRate = 500 - e.requestCount = Math.pow(2, 31) + e.requestCount = Math.pow(2, 52) e.requestsLastCheck = e.requestCount - 500 e.resetCounters() assert.equal(e.requestCount - e.requestsLastCheck, e.requestRate) @@ -359,14 +360,14 @@ describe("Endpoint", function () { describe("setHealthy()", function () { - it("calls ping if transitioning from healthy to unhealthy", function (done) { - var e = new Endpoint(http, '127.0.0.1', 6969) - e.ping = done + it("calls pinger.start if transitioning from healthy to unhealthy", function (done) { + var e = new Endpoint(http, '127.0.0.1', 6969, {ping: '/ping'}) + e.pinger.start = done e.setHealthy(false) }) it("emits 'health' once when changing state from healthy to unhealthy", function (done) { - var e = new Endpoint(http, '127.0.0.1', 6969) + var e = new Endpoint(http, '127.0.0.1', 6969, {ping: '/ping'}) e.emit = function (name) { assert.equal(name, "health") done() @@ -375,7 +376,7 @@ describe("Endpoint", function () { }) it("emits 'health' once when changing state from unhealthy to healthy", function (done) { - var e = new Endpoint(http, '127.0.0.1', 6969) + var e = new Endpoint(http, '127.0.0.1', 6969, {ping: '/ping'}) e.emit = function (name) { assert.equal(name, "health") done()