Skip to content

Commit

Permalink
fix(client): Leverage active streams for reliable network error retries
Browse files Browse the repository at this point in the history
  • Loading branch information
enisdenjo committed Jul 17, 2022
1 parent 28d3891 commit 607b468
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 16 deletions.
59 changes: 55 additions & 4 deletions src/__tests__/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -569,8 +569,59 @@ describe('retries', () => {
});
});

// TODO: how to simulate network errors during emission? calling socket.destroy does nothing (see: https://github.com/enisdenjo/graphql-sse/issues/22)
it.todo(
'should retry network errors even if they occur during event emission',
);
it('should retry network errors even if they occur during event emission', async (done) => {
const server = await startTServer();

const client = createClient({
url: server.url,
fetchFn: fetch,
retryAttempts: 1,
retry: async () => {
client.dispose();
done();
},
});

const sub = tsubscribe(client, {
query: 'subscription { slowGreetings }',
});

await sub.waitForNext();

await server.dispose();
});

it('should not retry fatal errors occurring during event emission', async (done) => {
const server = await startTServer();

let msgsCount = 0;
const fatalErr = new Error('Boom, I am fatal');

const client = createClient({
url: server.url,
fetchFn: fetch,
retryAttempts: 1,
retry: async () => {
done(new Error("Shouldnt've retried"));
},
onMessage: () => {
// onMessage is in the middle of stream processing, throwing from it is considered fatal
msgsCount++;
if (msgsCount > 3) {
throw fatalErr;
}
},
});

const sub = tsubscribe(client, {
query: 'subscription { slowGreetings }',
});

await sub.waitForNext();

await sub.waitForError((err) => {
expect(err).toBe(fatalErr);
done();
});
});
});
35 changes: 23 additions & 12 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -695,14 +695,23 @@ async function connect<SingleConnection extends boolean>(
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
for await (const chunk of toAsyncIterator(res.body!)) {
if (typeof chunk === 'string')
throw new Error(`Unexpected string chunk "${chunk}"`);
throw (error = new Error(`Unexpected string chunk "${chunk}"`)); // set error as fatal indicator

// read chunk and if messages are ready, yield them
const msgs = parse(chunk);
let msgs;
try {
msgs = parse(chunk);
} catch (err) {
throw (error = err); // set error as fatal indicator
}
if (!msgs) continue;

for (const msg of msgs) {
onMessage?.(msg);
try {
onMessage?.(msg);
} catch (err) {
throw (error = err); // set error as fatal indicator
}

const operationId =
msg.data && 'id' in msg.data
Expand All @@ -726,7 +735,9 @@ async function connect<SingleConnection extends boolean>(
queue[operationId].push('complete');
break;
default:
throw new Error(`Unexpected message event "${msg.event}"`);
throw (error = new Error(
`Unexpected message event "${msg.event}"`,
)); // set error as fatal indicator
}

waiting[operationId]?.proceed();
Expand All @@ -736,16 +747,16 @@ async function connect<SingleConnection extends boolean>(
// some browsers (like Safari) closes the connection without errors even on abrupt server shutdowns,
// we therefore make sure that no stream is active and waiting for results (not completed)
if (Object.keys(waiting).length) {
throw new NetworkError('Connection closed while having active streams');
throw new Error('Connection closed while having active streams');
}
} catch (err) {
// non-network errors shouldn't ever have "network" or "stream" in the message, right?
// keyword "network" is for Chrome and keyword "stream" is for Firefox, Safari closes
// the connection and that is handled above by checking for active streams
error =
!(err instanceof NetworkError) && /network|stream/i.test(err)
? new NetworkError(err)
: err;
if (!error && Object.keys(waiting).length) {
// we assume the error is most likely a NetworkError because there are listeners waiting for events.
// additionally, the `error` is another indicator because we set it early if the error is considered fatal
error = new NetworkError(err);
} else {
error = err;
}
waitingForThrow?.(error);
} finally {
Object.values(waiting).forEach(({ proceed }) => proceed());
Expand Down

0 comments on commit 607b468

Please sign in to comment.