Skip to content

Commit

Permalink
Expose retryBackoff to make testing easier
Browse files Browse the repository at this point in the history
Very hard to mock a retry that uses setTimeout with random jitter
  • Loading branch information
JoshMock committed Jun 5, 2024
1 parent e8ce49c commit 3863134
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 26 deletions.
6 changes: 5 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"workq": "^3.0.0"
},
"dependencies": {
"@tapjs/clock": "^1.1.24",
"debug": "^4.3.4",
"hpagent": "^1.0.0",
"ms": "^2.1.3",
Expand All @@ -65,6 +66,9 @@
"undici": "^6.12.0"
},
"tap": {
"allow-incomplete-coverage": true
"allow-incomplete-coverage": true,
"plugin": [
"@tapjs/clock"
]
}
}
7 changes: 5 additions & 2 deletions src/Transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ export interface TransportOptions {
accept?: string
}
redaction?: RedactionOptions
retryBackoff?: (min: number, max: number, attempt: number) => number
}

export interface TransportRequestParams {
Expand Down Expand Up @@ -164,6 +165,7 @@ export interface TransportRequestOptions {
*/
meta?: boolean
redaction?: RedactionOptions
retryBackoff?: (min: number, max: number, attempt: number) => number
}

export interface TransportRequestOptionsWithMeta extends TransportRequestOptions {
Expand Down Expand Up @@ -280,7 +282,7 @@ export default class Transport {
this[kNdjsonContentType] = opts.vendoredHeaders?.ndjsonContentType ?? 'application/x-ndjson'
this[kAcceptHeader] = opts.vendoredHeaders?.accept ?? 'application/json, text/plain'
this[kRedaction] = opts.redaction ?? { type: 'replace', additionalKeys: [] }
this[kRetryBackoff] = retryBackoff
this[kRetryBackoff] = opts.retryBackoff ?? retryBackoff

if (opts.sniffOnStart === true) {
this.sniff({
Expand Down Expand Up @@ -613,7 +615,8 @@ export default class Transport {
debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params)

// exponential backoff on retries, with jitter
const backoffWait = this[kRetryBackoff](0, 4, meta.attempts)
const backoff = options.retryBackoff ?? this[kRetryBackoff]
const backoffWait = backoff(0, 4, meta.attempts)
if (backoffWait > 0) {
await setTimeoutPromise(backoffWait * 1000)
}
Expand Down
81 changes: 58 additions & 23 deletions test/unit/transport.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import {
errors
} from '../..'
import { connection, buildServer } from '../utils'
import { kRetryBackoff } from '../../src/symbols'

const { version: transportVersion } = require('../../package.json') // eslint-disable-line
const sleep = promisify(setTimeout)
Expand Down Expand Up @@ -107,8 +106,12 @@ test('Basic error (TimeoutError)', async t => {
const pool = new MyPool({ Connection: MockConnectionTimeout })
pool.addConnection('http://localhost:9200')

const transport = new Transport({ connectionPool: pool, maxRetries: 0, retryOnTimeout: true })
transport[kRetryBackoff] = () => 0
const transport = new Transport({
connectionPool: pool,
maxRetries: 0,
retryOnTimeout: true,
retryBackoff: () => 0,
})

try {
await transport.request({
Expand Down Expand Up @@ -137,8 +140,11 @@ test('Basic error (ConnectionError)', async t => {
const pool = new MyPool({ Connection: MockConnectionError })
pool.addConnection('http://localhost:9200')

const transport = new Transport({ connectionPool: pool, maxRetries: 0 })
transport[kRetryBackoff] = () => 0
const transport = new Transport({
connectionPool: pool,
maxRetries: 0,
retryBackoff: () => 0
})

try {
await transport.request({
Expand Down Expand Up @@ -710,13 +716,17 @@ test('Retry on connection error', async t => {
const pool = new WeightedConnectionPool({ Connection: MockConnectionError })
pool.addConnection('http://localhost:9200')

const transport = new Transport({ connectionPool: pool })
const transport = new Transport({
connectionPool: pool,
retryBackoff: () => 0,
})

try {
await transport.request({
const res = transport.request({
method: 'GET',
path: '/hello'
})
await res
} catch (err: any) {
t.ok(err instanceof ConnectionError)
t.equal(err.meta.meta.attempts, 3)
Expand All @@ -725,18 +735,25 @@ test('Retry on connection error', async t => {

test('Retry on timeout error if retryOnTimeout is true', async t => {
t.plan(2)
t.clock.enter()
t.teardown(() => t.clock.exit())

const pool = new WeightedConnectionPool({ Connection: MockConnectionTimeout })
pool.addConnection('http://localhost:9200')

const transport = new Transport({ connectionPool: pool, retryOnTimeout: true })
transport[kRetryBackoff] = () => 0
const transport = new Transport({
connectionPool: pool,
retryOnTimeout: true,
retryBackoff: () => 0
})

try {
await transport.request({
const res = transport.request({
method: 'GET',
path: '/hello'
})
t.clock.advance(4000)
await res
} catch (err: any) {
t.ok(err instanceof TimeoutError)
t.equal(err.meta.meta.attempts, 3)
Expand Down Expand Up @@ -1518,16 +1535,16 @@ test('Calls the sniff method on connection error', async t => {

const transport = new MyTransport({
connectionPool: pool,
sniffOnConnectionFault: true
sniffOnConnectionFault: true,
retryBackoff: () => 0
})
// skip sleep between retries
transport[kRetryBackoff] = () => 0

try {
await transport.request({
const res = transport.request({
method: 'GET',
path: '/hello'
})
await res
} catch (err: any) {
t.ok(err instanceof ConnectionError)
t.equal(err.meta.meta.attempts, 3)
Expand All @@ -1537,6 +1554,9 @@ test('Calls the sniff method on connection error', async t => {
test('Calls the sniff method on timeout error if retryOnTimeout is true', async t => {
t.plan(6)

t.clock.enter()
t.teardown(() => t.clock.exit())

class MyTransport extends Transport {
sniff (opts: SniffOptions): void {
t.equal(opts.reason, Transport.sniffReasons.SNIFF_ON_CONNECTION_FAULT)
Expand All @@ -1548,16 +1568,17 @@ test('Calls the sniff method on timeout error if retryOnTimeout is true', async
const transport = new MyTransport({
connectionPool: pool,
sniffOnConnectionFault: true,
retryOnTimeout: true
retryOnTimeout: true,
retryBackoff: () => 0,
})
// skip sleep between retries
transport[kRetryBackoff] = () => 0

try {
await transport.request({
const res = transport.request({
method: 'GET',
path: '/hello'
})
t.clock.advance(4000)
await res
} catch (err: any) {
t.ok(err instanceof TimeoutError)
t.equal(err.meta.meta.attempts, 3)
Expand All @@ -1583,6 +1604,8 @@ test('Sniff on start', async t => {

test('Sniff interval', async t => {
t.plan(5)
t.clock.enter()
t.teardown(() => t.clock.exit())

class MyTransport extends Transport {
sniff (opts: SniffOptions): void {
Expand All @@ -1597,26 +1620,38 @@ test('Sniff interval', async t => {
sniffInterval: 50
})

let res = await transport.request({
let promise = transport.request({
method: 'GET',
path: '/hello'
}, { meta: true })

t.clock.advance(4000)

let res = await promise
t.equal(res.statusCode, 200)

await sleep(80)
promise = sleep(80)
t.clock.advance(80)
await promise

res = await transport.request({
promise = transport.request({
method: 'GET',
path: '/hello'
}, { meta: true })
t.clock.advance(4000)
res = await promise
t.equal(res.statusCode, 200)

await sleep(80)
promise = sleep(80)
t.clock.advance(80)
await promise

res = await transport.request({
promise = transport.request({
method: 'GET',
path: '/hello'
}, { meta: true })
t.clock.advance(4000)
res = await promise
t.equal(res.statusCode, 200)
})

Expand Down

0 comments on commit 3863134

Please sign in to comment.