diff --git a/src/QUICClient.ts b/src/QUICClient.ts index 6c4b965f..25103d9a 100644 --- a/src/QUICClient.ts +++ b/src/QUICClient.ts @@ -247,8 +247,47 @@ class QUICClient { // the client, because the client bridges the push flow from the connection // to the socket. socket.connectionMap.set(connection.connectionId, connection); + // Set up intermediate abort signal + const abortController = new AbortController(); + const abortHandler = () => { + abortController.abort(ctx.signal.reason); + }; + if (ctx.signal.aborted) abortController.abort(ctx.signal.reason); + else ctx.signal.addEventListener('abort', abortHandler); + const handleEventQUICClientErrorSend = ( + evt: events.EventQUICClientErrorSend, + ) => { + // @ts-ignore: the error contains `code` but not part of the type + const code = evt.detail.code; + switch (code) { + // Thrown due to invalid arguments on linux + case 'EINVAL': + // Thrown due to invalid arguments on macOS + // Falls through + case 'EADDRNOTAVAIL': + // Thrown due to invalid arguments on Win but also for network dropouts on all platforms + // Falls through + case 'ENETUNREACH': + { + abortController.abort( + new errors.ErrorQUICClientInvalidArgument(undefined, { + cause: evt.detail, + }), + ); + } + break; + default: // Do nothing + } + }; + client.addEventListener( + `${events.EventQUICClientErrorSend.name}-${connection.sendId}`, + handleEventQUICClientErrorSend, + ); try { - await connection.start(undefined, ctx); + await connection.start(undefined, { + timer: ctx.timer, + signal: abortController.signal, + }); } catch (e) { socket.connectionMap.delete(connection.connectionId); socket.removeEventListener( @@ -284,6 +323,12 @@ class QUICClient { client.handleEventQUICClientClose, ); throw e; + } finally { + ctx.signal.removeEventListener('abort', abortHandler); + client.removeEventListener( + `${events.EventQUICClientErrorSend.name}-${connection.sendId}`, + handleEventQUICClientErrorSend, + ); } address = utils.buildAddress(host_, port); logger.info(`Created ${this.name} to ${address}`); @@ -459,14 +504,42 @@ class QUICClient { evt.detail.address, ); } catch (e) { - const e_ = new errors.ErrorQUICClientInternal( - 'Failed to send data on the QUICSocket', - { - data: evt.detail, - cause: e, - }, - ); - this.dispatchEvent(new events.EventQUICClientError({ detail: e_ })); + switch (e.code) { + // Thrown due to invalid arguments on linux + case 'EINVAL': + // Thrown due to invalid arguments on macOS + // Falls through + case 'EADDRNOTAVAIL': + // Thrown due to invalid arguments on Win but also for network dropouts on all platforms + // Falls through + case 'ENETUNREACH': + { + this.dispatchEvent( + new events.EventQUICClientErrorSend( + `${events.EventQUICClientErrorSend.name}-${evt.detail.id}`, + { + detail: e, + }, + ), + ); + } + break; + default: + { + this.dispatchEvent( + new events.EventQUICClientError({ + detail: new errors.ErrorQUICClientInternal( + 'Failed to send data on the QUICSocket', + { + data: evt.detail, + cause: e, + }, + ), + }), + ); + } + break; + } } }; diff --git a/src/QUICConnection.ts b/src/QUICConnection.ts index 9db62530..9113d5be 100644 --- a/src/QUICConnection.ts +++ b/src/QUICConnection.ts @@ -62,6 +62,11 @@ class QUICConnection { */ public readonly streamMap: Map = new Map(); + /** + * Unique id used to identify events intended for this connection. + */ + public readonly sendId: string; + protected logger: Logger; protected socket: QUICSocket; protected config: QUICConfig; @@ -322,6 +327,7 @@ class QUICConnection { logger?: Logger; }) { this.logger = logger ?? new Logger(`${this.constructor.name} ${scid}`); + this.sendId = scid.toString(); if ( config.keepAliveIntervalTime != null && config.maxIdleTimeout !== 0 && @@ -876,6 +882,7 @@ class QUICConnection { this.dispatchEvent( new events.EventQUICConnectionSend({ detail: { + id: this.sendId, msg: sendBuffer.subarray(0, sendLength), port: sendInfo.to.port, address: sendInfo.to.host, diff --git a/src/QUICServer.ts b/src/QUICServer.ts index eac9f118..f6aeb154 100644 --- a/src/QUICServer.ts +++ b/src/QUICServer.ts @@ -196,14 +196,42 @@ class QUICServer { evt.detail.address, ); } catch (e) { - const e_ = new errors.ErrorQUICServerInternal( - 'Failed to send data on the QUICSocket', - { - data: evt.detail, - cause: e, - }, - ); - this.dispatchEvent(new events.EventQUICServerError({ detail: e_ })); + switch (e.code) { + // Thrown due to invalid arguments on linux + case 'EINVAL': + // Thrown due to invalid arguments on macOS + // Falls through + case 'EADDRNOTAVAIL': + // Thrown due to invalid arguments on Win but also for network dropouts on all platforms + // Falls through + case 'ENETUNREACH': + { + this.dispatchEvent( + new events.EventQUICClientErrorSend( + `${events.EventQUICClientErrorSend.name}-${evt.detail.id}`, + { + detail: e, + }, + ), + ); + } + break; + default: + { + this.dispatchEvent( + new events.EventQUICServerError({ + detail: new errors.ErrorQUICServerInternal( + 'Failed to send data on the QUICSocket', + { + data: evt.detail, + cause: e, + }, + ), + }), + ); + } + break; + } } }; diff --git a/src/errors.ts b/src/errors.ts index 229b8707..fd802643 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -59,6 +59,11 @@ class ErrorQUICClientSocketNotRunning extends ErrorQUICClient { 'QUIC Client cannot be created with an unstarted shared QUIC socket'; } +class ErrorQUICClientInvalidArgument extends ErrorQUICClient { + static description = + 'QUIC Client had a failure relating to an invalid argument'; +} + class ErrorQUICClientInvalidHost extends ErrorQUICClient { static description = 'QUIC Client cannot be created with the specified host'; } @@ -293,6 +298,7 @@ export { ErrorQUICClientDestroyed, ErrorQUICClientCreateTimeout, ErrorQUICClientSocketNotRunning, + ErrorQUICClientInvalidArgument, ErrorQUICClientInvalidHost, ErrorQUICClientInternal, ErrorQUICServer, diff --git a/src/events.ts b/src/events.ts index f19fae4c..6f4c7dff 100644 --- a/src/events.ts +++ b/src/events.ts @@ -62,6 +62,8 @@ class EventQUICClientError extends EventQUICClient< | ErrorQUICConnectionInternal > {} +class EventQUICClientErrorSend extends EventQUICSocket {} + class EventQUICClientClose extends EventQUICClient< | ErrorQUICClientSocketNotRunning | ErrorQUICConnectionLocal @@ -126,6 +128,7 @@ class EventQUICConnectionClose extends EventQUICConnection< class EventQUICConnectionStream extends EventQUICConnection {} class EventQUICConnectionSend extends EventQUICConnection<{ + id: string; msg: Uint8Array; port: number; address: string; @@ -193,6 +196,7 @@ export { EventQUICClientDestroy, EventQUICClientDestroyed, EventQUICClientError, + EventQUICClientErrorSend, EventQUICClientClose, EventQUICServer, EventQUICServerStart, diff --git a/tests/QUICStream.test.ts b/tests/QUICStream.test.ts index d2d1342e..68be4b30 100644 --- a/tests/QUICStream.test.ts +++ b/tests/QUICStream.test.ts @@ -2,6 +2,7 @@ import type { ClientCryptoOps, QUICConnection, ServerCryptoOps } from '@'; import Logger, { formatting, LogLevel, StreamHandler } from '@matrixai/logger'; import { destroyed } from '@matrixai/async-init'; import * as events from '@/events'; +import * as errors from '@/errors'; import * as utils from '@/utils'; import QUICServer from '@/QUICServer'; import QUICClient from '@/QUICClient'; @@ -1849,4 +1850,266 @@ describe(QUICStream.name, () => { await client.destroy({ force: true }); await server.stop({ force: true }); }); + test('invalid arguments causes `createQUICClient` to fail', async () => { + await expect( + QUICClient.createQUICClient({ + host: '123.123.123.123', // Invalid ip when bound to loopback + port: 55555, + localHost: localhost, + crypto: { + ops: clientCrypto, + }, + logger: logger.getChild(QUICClient.name), + config: { + verifyPeer: false, + }, + }), + ).rejects.toThrow(errors.ErrorQUICClientInvalidArgument); + }); + test('connections are tolerant to network failures', async () => { + const connectionEventProm = + utils.promise(); + const tlsConfig = await generateTLSConfig(defaultType); + const server = new QUICServer({ + crypto: { + key, + ops: serverCrypto, + }, + logger: logger.getChild(QUICServer.name), + config: { + key: tlsConfig.leafKeyPairPEM.privateKey, + cert: tlsConfig.leafCertPEM, + verifyPeer: false, + }, + }); + socketCleanMethods.extractSocket(server); + server.addEventListener( + events.EventQUICServerConnection.name, + (e: events.EventQUICServerConnection) => connectionEventProm.resolveP(e), + ); + await server.start({ + host: localhost, + }); + const client = await QUICClient.createQUICClient({ + host: localhost, + port: server.port, + localHost: localhost, + crypto: { + ops: clientCrypto, + }, + logger: logger.getChild(QUICClient.name), + config: { + verifyPeer: false, + }, + }); + socketCleanMethods.extractSocket(client); + // @ts-ignore: kidnap protected properties + const mockedSendClient = jest.spyOn(client.socket, 'send_'); + // @ts-ignore: kidnap protected properties + const mockedSendServer = jest.spyOn(server.socket, 'send_'); + + const conn = (await connectionEventProm.p).detail; + // Do the test + const activeServerStreams: Array> = []; + conn.addEventListener( + events.EventQUICConnectionStream.name, + (streamEvent: events.EventQUICConnectionStream) => { + const stream = streamEvent.detail; + const streamProm = stream.readable.pipeTo(stream.writable); + activeServerStreams.push(streamProm); + }, + ); + + const stream = client.connection.newStream(); + const writer = stream.writable.getWriter(); + const backgroundReadP = (async () => { + let acc: string = ''; + for await (const message of stream.readable) { + acc += message.toString(); + } + return acc.split('message').length - 1; + })(); + + // Do write and read messages here. + for (let j = 0; j < 10; j++) { + await writer.write(Buffer.from(`message${j}`)); + } + + /* + // replicating this error + + Error: send ENETUNREACH ::ffff:13.54.214.222:1314 + at doSend (node:dgram:716:16) + at defaultTriggerAsyncIdScope (node:internal/async_hooks:463:18) + at afterDns (node:dgram:662:5) + at processTicksAndRejections (node:internal/process/task_queues:83:21) { + errno: -101, + code: 'ENETUNREACH', + syscall: 'send', + address: '::ffff:13.54.214.222', + port: 1314 + } + */ + class FakeError extends Error { + constructor( + public address: string, + public port: number, + ) { + super(`send ENETUNREACH ${address}:${port}`); + } + public errorno = -101; + public code = 'ENETUNREACH'; + public syscall = 'send'; + } + const fakeErrorClient = new FakeError(localhost, server.port); + const fakeErrorServer = new FakeError(localhost, client.localPort); + // Make the send fail 10 times + mockedSendClient.mockRejectedValueOnce(fakeErrorClient); + mockedSendClient.mockRejectedValueOnce(fakeErrorClient); + mockedSendClient.mockRejectedValueOnce(fakeErrorClient); + mockedSendClient.mockRejectedValueOnce(fakeErrorClient); + mockedSendClient.mockRejectedValueOnce(fakeErrorClient); + mockedSendClient.mockRejectedValueOnce(fakeErrorClient); + mockedSendClient.mockRejectedValueOnce(fakeErrorClient); + mockedSendClient.mockRejectedValueOnce(fakeErrorClient); + mockedSendClient.mockRejectedValueOnce(fakeErrorClient); + mockedSendClient.mockRejectedValueOnce(fakeErrorClient); + // Same for the server + mockedSendServer.mockRejectedValueOnce(fakeErrorServer); + mockedSendServer.mockRejectedValueOnce(fakeErrorServer); + mockedSendServer.mockRejectedValueOnce(fakeErrorServer); + mockedSendServer.mockRejectedValueOnce(fakeErrorServer); + mockedSendServer.mockRejectedValueOnce(fakeErrorServer); + mockedSendServer.mockRejectedValueOnce(fakeErrorServer); + mockedSendServer.mockRejectedValueOnce(fakeErrorServer); + mockedSendServer.mockRejectedValueOnce(fakeErrorServer); + mockedSendServer.mockRejectedValueOnce(fakeErrorServer); + mockedSendServer.mockRejectedValueOnce(fakeErrorServer); + + // Send another 20 messages + for (let j = 0; j < 20; j++) { + await writer.write(Buffer.from(`message${j + 10}`)); + } + await writer.close(); + // Expect 30 fully formed messages + await expect(backgroundReadP).resolves.toBe(30); + + await Promise.all(activeServerStreams); + await client.destroy({ force: true }); + await server.stop({ force: true }); + }); + test('connections timeout if network fails', async () => { + const connectionEventProm = + utils.promise(); + const tlsConfig = await generateTLSConfig(defaultType); + const server = new QUICServer({ + crypto: { + key, + ops: serverCrypto, + }, + logger: logger.getChild(QUICServer.name), + config: { + key: tlsConfig.leafKeyPairPEM.privateKey, + cert: tlsConfig.leafCertPEM, + verifyPeer: false, + }, + }); + socketCleanMethods.extractSocket(server); + server.addEventListener( + events.EventQUICServerConnection.name, + (e: events.EventQUICServerConnection) => connectionEventProm.resolveP(e), + ); + await server.start({ + host: localhost, + }); + const client = await QUICClient.createQUICClient({ + host: localhost, + port: server.port, + localHost: localhost, + crypto: { + ops: clientCrypto, + }, + logger: logger.getChild(QUICClient.name), + config: { + maxIdleTimeout: 1000, + verifyPeer: false, + }, + }); + socketCleanMethods.extractSocket(client); + // @ts-ignore: kidnap protected properties + const mockedSendClient = jest.spyOn(client.socket, 'send_'); + // @ts-ignore: kidnap protected properties + const mockedSendServer = jest.spyOn(server.socket, 'send_'); + + const conn = (await connectionEventProm.p).detail; + // Do the test + let activeServerStreamP: Promise | undefined = undefined; + conn.addEventListener( + events.EventQUICConnectionStream.name, + async (streamEvent: events.EventQUICConnectionStream) => { + const stream = streamEvent.detail; + const streamP = stream.readable.pipeTo(stream.writable); + void streamP.catch(() => {}); + activeServerStreamP = streamP; + }, + ); + const stream = client.connection.newStream(); + const writer = stream.writable.getWriter(); + const backgroundReadP = (async () => { + let acc: string = ''; + for await (const message of stream.readable) { + acc += message.toString(); + } + return acc.split('message').length - 1; + })(); + + // Do write and read messages here. + await writer.write(Buffer.from(`first message`)); + + /* + // replicating this error + + Error: send ENETUNREACH ::ffff:13.54.214.222:1314 + at doSend (node:dgram:716:16) + at defaultTriggerAsyncIdScope (node:internal/async_hooks:463:18) + at afterDns (node:dgram:662:5) + at processTicksAndRejections (node:internal/process/task_queues:83:21) { + errno: -101, + code: 'ENETUNREACH', + syscall: 'send', + address: '::ffff:13.54.214.222', + port: 1314 + } + */ + class FakeError extends Error { + constructor( + public address: string, + public port: number, + ) { + super(`send ENETUNREACH ${address}:${port}`); + } + public errorno = -101; + public code = 'ENETUNREACH'; + public syscall = 'send'; + } + // Make the send fail 10 times + mockedSendClient.mockRejectedValue(new FakeError(localhost, server.port)); + // Same for the server + mockedSendServer.mockRejectedValue( + new FakeError(localhost, client.localPort), + ); + await writer.write(Buffer.from(`second message`)); + // Expect both sides to time out + await expect(backgroundReadP).rejects.toThrow( + errors.ErrorQUICConnectionIdleTimeout, + ); + await expect(activeServerStreamP).rejects.toThrow( + errors.ErrorQUICConnectionIdleTimeout, + ); + await expect(writer.write(Buffer.from('fail message'))).rejects.toThrow( + errors.ErrorQUICConnectionIdleTimeout, + ); + await client.destroy({ force: true }); + await server.stop({ force: true }); + }); });