# Deep Research Framework with Agentic RAG

This notebook implements a deep research framework that uses the Agentic RAG system for each sub-query. The architecture follows your correct approach:

1. **Search Planner**: Generate multiple focused queries from the original complex query
2. **Agentic RAG Processing**: For each sub-query, use the full Agentic RAG framework with all reliability features
3. **Answer Aggregation**: Synthesize all individual answers into a comprehensive final report

The system combines the reliability of Agentic RAG with the comprehensive coverage of deep research.

## System Architecture

```
Main Deep Research Framework
│
├── Search Planner (generate multiple queries)
│
├── For each query:
│   ├── AgenticRAG Framework
│   │   ├── Query Routing
│   │   ├── RAG Pipeline (with all reliability features)
│   │   │   ├── Document Retrieval
│   │   │   ├── Document Grading
│   │   │   ├── Query Rewriting (if needed)
│   │   │   └── Web Search Fallback
│   │   └── Answer Generation
│   └── Store individual answer
│
└── Final Report Generator (aggregate all answers)
```

## 1. Environment Setup

### 1.1 Imports and Dependencies

In [2]:
import json
import operator
import requests
import os
from typing import Annotated, TypedDict, List, Dict

from langchain_community.vectorstores import FAISS
from langchain_core.messages import AIMessage, BaseMessage, HumanMessage, SystemMessage, ToolMessage
from langchain_core.tools import tool
from langchain_huggingface import HuggingFaceEmbeddings
from langgraph.graph import END, StateGraph
from pydantic import BaseModel, Field

# Note: Using local Ollama Qwen model for LLM
from langchain_openai import ChatOpenAI

### 1.2 Research State Definition

Define the ResearchState for the main deep research workflow.

In [3]:
class ResearchState(TypedDict):
    messages: Annotated[list[BaseMessage], operator.add]
    original_query: str
    search_queries: List[str]
    query_answers: Dict[str, str]  # query -> answer
    current_query_index: int
    final_report: str

## 2. Knowledge Base

### 2.1 Vector Store Initialization

In [4]:
# Load pre-built FAISS database containing LLM-related content
db_path = './db/faiss_ilianweng_db'
vector_store = FAISS.load_local(
    folder_path=db_path,
    embeddings=HuggingFaceEmbeddings(model_name="BAAI/bge-base-en-v1.5"),
    allow_dangerous_deserialization=True
)

# Configure retriever to return top 2 documents for efficient retrieval
retriever = vector_store.as_retriever(search_kwargs={"k": 2})

  from .autonotebook import tqdm as notebook_tqdm


## 3. Tools and Integrations

### 3.1 Tool Definitions

In [5]:
# Define schema for web search queries using Pydantic for validation
class SearchQuery(BaseModel):
    query: str = Field(description="Search query for web search")


# Web search tool using Serper API for real-time information retrieval
@tool(args_schema=SearchQuery)
def web_search_tool(query: str):
    """Get real-time Internet information using Serper API"""
    # Get API key from environment variable or use placeholder
    api_key = os.environ.get('SERPER_API_KEY', 'd6205f8378e105dc9afdcb2dbbd44521c716b9c4')
    
    url = "https://google.serper.dev/search"
    payload = json.dumps({
        "q": query,
        "num": 3,  # Retrieve top 3 search results for comprehensive coverage
    })
    headers = {
        'X-API-KEY': api_key,
        'Content-Type': 'application/json'
    }
    
    try:
        response = requests.post(url, headers=headers, data=payload, timeout=10)
        response.raise_for_status()  # Raise an exception for bad status codes
        data = response.json()
        
        if 'organic' in data:
            results = []
            for item in data['organic'][:3]:  # Take top 3 results
                result = {
                    'title': item.get('title', ''),
                    'snippet': item.get('snippet', ''),
                    'link': item.get('link', '')
                }
                results.append(result)
            return json.dumps(results, ensure_ascii=False)
        else:
            return json.dumps({"error": "No organic results found"}, ensure_ascii=False)
    except requests.exceptions.RequestException as e:
        return json.dumps({"error": f"Network error occurred: {str(e)}"}, ensure_ascii=False)
    except json.JSONDecodeError as e:
        return json.dumps({"error": f"Failed to parse response: {str(e)}"}, ensure_ascii=False)
    except Exception as e:
        return json.dumps({"error": f"An unexpected error occurred: {str(e)}"}, ensure_ascii=False)


# Define schema for RAG queries using Pydantic for validation
class RagQuery(BaseModel):
    query: str = Field(description="Query for RAG document retrieval")


# RAG retrieval tool for internal document search from vector store
@tool(args_schema=RagQuery)
def rag_retrieval_tool(query: str):
    """
    Retrieves documents from the RAG vector store.
    This tool is the core of the Retrieval-Augmented Generation pipeline.
    """
    # 1. Retrieve relevant documents using the configured FAISS retriever
    documents = retriever.invoke(query)
        
    # 2. Format retrieved documents as a single context string
    doc_contents = "\n\n".join(d.page_content for d in documents)
    return json.dumps({"documents": doc_contents})

### 3.2 LLM Configuration

In [6]:
# Configure LLM with local Qwen model via Ollama for privacy and cost efficiency
llm = ChatOpenAI(
    model_name="qwen3:8b",
    base_url='http://localhost:11434/v1',
    openai_api_key="<KEY>",  # Placeholder key (not used for local models)
    temperature=0.3  # Low temperature for consistent, factual responses
)

# Bind tools for structured tool calling in LangGraph workflow
llm_with_tools = llm.bind_tools([rag_retrieval_tool, web_search_tool])

## 4. Agentic RAG Components

### 4.1 Agentic RAG State Definition

Define the AgentState for the Agentic RAG workflow that will be used for each sub-query.

In [7]:
class AgentState(TypedDict):
    messages: Annotated[list[BaseMessage], operator.add]
    rag_retries: int

### 4.2 Agentic RAG Node Functions

Define the core node functions that implement the Agentic RAG workflow for each sub-query.

In [9]:
# Route query to direct response, RAG, or web search
def route_query(state: AgentState):
    """
    Routes the user's question to the most appropriate processing path:
    - Direct response (for greetings or general knowledge)
    - RAG pipeline (for LLM-specific topics)
    - Web search (for time-sensitive information)
    """
    print("---ROUTING QUERY---")
    last_message = state["messages"][-1]
    
    # System prompt for query routing decisions
    system_prompt = """You are a query router. If the question is a simple greeting like 'hello' or can be answered directly without any retrieval or external information, respond with a friendly, concise answer. Otherwise, determine if the question can be answered using internal knowledge (call rag_retrieval_tool with the query) or requires external real-time information (call web_search_tool with the query). Use the vectorstore for questions on LLM agents, prompt engineering, and adversarial attacks. You do not need to be stringent with the keywords in the question related to these topics. Otherwise, use web-search. Do not provide any explanation or reasoning beyond the direct answer if applicable."""
    
    response = llm_with_tools.invoke([SystemMessage(content=system_prompt), last_message])
    print(f"Routing decision: {response.tool_calls if hasattr(response, 'tool_calls') else 'Direct response'}")
    
    tool_calls = response.tool_calls if hasattr(response, 'tool_calls') else []
    if tool_calls:
        # Return tool call for RAG or web search processing
        return {"messages": [AIMessage(content="", tool_calls=tool_calls)], "last_query": tool_calls[0]["args"]["query"]}
    else:
        # Return direct response for simple queries
        return {"messages": [AIMessage(content=response.content)]}

# RAG retrieval node - fetch documents from vector store
def rag_retrieval(state: AgentState):
    """
    Executes the RAG retrieval tool to fetch relevant documents from the vector store.
    """
    print("---RETRIEVING DOCUMENTS FROM RAG---")
    last_message = state["messages"][-1]
    query = last_message.tool_calls[0]["args"]["query"]
    
    # Invoke the RAG retrieval tool with the query
    result = rag_retrieval_tool.invoke({"query": query})
    doc_content = json.loads(result)["documents"]
    
    # Return retrieved documents as a tool message
    return {"messages": [ToolMessage(content=doc_content, tool_call_id=last_message.tool_calls[0]["id"])]}

# Grade RAG documents for relevance
def grade_rag_documents(state: AgentState):
    """
    Grades the relevance of retrieved documents to determine if they can answer the user's question.
    Returns 'Documents relevant' or 'Documents not relevant' based on LLM assessment.
    """
    print("---GRADING RAG DOCUMENTS---")
    last_message = state["messages"][-1]
    question = state["messages"][0].content
    
    # Handle case where no documents were retrieved
    if not last_message.content:
        print("No documents retrieved, grading as irrelevant.")
        return {"messages": [AIMessage(content="Documents not relevant.")], "rag_retries": state.get("rag_retries", 0) + 1}

    # Prompt LLM to grade document relevance
    grading_prompt = f"""You are a grader assessing the relevance of retrieved documents to a user question.
    User question: {question}
    Retrieved documents:
    ---
    {last_message.content}
    ---
    Are the documents relevant to the question? Respond with 'yes' or 'no' only, nothing else."""
    
    response = llm.invoke([HumanMessage(content=grading_prompt)])
    is_relevant = 'yes' in response.content.lower()
    print(f"Grading decision: {is_relevant}")
    
    if is_relevant:
        # Documents are relevant, reset retry counter and proceed to generation
        return {"messages": [AIMessage(content="Documents relevant.")], "rag_retries": 0}
    else:
        # Documents are not relevant, increment retry counter
        return {"messages": [AIMessage(content="Documents not relevant.")], "rag_retries": state.get("rag_retries", 0) + 1}

# Rewrite RAG query when initial retrieval fails
def rewrite_query(state: AgentState):
    """
    Rewrites the query to improve retrieval performance when the first attempt failed.
    """
    print("---REWRITING RAG QUERY---")
    question = state["messages"][0].content
    
    # Prompt LLM to generate a better query
    rewrite_prompt = f"""You are a query rewriting expert. The user's original question is: "{question}".
    This query will be used to retrieve documents from a vector database. Please generate a better, more specific query.
    Do not think step-by-step or add any explanations. Respond with the rewritten query only, nothing else."""
    
    response = llm.invoke([HumanMessage(content=rewrite_prompt)])
    rewritten_query = response.content.strip()
    
    # Clean up potential markdown code block artifacts
    if '```' in rewritten_query:
        rewritten_query = rewritten_query.split('```', 1)[-1].strip()
    
    if rewritten_query:
        print(f"Rewritten query: {rewritten_query}")
        tool_call = {"name": "rag_retrieval_tool", "args": {"query": rewritten_query}, "id": "call_" + str(hash(rewritten_query))[:8], "type": "tool_call"}
        return {"messages": [AIMessage(content="", tool_calls=[tool_call])]}
    else:
        print("No rewritten query generated, using original question.")
        tool_call = {"name": "rag_retrieval_tool", "args": {"query": question}, "id": "call_" + str(hash(question))[:8], "type": "tool_call"}
        return {"messages": [AIMessage(content="", tool_calls=[tool_call])]}

# Fallback to web search after max RAG retries
def fallback_to_web_search(state: AgentState):
    """
    Prepares a tool call for web search when RAG approaches have failed after maximum retries.
    """
    print("---FALLING BACK TO WEB SEARCH---")
    query = state["messages"][0].content
    
    tool_call = {"name": "web_search_tool", "args": {"query": query}, "id": "call_" + str(hash(query))[:8], "type": "tool_call"}
    return {"messages": [AIMessage(content="", tool_calls=[tool_call])]}

# Execute web search using Serper API
def web_search(state: AgentState):
    """
    Performs a web search using the Serper API to get real-time information.
    """
    print("---PERFORMING WEB SEARCH---")
    last_message = state["messages"][-1]
    query = last_message.tool_calls[0]["args"]["query"]
    
    search_results = web_search_tool.invoke({"query": query})
    return {"messages": [ToolMessage(content=search_results, tool_call_id=last_message.tool_calls[0]["id"])]}

# Generate final answer based on context
def generate_answer(state: AgentState):
    """
    Generates a final answer based on the retrieved context (from RAG or web search).
    """
    print("---GENERATING FINAL ANSWER---")
    question = state["messages"][0].content
    
    # Find the actual context (from RAG retrieval or web search)
    context = ""
    messages = state["messages"]
    for i in range(len(messages) - 1, -1, -1):
        if isinstance(messages[i], ToolMessage):
            context = messages[i].content
            break
        # If we find the "Documents relevant" message, the context is in the previous ToolMessage
        elif isinstance(messages[i], AIMessage) and "Documents relevant" in messages[i].content:
            # Look for the ToolMessage before this grading message
            for j in range(i - 1, -1, -1):
                if isinstance(messages[j], ToolMessage):
                    context = messages[j].content
                    break
            break
    
    # Prompt LLM to generate answer using context
    generation_prompt = f"""You are an expert assistant. Use the following retrieved context to answer the user's question.
    If you don't know the answer, just say that you don't know. Use three sentences maximum and keep the answer concise.
    Question: {question}
    Context:
    {context}
    ---
    Answer:
    """
    
    response = llm.invoke([HumanMessage(content=generation_prompt)])
    return {"messages": [AIMessage(content=response.content)]}

# Define conditional routing logic for Agentic RAG
def get_route(state: AgentState):
    """
    Determines the next node based on the routing decision.
    """
    last_msg = state["messages"][-1]
    if not last_msg.tool_calls:
        return "__end__"
    tool_name = last_msg.tool_calls[0]["name"]
    if tool_name == "rag_retrieval_tool":
        return "rag_retrieval"
    elif tool_name == "web_search_tool":
        return "web_search"
    else:
        return "__end__"

# Adaptive RAG routing logic
def should_retry_rag(state: AgentState):
    """
    Implements the Adaptive RAG decision logic:
    - If documents are relevant → generate answer
    - If retries < 3 → rewrite query
    - If retries >= 3 → fallback to web search
    """
    last_message = state["messages"][-1]
    if "Documents relevant." in last_message.content:
        return "generate"
    else:
        if state.get("rag_retries", 0) >= 3:
            return "fallback_to_web_search"
        else:
            return "rewrite"

### 4.3 Agentic RAG Graph Construction

Build the LangGraph workflow for the Agentic RAG that will be used for each sub-query.

In [10]:
# Initialize LangGraph workflow for Agentic RAG
rag_workflow = StateGraph(AgentState)

# Add all nodes to the workflow
rag_workflow.add_node("route_query", route_query)
rag_workflow.add_node("rag_retrieval", rag_retrieval)
rag_workflow.add_node("grade_rag_documents", grade_rag_documents)
rag_workflow.add_node("rewrite_query", rewrite_query)
rag_workflow.add_node("fallback_to_web_search", fallback_to_web_search)
rag_workflow.add_node("web_search", web_search)
rag_workflow.add_node("generate_answer", generate_answer)

# Set entry point for the workflow
rag_workflow.set_entry_point("route_query")

# Add edges and conditional routing
rag_workflow.add_conditional_edges(
    "route_query",
    get_route,
    {
        "rag_retrieval": "rag_retrieval",
        "web_search": "web_search",
        "__end__": END
    }
)

rag_workflow.add_edge("rag_retrieval", "grade_rag_documents")
rag_workflow.add_conditional_edges(
    "grade_rag_documents",
    should_retry_rag,
    {
        "generate": "generate_answer",
        "rewrite": "rewrite_query",
        "fallback_to_web_search": "fallback_to_web_search",
    }
)
rag_workflow.add_edge("rewrite_query", "rag_retrieval")
rag_workflow.add_edge("fallback_to_web_search", "web_search")
rag_workflow.add_edge("web_search", "generate_answer")
rag_workflow.add_edge("generate_answer", END)

# Compile the graph into an executable workflow
rag_graph = rag_workflow.compile()

## 5. Deep Research Components

### 5.1 Deep Research Node Functions

Define the core node functions for the main deep research workflow.

In [19]:
# Search Planner: Generate multiple search queries from the original query
def search_planner(state: ResearchState):
    """
    Generates multiple search queries based on the original query using the LLM.
    """
    print("---GENERATING SEARCH QUERIES---")
    original_query = state["original_query"]
    
    planner_prompt = f"""You are a research query planner. Generate 3-5 diverse search queries that would help research the topic: "{original_query}".
    Make the queries specific and focused on different aspects of the topic.
    Return ONLY a JSON array of strings, nothing else.
    
    Example format:
    ["query 1", "query 2", "query 3"]
    """
    
    response = llm.invoke([HumanMessage(content=planner_prompt)])
    
    try:
        # Try to parse as JSON
        search_queries = json.loads(response.content)
    except:
        # If parsing fails, try to extract queries from the text
        import re
        queries = re.findall(r'"([^"]+)"', response.content)
        if not queries:
            # Fallback: split by lines or common delimiters
            queries = [q.strip('"') for q in response.content.split('\n') if q.strip()]
        search_queries = queries[:5]  # Limit to 5 queries
    
    print(f"Generated queries: {search_queries}")
    return {"search_queries": search_queries, "current_query_index": 0}

# Process Query with Agentic RAG: Use the Agentic RAG workflow for each sub-query
def process_query_with_rag(state: ResearchState):
    """
    Processes a single query using the Agentic RAG workflow.
    """
    print("---PROCESSING QUERY WITH AGENTIC RAG---")
    current_index = state["current_query_index"]
    current_query = state["search_queries"][current_index]
    
    print(f"Processing query {current_index + 1}/{len(state['search_queries'])}: {current_query}")
    
    # Initialize the Agentic RAG state for this query
    initial_rag_state = {"messages": [HumanMessage(content=current_query)], "rag_retries": 0}
    
    # Run the Agentic RAG workflow
    final_rag_state = None
    for event in rag_graph.stream(initial_rag_state, {"recursion_limit": 25}):
        # Get the final state
        final_rag_state = event
    
    # Extract the answer
    answer = final_rag_state["generate_answer"]["messages"][0].content
    
    # Update the query_answers dictionary
    query_answers = state.get("query_answers", {}).copy()
    query_answers[current_query] = answer
    
    print(f"Answer for '{current_query}': {answer}")
    
    return {"query_answers": query_answers}

# Move to Next Query: Advance to the next query or finish
def move_to_next_query(state: ResearchState):
    """
    Advances to the next query or moves to report generation.
    """
    print("---MOVING TO NEXT QUERY---")
    current_index = state["current_query_index"]
    total_queries = len(state["search_queries"])
    
    if current_index + 1 < total_queries:
        print(f"Moving from query {current_index + 1} to {current_index + 2}")
        return {"current_query_index": current_index + 1}
    else:
        print("All queries processed, moving to final report generation")
        return {"current_query_index": current_index + 1}  # This will exceed the list length

# Final Report Generator: Create comprehensive report from all answers
def generate_final_report(state: ResearchState):
    """
    Generates a final comprehensive report based on all query answers.
    """
    print("---GENERATING FINAL RESEARCH REPORT---")
    original_query = state["original_query"]
    query_answers = state["query_answers"]
    
    # Check if we have any answers
    if not query_answers:
        final_report = f"# Research Report: {original_query}\n\nNo relevant information could be found for this topic."
        return {"final_report": final_report}
    
    # Format the query answers for the report
    formatted_answers = "\n\n".join([f"## {query}\n\n{answer}" for query, answer in query_answers.items()])
    
    report_prompt = f"""You are a research report writer. Create a comprehensive report on the topic: "{original_query}".
    
    Use the following research findings to create your report:
    
{formatted_answers}
    
    Please structure your report with:
    1. A title with the original query
    2. An introduction explaining the importance of the topic
    3. Detailed sections for each research query with key findings (use the query as a subheading)
    4. A conclusion summarizing the overall findings and their significance
    5. Proper formatting with markdown headers
    6. Keep the report professional and well-organized
    
    Report:"""
    
    response = llm.invoke([HumanMessage(content=report_prompt)])
    final_report = response.content
    
    print("Final research report generated successfully.")
    return {"final_report": final_report}

# Check if all queries are processed
def are_all_queries_processed(state: ResearchState):
    """
    Determines whether all queries have been processed.
    """
    current_index = state["current_query_index"]
    total_queries = len(state["search_queries"])
    
    if current_index < total_queries:
        return "process_query"
    else:
        return "generate_report"

### 5.2 Deep Research Graph Construction

Build the main LangGraph workflow for the deep research system.

In [20]:
# Initialize LangGraph workflow for Deep Research
research_workflow = StateGraph(ResearchState)

# Add all nodes to the workflow
research_workflow.add_node("search_planner", search_planner)
research_workflow.add_node("process_query_with_rag", process_query_with_rag)
research_workflow.add_node("move_to_next_query", move_to_next_query)
research_workflow.add_node("generate_final_report", generate_final_report)

# Set entry point for the workflow
research_workflow.set_entry_point("search_planner")

# Add edges
research_workflow.add_edge("search_planner", "process_query_with_rag")
research_workflow.add_edge("process_query_with_rag", "move_to_next_query")
research_workflow.add_conditional_edges(
    "move_to_next_query",
    are_all_queries_processed,
    {
        "process_query": "process_query_with_rag",
        "generate_report": "generate_final_report"
    }
)
research_workflow.add_edge("generate_final_report", END)

# Compile the graph into an executable workflow
research_graph = research_workflow.compile()

## 6. Testing and Validation

### 6.1 System Testing

Test the system with a complex research query.

In [None]:
# Test query
question = "What are the latest advancements in quantum computing?"

# Initialize state for the research workflow
initial_research_state = {
    "messages": [HumanMessage(content=question)],
    "original_query": question,
    "search_queries": [],
    "query_answers": {},
    "current_query_index": 0,
    "final_report": ""
}

# Uncomment the following lines to run the test
print(f"Starting deep research on: {question}")
print("=" * 50)
final_state = list(research_graph.stream(initial_research_state, {"recursion_limit": 100}))
print(final_state[-1]['generate_final_report']['final_report'])

Starting deep research on: What are the latest advancements in quantum computing?
---GENERATING SEARCH QUERIES---
Generated queries: ['latest quantum computing hardware innovations', 'new qubit technologies in 2023.', 'recent breakthroughs in quantum algorithms for optimization problems', 'advancements in quantum machine learning algorithms.', 'progress in quantum error correction techniques']
---PROCESSING QUERY WITH AGENTIC RAG---
Processing query 1/5: latest quantum computing hardware innovations
---ROUTING QUERY---
Routing decision: [{'name': 'web_search_tool', 'args': {'query': 'latest quantum computing hardware innovations'}, 'id': 'call_aar5m43i', 'type': 'tool_call'}]
---PERFORMING WEB SEARCH---
---GENERATING FINAL ANSWER---
Answer for 'latest quantum computing hardware innovations': <think>
Okay, the user is asking about the latest quantum computing hardware innovations. Let me check the provided context.

First, the context has three entries. The first one from BlueQubit talk

### 6.2 Knowledge Base Management

Code to rebuild the FAISS vector database from source documents.

In [12]:
import os

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import FAISS
from langchain_huggingface import HuggingFaceEmbeddings

# Source URLs for LLM-related content
urls = [
    "https://lilianweng.github.io/posts/2023-06-23-agent/",
    "https://lilianweng.github.io/posts/2023-03-15-prompt-engineering/",
    "https://lilianweng.github.io/posts/2023-10-25-adv-attack-llm/",
]

# Load documents from URLs
docs = [WebBaseLoader(url).load() for url in urls]
docs_list = [item for sublist in docs for item in sublist]
print(f"len of documents :{len(docs_list)}")

# Split documents into chunks for better retrieval
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=512, chunk_overlap=0
)
doc_splits = text_splitter.split_documents(docs_list)
print(f"length of document chunks generated :{len(doc_splits)}")

# Create and save FAISS vector database
db_path = './db/faiss_ilianweng_db'
vector_db = FAISS.from_documents(doc_splits, HuggingFaceEmbeddings(model_name="BAAI/bge-base-en-v1.5"),)
        
# Save database to disk
os.makedirs(os.path.split(db_path)[0], exist_ok=True)
vector_db.save_local(db_path)
print(f"\nDatabase update complete! Total chunks: {vector_db.index.ntotal}")