diff --git a/.changeset/four-knives-ask.md b/.changeset/four-knives-ask.md
new file mode 100644
index 000000000..a4ca00e7a
--- /dev/null
+++ b/.changeset/four-knives-ask.md
@@ -0,0 +1,5 @@
+---
+"@browserbasehq/stagehand": patch
+---
+
+Add streaming support to the agent via `stream: true` in the agent config
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index bf7d12f87..142208731 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -188,6 +188,7 @@ jobs:
     runs-on: ubuntu-latest
     timeout-minutes: 50
     env:
+      ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
       HEADLESS: true
     steps:
       - name: Check out repository code
diff --git a/packages/core/examples/agent_stream_example.ts b/packages/core/examples/agent_stream_example.ts
new file mode 100644
index 000000000..66d3257f8
--- /dev/null
+++ b/packages/core/examples/agent_stream_example.ts
@@ -0,0 +1,49 @@
+import { Stagehand } from "../lib/v3";
+import dotenv from "dotenv";
+import chalk from "chalk";
+
+// Load environment variables
+dotenv.config();
+
+async function main() {
+  console.log(`\n${chalk.bold("Stagehand 🤘 Agent Streaming Example")}\n`);
+
+  // Initialize Stagehand
+  const stagehand = new Stagehand({
+    env: "LOCAL",
+    verbose: 0,
+    cacheDir: "stagehand-agent-cache",
+    logInferenceToFile: false,
+    experimental: true,
+  });
+
+  await stagehand.init();
+
+  try {
+    const page = stagehand.context.pages()[0];
+    await page.goto("https://amazon.com");
+
+    // Create a streaming agent with stream: true in the config
+    const agent = stagehand.agent({
+      model: "anthropic/claude-sonnet-4-5-20250929",
+      stream: true, // This makes execute() return AgentStreamResult
+    });
+
+    const agentRun = await agent.execute({
+      instruction: "go to amazon, and search for shampoo, stop after searching",
+      maxSteps: 20,
+    });
+
+    // Stream the text as it is generated
+    for await (const delta of agentRun.textStream) {
+      process.stdout.write(delta);
+    }
+
+    // Stream everything (tool calls, messages, etc.)
+    // for await (const delta of agentRun.fullStream) {
+    //   console.log(delta);
+    // }
+
+    const finalResult = await agentRun.result;
+    console.log("Final Result:", finalResult);
+  } catch (error) {
+    console.log(`${chalk.red("✗")} Error: ${error}`);
+  } finally {
+    await stagehand.close();
+  }
+}
+
+main();
diff --git a/packages/core/lib/v3/cache/AgentCache.ts b/packages/core/lib/v3/cache/AgentCache.ts
index 92c0504f9..637374d3e 100644
--- a/packages/core/lib/v3/cache/AgentCache.ts
+++ b/packages/core/lib/v3/cache/AgentCache.ts
@@ -18,6 +18,7 @@ import type {
 import type {
   AvailableModel,
   AgentResult,
+  AgentStreamResult,
   AgentConfig,
   AgentExecuteOptions,
   Logger,
@@ -185,6 +186,135 @@
     return await this.replayAgentCacheEntry(entry);
   }
 
+  /**
+   * Attempts to replay a cached agent execution and returns it as a stream result.
+   *
+   * This method exists because the agent API exposes two execution modes:
+   * - `execute()` - Returns a Promise<AgentResult> directly
+   * - `stream()` - Returns an AgentStreamResult with async iterables for real-time output
+   *
+   * When a cache hit occurs, we need to return the appropriate type for each mode:
+   * - For `execute()`, we use `tryReplay()`, which returns an AgentResult
+   * - For `stream()`, we use `tryReplayAsStream()`, which wraps the result in a
+   *   stream-compatible interface
+   *
+   * This ensures consumers using `stream()` can still iterate over `textStream`
+   * and await `result` even when the response comes from cache, maintaining
+   * API consistency regardless of whether the result was cached or live.
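+   *
+   * @example
+   * // Illustrative sketch only; mirrors the stream-mode call site in v3.ts
+   * // later in this diff. A cache hit is consumed exactly like a live stream:
+   * const replayed = await this.agentCache.tryReplayAsStream(cacheContext);
+   * if (replayed) {
+   *   for await (const chunk of replayed.textStream) process.stdout.write(chunk);
+   *   const final = await replayed.result;
+   * }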
+   */
+  async tryReplayAsStream(
+    context: AgentCacheContext,
+  ): Promise<AgentStreamResult | null> {
+    const result = await this.tryReplay(context);
+    if (!result) return null;
+    return this.createCachedStreamResult(result);
+  }
+
+  /**
+   * Creates a mock AgentStreamResult that wraps a cached AgentResult.
+   *
+   * AgentStreamResult (from the AI SDK) is a complex type with multiple async
+   * iterables and promises. When serving from cache, we don't have an actual
+   * LLM stream to consume - we just have the final result. This method creates
+   * a "fake" stream that immediately yields the cached message and finishes.
+   *
+   * This approach lets cached responses be transparent to the consumer -
+   * they can use the same iteration patterns whether the result is live or cached.
+   */
+  private createCachedStreamResult(
+    cachedResult: AgentResult,
+  ): AgentStreamResult {
+    const message = cachedResult.message ?? "";
+
+    async function* textStreamGenerator(): AsyncGenerator<string> {
+      yield message;
+    }
+
+    async function* fullStreamGenerator(): AsyncGenerator<{
+      type: string;
+      textDelta?: string;
+    }> {
+      yield { type: "text-delta", textDelta: message };
+      yield { type: "finish" };
+    }
+
+    const mockStreamResult = {
+      textStream: textStreamGenerator(),
+      fullStream: fullStreamGenerator(),
+      result: Promise.resolve(cachedResult),
+      text: Promise.resolve(message),
+      usage: Promise.resolve({
+        promptTokens: 0,
+        completionTokens: 0,
+        totalTokens: 0,
+      }),
+      finishReason: Promise.resolve("stop" as const),
+      experimental_providerMetadata: Promise.resolve(undefined),
+      response: Promise.resolve({
+        id: "cached",
+        timestamp: new Date(),
+        modelId: "cached",
+      }),
+      rawResponse: Promise.resolve({ headers: {} }),
+      warnings: Promise.resolve([]),
+      steps: Promise.resolve([]),
+      toolCalls: Promise.resolve([]),
+      toolResults: Promise.resolve([]),
+      [Symbol.asyncIterator]: () => textStreamGenerator(),
+    } as unknown as AgentStreamResult;
+
+    return mockStreamResult;
+  }
+
+  /**
+   * Wraps an AgentStreamResult with caching logic.
+   *
+   * This method handles the complexity of caching for streaming responses:
+   * 1. Begins recording agent replay steps
+   * 2. Wraps the stream's result promise to capture completion
+   * 3. On success: ends recording and stores the cache entry
+   * 4. On error: discards the recording
+   *
+   * This keeps the caching orchestration in AgentCache rather than
+   * spreading it across the V3 class.
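+   *
+   * @example
+   * // Illustrative sketch; mirrors the call site in V3.agent() later in this diff.
+   * const wrapped = this.agentCache.wrapStreamForCaching(
+   *   cacheContext,
+   *   streamResult,
+   *   () => this.beginAgentReplayRecording(),
+   *   () => this.endAgentReplayRecording(),
+   *   () => this.discardAgentReplayRecording(),
+   * );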
+   *
+   * @param context - The cache context for this execution
+   * @param streamResult - The stream result from the agent handler
+   * @param beginRecording - Callback to start recording (from V3)
+   * @param endRecording - Callback to end recording and get steps (from V3)
+   * @param discardRecording - Callback to discard recording on error (from V3)
+   * @returns The wrapped stream result with caching enabled
+   */
+  wrapStreamForCaching(
+    context: AgentCacheContext,
+    streamResult: AgentStreamResult,
+    beginRecording: () => void,
+    endRecording: () => AgentReplayStep[],
+    discardRecording: () => void,
+  ): AgentStreamResult {
+    beginRecording();
+
+    const originalResultPromise = streamResult.result;
+    const wrappedResultPromise = originalResultPromise.then(
+      async (result) => {
+        const agentSteps = endRecording();
+
+        if (result.success && agentSteps.length > 0) {
+          await this.store(context, agentSteps, result);
+        }
+
+        return result;
+      },
+      (error) => {
+        discardRecording();
+        throw error;
+      },
+    );
+
+    streamResult.result = wrappedResultPromise;
+    return streamResult;
+  }
+
   async store(
     context: AgentCacheContext,
     steps: AgentReplayStep[],
diff --git a/packages/core/lib/v3/handlers/v3AgentHandler.ts b/packages/core/lib/v3/handlers/v3AgentHandler.ts
index 503fc78e1..72e885f95 100644
--- a/packages/core/lib/v3/handlers/v3AgentHandler.ts
+++ b/packages/core/lib/v3/handlers/v3AgentHandler.ts
@@ -1,13 +1,22 @@
 import { createAgentTools } from "../agent/tools";
 import { LogLine } from "../types/public/logs";
 import { V3 } from "../v3";
-import { ModelMessage, ToolSet, wrapLanguageModel, stepCountIs } from "ai";
+import {
+  ModelMessage,
+  ToolSet,
+  wrapLanguageModel,
+  stepCountIs,
+  type LanguageModelUsage,
+  type StepResult,
+} from "ai";
 import { processMessages } from "../agent/utils/messageProcessing";
 import { LLMClient } from "../llm/LLMClient";
 import {
-  AgentAction,
   AgentExecuteOptions,
   AgentResult,
+  AgentContext,
+  AgentState,
+  AgentStreamResult,
 } from "../types/public/agent";
 import { V3FunctionName } from "../types/public/methods";
 import { mapToolResultToActions } from "../agent/utils/actionMapping";
@@ -37,30 +46,23 @@
     this.mcpTools = mcpTools;
   }
 
-  public async execute(
+  private async prepareAgent(
     instructionOrOptions: string | AgentExecuteOptions,
-  ): Promise<AgentResult> {
-    const startTime = Date.now();
-    const options =
-      typeof instructionOrOptions === "string"
-        ? { instruction: instructionOrOptions }
-        : instructionOrOptions;
-
-    const maxSteps = options.maxSteps || 10;
-    const actions: AgentAction[] = [];
-    let finalMessage = "";
-    let completed = false;
-    const collectedReasoning: string[] = [];
+  ): Promise<AgentContext> {
+    try {
+      const options =
+        typeof instructionOrOptions === "string"
+          ? { instruction: instructionOrOptions }
+          : instructionOrOptions;
 
-    let currentPageUrl = (await this.v3.context.awaitActivePage()).url();
+      const maxSteps = options.maxSteps || 20;
 
-    try {
       const systemPrompt = this.buildSystemPrompt(
         options.instruction,
         this.systemInstructions,
       );
       const tools = this.createTools();
-      const allTools = { ...tools, ...this.mcpTools };
+      const allTools: ToolSet = { ...tools, ...this.mcpTools };
       const messages: ModelMessage[] = [
         { role: "user", content: options.instruction },
       ];
@@ -79,97 +81,112 @@
         },
       });
 
+      const initialPageUrl = (await this.v3.context.awaitActivePage()).url();
+
+      return {
+        options,
+        maxSteps,
+        systemPrompt,
+        allTools,
+        messages,
+        wrappedModel,
+        initialPageUrl,
+      };
+    } catch (error) {
+      this.logger({
+        category: "agent",
+        message: `failed to prepare agent: ${error}`,
+        level: 0,
+      });
+      throw error;
+    }
+  }
+
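+  /**
+   * Builds the onStepFinish callback shared by execute() and stream().
+   * Mutates the given AgentState in place: collects reasoning text, maps
+   * tool results to AgentActions, tracks the active page URL, and marks
+   * the run complete when the `close` tool is called.
+   */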
+  private createStepHandler(state: AgentState) {
+    return async (event: StepResult<ToolSet>) => {
+      this.logger({
+        category: "agent",
+        message: `Step finished: ${event.finishReason}`,
+        level: 2,
+      });
+
+      if (event.toolCalls && event.toolCalls.length > 0) {
+        for (let i = 0; i < event.toolCalls.length; i++) {
+          const toolCall = event.toolCalls[i];
+          const args = toolCall.input;
+          const toolResult = event.toolResults?.[i];
+
+          if (event.text && event.text.length > 0) {
+            state.collectedReasoning.push(event.text);
+            this.logger({
+              category: "agent",
+              message: `reasoning: ${event.text}`,
+              level: 1,
+            });
+          }
+
+          if (toolCall.toolName === "close") {
+            state.completed = true;
+            if (args?.taskComplete) {
+              const closeReasoning = args.reasoning;
+              const allReasoning = state.collectedReasoning.join(" ");
+              state.finalMessage = closeReasoning
+                ? `${allReasoning} ${closeReasoning}`.trim()
+                : allReasoning || "Task completed successfully";
+            }
+          }
+          const mappedActions = mapToolResultToActions({
+            toolCallName: toolCall.toolName,
+            toolResult,
+            args,
+            reasoning: event.text || undefined,
+          });
+
+          for (const action of mappedActions) {
+            action.pageUrl = state.currentPageUrl;
+            action.timestamp = Date.now();
+            state.actions.push(action);
+          }
+        }
+        state.currentPageUrl = (await this.v3.context.awaitActivePage()).url();
+      }
+    };
+  }
+
+  public async execute(
+    instructionOrOptions: string | AgentExecuteOptions,
+  ): Promise<AgentResult> {
+    const startTime = Date.now();
+    const {
+      maxSteps,
+      systemPrompt,
+      allTools,
+      messages,
+      wrappedModel,
+      initialPageUrl,
+    } = await this.prepareAgent(instructionOrOptions);
+
+    const state: AgentState = {
+      collectedReasoning: [],
+      actions: [],
+      finalMessage: "",
+      completed: false,
+      currentPageUrl: initialPageUrl,
+    };
+
+    try {
       const result = await this.llmClient.generateText({
         model: wrappedModel,
         system: systemPrompt,
         messages,
         tools: allTools,
-        stopWhen: stepCountIs(maxSteps),
+        stopWhen: (result) => this.handleStop(result, maxSteps),
         temperature: 1,
         toolChoice: "auto",
-        onStepFinish: async (event) => {
-          this.logger({
-            category: "agent",
-            message: `Step finished: ${event.finishReason}`,
-            level: 2,
-          });
-
-          if (event.toolCalls && event.toolCalls.length > 0) {
-            for (let i = 0; i < event.toolCalls.length; i++) {
-              const toolCall = event.toolCalls[i];
-              const args = toolCall.input as Record<string, unknown>;
-              const toolResult = event.toolResults?.[i];
-
-              if (event.text.length > 0) {
-                collectedReasoning.push(event.text);
-                this.logger({
-                  category: "agent",
-                  message: `reasoning: ${event.text}`,
-                  level: 1,
-                });
-              }
-
-              if (toolCall.toolName === "close") {
-                completed = true;
-                if (args?.taskComplete) {
-                  const closeReasoning = args.reasoning;
-                  const allReasoning = collectedReasoning.join(" ");
-                  finalMessage = closeReasoning
-                    ? `${allReasoning} ${closeReasoning}`.trim()
-                    : allReasoning || "Task completed successfully";
-                }
-              }
-              const mappedActions = mapToolResultToActions({
-                toolCallName: toolCall.toolName,
-                toolResult,
-                args,
-                reasoning: event.text || undefined,
-              });
-
-              for (const action of mappedActions) {
-                action.pageUrl = currentPageUrl;
-                action.timestamp = Date.now();
-                actions.push(action);
-              }
-            }
-            currentPageUrl = (await this.v3.context.awaitActivePage()).url();
-          }
-        },
+        onStepFinish: this.createStepHandler(state),
       });
 
-      if (!finalMessage) {
-        const allReasoning = collectedReasoning.join(" ").trim();
-        finalMessage = allReasoning || result.text;
-      }
-
-      const endTime = Date.now();
-      const inferenceTimeMs = endTime - startTime;
-      if (result.usage) {
-        this.v3.updateMetrics(
-          V3FunctionName.AGENT,
-          result.usage.inputTokens || 0,
-          result.usage.outputTokens || 0,
-          result.usage.reasoningTokens || 0,
-          result.usage.cachedInputTokens || 0,
-          inferenceTimeMs,
-        );
-      }
-
-      return {
-        success: completed,
-        message: finalMessage || "Task execution completed",
-        actions,
-        completed,
-        usage: result.usage
-          ? {
-              input_tokens: result.usage.inputTokens || 0,
-              output_tokens: result.usage.outputTokens || 0,
-              reasoning_tokens: result.usage.reasoningTokens || 0,
-              cached_input_tokens: result.usage.cachedInputTokens || 0,
-              inference_time_ms: inferenceTimeMs,
-            }
-          : undefined,
-      };
+      return this.consolidateMetricsAndResult(startTime, state, result);
     } catch (error) {
       const errorMessage = error?.message ?? String(error);
       this.logger({
@@ -179,13 +196,123 @@
       });
       return {
         success: false,
-        actions,
+        actions: state.actions,
         message: `Failed to execute task: ${errorMessage}`,
         completed: false,
       };
     }
   }
 
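+  /**
+   * Streaming counterpart to execute(): kicks off the agent loop via
+   * llmClient.streamText and returns an AgentStreamResult whose `result`
+   * promise resolves with the consolidated AgentResult once the stream
+   * finishes, or rejects if the stream errors.
+   */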
+  public async stream(
+    instructionOrOptions: string | AgentExecuteOptions,
+  ): Promise<AgentStreamResult> {
+    const {
+      maxSteps,
+      systemPrompt,
+      allTools,
+      messages,
+      wrappedModel,
+      initialPageUrl,
+    } = await this.prepareAgent(instructionOrOptions);
+
+    const state: AgentState = {
+      collectedReasoning: [],
+      actions: [],
+      finalMessage: "",
+      completed: false,
+      currentPageUrl: initialPageUrl,
+    };
+    const startTime = Date.now();
+
+    let resolveResult: (value: AgentResult | PromiseLike<AgentResult>) => void;
+    let rejectResult: (reason: unknown) => void;
+    const resultPromise = new Promise<AgentResult>((resolve, reject) => {
+      resolveResult = resolve;
+      rejectResult = reject;
+    });
+
+    const handleError = (error: unknown) => {
+      const errorMessage =
+        error instanceof Error ? error.message : String(error);
+      this.logger({
+        category: "agent",
+        message: `Error during streaming: ${errorMessage}`,
+        level: 0,
+      });
+      rejectResult(error);
+    };
+
+    const streamResult = this.llmClient.streamText({
+      model: wrappedModel,
+      system: systemPrompt,
+      messages,
+      tools: allTools,
+      stopWhen: (result) => this.handleStop(result, maxSteps),
+      temperature: 1,
+      toolChoice: "auto",
+      onStepFinish: this.createStepHandler(state),
+      onError: ({ error }) => {
+        handleError(error);
+      },
+      onFinish: (event) => {
+        try {
+          const result = this.consolidateMetricsAndResult(
+            startTime,
+            state,
+            event,
+          );
+          resolveResult(result);
+        } catch (error) {
+          handleError(error);
+        }
+      },
+    });
+
+    const agentStreamResult = streamResult as AgentStreamResult;
+    agentStreamResult.result = resultPromise;
+    return agentStreamResult;
+  }
+
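+  /**
+   * Shared tail for execute() and stream(): derives the final message from
+   * the collected reasoning, reports token usage to V3 metrics, and shapes
+   * the public AgentResult.
+   */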
+  private consolidateMetricsAndResult(
+    startTime: number,
+    state: AgentState,
+    result: { text?: string; usage?: LanguageModelUsage },
+  ): AgentResult {
+    if (!state.finalMessage) {
+      const allReasoning = state.collectedReasoning.join(" ").trim();
+      state.finalMessage = allReasoning || result.text || "";
+    }
+
+    const endTime = Date.now();
+    const inferenceTimeMs = endTime - startTime;
+    if (result.usage) {
+      this.v3.updateMetrics(
+        V3FunctionName.AGENT,
+        result.usage.inputTokens || 0,
+        result.usage.outputTokens || 0,
+        result.usage.reasoningTokens || 0,
+        result.usage.cachedInputTokens || 0,
+        inferenceTimeMs,
+      );
+    }
+
+    return {
+      success: state.completed,
+      message: state.finalMessage || "Task execution completed",
+      actions: state.actions,
+      completed: state.completed,
+      usage: result.usage
+        ? {
+            input_tokens: result.usage.inputTokens || 0,
+            output_tokens: result.usage.outputTokens || 0,
+            reasoning_tokens: result.usage.reasoningTokens || 0,
+            cached_input_tokens: result.usage.cachedInputTokens || 0,
+            inference_time_ms: inferenceTimeMs,
+          }
+        : undefined,
+    };
+  }
+
   private buildSystemPrompt(
     executionInstruction: string,
     systemInstructions?: string,
@@ -202,4 +329,15 @@
       logger: this.logger,
     });
   }
+
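+  /**
+   * Custom stop condition: ends the run as soon as the last step calls the
+   * `close` tool; otherwise defers to stepCountIs(maxSteps).
+   */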
+  private handleStop(
+    result: Parameters<ReturnType<typeof stepCountIs>>[0],
+    maxSteps: number,
+  ): boolean | PromiseLike<boolean> {
+    const lastStep = result.steps[result.steps.length - 1];
+    if (lastStep?.toolCalls?.some((tc) => tc.toolName === "close")) {
+      return true;
+    }
+    return stepCountIs(maxSteps)(result);
+  }
 }
diff --git a/packages/core/lib/v3/tests/agent-streaming.spec.ts b/packages/core/lib/v3/tests/agent-streaming.spec.ts
new file mode 100644
index 000000000..35020beb3
--- /dev/null
+++ b/packages/core/lib/v3/tests/agent-streaming.spec.ts
@@ -0,0 +1,177 @@
+import { test, expect } from "@playwright/test";
+import { V3 } from "../v3";
+import { v3TestConfig } from "./v3.config";
+import type { AgentResult } from "../types/public/agent";
+
+test.describe("Stagehand agent streaming behavior", () => {
+  let v3: V3;
+
+  test.beforeEach(async () => {
+    v3 = new V3({
+      ...v3TestConfig,
+      experimental: true, // Required for streaming
+    });
+    await v3.init();
+  });
+
+  test.afterEach(async () => {
+    await v3?.close?.().catch(() => {});
+  });
+
+  test.describe("agent({ stream: true })", () => {
+    test("AgentStreamResult has textStream as async iterable", async () => {
+      test.setTimeout(60000);
+
+      const agent = v3.agent({
+        stream: true,
+        model: "anthropic/claude-haiku-4-5-20251001",
+      });
+
+      // Navigate to a simple page first
+      const page = v3.context.pages()[0];
+      await page.goto("https://example.com");
+
+      const streamResult = await agent.execute({
+        instruction:
+          "What is the title of this page? Use the close tool immediately after answering.",
+        maxSteps: 3,
+      });
+
+      // Verify it's an AgentStreamResult with streaming capabilities
+      expect(streamResult).toHaveProperty("textStream");
+      expect(streamResult).toHaveProperty("result");
+
+      // textStream should be async iterable
+      expect(typeof streamResult.textStream[Symbol.asyncIterator]).toBe(
+        "function",
+      );
+
+      // result should be a promise
+      expect(streamResult.result).toBeInstanceOf(Promise);
+    });
+
+    test("textStream yields chunks incrementally", async () => {
+      test.setTimeout(60000);
+
+      const agent = v3.agent({
+        stream: true,
+        model: "anthropic/claude-haiku-4-5-20251001",
+      });
+
+      const page = v3.context.pages()[0];
+      await page.goto("https://example.com");
+
+      const streamResult = await agent.execute({
+        instruction:
+          "Say hello and then use close tool with taskComplete: true",
+        maxSteps: 3,
+      });
+
+      // Collect chunks from the stream
+      const chunks: string[] = [];
+      for await (const chunk of streamResult.textStream) {
+        chunks.push(chunk);
+      }
+
+      // Should have received at least some chunks (streaming behavior)
+      // The exact content depends on the LLM response
+      expect(Array.isArray(chunks)).toBe(true);
+      expect(chunks.length).toBeGreaterThan(0);
+    });
+
+    test("result promise resolves to AgentResult after stream completes", async () => {
+      test.setTimeout(60000);
+
+      const agent = v3.agent({
+        stream: true,
+        model: "anthropic/claude-haiku-4-5-20251001",
+      });
+
+      const page = v3.context.pages()[0];
+      await page.goto("https://example.com");
+
+      const streamResult = await agent.execute({
+        instruction:
+          "What is this page about? Use close tool with taskComplete: true after answering.",
+        maxSteps: 5,
+      });
+
+      // Consume the stream first
+      // eslint-disable-next-line @typescript-eslint/no-unused-vars
+      for await (const _ of streamResult.textStream) {
+        // Just consume
+      }
+
+      // Now get the final result
+      const finalResult: AgentResult = await streamResult.result;
+
+      // Verify it's a proper AgentResult
+      expect(finalResult).toHaveProperty("success");
+      expect(finalResult).toHaveProperty("message");
+      expect(finalResult).toHaveProperty("actions");
+      expect(finalResult).toHaveProperty("completed");
+      expect(typeof finalResult.success).toBe("boolean");
+      expect(typeof finalResult.message).toBe("string");
+      expect(Array.isArray(finalResult.actions)).toBe(true);
+    });
+  });
+
+  test.describe("agent({ stream: false }) or agent()", () => {
+    test("execute returns AgentResult without streaming properties", async () => {
+      test.setTimeout(60000);
+
+      const agent = v3.agent({
+        model: "anthropic/claude-haiku-4-5-20251001",
+      });
+
+      const page = v3.context.pages()[0];
+      await page.goto("https://example.com");
+
+      const result = await agent.execute({
+        instruction: "What is this page? Use close tool immediately.",
+        maxSteps: 3,
+      });
+
+      // Should be AgentResult, not AgentStreamResult
+      expect(result).toHaveProperty("success");
+      expect(result).toHaveProperty("message");
+      expect(result).toHaveProperty("actions");
+      expect(result).toHaveProperty("completed");
+
+      // Should NOT have streaming properties
+      expect(result).not.toHaveProperty("textStream");
+    });
+  });
+
+  test.describe("CUA disables streaming", () => {
+    test("throws StagehandInvalidArgumentError when cua: true and stream: true", () => {
+      expect(() => {
+        v3.agent({
+          cua: true,
+          stream: true,
+          model: "anthropic/claude-haiku-4-5-20251001",
+        });
+      }).toThrow("Streaming is not supported with CUA");
+    });
+
+    test("allows cua: true without stream", () => {
+      // Should not throw
+      const agent = v3.agent({
+        cua: true,
+        model: "anthropic/claude-haiku-4-5-20251001",
+      });
+
+      expect(agent).toHaveProperty("execute");
+    });
+
+    test("allows stream: true without cua", () => {
+      // Should not throw
+      const agent = v3.agent({
+        stream: true,
+        model: "anthropic/claude-haiku-4-5-20251001",
+      });
+
+      expect(agent).toHaveProperty("execute");
+    });
+  });
+});
diff --git a/packages/core/lib/v3/types/public/agent.ts b/packages/core/lib/v3/types/public/agent.ts
index c99a396c2..aaf2bd489 100644
--- a/packages/core/lib/v3/types/public/agent.ts
+++ b/packages/core/lib/v3/types/public/agent.ts
@@ -1,11 +1,29 @@
 import type { Client } from "@modelcontextprotocol/sdk/client/index.js";
-import { ToolSet } from "ai";
+import { ToolSet, ModelMessage, wrapLanguageModel, StreamTextResult } from "ai";
 import { LogLine } from "./logs";
 import { Page as PlaywrightPage } from "playwright-core";
 import { Page as PuppeteerPage } from "puppeteer-core";
 import { Page as PatchrightPage } from "patchright-core";
 import { Page } from "../../understudy/page";
 
+export interface AgentContext {
+  options: AgentExecuteOptions;
+  maxSteps: number;
+  systemPrompt: string;
+  allTools: ToolSet;
+  messages: ModelMessage[];
+  wrappedModel: ReturnType<typeof wrapLanguageModel>;
+  initialPageUrl: string;
+}
+
+export interface AgentState {
+  collectedReasoning: string[];
+  actions: AgentAction[];
+  finalMessage: string;
+  completed: boolean;
+  currentPageUrl: string;
+}
+
 export interface AgentAction {
   type: string;
   reasoning?: string;
@@ -34,6 +52,10 @@
   };
 }
 
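+/**
+ * Result of a streaming agent run: the AI SDK's StreamTextResult plus a
+ * `result` promise that resolves to Stagehand's final AgentResult.
+ *
+ * @example
+ * // Illustrative consumption pattern (see examples/agent_stream_example.ts):
+ * const run = await agent.execute({ instruction: "search for shampoo" });
+ * for await (const delta of run.textStream) process.stdout.write(delta);
+ * const final = await run.result;
+ */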
Use close tool immediately.", + maxSteps: 3, + }); + + // Should be AgentResult, not AgentStreamResult + expect(result).toHaveProperty("success"); + expect(result).toHaveProperty("message"); + expect(result).toHaveProperty("actions"); + expect(result).toHaveProperty("completed"); + + // Should NOT have streaming properties + expect(result).not.toHaveProperty("textStream"); + }); + }); + + test.describe("CUA disables streaming", () => { + test("throws StagehandInvalidArgumentError when cua: true and stream: true", () => { + expect(() => { + v3.agent({ + cua: true, + stream: true, + model: "anthropic/claude-haiku-4-5-20251001", + }); + }).toThrow("Streaming is not supported with CUA"); + }); + + test("allows cua: true without stream", () => { + // Should not throw + const agent = v3.agent({ + cua: true, + model: "anthropic/claude-haiku-4-5-20251001", + }); + + expect(agent).toHaveProperty("execute"); + }); + + test("allows stream: true without cua", () => { + // Should not throw + const agent = v3.agent({ + stream: true, + model: "anthropic/claude-haiku-4-5-20251001", + }); + + expect(agent).toHaveProperty("execute"); + }); + }); +}); diff --git a/packages/core/lib/v3/types/public/agent.ts b/packages/core/lib/v3/types/public/agent.ts index c99a396c2..aaf2bd489 100644 --- a/packages/core/lib/v3/types/public/agent.ts +++ b/packages/core/lib/v3/types/public/agent.ts @@ -1,11 +1,29 @@ import type { Client } from "@modelcontextprotocol/sdk/client/index.js"; -import { ToolSet } from "ai"; +import { ToolSet, ModelMessage, wrapLanguageModel, StreamTextResult } from "ai"; import { LogLine } from "./logs"; import { Page as PlaywrightPage } from "playwright-core"; import { Page as PuppeteerPage } from "puppeteer-core"; import { Page as PatchrightPage } from "patchright-core"; import { Page } from "../../understudy/page"; +export interface AgentContext { + options: AgentExecuteOptions; + maxSteps: number; + systemPrompt: string; + allTools: ToolSet; + messages: ModelMessage[]; + wrappedModel: ReturnType; + initialPageUrl: string; +} + +export interface AgentState { + collectedReasoning: string[]; + actions: AgentAction[]; + finalMessage: string; + completed: boolean; + currentPageUrl: string; +} + export interface AgentAction { type: string; reasoning?: string; @@ -34,6 +52,10 @@ export interface AgentResult { }; } +export type AgentStreamResult = StreamTextResult & { + result: Promise; +}; + export interface AgentExecuteOptions { instruction: string; maxSteps?: number; @@ -199,4 +221,30 @@ export type AgentConfig = { * Format: "provider/model" (e.g., "openai/gpt-4o-mini", "google/gemini-2.0-flash-exp") */ executionModel?: string | AgentModelConfig; + /** + * Enable streaming mode for the agent. + * When true, execute() returns AgentStreamResult with textStream for incremental output. + * When false (default), execute() returns AgentResult after completion. + */ + stream?: boolean; }; + +/** + * Agent instance returned when stream: true is set in AgentConfig. + * execute() returns a streaming result that can be consumed incrementally. + */ +export interface StreamingAgentInstance { + execute: ( + instructionOrOptions: string | AgentExecuteOptions, + ) => Promise; +} + +/** + * Agent instance returned when stream is false or not set in AgentConfig. + * execute() returns a result after the agent completes. 
+ */
+export interface NonStreamingAgentInstance {
+  execute: (
+    instructionOrOptions: string | AgentExecuteOptions,
+  ) => Promise<AgentResult>;
+}
diff --git a/packages/core/lib/v3/v3.ts b/packages/core/lib/v3/v3.ts
index 3c795d8ac..71d84d17b 100644
--- a/packages/core/lib/v3/v3.ts
+++ b/packages/core/lib/v3/v3.ts
@@ -65,6 +65,7 @@ import {
   StagehandNotInitializedError,
   MissingEnvironmentVariableError,
   StagehandInitError,
+  AgentStreamResult,
 } from "./types/public";
 import { V3Context } from "./understudy/context";
 import { Page } from "./understudy/page";
@@ -1490,14 +1491,92 @@
     }
   }
 
+  /**
+   * Prepares shared context for agent execution (both execute and stream).
+   * Extracts duplicated setup logic into a single helper.
+   */
+  private async prepareAgentExecution(
+    options: AgentConfig | undefined,
+    instructionOrOptions: string | AgentExecuteOptions,
+    agentConfigSignature: string,
+  ): Promise<{
+    handler: V3AgentHandler;
+    resolvedOptions: AgentExecuteOptions;
+    instruction: string;
+    cacheContext: AgentCacheContext | null;
+  }> {
+    if ((options?.integrations || options?.tools) && !this.experimental) {
+      throw new ExperimentalNotConfiguredError(
+        "MCP integrations and custom tools",
+      );
+    }
+
+    const tools = options?.integrations
+      ? await resolveTools(options.integrations, options.tools)
+      : (options?.tools ?? {});
+
+    const agentLlmClient = options?.model
+      ? this.resolveLlmClient(options.model)
+      : this.llmClient;
+
+    const handler = new V3AgentHandler(
+      this,
+      this.logger,
+      agentLlmClient,
+      typeof options?.executionModel === "string"
+        ? options.executionModel
+        : options?.executionModel?.modelName,
+      options?.systemPrompt,
+      tools,
+    );
+
+    const resolvedOptions: AgentExecuteOptions =
+      typeof instructionOrOptions === "string"
+        ? { instruction: instructionOrOptions }
+        : instructionOrOptions;
+
+    if (resolvedOptions.page) {
+      const normalizedPage = await this.normalizeToV3Page(resolvedOptions.page);
+      this.ctx!.setActivePage(normalizedPage);
+    }
+
+    const instruction = resolvedOptions.instruction.trim();
+    const sanitizedOptions =
+      this.agentCache.sanitizeExecuteOptions(resolvedOptions);
+
+    const cacheContext = this.agentCache.shouldAttemptCache(instruction)
+      ? await this.agentCache.prepareContext({
+          instruction,
+          options: sanitizedOptions,
+          configSignature: agentConfigSignature,
+          page: await this.ctx!.awaitActivePage(),
+        })
+      : null;
+
+    return { handler, resolvedOptions, instruction, cacheContext };
+  }
+
   /**
    * Create a v3 agent instance (AISDK tool-based) with execute().
    * Mirrors the v2 Stagehand.agent() tool mode (no CUA provider here).
+   *
+   * @overload When stream: true, returns a streaming agent where execute() returns AgentStreamResult
+   * @overload When stream is false/undefined, returns a non-streaming agent where execute() returns AgentResult
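+   *
+   * @example
+   * // Illustrative: the `stream` flag selects the overload.
+   * const streaming = stagehand.agent({ stream: true }); // execute() -> AgentStreamResult
+   * const standard = stagehand.agent(); // execute() -> AgentResult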
    */
-  agent(options?: AgentConfig): {
+  agent(options: AgentConfig & { stream: true }): {
+    execute: (
+      instructionOrOptions: string | AgentExecuteOptions,
+    ) => Promise<AgentStreamResult>;
+  };
+  agent(options?: AgentConfig & { stream?: false }): {
     execute: (
       instructionOrOptions: string | AgentExecuteOptions,
     ) => Promise<AgentResult>;
+  };
+  agent(options?: AgentConfig): {
+    execute: (
+      instructionOrOptions: string | AgentExecuteOptions,
+    ) => Promise<AgentResult | AgentStreamResult>;
   } {
     this.logger({
       category: "agent",
@@ -1523,6 +1602,12 @@
 
     // If CUA is enabled, use the computer-use agent path
     if (options?.cua) {
+      if (options?.stream) {
+        throw new StagehandInvalidArgumentError(
+          "Streaming is not supported with CUA (Computer Use Agent) mode. Remove either 'stream: true' or 'cua: true' from your agent config.",
+        );
+      }
+
       if ((options?.integrations || options?.tools) && !this.experimental) {
         throw new ExperimentalNotConfiguredError(
           "MCP integrations and custom tools",
@@ -1637,66 +1722,61 @@
     // Default: AISDK tools-based agent
     const agentConfigSignature = this.agentCache.buildConfigSignature(options);
+    const isStreaming = options?.stream ?? false;
 
     return {
-      execute: async (instructionOrOptions: string | AgentExecuteOptions) =>
+      execute: async (
+        instructionOrOptions: string | AgentExecuteOptions,
+      ): Promise<AgentResult | AgentStreamResult> =>
         withInstanceLogContext(this.instanceId, async () => {
-          if ((options?.integrations || options?.tools) && !this.experimental) {
-            throw new ExperimentalNotConfiguredError(
-              "MCP integrations and custom tools",
-            );
-          }
-
-          const tools = options?.integrations
-            ? await resolveTools(options.integrations, options.tools)
-            : (options?.tools ?? {});
-
-          // Resolve the LLM client for the agent based on the model parameter
-          // Use the agent's model if specified, otherwise fall back to the default
-          const agentLlmClient = options?.model
-            ? this.resolveLlmClient(options.model)
-            : this.llmClient;
-
-          const handler = new V3AgentHandler(
-            this,
-            this.logger,
-            agentLlmClient,
-            typeof options?.executionModel === "string"
-              ? options.executionModel
-              : options?.executionModel?.modelName,
-            options?.systemPrompt,
-            tools,
-          );
+          // Streaming mode
+          if (isStreaming) {
+            if (!this.experimental) {
+              throw new ExperimentalNotConfiguredError("Agent streaming");
+            }
 
-          const resolvedOptions: AgentExecuteOptions =
-            typeof instructionOrOptions === "string"
-              ? { instruction: instructionOrOptions }
-              : instructionOrOptions;
-          if (resolvedOptions.page) {
-            const normalizedPage = await this.normalizeToV3Page(
-              resolvedOptions.page,
+            const { handler, cacheContext } = await this.prepareAgentExecution(
+              options,
+              instructionOrOptions,
+              agentConfigSignature,
             );
-            this.ctx!.setActivePage(normalizedPage);
-          }
 
-          const instruction = resolvedOptions.instruction.trim();
-          const sanitizedOptions =
-            this.agentCache.sanitizeExecuteOptions(resolvedOptions);
-
-          let cacheContext: AgentCacheContext | null = null;
-          if (this.agentCache.shouldAttemptCache(instruction)) {
-            const startPage = await this.ctx!.awaitActivePage();
-            cacheContext = await this.agentCache.prepareContext({
-              instruction,
-              options: sanitizedOptions,
-              configSignature: agentConfigSignature,
-              page: startPage,
-            });
+            if (cacheContext) {
-              const replayed = await this.agentCache.tryReplay(cacheContext);
+              const replayed =
+                await this.agentCache.tryReplayAsStream(cacheContext);
               if (replayed) {
                 return replayed;
               }
             }
+
+            const streamResult = await handler.stream(instructionOrOptions);
+
+            if (cacheContext) {
+              return this.agentCache.wrapStreamForCaching(
+                cacheContext,
+                streamResult,
+                () => this.beginAgentReplayRecording(),
+                () => this.endAgentReplayRecording(),
+                () => this.discardAgentReplayRecording(),
+              );
+            }
+
+            return streamResult;
+          }
+
+          // Non-streaming mode (default)
+          const { handler, resolvedOptions, cacheContext } =
+            await this.prepareAgentExecution(
+              options,
+              instructionOrOptions,
+              agentConfigSignature,
+            );
+
+          if (cacheContext) {
+            const replayed = await this.agentCache.tryReplay(cacheContext);
+            if (replayed) {
+              return replayed;
+            }
+          }
 
           let agentSteps: AgentReplayStep[] = [];