# LangGraph Section 5 Notes - Advanced RAG Flows

* Be extremely careful with version dependencies, as LangGraph and LangChain are in active development, and very sensitive to versions

# Step 1 - Ingestion Process

In [9]:
# Solution uses a Chroma vector store (local and fast). Because persist_directory is set below, the index is persisted to disk under ./.chroma

In [10]:
# Import statements:
from dotenv import load_dotenv
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_chroma import Chroma
from langchain_community.document_loaders import WebBaseLoader
from langchain_openai import OpenAIEmbeddings

In [11]:
load_dotenv()

# Blog posts that make up the knowledge base.
urls = [
    "https://lilianweng.github.io/posts/2023-06-23-agent/",
    "https://lilianweng.github.io/posts/2023-03-15-prompt-engineering/",
    "https://lilianweng.github.io/posts/2023-10-25-adv-attack-llm/",
]

# Shared by the ingestion step and the retriever so both always point at the
# same on-disk collection.
CHROMA_COLLECTION = "rag-chroma"
CHROMA_DIR = "./.chroma"

# Each loader call returns a list of documents for one URL; flatten them into
# a single list before splitting.
docs = [WebBaseLoader(url).load() for url in urls]
docs_list = [item for sublist in docs for item in sublist]

text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=250, chunk_overlap=0
)
doc_splits = text_splitter.split_documents(docs_list)

# Deterministic ids (source + chunk position). The collection is persisted to
# disk, so without stable ids every re-run of this cell would add the same
# chunks again under fresh random ids and the retriever would start returning
# duplicates. With stable ids a re-run is expected to overwrite the existing
# entries instead (the store upserts by id).
doc_ids = [
    f"{split.metadata.get('source', 'unknown')}::{index}"
    for index, split in enumerate(doc_splits)
]

# One embeddings client, reused for indexing and for querying.
embeddings = OpenAIEmbeddings()

# Embeds the chunks and writes them into the persisted Chroma collection.
vectorstore = Chroma.from_documents(
    documents=doc_splits,
    ids=doc_ids,
    collection_name=CHROMA_COLLECTION,
    embedding=embeddings,
    persist_directory=CHROMA_DIR,
)

# Retriever over the persisted collection in ./.chroma. It re-opens the
# collection from disk, so it keeps working even if the ingestion call above is
# skipped on a later run.
retriever = Chroma(
    collection_name=CHROMA_COLLECTION,
    persist_directory=CHROMA_DIR,
    embedding_function=embeddings,
).as_retriever()

# Step 2 - Definition of State

In [67]:
from typing import Any, List, TypedDict

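# State in LangGraph (i.e. what is remembered across the graph) is defined as a TypedDict.
# A TypedDict declares which keys the state dict carries and the type of each value.

# `documents` is a list (the nodes below store LangChain Document objects in it),
# `web_search` is a bool, and the remaining fields are strings.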

class GraphState(TypedDict):
    """
    Represents the state of our graph.

    Every node receives this dict and returns a dict with the keys it wants
    to update.

    Attributes:
        question: the user's question
        generation: the LLM's generated answer
        web_search: whether a web search should be added before generating
        documents: retrieved documents. The nodes in this notebook store
            objects exposing ``.page_content`` here (LangChain ``Document``
            instances), not plain strings, hence ``List[Any]``.
    """

    question: str
    generation: str
    web_search: bool
    documents: List[Any]

# Side Note - Invocation Operation

* Invoke has different meanings in LangGraph and LangChain.
* Depending on the context and object, invoke has different properties and usage:

In [13]:
# # Retriever's invoke: "find relevant documents"
# docs = retriever.invoke("query")

# # LLM's invoke: "generate text"
# response = llm.invoke("What is 2+2?")

# # Embeddings are not Runnables, so they have no invoke; "convert text to vectors" is embed_query / embed_documents
# vectors = embeddings.embed_query("convert this to numbers")

# # Prompt template's invoke: "fill in the template"
# filled = prompt.invoke({"question": "What is life?", "context": "..."})

# # Parser's invoke: "parse the output"
# parsed = output_parser.invoke("Raw text to parse")

# Runnable PassThrough

In [14]:
# RunnablePassthrough tells a LangChain / LangGraph pipeline to pass its input through unchanged, so the original value stays available to later steps

In [15]:
# from langchain.schema.runnable import RunnablePassthrough
# from langchain_openai import ChatOpenAI
# from langchain.prompts import ChatPromptTemplate

# # WITHOUT RunnablePassthrough ❌
# basic_chain = (
#     retriever  
#     | ChatPromptTemplate.from_template("Context: {context}\nPlease summarize it.")
#     | ChatOpenAI()
# )
# # Flow:
# # 1. User asks: "Who was Einstein?"
# # 2. Retriever gets documents → [docs about Einstein]
# # 3. Template only gets {context} → "Context: [docs about Einstein]"
# # 4. LLM only sees the context, original question is lost!


# # WITH RunnablePassthrough ✅
# better_chain = (
#     {
#         "context": retriever,          # Gets relevant docs
#         "question": RunnablePassthrough()  # Preserves "Who was Einstein?"
#     }
#     | ChatPromptTemplate.from_template("""
#         Context: {context}
#         Question: {question}    # Can still use original question here!
#         Please answer the question.
#         """)
#     | ChatOpenAI()
# )
# # Flow:
# # 1. User asks: "Who was Einstein?"
# # 2. Retriever gets documents → {"context": [docs]}
# # 3. RunnablePassthrough keeps question → {"question": "Who was Einstein?"}
# # 4. Template gets both → "Context: [docs], Question: Who was Einstein?"
# # 5. LLM sees both context AND original question!

In [16]:
# # Best practices for piping:

# # Follow this structure:
# # Simple Chain
# loader | splitter | embedder

# # RAG Chain
# retriever | template | llm

# # RAG with preserved data
# {
#     "context": retriever,
#     "question": RunnablePassthrough()
# } | template | llm

# # Multi-source RAG
# {
#     "web": web_retriever,
#     "docs": doc_retriever,
#     "question": RunnablePassthrough()
# } | template | llm

# Step 3 - Retriever Node

* Technically the vector database handles retrieval for you, but let's go one level lower and wrap it in our own node, for learning purposes!

In [17]:
# Retriever will refer to the vector store
from typing import Any, Dict

# From LangGraph family
from graph.state import GraphState

# Let's import from the ingestion file we made and reuse its retriever
from ingestion import retriever # import from local Python file, retriever object

In [18]:
def retrieve(state: "GraphState") -> Dict[str, Any]:
    """Retrieve node: look up documents for the question held in the graph state.

    Args:
        state: Current graph state; only the "question" key is read.

    Returns:
        A state update with "documents" (whatever ``retriever.invoke`` returned
        for the question) and "question" (passed through unchanged).
    """
    print("---RETRIEVING FOR YOU!---")
    question = state["question"]  # one of the attributes defined on GraphState

    # Delegate the lookup to the LangChain retriever built during ingestion.
    documents = retriever.invoke(question)

    # A node returns a dict of the state keys it wants to update.
    # Returning the question is not required (it is unchanged); it is kept as
    # an example of updating several keys at once.
    return {"documents": documents, "question": question}

# Step 4 - Document Grader

In [23]:
# Make the retrieval_grader

# This is folder path searching: graph folder -> chains folder -> retrieval_grader python, importing these two defined objects and classes
from graph.chains.retrieval_grader import GradeDocuments, retrieval_grader

# searching
from ingestion import retriever
from dotenv import load_dotenv

# Load variables from the local .env file. override=True is passed so that, per
# python-dotenv's documented behaviour, values from .env replace variables that
# are already set in the process environment.
load_dotenv(override=True)

True

In [45]:
# Define the workflow - answer yes
def test_retrival_grade_answer_yes(question: str) -> None:
    docs = retriever.invoke(question)
    
    # Get first retrieval
    doc_txt = docs[0].page_content
    
    # This is special type hint in Python
    # Pass this into the retrieval grader
    res: GradeDocuments = retrieval_grader.invoke(\
        {"question": question, "document": doc_txt}
        
    )
    
    # assertion:
    assert res.binary_score == "yes"
    
    return res

In [33]:
# Function
def test_retrival_grade_answer_no() -> None:
    question = "agent memory"
    docs = retriever.invoke(question)
    
    # Get first retrieval
    doc_txt = docs[0].page_content
    
    res: GradeDocuments = retrieval_grader.invoke(\
        {"question": "how do I make pizza", "document": doc_txt}
        
    )
    
    # return response
    return res
    
    # assert res.binary_score == "yes"

In [40]:
# Asserting binary_score == "yes" here would fail: the document is unrelated to
# the pizza question, so the grader is expected to answer "no".
# Call the helper once (each call performs a retrieval and an LLM request) and
# check the result type on that same response.
response = test_retrival_grade_answer_no()
assert isinstance(response, GradeDocuments)  # as expected

print(response)

binary_score='no'


In [63]:
from langchain.schema.runnable import RunnablePassthrough
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field

llm = ChatOpenAI(temperature=0)


def test_retrival_grade_answer_yes(question: str) -> GradeDocuments:
    """Chain-based variant of the relevance check (redefines the earlier helper).

    Builds a small retrieval -> prompt -> LLM chain to show how
    ``RunnablePassthrough`` keeps the original question available, prints the
    chain's input schema and structure, runs it once, and then grades the top
    retrieved document with ``retrieval_grader``.

    Args:
        question: Query used for retrieval, for the chain, and for grading.

    Returns:
        The grader's result for the first retrieved document.

    Raises:
        AssertionError: If the grader's ``binary_score`` is not "yes".
    """
    # Create a prompt template to format the input properly
    prompt = ChatPromptTemplate.from_template("""
        Question: {question}
        Document: {document}
        Is this document relevant to answer the question?
    """)

    # Both branches receive the chain input. The chain is invoked with the
    # plain question string, so RunnablePassthrough() forwards exactly that
    # string into {question}. (Invoking with {"question": ...} would put the
    # whole dict into the prompt.)
    chain = {
        "document": lambda q: retriever.invoke(q)[0].page_content,
        "question": RunnablePassthrough(),
    } | prompt | llm

    # For many LangChain components
    print(chain.input_schema)  # Shows expected input structure

    # Inspect the chain structure
    print(chain)  # Often shows the component structure

    # Run the chain end to end; its reply is not used for grading below.
    chain.invoke(question)

    # Pass to the grader
    res: GradeDocuments = retrieval_grader.invoke({
        "question": question,
        "document": retriever.invoke(question)[0].page_content,
    })

    assert res.binary_score == "yes"
    return res

In [64]:
# Run the chain-based check for the "agent memory" query. The function prints
# the chain's input schema and structure, and raises AssertionError unless the
# grader's binary_score is "yes".
test_retrival_grade_answer_yes("agent memory")

<class 'langchain_core.utils.pydantic.RunnableParallel<document,question>Input'>
first={
  document: RunnableLambda(...),
  question: RunnablePassthrough()
} middle=[ChatPromptTemplate(input_variables=['document', 'question'], input_types={}, partial_variables={}, messages=[HumanMessagePromptTemplate(prompt=PromptTemplate(input_variables=['document', 'question'], input_types={}, partial_variables={}, template='\n        Question: {question}\n        Document: {document}\n        Is this document relevant to answer the question?\n    '), additional_kwargs={})])] last=ChatOpenAI(client=<openai.resources.chat.completions.Completions object at 0x1287d50c0>, async_client=<openai.resources.chat.completions.AsyncCompletions object at 0x128902ce0>, root_client=<openai.OpenAI object at 0x1285dbb50>, root_async_client=<openai.AsyncOpenAI object at 0x1287d5060>, temperature=0.0, model_kwargs={}, openai_api_key=SecretStr('**********'))


In [71]:
# Let's define the node to grade documents now that we confirmed the function works.

from typing import Any, Dict
from graph.chains.retrieval_grader import retrieval_grader
from graph.state import GraphState


def grade_documents(state: GraphState) -> Dict[str, Any]:
    """
    Grade every retrieved document against the question and keep the relevant ones.

    Each document's ``page_content`` is sent to ``retrieval_grader`` together
    with the question. Documents graded "yes" (case-insensitive) are kept; any
    other grade drops the document and switches the web-search flag on.

    Args:
        state (dict): The current graph state; "question" and "documents" are read

    Returns:
        dict: "documents" (only those graded relevant), "question" (unchanged)
        and "web_search" (True when at least one document was dropped)
    """
    print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
    question = state["question"]

    relevant_docs = []
    needs_web_search = False
    for doc in state["documents"]:
        verdict = retrieval_grader.invoke(
            {"question": question, "document": doc.page_content}
        )

        # Guard clause: anything other than "yes" is treated as not relevant.
        if verdict.binary_score.lower() != "yes":
            print("---GRADE: DOCUMENT NOT RELEVANT---")
            needs_web_search = True
            continue

        print("---GRADE: DOCUMENT RELEVANT---")
        relevant_docs.append(doc)

    return {
        "documents": relevant_docs,
        "question": question,
        "web_search": needs_web_search,
    }


# Step 4b - Web Search Node (Tavily)

In [78]:
from typing import Any, Dict

from langchain.schema import Document
from langchain_community.tools.tavily_search import TavilySearchResults

from graph.state import GraphState

# `max_results` is the tool's documented result-count setting; the previous
# `k=3` is not a documented argument of TavilySearchResults.
web_search_tool = TavilySearchResults(max_results=3)


def web_search(state: GraphState) -> Dict[str, Any]:
    """Web-search node: add one document built from Tavily results to the state.

    The "content" fields of all search hits are joined with newlines into a
    single ``Document``, which is appended after any documents already present.

    Args:
        state: Current graph state. "question" is required; "documents" may be
            missing or None (e.g. when web search is the first node to run).

    Returns:
        A state update with "documents" (existing documents followed by the
        web-results document) and "question" (unchanged).
    """
    print("---WEB SEARCH---")
    question = state["question"]
    # .get() instead of [...] so a state without "documents" does not raise.
    existing_documents = state.get("documents") or []

    docs = web_search_tool.invoke({"query": question})
    web_results = "\n".join([d["content"] for d in docs])
    web_results = Document(page_content=web_results)

    # Build a new list rather than appending in place, so the list object held
    # by the incoming state is not mutated.
    documents = [*existing_documents, web_results]
    return {"documents": documents, "question": question}


# Step 5 - Create Generation Node

In [79]:
from typing import Any, Dict

from graph.chains.generation import generation_chain
from graph.state import GraphState


def generate(state: GraphState) -> Dict[str, Any]:
    """Generation node: answer the question using the documents in the state.

    Args:
        state: Current graph state; "question" and "documents" are read.

    Returns:
        A state update with "documents" and "question" passed through
        unchanged, plus "generation" holding the result of
        ``generation_chain.invoke``.
    """
    print("---GENERATE---")
    question, documents = state["question"], state["documents"]

    # The documents are handed to the chain as its "context" input.
    chain_input = {"context": documents, "question": question}
    answer = generation_chain.invoke(chain_input)

    return dict(documents=documents, question=question, generation=answer)




# Putting it all together in a Graph:

In [82]:
from dotenv import load_dotenv

from langgraph.graph import END, StateGraph

# We built these pieces and retrievers and objects in other files, let's call them:
from graph.consts import RETRIEVE, GRADE_DOCUMENTS, GENERATE, WEBSEARCH
from graph.nodes import generate, grade_documents, retrieve, web_search

# Other imports:
# Type hints
from typing import TypedDict, List, Dict

# LangGraph core.
# After LC version 0.2.0+, it's recommended to use SG
from langgraph.graph import StateGraph, END, START 

# LangChain core and models
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

# Vector store and embeddings
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

In [87]:
# from dotenv import load_dotenv
# from langgraph.checkpoint.memory import MemorySaver
# from langgraph.checkpoint.sqlite import SqliteSaver

# from graph.chains.answer_grader import answer_grader
# from graph.chains.hallucination_grader import hallucination_grader
# from graph.chains.router import question_router, RouteQuery
# from graph.consts import GENERATE, GRADE_DOCUMENTS, RETRIEVE, WEBSEARCH
# from graph.nodes import generate, grade_documents, retrieve, web_search

# load_dotenv()
# memory = SqliteSaver.from_conn_string(":memory:")
# memory = MemorySaver()


# def decide_to_generate(state):
#     print("---ASSESS GRADED DOCUMENTS---")

#     if state["web_search"]:
#         print(
#             "---DECISION: NOT ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, INCLUDE WEB SEARCH---"
#         )
#         return WEBSEARCH
#     else:
#         print("---DECISION: GENERATE---")
#         return GENERATE


# def grade_generation_grounded_in_documents_and_question(state: GraphState) -> str:
#     print("---CHECK HALLUCINATIONS---")
#     question = state["question"]
#     documents = state["documents"]
#     generation = state["generation"]

#     score = hallucination_grader.invoke(
#         {"documents": documents, "generation": generation}
#     )

#     if hallucination_grade := score.binary_score:
#         print("---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")
#         print("---GRADE GENERATION vs QUESTION---")
#         score = answer_grader.invoke({"question": question, "generation": generation})
#         if answer_grade := score.binary_score:
#             print("---DECISION: GENERATION ADDRESSES QUESTION---")
#             return "useful"
#         else:
#             print("---DECISION: GENERATION DOES NOT ADDRESS QUESTION---")
#             return "not useful"
#     else:
#         print("---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY---")
#         return "not supported"


# def route_question(state: GraphState) -> str:
#     print("---ROUTE QUESTION---")
#     question = state["question"]
#     source: RouteQuery = question_router.invoke({"question": question})
#     if source.datasource == WEBSEARCH:
#         print("---ROUTE QUESTION TO WEB SEARCH---")
#         return WEBSEARCH
#     elif source.datasource == "vectorstore":
#         print("---ROUTE QUESTION TO RAG---")
#         return RETRIEVE


# workflow = StateGraph(GraphState)
# workflow.add_node(RETRIEVE, retrieve)
# workflow.add_node(GRADE_DOCUMENTS, grade_documents)
# workflow.add_node(GENERATE, generate)
# workflow.add_node(WEBSEARCH, web_search)


# workflow.set_conditional_entry_point(
#     route_question,
#     {
#         WEBSEARCH: WEBSEARCH,
#         RETRIEVE: RETRIEVE,
#     },
# )
# workflow.add_edge(RETRIEVE, GRADE_DOCUMENTS)
# workflow.add_conditional_edges(
#     GRADE_DOCUMENTS,
#     decide_to_generate,
#     {
#         WEBSEARCH: WEBSEARCH,
#         GENERATE: GENERATE,
#     },
# )
# workflow.add_edge(WEBSEARCH, GENERATE)
# workflow.add_conditional_edges(
#     GENERATE,
#     grade_generation_grounded_in_documents_and_question,
#     {
#         "not supported": GENERATE,
#         "useful": END,
#         "not useful": WEBSEARCH,
#     },
# )

# app = workflow.compile(checkpointer=memory)
# app.get_graph().draw_mermaid_png(output_file_path="graph.png")


b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x01\x1c\x00\x00\x02\x8d\x08\x02\x00\x00\x00}DC\xcc\x00\x00\x00\x01sRGB\x00\xae\xce\x1c\xe9\x00\x00 \x00IDATx\x9c\xec\x9dg@\x13I\x1f\xc6\'\x95P\x02\xa1W\x11\xb1\x80b\x01\x84\x13\x15\xcf\x02\x9e\xa8`GQ\xb1{6\xc4\xee\xd9\xeb)\xf6\xee\t\x9e\xfdD\xb0\x9d\x8ap\x96\x03\x0b*6\xac\x07\n(E\x90N\x12\x08-=y?\xaco\xceS\xa4\x99\xec$\xd9\xf9}\n\xbb\x9b\x99\'K\x9e\xcc\xfcgg\xfeC\x92\xcb\xe5\x00\x81@(\x0f2l\x01\x08\x84\xb6\x81L\x85@(\x19d*\x04B\xc9 S!\x10J\x06\x99\n\x81P2\xc8T\x08\x84\x92\xa1\xc2\x16\xa0%\xf0\xabe\xdcbaM\xa5\xa4\xb6R*\x15\xcb%\x12\rxPA&\x03*\x9d\xacgH\xd17\xa4\xb2\xcc\xe9\x06,\nlEZ\x02\t=\xa7\xfa\x1e\xaa\xb8\xe2w/\xabsRk\xa4\x129]\x87\xacgD\xd17\xa4\x1a\x18Q\xc5"\x19li\rC\xa1\x92\xf8\xd5\xd2\x9aJim\xa5D.\x07"\x81\xcc\xb1\x93~\x9b\xce\x06\xc6Vt\xd8\xd24\x1bd\xaaf"\xe2\xcb\x1e\xc6qj*%\xc6\x16t\xc7N\xfaV\x0e\x0c\xd8\x8a\xbe\x97\xd2\x8f\xc2\x9c\xd4\x9a\x8a2\x11\x99B\xea\x11`\xa6o\x88\x1a\xaef\x82L\xd5\x1c^\xdf\xe3=\xb9\xce\xe9\xe1o\xda\xb1\xa7\

# Last Topic - Self RAG and Adaptive RAG

# Self RAG:
**Note:** this can also be done with Arize Phoenix and other libraries, but here we are coding it out manually.

* Hallucination Grader - check if answer is grounded in context, or hallucinated by LLM
* Answer Grader - check if the answer even answers the question (grounded in question)

Refer to the RAG Triad triangle from NLP class and the other advanced RAG course.

So self-RAG here means:
* ♻️ If hallucinating: try generating again with the same docs
* 🌐 If off-topic: go search the web for better docs
* ✅ If good answer: we're done!

For code --> check self-rag-graph.py file!

# Adaptive RAG:

* Research paper based, very fancy.
* Question Router - chooses between two flows: (1) search the internet, or (2) RAG over the vector store
* The node decides if the answer requires looking in a vector database.

* The LLM makes the decision of whether to go with the internet, or to use RAG.


# Best Practice - Types of Chat Prompts:

Best Practices:
* Use ChatPromptTemplate for modern chat models (GPT-3.5/4)
* Use SystemMessage to set consistent behavior
* Use HumanMessage for user inputs
* Use AIMessage to include chat history, or to "simulate an LLM" giving some conversation message
* Use PromptTemplate (the plain string prompt template) for simple, non-chat tasks
* Use general prompt templates for RAG

# RAG Example:



In [None]:
# Example RAG prompt built from (role, template) pairs. {documents} and
# {question} are template placeholders to be filled in when the prompt is used.
prompt = ChatPromptTemplate.from_messages([
    # System message: fixed instructions on how to treat the documents.
    ("system", """Analyze the following documents and question carefully.
    1. Check if documents contain relevant information
    2. Only use facts from the documents
    3. Indicate if information is missing"""),
    # First user turn: carries the documents and the question.
    ("user", "Documents: {documents}\nQuestion: {question}"),
    # Pre-written assistant turn included as part of the conversation.
    ("assistant", "Let me analyze this carefully..."),
    # Final user turn: asks the model to state its confidence.
    ("user", "Are you confident in your answer based on these documents?")
])