Skip to content
This repository has been archived by the owner on Dec 27, 2022. It is now read-only.

Commit

Permalink
Merge pull request #9 from dannycoates/stats
Browse files Browse the repository at this point in the history
rewrote the ping logic to fix several ping related issues
  • Loading branch information
dannycoates committed Nov 28, 2012
2 parents fe25f75 + 594ce6c commit 56d5cb5
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 43 deletions.
3 changes: 2 additions & 1 deletion index.js
@@ -1,6 +1,7 @@
var inherits = require('util').inherits,
EventEmitter = require('events').EventEmitter,
Endpoint = require('./lib/endpoint')(inherits, EventEmitter),
Pinger = require('./lib/pinger')(inherits, EventEmitter),
Endpoint = require('./lib/endpoint')(inherits, EventEmitter, Pinger),
RequestSet = require('./lib/request_set')

module.exports = require('./lib/pool')(inherits, EventEmitter, Endpoint, RequestSet)
63 changes: 31 additions & 32 deletions lib/endpoint.js
Expand Up @@ -2,7 +2,7 @@ var Stream = require('stream')
var http = require('http')
var KeepAlive = require('agentkeepalive')

module.exports = function (inherits, EventEmitter) {
module.exports = function (inherits, EventEmitter, Pinger) {
var MAX_COUNT = Math.pow(2, 52)
var clock = Date.now()
var clockInterval = null
Expand All @@ -29,6 +29,12 @@ module.exports = function (inherits, EventEmitter) {
this.healthy = true
this.name = this.ip + ':' + this.port
this.address = this.ip

this.pinger = new Pinger(this.ping.bind(this))
this.pinger.on('pong', function () {
this.setHealthy(true)
}.bind(this))

this.pingPath = options.ping
this.pingTimeout = options.pingTimeout || 2000
if (options.keepAlive) {
Expand All @@ -51,6 +57,7 @@ module.exports = function (inherits, EventEmitter) {
this.pending = 0
this.successes = 0
this.failures = 0
this.filtered = 0

this.timeout = (options.timeout === 0) ? 0 : options.timeout || (60 * 1000)
this.resolution = (options.resolution === 0) ? 0 : options.resolution || 1000
Expand Down Expand Up @@ -81,6 +88,7 @@ module.exports = function (inherits, EventEmitter) {
pending: this.pending,
successes: this.successes,
failures: this.failures,
filtered: this.filtered,
healthy: this.healthy,
socketRequestCounts: requestCounts
}
Expand All @@ -97,9 +105,6 @@ module.exports = function (inherits, EventEmitter) {
r.abort()
}
}
if (!this.healthy) {
this.ping()
}
this.requestRate = this.requestCount - this.requestsLastCheck
this.requestsLastCheck = this.requestCount
}
Expand Down Expand Up @@ -132,6 +137,12 @@ module.exports = function (inherits, EventEmitter) {

Endpoint.prototype.failed = function (error, request) {
this.failures++
this.setHealthy(false)
this.complete(error, request)
}

Endpoint.prototype.filterRejected = function (error, request) {
this.filtered++
this.complete(error, request)
}

Expand Down Expand Up @@ -192,40 +203,30 @@ module.exports = function (inherits, EventEmitter) {
}

Endpoint.prototype.setHealthy = function (newState) {
if (!this.pingPath) {
return // an endpoint with no pingPath can never be made unhealthy
}
if (!newState) {
this.pinger.start()
}
if (this.healthy !== newState) {
this.healthy = newState
if (!this.healthy) {
this.ping() // ping may set this back to healthy
if (!this.healthy) {
this.emit('health', this)
}
}
else {
this.emit('health', this)
}
this.emit('health', this)
}
}

Endpoint.prototype.deleteRequest = function (id) {
delete this.requests[id]
}

function gotPingResponse(error, response, body) {
this.node.setHealthy(!error && response.statusCode === 200)
}

Endpoint.prototype.ping = function () {
if (this.pingPath) {
this.request(
{ path: this.pingPath
, method: 'GET'
, timeout: this.pingTimeout
}
, gotPingResponse)
}
else {
this.setHealthy(true)
}
Endpoint.prototype.ping = function (cb) {
return this.request(
{ path: this.pingPath
, method: 'GET'
, timeout: this.pingTimeout
}
, cb
)
}

// this = request
Expand All @@ -251,7 +252,6 @@ module.exports = function (inherits, EventEmitter) {
, message: msg
}
, this)
this.node.setHealthy(false)
}

// this = response
Expand All @@ -268,7 +268,6 @@ module.exports = function (inherits, EventEmitter) {
var node = req.node

if (req.callback === null) { return }
node.setHealthy(true)

var buffer = new Buffer(this.bodyLength)
var offset = 0
Expand All @@ -282,7 +281,7 @@ module.exports = function (inherits, EventEmitter) {

var delay = opt.retryFilter(opt, this, body)
if (delay !== false) { // delay may be 0
return node.failed(
return node.filterRejected(
{ delay: delay
, reason: 'filter'
, attempt: req
Expand Down
47 changes: 47 additions & 0 deletions lib/pinger.js
@@ -0,0 +1,47 @@
module.exports = function (inherits, EventEmitter) {

function Pinger(request) {
this.onPingResponse = pingResponse.bind(this)
this.request = request.bind(this, this.onPingResponse)
this.running = false
this.attempts = 0
EventEmitter.call(this)
}
inherits(Pinger, EventEmitter)

function pingResponse(error, response, body) {
if (!error && response.statusCode === 200) {
this.emit('pong')
this.running = false
}
else {
this.attempts++
this.ping()
}
}

function exponentialBackoff(attempt) {
return Math.min(
Math.floor(Math.random() * Math.pow(2, attempt) + 10),
10000)
}

Pinger.prototype.ping = function () {
if (this.attempts) {
setTimeout(this.request, exponentialBackoff(this.attempts))
}
else {
this.request()
}
}

Pinger.prototype.start = function () {
if (!this.running) {
this.running = true
this.attempts = 0
this.ping()
}
}

return Pinger
}
21 changes: 11 additions & 10 deletions test/endpoint_test.js
Expand Up @@ -6,7 +6,8 @@ var Stream = require('stream')

var noop = function () {}

var Endpoint = require("../lib/endpoint")(inherits, EventEmitter)
var Pinger = require('../lib/pinger')(inherits, EventEmitter)
var Endpoint = require("../lib/endpoint")(inherits, EventEmitter, Pinger)

describe("Endpoint", function () {

Expand Down Expand Up @@ -333,9 +334,9 @@ describe("Endpoint", function () {

it("maintains the correct pending count when requestCount 'overflows'", function () {
var e = new Endpoint(http, '127.0.0.1', 6969)
e.successes = (Math.pow(2, 31) / 2) - 250
e.failures = (Math.pow(2, 31) / 2) - 250
e.requestCount = Math.pow(2, 31)
e.successes = (Math.pow(2, 52) / 2) - 250
e.failures = (Math.pow(2, 52) / 2) - 250
e.requestCount = Math.pow(2, 52)
e.setPending()
assert.equal(e.pending, 500)
assert.equal(e.requestCount, 500)
Expand All @@ -345,7 +346,7 @@ describe("Endpoint", function () {
var e = new Endpoint(http, '127.0.0.1', 6969)
e.pending = 500
e.requestRate = 500
e.requestCount = Math.pow(2, 31)
e.requestCount = Math.pow(2, 52)
e.requestsLastCheck = e.requestCount - 500
e.resetCounters()
assert.equal(e.requestCount - e.requestsLastCheck, e.requestRate)
Expand All @@ -359,14 +360,14 @@ describe("Endpoint", function () {

describe("setHealthy()", function () {

it("calls ping if transitioning from healthy to unhealthy", function (done) {
var e = new Endpoint(http, '127.0.0.1', 6969)
e.ping = done
it("calls pinger.start if transitioning from healthy to unhealthy", function (done) {
var e = new Endpoint(http, '127.0.0.1', 6969, {ping: '/ping'})
e.pinger.start = done
e.setHealthy(false)
})

it("emits 'health' once when changing state from healthy to unhealthy", function (done) {
var e = new Endpoint(http, '127.0.0.1', 6969)
var e = new Endpoint(http, '127.0.0.1', 6969, {ping: '/ping'})
e.emit = function (name) {
assert.equal(name, "health")
done()
Expand All @@ -375,7 +376,7 @@ describe("Endpoint", function () {
})

it("emits 'health' once when changing state from unhealthy to healthy", function (done) {
var e = new Endpoint(http, '127.0.0.1', 6969)
var e = new Endpoint(http, '127.0.0.1', 6969, {ping: '/ping'})
e.emit = function (name) {
assert.equal(name, "health")
done()
Expand Down

0 comments on commit 56d5cb5

Please sign in to comment.