feat(agent): add parallel tool execution with error handling #2

cybersecua merged 1 commit into main
Conversation
- Add `parallelToolExecution`, `maxParallelTools`, and `toolRetryCount` fields to `AgentConfig` (config.go) and the `Agent` struct.
- Implement `executeToolCallsInParallel`: launches a goroutine per tool call protected by panic recovery; an optional semaphore limits concurrency when `max_parallel_tools > 0`; optional retry logic retries on transient hard errors up to `tool_retry_count` times.
- Replace the sequential tool-call loop in the agent loop with a conditional: ≥2 concurrent calls with `parallel_tool_execution: true` use the parallel path; single calls or disabled mode fall back to the existing sequential path. Results are always applied to `messages` in original index order to keep tool-call IDs consistent.
- Emit a `parallel: true` flag in progress events so the frontend can distinguish parallel from sequential execution batches.
- Enable parallel execution by default in `config.yaml`; add `max_parallel_tools` and `tool_retry_count` knobs.

https://claude.ai/code/session_01Rd7wB3N5NRdEt5FTWPbABd
Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request enhances the agent's efficiency and resilience by introducing parallel execution of tool calls: the agent can process multiple tool requests concurrently, improving performance when handling several independent tasks. The implementation includes panic recovery for individual tool goroutines and configurable retry logic for transient failures, and new configuration parameters give greater control over resource utilization and fault tolerance.
Code Review
This pull request introduces parallel tool execution, a valuable feature for improving agent performance by allowing multiple tools to be called concurrently. However, the implementation has critical security and robustness issues. It is vulnerable to resource exhaustion (DoS) due to spawning an unbounded number of goroutines based on LLM output. Furthermore, the panic recovery mechanism leaks internal error details to the LLM, which could lead to sensitive information disclosure. There is also a functional bug where the retry logic for tool calls is effectively dead code. Beyond these security and functional concerns, the review also identified areas for improving code clarity, maintainability, and consistency, such as confusing logic around feature flag initialization, significant code duplication that could be refactored, and minor cleanups.
```go
for idx, tc := range toolCalls {
	wg.Add(1)
	go func(idx int, tc ToolCall) {
```
The executeToolCallsInParallel function launches a new goroutine for every tool call returned by the LLM before acquiring a semaphore slot. If an attacker can influence the LLM (e.g., via prompt injection) to return a very large number of tool calls, this could lead to resource exhaustion (Denial of Service) by spawning an excessive number of goroutines. While Go handles goroutines efficiently, each one consumes memory and system resources.
Consider limiting the number of goroutines created or moving the goroutine creation inside a worker pool or after a more robust check on the number of tool calls.
```go
if r := recover(); r != nil {
	a.logger.Error("Panic in parallel tool goroutine",
		zap.Int("index", idx),
		zap.String("tool", tc.Function.Name),
		zap.Any("panic", r),
	)
	panicMsg := fmt.Sprintf("Internal panic executing tool %s: %v", tc.Function.Name, r)
```
The panic recovery logic in executeToolCallsInParallel captures the raw panic object r and includes it in the panicMsg, which is then sent back to the LLM as a tool result. If a tool panics and the panic object contains sensitive information (e.g., internal system details, stack traces, or secrets), this information will be leaked to the LLM and potentially to the end user.
It is recommended to use a generic error message for the LLM and log the detailed panic information only to the internal system logs.
```go
parallelToolExecution := true
if agentCfg != nil && !agentCfg.ParallelToolExecution {
	// Only disable when explicitly set to false
	parallelToolExecution = false
}
```
The logic for setting parallelToolExecution is a bit confusing, and the associated comments are misleading. The logic is equivalent to parallelToolExecution := agentCfg == nil || agentCfg.ParallelToolExecution. This means parallel execution is disabled if the agent block exists in your YAML but parallel_tool_execution is omitted, which contradicts the "enabled by default" comments. Consider simplifying the logic and updating the comments to reflect the actual behavior to avoid future confusion.
```go
if useParallel {
	// ── Parallel path ────────────────────────────────────────────────────
	a.logger.Info("Executing tool calls in parallel",
		zap.Int("count", totalTools),
	)
	parallelResults := a.executeToolCallsInParallel(ctx, choice.Message.ToolCalls)

	// Process results in original order so tool messages match tool_call IDs.
	for _, pr := range parallelResults {
		idx := pr.index

		if pr.execErr != nil {
			errorMsg := a.formatToolError(pr.toolName, pr.arguments, pr.execErr)
			messages = append(messages, ChatMessage{
				Role:       "tool",
				ToolCallID: pr.toolCallID,
				Content:    errorMsg,
			})
			sendProgress("tool_result", fmt.Sprintf("Tool %s execution failed", pr.toolName), map[string]interface{}{
				"toolName":   pr.toolName,
				"success":    false,
				"isError":    true,
				"error":      pr.execErr.Error(),
				"toolCallId": pr.toolCallID,
				"index":      idx + 1,
				"total":      totalTools,
				"iteration":  i + 1,
				"parallel":   true,
			})
			a.logger.Warn("Parallel tool execution failed, detailed error message returned",
				zap.String("tool", pr.toolName),
				zap.Error(pr.execErr),
			)
		} else {
			execResult := pr.execResult
			messages = append(messages, ChatMessage{
				Role:       "tool",
				ToolCallID: pr.toolCallID,
				Content:    execResult.Result,
			})
			if execResult.ExecutionID != "" {
				result.MCPExecutionIDs = append(result.MCPExecutionIDs, execResult.ExecutionID)
			}
			resultPreview := execResult.Result
			if len(resultPreview) > 200 {
				resultPreview = resultPreview[:200] + "..."
			}
			sendProgress("tool_result", fmt.Sprintf("Tool %s execution completed", pr.toolName), map[string]interface{}{
				"toolName":      pr.toolName,
				"success":       !execResult.IsError,
				"isError":       execResult.IsError,
				"result":        execResult.Result,
				"resultPreview": resultPreview,
				"executionId":   execResult.ExecutionID,
				"toolCallId":    pr.toolCallID,
				"index":         idx + 1,
				"total":         totalTools,
				"iteration":     i + 1,
				"parallel":      true,
			})
			if execResult.IsError {
				a.logger.Warn("Parallel tool returned error result, continuing processing",
					zap.String("tool", pr.toolName),
					zap.String("result", execResult.Result),
				)
			}
		}
	}
} else {
	// ── Sequential path (single tool call or parallelism disabled) ───────
	for idx, toolCall := range choice.Message.ToolCalls {
		// Execute tool
		execResult, err := a.executeToolViaMCP(ctx, toolCall.Function.Name, toolCall.Function.Arguments)
		if err != nil {
			// Build detailed error message to help AI understand the problem and make decisions
			errorMsg := a.formatToolError(toolCall.Function.Name, toolCall.Function.Arguments, err)
			messages = append(messages, ChatMessage{
				Role:       "tool",
				ToolCallID: toolCall.ID,
				Content:    errorMsg,
			})

			// Send tool execution failure event
			sendProgress("tool_result", fmt.Sprintf("Tool %s execution failed", toolCall.Function.Name), map[string]interface{}{
				"toolName":   toolCall.Function.Name,
				"success":    false,
				"isError":    true,
				"error":      err.Error(),
				"toolCallId": toolCall.ID,
				"index":      idx + 1,
				"total":      totalTools,
				"iteration":  i + 1,
			})

			a.logger.Warn("Tool execution failed, detailed error message returned",
				zap.String("tool", toolCall.Function.Name),
				zap.Error(err),
			)
		} else {
			// Even if the tool returned an error result (IsError=true), continue processing and let AI decide the next step
			messages = append(messages, ChatMessage{
				Role:       "tool",
				ToolCallID: toolCall.ID,
				Content:    execResult.Result,
			})
			// Collect execution ID
			if execResult.ExecutionID != "" {
				result.MCPExecutionIDs = append(result.MCPExecutionIDs, execResult.ExecutionID)
			}

			// Send tool execution success event
			resultPreview := execResult.Result
			if len(resultPreview) > 200 {
				resultPreview = resultPreview[:200] + "..."
			}
			sendProgress("tool_result", fmt.Sprintf("Tool %s execution completed", toolCall.Function.Name), map[string]interface{}{
				"toolName":      toolCall.Function.Name,
				"success":       !execResult.IsError,
				"isError":       execResult.IsError,
				"result":        execResult.Result, // full result
				"resultPreview": resultPreview,     // preview result
				"executionId":   execResult.ExecutionID,
				"toolCallId":    toolCall.ID,
				"index":         idx + 1,
				"total":         totalTools,
				"iteration":     i + 1,
			})

			// If the tool returned an error, log it but do not interrupt the flow
			if execResult.IsError {
				a.logger.Warn("Tool returned error result, but continuing processing",
					zap.String("tool", toolCall.Function.Name),
					zap.String("result", execResult.Result),
				)
			}
		}
	}
}
```
There is significant code duplication between the parallel (if useParallel) and sequential (else) tool execution paths, especially in how results are processed, messages are appended, and progress is sent. This makes the code harder to maintain.
Consider refactoring the common logic into a helper function. This function could take a tool call result (perhaps by adapting the parallelToolCallResult struct for both paths) and handle appending messages, sending progress, and logging.
```go
results[idx] = parallelToolCallResult{
	index:      idx,
	toolCallID: tc.ID,
	toolName:   tc.Function.Name,
	arguments:  tc.Function.Arguments,
	execResult: &ToolExecutionResult{
		Result:  panicMsg,
		IsError: true,
	},
}
```
In the panic recovery handler, the result of the panic is stored in execResult, but execErr is left as nil. A panic is a form of hard execution error, similar to what execErr is intended to represent. For consistency, it would be better to populate execErr with the panic information. This would allow the panic to be handled by the if pr.execErr != nil block in AgentLoopWithProgress, which is the more appropriate path for such errors.
```go
results[idx] = parallelToolCallResult{
	index:      idx,
	toolCallID: tc.ID,
	toolName:   tc.Function.Name,
	arguments:  tc.Function.Arguments,
	execErr:    errors.New(panicMsg),
}
```