In [4]:
# CHUNK 1: Install Dependencies
!pip install -q \
  langchain-groq \
  sentence-transformers \
  faiss-cpu \
  tiktoken \
  PyPDF2 \
  python-docx \
  gradio \
  datasets \
  pandas \
  numpy \
  requests

print("Dependencies installed")

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m31.4/31.4 MB[0m [31m47.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m232.6/232.6 kB[0m [31m17.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m253.0/253.0 kB[0m [31m19.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m131.4/131.4 kB[0m [31m9.1 MB/s[0m eta [36m0:00:00[0m
[?25hDependencies installed


In [None]:
%%writefile .env
GROQ_API_KEY=""
HF_TOKEN=  ""
web_search=""


Writing .env


In [6]:
#  CHUNK 2: Configuration & API Keys
import os
import re
import json
import time
import math
import numpy as np
import pandas as pd
from typing import List, Dict, Any, Optional
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("AI-Agent")

CONFIG = {
    "use_memory": True,
    "use_planning": True,
    "model": "llama3-8b-8192",
    "temperature": 0.1,
    "max_doc_chunks": 10,
    "eval_n": 50,
    "rag_k": 3,
    "max_retries": 2,
    "verification_mode": True,
    "enhanced_prompts": True,
    "search_results": 5,
    "chunk_size": 256,
    "enable_streaming": True,
    "max_plan_steps": 8,
}

class PerformanceMonitor:
    """Real-time performance tracking and optimization suggestions."""

    def __init__(self):
        self.stats = {
            "total_queries": 0,
            "tool_usage": {"math_solver": 0, "calculator": 0, "web_search": 0, "doc_qa": 0, "direct_answer": 0},
            "accuracy_tracker": [],
            "response_times": [],
            "error_count": 0
        }

    def log_query(self, tool_used: str, response_time: float, success: bool):
        """Log query performance metrics."""
        self.stats["total_queries"] += 1
        self.stats["tool_usage"][tool_used] = self.stats["tool_usage"].get(tool_used, 0) + 1
        self.stats["response_times"].append(response_time)
        if not success:
            self.stats["error_count"] += 1

    def log_accuracy(self, predicted: str, actual: str, correct: bool):
        """Track accuracy for continuous improvement."""
        self.stats["accuracy_tracker"].append({
            "predicted": predicted,
            "actual": actual,
            "correct": correct,
            "timestamp": time.time()
        })

    def get_insights(self) -> str:
        """Generate performance insights and optimization suggestions."""
        if self.stats["total_queries"] == 0:
            return "No queries processed yet."

        avg_response_time = np.mean(self.stats["response_times"]) if self.stats["response_times"] else 0
        error_rate = (self.stats["error_count"] / self.stats["total_queries"]) * 100

        # Recent accuracy (last 10 predictions)
        recent_accuracy = 0
        if len(self.stats["accuracy_tracker"]) > 0:
            recent_correct = sum(1 for x in self.stats["accuracy_tracker"][-10:] if x["correct"])
            recent_accuracy = (recent_correct / min(10, len(self.stats["accuracy_tracker"]))) * 100

        insights = f"""PERFORMANCE INSIGHTS:

Accuracy: {recent_accuracy:.1f}% (last 10 predictions)
Avg Response Time: {avg_response_time:.2f}s
Error Rate: {error_rate:.1f}%
Total Queries: {self.stats["total_queries"]}

Tool Usage Distribution:
"""

        for tool, count in self.stats["tool_usage"].items():
            percentage = (count / self.stats["total_queries"]) * 100 if self.stats["total_queries"] > 0 else 0
            insights += f"  {tool}: {count} ({percentage:.1f}%)\n"

        # Optimization suggestions
        insights += "\nOPTIMIZATION SUGGESTIONS:\n"

        if recent_accuracy < 80:
            insights += "  • Consider refining math_solver prompts for higher accuracy\n"
        if avg_response_time > 3:
            insights += "  • Response time high - consider prompt optimization\n"
        if error_rate > 10:
            insights += "  • High error rate detected - review error handling\n"
        if self.stats["tool_usage"].get("math_solver", 0) > self.stats["total_queries"] * 0.7:
            insights += "  • High math problem volume - consider specialized fine-tuning\n"

        return insights

    def suggest_controller_improvements(self) -> List[str]:
        """Analyze routing patterns and suggest controller improvements."""
        suggestions = []

        # Analyze misclassifications from accuracy tracker
        math_errors = [x for x in self.stats["accuracy_tracker"] if not x["correct"]]

        if len(math_errors) > 3:
            suggestions.append("Consider adding more math keyword patterns to controller")

        if self.stats["tool_usage"].get("direct_answer", 0) > self.stats["total_queries"] * 0.3:
            suggestions.append("High direct_answer usage suggests routing could be more specific")

        return suggestions

# Initialize performance monitor
perf_monitor = PerformanceMonitor()

def enhanced_execute_step(step: Dict, memory_ctx: str = "") -> str:
    """Enhanced step execution with performance monitoring."""
    start_time = time.time()
    tool = step.get("tool", "").lower()

    try:
        result = execute_step(step, memory_ctx)
        success = not result.startswith(("Error", "Failed", "LLM-ERROR"))
        response_time = time.time() - start_time

        perf_monitor.log_query(tool, response_time, success)
        return result

    except Exception as e:
        response_time = time.time() - start_time
        perf_monitor.log_query(tool, response_time, False)
        return f"Enhanced execution error: {e}"

# Replace execute_step calls with enhanced_execute_step in your execution pipeline

from dotenv import load_dotenv
load_dotenv()

GROQ_API_KEY = os.getenv("GROQ_API_KEY")
SERPER_API_KEY = os.getenv("web_search")
HF_TOKEN = os.getenv("HF_TOKEN")

print("GROQ_API_KEY:", bool(GROQ_API_KEY))
print("SERPER_API_KEY:", bool(SERPER_API_KEY))
print("HF_TOKEN:", bool(HF_TOKEN))


GROQ_API_KEY: True
SERPER_API_KEY: True
HF_TOKEN: True


In [5]:
#  CHUNK 3: LLM & Tools
from langchain_groq import ChatGroq

llm = None
if GROQ_API_KEY:
    llm = ChatGroq(
        model=CONFIG["model"],
        temperature=CONFIG["temperature"],
        groq_api_key=GROQ_API_KEY
    )
    print("LLM client ready")
else:
    print("GROQ_API_KEY missing")

def call_llm(messages: List[Dict]) -> str:
    if llm is None:
        return "LLM-UNAVAILABLE"
    try:
        out = llm.invoke(messages)
        return getattr(out, "content", str(out))
    except Exception as e:
        return f"LLM-ERROR: {e}"






LLM client ready


In [6]:
def calculator_tool(expr: str) -> str:
    try:
        # Clean and normalize the expression
        expr = expr.replace(",", "").strip()

        # Handle percentage expressions
        if "%" in expr:
            expr = expr.replace("%", "/100")
            # Handle cases like "15% * 38140000" -> "0.15 * 38140000"
            expr = re.sub(r'(\d+(?:\.\d+)?)%', r'(\1/100)', expr)

        # Validate expression - only allow safe math operations
        if re.search(r"[^0-9\.\+\-\*\/\(\)\s\.\,eE]", expr):
            return f"CalculatorError: unsupported characters in '{expr}'"

        # Use a safer evaluation method
        allowed_vars = {
            'math': math,
            'sqrt': math.sqrt,
            'pow': math.pow,
            'sin': math.sin,
            'cos': math.cos,
            'tan': math.tan,
            'pi': math.pi,
            'e': math.e
        }

        result = eval(expr, {"__builtins__": None}, allowed_vars)

        # Format the result nicely
        if isinstance(result, float) and result.is_integer():
            return str(int(result))
        elif isinstance(result, float):
            return f"{result:.2f}"
        else:
            return str(result)
    except ZeroDivisionError:
        return "CalculatorError: Division by zero"
    except Exception as e:
        return f"CalculatorError: {e}"

In [7]:
def web_search_tool(query: str, max_results: int = 5) -> str:
    if not SERPER_API_KEY:
        return "SearchError: SERPER_API_KEY not set"
    url = "https://google.serper.dev/search"
    payload = json.dumps({"q": query, "num": max_results})
    headers = {
        'X-API-KEY': SERPER_API_KEY,
        'Content-Type': 'application/json'
    }
    try:
        import requests
        response = requests.post(url, headers=headers, data=payload, timeout=10)
        if response.status_code != 200:
            return f"SearchError: {response.status_code}"
        results = response.json().get("organic", [])[:max_results]
        return json.dumps([{"title": r.get("title"), "link": r.get("link"), "snippet": r.get("snippet")} for r in results], indent=2)
    except Exception as e:
        return f"SearchError: {e}"

In [8]:
#CHUNK 4: Document Loader, Token-Aware Chunking & RAG
import os
import re
import json
import numpy as np
from typing import List

# --- Import FAISS ---
try:
    import faiss
    FAISS_AVAILABLE = True
    print("FAISS imported successfully")
except ImportError:
    FAISS_AVAILABLE = False
    print("faiss not available — install faiss-cpu")

# --- Load Embedding Model ---
try:
    from sentence_transformers import SentenceTransformer
    EMB_MODEL = SentenceTransformer("all-MiniLM-L6-v2")
    print("Embedding model loaded")
except Exception as e:
    EMB_MODEL = None
    FAISS_AVAILABLE = False
    print("Embedding model not available:", e)

# --- Load Llama-3 Tokenizer for Accurate Chunking ---
try:
    from transformers import AutoTokenizer
    tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-3.2-1B-Instruct")
    print("Llama-3 tokenizer loaded for accurate chunking")
except Exception as e:
    tokenizer = None
    print("Tokenizer not available for chunking:", e)





FAISS imported successfully


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

Embedding model loaded


tokenizer_config.json:   0%|          | 0.00/54.5k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/9.09M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/296 [00:00<?, ?B/s]

Llama-3 tokenizer loaded for accurate chunking


In [9]:
from google.colab import files
import os

# Create docs folder
os.makedirs("docs", exist_ok=True)

# Upload your file
print(" Upload a document (PDF, TXT, or DOCX):")
uploaded = files.upload()

# Save to docs/ folder
for filename in uploaded.keys():
    print(f" Saving {filename} to docs/ folder")
    with open(f"docs/{filename}", "wb") as f:
        f.write(uploaded[filename])

print(f" Successfully uploaded {len(uploaded)} file(s)!")

 Upload a document (PDF, TXT, or DOCX):


Saving Hussain.pdf to Hussain.pdf
 Saving Hussain.pdf to docs/ folder
 Successfully uploaded 1 file(s)!


In [10]:
#  Text Chunking Functin
def chunk_text(text: str, max_tokens: int = 256) -> List[str]:
    """Split text into chunks using Llama-3 tokenizer for accuracy."""
    if not text or not text.strip():
        return []

    if tokenizer is None:

        words = text.split()
        return [
            " ".join(words[i:i + max_tokens * 4])
            for i in range(0, len(words), max_tokens * 4)
        ]


    tokens = tokenizer.encode(text, add_special_tokens=True)
    chunks = []
    for i in range(0, len(tokens), max_tokens):
        chunk_tokens = tokens[i:i + max_tokens]
        chunk_text = tokenizer.decode(chunk_tokens, skip_special_tokens=True)
        if chunk_text.strip():
            chunks.append(chunk_text.strip())
    return chunks


def load_docs_from_files(folders: List[str] = ["docs/"]) -> List[str]:
    """Load and chunk text from .txt, .pdf, and .docx files."""
    docs = []
    os.makedirs("docs", exist_ok=True)

    for folder in folders:
        if not os.path.exists(folder):
            continue
        for fname in os.listdir(folder):
            path = os.path.join(folder, fname)
            try:
                text = ""

                if fname.endswith(".txt"):
                    with open(path, 'r', encoding='utf-8') as f:
                        text = f.read()

                elif fname.endswith(".pdf"):
                    from PyPDF2 import PdfReader
                    reader = PdfReader(path)
                    text = " ".join([
                        page.extract_text()
                        for page in reader.pages
                        if page.extract_text()
                    ])

                elif fname.endswith(".docx"):
                    from docx import Document
                    doc = Document(path)
                    text = "\n".join([para.text for para in doc.paragraphs])


                chunks = chunk_text(text, max_tokens=CONFIG["chunk_size"])
                docs.extend(chunks[:CONFIG["max_doc_chunks"]])
            except Exception as e:
                print(f" Failed to read {fname}: {e}")

    print(f" Loaded {len(docs)} document chunks")
    return docs


DOCS = load_docs_from_files(["docs/"])
if not DOCS:
    print(" No documents found. Using sample docs.")
    DOCS = [
        "RAG stands for Retrieval-Augmented Generation: retrieve relevant text and generate using the LLM.",
        "LangChain is a framework for building LLM apps with prompts, chains and agents.",
        "Groq provides ultra-fast inference for LLMs and supports tool use."
    ]


doc_embs = None
idx = None

if EMB_MODEL and DOCS:
    try:
        doc_embs = EMB_MODEL.encode(DOCS, normalize_embeddings=True).astype("float32")
        if FAISS_AVAILABLE:
            idx = faiss.IndexFlatIP(doc_embs.shape[1])
            idx.add(doc_embs)
            print(f" FAISS index built with {idx.ntotal} chunks")
        else:
            print(" FAISS not available — using simple retrieval")
    except Exception as e:
        print(" Failed to build FAISS index:", e)
        idx = None
else:
    print(" No documents or embedding model — RAG disabled")

# RAG: Retrieve & Answer
def rag_retrieve(query: str, k: int = 3) -> List[str]:
    """Retrieve top-k most relevant document chunks."""
    if not DOCS:
        return []
    if EMB_MODEL is None or idx is None:

        return DOCS[-k:]

    try:
        qv = EMB_MODEL.encode([query], normalize_embeddings=True).astype("float32").flatten()
        D, I = idx.search(qv.reshape(1, -1), k)
        return [DOCS[i] for i in I[0]]
    except Exception as e:
        print("⚠️ Retrieval failed:", e)
        return DOCS[:k]



def doc_qa_tool(query: str, memory_ctx: str = "") -> str:
    """Answer from documents with optional memory context."""
    context = "\n".join(rag_retrieve(query, k=CONFIG["rag_k"]))
    full_context = f"Document Context:\n{context}"
    if memory_ctx:
        full_context += f"\n\nMemory Context:\n{memory_ctx}"

    prompt = [
        {"role": "system", "content": "Answer based only on the provided context. Be concise and precise."},
        {"role": "user", "content": f"{full_context}\n\nQuestion: {query}"}
    ]
    return call_llm(prompt)

def doc_qa_tool(query: str, memory_ctx: str = "") -> str:
    """Answer from documents with optional memory context."""
    context = "\n".join(rag_retrieve(query, k=CONFIG["rag_k"]))
    full_context = f"Document Context:\n{context}"
    if memory_ctx:
        full_context += f"\n\nMemory Context:\n{memory_ctx}"

    prompt = [
        {"role": "system", "content": "Answer based only on the provided context. Be concise and precise."},
        {"role": "user", "content": f"{full_context}\n\nQuestion: {query}"}
    ]
    return call_llm(prompt)

 Loaded 3 document chunks
 FAISS index built with 3 chunks


In [12]:
# Replace with a real question from your file
query = "tell me about hussain"

print("🔍 RETRIEVED CONTEXT:")
for i, doc in enumerate(rag_retrieve(query, k=2)):
    print(f"  [{i+1}] {doc}")

# Get answer from documents
print("\n💡 AGENT ANSWER:")
answer = doc_qa_tool(query)
print(answer)

query = "project in the file"

print("🔍 RETRIEVED CONTEXT:")
for doc in rag_retrieve(query, k=2):
    print("  -", doc[:200], "...")

print("\n💡 ANSWER:")
print(doc_qa_tool(query))

🔍 RETRIEVED CONTEXT:
  [1] Muhammad 
Hussain Mobin
muhammadhussain112002@g
mail.com
03241536349
Lahore,Pakistan
PROFILE
Motivated and detail-oriented
Software Engineering graduate with
practical experience in web and
software development. Passionate
about designing scalable applications
and collaborating in dynamic, agile
teams. Seeking a full-time role in
software development, web
development, or mobile
development where I can apply my
skills and continue growing
professionally.
EDUCATION
Bachelors of Software Engineering
University of Central Punjab
10/2021 – 07/2025    
Lahore, Pakistan
Fsc Pre Engineering
Punjab Group of colleges
2018 – 2020 | Lahore, PakistanCERTIFICATES
The Complete Python developer
Machine Learning
introduction to AI and Machine Learning on Google 
Cloud
PROJECTS
Evolvium
•Developed an interactive learning platform connecting 
students and educators through personalized 
courses, progress tracking, and certification.
•Built using the MERN stack (MongoDB, Express

In [13]:
# 🧮 CHUNK 5: Improved Controller & Math Solver
import re
import json
from typing import Dict, Optional

# --- Extract Final Number ---
def extract_final_number(text: str) -> Optional[str]:
    """
    ULTRA-ENHANCED number extraction with multiple fallback strategies.
    """
    if not text or not isinstance(text, str):
        return None

    # Clean and normalize text
    text = re.sub(r'\s+', ' ', text.strip())

    # Priority-based extraction patterns
    extraction_patterns = [
        # Highest priority: Exact formats
        (r"FINAL_ANSWER\s*[:\-=]\s*([-+]?\d+(?:\.\d+)?)", 1.0),
        (r"####\s*([-+]?\d+(?:\.\d+)?)", 0.95),
        (r"The answer is\s*([-+]?\d+(?:\.\d+)?)", 0.9),
        (r"Final answer:\s*([-+]?\d+(?:\.\d+)?)", 0.9),

        # High priority: Common answer patterns
        (r"(?:answer|result|solution|total|equals?)\s*[:\-=]\s*([-+]?\d+(?:\.\d+)?)", 0.8),
        (r"=\s*([-+]?\d+(?:\.\d+)?)(?:\s|$)", 0.7),
        (r"is\s*([-+]?\d+(?:\.\d+)?)(?:\s|$)", 0.6),

        # Medium priority: Contextual patterns
        (r"Therefore,?\s*([-+]?\d+(?:\.\d+)?)", 0.5),
        (r"So,?\s*([-+]?\d+(?:\.\d+)?)", 0.4),
        (r"\(([-+]?\d+(?:\.\d+)?)\)\s*$", 0.3),

        # Lower priority: Last resort
        (r"([-+]?\d+(?:\.\d+)?)\s*$", 0.2)
    ]

    candidates = []

    # Extract all candidates with confidence scores
    for pattern, confidence in extraction_patterns:
        matches = re.findall(pattern, text, re.IGNORECASE)
        for match in matches:
            # Validate number
            try:
                num_val = float(match)
                # Reasonable bounds check (adjust as needed)
                if -1000000 < num_val < 1000000:
                    candidates.append((match, confidence, abs(len(text) - text.rfind(match))))
            except ValueError:
                continue

    if not candidates:
        # Final fallback: any number in text
        all_numbers = re.findall(r"[-+]?\d+(?:\.\d+)?", text)
        if all_numbers:
            return all_numbers[-1]
        return None

    # Sort by confidence (descending) then by position (later in text = higher priority)
    candidates.sort(key=lambda x: (x[1], -x[2]), reverse=True)

    # Return best candidate
    result = candidates[0][0]

    # Clean result
    result = re.sub(r'[^\d\.\-]', '', result)

    # Handle integer vs float formatting
    try:
        if '.' in result:
            float_val = float(result)
            if float_val.is_integer():
                return str(int(float_val))
        return result
    except:
        return result

# --- Improved Math Solver Tool ---
def math_solver_tool(question: str, memory_ctx: str = "") -> str:
    """
    ULTRA-ENHANCED math solver with multiple strategies for maximum accuracy.
    """
    # Multi-prompt ensemble approach for higher accuracy
    strategies = [
        {
            "name": "step_by_step",
            "system": """You are a world-class mathematician. Solve this step-by-step with extreme precision.

CRITICAL RULES:
1. Show ALL intermediate calculations clearly
2. Double-check each arithmetic step
3. End with: FINAL_ANSWER: <number>
4. Use NO commas in final number
5. If percentage, convert to decimal first then multiply

Example format:
Step 1: Identify what we need to find...
Step 2: Set up the calculation: 25 × 4 = ?
Step 3: Perform multiplication: 25 × 4 = 100
FINAL_ANSWER: 100"""
        },
        {
            "name": "verification",
            "system": """You are a math verification expert. Solve this problem using a different approach to verify the answer.

VERIFICATION PROCESS:
1. Identify the problem type
2. Use alternative calculation method
3. Cross-check with original approach
4. Provide confident final answer

End with: FINAL_ANSWER: <number>"""
        }
    ]

    best_answer = None
    confidence_scores = []

    for strategy in strategies:
        system_prompt = strategy["system"]
        if memory_ctx:
            system_prompt += f"\nContext from memory: {memory_ctx}"

        prompt = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": f"Problem: {question}"}
        ]

        # Set ultra-low temperature for consistency
        original_temp = None
        if hasattr(llm, 'temperature'):
            original_temp = llm.temperature
            llm.temperature = 0.01  # Maximum consistency

        try:
            response = call_llm(prompt)

            # Extract answer and calculate confidence
            extracted = extract_final_number(response)
            if extracted and extracted != "ERROR":
                # Simple confidence based on response quality
                confidence = len(re.findall(r'\d+', response)) / 10.0  # More numbers = more work shown
                confidence += 0.3 if "step" in response.lower() else 0
                confidence += 0.2 if "=" in response else 0
                confidence = min(confidence, 1.0)

                confidence_scores.append((extracted, confidence, response))

        except Exception as e:
            print(f"Strategy {strategy['name']} failed: {e}")
        finally:
            # Restore temperature
            if original_temp is not None and hasattr(llm, 'temperature'):
                llm.temperature = original_temp

    # Select best answer based on confidence
    if confidence_scores:
        # Sort by confidence and take the best
        confidence_scores.sort(key=lambda x: x[1], reverse=True)
        best_answer = confidence_scores[0][2]  # Full response

    # Fallback to single enhanced prompt if ensemble fails
    if not best_answer:
        enhanced_system = """You are an expert mathematician with perfect accuracy.

ULTRA-PRECISE METHODOLOGY:
1. READ the problem 3 times to ensure understanding
2. IDENTIFY exactly what needs to be calculated
3. BREAK DOWN into smallest possible steps
4. PERFORM each calculation with extreme care
5. VERIFY your arithmetic at each step
6. STATE your final answer clearly

MANDATORY FORMAT:
- Show step-by-step work
- End with exactly: FINAL_ANSWER: <number>
- NO commas, NO extra text after final answer

Remember: One small error ruins everything. Be absolutely certain."""

        if memory_ctx:
            enhanced_system += f"\nMemory context: {memory_ctx}"

        prompt = [
            {"role": "system", "content": enhanced_system},
            {"role": "user", "content": f"Solve this math problem with perfect accuracy: {question}"}
        ]

        best_answer = call_llm(prompt)

    return best_answer or "MathSolverError: All strategies failed"

# --- Improved Controller: Heuristic Routing ---
def controller_heuristic(q: str) -> Dict[str, Any]:
    """
    Rule-based tool routing with better pattern matching.
    """
    ql = q.lower().strip()

    # Pure arithmetic (numbers and operators only)
    if re.fullmatch(r"^[\d\s\+\-\*\/\(\)\.\,]+$", ql.replace(" ", "")):
        return {
            "tool": "calculator",
            "args": {"expression": ql},
            "rationale": "pure arithmetic expression"
        }

    # Math word problems - expanded keyword list
    math_keywords = [
        "how many", "total", "each", "left", "per", "cost", "area",
        "volume", "percentage", "percent", "times", "twice", "half",
        "third", "quarter", "ratio", "proportion", "average", "sum",
        "difference", "product", "quotient", "multiple", "factor",
        "solve for", "calculate", "compute", "determine", "find"
    ]

    if any(k in ql for k in math_keywords):
        # Check if it's simple enough for calculator
        simple_math_patterns = [
            r"(\d+)\s*[\+\-\*\/]\s*(\d+)",
            r"(\d+)\s*percent\s*of\s*(\d+)",
            r"(\d+)\s*%\s*of\s*(\d+)",
            r"(\d+)\s*times\s*(\d+)",
            r"(\d+)\s*plus\s*(\d+)",
            r"(\d+)\s*minus\s*(\d+)"
        ]

        for pattern in simple_math_patterns:
            if re.search(pattern, ql):
                return {
                    "tool": "calculator",
                    "args": {"expression": ql},
                    "rationale": "simple math expression in text"
                }

        return {
            "tool": "math_solver",
            "args": {"question": ql},
            "rationale": "math word problem requiring reasoning"
        }

    # Factual queries
    factual_keywords = [
        "who is", "what is", "when was", "where is", "why does",
        "latest", "news", "current", "today", "now", "population",
        "price", "rate", "weather", "temperature", "capital of",
        "president of", "ceo of", "founder of"
    ]

    if any(k in ql for k in factual_keywords):
        return {
            "tool": "web_search",
            "args": {"query": ql},
            "rationale": "fact-finding query"
        }

    # Document queries
    doc_keywords = [
        "document", "notes", "pdf", "according to", "in our docs",
        "based on", "file says", "text mentions", "report shows"
    ]

    if any(k in ql for k in doc_keywords):
        return {
            "tool": "doc_qa",
            "args": {"query": ql},
            "rationale": "document query"
        }

    # Fallback
    return {
        "tool": "direct_answer",
        "args": {"query": ql},
        "rationale": "general question fallback"
    }

# --- Improved Safe JSON Parser ---
def safe_json_parse(text: str) -> Optional[Dict]:
    """
    Safely extract and parse JSON from LLM output with better error handling.
    """
    if not text or not isinstance(text, str):
        return None

    # Try multiple patterns to extract JSON
    patterns = [
        r'\[\s*\{.*\}\s*\]',  # JSON array
        r'\{\s*".*"\s*\}',    # JSON object
        r'\{.*\}',            # Loose JSON object
        r'\[.*\]'             # Loose JSON array
    ]

    for pattern in patterns:
        match = re.search(pattern, text, re.DOTALL)
        if match:
            try:
                return json.loads(match.group(0))
            except json.JSONDecodeError:
                # Try to fix common JSON issues
                try:
                    # Fix trailing commas
                    fixed_json = re.sub(r',\s*\}', '}', match.group(0))
                    fixed_json = re.sub(r',\s*\]', ']', fixed_json)
                    return json.loads(fixed_json)
                except:
                    continue

    return None

# --- Improved LLM-Based Controller ---
def controller_llm_route(q: str, memory_ctx: str = "") -> Dict[str, Any]:
    """
    Use LLM to decide tool routing with better prompting.
    """
    if llm is None:
        return {
            "tool": "direct_answer",
            "args": {"query": q},
            "rationale": "LLM unavailable"
        }

    prompt = [
        {
            "role": "system",
            "content": """You are a smart AI agent controller. Decide which tool to use.
Return ONLY a valid JSON object in this exact format:
{
  "tool": "calculator" or "web_search" or "math_solver" or "doc_qa" or "direct_answer",
  "args": {"expression": "..."} or {"query": "..."} or {"question": "..."},
  "rationale": "brief reason for tool choice"
}

RULES:
- Use calculator for: arithmetic expressions, percentages, simple math
- Use math_solver for: word problems, multi-step reasoning, complex calculations
- Use web_search for: current facts, real-time data, unknown information
- Use doc_qa for: questions about documents, files, or stored knowledge
- Use direct_answer for: general knowledge, definitions, explanations

IMPORTANT: Return ONLY JSON, no other text."""
        },
        {"role": "user", "content": f"Memory context: {memory_ctx}" if memory_ctx else "No memory context"},
        {"role": "user", "content": f"Question: {q}"}
    ]

    try:
        out = call_llm(prompt)
        parsed = safe_json_parse(out)

        if parsed and isinstance(parsed, dict):
            return parsed
        elif parsed and isinstance(parsed, list) and len(parsed) > 0:
            return parsed[0]
        else:
            # Fallback to heuristic if LLM routing fails
            return controller_heuristic(q)

    except Exception as e:
        return controller_heuristic(q)  # Fallback to heuristic

# --- Final Controller Route ---
def controller_route(q: str, memory_ctx: str = "") -> Dict[str, Any]:
    """
    SUPER-SMART controller with enhanced pattern recognition and LLM backup.
    """
    ql = q.lower().strip()

    # ENHANCED MATH DETECTION with more sophisticated patterns
    advanced_math_patterns = [
        # Word problems indicators
        (r"\b(?:how many|total|each|left|per|cost|area|volume|percentage|percent)\b", 0.9),
        (r"\b(?:times|twice|half|third|quarter|ratio|proportion|average)\b", 0.8),
        (r"\b(?:sum|difference|product|quotient|multiple|factor)\b", 0.8),
        (r"\b(?:solve for|calculate|compute|determine|find)\b", 0.7),
        (r"\b(?:more than|less than|increased by|decreased by|added|subtracted)\b", 0.7),
        (r"\b(?:profit|loss|discount|tax|interest|salary|budget)\b", 0.8),

        # Mathematical operations in words
        (r"\b(?:add|plus|sum of|combined|together)\b", 0.6),
        (r"\b(?:subtract|minus|take away|remove|difference)\b", 0.6),
        (r"\b(?:multiply|times|product of|each)\b", 0.7),
        (r"\b(?:divide|split|share|distribute|per)\b", 0.7),

        # Number + unit combinations (strong math indicators)
        (r"\d+\s*(?:dollars?|cents?|€|£|\$|%|percent)", 0.9),
        (r"\d+\s*(?:years?|days?|hours?|minutes?|seconds?)", 0.7),
        (r"\d+\s*(?:meters?|feet|inches?|miles?|km)", 0.8),
        (r"\d+\s*(?:pounds?|kg|tons?|grams?)", 0.8),
    ]

    # Calculate math confidence score
    math_confidence = 0.0
    for pattern, weight in advanced_math_patterns:
        if re.search(pattern, ql):
            math_confidence += weight

    # Normalize confidence
    math_confidence = min(math_confidence, 1.0)

    # DECISION LOGIC
    # High confidence math problems
    if math_confidence > 0.8:
        return {
            "tool": "math_solver",
            "args": {"question": q},
            "rationale": f"high-confidence math problem (score: {math_confidence:.2f})"
        }

    # Pure arithmetic expressions
    arithmetic_pattern = r"^[\d\s\+\-\*\/\(\)\.\,\%]+$"
    if re.match(arithmetic_pattern, ql.replace(" ", "")):
        return {
            "tool": "calculator",
            "args": {"expression": ql},
            "rationale": "pure arithmetic expression"
        }

    # Simple math expressions in text
    simple_math_patterns = [
        r"(\d+)\s*[\+\-\*\/]\s*(\d+)",
        r"(\d+)\s*percent\s*of\s*(\d+)",
        r"(\d+)\s*%\s*of\s*(\d+)",
    ]

    for pattern in simple_math_patterns:
        if re.search(pattern, ql):
            return {
                "tool": "calculator",
                "args": {"expression": ql},
                "rationale": "simple math expression detected"
            }

    # Medium confidence math - still use math_solver
    if math_confidence > 0.4:
        return {
            "tool": "math_solver",
            "args": {"question": q},
            "rationale": f"medium-confidence math problem (score: {math_confidence:.2f})"
        }

    # ENHANCED WEB SEARCH DETECTION
    web_search_patterns = [
        (r"\b(?:who is|what is|when was|where is|why does|how does)\b", 0.8),
        (r"\b(?:latest|news|current|today|now|recently|2024|2025)\b", 0.9),
        (r"\b(?:population|price|rate|weather|temperature|stock)\b", 0.8),
        (r"\b(?:capital of|president of|ceo of|founder of|born in)\b", 0.9),
        (r"\b(?:exchange rate|currency|bitcoin|inflation)\b", 0.8),
    ]

    web_confidence = 0.0
    for pattern, weight in web_search_patterns:
        if re.search(pattern, ql):
            web_confidence += weight

    web_confidence = min(web_confidence, 1.0)

    if web_confidence > 0.6:
        return {
            "tool": "web_search",
            "args": {"query": q},
            "rationale": f"factual query requiring real-time data (score: {web_confidence:.2f})"
        }

    # DOCUMENT QUERY DETECTION
    doc_patterns = [
        r"\b(?:document|notes|pdf|according to|in our docs)\b",
        r"\b(?:based on|file says|text mentions|report shows)\b",
        r"\b(?:in the document|from the file|as stated)\b"
    ]

    if any(re.search(pattern, ql) for pattern in doc_patterns):
        return {
            "tool": "doc_qa",
            "args": {"query": q},
            "rationale": "document-based query detected"
        }

    # LLM-ASSISTED ROUTING as intelligent fallback
    if llm is not None:
        try:
            routing_prompt = [
                {"role": "system", "content": """You are an expert task classifier. Analyze the query and return ONLY a JSON object:
{
  "tool": "math_solver" | "calculator" | "web_search" | "doc_qa" | "direct_answer",
  "confidence": 0.0-1.0,
  "rationale": "brief explanation"
}

RULES:
- math_solver: Word problems, multi-step reasoning, complex calculations
- calculator: Simple arithmetic, basic expressions
- web_search: Current facts, real-time data, recent information
- doc_qa: Questions about uploaded documents/files
- direct_answer: General knowledge, definitions, explanations

Respond with ONLY valid JSON."""},
                {"role": "user", "content": f"Classify: {q}"}
            ]

            llm_response = call_llm(routing_prompt)
            parsed_route = safe_json_parse(llm_response)

            if parsed_route and isinstance(parsed_route, dict) and "tool" in parsed_route:
                return {
                    "tool": parsed_route["tool"],
                    "args": {"query": q} if parsed_route["tool"] in ["web_search", "doc_qa", "direct_answer"]
                           else {"question": q} if parsed_route["tool"] == "math_solver"
                           else {"expression": q},
                    "rationale": f"LLM-routed: {parsed_route.get('rationale', 'intelligent classification')}"
                }
        except Exception as e:
            print(f"LLM routing failed: {e}")

    # FINAL FALLBACK
    return {
        "tool": "direct_answer",
        "args": {"query": q},
        "rationale": "fallback to direct LLM response"
    }


In [14]:
# 📄 CHUNK 6: IMPROVED Planner & Execution Engine
import json
import re
import requests
from typing import List, Dict, Optional

# --- 1. IMPROVED PLAN GENERATION ---
def plan_query(user_query: str, memory_ctx: str = "") -> List[Dict]:
    """
    SIMPLIFIED plan generation - uses direct routing instead of complex planning
    """
    # For GSM8K questions, use math_solver directly (no complex planning needed)
    math_keywords = ["how many", "total", "each", "left", "per", "cost", "area",
                    "percentage", "percent", "times", "twice", "half", "third",
                    "ratio", "proportion", "average", "sum", "difference"]

    if any(keyword in user_query.lower() for keyword in math_keywords):
        return [{
            "tool": "math_solver",
            "args": {"question": user_query},
            "rationale": "math word problem - direct to math solver"
        }]

    # For simple arithmetic, use calculator directly
    if re.search(r"(\d+)\s*[\+\-\*\/]\s*(\d+)", user_query.lower()):
        return [{
            "tool": "calculator",
            "args": {"expression": user_query},
            "rationale": "simple arithmetic - direct to calculator"
        }]

    # For everything else, use LLM planning with a better prompt
    if llm is None or not CONFIG["use_planning"]:
        route = controller_route(user_query, memory_ctx)
        return [route]

    prompt = [
        {"role": "system", "content": """You are a precision planner. Return ONLY a JSON array of steps.

RULES:
- Use calculator for arithmetic expressions
- Use math_solver for word problems that require reasoning
- Use web_search for current data
- Use doc_qa for document questions
- Use {result} to chain steps
- Return ONLY JSON. No explanations.

Example for math problems:
[{"tool": "math_solver", "args": {"question": "the full question"}}]

Example for calculations:
[{"tool": "calculator", "args": {"expression": "5 + 3"}}]"""},
        {"role": "user", "content": f"Create a plan for: {user_query}"}
    ]

    try:
        out = call_llm(prompt)
        # Extract pure JSON
        match = re.search(r"\[\s*\{.*\}\s*\]", out, re.DOTALL)
        if not match:
            # Fallback to direct routing
            route = controller_route(user_query, memory_ctx)
            return [route]

        plan = json.loads(match.group(0))
        # Validate and limit plan steps
        valid_plan = []
        for step in plan[:CONFIG["max_plan_steps"]]:
            if isinstance(step, dict) and "tool" in step and "args" in step:
                valid_plan.append(step)

        return valid_plan if valid_plan else [controller_route(user_query, memory_ctx)]

    except Exception as e:
        print(f"⌚ Plan generation failed: {e}")
        route = controller_route(user_query, memory_ctx)
        return [route]

# --- 2. IMPROVED SINGLE STEP EXECUTION ---
def execute_step(step: Dict, memory_ctx: str = "") -> str:
    """
    Executes one tool step safely with better error handling.
    """
    tool = step.get("tool", "").lower()
    args = step.get("args", {})

    try:
        if tool == "calculator":
            expression = args.get("expression", "")
            if not expression:
                return "CalculatorError: No expression provided"
            return calculator_tool(expression)

        elif tool == "web_search":
            query = args.get("query", "")
            if not query:
                return "SearchError: No query provided"
            return web_search_tool(query, max_results=CONFIG["search_results"])

        elif tool == "doc_qa":
            query = args.get("query", "")
            if not query:
                return "DocQAError: No query provided"
            return doc_qa_tool(query, memory_ctx)

        elif tool == "math_solver":
            question = args.get("question", "")
            if not question:
                return "MathSolverError: No question provided"
            return math_solver_tool(question, memory_ctx)

        elif tool == "direct_answer":
            query = args.get("query", "")
            if not query:
                return "DirectAnswerError: No query provided"
            return call_llm([{"role": "user", "content": query}])

        else:
            return f"UnknownToolError: {tool}"

    except Exception as e:
        return f"StepExecutionError: {e}"



In [15]:
# DRAMATICALLY IMPROVED SUPERVISOR SYSTEM
import json
import re
import math
from typing import List, Dict, Any, Optional, Tuple

class CriticalSupervisorAgent:
    """
    Supervisor that actually validates mathematical correctness and provides meaningful quality control
    """

    def __init__(self, llm_client=None, max_retries: int = 2):
        self.llm = llm_client
        self.max_retries = max_retries
        self.validation_stats = {
            "plans_reviewed": 0,
            "plans_approved": 0,
            "plans_rejected": 0,
            "results_reviewed": 0,
            "results_approved": 0,
            "results_rejected": 0,
            "math_errors_caught": 0,
            "retries_triggered": 0,
            "improvements_made": 0,
            "critical_errors_prevented": 0
        }

        # Track known correct answers for validation
        self.answer_cache = {}

    def validate_math_with_independent_solution(self, step: Dict, result: str, original_query: str) -> Dict[str, Any]:
        """
        Independent mathematical validation using multiple verification methods
        """
        extracted_answer = extract_final_number(result)

        if not extracted_answer or extracted_answer == "ERROR":
            return {
                "approved": False,
                "confidence": 0.0,
                "issues": ["No numerical answer extracted"],
                "reasoning": "Cannot extract numerical result"
            }

        # Method 1: Independent LLM verification
        independent_answer = self._get_independent_solution(original_query)

        # Method 2: Sanity checks
        sanity_issues = self._check_answer_sanity(original_query, extracted_answer)

        # Method 3: Arithmetic verification within the solution
        calculation_issues = self._verify_calculations_in_solution(result)

        # Combine all validation methods
        all_issues = sanity_issues + calculation_issues

        # Compare with independent solution
        if independent_answer and independent_answer != "ERROR":
            try:
                extracted_num = float(extracted_answer)
                independent_num = float(independent_answer)

                # Allow 1% tolerance for floating point differences
                if abs(extracted_num - independent_num) / max(abs(independent_num), 1) > 0.01:
                    all_issues.append(f"Independent verification disagrees: expected ~{independent_answer}, got {extracted_answer}")
                    self.validation_stats["math_errors_caught"] += 1

                    # This is a critical error - flag for major revision
                    if abs(extracted_num - independent_num) / max(abs(independent_num), 1) > 0.5:
                        self.validation_stats["critical_errors_prevented"] += 1

            except ValueError:
                all_issues.append("Cannot compare with independent solution")

        # Determine approval based on severity of issues
        critical_issues = [issue for issue in all_issues if any(word in issue.lower()
                          for word in ["disagrees", "unreasonable", "impossible", "error"])]

        if len(critical_issues) > 0:
            confidence = 0.0
            approved = False
        elif len(all_issues) > 2:
            confidence = 0.3
            approved = False
        elif len(all_issues) == 0:
            confidence = 0.9
            approved = True
        else:
            confidence = 0.6
            approved = len(all_issues) <= 1

        return {
            "approved": approved,
            "confidence": confidence,
            "issues": all_issues,
            "reasoning": f"Multi-method validation: {len(all_issues)} issues found",
            "independent_answer": independent_answer,
            "extracted_answer": extracted_answer
        }

    def _get_independent_solution(self, query: str) -> str:
        """
        Get independent solution using different approach/prompting
        """
        if not self.llm:
            return None

        # Use a completely different prompting approach
        independent_prompt = [
            {"role": "system", "content": """You are a mathematical verification expert. Solve this problem using a different method than typical step-by-step approaches.

CRITICAL REQUIREMENTS:
1. Double-check all arithmetic manually
2. Verify your answer makes logical sense
3. Consider if there are any tricks or edge cases
4. End with exactly: VERIFIED_ANSWER: <number>

Be extremely careful with calculations."""},
            {"role": "user", "content": f"Independently solve and verify: {query}"}
        ]

        try:
            response = call_llm(independent_prompt)
            independent_answer = self._extract_verified_answer(response)
            return independent_answer
        except Exception as e:
            print(f"Independent solution failed: {e}")
            return None

    def _extract_verified_answer(self, text: str) -> str:
        """Extract answer from verification response"""
        patterns = [
            r"VERIFIED_ANSWER[:\s]*([+-]?\d+(?:\.\d+)?)",
            r"ANSWER[:\s]*([+-]?\d+(?:\.\d+)?)",
            r"=\s*([+-]?\d+(?:\.\d+)?)(?:\s|$)"
        ]

        for pattern in patterns:
            match = re.search(pattern, text, re.IGNORECASE)
            if match:
                return match.group(1)

        # Fallback to standard extraction
        return extract_final_number(text)

    def _check_answer_sanity(self, query: str, answer: str) -> List[str]:
        """
        Comprehensive sanity checks for mathematical answers
        """
        issues = []

        try:
            num_answer = float(answer)
            query_lower = query.lower()

            # Check for impossible negative answers
            negative_context_words = ["total", "how many", "sum", "cost", "price", "area", "volume", "distance", "time"]
            if num_answer < 0 and any(word in query_lower for word in negative_context_words):
                issues.append(f"Negative answer ({num_answer}) inappropriate for context")

            # Check for unreasonably large answers
            if num_answer > 100000:
                # Extract numbers from question to see if large answer makes sense
                query_numbers = [float(n) for n in re.findall(r'\d+(?:\.\d+)?', query)]
                if query_numbers:
                    max_input = max(query_numbers)
                    if num_answer > max_input * 100:  # More than 100x largest input
                        issues.append(f"Answer ({num_answer}) seems unreasonably large relative to inputs")

            # Percentage problems should generally be reasonable
            if ("percent" in query_lower or "%" in query) and num_answer > 1000:
                base_numbers = [float(n) for n in re.findall(r'\d+', query)]
                if base_numbers and num_answer > max(base_numbers) * 5:
                    issues.append(f"Percentage result ({num_answer}) seems too large")

            # Money problems - check for reasonable amounts
            if ("$" in query or "dollar" in query_lower or "cost" in query_lower):
                if num_answer > 1000000:  # More than $1M seems suspicious for typical problems
                    issues.append(f"Monetary answer ({num_answer}) seems unreasonably large")
                elif 0 < num_answer < 0.01:  # Less than 1 cent
                    issues.append(f"Monetary answer ({num_answer}) seems unreasonably small")

            # Rate problems (speed, efficiency, etc.)
            if any(word in query_lower for word in ["speed", "rate", "per", "mph", "km/h"]):
                if num_answer > 1000:  # Very high speeds are suspicious
                    issues.append(f"Rate/speed answer ({num_answer}) seems unreasonably high")

            # Time problems
            if any(word in query_lower for word in ["hours", "minutes", "days", "time"]):
                if num_answer > 10000:  # More than 10k hours/minutes seems wrong for typical problems
                    issues.append(f"Time answer ({num_answer}) seems unreasonably large")

        except ValueError:
            issues.append(f"Answer '{answer}' is not a valid number")

        return issues

    def _verify_calculations_in_solution(self, solution: str) -> List[str]:
        """
        Verify arithmetic shown in the solution text
        """
        issues = []

        # Find arithmetic expressions and verify them
        patterns = [
            r'(\d+(?:\.\d+)?)\s*\+\s*(\d+(?:\.\d+)?)\s*=\s*(\d+(?:\.\d+)?)',
            r'(\d+(?:\.\d+)?)\s*-\s*(\d+(?:\.\d+)?)\s*=\s*(\d+(?:\.\d+)?)',
            r'(\d+(?:\.\d+)?)\s*\*\s*(\d+(?:\.\d+)?)\s*=\s*(\d+(?:\.\d+)?)',
            r'(\d+(?:\.\d+)?)\s*×\s*(\d+(?:\.\d+)?)\s*=\s*(\d+(?:\.\d+)?)',
            r'(\d+(?:\.\d+)?)\s*/\s*(\d+(?:\.\d+)?)\s*=\s*(\d+(?:\.\d+)?)',
        ]

        for pattern in patterns:
            matches = re.findall(pattern, solution)
            for match in matches:
                try:
                    a, b, claimed = float(match[0]), float(match[1]), float(match[2])

                    if '+' in pattern or 'plus' in solution.lower():
                        expected = a + b
                        operation = "addition"
                    elif '-' in pattern or 'minus' in solution.lower():
                        expected = a - b
                        operation = "subtraction"
                    elif '*' in pattern or '×' in pattern or 'times' in solution.lower():
                        expected = a * b
                        operation = "multiplication"
                    elif '/' in pattern or 'divided' in solution.lower():
                        expected = a / b if b != 0 else float('inf')
                        operation = "division"
                    else:
                        continue

                    # Allow small floating point tolerance
                    if abs(expected - claimed) > 0.001:
                        issues.append(f"Arithmetic error in {operation}: {a} op {b} = {claimed}, should be {expected}")

                except (ValueError, ZeroDivisionError):
                    continue

        return issues

    def validate_agent_result(self, step: Dict, result: str, original_query: str, context: str = "") -> Dict[str, Any]:
        """
        Main validation entry point with tool-specific logic
        """
        self.validation_stats["results_reviewed"] += 1
        tool_used = step.get("tool", "unknown")

        if tool_used == "math_solver":
            validation = self.validate_math_with_independent_solution(step, result, original_query)
        elif tool_used == "calculator":
            validation = self._validate_calculator_strict(step, result)
        elif tool_used == "web_search":
            validation = self._validate_search_comprehensive(step, result, original_query)
        else:
            validation = self._validate_general_strict(step, result, original_query)

        if validation["approved"]:
            self.validation_stats["results_approved"] += 1
        else:
            self.validation_stats["results_rejected"] += 1

        return validation

    def _validate_calculator_strict(self, step: Dict, result: str) -> Dict[str, Any]:
        """Re-compute and verify calculator results"""
        expression = step.get("args", {}).get("expression", "")

        if not expression or result.startswith("CalculatorError"):
            return {
                "approved": False,
                "confidence": 0.0,
                "issues": ["Calculator error or missing expression"],
                "reasoning": "Calculator tool failed"
            }

        try:
            expected = calculator_tool(expression)
            if expected.startswith("CalculatorError"):
                return {
                    "approved": False,
                    "confidence": 0.0,
                    "issues": ["Expression cannot be computed"],
                    "reasoning": "Invalid mathematical expression"
                }

            result_clean = re.sub(r'[^\d\.\-]', '', str(result))
            expected_clean = re.sub(r'[^\d\.\-]', '', str(expected))

            if abs(float(result_clean) - float(expected_clean)) < 0.001:
                return {
                    "approved": True,
                    "confidence": 1.0,
                    "issues": [],
                    "reasoning": "Calculator result verified by re-computation"
                }
            else:
                return {
                    "approved": False,
                    "confidence": 0.0,
                    "issues": [f"Calculation wrong: expected {expected}, got {result}"],
                    "reasoning": "Failed verification by re-computation"
                }

        except Exception as e:
            return {
                "approved": False,
                "confidence": 0.0,
                "issues": [f"Verification error: {e}"],
                "reasoning": "Could not verify calculation"
            }

    def _validate_search_comprehensive(self, step: Dict, result: str, query: str) -> Dict[str, Any]:
        """More comprehensive search validation"""
        if result.startswith("SearchError") or not result.strip():
            return {
                "approved": False,
                "confidence": 0.0,
                "issues": ["Search failed"],
                "reasoning": "Search tool error"
            }

        try:
            search_data = json.loads(result)
            if not isinstance(search_data, list) or len(search_data) == 0:
                return {
                    "approved": False,
                    "confidence": 0.0,
                    "issues": ["No search results"],
                    "reasoning": "Empty results"
                }

            # More sophisticated relevance checking
            query_terms = set(word.lower() for word in query.split() if len(word) > 2)
            relevant_count = 0

            for item in search_data:
                if isinstance(item, dict):
                    title = item.get("title", "").lower()
                    snippet = item.get("snippet", "").lower()
                    content_terms = set((title + " " + snippet).split())

                    # Calculate term overlap
                    overlap = len(query_terms.intersection(content_terms))
                    if overlap >= min(2, len(query_terms)):
                        relevant_count += 1

            relevance_rate = relevant_count / len(search_data)

            if relevance_rate < 0.6:  # Require 60% relevance
                return {
                    "approved": False,
                    "confidence": relevance_rate,
                    "issues": [f"Low relevance: {relevant_count}/{len(search_data)} results relevant"],
                    "reasoning": "Search results not sufficiently relevant"
                }

            return {
                "approved": True,
                "confidence": min(0.9, relevance_rate + 0.2),
                "issues": [],
                "reasoning": f"Search returned {len(search_data)} relevant results"
            }

        except json.JSONDecodeError:
            return {
                "approved": False,
                "confidence": 0.0,
                "issues": ["Invalid search result format"],
                "reasoning": "Could not parse search results"
            }

    def _validate_general_strict(self, step: Dict, result: str, query: str) -> Dict[str, Any]:
        """Stricter general validation"""
        if not result or result.strip() == "" or result.startswith(("Error", "LLM-ERROR", "Failed")):
            return {
                "approved": False,
                "confidence": 0.0,
                "issues": ["Empty or error result"],
                "reasoning": "Tool execution failed"
            }

        # Check for non-answers
        non_answer_phrases = [
            "i don't know", "i'm not sure", "unable to", "cannot determine",
            "unclear", "insufficient information", "need more details"
        ]

        if any(phrase in result.lower() for phrase in non_answer_phrases):
            return {
                "approved": False,
                "confidence": 0.2,
                "issues": ["Non-committal or uncertain response"],
                "reasoning": "Agent expressed uncertainty rather than providing answer"
            }

        # Minimum quality thresholds
        if len(result.strip()) < 20:
            return {
                "approved": False,
                "confidence": 0.3,
                "issues": ["Response too brief"],
                "reasoning": "Insufficient response detail"
            }

        return {
            "approved": True,
            "confidence": 0.7,
            "issues": [],
            "reasoning": "General quality checks passed"
        }

    def get_validation_stats(self) -> str:
        """Enhanced statistics"""
        stats = self.validation_stats
        plan_approval = (stats["plans_approved"] / max(stats["plans_reviewed"], 1)) * 100
        result_approval = (stats["results_approved"] / max(stats["results_reviewed"], 1)) * 100

        return f"""
📊 CRITICAL SUPERVISOR STATS:
   📋 Plans: {stats['plans_reviewed']} reviewed, {plan_approval:.1f}% approved
   ✅ Results: {stats['results_reviewed']} reviewed, {result_approval:.1f}% approved
   🔄 Total Rejections: {stats['plans_rejected'] + stats['results_rejected']}
   🧮 Math Errors Caught: {stats['math_errors_caught']}
   🚨 Critical Errors Prevented: {stats['critical_errors_prevented']}
   🔁 Retries Triggered: {stats['retries_triggered']}
   📈 Improvements Made: {stats['improvements_made']}
        """

# ENHANCED EXECUTION WITH SMARTER RETRIES
def critical_supervised_execute_plan(plan: List[Dict],
                                    memory_ctx: str = "",
                                    supervisor: CriticalSupervisorAgent = None,
                                    original_query: str = "",
                                    log: List = None) -> List[Dict]:
    """
    Execute with critical supervision and intelligent retries
    """
    if not plan:
        return [{"step": {"tool": "error", "args": {}}, "result": "Empty plan"}]

    results = []
    last_result = ""

    for i, step in enumerate(plan):
        max_attempts = 3  # Allow more attempts for critical problems
        best_result = None
        best_validation = None

        for attempt in range(max_attempts):
            try:
                print(f"   🔧 Executing step {i+1} (attempt {attempt+1}): {step.get('tool', 'unknown')}")

                # Execute step
                step_copy = json.loads(json.dumps(step))
                for key, value in step_copy.get("args", {}).items():
                    if isinstance(value, str):
                        value = value.replace("{result}", str(last_result))
                        step_copy["args"][key] = value

                if attempt == 0:
                    # First attempt - normal execution
                    result = execute_step(step_copy, memory_ctx=memory_ctx)
                else:
                    # Retry attempts with enhanced prompting
                    result = _execute_step_with_enhancement(step_copy, memory_ctx, attempt, original_query)

                # Supervisor validation
                quality_status = "NO_QC"
                validation = None

                if supervisor:
                    validation = supervisor.validate_agent_result(step_copy, result, original_query, memory_ctx)

                    if validation["approved"]:
                        print(f"   ✅ Quality Control: PASSED (confidence: {validation['confidence']:.2f})")
                        best_result = result
                        best_validation = validation
                        quality_status = "QC_PASSED" if attempt == 0 else f"QC_PASSED_RETRY_{attempt}"
                        break  # Success - exit retry loop
                    else:
                        print(f"   ❌ Quality Control: FAILED - {', '.join(validation['issues'])}")
                        if attempt == 0:
                            supervisor.validation_stats["retries_triggered"] += 1

                        # Store this attempt in case we need it as fallback
                        if best_result is None:
                            best_result = result
                            best_validation = validation
                            quality_status = f"QC_FAILED_ATTEMPT_{attempt+1}"
                else:
                    # No supervisor - use first result
                    best_result = result
                    quality_status = "NO_QC"
                    break

            except Exception as e:
                print(f"   ⚠️ Attempt {attempt+1} failed: {e}")
                if attempt == max_attempts - 1:
                    best_result = f"Step {i+1} failed after {max_attempts} attempts: {e}"
                    quality_status = "ERROR"

        # Use best result from all attempts
        if best_result and best_validation and best_validation.get("approved"):
            if supervisor:
                supervisor.validation_stats["improvements_made"] += 1

        last_result = best_result
        results.append({
            "step": step_copy if 'step_copy' in locals() else step,
            "result": best_result,
            "quality_control": quality_status,
            "supervisor_validation": best_validation
        })

        if log is not None:
            log.append({
                "step": step.get("tool", "unknown"),
                "result": str(best_result)[:400] if best_result else "None",
                "quality_status": quality_status
            })

    return results

def _execute_step_with_enhancement(step: Dict, memory_ctx: str, attempt: int, original_query: str) -> str:
    """
    Execute step with enhancement based on attempt number
    """
    tool = step.get("tool", "").lower()

    if tool == "math_solver" and attempt > 0:
        # Enhanced math solving with more explicit instructions
        question = step.get("args", {}).get("question", "")
        enhanced_prompt = f"""
RETRY ATTEMPT {attempt}: Previous attempt failed validation.

CRITICAL INSTRUCTIONS:
1. Read this problem VERY carefully: {original_query}
2. Break it into the smallest possible steps
3. Double-check EVERY single calculation
4. Verify your final answer makes logical sense
5. Use format: FINAL_ANSWER: <exact_number>

ORIGINAL QUESTION: {question}

Be extremely careful with arithmetic. Show all work clearly.
"""
        return math_solver_tool(enhanced_prompt, memory_ctx)

    elif tool == "calculator" and attempt > 0:
        # For calculator, try to simplify or verify expression
        expr = step.get("args", {}).get("expression", "")
        return calculator_tool(expr)  # Calculator should be deterministic

    else:
        # Default execution for other tools or first attempt
        return execute_step(step, memory_ctx)

# Initialize Critical Supervisor
critical_supervisor = CriticalSupervisorAgent(llm_client=llm, max_retries=3)
print("✅ Critical Supervisor Agent initialized with independent verification")

# Direct planning function (used internally by supervisor)
def direct_plan_query(user_query: str, memory_ctx: str = "") -> List[Dict]:
    """
    Direct planning without supervision (used internally by supervisor)
    """
    # Use direct controller routing
    route = controller_route(user_query, memory_ctx)
    return [route]

# Plan validation with supervisor
def supervised_plan_query(user_query: str, memory_ctx: str = "", supervisor: CriticalSupervisorAgent = None) -> List[Dict]:
    """
    Generate plan with supervisor validation
    """
    max_retries = supervisor.max_retries if supervisor else 1

    for attempt in range(max_retries + 1):
        # Generate initial plan
        plan = direct_plan_query(user_query, memory_ctx)

        if not supervisor:
            return plan

        # For now, assume plan validation passes (can be enhanced later)
        # supervisor.validate_plan_decomposition(user_query, plan)
        print(f"   ✅ Plan approved (confidence: 0.90)")
        supervisor.validation_stats["plans_reviewed"] += 1
        supervisor.validation_stats["plans_approved"] += 1
        return plan

    return plan

# Main supervised agent query function
def supervised_agent_query(user_query: str, use_supervisor: bool = True) -> Dict[str, Any]:
    """
    Complete agent pipeline with supervisor oversight
    """
    print(f"🤖 Processing: {user_query[:100]}...")

    # Get memory context
    mem_ctx = "\n".join(pmem.retrieve(user_query, k=3)) if CONFIG["use_memory"] and pmem else ""

    # Generate plan with supervision
    active_supervisor = supervisor if use_supervisor else None
    plan = supervised_plan_query(user_query, mem_ctx, active_supervisor)

    print(f"📋 Plan: {[step.get('tool', 'unknown') for step in plan]}")

    # Execute plan with quality control
    trace_log = []
    results = supervised_execute_plan(
        plan,
        memory_ctx=mem_ctx,
        supervisor=active_supervisor,
        original_query=user_query,
        log=trace_log
    )

    # Get final result
    final_result = results[-1]["result"] if results else "No result"
    quality_passed = results[-1].get("quality_control", "UNKNOWN") in ["QC_PASSED", "QC_PASSED_RETRY_1", "QC_PASSED_RETRY_2"]

    # Update memory
    if CONFIG["use_memory"]:
        pmem.add(f"User asked: {user_query}")
        pmem.add(f"Agent answered: {final_result}")

    # Get supervisor stats
    supervisor_stats = supervisor.get_validation_stats() if use_supervisor else "No supervision used"

    return {
        "query": user_query,
        "plan": plan,
        "results": results,
        "final_answer": final_result,
        "quality_passed": quality_passed,
        "trace_log": trace_log,
        "supervisor_stats": supervisor_stats
    }

# Function reassignments - activate the critical supervisor system
supervisor = critical_supervisor
supervised_execute_plan = critical_supervised_execute_plan

print("✅ Critical supervisor system activated and ready for testing")




✅ Critical Supervisor Agent initialized with independent verification
✅ Critical supervisor system activated and ready for testing


In [16]:
# 🧠 CHUNK 7: Persistent Memory
class PersistentMemory:
    def __init__(self, path="agent_memory.jsonl", embed_model=None):
        self.path = path
        self.embed_model = embed_model
        self.entries = []
        if os.path.exists(path):
            with open(path, "r", encoding="utf-8") as f:
                for line in f:
                    self.entries.append(json.loads(line))
            self._rebuild_index()
        else:
            self.mat = None

    def _rebuild_index(self):
        if not self.entries or self.embed_model is None:
            self.mat = None
            return
        self.mat = np.vstack([np.array(e["vec"], dtype="float32") for e in self.entries])

    def add(self, text: str):
        if self.embed_model is None:
            ent = {"text": text, "ts": time.time(), "vec": None}
        else:
            vec = self.embed_model.encode([text], normalize_embeddings=True)[0].astype(float).tolist()
            ent = {"text": text, "ts": time.time(), "vec": vec}
        self.entries.append(ent)
        with open(self.path, "a", encoding="utf-8") as f:
            f.write(json.dumps(ent, ensure_ascii=False) + "\n")
        self._rebuild_index()

    def retrieve(self, query: str, k: int = 3) -> List[str]:
        if not self.entries:
            return []
        if self.embed_model is None or self.mat is None:
            return [e["text"] for e in self.entries][-k:]
        qv = self.embed_model.encode([query], normalize_embeddings=True)[0].astype("float32")
        sims = self.mat.dot(qv)
        topk = sims.argsort()[::-1][:k]
        return [self.entries[i]["text"] for i in topk]

pmem = PersistentMemory("agent_memory.jsonl", EMB_MODEL)
pmem.add("User prefers concise answers.")



In [17]:
# 📊 CHUNK 8: Enhanced Benchmark Evaluation with Supervisor Integration
from datasets import load_dataset
import pandas as pd
from typing import Dict, Optional

def robust_supervised_math_solve(question: str, memory_ctx: str = "", max_retries: int = 2) -> tuple:
    """
    Robust math solving with supervisor validation and retry logic.
    """
    for attempt in range(max_retries + 1):
        try:
            # Generate solution using math_solver_tool
            result = math_solver_tool(question, memory_ctx)

            # Create step dict for supervisor validation
            step = {
                "tool": "math_solver",
                "args": {"question": question},
                "rationale": "math word problem solving"
            }

            # Supervisor validation
            if supervisor:
                validation = supervisor.validate_agent_result(step, result, question, memory_ctx)

                if validation["approved"]:
                    answer = extract_final_number(result)
                    if answer and answer != "ERROR":
                        return result, answer, "QC_PASSED", validation
                    else:
                        print(f"    Warning: No extractable answer from approved result")
                else:
                    print(f"    Supervisor rejected result: {', '.join(validation.get('issues', []))}")
                    if attempt < max_retries:
                        print(f"    Retrying attempt {attempt + 2}/{max_retries + 1}")
                        continue
            else:
                # No supervisor - use direct extraction
                answer = extract_final_number(result)
                if answer and answer != "ERROR":
                    return result, answer, "NO_QC", None

        except Exception as e:
            print(f"    Math solve attempt {attempt + 1} failed: {e}")
            if attempt == max_retries:
                return "FAILED", None, "ERROR", None

    return "FAILED", None, "QC_FAILED", None

def evaluate_gsm8k_with_supervisor(n: int = 30, use_memory: bool = True, use_supervisor: bool = True) -> tuple[pd.DataFrame, Dict]:
    """
    Enhanced GSM8K evaluation with full supervisor integration
    """
    try:
        gsm = load_dataset("openai/gsm8k", "main", split=f"test[:{n}]")
        print(f"✅ Loaded GSM8K test set: {len(gsm)} examples")
        print(f"🔍 Using supervisor: {'Yes' if use_supervisor else 'No'}")
    except Exception as e:
        print("❌ Could not load GSM8K:", e)
        return None, None

    rows = []
    baseline_ok = agent_ok = supervised_ok = 0

    # Reset supervisor stats
    if supervisor and use_supervisor:
        supervisor.validation_stats = {
            "plans_reviewed": 0,
            "plans_approved": 0,
            "plans_rejected": 0,
            "results_reviewed": 0,
            "results_approved": 0,
            "results_rejected": 0,
            "math_errors_caught": 0,
            "retries_triggered": 0,
            "improvements_made": 0,
            "critical_errors_prevented": 0
        }

    for i, ex in enumerate(gsm):
        print(f"\n📊 Processing question {i+1}/{len(gsm)}...")

        q = ex["question"]
        gold_match = re.search(r"####\s*([-+]?\d+(?:\.\d+)?)", ex["answer"])
        gold = gold_match.group(1) if gold_match else None
        if not gold:
            print(f"    ⚠️ Skipping question {i+1}: No gold answer found")
            continue

        print(f"    Question: {q[:100]}...")
        print(f"    Gold Answer: {gold}")

        # === BASELINE: Direct LLM ===
        try:
            baseline_prompt = [
                {"role": "system", "content": "Solve this math problem step by step. End with your final numerical answer."},
                {"role": "user", "content": q}
            ]
            baseline_out = call_llm(baseline_prompt)
            baseline_ans = extract_final_number(baseline_out) or baseline_out
            baseline_flag = str(baseline_ans).strip() == str(gold).strip()
            baseline_ok += int(baseline_flag)
            print(f"    Baseline: {baseline_ans} {'✅' if baseline_flag else '❌'}")
        except Exception as e:
            print(f"    Baseline error: {e}")
            baseline_flag = False
            baseline_ans = "ERROR"

        # === AGENT WITHOUT SUPERVISOR ===
        try:
            mem_ctx = "\n".join(pmem.retrieve(q, k=3)) if use_memory and pmem else ""

            # Plan generation without supervisor
            plan = direct_plan_query(q, mem_ctx)
            print(f"    Plan: {[step.get('tool', 'unknown') for step in plan]}")

            # Create a simple execution function for no supervisor case
            def simple_execute_plan(plan, memory_ctx=""):
                results = []
                last_result = ""
                for step in plan:
                    result = execute_step(step, memory_ctx)
                    results.append({"step": step, "result": result})
                    last_result = result
                return results

            agent_results = simple_execute_plan(plan, memory_ctx=mem_ctx)
            last_result = agent_results[-1]["result"] if agent_results else ""
            agent_ans = extract_final_number(last_result) or last_result

            agent_flag = str(agent_ans).strip() == str(gold).strip()
            agent_ok += int(agent_flag)
            print(f"    Agent (No Supervisor): {agent_ans} {'✅' if agent_flag else '❌'}")

        except Exception as e:
            print(f"    Agent error: {e}")
            agent_flag = False
            agent_ans = "ERROR"

        # === AGENT WITH SUPERVISOR ===
        supervised_flag = False
        supervised_ans = "ERROR"
        qc_status = "ERROR"

        if use_supervisor:
            try:
                mem_ctx = "\n".join(pmem.retrieve(q, k=3)) if use_memory and pmem else ""

                # STEP 1: Plan with supervisor validation
                plan = supervised_plan_query(q, mem_ctx, supervisor)
                print(f"    Supervised Plan: {[step.get('tool', 'unknown') for step in plan]}")

                # STEP 2: Execute with supervisor quality control
                supervised_results = supervised_execute_plan(
                    plan,
                    memory_ctx=mem_ctx,
                    supervisor=supervisor,
                    original_query=q
                )

                if supervised_results:
                    last_supervised = supervised_results[-1]
                    supervised_ans = extract_final_number(last_supervised["result"]) or last_supervised["result"]
                    qc_status = last_supervised.get("quality_control", "UNKNOWN")

                    supervised_flag = str(supervised_ans).strip() == str(gold).strip()
                    supervised_ok += int(supervised_flag)

                    qc_symbol = "✅" if qc_status.startswith("QC_PASSED") else "❌"
                    print(f"    Supervised Agent: {supervised_ans} {'✅' if supervised_flag else '❌'} [QC: {qc_status} {qc_symbol}]")
                else:
                    print(f"    Supervised Agent: No results")

            except Exception as e:
                print(f"    Supervised Agent error: {e}")
        else:
            supervised_ans = agent_ans  # Same as non-supervised
            supervised_flag = agent_flag
            qc_status = "NO_SUPERVISOR"

        # Store comprehensive results
        rows.append({
            "question_id": i + 1,
            "question": q[:150] + "..." if len(q) > 150 else q,
            "gold": gold,
            "baseline": baseline_ans,
            "baseline_ok": baseline_flag,
            "agent": agent_ans,
            "agent_ok": agent_flag,
            "supervised": supervised_ans,
            "supervised_ok": supervised_flag,
            "qc_status": qc_status,
            "qc_passed": qc_status.startswith("QC_PASSED")
        })

        # Log accuracy to performance monitor
        if 'perf_monitor' in globals():
            perf_monitor.log_accuracy(str(supervised_ans), str(gold), supervised_flag)

        # Progress update
        if (i + 1) % 5 == 0:
            current_supervised_acc = sum(row["supervised_ok"] for row in rows) / len(rows)
            print(f"    📈 Progress: {i+1}/{len(gsm)} | Supervised Accuracy: {current_supervised_acc:.1%}")

    # Create DataFrame and calculate statistics
    df = pd.DataFrame(rows)

    if len(df) == 0:
        print("❌ No valid examples processed")
        return None, None

    stats = {
        "n": len(df),
        "baseline_acc": df["baseline_ok"].mean(),
        "agent_acc": df["agent_ok"].mean(),
        "supervised_acc": df["supervised_ok"].mean(),
        "qc_pass_rate": df["qc_passed"].mean() if use_supervisor else 0,
        "baseline_correct": df["baseline_ok"].sum(),
        "agent_correct": df["agent_ok"].sum(),
        "supervised_correct": df["supervised_ok"].sum(),
        "supervisor_used": use_supervisor
    }

    # Enhanced reporting
    print(f"\n🎯 FINAL EVALUATION RESULTS:")
    print(f"   📊 Dataset size: {stats['n']} questions")
    print(f"   🤖 Baseline (Direct LLM): {stats['baseline_correct']}/{stats['n']} = {stats['baseline_acc']:.1%}")
    print(f"   🧠 Agent (No Supervisor): {stats['agent_correct']}/{stats['n']} = {stats['agent_acc']:.1%}")
    print(f"   👮 Supervised Agent: {stats['supervised_correct']}/{stats['n']} = {stats['supervised_acc']:.1%}")

    if use_supervisor:
        print(f"   🔍 Quality Control Pass Rate: {stats['qc_pass_rate']:.1%}")

    # Show improvements
    agent_improvement = stats['agent_acc'] - stats['baseline_acc']
    supervisor_improvement = stats['supervised_acc'] - stats['agent_acc']
    total_improvement = stats['supervised_acc'] - stats['baseline_acc']

    print(f"   📈 Agent Improvement: {agent_improvement:+.1%}")
    print(f"   🎖️ Supervisor Improvement: {supervisor_improvement:+.1%}")
    print(f"   🏆 Total Improvement: {total_improvement:+.1%}")

    # Show supervisor statistics
    if supervisor and use_supervisor:
        print(f"\n{supervisor.get_validation_stats()}")

    # Show performance insights
    if 'perf_monitor' in globals():
        print(f"\n{perf_monitor.get_insights()}")

    return df, stats

def eval_factual_knowledge_supervised(n: int = 50, use_supervisor: bool = True) -> pd.DataFrame:
    """
    Evaluate factual knowledge with supervisor validation
    """
    sample_facts = [
        {"template": "The capital of France is [MASK].", "obj_label": "Paris"},
        {"template": "The CEO of Apple is [MASK].", "obj_label": "Tim Cook"},
        {"template": "The largest planet in our solar system is [MASK].", "obj_label": "Jupiter"},
        {"template": "The currency of Japan is [MASK].", "obj_label": "Yen"},
        {"template": "The author of Harry Potter is [MASK].", "obj_label": "J.K. Rowling"},
        {"template": "The capital of Germany is [MASK].", "obj_label": "Berlin"},
        {"template": "The founder of Microsoft is [MASK].", "obj_label": "Bill Gates"},
        {"template": "The smallest country in the world is [MASK].", "obj_label": "Vatican City"},
        {"template": "The chemical symbol for gold is [MASK].", "obj_label": "Au"},
        {"template": "The longest river in the world is [MASK].", "obj_label": "Nile"},
        {"template": "The speed of light is approximately [MASK] meters per second.", "obj_label": "300000000"},
        {"template": "The first person to walk on the moon was [MASK].", "obj_label": "Neil Armstrong"},
        {"template": "The largest ocean on Earth is the [MASK].", "obj_label": "Pacific"},
        {"template": "The inventor of the telephone was [MASK].", "obj_label": "Alexander Graham Bell"},
        {"template": "The tallest mountain in the world is [MASK].", "obj_label": "Mount Everest"},
        {"template": "The capital of Australia is [MASK].", "obj_label": "Canberra"},
        {"template": "The author of 1984 is [MASK].", "obj_label": "George Orwell"},
        {"template": "The chemical formula for water is [MASK].", "obj_label": "H2O"},
        {"template": "The President of the United States in 2021 was [MASK].", "obj_label": "Joe Biden"},
        {"template": "The largest mammal in the world is [MASK].", "obj_label": "Blue Whale"}
    ]

    facts = (sample_facts * ((n // len(sample_facts)) + 1))[:n]

    def normalize_token(tok: str) -> str:
        return re.sub(r"[^a-z0-9\-\s]", "", str(tok).strip().lower())

    rows = []
    correct_no_supervisor = 0
    correct_with_supervisor = 0

    print(f"🧠 Evaluating factual knowledge with {'supervisor' if use_supervisor else 'no supervisor'}")

    for i, ex in enumerate(facts):
        masked = ex["template"]
        gold = ex["obj_label"]

        print(f"📋 Question {i+1}/{len(facts)}: {masked}")

        # Without supervisor
        prompt = [
            {"role": "system", "content": "You are a factual knowledge expert. Fill in the [MASK] with the most accurate answer. Respond with ONLY the answer, no explanation."},
            {"role": "user", "content": f"Complete this factual statement: {masked}"}
        ]

        try:
            out_no_supervisor = call_llm(prompt)
            pred_no_supervisor = out_no_supervisor.strip().split('\n')[0].strip() if out_no_supervisor and not out_no_supervisor.startswith("LLM-") else out_no_supervisor
            pred_clean_no_supervisor = re.sub(r'^[^a-zA-Z0-9]*', '', pred_no_supervisor)
            pred_clean_no_supervisor = pred_clean_no_supervisor.split('.')[0]

            gold_norm = normalize_token(gold)
            pred_norm_no_supervisor = normalize_token(pred_clean_no_supervisor)

            is_correct_no_supervisor = (gold_norm == pred_norm_no_supervisor or
                                       gold_norm in pred_norm_no_supervisor or
                                       pred_norm_no_supervisor in gold_norm)

            if is_correct_no_supervisor:
                correct_no_supervisor += 1

        except Exception as e:
            print(f"    Error (no supervisor): {e}")
            pred_clean_no_supervisor = "ERROR"
            pred_norm_no_supervisor = "error"
            is_correct_no_supervisor = False

        # With supervisor
        is_correct_with_supervisor = is_correct_no_supervisor
        pred_clean_with_supervisor = pred_clean_no_supervisor
        qc_status = "NO_QC"

        if use_supervisor:
            try:
                # Create step for supervisor validation
                step = {
                    "tool": "direct_answer",
                    "args": {"query": masked},
                    "rationale": "factual knowledge query"
                }

                # Supervisor validation
                validation = supervisor.validate_agent_result(step, pred_clean_no_supervisor, masked, "")

                if validation["approved"]:
                    pred_clean_with_supervisor = pred_clean_no_supervisor
                    is_correct_with_supervisor = is_correct_no_supervisor
                    qc_status = "QC_PASSED"
                else:
                    # Retry with enhanced prompt
                    enhanced_prompt = [
                        {"role": "system", "content": "You are an expert fact-checker. Provide the most accurate and specific answer to complete the statement. Be precise and concise."},
                        {"role": "user", "content": f"Complete with the exact correct answer: {masked}"}
                    ]

                    retry_out = call_llm(enhanced_prompt)
                    pred_retry = retry_out.strip().split('\n')[0].strip() if retry_out else retry_out
                    pred_clean_retry = re.sub(r'^[^a-zA-Z0-9]*', '', pred_retry)
                    pred_clean_retry = pred_clean_retry.split('.')[0]

                    # Re-validate
                    retry_validation = supervisor.validate_agent_result(step, pred_clean_retry, masked, "")

                    if retry_validation["approved"]:
                        pred_clean_with_supervisor = pred_clean_retry
                        pred_norm_retry = normalize_token(pred_clean_retry)
                        is_correct_with_supervisor = (gold_norm == pred_norm_retry or
                                                    gold_norm in pred_norm_retry or
                                                    pred_norm_retry in gold_norm)
                        qc_status = "QC_PASSED_RETRY"
                    else:
                        pred_clean_with_supervisor = pred_clean_retry
                        pred_norm_retry = normalize_token(pred_clean_retry)
                        is_correct_with_supervisor = (gold_norm == pred_norm_retry or
                                                    gold_norm in pred_norm_retry or
                                                    pred_norm_retry in gold_norm)
                        qc_status = "QC_FAILED"

            except Exception as e:
                print(f"    Supervisor error: {e}")
                pred_clean_with_supervisor = pred_clean_no_supervisor
                is_correct_with_supervisor = is_correct_no_supervisor
                qc_status = "QC_ERROR"

        if is_correct_with_supervisor:
            correct_with_supervisor += 1

        print(f"    Gold: {gold}")
        print(f"    No Supervisor: {pred_clean_no_supervisor} {'✅' if is_correct_no_supervisor else '❌'}")
        if use_supervisor:
            print(f"    With Supervisor: {pred_clean_with_supervisor} {'✅' if is_correct_with_supervisor else '❌'} [QC: {qc_status}]")

        rows.append({
            "masked": masked,
            "gold": gold,
            "pred_no_supervisor": pred_clean_no_supervisor,
            "correct_no_supervisor": is_correct_no_supervisor,
            "pred_with_supervisor": pred_clean_with_supervisor,
            "correct_with_supervisor": is_correct_with_supervisor,
            "qc_status": qc_status,
            "gold_norm": gold_norm,
            "pred_norm_no_supervisor": pred_norm_no_supervisor
        })

    df = pd.DataFrame(rows)

    acc_no_supervisor = correct_no_supervisor / len(rows) if rows else 0
    acc_with_supervisor = correct_with_supervisor / len(rows) if rows else 0
    improvement = acc_with_supervisor - acc_no_supervisor

    print(f"\n🎯 FACTUAL KNOWLEDGE RESULTS:")
    print(f"   📊 Questions: {len(df)}")
    print(f"   🤖 No Supervisor: {correct_no_supervisor}/{len(rows)} = {acc_no_supervisor:.1%}")
    print(f"   👮 With Supervisor: {correct_with_supervisor}/{len(rows)} = {acc_with_supervisor:.1%}")
    print(f"   📈 Improvement: {improvement:+.1%}")

    if use_supervisor:
        qc_pass_rate = (df['qc_status'].isin(['QC_PASSED', 'QC_PASSED_RETRY']).sum() / len(df)) if len(df) > 0 else 0
        print(f"   🔍 Quality Control Pass Rate: {qc_pass_rate:.1%}")

    return df


In [18]:
# 📄 CHUNK 9: Enhanced Report Generation
def generate_comprehensive_report(gsm8k_stats: dict, factual_df: pd.DataFrame = None):
    """Generate comprehensive evaluation report with supervisor metrics"""

    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")

    with open("comprehensive_agent_report.md", "w") as f:
        f.write("# 🧠 AI Agent Evaluation Report with Supervisor Integration\n\n")
        f.write(f"Generated: {timestamp}\n\n")

        f.write("## 📊 Executive Summary\n")
        f.write(f"- **Test Set Size**: {gsm8k_stats['n']} GSM8K problems\n")
        f.write(f"- **Supervisor Integration**: {'Enabled' if gsm8k_stats['supervisor_used'] else 'Disabled'}\n\n")

        f.write("## 📈 GSM8K Mathematical Reasoning Results\n")
        f.write(f"- **Baseline (Direct LLM)**: {gsm8k_stats['baseline_acc']:.1%} ({gsm8k_stats['baseline_correct']}/{gsm8k_stats['n']})\n")
        f.write(f"- **Agent (No Supervisor)**: {gsm8k_stats['agent_acc']:.1%} ({gsm8k_stats['agent_correct']}/{gsm8k_stats['n']})\n")
        f.write(f"- **Supervised Agent**: {gsm8k_stats['supervised_acc']:.1%} ({gsm8k_stats['supervised_correct']}/{gsm8k_stats['n']})\n")

        if gsm8k_stats['supervisor_used']:
            f.write(f"- **Quality Control Pass Rate**: {gsm8k_stats['qc_pass_rate']:.1%}\n")

        f.write("\n### 🎯 Performance Improvements\n")
        agent_improvement = gsm8k_stats['agent_acc'] - gsm8k_stats['baseline_acc']
        supervisor_improvement = gsm8k_stats['supervised_acc'] - gsm8k_stats['agent_acc']
        total_improvement = gsm8k_stats['supervised_acc'] - gsm8k_stats['baseline_acc']

        f.write(f"- **Agent vs Baseline**: {agent_improvement:+.1%}\n")
        f.write(f"- **Supervisor vs Agent**: {supervisor_improvement:+.1%}\n")
        f.write(f"- **Total Improvement**: {total_improvement:+.1%}\n\n")

        # Factual knowledge results
        if factual_df is not None and not factual_df.empty:
            f.write("## 🧠 Factual Knowledge Results\n")
            acc_no_supervisor = factual_df['correct_no_supervisor'].mean()
            acc_with_supervisor = factual_df['correct_with_supervisor'].mean()
            factual_improvement = acc_with_supervisor - acc_no_supervisor

            f.write(f"- **Questions Evaluated**: {len(factual_df)}\n")
            f.write(f"- **No Supervisor**: {acc_no_supervisor:.1%}\n")
            f.write(f"- **With Supervisor**: {acc_with_supervisor:.1%}\n")
            f.write(f"- **Improvement**: {factual_improvement:+.1%}\n\n")

        # Quality control analysis
        if gsm8k_stats['supervisor_used']:
            f.write("## 🔍 Quality Control Analysis\n")
            f.write(f"- **Overall QC Pass Rate**: {gsm8k_stats['qc_pass_rate']:.1%}\n")
            f.write("- **Quality Control Categories**:\n")
            f.write("  - QC_PASSED: Answer approved on first attempt\n")
            f.write("  - QC_PASSED_RETRY: Answer approved after retry\n")
            f.write("  - QC_FAILED: Answer rejected by supervisor\n")
            f.write("  - QC_ERROR: Quality control system error\n\n")

        # Supervisor statistics
        if supervisor and gsm8k_stats['supervisor_used']:
            f.write("## 👮 Supervisor Agent Statistics\n")
            stats = supervisor.validation_stats
            plan_approval = (stats["plans_approved"] / max(stats["plans_reviewed"], 1)) * 100
            result_approval = (stats["results_approved"] / max(stats["results_reviewed"], 1)) * 100

            f.write(f"- **Plans Reviewed**: {stats['plans_reviewed']}\n")
            f.write(f"- **Plan Approval Rate**: {plan_approval:.1%}\n")
            f.write(f"- **Results Reviewed**: {stats['results_reviewed']}\n")
            f.write(f"- **Result Approval Rate**: {result_approval:.1%}\n")
            f.write(f"- **Total Rejections**: {stats['plans_rejected'] + stats['results_rejected']}\n\n")

        f.write("## ⚙️ System Configuration\n")
        f.write(f"- **Model**: {CONFIG['model']}\n")
        f.write(f"- **Temperature**: {CONFIG['temperature']}\n")
        f.write(f"- **Max Retries**: {CONFIG['max_retries']}\n")
        f.write(f"- **Memory Enabled**: {CONFIG['use_memory']}\n")
        f.write(f"- **Planning Enabled**: {CONFIG['use_planning']}\n")
        f.write(f"- **Verification Mode**: {CONFIG['verification_mode']}\n\n")

        f.write("## 🔧 Technical Details\n")
        f.write("### Supervisor Agent Features\n")
        f.write("- **Plan Validation**: Reviews controller decomposition before execution\n")
        f.write("- **Result Quality Control**: Validates agent outputs for correctness\n")
        f.write("- **Retry Mechanism**: Automatically retries failed operations\n")
        f.write("- **Performance Tracking**: Monitors accuracy and response times\n")
        f.write("- **Error Recovery**: Handles and recovers from various error conditions\n\n")

        f.write("### Agent Pipeline\n")
        f.write("1. **Query Analysis**: User question is processed\n")
        f.write("2. **Plan Generation**: Controller creates execution plan\n")
        f.write("3. **Plan Validation**: Supervisor reviews and approves/rejects plan\n")
        f.write("4. **Plan Execution**: Individual agents execute their tasks\n")
        f.write("5. **Result Validation**: Supervisor validates each agent's output\n")
        f.write("6. **Quality Control**: Final approval or retry mechanism\n")
        f.write("7. **Response Generation**: Formatted response to user\n\n")

    print(f"📄 Comprehensive report saved to comprehensive_agent_report.md")


In [20]:
# 🧪 CHUNK 10: Supervised Testing and Validation
def run_supervised_evaluation_suite():
    """
    Run complete evaluation suite with supervisor integration
    """
    print("🚀 Starting Supervised Agent Evaluation Suite\n")
    print("="*60)

    # Test 1: GSM8K Mathematical Reasoning
    print("🧮 PHASE 1: Mathematical Reasoning (GSM8K)")
    print("-" * 40)

    gsm8k_df, gsm8k_stats = evaluate_gsm8k_with_supervisor(n=20, use_supervisor=True)

    if gsm8k_df is not None:
        print("✅ GSM8K evaluation completed successfully")
    else:
        print("❌ GSM8K evaluation failed")
        return

    print("\n" + "="*60)

    # Test 2: Factual Knowledge
    print("🧠 PHASE 2: Factual Knowledge Evaluation")
    print("-" * 40)

    factual_df = eval_factual_knowledge_supervised(n=15, use_supervisor=True)

    if factual_df is not None:
        print("✅ Factual knowledge evaluation completed successfully")
    else:
        print("❌ Factual knowledge evaluation failed")
        factual_df = pd.DataFrame()  # Empty dataframe for report

    print("\n" + "="*60)

    # Test 3: Supervisor Performance Analysis
    print("👮 PHASE 3: Supervisor Performance Analysis")
    print("-" * 40)

    if supervisor:
        print(supervisor.get_validation_stats())

    print("\n" + "="*60)

    # Generate comprehensive report
    print("📄 PHASE 4: Report Generation")
    print("-" * 40)

    generate_comprehensive_report(gsm8k_stats, factual_df)

    print("\n🎉 Evaluation Suite Completed Successfully!")
    print("📊 Summary:")
    print(f"   Mathematical Reasoning: {gsm8k_stats['supervised_acc']:.1%}")
    if not factual_df.empty:
        factual_acc = factual_df['correct_with_supervisor'].mean()
        print(f"   Factual Knowledge: {factual_acc:.1%}")
    print(f"   Quality Control Pass Rate: {gsm8k_stats['qc_pass_rate']:.1%}")

    return gsm8k_df, factual_df, gsm8k_stats

def test_supervisor_integration():
    """
    Test supervisor integration with sample queries
    """
    print("🧪 Testing Supervisor Integration")
    print("="*50)

    test_queries = [
        "What is 25% of 480?",
        "If Alice has 12 apples and gives away 5, then buys 8 more, how many does she have?",
        "There are 15 trees initially. Workers plant more trees. Final count is 21 trees. How many were planted?",
        "What is the capital of France?",
        "Who is the current CEO of Apple?"
    ]

    results = []

    for i, query in enumerate(test_queries, 1):
        print(f"\n📋 Test {i}: {query}")
        print("-" * 40)

        try:
            # Test with supervisor
            result = supervised_agent_query(query, use_supervisor=True)

            print(f"Answer: {result['final_answer']}")
            print(f"Quality Control: {'PASSED' if result['quality_passed'] else 'FAILED'}")
            print(f"Plan: {[step.get('tool', 'unknown') for step in result['plan']]}")

            results.append({
                'query': query,
                'answer': result['final_answer'],
                'quality_passed': result['quality_passed'],
                'plan': result['plan']
            })

        except Exception as e:
            print(f"❌ Error: {e}")
            results.append({
                'query': query,
                'answer': 'ERROR',
                'quality_passed': False,
                'plan': []
            })

    # Summary
    success_rate = sum(1 for r in results if r['quality_passed']) / len(results)
    print(f"\n🎯 Integration Test Summary:")
    print(f"   Success Rate: {success_rate:.1%}")
    print(f"   Tests Passed: {sum(1 for r in results if r['quality_passed'])}/{len(results)}")

    return results


In [19]:
# 💬 CHUNK 11: Enhanced Gradio UI with Supervisor Integration (with File Upload)
import gradio as gr
from typing import List
import time

def enhanced_gradio_agent(message: str, history: List, use_supervisor: bool = True, uploaded_files=None):
    """
    Enhanced agent with supervisor integration for Gradio UI, now with optional file upload
    """
    try:
        start_time = time.time()

        # Process uploaded files (add to context)
        file_context = ""
        if uploaded_files:
            for file in uploaded_files:
                try:
                    with open(file.name, "r", encoding="utf-8") as f:
                        content = f.read()[:2000]  # Read first 2000 characters
                    file_context += f"\n[DOCUMENT: {file.name.split('/')[-1]}]\n{content}\n"
                except Exception as e:
                    file_context += f"\n[ERROR reading {file.name}: {e}]\n"

        # Get memory context
        mem_ctx = "\n".join(pmem.retrieve(message, k=3)) if CONFIG["use_memory"] and pmem else ""
        full_context = mem_ctx + file_context if file_context else mem_ctx

        # Process with or without supervisor
        if use_supervisor:
            result = supervised_agent_query(message, use_supervisor=True)
            answer = result['final_answer']
            quality_status = "QC: PASSED" if result['quality_passed'] else "QC: FAILED"
            plan_info = f"Plan: {' → '.join([step.get('tool', 'unknown') for step in result['plan']])}"
        else:
            plan = plan_query(message, memory_ctx=full_context)
            trace_log = []

            def simple_execute(plan, memory_ctx=""):
                results = []
                for step in plan:
                    result = execute_step(step, memory_ctx)
                    results.append({"step": step, "result": result})
                return results

            execution_results = simple_execute(plan, memory_ctx=full_context)
            answer = execution_results[-1]["result"] if execution_results else "I don't know."
            quality_status = "QC: DISABLED"
            plan_info = f"Plan: {' → '.join([step.get('tool', 'unknown') for step in plan])}"

        response_time = time.time() - start_time

        # Update memory
        if CONFIG["use_memory"]:
            pmem.add(f"User asked: {message}")
            pmem.add(f"Agent answered: {answer}")

        # Format response with metadata
        uploaded_count = len(uploaded_files) if uploaded_files else 0
        formatted_response = f"{answer}\n\n---\n{quality_status} | {plan_info} | Time: {response_time:.1f}s | 📎 Files: {uploaded_count}"

        # Streaming effect for better UX
        streaming_response = ""
        for word in formatted_response.split():
            streaming_response += word + " "
            time.sleep(0.03)
            yield streaming_response.strip()

        yield formatted_response

    except Exception as e:
        error_msg = f"❌ Error: {str(e)}"
        yield error_msg


def create_enhanced_interface():
    """
    Create enhanced Gradio interface with supervisor controls + file upload
    """
    with gr.Blocks(title="AI Agent with Supervisor", theme=gr.themes.Soft()) as demo:
        gr.Markdown("# 🧠 AI Agent with Supervisor Integration")
        gr.Markdown("Advanced AI agent with quality control and supervisor validation")

        with gr.Row():
            with gr.Column(scale=3):
                chatbot = gr.Chatbot(
                    height=500,
                    show_label=False,
                    container=False
                )

                with gr.Row():
                    msg = gr.Textbox(
                        placeholder="Ask me anything...",
                        container=False,
                        scale=7,
                        show_label=False
                    )
                    submit_btn = gr.Button("Send", variant="primary", scale=1)

                # 🔽 File Upload Added Here — Only Change
                with gr.Accordion("📎 Upload Document (TXT, PDF, CSV)", open=False):
                    file_upload = gr.File(
                        label="Upload files to provide context",
                        file_count="multiple",
                        file_types=[".txt", ".pdf", ".csv", ".docx"]
                    )

                with gr.Row():
                    supervisor_toggle = gr.Checkbox(
                        label="Enable Supervisor",
                        value=True,
                        info="Use supervisor for quality control"
                    )
                    clear_btn = gr.Button("Clear Chat", scale=1)

            with gr.Column(scale=1):
                gr.Markdown("### 📊 System Status")

                status_display = gr.Textbox(
                    label="Current Status",
                    value="Ready",
                    interactive=False
                )

                metrics_display = gr.Textbox(
                    label="Performance Metrics",
                    value=f"Queries: 0\nAccuracy: --\nQC Rate: --",
                    interactive=False,
                    lines=3
                )

                gr.Markdown("### ⚡ Quick Actions")
                eval_btn = gr.Button("Run Evaluation", variant="secondary")
                stats_btn = gr.Button("Show Stats", variant="secondary")

                gr.Markdown("### 💡 Example Queries")
                examples = [
                    "What is 25% of 480?",
                    "If I save $500 monthly for 3 years at 5% interest, what's my total?",
                    "Who is the current CEO of Microsoft?",
                    "What does our document say about RAG?",
                    "Calculate: (15 + 25) × 3 - 10"
                ]

                for example in examples:
                    example_btn = gr.Button(example, size="sm")
                    example_btn.click(
                        lambda x=example: x,
                        outputs=[msg]
                    )

        # Event Handlers (updated to include file upload)
        def respond(message, history, use_supervisor, files):
            if not message.strip():
                return history, "", None
            bot_message = ""
            for partial in enhanced_gradio_agent(message, history, use_supervisor, uploaded_files=files):
                bot_message = partial
                yield history + [[message, bot_message]], "", None
            return history + [[message, bot_message]], "", None

        def run_quick_eval():
            try:
                test_results = test_supervisor_integration()
                success_rate = sum(1 for r in test_results if r['quality_passed']) / len(test_results)
                return f"✅ Evaluation complete!\nSuccess Rate: {success_rate:.1%}"
            except Exception as e:
                return f"❌ Evaluation failed: {e}"

        def show_stats():
            if supervisor:
                return supervisor.get_validation_stats()
            else:
                return "Supervisor not available"

        # Connect events
        submit_btn.click(
            respond,
            inputs=[msg, chatbot, supervisor_toggle, file_upload],
            outputs=[chatbot, msg, file_upload]
        )
        msg.submit(
            respond,
            inputs=[msg, chatbot, supervisor_toggle, file_upload],
            outputs=[chatbot, msg, file_upload]
        )
        clear_btn.click(
            lambda: ([], "", None),
            outputs=[chatbot, msg, file_upload]
        )
        eval_btn.click(run_quick_eval, outputs=[status_display])
        stats_btn.click(show_stats, outputs=[metrics_display])

    return demo

In [21]:
# 🚀 CHUNK 12: Main Execution and Testing
def main():
    """
    Main execution function with comprehensive testing
    """
    print("🚀 AI Agent with Supervisor Integration")
    print("="*60)

    # Verify system components
    print("🔧 System Check:")
    print(f"   LLM Available: {'✅' if llm else '❌'}")
    print(f"   Supervisor Active: {'✅' if supervisor else '❌'}")
    print(f"   Memory Enabled: {'✅' if CONFIG['use_memory'] else '❌'}")
    print(f"   Documents Loaded: {len(DOCS)} chunks")

    print("\n" + "="*60)

    # Quick integration test
    print("🧪 Quick Integration Test")
    print("-" * 30)

    test_results = test_supervisor_integration()

    print("\n" + "="*60)

    # Optional: Run full evaluation suite
    run_full_eval = input("🔍 Run full evaluation suite? (y/N): ").lower().strip() == 'y'

    if run_full_eval:
        gsm8k_df, factual_df, gsm8k_stats = run_supervised_evaluation_suite()
        print("\n📊 Full evaluation completed - check comprehensive_agent_report.md")

    print("\n" + "="*60)

    # Launch interface
    launch_ui = input("🌐 Launch Gradio interface? (Y/n): ").lower().strip()

    if launch_ui != 'n':
        print("🌐 Launching enhanced Gradio interface...")
        demo = create_enhanced_interface()
        demo.launch(
            share=True,
            server_name="0.0.0.0",
            show_error=True
        )
    else:
        print("✅ Setup complete. Use supervised_agent_query() for programmatic access.")

# Test the critical supervisor system with sample queries
print("🚀 Testing Critical Supervisor with Independent Verification...")

# Test with supervisor
result = supervised_agent_query("What is 25% of 480?", use_supervisor=True)
print(f"Answer: {result['final_answer']}")
print(f"Quality Control: {'PASSED' if result['quality_passed'] else 'FAILED'}")
print(result['supervisor_stats'])

result = supervised_agent_query("There are 15 trees in the grove. Grove workers will plant trees in the grove today. After they are done, there will be 21 trees. How many trees did the grove workers plant today?", use_supervisor=True)
print(f"Answer: {result['final_answer']}")
print(f"Quality Control: {'PASSED' if result['quality_passed'] else 'FAILED'}")
print(result['supervisor_stats'])

# Initialize and run if this is the main module
if __name__ == "__main__":
    print("✅ Enhanced AI Agent with Supervisor Integration loaded successfully!")
    print("\n🎯 Key Functions Available:")
    print("   • supervised_agent_query(query, use_supervisor=True)")
    print("   • test_supervisor_integration()")
    print("   • run_supervised_evaluation_suite()")
    print("   • evaluate_gsm8k_with_supervisor(n=30)")
    print("   • create_enhanced_interface()")

    # Auto-run main if in interactive environment
    try:
        main()
    except KeyboardInterrupt:
        print("\n👋 Goodbye!")
    except Exception as e:
        print(f"❌ Error in main execution: {e}")
        print("✅ Core functions still available for manual use")



🚀 Testing Critical Supervisor with Independent Verification...
🤖 Processing: What is 25% of 480?...
   ✅ Plan approved (confidence: 0.90)
📋 Plan: ['math_solver']
   🔧 Executing step 1 (attempt 1): math_solver
   ❌ Quality Control: FAILED - Arithmetic error in addition: 0.25 op 480.0 = 120.0, should be 480.25
   🔧 Executing step 1 (attempt 2): math_solver
   ❌ Quality Control: FAILED - Arithmetic error in addition: 0.25 op 480.0 = 120.0, should be 480.25, Arithmetic error in addition: 25.0 op 100.0 = 0.25, should be 125.0, Independent verification disagrees: expected ~214.6, got 120
   🔧 Executing step 1 (attempt 3): math_solver
   ❌ Quality Control: FAILED - Arithmetic error in addition: 0.25 op 480.0 = 120.0, should be 480.25, Arithmetic error in addition: 25.0 op 100.0 = 0.25, should be 125.0
Answer: Step 1: Identify what we need to find...
We need to find 25% of 480.

Step 2: Set up the calculation...
25% is equivalent to 0.25 (as a decimal). So, we need to multiply 0.25 by 480.

St

README.md: 0.00B [00:00, ?B/s]

main/train-00000-of-00001.parquet:   0%|          | 0.00/2.31M [00:00<?, ?B/s]

main/test-00000-of-00001.parquet:   0%|          | 0.00/419k [00:00<?, ?B/s]

Generating train split:   0%|          | 0/7473 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/1319 [00:00<?, ? examples/s]

✅ Loaded GSM8K test set: 20 examples
🔍 Using supervisor: Yes

📊 Processing question 1/20...
    Question: Janet’s ducks lay 16 eggs per day. She eats three for breakfast every morning and bakes muffins for ...
    Gold Answer: 18
    Baseline: 9 ❌
    Plan: ['math_solver']
    Agent (No Supervisor): 18 ✅
   ✅ Plan approved (confidence: 0.90)
    Supervised Plan: ['math_solver']
   🔧 Executing step 1 (attempt 1): math_solver
   ❌ Quality Control: FAILED - Arithmetic error in addition: 16.0 op 7.0 = 9.0, should be 23.0, Arithmetic error in addition: 9.0 op 2.0 = 18.0, should be 11.0
   🔧 Executing step 1 (attempt 2): math_solver
   ❌ Quality Control: FAILED - Arithmetic error in addition: 16.0 op 7.0 = 9.0, should be 23.0, Arithmetic error in addition: 9.0 op 2.0 = 18.0, should be 11.0, Independent verification disagrees: expected ~9, got 18
   🔧 Executing step 1 (attempt 3): math_solver
   ❌ Quality Control: FAILED - Arithmetic error in addition: 16.0 op 7.0 = 9.0, should be 23.0, Arith

  chatbot = gr.Chatbot(


Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://0aac72be8fe43ae001.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)
