### **Load environment variables from the .env file**

In [None]:
import os
from dotenv import load_dotenv

# Load variables from a local .env file (if present) into the process environment
load_dotenv()

# Read the API keys this notebook refers to; each one is None when the variable is unset
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
TAVILY_API_KEY = os.getenv("TAVILY_API_KEY")
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
LANGCHAIN_API_KEY = os.getenv("LANGCHAIN_API_KEY")

# Re-export only the keys that are actually set.
# os.environ accepts strings only, so assigning a missing key (None) would raise
# a TypeError and abort the cell without saying which key is absent.
_missing_keys = []
for _key_name, _key_value in (
    ("GOOGLE_API_KEY", GOOGLE_API_KEY),
    ("TAVILY_API_KEY", TAVILY_API_KEY),
    ("GROQ_API_KEY", GROQ_API_KEY),
    ("LANGCHAIN_API_KEY", LANGCHAIN_API_KEY),
):
    if _key_value:
        os.environ[_key_name] = _key_value
    else:
        _missing_keys.append(_key_name)

if _missing_keys:
    print("Warning: missing environment variables: " + ", ".join(_missing_keys))

# Enable LangChain tracing (v2) only when a LangSmith key is available;
# without a key the tracing flag is switched off explicitly.
os.environ["LANGCHAIN_TRACING_V2"] = "true" if LANGCHAIN_API_KEY else "false"

# Endpoint used for tracing and monitoring (LangSmith)
os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com"


### **Initialize a chat-based Large Language Model (LLM)**

In [None]:
from langchain_google_genai import ChatGoogleGenerativeAI

# Chat model shared by every chain in this notebook (grader, generator, re-writer).
# The model id is kept in one place so it can be swapped without touching the chains.
GEMINI_MODEL_NAME = "gemini-2.5-flash"
llm = ChatGoogleGenerativeAI(model=GEMINI_MODEL_NAME)



### **Initialize a Hugging Face embedding model**

In [None]:
from langchain_huggingface import HuggingFaceEmbeddings

# Embedding model used to turn document chunks into vectors for the vector store.
# The model id is kept in a named constant so the choice is easy to spot and change.
EMBEDDING_MODEL_NAME = "all-MiniLM-L6-v2"
embeddings = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL_NAME)


In [None]:
# Quick smoke test: confirm the chat model answers before building the pipeline
smoke_test_prompt = "Write a ballad about LangChain"
result = llm.invoke(smoke_test_prompt)
print(result.content)

### **Retrieval-Augmented Generation (RAG) Pipeline with LangChain**

In [None]:
from langchain_text_splitters.character import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import Chroma

# Source material: three blog posts by Lilian Weng
urls = [
    "https://lilianweng.github.io/posts/2023-06-23-agent/",
    "https://lilianweng.github.io/posts/2023-03-15-prompt-engineering/",
    "https://lilianweng.github.io/posts/2023-10-25-adv-attack-llm/",
]

# Fetch every page. Each loader call returns a list of documents, so `docs`
# is a list of lists (one entry per URL) and `docs_list` is the flattened list.
docs = []
docs_list = []
for url in urls:
    page_docs = WebBaseLoader(url).load()
    docs.append(page_docs)
    docs_list.extend(page_docs)

# Token-based recursive splitter: chunks of 250 with no overlap between them
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(chunk_size=250, chunk_overlap=0)

# Cut the loaded pages into small, retrievable chunks
doc_splits = text_splitter.split_documents(docs_list)

# Embed every chunk and store it in the "rag-chroma" Chroma collection.
# NOTE(review): re-running this cell in the same kernel may add the same chunks
# to the collection again - confirm before relying on re-runs.
vectorstore = Chroma.from_documents(
    collection_name="rag-chroma",
    embedding=embeddings,
    documents=doc_splits,
)

# Expose the vector store through the retriever interface used by the RAG chains
retriever = vectorstore.as_retriever()


### **Retrieval Grader for RAG Pipelines with LangChain**

In [None]:
### Retrieval Grader
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field

# Data model: structured-output schema for the retrieval grader.
# It is passed to llm.with_structured_output(...), and grade_documents reads
# `binary_score` from the parsed result.
# NOTE(review): the class docstring and the Field description are presumably
# forwarded to the model as part of the schema, so treat both as prompt text
# and do not reword them casually - confirm.
class GradeDocuments(BaseModel):
    """Binary score for relevance check on retrieved documents."""

    # Expected to hold 'yes' or 'no'; only 'yes' is treated as relevant downstream.
    binary_score: str = Field(
        description="Documents are relevant to the question, 'yes' or 'no'"
    )
# Bind the GradeDocuments schema to the chat model so the grader returns structured output
structured_llm_grader = llm.with_structured_output(GradeDocuments)

# System instructions for the grader, spelled out piece by piece so that the
# whitespace embedded in the prompt text is visible
system = (
    "You are a grader assessing relevance of a retrieved document to a user question. \n \n"
    "    If the document contains keyword(s) or semantic meaning related to the question, grade it as relevant. \n\n"
    "    Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question."
)

# Prompt: system instructions plus a human turn carrying the document and the question
grader_messages = [
    ("system", system),
    ("human", "Retrieved document: \n\n {document} \n\n User question: {question}"),
]
grade_prompt = ChatPromptTemplate.from_messages(grader_messages)

# Grader chain: prompt -> structured-output model
retrieval_grader = grade_prompt | structured_llm_grader

# Try the grader on the second document retrieved for a sample question.
# `question` and `docs` are reused by later cells.
question = "agent memory"
docs = retriever.invoke(question)
doc_txt = docs[1].page_content
grader_inputs = {"question": question, "document": doc_txt}
print(retrieval_grader.invoke(grader_inputs))

### **Load a pre-built prompt and generate an answer**

In [None]:
### Generate

from langchain_classic import hub
from langchain_core.output_parsers import StrOutputParser

# -----------------------------
# Pull the shared RAG prompt template from LangChain Hub
# The chain below fills it with a "context" and a "question" value
# -----------------------------
RAG_PROMPT_ID = "rlm/rag-prompt"
prompt = hub.pull(RAG_PROMPT_ID)

# -----------------------------
# Post-processing helper function
# Converts a list of documents into a single string
# -----------------------------
def format_docs(docs):
    """
    Join the text of several documents into one context string.

    Args:
        docs: Iterable of items that are either objects exposing a
            ``page_content`` attribute or plain strings. ``None`` or an
            empty iterable is allowed.

    Returns:
        str: The document texts separated by a blank line, or an empty
        string when there is nothing to format.
    """
    if not docs:
        return ""
    # Plain strings are accepted as-is because the graph state declares its
    # documents as a list of strings; everything else must carry page_content.
    return "\n\n".join(
        doc if isinstance(doc, str) else doc.page_content for doc in docs
    )

# -----------------------------
# Compose the RAG chain
# prompt -> LLM -> StrOutputParser ensures the final output is a plain string
# -----------------------------
rag_chain = prompt | llm | StrOutputParser()

# -----------------------------
# Run the RAG chain
# The retrieved documents are flattened to plain text with format_docs first;
# passing the Document objects directly would put their full repr (metadata
# included) into the prompt instead of just their text.
# -----------------------------
generation = rag_chain.invoke({"context": format_docs(docs), "question": question})

# Output the generated answer
print(generation)


### **Question Re-Writing for Optimized Retrieval in RAG Pipelines**


In [None]:
### Question Re-writer

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# -----------------------------
# System instructions for the question re-writer
# The text is written as explicit pieces so its embedded whitespace is visible
# -----------------------------
system = (
    "You are a question re-writer that converts an input question to a better version \n"
    "that is optimized for web search. Look at the input and try to reason about the underlying semantic intent / meaning."
)

# -----------------------------
# Prompt: system instructions plus a human turn that carries the original question
# -----------------------------
rewrite_messages = [
    ("system", system),
    ("human", "Here is the initial question: \n\n {question} \n Formulate an improved question."),
]
re_write_prompt = ChatPromptTemplate.from_messages(rewrite_messages)

# -----------------------------
# Re-writer chain: prompt -> LLM -> plain string
# -----------------------------
question_rewriter = re_write_prompt | llm | StrOutputParser()

# -----------------------------
# Example usage
# Left as the last expression so the notebook displays the rewritten question
# -----------------------------
question_rewriter.invoke({"question": question})


### **Web Search Integration for RAG Pipelines**

In [None]:
### Search

from langchain_community.tools.tavily_search import TavilySearchResults

# Limit the search to three results. The tool's result-count option is
# `max_results`; the previous `k=3` keyword is not an option of this tool,
# so it did not cap the number of results.
web_search_tool = TavilySearchResults(max_results=3)

In [None]:
from typing import List

from typing_extensions import TypedDict


class GraphState(TypedDict):
    """
    Shared state passed between the nodes of the workflow graph.

    Attributes:
        question: The user question; transform_query replaces it with a
            rewritten version.
        generation: Answer text produced by the generate node.
        web_search: "Yes"/"No" flag written by grade_documents and read by
            decide_to_generate.
        documents: Documents collected so far (retrieved, filtered, and/or
            added by the web search node).
    """

    question: str
    generation: str
    web_search: str
    # NOTE(review): annotated as List[str], but the nodes in this notebook store
    # Document objects here (web_search appends Document(...)) - confirm and align.
    documents: List[str]

### **End-to-End RAG Workflow: Retrieval, Grading, Query Rewriting, Web Search, and Generation**


In [None]:
from langchain_core.documents import Document

# -----------------------------
# Function: retrieve
# -----------------------------
def retrieve(state):
    """
    Look up documents for the question held in the workflow state.

    Args:
        state (dict): Workflow state; must contain "question".

    Returns:
        dict: State update with "documents" (the retriever's result for the
        question) and the unchanged "question".
    """
    print("---RETRIEVE---")
    query = state["question"]

    # Hand the question to the retriever and pass its result straight through
    return {"documents": retriever.invoke(query), "question": query}


# -----------------------------
# Function: generate
# -----------------------------
def generate(state):
    """
    Generate an answer from the documents in the state using the RAG chain.

    Args:
        state (dict): Current workflow state containing "question" and
            "documents".

    Returns:
        dict: State update with the unchanged "documents" and "question"
        plus the generated answer under "generation".
    """
    print("---GENERATE---")
    question = state["question"]
    documents = state["documents"]

    # Flatten the documents to plain text before prompting. Passing the
    # Document objects directly would insert their repr (metadata included)
    # into the prompt instead of just their text.
    context = format_docs(documents)

    generation = rag_chain.invoke({"context": context, "question": question})
    return {"documents": documents, "question": question, "generation": generation}


# -----------------------------
# Function: grade_documents
# -----------------------------
def grade_documents(state):
    """
    Keep only the documents the retrieval grader judges relevant to the question.

    Args:
        state (dict): Workflow state with "question" and "documents".

    Returns:
        dict: State update with the relevant documents, the unchanged
        question, and a "web_search" flag. The flag is "Yes" when at least
        one document was rejected or when no relevant document remains,
        otherwise "No".
    """
    print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
    question = state["question"]
    documents = state["documents"]

    filtered_docs = []
    web_search = "No"

    for d in documents:
        # Grade each document: yes = relevant, anything else = not relevant
        score = retrieval_grader.invoke({"question": question, "document": d.page_content})

        # Normalise the grade so "Yes", " yes " etc. still count as relevant.
        # A missing or non-string score (e.g. structured output that could not
        # be parsed) is treated as "not relevant" instead of raising.
        raw_grade = getattr(score, "binary_score", None)
        grade = raw_grade.strip().lower() if isinstance(raw_grade, str) else ""

        if grade == "yes":
            print("---GRADE: DOCUMENT RELEVANT---")
            filtered_docs.append(d)
        else:
            print("---GRADE: DOCUMENT NOT RELEVANT---")
            web_search = "Yes"

    # Nothing relevant at all (including the case where retrieval returned no
    # documents): fall back to web search rather than generating without context.
    if not filtered_docs:
        web_search = "Yes"

    return {"documents": filtered_docs, "question": question, "web_search": web_search}


# -----------------------------
# Function: transform_query
# -----------------------------
def transform_query(state):
    """
    Replace the question in the state with a rewritten version.

    Args:
        state (dict): Workflow state; must contain "question" and "documents".

    Returns:
        dict: State update with the unchanged "documents" and the output of
        the question re-writer chain as the new "question".
    """
    print("---TRANSFORM QUERY---")
    original_question = state["question"]
    current_documents = state["documents"]

    # Ask the re-writer chain for an improved formulation of the question
    rewritten_question = question_rewriter.invoke({"question": original_question})
    return {"documents": current_documents, "question": rewritten_question}


# -----------------------------
# Function: web_search
# -----------------------------
def web_search(state):
    """
    Run a web search for the question and add the results to the documents.

    Args:
        state (dict): Workflow state with "question" and optionally "documents".

    Returns:
        dict: State update with a new documents list (the existing documents
        plus one Document holding the combined search results, when the
        search produced any text) and the unchanged question.
    """
    print("---WEB SEARCH---")
    question = state["question"]

    # Work on a copy so the list stored in the incoming state is not mutated
    # in place; a missing or None entry is treated as "no documents yet".
    documents = list(state.get("documents") or [])

    # Perform the web search
    results = web_search_tool.invoke({"query": question})

    # The tool may hand back a single string (or a single dict) instead of a
    # list of hits. Wrap it so the loop below does not iterate over the
    # characters of the string or the keys of the dict.
    if isinstance(results, (str, dict)):
        results = [results]

    contents = []

    # Convert the different result shapes into plain text
    for hit in results or []:
        if isinstance(hit, str):
            contents.append(hit)
        elif isinstance(hit, dict):
            contents.append(hit.get("content", str(hit)))
        else:
            contents.append(str(hit))

    # Combine the search results into one Document; skip it when the search
    # returned no text so an empty document is not added to the context.
    web_results_text = "\n".join(contents)
    if web_results_text.strip():
        documents.append(Document(page_content=web_results_text))

    return {"documents": documents, "question": question}


# -----------------------------
# Function: decide_to_generate
# -----------------------------
def decide_to_generate(state):
    """
    Decide the next step in the workflow: generate an answer or re-write the query.

    Args:
        state (dict): Current workflow state; must contain the "web_search"
            flag ("Yes" or "No").

    Returns:
        str: Next node to call, "transform_query" when the flag is "Yes",
        otherwise "generate".
    """
    print("---ASSESS GRADED DOCUMENTS---")
    needs_web_search = state["web_search"]

    if needs_web_search == "Yes":
        # The flag is raised as soon as any retrieved document fails grading,
        # not only when every document was rejected, so the log message must
        # not claim that all documents were irrelevant.
        print("---DECISION: RETRIEVED DOCUMENTS ARE NOT SUFFICIENT, TRANSFORM QUERY---")
        return "transform_query"

    # Every graded document was relevant: answer from what we have
    print("---DECISION: GENERATE---")
    return "generate"


### **Initialize the workflow graph**

In [None]:
from langgraph.graph import END, StateGraph, START

# -----------------------------
# Create the graph; GraphState is the state schema shared by all nodes
# -----------------------------
workflow = StateGraph(GraphState)

# -----------------------------
# Register the nodes (node name -> function defined earlier)
# NOTE(review): the search node is registered as "web_search_node" rather than
# "web_search", presumably to avoid clashing with the state key of that name - confirm.
# -----------------------------
node_functions = {
    "retrieve": retrieve,                # Retrieve documents
    "grade_documents": grade_documents,  # Grade document relevance
    "generate": generate,                # Generate answer using RAG
    "transform_query": transform_query,  # Re-write question for better retrieval
    "web_search_node": web_search,       # Perform web search if needed
}
for node_name, node_function in node_functions.items():
    workflow.add_node(node_name, node_function)

# -----------------------------
# Wire the graph
# START -> retrieve -> grade_documents, then branch on decide_to_generate
# -----------------------------
workflow.add_edge(START, "retrieve")
workflow.add_edge("retrieve", "grade_documents")

# decide_to_generate returns the key of the branch to follow after grading
routes_after_grading = {
    "transform_query": "transform_query",
    "generate": "generate",
}
workflow.add_conditional_edges("grade_documents", decide_to_generate, routes_after_grading)

# Remaining fixed edges: rewritten query -> web search -> generate -> END
remaining_edges = (
    ("transform_query", "web_search_node"),
    ("web_search_node", "generate"),
    ("generate", END),
)
for source_node, target_node in remaining_edges:
    workflow.add_edge(source_node, target_node)

# -----------------------------
# Compile the graph into an executable app
# -----------------------------
app = workflow.compile()


In [None]:
from pprint import pprint

# Run the workflow and report each node as it finishes
inputs = {"question": "What are the types of agent memory?"}

# Keep the most recent node output explicitly instead of relying on the loop
# variable still being defined after the loop (it is not if nothing is streamed).
final_state = None
for output in app.stream(inputs):
    for key, value in output.items():
        # Node
        pprint(f"Node '{key}':")
        final_state = value
    pprint("\n---\n")

# Final generation
if final_state is None:
    print("The workflow produced no output.")
else:
    pprint(final_state["generation"])

In [None]:
from pprint import pprint

# Run the workflow on a question the indexed posts are unlikely to cover
inputs = {"question": "How does the AlphaCodium paper work?"}

# Keep the most recent node output explicitly instead of relying on the loop
# variable still being defined after the loop (it is not if nothing is streamed).
final_state = None
for output in app.stream(inputs):
    for key, value in output.items():
        # Node
        pprint(f"Node '{key}':")
        final_state = value
    pprint("\n---\n")

# Final generation
if final_state is None:
    print("The workflow produced no output.")
else:
    pprint(final_state["generation"])