In [48]:
from openai import OpenAI
from pydantic import BaseModel, Field
from typing import Literal, Optional, List
import logging
import asyncio
import os
from dotenv import load_dotenv
from pathlib import Path
import pickle

load_dotenv(Path("../../.env"))
logger = logging.getLogger(__name__)

In [49]:
class OpenAIClient:
    def __init__(self, api_key: str) -> None:
        """
        Args:
            model_name: The name of the openai model we are using
            api_key: The api key for our openai model
        Returns:
        """
        self.client = OpenAI(api_key=api_key)

    def get_client(self) -> OpenAI:
        """
        Args:
            None

        Returns:
            The openai client
        """
        return self.client

In [50]:
llm = OpenAIClient(api_key=os.getenv("OPENAI_API_KEY")).get_client()

In [51]:
from typing import Any, Dict, Optional, Union
from fastmcp.client.client import Client
from contextlib import asynccontextmanager


class MCPClient:
    def __init__(self, config: Union[str, dict] = "http://localhost:8050/sse"):
        """Initialize the MCP client.

        Args:
            config (Union[str, dict]): Either a URL string or a configuration dictionary.
                If string: Treated as the URL of the MCP server.
                If dict: Should follow the MCP configuration format with 'mcpServers' key.
        """
        self.config = config
        self._client = None
        self._is_connected = False

    async def connect(self):
        """Connect to the MCP server(s)."""
        if self._is_connected:
            return

        if isinstance(self.config, str):
            # For SSE transport, we just need the URL
            self._client = Client(self.config)
        else:
            # Configuration mode with multiple servers
            self._client = Client(self.config)

        await self._client.__aenter__()
        self._is_connected = True

    async def disconnect(self):
        """Disconnect from the MCP server(s)."""
        if self._is_connected and self._client:
            await self._client.__aexit__(None, None, None)
            self._is_connected = False
            self._client = None

    @asynccontextmanager
    async def session(self):
        """Context manager for session management."""
        try:
            await self.connect()
            yield self
        finally:
            await self.disconnect()

    async def list_servers(self) -> list:
        """List available MCP servers."""
        if not self._is_connected:
            raise RuntimeError("Not connected to MCP server(s)")
        return list(self._client.servers.keys())

    async def list_tools(self) -> list:
        """List available tools.

        Returns:
            list: List of available tools.
        """
        if not self._is_connected:
            raise RuntimeError("Not connected to MCP server(s)")
        return await self._client.list_tools()

    async def get_tools(self) -> list[dict[str, Any]]:
        """Retrieve tools in a format compatible with OpenAI function calling.

        Returns:
            list[dict[str, Any]]: List of tools in OpenAI function calling format.
        """
        if not self._is_connected:
            raise RuntimeError("Not connected to MCP server(s)")

        tools = await self.list_tools()
        openai_tools = []

        for tool in tools:
            openai_tools.append(
                {
                    "type": "function",
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": {
                        "type": "object",
                        "properties": tool.inputSchema.get("properties", {}),
                        "required": tool.inputSchema.get("required", []),
                    },
                }
            )

        return openai_tools

    async def call_tool(
        self, tool_name: str, arguments: Dict[str, Any], server: Optional[str] = None
    ) -> Any:
        """Call a tool.

        Args:
            tool_name (str): The name of the tool to call.
            arguments (Dict[str, Any]): The arguments to pass to the tool.
            server (str, optional): Specific server to call the tool on.
                                 If None, the client will try to find the tool
                                 in any of the available servers.

        Returns:
            Any: The result of the tool call.
        """
        if not self._is_connected:
            raise RuntimeError("Not connected to MCP server(s)")

        result = await self._client.call_tool(tool_name, arguments, server)
        return result.content[0].text if result.content else None

In [52]:
        mcp_client = MCPClient()
        await mcp_client.connect()

        logger.info("Getting tools from MCP...")
        tools = await mcp_client.get_tools()
        logger.info(f"Loaded {len(tools)} tools from MCP")

Error in sse_reader
Traceback (most recent call last):
  File "/Users/gael/git-repos/essay_writing_ai_agent/.venv/lib/python3.13/site-packages/httpx/_transports/default.py", line 101, in map_httpcore_exceptions
    yield
  File "/Users/gael/git-repos/essay_writing_ai_agent/.venv/lib/python3.13/site-packages/httpx/_transports/default.py", line 271, in __aiter__
    async for part in self._httpcore_stream:
        yield part
  File "/Users/gael/git-repos/essay_writing_ai_agent/.venv/lib/python3.13/site-packages/httpcore/_async/connection_pool.py", line 407, in __aiter__
    raise exc from None
  File "/Users/gael/git-repos/essay_writing_ai_agent/.venv/lib/python3.13/site-packages/httpcore/_async/connection_pool.py", line 403, in __aiter__
    async for part in self._stream:
        yield part
  File "/Users/gael/git-repos/essay_writing_ai_agent/.venv/lib/python3.13/site-packages/httpcore/_async/http11.py", line 342, in __aiter__
    raise exc
  File "/Users/gael/git-repos/essay_writing_a

In [53]:
tools

[{'type': 'function',
  'name': 'get_weather',
  'description': 'Get current temperature for provided coordinates in celsius',
  'parameters': {'type': 'object',
   'properties': {'latitude': {'title': 'latitude', 'type': 'string'},
    'longitude': {'title': 'longitude', 'type': 'string'}},
   'required': ['latitude', 'longitude']}},
 {'type': 'function',
  'name': 'wiki_search',
  'description': 'Search wikipedia for the given query and returns a summary.',
  'parameters': {'type': 'object',
   'properties': {'query': {'title': 'Query', 'type': 'string'},
    'sentences': {'default': 2, 'title': 'Sentences', 'type': 'integer'}},
   'required': ['query']}},
 {'type': 'function',
  'name': 'save_txt',
  'description': 'Save text to a .txt file',
  'parameters': {'type': 'object',
   'properties': {'text': {'title': 'Text', 'type': 'string'},
    'filename': {'default': 'output.txt',
     'title': 'Filename',
     'type': 'string'}},
   'required': ['text']}},
 {'type': 'function',
  'n

In [54]:
# Define prompt for planner agent
PLANNER_AGENT_PROMPT = """
You are an expert essay writer planner.
You take in essay writing request on a given topic or review an essay, and create comprehensive plans, breaking down the main task of writing an essay into smaller actionable tasks.

CORE PRINCIPLE: Be direct and action-oriented. Minimize follow-up questions.

DEFAULT ASSUMPTIONS FOR REQUESTS:
- The request is about writing an essay on a given topic.
- The request might be vague or unclear, one word, or unclear intent
- The request might be very specific or clear
- Request might be to simply review the essay

IMMEDIATE PLANNING APPROACH:
**WORKFLOW:**
1. Always start by creating a plan (for writing an or reviewing an essay) with detailed tasks.
2. Plan should consist of multiple tasks, 
3. Plan should be specific and actionable
4. For each task in the plan, you MUST assign a tool to perform the task. FAILURE to do so will result in task FAIL.
5. YOU must determine how many body paragraphs are sufficient to address the topic.


SAMPLE PLAN FOR WRITING ESSAY (NOT LIMITED TO ONLY THESE STEPS)
Research topic/query,
Select main points to use from research,
Write introduction with some given context (ie research notes),
write folow-up to introduction,
Put what we have of the essay so far together (YOU DECIDE WHEN TO DO THAT)
write followup to previous,
write conclusion,
review,
edit,
proofread,
save essay
return essay

SAMPLE PLAN FOR REVIEWING ESSAY (NOT LIMITED TO ONLY THESE STEPS)
review,
edit,
proofread,
Put what we have of the essay so far together (YOU DECIDE WHEN TO DO THAT)
save essay
return essay


TOOL CALLING STRATEGY:
- YOU MUST ASSIGN TOOLS TO EACH TASK
- FAILURE TO ASSIGN TOOLS TO EACH TASK WILL RESULT IN TASK FAILURE AND OVERALL PLAN FAILURE
- AVOID repetative tool calls
- Use tools APPROPRIATELY
Example of GOOD tool call 
Task= "research this topic with a this query" -> call research tool
Task ="need to write about a certain topic" -> call writing tool
Example of BAD tool call
Task= "Write about a certain topic" -> call research tool
Tool usage MUST make sense with task

MINIMAL QUESTIONS STRATEGY:
- For vauge requests such as single words: generate an interesting topic ie: star wars -> star wars impact on modern culture, then plan and create tasks
- For detailed requests: Create multiple tasks 

You will be given a output format that you must adhere to.

Generate plans immediately without asking follow-up questions unless absolutely necessary.
"""

In [55]:
from typing import Any, Dict, List, Optional, Literal, ForwardRef

class ToolArguments(BaseModel):
    keys: List[str] = Field(description="A list of arguments to a tool")
    values: List[str] = Field(description="A list of argument values to a tool")
    # arguments: Dict[str, Any] = Field(description="A dictionary where keys are tool arguments and values are the tool call values")


class ToolCall(BaseModel):
    """Represents a tool call request from the LLM."""

    id: str = Field(description="The ID of the tool call.")
    name: str = Field(description="The name of the tool to call.")
    arguments: ToolArguments = Field(description="The arguments to call the tool with.")


class ToolCalls(BaseModel):
    id: int = Field(description="An ID for the tool calls")
    tool_calls: List[ToolCall] = Field(
        description="A list of tools to be executed sequentially."
    )

class PlannerTask(BaseModel):
    """Represents a single task generated by the Planner."""

    id: int = Field(description="Sequential ID for the task.")
    description: str = Field(
        description="Clear description of the task to be executed."
    )
    tool_calls: List[ToolCall] = Field(
        description="A list of tools to be executed sequentially to complete the task"
    )
    thought: str = Field(
        description="A explanation of what needs to be done and how. Includes description and tool calls."
    )
    status: Optional[
        Literal[
            "input_required",
            "completed",
            "error",
            "pending",
            "incomplete",
            "todo",
            "not_started",
        ]
    ] = Field(default="input_required", description="Status of the task")


class Plan(BaseModel):
    """Output schema for the Planner Agent."""

    original_query: str = Field(description="The original user query for context.")
    description: str = Field(description="Clear description of the overall plan.")
    tasks: List[PlannerTask] = Field(
        description="A list of tasks to be executed sequentially."
    )


class ToolResult(BaseModel):
    """Represents the result of a tool execution."""

    tool_call_id: str = Field(description="The ID of the tool call this result is for.")
    result: str = Field(description="The result of the tool execution.")
    is_error: bool = Field(
        default=False, description="Whether the tool execution resulted in an error."
    )


class TaskExecutionResponse(BaseModel):
    """Represents a single task generated by the Planner."""

    id: int = Field(description="Id of task we are executing.")
    description: str = Field(description="Clear description of the task to be executed.")
    tools_sueggested: str = Field(description="A list of the tools suggested for the task")
    response_type: Optional[
        Literal[
            "tool_calls",
            "text",
        ]
    ] = Field(default="input_required", description="The response type of the task execution")
    tool_calls: List[ToolCall] = Field(description="A list of tool calls to be executed. Empty if response_type is text")

In [56]:
class PlannerAgent:
    def __init__(
        self,
        dev_prompt,
        llm,
        messages,
        tools,
        model_name: str = "gpt-4.1-mini",
    ):
        self.model_name = model_name
        self.dev_prompt = dev_prompt
        self.llm = llm
        self.messages = messages
        self.tools = tools
        if self.dev_prompt:
            self.messages.append({"role": "developer", "content": self.dev_prompt})

    def add_messages(self, query: str):
        self.messages.append({"role": "user", "content": query})

    def plan(self, query: str):
        """Create a detailed plan to complete the request of the user.

        Args:
            query (str): The request of the user.

        Returns:
            Plan: The plan to complete the request of the user.
        """
        self.add_messages(query=query)
        response = self.llm.responses.parse(
            model=self.model_name,
            input=self.messages,
            tools=self.tools,
            text_format=Plan,
        )
        return response

In [57]:
planner = PlannerAgent(
    dev_prompt=PLANNER_AGENT_PROMPT,
    llm=llm,
    messages=[],
    tools=tools,
    model_name="gpt-4.1-mini",
)
logger.info("Successfully initialized PlannerAgent")

In [58]:
content = "write an essay on the cultural impact of the internet"

In [59]:
plan = planner.plan(content) # UNCOMMENT TO GENERATE A NEW PLAN

In [40]:
f'PLAN TYPE: {type(plan)}'

"PLAN TYPE: <class 'openai.types.responses.parsed_response.ParsedResponse[Plan]'>"

In [60]:
plan

ParsedResponse[Plan](id='resp_688d7c73e4a4819c8a5d9d51b657f87303da78e2ba052918', created_at=1754102899.0, error=None, incomplete_details=None, instructions=None, metadata={}, model='gpt-4.1-mini-2025-04-14', object='response', output=[ParsedResponseOutputMessage[Plan](id='msg_688d7c7483c8819ca87af398723b927203da78e2ba052918', content=[ParsedResponseOutputText[Plan](annotations=[], text='{"original_query":"write an essay on the cultural impact of the internet","description":"Create an essay discussing the cultural impact of the internet by researching the topic, selecting key cultural aspects influenced by the internet, writing a structured essay with introduction, body paragraphs, and conclusion, and then reviewing and saving the essay.","tasks":[{"id":1,"description":"Research the cultural impact of the internet to gather relevant information and key points.","tool_calls":[{"id":"1","name":"functions.wiki_search","arguments":{"keys":["query","sentences"],"values":["cultural impact of 

In [61]:
plan_parsed: Plan = plan.output_parsed

In [62]:
with open("aug_1_plan_2.pkl", "wb") as f:
    pickle.dump(plan_parsed.model_dump(), f)

In [63]:
f'PLAN PARSED: {plan_parsed}'

'PLAN PARSED: original_query=\'write an essay on the cultural impact of the internet\' description=\'Create an essay discussing the cultural impact of the internet by researching the topic, selecting key cultural aspects influenced by the internet, writing a structured essay with introduction, body paragraphs, and conclusion, and then reviewing and saving the essay.\' tasks=[PlannerTask(id=1, description=\'Research the cultural impact of the internet to gather relevant information and key points.\', tool_calls=[ToolCall(id=\'1\', name=\'functions.wiki_search\', arguments=ToolArguments(keys=[\'query\', \'sentences\'], values=[\'cultural impact of the internet\', \'4\']))], thought=\'To write an informed essay, first gather concise and relevant information about the cultural impact of the internet through a Wikipedia search.\', status=\'todo\'), PlannerTask(id=2, description=\'Select main points from the research to structure the essay, focusing on significant cultural aspects influenced

In [29]:
plan_parsed.tasks[0] # select first task

PlannerTask(id=1, description="Research the topic 'Cultural impact of the internet' to gather key points and relevant information.", tool_calls=[ToolCall(id='1', name='functions.wiki_search', arguments=ToolArguments(keys=['query', 'sentences'], values=['Cultural impact of the internet', '5']))], thought='I will research the cultural impact of the internet to collect reliable and concise information to use as the basis for the essay.', status='todo')

In [46]:
plan_parsed.tasks[0].thought

'I will research the cultural impact of the internet to collect reliable and concise information to use as the basis for the essay.'

In [30]:
plan_parsed.tasks[0].tool_calls[0] # select first tool call in task

ToolCall(id='1', name='functions.wiki_search', arguments=ToolArguments(keys=['query', 'sentences'], values=['Cultural impact of the internet', '5']))

In [31]:
plan_parsed.tasks[0].tool_calls[0].arguments # select the arguments in the frist tool call within the first task

ToolArguments(keys=['query', 'sentences'], values=['Cultural impact of the internet', '5'])

In [32]:
plan_parsed.tasks[0].tool_calls[0].arguments.keys

['query', 'sentences']

In [33]:
tool = {'name': plan_parsed.tasks[0].tool_calls[0].name.split('.')[-1]}

In [34]:
tool_call_keys = plan_parsed.tasks[0].tool_calls[0].arguments.keys
tool_call_values = plan_parsed.tasks[0].tool_calls[0].arguments.values
for i in range(len(plan_parsed.tasks[0].tool_calls[0].arguments.keys)):
    tool[plan_parsed.tasks[0].tool_calls[0].arguments.keys[i]] = plan_parsed.tasks[0].tool_calls[0].arguments.values[i]

In [35]:
tool

{'name': 'wiki_search',
 'query': 'Cultural impact of the internet',
 'sentences': '5'}