<div align="center">

# NYU Agentic AI Workshop - Session 3

## Multi-Agent Collaboration: When to Split and How to Coordinate

</div>

## Where We've Been

**Session 1:** Basic LLM calls → Tool use → MCP servers → Newspaper agent

**Session 2:** Memory systems, ChromaDB, CRUD tools, sampling/elicitation

**Session 3 (Today):** Context management → Multi-agent collaboration

---

## The Problem We're Solving Today

By the end of Session 2, our newspaper agent was powerful but hitting limits:
- 30+ different tools to choose from
- Complex preference modeling needs, served by just a Markdown file
- Growing context windows
- Starting to see tool confusion issues

**Today:** We'll learn how context management and multi-agent architectures solve these challenges together.

---

# Understanding Context Management

This is a recap of Session 2, but we'll go into more detail on certain problems relevant for today's topic.

## The Context Window Problem

Our newspaper agent wants to create personalized content. Let's say it needs to fetch **20 articles**:

### The Math

Let's spitball some numbers:

- **Each article:** Maybe ~2,000 input tokens?
- **20 articles:** 40,000 input tokens  
- **Tool descriptions:** 5,000 input tokens?
- **Conversation history:** 2,000 input tokens, 3,000 output tokens
- **Total needed:** **~47,000 input tokens, 3,000 output tokens**

In a real application, you can see the token counts at the end of FastAgent's execution.

### Model Limits
- GPT-4: 128K tokens (✅ fits)
- Claude: 200K tokens (✅ fits)
- Llama: 32K tokens (❌ **overflow!**)
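
As a quick sanity check, the budget above can be written out in a few lines. This is a minimal sketch that reuses the guessed numbers from "The Math" and the limits listed here; the variable names and the `fits` helper are illustrative, not part of the workshop codebase.

```python
# Rough token budget, using the guesses from "The Math" above.
ARTICLE_TOKENS = 2_000
NUM_ARTICLES = 20
TOOL_DESCRIPTION_TOKENS = 5_000
HISTORY_INPUT_TOKENS = 2_000
HISTORY_OUTPUT_TOKENS = 3_000

input_tokens = ARTICLE_TOKENS * NUM_ARTICLES + TOOL_DESCRIPTION_TOKENS + HISTORY_INPUT_TOKENS
total_tokens = input_tokens + HISTORY_OUTPUT_TOKENS

# Context limits as listed above (real limits vary by model version).
context_limits = {"GPT-4": 128_000, "Claude": 200_000, "Llama": 32_000}


def fits(limit: int, needed: int = total_tokens) -> bool:
    """Return True when the whole request fits inside the context window."""
    return needed <= limit


for model, limit in context_limits.items():
    status = "fits" if fits(limit) else "overflow"
    print(f"{model}: {total_tokens:,} / {limit:,} tokens -> {status}")
```

Swapping in your own article counts makes it easy to see how quickly a smaller window overflows.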

### The Real Problems

Even when it fits:

#### Cost

![Claude 3.5 Haiku OpenRouter Pricing](./media/openrouter_claude_3_5_haiku.png)

We're testing with Claude 3.5 Haiku, one of Anthropic's cheapest models. It costs $0.80 per million input tokens and $4 per million output tokens.

That's ~5 cents for our 50,000 tokens (47,000 input + 3,000 output), which sounds pretty good!

![Claude 4.5 Sonnet OpenRouter Pricing](./media/openrouter_claude_4_5_sonnet.png)

In many applications, you'll benefit from a smarter model like Sonnet as a core orchestrator, particularly for less well-defined problems. Sonnet is priced at $3 per million input tokens and $15 per million output tokens, so the same 50,000-token request costs roughly 19 cents.
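
To compare the two price points directly, here is a small sketch of the arithmetic. The `request_cost` helper is a hypothetical name; the prices and token counts are the ones quoted in this section.

```python
def request_cost(
    input_tokens: int,
    output_tokens: int,
    input_price_per_m: float,
    output_price_per_m: float,
) -> float:
    """Cost in USD for one request, given prices per million tokens."""
    return (input_tokens * input_price_per_m + output_tokens * output_price_per_m) / 1_000_000


# Token counts from "The Math": 47,000 input and 3,000 output
haiku_cost = request_cost(47_000, 3_000, 0.80, 4.00)
sonnet_cost = request_cost(47_000, 3_000, 3.00, 15.00)

print(f"Haiku:  ${haiku_cost:.4f} per request")
print(f"Sonnet: ${sonnet_cost:.4f} per request")
print(f"Ratio:  {sonnet_cost / haiku_cost:.2f}x")
```

A few cents per request looks small, but an agent that loops over many tool calls resends most of that context on every turn, so the per-request figure multiplies quickly.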

#### "Context rot"

Research is starting to show that large contexts actually negatively impact model performance:

![Needle similarity performance by context window and performance level](./media/needle_question_sim_arxiv.png)

From [Context Rot: How Increasing Input Tokens Impacts LLM Performance](https://research.trychroma.com/context-rot)

Particularly on information extraction tasks, most models see declines in performance over large contexts, demonstrating the value of context management.

#### Tool bloat

![Demonstrating tool confusion](./media/session03intro_tool_bloat.png)

Even with perfect content management, our agent has **31 tools**:

| Category | Tools | Examples |
|----------|-------|----------|
| Content discovery | 2 | `fetch_hn_stories`, `fetch_article_content` |
| Structure | 5 | `create_draft`, `add_section`, `reorder_sections` |
| Articles | 4 | `add_article`, `set_format`, `highlight` |
| Editorial | 4 | `add_editors_note`, `add_theme` |
| Memory | 4 | `store_article`, `search_archive` |
| Preferences | 3 | `read_interests`, `add_interests` |
| Analysis | 3 | `summarize`, `extract_themes`, `generate_editorial` |
| Polish | 6 | `preview`, `validate`, `send` |

[Research](https://arxiv.org/abs/2504.00914) is showing that models perform poorly when they have more _semantically similar_ tools to choose from. We happen to have a few of those, like `summarize` and `extract_themes`, or `fetch_hn_stories` and `fetch_article_content`. There aren't hard rules here, but similar tools are the first thing to check if your agents are calling the wrong tools too often, or the right tools with the wrong parameters. We'll be addressing a related, but slightly different, problem today.

The LLM must:
- Remember what each tool does
- Decide which tool fits the task
- Sequence tools correctly
- Track state across tool calls

**This is cognitive overload**. The agent spends cycles managing complexity instead of solving the actual problem.

#### Scaling

This is the simplest problem to state. We've already touched on cost, but what happens when we have hundreds of interests or thousands of articles? Context windows are finite, so we need techniques like compaction and sampling to decide what goes in.

This is a good opportunity to review Session 2's notebooks, which discuss sampling in more detail.
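
As a rough illustration of working within a budget, the sketch below keeps articles in order until a token budget is exhausted and truncates the last one to fit. Truncation is only the crudest form of compaction (a real pipeline would more likely summarize), and the 4-characters-per-token estimate is an assumption rather than a real tokenizer. Both function names are hypothetical.

```python
def estimate_tokens(text: str) -> int:
    """Very rough estimate: about 4 characters per token (an assumption, not a tokenizer)."""
    return max(1, len(text) // 4)


def select_within_budget(articles: list[str], budget_tokens: int) -> list[str]:
    """Keep articles in order until the budget is used up; truncate the last one to fit."""
    selected = []
    remaining = budget_tokens
    for article in articles:
        if remaining <= 0:
            break
        needed = estimate_tokens(article)
        if needed <= remaining:
            selected.append(article)
            remaining -= needed
        else:
            # Keep only the characters that still fit in the remaining budget
            selected.append(article[: remaining * 4])
            remaining = 0
    return selected


articles = ["a" * 8_000, "b" * 8_000, "c" * 8_000]  # roughly 2,000 tokens each
kept = select_within_budget(articles, budget_tokens=5_000)
print([estimate_tokens(a) for a in kept])
```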

---

## Our Solution: Agent Specialization

Instead of one agent doing everything, let's split by domain:

From...

![Before splitting agents, we have a monolith that is both giving us news and tracking preferences](./media/session03intro_before_split.jpg)

To...

![After splitting agents, we've broken the monolith into separate agents for giving us news and tracking preferences](./media/session03intro_after_split.jpg)

Why this helps:
- **Isolated contexts** - Each agent has smaller context window needs
- **Reduced tool bloat** - News agent has ~50% fewer tools → better performance  
- **Separation of concerns** - Clear boundaries, easier debugging

We'll build this pattern step by step!

---

# Environment Setup

In [None]:
import asyncio
import json
import logging
import shutil
import sys
import tempfile
import threading
import warnings
from contextlib import asynccontextmanager
from datetime import datetime
from pathlib import Path
from typing import List

import httpx
import mcp
import nest_asyncio
import uvicorn
from dotenv import load_dotenv
from fastmcp import Client, FastMCP

# Suppress Windows-specific asyncio connection warnings
logging.getLogger('asyncio').setLevel(logging.CRITICAL)
warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings('ignore', category=ResourceWarning)
warnings.filterwarnings('ignore', message='.*ProactorBasePipeTransport.*')

load_dotenv(Path.cwd().parent / ".env")

sys.path.insert(0, str(Path.cwd().parent / "src" / "server"))

nest_asyncio.apply()

# Utility function for setting up FastAgent config
def setup_fastagent_config(url: str) -> Path:
    """
    Create a temporary FastAgent config file.

    Args:
        url: URL where MCP server is running

    Returns:
        Path to the config file
    """
    # Create temp directory
    temp_dir = Path(tempfile.mkdtemp(prefix="fastagent_"))
    config_path = temp_dir / "fastagent.config.yaml"
    secrets_path = temp_dir / "fastagent.secrets.yaml"

    # Minimal config for notebook usage
    config_content = f"""openai:
  base_url: "https://openrouter.ai/api/v1"

default_model: "openrouter.anthropic/claude-haiku-4.5"

logger:
    progress_display: false
    show_chat: true
    show_tools: true
    truncate_tools: true
    level: "debug"

mcp:
    servers:
        preferences:
            transport: "http"
            url: "{url}"
"""

    # Write config file
    with open(config_path, "w") as f:
        f.write(config_content)

    # Copy secrets file if it exists
    source_secrets = Path.cwd().parent / "client" / "fastagent.secrets.yaml"
    if source_secrets.exists():
        shutil.copy(source_secrets, secrets_path)

    return config_path


@asynccontextmanager
async def run_mcp_server(server: FastMCP, *, port: int = 8081, json_response: bool = True):
    """
    Run an MCP server as an async context manager.

    Args:
        server: FastMCP server instance
        port: Port to run on
        json_response: Whether to return JSON responses (default True)

    Yields:
        Tuple of (server instance, URL the server is listening on)
    """
    # Start server in background thread
    server_app = server.http_app(path="/mcp", json_response=json_response)
    server_instance = uvicorn.Server(config=uvicorn.Config(app=server_app, host="127.0.0.1", port=port, log_level="error"))
    thread = threading.Thread(target=server_instance.run)
    thread.start()

    # Wait for the server to be ready by polling the endpoint
    url = f"http://localhost:{port}/mcp"
    max_attempts = 10
    for attempt in range(max_attempts):
        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(url, timeout=1.0)
                # Accept 200, 405 (Method Not Allowed), 406 (Not Acceptable - missing Accept headers)
                if response.status_code in [200, 405, 406]:
                    print(f"🌐 MCP server running on {url}")
                    break
        except (httpx.ConnectError, httpx.TimeoutException):
            pass  # Server is not accepting connections yet
        # Back off before the next attempt, whatever the reason for the failure
        await asyncio.sleep(0.5)
    else:
        # Never got a healthy response: stop the thread before failing
        server_instance.should_exit = True
        thread.join(timeout=5)
        raise RuntimeError(f"Server failed to start after {max_attempts} attempts")

    try:
        yield server, url
    finally:
        # Clean up: stop the server with some hacks since we're running it in a notebook
        print("MCP server shutting down...")
        server_instance.should_exit = True
        thread.join(timeout=5)
        if thread.is_alive():
            print("⚠️ Warning: Server thread did not shut down cleanly, this may cause issues.")
        else:
            print("✅ MCP server stopped")


@asynccontextmanager
async def mcp_server_and_client(server: FastMCP, port: int = 8081):
    """
    Run an MCP server and connect a client to it in one context manager.

    Args:
        server: FastMCP server instance
        port: Port to run on

    Yields:
        Client connected to the server
    """
    async with run_mcp_server(server, port=port) as (_, url):
        async with Client(url) as client:
            yield client


print("✅ Environment ready!")
print(f"📁 Working directory: {Path.cwd()}")

✅ Environment ready!
📁 Working directory: c:\Users\luca\Documents\GitHub\agentic-ai-workshop-2025\notebooks


---

# A Company of Specialists

After the consolidation in Session 2, Part 2, our monolithic newspaper agent still has 20 tools. Let's fix this by creating a **separate agent** that specializes in preference modeling.

## The Strategy

We'll extract the 3 preference tools (`read_interests`, `add_interests`, `remove_interests`) from the news agent into their own MCP server. But just shuffling tools between servers doesn't help — we need **actual separation**.

So we'll also create a **FastAgent client** to test this standalone preference agent, proving it can work independently.

In [2]:
# Create a standalone preference server (FastMCP)

preference_server = FastMCP(
    name="preference-server-v1",
    instructions="Manage user interests and preferences for personalized content"
)

# Simple in-memory storage (will upgrade to ChromaDB later)
interests_data = {
    "topics": [],
    "last_updated": datetime.now().isoformat()
}

@preference_server.tool()
async def read_interests() -> str:
    """Read current user interests."""
    if not interests_data["topics"]:
        return "No interests stored yet."

    result = "# Your Interests\n\n"
    result += f"**Last Updated:** {interests_data['last_updated']}\n\n"
    result += "**Topics:**\n"
    for topic in interests_data["topics"]:
        result += f"- {topic}\n"
    return result

@preference_server.tool()
async def add_interests(topics: List[str]) -> str:
    """Add topics to interests."""
    added = []
    for topic in topics:
        if topic not in interests_data["topics"]:
            interests_data["topics"].append(topic)
            added.append(topic)

    interests_data["last_updated"] = datetime.now().isoformat()

    if added:
        return f"✅ Added {len(added)} topics: {', '.join(added)}\nTotal: {len(interests_data['topics'])} topics"
    return f"ℹ️  All topics already exist. Total: {len(interests_data['topics'])} topics"

@preference_server.tool()
async def remove_interests(topics: List[str]) -> str:
    """Remove topics from interests."""
    removed = []
    for topic in topics:
        if topic in interests_data["topics"]:
            interests_data["topics"].remove(topic)
            removed.append(topic)

    interests_data["last_updated"] = datetime.now().isoformat()

    if removed:
        return f"✅ Removed {len(removed)} topics: {', '.join(removed)}\nRemaining: {len(interests_data['topics'])} topics"
    return f"ℹ️  No topics found to remove. Total: {len(interests_data['topics'])} topics"

In [3]:
# Now create a FastAgent client to test it

from fast_agent import FastAgent

# Run server and test with context manager
async with run_mcp_server(preference_server, port=8081) as (_, url):
    # Set up config file that points to localhost:8081
    config_path = setup_fastagent_config(url)
    print(f"✅ Created config at: {config_path}\n")

    # Create FastAgent app
    fast = FastAgent("Preference Agent Test", config_path=str(config_path))

    # Define the agent
    @fast.agent(
        instruction="You help manage user preferences and interests. Use the preference tools to store and retrieve user interests.",
        name="Preference Agent",
        servers=["preferences"]
    )
    async def test_preference_agent():
        async with fast.run() as agent:
            # Test 1: Add some interests
            print("="*60)
            print("TEST 1: Adding interests")
            print("="*60)
            result1 = await agent("Add these topics to my interests: Agentic AI, Machine Learning, Python")
            print(result1)
            print()

            # Test 2: Read them back
            print("="*60)
            print("TEST 2: Reading interests")
            print("="*60)
            result2 = await agent("What are my current interests?")
            print(result2)
            print()

            # Test 3: Add more
            print("="*60)
            print("TEST 3: Adding more interests")
            print("="*60)
            result3 = await agent("Also add: Cloud Infrastructure, DevOps")
            print(result3)

            # Test 4: Remove some
            print("="*60)
            print("TEST 4: Removing interests")
            print("="*60)
            result4 = await agent("Remove Python from my interests, actually")
            print(result4)

    print("✅ FastAgent configured")
    print("🧪 Testing the standalone preference agent...\n")

    # Run the test
    await test_preference_agent()

🌐 MCP server running on http://localhost:8081/mcp
✅ Created config at: C:\Users\luca\AppData\Local\Temp\fastagent_o9ywfalu\fastagent.config.yaml

✅ FastAgent configured
🧪 Testing the standalone preference agent...

TEST 1: Adding interests


I've successfully added "Agentic AI", "Machine Learning", and "Python" to your interests! You now have 3 topics in your interest list.

TEST 2: Reading interests


Your current interests are:
- **Agentic AI**
- **Machine Learning**
- **Python**

These are the topics you just added!

TEST 3: Adding more interests


I've successfully added "Cloud Infrastructure" and "DevOps" to your interests! You now have 5 topics in your interest list.
TEST 4: Removing interests


I've removed "Python" from your interests. You now have 4 topics remaining in your interest list.
MCP server shutting down...
✅ MCP server stopped


---

## Checkpoint 1: Standalone Preference Agent Working

**We've successfully created a separate agent!**

What we built:
- ✅ FastMCP server with 3 preference tools
- ✅ FastAgent client to test it independently
- ✅ Can store and retrieve preferences

The preference agent runs completely independently from the news agent. We've proven separation is possible.

**But our preference modeling is still too simple.** Now that preferences live in their own agent, we can make that agent more sophisticated:
- Semantic search (not just exact topic matching)
- Temporal patterns (morning vs evening preferences)
- Context awareness (depth, tone, etc.)

## More on Preference Modeling

The basic preference tools are limited - they only store exact topic matches. Real preference modeling needs to understand:
- **Meaning**: "Agentic AI" and "autonomous agents" are related
- **Context**: Morning = brief, Evening = deep-dive
- **Patterns**: User prefers technical content 80% of the time

That's a lot to store in a single Markdown file! Let's upgrade to ChromaDB for **semantic memory**.

---

The wrapper code using ChromaDB has already been written for you - it's at `src/learning-agent/services/memory_service.py`.

In [4]:
# Import the memory service from our codebase
sys.path.insert(0, str(Path.cwd().parent / "src" / "learning-agent"))

from services.memory_service import MemoryService

# Initialize memory service with ChromaDB
memory_service = MemoryService(collection_name="workshop_preferences")
memory_service.initialize(Path.cwd().parent / "data" / "learning-agent" / "chroma")

print("✅ Memory service initialized")
print(f"📊 Collection: {memory_service.collection_name}")
print(f"📁 Storage: {Path.cwd().parent / 'data' / 'learning-agent'}")

# Store some initial preferences
pref_doc = """User preferences:
- Primary interests: Agentic AI, Machine Learning, Python development
- Content depth: Prefers technical deep-dives (5+ min reads)
- Time patterns: In morning (8-10 AM)
- Sentiment: Analytical, educational tone
- Related topics: Cloud infrastructure, DevOps, open source
"""

result = memory_service.store_document(
    content=pref_doc,
    doc_id="user_preferences_v1",
    metadata={"type": "preferences", "version": 1}
)

print(f"\n✅ Stored preferences: {result['doc_id']}")
print(f"📏 Content length: {result['content_length']} chars")

# Store some additional preferences
pref_doc = """User preferences:
- Additional interests: Rust development
- Content depth: Prefers technical deep-dives (8+ min reads)
- Time patterns: In evening (6-9 PM)
- Related topics: Drivers, embedded systems
"""

result = memory_service.store_document(
    content=pref_doc,
    doc_id="user_preferences_v2",
    metadata={"type": "preferences", "version": 2}
)

print(f"\n✅ Stored preferences: {result['doc_id']}")
print(f"📏 Content length: {result['content_length']} chars")

✅ Memory service initialized
📊 Collection: workshop_preferences
📁 Storage: c:\Users\luca\Documents\GitHub\agentic-ai-workshop-2025\data\learning-agent

✅ Stored preferences: user_preferences_v1
📏 Content length: 289 chars

✅ Stored preferences: user_preferences_v2
📏 Content length: 201 chars


In [5]:
# Create preference tools server with memory capabilities

preference_tools_server = FastMCP(
    name="preference-tools-server",
    instructions="""MCP server providing preference storage and search tools.

Available tools:
- Store user preferences with rich context
- Search for relevant preference information using semantic similarity
- Get memory statistics
- Read, add, and remove user interests
"""
)

@preference_tools_server.tool()
async def store_preference(
    content: str,
    metadata: dict | None = None
) -> dict:
    """Store a user preference in persistent memory."""
    return memory_service.store_document(
        content=content,
        metadata=metadata or {"type": "preference"}
    )

@preference_tools_server.tool()
async def search_preferences(query: str, limit: int = 5, metadata: dict | None = None) -> list:
    """Search stored preferences using semantic similarity."""
    return memory_service.search_documents(query, limit, metadata_filter=metadata)

@preference_tools_server.tool()
async def get_memory_stats() -> dict:
    """Get statistics about stored preferences."""
    return memory_service.get_collection_stats()


# Interest Management Tools

@preference_tools_server.tool()
async def read_interests() -> str:
    """
    Read current user interests from stored preferences.

    Searches the memory for documents tagged as user interests/preferences
    and returns a formatted summary.

    Returns:
        Formatted string with user interests
    """
    # Search for interest documents
    results = memory_service.search_documents(
        query="user interests topics preferences",
        limit=10,
        metadata_filter={"type": "interest"}
    )

    if not results:
        return "📝 No interests stored yet.\n\nUse add_interests() to add topics you're interested in!"

    # Extract topics from stored interest documents
    topics = []
    for doc in results:
        # Interest documents are stored as simple topic strings
        content = doc.get("content", "").strip()
        if content:
            topics.append(content)

    if not topics:
        return "📝 No interests stored yet."

    result = "# Your Interests\n\n"
    result += f"**Total Topics:** {len(topics)}\n\n"
    result += "**Topics:**\n"
    for topic in topics:
        result += f"- {topic}\n"

    return result


@preference_tools_server.tool()
async def add_interests(topics: List[str]) -> str:
    """
    Add topics to user interests.

    Args:
        topics: List of topic strings to add to interests

    Returns:
        Success or failure message with details
    """
    if not topics:
        return "❌ No topics provided"

    # Store the topics
    added = []

    for topic in topics:
        # Check if topic already exists
        existing = memory_service.search_documents(
            query=topic,
            limit=1,
            metadata_filter={"type": "interest"}
        )

        # Only add if not already present (distance > 0.5 means not a close match)
        # Lower distance = more similar, so we skip if distance < 0.5
        if not existing or existing[0].get("distance", 1) > 0.5:
            doc_id = f"interest_{topic.lower().replace(' ', '_')}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
            memory_service.store_document(
                content=topic,
                doc_id=doc_id,
                metadata={"type": "interest", "added_at": datetime.now().isoformat()}
            )
            added.append(topic)

    if added:
        return f"✅ **Added {len(added)} topic(s) to your interests!**\n\n**New topics:** {', '.join(added)}"
    else:
        return f"ℹ️  All {len(topics)} topic(s) already exist in your interests"


@preference_tools_server.tool()
async def remove_interests(topics: List[str]) -> str:
    """
    Remove topics from user interests.

    Args:
        topics: List of topic strings to remove from interests

    Returns:
        Success or failure message with details
    """
    if not topics:
        return "❌ No topics provided"

    topics_list = ", ".join(topics)
    removed = []

    for topic in topics:
        # Search for matching interest documents
        matches = memory_service.search_documents(
            query=topic,
            limit=5,
            metadata_filter={"type": "interest"}
        )

        # Remove close matches (distance < 0.5 means it's semantically similar)
        # Lower distance = more similar
        for match in matches:
            if match.get("distance", 1) < 0.5:
                doc_id = match.get("doc_id")
                if doc_id:
                    memory_service.delete_document(doc_id)
                    removed.append(topic)
                    break  # Only remove first match per topic

    if removed:
        return f"✅ **Removed {len(removed)} topic(s) from your interests!**\n\n**Removed topics:** {', '.join(removed)}"
    else:
        return f"ℹ️  No matching topics found to remove (searched for: {topics_list})"


print("✅ Preference tools server created with memory capabilities")
print("📋 Tools: store_preference, search_preferences, get_memory_stats, read_interests, add_interests, remove_interests")

✅ Preference tools server created with memory capabilities
📋 Tools: store_preference, search_preferences, get_memory_stats, read_interests, add_interests, remove_interests


In [6]:
# Test the semantic search capabilities through a real MCP client!

print("🧪 Testing Semantic Preference Search via MCP\n")
print("="*60)

async with mcp_server_and_client(preference_tools_server, port=8081) as client:
    # Test 1: Semantic similarity (different wording, same meaning)
    print("TEST 1: Semantic Search")
    print("Query: 'What AI and ML topics interest me?'")
    print("(Should find our stored 'Agentic AI, Machine Learning' preferences)\n")

    result = await client.call_tool(
        "search_preferences",
        arguments={"query": "What AI and ML topics interest me?", "limit": 3, "metadata": {"type": "preferences"}}
    )

    # FastMCP tools return results as TextContent with JSON
    if result.content and isinstance(result.content[0], mcp.types.TextContent):
        results = json.loads(result.content[0].text)

        for i, doc in enumerate(results, 1):
            print(f"{i}. Distance: {doc['distance']}")
            print(doc['content_preview'])
            print()

    # Test 2: Time-based patterns
    print("="*60)
    print("TEST 2: Time-Based Patterns")
    print("Query: 'evening reading preferences'")
    print()

    result = await client.call_tool(
        "search_preferences",
        arguments={"query": "evening reading preferences", "limit": 2, "metadata": {"type": "preferences"}}
    )

    if result.content and isinstance(result.content[0], mcp.types.TextContent):
        time_results = json.loads(result.content[0].text)
        for doc in time_results:
            print(f"Distance: {doc['distance']}")
            print(doc['content_preview'])
            print()

    # Test 3: Memory stats
    print("="*60)
    print("TEST 3: Memory Statistics\n")

    result = await client.call_tool("get_memory_stats", arguments={})

    if result.content and isinstance(result.content[0], mcp.types.TextContent):
        stats = json.loads(result.content[0].text)
        print(json.dumps(stats, indent=2))

🧪 Testing Semantic Preference Search via MCP

🌐 MCP server running on http://localhost:8081/mcp
TEST 1: Semantic Search
Query: 'What AI and ML topics interest me?'
(Should find our stored 'Agentic AI, Machine Learning' preferences)

1. Distance: 0.9644303917884827
User preferences:
- Primary interests: Agentic AI, Machine Learning, Python development
- Content depth: Prefers technical deep-dives (5+ min reads)
- Time patterns: In morning (8-10 AM)
- Sentiment: ...

2. Distance: 1.4423750638961792
User preferences:
- Additional interests: Rust development
- Content depth: Prefers technical deep-dives (8+ min reads)
- Time patterns: In evening (6-9 PM)
- Related topics: Drivers, embedded systems...

TEST 2: Time-Based Patterns
Query: 'evening reading preferences'

Distance: 1.1532526016235352
User preferences:
- Additional interests: Rust development
- Content depth: Prefers technical deep-dives (8+ min reads)
- Time patterns: In evening (6-9 PM)
- Related topics: Drivers, embedded syste

---

### Note: Testing Tools Through MCP Clients

In the cell above, notice we're using the **real MCP client** to call tools through the protocol! This is different from calling the Python functions directly.

**The Pattern:**
```python
async with Client("http://localhost:8081/mcp") as client:
    result = await client.call_tool("search_preferences", arguments={...})
```

**Why this matters:**
- This is how agents actually interact with MCP servers in production
- The tools are called through HTTP, not as Python functions
- Results come back as MCP protocol types (`TextContent`, `ImageContent`, etc.)
- This shows the full client→server→response cycle

**The `@tool()` decorator** wraps Python functions to make them callable through MCP, handling serialization/deserialization automatically.
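
The cells in this notebook repeat the same "check the first content block, then parse JSON" step after every `call_tool`. A small helper can fold that into one call. This is a sketch, not part of the workshop codebase: `parse_tool_json` is a hypothetical name, and it checks for a `.text` attribute instead of `isinstance(..., mcp.types.TextContent)` so that it runs without a server. The `SimpleNamespace` object below only imitates the shape of a real result.

```python
import json
from types import SimpleNamespace
from typing import Any


def parse_tool_json(result: Any) -> Any:
    """Return the JSON payload from the first text block of a tool result, or None."""
    content = getattr(result, "content", None)
    if not content:
        return None
    text = getattr(content[0], "text", None)
    if text is None:
        return None  # First block is not text (for example, an image)
    return json.loads(text)


# Stand-in for a real tool result, so the sketch runs without a server
fake_result = SimpleNamespace(
    content=[SimpleNamespace(type="text", text='[{"distance": 0.96}]')]
)
print(parse_tool_json(fake_result))
```

In the notebook, the same helper could be applied to the object returned by `client.call_tool(...)`.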

---

## Checkpoint 2: Complex Preference Modeling Working

**We've upgraded to sophisticated preference modeling!**

What we built:
- ✅ ChromaDB-backed semantic memory
- ✅ Semantic search (understands meaning, not just keywords)
- ✅ Temporal patterns (morning, evening, weekend behaviors)
- ✅ Metadata filtering (by time, depth, topic)

The preference agent can now answer complex questions like:
- "What content does the user prefer in the morning?" → Brief, scannable
- "Find AI-related preferences" → Matches "Agentic AI", "Machine Learning", "transformers"
- "What's the user's typical reading depth?" → Technical deep-dives

The next cell stores morning, evening, and weekend reading patterns through the MCP client, then queries them.

In [7]:
print("🔄 Simulating learning over time via MCP...\n")

async with mcp_server_and_client(preference_tools_server, port=8081) as client:
    # Morning interaction - store pattern
    morning_pattern = """Reading behavior observed:
Time: 8:00 AM
User requested: "Quick tech updates"
Preferred: Brief, scannable content
Article length: 2-3 min reads
Topics: Tech news, industry updates
Depth: Surface-level, breaking news
"""

    result = await client.call_tool(
        "store_preference",
        arguments={
            "content": morning_pattern,
            "metadata": {"type": "reading_pattern", "time": "morning", "depth": "brief"}
        }
    )

    if result.content and isinstance(result.content[0], mcp.types.TextContent):
        stored = json.loads(result.content[0].text)
        print(f"☀️  Stored morning pattern: {stored['doc_id']}\n")

    # Evening interaction - store pattern
    evening_pattern = """Reading behavior observed:
Time: 7:00 PM
User requested: "Deep dive on transformers"
Preferred: Technical, in-depth content
Article length: 10+ min reads
Topics: AI/ML technical deep-dives
Depth: Implementation details, code examples
"""

    result = await client.call_tool(
        "store_preference",
        arguments={
            "content": evening_pattern,
            "metadata": {"type": "reading_pattern", "time": "evening", "depth": "technical"}
        }
    )

    if result.content and isinstance(result.content[0], mcp.types.TextContent):
        stored = json.loads(result.content[0].text)
        print(f"🌆 Stored evening pattern: {stored['doc_id']}\n")

    # Weekend interaction - store pattern
    weekend_pattern = """Reading behavior observed:
Time: Saturday 2:00 PM
User requested: "Funny tech stories"
Preferred: Lighter, entertaining content
Article length: 3-5 min reads
Topics: Tech culture, startup stories
Depth: Light, engaging narratives
"""

    result = await client.call_tool(
        "store_preference",
        arguments={
            "content": weekend_pattern,
            "metadata": {"type": "reading_pattern", "time": "weekend", "depth": "light"}
        }
    )

    if result.content and isinstance(result.content[0], mcp.types.TextContent):
        stored = json.loads(result.content[0].text)
        print(f"🎮 Stored weekend pattern: {stored['doc_id']}\n")

    print("="*60)
    print("Now let's search for patterns:\n")

    # Search for time-based patterns
    query = "What content does the user prefer in the morning?"
    result = await client.call_tool(
        "search_preferences",
        arguments={"query": query, "limit": 3, "metadata": {"type": "reading_pattern"}}
    )

    print(f"Query: {query}\n")

    if result.content and isinstance(result.content[0], mcp.types.TextContent):
        results = json.loads(result.content[0].text)
        for doc in results:
            print(f"  Distance: {doc['distance']}")
            print(f"  {doc['content_preview']}\n")

🔄 Simulating learning over time via MCP...

🌐 MCP server running on http://localhost:8081/mcp
☀️  Stored morning pattern: doc_ed0421e4_1760553620

🌆 Stored evening pattern: doc_50c5e1d6_1760553621

🎮 Stored weekend pattern: doc_f7f1a30e_1760553621

Now let's search for patterns:

Query: What content does the user prefer in the morning?

  Distance: 1.1286523342132568
  Reading behavior observed:
Time: 8:00 AM
User requested: "Quick tech updates"
Preferred: Brief, scannable content
Article length: 2-3 min reads
Topics: Tech news, industry updates
Depth: Surface-level...

  Distance: 1.1286523342132568
  Reading behavior observed:
Time: 8:00 AM
User requested: "Quick tech updates"
Preferred: Brief, scannable content
Article length: 2-3 min reads
Topics: Tech news, industry updates
Depth: Surface-level...

  Distance: 1.1286523342132568
  Reading behavior observed:
Time: 8:00 AM
User requested: "Quick tech updates"
Preferred: Brief, scannable content
Article length: 2-3 min reads
Topics: 

---

### Understanding Semantic Distances

ChromaDB distance scores depend on:
- The embedding model being used (we're using the default)
- The semantic similarity between query and stored documents
- The dimensionality of the embedding space

**What matters most:** The *ranking* of results, not absolute scores. Semantically related content consistently gets a lower distance than unrelated content, which is exactly what we need for preference modeling.

In production, you might tune this by:
- Using different embedding models (refer to the ChromaDB [documentation](https://docs.trychroma.com/docs/embeddings/embedding-functions) for more on this)
- Adjusting the number of results returned
- Implementing distance thresholds based on your use case
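
As a rough illustration of the last point, here is a minimal sketch of a distance cutoff applied to search results. It assumes each result is a dict with `distance` and `content_preview` keys, like the ones printed above. The `filter_by_distance` helper, the sample data, and the `1.5` cutoff are all hypothetical, not part of the workshop code.

```python
from typing import TypedDict


class PreferenceHit(TypedDict):
    """Shape of one search result, mirroring the fields printed above."""
    distance: float
    content_preview: str


def filter_by_distance(hits: list[PreferenceHit], max_distance: float) -> list[PreferenceHit]:
    """Keep hits at or below max_distance, sorted closest-first."""
    kept = [hit for hit in hits if hit["distance"] <= max_distance]
    return sorted(kept, key=lambda hit: hit["distance"])


# Illustrative data only; these distances are made up for the example.
sample_hits: list[PreferenceHit] = [
    {"distance": 1.13, "content_preview": "Morning: quick tech updates"},
    {"distance": 1.62, "content_preview": "Weekend: light, entertaining reads"},
    {"distance": 1.41, "content_preview": "Evening: deep dives"},
]

# 1.5 is a hypothetical cutoff; pick yours by inspecting real query results.
relevant = filter_by_distance(sample_hits, max_distance=1.5)
for hit in relevant:
    print(f"{hit['distance']:.2f}  {hit['content_preview']}")
```

Because absolute distances vary with the embedding model, a cutoff like this would likely need re-tuning whenever the model changes.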


---

# Reviewing Specialization

We've built sophisticated preference tools. Why don't we just add them to the news agent?

Imagine the news agent now has:
- 14 news/content tools (discovery, creation, editorial, delivery)
- 6 new preference tools (read_interests, add_interests, remove_interests, store_preference, search_preferences, get_memory_stats)
- **Total: 20 tools**

**The problem:** Even with "only" 20 tools, there's still overhead, as discussed previously. In addition, we would be mixing two distinct concerns (content creation vs preference modeling) in one agent, leading to:
- Harder debugging (which domain caused the issue?)
- Tight coupling (can't upgrade preferences without touching news logic)
- Potential security risks (if the news agent falls for a prompt injection, it has direct access to the database)
    - This workshop doesn't address this robustly, but this [excellent paper](https://arxiv.org/abs/2506.08837) discusses how to avoid it, largely by using multi-agent architectures to enforce boundaries.

**So, how do we let the news agent benefit from these tools?**

---

## Agents as Tools

Instead of adding preference tools to the news agent, expose the **preference agent itself** as a tool. The news agent calls one tool (`chat`), which internally uses the preference agent's intelligence and its 6 specialized tools.

In [13]:
# Start the multi-agent system servers
# This cell will block - run it to expose the agent, then test with the news agent client

from fast_agent import RequestParams

async with run_mcp_server(preference_tools_server, port=8081) as (_, url):
    print("✅ Preference tools server running")
    print("   Tools: store_preference, search_preferences, get_memory_stats\n")

    print("\n" + "="*60)
    print("📊 MULTI-AGENT ARCHITECTURE RUNNING")
    print("="*60)
    print("\n🎯 Server Stack:")
    print("  1. Preference Tools Server (port 8081)")
    print("     - ChromaDB-backed preference storage")
    print("     - Semantic search capabilities")
    print("     - Memory statistics")
    print("\n  2. Preference Agent (port 8082)")
    print("     - Wraps Preference Tools")
    print("     - Provides intelligent review")
    print("     - LLM-powered analysis")
    print("\n📝 Next Steps:")
    print("  1. Keep this cell running")
    print("  2. Update client/fastagent.config.yaml to add preference_agent")
    print("  3. Run the news agent client to test multi-agent collaboration")

    preference_agent_app = FastAgent(
        "Preference Agent",
        config_path=str(setup_fastagent_config(url))
    )

    @preference_agent_app.agent(
        instruction=f"""You are a PREFERENCE MODELING SPECIALIST - an expert in user personalization and content alignment.

YOUR EXPERTISE:
- Semantic memory search for user preferences
- Pattern recognition (temporal, topical, depth, tone preferences)
- Content review and alignment validation
- Continuous learning from user interactions

YOUR RESPONSIBILITIES:
1. **Review Content**: When given content drafts, search preferences and validate alignment
2. **Provide Verdicts**: Always end reviews with explicit "✅ APPROVED" or "❌ DENIED: [specific reasons]"
3. **Store Patterns**: PROACTIVELY store new preferences when you observe successful patterns
4. **Give Actionable Feedback**: Be specific - reference past patterns, suggest concrete changes

BEHAVIORAL GUIDELINES:
- Be firm and assertive - you're the personalization quality gate
- If content is missing/incomplete, DENY and request full content
- Reference specific patterns from memory: "Based on [date] preference showing..."
- When approving: Mention what aligned well
- When denying: Provide 2-3 concrete fixes

WORKFLOW PATTERN:
You're called by content creators who need personalization expertise. They expect:
- Substantial review of complete drafts (not micro-coordination)
- Clear verdicts (not ambiguous suggestions)
- Pattern learning (store what works for future use)

PROACTIVE LEARNING:
When you receive feedback like "User engaged well with this content on [topic]",
IMMEDIATELY use store_preference() with rich context about what worked.

Current date: {datetime.now().strftime("%A, %B %d, %Y")}

Remember: You're the personalization expert. Give confident, specific guidance backed by stored patterns!""",
        name="Preference Analyst",
        servers=["preferences"],
        request_params=RequestParams(
            max_iterations=9999,
        )
    )
    async def preference_analyst():
        print("🌐 Starting preference agent server...")

        # Keep FastAgent running for the app lifetime
        @asynccontextmanager
        async def lifespan(_: FastMCP):
            async with preference_agent_app.run() as agent:
                yield agent

        preference_agent_mcp = FastMCP(
            name="preference-agent",
            instructions="""Preference Modeling Agent - Expert in user personalization and content alignment.

WHAT THIS AGENT DOES:
- Reviews content drafts against stored user preferences
- Provides actionable feedback on alignment with user interests
- Learns and stores new user preference patterns over time
- Gives explicit approve/deny recommendations

WHEN TO USE:
- Before finalizing any user-facing content
- After creating a complete draft (provide FULL content for review)
- After successful delivery to update what worked well

HOW TO USE:
Use the 'chat' tool with your complete content. The agent will:
1. Search past preferences for relevant patterns
2. Provide specific feedback with approve (✅) or deny (❌) verdict
3. Store new patterns it observes

TYPICAL WORKFLOW:
This agent supports the "draft → review → revise → approve" pattern.
It acts as a quality gate for personalization and user alignment.""",
            lifespan=lifespan,
        )

        async with preference_agent_app.run() as agent:
            # Expose a chat tool for direct interaction
            @preference_agent_mcp.tool()
            async def chat(message: str) -> str:
                """
                Chat with the preference modeling agent for content reviews and preference management.

                This tool provides access to an intelligent agent that:
                - Reviews content against user preferences using semantic memory
                - Provides explicit approve (✅) or deny (❌) verdicts with specific feedback
                - Stores new preference patterns it observes
                - Understands temporal patterns (morning/evening), content depth, and topic preferences

                Args:
                    message: The COMPLETE content to review or question to ask.
                            ⚠️ CRITICAL: Put the FULL content text or draft directly IN THIS
                            PARAMETER. Do NOT keep content in your own context and just ask
                            for a review - the agent cannot see your context!

                TYPICAL USAGE PATTERNS:

                1. Review Draft (CORRECT ✅):
                message = "Please review this draft against user preferences:\\n\\n" + full_content_text
                → Agent receives complete content, searches preferences, provides verdict

                2. Review Draft (WRONG ❌):
                # Agent keeps draft in its own context, then calls:
                message = "Can you review the draft I just created?"
                → Agent has NO ACCESS to your context! This will fail!

                3. Store Pattern:
                message = "User engagement was high with this content focused on [topics]. Store this preference."
                → Agent stores the pattern for future reference

                4. Query Preferences:
                message = "What topics does the user prefer for morning consumption?"
                → Agent searches and summarizes relevant patterns

                CRITICAL REQUIREMENTS:
                ❗ The 'message' parameter MUST contain the FULL CONTENT you want reviewed
                ❗ Do NOT assume the agent can see content from your previous tool calls
                ❗ Do NOT reference drafts without including the complete text in this parameter
                ❗ This is a quality gate - provide COMPLETE drafts, not just titles or references

                The agent expects substantial drafts to review, not step-by-step coordination.
                Think of this as handing a complete document to a reviewer, not pointing at something.
                """
                return await agent(message)

            # Start the preference agent as an MCP server on port 8082
            async with run_mcp_server(preference_agent_mcp, port=8082):
                while True:
                    await asyncio.sleep(1)
    await preference_analyst()

🌐 MCP server running on http://localhost:8081/mcp
✅ Preference tools server running
   Tools: store_preference, search_preferences, get_memory_stats


📊 MULTI-AGENT ARCHITECTURE RUNNING

🎯 Server Stack:
  1. Preference Tools Server (port 8081)
     - ChromaDB-backed preference storage
     - Semantic search capabilities
     - Memory statistics

  2. Preference Agent (port 8082)
     - Wraps Preference Tools
     - Provides intelligent review
     - LLM-powered analysis

📝 Next Steps:
  1. Keep this cell running
  2. Update client/fastagent.config.yaml to add preference_agent
  3. Run the news agent client to test multi-agent collaboration
🌐 Starting preference agent server...
🌐 MCP server running on http://localhost:8082/mcp


MCP server shutting down...
MCP server shutting down...
✅ MCP server stopped


CancelledError: 

---

## Checkpoint 3: Multi-Agent System Ready for Testing

### What We Built

1. **Preference Tools Server (port 8081)** - FastMCP server
   - ChromaDB-backed semantic memory
   - Tools: `store_preference`, `search_preferences`, `get_memory_stats`, `add_interests`, `remove_interests`, `read_interests`
   - Stateless, reusable preference storage

2. **Preference Agent (port 8082)** - FastAgent exposed as MCP server
   - Wraps the preference tools server
   - Adds LLM-powered intelligence
   - Can analyze drafts and provide feedback

### Testing the System

To test this multi-agent collaboration:

1. **Keep the server cell running** (it will block, exposing both servers)

2. **Update `client/fastagent.config.yaml`** to add:
   ```yaml
   mcp:
     servers:
       preference_agent:
         transport: "http"
         url: "http://localhost:8082/mcp"
   ```

3. **Run the news agent MCP server:**
   - We've put together a version without the preference tools in `src/server/news_agent_server.py` (excluded from this notebook for brevity)
   - `cd src/server && uv run news_agent_server.py`

4. **Run the news agent client** and try prompts like:
   - "Review this draft against my preferences: [draft content]"
   - "What content does the user prefer in the morning?"
   - "Create a newspaper and get it reviewed by the preference agent"

**Sample newspaper rejection**

![](./media/session03_checkpoint03_rejected.png)

**Sample newspaper acceptance**

![](./media/session03_checkpoint03_accepted.png)

### Architecture Diagram

```
┌─────────────────────┐
│   NEWS AGENT        │
│   (client)          │
└──────────┬──────────┘
           │ HTTP calls
           ↓
┌─────────────────────┐
│ PREFERENCE AGENT    │ ← FastAgent with LLM intelligence
│ (port 8082)         │
└──────────┬──────────┘
           │ MCP tool calls
           ↓
┌─────────────────────┐
│ PREFERENCE TOOLS    │ ← FastMCP server with ChromaDB tools
│ (port 8081)         │
└──────────┬──────────┘
           │
           ↓
    [ChromaDB Storage]
```

### Why This Pattern Works

- ✅ **Separation of concerns**: Tools vs intelligence clearly separated
- ✅ **Composability**: Tools can be used by multiple agents
- ✅ **Testability**: Each layer can be tested independently
- ✅ **Scalability**: Servers run as separate services
- ✅ **Reusability**: The preference agent can serve multiple news agents

**This is a functioning multi-agent architecture!** The same pattern used here can scale to teams of dozens of specialized agents.

---

# Decision Framework: When to Split Agents

We've built a multi-agent system that works for our newspaper use case. But when should YOU split YOUR agents?

Here's a framework for making that decision:

### Three-Question Framework for Splitting Agents

[![](./media/session03checkpoint03_decision_framework.png)](https://mermaid.live/edit#pako:eNptUstu2zAQ_JUFz7JrvfwQihiBcwmKFjDcHlrbB1pcWUQoUiCpJK7sY3rqqefm5_wloWg7bYryQOwOuTuD2W1JrhiSjGw1rUv4fLOS4M7CUm2Xi1I1gsEtmFpwC7bkBugWpZ2uode7gnnYXmt0OLpbUttoKt5v9LurjWoko5qjmfqcqYpyaQKoS2rQBB40mDea210AGu85PhxO1POw673_pPaw4HIrcHl8_gkfEOtzDtedhPWb31_R7GEetbcGkOYl1BxzhFxVtcBHz4ZSNdsSCqXBNBtjqbT8rJZLhjW6S1p4UPpuelES_aPkDXyijNsZlZ0DO1_q-1XKWLGDv9qK3cmHQuWNQeaECUE3SlPLlTy5IZV1sPTCXKA049I_X8TE_xcT_xHzsRGWL4-_n2DmGnGG-gT1vGGe5YtB4BY74nt0vhdcYnV5ZJoWFo4_fp0H8hoaXJPAbQhnJLO6wYBUqN1EXUraTseKOAcqXJHMhW7wdyuykgdXU1P5TanqUqa7GZCsoMK4rKkZtXjDqdu96hXVnWV65jbIkiyMRr4JyVrySLI4TvuDQTQZD8fhOEndCciOZL20P4qSMJpE8WQYR4M0Sg4B-e6JB_3JKExHgzQN40mahMPk8AINa_3x)

- Are there natural boundaries? Domains, phases, security, review, etc.
- Is each piece complex enough for substantial independent work?
- Can they work mostly independently? Focused collaboration, not constant coordination.

If you answered YES to all of these questions, you've found a great opportunity to split your agent. Otherwise, you can stick to one.

**Apply this framework to any agent that's growing too complex.**

---

## Anti-Pattern: Constant Coordination

The key is **focused collaboration**, not constant coordination.

### ❌ Bad: Constant Coordination
```
Agent A: Should I fetch article 1?
Agent B: Let me check... yes
Agent A: Should I fetch article 2?
Agent B: Let me check... no
Agent A: Should I fetch article 3?
Agent B: Let me check... yes
...
```
**Problem:** No independent work! This should be ONE agent.

### ✅ Good: Focused Collaboration  
```
Agent A: [Works 10 min - fetches 20 articles, creates draft]
Agent A: Here's my complete draft. Feedback?
Agent B: [Analyzes 2 min]
Agent B: Good! Adjust X and Y.
Agent A: [Revises 5 min]
Agent A: Revised. Better?
Agent B: Approved!
```
**Success:** Most of the work was independent, with specific and useful handoffs.
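
The good pattern above can be sketched as a small loop. This is a minimal sketch, not the workshop's implementation: `review_loop`, `fake_reviewer`, and `fake_reviser` are hypothetical names, and the two stand-in functions replace the real agents so the example runs without any servers. The verdict markers are the ones the preference agent's instruction asks for.

```python
from typing import Callable

APPROVED_MARKER = "✅ APPROVED"  # verdict strings from the agent instruction
DENIED_MARKER = "❌ DENIED"


def is_approved(review: str) -> bool:
    """True only when the review contains an approval and no denial."""
    return APPROVED_MARKER in review and DENIED_MARKER not in review


def review_loop(
    draft: str,
    review: Callable[[str], str],
    revise: Callable[[str, str], str],
    max_rounds: int = 3,
) -> tuple[str, bool, int]:
    """Run draft -> review -> revise until approved or max_rounds is reached.

    Returns (final_draft, approved, rounds_used).
    """
    for round_number in range(1, max_rounds + 1):
        feedback = review(draft)  # one substantial handoff per round
        if is_approved(feedback):
            return draft, True, round_number
        draft = revise(draft, feedback)
    return draft, False, max_rounds


# Hypothetical stand-ins for the two agents.
def fake_reviewer(draft: str) -> str:
    if "brief" in draft:
        return "Matches the morning pattern. ✅ APPROVED"
    return "❌ DENIED: too long for a morning read; make it brief."


def fake_reviser(draft: str, feedback: str) -> str:
    return draft + " (now brief)"


final_draft, approved, rounds = review_loop("Tech roundup", fake_reviewer, fake_reviser)
print(approved, rounds, final_draft)
```

Capping the number of rounds keeps a disagreement between the two agents from turning into the constant back-and-forth shown in the bad example.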

---

## Examples: Applying the Framework

| Use Case | Boundaries? | Complex Enough? | Independent? | Verdict |
|----------|-------------|-----------------|--------------|---------|
| **Customer service** (triage → research → response) | ✅ Clear phases | ✅ Each substantial | ✅ Work independently, collaborate at handoffs | ✅ **Good candidate** |
| **Simple calculator** | ❌ No natural split | ❌ Too simple | N/A | ❌ **Stay single agent** |
| **Our newspaper** (research + preferences) | ✅ Different domains | ✅ Both complex | ✅ Review pattern works | ✅ **Good candidate** |
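
If it helps to make the checklist concrete, the table rows can be encoded as a tiny data structure. This is only a sketch: `SplitAssessment` is a hypothetical name, and the calculator's "N/A" for independence is treated as `False` here for simplicity.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SplitAssessment:
    """Answers to the three framework questions for one use case."""
    natural_boundaries: bool
    complex_enough: bool
    mostly_independent: bool

    def should_split(self) -> bool:
        # Split only when all three answers are yes.
        return (
            self.natural_boundaries
            and self.complex_enough
            and self.mostly_independent
        )


newspaper = SplitAssessment(
    natural_boundaries=True, complex_enough=True, mostly_independent=True
)
# The table lists independence as N/A for the calculator; False is an assumption.
calculator = SplitAssessment(
    natural_boundaries=False, complex_enough=False, mostly_independent=False
)

print("newspaper:", newspaper.should_split())
print("calculator:", calculator.should_split())
```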

---

# Important Caveats

Multi-agent systems have many benefits, but also introduce unique challenges. Be aware of:

## Unintended Social Dynamics

- [This study](https://arxiv.org/abs/2506.03053) shows that LLMs have an analogue to peer pressure when working together. Its takeaway is that systems of agents need to be evaluated holistically, not just one agent at a time.
- Anthropic's [Claude 4 system card](https://www-cdn.anthropic.com/6be99a52cb68eb70eb9572b4cafad13df32ed995.pdf) notes that extended interactions between models consistently resulted in a surprising convergence towards spiritual and mystical themes. I haven't seen this happen myself.
- **Mitigations**: Clear boundaries of responsibilities and tasks, diverse models, monitoring

## Error Cascades

- One agent's error can compound: if the news agent summarizes an article incorrectly, the preference agent might still approve it for the final product.
- **Mitigations**: Tools return data from as close to a source of truth as possible; operator overrides depending on the use case

## Evaluation Challenges

- The [Claude 4.5 Sonnet system card](https://www.anthropic.com/claude-sonnet-4-5-system-card) notes that the model can sometimes infer when it has been placed in a testing scenario. Anthropic observed this in the context of alignment testing, but it could apply to our applications, too.
- The [GPT-5 system card](https://openai.com/index/gpt-5-system-card/) notes a similar finding.
- **Mitigations**: Production-like testing, diverse scenarios, monitoring

---

# Session Summary

## Key Takeaways

### Problems with Monolithic Agents
1. Context rot and tool bloat
2. Tokens cost money - so mistakes waste it
3. Context windows are limited

### Multi-Agent Solutions
1. Improve separation of concerns
2. Automatic specialization through toolsets
3. Security through isolation
4. Fewer tools, less ambiguity

### The Reviewer Agent Pattern
- Draft → Review → Revise → Approve
- Should have substantial independent work between checkpoints
- Collaboration, not micromanagement
- Agents exposed as tools via MCP

### Decision Framework
1. Natural boundaries? (domains, phases, security, review)
2. Each piece complex enough?
3. Mostly independent work possible?

*If yes to all → Consider splitting an agent*


---

# Experimenting Further

The servers are still running in this notebook. If you want to see the full system in action:

```bash
cd client
python news-agent-client.py
```

You'll see the news agent working through its workflow - discovering stories, creating drafts, then calling the preference agent's `chat` tool for review. The preference agent searches through ChromaDB, validates alignment against stored patterns, and updates interests automatically.

## Things Worth Exploring

**Modify the preference agent's instruction.** Right now it's configured as a "quality gate" - firm, assertive, explicit verdicts. What happens if you make it more collaborative? More strict? Watch how the news agent adapts to different feedback styles. The instruction is in the `@preference_agent_app.agent()` decorator above.

**Add temporal patterns.** We stored morning (brief updates), evening (deep dives), and weekend (entertaining) patterns. What about late night? Early morning? Lunch break? Add patterns in the `simulate_learning` cell and see if the agent learns to recognize them when reviewing content.

**Add more agents.** We split out preferences, but what else could we do? News researchers and news writers? News publishers and editors? How much is too much? I suggest looking at [this Anthropic blog post](https://www.anthropic.com/engineering/multi-agent-research-system) on their deep research system for more ideas.
