Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions src/execution/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { isObjectLike } from '../jsutils/isObjectLike';
import { promiseReduce } from '../jsutils/promiseReduce';
import { promiseForObject } from '../jsutils/promiseForObject';
import { addPath, pathToArray } from '../jsutils/Path';
import { isAsyncIterable } from '../jsutils/isAsyncIterable';
import { isIterableObject } from '../jsutils/isIterableObject';

import type { GraphQLFormattedError } from '../error/GraphQLError';
Expand Down Expand Up @@ -58,6 +59,7 @@ import {
collectFields,
collectSubfields as _collectSubfields,
} from './collectFields';
import { mapAsyncIterator } from './mapAsyncIterator';

/**
* A memoized collection of relevant subfields with regard to the return
Expand Down Expand Up @@ -1032,3 +1034,108 @@ export function getFieldDef(
}
return parentType.getFields()[fieldName];
}

export async function executeSubscription(
exeContext: ExecutionContext,
): Promise<AsyncGenerator<ExecutionResult, void, void> | ExecutionResult> {
const resultOrStream = await createSourceEventStreamImpl(exeContext);

if (!isAsyncIterable(resultOrStream)) {
return resultOrStream;
}

// For each payload yielded from a subscription, map it over the normal
// GraphQL `execute` function, with `payload` as the rootValue.
// This implements the "MapSourceToResponseEvent" algorithm described in
// the GraphQL specification. The `execute` function provides the
// "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the
// "ExecuteQuery" algorithm, for which `execute` is also used.
const mapSourceToResponse = (payload: unknown) =>
executeQueryOrMutation({
...exeContext,
rootValue: payload,
errors: [],
});

// Map every source value to a ExecutionResult value as described above.
return mapAsyncIterator(resultOrStream, mapSourceToResponse);
}

export async function createSourceEventStreamImpl(
exeContext: ExecutionContext,
): Promise<AsyncIterable<unknown> | ExecutionResult> {
try {
const eventStream = await executeSubscriptionRootField(exeContext);

// Assert field returned an event stream, otherwise yield an error.
if (!isAsyncIterable(eventStream)) {
throw new Error(
'Subscription field must return Async Iterable. ' +
`Received: ${inspect(eventStream)}.`,
);
}

return eventStream;
} catch (error) {
// If it GraphQLError, report it as an ExecutionResult, containing only errors and no data.
// Otherwise treat the error as a system-class error and re-throw it.
if (error instanceof GraphQLError) {
return { errors: [error] };
}
throw error;
}
}

async function executeSubscriptionRootField(
exeContext: ExecutionContext,
): Promise<unknown> {
const { schema, fragments, operation, variableValues, rootValue } =
exeContext;
const type = getOperationRootType(schema, operation);
const fields = collectFields(
schema,
fragments,
variableValues,
type,
operation.selectionSet,
);
const [responseName, fieldNodes] = [...fields.entries()][0];
const fieldDef = getFieldDef(schema, type, fieldNodes[0]);

if (!fieldDef) {
const fieldName = fieldNodes[0].name.value;
throw new GraphQLError(
`The subscription field "${fieldName}" is not defined.`,
fieldNodes,
);
}

const path = addPath(undefined, responseName, type.name);
const info = buildResolveInfo(exeContext, fieldDef, fieldNodes, type, path);

try {
// Implements the "ResolveFieldEventStream" algorithm from GraphQL specification.
// It differs from "ResolveFieldValue" due to providing a different `resolveFn`.

// Build a JS object of arguments from the field.arguments AST, using the
// variables scope to fulfill any variable references.
const args = getArgumentValues(fieldDef, fieldNodes[0], variableValues);

// The resolve function's optional third argument is a context value that
// is provided to every resolve function within an execution. It is commonly
// used to represent an authenticated user, or request-specific caches.
const contextValue = exeContext.contextValue;

// Call the `subscribe()` resolver or the default resolver to produce an
// AsyncIterable yielding raw payloads.
const resolveFn = fieldDef.subscribe ?? exeContext.subscribeFieldResolver;
const eventStream = await resolveFn(rootValue, args, contextValue, info);

if (eventStream instanceof Error) {
throw eventStream;
}
return eventStream;
} catch (error) {
throw locatedError(error, fieldNodes, pathToArray(path));
}
}
127 changes: 3 additions & 124 deletions src/execution/subscribe.ts
Original file line number Diff line number Diff line change
@@ -1,32 +1,16 @@
import { inspect } from '../jsutils/inspect';
import { isAsyncIterable } from '../jsutils/isAsyncIterable';
import { addPath, pathToArray } from '../jsutils/Path';
import type { Maybe } from '../jsutils/Maybe';

import { GraphQLError } from '../error/GraphQLError';
import { locatedError } from '../error/locatedError';

import type { DocumentNode } from '../language/ast';

import type { GraphQLSchema } from '../type/schema';
import type { GraphQLFieldResolver } from '../type/definition';

import { getOperationRootType } from '../utilities/getOperationRootType';

import type {
ExecutionArgs,
ExecutionResult,
ExecutionContext,
} from './execute';
import { collectFields } from './collectFields';
import { getArgumentValues } from './values';
import type { ExecutionArgs, ExecutionResult } from './execute';
import {
buildExecutionContext,
buildResolveInfo,
executeQueryOrMutation,
getFieldDef,
createSourceEventStreamImpl,
executeSubscription,
} from './execute';
import { mapAsyncIterator } from './mapAsyncIterator';

/**
* @deprecated use ExecutionArgs instead.
Expand Down Expand Up @@ -75,32 +59,6 @@ export async function subscribe(
return executeSubscription(exeContext);
}

async function executeSubscription(
exeContext: ExecutionContext,
): Promise<AsyncGenerator<ExecutionResult, void, void> | ExecutionResult> {
const resultOrStream = await createSourceEventStreamImpl(exeContext);

if (!isAsyncIterable(resultOrStream)) {
return resultOrStream;
}

// For each payload yielded from a subscription, map it over the normal
// GraphQL `execute` function, with `payload` as the rootValue.
// This implements the "MapSourceToResponseEvent" algorithm described in
// the GraphQL specification. The `execute` function provides the
// "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the
// "ExecuteQuery" algorithm, for which `execute` is also used.
const mapSourceToResponse = (payload: unknown) =>
executeQueryOrMutation({
...exeContext,
rootValue: payload,
errors: [],
});

// Map every source value to a ExecutionResult value as described above.
return mapAsyncIterator(resultOrStream, mapSourceToResponse);
}

/**
* Implements the "CreateSourceEventStream" algorithm described in the
* GraphQL specification, resolving the subscription source event stream.
Expand Down Expand Up @@ -157,82 +115,3 @@ export async function createSourceEventStream(

return createSourceEventStreamImpl(exeContext);
}

export async function createSourceEventStreamImpl(
exeContext: ExecutionContext,
): Promise<AsyncIterable<unknown> | ExecutionResult> {
try {
const eventStream = await executeSubscriptionRootField(exeContext);

// Assert field returned an event stream, otherwise yield an error.
if (!isAsyncIterable(eventStream)) {
throw new Error(
'Subscription field must return Async Iterable. ' +
`Received: ${inspect(eventStream)}.`,
);
}

return eventStream;
} catch (error) {
// If it GraphQLError, report it as an ExecutionResult, containing only errors and no data.
// Otherwise treat the error as a system-class error and re-throw it.
if (error instanceof GraphQLError) {
return { errors: [error] };
}
throw error;
}
}

async function executeSubscriptionRootField(
exeContext: ExecutionContext,
): Promise<unknown> {
const { schema, fragments, operation, variableValues, rootValue } =
exeContext;
const type = getOperationRootType(schema, operation);
const fields = collectFields(
schema,
fragments,
variableValues,
type,
operation.selectionSet,
);
const [responseName, fieldNodes] = [...fields.entries()][0];
const fieldDef = getFieldDef(schema, type, fieldNodes[0]);

if (!fieldDef) {
const fieldName = fieldNodes[0].name.value;
throw new GraphQLError(
`The subscription field "${fieldName}" is not defined.`,
fieldNodes,
);
}

const path = addPath(undefined, responseName, type.name);
const info = buildResolveInfo(exeContext, fieldDef, fieldNodes, type, path);

try {
// Implements the "ResolveFieldEventStream" algorithm from GraphQL specification.
// It differs from "ResolveFieldValue" due to providing a different `resolveFn`.

// Build a JS object of arguments from the field.arguments AST, using the
// variables scope to fulfill any variable references.
const args = getArgumentValues(fieldDef, fieldNodes[0], variableValues);

// The resolve function's optional third argument is a context value that
// is provided to every resolve function within an execution. It is commonly
// used to represent an authenticated user, or request-specific caches.
const contextValue = exeContext.contextValue;

// Call the `subscribe()` resolver or the default resolver to produce an
// AsyncIterable yielding raw payloads.
const resolveFn = fieldDef.subscribe ?? exeContext.subscribeFieldResolver;
const eventStream = await resolveFn(rootValue, args, contextValue, info);

if (eventStream instanceof Error) {
throw eventStream;
}
return eventStream;
} catch (error) {
throw locatedError(error, fieldNodes, pathToArray(path));
}
}