Skip to content

Commit

Permalink
Handle chunked and no-space messages in SSE (#5724)
Browse files Browse the repository at this point in the history
  • Loading branch information
enisdenjo committed Nov 27, 2023
1 parent 6fd5e19 commit cfbd2e0
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 13 deletions.
5 changes: 5 additions & 0 deletions .changeset/mean-jobs-tickle.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@graphql-tools/executor-http': patch
---

Handle chunked and no-space messages in SSE
38 changes: 25 additions & 13 deletions packages/executors/http/src/handleAsyncIterable.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,34 @@
/* eslint-disable no-labels */
import { TextDecoder } from '@whatwg-node/fetch';

const DELIM = '\n\n';

export async function* handleAsyncIterable(asyncIterable: AsyncIterable<Uint8Array | string>) {
const textDecoder = new TextDecoder();
let currChunk = '';
outer: for await (const chunk of asyncIterable) {
const chunkStr =
typeof chunk === 'string' ? chunk : textDecoder.decode(chunk, { stream: true });
for (const part of chunkStr.split('\n\n')) {
if (part) {
const eventStr = part.split('event: ')[1];
const dataStr = part.split('data: ')[1];
if (eventStr === 'complete') {
break outer;
}
if (dataStr) {
const data = JSON.parse(dataStr);
yield data.payload || data;
}
currChunk += typeof chunk === 'string' ? chunk : textDecoder.decode(chunk);
for (;;) {
const delimIndex = currChunk.indexOf(DELIM);
if (delimIndex === -1) {
// incomplete message, wait for more chunks
continue outer;
}

const msg = currChunk.slice(0, delimIndex); // whole message
currChunk = currChunk.slice(delimIndex + DELIM.length); // remainder

// data
const dataStr = msg.split('data:')[1]?.trim();
if (dataStr) {
const data = JSON.parse(dataStr);
yield data.payload || data;
}

// event
const event = msg.split('event:')[1]?.trim();
if (event === 'complete') {
break outer;
}
}
}
Expand Down
85 changes: 85 additions & 0 deletions packages/executors/http/tests/handleEventStreamResponse.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,89 @@ describe('handleEventStreamResponse', () => {
},
});
});

it('should handle an event without spaces', async () => {
const readableStream = new ReadableStream<Uint8Array>({
start(controller) {
controller.enqueue(encoder.encode('event:complete\n'));
controller.enqueue(encoder.encode('data:{"foo":"bar"}\n'));
controller.enqueue(encoder.encode('\n'));
},
});

const response = new Response(readableStream);
const asyncIterable = handleEventStreamResponse(response);
const iterator = asyncIterable[Symbol.asyncIterator]();

await expect(iterator.next()).resolves.toMatchInlineSnapshot(`
{
"done": false,
"value": {
"foo": "bar",
},
}
`);
});

it('should handle a chunked event with data', async () => {
let currChunk = 0;
const chunks = [
'event: next\n',
'data: { "foo":',
'"bar" }\n\n',
'event: next',
'\ndata: { "foo": "baz" }\n',
'\nevent: next\ndata: { "foo": "',
'bay"',
' }\n',
'\n',
];

const readableStream = new ReadableStream<Uint8Array>({
async pull(controller) {
const chunk = chunks[currChunk++];
if (chunk) {
await new Promise(resolve => setTimeout(resolve, 0)); // stream chunk after one tick
controller.enqueue(encoder.encode(chunk));
} else {
controller.close();
}
},
});

const response = new Response(readableStream);
const asyncIterable = handleEventStreamResponse(response);
const iterator = asyncIterable[Symbol.asyncIterator]();

await expect(iterator.next()).resolves.toMatchInlineSnapshot(`
{
"done": false,
"value": {
"foo": "bar",
},
}
`);
await expect(iterator.next()).resolves.toMatchInlineSnapshot(`
{
"done": false,
"value": {
"foo": "baz",
},
}
`);
await expect(iterator.next()).resolves.toMatchInlineSnapshot(`
{
"done": false,
"value": {
"foo": "bay",
},
}
`);
await expect(iterator.next()).resolves.toMatchInlineSnapshot(`
{
"done": true,
"value": undefined,
}
`);
});
});

0 comments on commit cfbd2e0

Please sign in to comment.