From 3e8d114edc1dfaf402ebddc538f530b1e888d345 Mon Sep 17 00:00:00 2001 From: Laurin Quast Date: Thu, 28 Mar 2024 16:26:30 +0100 Subject: [PATCH] defer stream cancelation --- .../defer-stream.spec.ts | 10 - .../__tests__/request-cancellation.spec.ts | 233 +++++++++++++++++- packages/graphql-yoga/package.json | 2 +- packages/graphql-yoga/src/error.ts | 13 +- .../src/plugins/result-processor/multipart.ts | 39 +-- .../src/plugins/result-processor/sse.ts | 27 +- packages/graphql-yoga/src/server.ts | 19 +- pnpm-lock.yaml | 40 ++- 8 files changed, 325 insertions(+), 58 deletions(-) diff --git a/examples/defer-stream/__integration-tests__/defer-stream.spec.ts b/examples/defer-stream/__integration-tests__/defer-stream.spec.ts index 04ab36e404..2690a03445 100644 --- a/examples/defer-stream/__integration-tests__/defer-stream.spec.ts +++ b/examples/defer-stream/__integration-tests__/defer-stream.spec.ts @@ -2,7 +2,6 @@ import { yoga } from '../src/yoga'; describe('Defer / Stream', () => { it('stream', async () => { - const start = Date.now(); const response = await yoga.fetch('/graphql', { method: 'POST', headers: { @@ -21,14 +20,9 @@ describe('Defer / Stream', () => { const contentType = response.headers.get('Content-Type'); expect(contentType).toEqual('multipart/mixed; boundary="-"'); const responseText = await response.text(); - const end = Date.now(); expect(responseText).toMatchSnapshot('stream'); - const diff = end - start; - expect(diff).toBeLessThan(2650); - expect(diff > 2550).toBeTruthy(); }); it('defer', async () => { - const start = Date.now(); const response = await yoga.fetch('/graphql', { method: 'POST', headers: { @@ -50,10 +44,6 @@ describe('Defer / Stream', () => { const contentType = response.headers.get('Content-Type'); expect(contentType).toEqual('multipart/mixed; boundary="-"'); const responseText = await response.text(); - const end = Date.now(); expect(responseText).toMatchSnapshot('defer'); - const diff = end - start; - expect(diff).toBeLessThan(1600); - expect(diff > 1450).toBeTruthy(); }); }); diff --git a/packages/graphql-yoga/__tests__/request-cancellation.spec.ts b/packages/graphql-yoga/__tests__/request-cancellation.spec.ts index 56cf812cd1..674f93bd01 100644 --- a/packages/graphql-yoga/__tests__/request-cancellation.spec.ts +++ b/packages/graphql-yoga/__tests__/request-cancellation.spec.ts @@ -1,11 +1,47 @@ -import { createSchema, createYoga } from '../src/index'; +import { useDeferStream } from '@graphql-yoga/plugin-defer-stream'; +import { createLogger, createSchema, createYoga, FetchAPI } from '../src/index'; -describe('request cancellation', () => { - it('request cancellation stops invocation of subsequent resolvers', async () => { +const variants: Array<[name: string, fetchAPI: undefined | FetchAPI]> = [ + ['Ponyfilled WhatWG Fetch', undefined], +]; + +const [major] = globalThis?.process?.versions?.node.split('.') ?? []; + +if (major === '21' && process.env.LEAKS_TEST !== 'true') { + variants.push([ + 'Node.js 21', + { + fetch: globalThis.fetch, + Blob: globalThis.Blob, + btoa: globalThis.btoa, + FormData: globalThis.FormData, + Headers: globalThis.Headers, + Request: globalThis.Request, + crypto: globalThis.crypto, + File: globalThis.File, + ReadableStream: globalThis.ReadableStream, + // @ts-expect-error json function signature + Response: globalThis.Response, + TextDecoder: globalThis.TextDecoder, + TextEncoder: globalThis.TextEncoder, + URL: globalThis.URL, + TransformStream: globalThis.TransformStream, + // URLPattern: globalThis.URLPattern, + URLSearchParams: globalThis.URLSearchParams, + WritableStream: globalThis.WritableStream, + }, + ]); +} + +function waitAFewMillisecondsToMakeSureGraphQLExecutionIsNotResumingInBackground() { + return new Promise(res => setTimeout(res, 5)); +} + +describe.each(variants)('request cancellation (%s)', (_, fetchAPI) => { + it('request cancellation stops invocation of subsequent resolvers (GraphQL over HTTP)', async () => { const rootResolverGotInvokedD = createDeferred(); const requestGotCancelledD = createDeferred(); let aResolverGotInvoked = false; - let rootResolverGotInvoked = false; const schema = createSchema({ typeDefs: /* GraphQL */ ` type Query { @@ -18,7 +54,6 @@ describe('request cancellation', () => { resolvers: { Query: { async root() { - rootResolverGotInvoked = true; rootResolverGotInvokedD.resolve(); await requestGotCancelledD.promise; return { a: 'a' }; @@ -32,7 +67,10 @@ describe('request cancellation', () => { }, }, }); - const yoga = createYoga({ schema }); + const logger = createLogger('silent'); + const debugLogs = jest.fn(); + logger.debug = debugLogs; + const yoga = createYoga({ schema, fetchAPI, logging: logger }); const abortController = new AbortController(); const promise = Promise.resolve( yoga.fetch('http://yoga/graphql', { @@ -48,9 +86,188 @@ describe('request cancellation', () => { abortController.abort(); requestGotCancelledD.resolve(); await expect(promise).rejects.toThrow('This operation was aborted'); - expect(rootResolverGotInvoked).toBe(true); + await waitAFewMillisecondsToMakeSureGraphQLExecutionIsNotResumingInBackground(); expect(aResolverGotInvoked).toBe(false); - await requestGotCancelledD.promise; + expect(debugLogs.mock.calls).toEqual([ + ['Parsing request to extract GraphQL parameters'], + ['Processing GraphQL Parameters'], + ['Request aborted'], + ]); + }); + + it('request cancellation stops invocation of subsequent resolvers (GraphQL over SSE with Subscription)', async () => { + const rootResolverGotInvokedD = createDeferred(); + const requestGotCancelledD = createDeferred(); + let aResolverGotInvoked = false; + const schema = createSchema({ + typeDefs: /* GraphQL */ ` + type Query { + root: A! + } + type Subscription { + root: A! + } + type A { + a: String! + } + `, + resolvers: { + Subscription: { + root: { + async *subscribe() { + yield 1; + }, + async resolve() { + rootResolverGotInvokedD.resolve(); + await requestGotCancelledD.promise; + return { a: 'a' }; + }, + }, + }, + A: { + a() { + aResolverGotInvoked = true; + return 'a'; + }, + }, + }, + }); + const logger = createLogger('silent'); + const debugLogs = jest.fn(); + logger.debug = debugLogs; + const yoga = createYoga({ schema, fetchAPI, logging: logger }); + const abortController = new AbortController(); + const response = await yoga.fetch('http://yoga/graphql', { + method: 'POST', + body: JSON.stringify({ query: 'subscription { root { a } }' }), + headers: { + 'Content-Type': 'application/json', + Accept: 'text/event-stream', + }, + signal: abortController.signal, + }); + expect(response.status).toBe(200); + const iterator = response.body![Symbol.asyncIterator](); + // first we will always get a ping/keep alive for flushed headers + const next = await iterator.next(); + expect(Buffer.from(next.value).toString('utf-8')).toMatchInlineSnapshot(` +": + +" +`); + + await rootResolverGotInvokedD.promise; + const next$ = iterator.next().then(({ done, value }) => { + // in case it resolves, parse the buffer to string for easier debugging. + return { done, value: Buffer.from(value).toString('utf-8') }; + }); + + abortController.abort(); + requestGotCancelledD.resolve(); + + await expect(next$).rejects.toThrow('This operation was aborted'); + await waitAFewMillisecondsToMakeSureGraphQLExecutionIsNotResumingInBackground(); + expect(aResolverGotInvoked).toBe(false); + + expect(debugLogs.mock.calls).toEqual([ + ['Parsing request to extract GraphQL parameters'], + ['Processing GraphQL Parameters'], + ['Processing GraphQL Parameters done.'], + ['Request aborted'], + ]); + }); + + it('request cancellation stops invocation of subsequent resolvers (GraphQL over Multipart with defer/stream)', async () => { + const aResolverGotInvokedD = createDeferred(); + const requestGotCancelledD = createDeferred(); + let bResolverGotInvoked = false; + const schema = createSchema({ + typeDefs: /* GraphQL */ ` + type Query { + root: A! + } + type A { + a: B! + } + type B { + b: String + } + `, + resolvers: { + Query: { + async root() { + return { a: 'a' }; + }, + }, + A: { + async a() { + aResolverGotInvokedD.resolve(); + await requestGotCancelledD.promise; + return { b: 'b' }; + }, + }, + B: { + b: obj => { + bResolverGotInvoked = true; + return obj.b; + }, + }, + }, + }); + const logger = createLogger('silent'); + const debugLogs = jest.fn(); + logger.debug = debugLogs; + const yoga = createYoga({ schema, plugins: [useDeferStream()], fetchAPI, logging: logger }); + + const abortController = new AbortController(); + const response = await yoga.fetch('http://yoga/graphql', { + method: 'POST', + body: JSON.stringify({ + query: /* GraphQL */ ` + query { + root { + ... @defer { + a { + b + } + } + } + } + `, + }), + headers: { + 'content-type': 'application/json', + accept: 'multipart/mixed', + }, + signal: abortController.signal, + }); + expect(response.status).toEqual(200); + const iterator = response.body![Symbol.asyncIterator](); + let payload = ''; + + // Shitty wait condition, but it works lol + while (payload.split('\r\n').length < 6 || !payload.endsWith('---')) { + const next = await iterator.next(); + payload += Buffer.from(next.value).toString('utf-8'); + } + + const next$ = iterator.next().then(({ done, value }) => { + // in case it resolves, parse the buffer to string for easier debugging. + return { done, value: Buffer.from(value).toString('utf-8') }; + }); + + await aResolverGotInvokedD.promise; + abortController.abort(); + requestGotCancelledD.resolve(); + await expect(next$).rejects.toThrow('This operation was aborted'); + await waitAFewMillisecondsToMakeSureGraphQLExecutionIsNotResumingInBackground(); + expect(bResolverGotInvoked).toBe(false); + expect(debugLogs.mock.calls).toEqual([ + ['Parsing request to extract GraphQL parameters'], + ['Processing GraphQL Parameters'], + ['Processing GraphQL Parameters done.'], + ['Request aborted'], + ]); }); }); diff --git a/packages/graphql-yoga/package.json b/packages/graphql-yoga/package.json index 0e6b2f36dc..4fbc98f77b 100644 --- a/packages/graphql-yoga/package.json +++ b/packages/graphql-yoga/package.json @@ -50,7 +50,7 @@ }, "dependencies": { "@envelop/core": "^5.0.0", - "@graphql-tools/executor": "^1.2.3", + "@graphql-tools/executor": "^1.2.5", "@graphql-tools/schema": "^10.0.0", "@graphql-tools/utils": "^10.1.0", "@graphql-yoga/logger": "^2.0.0", diff --git a/packages/graphql-yoga/src/error.ts b/packages/graphql-yoga/src/error.ts index e090473b08..32a3be6101 100644 --- a/packages/graphql-yoga/src/error.ts +++ b/packages/graphql-yoga/src/error.ts @@ -37,6 +37,14 @@ export function isOriginalGraphQLError( return false; } +export function isAbortError(error: unknown): error is DOMException { + return ( + typeof error === 'object' && + error?.constructor?.name === 'DOMException' && + (error as Record).name === 'AbortError' + ); +} + export function handleError( error: unknown, maskedErrorsOpts: YogaMaskedErrorOpts | null, @@ -50,10 +58,7 @@ export function handleError( errors.add(handledError); } } - } else if ( - error?.constructor?.name === 'DOMException' && - (error as Record).name === 'AbortError' - ) { + } else if (isAbortError(error)) { logger.debug('Request aborted'); } else if (maskedErrorsOpts) { const maskedError = maskedErrorsOpts.maskError( diff --git a/packages/graphql-yoga/src/plugins/result-processor/multipart.ts b/packages/graphql-yoga/src/plugins/result-processor/multipart.ts index f4914b6e5e..3ceed18aa5 100644 --- a/packages/graphql-yoga/src/plugins/result-processor/multipart.ts +++ b/packages/graphql-yoga/src/plugins/result-processor/multipart.ts @@ -37,28 +37,33 @@ export function processMultipartResult(result: ResultProcessorInput, fetchAPI: F controller.enqueue(textEncoder.encode(`---`)); }, async pull(controller) { - const { done, value } = await iterator.next(); - if (value != null) { - controller.enqueue(textEncoder.encode('\r\n')); + try { + const { done, value } = await iterator.next(); + if (value != null) { + controller.enqueue(textEncoder.encode('\r\n')); - controller.enqueue(textEncoder.encode('Content-Type: application/json; charset=utf-8')); - controller.enqueue(textEncoder.encode('\r\n')); + controller.enqueue(textEncoder.encode('Content-Type: application/json; charset=utf-8')); + controller.enqueue(textEncoder.encode('\r\n')); - const chunk = jsonStringifyResultWithoutInternals(value); - const encodedChunk = textEncoder.encode(chunk); + const chunk = jsonStringifyResultWithoutInternals(value); + const encodedChunk = textEncoder.encode(chunk); - controller.enqueue(textEncoder.encode('Content-Length: ' + encodedChunk.byteLength)); - controller.enqueue(textEncoder.encode('\r\n')); + controller.enqueue(textEncoder.encode('Content-Length: ' + encodedChunk.byteLength)); + controller.enqueue(textEncoder.encode('\r\n')); - controller.enqueue(textEncoder.encode('\r\n')); - controller.enqueue(encodedChunk); - controller.enqueue(textEncoder.encode('\r\n')); + controller.enqueue(textEncoder.encode('\r\n')); + controller.enqueue(encodedChunk); + controller.enqueue(textEncoder.encode('\r\n')); - controller.enqueue(textEncoder.encode('---')); - } - if (done) { - controller.enqueue(textEncoder.encode('--\r\n')); - controller.close(); + controller.enqueue(textEncoder.encode('---')); + } + + if (done) { + controller.enqueue(textEncoder.encode('--\r\n')); + controller.close(); + } + } catch (err) { + controller.error(err); } }, async cancel(e) { diff --git a/packages/graphql-yoga/src/plugins/result-processor/sse.ts b/packages/graphql-yoga/src/plugins/result-processor/sse.ts index 414e8ee842..b19ba3be02 100644 --- a/packages/graphql-yoga/src/plugins/result-processor/sse.ts +++ b/packages/graphql-yoga/src/plugins/result-processor/sse.ts @@ -58,17 +58,22 @@ export function getSSEProcessor(): ResultProcessor { } }, async pull(controller) { - const { done, value } = await iterator.next(); - if (value != null) { - controller.enqueue(textEncoder.encode(`event: next\n`)); - const chunk = jsonStringifyResultWithoutInternals(value); - controller.enqueue(textEncoder.encode(`data: ${chunk}\n\n`)); - } - if (done) { - controller.enqueue(textEncoder.encode(`event: complete\n`)); - controller.enqueue(textEncoder.encode(`data:\n\n`)); - clearInterval(pingInterval); - controller.close(); + try { + const result = await iterator.next(); + + if (result.value != null) { + controller.enqueue(textEncoder.encode(`event: next\n`)); + const chunk = jsonStringifyResultWithoutInternals(result.value); + controller.enqueue(textEncoder.encode(`data: ${chunk}\n\n`)); + } + if (result.done) { + controller.enqueue(textEncoder.encode(`event: complete\n`)); + controller.enqueue(textEncoder.encode(`data:\n\n`)); + clearInterval(pingInterval); + controller.close(); + } + } catch (err) { + controller.error(err); } }, async cancel(e) { diff --git a/packages/graphql-yoga/src/server.ts b/packages/graphql-yoga/src/server.ts index 9a98974f03..0aadf352ff 100644 --- a/packages/graphql-yoga/src/server.ts +++ b/packages/graphql-yoga/src/server.ts @@ -373,7 +373,20 @@ export class YogaServer< // We make sure that the user doesn't send a mutation with GET // @ts-expect-error Add plugins has context but this hook doesn't care addPlugin(usePreventMutationViaGET()); + if (maskedErrors) { + // Make sure we always throw AbortError instead of masking it! + addPlugin({ + onSubscribe() { + return { + onSubscribeError({ error }) { + if ((error as any)?.name === 'AbortError') { + throw error; + } + }, + }; + }, + }); addPlugin(useMaskedErrors(maskedErrors)); } addPlugin( @@ -468,7 +481,6 @@ export class YogaServer< const enveloped = this.getEnveloped(initialContext); this.logger.debug(`Processing GraphQL Parameters`); - result = await processGraphQLParams({ params, enveloped, @@ -485,6 +497,11 @@ export class YogaServer< iterator, v => v, (err: Error) => { + if (err.name === 'AbortError') { + this.logger.debug(`Request aborted`); + throw err; + } + const errors = handleError(err, this.maskedErrorsOpts, this.logger); return { errors, diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 3f47c00540..241d46b090 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1545,7 +1545,7 @@ importers: version: 0.8.4(graphql@16.6.0) '@graphql-tools/url-loader': specifier: 8.0.1 - version: 8.0.1(@types/node@18.16.16)(graphql@16.6.0) + version: 8.0.1(graphql@16.6.0) graphiql: specifier: 2.0.7 version: 2.0.7(@codemirror/language@6.0.0)(@types/react@18.2.8)(graphql@16.6.0)(react-dom@18.2.0)(react-is@17.0.2)(react@18.2.0) @@ -1584,8 +1584,8 @@ importers: specifier: 4.0.0 version: 4.0.0 '@graphql-tools/executor': - specifier: ^1.2.3 - version: 1.2.3(graphql@16.6.0) + specifier: ^1.2.5 + version: 1.2.5(graphql@16.6.0) '@graphql-tools/schema': specifier: ^10.0.0 version: 10.0.0(graphql@16.6.0) @@ -7811,7 +7811,7 @@ packages: graphql: ^14.0.0 || ^15.0.0 || ^16.0.0 || ^17.0.0 dependencies: '@graphql-tools/batch-execute': 9.0.0(graphql@16.6.0) - '@graphql-tools/executor': 1.2.3(graphql@16.6.0) + '@graphql-tools/executor': 1.2.5(graphql@16.6.0) '@graphql-tools/schema': 10.0.0(graphql@16.6.0) '@graphql-tools/utils': 10.1.1(graphql@16.6.0) dataloader: 2.2.2 @@ -7943,8 +7943,8 @@ packages: value-or-promise: 1.0.12 dev: false - /@graphql-tools/executor@1.2.3(graphql@16.6.0): - resolution: {integrity: sha512-aAS+TGjSq8BJuDq1RV/A/8E53Iu3KvaWpD8DPio0Qe/0YF26tdpK6EcmNSGrrjiZOwVIZ80wclZrFstHzCPm8A==} + /@graphql-tools/executor@1.2.5(graphql@16.6.0): + resolution: {integrity: sha512-s7sW4K3BUNsk9sjq+vNicwb9KwcR3G55uS/CI8KZQ4x0ZdeYMIwpeU9MVeORCCpHuQyTaV+/VnO0hFrS/ygzsg==} engines: {node: '>=16.0.0'} peerDependencies: graphql: ^14.0.0 || ^15.0.0 || ^16.0.0 || ^17.0.0 @@ -8229,6 +8229,33 @@ packages: dev: true /@graphql-tools/url-loader@8.0.1(@types/node@18.16.16)(graphql@16.6.0): + resolution: {integrity: sha512-B2k8KQEkEQmfV1zhurT5GLoXo8jbXP+YQHUayhCSxKYlRV7j/1Fhp1b21PDM8LXIDGlDRXaZ0FbWKOs7eYXDuQ==} + engines: {node: '>=16.0.0'} + peerDependencies: + graphql: ^14.0.0 || ^15.0.0 || ^16.0.0 || ^17.0.0 + dependencies: + '@ardatan/sync-fetch': 0.0.1 + '@graphql-tools/delegate': 10.0.0(graphql@16.6.0) + '@graphql-tools/executor-graphql-ws': 1.0.0(graphql@16.6.0) + '@graphql-tools/executor-http': 1.0.5(@types/node@18.16.16)(graphql@16.6.0) + '@graphql-tools/executor-legacy-ws': 1.0.0(graphql@16.6.0) + '@graphql-tools/utils': 10.1.1(graphql@16.6.0) + '@graphql-tools/wrap': 10.0.0(graphql@16.6.0) + '@types/ws': 8.5.4 + '@whatwg-node/fetch': 0.9.17 + graphql: 16.6.0 + isomorphic-ws: 5.0.0(ws@8.13.0) + tslib: 2.6.2 + value-or-promise: 1.0.12 + ws: 8.13.0 + transitivePeerDependencies: + - '@types/node' + - bufferutil + - encoding + - utf-8-validate + dev: true + + /@graphql-tools/url-loader@8.0.1(graphql@16.6.0): resolution: {integrity: sha512-B2k8KQEkEQmfV1zhurT5GLoXo8jbXP+YQHUayhCSxKYlRV7j/1Fhp1b21PDM8LXIDGlDRXaZ0FbWKOs7eYXDuQ==} engines: {node: '>=16.0.0'} peerDependencies: @@ -8253,6 +8280,7 @@ packages: - bufferutil - encoding - utf-8-validate + dev: false /@graphql-tools/utils@10.0.0(graphql@16.6.0): resolution: {integrity: sha512-ndBPc6zgR+eGU/jHLpuojrs61kYN3Z89JyMLwK3GCRkPv4EQn9EOr1UWqF1JO0iM+/jAVHY0mvfUxyrFFN9DUQ==}