In [None]:
%pip install --quiet langchain langchain_community langgraph langchain_openai
%pip install --quiet langchain-experimental langgraph-supervisor
%pip install --quiet tavily-python httpx pandas tabulate matplotlib seaborn

In [None]:
# Set API Keys
# NOTE: the values below are placeholders. Never commit real keys in this cell;
# running it overwrites any value already exported in the environment.
import os

os.environ.update({
    "OPENAI_API_KEY": "sk-proj-",
    "TAVILY_API_KEY": "tvly-dev-",
})

In [None]:
import httpx
import json
import os
import logging
import pandas as pd

from typing import Dict, Any
from datetime import datetime

from tavily import TavilyClient
from langgraph.prebuilt import create_react_agent
from langgraph.checkpoint.memory import InMemorySaver

from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.prompts import ChatPromptTemplate
from langchain.agents.agent_types import AgentType
from langchain_experimental.agents.agent_toolkits import create_pandas_dataframe_agent

---

### **Set Clients**

In [None]:
# Emit INFO-and-above records with a timestamp / level / message layout.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Chat models, both authenticated with the OpenAI key read from the environment.
# The summarizer model is the one invoked by create_documentation_summary.
llm_summarizer_agent = ChatOpenAI(
    model="gpt-4.1",
    temperature=0.1,
    api_key=os.environ["OPENAI_API_KEY"],
)
llm_worker_agent = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0.1,
    api_key=os.environ["OPENAI_API_KEY"],
)

# Tavily client used by the website documentation extraction tool.
tavily_client = TavilyClient(api_key=os.environ["TAVILY_API_KEY"])

---

### **Set Utils for Scratchpad Memory**

In [None]:
# Scratchpad Memory Utils
MEMORY_FILE = "scratchpad_memory.json"

def initialize_memory_file():
    """Create the scratchpad JSON file with an empty skeleton unless it already exists."""
    if os.path.exists(MEMORY_FILE):
        # An existing scratchpad is never overwritten.
        return

    skeleton = {
        "created_at": datetime.now().isoformat(),
        "last_updated": datetime.now().isoformat(),
        "documentations": {}
    }
    with open(MEMORY_FILE, 'w', encoding='utf-8') as handle:
        handle.write(json.dumps(skeleton, indent=2, ensure_ascii=False))
    print(f"Initialized memory file: {MEMORY_FILE}")

def load_memory() -> Dict[str, Any]:
    """Read the scratchpad JSON from disk.

    Returns:
        The parsed file content. If the file is missing it is created via
        ``initialize_memory_file`` and the read is retried. Any other failure
        is logged and printed, and ``{"documentations": {}}`` is returned.
    """
    logger.info(f"Loading memory from file: {MEMORY_FILE}")
    try:
        with open(MEMORY_FILE, 'r', encoding='utf-8') as handle:
            loaded = json.load(handle)
        # Counted inside the try block so an unexpected top-level JSON value
        # still falls through to the generic handler below.
        entry_count = len(loaded.get('documentations', {}))
        logger.info(f"Memory loaded successfully. Found {entry_count} documentation entries")
        return loaded
    except FileNotFoundError:
        logger.warning(f"Memory file {MEMORY_FILE} not found, initializing...")
        initialize_memory_file()
        return load_memory()
    except Exception as exc:
        logger.error(f"Error loading memory: {exc}")
        print(f"Error loading memory: {exc}")
        return {"documentations": {}}

def save_memory(memory_data: Dict[str, Any]) -> bool:
    """Write the scratchpad dictionary to MEMORY_FILE.

    The ``last_updated`` field of ``memory_data`` is refreshed in place before
    writing.

    Args:
        memory_data: The full scratchpad structure to persist.

    Returns:
        True when the file was written, False when an error occurred (the
        error is logged and printed rather than raised).
    """
    logger.info(f"Saving memory to file: {MEMORY_FILE}")
    try:
        memory_data["last_updated"] = datetime.now().isoformat()
        # Serialize before opening the file: opening with 'w' truncates it, so
        # a serialization error must surface while the old content is intact.
        serialized = json.dumps(memory_data, indent=2, ensure_ascii=False)
        with open(MEMORY_FILE, 'w', encoding='utf-8') as f:
            f.write(serialized)
        logger.info(f"Memory saved successfully to {MEMORY_FILE}")
        print(f"Memory saved to {MEMORY_FILE}")
        return True
    except Exception as e:
        logger.error(f"Error saving memory: {e}")
        print(f"Error saving memory: {e}")
        return False

# Initialize memory file: make sure the scratchpad exists as soon as this cell runs
# (the call does nothing when the file is already on disk)
initialize_memory_file()

---

### **Set Utils for CSV Memory**

In [None]:
# CSV Memory Utils
CSV_MEMORY_FILE = "csv_memory.json"

def initialize_csv_memory_file():
    """Create the CSV memory JSON file with an empty skeleton unless it already exists."""
    if os.path.exists(CSV_MEMORY_FILE):
        # An existing CSV store is never overwritten.
        return

    skeleton = {
        "created_at": datetime.now().isoformat(),
        "last_updated": datetime.now().isoformat(),
        "csv_data": {}
    }
    with open(CSV_MEMORY_FILE, 'w', encoding='utf-8') as handle:
        handle.write(json.dumps(skeleton, indent=2, ensure_ascii=False))
    print(f"Initialized CSV memory file: {CSV_MEMORY_FILE}")

def load_csv_memory() -> Dict[str, Any]:
    """Read the CSV memory JSON from disk.

    Returns:
        The parsed file content. If the file is missing it is created via
        ``initialize_csv_memory_file`` and the read is retried. Any other
        failure is logged and printed, and ``{"csv_data": {}}`` is returned.
    """
    logger.info(f"Loading CSV memory from file: {CSV_MEMORY_FILE}")
    try:
        with open(CSV_MEMORY_FILE, 'r', encoding='utf-8') as handle:
            loaded = json.load(handle)
        # Counted inside the try block so an unexpected top-level JSON value
        # still falls through to the generic handler below.
        entry_count = len(loaded.get('csv_data', {}))
        logger.info(f"CSV memory loaded successfully. Found {entry_count} CSV entries")
        return loaded
    except FileNotFoundError:
        logger.warning(f"CSV memory file {CSV_MEMORY_FILE} not found, initializing...")
        initialize_csv_memory_file()
        return load_csv_memory()
    except Exception as exc:
        logger.error(f"Error loading CSV memory: {exc}")
        print(f"Error loading CSV memory: {exc}")
        return {"csv_data": {}}

def save_csv_memory(csv_memory_data: Dict[str, Any]) -> bool:
    """Write the CSV memory dictionary to CSV_MEMORY_FILE.

    The ``last_updated`` field of ``csv_memory_data`` is refreshed in place
    before writing.

    Args:
        csv_memory_data: The full CSV memory structure to persist.

    Returns:
        True when the file was written, False when an error occurred (the
        error is logged and printed rather than raised).
    """
    logger.info(f"Saving CSV memory to file: {CSV_MEMORY_FILE}")
    try:
        csv_memory_data["last_updated"] = datetime.now().isoformat()
        # Serialize before opening the file: opening with 'w' truncates it, so
        # a serialization error must surface while the old content is intact.
        serialized = json.dumps(csv_memory_data, indent=2, ensure_ascii=False)
        with open(CSV_MEMORY_FILE, 'w', encoding='utf-8') as f:
            f.write(serialized)
        logger.info(f"CSV memory saved successfully to {CSV_MEMORY_FILE}")
        print(f"CSV memory saved to {CSV_MEMORY_FILE}")
        return True
    except Exception as e:
        logger.error(f"Error saving CSV memory: {e}")
        print(f"Error saving CSV memory: {e}")
        return False

def store_csv_data(csv_name: str, csv_content: str, source: str = "OpenF1"):
    """Store CSV text under ``csv_name`` in the persistent CSV memory file.

    Args:
        csv_name: Key under which the dataset is stored.
        csv_content: Raw CSV text.
        source: Label describing where the data came from.

    Returns:
        True when the data was stored, False when the save step explicitly
        reported a failure.
    """
    csv_memory = load_csv_memory()
    # setdefault: a memory file without the "csv_data" section must not break storing.
    csv_memory.setdefault("csv_data", {})[csv_name] = {
        "content": csv_content,
        "source": source,
        "stored_at": datetime.now().isoformat(),
        "size": len(csv_content)
    }
    # The save helper reports errors instead of raising. Only an explicit False
    # counts as failure, so a helper that returns nothing is treated as success.
    if save_csv_memory(csv_memory) is False:
        print(f"CSV data NOT stored: {csv_name} (could not write {CSV_MEMORY_FILE})")
        return False
    print(f"CSV data stored: {csv_name} ({len(csv_content)} characters)")
    return True

def get_csv_data(csv_name: str) -> str:
    """Return the stored CSV text for ``csv_name``, or None when no such dataset exists."""
    stored = load_csv_memory()
    if csv_name not in stored.get("csv_data", {}):
        return None
    return stored["csv_data"][csv_name]["content"]

def list_available_csvs() -> Dict[str, Any]:
    """Describe every dataset held in the persistent CSV memory.

    Returns:
        ``{"message": ...}`` when nothing is stored, otherwise
        ``{"available_datasets": {name: {"source", "stored_at", "size"}}}``.
    """
    stored = load_csv_memory().get("csv_data", {})
    if not stored:
        return {"message": "No CSV datasets available"}

    # Expose only the metadata, never the CSV content itself.
    return {
        "available_datasets": {
            name: {
                "source": entry["source"],
                "stored_at": entry["stored_at"],
                "size": entry["size"],
            }
            for name, entry in stored.items()
        }
    }

# Helper function for loading DataFrames
def load_dataframe_from_csv(csv_name: str) -> pd.DataFrame:
    """Build a DataFrame from the CSV text stored under ``csv_name``.

    Raises:
        ValueError: if no dataset with that name exists in persistent storage.
    """
    from io import StringIO

    raw_csv = get_csv_data(csv_name)
    if raw_csv is None:
        raise ValueError(f"CSV '{csv_name}' not found in persistent storage")

    # pandas parses from a file-like object, so wrap the stored text in a buffer.
    frame = pd.read_csv(StringIO(raw_csv))
    n_rows, n_cols = frame.shape[0], frame.shape[1]

    print(f"DataFrame loaded: {csv_name} ({n_rows} rows, {n_cols} columns)")
    return frame

# Initialize CSV memory file: make sure the CSV store exists as soon as this cell runs
# (the call does nothing when the file is already on disk)
initialize_csv_memory_file()


---

### **Tools for Context Agent**

In [None]:
@tool
def extract_documentation_from_website(url: str) -> str:
    """
    Extract documentation content from a given website URL.
    This tool extracts raw documentation text from a website, 
    which can later be processed or analyzed to identify API endpoints, 
    parameters, and other technical details.
    """
    could_not_extract = f"Could not extract content from {url}. Please check the URL and try again."
    try:
        content_response = tavily_client.extract(urls=url)

        # A response dict always has keys, so its length says nothing about
        # success: look at the extracted pages and the reported failures.
        # NOTE(review): relies on the "results" / "raw_content" / "failed_results"
        # fields of the Tavily extract response - confirm against the client docs.
        if isinstance(content_response, dict):
            pages = content_response.get("results") or []
            texts = [
                page["raw_content"]
                for page in pages
                if isinstance(page, dict) and page.get("raw_content")
            ]
            if texts:
                return f"Documentation extracted from {url}:\n\n" + "\n\n".join(texts)

            failures = content_response.get("failed_results") or []
            if failures:
                return f"{could_not_extract}\n\nDetails: {failures}"
            return could_not_extract

        # Unknown response shape: fall back to returning it verbatim.
        if content_response and len(content_response) > 0:
            return f"Documentation extracted from {url}:\n\n{content_response}"

        return could_not_extract

    except Exception as e:
        return f"Error extracting documentation from {url}: {str(e)}"

@tool
def extract_documentation_from_local_file(file_path: str = "_api_endpoints.txt") -> str:
    """
    Load API endpoints documentation from a local file.
    This tool reads the _api_endpoints.txt file directly, avoiding the need for web scraping.
    The file contains comprehensive OpenF1 API documentation with all endpoints, parameters, and examples.
    """
    # The docstring above is kept verbatim because the @tool decorator uses it
    # as the tool description.
    logger.info(f"Starting extract_documentation_from_local_file with file_path: {file_path}")

    try:
        logger.info(f"Checking if file exists: {file_path}")
        file_is_present = os.path.exists(file_path)
        if not file_is_present:
            logger.error(f"File not found: {file_path}")
            return f"File {file_path} not found. Please ensure the file exists in the current directory."

        logger.info("File exists, attempting to read content...")
        with open(file_path, 'r', encoding='utf-8') as source_file:
            documentation = source_file.read()

        logger.info(f"File read successfully. Content length: {len(documentation)} characters")

        # Whitespace-only files count as empty.
        if documentation.strip():
            logger.info("Returning documentation content successfully")
            return documentation

        logger.warning(f"File {file_path} is empty")
        return f"File {file_path} is empty."

    except Exception as read_error:
        # Every failure is reported to the caller as text instead of raising.
        failure_message = f"Error loading API endpoints from {file_path}: {str(read_error)}"
        logger.error(failure_message)
        return failure_message

In [None]:
# Simplified Debug Tools
@tool
def debug_csv_storage() -> str:
    """
    Debug tool to check the current state of CSV storage.
    This helps diagnose issues with data sharing between agents.
    """
    try:
        # Read the persistent store once and report one line per dataset.
        stored = load_csv_memory().get("csv_data", {})

        report_lines = [
            "CSV STORAGE DEBUG:\n\n",
            f"Persistent CSV storage: {len(stored)} items\n",
        ]
        report_lines.extend(
            f"  - {name}: {entry['size']} chars, source: {entry['source']}\n"
            for name, entry in stored.items()
        )
        return "".join(report_lines)

    except Exception as exc:
        return f"Error in debug_csv_storage: {str(exc)}"

@tool
def list_available_data() -> str:
    """
    List all available data sources for analysis.
    This provides a comprehensive view of what data is available.
    """
    try:
        report_lines = ["AVAILABLE DATA SOURCES\n\n", "CSV Storage:\n"]

        # One line per dataset in the persistent store, or a placeholder line.
        stored = load_csv_memory().get("csv_data", {})
        if not stored:
            report_lines.append("   - No CSV data available\n")
        else:
            report_lines.extend(
                f"   - {name}: {entry['size']} chars, source: {entry['source']}\n"
                for name, entry in stored.items()
            )

        return "".join(report_lines)

    except Exception as exc:
        return f"Error listing available data: {str(exc)}"


In [None]:
@tool
def create_documentation_summary(content: str) -> str:
    """
    Generate a concise summary of documentation content.
    This tool processes raw documentation text, removes irrelevant details, 
    and creates a well-formatted file with API endpoints and parameters.
    """
    logger.info("Starting create_documentation_summary")

    # Nothing to restructure: avoid a pointless (and billable) model call.
    if not content or not content.strip():
        logger.warning("create_documentation_summary called with empty content")
        return "Error creating documentation summary: no content provided."

    try:
        system_prompt_summary_agent = """
        You are an expert in cleaning and restructuring API documentation.

        You will receive raw documentation text. Your task is to transform it into a clear, structured, and developer-friendly reference.  
        Keep all technical content intact and discard only irrelevant website elements such as navigation menus, headers, footers, ads, or unrelated links.

        Always preserve and organize:
        - Endpoint names, HTTP methods, and URLs
        - Descriptions of what each endpoint does
        - Parameters and filters, including their names, types, constraints, defaults, and full descriptions
        - Response fields (attributes), with their names, meanings, units, and formats
        - Request and response body structures
        - Code snippets and sample URLs
        - Authentication rules and error codes
        - Notes, caveats, or important usage details

        Do not shorten or drop technical details. Reformat them into a clean layout with clear sections (e.g. Overview, Parameters, Attributes, Examples, Notes). Use concise organization (headings, short lists, or tables) to improve readability, but keep all information provided by the source.

        Your goal: produce a complete, accurate, and well-organized version of the documentation that is easy for developers to use, without losing any meaningful technical content.
        """

        logger.info("Creating prompt template...")
        prompt = ChatPromptTemplate.from_messages([
            ("system", system_prompt_summary_agent),
            ("user", "Here is the content to be summarized: {content}")
        ])

        logger.info("Invoking LLM summarizer agent...")
        # format_messages keeps the system/user roles; format() would flatten
        # the prompt into one string that the chat model receives as a single
        # human message, demoting the system instructions.
        prompt_messages = prompt.format_messages(content=content)
        prompt_length = sum(len(str(message.content)) for message in prompt_messages)
        logger.info(f"Formatted prompt length: {prompt_length}")

        response = llm_summarizer_agent.invoke(prompt_messages)

        logger.info(f"LLM response received. Response length: {len(response.content)}")
        logger.info(f"Response preview (first 200 chars): {response.content[:200]}...")

        return response.content

    except Exception as e:
        logger.error(f"Error in create_documentation_summary: {str(e)}")
        return f"Error creating documentation summary: {str(e)}"

In [None]:
@tool
def fetch_api_data(endpoint: str, parameters: Dict[str, Any] = None) -> str:
    """
    Fetch data from a specified API endpoint.
    This tool constructs the request URL with optional parameters, 
    performs an HTTP GET request, and returns the response content.
    For OpenF1 API endpoints, automatically requests CSV format and stores in memory.
    """
    try:
        # Check if this is an OpenF1 API endpoint
        is_openf1 = "api.openf1.org" in endpoint

        if parameters:
            # Convert parameters to query string
            query = "&".join(f"{key}={value}" for key, value in parameters.items())
            if query:
                # The endpoint may already carry a query string; a second "?"
                # would produce a malformed URL, so continue it with "&".
                separator = "&" if "?" in endpoint else "?"
                endpoint += f"{separator}{query}"

        csv_name = None
        if is_openf1:
            # Automatically append csv=true (only if not already present)
            if "csv=true" not in endpoint:
                separator = "&" if "?" in endpoint else "?"
                endpoint += f"{separator}csv=true"

            # Check if the CSV already exists before making a request
            csv_name = generate_csv_name(endpoint, parameters)
            existing_csv = get_csv_data(csv_name)
            if existing_csv:
                return f"CSV data already exists in memory as '{csv_name}' from {endpoint}\n\nData preview (first 5 lines):\n{existing_csv.split(chr(10))[:5]}\n\nNo new API call needed - using cached data."

        # Make the HTTP request
        with httpx.Client() as client:
            response = client.get(endpoint, timeout=30.0)
            response.raise_for_status()

            # For OpenF1 CSV responses, store in memory and return confirmation
            if is_openf1 and response.headers.get('content-type', '').startswith('text/csv'):
                # Only an explicit False from the store helper counts as failure,
                # so a helper that returns nothing is still treated as success.
                if store_csv_data(csv_name, response.text, "OpenF1") is False:
                    return f"CSV data was fetched from {endpoint} but could not be stored as '{csv_name}'. Check the CSV memory file and try again."
                return f"CSV data fetched and stored as '{csv_name}' from {endpoint}\n\nData preview (first 5 lines):\n{response.text.split(chr(10))[:5]}"

            # For other APIs, try JSON first, then fall back to text
            try:
                data = response.json()
                return f"API Response from {endpoint}:\n\n{json.dumps(data, indent=2)}"
            except json.JSONDecodeError:
                return f"API Response from {endpoint}:\n\n{response.text}"

    except httpx.HTTPStatusError as e:
        return f"HTTP Error {e.response.status_code} when fetching {endpoint}: {e.response.text}"
    except httpx.TimeoutException:
        return f"Timeout error when fetching {endpoint}. The request took too long."
    except Exception as e:
        return f"Error fetching data from {endpoint}: {str(e)}"

def generate_csv_name(endpoint: str, parameters: Dict[str, Any] = None) -> str:
    """Generate a CSV name based on endpoint and all parameters (including URL filters).

    Args:
        endpoint: Request URL, optionally with a query string.
        parameters: Extra parameters; they override URL parameters with the same key.

    Returns:
        ``openf1_<endpoint>[_<key>_<value>...].csv`` with the parts sorted by
        key. The ``csv`` parameter is ignored. Filters written without ``=``
        (such as ``interval<0.005``) are included too, so differently filtered
        requests never share a name.
    """
    def _clean(text) -> str:
        # Strip URL syntax and spell out comparison operators.
        return (str(text).replace('=', '').replace('&', '').replace('?', '')
                .replace('<', 'lt').replace('>', 'gt'))

    # Marker for filters that have no "=" (distinct from a real None value).
    bare_filter = object()

    path, _, query = (endpoint or "").partition('?')
    # Extract endpoint type (e.g., 'laps', 'sessions', 'drivers'); tolerate a trailing slash.
    endpoint_clean = path.rstrip('/').split('/')[-1] or "data"

    all_params = {}
    for fragment in query.split('&'):
        if not fragment:
            continue
        if '=' in fragment:
            key, value = fragment.split('=', 1)
            if key != 'csv':  # Skip csv parameter
                all_params[key] = value
        else:
            # Comparison filter without "=": keep the whole fragment so it
            # still distinguishes the dataset name.
            all_params[fragment] = bare_filter

    # Add passed parameters (overriding URL params if same key)
    if parameters:
        for key, value in parameters.items():
            if key != 'csv':  # Skip csv parameter
                all_params[key] = value

    param_parts = []
    for key, value in sorted(all_params.items(), key=lambda item: item[0]):
        if value is bare_filter:
            param_parts.append(_clean(key))
        else:
            param_parts.append(f"{key}_{_clean(value)}")

    param_suffix = "_" + "_".join(param_parts) if param_parts else ""
    return f"openf1_{endpoint_clean}{param_suffix}.csv"


In [None]:
@tool
def look_memory() -> str:
    """
    Look up all stored API documentation summaries in the memory scratchpad.
    This tool checks what documentation is already available in memory, 
    avoiding the need to re-extract and re-summarize.
    """
    logger.info("Starting look_memory...")
    try:
        documentations = load_memory().get("documentations", {})
        entry_total = len(documentations)

        logger.info(f"Found {entry_total} documentation entries in memory")

        if not documentations:
            logger.info("Memory is empty")
            return "Memory scratchpad is empty. No API documentation summaries found."

        # Overview only: metadata for every entry, never the summary body.
        overview = [f"Memory scratchpad contains {entry_total} API documentation summary(ies):\n\n"]
        for _doc_id, entry in documentations.items():
            overview.append(f"**{entry.get('api_name', 'Unknown API')}**\n")
            overview.append(f"   Source: {entry.get('source_url', 'Unknown')}\n")
            overview.append(f"   Created: {entry.get('created_at', 'Unknown')}\n")
            overview.append(f"   Endpoints: {entry.get('endpoints_count', 0)}\n\n")

        logger.info("Memory lookup completed successfully")
        return "".join(overview)

    except Exception as exc:
        logger.error(f"Error looking up memory: {str(exc)}")
        return f"Error looking up memory: {str(exc)}"

In [None]:
@tool
def write_memory(api_name: str, summary: str, source_url: str = "local_file") -> str:
    """
    Save a new API documentation summary into the memory scratchpad.
    This tool stores structured documentation so it can be reused later 
    without re-extracting and re-summarizing the same API source.

    Args:
        api_name: Name of the API the documentation belongs to.
        summary: The documentation summary to store.
        source_url: Where the documentation came from (a URL, or "local_file" for local files).
    """
    logger.info(f"Starting write_memory with api_name: {api_name}")
    logger.info(f"Summary length: {len(summary)}")

    # An empty entry would only pollute the scratchpad.
    if not summary.strip():
        logger.warning("write_memory called with an empty summary")
        return "Error writing to memory: summary is empty."

    try:
        logger.info("Loading existing memory...")
        memory = load_memory()
        # setdefault: tolerate a scratchpad file without the "documentations" section.
        documentations = memory.setdefault("documentations", {})
        logger.info(f"Current memory has {len(documentations)} entries")

        # Create a unique ID for this documentation. The timestamp has one-second
        # resolution, so add a counter if the ID is already taken.
        base_id = f"{api_name.lower().replace(' ', '_')}_{int(datetime.now().timestamp())}"
        doc_id = base_id
        duplicate_index = 1
        while doc_id in documentations:
            duplicate_index += 1
            doc_id = f"{base_id}_{duplicate_index}"
        logger.info(f"Generated doc_id: {doc_id}")

        # Try to parse endpoints count safely
        endpoints_count = 0
        if summary.startswith('{'):
            try:
                parsed_summary = json.loads(summary)
                endpoints = parsed_summary.get("endpoints") if isinstance(parsed_summary, dict) else None
                # Only sized containers are counted; null or scalar values
                # must not abort the whole write.
                if isinstance(endpoints, (list, dict)):
                    endpoints_count = len(endpoints)
                logger.info(f"Parsed JSON summary, found {endpoints_count} endpoints")
            except json.JSONDecodeError as e:
                logger.warning(f"Could not parse summary as JSON: {e}")
        else:
            logger.info("Summary is not JSON format, setting endpoints_count to 0")

        # Create the documentation entry
        doc_entry = {
            "api_name": api_name,
            "source_url": source_url,
            "summary": summary,
            "created_at": datetime.now().isoformat(),
            "endpoints_count": endpoints_count
        }

        # Log metadata only: the summary itself can be very large.
        logger.info(f"Created doc_entry for {api_name} (source: {source_url}, endpoints: {endpoints_count})")

        # Add to memory
        documentations[doc_id] = doc_entry
        logger.info(f"Added entry to memory. Total entries now: {len(documentations)}")

        # Save memory. The save helper reports errors instead of raising; only
        # an explicit False counts as failure, so a helper that returns nothing
        # is still treated as success.
        logger.info("Saving memory to file...")
        if save_memory(memory) is False:
            logger.error("Memory could not be saved to file")
            return f"Error writing to memory: could not save documentation for {api_name} to the scratchpad file."
        logger.info("Memory saved successfully")

        return f"Successfully saved documentation for **{api_name}** to memory scratchpad!\n\n" \
               f"**Details:**\n" \
               f"   API: {api_name}\n" \
               f"   Endpoints: {doc_entry['endpoints_count']}\n" \
               f"   Saved at: {doc_entry['created_at']}\n\n" \
               f"This documentation can now be reused for future API calls without re-extraction."

    except Exception as e:
        logger.error(f"Error writing to memory: {str(e)}")
        return f"Error writing to memory: {str(e)}"

---

### **Tools for Analysis Agent**

In [None]:
@tool
def get_memory_documentation() -> str:
    """
    Retrieve all stored API documentation summaries from the memory scratchpad.
    This tool returns all documentation entries, allowing reuse without re-extracting the source.
    """
    logger.info("Starting get_memory_documentation...")
    try:
        memory = load_memory()
        documentations = memory.get("documentations", {})

        logger.info(f"Found {len(documentations)} documentation entries")

        if not documentations:
            logger.info("No documentation found in memory")
            return "No documentation found in memory."

        # Return all documentation. Fields are read with defaults (matching
        # look_memory) so one incomplete entry cannot hide all the others.
        sections = ["**Available Documentation in Memory:**\n\n"]
        for doc_id, doc_data in documentations.items():
            sections.append(f"**{doc_data.get('api_name', 'Unknown API')}**\n")
            sections.append(f"Source: {doc_data.get('source_url', 'Unknown')}\n")
            sections.append(f"Created: {doc_data.get('created_at', 'Unknown')}\n")
            sections.append(f"Endpoints: {doc_data.get('endpoints_count', 0)}\n\n")
            sections.append(f"**Documentation Summary:**\n{doc_data.get('summary', 'No summary stored.')}\n\n")
            sections.append("---\n\n")

        logger.info("Documentation retrieval completed successfully")
        return "".join(sections)

    except Exception as e:
        logger.error(f"Error retrieving documentation from memory: {str(e)}")
        return f"Error retrieving documentation from memory: {str(e)}"

In [None]:
@tool
def analyze_data_with_pandas(analysis_query: str, csv_names: str = None) -> str:
    """
    Analyze CSV datasets using a Pandas DataFrame agent.
    This tool loads CSV data from persistent storage and allows natural language queries 
    on the datasets by leveraging a Pandas agent that executes LLM-generated Python code.
    It can work with multiple DataFrames simultaneously for comparative analysis.
    
    Args:
        analysis_query: The analysis query in natural language
        csv_names: Comma-separated list of CSV names to analyze. If None, analyzes all available CSVs.
    """
    try:
        # Get list of available CSVs
        available_csvs = list_available_csvs()

        if "message" in available_csvs:
            return "No CSV datasets available. Please fetch some data from OpenF1 API first."

        # Get available CSV names
        available_names = list(available_csvs["available_datasets"].keys())

        # Determine which CSVs to analyze
        if csv_names:
            csv_list = []
            for requested in csv_names.split(','):
                requested = requested.strip()
                # Accept names given with or without the ".csv" extension and
                # skip duplicates so no dataset is loaded twice.
                for candidate in (requested, f"{requested}.csv"):
                    if candidate in available_names:
                        if candidate not in csv_list:
                            csv_list.append(candidate)
                        break
        else:
            csv_list = available_names

        if not csv_list:
            return f"No valid CSV names provided or no CSVs available. Available datasets: {', '.join(available_names)}"

        # Load DataFrames directly from persistent storage
        dataframes_list = []
        loaded_names = []

        for csv_name in csv_list:
            try:
                dataframes_list.append(load_dataframe_from_csv(csv_name))
                loaded_names.append(csv_name)
            except Exception as e:
                print(f"Warning: Could not load {csv_name}: {e}")
                continue

        if not dataframes_list:
            return "No DataFrames could be loaded successfully."

        # Create pandas agent with all dataframes.
        # SECURITY: allow_dangerous_code=True lets the agent execute Python code
        # generated by the LLM in this kernel; only use it with trusted data
        # and prompts.
        agent = create_pandas_dataframe_agent(
            ChatOpenAI(temperature=0, model="gpt-4o-mini"),
            dataframes_list,
            verbose=True,
            agent_type=AgentType.OPENAI_FUNCTIONS,
            allow_dangerous_code=True
        )

        # Tell the agent which dataset each DataFrame holds; otherwise it only
        # sees anonymous frames and cannot tell them apart.
        dataset_guide = "\n".join(
            f"{position}. {name}" for position, name in enumerate(loaded_names, 1)
        )
        query = (
            f"{analysis_query}\n\n"
            f"The DataFrames were provided in this order:\n{dataset_guide}"
        )

        result = agent.invoke(query)
        # The agent returns a dict; surface its answer, not the dict repr.
        answer = result.get("output", result) if isinstance(result, dict) else result
        return f"Analysis of {len(dataframes_list)} CSV datasets:\n\n{answer}"

    except Exception as e:
        return f"Analysis error: {str(e)}"


---

### **Tools Bindings**

In [None]:
# Tools Bindings for Context Agent
tools_context_agent = [
    extract_documentation_from_website, 
    extract_documentation_from_local_file,
    create_documentation_summary,
    fetch_api_data,
    look_memory,
    write_memory,
    get_memory_documentation,
]

print("Available tools:")
# The loop variable must not be called `tool`: that would rebind the imported
# `tool` decorator and break every `@tool` cell executed after this one.
for i, bound_tool in enumerate(tools_context_agent, 1):
    # Show only the first sentence of each tool description.
    first_sentence = bound_tool.description.split('.')[0] if bound_tool.description else 'No description'
    print(f"   {i}. {bound_tool.name}: {first_sentence}")

In [None]:
# Tools Bindings for Analysis Agent
tools_analysis_agent = [
    analyze_data_with_pandas,
    debug_csv_storage,
    list_available_data
]

print("Available tools:")
# The loop variable must not be called `tool`: that would rebind the imported
# `tool` decorator and break every `@tool` cell executed after this one.
for i, bound_tool in enumerate(tools_analysis_agent, 1):
    # Show only the first sentence of each tool description.
    first_sentence = bound_tool.description.split('.')[0] if bound_tool.description else 'No description'
    print(f"   {i}. {bound_tool.name}: {first_sentence}")

---

### **System Prompts**

In [None]:
# System Prompt Context Agent
# Tool names and call signatures in this prompt must match the tools bound to
# the context agent; the model can only call tools that actually exist.
system_prompt_context_agent = """
You are a specialized API assistant with long-term memory capabilities that helps users learn and interact with APIs efficiently.

## Your Memory System:
You have access to a persistent memory scratchpad that stores API documentation summaries. This allows you to:
- Avoid re-extracting documentation you've already processed
- Provide faster responses by using cached knowledge
- Build up a knowledge base of API documentation over time

## Your Workflow:
1. **ALWAYS start by checking memory** using `look_memory` to see if you already have documentation for the requested API
2. **If documentation exists in memory**: Use `get_memory_documentation` to retrieve it and proceed directly to API calls
3. **If no documentation in memory**: 
   - **PREFERRED**: Load documentation from the local file using `extract_documentation_from_local_file`
   - **ALTERNATIVE**: Extract documentation using `extract_documentation_from_website`
   - **THEN**: Summarize it using `create_documentation_summary` and save it to memory using `write_memory`

## Available Tools and How to Use Them:

### 1. Memory Management Tools:
- `look_memory()` - List the API documentation summaries already stored in memory
- `get_memory_documentation()` - Retrieve the full documentation stored in memory
- `write_memory(api_name="example", summary="...")` - Save new documentation

### 2. Documentation Tools (PREFERRED ORDER):
- `extract_documentation_from_local_file(file_path="_api_endpoints.txt")` - Load OpenF1 API documentation from local file
- `extract_documentation_from_website(url="https://example.com")` - Extract raw documentation from a website
- `create_documentation_summary(content="...")` - Convert raw documentation into structured summary

### 3. API Data Fetching Tool:
- `fetch_api_data(endpoint="https://api.example.com/v1/data")` - Fetch data from any API endpoint
   parameters:
   - endpoint: The API endpoint to fetch data from
   - parameters: A dictionary of parameters to pass to the API endpoint
   IMPORTANT: Before fetching data, check if you have documentation for the API using `get_memory_documentation` and fetch the data using the documentation.
   Never fetch data without documentation. Do not make API calls without documentation.

**IMPORTANT**: For OpenF1 API, the system automatically adds `csv=true` parameter to get CSV format. You can still add other parameters:
- Example: `fetch_api_data(endpoint="https://api.openf1.org/v1/laps", parameters={"example_parameter_1": example_value_1, "example_parameter_2": example_value_2})`
- This will become: `https://api.openf1.org/v1/laps?example_parameter_1=example_value_1&example_parameter_2=example_value_2&csv=true`

## Key Behaviors:
- **Efficiency First**: Always check memory before extracting new documentation
- **Memory Building**: Save all new documentation summaries to build your knowledge base
- **Context Awareness**: Use the retrieved summary documentation to provide information on how to use the API

## CRITICAL: API Rate Limit Rules:
- **AVOID making multiple API calls** when one comprehensive call would work
- **ALWAYS fetch the largest dataset possible** in a single call
- **Only apply specific filters** when user explicitly requests them
- **STOP AFTER SUCCESSFUL FETCH**: Once data is successfully fetched and stored, provide the result and stop

## Data Fetching Strategy - CRITICAL RULES:

### Fetch only based on documentation. Do not make API calls without checking documentation first.

### Fetching Examples:
- **GOOD**: `fetch_api_data(endpoint="https://api.openf1.org/v1/laps")` - Gets ALL laps
- **GOOD**: `fetch_api_data(endpoint="https://api.openf1.org/v1/sessions")` - Gets ALL sessions
- **BAD**: Multiple calls like `fetch_api_data(..., parameters={"driver_number": 1})` then `fetch_api_data(..., parameters={"driver_number": 2})`
- **BAD**: Fetching small filtered datasets when user asks for "performance analysis"

### Smart Fetching Strategy:
- **For general analysis requests**: Fetch complete datasets without filters
- **For specific requests**: Only then apply the specific filter if it doesn't limit the analysis capabilities
- **STOP AFTER SUCCESS**: Once data is fetched and stored, provide confirmation and stop

## CRITICAL: STOP CONDITIONS:
- **After successful data fetch**: Once you get a successful response and data is stored, STOP
- **After providing documentation**: Once you provide API documentation information, STOP
- **After answering user question**: Once you answer the user's question completely, STOP
- **Do NOT make additional calls** unless the user specifically asks for more data or different parameters

## WORKFLOW COMPLETION:
When you complete your workflow, provide a clear summary of what you accomplished:
- What documentation you found/summarized and saved to memory
- What data you fetched and stored
- What the user can do next

Be helpful, efficient, and always leverage your memory system to provide the best experience. Remember: STOP after successful completion of each task and provide a clear summary.
"""

In [None]:
# System prompt for the analysis agent.
# It names the tools the model may call (`list_available_data`,
# `debug_csv_storage`, `analyze_data_with_pandas`), makes a data-availability
# check mandatory before every analysis, and asks for visualizations when they
# help. The text is passed as `prompt` when `analysis_agent` is created, so any
# wording change here changes the agent's behavior.
system_prompt_analysis_agent = """
You are a specialized data analysis agent that can analyze CSV datasets using pandas and generate visualizations.

## Your Context:
You can analyze CSV data that was fetched by the Context Agent from APIs (like OpenF1). This data is stored persistently and can be accessed directly when needed. The system supports multiple DataFrames simultaneously, allowing for comparative analysis across different CSV files.

## Available Tools and How to Use Them:

### 1. Data Discovery Tools:
- `list_available_data()` - Get a comprehensive view of all available data sources
- `debug_csv_storage()` - Check what CSV data is available in persistent storage (more detailed)

### 2. Analysis Tool:
- `analyze_data_with_pandas(analysis_query="your question here")` - Analyze all available CSV datasets
- `analyze_data_with_pandas(analysis_query="your question here", csv_names="dataset1,dataset2")` - Analyze specific CSV datasets

**IMPORTANT**: The analysis tool can:
- Work with multiple DataFrames simultaneously
- Perform joins and merges between DataFrames
- Generate visualizations (graphs, charts, plots)
- Execute complex pandas operations
- Answer natural language questions about the data

## Your Workflow - MANDATORY STEPS:
1. **ALWAYS start by checking what data is available** using `list_available_data()` or `debug_csv_storage()`
2. **If no data is available**: Ask the user to fetch data using the Context Agent first
3. **BEFORE EVERY ANALYSIS**: Always call `list_available_data()` to get the most current list of datasets
4. **Once data is available**: Use `analyze_data_with_pandas()` to perform analysis

## CRITICAL: Always Check for New Data:
- **NEVER assume** you know what datasets are available
- **ALWAYS call** `list_available_data()` before any analysis
- **New datasets** may have been added by the Context Agent since your last check
- **This prevents** analyzing outdated or incomplete data

## Analysis Capabilities:
- **Multi-Dataset Analysis**: Compare data across different CSV files, perform joins, find relationships
- **Visualization**: Generate graphs, charts, plots to help users understand the data
- **Complex Queries**: Answer natural language questions about the data

## Key Behaviors:
- **Always check data availability first** before attempting analysis
- **MANDATORY: Call `list_available_data()` before every analysis** to get current datasets
- **Use natural language** for your analysis queries - the tool understands complex questions
- **Generate visualizations** when they help explain the data
- **Be specific** about which datasets to analyze when needed
- **You can call tools multiple times** to get the information you need

Be helpful, efficient, and always provide clear insights with visualizations when appropriate.
"""

---

### **ReAct Agents**

In [None]:
# Context agent: a ReAct agent that runs the context tools on the worker LLM.
# `name` must stay "context_agent": the custom supervisor graph registers a
# node under that name and the handoff tool routes to it.
# The InMemorySaver instance is private to this agent and lives in process
# memory only.
context_agent = create_react_agent(
    name="context_agent",
    model=llm_worker_agent,
    tools=tools_context_agent,
    prompt=system_prompt_context_agent,
    checkpointer=InMemorySaver(),
)

In [None]:
# Analysis agent: a ReAct agent that runs the analysis tools on the worker LLM.
# `name` must stay "analysis_agent": the custom supervisor graph registers a
# node under that name and the handoff tool routes to it.
# It gets its own InMemorySaver, so its history is stored separately from the
# context agent's.
analysis_agent = create_react_agent(
    name="analysis_agent",
    model=llm_worker_agent,
    tools=tools_analysis_agent,
    prompt=system_prompt_analysis_agent,
    checkpointer=InMemorySaver(),
)

---

### **Testing Agents**

In [None]:
# Run configuration shared by the stand-alone agent tests below.
# Both agents receive the same thread ID, but each was built with its own
# InMemorySaver, so their checkpointed histories do not mix.
config = {
    "configurable": {
        "thread_id": "session_separeted_by_agent",
    },
}

In [None]:
# Smoke test: ask the context agent which documentation it already holds.
response = context_agent.invoke(
    {"messages": [{"role": "user", "content": "What documentation I have in memory?"}]},
    config=config,
)
# The last message of the returned state is the agent's final answer.
print(response["messages"][-1].content)

In [None]:
# Smoke test: ask the analysis agent which datasets are currently stored.
response = analysis_agent.invoke(
    {"messages": [{"role": "user", "content": "Which datasets I have in memory?"}]},
    config=config,
)
# The last message of the returned state is the agent's final answer.
print(response["messages"][-1].content)

In [None]:
# Same question on the same thread as the previous cell.
# NOTE(review): presumably repeated to check how the agent answers once the
# thread already holds the earlier exchange - confirm, or drop the duplicate.
response = analysis_agent.invoke(
    {"messages": [{"role": "user", "content": "Which datasets I have in memory?"}]},
    config=config,
)
# The last message of the returned state is the agent's final answer.
print(response["messages"][-1].content)

In [None]:
# Smoke test: ask the analysis agent to inspect the stored data
# (its prompt lists `debug_csv_storage()` for this purpose).
response = analysis_agent.invoke(
    {"messages": [{"role": "user", "content": "Debug the data in memory"}]},
    config=config,
)
# The last message of the returned state is the agent's final answer.
print(response["messages"][-1].content)

---

### **Supervisor Agent**


In [None]:
# from langgraph_supervisor import create_supervisor
# from langchain.chat_models import init_chat_model

# # Create supervisor agent using langgraph-supervisor
# pre_built_supervisor = create_supervisor(
#     model=init_chat_model("openai:gpt-4o"),
#     agents=[context_agent, analysis_agent],
#     prompt=(
#         "You are a supervisor managing two specialized agents:\n"
#         "- a context agent: Handles API documentation extraction, memory management, and data fetching from APIs like OpenF1\n"
#         "- an analysis agent: Performs data analysis on CSV datasets using pandas and generates visualizations\n"
#         "\n"
#         "CRITICAL RULES:\n"
#         "1. Assign work to one agent at a time, do not call agents in parallel\n"
#         "2. Do not do any work yourself - delegate all tasks to the appropriate agent\n"
#         "3. Let each agent complete their FULL workflow before considering the task done\n"
#         "4. For data-related workflows: first use the context agent to fetch data, then use the analysis agent to analyze it\n"
#         "5. WAIT for the agent to complete their entire internal workflow before making any decisions\n"
#         "6. Only delegate to another agent if the current agent explicitly indicates they cannot complete the task\n"
#         "7. Do not interrupt agents mid-workflow - let them finish their internal reasoning and tool calls\n"
#         "\n"
#         "The context agent has a specific workflow: check memory -> extract docs if needed -> fetch data -> store results\n"
#         "The analysis agent has a specific workflow: check available data -> perform analysis -> generate insights\n"
#         "Let each agent complete their full workflow before considering the task complete."
#     ),
#     add_handoff_back_messages=True,
#     output_mode="full_history",
# ).compile()

---

### **Testing Pre-Built Supervisor Agent**


In [None]:
# config = {"configurable": {"thread_id": "pre_built_supervisor_session"}}

In [None]:
# # Test the pre_built_supervisor with a multi-step task

# response = pre_built_supervisor.invoke(
#     {"messages": [{"role": "user", "content": "What documentation do I have in memory?"}]},
#     config
# )
# print(response["messages"][-1].content)

In [None]:
# # Test the pre_built_supervisor with a multi-step task

# response = pre_built_supervisor.invoke(
#     {"messages": [{"role": "user", "content": "Get the documentation from OpenF1 API and save it to memory?"}]},
#     config
# )
# print(response["messages"][-1].content)

In [None]:
# # Test the pre_built_supervisor with a multi-step task

# response = pre_built_supervisor.invoke(
#     {"messages": [{"role": "user", "content": "Fetch and save to memory the data necessary to show me the most winners racers from Ferrari constructors"}]},
#     config
# )
# print(response["messages"][-1].content)

---

### **Testing Custom Supervisor Agent**


In [None]:
from langgraph.graph import END
from typing import Annotated
from langchain_core.tools import tool, InjectedToolCallId
from langgraph.prebuilt import InjectedState
from langgraph.graph import StateGraph, START, MessagesState
from langgraph.types import Command

# Create handoff tool
def create_handoff_tool(*, agent_name: str, description: str | None = None):
    """Build a tool that hands the conversation over to ``agent_name``.

    Args:
        agent_name: Name of the parent-graph node to route to. The tool is
            registered as ``transfer_to_<agent_name>``.
        description: Tool description. Any falsy value falls back to
            ``"Ask <agent_name> for help."``.

    Returns:
        The decorated tool. Calling it returns a ``Command`` that jumps to
        ``agent_name`` in the parent graph and appends a tool message
        acknowledging the transfer to the message history.
    """
    tool_name = "transfer_to_" + agent_name
    if not description:
        description = f"Ask {agent_name} for help."

    @tool(tool_name, description=description)
    def handoff_tool(
        state: Annotated[MessagesState, InjectedState],
        tool_call_id: Annotated[str, InjectedToolCallId],
    ) -> Command:
        # Tool-role message answering the tool call that triggered the handoff.
        transfer_ack = dict(
            role="tool",
            content=f"Successfully transferred to {agent_name}",
            name=tool_name,
            tool_call_id=tool_call_id,
        )
        history = state["messages"] + [transfer_ack]
        # Command.PARENT makes the jump happen in the enclosing graph rather
        # than inside the agent that invoked this tool.
        return Command(
            graph=Command.PARENT,
            goto=agent_name,
            update={**state, "messages": history},
        )

    return handoff_tool


# Handoff tools given to the supervisor, one per worker agent.
# Each `agent_name` is the node name the tool routes to in the supervisor graph.
assign_to_context_agent = create_handoff_tool(
    description="Assign task to a context agent.",
    agent_name="context_agent",
)
assign_to_analysis_agent = create_handoff_tool(
    description="Assign task to a analysis agent.",
    agent_name="analysis_agent",
)


# Supervisor: a ReAct agent whose only tools are the two handoff tools, so it
# can route work to the workers but has no tool to do the work itself.
# `name` is "supervisor" because the graph below wires START to that node name.
supervisor_agent = create_react_agent(
    name="supervisor",
    model=ChatOpenAI(model="gpt-4.1", api_key=os.environ["OPENAI_API_KEY"]),
    tools=[assign_to_context_agent, assign_to_analysis_agent],
    # Adjacent string literals are concatenated into a single prompt string.
    prompt=(
        "You are a supervisor managing two specialized agents:\n"
        "- context_agent: Handles API documentation extraction, memory management, and data fetching\n"
        "- analysis_agent: Performs data analysis on CSV datasets using pandas\n"
        "\n"
        "Your job is to delegate tasks to the appropriate agent and let them complete their full workflow.\n"
        "Do not do any work yourself - delegate all tasks to the appropriate agent.\n"
        "Do not call agents in parallel - let them complete their full workflow.\n"
        "Do not interrupt them - let them finish their internal reasoning and tool calls.\n"
        "Only delegate to another agent if the current agent explicitly indicates they cannot complete the task.\n"
        "\n"
        "For data-related workflows: first use context_agent to fetch data, then use analysis_agent to analyze it."
    ),
)

# Custom supervisor graph.
# Topology: START -> supervisor, and every worker returns to the supervisor when
# it finishes. The supervisor has no static outgoing edges: it leaves through
# the handoff tools, which return Command(goto=..., graph=Command.PARENT).
custom_supervisor = (
    StateGraph(MessagesState)
    # The supervisor node is registered under the agent's own name ("supervisor").
    # `destinations` declares where this Command-returning node may route to.
    # It is documented as a tuple (or dict); a set is outside that contract and
    # iterates in arbitrary order, so a tuple is used to keep the order stable.
    .add_node(supervisor_agent, destinations=("context_agent", "analysis_agent", END))
    .add_node("context_agent", context_agent)
    .add_node("analysis_agent", analysis_agent)
    .add_edge(START, "supervisor")
    # Workers always hand control back to the supervisor.
    .add_edge("context_agent", "supervisor")
    .add_edge("analysis_agent", "supervisor")
    .compile()
)

In [None]:
# Run configuration for the custom supervisor session; rebinds the `config`
# name used by the earlier stand-alone agent tests.
config = {
    "configurable": {
        "thread_id": "custom_supervisor_session",
    },
}

In [None]:
# Notes:
# Laps cannot be fetched without filters: an unfiltered request to OpenF1 exceeds the rate limit.
# Use better models for the supervisor and the summarizer: model quality influences how well the scratchpad is written.