In [None]:
import mlflow
import dspy

from dotenv import load_dotenv

# Load environment variables (e.g. the TAVILY_API_KEY read in the Tools section)
# from the project's local env file.
load_dotenv(dotenv_path="../.env.local")

# Turn on every DSPy autologging feature MLflow offers here: optimizer/compile
# runs, evaluation results, and the program traces produced while compiling.
mlflow.dspy.autolog(
    log_traces_from_compile=True,
    log_evals=True,
    log_compiles=True,
)

# Send everything to a locally running MLflow tracking server, grouped under
# this experiment name.
MLFLOW_TRACKING_URI = "http://127.0.0.1:5000/"
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
mlflow.set_experiment("deep_leads_dspy_test")

### Tools


In [4]:
import os

from tavily import TavilyClient


import asyncio
from typing import Union

tavily_client = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))


async def _run_tavily(error_label: str, method, *args, **kwargs) -> Union[dict, str]:
    """Run a blocking Tavily client method in a worker thread.

    Returns the method's response on success. On any exception the error is
    printed and a "<error_label>: <exception>" string is returned instead, so a
    failing tool becomes an observation the agent can read rather than a crash.
    """
    try:
        # TavilyClient is synchronous. Calling it directly inside `async def`
        # would block the event loop and serialize tool calls that are meant to
        # overlap. NOTE(review): assumes TavilyClient tolerates concurrent use
        # from several threads; otherwise switch to tavily's async client.
        return await asyncio.to_thread(method, *args, **kwargs)
    except Exception as e:
        print(f"{error_label}: {e}")
        return f"{error_label}: {e}"


async def browse_web(query: str) -> Union[dict, str]:
    """Search the web for `query` and return up to 5 results."""
    return await _run_tavily(
        "Error browsing the web", tavily_client.search, query, max_results=5
    )


async def get_website_map(url: str) -> Union[dict, str]:
    """Get the website map (the pages discovered on the site) for `url`."""
    return await _run_tavily(
        "Error getting the website map", tavily_client.map, url
    )


async def get_website_content(url: str) -> Union[dict, str]:
    """Extract the page content of the website at `url`."""
    return await _run_tavily(
        "Error getting the website content", tavily_client.extract, url
    )

### Single Agent


In [3]:
from src.types import LeadResults

# Set the default LM used by every DSPy module in this notebook and record
# token usage on each prediction. This call only sets `lm` and `track_usage`.
dspy.settings.configure(
    lm=dspy.LM("openai/gpt-4.1"),
    track_usage=True,
)


class SingleAgentSig(dspy.Signature):
    """
    You are an expert lead research agent specializing in finding high-quality contact information for specific professionals,
    researchers, and business contacts. Your mission is to conduct thorough, systematic research to identify leads that precisely
    match the user's criteria.
    """

    # Free-text description of the leads to find.
    user_query: str = dspy.InputField()
    # Structured leads extracted once the ReAct loop finishes.
    leads: LeadResults = dspy.OutputField()


# The web tools the agent may call, in the order they are listed to the LM.
web_research_tools = [browse_web, get_website_map, get_website_content]
single_agent = dspy.ReAct(SingleAgentSig, tools=web_research_tools)

In [None]:
from rich import print as rprint
from src.agents.utils.build_final_query import build_final_query
from src.types import ResearchParams


user_query = build_final_query(
    ResearchParams(
        who_query="researchers",
        what_query="Human Nutrition",
        where_query="Edmonton",
        context_query="",
    )
)
result = await single_agent.acall(user_query=user_query)

rprint(result)

### MultiAgent


In [None]:
from rich import print as rprint
from src.agents.utils.build_final_query import build_final_query
from src.types import ResearchParams

# Turn DSPy's own logging back on for this run.
dspy.enable_logging()


class MultiAgentSig(dspy.Signature):
    """
    You are an expert lead research agent specializing in finding high-quality contact information for specific professionals,
    researchers, and business contacts. Your mission is to conduct thorough, systematic research to identify leads that precisely
    match the user's criteria.

    You can use parallel tool calls and deploy a research agent to explore specific branches of research.
    """

    # NOTE(review): the class docstring above is sent to the LM as instructions.
    # `multi_agent` is built with stock `dspy.ReAct`, which appears to emit a
    # single `next_tool_name` per turn. Confirm it can actually issue the
    # "parallel tool calls" promised here.
    user_query: str = dspy.InputField()
    leads: LeadResults = dspy.OutputField()


async def deploy_search_agent(search_query: str) -> LeadResults:
    """
    Deploy a research agent to explore a specific branch of research and return the leads it finds. This should be used as parallel tool calls.
    """
    # The docstring above is the tool description shown to the calling agent.
    # `single_agent` is resolved at call time, so whichever agent that
    # notebook global currently refers to is the one deployed.
    prediction = await single_agent.acall(user_query=search_query)
    # Return only the structured leads. The full Prediction also carries the
    # sub-agent's entire trajectory (every tool observation), which would bloat
    # the parent agent's context and does not match the `LeadResults` annotation.
    return prediction.leads


multi_agent = dspy.ReAct(
    MultiAgentSig,
    tools=[browse_web, get_website_map, get_website_content, deploy_search_agent],
)

user_query = build_final_query(
    ResearchParams(
        who_query="researchers",
        what_query="Human Nutrition",
        where_query="Edmonton",
        context_query="",
    )
)
result = await multi_agent.acall(user_query=user_query)

rprint(result)

In [None]:
# Show just the structured leads from the last run, without the trajectory.
final_leads = result.leads
rprint(final_leads)

### Parallel test


In [5]:
import logging
from typing import TYPE_CHECKING, Any, Callable, Literal, Type, List

from litellm import ContextWindowExceededError

import dspy
from dspy.adapters.types.tool import Tool
from dspy.signatures.signature import ensure_signature
from rich import print as rprint

logger = logging.getLogger(__name__)

if TYPE_CHECKING:
    from dspy.signatures.signature import Signature


class ReAct(dspy.Module):
    """
    Variant of the DSPy ReAct module whose agent may request several tool calls per turn.

    `next_tool_name` and `next_tool_args` are parallel lists. `aforward` runs the
    requested calls concurrently with `asyncio.gather`; `forward` runs them in order.
    """

    def __init__(
        self,
        signature: Type["Signature"],
        tools: list[Callable],
        max_iters: int = 10,
        verbose: bool = False,
    ):
        """
        ReAct stands for "Reasoning and Acting," a popular paradigm for building tool-using agents.
        In this approach, the language model is iteratively provided with a list of tools and has
        to reason about the current situation. The model decides which tools (possibly several) to
        call to gather more information, or to finish the task, based on its reasoning process.

        Args:
            signature: The signature of the module, which defines the input and output of the react module.
            tools (list[Callable]): A list of functions, callable objects, or `dspy.Tool` instances.
            max_iters (Optional[int]): The maximum number of iterations to run. Defaults to 10.
            verbose (bool): If True, pretty-print each step's prediction with `rich`. Defaults to False.

        Example:

        ```python
        def get_weather(city: str) -> str:
            return f"The weather in {city} is sunny."

        react = ReAct(signature="question->answer", tools=[get_weather])
        pred = react(question="What is the weather in Tokyo?")
        ```
        """
        super().__init__()
        self.signature = signature = ensure_signature(signature)
        self.max_iters = max_iters
        self.verbose = verbose

        tools = [t if isinstance(t, Tool) else Tool(t) for t in tools]
        tools = {tool.name: tool for tool in tools}

        inputs = ", ".join([f"`{k}`" for k in signature.input_fields.keys()])
        outputs = ", ".join([f"`{k}`" for k in signature.output_fields.keys()])
        instr = [f"{signature.instructions}\n"] if signature.instructions else []

        instr.extend(
            [
                f"You are an Agent. In each episode, you will be given the fields {inputs} as input. And you can see your past trajectory so far.",
                f"Your goal is to use one or more of the supplied tools to collect any necessary information for producing {outputs}.\n",
                "To do this, you will interleave next_thought, next_tool_name, and next_tool_args in each turn, and also when finishing the task.",
                "You can use multiple tools in each turn, and you can use the same tool multiple times in the same turn.",
                "`next_tool_name` and `next_tool_args` are parallel lists: the i-th entry of `next_tool_args` holds the arguments for the i-th entry of `next_tool_name`.",
                "After each tool call, you receive a resulting observation, which gets appended to your trajectory.\n",
                "When writing next_thought, you may reason about the current situation and plan for future steps.",
                "When selecting next_tool_name and its next_tool_args, the tools must be on the following list:\n",
            ]
        )

        tools["finish"] = Tool(
            func=lambda: "Completed.",
            name="finish",
            desc=f"Marks the task as complete. That is, signals that all information for producing the outputs, i.e. {outputs}, are now available to be extracted.",
            args={},
        )

        for idx, tool in enumerate(tools.values()):
            instr.append(f"({idx + 1}) {tool}")
        instr.append(
            "When providing `next_tool_args`, the value inside the field must be in JSON format"
        )

        react_signature = (
            dspy.Signature({**signature.input_fields}, "\n".join(instr))
            .append("trajectory", dspy.InputField(), type_=str)
            .append("next_thought", dspy.OutputField(), type_=str)
            .append(
                "next_tool_name",
                dspy.OutputField(),
                type_=List[Literal[tuple(tools.keys())]],
            )
            .append("next_tool_args", dspy.OutputField(), type_=List[dict[str, Any]])
        )

        fallback_signature = dspy.Signature(
            {**signature.input_fields, **signature.output_fields},
            signature.instructions,
        ).append("trajectory", dspy.InputField(), type_=str)

        self.tools = tools
        self.react = dspy.Predict(react_signature)
        self.extract = dspy.ChainOfThought(fallback_signature)

    def _format_trajectory(self, trajectory: dict[str, Any]):
        """Render the trajectory dict as adapter-formatted text for the `trajectory` input field."""
        adapter = dspy.settings.adapter or dspy.ChatAdapter()
        trajectory_signature = dspy.Signature(f"{', '.join(trajectory.keys())} -> x")
        return adapter.format_user_message_content(trajectory_signature, trajectory)

    @staticmethod
    def _pair_tool_calls(tool_names, tool_args) -> list[tuple[Any, Any]]:
        """Pair each requested tool name with its argument dict, in request order.

        A bare name / bare dict is treated as a one-element list. A name with no
        matching args entry gets `{}`; surplus args entries are dropped.
        """
        if isinstance(tool_names, str):
            tool_names = [tool_names]
        if isinstance(tool_args, dict):
            tool_args = [tool_args]
        tool_names = list(tool_names or [])
        tool_args = list(tool_args or [])
        if len(tool_args) != len(tool_names):
            logger.warning(
                f"Agent returned {len(tool_names)} tool names but {len(tool_args)} argument sets."
            )
        return [
            (name, tool_args[i] if i < len(tool_args) else {})
            for i, name in enumerate(tool_names)
        ]

    def _record_step(self, trajectory: dict[str, Any], idx: int, pred):
        """Store step `idx`'s thought, tool names and tool args; return the (name, args) pairs."""
        if self.verbose:
            rprint(pred)
        calls = self._pair_tool_calls(pred.next_tool_name, pred.next_tool_args)
        # Key order matters: truncate_trajectory drops the first 4 keys as one step.
        trajectory[f"thought_{idx}"] = pred.next_thought
        trajectory[f"tool_name_{idx}"] = [name for name, _ in calls]
        trajectory[f"tool_args_{idx}"] = [args for _, args in calls]
        return calls

    def _execute_tool(self, name, args):
        """Call tool `name` synchronously; any exception becomes an error-string observation."""
        try:
            return self.tools[name](**args)
        except Exception as err:
            return f"Execution error in {name}: {_fmt_exc(err)}"

    async def _aexecute_tool(self, name, args):
        """Await tool `name`; any `Exception` becomes an error-string observation."""
        try:
            return await self.tools[name].acall(**args)
        except Exception as err:
            return f"Execution error in {name}: {_fmt_exc(err)}"

    def forward(self, **input_args):
        trajectory = {}
        max_iters = input_args.pop("max_iters", self.max_iters)
        for idx in range(max_iters):
            try:
                pred = self._call_with_potential_trajectory_truncation(
                    self.react, trajectory, **input_args
                )
            except ValueError as err:
                logger.warning(
                    f"Ending the trajectory: Agent failed to select a valid tool: {_fmt_exc(err)}"
                )
                break

            calls = self._record_step(trajectory, idx, pred)
            # One observation per requested call, in the same order as the calls.
            trajectory[f"observation_{idx}"] = [
                self._execute_tool(name, args) for name, args in calls
            ]

            # next_tool_name is a list, so check membership rather than equality.
            if any(name == "finish" for name, _ in calls):
                break

        extract = self._call_with_potential_trajectory_truncation(
            self.extract, trajectory, **input_args
        )
        return dspy.Prediction(trajectory=trajectory, **extract)

    async def aforward(self, **input_args):
        import asyncio  # only the async path needs it

        trajectory = {}
        max_iters = input_args.pop("max_iters", self.max_iters)
        for idx in range(max_iters):
            try:
                pred = await self._async_call_with_potential_trajectory_truncation(
                    self.react, trajectory, **input_args
                )
            except ValueError as err:
                logger.warning(
                    f"Ending the trajectory: Agent failed to select a valid tool: {_fmt_exc(err)}"
                )
                break

            calls = self._record_step(trajectory, idx, pred)
            # Run this turn's calls concurrently. gather preserves the order of
            # `calls`, and _aexecute_tool catches Exception, so one failing tool
            # does not cancel the others.
            observations = await asyncio.gather(
                *(self._aexecute_tool(name, args) for name, args in calls)
            )
            trajectory[f"observation_{idx}"] = list(observations)

            # next_tool_name is a list, so check membership rather than equality.
            if any(name == "finish" for name, _ in calls):
                break

        extract = await self._async_call_with_potential_trajectory_truncation(
            self.extract, trajectory, **input_args
        )
        return dspy.Prediction(trajectory=trajectory, **extract)

    def _call_with_potential_trajectory_truncation(
        self, module, trajectory, **input_args
    ):
        """Call `module`, dropping the oldest step on context overflow (up to 3 attempts).

        Raises:
            ValueError: if the prompt still overflows after 3 attempts, or the
                trajectory is too short to truncate.
        """
        for _ in range(3):
            try:
                return module(
                    **input_args,
                    trajectory=self._format_trajectory(trajectory),
                )
            except ContextWindowExceededError:
                logger.warning(
                    "Trajectory exceeded the context window, truncating the oldest tool call information."
                )
                trajectory = self.truncate_trajectory(trajectory)
        # Previously this fell through and returned None, which crashed later with
        # an unrelated AttributeError/TypeError.
        raise ValueError(
            "The trajectory still exceeded the context window after 3 truncation attempts."
        )

    async def _async_call_with_potential_trajectory_truncation(
        self, module, trajectory, **input_args
    ):
        """Async counterpart of `_call_with_potential_trajectory_truncation`."""
        for _ in range(3):
            try:
                return await module.acall(
                    **input_args,
                    trajectory=self._format_trajectory(trajectory),
                )
            except ContextWindowExceededError:
                logger.warning(
                    "Trajectory exceeded the context window, truncating the oldest tool call information."
                )
                trajectory = self.truncate_trajectory(trajectory)
        raise ValueError(
            "The trajectory still exceeded the context window after 3 truncation attempts."
        )

    def truncate_trajectory(self, trajectory):
        """Truncates the trajectory so that it fits in the context window.

        Removes the oldest step (its first 4 keys) in place and returns the same dict.
        Users can override this method to implement their own truncation logic.
        """
        keys = list(trajectory.keys())
        if len(keys) < 4:
            # Every step has 4 keys: thought, tool_name, tool_args, and observation.
            raise ValueError(
                "The trajectory is too long so your prompt exceeded the context window, but the trajectory cannot be "
                "truncated because it only has one tool call."
            )

        for key in keys[:4]:
            trajectory.pop(key)

        return trajectory


def _fmt_exc(err: BaseException, *, limit: int = 5) -> str:
    """
    Return a one-string traceback summary.
    * `limit` - how many stack frames to keep (from the innermost outwards).
    """

    import traceback

    return (
        "\n"
        + "".join(
            traceback.format_exception(type(err), err, err.__traceback__, limit=limit)
        ).strip()
    )

In [None]:
from src.types import LeadResults

# Set the default LM for all DSPy modules and record token usage on each
# prediction. This call only sets `lm` and `track_usage`.
dspy.settings.configure(lm=dspy.LM("openai/gpt-4.1"), track_usage=True)


class SingleAgentSig(dspy.Signature):
    """
    You are an expert lead research agent specializing in finding high-quality contact information for specific professionals,
    researchers, and business contacts. Your mission is to conduct thorough, systematic research to identify leads that precisely
    match the user's criteria.

    You can use multiple tools in each turn, and they will be run in parallel.
    You can also use the same tool multiple times in the same turn.
    """

    # The class docstring above is sent to the LM as instructions.
    user_query: str = dspy.InputField()
    leads: LeadResults = dspy.OutputField()


# Rebinds the notebook-global `single_agent` to the custom multi-call ReAct
# defined above. Code that looks `single_agent` up at call time (e.g. the
# multi-agent's deploy tool) will use this agent from now on.
single_agent = ReAct(
    SingleAgentSig, tools=[browse_web, get_website_map, get_website_content]
)

In [None]:
from rich import print as rprint
from src.agents.utils.build_final_query import build_final_query
from src.types import ResearchParams


user_query = build_final_query(
    ResearchParams(
        who_query="researchers",
        what_query="Human Nutrition",
        where_query="Edmonton",
        context_query="",
    )
)
result = await single_agent.acall(user_query=user_query)

rprint(result)