Skip to content

fix(runner): server tool results, mixed-tool execution, thought_signature passthrough#45

Merged
windsornguyen merged 5 commits intonextfrom
fix/gemini-thought-signature-passthrough
Feb 7, 2026
Merged

fix(runner): server tool results, mixed-tool execution, thought_signature passthrough#45
windsornguyen merged 5 commits intonextfrom
fix/gemini-thought-signature-passthrough

Conversation

@windsornguyen
Copy link
Member

Fixes three issues in the dependency-aware tool scheduler (#44):

  • Inject server tool results into conversation history before local tool execution
  • Prevent early loop exit when both server and local tools are pending
  • Preserve thought_signature during streaming tool call accumulation

Depends on #44.

@cursor
Copy link

cursor bot commented Feb 7, 2026

PR Summary

Medium Risk
Touches core runner streaming/tool-execution flow and conversation history mutation, which can change tool scheduling behavior and downstream model interactions, though changes are localized to tool-call handling.

Overview
Fixes streaming runner behavior around tool calls and MCP server tooling.

During streaming (sync/async), the runner now captures mcp_tool_results emitted in chunk metadata and adjusts mixed tool execution by no longer appending an extra assistant tool_calls message for the local-only subset before handing off to the dependency-aware scheduler.

Tool call parsing/stream accumulation is updated to passthrough thought_signature when present, so this metadata survives both non-stream extraction (_extract_tool_calls) and streaming delta accumulation (_accumulate_tool_calls).

Written by Cursor Bugbot for commit a8a9685. This will update automatically on new commits. Configure here.

# Collect MCP tool results emitted by the server
chunk_extra = getattr(chunk, "__pydantic_extra__", None) or {}
if isinstance(chunk_extra, dict) and "mcp_tool_results" in chunk_extra:
mcp_tool_results_from_server = chunk_extra["mcp_tool_results"]
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MCP tool results overwritten instead of accumulated

Low Severity

When collecting MCP tool results from server chunks, the code uses assignment (mcp_tool_results_from_server = chunk_extra["mcp_tool_results"]) rather than extending the list. If the server sends mcp_tool_results across multiple chunks, earlier results would be overwritten and lost. This pattern appears in both the async streaming path (line 706) and sync streaming path (line 1015).

🔬 Verification Test

Why verification test was not possible: This potential edge case depends on the server's behavior when streaming MCP tool results. Without access to the actual server implementation or mock infrastructure that simulates multi-chunk MCP result delivery, it's not possible to verify whether results would be lost in practice. The bug is flagged based on the code pattern showing assignment rather than accumulation in a streaming context.

Additional Locations (1)

Fix in Cursor Fix in Web

@cursor
Copy link

cursor bot commented Feb 7, 2026

Bugbot Autofix prepared fixes for 2 of the 2 bugs found in the latest run.

  • ✅ Fixed: Massive commented-out code block left in file
    • Removed ~1,375 lines of commented-out 'production version' code and the BEGIN/END development markers that wrapped the active implementation.
  • ✅ Fixed: MCP tool results overwritten instead of accumulated
    • Changed direct assignment to .extend() in both async (line 703) and sync (line 1012) streaming paths so MCP tool results from multiple chunks are properly accumulated.

Create PR

Or push these changes by commenting:

@cursor push db1e9ff6f5
Preview (db1e9ff6f5)
diff --git a/src/dedalus_labs/lib/runner/core.py b/src/dedalus_labs/lib/runner/core.py
--- a/src/dedalus_labs/lib/runner/core.py
+++ b/src/dedalus_labs/lib/runner/core.py
@@ -1,6 +1,3 @@
-# ===========================================================================
-# BEGIN: feat/dep-graph-scheduler version (ACTIVE)
-# ===========================================================================
 # ==============================================================================
 #                  © 2025 Dedalus Labs, Inc. and affiliates
 #                            Licensed under MIT
@@ -703,7 +700,7 @@
                 # Collect MCP tool results emitted by the server
                 chunk_extra = getattr(chunk, "__pydantic_extra__", None) or {}
                 if isinstance(chunk_extra, dict) and "mcp_tool_results" in chunk_extra:
-                    mcp_tool_results_from_server = chunk_extra["mcp_tool_results"]
+                    mcp_tool_results_from_server.extend(chunk_extra["mcp_tool_results"])
 
                 if hasattr(chunk, "choices") and chunk.choices:
                     choice = chunk.choices[0]
@@ -1012,7 +1009,7 @@
                 # Collect MCP tool results emitted by the server
                 chunk_extra = getattr(chunk, "__pydantic_extra__", None) or {}
                 if isinstance(chunk_extra, dict) and "mcp_tool_results" in chunk_extra:
-                    mcp_tool_results_from_server = chunk_extra["mcp_tool_results"]
+                    mcp_tool_results_from_server.extend(chunk_extra["mcp_tool_results"])
 
                 if hasattr(chunk, "choices") and chunk.choices:
                     choice = chunk.choices[0]
@@ -1336,1378 +1333,3 @@
 
         return {k: v for k, v in kwargs.items() if v is not None}
 
-# ===========================================================================
-# END: feat/dep-graph-scheduler version
-# ===========================================================================
-
-# ===========================================================================
-# BEGIN: production version (COMMENTED OUT)
-# ===========================================================================
-# #                  © 2025 Dedalus Labs, Inc. and affiliates
-# #                            Licensed under MIT
-# #           github.com/dedalus-labs/dedalus-sdk-python/LICENSE
-#
-# from __future__ import annotations
-#
-# import json
-# import asyncio
-# import inspect
-# from typing import (
-#     TYPE_CHECKING,
-#     Any,
-#     Dict,
-#     Literal,
-#     Callable,
-#     Iterator,
-#     Protocol,
-#     AsyncIterator,
-#     Sequence,
-#     Union,
-# )
-# from dataclasses import field, asdict, dataclass
-#
-# if TYPE_CHECKING:
-#     from ...types.shared.dedalus_model import DedalusModel
-#
-# from ..._client import Dedalus, AsyncDedalus
-#
-# from .types import Message, ToolCall, JsonValue, ToolResult, PolicyInput, PolicyContext
-# from ...types.shared import MCPToolResult
-# from ..mcp import serialize_mcp_servers, MCPServerProtocol
-#
-# # Type alias for mcp_servers parameter - accepts strings, server objects, or mixed lists
-# MCPServersInput = Union[
-#     str,  # Single slug or URL
-#     MCPServerProtocol,  # MCP server object
-#     Sequence[Union[str, MCPServerProtocol, Dict[str, Any]]],  # Mixed list
-#     None,
-# ]
-# from ..utils._schemas import to_schema
-#
-#
-# def _process_policy(policy: PolicyInput, context: PolicyContext) -> Dict[str, JsonValue]:
-#     """Process policy, handling all possible input types safely."""
-#     if policy is None:
-#         return {}
-#
-#     if callable(policy):
-#         try:
-#             result = policy(context)
-#             return result if isinstance(result, dict) else {}
-#         except Exception:
-#             return {}
-#
-#     if isinstance(policy, dict):
-#         try:
-#             return dict(policy)
-#         except Exception:
-#             return {}
-#
-#     return {}
-#
-#
-# def _extract_mcp_results(response: Any) -> list[MCPToolResult]:
-#     """Extract MCP tool results from API response."""
-#     mcp_results = getattr(response, "mcp_tool_results", None)
-#     if not mcp_results:
-#         return []
-#     return [item if isinstance(item, MCPToolResult) else MCPToolResult.model_validate(item) for item in mcp_results]
-#
-#
-# class _ToolHandler(Protocol):
-#     def schemas(self) -> list[Dict]: ...
-#     async def exec(self, name: str, args: Dict[str, JsonValue]) -> JsonValue: ...
-#     def exec_sync(self, name: str, args: Dict[str, JsonValue]) -> JsonValue: ...
-#
-#
-# class _FunctionToolHandler:
-#     """Converts Python functions to tool handler via introspection."""
-#
-#     def __init__(self, funcs: list[Callable[..., Any]]):
-#         self._funcs = {f.__name__: f for f in funcs}
-#
-#     def schemas(self) -> list[Dict]:
-#         """Build OpenAI-compatible function schemas via introspection."""
-#         out: list[Dict[str, Any]] = []
-#         for fn in self._funcs.values():
-#             try:
-#                 out.append(to_schema(fn))
-#             except Exception:
-#                 continue
-#         return out
-#
-#     async def exec(self, name: str, args: Dict[str, JsonValue]) -> JsonValue:
-#         """Execute tool by name with given args (async)."""
-#         fn = self._funcs[name]
-#         if inspect.iscoroutinefunction(fn):
-#             return await fn(**args)
-#         # asyncio.to_thread is Python 3.9+, use run_in_executor for 3.8 compat
-#         loop = asyncio.get_event_loop()
-#         # Use partial to properly pass keyword arguments
-#         from functools import partial
-#
-#         return await loop.run_in_executor(None, partial(fn, **args))
-#
-#     def exec_sync(self, name: str, args: Dict[str, JsonValue]) -> JsonValue:
-#         """Execute tool by name with given args (sync)."""
-#         fn = self._funcs[name]
-#         if inspect.iscoroutinefunction(fn):
-#             loop = asyncio.new_event_loop()
-#             asyncio.set_event_loop(loop)
-#             try:
-#                 return loop.run_until_complete(fn(**args))
-#             finally:
-#                 loop.close()
-#         return fn(**args)
-#
-#
-# @dataclass
-# class _ModelConfig:
-#     """Model configuration parameters."""
-#
-#     id: str
-#     model_list: list[str] | None = None  # Store the full model list when provided
-#     temperature: float | None = None
-#     max_tokens: int | None = None
-#     top_p: float | None = None
-#     frequency_penalty: float | None = None
-#     presence_penalty: float | None = None
-#     logit_bias: Dict[str, int] | None = None
-#     response_format: Dict[str, JsonValue] | type | None = None  # Dict or Pydantic model
-#     agent_attributes: Dict[str, float] | None = None
-#     model_attributes: Dict[str, Dict[str, float]] | None = None
-#     tool_choice: str | Dict[str, JsonValue] | None = None
-#     guardrails: list[Dict[str, JsonValue]] | None = None
-#     handoff_config: Dict[str, JsonValue] | None = None
-#
-#
-# @dataclass
-# class _ExecutionConfig:
-#     """Configuration for tool execution behavior and policies."""
-#
-#     mcp_servers: list[str | Dict[str, Any]] = field(default_factory=list)  # Wire format
-#     credentials: list[Any] | None = None  # CredentialProtocol objects (not serialized)
-#     max_steps: int = 10
-#     stream: bool = False
-#     transport: Literal["http", "realtime"] = "http"
-#     verbose: bool = False
-#     debug: bool = False
-#     on_tool_event: Callable[[Dict[str, JsonValue]], None] | None = None
-#     return_intent: bool = False
-#     policy: PolicyInput = None
-#     available_models: list[str] = field(default_factory=list)
-#     strict_models: bool = True
-#
-#
-# @dataclass
-# class _RunResult:
-#     """Result from a completed tool execution run."""
-#
-#     final_output: str  # Final text output from conversation
-#     tool_results: list[ToolResult]
-#     steps_used: int
-#     messages: list[Message] = field(default_factory=list)  # Full conversation history
-#     intents: list[Dict[str, JsonValue]] | None = None
-#     tools_called: list[str] = field(default_factory=list)
-#     mcp_results: list[MCPToolResult] = field(default_factory=list)
-#     """MCP tool results from server-side tool calls."""
-#
-#     @property
-#     def output(self) -> str:
-#         """Alias for final_output."""
-#         return self.final_output
-#
-#     @property
-#     def content(self) -> str:
-#         """Alias for final_output."""
-#         return self.final_output
-#
-#     def to_input_list(self) -> list[Message]:
-#         """Get the full conversation history for continuation."""
-#         return list(self.messages)
-#
-#
-# class DedalusRunner:
-#     """Enhanced Dedalus client with tool execution capabilities."""
-#
-#     def __init__(self, client: Dedalus | AsyncDedalus, verbose: bool = False):
-#         self.client = client
-#         self.verbose = verbose
-#
-#     def run(
-#         self,
-#         input: str | list[Message] | None = None,
-#         tools: list[Callable] | None = None,
-#         messages: list[Message] | None = None,
-#         instructions: str | None = None,
-#         model: str | list[str] | DedalusModel | list[DedalusModel] | None = None,
-#         max_steps: int = 10,
-#         mcp_servers: MCPServersInput = None,
-#         credentials: Sequence[Any] | None = None,  # TODO: Loosely typed as `Any` for now
-#         temperature: float | None = None,
-#         max_tokens: int | None = None,
-#         top_p: float | None = None,
-#         frequency_penalty: float | None = None,
-#         presence_penalty: float | None = None,
-#         logit_bias: Dict[str, int] | None = None,
-#         response_format: Dict[str, JsonValue] | type | None = None,
-#         stream: bool = False,
-#         transport: Literal["http", "realtime"] = "http",
-#         verbose: bool | None = None,
-#         debug: bool | None = None,
-#         on_tool_event: Callable[[Dict[str, JsonValue]], None] | None = None,
-#         return_intent: bool = False,
-#         agent_attributes: Dict[str, float] | None = None,
-#         model_attributes: Dict[str, Dict[str, float]] | None = None,
-#         tool_choice: str | Dict[str, JsonValue] | None = None,
-#         guardrails: list[Dict[str, JsonValue]] | None = None,
-#         handoff_config: Dict[str, JsonValue] | None = None,
-#         policy: PolicyInput = None,
-#         available_models: list[str] | None = None,
-#         strict_models: bool = True,
-#     ):
-#         """Execute tools with unified async/sync + streaming/non-streaming logic."""
-#         if not model:
-#             raise ValueError("model must be provided")
-#
-#         # Validate tools parameter
-#         if tools is not None:
-#             if not isinstance(tools, list):
-#                 msg = "tools must be a list of callable functions or None"
-#                 raise ValueError(msg)
-#
-#             # Check for nested lists (common mistake: tools=[[]] instead of tools=[])
-#             for i, tool in enumerate(tools):
-#                 if not callable(tool):
-#                     if isinstance(tool, list):
-#                         msg = f"tools[{i}] is a list, not a callable function. Did you mean to pass tools={tool} instead of tools=[{tool}]?"
-#                         raise TypeError(msg)
-#                     msg = (
-#                         f"tools[{i}] is not callable (got {type(tool).__name__}). All tools must be callable functions."
-#                     )
-#                     raise TypeError(msg)
-#
-#         # Parse model to extract name and config
-#         model_name = None
-#         model_list = []
-#
-#         if isinstance(model, list):
-#             if not model:
-#                 raise ValueError("model list cannot be empty")
-#             # Handle list of DedalusModel or strings
-#             for m in model:
-#                 if hasattr(m, "name"):  # DedalusModel
-#                     model_list.append(m.name)
-#                     # Use config from first DedalusModel if params not explicitly set
-#                     if model_name is None:
-#                         model_name = m.name
-#                         temperature = temperature if temperature is not None else getattr(m, "temperature", None)
-#                         max_tokens = max_tokens if max_tokens is not None else getattr(m, "max_tokens", None)
-#                         top_p = top_p if top_p is not None else getattr(m, "top_p", None)
-#                         frequency_penalty = (
-#                             frequency_penalty
-#                             if frequency_penalty is not None
-#                             else getattr(m, "frequency_penalty", None)
-#                         )
-#                         presence_penalty = (
-#                             presence_penalty if presence_penalty is not None else getattr(m, "presence_penalty", None)
-#                         )
-#                         logit_bias = logit_bias if logit_bias is not None else getattr(m, "logit_bias", None)
-#
-#                         # Extract additional parameters from first DedalusModel
-#                         stream = stream if stream is not False else getattr(m, "stream", False)
-#                         tool_choice = tool_choice if tool_choice is not None else getattr(m, "tool_choice", None)
-#
-#                         # Extract Dedalus-specific extensions
-#                         if hasattr(m, "attributes") and m.attributes:
-#                             agent_attributes = agent_attributes if agent_attributes is not None else m.attributes
-#
-#                         # Check for unsupported parameters (only warn once for first model)
-#                         unsupported_params = []
-#                         if hasattr(m, "n") and m.n is not None:
-#                             unsupported_params.append("n")
-#                         if hasattr(m, "stop") and m.stop is not None:
-#                             unsupported_params.append("stop")
-#                         if hasattr(m, "stream_options") and m.stream_options is not None:
-#                             unsupported_params.append("stream_options")
-#                         if hasattr(m, "logprobs") and m.logprobs is not None:
-#                             unsupported_params.append("logprobs")
-#                         if hasattr(m, "top_logprobs") and m.top_logprobs is not None:
-#                             unsupported_params.append("top_logprobs")
-#                         if hasattr(m, "seed") and m.seed is not None:
-#                             unsupported_params.append("seed")
-#                         if hasattr(m, "service_tier") and m.service_tier is not None:
-#                             unsupported_params.append("service_tier")
-#                         if hasattr(m, "tools") and m.tools is not None:
-#                             unsupported_params.append("tools")
-#                         if hasattr(m, "parallel_tool_calls") and m.parallel_tool_calls is not None:
-#                             unsupported_params.append("parallel_tool_calls")
-#                         if hasattr(m, "user") and m.user is not None:
-#                             unsupported_params.append("user")
-#                         if hasattr(m, "max_completion_tokens") and m.max_completion_tokens is not None:
-#                             unsupported_params.append("max_completion_tokens")
-#
-#                         if unsupported_params:
-#                             import warnings
-#
-#                             warnings.warn(
-#                                 f"The following DedalusModel parameters are not yet supported and will be ignored: {', '.join(unsupported_params)}. "
-#                                 f"Support for these parameters is coming soon.",
-#                                 UserWarning,
-#                                 stacklevel=2,
-#                             )
-#                 else:  # String
-#                     model_list.append(m)
-#                     if model_name is None:
-#                         model_name = m
-#         elif hasattr(model, "name"):  # Single DedalusModel
-#             model_name = model.name
-#             model_list = [model.name]
-#             # Extract config from DedalusModel if params not explicitly set
-#             temperature = temperature if temperature is not None else getattr(model, "temperature", None)
-#             max_tokens = max_tokens if max_tokens is not None else getattr(model, "max_tokens", None)
-#             top_p = top_p if top_p is not None else getattr(model, "top_p", None)
-#             frequency_penalty = (
-#                 frequency_penalty if frequency_penalty is not None else getattr(model, "frequency_penalty", None)
-#             )
-#             presence_penalty = (
-#                 presence_penalty if presence_penalty is not None else getattr(model, "presence_penalty", None)
-#             )
-#             logit_bias = logit_bias if logit_bias is not None else getattr(model, "logit_bias", None)
-#
-#             # Extract additional supported parameters
-#             stream = stream if stream is not False else getattr(model, "stream", False)
-#             tool_choice = tool_choice if tool_choice is not None else getattr(model, "tool_choice", None)
-#
-#             # Extract Dedalus-specific extensions
-#             if hasattr(model, "attributes") and model.attributes:
-#                 agent_attributes = agent_attributes if agent_attributes is not None else model.attributes
-#             if hasattr(model, "metadata") and model.metadata:
-#                 # metadata is stored but not yet fully utilized
-#                 pass
-#
-#             # Log warnings for unsupported parameters
-#             unsupported_params = []
-#             if hasattr(model, "n") and model.n is not None:
-#                 unsupported_params.append("n")
-#             if hasattr(model, "stop") and model.stop is not None:
-#                 unsupported_params.append("stop")
-#             if hasattr(model, "stream_options") and model.stream_options is not None:
-#                 unsupported_params.append("stream_options")
-#             if hasattr(model, "logprobs") and model.logprobs is not None:
-#                 unsupported_params.append("logprobs")
-#             if hasattr(model, "top_logprobs") and model.top_logprobs is not None:
-#                 unsupported_params.append("top_logprobs")
-#             if hasattr(model, "seed") and model.seed is not None:
-#                 unsupported_params.append("seed")
-#             if hasattr(model, "service_tier") and model.service_tier is not None:
-#                 unsupported_params.append("service_tier")
-#             if hasattr(model, "tools") and model.tools is not None:
-#                 unsupported_params.append("tools")
-#             if hasattr(model, "parallel_tool_calls") and model.parallel_tool_calls is not None:
-#                 unsupported_params.append("parallel_tool_calls")
-#             if hasattr(model, "user") and model.user is not None:
-#                 unsupported_params.append("user")
-#             if hasattr(model, "max_completion_tokens") and model.max_completion_tokens is not None:
-#                 unsupported_params.append("max_completion_tokens")
-#
-#             if unsupported_params:
-#                 import warnings
-#
-#                 warnings.warn(
-#                     f"The following DedalusModel parameters are not yet supported and will be ignored: {', '.join(unsupported_params)}. "
-#                     f"Support for these parameters is coming soon.",
-#                     UserWarning,
-#                     stacklevel=2,
-#                 )
-#         else:  # Single string
-#             model_name = model
-#             model_list = [model] if model else []
-#
-#         available_models = model_list if available_models is None else available_models
-#
-#         model_config = _ModelConfig(
-#             id=str(model_name),
-#             model_list=model_list,  # Pass the full model list
-#             temperature=temperature,
-#             max_tokens=max_tokens,
-#             top_p=top_p,
-#             frequency_penalty=frequency_penalty,
-#             presence_penalty=presence_penalty,
-#             logit_bias=logit_bias,
-#             response_format=response_format,
-#             agent_attributes=agent_attributes,
-#             model_attributes=model_attributes,
-#             tool_choice=tool_choice,
-#             guardrails=guardrails,
-#             handoff_config=handoff_config,
-#         )
-#
-#         # Serialize mcp_servers to wire format
-#         serialized_mcp_servers = serialize_mcp_servers(mcp_servers)
-#
-#         exec_config = _ExecutionConfig(
-#             mcp_servers=serialized_mcp_servers,
-#             credentials=list(credentials) if credentials else None,
-#             max_steps=max_steps,
-#             stream=stream,
-#             transport=transport,
-#             verbose=verbose if verbose is not None else self.verbose,
-#             debug=debug or False,
-#             on_tool_event=on_tool_event,
-#             return_intent=return_intent,
-#             policy=policy,
-#             available_models=available_models or [],
-#             strict_models=strict_models,
-#         )
-#
-#         tool_handler = _FunctionToolHandler(list(tools or []))
-#
-#         # Handle instructions and messages parameters
-#         if instructions is not None and messages is not None:
-#             # instructions overrides any existing system messages
-#             conversation = [{"role": "system", "content": instructions}] + [
-#                 msg for msg in messages if msg.get("role") != "system"
-#             ]
-#         elif instructions is not None:
-#             # Convert instructions to system message, optionally with user input
-#             if input is not None:
-#                 if isinstance(input, str):
-#                     conversation = [
-#                         {"role": "system", "content": instructions},
-#                         {"role": "user", "content": input},
-#                     ]
-#                 else:
-#                     conversation = [{"role": "system", "content": instructions}] + list(input)
-#             else:
-#                 conversation = [{"role": "system", "content": instructions}]
-#         elif messages is not None:
-#             conversation = messages
-#         elif input is not None:
-#             conversation = [{"role": "user", "content": input}] if isinstance(input, str) else input
-#         else:
-#             raise ValueError("Must provide one of: 'instructions', 'messages', or 'input'")
-#
-#         return self._execute_conversation(conversation, tool_handler, model_config, exec_config)
-#
-#     def _execute_conversation(
-#         self,
-#         messages: list[Message],
-#         tool_handler: _ToolHandler,
-#         model_config: _ModelConfig,
-#         exec_config: _ExecutionConfig,
-#     ):
-#         """Execute conversation with unified logic for all client/streaming combinations."""
-#         is_async = isinstance(self.client, AsyncDedalus)
-#
-#         if is_async:
-#             if exec_config.stream:
-#                 return self._execute_streaming_async(messages, tool_handler, model_config, exec_config)
-#             else:
-#                 return self._execute_turns_async(messages, tool_handler, model_config, exec_config)
-#         else:
-#             if exec_config.stream:
-#                 return self._execute_streaming_sync(messages, tool_handler, model_config, exec_config)
-#             else:
-#                 return self._execute_turns_sync(messages, tool_handler, model_config, exec_config)
-#
-#     async def _execute_turns_async(
-#         self,
-#         messages: list[Message],
-#         tool_handler: _ToolHandler,
-#         model_config: _ModelConfig,
-#         exec_config: _ExecutionConfig,
-#     ) -> _RunResult:
-#         """Execute async non-streaming conversation."""
-#         messages = list(messages)
-#         steps = 0
-#         final_text = ""
-#         tool_results: list[ToolResult] = []
-#         tools_called: list[str] = []
-#
-#         while steps < exec_config.max_steps:
-#             steps += 1
-#             if exec_config.verbose:
-#                 print(f"Step started: Step={steps}")
-#                 # Show what models are configured
-#                 if model_config.model_list and len(model_config.model_list) > 1:
-#                     print(f"  Available models: {model_config.model_list}")
-#                     print(f"    Primary model: {model_config.id}")
-#                 else:
-#                     print(f"  Using model: {model_config.id}")
-#
-#             # Apply policy and get model params
-#             policy_result = self._apply_policy(
-#                 exec_config.policy,
-#                 {
-#                     "step": steps,
-#                     "messages": messages,
-#                     "model": model_config.id,
-#                     "mcp_servers": exec_config.mcp_servers,
-#                     "tools": list(getattr(tool_handler, "_funcs", {}).keys()),
-#                     "available_models": exec_config.available_models,
-#                 },
-#                 model_config,
-#                 exec_config,
-#             )
-#
-#             # Make model call
-#             current_messages = self._build_messages(messages, policy_result["prepend"], policy_result["append"])
-#
-#             response = await self.client.chat.completions.create(
-#                 model=policy_result["model"],
-#                 messages=current_messages,
-#                 tools=tool_handler.schemas() or None,
-#                 mcp_servers=policy_result["mcp_servers"],
-#                 credentials=exec_config.credentials,
-#                 **{**self._mk_kwargs(model_config), **policy_result["model_kwargs"]},
-#             )
-#
-#             if exec_config.verbose:
-#                 actual_model = policy_result["model"]
-#                 if isinstance(actual_model, list):
-#                     print(f"  API called with model list: {actual_model}")
-#                 else:
-#                     print(f"  API called with single model: {actual_model}")
-#                 print(f"  Response received (server says model: {getattr(response, 'model', 'unknown')})")
-#                 print(f"    Response type: {type(response).__name__}")
-#                 # Surface agent timeline if server included it
-#                 agents_used = getattr(response, "agents_used", None)
-#                 if not agents_used:
-#                     extra = getattr(response, "__pydantic_extra__", None)
-#                     if isinstance(extra, dict):
-#                         agents_used = extra.get("agents_used")
-#                 if agents_used:
-#                     print(f" [EVENT] agents_used: {agents_used}")
-#
-#             # Check if we have tool calls
-#             if not hasattr(response, "choices") or not response.choices:
-#                 final_text = ""
-#                 break
-#
-#             message = response.choices[0].message
-#             msg = vars(message) if hasattr(message, "__dict__") else message
-#             tool_calls = msg.get("tool_calls")
-#             content = msg.get("content", "")
-#
-#             if exec_config.verbose:
-#                 print(f" Response content: {content[:100] if content else '(none)'}...")
-#                 if tool_calls:
-#                     call_names = []
-#                     for tc in tool_calls:
-#                         try:
-#                             if isinstance(tc, dict):
-#                                 call_names.append(tc.get("function", {}).get("name", "?"))
-#                             else:
-#                                 call_names.append(getattr(getattr(tc, "function", None), "name", "?"))
-#                         except Exception:
-#                             call_names.append("?")
-#                     print(f" Tool calls in response: {call_names}")
-#
-#             if not tool_calls:
-#                 final_text = content or ""
-#                 # Add assistant response to conversation
-#                 if final_text:
-#                     messages.append({"role": "assistant", "content": final_text})
-#                 break
-#
-#             # Execute tools
-#             tool_calls = self._extract_tool_calls(response.choices[0])
-#             if exec_config.verbose:
-#                 print(f" Extracted {len(tool_calls)} tool calls")
-#                 for tc in tool_calls:
-#                     print(f"  - {tc.get('function', {}).get('name', '?')} (id: {tc.get('id', '?')})")
-#             await self._execute_tool_calls(
-#                 tool_calls,
-#                 tool_handler,
-#                 messages,
-#                 tool_results,
-#                 tools_called,
-#                 steps,
-#                 verbose=exec_config.verbose,
-#             )
-#
-#         # Extract MCP tool executions from the last response
-#         mcp_results = _extract_mcp_results(response)
-#
-#         return _RunResult(
-#             final_output=final_text,
-#             tool_results=tool_results,
-#             steps_used=steps,
-#             tools_called=tools_called,
-#             messages=messages,
-#             mcp_results=mcp_results,
-#         )
-#
-#     async def _execute_streaming_async(
-#         self,
-#         messages: list[Message],
-#         tool_handler: _ToolHandler,
-#         model_config: _ModelConfig,
-#         exec_config: _ExecutionConfig,
-#     ) -> AsyncIterator[Any]:
-#         """Execute async streaming conversation."""
-#         messages = list(messages)
-#         steps = 0
-#
-#         while steps < exec_config.max_steps:
-#             steps += 1
-#             if exec_config.verbose:
-#                 print(f"Step started: Step={steps} (max_steps={exec_config.max_steps})")
-#                 print(f" Starting step {steps} with {len(messages)} messages in conversation")
-#                 print(f" Message history:")
-#                 for i, msg in enumerate(messages):
-#                     role = msg.get("role")
-#                     content = str(msg.get("content", ""))[:50] + "..." if msg.get("content") else ""
-#                     tool_info = ""
-#                     if msg.get("tool_calls"):
-#                         tool_names = [tc.get("function", {}).get("name", "?") for tc in msg.get("tool_calls", [])]
-#                         tool_info = f" [calling: {', '.join(tool_names)}]"
-#                     elif msg.get("tool_call_id"):
-#                         tool_info = f" [response to: {msg.get('tool_call_id')[:8]}...]"
-#                     print(f"  [Message {i}] {role}: {content}{tool_info}")
-#
-#             # Apply policy
-#             policy_result = self._apply_policy(
-#                 exec_config.policy,
-#                 {
-#                     "step": steps,
-#                     "messages": messages,
-#                     "model": model_config.id,
-#                     "mcp_servers": exec_config.mcp_servers,
-#                     "tools": list(getattr(tool_handler, "_funcs", {}).keys()),
-#                     "available_models": exec_config.available_models,
-#                 },
-#                 model_config,
-#                 exec_config,
-#             )
-#
-#             # Stream model response
-#             current_messages = self._build_messages(messages, policy_result["prepend"], policy_result["append"])
-#
-#             # Suppress per-message debug; keep streaming minimal
-#
-#             stream = await self.client.chat.completions.create(
-#                 model=policy_result["model"],
-#                 messages=current_messages,
-#                 tools=tool_handler.schemas() or None,
-#                 mcp_servers=policy_result["mcp_servers"],
-#                 credentials=exec_config.credentials,
-#                 stream=True,
-#                 **{**self._mk_kwargs(model_config), **policy_result["model_kwargs"]},
-#             )
-#
-#             tool_calls = []
-#             chunk_count = 0
-#             content_chunks = 0
-#             tool_call_chunks = 0
-#             finish_reason = None
-#             async for chunk in stream:
-#                 chunk_count += 1
-#                 if exec_config.verbose:
-#                     # Only surface agent_updated metadata; suppress raw chunk spam
-#                     extra = getattr(chunk, "__pydantic_extra__", None)
-#                     if isinstance(extra, dict):
-#                         meta = extra.get("x_dedalus_event") or extra.get("dedalus_event")
-#                         if isinstance(meta, dict) and meta.get("type") == "agent_updated":
-#                             print(f" [EVENT] agent_updated: agent={meta.get('agent')} model={meta.get('model')}")
-#                 if hasattr(chunk, "choices") and chunk.choices:
-#                     choice = chunk.choices[0]
-#                     delta = choice.delta
-#
-#                     # Check finish reason
-#                     if hasattr(choice, "finish_reason") and choice.finish_reason:
-#                         finish_reason = choice.finish_reason
-#                         # suppress per-chunk finish_reason spam
-#
-#                     # Check for tool calls
-#                     if hasattr(delta, "tool_calls") and delta.tool_calls:
-#                         tool_call_chunks += 1
-#                         self._accumulate_tool_calls(delta.tool_calls, tool_calls)
-#                         # suppress per-chunk tool_call delta spam
-#
-#                     # Check for content
-#                     if hasattr(delta, "content") and delta.content:
-#                         content_chunks += 1
-#                         # suppress per-chunk content spam
-#
-#                     # Check for role (suppressed)
-#                     if hasattr(delta, "role") and delta.role:
-#                         pass
-#
-#                     yield chunk
-#
-#             if exec_config.verbose:
-#                 # Keep a compact end-of-stream summary
-#                 names = [tc.get("function", {}).get("name", "unknown") for tc in tool_calls]
-#                 print(f" Stream summary: chunks={chunk_count} content={content_chunks} tool_calls={tool_call_chunks}")
-#                 if names:
-#                     print(f" Tools called this turn: {names}")
-#
-#             # Execute any accumulated tool calls
-#             if tool_calls:
-#                 if exec_config.verbose:
-#                     print(f" Processing {len(tool_calls)} tool calls")
-#
-#                 # Categorize tools
-#                 local_names = [
-#                     tc["function"]["name"]
-#                     for tc in tool_calls
-#                     if tc["function"]["name"] in getattr(tool_handler, "_funcs", {})
-#                 ]
-#                 mcp_names = [
-#                     tc["function"]["name"]
-#                     for tc in tool_calls
-#                     if tc["function"]["name"] not in getattr(tool_handler, "_funcs", {})
-#                 ]
-#
-#                 # Check if ALL tools are MCP tools (none are local)
-#                 all_mcp = all(tc["function"]["name"] not in getattr(tool_handler, "_funcs", {}) for tc in tool_calls)
-#
-#                 # Check if stream already contains content (MCP results)
-#                 has_streamed_content = content_chunks > 0
-#
-#                 if exec_config.verbose:
-#                     print(f" Local tools used: {local_names}")
-#                     print(f" Server tools used: {mcp_names}")
-#
-#                 # When MCP tools are involved and content was streamed, we're done
-#                 if mcp_names and has_streamed_content:
-#                     if exec_config.verbose:
-#                         print(f" MCP tools called and content streamed - response complete, breaking loop")
-#                     break
-#
-#                 if all_mcp:
-#                     # All tools are MCP - the response should be streamed
-#                     if exec_config.verbose:
-#                         print(f" All tools are MCP, expecting streamed response")
-#                     # Don't break here - let the next iteration handle it
-#                 else:
-#                     # We have at least one local tool
-#                     # Filter to only include local tool calls in the assistant message
-#                     local_only_tool_calls = [
-#                         tc for tc in tool_calls if tc["function"]["name"] in getattr(tool_handler, "_funcs", {})
-#                     ]
-#                     messages.append({"role": "assistant", "tool_calls": local_only_tool_calls})
-#                     if exec_config.verbose:
-#                         print(
-#                             f" Added assistant message with {len(local_only_tool_calls)} local tool calls (filtered from {len(tool_calls)} total)"
-#                         )
-#
-#                     # Execute only local tools
-#                     for tc in tool_calls:
-#                         fn_name = tc["function"]["name"]
-#                         fn_args_str = tc["function"]["arguments"]
-#
-#                         if fn_name in getattr(tool_handler, "_funcs", {}):
-#                             # Local tool
-#                             try:
-#                                 fn_args = json.loads(fn_args_str)
-#                             except json.JSONDecodeError:
... diff truncated: showing 800 of 1407 lines

@cursor
Copy link

cursor bot commented Feb 7, 2026

Bugbot Autofix prepared fixes for 1 of the 1 bugs found in the latest run.

  • ✅ Fixed: Missing verbose warning in sync streaming path
    • Added the missing elif exec_config.verbose: clause with the warning print statement to the sync streaming path, matching the existing async streaming path.

Create PR

Or push these changes by commenting:

@cursor push 18ce68dda3
Preview (18ce68dda3)
diff --git a/src/dedalus_labs/lib/runner/core.py b/src/dedalus_labs/lib/runner/core.py
--- a/src/dedalus_labs/lib/runner/core.py
+++ b/src/dedalus_labs/lib/runner/core.py
@@ -1107,6 +1107,8 @@
                             if result_data:
                                 content = json.dumps(result_data["result"]) if result_data.get("result") is not None else ""
                                 messages.append({"role": "tool", "tool_call_id": call_id, "content": content})
+                            elif exec_config.verbose:
+                                print(f"  Warning: no server result for MCP tool {tc_name} ({call_id[:8]}...)")
 
                     # Execute only local tools
                     local_only = [

@windsornguyen windsornguyen merged commit 637d9b8 into next Feb 7, 2026
5 of 7 checks passed
@stainless-app stainless-app bot mentioned this pull request Feb 7, 2026
Copy link

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 2 potential issues.

Bugbot Autofix is ON. A Cloud Agent has been kicked off to fix the reported issues.

# Collect MCP tool results emitted by the server
chunk_extra = getattr(chunk, "__pydantic_extra__", None) or {}
if isinstance(chunk_extra, dict) and "mcp_tool_results" in chunk_extra:
mcp_tool_results_from_server = chunk_extra["mcp_tool_results"]
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Server tool results collected but never injected into messages

High Severity

The variable mcp_tool_results_from_server is collected from stream chunks but never actually used. The PR description explicitly states the intent is to "inject server tool results into conversation history before local tool execution," but the collected results are never injected into messages. The variable is initialized, populated in the loop, then discarded without being used. The fix for this feature is incomplete.

🔬 Verification Test

Why verification test was not possible: The VM infrastructure was unreachable during this review (repeated "Pod exists but exec-daemon is unreachable" errors across 30+ attempts). However, this bug is clearly identifiable through static analysis of the diff alone - the variable mcp_tool_results_from_server is only ever written to (on lines 703 and 983) and never read from anywhere in the diff. Searching for any usage of this variable shows it's only referenced in assignment statements, confirming the collected data is never utilized.

Additional Locations (1)

Fix in Cursor Fix in Web


import asyncio
import inspect
import json
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused json import added to module

Low Severity

The import json statement was added but the json module is not used anywhere in the changed code. This appears to be a leftover from planned code that was not implemented, likely related to the incomplete mcp_tool_results_from_server feature which may have intended to serialize results using JSON.

🔬 Verification Test

Why verification test was not possible: The VM infrastructure was unreachable during this review. However, this issue is verifiable through static analysis - the string "json." does not appear anywhere in the diff's changed code, confirming the import is unused in the new code being added.

Fix in Cursor Fix in Web

@cursor
Copy link

cursor bot commented Feb 7, 2026

Bugbot Autofix prepared fixes for 2 of the 2 bugs found in the latest run.

  • ✅ Fixed: Server tool results collected but never injected into messages
    • Added _inject_mcp_tool_messages() helper and called it in both async and sync streaming paths to inject MCP tool results as tool-response messages into the conversation history (along with the required assistant message) before local tool execution.
  • ✅ Fixed: Unused json import added to module
    • Removed the unused import json statement since all json usage was moved to the _scheduler module during the refactoring.

Create PR

Or push these changes by commenting:

@cursor push 6f66811ca8
Preview (6f66811ca8)
diff --git a/src/dedalus_labs/lib/runner/core.py b/src/dedalus_labs/lib/runner/core.py
--- a/src/dedalus_labs/lib/runner/core.py
+++ b/src/dedalus_labs/lib/runner/core.py
@@ -8,7 +8,6 @@
 
 import asyncio
 import inspect
-import json
 from typing import (
     TYPE_CHECKING,
     Any,
@@ -70,6 +69,39 @@
     return [item if isinstance(item, MCPToolResult) else MCPToolResult.model_validate(item) for item in mcp_results]
 
 
+def _inject_mcp_tool_messages(
+    tool_calls: list[Dict[str, Any]],
+    mcp_results: list,
+    messages: list,
+    local_funcs: set,
+) -> None:
+    """Add tool-response messages for server-handled MCP tool calls.
+
+    Matches each MCP result to its corresponding tool call by
+    ``tool_name`` and appends a ``role='tool'`` message so the
+    conversation history is complete before local tool execution.
+    """
+    results_by_name: Dict[str, list] = {}
+    for r in mcp_results:
+        name = r["tool_name"] if isinstance(r, dict) else getattr(r, "tool_name", "")
+        results_by_name.setdefault(name, []).append(r)
+
+    for tc in tool_calls:
+        fn_name = tc["function"]["name"]
+        if fn_name not in local_funcs:
+            queue = results_by_name.get(fn_name, [])
+            if queue:
+                res = queue.pop(0)
+                content = res.get("result") if isinstance(res, dict) else getattr(res, "result", None)
+                messages.append(
+                    {
+                        "role": "tool",
+                        "tool_call_id": tc["id"],
+                        "content": str(content) if content is not None else "",
+                    }
+                )
+
+
 class _ToolHandler(Protocol):
     def schemas(self) -> list[Dict]: ...
     async def exec(self, name: str, args: Dict[str, JsonValue]) -> JsonValue: ...
@@ -770,6 +802,18 @@
 
                 # At least one local tool exists. Execute via the dependency aware scheduler.
                 if not all_mcp:
+                    # Record assistant message with all tool calls (OpenAI format).
+                    messages.append({"role": "assistant", "tool_calls": list(tool_calls)})
+
+                    # Inject server-side MCP tool results before local execution.
+                    if mcp_tool_results_from_server:
+                        _inject_mcp_tool_messages(
+                            tool_calls,
+                            mcp_tool_results_from_server,
+                            messages,
+                            set(getattr(tool_handler, "_funcs", {})),
+                        )
+
                     local_only = [
                         tc for tc in tool_calls if tc["function"]["name"] in getattr(tool_handler, "_funcs", {})
                     ]
@@ -1059,6 +1103,18 @@
 
                 # At least one local tool exists. Execute via the dependency aware scheduler.
                 if not all_mcp:
+                    # Record assistant message with all tool calls (OpenAI format).
+                    messages.append({"role": "assistant", "tool_calls": list(tool_calls)})
+
+                    # Inject server-side MCP tool results before local execution.
+                    if mcp_tool_results_from_server:
+                        _inject_mcp_tool_messages(
+                            tool_calls,
+                            mcp_tool_results_from_server,
+                            messages,
+                            set(getattr(tool_handler, "_funcs", {})),
+                        )
+
                     local_only = [
                         tc for tc in tool_calls if tc["function"]["name"] in getattr(tool_handler, "_funcs", {})
                     ]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants