Skip to content

Commit

Permalink
fix(server): Should clean up subscription reservations on abrupt erro…
Browse files Browse the repository at this point in the history
…rs without relying on connection close
  • Loading branch information
enisdenjo committed May 12, 2022
1 parent 45d1649 commit 611c223
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 92 deletions.
48 changes: 47 additions & 1 deletion src/__tests__/server.ts
Expand Up @@ -8,7 +8,7 @@ import {
ExecutionResult,
GraphQLSchema,
} from 'graphql';
import { handleProtocols, makeServer } from '../server';
import { Context, handleProtocols, makeServer } from '../server';
import {
GRAPHQL_TRANSPORT_WS_PROTOCOL,
CloseCode,
Expand Down Expand Up @@ -1658,6 +1658,52 @@ describe('Subscribe', () => {

await server.waitForComplete();
});

it('should clean up subscription reservations on abrupt errors without relying on close', async (done) => {
let currCtx: Context;
makeServer({
connectionInitWaitTimeout: 0, // defaults to 3 seconds
schema,
execute: () => {
throw null;
},
onSubscribe: (ctx) => {
currCtx = ctx;
},
}).opened(
{
protocol: GRAPHQL_TRANSPORT_WS_PROTOCOL,
send: () => {
/**/
},
close: () => {
fail("Shouldn't have closed");
},
onMessage: async (cb) => {
await cb(stringifyMessage({ type: MessageType.ConnectionInit }));

try {
// will throw because of execute impl
await cb(
stringifyMessage({
id: '1',
type: MessageType.Subscribe,
payload: {
query: '{ getValue }',
},
}),
);
fail("Subscribe shouldn't have succeeded");
} catch {
// we dont close the connection but still expect the subscriptions to clean up
expect(Object.entries(currCtx.subscriptions)).toHaveLength(0);
done();
}
},
},
{},
);
});
});

describe('Disconnect/close', () => {
Expand Down
189 changes: 98 additions & 91 deletions src/server.ts
Expand Up @@ -723,103 +723,110 @@ export function makeServer<
},
};

let execArgs: ExecutionArgs;
const maybeExecArgsOrErrors = await onSubscribe?.(ctx, message);
if (maybeExecArgsOrErrors) {
if (areGraphQLErrors(maybeExecArgsOrErrors))
return await emit.error(maybeExecArgsOrErrors);
else if (Array.isArray(maybeExecArgsOrErrors))
throw new Error(
'Invalid return value from onSubscribe hook, expected an array of GraphQLError objects',
try {
let execArgs: ExecutionArgs;
const maybeExecArgsOrErrors = await onSubscribe?.(ctx, message);
if (maybeExecArgsOrErrors) {
if (areGraphQLErrors(maybeExecArgsOrErrors))
return await emit.error(maybeExecArgsOrErrors);
else if (Array.isArray(maybeExecArgsOrErrors))
throw new Error(
'Invalid return value from onSubscribe hook, expected an array of GraphQLError objects',
);
// not errors, is exec args
execArgs = maybeExecArgsOrErrors;
} else {
// you either provide a schema dynamically through
// `onSubscribe` or you set one up during the server setup
if (!schema)
throw new Error('The GraphQL schema is not provided');

const args = {
operationName: payload.operationName,
document: parse(payload.query),
variableValues: payload.variables,
};
execArgs = {
...args,
schema:
typeof schema === 'function'
? await schema(ctx, message, args)
: schema,
};
const validationErrors = (validate ?? graphqlValidate)(
execArgs.schema,
execArgs.document,
);
// not errors, is exec args
execArgs = maybeExecArgsOrErrors;
} else {
// you either provide a schema dynamically through
// `onSubscribe` or you set one up during the server setup
if (!schema)
throw new Error('The GraphQL schema is not provided');

const args = {
operationName: payload.operationName,
document: parse(payload.query),
variableValues: payload.variables,
};
execArgs = {
...args,
schema:
typeof schema === 'function'
? await schema(ctx, message, args)
: schema,
};
const validationErrors = (validate ?? graphqlValidate)(
execArgs.schema,
if (validationErrors.length > 0)
return await emit.error(validationErrors);
}

const operationAST = getOperationAST(
execArgs.document,
execArgs.operationName,
);
if (validationErrors.length > 0)
return await emit.error(validationErrors);
}

const operationAST = getOperationAST(
execArgs.document,
execArgs.operationName,
);
if (!operationAST)
return await emit.error([
new GraphQLError('Unable to identify operation'),
]);

// if `onSubscribe` didnt specify a rootValue, inject one
if (!('rootValue' in execArgs))
execArgs.rootValue = roots?.[operationAST.operation];

// if `onSubscribe` didn't specify a context, inject one
if (!('contextValue' in execArgs))
execArgs.contextValue =
typeof context === 'function'
? await context(ctx, message, execArgs)
: context;

// the execution arguments have been prepared
// perform the operation and act accordingly
let operationResult;
if (operationAST.operation === 'subscription')
operationResult = await (subscribe ?? graphqlSubscribe)(execArgs);
// operation === 'query' || 'mutation'
else operationResult = await (execute ?? graphqlExecute)(execArgs);

const maybeResult = await onOperation?.(
ctx,
message,
execArgs,
operationResult,
);
if (maybeResult) operationResult = maybeResult;

if (isAsyncIterable(operationResult)) {
/** multiple emitted results */
if (!(id in ctx.subscriptions)) {
// subscription was completed/canceled before the operation settled
if (isAsyncGenerator(operationResult))
operationResult.return(undefined);
} else {
ctx.subscriptions[id] = operationResult;
for await (const result of operationResult) {
await emit.next(result, execArgs);
if (!operationAST)
return await emit.error([
new GraphQLError('Unable to identify operation'),
]);

// if `onSubscribe` didnt specify a rootValue, inject one
if (!('rootValue' in execArgs))
execArgs.rootValue = roots?.[operationAST.operation];

// if `onSubscribe` didn't specify a context, inject one
if (!('contextValue' in execArgs))
execArgs.contextValue =
typeof context === 'function'
? await context(ctx, message, execArgs)
: context;

// the execution arguments have been prepared
// perform the operation and act accordingly
let operationResult;
if (operationAST.operation === 'subscription')
operationResult = await (subscribe ?? graphqlSubscribe)(
execArgs,
);
// operation === 'query' || 'mutation'
else
operationResult = await (execute ?? graphqlExecute)(execArgs);

const maybeResult = await onOperation?.(
ctx,
message,
execArgs,
operationResult,
);
if (maybeResult) operationResult = maybeResult;

if (isAsyncIterable(operationResult)) {
/** multiple emitted results */
if (!(id in ctx.subscriptions)) {
// subscription was completed/canceled before the operation settled
if (isAsyncGenerator(operationResult))
operationResult.return(undefined);
} else {
ctx.subscriptions[id] = operationResult;
for await (const result of operationResult) {
await emit.next(result, execArgs);
}
}
} else {
/** single emitted result */
// if the client completed the subscription before the single result
// became available, he effectively canceled it and no data should be sent
if (id in ctx.subscriptions)
await emit.next(operationResult, execArgs);
}
} else {
/** single emitted result */
// if the client completed the subscription before the single result
// became available, he effectively canceled it and no data should be sent
if (id in ctx.subscriptions)
await emit.next(operationResult, execArgs);
}

// lack of subscription at this point indicates that the client
// completed the subscription, he doesnt need to be reminded
await emit.complete(id in ctx.subscriptions);
delete ctx.subscriptions[id];
// lack of subscription at this point indicates that the client
// completed the subscription, he doesnt need to be reminded
await emit.complete(id in ctx.subscriptions);
} finally {
// whatever happens to the subscription, we finally want to get rid of the reservation
delete ctx.subscriptions[id];
}
return;
}
case MessageType.Complete: {
Expand Down

0 comments on commit 611c223

Please sign in to comment.