diff --git a/docs/interfaces/client.clientoptions.md b/docs/interfaces/client.clientoptions.md index 4d6bef37..21d3e95c 100644 --- a/docs/interfaces/client.clientoptions.md +++ b/docs/interfaces/client.clientoptions.md @@ -120,7 +120,7 @@ ___ ### on -• `Optional` **on**: *Partial*<{ `closed`: (`event`: *unknown*) => *void* ; `connected`: (`socket`: *unknown*, `payload?`: *Record*) => *void* ; `connecting`: () => *void* ; `error`: (`error`: *unknown*) => *void* }\> +• `Optional` **on**: *Partial*<{ `closed`: (`event`: *unknown*) => *void* ; `connected`: (`socket`: *unknown*, `payload?`: *Record*) => *void* ; `connecting`: () => *void* ; `error`: (`error`: *unknown*) => *void* ; `message`: (`message`: [*ConnectionInitMessage*](message.connectioninitmessage.md) \| [*ConnectionAckMessage*](message.connectionackmessage.md) \| [*SubscribeMessage*](message.subscribemessage.md) \| [*NextMessage*](message.nextmessage.md) \| [*ErrorMessage*](message.errormessage.md) \| [*CompleteMessage*](message.completemessage.md)) => *void* }\> Register listeners before initialising the client. This way you can ensure to catch all client relevant emitted events. diff --git a/docs/modules/client.md b/docs/modules/client.md index f2a3d5db..eca9c832 100644 --- a/docs/modules/client.md +++ b/docs/modules/client.md @@ -32,6 +32,8 @@ - [EventError](client.md#eventerror) - [EventErrorListener](client.md#eventerrorlistener) - [EventListener](client.md#eventlistener) +- [EventMessage](client.md#eventmessage) +- [EventMessageListener](client.md#eventmessagelistener) - [Message](client.md#message) ### Variables @@ -49,7 +51,7 @@ ### Event -Ƭ **Event**: [*EventConnecting*](client.md#eventconnecting) \| [*EventConnected*](client.md#eventconnected) \| [*EventClosed*](client.md#eventclosed) \| [*EventError*](client.md#eventerror) +Ƭ **Event**: [*EventConnecting*](client.md#eventconnecting) \| [*EventConnected*](client.md#eventconnected) \| [*EventMessage*](client.md#eventmessage) \| [*EventClosed*](client.md#eventclosed) \| [*EventError*](client.md#eventerror) ___ @@ -162,7 +164,7 @@ ___ ### EventListener -Ƭ **EventListener**: E *extends* [*EventConnecting*](client.md#eventconnecting) ? [*EventConnectingListener*](client.md#eventconnectinglistener) : E *extends* [*EventConnected*](client.md#eventconnected) ? [*EventConnectedListener*](client.md#eventconnectedlistener) : E *extends* [*EventClosed*](client.md#eventclosed) ? [*EventClosedListener*](client.md#eventclosedlistener) : E *extends* [*EventError*](client.md#eventerror) ? [*EventErrorListener*](client.md#eventerrorlistener) : *never* +Ƭ **EventListener**: E *extends* [*EventConnecting*](client.md#eventconnecting) ? [*EventConnectingListener*](client.md#eventconnectinglistener) : E *extends* [*EventConnected*](client.md#eventconnected) ? [*EventConnectedListener*](client.md#eventconnectedlistener) : E *extends* [*EventMessage*](client.md#eventmessage) ? [*EventMessageListener*](client.md#eventmessagelistener) : E *extends* [*EventClosed*](client.md#eventclosed) ? [*EventClosedListener*](client.md#eventclosedlistener) : E *extends* [*EventError*](client.md#eventerror) ? [*EventErrorListener*](client.md#eventerrorlistener) : *never* #### Type parameters: @@ -172,6 +174,33 @@ Name | Type | ___ +### EventMessage + +Ƭ **EventMessage**: *message* + +___ + +### EventMessageListener + +Ƭ **EventMessageListener**: (`message`: [*Message*](message.md#message)) => *void* + +Called for all **valid** messages received by the client. Mainly useful for +debugging and logging received messages. + +#### Type declaration: + +▸ (`message`: [*Message*](message.md#message)): *void* + +#### Parameters: + +Name | Type | +:------ | :------ | +`message` | [*Message*](message.md#message) | + +**Returns:** *void* + +___ + ### Message Ƭ **Message**: T *extends* [*ConnectionAck*](../enums/message.messagetype.md#connectionack) ? [*ConnectionAckMessage*](../interfaces/message.connectionackmessage.md) : T *extends* [*ConnectionInit*](../enums/message.messagetype.md#connectioninit) ? [*ConnectionInitMessage*](../interfaces/message.connectioninitmessage.md) : T *extends* [*Subscribe*](../enums/message.messagetype.md#subscribe) ? [*SubscribeMessage*](../interfaces/message.subscribemessage.md) : T *extends* [*Next*](../enums/message.messagetype.md#next) ? [*NextMessage*](../interfaces/message.nextmessage.md) : T *extends* [*Error*](../enums/message.messagetype.md#error) ? [*ErrorMessage*](../interfaces/message.errormessage.md) : T *extends* [*Complete*](../enums/message.messagetype.md#complete) ? [*CompleteMessage*](../interfaces/message.completemessage.md) : *never* diff --git a/src/client.ts b/src/client.ts index 5b7ca84b..863ee3da 100644 --- a/src/client.ts +++ b/src/client.ts @@ -21,9 +21,15 @@ export * from './protocol'; export type EventConnecting = 'connecting'; export type EventConnected = 'connected'; // connected = socket opened + acknowledged +export type EventMessage = 'message'; export type EventClosed = 'closed'; export type EventError = 'error'; -export type Event = EventConnecting | EventConnected | EventClosed | EventError; +export type Event = + | EventConnecting + | EventConnected + | EventMessage + | EventClosed + | EventError; /** * The first argument is actually the `WebSocket`, but to avoid @@ -40,6 +46,12 @@ export type EventConnectedListener = ( export type EventConnectingListener = () => void; +/** + * Called for all **valid** messages received by the client. Mainly useful for + * debugging and logging received messages. + */ +export type EventMessageListener = (message: Message) => void; + /** * The argument is actually the websocket `CloseEvent`, but to avoid * bundling DOM typings because the client can run in Node env too, @@ -59,6 +71,8 @@ export type EventListener = E extends EventConnecting ? EventConnectingListener : E extends EventConnected ? EventConnectedListener + : E extends EventMessage + ? EventMessageListener : E extends EventClosed ? EventClosedListener : E extends EventError @@ -274,6 +288,7 @@ export function createClient(options: ClientOptions): Client { const listeners: { [event in Event]: EventListener[] } = { connecting: on?.connecting ? [on.connecting] : [], connected: on?.connected ? [on.connected] : [], + message: on?.message ? [on.message] : [], closed: on?.closed ? [on.closed] : [], error: on?.error ? [on.error] : [], }; @@ -295,22 +310,19 @@ export function createClient(options: ClientOptions): Client { }; })(); - let connecting: Promise | undefined, + type Connected = [socket: WebSocket, throwOnClose: Promise]; + let connecting: Promise | undefined, locks = 0, retrying = false, retries = 0, disposed = false; async function connect(): Promise< - [ - socket: WebSocket, - release: () => void, - waitForReleaseOrThrowOnClose: Promise, - ] + [socket: WebSocket, release: () => void, throwOnClose: Promise] > { locks++; - const socket = await (connecting ?? - (connecting = new Promise((resolve, reject) => + const [socket, throwOnClose] = await (connecting ?? + (connecting = new Promise((connected, denied) => (async () => { if (retrying) { await retryWait(retries); @@ -328,7 +340,7 @@ export function createClient(options: ClientOptions): Client { socket.onclose = (event) => { connecting = undefined; emitter.emit('closed', event); - reject(event); + denied(event); }; socket.onopen = async () => { @@ -350,18 +362,28 @@ export function createClient(options: ClientOptions): Client { } }; + let hasConnected = false; socket.onmessage = ({ data }) => { - socket.onmessage = null; // interested only in the first message try { const message = parseMessage(data); + emitter.emit('message', message); + if (hasConnected) return; + + // havent connected yet. expect the acknowledgement message and proceed if (message.type !== MessageType.ConnectionAck) { throw new Error( `First message cannot be of type ${message.type}`, ); } + hasConnected = true; emitter.emit('connected', socket, message.payload); // connected = socket opened + acknowledged retries = 0; // reset the retries on connect - resolve(socket); + connected([ + socket, + new Promise((_, closed) => + socket.addEventListener('close', closed), + ), + ]); } catch (err) { socket.close( 4400, @@ -399,9 +421,7 @@ export function createClient(options: ClientOptions): Client { } } }), - new Promise((_resolve, reject) => - socket.addEventListener('close', reject, { once: true }), - ), + throwOnClose, ]), ]; } @@ -469,17 +489,6 @@ export function createClient(options: ClientOptions): Client { })(); } - // to avoid parsing the same message in each - // subscriber, we memo one on the last received data - let lastData: unknown, lastMessage: Message; - function memoParseMessage(data: unknown) { - if (data !== lastData) { - lastMessage = parseMessage(data); - lastData = data; - } - return lastMessage; - } - return { on: emitter.on, subscribe(payload, sink) { @@ -493,36 +502,6 @@ export function createClient(options: ClientOptions): Client { }, }; - function messageHandler({ data }: MessageEvent) { - const message = memoParseMessage(data); - switch (message.type) { - case MessageType.Next: { - if (message.id === id) { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - sink.next(message.payload as any); - } - return; - } - case MessageType.Error: { - if (message.id === id) { - completed = true; - sink.error(message.payload); - releaserRef.current(); - // TODO-db-201025 calling releaser will complete the sink, meaning that both the `error` and `complete` will be - // called. neither promises or observables care; once they settle, additional calls to the resolvers will be ignored - } - return; - } - case MessageType.Complete: { - if (message.id === id) { - completed = true; - releaserRef.current(); // release completes the sink - } - return; - } - } - } - (async () => { for (;;) { try { @@ -535,7 +514,34 @@ export function createClient(options: ClientOptions): Client { // if completed while waiting for connect, release the connection lock right away if (completed) return release(); - socket.addEventListener('message', messageHandler); + const unlisten = emitter.on('message', (message) => { + switch (message.type) { + case MessageType.Next: { + if (message.id === id) { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + sink.next(message.payload as any); + } + return; + } + case MessageType.Error: { + if (message.id === id) { + completed = true; + sink.error(message.payload); + releaserRef.current(); + // TODO-db-201025 calling releaser will complete the sink, meaning that both the `error` and `complete` will be + // called. neither promises or observables care; once they settle, additional calls to the resolvers will be ignored + } + return; + } + case MessageType.Complete: { + if (message.id === id) { + completed = true; + releaserRef.current(); // release completes the sink + } + return; + } + } + }); socket.send( stringifyMessage({ @@ -559,10 +565,9 @@ export function createClient(options: ClientOptions): Client { }; // either the releaser will be called, connection completed and - // the promise resolved or the socket closed and the promise rejected - await waitForReleaseOrThrowOnClose; - - socket.removeEventListener('message', messageHandler); + // the promise resolved or the socket closed and the promise rejected. + // whatever happens though, we want to stop listening for messages + await waitForReleaseOrThrowOnClose.finally(unlisten); return; // completed, shouldnt try again } catch (errOrCloseEvent) { @@ -579,7 +584,7 @@ export function createClient(options: ClientOptions): Client { disposed = true; if (connecting) { // if there is a connection, close it - const socket = await connecting; + const [socket] = await connecting; socket.close(1000, 'Normal Closure'); } }, diff --git a/src/tests/client.ts b/src/tests/client.ts index 7ab14009..407eccef 100644 --- a/src/tests/client.ts +++ b/src/tests/client.ts @@ -817,9 +817,6 @@ describe('reconnecting', () => { }); it('should resubscribe all subscribers on silent reconnects', async () => { - const defaultMaxListeners = EventEmitter.defaultMaxListeners; - EventEmitter.defaultMaxListeners = 50; // for test - const { url, ...server } = await startTServer(); const client = createClient({ @@ -830,7 +827,7 @@ describe('reconnecting', () => { // add subscribers const subs: TSubscribe[] = []; - for (let i = 0; i < EventEmitter.defaultMaxListeners - 1; i++) { + for (let i = 0; i < 50; i++) { subs.push( tsubscribe(client, { query: `subscription Sub${i} { ping(key: "${i}") }`, @@ -867,14 +864,9 @@ describe('reconnecting', () => { } client.dispose(); - - EventEmitter.defaultMaxListeners = defaultMaxListeners; // reset }); it('should resubscribe all subscribers on silent reconnect when using retry wait delay', async () => { - const defaultMaxListeners = EventEmitter.defaultMaxListeners; - EventEmitter.defaultMaxListeners = 50; // for test - const { url, ...server } = await startTServer(); const client = createClient({ @@ -885,7 +877,7 @@ describe('reconnecting', () => { // add subscribers const subs: TSubscribe[] = []; - for (let i = 0; i < EventEmitter.defaultMaxListeners - 1; i++) { + for (let i = 0; i < 50; i++) { subs.push( tsubscribe(client, { query: `subscription Sub${i} { ping(key: "${i}") }`, @@ -916,8 +908,6 @@ describe('reconnecting', () => { } client.dispose(); - - EventEmitter.defaultMaxListeners = defaultMaxListeners; // reset }); it('should report some close events immediately and not reconnect', async () => { @@ -997,30 +987,35 @@ describe('events', () => { const connectingFn = jest.fn(noop as EventListener<'connecting'>); const connectedFn = jest.fn(noop as EventListener<'connected'>); + const messageFn = jest.fn(noop as EventListener<'message'>); const closedFn = jest.fn(noop as EventListener<'closed'>); // wait for connected - const client = await new Promise((resolve) => { - const client = createClient({ - url, - retryAttempts: 0, - onNonLazyError: noop, - on: { - connecting: connectingFn, - connected: connectedFn, - closed: closedFn, - }, - }); - client.on('connecting', connectingFn); - client.on('connected', (...args) => { - connectedFn(...args); - resolve(client); - }); - client.on('closed', closedFn); - - // trigger connecting - tsubscribe(client, { query: 'subscription {ping}' }); - }); + const [client, sub] = await new Promise<[Client, TSubscribe]>( + (resolve) => { + const client = createClient({ + url, + retryAttempts: 0, + onNonLazyError: noop, + on: { + connecting: connectingFn, + connected: connectedFn, + message: messageFn, + closed: closedFn, + }, + }); + client.on('connecting', connectingFn); + client.on('connected', connectedFn); + client.on('message', messageFn); + client.on('closed', closedFn); + + // trigger connecting + const sub = tsubscribe(client, { query: 'subscription {ping}' }); + + // resolve once subscribed + server.waitForOperation().then(() => resolve([client, sub])); + }, + ); expect(connectingFn).toBeCalledTimes(2); expect(connectingFn.mock.calls[0].length).toBe(0); @@ -1030,6 +1025,11 @@ describe('events', () => { expect(cal[0]).toBeInstanceOf(WebSocket); }); + // (connection ack + pong) * 2 + server.pong(); + await sub.waitForNext(); + expect(messageFn).toBeCalledTimes(4); + expect(closedFn).not.toBeCalled(); server.clients.forEach((client) => {