Skip to content

Commit

Permalink
fix: "concurrent" safe and allow pending sinks to connect on next socket
Browse files Browse the repository at this point in the history
  • Loading branch information
enisdenjo committed Aug 28, 2020
1 parent 5cc7151 commit d345c2f
Showing 1 changed file with 37 additions and 24 deletions.
61 changes: 37 additions & 24 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,12 @@ function createSocky() {
return false;
}

state = { connecting: true, connected: false, disconnecting: false };
state = { ...state, connecting: true };
return true;
},
connected(connectedSocket: WebSocket) {
socket = connectedSocket;
state = { connected: true, connecting: false, disconnecting: false };
state = { ...state, connected: true, connecting: false };
},
registerMessageListener(
listener: (event: MessageEvent) => void,
Expand Down Expand Up @@ -120,14 +120,14 @@ function createSocky() {
},
dispose() {
if (!state.disconnecting) {
state = { disconnecting: true, connected: false, connecting: false };
state = { ...state, disconnecting: true };

if (socket && socket.readyState === WebSocket.OPEN) {
socket.close(1000, 'Normal Closure');
}
socket = undefined;

socket = null;
state = { disconnecting: false, connected: false, connecting: false };
state = { ...state, disconnecting: false, connected: false };
}
},
};
Expand All @@ -137,17 +137,9 @@ function createSocky() {
export function createClient(options: ClientOptions): Client {
const { url, connectionParams } = options;

// holds all currently subscribed sinks, will use this map
// to dispatch messages to the correct destination
const pendingSinks: Record<UUID, Sink> = {};
const subscribedSinks: Record<UUID, Sink> = {};

function errorAllSinks(err: Error | CloseEvent) {
Object.entries(subscribedSinks).forEach(([, sink]) => sink.error(err));
}
function completeAllSinks() {
Object.entries(subscribedSinks).forEach(([, sink]) => sink.complete());
}

const socky = createSocky();
async function prepare(): Promise<void> {
if (await socky.beginConnecting()) {
Expand All @@ -165,19 +157,28 @@ export function createClient(options: ClientOptions): Client {
*/

socket.onclose = (closeEvent) => {
socky.dispose();

if (closeEvent.code === 1000 || closeEvent.code === 1001) {
// close event `1000: Normal Closure` is ok and so is `1001: Going Away` (maybe the server is restarting)
completeAllSinks();
// complete only subscribed sinks because pending ones want a new connection
Object.entries(subscribedSinks).forEach(([, sink]) =>
sink.complete(),
);
} else {
// all other close events are considered erroneous
// all other close events are considered erroneous for all sinks

// reading the `CloseEvent.reason` can either throw or empty the whole error message
// (if trying to pass the reason in the `Error` message). having this in mind,
// simply let the user handle the close event...
errorAllSinks(closeEvent);
Object.entries(pendingSinks).forEach(([, sink]) =>
sink.error(closeEvent),
);
Object.entries(subscribedSinks).forEach(([, sink]) =>
sink.error(closeEvent),
);
}

socky.dispose();
if (!done) {
done = true;
reject(closeEvent);
Expand Down Expand Up @@ -213,9 +214,12 @@ export function createClient(options: ClientOptions): Client {
resolve();
}
} catch (err) {
errorAllSinks(err);

socky.dispose();

Object.entries(pendingSinks).forEach(([, sink]) => sink.error(err));
Object.entries(subscribedSinks).forEach(([, sink]) =>
sink.error(err),
);
if (!done) {
done = true;
reject(err);
Expand All @@ -231,21 +235,24 @@ export function createClient(options: ClientOptions): Client {
return {
subscribe: (payload, sink) => {
const uuid = generateUUID();
if (subscribedSinks[uuid]) {
if (pendingSinks[uuid] || subscribedSinks[uuid]) {
sink.error(new Error(`Sink with ID ${uuid} already registered`));
return noop;
}
subscribedSinks[uuid] = sink;
pendingSinks[uuid] = sink;

let messageListener: Disposable | undefined,
disposed = false;
prepare()
.then(() => {
delete pendingSinks[uuid];

// the sink might have been disposed before the socket became ready
if (disposed) {
return;
}

subscribedSinks[uuid] = sink;
messageListener = socky.registerMessageListener(({ data }) => {
const message = parseMessage(data);
switch (message.type) {
Expand Down Expand Up @@ -296,6 +303,7 @@ export function createClient(options: ClientOptions): Client {
}

sink.complete();
delete pendingSinks[uuid];
delete subscribedSinks[uuid];

if (Object.entries(subscribedSinks).length === 0) {
Expand All @@ -306,8 +314,13 @@ export function createClient(options: ClientOptions): Client {
},
dispose: async () => {
// TODO-db-200817 complete or error? the sinks should be completed BEFORE the client gets disposed
completeAllSinks();

Object.entries(pendingSinks).forEach(([, sink]) => sink.complete());
Object.keys(pendingSinks).forEach((uuid) => {
delete pendingSinks[uuid];
});
Object.entries(subscribedSinks).forEach(([, sink]) => {
sink.complete();
});
Object.keys(subscribedSinks).forEach((uuid) => {
delete subscribedSinks[uuid];
});
Expand Down

0 comments on commit d345c2f

Please sign in to comment.