# Intro to Agentic AI

MSOE AI Club Workshop
```
  _____________
 /0   /     \  \
/  \ M A I C/  /\
\ / *      /  / /
 \___\____/  @ /
          \_/_/
```

*(ROSIE is not needed for this workshop!)*

Prereqs:
- Install [VSCode](https://code.visualstudio.com/)
- Install [Python](https://www.python.org/downloads/)
- Ensure you can run notebooks in VSCode.
- **For MCP servers (optional):** Install [Node.js](https://nodejs.org/) so you can run servers like `npx -y @modelcontextprotocol/server-filesystem` or `server-github`.

Run the below pip installs now so we don't have to wait for them later: 

In [None]:
# %pip install langchain langchain-core langchain-community langgraph langchain-google-genai langchain-openai tavily-python mcp nest_asyncio pillow python-dotenv

In [None]:
import os
import sys
import json
import sqlite3
import textwrap
import base64
import traceback
import asyncio
import nest_asyncio
import subprocess
import threading
from datetime import date
from typing import Any, Dict, List, Optional, TypedDict
from pathlib import Path

from dotenv import load_dotenv
from langchain_core.tools import tool
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage, ToolMessage
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_google_genai import ChatGoogleGenerativeAI
from langgraph.graph import StateGraph, END
from tavily import TavilyClient
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from langchain_openai import ChatOpenAI

<span style="color:#ff5555;font-weight:bold;font-size:1.5rem;">
    STOP
</span>

... or keep going if you want to work ahead.

---

**What is Agentic AI?**

Unlike a simple chatbot that only generates one reply per turn, an *agent* can **use tools** and work **autonomously**: search the web, run code, read/write files, call APIs, and remember facts across conversations. The model decides *when* to call which tool and then continues reasoning with the results.

In this workshop we'll build an agent that has:
- **Web search** (e.g. Tavily) for up-to-date information
- **A Python interpreter** for math, data, and file operations
- **Persistent memory** so it can remember your name, preferences, and project details
- **MCP (Model Context Protocol)** so it can use external tools (filesystem, GitHub, etc.) when you connect them
- **Multimodal input** so you can send images (e.g. photos of equipment) and get descriptions or advice

We'll use **LangGraph** to define the flow: *main agent ‚Üí optional tool calls (looping) ‚Üí end*, with a **memory manager** running in the background after each turn so it never slows you down. Run the pip installs above, then we'll start with imports and environment setup.

---

<span style="color:#55ff55;font-weight:bold;font-size:1.5rem;">
    GO
</span>

Run the cell below to add your API keys to your environment variables
> NOTE: If you are running this notebook or doing this workshop after the February 12th, 2026 event, you will need to obtain your own API keys as we have rotated them. Here are the sites you need keys from, you can get a free tier for each of these:
> - https://aistudio.google.com/
> - https://www.tavily.com/
> 
> Optionally, you can instead obtain an OpenAI, they just do not have a free tier.

In [None]:
load_dotenv(override=False)  # won't overwrite vars already set in os.environ

# You can still set keys manually here ‚Äî these take priority over the .env file:
os.environ["TAVILY_API_KEY"] = ...              # Add a Tavily key here for the workshop
# os.environ["OPENAI_API_KEY"] = "sk-proj-..."  # Uncomment to use OpenAI instead of Gemini
# os.environ["GEMINI_API_KEY"] = "AI..."        # Uncomment to use Gemini instead of OpenAI

# Quick sanity check ‚Äî show which keys are available
for _key in ("OPENAI_API_KEY", "TAVILY_API_KEY", "GEMINI_API_KEY"):
    _val = os.environ.get(_key)
    if _val:
        print(f"  {_key} = {_val[:4]}...{_val[-4:]}")
    else:
        print(f"  {_key} = [NOT SET]")

<span style="color:#ff5555;font-weight:bold;font-size:1.5rem;">
    STOP
</span>

... or keep going if you want to work ahead.

---

**Why do we need a memory store?**

Agents often need to remember things *across* sessions: your name, timezone, project path, or ‚Äúalways use Python 3.11.‚Äù If we only kept the current chat history, that context would be lost when the conversation gets long or when you start a new session.

We‚Äôll use a small **persistent store** (SQLite) with:
- **Upsert** (create/update) by key
- **Search** by query text (for retrieval when answering)
- **List** recent memories
- **Delete** when something is wrong or outdated

A separate **memory manager** step (later) will decide *what* to store or remove based on the conversation, so the main agent doesn‚Äôt have to worry about that.

---

<span style="color:#55ff55;font-weight:bold;font-size:1.5rem;">
    GO
</span>

The next cell defines `MemoryStore` and a global `MEM` instance. Run it to create the DB and have `MEM` ready for the memory tools.

In [None]:
def normalize_ai_content(content) -> str:
    if isinstance(content, str):
        return content

    if isinstance(content, list):
        texts = []
        for part in content:
            if isinstance(part, dict):
                if part.get("type") == "text" and "text" in part:
                    texts.append(part["text"])
                elif "text" in part:
                    texts.append(str(part["text"]))
            elif isinstance(part, str):
                texts.append(part)
        return "\n".join(t for t in texts if t).strip()

    return str(content)

class MemoryStore:
    """Simple persistent key/value memory with tags + freeform text."""
    def __init__(self, path: str = "agent_memory.sqlite3"):
        self.path = path
        self._init_db()

    def _init_db(self):
        with sqlite3.connect(self.path) as con:
            con.execute("""
                CREATE TABLE IF NOT EXISTS memories (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    key TEXT UNIQUE,
                    value TEXT NOT NULL,
                    tags TEXT DEFAULT '[]',
                    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            """)
            con.commit()

    def upsert(self, key: str, value: str, tags: Optional[List[str]] = None) -> str:
        tags_json = json.dumps(tags or [])
        with sqlite3.connect(self.path) as con:
            con.execute("""
                INSERT INTO memories(key, value, tags, updated_at)
                VALUES(?, ?, ?, CURRENT_TIMESTAMP)
                ON CONFLICT(key) DO UPDATE SET
                    value=excluded.value,
                    tags=excluded.tags,
                    updated_at=CURRENT_TIMESTAMP
            """, (key, value, tags_json))
            con.commit()
        return f"Saved memory: {key}"

    def delete(self, key: str) -> str:
        with sqlite3.connect(self.path) as con:
            cur = con.execute("DELETE FROM memories WHERE key=?", (key,))
            con.commit()
        return f"Deleted memory: {key}" if cur.rowcount else f"No memory found for key: {key}"

    def list_all(self, limit: int = 50) -> List[Dict[str, Any]]:
        with sqlite3.connect(self.path) as con:
            rows = con.execute("""
                SELECT key, value, tags, updated_at
                FROM memories
                ORDER BY updated_at DESC
                LIMIT ?
            """, (limit,)).fetchall()
        return [{"key": k, "value": v, "tags": json.loads(t), "updated_at": u} for k, v, t, u in rows]

    def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        q = f"%{query.lower()}%"
        with sqlite3.connect(self.path) as con:
            rows = con.execute("""
                SELECT key, value, tags, updated_at
                FROM memories
                WHERE lower(key) LIKE ? OR lower(value) LIKE ?
                ORDER BY updated_at DESC
                LIMIT ?
            """, (q, q, limit)).fetchall()
        return [{"key": k, "value": v, "tags": json.loads(t), "updated_at": u} for k, v, t, u in rows]


MEM = MemoryStore(path="agent_memory.sqlite3")

<span style="color:#ff5555;font-weight:bold;font-size:1.5rem;">
    STOP
</span>

... or keep going if you want to work ahead.

---

**What is MCP (Model Context Protocol)?**

MCP lets your agent use **external servers** that expose tools over a standard protocol. It's like a programmatic way to allow your agent take actions! For example:
- **Filesystem server** ‚Äì read/write files in a directory
- **GitHub server** ‚Äì list repos, create issues, read files (with a token)
- **Custom servers** ‚Äì your own tools (databases, APIs, etc.)

The notebook will **connect** to a server by running its command (e.g. `npx -y @modelcontextprotocol/server-filesystem /path`). Once connected, the agent gets a list of tools and can call them via a wrapper tool we‚Äôll define. All of this is async, so we use a helper to run it from the notebook.

---

<span style="color:#55ff55;font-weight:bold;font-size:1.5rem;">
    GO
</span>

The next cell defines `MCPManager`: connect, disconnect, call a tool, list tools. The cell after that creates the global `MCP` and a `run_async` helper. Run both.

In [None]:
class MCPManager:
    """Manages connections to MCP servers and exposes their tools."""
    
    def __init__(self):
        self.servers: Dict[str, dict] = {}  # name -> {process, session, tools, read, write}
    
    async def connect(self, name: str, command: str, args: List[str] = None, env: Dict[str, str] = None) -> str:
        """Connect to an MCP server via stdio."""
        if name in self.servers:
            return f"Already connected to '{name}'"
        
        print(f"üîå [MCP] Connecting to server: {name}")
        print(f"   Command: {command} {' '.join(args or [])}")
        
        full_env = os.environ.copy()
        if env:
            full_env.update(env)
        
        if 'github' in name.lower() and 'GITHUB_TOKEN' not in full_env:
            print("   ‚ö†Ô∏è  Warning: GITHUB_TOKEN not set. GitHub server may fail.")
            print("   Set with: os.environ['GITHUB_TOKEN'] = 'your-token'")
        
        try:
            server_params = StdioServerParameters(
                command=command,
                args=args or [],
                env=full_env
            )
            
            client_cm = stdio_client(server_params)
            read_stream, write_stream = await asyncio.wait_for(
                client_cm.__aenter__(),
                timeout=30
            )
            
            session = ClientSession(read_stream, write_stream)
            await session.__aenter__()
            
            init_result = await asyncio.wait_for(
                session.initialize(),
                timeout=30
            )
            
            tools_result = await session.list_tools()
            tools = tools_result.tools if hasattr(tools_result, 'tools') else []
            
            self.servers[name] = {
                'session': session,
                'client_cm': client_cm,
                'tools': tools,
                'command': command,
                'args': args or []
            }
            
            tool_names = [t.name for t in tools]
            print(f"   ‚úÖ Connected! Available tools ({len(tool_names)}):")
            for t in tools:
                desc = getattr(t, 'description', '')[:60]
                print(f"      ‚Ä¢ {t.name}: {desc}...")
            
            return f"Connected to '{name}' with {len(tool_names)} tools"
            
        except asyncio.TimeoutError:
            print(f"   ‚ùå Connection timed out")
            return f"Failed to connect to '{name}': timeout"
        except FileNotFoundError as e:
            print(f"   ‚ùå Command not found: {command}")
            print(f"   Make sure npx/node is installed and in PATH")
            return f"Failed to connect to '{name}': command not found"
        except Exception as e:
            print(f"   ‚ùå Connection failed: {type(e).__name__}: {e}")
            traceback.print_exc()
            return f"Failed to connect to '{name}': {e}"
    
    async def disconnect(self, name: str) -> str:
        """Disconnect from an MCP server."""
        if name not in self.servers:
            return f"Not connected to '{name}'"
        
        print(f"üîå [MCP] Disconnecting from: {name}")
        
        try:
            info = self.servers[name]
            await info['session'].__aexit__(None, None, None)
            await info['client_cm'].__aexit__(None, None, None)
            del self.servers[name]
            print(f"   ‚úÖ Disconnected")
            return f"Disconnected from '{name}'"
        except Exception as e:
            print(f"   ‚ùå Error: {e}")

            if name in self.servers:
                del self.servers[name]
            return f"Disconnected from '{name}' (with errors: {e})"
    
    async def call_tool(self, server_name: str, tool_name: str, arguments: dict) -> str:
        """Call a tool on an MCP server."""
        if server_name not in self.servers:
            return f"Not connected to server '{server_name}'. Use /mcp list to see connected servers."
        
        session = self.servers[server_name]['session']
        
        print(f"üîß [MCP:{server_name}] Calling tool: {tool_name}")
        if arguments:
            print(f"   Arguments: {json.dumps(arguments, indent=2)}")
        
        try:
            result = await asyncio.wait_for(
                session.call_tool(tool_name, arguments),
                timeout=60
            )
            
            output = ""
            if hasattr(result, 'content'):
                for item in result.content:
                    if hasattr(item, 'text'):
                        output += item.text + "\n"
                    elif hasattr(item, 'data'):
                        output += f"[Binary data: {len(item.data)} bytes]\n"
            
            output = output.strip() or str(result)
            
            display = output[:500] + '...' if len(output) > 500 else output
            print(f"   ‚úÖ Result:\n{display}")
            
            return output
            
        except asyncio.TimeoutError:
            print(f"   ‚ùå Tool call timed out")
            return f"Tool call timed out after 60 seconds"
        except Exception as e:
            print(f"   ‚ùå Error: {type(e).__name__}: {e}")
            return f"Error calling tool: {e}"
    
    def list_servers(self) -> str:
        """List connected MCP servers and their tools."""
        if not self.servers:
            return "No MCP servers connected.\n\nUse: /mcp connect <name> <command> [args...]"
        
        lines = ["Connected MCP servers:"]
        for name, info in self.servers.items():
            tool_names = [t.name for t in info['tools']]
            lines.append(f"\n  üì° {name} ({len(tool_names)} tools)")
            for tn in tool_names[:10]:
                lines.append(f"     ‚Ä¢ {tn}")
            if len(tool_names) > 10:
                lines.append(f"     ... and {len(tool_names) - 10} more")
        
        return "\n".join(lines)
    
    def get_all_tools_info(self) -> List[dict]:
        """Get info about all tools from all connected servers."""
        all_tools = []
        for server_name, info in self.servers.items():
            for tool in info['tools']:
                all_tools.append({
                    'server': server_name,
                    'name': tool.name,
                    'description': getattr(tool, 'description', ''),
                    'input_schema': getattr(tool, 'inputSchema', {})
                })
        return all_tools

In [None]:
MCP = MCPManager()

def run_async(coro):
    """Helper to run async code in sync context."""
    try:
        nest_asyncio.apply()
    except ImportError:
        pass
    
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    
    return loop.run_until_complete(coro)

<span style="color:#ff5555;font-weight:bold;font-size:1.5rem;">
    STOP
</span>

... or keep going if you want to work ahead.

---

**Multimodal input: images**

Many models (including OpenAI and Gemini) can take **images** with the user message. That lets the agent answer questions about screenshots, diagrams, or photos (e.g. ‚ÄúWhat‚Äôs wrong with this water heater?‚Äù). We‚Äôll load images from disk, encode them as base64 (a format that allows us convert files into text that we can more easily send over the Internet), and attach them to a `HumanMessage` in the format the model expects. The chat loop will support an `/img` command to queue images for the next message.

---

<span style="color:#55ff55;font-weight:bold;font-size:1.5rem;">
    GO
</span>

The next cells define `load_image_as_base64` and `create_multimodal_message`. Run them so the chat loop can attach images to user messages.

In [None]:
def load_image_as_base64(path: str) -> dict:
    """Load an image file and return it as a base64-encoded data dict for the model."""
    path = Path(path).expanduser().resolve()
    
    if not path.exists():
        raise FileNotFoundError(f"Image not found: {path}")
    
    suffix = path.suffix.lower()
    mime_types = {
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.png': 'image/png',
        '.gif': 'image/gif',
        '.webp': 'image/webp',
    }
    mime_type = mime_types.get(suffix)
    if not mime_type:
        raise ValueError(f"Unsupported image format: {suffix}. Use jpg, png, gif, or webp.")
    
    with open(path, 'rb') as f:
        image_data = base64.standard_b64encode(f.read()).decode('utf-8')
    
    return {
        "type": "image_url",
        "image_url": {"url": f"data:{mime_type};base64,{image_data}"}
    }

def create_multimodal_message(text: str, image_paths: List[str] = None) -> HumanMessage:
    """Create a HumanMessage with text and optional images."""
    if not image_paths:
        return HumanMessage(content=text)
    
    content = []
    
    for img_path in image_paths:
        try:
            img_data = load_image_as_base64(img_path)
            content.append(img_data)
            print(f"üì∑ Attached image: {img_path}")
        except Exception as e:
            print(f"‚ö†Ô∏è  Failed to load image {img_path}: {e}")
    
    # Add text
    content.append({"type": "text", "text": text})
    
    return HumanMessage(content=content)

<span style="color:#ff5555;font-weight:bold;font-size:1.5rem;">
    STOP
</span>

... or keep going if you want to work ahead.

---

**Tools: web search**

LLMs are trained on a fixed snapshot of the world. To get **current** information (news, docs, prices, ‚Äúwhat‚Äôs the latest ‚Ä¶‚Äù), the agent needs to call a search API. We‚Äôll use **Tavily** (or you can swap in another provider). The tool is a single function the model can invoke with a query; it returns snippets the agent can cite. Set `TAVILY_API_KEY` in your environment (or in the keys cell) for this to work.

---

<span style="color:#55ff55;font-weight:bold;font-size:1.5rem;">
    GO
</span>

The next cell defines `web_search_impl` and the `@tool` `web_search`. Run it, then we‚Äôll add the Python interpreter.

In [None]:
def web_search_impl(query: str, k: int = 5) -> str:
    """
    Plug in any search provider.
    Recommended: Tavily (reliable) or Serper.
    """
    print(f"üîç [Web Search] Searching for: \"{query}\"")
    
    tavily_key = os.getenv("TAVILY_API_KEY")
    if not tavily_key:
        print("   ‚ùå Web search not configured")
        return (
            "Web search is not configured. Set TAVILY_API_KEY to enable search.\n"
            "Query was: " + query
        )
    
    client = TavilyClient(api_key=tavily_key)
    res = client.search(query=query, max_results=k)
    
    chunks = []
    for r in res.get("results", []):
        chunks.append(f"- {r.get('title','(no title)')}\n  {r.get('url')}\n  {r.get('content','')[:400]}")
    
    result_count = len(chunks)
    print(f"   ‚úÖ Found {result_count} results")
    
    return "\n".join(chunks) if chunks else "No results."

@tool
def web_search(query: str, k: int = 5) -> str:
    """Search the web for up-to-date information. Returns top results with snippets."""
    return web_search_impl(query=query, k=k)

<span style="color:#ff5555;font-weight:bold;font-size:1.5rem;">
    STOP
</span>

... or keep going if you want to work ahead.

---

**Tools: Python interpreter**

Giving the agent the ability to **run Python** lets it do **deterministic** math, parse data, transform files, and save outputs. This means that instead of the agent hallucinating information that might be true or false, it can run code to **confirm** it. Because this runs real code on your machine, the notebook implementation **asks for confirmation** before executing. In production you‚Äôd add sandboxing, timeouts, and stricter limits. For the workshop, always review the code the agent wants to run and only confirm when you‚Äôre comfortable.

---

<span style="color:#55ff55;font-weight:bold;font-size:1.5rem;">
    GO
</span>

The next cell defines the `python_interpreter` tool (with a confirmation prompt). Run it, then we‚Äôll add memory and MCP tools.

In [None]:
@tool
def python_interpreter(code: str) -> str:
    """
    Execute arbitrary Python code. Returns stdout output or the result.
    WARNING: This executes code without restrictions - use with caution.
    You can save files using standard Python (open(), pathlib, etc.).
    For organization, consider saving to a subfolder like 'outputs/' or 'generated/'.
    """
    
    cwd = os.getcwd()
    
    print(f"üêç [Python] Agent wants to execute:")
    print(f"   üìÅ Working directory: {cwd}")
    print("‚îÄ" * 40)
    print(code)
    print("‚îÄ" * 40)
    
    confirm = input("Execute this code? (y/n): ").strip().lower()
    if confirm not in ('y', 'yes'):
        print("   ‚õî Execution cancelled by user")
        return "Code execution was cancelled by the user."
    
    print("   ‚è≥ Executing...")
    
    try:
        result = subprocess.run(
            [sys.executable, "-c", code],
            capture_output=True,
            text=True,
            timeout=30,
            cwd=cwd
        )
        
        output = ""
        if result.stdout:
            output += result.stdout
        if result.stderr:
            output += ("\n" if output else "") + result.stderr
        
        if not output.strip():
            output = "OK (no output)."
        else:
            output = output.strip()
        
        print(f"   ‚úÖ Result:")
        print("‚îÄ" * 40)
        print(output)
        print("‚îÄ" * 40)
        
        # Include cwd in result so agent knows where files were saved
        return f"[Executed in: {cwd}]\n{output}"
        
    except subprocess.TimeoutExpired:
        print("   ‚ùå Timed out after 30 seconds")
        return "Error: Code execution timed out (30 second limit)."
    except Exception as e:
        print(f"   ‚ùå Error: {str(e)}")
        return f"Error: {str(e)}"

<span style="color:#ff5555;font-weight:bold;font-size:1.5rem;">
    STOP
</span>

... or keep going if you want to work ahead.

---

**Tools: memory**

The agent needs to *use* the memory store we built. We expose four tools:
- **memory_search** ‚Äì find memories by query (used automatically to inject context)
- **memory_list** ‚Äì list recent memories (useful for ‚Äúwhat do you know about me?‚Äù)
- **memory_upsert** ‚Äì create or update a memory (usually called by the memory manager, not the user)
- **memory_delete** ‚Äì remove a memory by key

The **memory manager** (a separate graph node) will decide what to upsert/delete based on the conversation; the main agent just reads via search/list and doesn‚Äôt have to manage storage itself.

---

<span style="color:#55ff55;font-weight:bold;font-size:1.5rem;">
    GO
</span>

The next cell defines the four memory tools. After that we‚Äôll add the MCP wrapper tools so the agent can call any connected MCP server.

In [None]:
@tool
def memory_search(query: str, limit: int = 10) -> str:
    """Search persistent memory for relevant items."""
    print(f"üß† [Memory Search] Searching for: \"{query}\"")
    results = MEM.search(query=query, limit=limit)
    print(f"   ‚úÖ Found {len(results)} matching memories")
    return json.dumps(results, indent=2)

@tool
def memory_list(limit: int = 50) -> str:
    """List recent persistent memories."""
    print(f"üß† [Memory List] Listing up to {limit} memories")
    results = MEM.list_all(limit=limit)
    print(f"   ‚úÖ Retrieved {len(results)} memories")
    return json.dumps(results, indent=2)

@tool
def memory_upsert(key: str, value: str, tags: Optional[List[str]] = None) -> str:
    """Create/update a memory item."""
    print(f"üß† [Memory Save] Saving: \"{key}\"")
    print(f"   Value: {value[:100]}{'...' if len(value) > 100 else ''}")
    if tags:
        print(f"   Tags: {', '.join(tags)}")
    result = MEM.upsert(key=key, value=value, tags=tags)
    print(f"   ‚úÖ Memory saved successfully")
    return result

@tool
def memory_delete(key: str) -> str:
    """Delete a memory item by key."""
    print(f"üß† [Memory Delete] Deleting: \"{key}\"")
    result = MEM.delete(key=key)
    if "No memory found" in result:
        print(f"   ‚ö†Ô∏è  Memory not found")
    else:
        print(f"   ‚úÖ Memory deleted")
    return result

<span style="color:#ff5555;font-weight:bold;font-size:1.5rem;">
    STOP
</span>

... or keep going if you want to work ahead.

---

**Tools: MCP wrappers**

The agent doesn‚Äôt talk to MCP servers directly. We give it two tools:
- **mcp_list_tools** ‚Äì list all tools from all connected servers (name, description, server).
- **mcp_call** ‚Äì call a tool on a server by name with a JSON string of arguments.

The model chooses the server and tool name from the list and passes arguments as a JSON string. We parse that and call `MCP.call_tool()` under the hood. After this, we‚Äôll collect all tools into one list and bind them to the LLM.

---

<span style="color:#55ff55;font-weight:bold;font-size:1.5rem;">
    GO
</span>

The next cell defines `mcp_call` and `mcp_list_tools`. The one after that sets `TOOLS = [web_search, python_interpreter, memory_*, mcp_*]`. Run both.

In [None]:
@tool
def mcp_call(server: str, tool_name: str, arguments: str = "{}") -> str:
    """
    Call a tool on a connected MCP server.
    
    Args:
        server: Name of the MCP server (e.g., 'filesystem', 'github')
        tool_name: Name of the tool to call
        arguments: JSON string of arguments to pass to the tool
    """
    print(f"üîß [MCP] Calling {server}/{tool_name}")
    
    try:
        args = json.loads(arguments) if arguments else {}
    except json.JSONDecodeError as e:
        return f"Invalid JSON arguments: {e}"
    
    return run_async(MCP.call_tool(server, tool_name, args))

@tool
def mcp_list_tools() -> str:
    """List all available tools from connected MCP servers."""
    tools = MCP.get_all_tools_info()
    
    if not tools:
        return "No MCP servers connected. Use /mcp connect <name> <command> to connect."
    
    lines = ["Available MCP tools:"]
    for t in tools:
        desc = t['description'][:100] + '...' if len(t['description']) > 100 else t['description']
        lines.append(f"  ‚Ä¢ [{t['server']}] {t['name']}: {desc}")
    
    return "\n".join(lines)

In [None]:
TOOLS = [web_search, python_interpreter, memory_search, memory_list, memory_upsert, memory_delete, mcp_call, mcp_list_tools]

<span style="color:#ff5555;font-weight:bold;font-size:1.5rem;">
    STOP
</span>

... or keep going if you want to work ahead.

---

**LLM and tool calling**

The agent is powered by a chat model (here, OpenAI) that supports **tool calls**: the model can return structured requests like ‚Äúcall `web_search` with query = ‚Ä¶‚Äù instead of only plain text. We define an `AgentState` (just `messages` for this graph), create the LLM with `make_llm()`, and **bind** the `TOOLS` list so the model knows their names and schemas. We‚Äôll use one LLM for the main agent and a smaller one for the memory manager.

---

<span style="color:#55ff55;font-weight:bold;font-size:1.5rem;">
    GO
</span>

The next cell defines `AgentState`, `make_llm`, `MAIN_LLM`, and `MEMORY_LLM`. Run it, then we‚Äôll add the system prompts.

In [None]:
class AgentState(TypedDict):
    messages: List[Any]


def _which_provider() -> str:
    """OpenAI over Gemini if both env vars set."""
    if os.getenv("OPENAI_API_KEY"):
        return "openai"
    if os.getenv("GEMINI_API_KEY"):
        return "gemini"
    raise ValueError(
        "Set OPENAI_API_KEY and/or GEMINI_API_KEY. "
        "If both are set, OpenAI is used."
    )


def make_llm(
    model: Optional[str] = None,
    *,
    openai_model: str = "gpt-5-nano",
    gemini_model: str = "gemini-3-flash-preview",
) -> BaseChatModel:
    provider = _which_provider()
    if provider == "openai":
        m = model or openai_model
        return ChatOpenAI(
            model=m,
            temperature=0.2
        ).bind_tools(TOOLS)
    else:
        m = model or gemini_model
        return ChatGoogleGenerativeAI(
            model=m,
            api_key=os.getenv("GEMINI_API_KEY"),
            temperature=0.2
        ).bind_tools(TOOLS)


_MAIN_OPENAI = "gpt-5-mini"
_MAIN_GEMINI = "gemini-3-flash-preview"
_MEMORY_OPENAI = "gpt-5-nano"
_MEMORY_GEMINI = "gemini-2.5-flash"

MAIN_LLM = make_llm(openai_model=_MAIN_OPENAI, gemini_model=_MAIN_GEMINI)
MEMORY_LLM = make_llm(openai_model=_MEMORY_OPENAI, gemini_model=_MEMORY_GEMINI)

<span style="color:#ff5555;font-weight:bold;font-size:1.5rem;">
    STOP
</span>

... or keep going if you want to work ahead.

---

**System prompts**

- **SYSTEM_MAIN** tells the agent its role (helpful agent with tools), when to use web search vs Python vs memory, and that it always has full conversation history. We‚Äôll also inject **retrieved memories** into this system message so the agent can personalize without storing everything in the prompt.
- **SYSTEM_MEMORY** tells the **memory manager** to output JSON: a list of `upsert` and `delete` actions. Another node will parse that and call `MEM.upsert` / `MEM.delete`. The main agent doesn‚Äôt write to memory directly; the manager does it in one place.

---

<span style="color:#55ff55;font-weight:bold;font-size:1.5rem;">
    GO
</span>

The next cell defines `SYSTEM_MAIN` and `SYSTEM_MEMORY`. Run it, then we‚Äôll build the graph nodes and edges.

In [None]:
SYSTEM_MAIN = SystemMessage(content=textwrap.dedent(f"""
Today's date is {date.today()}
Current working directory: {os.getcwd()}

You are a helpful agent with tools. Keep your responses concise and to the point of what the user is asking for. Do not add any extra information or commentary.
- Use web_search for fresh facts, citations, and "what's current".
- Use python_interpreter for math, parsing, data transforms, and file operations.
  - You can save files to the current directory or subdirectories like 'outputs/'
- Use memory_search/list to recall persistent facts.
- Do NOT store everything in memory. The memory manager will decide what to store.
When you need a tool, call it via tool calling (not by describing it).

IMPORTANT:
- You are always given the full conversation history in the `messages` list.
- Never claim you "don't have the chat history" or that it "reset between interactions" unless the user explicitly ran /reset.
- If asked about prior turns (e.g., "what was my last question?"), answer by looking at the latest HumanMessage in `messages`.
""").strip())


SYSTEM_MEMORY = SystemMessage(content=textwrap.dedent("""
You are a MEMORY MANAGER for an assistant.

Goal: maintain a small set of high-value, persistent facts about the user/preferences/projects.
You may ADD, UPDATE, or DELETE memory items.

Rules:
- Only store info that will remain useful in future conversations (preferences, stable project facts, recurring constraints).
- Avoid saving transient details (one-off tasks, short-lived plans, temporary numbers).
- If a memory becomes wrong/outdated, delete or update it.
- Output MUST be valid JSON in this schema:

{
  "actions": [
    {"type": "upsert", "key": "...", "value": "...", "tags": ["..."]},
    {"type": "delete", "key": "..."}
  ]
}

If no changes: {"actions": []}
""").strip())

<span style="color:#ff5555;font-weight:bold;font-size:1.5rem;">
    STOP
</span>

... or keep going if you want to work ahead.

---

**Graph nodes and streaming**

We'll implement two graph nodes and supporting helpers:
1. **main_agent_node** ‚Äì Takes `messages`, optionally injects retrieved memories into the system message, calls the main LLM. If the LLM returns tool calls, we route to `tools`.
2. **tools_node** ‚Äì Runs each tool call from the last AI message, appends `ToolMessage` results to `messages`, then we go back to **main** so the agent can reason on the results.

**Conditional edge:** After `main`, we check the last message: if it has `tool_calls`, go to `tools`; otherwise the graph ends.

**Memory manager (background):** After the graph finishes, `memory_manager_node` runs in a **background thread** so it never blocks you from typing the next message. It calls a smaller LLM on the last few turns and applies any upsert/delete actions to the memory store.

**Streaming helper:** `_stream_agent` uses LangGraph's `astream_events` to print the agent's response **token by token** as it's generated, rather than waiting for the full response.

---

<span style="color:#55ff55;font-weight:bold;font-size:1.5rem;">
    GO
</span>

The next cell defines `retrieve_relevant_memories`, `main_agent_node`, `tools_node`, `should_continue`, `memory_manager_node`, `_run_memory_bg`, and `_stream_agent`. Run it.

In [None]:
def retrieve_relevant_memories(user_message: str) -> str:
    """
    Automatically retrieve relevant memories based on the user's message.
    Uses multiple search strategies for robustness.
    """
    results = []
    
    direct_results = MEM.search(user_message, limit=5)
    results.extend(direct_results)
    
    personal_keywords = ["name", "preference", "project", "work", "like", "favorite", "setting"]
    for keyword in personal_keywords:
        if keyword.lower() in user_message.lower():
            keyword_results = MEM.search(keyword, limit=3)
            for r in keyword_results:
                if r not in results:
                    results.append(r)
    
    recent = MEM.list_all(limit=10)
    for r in recent:
        if r not in results:
            results.append(r)
    
    seen_keys = set()
    unique_results = []
    for r in results:
        if r["key"] not in seen_keys:
            seen_keys.add(r["key"])
            unique_results.append(r)
    
    if not unique_results:
        return ""
    
    memory_text = "=== RETRIEVED MEMORIES (use this information to personalize your response) ===\n"
    for mem in unique_results[:15]:
        tags_str = f" [tags: {', '.join(mem['tags'])}]" if mem['tags'] else ""
        memory_text += f"- {mem['key']}: {mem['value']}{tags_str}\n"
    memory_text += "=== END MEMORIES ===\n"
    
    return memory_text


def main_agent_node(state: AgentState) -> AgentState:
    msgs = state["messages"]
    
    latest_user_msg = None
    for m in reversed(msgs):
        if isinstance(m, HumanMessage):
            latest_user_msg = normalize_ai_content(m.content)
            break
    
    memory_context = ""
    if latest_user_msg:
        memory_context = retrieve_relevant_memories(latest_user_msg)
    
    system_content = SYSTEM_MAIN.content
    if memory_context:
        system_content = f"{system_content}\n\n{memory_context}"
    
    working_msgs = msgs.copy()
    if not working_msgs or not isinstance(working_msgs[0], SystemMessage):
        working_msgs = [SystemMessage(content=system_content)] + working_msgs
    else:
        working_msgs[0] = SystemMessage(content=system_content)

    ai = MAIN_LLM.invoke(working_msgs)
    return {"messages": msgs + [ai]}


def tools_node(state: AgentState) -> AgentState:
    """Executes any tool calls found in the last AI message."""
    msgs = state["messages"]
    last = msgs[-1]
    if not isinstance(last, AIMessage):
        return state

    tool_messages: List[ToolMessage] = []
    for call in (last.tool_calls or []):
        name = call.get("name")
        args = call.get("args", {})
        tool_map = {t.name: t for t in TOOLS}
        tool_obj = tool_map.get(name)
        if tool_obj is None:
            tool_messages.append(ToolMessage(content=f"Unknown tool: {name}", tool_call_id=call.get("id", "unknown")))
            continue
        result = tool_obj.invoke(args)
        tool_messages.append(ToolMessage(content=str(result), tool_call_id=call.get("id", "tool_call")))
    return {"messages": msgs + tool_messages}

def should_continue(state: AgentState) -> str:
    last = state["messages"][-1]
    if isinstance(last, AIMessage) and getattr(last, "tool_calls", None):
        return "tools"
    return "end"

def _run_memory_bg(messages):
    """Run memory manager in a background thread so it never blocks the REPL."""
    try:
        memory_manager_node({"messages": messages})
    except Exception:
        pass  # don't crash on memory failures

def memory_manager_node(state: AgentState) -> AgentState:
    """
    After the main agent responds, decide what to add/remove in memory,
    then apply it via memory_upsert/memory_delete tools.
    """
    msgs = state["messages"]

    tail = msgs[-12:]
    mm_input = [SYSTEM_MEMORY] + tail

    mm = MEMORY_LLM.invoke(mm_input)

    actions = []
    try:
        payload = json.loads(normalize_ai_content(mm.content))
        actions = payload.get("actions", [])
    except Exception:
        return state

    for act in actions:
        if act.get("type") == "upsert":
            MEM.upsert(
                key=act["key"],
                value=act["value"],
                tags=act.get("tags") or [],
            )
        elif act.get("type") == "delete":
            MEM.delete(key=act["key"])

    return state

async def _stream_agent(history):
    """Stream the agent's text output token by token."""
    streamed_any = False
    final_messages = None

    async for event in APP.astream_events({"messages": history}, version="v2"):
        kind = event["event"]
        node = event.get("metadata", {}).get("langgraph_node", "")

        if kind == "on_chat_model_stream" and node == "main":
            chunk = event["data"]["chunk"]
            text = ""
            if hasattr(chunk, "content"):
                c = chunk.content
                if isinstance(c, str):
                    text = c
                elif isinstance(c, list):
                    text = "".join(
                        p.get("text", "") if isinstance(p, dict) else str(p)
                        for p in c
                    )
            if text:
                if not streamed_any:
                    sys.stdout.write("agent> ")
                    streamed_any = True
                sys.stdout.write(text)
                sys.stdout.flush()

        if kind == "on_chain_end" and event["name"] == "LangGraph":
            output = event["data"].get("output", {})
            if isinstance(output, dict) and "messages" in output:
                final_messages = output["messages"]

    if streamed_any:
        sys.stdout.write("\n\n")
        sys.stdout.flush()
    else:
        print("agent> (no response)\n")

    # Fire off memory manager in background ‚Äî user can type immediately
    if final_messages:
        threading.Thread(
            target=_run_memory_bg,
            args=(list(final_messages),),
            daemon=True,
        ).start()

    return final_messages if final_messages else history

<span style="color:#ff5555;font-weight:bold;font-size:1.5rem;">
    STOP
</span>

... or keep going if you want to work ahead.

---

**Assembling the graph**

We create a `StateGraph(AgentState)`, add the two nodes, set the entry point to `main`, and add:
- **Conditional edge** from `main` ‚Üí `should_continue` ‚Üí `tools` or `END`
- **Edge** `tools` ‚Üí `main` (loop until no more tool calls)

Then we **compile** the graph into a runnable `APP`. When we stream through `APP`, it runs the full loop: agent ‚Üí tools (if any) ‚Üí back to agent ‚Üí ‚Ä¶ ‚Üí end. The memory manager fires off in a background thread after each turn.

---

<span style="color:#55ff55;font-weight:bold;font-size:1.5rem;">
    GO
</span>

The next cell builds the graph and compiles it to `APP`. Run it, then we'll add the chat loop.

In [None]:
graph = StateGraph(AgentState)
graph.add_node("main", main_agent_node)
graph.add_node("tools", tools_node)

graph.set_entry_point("main")
graph.add_conditional_edges("main", should_continue, {
    "tools": "tools",
    "end": END,
})
graph.add_edge("tools", "main")

APP = graph.compile()

<span style="color:#ff5555;font-weight:bold;font-size:1.5rem;">
    STOP
</span>

... or keep going if you want to work ahead.

---

**Chat loop**

The last piece is a simple REPL: read a line from the user, handle commands like `/exit`, `/reset`, `/mem`, `/mcp`, `/env`, `/img`, then build a `HumanMessage` (with any queued images), append it to `history`, and **stream** the agent's response token by token via `_stream_agent`. The memory manager runs automatically in a background thread at the end of each turn, so you can start typing your next message immediately.

---

<span style="color:#55ff55;font-weight:bold;font-size:1.5rem;">
    GO
</span>

Run the cell below to start chatting with your agent. Try questions that need search, memory, or code execution, and use `/img <path>` to attach images.

In [None]:
print("Chat with your Agent. Commands: /exit, /reset, /mem, /mcp, /env, /img")
print("‚ö†Ô∏è  WARNING: Python execution is UNRESTRICTED. Use caution with code execution requests.\n")
history: List[Any] = []
pending_images: List[str] = []

while True:
    user = input("you> ").strip()
    if not user:
        continue
    if user.lower() in {"/exit", "/quit"}:
        break
    if user.lower() == "/reset":
        history = []
        pending_images = []
        print("(cleared)\n")
        continue
    if user.lower() == "/mem":
        print(memory_list.invoke({"limit": 25}))
        print()
        continue
    
    if user.lower().startswith("/img"):
        parts = user.split(maxsplit=1)
        if len(parts) < 2:
            print("Image Commands:")
            print("  /img <path>    - Attach an image to your next message")
            print("  /img clear     - Clear pending images")
            print("  /img list      - Show pending images")
            print(f"\nPending images: {len(pending_images)}")
            for p in pending_images:
                print(f"  üì∑ {p}")
            print("\nSupported formats: jpg, png, gif, webp")
            print("Example: /img ~/photos/screenshot.png")
        else:
            arg = parts[1].strip()
            if arg.lower() == "clear":
                pending_images = []
                print("‚úÖ Cleared pending images")
            elif arg.lower() == "list":
                if pending_images:
                    print(f"Pending images ({len(pending_images)}):")
                    for p in pending_images:
                        print(f"  üì∑ {p}")
                else:
                    print("No pending images")
            else:
                img_path = Path(arg).expanduser().resolve()
                if img_path.exists():
                    pending_images.append(str(img_path))
                    print(f"‚úÖ Image queued: {img_path}")
                    print(f"   ({len(pending_images)} image(s) will be sent with your next message)")
                else:
                    print(f"‚ùå File not found: {img_path}")
        print()
        continue
    
    if user.lower().startswith("/env"):
        parts = user.split(maxsplit=2)
        
        if len(parts) == 1:
            print("Environment Variables:")
            print("  /env KEY=VALUE       - Set an environment variable")
            print("  /env KEY             - Show current value of KEY")
            print("  /env list            - List all custom-set vars this session")
            print("\nCommon variables for MCP servers:")
            print(f"  GITHUB_TOKEN = {'[SET]' if os.environ.get('GITHUB_TOKEN') else '[NOT SET]'}")
            print(f"  OPENAI_API_KEY = {'[SET]' if os.environ.get('OPENAI_API_KEY') else '[NOT SET]'}")
            print(f"  ANTHROPIC_API_KEY = {'[SET]' if os.environ.get('ANTHROPIC_API_KEY') else '[NOT SET]'}")
            print("\nExample: /env GITHUB_TOKEN=ghp_xxxxxxxxxxxx")
        elif len(parts) >= 2:
            arg = parts[1] if len(parts) == 2 else parts[1] + " " + parts[2]
            
            if arg.lower() == "list":
                print("Current environment (selected vars):")
                for key in ['GITHUB_TOKEN', 'OPENAI_API_KEY', 'ANTHROPIC_API_KEY', 'TAVILY_API_KEY', 'GEMINI_API_KEY']:
                    val = os.environ.get(key)
                    if val:
                        masked = val[:4] + '...' + val[-4:] if len(val) > 10 else '[SET]'
                        print(f"  {key} = {masked}")
                    else:
                        print(f"  {key} = [NOT SET]")
            elif "=" in arg:
                key, value = arg.split("=", 1)
                key = key.strip()
                value = value.strip()
                os.environ[key] = value
                masked = value[:4] + '...' + value[-4:] if len(value) > 10 else value
                print(f"‚úÖ Set {key} = {masked}")
            else:
                key = arg.strip()
                val = os.environ.get(key)
                if val:
                    masked = val[:4] + '...' + val[-4:] if len(val) > 10 else val
                    print(f"{key} = {masked}")
                else:
                    print(f"{key} = [NOT SET]")
        print()
        continue
    
    if user.lower().startswith("/mcp"):
        parts = user.split(maxsplit=3)
        cmd = parts[1] if len(parts) > 1 else "help"
        
        if cmd == "list":
            print(MCP.list_servers())
        elif cmd == "tools":
            print(mcp_list_tools.invoke({}))
        elif cmd == "connect" and len(parts) >= 3:
            name = parts[2]
            if len(parts) > 3:
                cmd_parts = parts[3].split()
                command = cmd_parts[0]
                args = cmd_parts[1:] if len(cmd_parts) > 1 else []
            else:
                print("Usage: /mcp connect <name> <command> [args...]")
                print("Example: /mcp connect filesystem npx -y @modelcontextprotocol/server-filesystem /path/to/dir")
                continue
            print(run_async(MCP.connect(name, command, args)))
        elif cmd == "disconnect" and len(parts) >= 3:
            print(run_async(MCP.disconnect(parts[2])))
        else:
            print("MCP Commands:")
            print("  /mcp list              - List connected servers")
            print("  /mcp tools             - List available tools from all servers")
            print("  /mcp connect <name> <command> [args...]  - Connect to a server")
            print("  /mcp disconnect <name> - Disconnect from a server")
            print("\nExamples:")
            print("  /env GITHUB_TOKEN=ghp_xxxxxxxxxxxx")
            print("  /mcp connect github npx -y @modelcontextprotocol/server-github")
            print("  /mcp connect fs npx -y @modelcontextprotocol/server-filesystem /Users/me/projects")
        print()
        continue

    if pending_images:
        human_msg = create_multimodal_message(user, pending_images)
        pending_images = []
    else:
        human_msg = HumanMessage(content=user)
    
    history.append(human_msg)
    history = run_async(_stream_agent(history))

<span style="color:#ff5555;font-weight:bold;font-size:1.5rem;">
    STOP
</span>

---

**Where to go from here**

- **Add more tools** ‚Äì e.g. database queries, email, custom APIs.
- **Connect MCP servers** ‚Äì `/env GITHUB_TOKEN=...` then `/mcp connect github npx -y @modelcontextprotocol/server-github` to give the agent GitHub access.
- **Tighten safety** ‚Äì sandbox the Python interpreter, rate-limit search, or restrict which MCP servers can be connected.
- **Improve memory** ‚Äì try semantic search over memories instead of simple text search (try checking out our embeddings workshop from last year ;) https://github.com/MSOE-AI-Club/workshops/blob/main/Embeddings/embeddings-workshop.ipynb).
- **Multi-agent** ‚Äì use LangGraph to add specialist agents (e.g. researcher vs coder) with different tools and prompts.

You‚Äôve built a full agent with tools, memory, MCP, and a clear STOP/GO workshop flow. Have fun extending it!