Skip to content

Commit

Permalink
fix(client): Subscriptions acquire locks
Browse files Browse the repository at this point in the history
  • Loading branch information
enisdenjo committed Apr 11, 2021
1 parent 8b166a9 commit eb6cb2a
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 6 deletions.
11 changes: 5 additions & 6 deletions src/client.ts
Expand Up @@ -346,10 +346,6 @@ export function createClient(options: ClientOptions): Client {
waitForReleaseOrThrowOnClose: Promise<void>,
]
> {
// locks increment only when not retrying because
// the same lock retries connect on abrupt closures
if (!retrying) locks++;

const [socket, throwOnClose] = await (connecting ??
(connecting = new Promise<Connected>((connected, denied) =>
(async () => {
Expand Down Expand Up @@ -425,7 +421,7 @@ export function createClient(options: ClientOptions): Client {
)));

let release = () => {
// releases this connection lock
// releases this connection
};
const released = new Promise<void>((resolve) => (release = resolve));

Expand All @@ -435,8 +431,9 @@ export function createClient(options: ClientOptions): Client {
Promise.race([
// wait for
released.then(() => {
// decrement the subscription locks
if (!--locks) {
// if no more connection locks are present, complete the connection
// and if no more are present, complete the connection
const complete = () => socket.close(1000, 'Normal Closure');
if (isFinite(keepAlive) && keepAlive > 0) {
// if the keepalive is set, allow for the specified calmdown time and
Expand Down Expand Up @@ -496,6 +493,7 @@ export function createClient(options: ClientOptions): Client {
// in non-lazy (hot?) mode always hold one connection lock to persist the socket
if (!lazy) {
(async () => {
locks++;
for (;;) {
try {
const [, , throwOnClose] = await connect();
Expand Down Expand Up @@ -524,6 +522,7 @@ export function createClient(options: ClientOptions): Client {
};

(async () => {
locks++;
for (;;) {
try {
const [
Expand Down
54 changes: 54 additions & 0 deletions src/tests/client.ts
Expand Up @@ -1020,6 +1020,60 @@ describe('reconnecting', () => {
// and no clients should be left
expect(server.clients.size).toBe(0);
});

it('should lazy disconnect even if subscription is created during retries after all get completed', async () => {
const { url, ...server } = await startTServer();

const client = createClient({
url,
lazy: true, // behavior on lazy only
retryAttempts: Infinity, // keep retrying forever
retryWait: () => Promise.resolve(),
});

const sub1 = tsubscribe(client, {
query: 'subscription { ping(key: "1") }',
});

await server.waitForClient((client) => client.close());
await server.waitForClientClose();

await server.waitForClient((client) => client.close());
await server.waitForClientClose();

const sub2 = tsubscribe(client, {
query: 'subscription { ping(key: "2") }',
});

await server.waitForClient((client) => client.close());
await server.waitForClientClose();

// allow both subs on the 3rd retry
await server.waitForOperation();
await server.waitForOperation();

// dispose of the first subscription
sub1.dispose();
await sub1.waitForComplete();

// client should NOT leave yet
await server.waitForClientClose(() => {
fail("Client should've stayed connected");
}, 10);

// and client should still be connected
expect(server.clients.size).toBe(1);

// dispose of the last subscription
sub2.dispose();
await sub2.waitForComplete();

// client should leave now
await server.waitForClientClose();

// and all connections are gone
expect(server.clients.size).toBe(0);
});
});

describe('events', () => {
Expand Down

0 comments on commit eb6cb2a

Please sign in to comment.