Skip to content

Commit

Permalink
handle callback subscriptions properly when heartbeat is disabled
Browse files Browse the repository at this point in the history
Signed-off-by: Benjamin Coenen <5719034+bnjjj@users.noreply.github.com>
  • Loading branch information
bnjjj committed Dec 14, 2023
1 parent 8d57450 commit bc55b44
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,114 @@ describe('SubscriptionCallbackPlugin', () => {
`);
});

it('simple happy path without heartbeat', async () => {
const server = await startSubscriptionServer({ logger });

// Mock the initial check response from the router.
mockRouterCheckResponse();

// Start the subscription; this triggers the initial check request and
// starts the heartbeat interval. This simulates an incoming subscription
// request from the router.
const result = await server.executeHTTPGraphQLRequest(
buildHTTPGraphQLRequest({
body: {
query: `#graphql
subscription {
count
}
`,
extensions: {
subscription: {
callbackUrl: 'http://mock-router-url.com',
subscriptionId: '1234-cats',
verifier: 'my-verifier-token',
heartbeatIntervalMs: 0,
},
},
},
}),
);

expect(result.status).toEqual(200);

// Advance timers to check heartbeat has been disabled.
jest.advanceTimersByTime(5000);

// Next we'll trigger some subscription events. In advance, we'll mock the 2
// router responses.
const updates = Promise.all([
mockRouterNextResponse({ payload: { count: 1 } }),
mockRouterNextResponse({
payload: { count: 2 },
}),
]);

// Trigger a couple updates. These send `next` requests to the router.
logger.debug('TESTING: Triggering first update');
await server.executeHTTPGraphQLRequest(
buildHTTPGraphQLRequest({
body: {
query: `#graphql
mutation {
addOne
}
`,
},
}),
);

logger.debug('TESTING: Triggering second update');
await server.executeHTTPGraphQLRequest(
buildHTTPGraphQLRequest({
body: {
query: `#graphql
mutation {
addOne
}
`,
},
}),
);
await updates;

// When we shutdown the server, we'll stop listening for subscription
// updates, await unresolved requests, and send a `complete` request to the
// router for each active subscription.
const completeRequest = mockRouterCompleteResponse();

await server.stop();
await completeRequest;

// The heartbeat should be cleaned up at this point. There is no second
// heartbeat mock, so if it ticks again it'll throw an error.
jest.advanceTimersByTime(5000);

expect(logger.orderOfOperations).toMatchInlineSnapshot(`
[
"SubscriptionCallback[1234-cats]: Received new subscription request",
"SubscriptionManager[1234-cats]: Sending \`check\` request to router",
"SubscriptionManager[1234-cats]: \`check\` request successful",
"SubscriptionCallback[1234-cats]: Starting graphql-js subscription",
"SubscriptionCallback[1234-cats]: graphql-js subscription successful",
"SubscriptionManager[1234-cats]: Heartbeat disabled for http://mock-router-url.com",
"SubscriptionManager[1234-cats]: Listening to graphql-js subscription",
"SubscriptionCallback[1234-cats]: Responding to original subscription request",
"TESTING: Triggering first update",
"SubscriptionManager[1234-cats]: Sending \`next\` request to router",
"TESTING: Triggering second update",
"SubscriptionManager[1234-cats]: \`next\` request successful",
"SubscriptionManager[1234-cats]: Sending \`next\` request to router",
"SubscriptionManager[1234-cats]: \`next\` request successful",
"SubscriptionCallback: Server is shutting down. Cleaning up outstanding subscriptions and heartbeat intervals",
"SubscriptionManager[1234-cats]: Sending \`complete\` request to router",
"SubscriptionManager[1234-cats]: \`complete\` request successful",
"SubscriptionManager: Terminating subscriptions for ID: 1234-cats",
"SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.",
]
`);
});

it('simple happy path with deprecated field names in snake_case', async () => {
const server = await startSubscriptionServer({ logger });

Expand Down
29 changes: 23 additions & 6 deletions packages/server/src/plugin/subscriptionCallback/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,19 @@ export function ApolloServerPluginSubscriptionCallback(
// The GA version of callback protocol use camelCase, this is to keep backward compatibility
callbackUrl = callbackUrl || subscriptionExtension.callback_url;
id = id || subscriptionExtension.subscription_id;
heartbeatIntervalMs =
heartbeatIntervalMs ||
subscriptionExtension.heartbeat_interval_ms ||
5000;
if (
heartbeatIntervalMs == null &&
subscriptionExtension.heartbeat_interval_ms != null
) {
// Deprecated field name
heartbeatIntervalMs = subscriptionExtension.heartbeat_interval_ms;
} else if (
heartbeatIntervalMs == null &&
subscriptionExtension.heartbeat_interval_ms == null
) {
// Default value
heartbeatIntervalMs = 5000;
}

return {
// Implementing `responseForOperation` is the only hook that allows us
Expand Down Expand Up @@ -367,6 +376,12 @@ class SubscriptionManager {
this.subscriptionInfoByCallbackUrl.set(callbackUrl, {});
}

if (heartbeatIntervalMs === 0) {
// Heartbeat has been disabled on the router
this.logger?.debug(`Heartbeat disabled for ${callbackUrl}`, id);
return;
}

// Kickoff heartbeat interval since there isn't one already
this.logger?.debug(
`Starting new heartbeat interval for ${callbackUrl}`,
Expand Down Expand Up @@ -519,8 +534,10 @@ class SubscriptionManager {
subscription.cancelled = true;
}
// cleanup heartbeat for subscription
this.logger?.debug(`Terminating heartbeat interval for ${callbackUrl}`);
if (heartbeat) clearInterval(heartbeat.interval);
if (heartbeat) {
this.logger?.debug(`Terminating heartbeat interval for ${callbackUrl}`);
clearInterval(heartbeat.interval);
}
this.subscriptionInfoByCallbackUrl.delete(callbackUrl);
}

Expand Down

0 comments on commit bc55b44

Please sign in to comment.