# Paradigm 04: Quality Gates Research Agent

This notebook implements the **Industrial Control Theory (Quality Gates)** paradigm from the Research Paradigms document.

## Core Concept

Quality gates act as checkpoints between research phases, enforcing quality criteria before proceeding:
- **Post-Search Gate**: Validates source quantity and diversity
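- **Post-Synthesis Gate**: Uses an LLM to score the synthesis for relevance to the question, evidence support, and coherence
- **Retry Logic**: Generates new search queries on gate failure, up to a fixed retry limit, then proceeds with what is available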

## Literature Validation

> "Quality gates for research validation have emerged as a key architectural pattern in AI Scientist and Agent Laboratory... demonstrating the effectiveness of staged quality checkpoints." —Feasibility Report

## Technology Stack

- **LLM**: `gpt-5-mini-2025-08-07`
- **Web Search**: Tavily API
- **Tracing**: LangSmith
- **Framework**: LangGraph

## 1. Setup and Configuration

In [None]:
import os
import operator
import asyncio
import re
from pathlib import Path
from typing import List, Annotated, TypedDict, Literal
from collections import Counter

from dotenv import load_dotenv
from pydantic import BaseModel, Field

from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
from tavily import TavilyClient

from langgraph.graph import StateGraph, START, END

# Load environment variables from a .env file one directory above the
# current working directory (the path is relative).
env_path = Path("../.env")
load_dotenv(env_path)

# Configure LangSmith tracing. These assignments run after load_dotenv, so
# they replace any values for the same keys that were loaded from .env.
os.environ["LANGSMITH_TRACING"] = "true"
os.environ["LANGSMITH_PROJECT"] = "deep_research_new"

print("Environment configured successfully")

In [None]:
# Initialize LLM and Tavily client.
# No API keys are passed explicitly here; presumably both clients read them
# from the environment populated in the setup cell — TODO confirm.
MODEL_NAME = "gpt-5-mini-2025-08-07"
llm = ChatOpenAI(model=MODEL_NAME, temperature=0)
tavily_client = TavilyClient()

# Quality Gate Configuration
MIN_SOURCES = 5  # Minimum unique source URLs required to pass Gate 1
MIN_DOMAIN_DIVERSITY = 3  # Minimum unique domains required to pass Gate 1
# Maximum retry attempts per gate: the routers keep retrying while the
# attempt counter is <= MAX_RETRIES, i.e. up to MAX_RETRIES + 1 evaluations.
MAX_RETRIES = 2

print(f"Using model: {MODEL_NAME}")
print(f"Gate 1: Min {MIN_SOURCES} sources, {MIN_DOMAIN_DIVERSITY} unique domains")
print(f"Max retries: {MAX_RETRIES}")

## 2. State Definitions

In [None]:
# NOTE: no other code visible in this notebook references this model;
# search_web returns pre-formatted strings and a list of URLs instead.
class SearchResult(BaseModel):
    """A search result with source metadata."""
    title: str = Field(description="Title of the source")
    content: str = Field(description="Content snippet")
    url: str = Field(description="Source URL")
    # Optional; defaults to an empty string when not supplied.
    domain: str = Field(default="", description="Domain of the source")

# NOTE: no other code visible in this notebook references this model;
# the gate nodes return plain dict state updates (attempt count + pass flag).
class GateResult(BaseModel):
    """Result of a quality gate check."""
    passed: bool = Field(description="Whether the gate was passed")
    reason: str = Field(description="Explanation of the gate result")
    # default_factory gives every instance its own empty list.
    suggestions: List[str] = Field(default_factory=list, description="Suggestions for improvement")

In [None]:
class QualityGateState(TypedDict):
    """State for the Quality Gates Research Agent.

    Keys annotated with ``operator.add`` are LangGraph reducer channels:
    a node's returned list is concatenated onto the existing value instead
    of replacing it. All other keys are overwritten by whichever node
    returns them.
    """
    # Input
    question: str

    # Research plan: the current batch of queries. Overwritten by
    # generate_plan, refine_queries and additional_research.
    search_queries: List[str]

    # Search results: accumulated across every execute_searches run
    # (duplicates are not removed at this level).
    search_results: Annotated[List[str], operator.add]
    source_urls: Annotated[List[str], operator.add]

    # Gate tracking: attempt counters are incremented by the gate nodes and
    # compared against MAX_RETRIES by the routing functions.
    gate1_attempts: int
    gate2_attempts: int
    gate1_passed: bool
    gate2_passed: bool

    # Synthesis: latest output of the synthesize node
    synthesis: str

    # Output: written by write_report
    final_report: str

## 3. Helper Functions

In [None]:
def extract_domain(url: str) -> str:
    """Extract a normalized domain from a URL.

    The network-location part of the URL is lower-cased (host names are
    case-insensitive, so ``Example.com`` and ``example.com`` must not count
    as two different domains) and a leading ``www.`` is removed. Any port
    or userinfo present in the network location is kept as-is.

    Args:
        url: The URL to inspect.

    Returns:
        The normalized domain, an empty string when the URL has no network
        location (e.g. an empty string or a scheme-less path), or
        ``"unknown"`` when the URL cannot be parsed.
    """
    from urllib.parse import urlparse

    try:
        domain = urlparse(url).netloc.lower()
        # Remove www. prefix
        if domain.startswith("www."):
            domain = domain[4:]
        return domain
    except (ValueError, TypeError, AttributeError):
        # ValueError: malformed URL (e.g. unbalanced IPv6 brackets);
        # TypeError/AttributeError: a non-string value was passed in.
        return "unknown"

def search_web(query: str, max_results: int = 10) -> tuple[List[str], List[str]]:
    """Execute a web search using Tavily.

    Args:
        query: Search query; only the first 400 characters are sent.
        max_results: Maximum number of results requested from Tavily.

    Returns:
        A ``(results, urls)`` tuple. ``results`` holds formatted text lines:
        an optional ``"Summary: ..."`` line when the response includes an
        answer, followed by one line per result. ``urls`` holds the
        non-empty source URLs of those results. If the search fails, a
        single ``"Search error: ..."`` line and an empty URL list are
        returned instead of raising.
    """
    try:
        # Truncate query if too long (slicing is a no-op for short queries)
        query = query[:400]

        response = tavily_client.search(
            query=query,
            max_results=max_results,
            include_answer=True,
        )

        results = []
        urls = []

        if response.get("answer"):
            results.append(f"Summary: {response['answer']}")

        for r in response.get("results") or []:
            # Fields may be absent or explicitly null. Normalise them so one
            # incomplete result cannot raise and discard the whole query.
            url = r.get("url") or ""
            title = r.get("title") or "No title"
            content = r.get("content") or ""

            # Blank URLs are not real sources and must not be counted by
            # the source/domain checks downstream.
            if url:
                urls.append(url)

            domain = extract_domain(url)
            results.append(f"- [{domain}] {title}: {content[:500]}... (Source: {url})")

        return results, urls
    except Exception as e:
        # Deliberate best-effort: report the failure as a result line so the
        # remaining queries can still run.
        return [f"Search error: {str(e)}"], []

## 4. Node Functions

In [None]:
# Prompts
# Each template is filled in with str.format by the node that uses it, so
# the {placeholders} must match the keyword arguments passed at the call site.

# Used by generate_plan. Placeholder: {question}.
# generate_plan keeps at most the first 7 non-empty lines of the reply.
PLAN_QUERIES_PROMPT = """You are a research planning expert. Given a research question,
generate 5-7 specific web search queries that will gather comprehensive information.

Research Question: {question}

Consider different angles:
- Definition and background
- Current state and trends
- Key challenges and solutions
- Case studies and examples
- Expert opinions and analysis

Return ONLY the search queries, one per line.
"""

# Used by synthesize. Placeholders: {question}, {search_results}.
SYNTHESIS_PROMPT = """You are a research analyst synthesizing findings.

Research Question: {question}

Search Results:
{search_results}

Synthesize these findings into a coherent analysis (600-800 words):
1. Identify key themes and patterns
2. Note any contradictions or debates
3. Highlight important statistics or facts
4. Reference sources appropriately
"""

# Used by refine_queries after a Gate 1 failure.
# Placeholders: {question}, {previous_queries}, {gap_analysis}.
IMPROVE_QUERIES_PROMPT = """The previous search queries did not yield enough diverse sources.

Original Question: {question}
Previous Queries: {previous_queries}
Gap Analysis: {gap_analysis}

Generate 3 NEW search queries that will:
1. Target different types of sources (academic, industry, news)
2. Cover gaps identified in previous searches
3. Be more specific or use different terminology

Return ONLY the new search queries, one per line.
"""

# Used by write_report. Placeholders: {question}, {synthesis}, {sources}.
FINAL_REPORT_PROMPT = """You are a senior research analyst writing a final report.

Research Question: {question}

Synthesis:
{synthesis}

Source URLs:
{sources}

Write a comprehensive research report (1000-1500 words) that:
1. Directly answers the research question
2. Provides evidence-based analysis
3. Includes proper citations
4. Acknowledges limitations or uncertainties
"""

In [None]:
async def generate_plan(state: QualityGateState) -> dict:
    """Ask the LLM for the initial batch of search queries.

    Reads ``question`` from the state, keeps at most the first seven
    non-empty reply lines that do not start with ``#``, and resets both
    gates' attempt counters and pass flags.
    """
    planning_message = HumanMessage(
        content=PLAN_QUERIES_PROMPT.format(question=state["question"])
    )
    reply = await llm.ainvoke([planning_message])

    # Collect usable lines, stopping once the seven-query cap is reached
    planned = []
    for raw_line in reply.content.split("\n"):
        candidate = raw_line.strip()
        if not candidate or candidate.startswith("#"):
            continue
        planned.append(candidate)
        if len(planned) == 7:
            break

    print(f"Generated {len(planned)} search queries")

    update = {"search_queries": planned}
    update.update(
        gate1_attempts=0,
        gate2_attempts=0,
        gate1_passed=False,
        gate2_passed=False,
    )
    return update

In [None]:
async def execute_searches(state: QualityGateState) -> dict:
    """Run every query in ``search_queries`` through ``search_web``.

    Queries run one after another. The formatted result lines and the
    source URLs from all of them are returned under ``search_results``
    and ``source_urls``.
    """
    pending = state.get("search_queries", [])
    total = len(pending)
    rule = "=" * 60

    print(f"\n{rule}")
    print(f"Executing {total} searches...")
    print(f"{rule}")

    gathered_results = []
    gathered_urls = []
    for position, query in enumerate(pending, start=1):
        # Progress line shows at most the first 60 characters of the query
        print(f"  [{position}/{total}] {query[:60]}...")
        hits, hit_urls = search_web(query)
        gathered_results.extend(hits)
        gathered_urls.extend(hit_urls)

    distinct_url_count = len(set(gathered_urls))
    print(f"  Collected {len(gathered_results)} results from {distinct_url_count} unique URLs")

    return {"search_results": gathered_results, "source_urls": gathered_urls}

In [None]:
async def quality_gate_1(state: QualityGateState) -> dict:
    """Gate 1: check that enough distinct sources and domains were found.

    The gate passes when the number of unique URLs is at least
    ``MIN_SOURCES`` and the number of unique domains is at least
    ``MIN_DOMAIN_DIVERSITY``. Returns the incremented attempt counter and
    the pass flag.
    """
    attempt_no = state.get("gate1_attempts", 0) + 1

    # Metrics are computed over de-duplicated URLs
    distinct_urls = set(state.get("source_urls", []))
    distinct_domains = {extract_domain(link) for link in distinct_urls}
    url_count = len(distinct_urls)
    domain_count = len(distinct_domains)

    print(f"\n--- Quality Gate 1 (Attempt {attempt_no}/{MAX_RETRIES + 1}) ---")
    print(f"  Unique sources: {url_count} (min: {MIN_SOURCES})")
    print(f"  Unique domains: {domain_count} (min: {MIN_DOMAIN_DIVERSITY})")

    # Both thresholds must be met
    enough_sources = url_count >= MIN_SOURCES
    enough_domains = domain_count >= MIN_DOMAIN_DIVERSITY
    gate_ok = enough_sources and enough_domains

    print("  ✓ Gate 1 PASSED" if gate_ok else "  ✗ Gate 1 FAILED")

    return {"gate1_attempts": attempt_no, "gate1_passed": gate_ok}

In [None]:
def route_after_gate1(state: QualityGateState) -> Literal["synthesize", "refine_queries", "synthesize_anyway"]:
    """Pick the next step after Gate 1.

    Returns ``"synthesize"`` when the gate passed, ``"refine_queries"``
    while the attempt count is still within ``MAX_RETRIES``, and
    ``"synthesize_anyway"`` once the retries are used up.
    """
    if state.get("gate1_passed", False):
        return "synthesize"

    retries_exhausted = state.get("gate1_attempts", 0) > MAX_RETRIES
    if retries_exhausted:
        print("  Max retries reached. Proceeding with available sources.")
        return "synthesize_anyway"

    return "refine_queries"

In [None]:
async def refine_queries(state: QualityGateState) -> dict:
    """Produce replacement search queries after a Gate 1 failure.

    Summarises the sources collected so far (unique URL count, domain
    count, three most frequent domains), asks the LLM for new queries, and
    returns at most the first three non-empty reply lines as
    ``search_queries``.
    """
    question = state["question"]
    earlier_queries = state.get("search_queries", [])
    collected_urls = state.get("source_urls", [])

    # Domain frequencies are tallied over every collected URL
    domain_tally = Counter(extract_domain(link) for link in collected_urls)
    top_domains = dict(domain_tally.most_common(3))

    gap_analysis = (
        f"Found {len(set(collected_urls))} unique sources from {len(domain_tally)} domains. "
        f"Most common domains: {top_domains}. "
        "Need more diverse sources."
    )

    retry_message = HumanMessage(
        content=IMPROVE_QUERIES_PROMPT.format(
            question=question,
            previous_queries="\n".join(earlier_queries),
            gap_analysis=gap_analysis,
        )
    )
    reply = await llm.ainvoke([retry_message])

    new_queries = []
    for raw_line in reply.content.split("\n"):
        cleaned = raw_line.strip()
        if cleaned:
            new_queries.append(cleaned)

    # The count is reported before the three-query cap is applied
    print(f"  Generated {len(new_queries)} refined queries for retry")

    return {"search_queries": new_queries[:3]}

In [None]:
async def synthesize(state: QualityGateState) -> dict:
    """Synthesize search results into a coherent analysis.

    ``search_results`` only ever grows: results from refined or gap-filling
    queries are appended after the original ones. To make sure a retry
    actually changes what the model sees, exact duplicates are removed and
    the most recent 30 entries are used, rather than the oldest 30.

    Args:
        state: Current graph state; reads ``question`` and ``search_results``.

    Returns:
        A state update containing the LLM's analysis under ``synthesis``.
    """
    question = state["question"]
    search_results = state.get("search_results", [])

    # dict.fromkeys removes exact duplicates while keeping first-seen order
    unique_results = list(dict.fromkeys(search_results))

    # Limit to avoid context overflow, keeping the newest results so that
    # evidence gathered after a gate failure is included.
    selected_results = unique_results[-30:]

    prompt = SYNTHESIS_PROMPT.format(
        question=question,
        search_results="\n\n".join(selected_results),
    )

    response = await llm.ainvoke([HumanMessage(content=prompt)])

    print(f"\nSynthesis complete: {len(response.content)} characters")

    return {
        "synthesis": response.content
    }

In [None]:
def _parse_gate2_scores(raw) -> List[int]:
    """Return the three 1-10 scores found in *raw*, or ``[]`` if unusable.

    The reply is split on commas and newlines, and the first integer in each
    of the first three non-empty parts is taken, so replies such as
    ``"8, 7, 9"``, ``"8/10, 7/10, 9/10"`` or one score per line all parse.
    """
    if not isinstance(raw, str):
        return []

    parts = [part for part in re.split(r"[,\n]", raw) if part.strip()]
    scores = []
    for part in parts[:3]:
        match = re.search(r"\d+", part)
        if match is None:
            return []
        scores.append(int(match.group()))

    # Exactly three in-range scores are required; anything else is treated
    # as unparseable rather than producing a skewed average or an IndexError.
    if len(scores) != 3 or not all(1 <= score <= 10 for score in scores):
        return []
    return scores


async def quality_gate_2(state: QualityGateState) -> dict:
    """Quality Gate 2: validate synthesis quality with an LLM rating.

    The first 2000 characters of the synthesis are rated for relevance,
    evidence and coherence (1-10 each). The gate passes when the average is
    at least 6.0. If the reply cannot be parsed into three valid scores, a
    default of 7 is used for each (which passes the gate) and a warning is
    printed.

    Args:
        state: Current graph state; reads ``synthesis``, ``question`` and
            ``gate2_attempts``.

    Returns:
        A state update with the incremented ``gate2_attempts`` and the
        ``gate2_passed`` flag.
    """
    synthesis = state.get("synthesis", "")
    question = state["question"]
    attempts = state.get("gate2_attempts", 0) + 1

    # Check synthesis quality using LLM
    check_prompt = f"""Evaluate this research synthesis on a scale of 1-10:
    
Question: {question}

Synthesis:
{synthesis[:2000]}

Rate on:
1. Does it address the question? (1-10)
2. Is evidence provided? (1-10)
3. Is it coherent? (1-10)

Respond with ONLY three numbers separated by commas (e.g., "8, 7, 9")."""

    response = await llm.ainvoke([HumanMessage(content=check_prompt)])

    # Parse scores
    scores = _parse_gate2_scores(response.content)
    parse_failed = not scores
    if parse_failed:
        scores = [7, 7, 7]  # Default if parsing fails
    avg_score = sum(scores) / len(scores)

    print(f"\n--- Quality Gate 2 (Attempt {attempts}/{MAX_RETRIES + 1}) ---")
    if parse_failed:
        print("  Warning: could not parse scores from the evaluator reply; using default 7/10")
    print(f"  Relevance: {scores[0]}/10, Evidence: {scores[1]}/10, Coherence: {scores[2]}/10")
    print(f"  Average: {avg_score:.1f}/10 (threshold: 6.0)")

    passed = avg_score >= 6.0

    if passed:
        print("  ✓ Gate 2 PASSED")
    else:
        print("  ✗ Gate 2 FAILED")

    return {
        "gate2_attempts": attempts,
        "gate2_passed": passed
    }

In [None]:
def route_after_gate2(state: QualityGateState) -> Literal["write_report", "additional_research", "write_report_anyway"]:
    """Pick the next step after Gate 2.

    Returns ``"write_report"`` when the gate passed,
    ``"additional_research"`` while the attempt count is still within
    ``MAX_RETRIES``, and ``"write_report_anyway"`` once the retries are
    used up.
    """
    if state.get("gate2_passed", False):
        return "write_report"

    retries_exhausted = state.get("gate2_attempts", 0) > MAX_RETRIES
    if retries_exhausted:
        print("  Max retries reached. Proceeding with current synthesis.")
        return "write_report_anyway"

    return "additional_research"

In [None]:
async def additional_research(state: QualityGateState) -> dict:
    """Ask the LLM for gap-filling queries after a Gate 2 failure.

    Sends the question and the first 1500 characters of the current
    synthesis to the LLM and returns at most the first two non-empty reply
    lines as the new ``search_queries``.
    """
    topic = state["question"]
    draft = state.get("synthesis", "")

    # Have the model point out what the current draft is missing
    gap_prompt = f"""Identify what's missing from this research synthesis:
    
Question: {topic}
Current Synthesis: {draft[:1500]}

What specific topics or evidence are missing? Provide 2 focused search queries to fill the gaps.
Return ONLY the search queries, one per line."""

    reply = await llm.ainvoke([HumanMessage(content=gap_prompt)])

    non_empty_lines = []
    for raw_line in reply.content.split("\n"):
        cleaned = raw_line.strip()
        if cleaned:
            non_empty_lines.append(cleaned)
    gap_queries = non_empty_lines[:2]

    print(f"\nAdditional research: {len(gap_queries)} gap-filling queries")

    return {"search_queries": gap_queries}

In [None]:
async def write_report(state: QualityGateState) -> dict:
    """Write the final research report.

    The source list passed to the model is de-duplicated in first-seen
    order, so the same state always yields the same 20 URLs. Blank URLs are
    dropped.

    Args:
        state: Current graph state; reads ``question``, ``synthesis`` and
            ``source_urls``.

    Returns:
        A state update containing the report text under ``final_report``.
    """
    question = state["question"]
    synthesis = state.get("synthesis", "")

    # dict.fromkeys de-duplicates while preserving order; a set would make
    # the truncated source list vary from run to run.
    urls = [url for url in dict.fromkeys(state.get("source_urls", [])) if url]

    prompt = FINAL_REPORT_PROMPT.format(
        question=question,
        synthesis=synthesis,
        sources="\n".join(urls[:20]),  # Limit sources in prompt
    )

    response = await llm.ainvoke([HumanMessage(content=prompt)])

    print(f"\nFinal report generated: {len(response.content)} characters")

    return {
        "final_report": response.content
    }

## 5. Graph Construction

In [None]:
# Build the Quality Gates Research Agent graph
qg_builder = StateGraph(QualityGateState)

# Add nodes (each name maps to the function of the same name defined above)
qg_builder.add_node("generate_plan", generate_plan)
qg_builder.add_node("execute_searches", execute_searches)
qg_builder.add_node("quality_gate_1", quality_gate_1)
qg_builder.add_node("refine_queries", refine_queries)
qg_builder.add_node("synthesize", synthesize)
qg_builder.add_node("quality_gate_2", quality_gate_2)
qg_builder.add_node("additional_research", additional_research)
qg_builder.add_node("write_report", write_report)

# Add edges: plan -> search -> Gate 1
qg_builder.add_edge(START, "generate_plan")
qg_builder.add_edge("generate_plan", "execute_searches")
qg_builder.add_edge("execute_searches", "quality_gate_1")

# Gate 1 conditional routing.
# Keys are the labels returned by route_after_gate1; "synthesize" and
# "synthesize_anyway" both lead to the synthesize node.
qg_builder.add_conditional_edges(
    "quality_gate_1",
    route_after_gate1,
    {
        "synthesize": "synthesize",
        "refine_queries": "refine_queries",
        "synthesize_anyway": "synthesize"
    }
)

# Gate 1 retry loop: refined queries go back through the search node
qg_builder.add_edge("refine_queries", "execute_searches")
qg_builder.add_edge("synthesize", "quality_gate_2")

# Gate 2 conditional routing.
# Keys are the labels returned by route_after_gate2; "write_report" and
# "write_report_anyway" both lead to the write_report node.
qg_builder.add_conditional_edges(
    "quality_gate_2",
    route_after_gate2,
    {
        "write_report": "write_report",
        "additional_research": "additional_research",
        "write_report_anyway": "write_report"
    }
)

# Gate 2 retry loop: gap-filling queries re-enter at the search node, so the
# run passes through quality_gate_1 and synthesize again before Gate 2.
qg_builder.add_edge("additional_research", "execute_searches")
qg_builder.add_edge("write_report", END)

# Compile
quality_gate_graph = qg_builder.compile()

print("Quality Gates Research Agent compiled successfully")

In [None]:
# Visualize the graph
from IPython.display import Image, display

# Rendering is best-effort: any failure while drawing or displaying the
# diagram is reported with a message instead of stopping the notebook.
try:
    display(Image(quality_gate_graph.get_graph().draw_mermaid_png()))
except Exception as e:
    print(f"Could not display graph: {e}")

## 6. Agent Wrapper for Evaluation

In [None]:
def quality_gates_agent(inputs: dict) -> dict:
    """
    Wrapper function for Quality Gates research agent.

    Compatible with evaluation harness. Works both from plain synchronous
    code and from a thread that already has a running event loop (for
    example a Jupyter cell), where a direct ``asyncio.run`` call would raise
    ``RuntimeError``.

    Args:
        inputs: Dictionary with 'question' key

    Returns:
        Dictionary with 'output' key containing final report, plus the
        'synthesis', 'source_urls', 'gate1_passed' and 'gate2_passed'
        values from the final graph state
    """
    question = inputs.get("question", "")

    def _run_graph() -> dict:
        # Run with recursion limit
        return asyncio.run(
            quality_gate_graph.ainvoke(
                {"question": question},
                config={"recursion_limit": 50},
            )
        )

    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No event loop is running in this thread: run the graph directly.
        result = _run_graph()
    else:
        # An event loop is already running here, so asyncio.run() cannot be
        # used in this thread. Run the graph on a worker thread with its own
        # loop, copying the current context so context variables carry over.
        import contextvars
        from concurrent.futures import ThreadPoolExecutor

        with ThreadPoolExecutor(max_workers=1) as pool:
            result = pool.submit(contextvars.copy_context().run, _run_graph).result()

    return {
        "output": result.get("final_report", ""),
        "synthesis": result.get("synthesis", ""),
        "source_urls": result.get("source_urls", []),
        "gate1_passed": result.get("gate1_passed", False),
        "gate2_passed": result.get("gate2_passed", False)
    }

## 7. Manual Test

Run this cell to verify the agent works correctly with a simple test question.

In [None]:
# Simple test: run the agent end-to-end on one question and print a summary.
test_question = "What are the key benefits and challenges of using large language models in enterprise applications?"

print(f"Testing Quality Gates Agent with question:\n{test_question}\n")
print("Running quality-gated research (this may take several minutes)...\n")

try:
    result = quality_gates_agent({"question": test_question})
    
    print("\n" + "=" * 80)
    print("FINAL REPORT")
    print("=" * 80)
    # Show at most the first 3000 characters; "..." is appended only when
    # the report was actually truncated (the conditional binds looser than +).
    print(result["output"][:3000] + "..." if len(result["output"]) > 3000 else result["output"])
    print("\n" + "=" * 80)
    print(f"Report length: {len(result['output'])} characters")
    print(f"Gate 1 passed: {result.get('gate1_passed', 'N/A')}")
    print(f"Gate 2 passed: {result.get('gate2_passed', 'N/A')}")
    print(f"Unique sources: {len(set(result.get('source_urls', [])))}")
    print("Agent test PASSED ✓")
except Exception as e:
    # Report the failure with a full traceback, then re-raise so the cell
    # is still marked as failed.
    print(f"Agent test FAILED: {e}")
    import traceback
    traceback.print_exc()
    raise

## 8. Evaluation Harness Integration

Once the manual test passes, run the harness setup cell below, then uncomment and run the full-evaluation cell that follows it.

In [None]:
# Import evaluation harness and metrics
import sys
# Put the parent directory first on the import path; presumably that is
# where the project-local `evaluation` package lives — TODO confirm.
sys.path.insert(0, "..")
from evaluation import (
    ExperimentHarness, 
    fact_recall, 
    citation_precision,
    coherence_judge, 
    depth_judge, 
    relevance_judge,
    minimum_sources_check
)

# Initialize harness with the golden test dataset
# (both paths are relative to the current working directory)
harness = ExperimentHarness(
    dataset_path="../data/deep_research_agent_test_dataset.yaml",
    langsmith_dataset_name="deep-research-golden-v2"
)

print("Evaluation harness initialized successfully!")
print(f"Dataset: {harness.dataset_path}")
print(f"LangSmith dataset name: {harness.langsmith_dataset_name}")

In [None]:
# Full Evaluation on All 20 Questions
# ⚠️ EXPENSIVE - Only uncomment when ready for full evaluation
# Uncomment to run:

# # Define comprehensive evaluator suite
# evaluators = [
#     fact_recall,              # Required facts coverage
#     citation_precision,       # Citation URL validity
#     minimum_sources_check,    # Minimum source count
#     coherence_judge,          # Logical structure
#     depth_judge,              # Analysis depth
#     relevance_judge,          # Addresses question
# ]
# 
# # Run full evaluation
# print("Starting FULL evaluation on all 20 questions...")
# print("Quality Gates Agent - this will take 1-2 hours.")
# print("=" * 80 + "\n")
# 
# results = harness.run_evaluation(
#     agent_fn=quality_gates_agent,
#     evaluators=evaluators,
#     experiment_name="quality_gates_v1",
#     monte_carlo_runs=1,  # Single run to reduce cost
#     max_concurrency=2,   # Lower concurrency for stability
#     description="Quality Gates paradigm evaluation on all difficulty tiers"
# )
# 
# # Display comprehensive results
# print("\n" + "=" * 80)
# print("FULL EVALUATION RESULTS")
# print("=" * 80)
# print(f"Experiment: {results.experiment_name}")
# print(f"Questions evaluated: {results.num_questions}")
# print(f"Runs per question: {results.num_runs}")
# 
# print(f"\n{'Metric':<30} {'Mean':<10}")
# print("-" * 40)
# for metric_name in sorted(results.metrics.keys()):
#     if not metric_name.endswith('_std'):
#         value = results.metrics.get(metric_name, 0)
#         print(f"{metric_name:<30} {value:<10.3f}")
# 
# # Save results to file
# import json
# from datetime import datetime
# 
# results_file = Path("../results") / f"quality_gates_results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
# results_file.parent.mkdir(exist_ok=True)
# 
# with open(results_file, 'w') as f:
#     json.dump({
#         "experiment_name": results.experiment_name,
#         "num_questions": results.num_questions,
#         "num_runs": results.num_runs,
#         "metrics": results.metrics,
#         "per_question": results.per_question_results
#     }, f, indent=2)
# 
# print(f"\nResults saved to: {results_file}")

# The only statement that runs while the evaluation code above stays
# commented out; it just confirms the cell executed.
print("Full evaluation cell ready. Uncomment to run when ready.")