# Adaptive RAG

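This notebook combines query analysis (routing each question to the right data source) with active, self-corrective RAG (grading the retrieved documents and the generated answer, and retrying when either falls short).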

In [1]:
# %pip install -U langchain_community tiktoken langchain-google-genai langchain-huggingface langchainhub chromadb langchain langgraph tavily-python sentence-transformers

In [51]:
import getpass
import os

def _set_env(var: str):
    # Prompt only when the variable is not already set; getpass hides the key while typing
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")

_set_env("GEMINI_API_KEY")
_set_env("SECOND_GEMINI_API_KEY")
_set_env("TAVILY_API_KEY")


### Create Index

Set up a vector database using **HuggingFace** embeddings (free; the model is downloaded once and cached on your machine) and the **Chroma vector database**. The documents are loaded directly from the URLs listed in the code.
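
To make the retrieval step less of a black box, here is a minimal sketch of what a similarity search does conceptually: every chunk is stored with a vector, and the query vector is compared against each of them. The 3-dimensional vectors, `toy_index`, and `top_k` are made up for illustration (all-MiniLM-L6-v2 produces 384-dimensional vectors), and Chroma's actual index and distance metric may differ from the cosine similarity assumed here.

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

def top_k(query_vec, indexed, k=2):
    """Return the k chunk texts whose vectors are most similar to query_vec."""
    ranked = sorted(
        indexed,
        key=lambda item: cosine_similarity(query_vec, item[1]),
        reverse=True,
    )
    return [text for text, _ in ranked[:k]]

# Hypothetical (text, vector) pairs standing in for embedded chunks
toy_index = [
    ("agents use short-term and long-term memory", [0.9, 0.1, 0.0]),
    ("chain-of-thought prompting", [0.1, 0.9, 0.1]),
    ("adversarial jailbreak attacks", [0.0, 0.2, 0.9]),
]
toy_query = [0.8, 0.2, 0.1]  # pretend embedding of "agent memory"

print(top_k(toy_query, toy_index, k=1))
```

The real retriever does the same kind of ranking, only with learned embeddings and an approximate nearest-neighbour index instead of a full sort.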


In [34]:
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import Chroma
from langchain_huggingface import HuggingFaceEmbeddings

# Setting up embedding model
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")

# Docs to index
urls = [
    "https://lilianweng.github.io/posts/2023-06-23-agent/",
    "https://lilianweng.github.io/posts/2023-03-15-prompt-engineering/",
    "https://lilianweng.github.io/posts/2023-10-25-adv-attack-llm/",
]

# Load the documents
docs = [WebBaseLoader(url).load() for url in urls]
# Flatten the nested list: [[doc1], [doc2], [doc3]] -> [doc1, doc2, doc3]
docs_list = [item for sublist in docs for item in sublist]
# The nested comprehension above is equivalent to:
#     docs_list = []
#     for sublist in docs:
#         for item in sublist:
#             docs_list.append(item)

# Splitting the documents into manageable chunks
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=500, chunk_overlap=0
)
doc_splits = text_splitter.split_documents(docs_list)

# Create vectorstore
vectorstore = Chroma.from_documents(
    documents = doc_splits,
    collection_name = "rag-chroma",
    embedding = embeddings
)

retriever = vectorstore.as_retriever()

#### **Query Analysis via a Router**

The router's system prompt has to state which topics should be sent to the vectorstore.

For now this topic list is written by hand. It could be automated by letting the LLM summarize the indexed documents and generate the router prompt, but that becomes expensive for large document sets. Since this notebook is for learning and experimenting, the manual approach is enough.
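
A cheap middle ground between a fully hand-written prompt and an LLM-generated one is to keep the topic list in one place and template the prompt from it. The sketch below assumes a hypothetical helper, `build_router_prompt`, which is not part of LangChain; it only assembles the same wording used in the router prompt of this notebook.

```python
def build_router_prompt(topics):
    """Compose the router system prompt from a list of indexed topics."""
    if not topics:
        raise ValueError("at least one topic is required")
    if len(topics) == 1:
        topic_text = topics[0]
    elif len(topics) == 2:
        topic_text = f"{topics[0]} and {topics[1]}"
    else:
        # Oxford-comma list: "a, b, and c"
        topic_text = ", ".join(topics[:-1]) + f", and {topics[-1]}"
    return (
        "You are an expert at routing a user question to a vectorstore or web search.\n"
        f"The vectorstore contains documents related to {topic_text}.\n"
        "Use the vectorstore for questions on these topics. Otherwise, use web-search."
    )

print(build_router_prompt(["agents", "prompt engineering", "adversarial attacks"]))
```

When a new URL is indexed, only the topic list needs to change, and the router prompt stays in sync with it.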

In [None]:
from typing import Literal

from langchain_core.prompts import ChatPromptTemplate
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint

from pydantic import BaseModel, Field
import os


class RouteQuery(BaseModel):
    """Route a user query to the most relevant datasource."""
    datasource: Literal["vectorstore", "web_search"] = Field(
        ..., 
        description="Given a user query choose to route it to a web search or a vector store"
    )

hf_endpoint = HuggingFaceEndpoint(
    repo_id="mistralai/Mistral-7B-Instruct-v0.2",
    task="text-generation",
    max_new_tokens=256,
    temperature=0.0,
)

# Defining our llm
llm = ChatGoogleGenerativeAI(
    model = "gemini-2.5-flash",
    google_api_key = os.getenv("GEMINI_API_KEY"),
    temperature = 0
)
llm_for_document_check = ChatGoogleGenerativeAI(
    model = "gemini-2.5-flash",
    google_api_key = os.getenv("SECOND_GEMINI_API_KEY"),
    temperature = 0
)
# response = llm_for_document_check.invoke("Hi! how are you?")
# print(response)
# Testing that llm was setup correctly using this
# response = llm.invoke("Hi How are you")
# print(response.content)
structured_llm_router = llm.with_structured_output(RouteQuery)

# Defining system prompt
system = """You are an expert at routing a user question to a vectorstore or web search.
The vectorstore contains documents related to agents, prompt engineering, and adversarial attacks.
Use the vectorstore for questions on these topics. Otherwise, use web-search."""
route_prompt = ChatPromptTemplate.from_messages([
    ("system", system),
    ("human", "{question}")
])

# Build the chain: the question fills route_prompt, and the rendered prompt
# is then passed to the structured-output LLM
question_router = route_prompt | structured_llm_router

print(question_router.invoke({"question": "What are the types of agent memory?"}))
print(question_router.invoke({"question": "Who won the FIFA worldcup in 2022?"}))

datasource='vectorstore'
datasource='web_search'


#### **Retrieval Grader**

After retrieval we evaluate the results. This is a second check: even though the router chose the vectorstore based on the query,
we still verify that each retrieved document is sufficiently relevant to it.

Again the LLM decides, and its output is a binary 'yes' or 'no'.
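
The graders return `binary_score` as a plain string, and the graph code later compares it to `"yes"` exactly. A model may occasionally answer `"Yes"` or `"yes."` instead, so a small normalisation step can make that comparison more forgiving. This is a sketch under that assumption; `is_yes` is a hypothetical helper, not something the notebook or LangChain defines.

```python
def is_yes(binary_score: str) -> bool:
    """Interpret a grader's string output as a yes/no decision."""
    # Drop surrounding whitespace, quotes, and trailing punctuation, then compare case-insensitively
    cleaned = binary_score.strip().strip("'\".!").lower()
    return cleaned == "yes"

for raw in ["yes", "Yes.", " 'yes' ", "no", "maybe"]:
    print(repr(raw), "->", is_yes(raw))
```

Anything that is not clearly a "yes" is treated as a "no", which errs on the side of filtering a document out or retrying.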


In [53]:
class GradeDocuments(BaseModel):
    binary_score: str = Field(
        description = "Documents are relevant to the question, 'yes' or 'no'"
    )

structured_llm_router = llm_for_document_check.with_structured_output(GradeDocuments)

# System prompt 
system = """You are a grader assessing relevance of a retrieved document to a user question. \n 
    If the document contains keyword(s) or semantic meaning related to the user question, grade it as relevant. \n
    It does not need to be a stringent test. The goal is to filter out erroneous retrievals. \n
    Give a binary score, 'yes' or 'no', to indicate whether the document is relevant to the question."""

grade_prompt = ChatPromptTemplate.from_messages([
    ("system", system),
    ("human", "Retrieved document: \n\n {document} \n\n User question: {question}")
])

retrieval_grader = grade_prompt | structured_llm_router

# Testing the retrieval grader
question = "agent memory"
docs = retriever.invoke(question)
doc_content = docs[1].page_content
print(question)
print(retrieval_grader.invoke({"question": question, "document": doc_content}))

question = "Mercedes benz"
docs = retriever.invoke(question)
doc_content = docs[1].page_content
print(question)
print(retrieval_grader.invoke({"question": question, "document": doc_content}))

agent memory
binary_score='yes'
Mercedes benz
binary_score='no'


In [6]:
docs = retriever.invoke("Agents memory")

In [7]:
# Generate

from langchain_core.output_parsers import StrOutputParser
from IPython.display import Markdown, display

system = """
You are a helpful assistant for question-answering tasks.
Use the following retrieved context to answer the user's question.

If you don't find the answer in the context, say you don't know — do not make up an answer.
"""

human = """
Context:
{context}

Question:
{question}
"""

generate_prompt = ChatPromptTemplate.from_messages([
    ("system", system),
    ("human", human),
])

question = "agent memory"
docs = retriever.invoke(question)

def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

generate_rag_chain = generate_prompt | llm | StrOutputParser()

docs_txt = format_docs(docs)
generation = generate_rag_chain.invoke({"context": docs_txt, "question": question})

display(Markdown(generation))

In an LLM-powered autonomous agent system, memory is a crucial component, categorized into:

*   **Short-term memory:** This refers to the in-context learning capabilities of the model, often limited by the prompt's token or word limit (e.g., AutoGPT mentions a ~4000-word limit). It's used for immediate learning within a single interaction or task.
*   **Long-term memory:** This allows the agent to retain and recall information over extended periods. It typically leverages an external vector store and fast retrieval mechanisms to store and access a potentially infinite amount of information.

For instance, in the Generative Agents simulation, a "memory stream" acts as a long-term memory module, recording agents' experiences in natural language within an external database. A "retrieval model" then surfaces relevant context from this memory based on recency, importance, and relevance to inform the agent's behavior.

### **Hallucination Grader**

This grader checks whether the LLM's answer is grounded in the retrieved documents, i.e. whether the model hallucinated while generating it.

In [8]:
class GradeHallucinations(BaseModel):
    binary_score: str = Field(description = "Grounded answer in the facts, 'yes' or 'no'")

structured_llm_grader = llm.with_structured_output(GradeHallucinations)

system = """You are a grader assessing whether an LLM generation is grounded in / supported by a set of retrieved facts. \n 
     Give a binary score 'yes' or 'no'. 'Yes' means that the answer is grounded in / supported by the set of facts."""

hallucination_prompt = ChatPromptTemplate.from_messages([
    ("system", system),
    ("human", "Set of facts: \n\n {documents} \n\n LLM generation: {generation}"),
])

hallucination_grader = hallucination_prompt | structured_llm_grader
hallucination_grader.invoke({"documents": docs, "generation": generation})

GradeHallucinations(binary_score='yes')

### **Answer Grader**

Finally, evaluate whether the answer actually addresses the question.

Quick recap of the adaptive RAG steps so far:

Query analysis (vectorstore or web search) -> Retrieval grader (are the retrieved documents relevant?) -> Hallucination grader (is the generation supported by the retrieved documents?)

In [9]:
class GradeAnswer(BaseModel):
    binary_score: str = Field(description="Answers with 'yes' or 'no'")

structured_llm_grader = llm.with_structured_output(GradeAnswer)

system = """You are a grader assessing whether an answer addresses / resolves a question \n 
     Give a binary score 'yes' or 'no'. 'Yes' means that the answer resolves the question."""

answer_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "User question: \n\n {question} \n\n LLM generation: {generation}"),
    ]
)

answer_grader = answer_prompt | structured_llm_grader
answer_grader.invoke({"question": question, "generation": generation})

GradeAnswer(binary_score='yes')

### **Question Rewriting**

The user's question might not be phrased well for querying the vectorstore. To improve retrieval,
we rephrase the question so that it works better with the vector similarity search.

In [None]:
system = """You are a question rewriter that improves user queries for semantic search over a vectorstore.

The vectorstore contains documents on:
1. LLM-powered autonomous agents (architecture, memory, planning, tool use)
2. Prompt engineering (prompt design, in-context learning, chain-of-thought)
3. Model red-teaming (adversarial attacks, jailbreaks, safety, robustness)

Rephrase the given question so it aligns with these topics and retrieves the most relevant information possible.
Focus on clarity, intent, and alignment with the above domains."""


re_write_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "Here is the initial question: \n\n {question} \n Formulate an improved question.",),
    ]
)

# Original question - "agent memory"
question_rewriter = re_write_prompt | llm | StrOutputParser()
question_rewriter.invoke({"question": question})

'What are the key concepts, types, and architectural considerations for memory systems in intelligent agents?'

### **Web Search Tool**

We use the Tavily search API to get information from the web. It has 1000 free credits. You can explore different web search tools as well :)

FYI, these are the params Tavily accepts:

search_tool = TavilySearchResults(\
    max_results=10, \
    topic="news",\
    include_answer=True,\
    include_raw_content=True,\
    include_images=True,\
    include_image_descriptions=True,\
    search_depth="advanced",\
    time_range="week",\
    start_date="2025-11-01",\
    end_date="2025-11-08",\
    include_domains=["bbc.com", "nytimes.com"],\
    exclude_domains=["some-low-quality-site.com"]\
)

In [11]:
from tavily import TavilyClient
import os

tavily_client = TavilyClient(api_key=os.environ["TAVILY_API_KEY"])

### **Construct the Graph**

Now that all the agents are in place, we can start on the agentic part. An agent here is an LLM paired with one of the prompts we created above.\
Keep these details handy while building the graph:

| Name                     | Purpose                                  | Inputs                    | Outputs                                         |
| ------------------------ | ---------------------------------------- | ------------------------- | ----------------------------------------------- |
| **question_router**      | Routes query to RAG or Web               | `question`                | `datasource` → `"vectorstore"` / `"web_search"` |
| **retriever**            | Fetches docs from vectorstore            | `query`                   | List of `Document`                              |
| **tavily_client**        | Fetches info from web (Tavily)           | `query`                   | Dict with a `results` list                      |
| **retrieval_grader**     | Checks if retrieved docs are relevant    | `question`, `document`    | `binary_score` → `"yes"` / `"no"`               |
| **generate_rag_chain**   | Generates answer using retrieved context | `context`, `question`     | `generation` (string)                           |
| **hallucination_grader** | Checks if answer is grounded in facts    | `documents`, `generation` | `binary_score` → `"yes"` / `"no"`               |
| **answer_grader**        | Evaluates if answer resolves question    | `question`, `generation`  | `binary_score` → `"yes"` / `"no"`               |
| **question_rewriter**    | Rewrites question for better retrieval   | `question`                | `rewritten_question` (string)                   |


In [None]:
from typing import List
from typing_extensions import TypedDict
from langchain_core.documents import Document

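class GraphState(TypedDict):
    """Represents the state of the graph."""
    question: str              # current (possibly rewritten) user question
    generation: str            # latest LLM answer
    documents: List[Document]  # retrieved or web-search documents
    retries: int               # retry counter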

### **Graph Flow**

In [None]:


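def retriever(state):
    """Retrieve documents, returns -> the documents retrieved and the question"""

    print("-------Retrieval step-------")
    # Getting the already populated question variable from state
    question = state["question"]

    # This function shadows the module-level `retriever` object created earlier,
    # so calling retriever.invoke here would fail. Build the retriever from the
    # vectorstore instead.
    docs = vectorstore.as_retriever().invoke(question)

    # Normalize plain dicts into Document objects, in case the retriever returned dicts
    if docs and isinstance(docs[0], dict):
        docs = [Document(page_content=d.get("page_content", ""), metadata=d.get("metadata", {})) for d in docs]

    return {"documents": docs, "question": question}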


def generate(state):
    """Generate the answer, returns -> state with generation variable populated"""
    print("-------Generation step-------")
    question = state["question"]
    documents = state["documents"]

    docs_txt = format_docs(documents)
    generation = generate_rag_chain.invoke({"context": docs_txt, "question": question})
    return {"documents": documents, "question": question, "generation": generation}


def grade_documents(state):
    """
    Grade whether the retrieved documents are relevant to the question or not
        returns -> Appends a score to each doc RELEVANT or NOT RELEVANT
    """

    print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
    question = state["question"]
    documents = state["documents"]

    # Now lets score each of the documents
    filtered_docs = []
    for d in documents:
        score = retrieval_grader.invoke({"question": question, "document": d.page_content})
        grade = score.binary_score
        if grade == "yes":
            print("---GRADE: DOCUMENT RELEVANT---")
            filtered_docs.append(d)
        else:
            print("---GRADE: DOCUMENT NOT RELEVANT---")

    return {"documents": filtered_docs, "question": question}


def transform_query(state):
    """
    Transform the query to produce a better question. Returns -> a better question state["question"]
    """

    print("---TRANSFORM QUERY---")
    question = state["question"]
    documents = state["documents"]
    retries = state.get("retries", 0)

    # Re-write question
    better_question = question_rewriter.invoke({"question": question})
    # Count the rewrite so the retry counter in the state stays meaningful
    return {"documents": documents, "question": better_question, "retries": retries + 1}


def web_search(state):
    """Search the web for the question. Returns -> documents replaced with the search results"""

    print("-----WEB SEARCH-----")
    question = state["question"]

    # web search 
    response = tavily_client.search(query=question)
    if response and "results" in response:
        web_results_texts = [r["content"] for r in response["results"] if r.get("content")]
        joined_text = "\n".join(web_results_texts)
        web_results = Document(page_content=joined_text)
    else:
        web_results = Document(page_content="")

    return {"documents": [web_results], "question": question}

### **Graph Edges**

The previous cell is long, but all it does is define the graph nodes we'll use. Next come the graph edges: the decision logic that determines which node runs next, based on a node's output or the current state.
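
To see what "decision logic on the edges" means without any framework, here is a minimal sketch of a graph runner in plain Python. It is not how LangGraph is implemented; `run_graph`, the toy nodes, and the toy routers are all hypothetical. Each node returns a partial state update, and a router function then names the next node.

```python
END = "__end__"

def run_graph(state, nodes, routers, entry_router, max_steps=20):
    """Run a node, merge its update into the state, then ask its router what comes next."""
    current = entry_router(state)
    visited = []
    for _ in range(max_steps):
        if current == END:
            break
        state = {**state, **nodes[current](state)}  # merge the partial update
        visited.append(current)
        current = routers[current](state)
    return state, visited

# Toy nodes: retrieval only "finds" a document when the question mentions agents
nodes = {
    "retrieve": lambda s: {"documents": ["doc"] if "agent" in s["question"] else []},
    "transform_query": lambda s: {"question": s["question"] + " agent"},
    "generate": lambda s: {"generation": f"answer from {len(s['documents'])} document(s)"},
}
# Toy routers: the counterpart of conditional edges
routers = {
    "retrieve": lambda s: "generate" if s["documents"] else "transform_query",
    "transform_query": lambda s: "retrieve",
    "generate": lambda s: END,
}

final_state, visited = run_graph({"question": "memory"}, nodes, routers, lambda s: "retrieve")
print(visited)
print(final_state["generation"])
```

The edge functions in this notebook play the role of the `routers` entries: they only read the state and return a label, and the graph maps that label to a node.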

In [31]:
def route_question(state):
    """Route question to web search or RAG. returns -> 'vectorstore' or 'web_search'"""

    print("---ROUTE QUESTION---")
    question = state["question"]
    source = question_router.invoke({"question": question})

    if source.datasource == "web_search":
        print("---ROUTE QUESTION TO WEB SEARCH---")
        return "web_search"
    elif source.datasource == "vectorstore":
        print("---ROUTE QUESTION TO RAG---")
        return "vectorstore"
    

def decide_to_generate(state):
    """Determines whether to generate the response or to re-generate the question. 
        Returns -> 'transform_query' or 'generate'"""
    
    print("---ASSESS GRADED DOCUMENTS---")
    filtered_documents = state["documents"]

    if not filtered_documents:
        # this means that the grade documents node returned an empty docs_list
        print(
            "---DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, TRANSFORM QUERY---"
        )
        return "transform_query"
    else:
        print("---DECISION: GENERATE---")
        return "generate"
    

def rate_answer(state):
    """
    We'll use hallucination_grader here to check if the llm hallucinated in generating the response first.
    Second we'll use the answer_grader to check if the final response generated was sufficient in 
    addressing the user's query
    """

    print("---CHECK HALLUCINATIONS---")
    question = state["question"]
    documents = state["documents"]
    generation = state["generation"]

    score = hallucination_grader.invoke({"documents": documents, "generation": generation})
    grade = score.binary_score

    # Checking for hallucination
    if grade == "yes":
        print("---DECISION: NO HALLUCINATIONS---")
        # Checking for the answer relevance now
        print("GRADING GENERATION VS. QUESTION")
        score = answer_grader.invoke({"question": question, "generation": generation})
        grade = score.binary_score
        if grade == "yes":
            print("---DECISION: GENERATION ADDRESSES QUESTION---")
            return "useful"
        else:
            print("---DECISION: GENERATION DOES NOT ADDRESS QUESTION---")
            return "not useful"
    else:
        print("---DECISION: LLM HALLUCINATED, RE-TRY---")
        return "not supported"

### **Compiling Graph**

Now that we've defined all our nodes and edges, we'll compile the final graph, essentially mapping how the nodes connect to each other through edges and defining the overall execution flow.
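
One thing to keep in mind when wiring the loops: "not supported" leads back to generation and "not useful" leads back to query rewriting, and neither path has a retry limit of its own. The sketch below shows one way such a cap could look, using the `retries` field already present in `GraphState`. `MAX_RETRIES` and `route_with_retry_cap` are hypothetical names, and the verdict strings are the ones returned by `rate_answer`.

```python
MAX_RETRIES = 3  # hypothetical limit, tune to taste

def route_with_retry_cap(state, verdict):
    """Map a grader verdict to the next step, giving up after MAX_RETRIES attempts."""
    if verdict == "useful":
        return "end"
    if state.get("retries", 0) >= MAX_RETRIES:
        # Stop looping and return the best answer produced so far
        return "end"
    if verdict == "not supported":
        return "generate"
    return "transform_query"

print(route_with_retry_cap({"retries": 0}, "not supported"))
print(route_with_retry_cap({"retries": 3}, "not useful"))
```

For this to work inside the graph, some node has to increment `retries` on each loop, and the conditional-edge mapping needs an entry that sends "end" to `END`.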

In [32]:
from langgraph.graph import START, END, StateGraph

# StateGraph is the core class in LangGraph used to define a graph of nodes that share and update a common state
# our GraphState in this case
workflow = StateGraph(GraphState)

# Defining the nodes
workflow.add_node("web_search", web_search) 
workflow.add_node("retrieve", retriever)
workflow.add_node("grade_documents", grade_documents)
workflow.add_node("generate", generate)
workflow.add_node("transform_query", transform_query)

# Building the graph
workflow.add_conditional_edges(
    START,
    route_question, # decides web or vectorstore
    {
        "web_search": "web_search",
        "vectorstore": "retrieve"
    }
)
workflow.add_edge("web_search", "generate")
workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
        "transform_query": "transform_query",
        "generate": "generate"
    }
)
workflow.add_edge("transform_query", "retrieve")
workflow.add_conditional_edges(
    "generate",
    rate_answer,
    {
        "not supported": "generate",  # not supported means that the llm hallucinated, so retry generation
        "useful": END,
        "not useful": "transform_query"
    }
)

app = workflow.compile()

### **Use Graph**

All the steps for this adaptive RAG design are now complete :) \
We'll now look at various examples where we test and see how it works for different scenarios.
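
The cells in this section read the final answer from the loop variable `value` after the stream ends, which works only if the last streamed node is `generate`. A slightly more defensive sketch is shown here, assuming the stream yields `{node_name: state_update}` dicts as it does in the outputs of this section; `last_generation` and `fake_stream` are hypothetical and stand in for `app.stream(inputs)`.

```python
def last_generation(stream):
    """Walk a stream of {node_name: state_update} dicts and return the last 'generation' seen."""
    generation = None
    for output in stream:
        for node_name, update in output.items():
            print(f"Node '{node_name}':")
            if update and "generation" in update:
                generation = update["generation"]
    return generation

# Stand-in for app.stream(inputs); the real stream comes from the compiled graph
fake_stream = [
    {"web_search": {"documents": ["..."], "question": "q"}},
    {"generate": {"documents": ["..."], "question": "q", "generation": "final answer"}},
]

print(last_generation(fake_stream))
```

If no node ever produced a generation, the helper returns `None` instead of raising a `KeyError`.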

In [None]:
inputs = {
    "question": "Summarize Lionel Messi's achievements during the 2022 FIFA World Cup."
}

for output in app.stream(inputs):
    for key, value in output.items():
        print(f"Node '{key}':")
    print("\n---\n")

print(value["generation"])

---ROUTE QUESTION---
---ROUTE QUESTION TO WEB SEARCH---
-----WEB SEARCH-----
Node 'web_search':

---

-------Generation step-------
---CHECK HALLUCINATIONS---
---DECISION: NO HALLUCINATIONS---
GRADING GENERATION VS. QUESTION
---DECISION: GENERATION ADDRESSES QUESTION---
Node 'generate':

---

During the 2022 FIFA World Cup, Lionel Messi achieved several significant milestones:

*   He finally claimed the World Cup trophy with Argentina.
*   He broke Diego Maradona's record for most World Cup appearances for Argentina.
*   He surpassed Lothar Matthaus for the record of most appearances at the World Cup Finals (26 appearances).
*   He matched Maradona's Argentinian record of eight assists at World Cups.
*   He scored seven goals, including two in the final.
*   He provided three assists.
*   He played a starring role in Argentina's victory, fulfilling his lifelong dream.


In [35]:
inputs = {
    "question": "What are the types of agent memory?"
}

for output in app.stream(inputs):
    for key, value in output.items():
        print(f"Node '{key}':")
    print("\n---\n")

print(value["generation"])

---ROUTE QUESTION---
---ROUTE QUESTION TO RAG---
-------Retrieval step-------
Node 'retrieve':

---

---CHECK DOCUMENT RELEVANCE TO QUESTION---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---ASSESS GRADED DOCUMENTS---
---DECISION: GENERATE---
Node 'grade_documents':

---

-------Generation step-------
---CHECK HALLUCINATIONS---
---DECISION: NO HALLUCINATIONS---
GRADING GENERATION VS. QUESTION
---DECISION: GENERATION ADDRESSES QUESTION---
Node 'generate':

---

Based on the provided context, the main type of agent memory described is the **Memory stream**.

It is characterized as:
*   A long-term memory module (external database).
*   It records a comprehensive list of agents’ experiences in natural language.
*   Each element is an observation, an event directly provided by the agent.
*   Inter-agent communication can trigger new natural language statements within this memory.


In [54]:
inputs = {
    "question": "Can prompt engineering prevent hallucination?"
}

for output in app.stream(inputs):
    for key, value in output.items():
        print(f"Node '{key}':")
    print("\n---\n")

print(value["generation"])

---ROUTE QUESTION---
---ROUTE QUESTION TO RAG---
-------Retrieval step-------
Node 'retrieve':

---

---CHECK DOCUMENT RELEVANCE TO QUESTION---
---GRADE: DOCUMENT NOT RELEVANT---
---GRADE: DOCUMENT NOT RELEVANT---
---GRADE: DOCUMENT NOT RELEVANT---
---GRADE: DOCUMENT NOT RELEVANT---
---ASSESS GRADED DOCUMENTS---
---DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, TRANSFORM QUERY---
Node 'grade_documents':

---

---TRANSFORM QUERY---
Node 'transform_query':

---

-------Retrieval step-------
Node 'retrieve':

---

---CHECK DOCUMENT RELEVANCE TO QUESTION---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT NOT RELEVANT---
---ASSESS GRADED DOCUMENTS---
---DECISION: GENERATE---
Node 'grade_documents':

---

-------Generation step-------
---CHECK HALLUCINATIONS---
---DECISION: NO HALLUCINATIONS---
GRADING GENERATION VS. QUESTION
---DECISION: GENERATION ADDRESSES QUESTION---
Node 'generate':

---

Based on the provided contex