In [None]:
# --------All required installations--------
# `%pip` (rather than `!pip`) installs into the environment of the running kernel.

# ---- To download YouTube Videos ----
%pip install -qU "yt-dlp[default]"

# ---- To use Whisper (via Hugging Face `transformers`) ----
# NOTE: the speech-recognition pipeline decodes audio files with the system `ffmpeg` binary.
%pip install transformers

# ---- LangChain + LangGraph (StateGraph, MemorySaver and prebuilt agents come from `langgraph`) ----
%pip install -qU langchain langchain_experimental langgraph

# ---- To use Google's Gemini Models ----
%pip install -qU langchain-google-genai

# ---- Vector Database: Chroma ----
%pip install -qU chromadb langchain_chroma

# ---- Embeddings: `sentence_transformers` is imported directly and backs HuggingFaceEmbeddings ----
%pip install -qU sentence-transformers

# ---- Updating the `Protocol Buffers` module due to compatibility issues with Gemini Models ----
%pip install --upgrade --force-reinstall protobuf

# ---- 'typing' (Type Hints) ships with the Python standard library: nothing to install ----
# (The PyPI package named `typing` is an obsolete backport for very old Python versions.)

# ---- Installing Pydantic for Data Validation (used for the graph State) ----
%pip install -qU pydantic

# # ---- Serper's API Wrapper's dependencies ----
# %pip install -qU google-search-results

# ---- DuckDuckGo Search ----
%pip install -qU duckduckgo-search langchain-community
%pip install -U ddgs

In [None]:
# Importing all required libraries


# ---- General ----
import os
import warnings
warnings.filterwarnings("ignore")


# ---- To download YouTube Videos - "yt-dlp" ----
from yt_dlp import YoutubeDL


# ---- To use Whisper ----
import torch
from transformers import AutoProcessor, AutoModelForSpeechSeq2Seq
from transformers import pipeline


# ---- Used for chunking the documents and creating vector embeddings for storage in the vector DB ----
from langchain_text_splitters import RecursiveCharacterTextSplitter
from sentence_transformers import SentenceTransformer
from langchain_community.embeddings import HuggingFaceEmbeddings

# ---- Using ChromaDB and creating Documents ----
from langchain_chroma import Chroma
from langchain_core.documents import Document

# ---- Document Retriever ----
from langchain_core.retrievers import BaseRetriever
from langchain_core.callbacks import CallbackManagerForRetrieverRun
from typing import List, Any

# ---- Output Parsers and LangChain's Runnable Classes ----
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableParallel

# ---- Using Google's GenAI Models ----
from langchain_google_genai import ChatGoogleGenerativeAI

# ---- Prompt Templates of LangChain ----
from langchain_core.prompts import ChatPromptTemplate

# ---- For HTTP requests -> To be used in API calls ----
import requests

# # ---- Serper API ----
# from langchain_community.utilities import GoogleSerperAPIWrapper

# ---- DuckDuckGo ----
from langchain_community.tools import DuckDuckGoSearchRun

# ---- Tools ----
from langchain_core.tools import tool
from langchain.tools import BaseTool

# ---- LangChain LLM Model initializations and LangGraph's Agent's Fundamental Classes  ----
from langgraph.prebuilt import create_react_agent
from langchain.chat_models import init_chat_model

# ---- Using Pydantic's BaseModel for Data Validation ----
from pydantic import BaseModel, Field

# ---- Importing required Type Hints -> To be used in creating State ----
from typing import TypedDict, List, Annotated, Optional, NotRequired

# ---- To be used in Maintaining Message History ----
from langchain_core.messages import AnyMessage
from langgraph.graph.message import add_messages

# ---- For Persistence ----
from langgraph.checkpoint.memory import MemorySaver

# ---- For Graph ----
from langgraph.graph import StateGraph, START, END

# ---- for RegEx (used in YouTube Video Title Handling) ----
import re

# ---- To be used in generating Unique FileName ----
import uuid

In [None]:
# All the required API Keys

# Keep a key that is already exported in the environment instead of clobbering it
# with an empty string; otherwise prompt for it without echoing or saving it in the notebook.
if not os.environ.get("GOOGLE_API_KEY"):
    from getpass import getpass
    os.environ["GOOGLE_API_KEY"] = getpass("Enter your GOOGLE_API_KEY: ")

In [None]:
# Global variables

# Shared Gemini chat model used by every LLM node (judge, rewriter, responder).
MODEL_INSTANCE = ChatGoogleGenerativeAI(model="gemini-2.0-flash", temperature=0, timeout=10, max_retries=2)

# Turns the chat model's message output into a plain string.
OUTPUT_PARSER = StrOutputParser()

# Embedding model used by Chroma to embed transcript chunks and queries.
SENTENCE_TRANSFORMER = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")

# Whisper transcription is far slower on CPU, so use the first CUDA GPU when one is present.
# In transformers' `pipeline`, device=-1 selects the CPU and device=0 selects cuda:0.
WHISPER_DEVICE = 0 if torch.cuda.is_available() else -1

TRANSCRIBER = pipeline(
    model="openai/whisper-small",
    framework="pt",
    device=WHISPER_DEVICE,
    # Recent transformers versions require timestamps for Whisper long-form (>30 s) audio.
    return_timestamps=True,
)

# Global handle to the Chroma vector store built by Node-3 (None until it has run).
GLOBAL_VECTOR_STORE = None

<h1 style="color: #FFD700;">Agentic RAG</h1>

<h2 style="color: #AAF1D1;">Defining the Nodes</h2>

In [None]:
# Creating the State:

class State(BaseModel):
    """Shared LangGraph state read and updated by every node of the Agentic-RAG graph."""

    class Config:
        arbitrary_types_allowed = True

    # Current question: set before the first run, replaced by Node-9 afterwards.
    user_query: Optional[str] = None
    # URL of the YouTube video to download, transcribe and index.
    youtubeURL: Optional[str] = None
    localpath: str = r""       #Enter local your path here -> for YouTube Video Downloads
    # File name of the downloaded audio (Node-1); Node-2 joins it with `localpath`.
    video_details: str = ""
    # Whisper transcript of the video (Node-2).
    transcription: str = ""
    # Set to True by Node-3 once the vector store has been built.
    vectorDB_flg: bool = False
    # Query produced by Node-6 and the flag marking that it should be used.
    rewritten_query: Optional[str] = None
    rewritten_flg : bool = False
    # Node-5 verdict: True when the retrieved documents are relevant.
    retrieval_sync : bool = True
    # Page contents of the chunks retrieved by Node-4.
    documents: List[str] = Field(default_factory=list)
    # DuckDuckGo search result text from Node-7.
    webResults: Optional[str] = None
    # Final answer generated by Node-8.
    graph_output: str = ""
    # Set by Node-9: True when the user typed 'quit'.
    graph_exit: bool = True

In [None]:
# Node-1: Downloading YouTube Video


# Define the utility function *outside* the node for cleaner code
def clean_filename(title: str) -> str:
    """Turn a video title into a safe, underscore-separated file name stem.

    - Characters forbidden in Windows file names (``<>:"/\\|?*``) and ASCII
      control characters (``\\x00``-``\\x1f``, also forbidden on Windows) become ``_``.
    - Every whitespace character (spaces, tabs, newlines, ...) becomes ``_``.
    - Runs of underscores collapse to one; leading/trailing underscores are stripped.

    May return an empty string when the title contains nothing usable.
    """
    forbidden_chars = r'[<>:"/\\|?*\x00-\x1f]'
    cleaned_title = re.sub(forbidden_chars, '_', title)
    cleaned_title = re.sub(r'\s', '_', cleaned_title)
    return re.sub(r'__+', '_', cleaned_title).strip('_')


def Node1_YTVideoDownload(state: State) -> dict:
    """Download the best audio stream of ``state.youtubeURL`` into ``state.localpath``.

    The file is first written under a random uuid4 name, so yt-dlp never has to
    handle titles with characters that are illegal in file names. It is then
    renamed to ``clean_filename(<video title>)`` plus the real extension.

    Returns:
        dict: ``{"video_details": <file name inside state.localpath>}``. Node-2
        joins it with ``state.localpath`` to locate the audio file.

    Raises:
        OSError: if the downloaded file cannot be moved to its final name.
    """
    # An empty localpath means "current working directory". The kernel-wide cwd
    # is deliberately NOT changed with os.chdir(): os.chdir("") raises
    # FileNotFoundError, and a cwd change leaks into every later cell.
    download_dir = state.localpath or os.curdir
    os.makedirs(download_dir, exist_ok=True)

    # Generating a unique, and safe temporary filename using 'uuid'
    temp_filename_base = str(uuid.uuid4())

    yt_opts = {
        'format': 'bestaudio',
        # The directory goes in "paths" so that a literal '%' in a user path is
        # not parsed as an output-template field; only the stem is a template.
        'paths': {'home': download_dir},
        'outtmpl': temp_filename_base + '.%(ext)s',
    }

    with YoutubeDL(yt_opts) as yt:
        # A single call fetches metadata AND downloads. The returned info
        # describes the selected (bestaudio) format, so the extension in the
        # prepared file name matches the file actually written.
        info = yt.extract_info(state.youtubeURL, download=True)
        temp_path = yt.prepare_filename(info)

    extension = os.path.splitext(temp_path)[1]  # includes the leading dot

    # Fall back to the uuid stem if the title is missing or cleans down to "".
    safe_title_base = clean_filename(info.get("title") or "") or temp_filename_base
    target_filename = safe_title_base + extension
    final_path = os.path.join(download_dir, target_filename)

    # os.replace (unlike os.rename) also overwrites an existing target on Windows.
    try:
        os.replace(temp_path, final_path)
    except OSError as e:
        print(f"Error renaming file from {temp_path} to {final_path}: {e}")
        raise

    print("Node-1 Executed!")
    return {"video_details": target_filename}

* Original (superseded) version of Node-1, kept for reference only (double-click to expand)
<!-- 

def Node1_YTVideoDownload(state: State) -> dict:
    yt_opts = {
        'format': 'bestaudio',      # Downloading the best audio format -> High res audio is useful for better Transcriting
        'outtmpl': '%(title)s.%(ext)s'  # Saving the file with its title and original extension
    }

    os.chdir(state.localpath)
    
    with YoutubeDL(yt_opts) as yt:
        yt.download(state.youtubeURL)                                        # Downloading the YouTube Video
        info = yt.extract_info(state.youtubeURL, download=False)             # Using "extract_info" attribute to get the video details from the URL
        title = info.get("title")
        extension = info.get("ext")
        video_title = (title + "." + extension)


    import re 
    
    def clean_filename(title: str) -> str:
        """Removes illegal characters from a string to make it a valid filename."""
        # List of characters forbidden in Windows filenames
        forbidden_chars = r'[<>:"/\\|?*]'
        
        # Replace forbidden characters with an underscore or a space
        cleaned_title = re.sub(forbidden_chars, '_', title)
        cleaned_title2 = re.sub(' ', '_', cleaned_title)
        
        # Limiting the length and clean up multiple spaces/underscores
        cleaned_title = re.sub(r'__+', '_', cleaned_title2).strip('_')
        
        return cleaned_title

    cleaned_video_title = clean_filename(video_title)
    
    return {"video_details" : cleaned_video_title} -->

In [None]:
# Node-2: Video Transcription using a Local Model (OpenAI's Whisper)

def Node2_Transcription(state: State) -> dict:
    """Transcribe the downloaded audio with the local Whisper pipeline.

    Reads the file ``state.video_details`` located in ``state.localpath`` and
    returns ``{"transcription": <full transcript text>}`` for indexing by Node-3.
    """
    audio_file = os.path.join(state.localpath, state.video_details)

    # The pipeline returns a dict; only the concatenated "text" is kept.
    transcript = TRANSCRIBER(audio_file)["text"]
    print("Node-2 Executed!")

    return {"transcription": transcript}

In [None]:
# Node-3: Creating a VectorDB Collection

def Node3_vectorDB(state: State) -> dict:
    """Chunk the transcript, embed it and store it in a fresh Chroma collection.

    Stores the resulting vector store in ``GLOBAL_VECTOR_STORE`` for Node-4 and
    returns ``{"vectorDB_flg": True}``.

    Raises:
        ValueError: if the transcription yields no chunks (empty transcript).
    """
    global GLOBAL_VECTOR_STORE

    # Splitting the transcribed text into ~1000-character chunks
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=0,
        length_function=len,
        is_separator_regex=False,
    )
    docs = text_splitter.create_documents([state.transcription])

    # Chroma rejects an empty batch with an obscure error; fail with a clear one.
    if not docs:
        raise ValueError("Transcription is empty - nothing to index in the vector DB.")

    # Chroma's default in-memory client can share data across instances in the
    # same process. Ids are upserted, so re-using a fixed collection name would
    # leave extra chunks from a previously indexed (longer) video mixed into
    # this one. A per-run suffix keeps every video isolated.
    vector_store = Chroma(
        collection_name=f"YT_RAG_{uuid.uuid4().hex}",
        embedding_function=SENTENCE_TRANSFORMER,
    )

    document_ids = [f"id_{i}" for i in range(len(docs))]
    vector_store.add_documents(documents=docs, ids=document_ids)

    GLOBAL_VECTOR_STORE = vector_store

    print("Node-3 Executed!")

    return {"vectorDB_flg": True}
    

In [None]:
# Node-4: Retriever
def Node4_Retriever(state: State) -> dict:
    """Fetch the 3 transcript chunks most similar to the active query.

    Uses the rewritten query once Node-6 has set ``rewritten_flg``, otherwise
    the original user query. Returns ``{"documents": [page_content, ...]}``.

    Raises:
        RuntimeError: if the vector store has not been built yet.
    """
    # Access from global variable (read-only here, so no `global` statement needed)
    vector_store = GLOBAL_VECTOR_STORE
    if vector_store is None:
        raise RuntimeError("Vector store is not built yet - Node-3 must run before retrieval.")

    query = state.rewritten_query if state.rewritten_flg else state.user_query

    # The vector store's own retriever performs the same similarity search
    # (k=3) as the ad-hoc subclass used to. It goes through the public
    # `invoke` API, so callbacks are wired correctly and no deprecated
    # method is overridden.
    retriever = vector_store.as_retriever(search_kwargs={"k": 3})
    docs = retriever.invoke(query)
    docs_list = [doc.page_content for doc in docs]

    print("Node-4 Executed!")
    return {"documents": docs_list}

In [None]:
# Node-5: LLM Judge (determines the relevance of retrieved documents)

def Node5_llmJudge(state: State) -> dict:
    """Ask the LLM whether the retrieved chunks can answer the active query.

    Uses the rewritten query when ``rewritten_flg`` is set, otherwise the user
    query. Returns ``{"retrieval_sync": bool}``, which ``retrievedDocsRelevant``
    uses to pick the next node.
    """
    LLM_Judge = MODEL_INSTANCE

    Judge_sysMessage = """You are acting as a Judge for determining the relevance of provided input documents in the context
    and comparing it with the query.
    If you think the provided prompts contain the relevant details required to answer the question, say "True", otherwise, say "False".
    You will not generate any additional response other than a single word - "True" or "False".
    Context is : {context}
    \n
    Query is: {user_input}"""

    judge_prompt = ChatPromptTemplate([
        ("system", Judge_sysMessage),
        ("human", "Context: {context} \n Query: {user_input}"),
    ])

    judge_chain = (
        judge_prompt
        | LLM_Judge
        | OUTPUT_PARSER
    )

    query_to_use = state.rewritten_query if state.rewritten_flg else state.user_query

    # Give the model plain text rather than a Python list repr (brackets, escaped quotes).
    context_text = "\n\n".join(state.documents)

    judgement = judge_chain.invoke({"context": context_text, "user_input": query_to_use})

    # Models often wrap the one-word verdict in whitespace, punctuation or
    # markdown ("True.\n", "**True**"). Normalise it first so a positive verdict
    # is not misread as False, which would trigger a needless rewrite or web search.
    judgement_result = judgement.strip().strip(".*\"'`").lower() == "true"

    print("Node-5 Executed!")

    return {"retrieval_sync": judgement_result}

In [None]:
# Node-6C: Conditional Node - Rewriting the query (depends on the State variable "retrieval_sync")


def Node6C_QueryRewriter(state: State) -> dict:
    """Rewrite the user query into a self-contained search query via the LLM.

    Returns ``{"rewritten_query": str, "rewritten_flg": True}``. If the model
    returns nothing usable, the original user query is kept.
    """
    LLM_Query_Rewrite = MODEL_INSTANCE

    # Principle 5 matters: the output is fed verbatim to the retriever and the web
    # search, so any preamble or explanation would pollute those queries.
    rewrite_sysMessage = """You are an expert **Query Rewriter** for a sophisticated **Agentic RAG (Retrieval-Augmented Generation) system**. Your sole task is to take a user's initial query and rewrite it into a **highly optimized, self-contained, and comprehensive search query** that will maximize the chances of retrieving relevant documents from a technical knowledge base.
    
    **Key principles for the rewritten query:**
    1.  **Contextualization:** Expand acronyms, define ambiguous terms, or add missing context implied by the original conversation (if applicable, though for a first-pass query, focus on being self-contained).
    2.  **Explicitness:** Convert vague questions (e.g., "how to fix that?") into concrete statements or specific requests (e.g., "what are the troubleshooting steps for error code 503 on the Kubernetes control plane?").
    3.  **Keyword Density:** Increase the number of relevant, specific technical terms that an indexing system would recognize.
    4.  **Stand-alone Clarity:** The rewritten query **must** make sense and be an effective search term even if the original query is lost.
    5.  **Output Format:** Reply with the rewritten query text only - no preamble, explanation, labels, or surrounding quotes.
    
    Original User Query: {user_input}"""

    rewrite_prompt = ChatPromptTemplate([
        ("system", rewrite_sysMessage),
        ("human", "{user_input}")
    ])

    rewrite_chain = (rewrite_prompt | LLM_Query_Rewrite | OUTPUT_PARSER)

    rewritten_query = rewrite_chain.invoke({"user_input": state.user_query}).strip()

    # An empty rewrite would make retrieval and web search useless; keep the original.
    if not rewritten_query:
        rewritten_query = state.user_query

    print("Node-6 Executed!")

    return {"rewritten_query": rewritten_query, "rewritten_flg": True}

In [None]:
# Node-7C: Conditional Node - Performs a Websearch if the re-written query also doesn't fetch the required docs

def Node7C_WebSearch(state: State) -> dict:
    """Used to perform a web search using DuckDuckGo for the given (rewritten) query.

    Returns ``{"webResults": str}``: the search result text, or an explanatory
    message when the search is skipped or fails. Node-8 can then still answer
    ("I don't know") instead of the whole conversation aborting.
    """

    if not state.rewritten_query:
        # to prevent a failed tool call.
        print("ERROR: WebSearch Node reached without a rewritten_query. Skipping web search.")
        return {"webResults": "Web search skipped due to missing rewritten query."}

    search = DuckDuckGoSearchRun()
    try:
        webResult = search.invoke(state.rewritten_query)
    except Exception as e:
        # Network errors and rate limits are common with DuckDuckGo; report them
        # explicitly instead of letting one failed search end the chat loop.
        print(f"ERROR: DuckDuckGo web search failed: {e}")
        return {"webResults": f"Web search failed: {e}"}

    print("Node-7 Executed!")
    print(f"Search result: {webResult}")  # for Debugging

    return {"webResults": webResult}

In [None]:
# Node-8: Response Generator Node

def Node8_GenerateResponse(state: State) -> dict:
    """Generate the final answer from the retrieved chunks or the web results.

    The retrieved documents are the context when Node-5 judged them relevant
    (``retrieval_sync``); otherwise the web search text is used. Prints the
    answer and returns ``{"graph_output": answer}``.
    """
    Generate_LLM = MODEL_INSTANCE

    Response_sysMessage = """
    You are a smart assistant which takes in the given query and generates a response using the provided context data.
    If the provided data is insuffucient to answer the question, reply with I don't know.
    The provided context data can be from retrieved documents or from web. Always mention the source of getting this data.
    Context is : {context}
    \n
    Query is: {user_input}"""

    generation_prompt = ChatPromptTemplate(
        [
            ("system", Response_sysMessage),
            ("human", "{user_input}")
        ]
    )

    response_chain = (generation_prompt | Generate_LLM | OUTPUT_PARSER)

    query = state.rewritten_query if state.rewritten_flg else state.user_query

    # Pass plain text: joined chunks instead of a list repr, and "" instead of
    # the literal string "None" when no web results exist.
    if state.retrieval_sync:
        context = "\n\n".join(state.documents)
    else:
        context = state.webResults or ""

    result = response_chain.invoke({"context": context, "user_input": query})

    print("Node-8 Executed!")
    print(result)

    return {"graph_output": result}

In [None]:
# Node-9: Asks the user for the next question via a blocking input() call (no LangGraph interrupt() is used)

def Node9_getUserInput(state: State) -> dict:
    """
    Used for taking in user input.

    Blocks on ``input()`` until a non-empty question is entered. Typing
    ``quit`` (any case, surrounding whitespace ignored) sets ``graph_exit`` so
    the router ends the graph. The per-question fields are always reset so the
    next retrieval starts clean.
    """
    next_query = ""
    # Re-prompt on blank input: an empty question would otherwise run a full
    # retrieve/judge/rewrite/web-search cycle for nothing.
    while not next_query:
        next_query = input("\n>>What's your next question (or type 'quit' to exit): ").strip()

    exitDecision = next_query.lower() == "quit"

    print("Node-9 Executed!")

    return {
        "graph_exit": exitDecision,
        "user_query": next_query,
        "rewritten_query": None,  # Reset for new query
        "rewritten_flg": False,   # Reset flag
        "documents": [],          # Clear old docs
        "webResults": None,       # Clear old web results
    }

<h2 style="color: #AAF1D1;">Building the Graph</h2>

In [None]:
# Instantiating the Graph
graph = StateGraph(State)

# Registering every node under the name the edges refer to, in pipeline order.
for _node_name, _node_fn in (
    ("1_YTVideoDownload", Node1_YTVideoDownload),
    ("2_Transcription", Node2_Transcription),
    ("3_vectorDB", Node3_vectorDB),
    ("4_Retriever", Node4_Retriever),
    ("5_llmJudge", Node5_llmJudge),
    ("6_QueryRewriter", Node6C_QueryRewriter),
    ("7_WebSearch", Node7C_WebSearch),
    ("8_GenerateResponse", Node8_GenerateResponse),
    ("9_getUserInput", Node9_getUserInput),
):
    graph.add_node(_node_name, _node_fn)
del _node_name, _node_fn

In [None]:
# Defining the Routing Functions

# To check if the VectorDB is already created for the video
def vectorDBExists(state: State) -> bool:
    """Return True when the vector store for the current video already exists.

    Requires both the state flag set by Node-3 and the module-level store, so
    the graph can skip download, transcription and indexing.
    """
    return bool(state.vectorDB_flg and GLOBAL_VECTOR_STORE is not None)

# To check whether the documents retrieved for the current query are relevant, and choose the next step accordingly

def retrievedDocsRelevant(state: State) -> str:
    """Route after the LLM judge.

    Returns:
        "Response"  - the documents are relevant; answer directly.
        "Rewrite"   - irrelevant and the query has not been rewritten yet.
        "webSearch" - still irrelevant after a rewrite; fall back to the web.
    """
    if state.retrieval_sync:
        return "Response"
    return "webSearch" if state.rewritten_flg else "Rewrite"

def graphExit(state: State) -> bool:
    """Return the exit flag set by Node-9 (True ends the graph)."""
    should_exit = state.graph_exit
    return should_exit

In [None]:
# Adding Edges to the "graph" instance

# Entry: skip download / transcription / indexing when the vector DB is already built.
graph.add_conditional_edges(START, vectorDBExists, {True: "4_Retriever", False: "1_YTVideoDownload"})

# Ingestion pipeline, then retrieval feeding the judge (unconditional hops).
for _src, _dst in (
    ("1_YTVideoDownload", "2_Transcription"),
    ("2_Transcription", "3_vectorDB"),
    ("3_vectorDB", "4_Retriever"),
    ("4_Retriever", "5_llmJudge"),
):
    graph.add_edge(_src, _dst)

# Judge verdict decides: answer, rewrite the query, or search the web.
graph.add_conditional_edges(
    "5_llmJudge",
    retrievedDocsRelevant,
    {"Response": "8_GenerateResponse", "Rewrite": "6_QueryRewriter", "webSearch": "7_WebSearch"},
)

# A rewrite loops back to retrieval; web results and answers flow forward.
for _src, _dst in (
    ("6_QueryRewriter", "4_Retriever"),
    ("7_WebSearch", "8_GenerateResponse"),
    ("8_GenerateResponse", "9_getUserInput"),
):
    graph.add_edge(_src, _dst)
del _src, _dst

# After each answer: stop on 'quit', otherwise retrieve for the next question.
graph.add_conditional_edges("9_getUserInput", graphExit, {True: END, False: "4_Retriever"})

In [None]:
# Compiling the Graph
# Using a memory saver so the state (like vectorDB) persists between runs

# Identifier of this conversation; the checkpointer stores state per thread_id.
thread_id = "chat-1"

# In-memory checkpointer: state persists across invocations on the same thread.
app = graph.compile(
    checkpointer=MemorySaver(),
)

<h1 style="color: orange">Input Variables</h1>

In [None]:
# Taking YT Video as an input for Q&A:

# Strip whitespace that often comes along with a pasted URL, and re-prompt on empty input.
yt_url = ""
while not yt_url:
    yt_url = input(">> Enter the YouTube URL for Q&A:").strip()

In [None]:
# Taking in Initial User Query

# Re-prompt on blank input: an empty first question would still trigger the
# full download -> transcription -> indexing pipeline before anything useful happens.
initial_user_query = ""
while not initial_user_query:
    initial_user_query = input(">>").strip()

In [None]:
# --- ACTUAL CONVERSATION ---

# Starting the Graph with Initial User Query
# Seeding the thread's state with the first question and the video URL
# (positional arguments: config, then the values to write).
app.update_state(
    {"configurable": {"thread_id": thread_id}},
    {"user_query": initial_user_query, "youtubeURL": yt_url},
)

In [None]:
# The whole chat loop (answer -> Node-9 -> retrieve -> ...) runs inside this single
# invoke. Each question uses several super-steps: retrieve, judge, [rewrite,
# retrieve, judge, web search], respond, ask. LangGraph's default recursion_limit
# of 25 would abort with GraphRecursionError after only a few questions.
result = app.invoke(
    {},
    config={"configurable": {"thread_id": thread_id}, "recursion_limit": 1000},
)

In [None]:
config = {"configurable": {"thread_id": thread_id}}

# Every snapshot carries the full state, including the whole transcript, so keep
# the raw list in a variable and display only a compact (step, next nodes) summary.
state_history = list(app.get_state_history(config))
[((snapshot.metadata or {}).get("step"), snapshot.next) for snapshot in state_history]