From 063a4bb394132693b5e17a09e6cb54b0b2e92dd4 Mon Sep 17 00:00:00 2001 From: Josh Mock Date: Tue, 21 May 2024 10:16:44 -0500 Subject: [PATCH] fix: Use an AbortSignal for per-request timeouts (#96) A potential fix for https://github.com/elastic/elastic-transport-js/issues/63, largely inspired by a community member's PR that was never merged: https://github.com/elastic/elastic-transport-js/pull/55 According to an Undici core committer in this comment https://github.com/elastic/elasticsearch-js/issues/1716#issuecomment-1167173492 the issue that triggers the MaxListenersExceededWarning, and possibly a memory leak in some cases, is caused by attaching an EventEmitter to each request by default when a per-request timeout is set, rather than attaching an AbortSignal. My assumption is that an EventEmitter was used because AbortSignal and AbortController were not added to Node.js until v14.17.0, so we couldn't guarantee v14 users would have it. I'm not certain why using EventEmitters makes a difference memory-wise, but it does get rid of the MaxListenersExceededWarning. --- src/connection/UndiciConnection.ts | 14 +++----------- src/symbols.ts | 1 - test/unit/undici-connection.test.ts | 20 -------------------- 3 files changed, 3 insertions(+), 32 deletions(-) diff --git a/src/connection/UndiciConnection.ts b/src/connection/UndiciConnection.ts index 412f39a..7606d6a 100644 --- a/src/connection/UndiciConnection.ts +++ b/src/connection/UndiciConnection.ts @@ -19,7 +19,6 @@ /* eslint-disable @typescript-eslint/restrict-template-expressions */ -import { EventEmitter } from 'events' import Debug from 'debug' import buffer from 'buffer' import { TLSSocket } from 'tls' @@ -41,7 +40,7 @@ import { TimeoutError } from '../errors' import { UndiciAgentOptions } from '../types' -import { kCaFingerprint, kEmitter } from '../symbols' +import { kCaFingerprint } from '../symbols' const debug = Debug('elasticsearch') const INVALID_PATH_REGEX = /[^\u0021-\u00ff]/ @@ -50,7 +49,6 @@ const MAX_STRING_LENGTH = buffer.constants.MAX_STRING_LENGTH export default class Connection extends BaseConnection { pool: Pool - [kEmitter]: EventEmitter constructor (opts: ConnectionOptions) { super(opts) @@ -67,8 +65,6 @@ export default class Connection extends BaseConnection { throw new ConfigurationError('Bad agent configuration for Undici agent') } - this[kEmitter] = new EventEmitter() - this[kEmitter].setMaxListeners(this.maxEventListeners) const undiciOptions: Pool.Options = { keepAliveTimeout: 600e3, keepAliveMaxTimeout: 600e3, @@ -124,7 +120,7 @@ export default class Connection extends BaseConnection { path: params.path + (params.querystring == null || params.querystring === '' ? '' : `?${params.querystring}`), headers: Object.assign({}, this.headers, params.headers), body: params.body, - signal: options.signal ?? this[kEmitter] + signal: options.signal ?? new AbortController().signal } if (requestParams.path[0] !== '/') { @@ -141,11 +137,7 @@ export default class Connection extends BaseConnection { if (options.timeout != null && options.timeout !== this.timeout) { timeoutId = setTimeout(() => { timedout = true - if (options.signal != null) { - options.signal.dispatchEvent(new Event('abort')) - } else { - this[kEmitter].emit('abort') - } + requestParams.signal.dispatchEvent(new Event('abort')) }, options.timeout) } diff --git a/src/symbols.ts b/src/symbols.ts index 50c5e7b..b4c4101 100644 --- a/src/symbols.ts +++ b/src/symbols.ts @@ -38,7 +38,6 @@ export const kNodeFilter = Symbol('node filter') export const kNodeSelector = Symbol('node selector') export const kJsonOptions = Symbol('secure json parse options') export const kStatus = Symbol('status') -export const kEmitter = Symbol('event emitter') export const kProductCheck = Symbol('product check') export const kCaFingerprint = Symbol('ca fingerprint') export const kMaxResponseSize = Symbol('max response size') diff --git a/test/unit/undici-connection.test.ts b/test/unit/undici-connection.test.ts index 60e557c..7cdccd0 100644 --- a/test/unit/undici-connection.test.ts +++ b/test/unit/undici-connection.test.ts @@ -1064,26 +1064,6 @@ test('Path without intial slash', async t => { server.stop() }) -test('Should increase number of max event listeners', async t => { - t.plan(1) - - function handler (req: http.IncomingMessage, res: http.ServerResponse) { - res.end('ok') - } - - const [{ port }, server] = await buildServer(handler, { secure: true }) - const connection = new UndiciConnection({ - url: new URL(`https://localhost:${port}`), - maxEventListeners: 100, - }) - const res = await connection.request({ - path: '/hello', - method: 'GET' - }, options) - t.equal(res.body, 'ok') - server.stop() -}) - test('as stream', async t => { t.plan(2)