Skip to content

Commit

Permalink
Hmm
Browse files Browse the repository at this point in the history
  • Loading branch information
ardatan committed Mar 21, 2023
1 parent a69635f commit c0e3ee5
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 38 deletions.
9 changes: 5 additions & 4 deletions packages/graphql-yoga/src/plugins/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ export type Plugin<
* Use this hook with your own risk. It is still experimental and may change in the future.
* @internal
*/
onResultProcess?: OnResultProcess
onResultProcess?: OnResultProcess<TServerContext>
}

export type OnYogaInitHook<TServerContext extends Record<string, any>> = (
Expand Down Expand Up @@ -122,8 +122,8 @@ export interface OnParamsEventPayload {
fetchAPI: FetchAPI
}

export type OnResultProcess = (
payload: OnResultProcessEventPayload,
export type OnResultProcess<TServerContext> = (
payload: OnResultProcessEventPayload<TServerContext>,
) => PromiseOrValue<void>

export type ResultProcessorInput =
Expand All @@ -136,7 +136,7 @@ export type ResultProcessor = (
acceptedMediaType: string,
) => PromiseOrValue<Response>

export interface OnResultProcessEventPayload {
export interface OnResultProcessEventPayload<TServerContext> {
request: Request
result: ResultProcessorInput
setResult(result: ResultProcessorInput): void
Expand All @@ -147,6 +147,7 @@ export interface OnResultProcessEventPayload {
acceptedMediaType: string,
): void
fetchAPI: FetchAPI
serverContext: TServerContext
endResponse(response: Response): void
}

Expand Down
7 changes: 5 additions & 2 deletions packages/graphql-yoga/src/process-request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,21 @@ import {
} from './plugins/types.js'
import { FetchAPI, GraphQLParams } from './types.js'

export async function processResult({
export async function processResult<TServerContext>({
request,
result,
fetchAPI,
serverContext,
onResultProcessHooks,
}: {
request: Request
result: ResultProcessorInput
fetchAPI: FetchAPI
serverContext: TServerContext
/**
* Response Hooks
*/
onResultProcessHooks: OnResultProcess[]
onResultProcessHooks: OnResultProcess<TServerContext>[]
}) {
let resultProcessor: ResultProcessor | undefined

Expand All @@ -42,6 +44,7 @@ export async function processResult({
acceptedMediaType = newAcceptedMimeType
},
fetchAPI,
serverContext,
endResponse(response) {
earlyResponse = response
},
Expand Down
6 changes: 4 additions & 2 deletions packages/graphql-yoga/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ export class YogaServer<
>
private onRequestParseHooks: OnRequestParseHook<TServerContext>[]
private onParamsHooks: OnParamsHook[]
private onResultProcessHooks: OnResultProcess[]
private onResultProcessHooks: OnResultProcess<TServerContext>[]
private maskedErrorsOpts: YogaMaskedErrorOpts | null
private id: string

Expand Down Expand Up @@ -356,7 +356,7 @@ export class YogaServer<
useResultProcessors({
legacySSE: options?.legacySse !== false,
}),
useErrorHandling((error, request) => {
useErrorHandling((error, request, serverContext) => {
const errors = handleError(error, this.maskedErrorsOpts, this.logger)

const result = {
Expand All @@ -368,6 +368,7 @@ export class YogaServer<
result,
fetchAPI: this.fetchAPI,
onResultProcessHooks: this.onResultProcessHooks,
serverContext,
})
}),
...(options?.plugins ?? []),
Expand Down Expand Up @@ -603,6 +604,7 @@ export class YogaServer<
result,
fetchAPI: this.fetchAPI,
onResultProcessHooks: this.onResultProcessHooks,
serverContext,
})
}
}
Expand Down
63 changes: 33 additions & 30 deletions packages/plugins/graphql-sse/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export function useGraphQLSSE(
options: GraphQLSSEPluginOptions = {
pubsub: createPubSub(),
},
): Plugin<YogaInitialContext> {
): Plugin<YogaInitialContext, { waitUntil(p: Promise<unknown>): void }> {
const { pubsub } = options
const tokenByRequest = new WeakMap<Request, string>()
const operationIdByRequest = new WeakMap<Request, string>()
Expand Down Expand Up @@ -80,51 +80,54 @@ export function useGraphQLSSE(
operationIdByRequest.set(request, params.extensions.operationId)
}
},
onResultProcess({ request, result, fetchAPI, endResponse }) {
onResultProcess({ request, result, fetchAPI, serverContext, endResponse }) {
const token = tokenByRequest.get(request)
if (token) {
const operationId = operationIdByRequest.get(request)
// Batching is not supported by GraphQL SSE yet
if (operationId && !Array.isArray(result)) {
Promise.resolve().then(async () => {
if (isAsyncIterable(result)) {
const asyncIterator = result[Symbol.asyncIterator]()
pubsub
.subscribe('graphql-sse-unsubscribe', operationId)
.next()
.finally(() => {
asyncIterator.return?.()
})
let iteratorValue: IteratorResult<ExecutionResult>
while (!(iteratorValue = await asyncIterator.next()).done) {
const chunk = iteratorValue.value
serverContext.waitUntil(
Promise.resolve().then(async () => {
if (isAsyncIterable(result)) {
const asyncIterator = result[Symbol.asyncIterator]()
pubsub
.subscribe('graphql-sse-unsubscribe', operationId)
.next()
.finally(() => {
asyncIterator.return?.()
})
let iteratorValue: IteratorResult<ExecutionResult>
while (!(iteratorValue = await asyncIterator.next()).done) {
const chunk = iteratorValue.value
const messageJson = {
id: operationId,
payload: chunk,
}
const messageStr = `event: next\nid: ${operationId}\ndata: ${JSON.stringify(
messageJson,
)}\n\n`
pubsub.publish('graphql-sse-subscribe', token, messageStr)
}
} else {
const messageJson = {
id: operationId,
payload: chunk,
payload: result,
}
const messageStr = `event: next\nid: ${operationId}\ndata: ${JSON.stringify(
messageJson,
)}\n\n`
pubsub.publish('graphql-sse-subscribe', token, messageStr)
}
} else {
const messageJson = {
const completeMessageJson = {
id: operationId,
payload: result,
}
const messageStr = `event: next\nid: ${operationId}\ndata: ${JSON.stringify(
messageJson,
const completeMessageStr = `event: complete\ndata: ${JSON.stringify(
completeMessageJson,
)}\n\n`
pubsub.publish('graphql-sse-subscribe', token, messageStr)
}
const completeMessageJson = {
id: operationId,
}
const completeMessageStr = `event: complete\ndata: ${JSON.stringify(
completeMessageJson,
)}\n\n`
pubsub.publish('graphql-sse-subscribe', token, completeMessageStr)
})
pubsub.publish('graphql-sse-subscribe', token, completeMessageStr)
}),
)

endResponse(
new fetchAPI.Response(null, {
status: 202,
Expand Down

0 comments on commit c0e3ee5

Please sign in to comment.