Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(client): Focus subscription message listeners on id #150

Merged
merged 10 commits into from
Mar 29, 2021
39 changes: 25 additions & 14 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -293,15 +293,30 @@ export function createClient(options: ClientOptions): Client {

// websocket status emitter, subscriptions are handled differently
const emitter = (() => {
const message = (() => {
const listeners: { [key: string]: EventMessageListener } = {};
return {
on(id: string, listener: EventMessageListener) {
listeners[id] = listener;
return () => {
delete listeners[id];
};
},
emit(message: Message) {
if ('id' in message) listeners[message.id]?.(message);
},
};
})();
const listeners: { [event in Event]: EventListener<event>[] } = {
connecting: on?.connecting ? [on.connecting] : [],
connected: on?.connected ? [on.connected] : [],
message: on?.message ? [on.message] : [],
message: on?.message ? [message.emit, on.message] : [message.emit],
closed: on?.closed ? [on.closed] : [],
error: on?.error ? [on.error] : [],
};

return {
onMessage: message.on,
on<E extends Event>(event: E, listener: EventListener<E>) {
const l = listeners[event] as EventListener<E>[];
l.push(listener);
Expand Down Expand Up @@ -518,28 +533,24 @@ export function createClient(options: ClientOptions): Client {
// if completed while waiting for connect, release the connection lock right away
if (completed) return release();

const unlisten = emitter.on('message', (message) => {
const unlisten = emitter.onMessage(id, (message) => {
switch (message.type) {
case MessageType.Next: {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
if (message.id === id) sink.next(message.payload as any);
sink.next(message.payload as any);
return;
}
case MessageType.Error: {
if (message.id === id) {
completed = true;
sink.error(message.payload);
releaser();
// TODO-db-201025 calling releaser will complete the sink, meaning that both the `error` and `complete` will be
// called. neither promises or observables care; once they settle, additional calls to the resolvers will be ignored
}
completed = true;
sink.error(message.payload);
releaser();
// TODO-db-201025 calling releaser will complete the sink, meaning that both the `error` and `complete` will be
// called. neither promises or observables care; once they settle, additional calls to the resolvers will be ignored
return;
}
case MessageType.Complete: {
if (message.id === id) {
completed = true;
releaser(); // release completes the sink
}
completed = true;
releaser(); // release completes the sink
return;
}
}
Expand Down