From eb6cb2a0654489e1210a8db93f90bfc3ebfe94e4 Mon Sep 17 00:00:00 2001 From: enisdenjo Date: Sun, 11 Apr 2021 22:35:36 +0200 Subject: [PATCH] fix(client): Subscriptions acquire locks --- src/client.ts | 11 +++++---- src/tests/client.ts | 54 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 6 deletions(-) diff --git a/src/client.ts b/src/client.ts index e27d5b99..97ca0c77 100644 --- a/src/client.ts +++ b/src/client.ts @@ -346,10 +346,6 @@ export function createClient(options: ClientOptions): Client { waitForReleaseOrThrowOnClose: Promise, ] > { - // locks increment only when not retrying because - // the same lock retries connect on abrupt closures - if (!retrying) locks++; - const [socket, throwOnClose] = await (connecting ?? (connecting = new Promise((connected, denied) => (async () => { @@ -425,7 +421,7 @@ export function createClient(options: ClientOptions): Client { ))); let release = () => { - // releases this connection lock + // releases this connection }; const released = new Promise((resolve) => (release = resolve)); @@ -435,8 +431,9 @@ export function createClient(options: ClientOptions): Client { Promise.race([ // wait for released.then(() => { + // decrement the subscription locks if (!--locks) { - // if no more connection locks are present, complete the connection + // and if no more are present, complete the connection const complete = () => socket.close(1000, 'Normal Closure'); if (isFinite(keepAlive) && keepAlive > 0) { // if the keepalive is set, allow for the specified calmdown time and @@ -496,6 +493,7 @@ export function createClient(options: ClientOptions): Client { // in non-lazy (hot?) mode always hold one connection lock to persist the socket if (!lazy) { (async () => { + locks++; for (;;) { try { const [, , throwOnClose] = await connect(); @@ -524,6 +522,7 @@ export function createClient(options: ClientOptions): Client { }; (async () => { + locks++; for (;;) { try { const [ diff --git a/src/tests/client.ts b/src/tests/client.ts index 77ec2f57..da6e699f 100644 --- a/src/tests/client.ts +++ b/src/tests/client.ts @@ -1020,6 +1020,60 @@ describe('reconnecting', () => { // and no clients should be left expect(server.clients.size).toBe(0); }); + + it('should lazy disconnect even if subscription is created during retries after all get completed', async () => { + const { url, ...server } = await startTServer(); + + const client = createClient({ + url, + lazy: true, // behavior on lazy only + retryAttempts: Infinity, // keep retrying forever + retryWait: () => Promise.resolve(), + }); + + const sub1 = tsubscribe(client, { + query: 'subscription { ping(key: "1") }', + }); + + await server.waitForClient((client) => client.close()); + await server.waitForClientClose(); + + await server.waitForClient((client) => client.close()); + await server.waitForClientClose(); + + const sub2 = tsubscribe(client, { + query: 'subscription { ping(key: "2") }', + }); + + await server.waitForClient((client) => client.close()); + await server.waitForClientClose(); + + // allow both subs on the 3rd retry + await server.waitForOperation(); + await server.waitForOperation(); + + // dispose of the first subscription + sub1.dispose(); + await sub1.waitForComplete(); + + // client should NOT leave yet + await server.waitForClientClose(() => { + fail("Client should've stayed connected"); + }, 10); + + // and client should still be connected + expect(server.clients.size).toBe(1); + + // dispose of the last subscription + sub2.dispose(); + await sub2.waitForComplete(); + + // client should leave now + await server.waitForClientClose(); + + // and all connections are gone + expect(server.clients.size).toBe(0); + }); }); describe('events', () => {