Skip to content

Commit

Permalink
feat: Add exponential backoff to request retries (#101)
Browse files Browse the repository at this point in the history
* Add exponential backoff to request retries

* Switch from sleep to setTimeout

* Expose retryBackoff to make testing easier

Very hard to mock a retry that uses setTimeout with random jitter

* Fix backoff calculation

Accidentally let the minimum be `min * -1` instead of `min`. Whoops!

* Move @tapjs/clock to dev dependencies
  • Loading branch information
JoshMock committed Jun 6, 2024
1 parent 5567584 commit 8fed57b
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 24 deletions.
6 changes: 5 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
},
"devDependencies": {
"@sinonjs/fake-timers": "github:sinonjs/fake-timers#0bfffc1",
"@tapjs/clock": "^1.1.24",
"@types/debug": "^4.1.7",
"@types/ms": "^0.7.31",
"@types/node": "^18.19.21",
Expand Down Expand Up @@ -65,6 +66,9 @@
"undici": "^6.12.0"
},
"tap": {
"allow-incomplete-coverage": true
"allow-incomplete-coverage": true,
"plugin": [
"@tapjs/clock"
]
}
}
30 changes: 29 additions & 1 deletion src/Transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,10 @@ import {
kJsonContentType,
kNdjsonContentType,
kAcceptHeader,
kRedaction
kRedaction,
kRetryBackoff
} from './symbols'
import { setTimeout as setTimeoutPromise } from 'node:timers/promises'

const { version: clientVersion } = require('../package.json') // eslint-disable-line
const debug = Debug('elasticsearch')
Expand Down Expand Up @@ -114,6 +116,7 @@ export interface TransportOptions {
accept?: string
}
redaction?: RedactionOptions
retryBackoff?: (min: number, max: number, attempt: number) => number
}

export interface TransportRequestParams {
Expand Down Expand Up @@ -162,6 +165,7 @@ export interface TransportRequestOptions {
*/
meta?: boolean
redaction?: RedactionOptions
retryBackoff?: (min: number, max: number, attempt: number) => number
}

export interface TransportRequestOptionsWithMeta extends TransportRequestOptions {
Expand Down Expand Up @@ -216,6 +220,7 @@ export default class Transport {
[kNdjsonContentType]: string
[kAcceptHeader]: string
[kRedaction]: RedactionOptions
[kRetryBackoff]: (min: number, max: number, attempt: number) => number

static sniffReasons = {
SNIFF_ON_START: 'sniff-on-start',
Expand Down Expand Up @@ -277,6 +282,7 @@ export default class Transport {
this[kNdjsonContentType] = opts.vendoredHeaders?.ndjsonContentType ?? 'application/x-ndjson'
this[kAcceptHeader] = opts.vendoredHeaders?.accept ?? 'application/json, text/plain'
this[kRedaction] = opts.redaction ?? { type: 'replace', additionalKeys: [] }
this[kRetryBackoff] = opts.retryBackoff ?? retryBackoff

if (opts.sniffOnStart === true) {
this.sniff({
Expand Down Expand Up @@ -607,6 +613,14 @@ export default class Transport {
if (meta.attempts < maxRetries) {
meta.attempts++
debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params)

// exponential backoff on retries, with jitter
const backoff = options.retryBackoff ?? this[kRetryBackoff]
const backoffWait = backoff(0, 4, meta.attempts)
if (backoffWait > 0) {
await setTimeoutPromise(backoffWait * 1000)
}

continue
}

Expand Down Expand Up @@ -701,3 +715,17 @@ export function lowerCaseHeaders (oldHeaders?: http.IncomingHttpHeaders): http.I
}
return newHeaders
}

/**
* Function for calculating how long to sleep, in seconds, before the next request retry
* Uses the AWS "equal jitter" algorithm noted in this post:
* https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
* @param min The minimum number of seconds to wait
* @param max The maximum number of seconds to wait
* @param attempt How many retry attempts have been made
* @returns The number of seconds to wait before the next retry
*/
function retryBackoff (min: number, max: number, attempt: number): number {
const ceiling = Math.min(max, 2 ** attempt) / 2
return ceiling + ((Math.random() * (ceiling - min)) + min)
}
7 changes: 2 additions & 5 deletions src/connection/HttpConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import {
RequestAbortedError,
TimeoutError
} from '../errors'
import { setTimeout as setTimeoutPromise } from 'timers/promises'
import { HttpAgentOptions } from '../types'

const debug = Debug('elasticsearch')
Expand Down Expand Up @@ -310,7 +311,7 @@ export default class HttpConnection extends BaseConnection {
async close (): Promise<void> {
debug('Closing connection', this.id)
while (this._openRequests > 0) {
await sleep(1000)
await setTimeoutPromise(1000)
}
/* istanbul ignore else */
if (this.agent !== undefined) {
Expand Down Expand Up @@ -387,7 +388,3 @@ function isHttpAgentOptions (opts: Record<string, any>): opts is HttpAgentOption
if (opts.connections != null) return false
return true
}

async function sleep (ms: number): Promise<unknown> {
return await new Promise((resolve) => setTimeout(resolve, ms))
}
1 change: 1 addition & 0 deletions src/symbols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,4 @@ export const kJsonContentType = Symbol('json content type')
export const kNdjsonContentType = Symbol('ndjson content type')
export const kAcceptHeader = Symbol('accept header')
export const kRedaction = Symbol('redaction')
export const kRetryBackoff = Symbol('retry backoff')
75 changes: 58 additions & 17 deletions test/unit/transport.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

import { test } from 'tap'
import buffer from 'buffer'
// import { URL } from 'url'
// import FakeTimers from '@sinonjs/fake-timers'
import { promisify } from 'util'
import { Readable as ReadableStream } from 'stream'
import { gzipSync, deflateSync } from 'zlib'
Expand Down Expand Up @@ -108,7 +106,12 @@ test('Basic error (TimeoutError)', async t => {
const pool = new MyPool({ Connection: MockConnectionTimeout })
pool.addConnection('http://localhost:9200')

const transport = new Transport({ connectionPool: pool, maxRetries: 0, retryOnTimeout: true })
const transport = new Transport({
connectionPool: pool,
maxRetries: 0,
retryOnTimeout: true,
retryBackoff: () => 0,
})

try {
await transport.request({
Expand Down Expand Up @@ -137,7 +140,11 @@ test('Basic error (ConnectionError)', async t => {
const pool = new MyPool({ Connection: MockConnectionError })
pool.addConnection('http://localhost:9200')

const transport = new Transport({ connectionPool: pool, maxRetries: 0 })
const transport = new Transport({
connectionPool: pool,
maxRetries: 0,
retryBackoff: () => 0
})

try {
await transport.request({
Expand Down Expand Up @@ -709,13 +716,17 @@ test('Retry on connection error', async t => {
const pool = new WeightedConnectionPool({ Connection: MockConnectionError })
pool.addConnection('http://localhost:9200')

const transport = new Transport({ connectionPool: pool })
const transport = new Transport({
connectionPool: pool,
retryBackoff: () => 0,
})

try {
await transport.request({
const res = transport.request({
method: 'GET',
path: '/hello'
})
await res
} catch (err: any) {
t.ok(err instanceof ConnectionError)
t.equal(err.meta.meta.attempts, 3)
Expand All @@ -724,17 +735,25 @@ test('Retry on connection error', async t => {

test('Retry on timeout error if retryOnTimeout is true', async t => {
t.plan(2)
t.clock.enter()
t.teardown(() => t.clock.exit())

const pool = new WeightedConnectionPool({ Connection: MockConnectionTimeout })
pool.addConnection('http://localhost:9200')

const transport = new Transport({ connectionPool: pool, retryOnTimeout: true })
const transport = new Transport({
connectionPool: pool,
retryOnTimeout: true,
retryBackoff: () => 0
})

try {
await transport.request({
const res = transport.request({
method: 'GET',
path: '/hello'
})
t.clock.advance(4000)
await res
} catch (err: any) {
t.ok(err instanceof TimeoutError)
t.equal(err.meta.meta.attempts, 3)
Expand Down Expand Up @@ -1516,14 +1535,16 @@ test('Calls the sniff method on connection error', async t => {

const transport = new MyTransport({
connectionPool: pool,
sniffOnConnectionFault: true
sniffOnConnectionFault: true,
retryBackoff: () => 0
})

try {
await transport.request({
const res = transport.request({
method: 'GET',
path: '/hello'
})
await res
} catch (err: any) {
t.ok(err instanceof ConnectionError)
t.equal(err.meta.meta.attempts, 3)
Expand All @@ -1533,6 +1554,9 @@ test('Calls the sniff method on connection error', async t => {
test('Calls the sniff method on timeout error if retryOnTimeout is true', async t => {
t.plan(6)

t.clock.enter()
t.teardown(() => t.clock.exit())

class MyTransport extends Transport {
sniff (opts: SniffOptions): void {
t.equal(opts.reason, Transport.sniffReasons.SNIFF_ON_CONNECTION_FAULT)
Expand All @@ -1544,14 +1568,17 @@ test('Calls the sniff method on timeout error if retryOnTimeout is true', async
const transport = new MyTransport({
connectionPool: pool,
sniffOnConnectionFault: true,
retryOnTimeout: true
retryOnTimeout: true,
retryBackoff: () => 0,
})

try {
await transport.request({
const res = transport.request({
method: 'GET',
path: '/hello'
})
t.clock.advance(4000)
await res
} catch (err: any) {
t.ok(err instanceof TimeoutError)
t.equal(err.meta.meta.attempts, 3)
Expand All @@ -1577,6 +1604,8 @@ test('Sniff on start', async t => {

test('Sniff interval', async t => {
t.plan(5)
t.clock.enter()
t.teardown(() => t.clock.exit())

class MyTransport extends Transport {
sniff (opts: SniffOptions): void {
Expand All @@ -1591,26 +1620,38 @@ test('Sniff interval', async t => {
sniffInterval: 50
})

let res = await transport.request({
let promise = transport.request({
method: 'GET',
path: '/hello'
}, { meta: true })

t.clock.advance(4000)

let res = await promise
t.equal(res.statusCode, 200)

await sleep(80)
promise = sleep(80)
t.clock.advance(80)
await promise

res = await transport.request({
promise = transport.request({
method: 'GET',
path: '/hello'
}, { meta: true })
t.clock.advance(4000)
res = await promise
t.equal(res.statusCode, 200)

await sleep(80)
promise = sleep(80)
t.clock.advance(80)
await promise

res = await transport.request({
promise = transport.request({
method: 'GET',
path: '/hello'
}, { meta: true })
t.clock.advance(4000)
res = await promise
t.equal(res.statusCode, 200)
})

Expand Down

0 comments on commit 8fed57b

Please sign in to comment.