# Multi-Agent RAG Lab

Throughout this course, we have worked extensively with Python notebooks (JupyterHub/Colab), which are excellent for learning, experimentation, and visualization. However, to implement solutions in real-world scenarios, we need to:

1. Organize code into modular Python files (.py)
2. Understand proper project structure and file organization
3. Learn to run models on our own hardware (not cloud services)
4. Manage dependencies and environments independently
5. Build systems with multiple independently running components

**Important Note**: This lab is designed to be run **locally** on your own computer. It is **not intended** for cloud-based environments such as JupyterHub, Google Colab, Hellbender, Nautilus, or similar platforms. While it may be possible to adapt this lab for cloud environments, it would require additional considerations and modifications that are beyond the scope of this tutorial.

This lab guides you through building a multi-agent RAG system with:
- Two specialized agents (Retrieval Agent & Response Agent)
- HuggingFace SQuAD dataset
- Small LLMs
- Simulated MCP servers

## Prerequisites
- Prior knowledge of LLMs and how to run inference with them
- Basic NLP knowledge
- Ideally, a high-performance machine
- To ease resource constraints, the lab uses small models (a compact embedding model and FLAN-T5-small). It still might take some time to run.
- Python installed on your device or environment

## LAB STRUCTURE:
1. Setup & Installation
2. Understanding the Architecture
3. Building Retrieval Agent (extractor.py)
4. Building Response Agent (generator.py)
5. Orchestration System (client.py)
6. Exercises & Experiments

## STEP 1: Create and Activate Virtual Environment

Virtual environments keep dependencies isolated and prevent conflicts.

### FOR WINDOWS:

Open Command Prompt or PowerShell:

```
cd path/to/your/project
python -m venv rag_env
rag_env\Scripts\activate
```

If PowerShell reports an unauthorized access error, run the following command before running `rag_env\Scripts\activate`:
```
Set-ExecutionPolicy -Scope Process -ExecutionPolicy Bypass
```

The Python environment (`rag_env`) you created is saved in your project folder, so you do not need to recreate it. To use the same environment in additional terminals, just run:
```
cd path/to/your/project
Set-ExecutionPolicy -Scope Process -ExecutionPolicy Bypass
rag_env\Scripts\activate
```

### FOR MAC/LINUX:

Open Terminal:

```bash
cd path/to/your/project
python3 -m venv rag_env
source rag_env/bin/activate
```

For subsequent terminal sessions, navigate to your project and run:
```bash
cd path/to/your/project
source rag_env/bin/activate
```

### VERIFY ACTIVATION:

**Windows:** You should see (rag_env) at the start of your command line:
```
(rag_env) C:\Users\YourName\project>
```

**Mac/Linux:** You should see (rag_env) at the start of your terminal prompt:
```
(rag_env) username@computername:~/project$
```
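
The prompt prefix is only a visual cue. As an extra check, a short script can ask the interpreter itself whether it is running inside a virtual environment. The sketch below is a minimal example using only the standard library; the file name `check_env.py` is just a suggestion and not part of the lab files.

```python
# check_env.py (hypothetical helper, not one of the lab's three files)
import sys

def in_virtualenv() -> bool:
    """Return True when the interpreter is running inside a venv."""
    # Inside a venv, sys.prefix points at the environment folder,
    # while sys.base_prefix still points at the base Python installation.
    return sys.prefix != sys.base_prefix

print("Interpreter:", sys.executable)
print("Virtual environment active:", in_virtualenv())
```

When `rag_env` is active, the interpreter path printed on the first line should be inside the `rag_env` folder.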

**<span style="color: red;">From this point onwards, we assume that you are running commands in the rag_env Python environment.</span>**

## STEP 2: Install Required Libraries in Virtual Environment

Make sure your virtual environment is activated (see the VERIFY ACTIVATION section above).

### INSTALLATION COMMAND:

Copy and paste these commands into your terminal:

**For all platforms:**
```bash
pip install --upgrade pip
pip install mcp datasets sentence-transformers torch transformers
```

**Note for Mac users with Apple Silicon (M1/M2/M3):**
If you encounter issues with the PyTorch installation, try reinstalling PyTorch on its own. The standard macOS packages already include Apple Silicon support:
```bash
pip install torch torchvision torchaudio
```
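
To confirm that the packages landed in the active environment, one option is to query their installed versions from Python. This is a small sketch using the standard library's `importlib.metadata`; the helper name `installed_versions` is made up for this example.

```python
from importlib import metadata

# Distribution names exactly as passed to `pip install` above.
REQUIRED = ["mcp", "datasets", "sentence-transformers", "torch", "transformers"]

def installed_versions(packages):
    """Map each package name to its installed version, or None if it is missing."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions

for name, version in installed_versions(REQUIRED).items():
    print(f"{name:24s} {version or 'NOT INSTALLED'}")
```

Any line that reports `NOT INSTALLED` suggests the install command was run outside `rag_env` or did not finish.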

## Project Directory Structure

Let's create the proper folder structure for the lab.

We will save our Python files using the following structure:
```
path/to/your/project/
├── extractor.py
├── generator.py
└── client.py
```

## UNDERSTANDING INDEPENDENT AGENTS

**KEY CONCEPT**: In notebooks, code runs sequentially in cells.
In this lab, you're building INDEPENDENT agents that:
- Live in separate Python files (and, in larger deployments, on separate servers)
- Have their own responsibilities
- Communicate through well-defined interfaces (MCP protocol)
- Can be tested and deployed independently

This is how real-world AI systems are built.

### ARCHITECTURE OF INDEPENDENT AGENTS:

**RETRIEVAL AGENT (extractor.py):**
- **Role**: Information specialist
- **Independence**: Can run and be tested without Response Agent
- **Responsibilities**:
  - Decide HOW to search (strategy selection)
  - Execute searches across multiple sources
  - Rank and filter results
  - Return structured data
- **Interface**: receive query → return ranked results

**RESPONSE AGENT (generator.py):**
- **Role**: Question understanding and answer generation
- **Independence**: Can run with ANY retrieval system
- **Responsibilities**:
  - Analyze user questions
  - Determine information needs
  - Request data from Retrieval Agent
  - Synthesize answers
- **Interface**: receive question → return answer

**KEY INSIGHT:**
The agents don't need to know HOW each other works internally. They only need to know each other's INTERFACE (inputs/outputs).

This is a fundamental principle that makes systems:
- Easier to understand
- Easier to modify
- Easier to test
- More reliable
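
To make the interface idea concrete, here is a small sketch, separate from the lab files. It assumes a hypothetical `Retriever` interface with a single `retrieve` method and two made-up implementations. The caller (`answer`) works with either one because it relies only on the interface.

```python
from typing import List, Protocol

class Retriever(Protocol):
    """The interface: receive a query, return ranked text passages."""
    def retrieve(self, query: str) -> List[str]: ...

class KeywordRetriever:
    """Hypothetical implementation: ranks passages by the number of shared words."""
    def __init__(self, passages: List[str]):
        self.passages = passages

    def retrieve(self, query: str) -> List[str]:
        words = set(query.lower().split())
        scored = [(len(words & set(p.lower().split())), p) for p in self.passages]
        ranked = sorted(scored, key=lambda pair: pair[0], reverse=True)
        return [p for score, p in ranked if score > 0]

class FixedRetriever:
    """Hypothetical stand-in that always returns the same passage (handy in tests)."""
    def retrieve(self, query: str) -> List[str]:
        return ["The sky is blue."]

def answer(question: str, retriever: Retriever) -> str:
    """The caller depends only on the interface, not on how retrieval works."""
    passages = retriever.retrieve(question)
    return passages[0] if passages else "No context found."

docs = ["Paris is the capital of France.", "The Nile is a river in Africa."]
print(answer("What is the capital of France?", KeywordRetriever(docs)))
print(answer("anything", FixedRetriever()))
```

Swapping `KeywordRetriever` for `FixedRetriever` requires no change to `answer`, which is the property the lab's agents aim for.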

**COMMUNICATION FLOW:**
1. client creates both agents
2. user asks a question
3. client receives the question
4. client sends the question as a query to extractor
5. extractor receives the query
6. extractor sends the related context to client
7. client receives the context to answer the question
8. client sends question and context to generator
9. generator generates the answer and sends back to client
10. client presents the answer to user

This is fundamentally different from a notebook where everything is in one scope!

***More advanced independent agents can learn by experience!***

## STEP 3: Building the Extractor Agent: extractor.py

**FILE**: extractor.py

Copy this entire code block to a file named 'extractor.py' in your project directory

The Retrieval Agent is responsible for:
- Loading the SQuAD dataset
- Storing it in a simple in-memory vector store
- Executing searches using the vector store
- Ranking and filtering results
- Returning high-quality context for answer generation
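
The search and ranking steps come down to comparing a query vector with every stored document vector and keeping the best matches. The sketch below illustrates that idea in plain Python with made-up two-dimensional vectors. The real agent uses NumPy and embeddings from a model, so treat this only as an illustration of the technique.

```python
import math
from typing import List, Sequence

def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity between two vectors of equal length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0

def top_k(query_vec: Sequence[float], doc_vecs: List[Sequence[float]], k: int = 3) -> List[int]:
    """Indices of the k documents most similar to the query, best first."""
    scores = [cosine(query_vec, d) for d in doc_vecs]
    ranked = sorted(range(len(doc_vecs)), key=lambda i: scores[i], reverse=True)
    return ranked[:k]

# Toy "embeddings" (invented numbers, not real model output).
docs = [[1.0, 0.0], [0.0, 1.0], [0.7, 0.7]]
query = [0.9, 0.1]
print(top_k(query, docs, k=2))  # [0, 2]
```

Document 0 points in almost the same direction as the query, so it ranks first. Document 2 is partially aligned and comes second.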

We will use the SQuAD dataset as a simple example: https://huggingface.co/datasets/rajpurkar/squad

***In a standard AI agent system, each agent is a self-contained process that can:***
1. Communicate with other agents through a standard protocol (MCP/jsonrpc)
2. Expose tools (capabilities) that other agents can discover and use (get_tools method)
3. Handle structured requests and responses via an agreed-upon message format (call_tool method)
4. Operate independently yet participate in a larger orchestration network

MCP (Model Context Protocol) is an open standard introduced by Anthropic (and adopted by OpenAI, Hugging Face, etc.) that defines how models, tools, and clients communicate in a consistent, interoperable way. It's designed to make agents and tools portable across environments — e.g., the same agent can be used in Claude, ChatGPT, or a local orchestrator.

It defines the structure for:
1. Initialization (initialize)
2. Tool discovery (tools/list)
3. Tool invocation (tools/call)
4. Events, resources, and capabilities
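
As a rough illustration of the first three message types, the sketch below builds the JSON-RPC requests in the simplified form this lab uses. The helper `make_request` is made up for this example, and the question text is arbitrary. The tool name `extract_info` and the protocol version string are the ones used in this lab's code. The full MCP specification defines additional fields and messages that are omitted here.

```python
import json
from itertools import count
from typing import Any, Dict, Optional

_ids = count(1)  # each request on a connection gets its own id

def make_request(method: str, params: Optional[Dict[str, Any]] = None) -> str:
    """Serialize one JSON-RPC 2.0 request as a single line of JSON."""
    message = {"jsonrpc": "2.0", "id": next(_ids), "method": method, "params": params or {}}
    return json.dumps(message)

# 1. Initialization
handshake = make_request("initialize", {"protocolVersion": "2024-11-05", "capabilities": {}})
# 2. Tool discovery
discover = make_request("tools/list")
# 3. Tool invocation
invoke = make_request("tools/call", {"name": "extract_info",
                                     "arguments": {"question": "Who founded Notre Dame?"}})

for line in (handshake, discover, invoke):
    print(line)
```

Each request is one line of JSON. The server answers with one line that carries the same `id` and either a `result` or an `error` field.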

Note how the following agent is structured around these principles:

```python
#extractor.py
"""
Extractor MCP Server - loads SQuAD dataset and performs semantic retrieval.
"""

import asyncio
import json
import sys
import numpy as np
from datasets import load_dataset
from sentence_transformers import SentenceTransformer
from typing import Any, Dict, List

sys.stdout.reconfigure(line_buffering=True)

class ExtractorServer:
    def __init__(self):
        print("Loading SQuAD dataset (first 100 rows)", file=sys.stderr)
        #To keep this lightweight we will only use the first 100 rows of the dataset
        dataset = load_dataset("squad", split="train[:100]")
        
        self.documents = [{"title": row["title"], "context": row["context"]} for row in dataset]

        print("Loading embedding model", file=sys.stderr)
        #Using the embedding model "all-MiniLM-L6-v2"
        self.encoder = SentenceTransformer("all-MiniLM-L6-v2")
        
        self.doc_embeddings = self.encoder.encode(
            [d["context"] for d in self.documents],
            show_progress_bar=False, 
            convert_to_numpy=True
        )
        print("Extractor ready with (100 docs embedded)", file=sys.stderr)

    #Define Server info 
    def get_server_info(self) -> Dict[str, Any]:
        return {"name": "extractor-server", "version": "2.0.1"}

    #Defining the extraction of relevant data as a tool. These agents use a variety of tools to conduct their operation
    #We name this tool "extract_info"
    #Other agents can call this method and find out what tools this agent has.
    def get_tools(self) -> List[Dict[str, Any]]:
        return [{
            "name": "extract_info",
            "description": "Performs semantic retrieval using embedded SQuAD contexts.",
            "inputSchema": {
                "type": "object",
                "properties": {"question": {"type": "string"}},
                "required": ["question"]
            }
        }]

    #Defining the call_tool method
    #If other agents want to use this agent, they can call this method, 
    #and specify that they need to use "extract_info" method and give the inputs that it asks.
    def call_tool(self, name: str, args: Dict[str, Any]) -> Dict[str, Any]:
        if name != "extract_info":
            raise ValueError(f"Unknown tool: {name}")
        return self._extract_info(args)

    #Defining the _extract_info method
    #This is the actual work that the tool performs
    def _extract_info(self, args: Dict[str, Any]) -> Dict[str, Any]:
        #For a given question, find the 3 most similar contexts.
        q = args["question"]
        q_emb = self.encoder.encode([q], convert_to_numpy=True)[0]
        sims = np.dot(self.doc_embeddings, q_emb)
        ranked = np.argsort(sims)[::-1]

        #Several SQuAD rows share the same context paragraph, so skip duplicates
        #to avoid returning the same passage three times.
        docs, seen = [], set()
        for i in ranked:
            ctx = self.documents[i]["context"]
            if ctx not in seen:
                seen.add(ctx)
                docs.append(self.documents[i])
            if len(docs) == 3:
                break

        combined = "\n\n".join([f"{i+1}. {d['context']}" for i, d in enumerate(docs)])
        #Notice how we format this response. This is the standard result structure in MCP,
        #so that different agents can communicate using the same data structures.
        return {"content": [{"type": "text", "text": combined}]}

async def main():
    #Creating extractor agent as a server
    srv = ExtractorServer()

    #The rest of this function sets up the server loop.
    #Most of the following code is standard practice.
    #It defines the request and response structures, so that any other agent can discover this service and call its tools.
    #MCP uses JSON-RPC 2.0 as its underlying message format.
    while True:
        line = sys.stdin.readline()
        if not line:
            break
        try:
            req = json.loads(line.strip())
            if req["method"] == "initialize":
                res = {"jsonrpc": "2.0","id": req["id"],
                       "result": {"protocolVersion": "2024-11-05",
                                  "capabilities": {},
                                  "serverInfo": srv.get_server_info()}}
            elif req["method"] == "tools/list":
                res = {"jsonrpc": "2.0","id": req["id"],
                       "result": {"tools": srv.get_tools()}}
            elif req["method"] == "tools/call":
                name = req["params"]["name"]
                args = req["params"]["arguments"]
                result = srv.call_tool(name, args)
                res = {"jsonrpc": "2.0","id": req["id"],"result": result}
            else:
                res = {"jsonrpc": "2.0","id": req["id"],
                       "error": {"code": -32601,"message": "Unknown method"}}
            print(json.dumps(res), flush=True)
        except Exception as e:
            print(json.dumps({"jsonrpc": "2.0","id": None,
                              "error": {"code": -32603,"message": str(e)}}), flush=True)

if __name__ == "__main__":
    #Note, that this server can be run alone, independent of other agents or services.
    asyncio.run(main())
```

To run this agent independently in the terminal:

**Windows:**
```
python extractor.py
```

**Mac/Linux:**
```bash
python3 extractor.py
```

If you see the following output, the agent has started correctly. It will then wait for JSON-RPC requests on standard input (press Ctrl+C to stop it):
```
Loading SQuAD dataset (first 100 rows)
Loading embedding model
Extractor ready with (100 docs embedded)
```
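
Because the agent speaks line-based JSON-RPC over standard input and output, it can also be exercised from a short script without the other agent. The following is a sketch rather than part of the lab files: the function name `list_tools` is made up, and it assumes the server answers every request with exactly one line, as the lab's servers do.

```python
import json
import subprocess
import sys
from typing import Any, Dict, List

def list_tools(script: str) -> List[str]:
    """Start a stdio server script, run the handshake, and return its tool names."""
    proc = subprocess.Popen([sys.executable, script],
                            stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True)

    def ask(request: Dict[str, Any]) -> Dict[str, Any]:
        # One request per line in, one response per line out.
        proc.stdin.write(json.dumps(request) + "\n")
        proc.stdin.flush()
        line = proc.stdout.readline()
        if not line:
            raise RuntimeError(f"{script} exited without answering")
        return json.loads(line)

    try:
        ask({"jsonrpc": "2.0", "id": 1, "method": "initialize",
             "params": {"protocolVersion": "2024-11-05", "capabilities": {}}})
        reply = ask({"jsonrpc": "2.0", "id": 2, "method": "tools/list", "params": {}})
        return [tool["name"] for tool in reply["result"]["tools"]]
    finally:
        proc.stdin.close()       # end of input makes the server's read loop stop
        proc.wait(timeout=30)
        proc.stdout.close()
```

Called from the project folder as `list_tools("extractor.py")`, this should return `['extract_info']` once the model has loaded. The server's status messages still appear in the terminal because its stderr is not redirected.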


## STEP 4: Building the Response Agent: generator.py

**FILE**: generator.py

Copy this entire code block to a file named 'generator.py' in your project directory

The Response Agent is responsible for:
- Loading LLMs
- Generating final answers

Note that this generator agent is also a standard agent.

***It follows the same standards as the extractor:***
1. Communicate with other agents through a standard protocol 
2. Expose tools (capabilities) that other agents can discover and use 
3. Handle structured requests and responses via an agreed-upon message format
4. Operate independently yet participate in a larger orchestration network

```python
#generator.py
"""
Generator MCP Server - uses FLAN-T5-small to generate answers.
"""

import asyncio
import json
import sys
import torch
from transformers import pipeline
from typing import Any, Dict, List

sys.stdout.reconfigure(line_buffering=True)

class GeneratorServer:
    def __init__(self):
        print("Loading FLAN-T5-small.", file=sys.stderr)
        self.llm = pipeline(
            "text2text-generation",
            model="google/flan-t5-small",
            device=0 if torch.cuda.is_available() else -1
        )
        print("Generator ready!", file=sys.stderr)

    def get_server_info(self) -> Dict[str, Any]:
        return {"name": "generator-server", "version": "2.0.1"}

    #Defining the structure of the tool generate_answer
    def get_tools(self) -> List[Dict[str, Any]]:
        return [{
            "name": "generate_answer",
            "description": "Generate a concise answer using FLAN-T5-small given question + context.",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "question": {"type": "string"},
                    "context": {"type": "string"}
                },
                "required": ["question", "context"]
            }
        }]

    #Method to call the tool
    def call_tool(self, name: str, args: Dict[str, Any]) -> Dict[str, Any]:
        if name != "generate_answer":
            raise ValueError(f"Unknown tool: {name}")
        return self._generate(args)

    #Generate the answer using the given context
    def _generate(self, args: Dict[str, Any]) -> Dict[str, Any]:
        #The arguments passed by the caller contain the two inputs needed here: "question" and "context"
        q, ctx = args["question"], args["context"]
        prompt = f"Answer the question based on context.\n\nContext:\n{ctx}\n\nQuestion: {q}\nAnswer:"
        out = self.llm(prompt, max_length=200, num_return_sequences=1)[0]["generated_text"]
        return {"content": [{"type": "text", "text": out}]}

async def main():
    #Creation of the generator agent as separate server
    srv = GeneratorServer()
    while True:
        line = sys.stdin.readline()
        if not line:
            break
        try:
            req = json.loads(line.strip())
            if req["method"] == "initialize":
                res = {"jsonrpc":"2.0","id":req["id"],
                       "result":{"protocolVersion":"2024-11-05",
                                 "capabilities":{},
                                 "serverInfo":srv.get_server_info()}}
            elif req["method"] == "tools/list":
                res = {"jsonrpc":"2.0","id":req["id"],
                       "result":{"tools":srv.get_tools()}}
            elif req["method"] == "tools/call":
                name=req["params"]["name"]
                args=req["params"]["arguments"]
                result=srv.call_tool(name,args)
                res={"jsonrpc":"2.0","id":req["id"],"result":result}
            else:
                res={"jsonrpc":"2.0","id":req["id"],
                     "error":{"code":-32601,"message":"Unknown method"}}
            print(json.dumps(res),flush=True)
        except Exception as e:
            print(json.dumps({"jsonrpc":"2.0","id":None,
                              "error":{"code":-32603,"message":str(e)}}),flush=True)

if __name__=="__main__":
    asyncio.run(main())
```

Now you can run this independently:

**Windows:**
```
python generator.py
```

**Mac/Linux:**
```bash
python3 generator.py
```

If you see the following output, the small LLM is loaded and ready to use. The agent then waits for JSON-RPC requests on standard input (press Ctrl+C to stop it):
```
Loading FLAN-T5-small.
Generator ready!
```
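
The quality of the generated answer depends heavily on the prompt that combines the question and the context. The sketch below isolates that step as a plain function so it can be inspected without loading the model. The `max_context_chars` parameter and its default are assumptions added for illustration (small models can only attend to a limited amount of input). The lab's generator does not trim the context.

```python
def build_prompt(question: str, context: str, max_context_chars: int = 1500) -> str:
    """Build the generator prompt, trimming overly long context at a word boundary."""
    if len(context) > max_context_chars:
        # Cut at the last space before the limit so no word is split in half.
        cut = context.rfind(" ", 0, max_context_chars)
        context = context[: cut if cut > 0 else max_context_chars].rstrip() + " ..."
    return (
        "Answer the question based on context.\n\n"
        f"Context:\n{context}\n\n"
        f"Question: {question}\nAnswer:"
    )

print(build_prompt("What color is the sky?", "The sky is blue on a clear day."))
```

Printing the prompt for a few sample questions is a quick way to see what the model actually receives.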

## STEP 5: Bringing it all together

**FILE**: client.py

Copy this entire code block to a file named 'client.py' in your project folder

This file orchestrates the entire system:
- Starts both agents
- Initializes MCP connections
- Provides the main query interface

```python
#!/usr/bin/env python3
"""
Q&A MCP Orchestrator Client - coordinates extractor + generator.
"""

import asyncio
import json
import subprocess
import sys
from typing import Dict, Any

class QAClient:
    def __init__(self):
        self.req_id = 0
        self.servers: Dict[str, subprocess.Popen] = {}

    def _next_id(self):
        self.req_id += 1
        return self.req_id
        
    #Standard method to communicate with the other agents/services
    async def _send(self, proc, req):
        proc.stdin.write(json.dumps(req) + "\n")
        proc.stdin.flush()
        #readline blocks, so run it in a worker thread to keep the event loop responsive
        line = await asyncio.get_running_loop().run_in_executor(None, proc.stdout.readline)
        if not line:
            raise RuntimeError("No response from subprocess; check stderr for errors.")
        return json.loads(line.strip())

    #Standard method to connect to other services
    async def connect(self, name: str, script: str):
        #Since this orchestrator starts the other services, each one runs as a subprocess
        proc = subprocess.Popen([sys.executable, script],
                                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE, text=True)
        self.servers[name] = proc
        print(f"Starting {name}...", file=sys.stderr)
        await asyncio.sleep(3)  # allow time for the model to load; if loading is slow and you get
                                # a timeout error, try increasing this sleep time

        await self._send(proc, {"jsonrpc": "2.0", "id": self._next_id(),
                                "method": "initialize",
                                "params": {"protocolVersion": "2024-11-05", "capabilities": {}}})
        tools = await self._send(proc, {"jsonrpc": "2.0", "id": self._next_id(),
                                        "method": "tools/list", "params": {}})
        print(f"Connected to {name} ({[t['name'] for t in tools['result']['tools']]})") 

    #Method to call the tools
    async def call_tool(self, server: str, tool: str, args: Dict[str, Any]):
        proc = self.servers[server]
        req = {"jsonrpc": "2.0", "id": self._next_id(),
               "method": "tools/call",
               "params": {"name": tool, "arguments": args}}
        res = await self._send(proc, req)
        return res["result"]["content"][0]["text"]

    async def run(self):
        #Notice that this run method connects to the two agents that we created.
        await self.connect("extractor", "extractor.py")
        await self.connect("generator", "generator.py")

        #Now we create a terminal input option to input a question.
        print("\nAsk a question (type 'quit' to exit)")
        while True:
            q = input("\nQ> ").strip()
            if q.lower() in {"quit", "exit"}:
                break

            print("Extracting relevant context...")

            #This is how we call a tool: we call the extractor agent's extract_info tool and pass in the question
            context = await self.call_tool("extractor", "extract_info", {"question": q})

            
            print("Retrieved context snippet:")
            print(context[:300] + "..." if len(context) > 300 else context)

            print("\nGenerating answer...")

            #Now that we have the context, let's call the generator to produce the answer.
            #Notice that this time we call the generator agent's generate_answer tool and pass in
            #both the question and the context.

            answer = await self.call_tool("generator", "generate_answer",
                                          {"question": q, "context": context})
            print("\nFinal Answer:\n" + answer)

        for p in self.servers.values():
            p.terminate()

if __name__ == "__main__":
    asyncio.run(QAClient().run())
```

To run the complete system:

**Windows:**
```
python client.py
```

**Mac/Linux:**
```bash
python3 client.py
```

If you can see the following output, your system is working correctly:
```
Starting extractor...
Connected to extractor (['extract_info'])
Starting generator...
Connected to generator (['generate_answer'])

Ask a question (type 'quit' to exit)

Q> 
```


Note that, depending on your system's performance, startup might take some time.
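
The servers in this lab reply with a JSON-RPC `error` object when something goes wrong, while the client's `call_tool` reads `res["result"]` directly. One possible hardening step is to check for the error first. The following sketch shows the idea on hand-written responses. `ToolError` and `unwrap_text` are hypothetical names and not part of client.py.

```python
from typing import Any, Dict

class ToolError(RuntimeError):
    """Raised when a server replies with a JSON-RPC error object."""

def unwrap_text(response: Dict[str, Any]) -> str:
    """Return the first text item of a tools/call response, or raise ToolError."""
    if "error" in response:
        err = response["error"]
        raise ToolError(f"server error {err.get('code')}: {err.get('message')}")
    for item in response["result"]["content"]:
        if item.get("type") == "text":
            return item["text"]
    raise ToolError("response contained no text content")

# Hand-written example responses in the shape the lab's servers produce.
ok = {"jsonrpc": "2.0", "id": 1, "result": {"content": [{"type": "text", "text": "42"}]}}
bad = {"jsonrpc": "2.0", "id": None, "error": {"code": -32603, "message": "boom"}}

print(unwrap_text(ok))
try:
    unwrap_text(bad)
except ToolError as exc:
    print("Caught:", exc)
```

With a check like this, a failing tool call surfaces as a readable message instead of a `KeyError` on `"result"`.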

## STEP 6: Running experiments

Try asking "What is in front of the Notre Dame Main Building?"

Remember that we only loaded the first 100 rows of the dataset, so ask questions that those passages can answer.

Now you know the basic standards and how to set up an agentic AI framework. You can customize these agents by adding more tools and use them as you need!
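
As a starting point for adding tools, one possible pattern is a small registry that keeps each tool's description and handler together, so that `get_tools` and `call_tool` stay in sync. This is a sketch under that assumption. `ToolRegistry` and the two example tools (`word_count`, `shout`) are made up for illustration and do not appear in the lab files.

```python
from typing import Any, Callable, Dict, List

class ToolRegistry:
    """Keeps tool metadata and handlers together so adding a tool is one call."""
    def __init__(self) -> None:
        self._specs: List[Dict[str, Any]] = []
        self._handlers: Dict[str, Callable[[Dict[str, Any]], str]] = {}

    def register(self, name: str, description: str, properties: Dict[str, Any],
                 handler: Callable[[Dict[str, Any]], str]) -> None:
        # For simplicity, every declared property is treated as required.
        self._specs.append({
            "name": name,
            "description": description,
            "inputSchema": {"type": "object", "properties": properties,
                            "required": list(properties)},
        })
        self._handlers[name] = handler

    def get_tools(self) -> List[Dict[str, Any]]:
        return list(self._specs)

    def call_tool(self, name: str, args: Dict[str, Any]) -> Dict[str, Any]:
        if name not in self._handlers:
            raise ValueError(f"Unknown tool: {name}")
        text = self._handlers[name](args)
        # Same result structure that the lab's agents return.
        return {"content": [{"type": "text", "text": text}]}

# Hypothetical tools, for illustration only.
registry = ToolRegistry()
registry.register("word_count", "Count the words in a text.",
                  {"text": {"type": "string"}},
                  lambda args: str(len(args["text"].split())))
registry.register("shout", "Upper-case a text.",
                  {"text": {"type": "string"}},
                  lambda args: args["text"].upper())

print([t["name"] for t in registry.get_tools()])
print(registry.call_tool("word_count", {"text": "one two three"}))
```

A server class could delegate its `get_tools` and `call_tool` methods to such a registry, leaving the JSON-RPC loop unchanged.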

## Troubleshooting

### Common Issues:

1. **Permission errors on Windows**: Make sure to run `Set-ExecutionPolicy -Scope Process -ExecutionPolicy Bypass` before activating your environment.

2. **Python command not found on Mac/Linux**: Try using `python3` instead of `python` for all commands.

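3. **Model loading takes too long**: The first run downloads the models, which can take several minutes. If the client fails while connecting, increase the `await asyncio.sleep(3)` time in client.py to `await asyncio.sleep(5)` or higher, and run the agent script on its own to check for errors.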

4. **Memory issues**: The models are kept small to accommodate various systems, but if you still face memory issues, consider closing other applications.

5. **Import errors**: Make sure your virtual environment is activated and all packages are installed correctly.

## Cloud Platform Considerations

If you want to adapt this lab for cloud platforms (JupyterHub, Google Colab, etc.), consider these additional requirements:
- Modified subprocess management for containerized environments
- Adjusted file paths and permissions
- Potential networking restrictions for inter-process communication
- Resource limitations that may affect model loading
- Different Python executable paths and environment management

These adaptations are beyond the scope of this tutorial and would require platform-specific modifications.