Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add exponential backoff to request retries #101

Merged
merged 5 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
},
"devDependencies": {
"@sinonjs/fake-timers": "github:sinonjs/fake-timers#0bfffc1",
"@tapjs/clock": "^1.1.24",
"@types/debug": "^4.1.7",
"@types/ms": "^0.7.31",
"@types/node": "^18.19.21",
Expand Down Expand Up @@ -65,6 +66,9 @@
"undici": "^6.12.0"
},
"tap": {
"allow-incomplete-coverage": true
"allow-incomplete-coverage": true,
"plugin": [
"@tapjs/clock"
]
}
}
30 changes: 29 additions & 1 deletion src/Transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,10 @@ import {
kJsonContentType,
kNdjsonContentType,
kAcceptHeader,
kRedaction
kRedaction,
kRetryBackoff
} from './symbols'
import { setTimeout as setTimeoutPromise } from 'node:timers/promises'

const { version: clientVersion } = require('../package.json') // eslint-disable-line
const debug = Debug('elasticsearch')
Expand Down Expand Up @@ -114,6 +116,7 @@ export interface TransportOptions {
accept?: string
}
redaction?: RedactionOptions
retryBackoff?: (min: number, max: number, attempt: number) => number
}

export interface TransportRequestParams {
Expand Down Expand Up @@ -162,6 +165,7 @@ export interface TransportRequestOptions {
*/
meta?: boolean
redaction?: RedactionOptions
retryBackoff?: (min: number, max: number, attempt: number) => number
}

export interface TransportRequestOptionsWithMeta extends TransportRequestOptions {
Expand Down Expand Up @@ -216,6 +220,7 @@ export default class Transport {
[kNdjsonContentType]: string
[kAcceptHeader]: string
[kRedaction]: RedactionOptions
[kRetryBackoff]: (min: number, max: number, attempt: number) => number

static sniffReasons = {
SNIFF_ON_START: 'sniff-on-start',
Expand Down Expand Up @@ -277,6 +282,7 @@ export default class Transport {
this[kNdjsonContentType] = opts.vendoredHeaders?.ndjsonContentType ?? 'application/x-ndjson'
this[kAcceptHeader] = opts.vendoredHeaders?.accept ?? 'application/json, text/plain'
this[kRedaction] = opts.redaction ?? { type: 'replace', additionalKeys: [] }
this[kRetryBackoff] = opts.retryBackoff ?? retryBackoff

if (opts.sniffOnStart === true) {
this.sniff({
Expand Down Expand Up @@ -607,6 +613,14 @@ export default class Transport {
if (meta.attempts < maxRetries) {
meta.attempts++
debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params)

// exponential backoff on retries, with jitter
const backoff = options.retryBackoff ?? this[kRetryBackoff]
const backoffWait = backoff(0, 4, meta.attempts)
Copy link

@miguelgrinberg miguelgrinberg Jun 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like your intention is not to expose this as a feature to developers, so maybe this isn't important right now, but would it make sense to future-proof this and make the min and max arguments also configurable, like the attempts are?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. Nobody in the community asked for retry backoff, much less the ability to provide their own algorithm, so I think we can wait until someone actually asks for it?

if (backoffWait > 0) {
await setTimeoutPromise(backoffWait * 1000)
}

continue
}

Expand Down Expand Up @@ -701,3 +715,17 @@ export function lowerCaseHeaders (oldHeaders?: http.IncomingHttpHeaders): http.I
}
return newHeaders
}

/**
* Function for calculating how long to sleep, in seconds, before the next request retry
* Uses the AWS "equal jitter" algorithm noted in this post:
* https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
* @param min The minimum number of seconds to wait
* @param max The maximum number of seconds to wait
* @param attempt How many retry attempts have been made
* @returns The number of seconds to wait before the next retry
*/
function retryBackoff (min: number, max: number, attempt: number): number {
const ceiling = Math.min(max, 2 ** attempt) / 2
return ceiling + ((Math.random() * (ceiling - min)) + min)
}
7 changes: 2 additions & 5 deletions src/connection/HttpConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import {
RequestAbortedError,
TimeoutError
} from '../errors'
import { setTimeout as setTimeoutPromise } from 'timers/promises'
import { HttpAgentOptions } from '../types'

const debug = Debug('elasticsearch')
Expand Down Expand Up @@ -310,7 +311,7 @@ export default class HttpConnection extends BaseConnection {
async close (): Promise<void> {
debug('Closing connection', this.id)
while (this._openRequests > 0) {
await sleep(1000)
await setTimeoutPromise(1000)
}
/* istanbul ignore else */
if (this.agent !== undefined) {
Expand Down Expand Up @@ -387,7 +388,3 @@ function isHttpAgentOptions (opts: Record<string, any>): opts is HttpAgentOption
if (opts.connections != null) return false
return true
}

async function sleep (ms: number): Promise<unknown> {
return await new Promise((resolve) => setTimeout(resolve, ms))
}
1 change: 1 addition & 0 deletions src/symbols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,4 @@ export const kJsonContentType = Symbol('json content type')
export const kNdjsonContentType = Symbol('ndjson content type')
export const kAcceptHeader = Symbol('accept header')
export const kRedaction = Symbol('redaction')
export const kRetryBackoff = Symbol('retry backoff')
75 changes: 58 additions & 17 deletions test/unit/transport.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

import { test } from 'tap'
import buffer from 'buffer'
// import { URL } from 'url'
// import FakeTimers from '@sinonjs/fake-timers'
import { promisify } from 'util'
import { Readable as ReadableStream } from 'stream'
import { gzipSync, deflateSync } from 'zlib'
Expand Down Expand Up @@ -108,7 +106,12 @@ test('Basic error (TimeoutError)', async t => {
const pool = new MyPool({ Connection: MockConnectionTimeout })
pool.addConnection('http://localhost:9200')

const transport = new Transport({ connectionPool: pool, maxRetries: 0, retryOnTimeout: true })
const transport = new Transport({
connectionPool: pool,
maxRetries: 0,
retryOnTimeout: true,
retryBackoff: () => 0,
})

try {
await transport.request({
Expand Down Expand Up @@ -137,7 +140,11 @@ test('Basic error (ConnectionError)', async t => {
const pool = new MyPool({ Connection: MockConnectionError })
pool.addConnection('http://localhost:9200')

const transport = new Transport({ connectionPool: pool, maxRetries: 0 })
const transport = new Transport({
connectionPool: pool,
maxRetries: 0,
retryBackoff: () => 0
})

try {
await transport.request({
Expand Down Expand Up @@ -709,13 +716,17 @@ test('Retry on connection error', async t => {
const pool = new WeightedConnectionPool({ Connection: MockConnectionError })
pool.addConnection('http://localhost:9200')

const transport = new Transport({ connectionPool: pool })
const transport = new Transport({
connectionPool: pool,
retryBackoff: () => 0,
})

try {
await transport.request({
const res = transport.request({
method: 'GET',
path: '/hello'
})
await res
} catch (err: any) {
t.ok(err instanceof ConnectionError)
t.equal(err.meta.meta.attempts, 3)
Expand All @@ -724,17 +735,25 @@ test('Retry on connection error', async t => {

test('Retry on timeout error if retryOnTimeout is true', async t => {
t.plan(2)
t.clock.enter()
t.teardown(() => t.clock.exit())

const pool = new WeightedConnectionPool({ Connection: MockConnectionTimeout })
pool.addConnection('http://localhost:9200')

const transport = new Transport({ connectionPool: pool, retryOnTimeout: true })
const transport = new Transport({
connectionPool: pool,
retryOnTimeout: true,
retryBackoff: () => 0
})

try {
await transport.request({
const res = transport.request({
method: 'GET',
path: '/hello'
})
t.clock.advance(4000)
await res
} catch (err: any) {
t.ok(err instanceof TimeoutError)
t.equal(err.meta.meta.attempts, 3)
Expand Down Expand Up @@ -1516,14 +1535,16 @@ test('Calls the sniff method on connection error', async t => {

const transport = new MyTransport({
connectionPool: pool,
sniffOnConnectionFault: true
sniffOnConnectionFault: true,
retryBackoff: () => 0
})

try {
await transport.request({
const res = transport.request({
method: 'GET',
path: '/hello'
})
await res
} catch (err: any) {
t.ok(err instanceof ConnectionError)
t.equal(err.meta.meta.attempts, 3)
Expand All @@ -1533,6 +1554,9 @@ test('Calls the sniff method on connection error', async t => {
test('Calls the sniff method on timeout error if retryOnTimeout is true', async t => {
t.plan(6)

t.clock.enter()
t.teardown(() => t.clock.exit())

class MyTransport extends Transport {
sniff (opts: SniffOptions): void {
t.equal(opts.reason, Transport.sniffReasons.SNIFF_ON_CONNECTION_FAULT)
Expand All @@ -1544,14 +1568,17 @@ test('Calls the sniff method on timeout error if retryOnTimeout is true', async
const transport = new MyTransport({
connectionPool: pool,
sniffOnConnectionFault: true,
retryOnTimeout: true
retryOnTimeout: true,
retryBackoff: () => 0,
})

try {
await transport.request({
const res = transport.request({
method: 'GET',
path: '/hello'
})
t.clock.advance(4000)
await res
} catch (err: any) {
t.ok(err instanceof TimeoutError)
t.equal(err.meta.meta.attempts, 3)
Expand All @@ -1577,6 +1604,8 @@ test('Sniff on start', async t => {

test('Sniff interval', async t => {
t.plan(5)
t.clock.enter()
t.teardown(() => t.clock.exit())

class MyTransport extends Transport {
sniff (opts: SniffOptions): void {
Expand All @@ -1591,26 +1620,38 @@ test('Sniff interval', async t => {
sniffInterval: 50
})

let res = await transport.request({
let promise = transport.request({
method: 'GET',
path: '/hello'
}, { meta: true })

t.clock.advance(4000)

let res = await promise
t.equal(res.statusCode, 200)

await sleep(80)
promise = sleep(80)
t.clock.advance(80)
await promise

res = await transport.request({
promise = transport.request({
method: 'GET',
path: '/hello'
}, { meta: true })
t.clock.advance(4000)
res = await promise
t.equal(res.statusCode, 200)

await sleep(80)
promise = sleep(80)
t.clock.advance(80)
await promise

res = await transport.request({
promise = transport.request({
method: 'GET',
path: '/hello'
}, { meta: true })
t.clock.advance(4000)
res = await promise
t.equal(res.statusCode, 200)
})

Expand Down
Loading