From 594ce6ce6e23d04c204c53c1defcbcbeaae5aca2 Mon Sep 17 00:00:00 2001 From: Danny Coates Date: Tue, 27 Nov 2012 16:08:23 -0800 Subject: [PATCH] rewrote the ping logic to fix several ping related issues The previous code would ping too frequently and could get stuck in a ping loop, even if the endpoint was actually healthy. The new code factors out the ping logic into its own module. The logic for setting the endpoint health has be constrained to exactly two locations, one for setting as healthy and one for unhealthy. Pinger ensures that only one ping request is pending at a given time and that the ping loop is independent of other timeouts. The pinger uses exponential backoff for timing ping requests with a maximum delay of 10s. The tests were updated for the new behavior. --- index.js | 3 ++- lib/endpoint.js | 63 +++++++++++++++++++++---------------------- lib/pinger.js | 47 ++++++++++++++++++++++++++++++++ test/endpoint_test.js | 21 ++++++++------- 4 files changed, 91 insertions(+), 43 deletions(-) create mode 100644 lib/pinger.js diff --git a/index.js b/index.js index 4ecada2..2368235 100644 --- a/index.js +++ b/index.js @@ -1,6 +1,7 @@ var inherits = require('util').inherits, EventEmitter = require('events').EventEmitter, - Endpoint = require('./lib/endpoint')(inherits, EventEmitter), + Pinger = require('./lib/pinger')(inherits, EventEmitter), + Endpoint = require('./lib/endpoint')(inherits, EventEmitter, Pinger), RequestSet = require('./lib/request_set') module.exports = require('./lib/pool')(inherits, EventEmitter, Endpoint, RequestSet) diff --git a/lib/endpoint.js b/lib/endpoint.js index cf54ac7..0797f43 100644 --- a/lib/endpoint.js +++ b/lib/endpoint.js @@ -2,7 +2,7 @@ var Stream = require('stream') var http = require('http') var KeepAlive = require('agentkeepalive') -module.exports = function (inherits, EventEmitter) { +module.exports = function (inherits, EventEmitter, Pinger) { var MAX_COUNT = Math.pow(2, 52) var clock = Date.now() var clockInterval = null @@ -29,6 +29,12 @@ module.exports = function (inherits, EventEmitter) { this.healthy = true this.name = this.ip + ':' + this.port this.address = this.ip + + this.pinger = new Pinger(this.ping.bind(this)) + this.pinger.on('pong', function () { + this.setHealthy(true) + }.bind(this)) + this.pingPath = options.ping this.pingTimeout = options.pingTimeout || 2000 if (options.keepAlive) { @@ -51,6 +57,7 @@ module.exports = function (inherits, EventEmitter) { this.pending = 0 this.successes = 0 this.failures = 0 + this.filtered = 0 this.timeout = (options.timeout === 0) ? 0 : options.timeout || (60 * 1000) this.resolution = (options.resolution === 0) ? 0 : options.resolution || 1000 @@ -81,6 +88,7 @@ module.exports = function (inherits, EventEmitter) { pending: this.pending, successes: this.successes, failures: this.failures, + filtered: this.filtered, healthy: this.healthy, socketRequestCounts: requestCounts } @@ -97,9 +105,6 @@ module.exports = function (inherits, EventEmitter) { r.abort() } } - if (!this.healthy) { - this.ping() - } this.requestRate = this.requestCount - this.requestsLastCheck this.requestsLastCheck = this.requestCount } @@ -132,6 +137,12 @@ module.exports = function (inherits, EventEmitter) { Endpoint.prototype.failed = function (error, request) { this.failures++ + this.setHealthy(false) + this.complete(error, request) + } + + Endpoint.prototype.filterRejected = function (error, request) { + this.filtered++ this.complete(error, request) } @@ -192,17 +203,15 @@ module.exports = function (inherits, EventEmitter) { } Endpoint.prototype.setHealthy = function (newState) { + if (!this.pingPath) { + return // an endpoint with no pingPath can never be made unhealthy + } + if (!newState) { + this.pinger.start() + } if (this.healthy !== newState) { this.healthy = newState - if (!this.healthy) { - this.ping() // ping may set this back to healthy - if (!this.healthy) { - this.emit('health', this) - } - } - else { - this.emit('health', this) - } + this.emit('health', this) } } @@ -210,22 +219,14 @@ module.exports = function (inherits, EventEmitter) { delete this.requests[id] } - function gotPingResponse(error, response, body) { - this.node.setHealthy(!error && response.statusCode === 200) - } - - Endpoint.prototype.ping = function () { - if (this.pingPath) { - this.request( - { path: this.pingPath - , method: 'GET' - , timeout: this.pingTimeout - } - , gotPingResponse) - } - else { - this.setHealthy(true) - } + Endpoint.prototype.ping = function (cb) { + return this.request( + { path: this.pingPath + , method: 'GET' + , timeout: this.pingTimeout + } + , cb + ) } // this = request @@ -251,7 +252,6 @@ module.exports = function (inherits, EventEmitter) { , message: msg } , this) - this.node.setHealthy(false) } // this = response @@ -268,7 +268,6 @@ module.exports = function (inherits, EventEmitter) { var node = req.node if (req.callback === null) { return } - node.setHealthy(true) var buffer = new Buffer(this.bodyLength) var offset = 0 @@ -282,7 +281,7 @@ module.exports = function (inherits, EventEmitter) { var delay = opt.retryFilter(opt, this, body) if (delay !== false) { // delay may be 0 - return node.failed( + return node.filterRejected( { delay: delay , reason: 'filter' , attempt: req diff --git a/lib/pinger.js b/lib/pinger.js new file mode 100644 index 0000000..c6a863b --- /dev/null +++ b/lib/pinger.js @@ -0,0 +1,47 @@ +module.exports = function (inherits, EventEmitter) { + + function Pinger(request) { + this.onPingResponse = pingResponse.bind(this) + this.request = request.bind(this, this.onPingResponse) + this.running = false + this.attempts = 0 + EventEmitter.call(this) + } + inherits(Pinger, EventEmitter) + + function pingResponse(error, response, body) { + if (!error && response.statusCode === 200) { + this.emit('pong') + this.running = false + } + else { + this.attempts++ + this.ping() + } + } + + function exponentialBackoff(attempt) { + return Math.min( + Math.floor(Math.random() * Math.pow(2, attempt) + 10), + 10000) + } + + Pinger.prototype.ping = function () { + if (this.attempts) { + setTimeout(this.request, exponentialBackoff(this.attempts)) + } + else { + this.request() + } + } + + Pinger.prototype.start = function () { + if (!this.running) { + this.running = true + this.attempts = 0 + this.ping() + } + } + + return Pinger +} diff --git a/test/endpoint_test.js b/test/endpoint_test.js index 5a15440..607e1d7 100644 --- a/test/endpoint_test.js +++ b/test/endpoint_test.js @@ -6,7 +6,8 @@ var Stream = require('stream') var noop = function () {} -var Endpoint = require("../lib/endpoint")(inherits, EventEmitter) +var Pinger = require('../lib/pinger')(inherits, EventEmitter) +var Endpoint = require("../lib/endpoint")(inherits, EventEmitter, Pinger) describe("Endpoint", function () { @@ -333,9 +334,9 @@ describe("Endpoint", function () { it("maintains the correct pending count when requestCount 'overflows'", function () { var e = new Endpoint(http, '127.0.0.1', 6969) - e.successes = (Math.pow(2, 31) / 2) - 250 - e.failures = (Math.pow(2, 31) / 2) - 250 - e.requestCount = Math.pow(2, 31) + e.successes = (Math.pow(2, 52) / 2) - 250 + e.failures = (Math.pow(2, 52) / 2) - 250 + e.requestCount = Math.pow(2, 52) e.setPending() assert.equal(e.pending, 500) assert.equal(e.requestCount, 500) @@ -345,7 +346,7 @@ describe("Endpoint", function () { var e = new Endpoint(http, '127.0.0.1', 6969) e.pending = 500 e.requestRate = 500 - e.requestCount = Math.pow(2, 31) + e.requestCount = Math.pow(2, 52) e.requestsLastCheck = e.requestCount - 500 e.resetCounters() assert.equal(e.requestCount - e.requestsLastCheck, e.requestRate) @@ -359,14 +360,14 @@ describe("Endpoint", function () { describe("setHealthy()", function () { - it("calls ping if transitioning from healthy to unhealthy", function (done) { - var e = new Endpoint(http, '127.0.0.1', 6969) - e.ping = done + it("calls pinger.start if transitioning from healthy to unhealthy", function (done) { + var e = new Endpoint(http, '127.0.0.1', 6969, {ping: '/ping'}) + e.pinger.start = done e.setHealthy(false) }) it("emits 'health' once when changing state from healthy to unhealthy", function (done) { - var e = new Endpoint(http, '127.0.0.1', 6969) + var e = new Endpoint(http, '127.0.0.1', 6969, {ping: '/ping'}) e.emit = function (name) { assert.equal(name, "health") done() @@ -375,7 +376,7 @@ describe("Endpoint", function () { }) it("emits 'health' once when changing state from unhealthy to healthy", function (done) { - var e = new Endpoint(http, '127.0.0.1', 6969) + var e = new Endpoint(http, '127.0.0.1', 6969, {ping: '/ping'}) e.emit = function (name) { assert.equal(name, "health") done()