From 89de5f781177a6d37ecaaa1a17c1a75f82652aad Mon Sep 17 00:00:00 2001 From: Brian Botha Date: Tue, 6 Feb 2024 11:13:09 +1100 Subject: [PATCH 1/2] fix: addressed crashing due to socket send failures This commit addresses the send failures by taking any send-specific errors and passing them back to the connection to be handled. Any errors such as bad arguments result in the connection throwing the problem proactively. Any network failures are generally ignored and the connections are left to time out. This allows for the network to drop for a short amount of time without failure of the connection and streams. [ci skip] --- src/QUICClient.ts | 88 +++++++++++-- src/QUICConnection.ts | 7 ++ src/QUICServer.ts | 54 ++++++-- src/errors.ts | 6 + src/events.ts | 4 + tests/QUICStream.test.ts | 263 +++++++++++++++++++++++++++++++++++++++ 6 files changed, 405 insertions(+), 17 deletions(-) diff --git a/src/QUICClient.ts b/src/QUICClient.ts index 6c4b965f..7fde2cb2 100644 --- a/src/QUICClient.ts +++ b/src/QUICClient.ts @@ -247,8 +247,34 @@ class QUICClient { // the client, because the client bridges the push flow from the connection // to the socket. 
socket.connectionMap.set(connection.connectionId, connection); + // Set up intermediate abort signal + const abortController = new AbortController(); + const abortHandler = () => { + abortController.abort(ctx.signal.reason); + }; + if (ctx.signal.aborted) abortController.abort(ctx.signal.reason); + else ctx.signal.addEventListener('abort', abortHandler); + const handleEventQUICClientErrorSend = ( + evt: events.EventQUICClientErrorSend, + ) => { + // @ts-ignore: the error contains `code` but not part of the type + if (evt.detail.code === 'EINVAL') { + abortController.abort( + new errors.ErrorQUICClientInvalidArgument(undefined, { + cause: evt.detail, + }), + ); + } + }; + client.addEventListener( + `${events.EventQUICClientErrorSend.name}-${connection.sendId}`, + handleEventQUICClientErrorSend, + ); try { - await connection.start(undefined, ctx); + await connection.start(undefined, { + timer: ctx.timer, + signal: abortController.signal, + }); } catch (e) { socket.connectionMap.delete(connection.connectionId); socket.removeEventListener( @@ -284,6 +310,12 @@ class QUICClient { client.handleEventQUICClientClose, ); throw e; + } finally { + ctx.signal.removeEventListener('abort', abortHandler); + client.removeEventListener( + `${events.EventQUICClientErrorSend.name}-${connection.sendId}`, + handleEventQUICClientErrorSend, + ); } address = utils.buildAddress(host_, port); logger.info(`Created ${this.name} to ${address}`); @@ -299,6 +331,10 @@ class QUICClient { protected config: Config; protected _closed: boolean = false; protected resolveClosedP: () => void; + /** + * Flag used to make sure network fail warnings are only logged once per failure + */ + protected networkWarned: boolean = false; /** * Handles `EventQUICClientError`. 
@@ -458,15 +494,49 @@ class QUICClient { evt.detail.port, evt.detail.address, ); + this.networkWarned = false; } catch (e) { - const e_ = new errors.ErrorQUICClientInternal( - 'Failed to send data on the QUICSocket', - { - data: evt.detail, - cause: e, - }, - ); - this.dispatchEvent(new events.EventQUICClientError({ detail: e_ })); + switch (e.code) { + case 'EINVAL': + { + this.dispatchEvent( + new events.EventQUICClientErrorSend( + `${events.EventQUICClientErrorSend.name}-${evt.detail.id}`, + { + detail: e, + }, + ), + ); + } + break; + case 'ENETUNREACH': + { + // We consider this branch a temp failure. + // For these error codes we rely on the connection's timeout to handle. + if (!this.networkWarned) { + this.logger.warn( + `client send failed with 'ENETUNREACH', likely due to network failure`, + ); + this.networkWarned = true; + } + } + break; + default: + { + this.dispatchEvent( + new events.EventQUICClientError({ + detail: new errors.ErrorQUICClientInternal( + 'Failed to send data on the QUICSocket', + { + data: evt.detail, + cause: e, + }, + ), + }), + ); + } + break; + } } }; diff --git a/src/QUICConnection.ts b/src/QUICConnection.ts index 9db62530..9113d5be 100644 --- a/src/QUICConnection.ts +++ b/src/QUICConnection.ts @@ -62,6 +62,11 @@ class QUICConnection { */ public readonly streamMap: Map = new Map(); + /** + * Unique id used to identify events intended for this connection. + */ + public readonly sendId: string; + protected logger: Logger; protected socket: QUICSocket; protected config: QUICConfig; @@ -322,6 +327,7 @@ class QUICConnection { logger?: Logger; }) { this.logger = logger ?? 
new Logger(`${this.constructor.name} ${scid}`); + this.sendId = scid.toString(); if ( config.keepAliveIntervalTime != null && config.maxIdleTimeout !== 0 && @@ -876,6 +882,7 @@ class QUICConnection { this.dispatchEvent( new events.EventQUICConnectionSend({ detail: { + id: this.sendId, msg: sendBuffer.subarray(0, sendLength), port: sendInfo.to.port, address: sendInfo.to.host, diff --git a/src/QUICServer.ts b/src/QUICServer.ts index eac9f118..224cfbb2 100644 --- a/src/QUICServer.ts +++ b/src/QUICServer.ts @@ -62,6 +62,10 @@ class QUICServer { protected _closed: boolean = false; protected _closedP: Promise; protected resolveClosedP: () => void; + /** + * Flag used to make sure network fail warnings are only logged once per failure + */ + protected networkWarned: boolean = false; /** * Handles `EventQUICServerError`. @@ -195,15 +199,49 @@ class QUICServer { evt.detail.port, evt.detail.address, ); + this.networkWarned = false; } catch (e) { - const e_ = new errors.ErrorQUICServerInternal( - 'Failed to send data on the QUICSocket', - { - data: evt.detail, - cause: e, - }, - ); - this.dispatchEvent(new events.EventQUICServerError({ detail: e_ })); + switch (e.code) { + case 'EINVAL': + { + this.dispatchEvent( + new events.EventQUICClientErrorSend( + `${events.EventQUICClientErrorSend.name}-${evt.detail.id}`, + { + detail: e, + }, + ), + ); + } + break; + case 'ENETUNREACH': + { + // We consider this branch a temp failure. + // For these error codes we rely on the connection's timeout to handle. 
+ if (!this.networkWarned) { + this.logger.warn( + `server send failed with 'ENETUNREACH', likely due to network failure`, + ); + this.networkWarned = true; + } + } + break; + default: + { + this.dispatchEvent( + new events.EventQUICServerError({ + detail: new errors.ErrorQUICServerInternal( + 'Failed to send data on the QUICSocket', + { + data: evt.detail, + cause: e, + }, + ), + }), + ); + } + break; + } } }; diff --git a/src/errors.ts b/src/errors.ts index 229b8707..fd802643 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -59,6 +59,11 @@ class ErrorQUICClientSocketNotRunning extends ErrorQUICClient { 'QUIC Client cannot be created with an unstarted shared QUIC socket'; } +class ErrorQUICClientInvalidArgument extends ErrorQUICClient { + static description = + 'QUIC Client had a failure relating to an invalid argument'; +} + class ErrorQUICClientInvalidHost extends ErrorQUICClient { static description = 'QUIC Client cannot be created with the specified host'; } @@ -293,6 +298,7 @@ export { ErrorQUICClientDestroyed, ErrorQUICClientCreateTimeout, ErrorQUICClientSocketNotRunning, + ErrorQUICClientInvalidArgument, ErrorQUICClientInvalidHost, ErrorQUICClientInternal, ErrorQUICServer, diff --git a/src/events.ts b/src/events.ts index f19fae4c..6f4c7dff 100644 --- a/src/events.ts +++ b/src/events.ts @@ -62,6 +62,8 @@ class EventQUICClientError extends EventQUICClient< | ErrorQUICConnectionInternal > {} +class EventQUICClientErrorSend extends EventQUICSocket {} + class EventQUICClientClose extends EventQUICClient< | ErrorQUICClientSocketNotRunning | ErrorQUICConnectionLocal @@ -126,6 +128,7 @@ class EventQUICConnectionClose extends EventQUICConnection< class EventQUICConnectionStream extends EventQUICConnection {} class EventQUICConnectionSend extends EventQUICConnection<{ + id: string; msg: Uint8Array; port: number; address: string; @@ -193,6 +196,7 @@ export { EventQUICClientDestroy, EventQUICClientDestroyed, EventQUICClientError, + EventQUICClientErrorSend, 
EventQUICClientClose, EventQUICServer, EventQUICServerStart, diff --git a/tests/QUICStream.test.ts b/tests/QUICStream.test.ts index d2d1342e..ee2ed0fd 100644 --- a/tests/QUICStream.test.ts +++ b/tests/QUICStream.test.ts @@ -2,6 +2,7 @@ import type { ClientCryptoOps, QUICConnection, ServerCryptoOps } from '@'; import Logger, { formatting, LogLevel, StreamHandler } from '@matrixai/logger'; import { destroyed } from '@matrixai/async-init'; import * as events from '@/events'; +import * as errors from '@/errors'; import * as utils from '@/utils'; import QUICServer from '@/QUICServer'; import QUICClient from '@/QUICClient'; @@ -1849,4 +1850,266 @@ describe(QUICStream.name, () => { await client.destroy({ force: true }); await server.stop({ force: true }); }); + test('invalid arguments causes `createQUICClient` to fail', async () => { + await expect( + QUICClient.createQUICClient({ + host: '123.123.123.123', // Invalid ip when bound to loopback + port: 55555, + localHost: localhost, + crypto: { + ops: clientCrypto, + }, + logger: logger.getChild(QUICClient.name), + config: { + verifyPeer: false, + }, + }), + ).rejects.toThrow(errors.ErrorQUICClientInvalidArgument); + }); + test('connections are tolerant to network failures', async () => { + const connectionEventProm = + utils.promise(); + const tlsConfig = await generateTLSConfig(defaultType); + const server = new QUICServer({ + crypto: { + key, + ops: serverCrypto, + }, + logger: logger.getChild(QUICServer.name), + config: { + key: tlsConfig.leafKeyPairPEM.privateKey, + cert: tlsConfig.leafCertPEM, + verifyPeer: false, + }, + }); + socketCleanMethods.extractSocket(server); + server.addEventListener( + events.EventQUICServerConnection.name, + (e: events.EventQUICServerConnection) => connectionEventProm.resolveP(e), + ); + await server.start({ + host: localhost, + }); + const client = await QUICClient.createQUICClient({ + host: localhost, + port: server.port, + localHost: '192.168.56.1', + crypto: { + ops: clientCrypto, + 
}, + logger: logger.getChild(QUICClient.name), + config: { + verifyPeer: false, + }, + }); + socketCleanMethods.extractSocket(client); + // @ts-ignore: kidnap protected properties + const mockedSendClient = jest.spyOn(client.socket, 'send_'); + // @ts-ignore: kidnap protected properties + const mockedSendServer = jest.spyOn(server.socket, 'send_'); + + const conn = (await connectionEventProm.p).detail; + // Do the test + const activeServerStreams: Array> = []; + conn.addEventListener( + events.EventQUICConnectionStream.name, + (streamEvent: events.EventQUICConnectionStream) => { + const stream = streamEvent.detail; + const streamProm = stream.readable.pipeTo(stream.writable); + activeServerStreams.push(streamProm); + }, + ); + + const stream = client.connection.newStream(); + const writer = stream.writable.getWriter(); + const backgroundReadP = (async () => { + let acc: string = ''; + for await (const message of stream.readable) { + acc += message.toString(); + } + return acc.split('message').length - 1; + })(); + + // Do write and read messages here. 
+ for (let j = 0; j < 10; j++) { + await writer.write(Buffer.from(`message${j}`)); + } + + /* + // replicating this error + + Error: send ENETUNREACH ::ffff:13.54.214.222:1314 + at doSend (node:dgram:716:16) + at defaultTriggerAsyncIdScope (node:internal/async_hooks:463:18) + at afterDns (node:dgram:662:5) + at processTicksAndRejections (node:internal/process/task_queues:83:21) { + errno: -101, + code: 'ENETUNREACH', + syscall: 'send', + address: '::ffff:13.54.214.222', + port: 1314 + } + */ + class FakeError extends Error { + constructor( + public address: string, + public port: number, + ) { + super(`send ENETUNREACH ${address}:${port}`); + } + public errorno = -101; + public code = 'ENETUNREACH'; + public syscall = 'send'; + } + const fakeErrorClient = new FakeError(localhost, server.port); + const fakeErrorServer = new FakeError(localhost, client.localPort); + // Make the send fail 10 times + mockedSendClient.mockRejectedValueOnce(fakeErrorClient); + mockedSendClient.mockRejectedValueOnce(fakeErrorClient); + mockedSendClient.mockRejectedValueOnce(fakeErrorClient); + mockedSendClient.mockRejectedValueOnce(fakeErrorClient); + mockedSendClient.mockRejectedValueOnce(fakeErrorClient); + mockedSendClient.mockRejectedValueOnce(fakeErrorClient); + mockedSendClient.mockRejectedValueOnce(fakeErrorClient); + mockedSendClient.mockRejectedValueOnce(fakeErrorClient); + mockedSendClient.mockRejectedValueOnce(fakeErrorClient); + mockedSendClient.mockRejectedValueOnce(fakeErrorClient); + // Same for the server + mockedSendServer.mockRejectedValueOnce(fakeErrorServer); + mockedSendServer.mockRejectedValueOnce(fakeErrorServer); + mockedSendServer.mockRejectedValueOnce(fakeErrorServer); + mockedSendServer.mockRejectedValueOnce(fakeErrorServer); + mockedSendServer.mockRejectedValueOnce(fakeErrorServer); + mockedSendServer.mockRejectedValueOnce(fakeErrorServer); + mockedSendServer.mockRejectedValueOnce(fakeErrorServer); + mockedSendServer.mockRejectedValueOnce(fakeErrorServer); + 
mockedSendServer.mockRejectedValueOnce(fakeErrorServer); + mockedSendServer.mockRejectedValueOnce(fakeErrorServer); + + // Send another 20 messages + for (let j = 0; j < 20; j++) { + await writer.write(Buffer.from(`message${j + 10}`)); + } + await writer.close(); + // Expect 30 fully formed messages + await expect(backgroundReadP).resolves.toBe(30); + + await Promise.all(activeServerStreams); + await client.destroy({ force: true }); + await server.stop({ force: true }); + }); + test('connections timeout if network fails', async () => { + const connectionEventProm = + utils.promise(); + const tlsConfig = await generateTLSConfig(defaultType); + const server = new QUICServer({ + crypto: { + key, + ops: serverCrypto, + }, + logger: logger.getChild(QUICServer.name), + config: { + key: tlsConfig.leafKeyPairPEM.privateKey, + cert: tlsConfig.leafCertPEM, + verifyPeer: false, + }, + }); + socketCleanMethods.extractSocket(server); + server.addEventListener( + events.EventQUICServerConnection.name, + (e: events.EventQUICServerConnection) => connectionEventProm.resolveP(e), + ); + await server.start({ + host: localhost, + }); + const client = await QUICClient.createQUICClient({ + host: localhost, + port: server.port, + localHost: '192.168.56.1', + crypto: { + ops: clientCrypto, + }, + logger: logger.getChild(QUICClient.name), + config: { + maxIdleTimeout: 1000, + verifyPeer: false, + }, + }); + socketCleanMethods.extractSocket(client); + // @ts-ignore: kidnap protected properties + const mockedSendClient = jest.spyOn(client.socket, 'send_'); + // @ts-ignore: kidnap protected properties + const mockedSendServer = jest.spyOn(server.socket, 'send_'); + + const conn = (await connectionEventProm.p).detail; + // Do the test + let activeServerStreamP: Promise | undefined = undefined; + conn.addEventListener( + events.EventQUICConnectionStream.name, + async (streamEvent: events.EventQUICConnectionStream) => { + const stream = streamEvent.detail; + const streamP = 
stream.readable.pipeTo(stream.writable); + void streamP.catch(() => {}); + activeServerStreamP = streamP; + }, + ); + const stream = client.connection.newStream(); + const writer = stream.writable.getWriter(); + const backgroundReadP = (async () => { + let acc: string = ''; + for await (const message of stream.readable) { + acc += message.toString(); + } + return acc.split('message').length - 1; + })(); + + // Do write and read messages here. + await writer.write(Buffer.from(`first message`)); + + /* + // replicating this error + + Error: send ENETUNREACH ::ffff:13.54.214.222:1314 + at doSend (node:dgram:716:16) + at defaultTriggerAsyncIdScope (node:internal/async_hooks:463:18) + at afterDns (node:dgram:662:5) + at processTicksAndRejections (node:internal/process/task_queues:83:21) { + errno: -101, + code: 'ENETUNREACH', + syscall: 'send', + address: '::ffff:13.54.214.222', + port: 1314 + } + */ + class FakeError extends Error { + constructor( + public address: string, + public port: number, + ) { + super(`send ENETUNREACH ${address}:${port}`); + } + public errorno = -101; + public code = 'ENETUNREACH'; + public syscall = 'send'; + } + // Make every subsequent send fail + mockedSendClient.mockRejectedValue(new FakeError(localhost, server.port)); + // Same for the server + mockedSendServer.mockRejectedValue( + new FakeError(localhost, client.localPort), + ); + await writer.write(Buffer.from(`second message`)); + // Expect both sides to time out + await expect(backgroundReadP).rejects.toThrow( + errors.ErrorQUICConnectionIdleTimeout, + ); + await expect(activeServerStreamP).rejects.toThrow( + errors.ErrorQUICConnectionIdleTimeout, + ); + await expect(writer.write(Buffer.from('fail message'))).rejects.toThrow( + errors.ErrorQUICConnectionIdleTimeout, + ); + await client.destroy({ force: true }); + await server.stop({ force: true }); + }); }); From 6b6249182d55cdaed7087f5d465b8ae71103a269 Mon Sep 17 00:00:00 2001 From: Brian Botha Date: Fri, 9 Feb 2024 11:20:34 +1100 
Subject: [PATCH 2/2] fix: fixed edge cases leading to failures in ci There were a few things here. For starters some of the new tests had some dumb config. A mistake on my part. The other part is that the `send` throws different errors depending on platform and context. I could only reasonably track it down based on testing in the CI... There are 3 main errors we're sending back to the connection to be dealt with. --- src/QUICClient.ts | 49 +++++++++++++++++++++------------------- src/QUICServer.ts | 24 ++++++-------------- tests/QUICStream.test.ts | 4 ++-- 3 files changed, 35 insertions(+), 42 deletions(-) diff --git a/src/QUICClient.ts b/src/QUICClient.ts index 7fde2cb2..25103d9a 100644 --- a/src/QUICClient.ts +++ b/src/QUICClient.ts @@ -258,12 +258,25 @@ class QUICClient { evt: events.EventQUICClientErrorSend, ) => { // @ts-ignore: the error contains `code` but not part of the type - if (evt.detail.code === 'EINVAL') { - abortController.abort( - new errors.ErrorQUICClientInvalidArgument(undefined, { - cause: evt.detail, - }), - ); + const code = evt.detail.code; + switch (code) { + // Thrown due to invalid arguments on linux + case 'EINVAL': + // Thrown due to invalid arguments on macOS + // Falls through + case 'EADDRNOTAVAIL': + // Thrown due to invalid arguments on Win but also for network dropouts on all platforms + // Falls through + case 'ENETUNREACH': + { + abortController.abort( + new errors.ErrorQUICClientInvalidArgument(undefined, { + cause: evt.detail, + }), + ); + } + break; + default: // Do nothing } }; client.addEventListener( @@ -331,10 +344,6 @@ class QUICClient { protected config: Config; protected _closed: boolean = false; protected resolveClosedP: () => void; - /** - * Flag used to make sure network fail warnings are only logged once per failure - */ - protected networkWarned: boolean = false; /** * Handles `EventQUICClientError`. 
@@ -494,10 +503,16 @@ class QUICClient { evt.detail.port, evt.detail.address, ); - this.networkWarned = false; } catch (e) { switch (e.code) { + // Thrown due to invalid arguments on linux case 'EINVAL': + // Thrown due to invalid arguments on macOS + // Falls through + case 'EADDRNOTAVAIL': + // Thrown due to invalid arguments on Win but also for network dropouts on all platforms + // Falls through + case 'ENETUNREACH': { this.dispatchEvent( new events.EventQUICClientErrorSend( @@ -509,18 +524,6 @@ class QUICClient { ); } break; - case 'ENETUNREACH': - { - // We consider this branch a temp failure. - // For these error codes we rely on the connection's timeout to handle. - if (!this.networkWarned) { - this.logger.warn( - `client send failed with 'ENETUNREACH', likely due to network failure`, - ); - this.networkWarned = true; - } - } - break; default: { this.dispatchEvent( diff --git a/src/QUICServer.ts b/src/QUICServer.ts index 224cfbb2..f6aeb154 100644 --- a/src/QUICServer.ts +++ b/src/QUICServer.ts @@ -62,10 +62,6 @@ class QUICServer { protected _closed: boolean = false; protected _closedP: Promise; protected resolveClosedP: () => void; - /** - * Flag used to make sure network fail warnings are only logged once per failure - */ - protected networkWarned: boolean = false; /** * Handles `EventQUICServerError`. @@ -199,10 +195,16 @@ class QUICServer { evt.detail.port, evt.detail.address, ); - this.networkWarned = false; } catch (e) { switch (e.code) { + // Thrown due to invalid arguments on linux case 'EINVAL': + // Thrown due to invalid arguments on macOS + // Falls through + case 'EADDRNOTAVAIL': + // Thrown due to invalid arguments on Win but also for network dropouts on all platforms + // Falls through + case 'ENETUNREACH': { this.dispatchEvent( new events.EventQUICClientErrorSend( @@ -214,18 +216,6 @@ class QUICServer { ); } break; - case 'ENETUNREACH': - { - // We consider this branch a temp failure. 
- // For these error codes we rely on the connection's timeout to handle. - if (!this.networkWarned) { - this.logger.warn( - `server send failed with 'ENETUNREACH', likely due to network failure`, - ); - this.networkWarned = true; - } - } - break; default: { this.dispatchEvent( diff --git a/tests/QUICStream.test.ts b/tests/QUICStream.test.ts index ee2ed0fd..68be4b30 100644 --- a/tests/QUICStream.test.ts +++ b/tests/QUICStream.test.ts @@ -1893,7 +1893,7 @@ describe(QUICStream.name, () => { const client = await QUICClient.createQUICClient({ host: localhost, port: server.port, - localHost: '192.168.56.1', + localHost: localhost, crypto: { ops: clientCrypto, }, @@ -2025,7 +2025,7 @@ describe(QUICStream.name, () => { const client = await QUICClient.createQUICClient({ host: localhost, port: server.port, - localHost: '192.168.56.1', + localHost: localhost, crypto: { ops: clientCrypto, },