diff --git a/src/execution/execute.ts b/src/execution/execute.ts index ed2e15dcec..d0d9c04e63 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -11,6 +11,7 @@ import { isObjectLike } from '../jsutils/isObjectLike'; import { promiseReduce } from '../jsutils/promiseReduce'; import { promiseForObject } from '../jsutils/promiseForObject'; import { addPath, pathToArray } from '../jsutils/Path'; +import { isAsyncIterable } from '../jsutils/isAsyncIterable'; import { isIterableObject } from '../jsutils/isIterableObject'; import type { GraphQLFormattedError } from '../error/GraphQLError'; @@ -58,6 +59,7 @@ import { collectFields, collectSubfields as _collectSubfields, } from './collectFields'; +import { mapAsyncIterator } from './mapAsyncIterator'; /** * A memoized collection of relevant subfields with regard to the return @@ -1032,3 +1034,108 @@ export function getFieldDef( } return parentType.getFields()[fieldName]; } + +export async function executeSubscription( + exeContext: ExecutionContext, +): Promise | ExecutionResult> { + const resultOrStream = await createSourceEventStreamImpl(exeContext); + + if (!isAsyncIterable(resultOrStream)) { + return resultOrStream; + } + + // For each payload yielded from a subscription, map it over the normal + // GraphQL `execute` function, with `payload` as the rootValue. + // This implements the "MapSourceToResponseEvent" algorithm described in + // the GraphQL specification. The `execute` function provides the + // "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the + // "ExecuteQuery" algorithm, for which `execute` is also used. + const mapSourceToResponse = (payload: unknown) => + executeQueryOrMutation({ + ...exeContext, + rootValue: payload, + errors: [], + }); + + // Map every source value to a ExecutionResult value as described above. + return mapAsyncIterator(resultOrStream, mapSourceToResponse); +} + +export async function createSourceEventStreamImpl( + exeContext: ExecutionContext, +): Promise | ExecutionResult> { + try { + const eventStream = await executeSubscriptionRootField(exeContext); + + // Assert field returned an event stream, otherwise yield an error. + if (!isAsyncIterable(eventStream)) { + throw new Error( + 'Subscription field must return Async Iterable. ' + + `Received: ${inspect(eventStream)}.`, + ); + } + + return eventStream; + } catch (error) { + // If it GraphQLError, report it as an ExecutionResult, containing only errors and no data. + // Otherwise treat the error as a system-class error and re-throw it. + if (error instanceof GraphQLError) { + return { errors: [error] }; + } + throw error; + } +} + +async function executeSubscriptionRootField( + exeContext: ExecutionContext, +): Promise { + const { schema, fragments, operation, variableValues, rootValue } = + exeContext; + const type = getOperationRootType(schema, operation); + const fields = collectFields( + schema, + fragments, + variableValues, + type, + operation.selectionSet, + ); + const [responseName, fieldNodes] = [...fields.entries()][0]; + const fieldDef = getFieldDef(schema, type, fieldNodes[0]); + + if (!fieldDef) { + const fieldName = fieldNodes[0].name.value; + throw new GraphQLError( + `The subscription field "${fieldName}" is not defined.`, + fieldNodes, + ); + } + + const path = addPath(undefined, responseName, type.name); + const info = buildResolveInfo(exeContext, fieldDef, fieldNodes, type, path); + + try { + // Implements the "ResolveFieldEventStream" algorithm from GraphQL specification. + // It differs from "ResolveFieldValue" due to providing a different `resolveFn`. + + // Build a JS object of arguments from the field.arguments AST, using the + // variables scope to fulfill any variable references. + const args = getArgumentValues(fieldDef, fieldNodes[0], variableValues); + + // The resolve function's optional third argument is a context value that + // is provided to every resolve function within an execution. It is commonly + // used to represent an authenticated user, or request-specific caches. + const contextValue = exeContext.contextValue; + + // Call the `subscribe()` resolver or the default resolver to produce an + // AsyncIterable yielding raw payloads. + const resolveFn = fieldDef.subscribe ?? exeContext.subscribeFieldResolver; + const eventStream = await resolveFn(rootValue, args, contextValue, info); + + if (eventStream instanceof Error) { + throw eventStream; + } + return eventStream; + } catch (error) { + throw locatedError(error, fieldNodes, pathToArray(path)); + } +} diff --git a/src/execution/subscribe.ts b/src/execution/subscribe.ts index af59d78344..7b99fc1703 100644 --- a/src/execution/subscribe.ts +++ b/src/execution/subscribe.ts @@ -1,32 +1,16 @@ -import { inspect } from '../jsutils/inspect'; -import { isAsyncIterable } from '../jsutils/isAsyncIterable'; -import { addPath, pathToArray } from '../jsutils/Path'; import type { Maybe } from '../jsutils/Maybe'; -import { GraphQLError } from '../error/GraphQLError'; -import { locatedError } from '../error/locatedError'; - import type { DocumentNode } from '../language/ast'; import type { GraphQLSchema } from '../type/schema'; import type { GraphQLFieldResolver } from '../type/definition'; -import { getOperationRootType } from '../utilities/getOperationRootType'; - -import type { - ExecutionArgs, - ExecutionResult, - ExecutionContext, -} from './execute'; -import { collectFields } from './collectFields'; -import { getArgumentValues } from './values'; +import type { ExecutionArgs, ExecutionResult } from './execute'; import { buildExecutionContext, - buildResolveInfo, - executeQueryOrMutation, - getFieldDef, + createSourceEventStreamImpl, + executeSubscription, } from './execute'; -import { mapAsyncIterator } from './mapAsyncIterator'; /** * @deprecated use ExecutionArgs instead. @@ -75,32 +59,6 @@ export async function subscribe( return executeSubscription(exeContext); } -async function executeSubscription( - exeContext: ExecutionContext, -): Promise | ExecutionResult> { - const resultOrStream = await createSourceEventStreamImpl(exeContext); - - if (!isAsyncIterable(resultOrStream)) { - return resultOrStream; - } - - // For each payload yielded from a subscription, map it over the normal - // GraphQL `execute` function, with `payload` as the rootValue. - // This implements the "MapSourceToResponseEvent" algorithm described in - // the GraphQL specification. The `execute` function provides the - // "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the - // "ExecuteQuery" algorithm, for which `execute` is also used. - const mapSourceToResponse = (payload: unknown) => - executeQueryOrMutation({ - ...exeContext, - rootValue: payload, - errors: [], - }); - - // Map every source value to a ExecutionResult value as described above. - return mapAsyncIterator(resultOrStream, mapSourceToResponse); -} - /** * Implements the "CreateSourceEventStream" algorithm described in the * GraphQL specification, resolving the subscription source event stream. @@ -157,82 +115,3 @@ export async function createSourceEventStream( return createSourceEventStreamImpl(exeContext); } - -export async function createSourceEventStreamImpl( - exeContext: ExecutionContext, -): Promise | ExecutionResult> { - try { - const eventStream = await executeSubscriptionRootField(exeContext); - - // Assert field returned an event stream, otherwise yield an error. - if (!isAsyncIterable(eventStream)) { - throw new Error( - 'Subscription field must return Async Iterable. ' + - `Received: ${inspect(eventStream)}.`, - ); - } - - return eventStream; - } catch (error) { - // If it GraphQLError, report it as an ExecutionResult, containing only errors and no data. - // Otherwise treat the error as a system-class error and re-throw it. - if (error instanceof GraphQLError) { - return { errors: [error] }; - } - throw error; - } -} - -async function executeSubscriptionRootField( - exeContext: ExecutionContext, -): Promise { - const { schema, fragments, operation, variableValues, rootValue } = - exeContext; - const type = getOperationRootType(schema, operation); - const fields = collectFields( - schema, - fragments, - variableValues, - type, - operation.selectionSet, - ); - const [responseName, fieldNodes] = [...fields.entries()][0]; - const fieldDef = getFieldDef(schema, type, fieldNodes[0]); - - if (!fieldDef) { - const fieldName = fieldNodes[0].name.value; - throw new GraphQLError( - `The subscription field "${fieldName}" is not defined.`, - fieldNodes, - ); - } - - const path = addPath(undefined, responseName, type.name); - const info = buildResolveInfo(exeContext, fieldDef, fieldNodes, type, path); - - try { - // Implements the "ResolveFieldEventStream" algorithm from GraphQL specification. - // It differs from "ResolveFieldValue" due to providing a different `resolveFn`. - - // Build a JS object of arguments from the field.arguments AST, using the - // variables scope to fulfill any variable references. - const args = getArgumentValues(fieldDef, fieldNodes[0], variableValues); - - // The resolve function's optional third argument is a context value that - // is provided to every resolve function within an execution. It is commonly - // used to represent an authenticated user, or request-specific caches. - const contextValue = exeContext.contextValue; - - // Call the `subscribe()` resolver or the default resolver to produce an - // AsyncIterable yielding raw payloads. - const resolveFn = fieldDef.subscribe ?? exeContext.subscribeFieldResolver; - const eventStream = await resolveFn(rootValue, args, contextValue, info); - - if (eventStream instanceof Error) { - throw eventStream; - } - return eventStream; - } catch (error) { - throw locatedError(error, fieldNodes, pathToArray(path)); - } -}