diff --git a/packages/blink/src/agent/client/index.ts b/packages/blink/src/agent/client/index.ts index 14f068b..de326a1 100644 --- a/packages/blink/src/agent/client/index.ts +++ b/packages/blink/src/agent/client/index.ts @@ -31,12 +31,10 @@ export type CapabilitiesResponse = Awaited>; export class Client { public readonly baseUrl: string; private readonly client: ReturnType>; - public readonly agentLock: RWLock; public constructor(options: ClientOptions) { this.client = hc(options.baseUrl); this.baseUrl = options.baseUrl; - this.agentLock = new RWLock(); } /** diff --git a/packages/blink/src/cli/run.ts b/packages/blink/src/cli/run.ts index cc0e8ba..da45cab 100644 --- a/packages/blink/src/cli/run.ts +++ b/packages/blink/src/cli/run.ts @@ -9,6 +9,7 @@ import { resolveConfig } from "../build"; import { findNearestEntry } from "../build/util"; import { existsSync } from "node:fs"; import type { ID } from "../agent/types"; +import { RWLock } from "../local/rw-lock"; export default async function run( message: string[], @@ -71,7 +72,7 @@ export default async function run( console.error("Error:", error); }, }); - manager.setAgent(agent.client); + manager.setAgent({ client: agent.client, lock: new RWLock() }); try { // Wait for completion by subscribing to state changes diff --git a/packages/blink/src/local/chat-manager.test.ts b/packages/blink/src/local/chat-manager.test.ts index de04ad3..fa79132 100644 --- a/packages/blink/src/local/chat-manager.test.ts +++ b/packages/blink/src/local/chat-manager.test.ts @@ -10,99 +10,45 @@ import type { StoredChat, StoredMessage } from "./types"; import type { Client } from "../agent/client"; // Helper to create a mock agent -function createMockAgent( - responseText: string = "Assistant response" -): Client & { chatCalls: any[] } { +function createMockAgent(responseText: string = "Assistant response"): { + lock: RWLock; + client: Client; + chatCalls: any[]; +} { const chatCalls: any[] = []; return { - agentLock: new RWLock(), + lock: new RWLock(), chatCalls, - chat: async ({ messages, signal }: any) => { - chatCalls.push({ messages, signal }); + client: { + chat: async ({ messages, signal }: any) => { + chatCalls.push({ messages, signal }); - // Return a ReadableStream of UIMessageChunk objects - const stream = new ReadableStream({ - async start(controller) { - if (signal?.aborted) { - controller.close(); - return; - } - - // Start the message - controller.enqueue({ - type: "start", - messageId: "msg-1", - } as UIMessageChunk); - - // Add text content - controller.enqueue({ - type: "text-start", - id: "text-1", - } as UIMessageChunk); - - // Send text - controller.enqueue({ - type: "text-delta", - id: "text-1", - delta: responseText, - } as UIMessageChunk); - - if (!signal?.aborted) { - controller.enqueue({ - type: "text-end", - id: "text-1", - } as UIMessageChunk); - - controller.enqueue({ - type: "finish", - finishReason: "stop", - usage: { promptTokens: 10, completionTokens: 5 }, - } as UIMessageChunk); - } - controller.close(); - }, - }); - - return stream; - }, - } as any; -} - -// Helper to create a slow-streaming agent (yields control between chunks) -function createSlowAgent(chunks: number = 5): Client { - return { - agentLock: new RWLock(), - chat: async ({ signal }: any) => { - const stream = new ReadableStream({ - async start(controller) { - try { + // Return a ReadableStream of UIMessageChunk objects + const stream = new ReadableStream({ + async start(controller) { if (signal?.aborted) { controller.close(); return; } + // Start the message controller.enqueue({ type: "start", messageId: "msg-1", } as UIMessageChunk); + // Add text content controller.enqueue({ type: "text-start", id: "text-1", } as UIMessageChunk); - for (let i = 0; i < chunks; i++) { - if (signal?.aborted) { - throw new Error("AbortError"); - } - controller.enqueue({ - type: "text-delta", - id: "text-1", - delta: `chunk${i}`, - } as UIMessageChunk); - // Yield control to allow other operations - await new Promise((resolve) => setImmediate(resolve)); - } + // Send text + controller.enqueue({ + type: "text-delta", + id: "text-1", + delta: responseText, + } as UIMessageChunk); if (!signal?.aborted) { controller.enqueue({ @@ -117,18 +63,78 @@ function createSlowAgent(chunks: number = 5): Client { } as UIMessageChunk); } controller.close(); - } catch (err: any) { - if (err.message === "AbortError" || signal?.aborted) { + }, + }); + + return stream; + }, + } as any, + }; +} + +// Helper to create a slow-streaming agent (yields control between chunks) +function createSlowAgent(chunks: number = 5): { client: Client; lock: RWLock } { + return { + lock: new RWLock(), + client: { + chat: async ({ signal }: any) => { + const stream = new ReadableStream({ + async start(controller) { + try { + if (signal?.aborted) { + controller.close(); + return; + } + + controller.enqueue({ + type: "start", + messageId: "msg-1", + } as UIMessageChunk); + + controller.enqueue({ + type: "text-start", + id: "text-1", + } as UIMessageChunk); + + for (let i = 0; i < chunks; i++) { + if (signal?.aborted) { + throw new Error("AbortError"); + } + controller.enqueue({ + type: "text-delta", + id: "text-1", + delta: `chunk${i}`, + } as UIMessageChunk); + // Yield control to allow other operations + await new Promise((resolve) => setImmediate(resolve)); + } + + if (!signal?.aborted) { + controller.enqueue({ + type: "text-end", + id: "text-1", + } as UIMessageChunk); + + controller.enqueue({ + type: "finish", + finishReason: "stop", + usage: { promptTokens: 10, completionTokens: 5 }, + } as UIMessageChunk); + } controller.close(); - } else { - controller.error(err); + } catch (err: any) { + if (err.message === "AbortError" || signal?.aborted) { + controller.close(); + } else { + controller.error(err); + } } - } - }, - }); - return stream; - }, - } as any; + }, + }); + return stream; + }, + } as any, + }; } // Helper to create a stored message diff --git a/packages/blink/src/local/chat-manager.ts b/packages/blink/src/local/chat-manager.ts index 34214ca..190e9c0 100644 --- a/packages/blink/src/local/chat-manager.ts +++ b/packages/blink/src/local/chat-manager.ts @@ -17,6 +17,7 @@ import { import type { ID } from "../agent/types"; import { stripVTControlCharacters } from "node:util"; import { RWLock } from "./rw-lock"; +import type { Agent } from "../react/use-agent"; export type ChatStatus = "idle" | "streaming" | "error"; @@ -60,7 +61,7 @@ type StateListener = (state: ChatState) => void; */ export class ChatManager { private chatId: ID; - private agent: Client | undefined; + private agent: Agent | undefined; private chatStore: Store; private serializeMessage?: (message: UIMessage) => StoredMessage | undefined; private filterMessages?: (message: StoredMessage) => boolean; @@ -171,7 +172,7 @@ export class ChatManager { /** * Update the agent instance to be used for chats */ - setAgent(agent: Client | undefined): void { + setAgent(agent: Agent | undefined): void { this.agent = agent; } @@ -428,11 +429,11 @@ export class ChatManager { }); // Acquire read lock on agent to prevent it from being disposed while streaming. - using _agentLock = await this.agent.agentLock.read(); + using _agentLock = await this.agent.lock.read(); // Stream agent response const streamStartTime = performance.now(); const stream = await runAgent({ - agent: this.agent, + agent: this.agent.client, id: this.chatId as ID, signal: controller.signal, messages, diff --git a/packages/blink/src/local/server.ts b/packages/blink/src/local/server.ts index 458e043..d5b8a4d 100644 --- a/packages/blink/src/local/server.ts +++ b/packages/blink/src/local/server.ts @@ -10,11 +10,12 @@ import { ChatManager } from "./chat-manager"; import { createDiskStore } from "./disk-store"; import { convertMessage, type StoredChat } from "./types"; import { v5 as uuidv5 } from "uuid"; +import type { Agent } from "../react/use-agent"; export interface CreateLocalServerOptions { readonly dataDirectory: string; readonly port?: number; - readonly getAgent: () => Client | undefined; + readonly getAgent: () => Agent | undefined; } export type LocalServer = ReturnType; diff --git a/packages/blink/src/react/use-agent.ts b/packages/blink/src/react/use-agent.ts index 2882a19..8127c22 100644 --- a/packages/blink/src/react/use-agent.ts +++ b/packages/blink/src/react/use-agent.ts @@ -20,9 +20,14 @@ export interface UseAgentOptions { readonly apiServerUrl?: string; } +export interface Agent { + readonly client: Client; + readonly lock: RWLock; +} + // useAgent is a hook that provides a client for an agent at the given entrypoint. export default function useAgent(options: UseAgentOptions) { - const [agent, setAgent] = useState(undefined); + const [agent, setAgent] = useState(undefined); const [logs, setLogs] = useState([]); const [error, setError] = useState(undefined); const [buildResult, setBuildResult] = useState(options.buildResult); @@ -133,7 +138,8 @@ export default function useAgent(options: UseAgentOptions) { const client = new Client({ baseUrl: `http://127.0.0.1:${port}`, }); - lock = client.agentLock; + const agentLock = new RWLock(); + lock = agentLock; // Wait for the health endpoint to be alive. while (!controller.signal.aborted) { try { @@ -150,7 +156,7 @@ export default function useAgent(options: UseAgentOptions) { ready = true; const capabilities = await client.capabilities(); setCapabilities(capabilities); - setAgent(client); + setAgent({ client, lock: agentLock }); })().catch((err) => { // Don't set error if this was just a cleanup abort if (!isCleanup) { diff --git a/packages/blink/src/react/use-chat.ts b/packages/blink/src/react/use-chat.ts index 6299971..1b11022 100644 --- a/packages/blink/src/react/use-chat.ts +++ b/packages/blink/src/react/use-chat.ts @@ -4,12 +4,13 @@ import type { Client } from "../agent/client"; import { ChatManager, type ChatState } from "../local/chat-manager"; import type { StoredMessage } from "../local/types"; import type { ID } from "../agent/types"; +import type { Agent } from "./use-agent"; export type { ChatStatus } from "../local/chat-manager"; export interface UseChatOptions { readonly chatId: ID; - readonly agent: Client | undefined; + readonly agent: Agent | undefined; readonly chatsDirectory: string; /** * Optional function to filter messages before persisting them. diff --git a/packages/blink/src/react/use-dev-mode.ts b/packages/blink/src/react/use-dev-mode.ts index df9ef9a..39746ba 100644 --- a/packages/blink/src/react/use-dev-mode.ts +++ b/packages/blink/src/react/use-dev-mode.ts @@ -11,7 +11,7 @@ import { isLogMessage, isStoredMessageMetadata } from "../local/types"; import type { BuildLog } from "../build"; import type { ID, UIOptions, UIOptionsSchema } from "../agent/index.browser"; import useOptions from "./use-options"; -import useAgent, { type AgentLog } from "./use-agent"; +import useAgent, { type AgentLog, type Agent } from "./use-agent"; import useBundler, { type BundlerStatus } from "./use-bundler"; import useChat, { type UseChat } from "./use-chat"; import useDevhook from "./use-devhook"; @@ -196,7 +196,7 @@ export default function useDevMode(options: UseDevModeOptions): UseDevMode { }, [env, options.onEnvLoaded]); // Server - always use run agent for webhook/API handling - const runAgentRef = useRef(undefined); + const runAgentRef = useRef(undefined); const server = useMemo(() => { return createLocalServer({ port: 0, @@ -219,7 +219,7 @@ export default function useDevMode(options: UseDevModeOptions): UseDevMode { // Edit agent const { - client: editAgent, + agent: editAgent, error: editAgentError, missingApiKey: editModeMissingApiKey, setUserAgentUrl, @@ -247,7 +247,7 @@ export default function useDevMode(options: UseDevModeOptions): UseDevMode { // Update edit agent with user agent URL and handle cleanup useEffect(() => { if (agent) { - setUserAgentUrl(agent.baseUrl); + setUserAgentUrl(agent.client.baseUrl); } // Stop streaming when agents become unavailable @@ -382,7 +382,7 @@ export default function useDevMode(options: UseDevModeOptions): UseDevMode { // Always send the request to the user's agent (not the edit agent) const requestURL = new URL(request.url); - const agentURL = new URL(agent.baseUrl); + const agentURL = new URL(agent.client.baseUrl); agentURL.pathname = requestURL.pathname; agentURL.search = requestURL.search; @@ -431,7 +431,7 @@ export default function useDevMode(options: UseDevModeOptions): UseDevMode { error: optionsError, setOption, } = useOptions({ - agent: mode === "run" ? agent : editAgent, + agent: mode === "run" ? agent?.client : editAgent?.client, capabilities, messages: chat.messages, }); diff --git a/packages/blink/src/react/use-edit-agent.ts b/packages/blink/src/react/use-edit-agent.ts index d570853..a694256 100644 --- a/packages/blink/src/react/use-edit-agent.ts +++ b/packages/blink/src/react/use-edit-agent.ts @@ -15,8 +15,15 @@ export interface UseEditAgentOptions { readonly getDevhookUrl: () => string; } +interface ClientAndLock { + readonly client: Client; + readonly lock: RWLock; +} + export default function useEditAgent(options: UseEditAgentOptions) { - const [client, setClient] = useState(undefined); + const [clientAndLock, setClientAndLock] = useState( + undefined + ); const [error, setError] = useState(undefined); const [missingApiKey, setMissingApiKey] = useState(false); const editAgentRef = useRef(undefined); @@ -27,7 +34,7 @@ export default function useEditAgent(options: UseEditAgentOptions) { // Clear error at the start - attempting to create edit agent setError(undefined); - setClient(undefined); + setClientAndLock(undefined); if (!getEditModeModel(options.env)) { setMissingApiKey(true); @@ -67,7 +74,8 @@ export default function useEditAgent(options: UseEditAgentOptions) { const editClient = new Client({ baseUrl: `http://127.0.0.1:${port}`, }); - lock = editClient.agentLock; + const editAgentLock = new RWLock(); + lock = editAgentLock; // Wait for health check while (!controller.signal.aborted) { @@ -82,7 +90,7 @@ export default function useEditAgent(options: UseEditAgentOptions) { throw controller.signal.reason; } - setClient(editClient); + setClientAndLock({ client: editClient, lock: editAgentLock }); })().catch((err) => { // Don't set error if this was just a cleanup abort if (!isCleanup) { @@ -108,14 +116,14 @@ export default function useEditAgent(options: UseEditAgentOptions) { return useMemo(() => { return { - client, + agent: clientAndLock, error, missingApiKey, setUserAgentUrl: (url: string) => { editAgentRef.current?.setUserAgentUrl(url); }, }; - }, [client, error, missingApiKey]); + }, [clientAndLock, error, missingApiKey]); } async function getRandomPort(): Promise {