diff --git a/package.json b/package.json index dd9daf1..14437b7 100644 --- a/package.json +++ b/package.json @@ -38,6 +38,7 @@ }, "devDependencies": { "@sinonjs/fake-timers": "github:sinonjs/fake-timers#0bfffc1", + "@tapjs/clock": "^1.1.24", "@types/debug": "^4.1.7", "@types/ms": "^0.7.31", "@types/node": "^18.19.21", @@ -65,6 +66,9 @@ "undici": "^6.12.0" }, "tap": { - "allow-incomplete-coverage": true + "allow-incomplete-coverage": true, + "plugin": [ + "@tapjs/clock" + ] } } diff --git a/src/Transport.ts b/src/Transport.ts index 6340533..8433de2 100644 --- a/src/Transport.ts +++ b/src/Transport.ts @@ -75,8 +75,10 @@ import { kJsonContentType, kNdjsonContentType, kAcceptHeader, - kRedaction + kRedaction, + kRetryBackoff } from './symbols' +import { setTimeout as setTimeoutPromise } from 'node:timers/promises' const { version: clientVersion } = require('../package.json') // eslint-disable-line const debug = Debug('elasticsearch') @@ -114,6 +116,7 @@ export interface TransportOptions { accept?: string } redaction?: RedactionOptions + retryBackoff?: (min: number, max: number, attempt: number) => number } export interface TransportRequestParams { @@ -162,6 +165,7 @@ export interface TransportRequestOptions { */ meta?: boolean redaction?: RedactionOptions + retryBackoff?: (min: number, max: number, attempt: number) => number } export interface TransportRequestOptionsWithMeta extends TransportRequestOptions { @@ -216,6 +220,7 @@ export default class Transport { [kNdjsonContentType]: string [kAcceptHeader]: string [kRedaction]: RedactionOptions + [kRetryBackoff]: (min: number, max: number, attempt: number) => number static sniffReasons = { SNIFF_ON_START: 'sniff-on-start', @@ -277,6 +282,7 @@ export default class Transport { this[kNdjsonContentType] = opts.vendoredHeaders?.ndjsonContentType ?? 'application/x-ndjson' this[kAcceptHeader] = opts.vendoredHeaders?.accept ?? 'application/json, text/plain' this[kRedaction] = opts.redaction ?? { type: 'replace', additionalKeys: [] } + this[kRetryBackoff] = opts.retryBackoff ?? retryBackoff if (opts.sniffOnStart === true) { this.sniff({ @@ -607,6 +613,14 @@ export default class Transport { if (meta.attempts < maxRetries) { meta.attempts++ debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params) + + // exponential backoff on retries, with jitter + const backoff = options.retryBackoff ?? this[kRetryBackoff] + const backoffWait = backoff(0, 4, meta.attempts) + if (backoffWait > 0) { + await setTimeoutPromise(backoffWait * 1000) + } + continue } @@ -701,3 +715,17 @@ export function lowerCaseHeaders (oldHeaders?: http.IncomingHttpHeaders): http.I } return newHeaders } + +/** + * Function for calculating how long to sleep, in seconds, before the next request retry + * Uses the AWS "equal jitter" algorithm noted in this post: + * https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ + * @param min The minimum number of seconds to wait + * @param max The maximum number of seconds to wait + * @param attempt How many retry attempts have been made + * @returns The number of seconds to wait before the next retry + */ +function retryBackoff (min: number, max: number, attempt: number): number { + const ceiling = Math.min(max, 2 ** attempt) / 2 + return ceiling + ((Math.random() * (ceiling - min)) + min) +} diff --git a/src/connection/HttpConnection.ts b/src/connection/HttpConnection.ts index 57fdd12..6e67b48 100644 --- a/src/connection/HttpConnection.ts +++ b/src/connection/HttpConnection.ts @@ -42,6 +42,7 @@ import { RequestAbortedError, TimeoutError } from '../errors' +import { setTimeout as setTimeoutPromise } from 'timers/promises' import { HttpAgentOptions } from '../types' const debug = Debug('elasticsearch') @@ -310,7 +311,7 @@ export default class HttpConnection extends BaseConnection { async close (): Promise { debug('Closing connection', this.id) while (this._openRequests > 0) { - await sleep(1000) + await setTimeoutPromise(1000) } /* istanbul ignore else */ if (this.agent !== undefined) { @@ -387,7 +388,3 @@ function isHttpAgentOptions (opts: Record): opts is HttpAgentOption if (opts.connections != null) return false return true } - -async function sleep (ms: number): Promise { - return await new Promise((resolve) => setTimeout(resolve, ms)) -} diff --git a/src/symbols.ts b/src/symbols.ts index c8de226..a6ae316 100644 --- a/src/symbols.ts +++ b/src/symbols.ts @@ -47,3 +47,4 @@ export const kJsonContentType = Symbol('json content type') export const kNdjsonContentType = Symbol('ndjson content type') export const kAcceptHeader = Symbol('accept header') export const kRedaction = Symbol('redaction') +export const kRetryBackoff = Symbol('retry backoff') diff --git a/test/unit/transport.test.ts b/test/unit/transport.test.ts index 87234cb..b6a668b 100644 --- a/test/unit/transport.test.ts +++ b/test/unit/transport.test.ts @@ -19,8 +19,6 @@ import { test } from 'tap' import buffer from 'buffer' -// import { URL } from 'url' -// import FakeTimers from '@sinonjs/fake-timers' import { promisify } from 'util' import { Readable as ReadableStream } from 'stream' import { gzipSync, deflateSync } from 'zlib' @@ -108,7 +106,12 @@ test('Basic error (TimeoutError)', async t => { const pool = new MyPool({ Connection: MockConnectionTimeout }) pool.addConnection('http://localhost:9200') - const transport = new Transport({ connectionPool: pool, maxRetries: 0, retryOnTimeout: true }) + const transport = new Transport({ + connectionPool: pool, + maxRetries: 0, + retryOnTimeout: true, + retryBackoff: () => 0, + }) try { await transport.request({ @@ -137,7 +140,11 @@ test('Basic error (ConnectionError)', async t => { const pool = new MyPool({ Connection: MockConnectionError }) pool.addConnection('http://localhost:9200') - const transport = new Transport({ connectionPool: pool, maxRetries: 0 }) + const transport = new Transport({ + connectionPool: pool, + maxRetries: 0, + retryBackoff: () => 0 + }) try { await transport.request({ @@ -709,13 +716,17 @@ test('Retry on connection error', async t => { const pool = new WeightedConnectionPool({ Connection: MockConnectionError }) pool.addConnection('http://localhost:9200') - const transport = new Transport({ connectionPool: pool }) + const transport = new Transport({ + connectionPool: pool, + retryBackoff: () => 0, + }) try { - await transport.request({ + const res = transport.request({ method: 'GET', path: '/hello' }) + await res } catch (err: any) { t.ok(err instanceof ConnectionError) t.equal(err.meta.meta.attempts, 3) @@ -724,17 +735,25 @@ test('Retry on connection error', async t => { test('Retry on timeout error if retryOnTimeout is true', async t => { t.plan(2) + t.clock.enter() + t.teardown(() => t.clock.exit()) const pool = new WeightedConnectionPool({ Connection: MockConnectionTimeout }) pool.addConnection('http://localhost:9200') - const transport = new Transport({ connectionPool: pool, retryOnTimeout: true }) + const transport = new Transport({ + connectionPool: pool, + retryOnTimeout: true, + retryBackoff: () => 0 + }) try { - await transport.request({ + const res = transport.request({ method: 'GET', path: '/hello' }) + t.clock.advance(4000) + await res } catch (err: any) { t.ok(err instanceof TimeoutError) t.equal(err.meta.meta.attempts, 3) @@ -1516,14 +1535,16 @@ test('Calls the sniff method on connection error', async t => { const transport = new MyTransport({ connectionPool: pool, - sniffOnConnectionFault: true + sniffOnConnectionFault: true, + retryBackoff: () => 0 }) try { - await transport.request({ + const res = transport.request({ method: 'GET', path: '/hello' }) + await res } catch (err: any) { t.ok(err instanceof ConnectionError) t.equal(err.meta.meta.attempts, 3) @@ -1533,6 +1554,9 @@ test('Calls the sniff method on connection error', async t => { test('Calls the sniff method on timeout error if retryOnTimeout is true', async t => { t.plan(6) + t.clock.enter() + t.teardown(() => t.clock.exit()) + class MyTransport extends Transport { sniff (opts: SniffOptions): void { t.equal(opts.reason, Transport.sniffReasons.SNIFF_ON_CONNECTION_FAULT) @@ -1544,14 +1568,17 @@ test('Calls the sniff method on timeout error if retryOnTimeout is true', async const transport = new MyTransport({ connectionPool: pool, sniffOnConnectionFault: true, - retryOnTimeout: true + retryOnTimeout: true, + retryBackoff: () => 0, }) try { - await transport.request({ + const res = transport.request({ method: 'GET', path: '/hello' }) + t.clock.advance(4000) + await res } catch (err: any) { t.ok(err instanceof TimeoutError) t.equal(err.meta.meta.attempts, 3) @@ -1577,6 +1604,8 @@ test('Sniff on start', async t => { test('Sniff interval', async t => { t.plan(5) + t.clock.enter() + t.teardown(() => t.clock.exit()) class MyTransport extends Transport { sniff (opts: SniffOptions): void { @@ -1591,26 +1620,38 @@ test('Sniff interval', async t => { sniffInterval: 50 }) - let res = await transport.request({ + let promise = transport.request({ method: 'GET', path: '/hello' }, { meta: true }) + + t.clock.advance(4000) + + let res = await promise t.equal(res.statusCode, 200) - await sleep(80) + promise = sleep(80) + t.clock.advance(80) + await promise - res = await transport.request({ + promise = transport.request({ method: 'GET', path: '/hello' }, { meta: true }) + t.clock.advance(4000) + res = await promise t.equal(res.statusCode, 200) - await sleep(80) + promise = sleep(80) + t.clock.advance(80) + await promise - res = await transport.request({ + promise = transport.request({ method: 'GET', path: '/hello' }, { meta: true }) + t.clock.advance(4000) + res = await promise t.equal(res.statusCode, 200) })