# Tool Agent

In [None]:
!pip install python-dotenv langchain-community wikipedia duckduckgo-search

In [2]:
import os

from dotenv import load_dotenv

load_dotenv()
model_path = os.getenv("MODEL_PATH")

In [None]:
import logging
from app.llm.llamacpp.service import LlamaCppService

logging.basicConfig(level=logging.INFO)

llm = LlamaCppService(model_path=model_path, n_gpu_layers=-1)

In [4]:
import enum
import logging
from typing import Optional, List

from pykka import ActorRef

from app.core.agents.base import AgentBase
from app.core.llm.generator import Generator
from app.core.memory.memory import Memory
from app.core.persona import Persona
from app.core.states.base import StateBase, Transition
from app.states.final_answer.state import FinalAnswerState
from app.core.tools.adapter import ToolAdapter

logger = logging.getLogger(__name__)


class DelegationStates(str, enum.Enum):
    DELEGATE = "delegate"
    FINAL_ANSWER = "final_answer"


class DelegationAgent(AgentBase):
    name: str = "DelegationAgent"
    description: str = "Delegates tasks to worker agents as needed."

    def __init__(self,
                 workers: List[ActorRef],
                 **args):
        super().__init__(**args)
        self.memory.data.set("workers", workers)


class DelegateState(StateBase):
    name: str = DelegationStates.DELEGATE

    def build_prompt(self, persona: Persona, memory: Memory, _: Optional[List[ToolAdapter]]) -> str:
        workers = memory.data.get("workers")
        worker_descriptions = [f"{worker.proxy().name.get()}: {worker.proxy().description.get()}" for worker in workers]
        prompt = f'''{persona.prompt()}

Given the problem, determine which of your workers can help solve the problem and let them do it.

If none of your workers can help solve the problem, respond with "NONE: " and give the reason after.

Current Problem:
"""
{memory.data.get_current_message().content}
"""

Workers:
"""
{worker_descriptions}
"""

Respond with only the name of the worker that can help solve the problem.'''
        return prompt

    def after_generation(self, generation: str, memory: Memory, _: Optional[List[ToolAdapter]]) -> Transition:
        if "NONE: " in generation:
            reason = generation.split("NONE: ")[1].strip()
            logger.error(f"No suitable worker found: {reason}")
            return Transition(updated_response=reason, next_state=DelegationStates.FINAL_ANSWER)

        worker = next((worker for worker in memory.data.get("workers") if worker.proxy().name.get() == generation),
                      None)
        if worker:
            result = worker.ask(Query(goal=memory.data.get_current_message().content)).get()
            if result:
                return Transition(
                    updated_response=f"Delegated to: {generation}\nOutput: {result.final_output}",
                    next_state=DelegationStates.FINAL_ANSWER
                )
            else:
                logger.error("Failed to get a response from the worker.")
                return Transition(updated_response="Failed to get a response from the worker.",
                                  next_state=DelegationStates.FINAL_ANSWER)

        logger.error("No worker found with the specified name.")
        return Transition(updated_response="No worker found with that name.", next_state=DelegationStates.FINAL_ANSWER)


In [5]:
from typing import List, Optional
import enum

from app.core.agents.base import AgentBase
from app.core.states.base import StateBase, Transition
from app.core.memory.memory import Memory
from app.core.persona import Persona
from app.core.tools.adapter import ToolAdapter
from app.states.defaults import tools_handler


class SearchStates(str, enum.Enum):
    SEARCH = "search"
    FINAL_ANSWER = "final_answer"


class SearchAgent(AgentBase):
    name: str = "SearchAgent"
    description: str = "Specializes in searching for information."
    pass


class SearchState(StateBase):
    name: str = SearchStates.SEARCH

    def build_prompt(self, persona: Persona, memory: Memory, tools: Optional[List[ToolAdapter]]) -> str:
        messages_formatted = '\n'.join(
            [f"{message.name}: {message.content}" for message in memory.data.get_all_messages()])
        current_problem = memory.data.get_current_message().content

        prompt = f'''{persona.prompt()}

Search for information using your tools to help solve the following problem for the user. Use any previous messages as context.

Current Problem:
"""
{current_problem}
"""

Previous Messages:
"""
{messages_formatted}
"""

Here are the schemas for the tools you have access to, pick only one:
"""
{[tool.schema() for tool in tools]}
"""

Respond with the JSON input for the tool of your choice to best solve the problem.'''
        return prompt

    def after_generation(self, generation: str, memory: Memory, tools: Optional[List[ToolAdapter]]) -> Transition:
        return tools_handler(
            response=generation,
            memory=memory,
            tools=tools,
            next_state=SearchStates.FINAL_ANSWER,
            save_data_key="search_results"
        )


In [6]:
from langchain_community.tools import DuckDuckGoSearchRun
from app.tools.langchain.wrapper import LangChainToolWrapper

search_agent = SearchAgent.start(
    persona=Persona(description="You're an expert researcher."),
    memory=Memory(),
    states=[
        SearchState(
            generator=Generator(service=llm, use_json_model=True, temperature=0.1),
            tools=[
                LangChainToolWrapper.create(DuckDuckGoSearchRun())
            ]
        ),
        FinalAnswerState(
            generator=Generator(service=llm, temperature=0.3)
        )
    ],
    default_initial_state=SearchStates.SEARCH,
    step_limit_state_name=SearchStates.FINAL_ANSWER
)

delegate_agent = DelegationAgent.start(
    workers=[search_agent],
    persona=Persona(description="You're an a helpful assistant. Help the user solve their problem."),
    memory=Memory(),
    states=[
        DelegateState(
            generator=Generator(service=llm, use_json_model=False, temperature=0.1),
        ),
        FinalAnswerState(
            generator=Generator(service=llm, temperature=0.3)
        )
    ],
    default_initial_state=DelegationStates.DELEGATE,
    step_limit_state_name=DelegationStates.FINAL_ANSWER
)

In [7]:
import json
from app.core.messages import Query

future = delegate_agent.ask(Query(initial_state="delegate",
                                  goal="Find the latest performance on the NVDA stock."))
response = future.get(timeout=120)
print(f"Steps: {json.dumps([step.model_dump() for step in response.metadata['steps']], indent=2)}")
print(
    f"Total tokens: {sum((step.token_usage.total_tokens if step.token_usage is not None else 0) for step in response.metadata.get('steps', []))}")
print(response.final_output)

Steps: [
  {
    "state_name": "DelegationStates.DELEGATE",
    "next_state": "DelegationStates.FINAL_ANSWER",
    "prompt": "You're an a helpful assistant. Help the user solve their problem.\n\nGiven the problem, determine which of your workers can help solve the problem and let them do it.\n\nIf none of your workers can help solve the problem, respond with \"NONE: \" and give the reason after.\n\nCurrent Problem:\n\"\"\"\nFind the latest performance on the NVDA stock.\n\"\"\"\n\nWorkers:\n\"\"\"\n['SearchAgent: Specializes in searching for information.']\n\"\"\"\n\nRespond with only the name of the worker that can help solve the problem.",
    "output": "Delegated to: SearchAgent\nOutput: The latest performance for NVDA stock as of the market close is that Nvidia (NVDA) reached a price of $905.54, with a -1.72% movement compared to the previous day. This change lagged the S&P 500's daily gain of 0.13%. The Dow saw an increase as well. Analysts have set share price targets for NVDA ra

In [8]:
future = delegate_agent.ask(Query(initial_state="delegate",
                                  goal="How does their performance compare to AMD?"))
response = future.get(timeout=120)
print(f"Steps: {json.dumps([step.model_dump() for step in response.metadata['steps']], indent=2)}")
print(f"Total tokens: {sum((step.token_usage.total_tokens if step.token_usage is not None else 0) for step in response.metadata.get('steps', []))}")
print(response.final_output)

Steps: [
  {
    "state_name": "DelegationStates.DELEGATE",
    "next_state": "DelegationStates.FINAL_ANSWER",
    "prompt": "You're an a helpful assistant. Help the user solve their problem.\n\nGiven the problem, determine which of your workers can help solve the problem and let them do it.\n\nIf none of your workers can help solve the problem, respond with \"NONE: \" and give the reason after.\n\nCurrent Problem:\n\"\"\"\nHow does their performance compare to AMD?\n\"\"\"\n\nWorkers:\n\"\"\"\n['SearchAgent: Specializes in searching for information.']\n\"\"\"\n\nRespond with only the name of the worker that can help solve the problem.",
    "output": "Delegated to: SearchAgent\nOutput: Based on the latest market data and research findings from my notes, Nvidia (NVDA) has been performing relatively well in comparison to AMD. As of the market close, NVDA reached a price of $905.54 with a -1.72% movement compared to the previous day, which lagged the S&P 500's daily gain of 0.13%. Howeve

In [9]:
search_agent.stop()
delegate_agent.stop()

True