From 495535fdeb43de4b3afe1e9be96d6e1908b7fdaa Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Fri, 18 Jun 2021 01:46:01 +0300 Subject: [PATCH] consolidate SubscriptionExecutor into Executor --- .../__tests__/mapAsyncIterator-test.ts | 0 src/execution/execute.ts | 11 +- src/execution/executor.ts | 183 +++++++++++++++-- .../mapAsyncIterator.ts | 0 src/subscription/__tests__/subscribe-test.ts | 2 +- src/subscription/subscribe.ts | 11 +- src/subscription/subscriptionExecutor.ts | 185 ------------------ 7 files changed, 184 insertions(+), 208 deletions(-) rename src/{subscription => execution}/__tests__/mapAsyncIterator-test.ts (100%) rename src/{subscription => execution}/mapAsyncIterator.ts (100%) delete mode 100644 src/subscription/subscriptionExecutor.ts diff --git a/src/subscription/__tests__/mapAsyncIterator-test.ts b/src/execution/__tests__/mapAsyncIterator-test.ts similarity index 100% rename from src/subscription/__tests__/mapAsyncIterator-test.ts rename to src/execution/__tests__/mapAsyncIterator-test.ts diff --git a/src/execution/execute.ts b/src/execution/execute.ts index b0eb05570bf..cb0e2185c22 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -60,6 +60,7 @@ export interface ExecutionContext { variableValues: { [variable: string]: unknown }; fieldResolver: GraphQLFieldResolver; typeResolver: GraphQLTypeResolver; + subscribeFieldResolver?: Maybe>; errors: Array; } @@ -128,7 +129,7 @@ export function execute(args: ExecutionArgs): PromiseOrValue { // field and its descendants will be omitted, and sibling fields will still // be executed. An execution which encounters errors will still result in a // resolved Promise. - return executor.execute(); + return executor.executeQueryOrMutation(); } /** @@ -170,6 +171,10 @@ export function assertValidExecutionArguments( ); } +export interface BuildExecutionContextArgs extends ExecutionArgs { + subscribeFieldResolver?: Maybe>; +} + /** * Constructs a ExecutionContext object from the arguments passed to * execute, which we will pass throughout the other execution methods. @@ -179,7 +184,7 @@ export function assertValidExecutionArguments( * @internal */ export function buildExecutionContext( - args: ExecutionArgs, + args: BuildExecutionContextArgs, ): ReadonlyArray | ExecutionContext { const { schema, @@ -190,6 +195,7 @@ export function buildExecutionContext( operationName, fieldResolver, typeResolver, + subscribeFieldResolver, } = args; // If arguments are missing or incorrect, throw an error. @@ -249,6 +255,7 @@ export function buildExecutionContext( variableValues: coercedVariableValues.coerced, fieldResolver: fieldResolver ?? defaultFieldResolver, typeResolver: typeResolver ?? defaultTypeResolver, + subscribeFieldResolver, errors: [], }; } diff --git a/src/execution/executor.ts b/src/execution/executor.ts index 4b5c1e6ac1c..86d5940c6fa 100644 --- a/src/execution/executor.ts +++ b/src/execution/executor.ts @@ -47,9 +47,12 @@ import { import { getOperationRootType } from '../utilities/getOperationRootType'; +import { isAsyncIterable } from '../jsutils/isAsyncIterable'; + import type { ExecutionContext, ExecutionResult } from './execute'; import { getArgumentValues } from './values'; import { collectFields } from './collectFields'; +import { mapAsyncIterator } from './mapAsyncIterator'; /** * This class is exported only to assist people in implementing their own executors @@ -81,6 +84,7 @@ export class Executor { protected _variableValues: { [variable: string]: unknown }; protected _fieldResolver: GraphQLFieldResolver; protected _typeResolver: GraphQLTypeResolver; + protected _subscribeFieldResolver?: Maybe>; protected _errors: Array; constructor({ @@ -92,6 +96,7 @@ export class Executor { variableValues, fieldResolver, typeResolver, + subscribeFieldResolver, errors, }: ExecutionContext) { this._schema = schema; @@ -102,18 +107,15 @@ export class Executor { this._variableValues = variableValues; this._fieldResolver = fieldResolver; this._typeResolver = typeResolver; + this._subscribeFieldResolver = subscribeFieldResolver; this._errors = errors; } - execute(): PromiseOrValue { - const data = this.executeOperation(); - return this.buildResponse(data); - } - /** - * Implements the "Executing operations" section of the spec. + * Implements the "Executing operations" section of the spec for + * queries and mutations. */ - executeOperation(): PromiseOrValue | null> { + executeQueryOrMutation(): PromiseOrValue { const { _schema, _fragments, _rootValue, _operation, _variableValues } = this; const type = getOperationRootType(_schema, _operation); @@ -138,15 +140,17 @@ export class Executor { ? this.executeFieldsSerially(type, _rootValue, path, fields) : this.executeFields(type, _rootValue, path, fields); if (isPromise(result)) { - return result.then(undefined, (error) => { - this._errors.push(error); - return Promise.resolve(null); - }); + return this.buildResponse( + result.then(undefined, (error) => { + this._errors.push(error); + return Promise.resolve(null); + }), + ); } - return result; + return this.buildResponse(result); } catch (error) { this._errors.push(error); - return null; + return this.buildResponse(null); } } @@ -665,6 +669,159 @@ export class Executor { return this.executeFields(returnType, result, path, subFieldNodes); } + async executeSubscription(): Promise< + AsyncGenerator | ExecutionResult + > { + const resultOrStream = await this.createSourceEventStream(); + + if (!isAsyncIterable(resultOrStream)) { + return resultOrStream; + } + + // For each payload yielded from a subscription, map it over the normal + // GraphQL `execute` function, with `payload` as the rootValue. + // This implements the "MapSourceToResponseEvent" algorithm described in + // the GraphQL specification. The `execute` function provides the + // "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the + // "ExecuteQuery" algorithm, for which `execute` is also used. + const mapSourceToResponse = (payload: unknown) => { + const executor = new Executor({ + schema: this._schema, + fragments: this._fragments, + rootValue: payload, + contextValue: this._contextValue, + operation: this._operation, + variableValues: this._variableValues, + fieldResolver: this._fieldResolver, + typeResolver: this._typeResolver, + errors: [], + }); + + return executor.executeQueryOrMutation(); + }; + + // Map every source value to a ExecutionResult value as described above. + return mapAsyncIterator(resultOrStream, mapSourceToResponse); + } + + /** + * Implements the "CreateSourceEventStream" algorithm described in the + * GraphQL specification, resolving the subscription source event stream. + * + * Returns a Promise which resolves to either an AsyncIterable (if successful) + * or an ExecutionResult (error). The promise will be rejected if the schema or + * other arguments to this function are invalid, or if the resolved event stream + * is not an async iterable. + * + * If the client-provided arguments to this function do not result in a + * compliant subscription, a GraphQL Response (ExecutionResult) with + * descriptive errors and no data will be returned. + * + * If the the source stream could not be created due to faulty subscription + * resolver logic or underlying systems, the promise will resolve to a single + * ExecutionResult containing `errors` and no `data`. + * + * If the operation succeeded, the promise resolves to the AsyncIterable for the + * event stream returned by the resolver. + * + * A Source Event Stream represents a sequence of events, each of which triggers + * a GraphQL execution for that event. + * + * This may be useful when hosting the stateful subscription service in a + * different process or machine than the stateless GraphQL execution engine, + * or otherwise separating these two steps. For more on this, see the + * "Supporting Subscriptions at Scale" information in the GraphQL specification. + */ + async createSourceEventStream(): Promise< + AsyncIterable | ExecutionResult + > { + try { + const eventStream = await this._createSourceEventStream(); + + // Assert field returned an event stream, otherwise yield an error. + if (!isAsyncIterable(eventStream)) { + throw new Error( + 'Subscription field must return Async Iterable. ' + + `Received: ${inspect(eventStream)}.`, + ); + } + + return eventStream; + } catch (error) { + // If it GraphQLError, report it as an ExecutionResult, containing only errors and no data. + // Otherwise treat the error as a system-class error and re-throw it. + if (error instanceof GraphQLError) { + return { errors: [error] }; + } + throw error; + } + } + + public async _createSourceEventStream(): Promise { + const { + _schema, + _fragments, + _rootValue, + _contextValue, + _operation, + _variableValues, + _fieldResolver, + } = this; + const type = getOperationRootType(_schema, _operation); + const fields = collectFields( + _schema, + _fragments, + _variableValues, + type, + _operation.selectionSet, + new Map(), + new Set(), + ); + const [responseName, fieldNodes] = [...fields.entries()][0]; + const fieldDef = getFieldDef(_schema, type, fieldNodes[0]); + + if (!fieldDef) { + const fieldName = fieldNodes[0].name.value; + throw new GraphQLError( + `The subscription field "${fieldName}" is not defined.`, + fieldNodes, + ); + } + + const path = addPath(undefined, responseName, type.name); + const info = this.buildResolveInfo(fieldDef, fieldNodes, type, path); + + try { + // Implements the "ResolveFieldEventStream" algorithm from GraphQL specification. + // It differs from "ResolveFieldValue" due to providing a different `resolveFn`. + + // Build a JS object of arguments from the field.arguments AST, using the + // variables scope to fulfill any variable references. + const args = getArgumentValues(fieldDef, fieldNodes[0], _variableValues); + + // Call the `subscribe()` resolver or the default resolver to produce an + // AsyncIterable yielding raw payloads. + const resolveFn = fieldDef.subscribe ?? _fieldResolver; + + // The resolve function's optional third argument is a context value that + // is provided to every resolve function within an execution. It is commonly + // used to represent an authenticated user, or request-specific caches. + const eventStream = await resolveFn( + _rootValue, + args, + _contextValue, + info, + ); + + if (eventStream instanceof Error) { + throw eventStream; + } + return eventStream; + } catch (error) { + throw locatedError(error, fieldNodes, pathToArray(path)); + } + } + /** * A collection of relevant subfields with regard to the return type. * See 'collectSubfields' above for the memoized version. diff --git a/src/subscription/mapAsyncIterator.ts b/src/execution/mapAsyncIterator.ts similarity index 100% rename from src/subscription/mapAsyncIterator.ts rename to src/execution/mapAsyncIterator.ts diff --git a/src/subscription/__tests__/subscribe-test.ts b/src/subscription/__tests__/subscribe-test.ts index 74d79fa0590..ba50ae00078 100644 --- a/src/subscription/__tests__/subscribe-test.ts +++ b/src/subscription/__tests__/subscribe-test.ts @@ -12,7 +12,7 @@ import { GraphQLSchema } from '../../type/schema'; import { GraphQLList, GraphQLObjectType } from '../../type/definition'; import { GraphQLInt, GraphQLString, GraphQLBoolean } from '../../type/scalars'; -import { SubscriptionExecutor, subscribe } from '../subscribe'; +import { subscribe } from '../subscribe'; import { SimplePubSub } from './simplePubSub'; diff --git a/src/subscription/subscribe.ts b/src/subscription/subscribe.ts index 6291f43e42a..b242527331d 100644 --- a/src/subscription/subscribe.ts +++ b/src/subscription/subscribe.ts @@ -2,14 +2,14 @@ import type { Maybe } from '../jsutils/Maybe'; import type { DocumentNode } from '../language/ast'; +import { Executor } from '../execution/executor'; + import type { ExecutionResult } from '../execution/execute'; import { buildExecutionContext } from '../execution/execute'; import type { GraphQLSchema } from '../type/schema'; import type { GraphQLFieldResolver } from '../type/definition'; -import { SubscriptionExecutor } from './subscriptionExecutor'; - export interface SubscriptionArgs { schema: GraphQLSchema; document: DocumentNode; @@ -47,16 +47,13 @@ export async function subscribe( ): Promise | ExecutionResult> { // If a valid execution context cannot be created due to incorrect arguments, // a "Response" with only errors is returned. - const exeContext = buildExecutionContext({ - ...args, - fieldResolver: args.subscribeFieldResolver, - }); + const exeContext = buildExecutionContext(args); // Return early errors if execution context failed. if (!('schema' in exeContext)) { return Promise.resolve({ errors: exeContext }); } - const executor = new SubscriptionExecutor(exeContext, args.document); + const executor = new Executor(exeContext); return executor.executeSubscription(); } diff --git a/src/subscription/subscriptionExecutor.ts b/src/subscription/subscriptionExecutor.ts deleted file mode 100644 index c11615947b2..00000000000 --- a/src/subscription/subscriptionExecutor.ts +++ /dev/null @@ -1,185 +0,0 @@ -import { inspect } from '../jsutils/inspect'; -import { isAsyncIterable } from '../jsutils/isAsyncIterable'; -import { addPath, pathToArray } from '../jsutils/Path'; - -import { GraphQLError } from '../error/GraphQLError'; -import { locatedError } from '../error/locatedError'; - -import type { DocumentNode } from '../language/ast'; - -import type { ExecutionContext, ExecutionResult } from '../execution/execute'; -import { collectFields } from '../execution/collectFields'; -import { getArgumentValues } from '../execution/values'; -import { Executor, getFieldDef } from '../execution/executor'; -import { execute } from '../execution/execute'; - -import { getOperationRootType } from '../utilities/getOperationRootType'; - -import { mapAsyncIterator } from './mapAsyncIterator'; - -/** - * This class is exported only to assist people in implementing their own executors - * without duplicating too much code and should be used only as last resort for cases - * requiring custom execution or if certain features could not be contributed upstream. - * - * It is still part of the internal API and is versioned, so any changes to it are never - * considered breaking changes. If you still need to support multiple versions of the - * library, please use the `versionInfo` variable for version detection. - * - * @internal - */ -export class SubscriptionExecutor extends Executor { - protected _document: DocumentNode; - - constructor(exeContext: ExecutionContext, document: DocumentNode) { - super(exeContext); - this._document = document; - } - - async executeSubscription(): Promise< - AsyncGenerator | ExecutionResult - > { - const resultOrStream = await this.createSourceEventStream(); - - if (!isAsyncIterable(resultOrStream)) { - return resultOrStream; - } - - // For each payload yielded from a subscription, map it over the normal - // GraphQL `execute` function, with `payload` as the rootValue. - // This implements the "MapSourceToResponseEvent" algorithm described in - // the GraphQL specification. The `execute` function provides the - // "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the - // "ExecuteQuery" algorithm, for which `execute` is also used. - const mapSourceToResponse = (payload: unknown) => - execute({ - schema: this._schema, - document: this._document, - rootValue: payload, - contextValue: this._contextValue, - variableValues: this._variableValues, - fieldResolver: this._fieldResolver, - }); - - // Map every source value to a ExecutionResult value as described above. - return mapAsyncIterator(resultOrStream, mapSourceToResponse); - } - - /** - * Implements the "CreateSourceEventStream" algorithm described in the - * GraphQL specification, resolving the subscription source event stream. - * - * Returns a Promise which resolves to either an AsyncIterable (if successful) - * or an ExecutionResult (error). The promise will be rejected if the schema or - * other arguments to this function are invalid, or if the resolved event stream - * is not an async iterable. - * - * If the client-provided arguments to this function do not result in a - * compliant subscription, a GraphQL Response (ExecutionResult) with - * descriptive errors and no data will be returned. - * - * If the the source stream could not be created due to faulty subscription - * resolver logic or underlying systems, the promise will resolve to a single - * ExecutionResult containing `errors` and no `data`. - * - * If the operation succeeded, the promise resolves to the AsyncIterable for the - * event stream returned by the resolver. - * - * A Source Event Stream represents a sequence of events, each of which triggers - * a GraphQL execution for that event. - * - * This may be useful when hosting the stateful subscription service in a - * different process or machine than the stateless GraphQL execution engine, - * or otherwise separating these two steps. For more on this, see the - * "Supporting Subscriptions at Scale" information in the GraphQL specification. - */ - async createSourceEventStream(): Promise< - AsyncIterable | ExecutionResult - > { - try { - const eventStream = await this._createSourceEventStream(); - - // Assert field returned an event stream, otherwise yield an error. - if (!isAsyncIterable(eventStream)) { - throw new Error( - 'Subscription field must return Async Iterable. ' + - `Received: ${inspect(eventStream)}.`, - ); - } - - return eventStream; - } catch (error) { - // If it GraphQLError, report it as an ExecutionResult, containing only errors and no data. - // Otherwise treat the error as a system-class error and re-throw it. - if (error instanceof GraphQLError) { - return { errors: [error] }; - } - throw error; - } - } - - public async _createSourceEventStream(): Promise { - const { - _schema, - _fragments, - _rootValue, - _contextValue, - _operation, - _variableValues, - _fieldResolver, - } = this; - const type = getOperationRootType(_schema, _operation); - const fields = collectFields( - _schema, - _fragments, - _variableValues, - type, - _operation.selectionSet, - new Map(), - new Set(), - ); - const [responseName, fieldNodes] = [...fields.entries()][0]; - const fieldDef = getFieldDef(_schema, type, fieldNodes[0]); - - if (!fieldDef) { - const fieldName = fieldNodes[0].name.value; - throw new GraphQLError( - `The subscription field "${fieldName}" is not defined.`, - fieldNodes, - ); - } - - const path = addPath(undefined, responseName, type.name); - const info = this.buildResolveInfo(fieldDef, fieldNodes, type, path); - - try { - // Implements the "ResolveFieldEventStream" algorithm from GraphQL specification. - // It differs from "ResolveFieldValue" due to providing a different `resolveFn`. - - // Build a JS object of arguments from the field.arguments AST, using the - // variables scope to fulfill any variable references. - const args = getArgumentValues(fieldDef, fieldNodes[0], _variableValues); - - // Call the `subscribe()` resolver or the default resolver to produce an - // AsyncIterable yielding raw payloads. - const resolveFn = fieldDef.subscribe ?? _fieldResolver; - - // The resolve function's optional third argument is a context value that - // is provided to every resolve function within an execution. It is commonly - // used to represent an authenticated user, or request-specific caches. - const eventStream = await resolveFn( - _rootValue, - args, - _contextValue, - info, - ); - - if (eventStream instanceof Error) { - throw eventStream; - } - return eventStream; - } catch (error) { - throw locatedError(error, fieldNodes, pathToArray(path)); - } - } -}