Skip to content

Commit

Permalink
defer stream cancelation
Browse files Browse the repository at this point in the history
  • Loading branch information
n1ru4l committed Mar 28, 2024
1 parent ce185de commit 3e8d114
Show file tree
Hide file tree
Showing 8 changed files with 325 additions and 58 deletions.
10 changes: 0 additions & 10 deletions examples/defer-stream/__integration-tests__/defer-stream.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { yoga } from '../src/yoga';

describe('Defer / Stream', () => {
it('stream', async () => {
const start = Date.now();
const response = await yoga.fetch('/graphql', {
method: 'POST',
headers: {
Expand All @@ -21,14 +20,9 @@ describe('Defer / Stream', () => {
const contentType = response.headers.get('Content-Type');
expect(contentType).toEqual('multipart/mixed; boundary="-"');
const responseText = await response.text();
const end = Date.now();
expect(responseText).toMatchSnapshot('stream');
const diff = end - start;
expect(diff).toBeLessThan(2650);
expect(diff > 2550).toBeTruthy();
});
it('defer', async () => {
const start = Date.now();
const response = await yoga.fetch('/graphql', {
method: 'POST',
headers: {
Expand All @@ -50,10 +44,6 @@ describe('Defer / Stream', () => {
const contentType = response.headers.get('Content-Type');
expect(contentType).toEqual('multipart/mixed; boundary="-"');
const responseText = await response.text();
const end = Date.now();
expect(responseText).toMatchSnapshot('defer');
const diff = end - start;
expect(diff).toBeLessThan(1600);
expect(diff > 1450).toBeTruthy();
});
});
233 changes: 225 additions & 8 deletions packages/graphql-yoga/__tests__/request-cancellation.spec.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,47 @@
import { createSchema, createYoga } from '../src/index';
import { useDeferStream } from '@graphql-yoga/plugin-defer-stream';
import { createLogger, createSchema, createYoga, FetchAPI } from '../src/index';

describe('request cancellation', () => {
it('request cancellation stops invocation of subsequent resolvers', async () => {
const variants: Array<[name: string, fetchAPI: undefined | FetchAPI]> = [
['Ponyfilled WhatWG Fetch', undefined],
];

const [major] = globalThis?.process?.versions?.node.split('.') ?? [];

if (major === '21' && process.env.LEAKS_TEST !== 'true') {
variants.push([
'Node.js 21',
{
fetch: globalThis.fetch,
Blob: globalThis.Blob,
btoa: globalThis.btoa,
FormData: globalThis.FormData,
Headers: globalThis.Headers,
Request: globalThis.Request,
crypto: globalThis.crypto,
File: globalThis.File,
ReadableStream: globalThis.ReadableStream,
// @ts-expect-error json function signature
Response: globalThis.Response,
TextDecoder: globalThis.TextDecoder,
TextEncoder: globalThis.TextEncoder,
URL: globalThis.URL,
TransformStream: globalThis.TransformStream,
// URLPattern: globalThis.URLPattern,
URLSearchParams: globalThis.URLSearchParams,
WritableStream: globalThis.WritableStream,
},
]);
}

function waitAFewMillisecondsToMakeSureGraphQLExecutionIsNotResumingInBackground() {
return new Promise(res => setTimeout(res, 5));
}

describe.each(variants)('request cancellation (%s)', (_, fetchAPI) => {
it('request cancellation stops invocation of subsequent resolvers (GraphQL over HTTP)', async () => {
const rootResolverGotInvokedD = createDeferred();
const requestGotCancelledD = createDeferred();
let aResolverGotInvoked = false;
let rootResolverGotInvoked = false;
const schema = createSchema({
typeDefs: /* GraphQL */ `
type Query {
Expand All @@ -18,7 +54,6 @@ describe('request cancellation', () => {
resolvers: {
Query: {
async root() {
rootResolverGotInvoked = true;
rootResolverGotInvokedD.resolve();
await requestGotCancelledD.promise;
return { a: 'a' };
Expand All @@ -32,7 +67,10 @@ describe('request cancellation', () => {
},
},
});
const yoga = createYoga({ schema });
const logger = createLogger('silent');
const debugLogs = jest.fn();
logger.debug = debugLogs;
const yoga = createYoga({ schema, fetchAPI, logging: logger });
const abortController = new AbortController();
const promise = Promise.resolve(
yoga.fetch('http://yoga/graphql', {
Expand All @@ -48,9 +86,188 @@ describe('request cancellation', () => {
abortController.abort();
requestGotCancelledD.resolve();
await expect(promise).rejects.toThrow('This operation was aborted');
expect(rootResolverGotInvoked).toBe(true);
await waitAFewMillisecondsToMakeSureGraphQLExecutionIsNotResumingInBackground();
expect(aResolverGotInvoked).toBe(false);
await requestGotCancelledD.promise;
expect(debugLogs.mock.calls).toEqual([
['Parsing request to extract GraphQL parameters'],
['Processing GraphQL Parameters'],
['Request aborted'],
]);
});

it('request cancellation stops invocation of subsequent resolvers (GraphQL over SSE with Subscription)', async () => {
const rootResolverGotInvokedD = createDeferred();
const requestGotCancelledD = createDeferred();
let aResolverGotInvoked = false;
const schema = createSchema({
typeDefs: /* GraphQL */ `
type Query {
root: A!
}
type Subscription {
root: A!
}
type A {
a: String!
}
`,
resolvers: {
Subscription: {
root: {
async *subscribe() {
yield 1;
},
async resolve() {
rootResolverGotInvokedD.resolve();
await requestGotCancelledD.promise;
return { a: 'a' };
},
},
},
A: {
a() {
aResolverGotInvoked = true;
return 'a';
},
},
},
});
const logger = createLogger('silent');
const debugLogs = jest.fn();
logger.debug = debugLogs;
const yoga = createYoga({ schema, fetchAPI, logging: logger });
const abortController = new AbortController();
const response = await yoga.fetch('http://yoga/graphql', {
method: 'POST',
body: JSON.stringify({ query: 'subscription { root { a } }' }),
headers: {
'Content-Type': 'application/json',
Accept: 'text/event-stream',
},
signal: abortController.signal,
});
expect(response.status).toBe(200);
const iterator = response.body![Symbol.asyncIterator]();
// first we will always get a ping/keep alive for flushed headers
const next = await iterator.next();
expect(Buffer.from(next.value).toString('utf-8')).toMatchInlineSnapshot(`
":
"
`);

await rootResolverGotInvokedD.promise;
const next$ = iterator.next().then(({ done, value }) => {
// in case it resolves, parse the buffer to string for easier debugging.
return { done, value: Buffer.from(value).toString('utf-8') };
});

abortController.abort();
requestGotCancelledD.resolve();

await expect(next$).rejects.toThrow('This operation was aborted');
await waitAFewMillisecondsToMakeSureGraphQLExecutionIsNotResumingInBackground();
expect(aResolverGotInvoked).toBe(false);

expect(debugLogs.mock.calls).toEqual([
['Parsing request to extract GraphQL parameters'],
['Processing GraphQL Parameters'],
['Processing GraphQL Parameters done.'],
['Request aborted'],
]);
});

it('request cancellation stops invocation of subsequent resolvers (GraphQL over Multipart with defer/stream)', async () => {
const aResolverGotInvokedD = createDeferred();
const requestGotCancelledD = createDeferred();
let bResolverGotInvoked = false;
const schema = createSchema({
typeDefs: /* GraphQL */ `
type Query {
root: A!
}
type A {
a: B!
}
type B {
b: String
}
`,
resolvers: {
Query: {
async root() {
return { a: 'a' };
},
},
A: {
async a() {
aResolverGotInvokedD.resolve();
await requestGotCancelledD.promise;
return { b: 'b' };
},
},
B: {
b: obj => {
bResolverGotInvoked = true;
return obj.b;
},
},
},
});
const logger = createLogger('silent');
const debugLogs = jest.fn();
logger.debug = debugLogs;
const yoga = createYoga({ schema, plugins: [useDeferStream()], fetchAPI, logging: logger });

const abortController = new AbortController();
const response = await yoga.fetch('http://yoga/graphql', {
method: 'POST',
body: JSON.stringify({
query: /* GraphQL */ `
query {
root {
... @defer {
a {
b
}
}
}
}
`,
}),
headers: {
'content-type': 'application/json',
accept: 'multipart/mixed',
},
signal: abortController.signal,
});
expect(response.status).toEqual(200);
const iterator = response.body![Symbol.asyncIterator]();
let payload = '';

// Shitty wait condition, but it works lol
while (payload.split('\r\n').length < 6 || !payload.endsWith('---')) {
const next = await iterator.next();
payload += Buffer.from(next.value).toString('utf-8');
}

const next$ = iterator.next().then(({ done, value }) => {
// in case it resolves, parse the buffer to string for easier debugging.
return { done, value: Buffer.from(value).toString('utf-8') };
});

await aResolverGotInvokedD.promise;
abortController.abort();
requestGotCancelledD.resolve();
await expect(next$).rejects.toThrow('This operation was aborted');
await waitAFewMillisecondsToMakeSureGraphQLExecutionIsNotResumingInBackground();
expect(bResolverGotInvoked).toBe(false);
expect(debugLogs.mock.calls).toEqual([
['Parsing request to extract GraphQL parameters'],
['Processing GraphQL Parameters'],
['Processing GraphQL Parameters done.'],
['Request aborted'],
]);
});
});

Expand Down
2 changes: 1 addition & 1 deletion packages/graphql-yoga/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
},
"dependencies": {
"@envelop/core": "^5.0.0",
"@graphql-tools/executor": "^1.2.3",
"@graphql-tools/executor": "^1.2.5",
"@graphql-tools/schema": "^10.0.0",
"@graphql-tools/utils": "^10.1.0",
"@graphql-yoga/logger": "^2.0.0",
Expand Down
13 changes: 9 additions & 4 deletions packages/graphql-yoga/src/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ export function isOriginalGraphQLError(
return false;
}

export function isAbortError(error: unknown): error is DOMException {
return (
typeof error === 'object' &&
error?.constructor?.name === 'DOMException' &&
(error as Record<string, unknown>).name === 'AbortError'
);
}

export function handleError(
error: unknown,
maskedErrorsOpts: YogaMaskedErrorOpts | null,
Expand All @@ -50,10 +58,7 @@ export function handleError(
errors.add(handledError);
}
}
} else if (
error?.constructor?.name === 'DOMException' &&
(error as Record<string, unknown>).name === 'AbortError'
) {
} else if (isAbortError(error)) {
logger.debug('Request aborted');
} else if (maskedErrorsOpts) {
const maskedError = maskedErrorsOpts.maskError(
Expand Down
39 changes: 22 additions & 17 deletions packages/graphql-yoga/src/plugins/result-processor/multipart.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,28 +37,33 @@ export function processMultipartResult(result: ResultProcessorInput, fetchAPI: F
controller.enqueue(textEncoder.encode(`---`));
},
async pull(controller) {
const { done, value } = await iterator.next();
if (value != null) {
controller.enqueue(textEncoder.encode('\r\n'));
try {
const { done, value } = await iterator.next();
if (value != null) {
controller.enqueue(textEncoder.encode('\r\n'));

controller.enqueue(textEncoder.encode('Content-Type: application/json; charset=utf-8'));
controller.enqueue(textEncoder.encode('\r\n'));
controller.enqueue(textEncoder.encode('Content-Type: application/json; charset=utf-8'));
controller.enqueue(textEncoder.encode('\r\n'));

const chunk = jsonStringifyResultWithoutInternals(value);
const encodedChunk = textEncoder.encode(chunk);
const chunk = jsonStringifyResultWithoutInternals(value);
const encodedChunk = textEncoder.encode(chunk);

controller.enqueue(textEncoder.encode('Content-Length: ' + encodedChunk.byteLength));
controller.enqueue(textEncoder.encode('\r\n'));
controller.enqueue(textEncoder.encode('Content-Length: ' + encodedChunk.byteLength));
controller.enqueue(textEncoder.encode('\r\n'));

controller.enqueue(textEncoder.encode('\r\n'));
controller.enqueue(encodedChunk);
controller.enqueue(textEncoder.encode('\r\n'));
controller.enqueue(textEncoder.encode('\r\n'));
controller.enqueue(encodedChunk);
controller.enqueue(textEncoder.encode('\r\n'));

controller.enqueue(textEncoder.encode('---'));
}
if (done) {
controller.enqueue(textEncoder.encode('--\r\n'));
controller.close();
controller.enqueue(textEncoder.encode('---'));
}

if (done) {
controller.enqueue(textEncoder.encode('--\r\n'));
controller.close();
}
} catch (err) {
controller.error(err);
}
},
async cancel(e) {
Expand Down

0 comments on commit 3e8d114

Please sign in to comment.