Skip to content

Commit

Permalink
fix(client): Reduce WebSocket event listeners and add new client `mes…
Browse files Browse the repository at this point in the history
…sage` event (enisdenjo#104)

Closes: enisdenjo#102
  • Loading branch information
enisdenjo committed Mar 23, 2021
1 parent eaef6e0 commit 68d0e20
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 98 deletions.
2 changes: 1 addition & 1 deletion docs/interfaces/client.clientoptions.md
Expand Up @@ -120,7 +120,7 @@ ___

### on

`Optional` **on**: *Partial*<{ `closed`: (`event`: *unknown*) => *void* ; `connected`: (`socket`: *unknown*, `payload?`: *Record*<string, unknown\>) => *void* ; `connecting`: () => *void* ; `error`: (`error`: *unknown*) => *void* }\>
`Optional` **on**: *Partial*<{ `closed`: (`event`: *unknown*) => *void* ; `connected`: (`socket`: *unknown*, `payload?`: *Record*<string, unknown\>) => *void* ; `connecting`: () => *void* ; `error`: (`error`: *unknown*) => *void* ; `message`: (`message`: [*ConnectionInitMessage*](message.connectioninitmessage.md) \| [*ConnectionAckMessage*](message.connectionackmessage.md) \| [*SubscribeMessage*](message.subscribemessage.md) \| [*NextMessage*](message.nextmessage.md) \| [*ErrorMessage*](message.errormessage.md) \| [*CompleteMessage*](message.completemessage.md)) => *void* }\>

Register listeners before initialising the client. This way
you can ensure to catch all client relevant emitted events.
Expand Down
33 changes: 31 additions & 2 deletions docs/modules/client.md
Expand Up @@ -32,6 +32,8 @@
- [EventError](client.md#eventerror)
- [EventErrorListener](client.md#eventerrorlistener)
- [EventListener](client.md#eventlistener)
- [EventMessage](client.md#eventmessage)
- [EventMessageListener](client.md#eventmessagelistener)
- [Message](client.md#message)

### Variables
Expand All @@ -49,7 +51,7 @@

### Event

Ƭ **Event**: [*EventConnecting*](client.md#eventconnecting) \| [*EventConnected*](client.md#eventconnected) \| [*EventClosed*](client.md#eventclosed) \| [*EventError*](client.md#eventerror)
Ƭ **Event**: [*EventConnecting*](client.md#eventconnecting) \| [*EventConnected*](client.md#eventconnected) \| [*EventMessage*](client.md#eventmessage) \| [*EventClosed*](client.md#eventclosed) \| [*EventError*](client.md#eventerror)

___

Expand Down Expand Up @@ -162,7 +164,7 @@ ___

### EventListener

Ƭ **EventListener**<E\>: E *extends* [*EventConnecting*](client.md#eventconnecting) ? [*EventConnectingListener*](client.md#eventconnectinglistener) : E *extends* [*EventConnected*](client.md#eventconnected) ? [*EventConnectedListener*](client.md#eventconnectedlistener) : E *extends* [*EventClosed*](client.md#eventclosed) ? [*EventClosedListener*](client.md#eventclosedlistener) : E *extends* [*EventError*](client.md#eventerror) ? [*EventErrorListener*](client.md#eventerrorlistener) : *never*
Ƭ **EventListener**<E\>: E *extends* [*EventConnecting*](client.md#eventconnecting) ? [*EventConnectingListener*](client.md#eventconnectinglistener) : E *extends* [*EventConnected*](client.md#eventconnected) ? [*EventConnectedListener*](client.md#eventconnectedlistener) : E *extends* [*EventMessage*](client.md#eventmessage) ? [*EventMessageListener*](client.md#eventmessagelistener) : E *extends* [*EventClosed*](client.md#eventclosed) ? [*EventClosedListener*](client.md#eventclosedlistener) : E *extends* [*EventError*](client.md#eventerror) ? [*EventErrorListener*](client.md#eventerrorlistener) : *never*

#### Type parameters:

Expand All @@ -172,6 +174,33 @@ Name | Type |

___

### EventMessage

Ƭ **EventMessage**: *message*

___

### EventMessageListener

Ƭ **EventMessageListener**: (`message`: [*Message*](message.md#message)) => *void*

Called for all **valid** messages received by the client. Mainly useful for
debugging and logging received messages.

#### Type declaration:

▸ (`message`: [*Message*](message.md#message)): *void*

#### Parameters:

Name | Type |
:------ | :------ |
`message` | [*Message*](message.md#message) |

**Returns:** *void*

___

### Message

Ƭ **Message**<T\>: T *extends* [*ConnectionAck*](../enums/message.messagetype.md#connectionack) ? [*ConnectionAckMessage*](../interfaces/message.connectionackmessage.md) : T *extends* [*ConnectionInit*](../enums/message.messagetype.md#connectioninit) ? [*ConnectionInitMessage*](../interfaces/message.connectioninitmessage.md) : T *extends* [*Subscribe*](../enums/message.messagetype.md#subscribe) ? [*SubscribeMessage*](../interfaces/message.subscribemessage.md) : T *extends* [*Next*](../enums/message.messagetype.md#next) ? [*NextMessage*](../interfaces/message.nextmessage.md) : T *extends* [*Error*](../enums/message.messagetype.md#error) ? [*ErrorMessage*](../interfaces/message.errormessage.md) : T *extends* [*Complete*](../enums/message.messagetype.md#complete) ? [*CompleteMessage*](../interfaces/message.completemessage.md) : *never*
Expand Down
129 changes: 67 additions & 62 deletions src/client.ts
Expand Up @@ -21,9 +21,15 @@ export * from './protocol';

export type EventConnecting = 'connecting';
export type EventConnected = 'connected'; // connected = socket opened + acknowledged
export type EventMessage = 'message';
export type EventClosed = 'closed';
export type EventError = 'error';
export type Event = EventConnecting | EventConnected | EventClosed | EventError;
export type Event =
| EventConnecting
| EventConnected
| EventMessage
| EventClosed
| EventError;

/**
* The first argument is actually the `WebSocket`, but to avoid
Expand All @@ -40,6 +46,12 @@ export type EventConnectedListener = (

export type EventConnectingListener = () => void;

/**
* Called for all **valid** messages received by the client. Mainly useful for
* debugging and logging received messages.
*/
export type EventMessageListener = (message: Message) => void;

/**
* The argument is actually the websocket `CloseEvent`, but to avoid
* bundling DOM typings because the client can run in Node env too,
Expand All @@ -59,6 +71,8 @@ export type EventListener<E extends Event> = E extends EventConnecting
? EventConnectingListener
: E extends EventConnected
? EventConnectedListener
: E extends EventMessage
? EventMessageListener
: E extends EventClosed
? EventClosedListener
: E extends EventError
Expand Down Expand Up @@ -274,6 +288,7 @@ export function createClient(options: ClientOptions): Client {
const listeners: { [event in Event]: EventListener<event>[] } = {
connecting: on?.connecting ? [on.connecting] : [],
connected: on?.connected ? [on.connected] : [],
message: on?.message ? [on.message] : [],
closed: on?.closed ? [on.closed] : [],
error: on?.error ? [on.error] : [],
};
Expand All @@ -295,22 +310,19 @@ export function createClient(options: ClientOptions): Client {
};
})();

let connecting: Promise<WebSocket> | undefined,
type Connected = [socket: WebSocket, throwOnClose: Promise<void>];
let connecting: Promise<Connected> | undefined,
locks = 0,
retrying = false,
retries = 0,
disposed = false;
async function connect(): Promise<
[
socket: WebSocket,
release: () => void,
waitForReleaseOrThrowOnClose: Promise<void>,
]
[socket: WebSocket, release: () => void, throwOnClose: Promise<void>]
> {
locks++;

const socket = await (connecting ??
(connecting = new Promise<WebSocket>((resolve, reject) =>
const [socket, throwOnClose] = await (connecting ??
(connecting = new Promise<Connected>((connected, denied) =>
(async () => {
if (retrying) {
await retryWait(retries);
Expand All @@ -328,7 +340,7 @@ export function createClient(options: ClientOptions): Client {
socket.onclose = (event) => {
connecting = undefined;
emitter.emit('closed', event);
reject(event);
denied(event);
};

socket.onopen = async () => {
Expand All @@ -350,18 +362,28 @@ export function createClient(options: ClientOptions): Client {
}
};

let hasConnected = false;
socket.onmessage = ({ data }) => {
socket.onmessage = null; // interested only in the first message
try {
const message = parseMessage(data);
emitter.emit('message', message);
if (hasConnected) return;

// havent connected yet. expect the acknowledgement message and proceed
if (message.type !== MessageType.ConnectionAck) {
throw new Error(
`First message cannot be of type ${message.type}`,
);
}
hasConnected = true;
emitter.emit('connected', socket, message.payload); // connected = socket opened + acknowledged
retries = 0; // reset the retries on connect
resolve(socket);
connected([
socket,
new Promise<void>((_, closed) =>
socket.addEventListener('close', closed),
),
]);
} catch (err) {
socket.close(
4400,
Expand Down Expand Up @@ -399,9 +421,7 @@ export function createClient(options: ClientOptions): Client {
}
}
}),
new Promise<void>((_resolve, reject) =>
socket.addEventListener('close', reject, { once: true }),
),
throwOnClose,
]),
];
}
Expand Down Expand Up @@ -469,17 +489,6 @@ export function createClient(options: ClientOptions): Client {
})();
}

// to avoid parsing the same message in each
// subscriber, we memo one on the last received data
let lastData: unknown, lastMessage: Message;
function memoParseMessage(data: unknown) {
if (data !== lastData) {
lastMessage = parseMessage(data);
lastData = data;
}
return lastMessage;
}

return {
on: emitter.on,
subscribe(payload, sink) {
Expand All @@ -493,36 +502,6 @@ export function createClient(options: ClientOptions): Client {
},
};

function messageHandler({ data }: MessageEvent) {
const message = memoParseMessage(data);
switch (message.type) {
case MessageType.Next: {
if (message.id === id) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
sink.next(message.payload as any);
}
return;
}
case MessageType.Error: {
if (message.id === id) {
completed = true;
sink.error(message.payload);
releaserRef.current();
// TODO-db-201025 calling releaser will complete the sink, meaning that both the `error` and `complete` will be
// called. neither promises or observables care; once they settle, additional calls to the resolvers will be ignored
}
return;
}
case MessageType.Complete: {
if (message.id === id) {
completed = true;
releaserRef.current(); // release completes the sink
}
return;
}
}
}

(async () => {
for (;;) {
try {
Expand All @@ -535,7 +514,34 @@ export function createClient(options: ClientOptions): Client {
// if completed while waiting for connect, release the connection lock right away
if (completed) return release();

socket.addEventListener('message', messageHandler);
const unlisten = emitter.on('message', (message) => {
switch (message.type) {
case MessageType.Next: {
if (message.id === id) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
sink.next(message.payload as any);
}
return;
}
case MessageType.Error: {
if (message.id === id) {
completed = true;
sink.error(message.payload);
releaserRef.current();
// TODO-db-201025 calling releaser will complete the sink, meaning that both the `error` and `complete` will be
// called. neither promises or observables care; once they settle, additional calls to the resolvers will be ignored
}
return;
}
case MessageType.Complete: {
if (message.id === id) {
completed = true;
releaserRef.current(); // release completes the sink
}
return;
}
}
});

socket.send(
stringifyMessage<MessageType.Subscribe>({
Expand All @@ -559,10 +565,9 @@ export function createClient(options: ClientOptions): Client {
};

// either the releaser will be called, connection completed and
// the promise resolved or the socket closed and the promise rejected
await waitForReleaseOrThrowOnClose;

socket.removeEventListener('message', messageHandler);
// the promise resolved or the socket closed and the promise rejected.
// whatever happens though, we want to stop listening for messages
await waitForReleaseOrThrowOnClose.finally(unlisten);

return; // completed, shouldnt try again
} catch (errOrCloseEvent) {
Expand All @@ -579,7 +584,7 @@ export function createClient(options: ClientOptions): Client {
disposed = true;
if (connecting) {
// if there is a connection, close it
const socket = await connecting;
const [socket] = await connecting;
socket.close(1000, 'Normal Closure');
}
},
Expand Down

0 comments on commit 68d0e20

Please sign in to comment.