Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

rewrote the ping logic to fix several ping related issues

The previous code would ping too frequently and could get stuck
in a ping loop, even if the endpoint was actually healthy.

The new code factors out the ping logic into its own module. The
logic for setting the endpoint health has be constrained to exactly
two locations, one for setting as healthy and one for unhealthy.

Pinger ensures that only one ping request is pending at a given time
and that the ping loop is independent of other timeouts. The pinger
uses exponential backoff for timing ping requests with a maximum
delay of 10s.

The tests were updated for the new behavior.
  • Loading branch information...
commit 594ce6ce6e23d04c204c53c1defcbcbeaae5aca2 1 parent 4af17d3
@dannycoates authored
Showing with 91 additions and 43 deletions.
  1. +2 −1  index.js
  2. +31 −32 lib/endpoint.js
  3. +47 −0 lib/pinger.js
  4. +11 −10 test/endpoint_test.js
View
3  index.js
@@ -1,6 +1,7 @@
var inherits = require('util').inherits,
EventEmitter = require('events').EventEmitter,
- Endpoint = require('./lib/endpoint')(inherits, EventEmitter),
+ Pinger = require('./lib/pinger')(inherits, EventEmitter),
+ Endpoint = require('./lib/endpoint')(inherits, EventEmitter, Pinger),
RequestSet = require('./lib/request_set')
module.exports = require('./lib/pool')(inherits, EventEmitter, Endpoint, RequestSet)
View
63 lib/endpoint.js
@@ -2,7 +2,7 @@ var Stream = require('stream')
var http = require('http')
var KeepAlive = require('agentkeepalive')
-module.exports = function (inherits, EventEmitter) {
+module.exports = function (inherits, EventEmitter, Pinger) {
var MAX_COUNT = Math.pow(2, 52)
var clock = Date.now()
var clockInterval = null
@@ -29,6 +29,12 @@ module.exports = function (inherits, EventEmitter) {
this.healthy = true
this.name = this.ip + ':' + this.port
this.address = this.ip
+
+ this.pinger = new Pinger(this.ping.bind(this))
+ this.pinger.on('pong', function () {
+ this.setHealthy(true)
+ }.bind(this))
+
this.pingPath = options.ping
this.pingTimeout = options.pingTimeout || 2000
if (options.keepAlive) {
@@ -51,6 +57,7 @@ module.exports = function (inherits, EventEmitter) {
this.pending = 0
this.successes = 0
this.failures = 0
+ this.filtered = 0
this.timeout = (options.timeout === 0) ? 0 : options.timeout || (60 * 1000)
this.resolution = (options.resolution === 0) ? 0 : options.resolution || 1000
@@ -81,6 +88,7 @@ module.exports = function (inherits, EventEmitter) {
pending: this.pending,
successes: this.successes,
failures: this.failures,
+ filtered: this.filtered,
healthy: this.healthy,
socketRequestCounts: requestCounts
}
@@ -97,9 +105,6 @@ module.exports = function (inherits, EventEmitter) {
r.abort()
}
}
- if (!this.healthy) {
- this.ping()
- }
this.requestRate = this.requestCount - this.requestsLastCheck
this.requestsLastCheck = this.requestCount
}
@@ -132,6 +137,12 @@ module.exports = function (inherits, EventEmitter) {
Endpoint.prototype.failed = function (error, request) {
this.failures++
+ this.setHealthy(false)
+ this.complete(error, request)
+ }
+
+ Endpoint.prototype.filterRejected = function (error, request) {
+ this.filtered++
this.complete(error, request)
}
@@ -192,17 +203,15 @@ module.exports = function (inherits, EventEmitter) {
}
Endpoint.prototype.setHealthy = function (newState) {
+ if (!this.pingPath) {
+ return // an endpoint with no pingPath can never be made unhealthy
+ }
+ if (!newState) {
+ this.pinger.start()
+ }
if (this.healthy !== newState) {
this.healthy = newState
- if (!this.healthy) {
- this.ping() // ping may set this back to healthy
- if (!this.healthy) {
- this.emit('health', this)
- }
- }
- else {
- this.emit('health', this)
- }
+ this.emit('health', this)
}
}
@@ -210,22 +219,14 @@ module.exports = function (inherits, EventEmitter) {
delete this.requests[id]
}
- function gotPingResponse(error, response, body) {
- this.node.setHealthy(!error && response.statusCode === 200)
- }
-
- Endpoint.prototype.ping = function () {
- if (this.pingPath) {
- this.request(
- { path: this.pingPath
- , method: 'GET'
- , timeout: this.pingTimeout
- }
- , gotPingResponse)
- }
- else {
- this.setHealthy(true)
- }
+ Endpoint.prototype.ping = function (cb) {
+ return this.request(
+ { path: this.pingPath
+ , method: 'GET'
+ , timeout: this.pingTimeout
+ }
+ , cb
+ )
}
// this = request
@@ -251,7 +252,6 @@ module.exports = function (inherits, EventEmitter) {
, message: msg
}
, this)
- this.node.setHealthy(false)
}
// this = response
@@ -268,7 +268,6 @@ module.exports = function (inherits, EventEmitter) {
var node = req.node
if (req.callback === null) { return }
- node.setHealthy(true)
var buffer = new Buffer(this.bodyLength)
var offset = 0
@@ -282,7 +281,7 @@ module.exports = function (inherits, EventEmitter) {
var delay = opt.retryFilter(opt, this, body)
if (delay !== false) { // delay may be 0
- return node.failed(
+ return node.filterRejected(
{ delay: delay
, reason: 'filter'
, attempt: req
View
47 lib/pinger.js
@@ -0,0 +1,47 @@
+module.exports = function (inherits, EventEmitter) {
+
+ function Pinger(request) {
+ this.onPingResponse = pingResponse.bind(this)
+ this.request = request.bind(this, this.onPingResponse)
+ this.running = false
+ this.attempts = 0
+ EventEmitter.call(this)
+ }
+ inherits(Pinger, EventEmitter)
+
+ function pingResponse(error, response, body) {
+ if (!error && response.statusCode === 200) {
+ this.emit('pong')
+ this.running = false
+ }
+ else {
+ this.attempts++
+ this.ping()
+ }
+ }
+
+ function exponentialBackoff(attempt) {
+ return Math.min(
+ Math.floor(Math.random() * Math.pow(2, attempt) + 10),
+ 10000)
+ }
+
+ Pinger.prototype.ping = function () {
+ if (this.attempts) {
+ setTimeout(this.request, exponentialBackoff(this.attempts))
+ }
+ else {
+ this.request()
+ }
+ }
+
+ Pinger.prototype.start = function () {
+ if (!this.running) {
+ this.running = true
+ this.attempts = 0
+ this.ping()
+ }
+ }
+
+ return Pinger
+}
View
21 test/endpoint_test.js
@@ -6,7 +6,8 @@ var Stream = require('stream')
var noop = function () {}
-var Endpoint = require("../lib/endpoint")(inherits, EventEmitter)
+var Pinger = require('../lib/pinger')(inherits, EventEmitter)
+var Endpoint = require("../lib/endpoint")(inherits, EventEmitter, Pinger)
describe("Endpoint", function () {
@@ -333,9 +334,9 @@ describe("Endpoint", function () {
it("maintains the correct pending count when requestCount 'overflows'", function () {
var e = new Endpoint(http, '127.0.0.1', 6969)
- e.successes = (Math.pow(2, 31) / 2) - 250
- e.failures = (Math.pow(2, 31) / 2) - 250
- e.requestCount = Math.pow(2, 31)
+ e.successes = (Math.pow(2, 52) / 2) - 250
+ e.failures = (Math.pow(2, 52) / 2) - 250
+ e.requestCount = Math.pow(2, 52)
e.setPending()
assert.equal(e.pending, 500)
assert.equal(e.requestCount, 500)
@@ -345,7 +346,7 @@ describe("Endpoint", function () {
var e = new Endpoint(http, '127.0.0.1', 6969)
e.pending = 500
e.requestRate = 500
- e.requestCount = Math.pow(2, 31)
+ e.requestCount = Math.pow(2, 52)
e.requestsLastCheck = e.requestCount - 500
e.resetCounters()
assert.equal(e.requestCount - e.requestsLastCheck, e.requestRate)
@@ -359,14 +360,14 @@ describe("Endpoint", function () {
describe("setHealthy()", function () {
- it("calls ping if transitioning from healthy to unhealthy", function (done) {
- var e = new Endpoint(http, '127.0.0.1', 6969)
- e.ping = done
+ it("calls pinger.start if transitioning from healthy to unhealthy", function (done) {
+ var e = new Endpoint(http, '127.0.0.1', 6969, {ping: '/ping'})
+ e.pinger.start = done
e.setHealthy(false)
})
it("emits 'health' once when changing state from healthy to unhealthy", function (done) {
- var e = new Endpoint(http, '127.0.0.1', 6969)
+ var e = new Endpoint(http, '127.0.0.1', 6969, {ping: '/ping'})
e.emit = function (name) {
assert.equal(name, "health")
done()
@@ -375,7 +376,7 @@ describe("Endpoint", function () {
})
it("emits 'health' once when changing state from unhealthy to healthy", function (done) {
- var e = new Endpoint(http, '127.0.0.1', 6969)
+ var e = new Endpoint(http, '127.0.0.1', 6969, {ping: '/ping'})
e.emit = function (name) {
assert.equal(name, "health")
done()
Please sign in to comment.
Something went wrong with that request. Please try again.