diff --git a/.changeset/large-ladybugs-breathe.md b/.changeset/large-ladybugs-breathe.md new file mode 100644 index 00000000000..7eb1d1d965a --- /dev/null +++ b/.changeset/large-ladybugs-breathe.md @@ -0,0 +1,5 @@ +--- +"@apollo/server": patch +--- + +Catch errors thrown by subscription generators, and gracefully clean up the subscription instead of crashing. diff --git a/packages/server/src/__tests__/plugin/subscriptionCallback/index.test.ts b/packages/server/src/__tests__/plugin/subscriptionCallback/index.test.ts index 65d380c2c11..ea12d27eeff 100644 --- a/packages/server/src/__tests__/plugin/subscriptionCallback/index.test.ts +++ b/packages/server/src/__tests__/plugin/subscriptionCallback/index.test.ts @@ -1102,6 +1102,62 @@ describe('SubscriptionCallbackPlugin', () => { `); }); + it('sends a `complete` with errors when a subscription throws an error', async () => { + const server = await startSubscriptionServer({ logger }); + + mockRouterCheckResponse(); + mockRouterCheckResponse(); + mockRouterCompleteResponse({ + errors: [{ message: "The subscription generator didn't catch this!" }], + }); + + const result = await server.executeHTTPGraphQLRequest( + buildHTTPGraphQLRequest({ + body: { + query: `#graphql + subscription { + throwsError + } + `, + extensions: { + subscription: { + callbackUrl: 'http://mock-router-url.com', + subscriptionId: '1234-cats', + verifier: 'my-verifier-token', + }, + }, + }, + }), + ); + expect(result.status).toEqual(200); + + jest.advanceTimersByTime(5000); + await server.stop(); + + expect(logger.orderOfOperations).toMatchInlineSnapshot(` + [ + "SubscriptionCallback[1234-cats]: Received new subscription request", + "SubscriptionManager[1234-cats]: Sending \`check\` request to router", + "SubscriptionManager[1234-cats]: \`check\` request successful", + "SubscriptionCallback[1234-cats]: Starting graphql-js subscription", + "SubscriptionCallback[1234-cats]: graphql-js subscription successful", + "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", + "SubscriptionManager[1234-cats]: Listening to graphql-js subscription", + "SubscriptionCallback[1234-cats]: Responding to original subscription request", + "ERROR: SubscriptionManager[1234-cats]: Generator threw an error, terminating subscription: The subscription generator didn't catch this!", + "SubscriptionManager[1234-cats]: Sending \`complete\` request to router with errors", + "SubscriptionManager: Sending \`check\` request to http://mock-router-url.com for ID: 1234-cats", + "SubscriptionCallback: Server is shutting down. Cleaning up outstanding subscriptions and heartbeat intervals", + "SubscriptionManager[1234-cats]: \`complete\` request successful", + "SubscriptionManager: Terminating subscriptions for ID: 1234-cats", + "SubscriptionManager: Terminating heartbeat interval for http://mock-router-url.com", + "SubscriptionManager: Heartbeat received response for ID: 1234-cats", + "SubscriptionManager: Heartbeat request successful, ID: 1234-cats", + "SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.", + ] + `); + }); + (process.env.INCREMENTAL_DELIVERY_TESTS_ENABLED ? describe.skip : describe)( 'error handling', () => { @@ -1979,6 +2035,7 @@ async function startSubscriptionServer( type Subscription { count: Int terminatesSuccessfully: Boolean + throwsError: Int } `, resolvers: { @@ -2011,6 +2068,19 @@ async function startSubscriptionServer( }, }), }, + throwsError: { + subscribe: () => ({ + [Symbol.asyncIterator]() { + return { + next: () => { + throw new Error( + "The subscription generator didn't catch this!", + ); + }, + }; + }, + }), + }, }, }, ...opts, diff --git a/packages/server/src/plugin/subscriptionCallback/index.ts b/packages/server/src/plugin/subscriptionCallback/index.ts index 33258d751e0..159f9cb7c75 100644 --- a/packages/server/src/plugin/subscriptionCallback/index.ts +++ b/packages/server/src/plugin/subscriptionCallback/index.ts @@ -557,39 +557,48 @@ class SubscriptionManager { cancelled: false, async startConsumingSubscription() { self.logger?.debug(`Listening to graphql-js subscription`, id); - for await (const payload of subscription) { - if (this.cancelled) { - self.logger?.debug( - `Subscription already cancelled, ignoring current and future payloads`, - id, - ); - // It's already been cancelled - something else has already handled - // sending the `complete` request so we don't want to `break` here - // and send it again after the loop. - return; - } + try { + for await (const payload of subscription) { + if (this.cancelled) { + self.logger?.debug( + `Subscription already cancelled, ignoring current and future payloads`, + id, + ); + // It's already been cancelled - something else has already handled + // sending the `complete` request so we don't want to `break` here + // and send it again after the loop. + return; + } - try { - await self.retryFetch({ - url: callbackUrl, - action: 'next', - id, - verifier, - payload, - }); - } catch (e) { - const originalError = ensureError(e); - self.logger?.error( - `\`next\` request failed, terminating subscription: ${originalError.message}`, - id, - ); - self.terminateSubscription(id, callbackUrl); + try { + await self.retryFetch({ + url: callbackUrl, + action: 'next', + id, + verifier, + payload, + }); + } catch (e) { + const originalError = ensureError(e); + self.logger?.error( + `\`next\` request failed, terminating subscription: ${originalError.message}`, + id, + ); + self.terminateSubscription(id, callbackUrl); + } } + // The subscription ended without errors, send the `complete` request to + // the router + self.logger?.debug(`Subscription completed without errors`, id); + await this.completeSubscription(); + } catch (e) { + const error = ensureGraphQLError(e); + self.logger?.error( + `Generator threw an error, terminating subscription: ${error.message}`, + id, + ); + this.completeSubscription([error]); } - // The subscription ended without errors, send the `complete` request to - // the router - self.logger?.debug(`Subscription completed without errors`, id); - await this.completeSubscription(); }, async completeSubscription(errors?: readonly GraphQLError[]) { if (this.cancelled) return;