From 54f0898ee15c030b8d5f22f258b84c8f24720708 Mon Sep 17 00:00:00 2001 From: Hugo Dutka Date: Sun, 16 Nov 2025 19:51:41 +0100 Subject: [PATCH 1/2] fix: make compute server clients share server state --- packages/blink/src/cli/compute-server.test.ts | 279 +++++++++++++++++ packages/blink/src/cli/compute-server.ts | 293 ++++++++++++++++-- 2 files changed, 542 insertions(+), 30 deletions(-) create mode 100644 packages/blink/src/cli/compute-server.test.ts diff --git a/packages/blink/src/cli/compute-server.test.ts b/packages/blink/src/cli/compute-server.test.ts new file mode 100644 index 0000000..dce0d0e --- /dev/null +++ b/packages/blink/src/cli/compute-server.test.ts @@ -0,0 +1,279 @@ +import { expect, test } from "bun:test"; +import { Client } from "@blink-sdk/compute-protocol/client"; +import Multiplexer, { Stream } from "@blink-sdk/multiplexer"; +import { Buffer } from "node:buffer"; +import type { AddressInfo } from "node:net"; +import { createServer as createNetServer } from "node:net"; +import WebSocket from "ws"; +import type { WebSocketServer } from "ws"; +import serveCompute from "./compute-server"; + +type RawData = WebSocket.RawData; + +interface RemoteClient { + client: Client; + close: () => Promise; +} + +const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); + +const rawDataToUint8Array = (data: RawData): Uint8Array => { + if (Array.isArray(data)) { + return rawDataToUint8Array(Buffer.concat(data)); + } + if (data instanceof Uint8Array) { + return new Uint8Array(data.buffer, data.byteOffset, data.byteLength); + } + return new Uint8Array(data); +}; + +const createRemoteClient = (url: string): Promise => { + return new Promise((resolve, reject) => { + let settled = false; + const ws = new WebSocket(url); + const encoder = new TextEncoder(); + const decoder = new TextDecoder(); + + const multiplexer = new Multiplexer({ + send: (packet) => { + if (ws.readyState === WebSocket.OPEN) { + ws.send(packet); + } + }, + isClient: true, + }); + const clientStream = multiplexer.createStream(); + const client = new Client({ + send: (message: string) => { + clientStream.writeTyped(0x00, encoder.encode(message), true); + }, + }); + + const wireStream = (stream: Stream) => { + stream.onData((data) => { + const payload = data.subarray(1); + client.handleMessage(decoder.decode(payload)); + }); + }; + + wireStream(clientStream); + multiplexer.onStream((stream) => { + wireStream(stream); + }); + + ws.on("message", (data) => { + multiplexer.handleMessage(rawDataToUint8Array(data)); + }); + ws.on("open", () => { + settled = true; + resolve({ + client, + close: async () => { + await new Promise((resolveClose) => { + if (ws.readyState === WebSocket.CLOSED) { + resolveClose(); + return; + } + ws.once("close", () => resolveClose()); + ws.close(); + }); + }, + }); + }); + ws.on("error", (err) => { + if (!settled) { + reject(err); + } + }); + ws.on("close", () => { + client.dispose("connection closed"); + }); + }); +}; + +const closeServer = async (wss: WebSocketServer) => { + await new Promise((resolve, reject) => { + wss.close((err) => { + if (err) { + reject(err); + } else { + resolve(); + } + }); + }); +}; + +const getAvailablePort = async (host: string): Promise => { + return new Promise((resolve, reject) => { + const server = createNetServer(); + server.once("error", reject); + server.listen(0, host, () => { + const address = server.address() as AddressInfo; + server.close(() => resolve(address.port)); + }); + }); +}; + +const buildTestServer = async () => { + const host = "127.0.0.1"; + const port = await getAvailablePort(host); + const server = await serveCompute({ + host, + port, + logger: { + error: () => {}, + warn: () => {}, + info: () => {}, + }, + }); + const address = server.address(); + if (!address || typeof address === "string") { + throw new Error("Failed to determine server address"); + } + const url = `ws://${host}:${address.port}`; + return { + server, + url, + close: () => closeServer(server), + }; +}; + +const waitForCondition = async ( + predicate: () => boolean, + timeoutMs = 5_000 +) => { + const start = Date.now(); + while (Date.now() - start < timeoutMs) { + if (predicate()) return; + await sleep(25); + } + throw new Error("Condition not met within timeout"); +}; + +test("multiple clients share the same compute server state", async () => { + const { server, url, close } = await buildTestServer(); + + const remoteA = await createRemoteClient(url); + const remoteB = await createRemoteClient(url); + + const observedPids: number[] = []; + const notificationDisposable = remoteB.client.onNotification( + "process_status", + (payload) => { + observedPids.push(payload.status.pid); + } + ); + + const exec = await remoteA.client.request("process_execute", { + command: "bash", + args: ["-lc", 'echo "shared-process"'], + cwd: ".", + }); + + const waitResult = await remoteB.client.request("process_wait", { + pid: exec.pid, + timeout_ms: 10_000, + }); + + expect(waitResult.pid).toBe(exec.pid); + expect(waitResult.plain_output.lines.join("\n")).toContain("shared-process"); + expect(observedPids).toContain(exec.pid); + + notificationDisposable.dispose(); + await Promise.all([remoteA.close(), remoteB.close()]); + await close(); +}); + +test("broadcasts process output notifications to all clients", async () => { + const { server, url, close } = await buildTestServer(); + const remoteA = await createRemoteClient(url); + const remoteB = await createRemoteClient(url); + const remoteC = await createRemoteClient(url); + + const outputsB: string[] = []; + const outputsC: string[] = []; + const disposeB = remoteB.client.onNotification("process_output", (payload) => + outputsB.push(payload.output) + ); + const disposeC = remoteC.client.onNotification("process_output", (payload) => + outputsC.push(payload.output) + ); + + const exec = await remoteA.client.request("process_execute", { + command: "bash", + args: ["-lc", 'printf "fanout"; sleep 0.1'], + cwd: ".", + }); + await remoteA.client.request("process_wait", { + pid: exec.pid, + timeout_ms: 5_000, + }); + + await waitForCondition( + () => + outputsB.join("").includes("fanout") && + outputsC.join("").includes("fanout") + ); + + disposeB.dispose(); + disposeC.dispose(); + await Promise.all([remoteA.close(), remoteB.close(), remoteC.close()]); + await close(); +}); + +test("process remains accessible after originating client disconnects", async () => { + const { server, url, close } = await buildTestServer(); + const remoteA = await createRemoteClient(url); + const remoteB = await createRemoteClient(url); + + const exec = await remoteA.client.request("process_execute", { + command: "bash", + args: ["-lc", 'sleep 0.2; echo "still-running"'], + cwd: ".", + }); + + await remoteA.close(); // disconnect before waiting + + const result = await remoteB.client.request("process_wait", { + pid: exec.pid, + timeout_ms: 10_000, + }); + + expect(result.plain_output.lines.join("\n")).toContain("still-running"); + + await remoteB.close(); + await close(); +}); + +test("handles many sequential streams without collisions", async () => { + const { server, url, close } = await buildTestServer(); + const remote = await createRemoteClient(url); + + const promises = []; + + for (let i = 0; i < 10; i++) { + promises.push( + (async () => { + const marker = `seq-${i}`; + const exec = await remote.client.request("process_execute", { + command: "bash", + args: ["-lc", `echo "${marker}"`], + cwd: ".", + }); + const waitResult = await remote.client.request("process_wait", { + pid: exec.pid, + timeout_ms: 5_000, + }); + return { marker, output: waitResult.plain_output.lines.join("\n") }; + })() + ); + } + + for (const promise of promises) { + const { marker, output } = await promise; + expect(output).toContain(marker); + } + + await remote.close(); + await close(); +}); diff --git a/packages/blink/src/cli/compute-server.ts b/packages/blink/src/cli/compute-server.ts index 2f9cb75..ff25beb 100644 --- a/packages/blink/src/cli/compute-server.ts +++ b/packages/blink/src/cli/compute-server.ts @@ -1,8 +1,8 @@ -import { Server, type ServerOptions } from "@blink-sdk/compute-protocol/server"; -import { Emitter } from "@blink-sdk/events"; -import fs from "node:fs"; -import os from "node:os"; -import path from "node:path"; +import { Server } from "@blink-sdk/compute-protocol/server"; +import { FrameCodec, MessageType } from "@blink-sdk/multiplexer"; +import { Buffer } from "node:buffer"; +import type { AddressInfo } from "node:net"; +import type { WebSocket, ServerOptions as WebSocketServerOptions } from "ws"; import { WebSocketServer } from "ws"; const defaultEnvVariables = { @@ -21,52 +21,285 @@ const defaultEnvVariables = { GIT_CONFIG_VALUE_0: "!gh auth git-credential", }; -export default async function serveCompute() { +interface ClientConnection { + ws: WebSocket; + clientToServerStream: Map; + serverToClientStream: Map; +} + +const toUint8Array = ( + data: Buffer | ArrayBuffer | Uint8Array | Buffer[] +): Uint8Array => { + if (Array.isArray(data)) { + const combined = Buffer.concat(data); + return new Uint8Array( + combined.buffer, + combined.byteOffset, + combined.byteLength + ); + } + if (data instanceof Uint8Array) { + return new Uint8Array(data.buffer, data.byteOffset, data.byteLength); + } + return new Uint8Array(data); +}; + +type MultiplexerFrame = ReturnType; + +const encodeForClient = ( + frame: MultiplexerFrame, + streamId: number +): { buffer: Buffer; release: () => void } => { + const encoded = FrameCodec.encode({ + streamId, + type: frame.type, + flags: frame.flags, + payload: frame.payload, + }); + const buffer = Buffer.from( + encoded.buffer, + encoded.byteOffset, + encoded.byteLength + ); + return { + buffer, + release: () => FrameCodec.releaseBuffer(encoded), + }; +}; + +const encodeForServer = ( + frame: MultiplexerFrame, + streamId: number +): Uint8Array => { + const encoded = FrameCodec.encode({ + streamId, + type: frame.type, + flags: frame.flags, + payload: frame.payload, + }); + return encoded; +}; + +export interface ServeComputeOptions { + host?: string; + port?: number; + logger?: { + error: (...args: unknown[]) => void; + warn: (...args: unknown[]) => void; + info: (...args: unknown[]) => void; + }; + createWebSocketServer?: (options: WebSocketServerOptions) => WebSocketServer; +} + +const waitForListening = (wss: WebSocketServer): Promise => { + return new Promise((resolve) => { + const address = wss.address(); + if (address) { + resolve(); + return; + } + wss.once("listening", () => resolve()); + }); +}; + +const resolvePort = (wss: WebSocketServer, fallback: number): number => { + const address = wss.address(); + if (typeof address === "object" && address) { + return (address as AddressInfo).port; + } + if (typeof address === "number") { + return address; + } + return fallback; +}; + +export default async function serveCompute( + options: ServeComputeOptions = {} +): Promise { + const logger = options.logger ?? console; for (const [key, value] of Object.entries(defaultEnvVariables)) { if (process.env[key] === undefined) { process.env[key] = value; } } - const port = parseInt(process.env.PORT ?? "22137"); + const port = + options.port ?? (process.env.PORT ? parseInt(process.env.PORT, 10) : 22137); if (isNaN(port)) { throw new Error("PORT environment variable is not a number"); } - const host = process.env.HOST ?? "127.0.0.1"; + const host = options.host ?? process.env.HOST ?? "127.0.0.1"; - const wss = new WebSocketServer({ port, host }); + let nodePty: typeof import("@lydell/node-pty") | undefined; + try { + nodePty = require("@lydell/node-pty"); + } catch (e) { + // It's fine, we don't _need_ to use TTYs. + } + if (typeof Bun !== "undefined") { + nodePty = undefined; + } - console.log(`Compute server running on ${host}:${port}`); + const clients = new Set(); + // Track which remote websocket owns a given server-side stream. + const serverStreamOwners = new Map(); + let nextServerStreamId = 1; - wss.on("connection", (ws) => { - console.log("Client connected"); + // Server-initiated streams (notifications, proxy responses, etc.) are copied to every client. + const broadcastServerFrame = (frameData: Uint8Array) => { + for (const client of clients) { + client.ws.send(Buffer.from(frameData)); + } + }; - let nodePty: typeof import("@lydell/node-pty") | undefined; - try { - nodePty = require("@lydell/node-pty"); - } catch (e) { - // It's fine, we don't _need_ to use TTYs. + const cleanupStreamMapping = ( + client: ClientConnection, + serverStreamId: number + ) => { + const clientStreamId = client.serverToClientStream.get(serverStreamId); + if (clientStreamId !== undefined) { + client.serverToClientStream.delete(serverStreamId); + client.clientToServerStream.delete(clientStreamId); } - if (typeof Bun !== "undefined") { - nodePty = undefined; + serverStreamOwners.delete(serverStreamId); + }; + + const server = new Server({ + nodePty, + send: (message: Uint8Array) => { + let frame: MultiplexerFrame; + using _releaser = { + [Symbol.dispose]() { + FrameCodec.releaseBuffer(message); + }, + }; + try { + frame = FrameCodec.decode(message); + } catch (err) { + logger.error("Failed to decode server frame", err); + return; + } + const owner = serverStreamOwners.get(frame.streamId); + if (owner) { + const clientStreamId = owner.serverToClientStream.get(frame.streamId); + if (clientStreamId === undefined) { + cleanupStreamMapping(owner, frame.streamId); + return; + } + const { buffer, release } = encodeForClient(frame, clientStreamId); + try { + owner.ws.send(buffer); + } finally { + release(); + } + if ( + frame.type === MessageType.CLOSE || + frame.type === MessageType.ERROR + ) { + cleanupStreamMapping(owner, frame.streamId); + } + return; + } + + if (frame.streamId % 2 === 0) { + // Broadcast server-initiated streams (e.g., notifications) to all clients. + broadcastServerFrame(message); + } else { + // Stream owner vanished (client disconnected). Drop the frame. + } + }, + }); + + const allocateServerStreamId = () => { + const streamId = nextServerStreamId; + nextServerStreamId += 2; + return streamId; + }; + + const forwardToServer = (frame: MultiplexerFrame): void => { + const encoded = encodeForServer(frame, frame.streamId); + try { + server.handleMessage(encoded); + } finally { + FrameCodec.releaseBuffer(encoded); } + }; - const server = new Server({ - nodePty, - send: (message: Uint8Array) => { - // Send binary data to the WebSocket client - ws.send(message); - }, - }); + const wss = + options.createWebSocketServer?.({ port, host }) ?? + new WebSocketServer({ port, host }); + + await waitForListening(wss); + + const resolvedPort = resolvePort(wss, port); + logger.info(`Compute server running on ${host}:${resolvedPort}`); + + wss.on("connection", (ws) => { + logger.info("Client connected"); + const client: ClientConnection = { + ws, + clientToServerStream: new Map(), + serverToClientStream: new Map(), + }; + clients.add(client); - ws.on("message", (data: Buffer) => { - // Forward WebSocket messages to the server - server.handleMessage(new Uint8Array(data)); + ws.on("message", (raw) => { + if (typeof raw === "string") { + logger.warn("Ignoring unexpected text message from client"); + return; + } + const data = toUint8Array(raw as Buffer | ArrayBuffer | Uint8Array); + let frame: MultiplexerFrame; + try { + frame = FrameCodec.decode(data); + } catch (err) { + logger.error("Failed to decode client frame", err); + ws.close(1002, "invalid frame"); + return; + } + let serverStreamId = client.clientToServerStream.get(frame.streamId); + if (serverStreamId === undefined) { + serverStreamId = allocateServerStreamId(); + client.clientToServerStream.set(frame.streamId, serverStreamId); + client.serverToClientStream.set(serverStreamId, frame.streamId); + serverStreamOwners.set(serverStreamId, client); + } + // Rewrite the stream ID so that all clients share a single server instance. + frame.streamId = serverStreamId; + forwardToServer(frame); + if ( + frame.type === MessageType.CLOSE || + frame.type === MessageType.ERROR + ) { + cleanupStreamMapping(client, serverStreamId); + } }); + const closeClientStreams = () => { + for (const serverStreamId of [...client.serverToClientStream.keys()]) { + const cleanupFrame: MultiplexerFrame = { + streamId: serverStreamId, + type: MessageType.CLOSE, + flags: 0, + payload: new Uint8Array(0), + }; + forwardToServer(cleanupFrame); + cleanupStreamMapping(client, serverStreamId); + } + }; + ws.on("close", () => { - console.log("Client disconnected"); + closeClientStreams(); + clients.delete(client); + logger.info("Client disconnected"); + }); + + ws.on("error", () => { + closeClientStreams(); + clients.delete(client); }); }); + + return wss; } From 8cf19b440cfed51fb2441a543dbfd1a308087ab3 Mon Sep 17 00:00:00 2001 From: Hugo Dutka Date: Sun, 16 Nov 2025 21:00:20 +0100 Subject: [PATCH 2/2] bump version --- packages/blink/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/blink/package.json b/packages/blink/package.json index 85036f6..89f2254 100644 --- a/packages/blink/package.json +++ b/packages/blink/package.json @@ -1,6 +1,6 @@ { "name": "blink", - "version": "1.1.32", + "version": "1.1.33", "description": "Blink is a tool for building and deploying AI agents.", "type": "module", "bin": {