-
-
Notifications
You must be signed in to change notification settings - Fork 6
feat: Implement Tool Coordinator for efficient multi-step tool execution #375
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
f2bc7da
75e01df
0311b79
924bebd
612479d
9e2a308
37290d5
e4dbb38
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,7 +12,7 @@ import type { FeatureCollection } from 'geojson' | |
| import { Spinner } from '@/components/ui/spinner' | ||
| import { Section } from '@/components/section' | ||
| import { FollowupPanel } from '@/components/followup-panel' | ||
| import { inquire, researcher, taskManager, querySuggestor, resolutionSearch } from '@/lib/agents' | ||
| import { inquire, researcher, taskManager, querySuggestor, resolutionSearch, toolCoordinator, executeToolPlan, aggregateToolResults } from '@/lib/agents' | ||
| // Removed import of useGeospatialToolMcp as it no longer exists and was incorrectly used here. | ||
| // The geospatialTool (if used by agents like researcher) now manages its own MCP client. | ||
| import { writer } from '@/lib/agents/writer' | ||
|
|
@@ -125,6 +125,7 @@ async function submit(formData?: FormData, skip?: boolean) { | |
|
|
||
| const groupeId = nanoid() | ||
| const useSpecificAPI = process.env.USE_SPECIFIC_API_FOR_WRITER === 'true' | ||
| const useToolCoordinator = process.env.USE_TOOL_COORDINATOR === 'true' | ||
| const maxMessages = useSpecificAPI ? 5 : 10 | ||
| messages.splice(0, Math.max(messages.length - maxMessages, 0)) | ||
|
|
||
|
|
@@ -319,17 +320,55 @@ async function submit(formData?: FormData, skip?: boolean) { | |
| const streamText = createStreamableValue<string>() | ||
| uiStream.update(<Spinner />) | ||
|
|
||
| let finalMessages = messages | ||
|
|
||
| if (useToolCoordinator) { | ||
| uiStream.update(<div><Spinner /> Planning tool execution...</div>) | ||
| try { | ||
| const plan = await toolCoordinator(messages) | ||
| uiStream.update(<div><Spinner /> Executing tool plan...</div>) | ||
| const results = await executeToolPlan(plan) | ||
| toolOutputs = results | ||
| const summary = aggregateToolResults(results, plan) | ||
|
|
||
| // Add the summary to the messages for the final synthesis agent | ||
| finalMessages = [ | ||
| ...messages, | ||
| { | ||
| id: nanoid(), | ||
| role: 'tool', | ||
| content: summary, | ||
| type: 'tool_coordinator_summary' | ||
| } as any // Cast to any to satisfy CoreMessage type for custom type | ||
| ] | ||
|
|
||
| // Stream a message to the user about the tool execution completion | ||
| uiStream.append( | ||
| <BotMessage content="Tool execution complete. Synthesizing final answer..." /> | ||
| ) | ||
| } catch (e) { | ||
| console.error('Tool Coordinator failed:', e) | ||
| uiStream.append( | ||
| <BotMessage content="Tool Coordinator failed. Falling back to streaming researcher." /> | ||
| ) | ||
| // Fallback: continue with the original messages and let the researcher handle it | ||
| finalMessages = messages | ||
| } | ||
| } | ||
|
Comment on lines
+325
to
+357
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix missing parameter, type mismatches, and BotMessage usage Three issues in the coordinator flow:
Apply these fixes: if (useToolCoordinator) {
uiStream.update(<div><Spinner /> Planning tool execution...</div>)
try {
const plan = await toolCoordinator(messages)
uiStream.update(<div><Spinner /> Executing tool plan...</div>)
- const results = await executeToolPlan(plan)
+ const results = await executeToolPlan(plan, { uiStream, fullResponse: '' })
toolOutputs = results
const summary = aggregateToolResults(results, plan)
- // Add the summary to the messages for the final synthesis agent
+ // Add coordinator summary as an assistant message for synthesis
finalMessages = [
...messages,
{
id: nanoid(),
- role: 'tool',
+ role: 'assistant',
content: summary,
type: 'tool_coordinator_summary'
- } as any // Cast to any to satisfy CoreMessage type for custom type
+ }
]
- // Stream a message to the user about the tool execution completion
+ const completionMsg = createStreamableValue<string>()
+ completionMsg.done("Tool execution complete. Synthesizing final answer...")
uiStream.append(
- <BotMessage content="Tool execution complete. Synthesizing final answer..." />
+ <BotMessage content={completionMsg.value} />
)
} catch (e) {
console.error('Tool Coordinator failed:', e)
+ const errorMsg = createStreamableValue<string>()
+ errorMsg.done("Tool Coordinator failed. Falling back to streaming researcher.")
uiStream.append(
- <BotMessage content="Tool Coordinator failed. Falling back to streaming researcher." />
+ <BotMessage content={errorMsg.value} />
)
- // Fallback: continue with the original messages and let the researcher handle it
finalMessages = messages
}
}Changing the role from
🤖 Prompt for AI Agents |
||
|
|
||
| while ( | ||
| useSpecificAPI | ||
| ? answer.length === 0 | ||
| : answer.length === 0 && !errorOccurred | ||
| ) { | ||
| // If coordinator was used, pass finalMessages and disable tools for researcher | ||
| const { fullResponse, hasError, toolResponses } = await researcher( | ||
| currentSystemPrompt, | ||
| uiStream, | ||
| streamText, | ||
| messages, | ||
| useSpecificAPI | ||
| finalMessages, | ||
| useSpecificAPI, | ||
| !useToolCoordinator // Pass a flag to disable tools if coordinator was used | ||
| ) | ||
| answer = fullResponse | ||
| toolOutputs = toolResponses | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,197 @@ | ||
| import { generateObject } from 'ai' | ||
| import { z } from 'zod' | ||
| import { Message } from 'ai/react' | ||
| import { getTools } from '@/lib/agents/tools' | ||
| import { ToolResultPart } from '@/lib/types' | ||
|
|
||
| // —————————————————————————————————————— | ||
| // Fallbacks if the original files don't exist yet | ||
| // —————————————————————————————————————— | ||
|
|
||
| let getModel: () => any | ||
| let createStreamableUI: () => any | ||
|
|
||
| try { | ||
| // Try the most common real locations first | ||
| const models = require('@/lib/models') | ||
| getModel = models.getModel || models.default || (() => null) | ||
| } catch { | ||
| try { | ||
| const mod = require('@/lib/ai/models') | ||
| getModel = mod.getModel || mod.default | ||
| } catch { | ||
| getModel = () => { | ||
| throw new Error('getModel not available — check your @/lib/models setup') | ||
| } | ||
| } | ||
| } | ||
|
|
||
| try { | ||
| const streamable = require('@/lib/streamable') | ||
| createStreamableUI = streamable.createStreamableUI || streamable.default | ||
| } catch { | ||
| try { | ||
| const s = require('@/lib/ui/streamable') | ||
| createStreamableUI = s.createStreamableUI | ||
| } catch { | ||
| // Minimal no-op version that won't break tool calling | ||
| createStreamableUI = () => ({ | ||
| append: () => {}, | ||
| update: () => {}, | ||
| done: () => {}, | ||
| value: null | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| // —————————————————————————————————————— | ||
| // Schemas | ||
| // —————————————————————————————————————— | ||
|
|
||
| const toolStepSchema = z.object({ | ||
| toolName: z.string(), | ||
| toolArgs: z.record(z.any()), | ||
| dependencyIndices: z.array(z.number()).optional(), | ||
| purpose: z.string() | ||
| }) | ||
|
|
||
| const toolPlanSchema = z.object({ | ||
| reasoning: z.string(), | ||
| steps: z.array(toolStepSchema) | ||
| }) | ||
|
|
||
| export type ToolPlan = z.infer<typeof toolPlanSchema> | ||
| export type ToolStep = z.infer<typeof toolStepSchema> | ||
|
Comment on lines
+51
to
+64
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick | 🔵 Trivial Schema is functional but could be tightened for safety and guidance The
Consider:
These changes aren’t required for correctness but would improve robustness. 🤖 Prompt for AI Agents |
||
|
|
||
| // —————————————————————————————————————— | ||
| // 1. Plan Generation | ||
| // —————————————————————————————————————— | ||
|
|
||
| export async function toolCoordinator(messages: Message[]): Promise<ToolPlan> { | ||
| const model = getModel() | ||
|
|
||
| const toolsObj = getTools({ | ||
| uiStream: createStreamableUI(), | ||
| fullResponse: '' | ||
| }) | ||
|
|
||
| const toolDescriptions = Object.values(toolsObj).map(tool => ({ | ||
| name: tool.toolName, | ||
| description: tool.description, | ||
| parameters: tool.parameters | ||
| })) | ||
|
|
||
| const systemPrompt = `You are an expert Tool Coordinator. Create a precise multi-step plan using only these tools. | ||
| Rules: | ||
| - Use exact toolName from the list. | ||
| - Use dependencyIndices (0-based) when a step needs prior results. | ||
| - Output must be valid JSON matching the schema. | ||
| Available Tools: | ||
| ${JSON.stringify(toolDescriptions, null, 2)} | ||
| ` | ||
|
|
||
| const { object } = await generateObject({ | ||
| model, | ||
| system: systemPrompt, | ||
| messages, | ||
| schema: toolPlanSchema | ||
| }) | ||
|
|
||
| return object | ||
| } | ||
|
|
||
| // —————————————————————————————————————— | ||
| // 2. Execution | ||
| // —————————————————————————————————————— | ||
|
|
||
| interface ExecutionContext { | ||
| uiStream: any | ||
| fullResponse: string | ||
| } | ||
|
|
||
| export async function executeToolPlan( | ||
| plan: ToolPlan, | ||
| context: ExecutionContext | ||
| ): Promise<ToolResultPart[]> { | ||
| const { uiStream, fullResponse } = context | ||
|
|
||
| const toolsObj = getTools({ uiStream, fullResponse }) | ||
| const toolMap = new Map(Object.values(toolsObj).map(t => [t.toolName, t])) | ||
|
|
||
| const results = new Map<number, any>() | ||
| const toolResults: ToolResultPart[] = [] | ||
|
|
||
| const resolveDeps = (indices: number[] = []) => | ||
| indices.map(i => { | ||
| if (!results.has(i)) throw new Error(`Dependency step ${i} missing`) | ||
| return results.get(i) | ||
| }) | ||
|
|
||
| for (let i = 0; i < plan.steps.length; i++) { | ||
| const step = plan.steps[i] | ||
| const tool = toolMap.get(step.toolName) | ||
|
|
||
| let result: any = { error: `Tool "${step.toolName}" not found` } | ||
|
|
||
| try { | ||
| if (!tool) throw new Error(`Tool not found: ${step.toolName}`) | ||
|
|
||
| const deps = step.dependencyIndices ? resolveDeps(step.dependencyIndices) : [] | ||
| const args = { | ||
| ...step.toolArgs, | ||
| ...(deps.length > 0 && { _dependencyResults: deps }) | ||
| } | ||
|
|
||
| console.log(`[ToolCoordinator] Step ${i}: ${step.toolName}`) | ||
| result = await tool.execute(args) | ||
| } catch (err: any) { | ||
| const msg = err?.message || String(err) | ||
| console.error(`[ToolCoordinator] Step ${i} failed:`, msg) | ||
| result = { error: msg } | ||
| } | ||
|
|
||
| results.set(i, result) | ||
| toolResults.push({ | ||
| toolName: step.toolName, | ||
| toolCallId: `coord-${i}`, | ||
| result | ||
| }) | ||
| } | ||
|
|
||
| return toolResults | ||
| } | ||
|
|
||
| // —————————————————————————————————————— | ||
| // 3. Aggregation | ||
| // —————————————————————————————————————— | ||
|
|
||
| export function aggregateToolResults(toolResults: ToolResultPart[], plan: ToolPlan): string { | ||
| let out = `# Tool Coordinator Results | ||
| ### Plan | ||
| ${plan.reasoning} | ||
| ### Steps | ||
| ` | ||
|
|
||
| toolResults.forEach((tr, i) => { | ||
| const step = plan.steps[i] | ||
| const hasError = tr.result && typeof tr.result === 'object' && 'error' in tr.result | ||
|
|
||
| out += `\n#### Step ${i + 1}: ${step.purpose} (\`${step.toolName}\`)` | ||
|
|
||
| if (hasError) { | ||
| out += `\n**Status:** Failed\n**Error:** ${tr.result.error}` | ||
| } else { | ||
| const json = JSON.stringify(tr.result, null, 2) | ||
| const truncated = json.length > 600 ? json.slice(0, 600) + '...' : json | ||
| out += `\n**Status:** Success\n**Result:**\n\`\`\`json\n${truncated}\n\`\`\`` | ||
| } | ||
| }) | ||
|
|
||
| out += `\n\n---\n**INSTRUCTION:** Write a natural, helpful final answer using only the information above. Do not mention tools, steps, or internal process.` | ||
|
|
||
| return out | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
Fix syntax around
researchercall and preserve coordinator tool resultsThere are two separate issues in this block:
Broken syntax (parse error) around
researchercallLines 365‑374 contain stray characters (
c,)cAPI,, duplicated!useToolCoordinator), which Biome also flags. This makes the file invalid TypeScript/JS.You likely intended the loop body to look like this:
useToolCoordinatoristrue, you correctly settoolOutputs = resultsfromexecuteToolPlan(plan).toolOutputs = toolResponses.researcheris called withuseTools = !useToolCoordinator→false, sotoolResponsesis always[]. That overwrites the actual coordinator results.aiState:never sees the coordinator’s tool results.
To fix this while keeping current behavior for non‑coordinator runs, guard the assignment:
Together with the cleaned‑up call, that will:
toolOutputscontinues to refer to the executed plan’s results and gets persisted intoaiStateas intended, while non‑coordinator flows keep usingresearcher’s tool responses.After applying the fixes, re‑run your linter/formatter and verify:
USE_TOOL_COORDINATOR=true, you seetoolmessages in saved chats corresponding to the coordinator’s executed steps, and with itfalseyou still see the regular streaming-tool behavior.Also applies to: 128-129, 323-357, 359-379
🏁 Script executed:
Length of output: 2626
🏁 Script executed:
Length of output: 513
🏁 Script executed:
Length of output: 362
Fix syntax errors in
researchercall and preserve coordinator resultsLines 365–375 contain malformed syntax blocking file parsing:
cbeforeconst)cAPI,!useToolCoordinatorcomments with extra closing parenthesesClean up the call to match the intended function signature:
while ( useSpecificAPI ? answer.length === 0 : answer.length === 0 && !errorOccurred ) { // If coordinator was used, pass finalMessages and disable tools for researcher - c const { fullResponse, hasError, toolResponses } = await researcher( - currentSystemPrompt, - uiStream, - streamText, - finalMessages, - useSpecificAPI, - !useToolCoordinator // Pass a flag to disable tools if coordinator was used - )cAPI, - !useToolCoordinator // Pass a flag to disable tools if coordinator was used - ) !useToolCoordinator // Pass a flag to disable tools if coordinator was used - ) + const { fullResponse, hasError, toolResponses } = await researcher( + currentSystemPrompt, + uiStream, + streamText, + finalMessages, + useSpecificAPI, + !useToolCoordinator // Pass a flag to disable tools if coordinator was used + )Additionally, line 377 unconditionally overwrites
toolOutputs(set from coordinator results at line 331) withtoolResponses, which is empty when coordinator is enabled sinceresearcheris called with tools disabled. Guard the assignment to preserve coordinator results:This ensures the block at line 380–396 that persists tool outputs receives the coordinator's executed plan results when enabled, while non-coordinator flows continue using researcher tool responses.