diff --git a/app/actions.tsx b/app/actions.tsx
index 31eb8629..04673464 100644
--- a/app/actions.tsx
+++ b/app/actions.tsx
@@ -314,11 +314,16 @@ async function submit(formData?: FormData, skip?: boolean) {
     const streamText = createStreamableValue()
     uiStream.update(<Spinner />)
 
+    /* removal 1:
     while (
-      useSpecificAPI
-        ? answer.length === 0
-        : answer.length === 0 && !errorOccurred
-    ) {
+    */
+    // Split and stage producers
+    const abortController = new AbortController()
+    const stage1 = (async () => {
       const { fullResponse, hasError, toolResponses } = await researcher(
         currentSystemPrompt,
         uiStream,
@@ -329,7 +334,9 @@ async function submit(formData?: FormData, skip?: boolean) {
       answer = fullResponse
       toolOutputs = toolResponses
       errorOccurred = hasError
+    })()
 
+    /* removal 1:
     if (toolOutputs.length > 0) {
       toolOutputs.map(output => {
         aiState.update({
@@ -348,6 +355,16 @@ async function submit(formData?: FormData, skip?: boolean) {
         })
       }
     }
+    */
+
+    // Stage 2: cheap producer - related suggestions
+    const stage2 = (async () => {
+      try {
+        await querySuggestor(uiStream, messages)
+      } catch {}
+    })()
+
+    await Promise.allSettled([stage1, stage2])
 
     if (useSpecificAPI && answer.length === 0) {
       const modifiedMessages = aiState
diff --git a/lib/agents/researcher.tsx b/lib/agents/researcher.tsx
index e54b1428..76f48dca 100644
--- a/lib/agents/researcher.tsx
+++ b/lib/agents/researcher.tsx
@@ -80,8 +80,10 @@ Analysis & Planning
       })
   })
 
-  // Remove the spinner
-  uiStream.update(null)
+  // removal 1: // Remove the spinner
+  // removal 1: // uiStream.update(null)
+  // Append the answer section immediately to avoid gating on first token
+  uiStream.append(answerSection)
 
   // Process the response
   const toolCalls: ToolCallPart[] = []
@@ -90,12 +92,13 @@ Analysis & Planning
     switch (delta.type) {
       case 'text-delta':
         if (delta.textDelta) {
+          /* removal 1:
           // If the first text delta is available, add a UI section
           if (fullResponse.length === 0 && delta.textDelta.length > 0) {
             // Update the UI
             uiStream.update(answerSection)
           }
-
+          */
           fullResponse += delta.textDelta
           streamText.update(fullResponse)
         }
@@ -104,10 +107,13 @@ Analysis & Planning
         toolCalls.push(delta)
         break
       case 'tool-result':
+        /* removal 1:
         // Append the answer section if the specific model is not used
         if (!useSpecificModel && toolResponses.length === 0 && delta.result) {
           uiStream.append(answerSection)
         }
+        */
+        // Keep answer section already appended; just collect tool outputs
         if (!delta.result) {
           hasError = true
         }
@@ -129,5 +135,7 @@ Analysis & Planning
     messages.push({ role: 'tool', content: toolResponses })
   }
 
+  // Mark the answer stream as done
+  streamText.done()
   return { result, fullResponse, hasError, toolResponses }
 }
diff --git a/lib/agents/tools/search.tsx b/lib/agents/tools/search.tsx
index 3ed9d82d..27cee3bb 100644
--- a/lib/agents/tools/search.tsx
+++ b/lib/agents/tools/search.tsx
@@ -16,10 +16,18 @@ export const searchTool = ({ uiStream, fullResponse }: ToolProps) => ({
     query: string
     max_results: number
     search_depth: 'basic' | 'advanced'
-  }) => {
+
+  /* removal 1:
+  }) => {
+  */
+
+  }, injectedStream?: ReturnType<typeof createStreamableValue>) => {
     let hasError = false
     // Append the search section
-    const streamResults = createStreamableValue()
+    /* removal 1:
+    const streamResults = createStreamableValue()
+    */
+    const streamResults = injectedStream ?? createStreamableValue()
     uiStream.append(<SearchSection result={streamResults.value} />)
 
     // Tavily API requires a minimum of 5 characters in the query
diff --git a/lib/utils/concurrency.ts b/lib/utils/concurrency.ts
new file mode 100644
index 00000000..8a86a781
--- /dev/null
+++ b/lib/utils/concurrency.ts
@@ -0,0 +1,63 @@
+export async function runWithConcurrencyLimit<T>(limit: number, tasks: Array<() => Promise<T>>): Promise<PromiseSettledResult<T>[]> {
+  if (limit <= 0) limit = 1;
+  const results: PromiseSettledResult<T>[] = [];
+  let index = 0;
+
+  async function worker() {
+    while (index < tasks.length) {
+      const current = index++;
+      const task = tasks[current];
+      try {
+        const value = await task();
+        results[current] = { status: 'fulfilled', value } as PromiseFulfilledResult<T>;
+      } catch (reason) {
+        results[current] = { status: 'rejected', reason } as PromiseRejectedResult;
+      }
+    }
+  }
+
+  const workers = Array.from({ length: Math.min(limit, tasks.length) }, () => worker());
+  await Promise.all(workers);
+  return results;
+}
+
+export async function withBudget<T>(options: { maxMs: number; signal?: AbortSignal }, task: () => Promise<T>): Promise<T> {
+  const { maxMs, signal } = options;
+  let timeoutId: ReturnType<typeof setTimeout> | null = null;
+
+  const timeoutPromise = new Promise<never>((_, reject) => {
+    timeoutId = setTimeout(() => reject(new Error('budget_exceeded')), maxMs);
+  });
+
+  if (signal) {
+    if (signal.aborted) throw new Error('aborted');
+    signal.addEventListener('abort', () => {
+      if (timeoutId) clearTimeout(timeoutId);
+    }, { once: true });
+  }
+
+  try {
+    return await Promise.race([task(), timeoutPromise]);
+  } finally {
+    if (timeoutId) clearTimeout(timeoutId);
+  }
+}
+
+export async function tokenGate(start: () => number, threshold: number, fn: () => Promise<void>): Promise<void> {
+  if (start() >= threshold) {
+    await fn();
+    return;
+  }
+  // Poll lightly until threshold reached
+  await new Promise<void>((resolve) => {
+    const id = setInterval(() => {
+      if (start() >= threshold) {
+        clearInterval(id);
+        resolve();
+      }
+    }, 50);
+  });
+  await fn();
+}
+
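
A minimal usage sketch (not part of the diff) of how the helpers added in lib/utils/concurrency.ts might be combined: a bounded, budgeted fan-out plus a token-gated secondary producer. The '@/lib/utils/concurrency' import alias, fetchSource, and streamedTokens are assumptions for illustration only.

import { runWithConcurrencyLimit, withBudget, tokenGate } from '@/lib/utils/concurrency'

// Hypothetical async task used only for this example.
async function fetchSource(url: string): Promise<string> {
  const res = await fetch(url)
  return res.text()
}

async function gatherSources(urls: string[]): Promise<string[]> {
  // Keep at most 3 fetches in flight, each capped at a 5 second budget.
  const tasks = urls.map(url => () => withBudget({ maxMs: 5000 }, () => fetchSource(url)))
  const settled = await runWithConcurrencyLimit(3, tasks)
  return settled
    .filter((r): r is PromiseFulfilledResult<string> => r.status === 'fulfilled')
    .map(r => r.value)
}

// tokenGate can defer cheap secondary work (e.g. related suggestions) until
// roughly 50 tokens of the answer have streamed.
let streamedTokens = 0
void tokenGate(() => streamedTokens, 50, async () => {
  // kick off the secondary producer here
})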