diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..c367d7b --- /dev/null +++ b/.editorconfig @@ -0,0 +1,12 @@ +root = true + +[*] +charset = utf-8 +end_of_line = lf +indent_size = 4 + +[*.{js,ts,mjs,jsx,tsx}] +indent_style = space +insert_final_newline = true +indent_style = space +trim_trailing_whitespace = true \ No newline at end of file diff --git a/biome.json b/biome.json new file mode 100644 index 0000000..091a94b --- /dev/null +++ b/biome.json @@ -0,0 +1,6 @@ +{ + "formatter": { + "indentStyle": "space", + "indentWidth": 4 + } +} diff --git a/src/createBunHttpHandler.ts b/src/createBunHttpHandler.ts index b0a9fc7..fa1a730 100644 --- a/src/createBunHttpHandler.ts +++ b/src/createBunHttpHandler.ts @@ -6,34 +6,34 @@ import type { HTTPBaseHandlerOptions } from "@trpc/server/http"; export type CreateBunContextOptions = { req: Request }; export type BunHttpHandlerOptions = - HTTPBaseHandlerOptions & { - endpoint?: string; - createContext?: ( - opts: CreateBunContextOptions, - ) => inferRouterContext | Promise>; - }; + HTTPBaseHandlerOptions & { + endpoint?: string; + createContext?: ( + opts: CreateBunContextOptions, + ) => inferRouterContext | Promise>; + }; export function createBunHttpHandler( - opts: BunHttpHandlerOptions & { emitWsUpgrades?: boolean }, + opts: BunHttpHandlerOptions & { emitWsUpgrades?: boolean }, ) { - return (request: Request, server: Server) => { - const url = new URL(request.url); + return (request: Request, server: Server) => { + const url = new URL(request.url); - if (opts.endpoint && !url.pathname.startsWith(opts.endpoint)) { - return; - } + if (opts.endpoint && !url.pathname.startsWith(opts.endpoint)) { + return; + } - if ( - opts.emitWsUpgrades && - server.upgrade(request, { data: { req: request } }) - ) { - return new Response(null, { status: 101 }); - } + if ( + opts.emitWsUpgrades && + server.upgrade(request, { data: { req: request } }) + ) { + return new Response(null, { status: 101 }); + } - return fetchRequestHandler({ - endpoint: opts.endpoint ?? "", - ...opts, - req: request, - }); - }; + return fetchRequestHandler({ + endpoint: opts.endpoint ?? "", + ...opts, + req: request, + }); + }; } diff --git a/src/createBunServeHandler.ts b/src/createBunServeHandler.ts index 2046471..2d91699 100644 --- a/src/createBunServeHandler.ts +++ b/src/createBunServeHandler.ts @@ -1,33 +1,33 @@ import type { ServeOptions, Server } from "bun"; import { createBunWSHandler } from "./createBunWSHandler"; import { - BunHttpHandlerOptions, - createBunHttpHandler, + BunHttpHandlerOptions, + createBunHttpHandler, } from "./createBunHttpHandler"; import type { AnyRouter } from "@trpc/server"; type Optional = Pick, K> & Omit; export function createBunServeHandler( - opts: BunHttpHandlerOptions, - serveOptions?: Optional, + opts: BunHttpHandlerOptions, + serveOptions?: Optional, ) { - const trpcHandler = createBunHttpHandler({ - ...opts, - emitWsUpgrades: true, - }); + const trpcHandler = createBunHttpHandler({ + ...opts, + emitWsUpgrades: true, + }); - return { - ...serveOptions, - async fetch(req: Request, server: Server) { - const trpcReponse = trpcHandler(req, server); + return { + ...serveOptions, + async fetch(req: Request, server: Server) { + const trpcReponse = trpcHandler(req, server); - if (trpcReponse) { - return trpcReponse; - } + if (trpcReponse) { + return trpcReponse; + } - return serveOptions?.fetch?.call(server, req, server); - }, - websocket: createBunWSHandler(opts), - }; + return serveOptions?.fetch?.call(server, req, server); + }, + websocket: createBunWSHandler(opts), + }; } diff --git a/src/createBunWSHandler.ts b/src/createBunWSHandler.ts index edefcc8..801e4de 100644 --- a/src/createBunWSHandler.ts +++ b/src/createBunWSHandler.ts @@ -1,278 +1,301 @@ import { ServerWebSocket, WebSocketHandler } from "bun"; import { - JSONRPC2, - parseTRPCMessage, - TRPCClientOutgoingMessage, - TRPCResponseMessage, + JSONRPC2, + parseTRPCMessage, + TRPCClientOutgoingMessage, + TRPCResponseMessage, } from "@trpc/server/rpc"; import { - AnyRouter, - callProcedure, - getTRPCErrorFromUnknown, - inferRouterContext, - TRPCError, + AnyRouter, + callProcedure, + getErrorShape, + transformTRPCResponse, + getTRPCErrorFromUnknown, + inferRouterContext, + TRPCError, } from "@trpc/server"; -import { getErrorShape, transformTRPCResponse } from "@trpc/server"; import { isObservable, Unsubscribable } from "@trpc/server/observable"; import type { BaseHandlerOptions } from "@trpc/server/http"; export type BunWSAdapterOptions = BaseHandlerOptions< - TRouter, - Request + TRouter, + Request > & { - createContext?: (params: { - req: Request; - client: ServerWebSocket; - }) => Promise | unknown; + createContext?: (params: { + req: Request; + client: ServerWebSocket; + }) => Promise | unknown; }; export type BunWSClientCtx = { - req: Request; - handleRequest: (msg: TRPCClientOutgoingMessage) => Promise; - unsubscribe(): void; + req: Request; + handleRequest: (msg: TRPCClientOutgoingMessage) => Promise; + unsubscribe(): void; }; export function createBunWSHandler( - opts: BunWSAdapterOptions + opts: BunWSAdapterOptions, ): WebSocketHandler { - const { router, createContext } = opts; + const { router, createContext } = opts; - const respond = ( - client: ServerWebSocket, - untransformedJSON: TRPCResponseMessage - ) => { - client.send( - JSON.stringify( - transformTRPCResponse(opts.router._def._config, untransformedJSON) - ) - ); - }; + const respond = ( + client: ServerWebSocket, + untransformedJSON: TRPCResponseMessage, + ) => { + client.send( + JSON.stringify( + transformTRPCResponse( + opts.router._def._config, + untransformedJSON, + ), + ), + ); + }; - return { - async open(client) { - const { req } = client.data; - const clientSubscriptions = new Map(); + return { + async open(client) { + const { req } = client.data; + const clientSubscriptions = new Map< + string | number, + Unsubscribable + >(); - const ctxPromise = createContext?.({ req, client }); - let ctx: inferRouterContext | undefined = undefined; - await (async () => { - try { - ctx = await ctxPromise; - } catch (cause) { - const error = getTRPCErrorFromUnknown(cause); - opts.onError?.({ - error, - path: undefined, - type: "unknown", - ctx, - req, - input: undefined, - }); - respond(client, { - id: null, - error: getErrorShape({ - config: router._def._config, - error, - type: "unknown", - path: undefined, - input: undefined, - ctx, - }), - }); + const ctxPromise = createContext?.({ req, client }); + let ctx: inferRouterContext | undefined = undefined; + await (async () => { + try { + ctx = await ctxPromise; + } catch (cause) { + const error = getTRPCErrorFromUnknown(cause); + opts.onError?.({ + error, + path: undefined, + type: "unknown", + ctx, + req, + input: undefined, + }); + respond(client, { + id: null, + error: getErrorShape({ + config: router._def._config, + error, + type: "unknown", + path: undefined, + input: undefined, + ctx, + }), + }); - // close in next tick - setImmediate(() => client.close()); - } - })(); + // close in next tick + setImmediate(() => client.close()); + } + })(); - const stopSubscription = ( - subscription: Unsubscribable, - { id, jsonrpc }: JSONRPC2.BaseEnvelope & { id: JSONRPC2.RequestId } - ) => { - subscription.unsubscribe(); + const stopSubscription = ( + subscription: Unsubscribable, + { + id, + jsonrpc, + }: JSONRPC2.BaseEnvelope & { id: JSONRPC2.RequestId }, + ) => { + subscription.unsubscribe(); - respond(client, { - id, - jsonrpc, - result: { - type: "stopped", - }, - }); - }; + respond(client, { + id, + jsonrpc, + result: { + type: "stopped", + }, + }); + }; - client.data.handleRequest = async (msg: TRPCClientOutgoingMessage) => { - const { id, jsonrpc } = msg; - /* istanbul ignore next -- @preserve */ - if (id === null) { - throw new TRPCError({ - code: "BAD_REQUEST", - message: "`id` is required", - }); - } - if (msg.method === "subscription.stop") { - const sub = clientSubscriptions.get(id); - if (sub) { - stopSubscription(sub, { id, jsonrpc }); - } - clientSubscriptions.delete(id); - return; - } - const { path, input } = msg.params; - const type = msg.method; - try { - await ctxPromise; // asserts context has been set + client.data.handleRequest = async ( + msg: TRPCClientOutgoingMessage, + ) => { + const { id, jsonrpc } = msg; + /* istanbul ignore next -- @preserve */ + if (id === null) { + throw new TRPCError({ + code: "BAD_REQUEST", + message: "`id` is required", + }); + } + if (msg.method === "subscription.stop") { + const sub = clientSubscriptions.get(id); + if (sub) { + stopSubscription(sub, { id, jsonrpc }); + } + clientSubscriptions.delete(id); + return; + } + const { path, input } = msg.params; + const type = msg.method; + try { + await ctxPromise; // asserts context has been set - const result = await callProcedure({ - procedures: router._def.procedures, - path, - input, - getRawInput: () => Promise.resolve(input), - ctx, - type, - }); + const result = await callProcedure({ + procedures: router._def.procedures, + path, + input, + getRawInput: () => Promise.resolve(input), + ctx, + type, + }); - if (type === "subscription") { - if (!isObservable(result)) { - throw new TRPCError({ - message: `Subscription ${path} did not return an observable`, - code: "INTERNAL_SERVER_ERROR", - }); - } - } else { - // send the value as data if the method is not a subscription - respond(client, { - id, - jsonrpc, - result: { - type: "data", - data: result, - }, - }); - return; - } + if (type === "subscription") { + if (!isObservable(result)) { + throw new TRPCError({ + message: `Subscription ${path} did not return an observable`, + code: "INTERNAL_SERVER_ERROR", + }); + } + } else { + // send the value as data if the method is not a subscription + respond(client, { + id, + jsonrpc, + result: { + type: "data", + data: result, + }, + }); + return; + } - const observable = result; - const sub = observable.subscribe({ - next(data) { - respond(client, { - id, - jsonrpc, - result: { - type: "data", - data, - }, - }); - }, - error(err) { - const error = getTRPCErrorFromUnknown(err); - opts.onError?.({ error, path, type, ctx, req, input }); - respond(client, { - id, - jsonrpc, - error: getErrorShape({ - config: router._def._config, - error, - type, - path, - input, - ctx, - }), - }); - }, - complete() { - respond(client, { - id, - jsonrpc, - result: { - type: "stopped", - }, - }); - }, - }); + const observable = result; + const sub = observable.subscribe({ + next(data) { + respond(client, { + id, + jsonrpc, + result: { + type: "data", + data, + }, + }); + }, + error(err) { + const error = getTRPCErrorFromUnknown(err); + opts.onError?.({ + error, + path, + type, + ctx, + req, + input, + }); + respond(client, { + id, + jsonrpc, + error: getErrorShape({ + config: router._def._config, + error, + type, + path, + input, + ctx, + }), + }); + }, + complete() { + respond(client, { + id, + jsonrpc, + result: { + type: "stopped", + }, + }); + }, + }); - if (client.readyState !== WebSocket.OPEN) { - // if the client got disconnected whilst initializing the subscription - // no need to send stopped message if the client is disconnected - sub.unsubscribe(); - return; - } + if (client.readyState !== WebSocket.OPEN) { + // if the client got disconnected whilst initializing the subscription + // no need to send stopped message if the client is disconnected + sub.unsubscribe(); + return; + } - if (clientSubscriptions.has(id)) { - // duplicate request ids for client - stopSubscription(sub, { id, jsonrpc }); - throw new TRPCError({ - message: `Duplicate id ${id}`, - code: "BAD_REQUEST", - }); - } - clientSubscriptions.set(id, sub); + if (clientSubscriptions.has(id)) { + // duplicate request ids for client + stopSubscription(sub, { id, jsonrpc }); + throw new TRPCError({ + message: `Duplicate id ${id}`, + code: "BAD_REQUEST", + }); + } + clientSubscriptions.set(id, sub); - respond(client, { - id, - jsonrpc, - result: { - type: "started", - }, - }); - } catch (cause) { - // procedure threw an error - const error = getTRPCErrorFromUnknown(cause); - opts.onError?.({ error, path, type, ctx, req, input }); - respond(client, { - id, - jsonrpc, - error: getErrorShape({ - config: router._def._config, - error, - type, - path, - input, - ctx, - }), - }); - } - }; + respond(client, { + id, + jsonrpc, + result: { + type: "started", + }, + }); + } catch (cause) { + // procedure threw an error + const error = getTRPCErrorFromUnknown(cause); + opts.onError?.({ error, path, type, ctx, req, input }); + respond(client, { + id, + jsonrpc, + error: getErrorShape({ + config: router._def._config, + error, + type, + path, + input, + ctx, + }), + }); + } + }; - client.data.unsubscribe = () => { - for (const sub of clientSubscriptions.values()) { - sub.unsubscribe(); - } - clientSubscriptions.clear(); - }; - }, + client.data.unsubscribe = () => { + for (const sub of clientSubscriptions.values()) { + sub.unsubscribe(); + } + clientSubscriptions.clear(); + }; + }, - async close(client) { - client.data.unsubscribe?.(); - }, + async close(client) { + client.data.unsubscribe?.(); + }, - async message(client, message) { - try { - const msgJSON: unknown = JSON.parse(message.toString()); - const msgs: unknown[] = Array.isArray(msgJSON) ? msgJSON : [msgJSON]; + async message(client, message) { + try { + const msgJSON: unknown = JSON.parse(message.toString()); + const msgs: unknown[] = Array.isArray(msgJSON) + ? msgJSON + : [msgJSON]; - const promises = msgs - .map((raw) => parseTRPCMessage(raw, router._def._config.transformer)) - .map(client.data.handleRequest); + const promises = msgs + .map((raw) => + parseTRPCMessage(raw, router._def._config.transformer), + ) + .map(client.data.handleRequest); - await Promise.all(promises); - } catch (cause) { - const error = new TRPCError({ - code: "PARSE_ERROR", - cause, - }); + await Promise.all(promises); + } catch (cause) { + const error = new TRPCError({ + code: "PARSE_ERROR", + cause, + }); - respond(client, { - id: null, - error: getErrorShape({ - config: router._def._config, - error, - type: "unknown", - path: undefined, - input: undefined, - ctx: undefined, - }), - }); - } - }, - }; + respond(client, { + id: null, + error: getErrorShape({ + config: router._def._config, + error, + type: "unknown", + path: undefined, + input: undefined, + ctx: undefined, + }), + }); + } + }, + }; } diff --git a/src/e2e.test.ts b/src/e2e.test.ts index ebe6408..3e1b152 100644 --- a/src/e2e.test.ts +++ b/src/e2e.test.ts @@ -5,301 +5,301 @@ import { observable } from "@trpc/server/observable"; import { Server } from "bun"; describe("e2e", () => { - let server: Server; - - const createContext = ({ req }: { req: Request }) => { - return { - name: req.headers.get("x-name") ?? "World", - }; - }; - - const t = initTRPC.context().create(); - - const router = t.router({ - hello: t.procedure.query(({ ctx }) => `Hello ${ctx.name}!`), - - exception: t.procedure.query(() => { - throw new Error("MyError"); - }), - - digits: t.procedure.subscription(() => - observable((subscriber) => { - setTimeout(() => { - subscriber.next(0); - subscriber.next(1); - subscriber.next(2); - subscriber.error(new Error("MyError")); - }, 10); - }), - ), - }); - - beforeAll(async () => { - server = Bun.serve( - createBunServeHandler( - { - router, - endpoint: "/trpc", - createContext, - }, - { - port: 13123, - fetch(request, server): Response | Promise { - return new Response("Falling back to fetch"); - }, - }, - ), - ); - }); - - afterAll(() => server.stop()); - - test("http call procedure", async () => { - const response = await fetch("http://localhost:13123/trpc/hello"); - expect(response.ok).toBe(true); - const result = await response.json(); - expect(result).toEqual({ result: { data: "Hello World!" } }); - }); - - test("http call procedure +ctx", async () => { - const response = await fetch("http://localhost:13123/trpc/hello", { - headers: { - "x-name": "John", - }, - }); - expect(response.ok).toBe(true); - const result = await response.json(); - expect(result).toEqual({ result: { data: "Hello John!" } }); - }); - - test("http call exception", async () => { - const response = await fetch("http://localhost:13123/trpc/exception"); - expect(response.ok).toBe(false); - const result = await response.json(); - expect(result).toEqual({ - error: { - code: -32603, - message: "MyError", - data: { - code: "INTERNAL_SERVER_ERROR", - httpStatus: 500, - path: "exception", - stack: expect.any(String), - }, - }, - }); - }); - - test("websocket call procedure", async () => { - const ws = new WebSocket("ws://localhost:13123/trpc"); - const id = Math.random(); - - ws.onopen = () => { - ws.send( - JSON.stringify({ - id, - method: "query", - params: { - path: "hello", - }, - }), - ); - }; - - await new Promise((resolve, reject) => { - ws.onmessage = (event) => { - const data = JSON.parse(event.data); - try { - expect(data).toEqual({ - id, - result: { - type: "data", - data: "Hello World!", - }, - }); - } finally { - resolve(true); - } - }; - }); - - ws.close(); - }); - - test("ws error", async () => { - const ws = new WebSocket("ws://localhost:13123/trpc"); - const id = Math.random(); - - ws.onopen = () => { - ws.send( - JSON.stringify({ - id, - method: "query", - params: { - path: "unknown", - }, - }), - ); - }; - - await new Promise((resolve, reject) => { - ws.onmessage = (event) => { - const data = JSON.parse(event.data); - try { - expect(data).toEqual({ - id, - error: { - code: -32004, - message: `No "query"-procedure on path "unknown"`, - data: { - code: "NOT_FOUND", - httpStatus: 404, - path: "unknown", - stack: expect.any(String), - }, - }, - }); - } finally { - resolve(true); - } - }; - }); - - ws.close(); - }); - - test("ws exception", async () => { - const ws = new WebSocket("ws://localhost:13123/trpc"); - const id = Math.random(); - - ws.onopen = () => { - ws.send( - JSON.stringify({ - id, - method: "query", - params: { - path: "exception", - }, - }), - ); - }; - - await new Promise((resolve, reject) => { - ws.onmessage = (event) => { - const data = JSON.parse(event.data); - try { - expect(data).toEqual({ - id, - error: { - code: -32603, - message: "MyError", - data: { - code: "INTERNAL_SERVER_ERROR", - httpStatus: 500, - path: "exception", - stack: expect.any(String), - }, - }, - }); - } finally { - resolve(true); - } - }; - }); - - ws.close(); - }); - - test("websocket call subscription", async () => { - const ws = new WebSocket("ws://localhost:13123/trpc"); - - const messages: unknown[] = []; - const id = Math.random(); - - ws.onopen = () => { - ws.send( - JSON.stringify({ - id, - method: "subscription", - params: { - path: "digits", - }, - }), - ); - }; - - ws.onmessage = (event) => { - const data = JSON.parse(event.data); - messages.push(data); - }; - - await new Promise((resolve) => setTimeout(resolve, 100)); - - ws.send( - JSON.stringify({ - id, - method: "subscription.stop", - }), - ); - - await new Promise((resolve) => setTimeout(resolve, 100)); - - ws.close(); - - expect(messages).toEqual([ - { - id, - result: { - type: "started", - }, - }, - { - id, - result: { - type: "data", - data: 0, - }, - }, - { - id, - result: { - type: "data", - data: 1, - }, - }, - { - id, - result: { - type: "data", - data: 2, - }, - }, - { - id, - error: { - code: -32603, - message: "MyError", - data: { - code: "INTERNAL_SERVER_ERROR", - httpStatus: 500, - path: "digits", - stack: expect.any(String), - }, - }, - }, - { - id, - result: { - type: "stopped", - }, - }, - ]); - }); - - test("fall through to fetch", async () => { - const response = await fetch("http://localhost:13123/other"); - expect(response.ok).toBe(true); - const result = await response.text(); - expect(result).toEqual("Falling back to fetch"); - }); + let server: Server; + + const createContext = ({ req }: { req: Request }) => { + return { + name: req.headers.get("x-name") ?? "World", + }; + }; + + const t = initTRPC.context().create(); + + const router = t.router({ + hello: t.procedure.query(({ ctx }) => `Hello ${ctx.name}!`), + + exception: t.procedure.query(() => { + throw new Error("MyError"); + }), + + digits: t.procedure.subscription(() => + observable((subscriber) => { + setTimeout(() => { + subscriber.next(0); + subscriber.next(1); + subscriber.next(2); + subscriber.error(new Error("MyError")); + }, 10); + }), + ), + }); + + beforeAll(async () => { + server = Bun.serve( + createBunServeHandler( + { + router, + endpoint: "/trpc", + createContext, + }, + { + port: 13123, + fetch(request, server): Response | Promise { + return new Response("Falling back to fetch"); + }, + }, + ), + ); + }); + + afterAll(() => server.stop()); + + test("http call procedure", async () => { + const response = await fetch("http://localhost:13123/trpc/hello"); + expect(response.ok).toBe(true); + const result = await response.json(); + expect(result).toEqual({ result: { data: "Hello World!" } }); + }); + + test("http call procedure +ctx", async () => { + const response = await fetch("http://localhost:13123/trpc/hello", { + headers: { + "x-name": "John", + }, + }); + expect(response.ok).toBe(true); + const result = await response.json(); + expect(result).toEqual({ result: { data: "Hello John!" } }); + }); + + test("http call exception", async () => { + const response = await fetch("http://localhost:13123/trpc/exception"); + expect(response.ok).toBe(false); + const result = await response.json(); + expect(result).toEqual({ + error: { + code: -32603, + message: "MyError", + data: { + code: "INTERNAL_SERVER_ERROR", + httpStatus: 500, + path: "exception", + stack: expect.any(String), + }, + }, + }); + }); + + test("websocket call procedure", async () => { + const ws = new WebSocket("ws://localhost:13123/trpc"); + const id = Math.random(); + + ws.onopen = () => { + ws.send( + JSON.stringify({ + id, + method: "query", + params: { + path: "hello", + }, + }), + ); + }; + + await new Promise((resolve, reject) => { + ws.onmessage = (event) => { + const data = JSON.parse(event.data); + try { + expect(data).toEqual({ + id, + result: { + type: "data", + data: "Hello World!", + }, + }); + } finally { + resolve(true); + } + }; + }); + + ws.close(); + }); + + test("ws error", async () => { + const ws = new WebSocket("ws://localhost:13123/trpc"); + const id = Math.random(); + + ws.onopen = () => { + ws.send( + JSON.stringify({ + id, + method: "query", + params: { + path: "unknown", + }, + }), + ); + }; + + await new Promise((resolve, reject) => { + ws.onmessage = (event) => { + const data = JSON.parse(event.data); + try { + expect(data).toEqual({ + id, + error: { + code: -32004, + message: `No "query"-procedure on path "unknown"`, + data: { + code: "NOT_FOUND", + httpStatus: 404, + path: "unknown", + stack: expect.any(String), + }, + }, + }); + } finally { + resolve(true); + } + }; + }); + + ws.close(); + }); + + test("ws exception", async () => { + const ws = new WebSocket("ws://localhost:13123/trpc"); + const id = Math.random(); + + ws.onopen = () => { + ws.send( + JSON.stringify({ + id, + method: "query", + params: { + path: "exception", + }, + }), + ); + }; + + await new Promise((resolve, reject) => { + ws.onmessage = (event) => { + const data = JSON.parse(event.data); + try { + expect(data).toEqual({ + id, + error: { + code: -32603, + message: "MyError", + data: { + code: "INTERNAL_SERVER_ERROR", + httpStatus: 500, + path: "exception", + stack: expect.any(String), + }, + }, + }); + } finally { + resolve(true); + } + }; + }); + + ws.close(); + }); + + test("websocket call subscription", async () => { + const ws = new WebSocket("ws://localhost:13123/trpc"); + + const messages: unknown[] = []; + const id = Math.random(); + + ws.onopen = () => { + ws.send( + JSON.stringify({ + id, + method: "subscription", + params: { + path: "digits", + }, + }), + ); + }; + + ws.onmessage = (event) => { + const data = JSON.parse(event.data); + messages.push(data); + }; + + await new Promise((resolve) => setTimeout(resolve, 100)); + + ws.send( + JSON.stringify({ + id, + method: "subscription.stop", + }), + ); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + ws.close(); + + expect(messages).toEqual([ + { + id, + result: { + type: "started", + }, + }, + { + id, + result: { + type: "data", + data: 0, + }, + }, + { + id, + result: { + type: "data", + data: 1, + }, + }, + { + id, + result: { + type: "data", + data: 2, + }, + }, + { + id, + error: { + code: -32603, + message: "MyError", + data: { + code: "INTERNAL_SERVER_ERROR", + httpStatus: 500, + path: "digits", + stack: expect.any(String), + }, + }, + }, + { + id, + result: { + type: "stopped", + }, + }, + ]); + }); + + test("fall through to fetch", async () => { + const response = await fetch("http://localhost:13123/other"); + expect(response.ok).toBe(true); + const result = await response.text(); + expect(result).toEqual("Falling back to fetch"); + }); });