Skip to content

Commit e160e6d

Browse files
authored
fix: make compute server clients share server state (#80)
1 parent a48955f commit e160e6d

File tree

3 files changed

+543
-31
lines changed

3 files changed

+543
-31
lines changed

packages/blink/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "blink",
3-
"version": "1.1.32",
3+
"version": "1.1.33",
44
"description": "Blink is a tool for building and deploying AI agents.",
55
"type": "module",
66
"bin": {
Lines changed: 279 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,279 @@
1+
import { expect, test } from "bun:test";
2+
import { Client } from "@blink-sdk/compute-protocol/client";
3+
import Multiplexer, { Stream } from "@blink-sdk/multiplexer";
4+
import { Buffer } from "node:buffer";
5+
import type { AddressInfo } from "node:net";
6+
import { createServer as createNetServer } from "node:net";
7+
import WebSocket from "ws";
8+
import type { WebSocketServer } from "ws";
9+
import serveCompute from "./compute-server";
10+
11+
type RawData = WebSocket.RawData;
12+
13+
interface RemoteClient {
14+
client: Client;
15+
close: () => Promise<void>;
16+
}
17+
18+
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
19+
20+
const rawDataToUint8Array = (data: RawData): Uint8Array => {
21+
if (Array.isArray(data)) {
22+
return rawDataToUint8Array(Buffer.concat(data));
23+
}
24+
if (data instanceof Uint8Array) {
25+
return new Uint8Array(data.buffer, data.byteOffset, data.byteLength);
26+
}
27+
return new Uint8Array(data);
28+
};
29+
30+
const createRemoteClient = (url: string): Promise<RemoteClient> => {
31+
return new Promise((resolve, reject) => {
32+
let settled = false;
33+
const ws = new WebSocket(url);
34+
const encoder = new TextEncoder();
35+
const decoder = new TextDecoder();
36+
37+
const multiplexer = new Multiplexer({
38+
send: (packet) => {
39+
if (ws.readyState === WebSocket.OPEN) {
40+
ws.send(packet);
41+
}
42+
},
43+
isClient: true,
44+
});
45+
const clientStream = multiplexer.createStream();
46+
const client = new Client({
47+
send: (message: string) => {
48+
clientStream.writeTyped(0x00, encoder.encode(message), true);
49+
},
50+
});
51+
52+
const wireStream = (stream: Stream) => {
53+
stream.onData((data) => {
54+
const payload = data.subarray(1);
55+
client.handleMessage(decoder.decode(payload));
56+
});
57+
};
58+
59+
wireStream(clientStream);
60+
multiplexer.onStream((stream) => {
61+
wireStream(stream);
62+
});
63+
64+
ws.on("message", (data) => {
65+
multiplexer.handleMessage(rawDataToUint8Array(data));
66+
});
67+
ws.on("open", () => {
68+
settled = true;
69+
resolve({
70+
client,
71+
close: async () => {
72+
await new Promise<void>((resolveClose) => {
73+
if (ws.readyState === WebSocket.CLOSED) {
74+
resolveClose();
75+
return;
76+
}
77+
ws.once("close", () => resolveClose());
78+
ws.close();
79+
});
80+
},
81+
});
82+
});
83+
ws.on("error", (err) => {
84+
if (!settled) {
85+
reject(err);
86+
}
87+
});
88+
ws.on("close", () => {
89+
client.dispose("connection closed");
90+
});
91+
});
92+
};
93+
94+
const closeServer = async (wss: WebSocketServer) => {
95+
await new Promise<void>((resolve, reject) => {
96+
wss.close((err) => {
97+
if (err) {
98+
reject(err);
99+
} else {
100+
resolve();
101+
}
102+
});
103+
});
104+
};
105+
106+
const getAvailablePort = async (host: string): Promise<number> => {
107+
return new Promise((resolve, reject) => {
108+
const server = createNetServer();
109+
server.once("error", reject);
110+
server.listen(0, host, () => {
111+
const address = server.address() as AddressInfo;
112+
server.close(() => resolve(address.port));
113+
});
114+
});
115+
};
116+
117+
const buildTestServer = async () => {
118+
const host = "127.0.0.1";
119+
const port = await getAvailablePort(host);
120+
const server = await serveCompute({
121+
host,
122+
port,
123+
logger: {
124+
error: () => {},
125+
warn: () => {},
126+
info: () => {},
127+
},
128+
});
129+
const address = server.address();
130+
if (!address || typeof address === "string") {
131+
throw new Error("Failed to determine server address");
132+
}
133+
const url = `ws://${host}:${address.port}`;
134+
return {
135+
server,
136+
url,
137+
close: () => closeServer(server),
138+
};
139+
};
140+
141+
const waitForCondition = async (
142+
predicate: () => boolean,
143+
timeoutMs = 5_000
144+
) => {
145+
const start = Date.now();
146+
while (Date.now() - start < timeoutMs) {
147+
if (predicate()) return;
148+
await sleep(25);
149+
}
150+
throw new Error("Condition not met within timeout");
151+
};
152+
153+
test("multiple clients share the same compute server state", async () => {
154+
const { server, url, close } = await buildTestServer();
155+
156+
const remoteA = await createRemoteClient(url);
157+
const remoteB = await createRemoteClient(url);
158+
159+
const observedPids: number[] = [];
160+
const notificationDisposable = remoteB.client.onNotification(
161+
"process_status",
162+
(payload) => {
163+
observedPids.push(payload.status.pid);
164+
}
165+
);
166+
167+
const exec = await remoteA.client.request("process_execute", {
168+
command: "bash",
169+
args: ["-lc", 'echo "shared-process"'],
170+
cwd: ".",
171+
});
172+
173+
const waitResult = await remoteB.client.request("process_wait", {
174+
pid: exec.pid,
175+
timeout_ms: 10_000,
176+
});
177+
178+
expect(waitResult.pid).toBe(exec.pid);
179+
expect(waitResult.plain_output.lines.join("\n")).toContain("shared-process");
180+
expect(observedPids).toContain(exec.pid);
181+
182+
notificationDisposable.dispose();
183+
await Promise.all([remoteA.close(), remoteB.close()]);
184+
await close();
185+
});
186+
187+
test("broadcasts process output notifications to all clients", async () => {
188+
const { server, url, close } = await buildTestServer();
189+
const remoteA = await createRemoteClient(url);
190+
const remoteB = await createRemoteClient(url);
191+
const remoteC = await createRemoteClient(url);
192+
193+
const outputsB: string[] = [];
194+
const outputsC: string[] = [];
195+
const disposeB = remoteB.client.onNotification("process_output", (payload) =>
196+
outputsB.push(payload.output)
197+
);
198+
const disposeC = remoteC.client.onNotification("process_output", (payload) =>
199+
outputsC.push(payload.output)
200+
);
201+
202+
const exec = await remoteA.client.request("process_execute", {
203+
command: "bash",
204+
args: ["-lc", 'printf "fanout"; sleep 0.1'],
205+
cwd: ".",
206+
});
207+
await remoteA.client.request("process_wait", {
208+
pid: exec.pid,
209+
timeout_ms: 5_000,
210+
});
211+
212+
await waitForCondition(
213+
() =>
214+
outputsB.join("").includes("fanout") &&
215+
outputsC.join("").includes("fanout")
216+
);
217+
218+
disposeB.dispose();
219+
disposeC.dispose();
220+
await Promise.all([remoteA.close(), remoteB.close(), remoteC.close()]);
221+
await close();
222+
});
223+
224+
test("process remains accessible after originating client disconnects", async () => {
225+
const { server, url, close } = await buildTestServer();
226+
const remoteA = await createRemoteClient(url);
227+
const remoteB = await createRemoteClient(url);
228+
229+
const exec = await remoteA.client.request("process_execute", {
230+
command: "bash",
231+
args: ["-lc", 'sleep 0.2; echo "still-running"'],
232+
cwd: ".",
233+
});
234+
235+
await remoteA.close(); // disconnect before waiting
236+
237+
const result = await remoteB.client.request("process_wait", {
238+
pid: exec.pid,
239+
timeout_ms: 10_000,
240+
});
241+
242+
expect(result.plain_output.lines.join("\n")).toContain("still-running");
243+
244+
await remoteB.close();
245+
await close();
246+
});
247+
248+
test("handles many sequential streams without collisions", async () => {
249+
const { server, url, close } = await buildTestServer();
250+
const remote = await createRemoteClient(url);
251+
252+
const promises = [];
253+
254+
for (let i = 0; i < 10; i++) {
255+
promises.push(
256+
(async () => {
257+
const marker = `seq-${i}`;
258+
const exec = await remote.client.request("process_execute", {
259+
command: "bash",
260+
args: ["-lc", `echo "${marker}"`],
261+
cwd: ".",
262+
});
263+
const waitResult = await remote.client.request("process_wait", {
264+
pid: exec.pid,
265+
timeout_ms: 5_000,
266+
});
267+
return { marker, output: waitResult.plain_output.lines.join("\n") };
268+
})()
269+
);
270+
}
271+
272+
for (const promise of promises) {
273+
const { marker, output } = await promise;
274+
expect(output).toContain(marker);
275+
}
276+
277+
await remote.close();
278+
await close();
279+
});

0 commit comments

Comments
 (0)