In [1]:
tools = [{'type': 'function', 'name': 'get_weather', 'description': 'Get current temperature for provided coordinates in celsius', 'parameters': {'type': 'object', 'properties': {'latitude': {'title': 'latitude', 'type': 'string'}, 'longitude': {'title': 'longitude', 'type': 'string'}}, 'required': ['latitude', 'longitude']}}, {'type': 'function', 'name': 'wiki_search', 'description': 'Get current temperature for provided coordinates in celsius', 'parameters': {'type': 'object', 'properties': {'query': {'title': 'Query', 'type': 'string'}, 'sentences': {'default': 2, 'title': 'Sentences', 'type': 'integer'}}, 'required': ['query']}}, {'type': 'function', 'name': 'save_txt', 'description': 'Save text to a .txt file', 'parameters': {'type': 'object', 'properties': {'text': {'title': 'Text', 'type': 'string'}, 'filename': {'default': 'output.txt', 'title': 'Filename', 'type': 'string'}}, 'required': ['text']}}, {'type': 'function', 'name': 'arxiv_search', 'description': 'Search arxiv', 'parameters': {'type': 'object', 'properties': {'query': {'title': 'Query', 'type': 'string'}}, 'required': ['query']}}]

In [2]:
tools

[{'type': 'function',
  'name': 'get_weather',
  'description': 'Get current temperature for provided coordinates in celsius',
  'parameters': {'type': 'object',
   'properties': {'latitude': {'title': 'latitude', 'type': 'string'},
    'longitude': {'title': 'longitude', 'type': 'string'}},
   'required': ['latitude', 'longitude']}},
 {'type': 'function',
  'name': 'wiki_search',
  'description': 'Get current temperature for provided coordinates in celsius',
  'parameters': {'type': 'object',
   'properties': {'query': {'title': 'Query', 'type': 'string'},
    'sentences': {'default': 2, 'title': 'Sentences', 'type': 'integer'}},
   'required': ['query']}},
 {'type': 'function',
  'name': 'save_txt',
  'description': 'Save text to a .txt file',
  'parameters': {'type': 'object',
   'properties': {'text': {'title': 'Text', 'type': 'string'},
    'filename': {'default': 'output.txt',
     'title': 'Filename',
     'type': 'string'}},
   'required': ['text']}},
 {'type': 'function',
  'n

In [3]:
from openai import OpenAI
from pydantic import BaseModel, Field
from typing import Literal, Optional, List
import logging
import asyncio
import os
from dotenv import load_dotenv
from pathlib import Path
load_dotenv(Path("../.env"))

True

In [4]:
class OpenAIClient:
    def __init__(self, api_key: str) -> None:
        """
        Args:
            model_name: The name of the openai model we are using
            api_key: The api key for our openai model
        Returns:
        """
        self.client = OpenAI(api_key=api_key)

    def get_client(self) -> OpenAI:
        """
        Args:
            None

        Returns:
            The openai client
        """
        return self.client

In [5]:
llm = OpenAIClient(api_key=os.getenv("OPENAI_API_KEY")).get_client()

In [6]:
# Define prompt for planner agent
PLANNER_AGENT_PROMPT = """
You are an expert essay writer planner.
You take in essay writing request on a given topic, and create comprehensive plans, breaking down the main task of writing an essay into smaller actionable tasks.

CORE PRINCIPLE: Be direct and action-oriented. Minimize follow-up questions.

DEFAULT ASSUMPTIONS FOR REQUESTS:
- The request is about writing an essay on a given topic.
- The request might be vague or unclear, one word, or unclear intent
- The request might be very specific or clear

IMMEDIATE PLANNING APPROACH:
**WORKFLOW:**
1. Always start by creating a plan for writing an essay with detailed tasks.
2. Plan should consist of multiple tasks, This is an example but is not limited to [write introduction, research topic, write folow-up, write followup, write conclusion, review, edit, proofread, return essay]
3. Plan should be specific and actionable
4. For each task in the plan, you MUST assign a tool to perform the task. Fail to do so will result in an task FAIL.
7. YOU must determine how many body paragraphs are sufficient to address the topic.
8. Tools will be given to you. YOU ARE NOT TO CALL THEM. You will ONLY suggest their ussage to appropriate tasks.

MINIMAL QUESTIONS STRATEGY:
- For vauge requests such as single words: generate an interesting topic ie: star wars -> star wars impact on society, then plan and create tasks
- For detailed requests: Create multiple tasks 

You will be given a output format that you must adhere to.

Generate plans immediately without asking follow-up questions unless absolutely necessary.
"""


# Define the prompt for the Orchestrator Agent
ORCHESTRATOR_AGENT_PROMPT = """
You are an Orchestrator Agent specialized in coordinating complex essay writing tasks.
Your task is to recive a plan from the Planner Agent and coordinate the execution of tasks.
Each task will contain a description and tool suggestion to complete the task (ie possible tools to call).
YOU have FINAL DECISION MAKING POWER. IF YOU decide another tool is better, you MUST call it. IF not use the
suggested tool. Tools will be given to you and you MUST use them to perform each task with the given description.
FAILURE TO CALL A TOOL WILL RESULT IN A TASK FAIL


When a user makes a complex request, analyze it and determine which specialized agents should be involved:
- wiki_search Tool: For finding general information on a topic.
- web_search Tool: For finding real information on a topic.
- arxiv_search Tool: For findimng scientific information on a topic.

Create a workflow that efficiently coordinates these tools to provide comprehensive results.

**WORKFLOW:**
1. Start by analyzing the task description and determine which tools should be called
2. Call the appropriate tools to complete the task
3. If task fails, attempt to find alternative tools to call
4. If task succeeds, proceed to the next task and give the results of the previous task as input to the next task
5. Repeat steps 2-4 until all tasks are completed


Always provide clear status updates and coordinate the results from different tool calls into a cohesive response.

You will be given a output format that you must adhere to.

"""

# System prompt for the agent
SYSTEM_PROMPT = """You are an order processing assistant. Your task is to analyze emails and extract 
order information to create purchase orders. Be precise with quantities, product names, and other details.
When in doubt, ask for clarification."""

In [7]:
class ToolArguments(BaseModel):
    query: str = Field(description="The query or input for the tool.")
    retries: Optional[int] = Field(default=0, description="Number of retries allowed.")


class ToolCall(BaseModel):
    """Represents a tool call request from the LLM."""

    id: str = Field(description="The ID of the tool call.")
    name: str = Field(description="The name of the tool to call.")
    arguments: ToolArguments = Field(description="The arguments to call the tool with.")


class ToolCalls(BaseModel):
    id: int = Field(description="An ID for the tool calls")
    tool_calls: List[ToolCall] = Field(
        description="A list of tools to be executed sequentially."
    )



class PlannerTask(BaseModel):
    """Represents a single task generated by the Planner."""

    id: int = Field(description="Sequential ID for the task.")
    description: str = Field(
        description="Clear description of the task to be executed."
    )
    tool_suggestions: str = Field(
        description="A list of tool suggestions to be executed."
    )
    status: Optional[
        Literal[
            "input_required",
            "completed",
            "error",
            "pending",
            "incomplete",
            "todo",
            "not_started",
        ]
    ] = Field(default="input_required", description="Status of the task")


class Plan(BaseModel):
    """Output schema for the Planner Agent."""

    original_query: str = Field(description="The original user query for context.")
    description: str = Field(description="Clear description of the overall plan.")
    tasks: List[PlannerTask] = Field(
        description="A list of tasks to be executed sequentially."
    )


class ToolResult(BaseModel):
    """Represents the result of a tool execution."""

    tool_call_id: str = Field(description="The ID of the tool call this result is for.")
    result: str = Field(description="The result of the tool execution.")
    is_error: bool = Field(
        default=False, description="Whether the tool execution resulted in an error."
    )


class ResponseFormat(BaseModel):
    """Respond to the user in this format."""

    status: Literal["input_required", "completed", "error"] = "input_required"
    question: str = Field(
        description="Input needed from the user to generate the code search plan"
    )
    content: Plan = Field(
        description="List of tasks when the code search plan is generated"
    )


class TaskExecutionResponse(BaseModel):
    """Represents a single task generated by the Planner."""

    id: int = Field(description="Id of task we are executing.")
    description: str = Field(
        description="Clear description of the task to be executed."
    )
    tools_sueggested: str = Field(
        description="A list of the tools suggested for the task"
    )
    tool_calls: List[ToolCall] = Field(description="A list of tool calls to be executed.")
    response_type: Optional[
        Literal[
            "tool_calls",
            "text",
        ]
    ] = Field(default="input_required", description="The response type of the task execution")

In [8]:
logger = logging.getLogger(__name__)


class PlannerAgent:
    def __init__(
        self,
        dev_prompt,
        llm,
        messages,
        tools,
        model_name: str = "gpt-4.1-mini",
    ):
        self.model_name = model_name
        self.dev_prompt = dev_prompt
        self.llm = llm
        self.messages = messages
        self.tools = tools
        if self.dev_prompt:
            self.messages.append({"role": "developer", "content": self.dev_prompt})
        self.llm = OpenAI()  # Instantiate internally

    def add_messages(self, query: str):
        self.messages.append({"role": "user", "content": query})

    def plan(self, query: str):
        """Create a detailed plan to complete the request of the user.

        Args:
            query (str): The request of the user.

        Returns:
            Plan: The plan to complete the request of the user.
        """
        self.add_messages(query=query)
        response = self.llm.responses.parse(
            model=self.model_name,
            input=self.messages,
            tools=self.tools,
            text_format=Plan,
        )
        return response

In [35]:
class OrchestratorAgent:
    """OrchestratorAgent class.

    Methods:


    Attributes:


    """

    def __init__(
        self,
        dev_prompt: str,
        llm: OpenAI,
        messages: list[dict],
        tools: list[dict],
        model_name: str = "gpt-4.1-mini",
    ):
        """
        Initialize the OrchestratorAgent.

        Args:
            dev_prompt (str): The developer prompt.
            mcp_client (MCPClient): The MCP client.
            llm (OpenAI): The LLM client.
            messages (list[dict]): The input messages.
            tools (list[dict]): The tools.
            model_name (str): The name of the model.
        """
        self.model_name = model_name
        self.dev_prompt = dev_prompt
        self.llm = llm
        self.messages = messages
        self.tools = tools
        if self.dev_prompt:
            self.messages.append({"role": "developer", "content": self.dev_prompt})

    async def call_tool(self, tool_calls: list[dict]) -> list[dict]:
        """Receives a list of tool calls and calls the tools

        Args:
            tool_calls: Either a list of tool call dicts or a string error message

        Returns:
            list[dict]: The results of the tool calls or error information
        """
        # If we received an error message instead of tool calls
        if isinstance(tool_calls, str):
            return [{"error": True, "message": tool_calls}]

        # # Ensure tool_calls is a list
        if not isinstance(tool_calls, list):
            return [
                {
                    "error": True,
                    "message": f"Expected list of tool calls, got {type(tool_calls).__name__}",
                }
            ]

        results = []  # Tool call results
        for tool in tool_calls:  # For each tool
            try:  # Try to call the tool
                if not isinstance(tool, dict):  # If tool is not a dict return error
                    results.append(
                        {
                            "error": True,
                            "message": f"Expected dict, got {type(tool).__name__}",
                        }
                    )
                    continue
                # Extract tool name and arguments
                name = tool.get("name")
                args = tool.get("arguments", {})

                # If tool name is missing return error
                if not name:
                    results.append(
                        {"error": True, "message": "Tool call missing 'name' field"}
                    )
                    continue

                # Call the tool through MCP client
                result = await self.mcp_client.call_tool(name, args)
                # append tool call reults. Includes name, arguments, and result
                results.append(
                    {"name": name, "arguments": args, "result": result, "error": False}
                )

            # Handle exceptions
            except Exception as e:
                results.append(
                    {
                        "error": True,
                        "name": name if "name" in locals() else "unknown",
                        "message": f"Error calling tool: {str(e)}",
                    }
                )

        return results

    def call_tools(self, tool_calls: list[ToolCall]) -> list[dict]:
        """Receives a list of tool calls and calls the tools

        Args:
            tool_calls: Either a list of tool call dicts or a string error message

        Returns:
            list[dict]: The results of the tool calls or error information
        """
        # If we received an error message instead of tool calls
        if isinstance(tool_calls, str):
            return [{"error": True, "message": tool_calls}]

        # Ensure tool_calls is a list
        if not isinstance(tool_calls, list):
            return [
                {
                    "error": True,
                    "message": f"Expected list of tool calls, got {type(tool_calls).__name__}",
                }
            ]

        results: list[dict] = []
        for i in range(len(tool_calls)):
            tool_call = tool_calls[i]
            result = 'hello'
            result.append(
                {
                    "name": tool_call.name,
                    "arguments": tool_call.arguments,
                    "result": result,
                }
            )
        # Call the tools
        return results

    async def execute_task(
        self, task: PlannerTask, previous_task_results: list
    ) -> list[dict]:
        """Execute the given task generated by the planner agent

        Args:
            task: The task to execute.

        Returns:
            A list of results, each containing:
                - name: The name of the tool called.
                - arguments: The arguments passed to the tool.
                - result: The result of the tool call.
                - error: A boolean indicating whether an error occurred.
        """

        self.print_task(task, previous_task_results)

        messages = [
            {"role": "assistant", "content": str(task.id)},
            {"role": "assistant", "content": f"Tool suggestions: {task.tool_suggestions}"},
            {"role": "assistant", "content": f"Task status: {task.status}"},
            {
                "role": "assistant",
                "content": f"Previous Task Results: {previous_task_results}",
            },
            {
                "role": "user",
                "content": f"You must execute the following task: {task.description}",
            },
        ]

        response = self.llm.responses.parse(
            model=self.model_name,
            input=messages,
            tools=self.tools,
            tool_choice="auto",
            text_format=ToolCalls,
        )

        tool_calls = []
        print(f'RESPONSE: {response} \n')
        print(f'RESPONSE: {response.output[0]} \n')
        print(f'RESPONSE.OUTPUT[0] attribues: {dir(response.output[0])} \n')
        print()
        for i in range(len(response.output)):
            tool_call = response.output[i]
            tool_calls.append(
                {
                    'name': tool_call.name,
                    'arguments': tool_call.arguments
                }
            )
        

        
        results = response
        return results

    def print_task(self, task: PlannerTask, previous_task_results: list) -> None:
        """Print the given task generated by the planner agent

        Args:
            task: The task to print.
            previous_task_results: The results of the previous task.

        Returns:
            None
        """
        print(f"""
              Executing task: \n
              Task ID: {task.id} \n
              Task description: {task.description} \n
              Tool suggestions: {task.tool_suggestions} \n
              Task status: {task.status} \n
              Previous Task Results: {previous_task_results}
        """)

    def print_plan(self, plan: Plan) -> None:
        """Print the given plan generated by the planner agent

        Args:
            plan: The plan to print.

        Returns:
            None
        """
        print(f"""
              Plan to execute is: \n
              Plan ID: {plan.original_query}
              Plan Description: {plan.description}
         """)

    async def execute_plan(self, plan: Plan) -> list:
        """Execute the given plan generated by the planner agent

        Args:
            plan: The plan to execute.

        Returns:
            A list of results
        """
        self.print_plan(plan)
        results = ...
        prev_task_results = ""
        for i in range(len(plan.tasks)):
            task: PlannerTask = plan.tasks[i]
            res = await self.execute_task(task, prev_task_results)
            results = res
            prev_task_results = res
            break

        return results

In [36]:
orchestrator = OrchestratorAgent(
    dev_prompt=ORCHESTRATOR_AGENT_PROMPT,
    llm=llm,
    messages=[],
    tools=tools,
    model_name="gpt-4.1-mini",
            )
logger.info("Successfully initialized OrchestratorAgent")

In [11]:
            planner = PlannerAgent(
                dev_prompt=PLANNER_AGENT_PROMPT,
                llm=llm,
                messages=[],
                tools=tools,
                model_name="gpt-4.1-mini",
            )
            logger.info("Successfully initialized PlannerAgent")

In [12]:
content = "write an essay on the culture impact of the internet"

In [13]:
plan = planner.plan(content)

In [14]:
f'PLAN TYPE: {type(plan)}'

"PLAN TYPE: <class 'openai.types.responses.parsed_response.ParsedResponse[Plan]'>"

In [15]:
plan

ParsedResponse[Plan](id='resp_688a854fa25c81a18b59e27fb70e4dbc0f54ac052f1df4d6', created_at=1753908559.0, error=None, incomplete_details=None, instructions=None, metadata={}, model='gpt-4.1-mini-2025-04-14', object='response', output=[ParsedResponseOutputMessage[Plan](id='msg_688a855005a881a19ebfcc461bcb663a0f54ac052f1df4d6', content=[ParsedResponseOutputText[Plan](annotations=[], text='{\n  "original_query": "write an essay on the culture impact of the internet",\n  "description": "Create a comprehensive essay discussing the cultural impact of the internet, covering its influence on communication, social behavior, information access, and global interconnectedness.",\n  "tasks": [\n    {\n      "id": 1,\n      "description": "Research the cultural impact of the internet, focusing on its effects on communication, social interaction, information dissemination, and global connections.",\n      "tool_suggestions": "Use an online search or Wiki search tool to gather summarized information a

In [16]:
plan_parsed: Plan = plan.output_parsed

In [17]:
f'PLAN PARSED: {plan_parsed}'

'PLAN PARSED: original_query=\'write an essay on the culture impact of the internet\' description=\'Create a comprehensive essay discussing the cultural impact of the internet, covering its influence on communication, social behavior, information access, and global interconnectedness.\' tasks=[PlannerTask(id=1, description=\'Research the cultural impact of the internet, focusing on its effects on communication, social interaction, information dissemination, and global connections.\', tool_suggestions=\'Use an online search or Wiki search tool to gather summarized information and reliable sources about the cultural impact of the internet.\', status=\'todo\'), PlannerTask(id=2, description="Write an engaging introduction that presents the topic and outlines the main points to be discussed about the internet\'s cultural impact.", tool_suggestions=\'Use a text generation tool to draft the introduction based on researched information.\', status=\'todo\'), PlannerTask(id=3, description=\'Wri

In [37]:
res = await orchestrator.execute_plan(plan_parsed)


              Plan to execute is: 

              Plan ID: write an essay on the culture impact of the internet
              Plan Description: Create a comprehensive essay discussing the cultural impact of the internet, covering its influence on communication, social behavior, information access, and global interconnectedness.
         

              Executing task: 

              Task ID: 1 

              Task description: Research the cultural impact of the internet, focusing on its effects on communication, social interaction, information dissemination, and global connections. 

              Tool suggestions: Use an online search or Wiki search tool to gather summarized information and reliable sources about the cultural impact of the internet. 

              Task status: todo 

              Previous Task Results: 
        


AttributeError: 'ParsedResponseOutputMessage[ToolCalls]' object has no attribute 'name'

In [34]:
f'RES TYPE: {type(res)}'

"RES TYPE: <class 'openai.types.responses.parsed_response.ParsedResponse[ToolCalls]'>"

In [None]:
res

In [None]:
dir(res)

In [None]:
f'RES[0] TYPE: {type(res[0])}'

In [None]:
res[0]

In [None]:
dir(res[0])

In [None]:
res[0].output[1].name

In [None]:
res[0].output[1].arguments

In [None]:
res[0].output[0].name

In [None]:
res[0].output[1].arguments

In [None]:
for i in range(len(res[0].output)):
    tool_call = res[0].output[i]
    print(tool_call.name)
    print(tool_call.arguments)
    print()

In [None]:
f'EXCUTE TASK RESULT: {res}'