Skip to content
This repository has been archived by the owner on Aug 4, 2023. It is now read-only.

Commit

Permalink
feat: add incremental back-off in case of APM Server issues
Browse files Browse the repository at this point in the history
  • Loading branch information
watson committed Sep 4, 2018
1 parent 38e32e3 commit ac56962
Show file tree
Hide file tree
Showing 4 changed files with 284 additions and 31 deletions.
116 changes: 88 additions & 28 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const util = require('util')
const os = require('os')
const parseUrl = require('url').parse
const zlib = require('zlib')
const Writable = require('readable-stream').Writable
const {Writable, PassThrough} = require('readable-stream')
const pump = require('pump')
const eos = require('end-of-stream')
const streamToBuffer = require('fast-stream-to-buffer')
Expand Down Expand Up @@ -48,6 +48,20 @@ function Client (opts) {
Writable.call(this, opts)

const errorproxy = (err) => {
this._errors++

const retryIndex = this._errors === 0 ? 0 : this._errors - 1
const backoff = (Math.min(retryIndex, 6) ** 2) * 1000

if (backoff > 0) {
this._chopper.resetTimer(-1) // disable timer to prepare for back-off mode
this._backoffTimer = setTimeout(() => {
this._backoffTimer = null
this._chopper.resetTimer(this._chopperTime)
if (this._backoffCallback) this._backoffCallback()
}, backoff)
}

if (this._destroyed === false) this.emit('error', err)
}

Expand All @@ -57,6 +71,11 @@ function Client (opts) {

this._received = 0 // number of events given to the client for reporting
this.sent = 0 // number of events written to the socket
this._errors = 0 // number of requests that resulted in an error (dropped connection, non-2xx etc)
this._chopperSize = opts.time // needed to set highWatermark on buffer if we enter back-off mode
this._chopperTime = opts.time // needed to restore the normal time if we ever enter back-off mode
this._backoffTimer = null
this._backoffCallback = null
this._active = false
this._destroyed = false
this._onflushed = null
Expand Down Expand Up @@ -169,39 +188,15 @@ function onStream (opts, client, onerror) {
const requestOpts = getRequestOptions(opts, client._agent)

return function (stream, next) {
const onerrorproxy = (err) => {
stream.removeListener('error', onerrorproxy)
compressor.removeListener('error', onerrorproxy)
req.removeListener('error', onerrorproxy)
stream.destroy()
onerror(err)
}

client._active = true

const req = client._transport.request(requestOpts, onResult(onerror))
let buffer, req
const compressor = zlib.createGzip()

// Mointor streams for errors so that we can make sure to destory the
// output stream as soon as that occurs
stream.on('error', onerrorproxy)
compressor.on('error', onerrorproxy)
req.on('error', onerrorproxy)

req.on('socket', function (socket) {
// Sockets will automatically be unreffed by the HTTP agent when they are
// not in use by an HTTP request, but as we're keeping the HTTP request
// open, we need to unref the socket manually
socket.unref()
})

if (Number.isFinite(serverTimeout)) {
req.setTimeout(serverTimeout, function () {
req.abort()
})
}

pump(stream, compressor, req, function () {
pump(stream, compressor, requestProxy(requestOpts, onResult(client, onerror)), function () {
// This function is technically called with an error, but because we
// manually attach error listeners on all the streams in the pipeline
// above, we can safely ignore it.
Expand Down Expand Up @@ -244,10 +239,72 @@ function onStream (opts, client, onerror) {
const metadata = getMetadata(opts)
truncate.metadata(metadata, opts)
stream.write(ndjson.serialize({metadata}))

// Under normal opperation, just make a request and return it. If
// instructed to back off, make a temporary buffer to hold data until the
// request can be made
function requestProxy (opts, onresponse) {
if (client._backoffTimer) {
buffer = new PassThrough({highWaterMark: client._chopperSize * 2}) // twice as large to allow overflow
buffer.on('error', onerrorproxy)

eos(stream, function () {
client._backoffCallback = null
if (client._backoffTimer) {
// drop all data - back-off still in effect
compressor.destroy()
buffer.destroy()
}
})

client._backoffCallback = function () {
client._backoffCallback = null
req = makeRequest(opts, onresponse)
buffer.pipe(req)
}

return buffer
} else {
return makeRequest(opts, onresponse)
}
}

function makeRequest (opts, onresponse) {
const req = client._transport.request(opts, onresponse)

req.on('error', onerrorproxy)

req.on('socket', function (socket) {
// Sockets will automatically be unreffed by the HTTP agent when they are
// not in use by an HTTP request, but as we're keeping the HTTP request
// open, we need to unref the socket manually
socket.unref()
})

if (Number.isFinite(serverTimeout)) {
req.setTimeout(serverTimeout, function () {
req.abort()
})
}

return req
}

// This function is attached to the error event of the different streams so
// that we can make sure to destroy the output stream as soon as an error
// occurs
function onerrorproxy (err) {
stream.removeListener('error', onerrorproxy)
compressor.removeListener('error', onerrorproxy)
if (buffer) buffer.removeListener('error', onerrorproxy)
if (req) req.removeListener('error', onerrorproxy)
stream.destroy()
onerror(err)
}
}
}

function onResult (onerror) {
function onResult (client, onerror) {
return streamToBuffer.onStream(function (err, buf, res) {
if (err) return onerror(err)
if (res.statusCode < 200 || res.statusCode > 299) {
Expand All @@ -261,6 +318,9 @@ function onResult (onerror) {
}
}
onerror(err)
} else {
client._errors = 0
client._chopper.resetTimer(client._chopperTime)
}
})
}
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"fast-stream-to-buffer": "^1.0.0",
"pump": "^3.0.0",
"readable-stream": "^2.3.6",
"stream-chopper": "^1.1.1",
"stream-chopper": "^2.1.1",
"unicode-byte-truncate": "^1.0.0"
},
"devDependencies": {
Expand Down
193 changes: 193 additions & 0 deletions test/backoff.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
'use strict'

const crypto = require('crypto')
const test = require('tape')
const utils = require('./lib/utils')

const APMServer = utils.APMServer
const processReq = utils.processReq
const assertReq = utils.assertReq
const assertMetadata = utils.assertMetadata
const assertEvent = utils.assertEvent

const testCases = [
{
name: 'only one error',
expectedData: [
assertMetadata,
assertEvent({span: {req: 2}}),
assertMetadata,
assertEvent({span: {req: 3}}),
assertMetadata,
assertEvent({span: {req: 4}})
],
requests: [
[0, true], // no back-off in effect, request fails
[0, false], // back-off in effect, request succeeds
[0, false], // no back-off in effect, request succeeds
[0, false] // no back-off in effect, request succeeds
]
},
{
name: 'only two errors',
expectedData: [
assertMetadata,
assertEvent({span: {req: 3}}),
assertMetadata,
assertEvent({span: {req: 4}}),
assertMetadata,
assertEvent({span: {req: 5}})
],
requests: [
[0, true], // no back-off in effect, request fails
[0, true], // back-off in effect, request fails
[1, false], // back-off in effect, request succeeds
[0, false], // no back-off in effect, request succeeds
[0, false] // no back-off in effect, request succeeds
]
},
{
name: 'top out at full back-off',
expectedData: [
assertMetadata,
assertEvent({span: {req: 9}}),
assertMetadata,
assertEvent({span: {req: 10}}),
assertMetadata,
assertEvent({span: {req: 11}})
],
requests: [
[0, true], // no back-off in effect, request fails
[0, true], // back-off in effect, request fails
[1, true], // back-off in effect, request fails
[4, true], // back-off in effect, request fails
[9, true], // back-off in effect, request fails
[16, true], // back-off in effect, request fails
[25, true], // back-off in effect, request fails
[36, true], // back-off in effect, request fails
[36, false], // back-off in effect, request succeeds
[0, false], // no back-off in effect, request succeeds
[0, false] // no back-off in effect, request succeeds
]
}
]

testCases.forEach(function ({name, expectedData, requests}) {
test('backoff delays - ' + name, function (t) {
let reqNo = 0
let start, client

const server = APMServer(function (req, res) {
const diff = Date.now() - start
const backoffTime = requests[reqNo - 1][0] * 1000
t.ok(diff > backoffTime && diff < backoffTime + 200, `should delay request between ${backoffTime} and ${backoffTime + 200}ms (was delayed ${diff}ms)`)

if (requests[reqNo - 1][1] === true) {
res.writeHead(500)
res.end()
} else {
assertReq(t, req)
req = processReq(req)
req.on('data', function (obj) {
expectedData.shift()(t, obj)
})
req.on('end', function () {
res.end()
if (reqNo < 4) {
setTimeout(makeReq, 10)
} else {
t.equal(expectedData.length, 0, 'should have seen all expected data')
server.close()
t.end()
}
})
}
}).client({time: 1000}, function (_client) {
client = _client
let emittedErrors = 0

client.on('error', function (err) {
emittedErrors++
if (requests[reqNo - 1][1] === true) {
t.equal(err.message, 'Unexpected response code from APM Server: 500', 'client should emit error')
t.equal(client._errors, emittedErrors, 'client error count should have been incremented to ' + emittedErrors)
makeReq()
} else {
t.error(err)
}
})

makeReq()
})

function makeReq () {
client.sendSpan({req: ++reqNo})
start = Date.now()
client.flush()
}
})
})

test('backoff - dropping data', function (t) {
let start, timer
let reqNo = 0
const backoffTimes = [0, 0, 1, 0]

const server = APMServer(function (req, res) {
const diff = Date.now() - start
const backoffTime = backoffTimes.shift() * 1000
t.ok(diff > backoffTime && diff < backoffTime + 200, `should delay request between ${backoffTime} and ${backoffTime + 200}ms (was delayed ${diff}ms)`)

req = processReq(req)
req.on('data', function (obj) {
if ('metadata' in obj) return
t.equal(obj.span.req, reqNo, 'event belongs to expected request no ' + reqNo)
t.equal(obj.span.ok, true, 'expected the event to get sent')
})
req.on('end', function () {
if (reqNo <= 2) {
res.writeHead(500)
res.end()
} else {
clearTimeout(timer)
res.end()
server.close()
t.end()
}
})
}).client({size: 256, time: 500}, function (client) {
client.on('error', function (err) {
if (reqNo === 1) {
t.equal(err.message, 'Unexpected response code from APM Server: 500', 'client should emit error')
t.equal(client._errors, 1, 'client error count should have been incremented to 1')

client.sendSpan({req: ++reqNo, ok: true, filler: crypto.randomBytes(32).toString('hex')})
start = Date.now()
client.flush()
} else if (reqNo === 2) {
t.equal(err.message, 'Unexpected response code from APM Server: 500', 'client should emit error')
t.equal(client._errors, 2, 'client error count should have been incremented to 2')

reqNo++
start = Date.now()

// these will be dropped because they are too big to be cached before the backoff
client.sendSpan({req: reqNo, ok: false, filler: crypto.randomBytes(32).toString('hex')}) // will not overflow
client.sendSpan({req: reqNo, ok: false, filler: crypto.randomBytes(32).toString('hex')}) // will trigger overflow

// this will be the first to get through after the backoff
client.sendSpan({req: reqNo, ok: true, filler: crypto.randomBytes(32).toString('hex')})

timer = setTimeout(function () {
t.fail('took too long')
}, 2000)
} else {
t.error(err)
}
})

client.sendSpan({req: ++reqNo, ok: true, filler: crypto.randomBytes(32).toString('hex')})
start = Date.now()
client.flush()
})
})
4 changes: 2 additions & 2 deletions test/lib/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ function assertReq (t, req) {
assertReq.asserts = 7

function assertMetadata (t, obj) {
t.deepEqual(Object.keys(obj), ['metadata'])
t.deepEqual(Object.keys(obj), ['metadata'], 'should receive metadata')
const metadata = obj.metadata
t.deepEqual(Object.keys(metadata), ['service', 'process', 'system'])
const service = metadata.service
Expand All @@ -92,7 +92,7 @@ function assertMetadata (t, obj) {
t.ok(Array.isArray(_process.argv), 'process.title should be an array')
t.ok(_process.argv.length >= 2, 'process.title should contain at least two elements')
t.ok(/\/node$/.test(_process.argv[0]), `process.argv[0] should match /\\/node$/ (was: ${_process.argv[0]})`)
const regex = /(\/test\/(test|truncate|lib\/unref-client)\.js|node_modules\/\.bin\/tape)$/
const regex = /(\/test\/(test|backoff|truncate|lib\/unref-client)\.js|node_modules\/\.bin\/tape)$/
t.ok(regex.test(_process.argv[1]), `process.argv[1] should match ${regex} (was: ${_process.argv[1]})"`)
const system = metadata.system
t.ok(typeof system.hostname, 'string')
Expand Down

0 comments on commit ac56962

Please sign in to comment.