From ab96f3f524ee279f9a97bd4f0cd7d9bb76c56834 Mon Sep 17 00:00:00 2001 From: juliusmarminge Date: Fri, 12 Jan 2024 12:32:26 +0100 Subject: [PATCH] update to v11 --- package.json | 2 +- src/createBunWSHandler.ts | 485 +++++++++++++++++++------------------- 2 files changed, 243 insertions(+), 244 deletions(-) diff --git a/package.json b/package.json index e485fa8..7c61e68 100644 --- a/package.json +++ b/package.json @@ -23,7 +23,7 @@ "author": "Sancha ", "license": "MIT", "devDependencies": { - "@trpc/server": "10.43.0", + "@trpc/server": "next", "bun-types": "^1.0.20", "tsup": "^8.0.1", "typescript": "^5.3.3" diff --git a/src/createBunWSHandler.ts b/src/createBunWSHandler.ts index 3feef22..edefcc8 100644 --- a/src/createBunWSHandler.ts +++ b/src/createBunWSHandler.ts @@ -1,279 +1,278 @@ import { ServerWebSocket, WebSocketHandler } from "bun"; import { - JSONRPC2, - parseTRPCMessage, - TRPCClientOutgoingMessage, - TRPCResponseMessage, + JSONRPC2, + parseTRPCMessage, + TRPCClientOutgoingMessage, + TRPCResponseMessage, } from "@trpc/server/rpc"; import { - AnyRouter, - callProcedure, - getTRPCErrorFromUnknown, - inferRouterContext, - TRPCError, + AnyRouter, + callProcedure, + getTRPCErrorFromUnknown, + inferRouterContext, + TRPCError, } from "@trpc/server"; -import { getErrorShape, transformTRPCResponse } from "@trpc/server/shared"; +import { getErrorShape, transformTRPCResponse } from "@trpc/server"; import { isObservable, Unsubscribable } from "@trpc/server/observable"; -import type { BaseHandlerOptions } from "@trpc/server/src/internals/types"; +import type { BaseHandlerOptions } from "@trpc/server/http"; export type BunWSAdapterOptions = BaseHandlerOptions< - TRouter, - Request + TRouter, + Request > & { - createContext?: (params: { - req: Request; - client: ServerWebSocket; - }) => Promise | unknown; + createContext?: (params: { + req: Request; + client: ServerWebSocket; + }) => Promise | unknown; }; export type BunWSClientCtx = { - req: Request; - handleRequest: (msg: TRPCClientOutgoingMessage) => Promise; - unsubscribe(): void; + req: Request; + handleRequest: (msg: TRPCClientOutgoingMessage) => Promise; + unsubscribe(): void; }; export function createBunWSHandler( - opts: BunWSAdapterOptions, + opts: BunWSAdapterOptions ): WebSocketHandler { - const { router, createContext } = opts; + const { router, createContext } = opts; - const respond = ( - client: ServerWebSocket, - untransformedJSON: TRPCResponseMessage, - ) => { - client.send( - JSON.stringify( - transformTRPCResponse(opts.router._def._config, untransformedJSON), - ), - ); - }; + const respond = ( + client: ServerWebSocket, + untransformedJSON: TRPCResponseMessage + ) => { + client.send( + JSON.stringify( + transformTRPCResponse(opts.router._def._config, untransformedJSON) + ) + ); + }; - return { - async open(client) { - const { req } = client.data; - const clientSubscriptions = new Map(); + return { + async open(client) { + const { req } = client.data; + const clientSubscriptions = new Map(); - const ctxPromise = createContext?.({ req, client }); - let ctx: inferRouterContext | undefined = undefined; - await (async () => { - try { - ctx = await ctxPromise; - } catch (cause) { - const error = getTRPCErrorFromUnknown(cause); - opts.onError?.({ - error, - path: undefined, - type: "unknown", - ctx, - req, - input: undefined, - }); - respond(client, { - id: null, - error: getErrorShape({ - config: router._def._config, - error, - type: "unknown", - path: undefined, - input: undefined, - ctx, - }), - }); + const ctxPromise = createContext?.({ req, client }); + let ctx: inferRouterContext | undefined = undefined; + await (async () => { + try { + ctx = await ctxPromise; + } catch (cause) { + const error = getTRPCErrorFromUnknown(cause); + opts.onError?.({ + error, + path: undefined, + type: "unknown", + ctx, + req, + input: undefined, + }); + respond(client, { + id: null, + error: getErrorShape({ + config: router._def._config, + error, + type: "unknown", + path: undefined, + input: undefined, + ctx, + }), + }); - // close in next tick - setImmediate(() => client.close()); - } - })(); + // close in next tick + setImmediate(() => client.close()); + } + })(); - const stopSubscription = ( - subscription: Unsubscribable, - { id, jsonrpc }: JSONRPC2.BaseEnvelope & { id: JSONRPC2.RequestId }, - ) => { - subscription.unsubscribe(); + const stopSubscription = ( + subscription: Unsubscribable, + { id, jsonrpc }: JSONRPC2.BaseEnvelope & { id: JSONRPC2.RequestId } + ) => { + subscription.unsubscribe(); - respond(client, { - id, - jsonrpc, - result: { - type: "stopped", - }, - }); - }; + respond(client, { + id, + jsonrpc, + result: { + type: "stopped", + }, + }); + }; - client.data.handleRequest = async (msg: TRPCClientOutgoingMessage) => { - const { id, jsonrpc } = msg; - /* istanbul ignore next -- @preserve */ - if (id === null) { - throw new TRPCError({ - code: "BAD_REQUEST", - message: "`id` is required", - }); - } - if (msg.method === "subscription.stop") { - const sub = clientSubscriptions.get(id); - if (sub) { - stopSubscription(sub, { id, jsonrpc }); - } - clientSubscriptions.delete(id); - return; - } - const { path, input } = msg.params; - const type = msg.method; - try { - await ctxPromise; // asserts context has been set + client.data.handleRequest = async (msg: TRPCClientOutgoingMessage) => { + const { id, jsonrpc } = msg; + /* istanbul ignore next -- @preserve */ + if (id === null) { + throw new TRPCError({ + code: "BAD_REQUEST", + message: "`id` is required", + }); + } + if (msg.method === "subscription.stop") { + const sub = clientSubscriptions.get(id); + if (sub) { + stopSubscription(sub, { id, jsonrpc }); + } + clientSubscriptions.delete(id); + return; + } + const { path, input } = msg.params; + const type = msg.method; + try { + await ctxPromise; // asserts context has been set - const result = await callProcedure({ - procedures: router._def.procedures, - path, - rawInput: input, - // @ts-expect-error: for a newer trpc versions, cuz we use internal API - getRawInput: () => Promise.resolve(input), - ctx, - type, - }); + const result = await callProcedure({ + procedures: router._def.procedures, + path, + input, + getRawInput: () => Promise.resolve(input), + ctx, + type, + }); - if (type === "subscription") { - if (!isObservable(result)) { - throw new TRPCError({ - message: `Subscription ${path} did not return an observable`, - code: "INTERNAL_SERVER_ERROR", - }); - } - } else { - // send the value as data if the method is not a subscription - respond(client, { - id, - jsonrpc, - result: { - type: "data", - data: result, - }, - }); - return; - } + if (type === "subscription") { + if (!isObservable(result)) { + throw new TRPCError({ + message: `Subscription ${path} did not return an observable`, + code: "INTERNAL_SERVER_ERROR", + }); + } + } else { + // send the value as data if the method is not a subscription + respond(client, { + id, + jsonrpc, + result: { + type: "data", + data: result, + }, + }); + return; + } - const observable = result; - const sub = observable.subscribe({ - next(data) { - respond(client, { - id, - jsonrpc, - result: { - type: "data", - data, - }, - }); - }, - error(err) { - const error = getTRPCErrorFromUnknown(err); - opts.onError?.({ error, path, type, ctx, req, input }); - respond(client, { - id, - jsonrpc, - error: getErrorShape({ - config: router._def._config, - error, - type, - path, - input, - ctx, - }), - }); - }, - complete() { - respond(client, { - id, - jsonrpc, - result: { - type: "stopped", - }, - }); - }, - }); + const observable = result; + const sub = observable.subscribe({ + next(data) { + respond(client, { + id, + jsonrpc, + result: { + type: "data", + data, + }, + }); + }, + error(err) { + const error = getTRPCErrorFromUnknown(err); + opts.onError?.({ error, path, type, ctx, req, input }); + respond(client, { + id, + jsonrpc, + error: getErrorShape({ + config: router._def._config, + error, + type, + path, + input, + ctx, + }), + }); + }, + complete() { + respond(client, { + id, + jsonrpc, + result: { + type: "stopped", + }, + }); + }, + }); - if (client.readyState !== WebSocket.OPEN) { - // if the client got disconnected whilst initializing the subscription - // no need to send stopped message if the client is disconnected - sub.unsubscribe(); - return; - } + if (client.readyState !== WebSocket.OPEN) { + // if the client got disconnected whilst initializing the subscription + // no need to send stopped message if the client is disconnected + sub.unsubscribe(); + return; + } - if (clientSubscriptions.has(id)) { - // duplicate request ids for client - stopSubscription(sub, { id, jsonrpc }); - throw new TRPCError({ - message: `Duplicate id ${id}`, - code: "BAD_REQUEST", - }); - } - clientSubscriptions.set(id, sub); + if (clientSubscriptions.has(id)) { + // duplicate request ids for client + stopSubscription(sub, { id, jsonrpc }); + throw new TRPCError({ + message: `Duplicate id ${id}`, + code: "BAD_REQUEST", + }); + } + clientSubscriptions.set(id, sub); - respond(client, { - id, - jsonrpc, - result: { - type: "started", - }, - }); - } catch (cause) { - // procedure threw an error - const error = getTRPCErrorFromUnknown(cause); - opts.onError?.({ error, path, type, ctx, req, input }); - respond(client, { - id, - jsonrpc, - error: getErrorShape({ - config: router._def._config, - error, - type, - path, - input, - ctx, - }), - }); - } - }; + respond(client, { + id, + jsonrpc, + result: { + type: "started", + }, + }); + } catch (cause) { + // procedure threw an error + const error = getTRPCErrorFromUnknown(cause); + opts.onError?.({ error, path, type, ctx, req, input }); + respond(client, { + id, + jsonrpc, + error: getErrorShape({ + config: router._def._config, + error, + type, + path, + input, + ctx, + }), + }); + } + }; - client.data.unsubscribe = () => { - for (const sub of clientSubscriptions.values()) { - sub.unsubscribe(); - } - clientSubscriptions.clear(); - }; - }, + client.data.unsubscribe = () => { + for (const sub of clientSubscriptions.values()) { + sub.unsubscribe(); + } + clientSubscriptions.clear(); + }; + }, - async close(client) { - client.data.unsubscribe?.(); - }, + async close(client) { + client.data.unsubscribe?.(); + }, - async message(client, message) { - try { - const msgJSON: unknown = JSON.parse(message.toString()); - const msgs: unknown[] = Array.isArray(msgJSON) ? msgJSON : [msgJSON]; + async message(client, message) { + try { + const msgJSON: unknown = JSON.parse(message.toString()); + const msgs: unknown[] = Array.isArray(msgJSON) ? msgJSON : [msgJSON]; - const promises = msgs - .map((raw) => parseTRPCMessage(raw, router._def._config.transformer)) - .map(client.data.handleRequest); + const promises = msgs + .map((raw) => parseTRPCMessage(raw, router._def._config.transformer)) + .map(client.data.handleRequest); - await Promise.all(promises); - } catch (cause) { - const error = new TRPCError({ - code: "PARSE_ERROR", - cause, - }); + await Promise.all(promises); + } catch (cause) { + const error = new TRPCError({ + code: "PARSE_ERROR", + cause, + }); - respond(client, { - id: null, - error: getErrorShape({ - config: router._def._config, - error, - type: "unknown", - path: undefined, - input: undefined, - ctx: undefined, - }), - }); - } - }, - }; + respond(client, { + id: null, + error: getErrorShape({ + config: router._def._config, + error, + type: "unknown", + path: undefined, + input: undefined, + ctx: undefined, + }), + }); + } + }, + }; }