Skip to content

Commit

Permalink
fix(client): complete should not be called after subscription error
Browse files Browse the repository at this point in the history
  • Loading branch information
enisdenjo committed Apr 28, 2021
1 parent abd9c28 commit 1fba419
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 13 deletions.
28 changes: 15 additions & 13 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -557,11 +557,12 @@ export function createClient(options: ClientOptions): Client {
subscribe(payload, sink) {
const id = generateID();

let completed = false,
let done = false,
errored = false,
releaser = () => {
// for handling completions before connect
locks--;
completed = true;
done = true;
};

(async () => {
Expand All @@ -574,8 +575,8 @@ export function createClient(options: ClientOptions): Client {
waitForReleaseOrThrowOnClose,
] = await connect();

// if completed while waiting for connect, release the connection lock right away
if (completed) return release();
// if done while waiting for connect, release the connection lock right away
if (done) return release();

const unlisten = emitter.onMessage(id, (message) => {
switch (message.type) {
Expand All @@ -585,15 +586,13 @@ export function createClient(options: ClientOptions): Client {
return;
}
case MessageType.Error: {
completed = true;
(errored = true), (done = true);
sink.error(message.payload);
releaser();
// TODO-db-201025 calling releaser will complete the sink, meaning that both the `error` and `complete` will be
// called. neither promises or observables care; once they settle, additional calls to the resolvers will be ignored
return;
}
case MessageType.Complete: {
completed = true;
done = true;
releaser(); // release completes the sink
return;
}
Expand All @@ -609,7 +608,7 @@ export function createClient(options: ClientOptions): Client {
);

releaser = () => {
if (!completed && socket.readyState === WebSocketImpl.OPEN)
if (!done && socket.readyState === WebSocketImpl.OPEN)
// if not completed already and socket is open, send complete message to server on release
socket.send(
stringifyMessage<MessageType.Complete>({
Expand All @@ -618,7 +617,7 @@ export function createClient(options: ClientOptions): Client {
}),
);
locks--;
completed = true;
done = true;
release();
};

Expand All @@ -634,11 +633,14 @@ export function createClient(options: ClientOptions): Client {
}
})()
.catch(sink.error) // rejects on close events and errors
.then(sink.complete); // resolves on release or normal closure
.then(() => {
// delivering either an error or a complete terminates the sequence
if (!errored) sink.complete();
}); // resolves on release or normal closure

return () => {
// allow disposing only if not already completed
if (!completed) releaser();
// dispose only of active subscriptions
if (!done) releaser();
};
},
async dispose() {
Expand Down
23 changes: 23 additions & 0 deletions src/tests/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,29 @@ it('should not send the complete message if the socket is not open', async () =>
await sub.waitForComplete();
});

it('should not call complete after subscription error', async () => {
const { url } = await startTServer();

const client = createClient({
url,
lazy: true,
retryAttempts: 0,
});

// invalid subscription
const sub = tsubscribe(client, {
query: '{ iDontExist }',
});

// report error
await sub.waitForError();

// but not complete
await sub.waitForComplete(() => {
fail("shouldn't have completed");
}, 20);
});

describe('query operation', () => {
it('should execute the query, "next" the result and then complete', async () => {
const { url } = await startTServer();
Expand Down

0 comments on commit 1fba419

Please sign in to comment.