# Rag Structured Output

In [2]:

from langchain_core.messages import HumanMessage
import streamlit as st
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain.text_splitter import RecursiveCharacterTextSplitter
from youtube_transcript_api import YouTubeTranscriptApi
import re      
            
            
# _-----------------------------------------------------FUNCTIONS FOR RAG----------------------------------------------

# ------------------ Transcript Loader ------------------
def load_transcript(url: str) -> str | None:
    pattern = r'(?:v=|\/)([0-9A-Za-z_-]{11})'
    match = re.search(pattern, url)
    if match:
        video_id = match.group(1)
        try:
            captions = YouTubeTranscriptApi().fetch(video_id).snippets
            # join text + start_time
            data = [f"{item.text} ({item.start})" for item in captions]
            return " ".join(data)
        except Exception as e:
            print(f"Error fetching transcript: {e}")
            return None

# ------------------ Text Splitter ------------------
def text_splitter(transcript):
    splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
    return splitter.create_documents([transcript])

# ------------------ Vector Store & Retriever  ------------------
def generate_embeddings(chunks):
    embeddings = HuggingFaceEmbeddings(model_name='sentence-transformers/all-MiniLM-L6-v2')
    return FAISS.from_documents(chunks, embeddings)

def retriever_docs(vector_store):
    return vector_store.as_retriever(search_type="similarity", search_kwargs={"k": 5})

def format_docs(retrieved_docs):
    return "\n\n".join(doc.page_content for doc in retrieved_docs)


In [3]:
# ------------------ Imports ------------------
from dotenv import load_dotenv
load_dotenv()

from langchain_google_genai import ChatGoogleGenerativeAI
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph.message import add_messages
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from pydantic import BaseModel, Field
from typing import TypedDict, Annotated
import re
import os
from langchain.prompts import PromptTemplate
#os.environ["LANGCHAIN_PROJECT"] = "TubeTalkAI Testing"

# ------------------ Build LLM (Gemini) ------------------
model = ChatGoogleGenerativeAI(model="gemini-2.5-flash", temperature=0)

#--------------------Prompt Template----------------------
template = """
You are the YouTuber from the video, directly answering the viewer’s question.

Rules:
1. ONLY use the transcript provided below.
2. Give the answer in simple, clear sentences — without timestamps inside the text.
3. ALWAYS return the exact timestamp (in seconds) from the transcript line you used.
   - Do NOT round or estimate timestamps.
   - If multiple transcript parts are relevant, return the most direct one.
4. Do NOT add greetings, filler, or extra commentary.
5. If the transcript does not answer, say: "Sorry, I didn’t talk about that in this video."

Transcript:
{transcript}

Question:
{question}

Output format (for schema):
- "answer": A list of 1–3 short strings that directly answer the question (no timestamps here).
- "timestamps": The exact timestamp (in seconds) from the transcript where the answer was found.
"""




prompt = PromptTemplate(
    input_variables=["transcript", "question"],
    template=template,
)
# ------------------ Structured Output Schema ------------------
class AnsandTime(BaseModel):
    answer:str = Field(
        description="Answers to user's question (do NOT include timestamps here)"
    )
    timestamps: float = Field(
        description="The time (in seconds) from where the answer is taken"
    )

structured_model = model.with_structured_output(AnsandTime)

# ------------------ ChatState ------------------
class ChatState(TypedDict):
    messages: Annotated[list[BaseMessage], "add_messages"]

# ------------------ Chat Node ------------------
def chat_node(state: ChatState):
    # Extract user question from state
    user_message = state["messages"][-1].content  # last message is the user's input

    # Fill the prompt
    final_prompt = prompt.format(
        transcript=context,   # <-- your transcript goes here
        question=user_message
    )

    # Get structured output
    response = structured_model.invoke(final_prompt)
    ai_text = f"{response.answer}\nTimestamp: {response.timestamps}"

    return {
        "messages": [
            state["messages"][-1],  # include the HumanMessage again
            AIMessage(content=ai_text)  # add the AI reply
        ]
    }

# ------------------ Build Graph ------------------
checkpointer = InMemorySaver()

graph = StateGraph(ChatState)
graph.add_node("chat_node", chat_node)
graph.add_edge(START, "chat_node")
graph.add_edge("chat_node", END)

CONFIG = {'configurable': {'thread_id': "newthread"}}
workflow =graph.compile(checkpointer = checkpointer)

In [4]:
# ------------------ Load YouTube Transcript ------------------
youtube_input = "https://www.youtube.com/watch?v=s3KnSb9b4Pk"
youtube_captions = load_transcript(youtube_input)
print("Transcript Loaded:", youtube_captions[:200], "...")

# Split & Embed transcript
chunks = text_splitter(youtube_captions)
vector_store = generate_embeddings(chunks)
retriever = retriever_docs(vector_store)


Transcript Loaded: Hello all, my name is Krishna Nayak and (0.48) welcome to my YouTube channel. So guys, (2.56) today in this particular video, we are (4.96) going to go ahead and see the entire (7.12) road map to lear ...


  from .autonotebook import tqdm as notebook_tqdm


In [5]:
output_dict = {"human": [], "ai": []}
CONFIG = {'configurable': {'thread_id': "newthread"}}



In [6]:
while True : 
    user_input = input("User : ")
    if user_input == 'exit':
        break
    print("user :", user_input)
    retrieved_chunks = retriever.get_relevant_documents(user_input)
    context = format_docs(retrieved_chunks)
    result = workflow.invoke(
            {'messages': [HumanMessage(content=user_input)]},
            config=CONFIG,
        )
    for msg in result['messages']:
        if isinstance(msg, HumanMessage):
            if msg.content not in output_dict['human']:
                output_dict['human'].append(msg.content)
        elif isinstance(msg, AIMessage):
            if msg.content not in output_dict['ai']:
                output_dict['ai'].append(msg.content)

    print("AI:", output_dict['ai'][-1])

user : what is this video about?


  retrieved_chunks = retriever.get_relevant_documents(user_input)


AI: This video is about the entire road map to learn AI in 2025, including free resources, videos, and materials.
Timestamp: 9.679
user : when to learn Modern AI
AI: Nowadays, I usually prefer the modern route for learning AI.
Timestamp: 413.44
user : how my days does it take to learn Data Science
AI: Sorry, I didn’t talk about that in this video.
Timestamp: 0.0
user : how to many months does it take to learn Data Science
AI: It takes four months to learn data science, NLP, and computer vision.
Timestamp: 823.36
user : how many days does it takes to learn Data Science
AI: It takes four months to learn data science, NLP, and computer vision.
Timestamp: 823.36
user : what are the different Routes present in this Model
AI: There are three different routes: the traditional route, the modern route, and the advanced route.
Timestamp: 135.84
user : what is Advancd Route
AI: The advanced route is for really efficient people who are already in the technical domain and can quickly learn things. 

In [12]:
context

"definitely (123.36) be able to do amazing things over here (125.36) so quickly I will go ahead and share my (127.6) screen so here you can see that I have (129.599) written this amazing road map to learn (131.52) AI in 2025 (133.76) and here I have you know drawn three (135.84) different routes one is the traditional (139.36) route (142.56) The second one is the modern route and (143.84) the third one is something called as an (146.64) advanced route. Now why do we actually (148.64) require this routes? I will discuss (152.56) about this in a much more detailed (154.8) manner. So let's say uh as I said this (156.48) road map actually incorporates for (160.08) everyone whether you are a fresher (162.8) whether you are an experienced (166.0) professional (167.68) whether you are a leader you are in a (169.76) leadership position and whether you are (172.56) a person who is also coming from a (175.519) complete non-technical background. Okay, (177.92) nontechnical background basically\n\

In [None]:
print(output_dict['ai'][-1])

This video is about AI and geometry, and how a non-AI model was already better at geometry than most humans.
Timestamp: 80.44


In [None]:
result['messages']

[HumanMessage(content='what is this Video About', additional_kwargs={}, response_metadata={}),
 AIMessage(content='This video is about AI and geometry, and how a non-AI model was already better at geometry than most humans.\nTimestamp: 80.44', additional_kwargs={}, response_metadata={})]

# Rag using System Message and MemorySaver

In [1]:

from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.messages import SystemMessage , HumanMessage , AIMessage
from youtube_transcript_api import YouTubeTranscriptApi
import re      
from langchain_google_genai import ChatGoogleGenerativeAI            
from dotenv import load_dotenv
load_dotenv()

from langchain_google_genai import ChatGoogleGenerativeAI
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph.message import add_messages
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from pydantic import BaseModel, Field
from typing import TypedDict, Annotated
import re
import os
from langchain.prompts import PromptTemplate          
# _-----------------------------------------------------FUNCTIONS FOR RAG----------------------------------------------

# ------------------ Transcript Loader ------------------
def load_transcript(url: str) -> str | None:
    pattern = r'(?:v=|\/)([0-9A-Za-z_-]{11})'
    match = re.search(pattern, url)
    if match:
        video_id = match.group(1)
        try:
            captions = YouTubeTranscriptApi().fetch(video_id).snippets
            # join text + start_time
            data = [f"{item.text} ({item.start})" for item in captions]
            return " ".join(data)
        except Exception as e:
            print(f"Error fetching transcript: {e}")
            return None

# ------------------ Text Splitter ------------------
def text_splitter(transcript):
    splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
    return splitter.create_documents([transcript])

# ------------------ Vector Store & Retriever  ------------------
def generate_embeddings(chunks):
    embeddings = HuggingFaceEmbeddings(model_name='sentence-transformers/all-MiniLM-L6-v2')
    return FAISS.from_documents(chunks, embeddings)

def retriever_docs(vector_store):
    return vector_store.as_retriever(search_type="similarity", search_kwargs={"k": 5})

def format_docs(retrieved_docs):
    return "\n\n".join(doc.page_content for doc in retrieved_docs)

import os

def save_embeddings_faiss(thread_id: str, transcript: str):
    # 1. Split transcript
    chunks = text_splitter(transcript)

    # 2. Build embeddings + FAISS store
    vector_store = generate_embeddings(chunks)

    # 3. Save FAISS index
    save_dir = f"faiss_indexes/{thread_id}"
    os.makedirs("faiss_indexes", exist_ok=True)
    vector_store.save_local(save_dir)

    print(f"✅ Embeddings for {thread_id} saved at {save_dir}")
    
def load_embeddings_faiss(thread_id: str):
    embeddings = HuggingFaceEmbeddings(model_name='sentence-transformers/all-MiniLM-L6-v2')
    save_dir = f"faiss_indexes/{thread_id}"

    if not os.path.exists(save_dir):
        raise ValueError(f"No FAISS index found for thread_id: {thread_id}")

    vector_store = FAISS.load_local(save_dir, embeddings, allow_dangerous_deserialization=True)
    retriever = retriever_docs(vector_store)
    return retriever


In [2]:
from pydantic import BaseModel , Field

# ------------------ Build LLM (Gemini) ------------------
model = ChatGoogleGenerativeAI(model="gemini-1.5-flash", temperature=0)

# ------------------ System Message ------------------
system_message = SystemMessage(content="""
You are the YouTuber from the video, directly answering the viewer’s question.

Rules:
1. ONLY use the transcript provided below.
2. Give the answer in clear, simple bullet points (not paragraphs).
3. Each bullet must include the exact timestamp (in seconds) from the transcript line used.
   - Do NOT round or estimate timestamps.
   - If multiple transcript parts are relevant, use separate bullets.
4. Do NOT add greetings, filler, or extra commentary.
5. If the transcript does not answer, say:
   - "Sorry, I didn’t talk about that in this video."
6. Greet only if the viewer greets first.
7. Always remember the viewer’s question when structuring the answer.
""")

# ------------------ Structured Schema ------------------
class AnsandTime(BaseModel):
    answer: list[str] = Field(description="Answers to user's question (no timestamps here)")
    timestamps: float = Field(description="The time (in seconds) from where the answer was taken")

structured_model = model.with_structured_output(AnsandTime)

# ------------------ Chat State ------------------
class ChatState(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]

# ------------------ Chat Node ------------------
def chat_node(state: ChatState):
    user_question = state['messages'][-1].content
    
    # get context here
    retrieved_chunks = retriever.get_relevant_documents(user_question)
    context = format_docs(retrieved_chunks)

    # build messages
    messages = [
        system_message,  # rules
        SystemMessage(content=f"Transcript:\n{context}"),  # context for model only
        HumanMessage(content=user_question)  # clean user input
    ]

    response = structured_model.invoke(messages)
    ai_text = f"{' '.join(response.answer)}\nTimestamp: {response.timestamps}"

    return {
        "messages": [
            state['messages'][-1],       # store user only
            AIMessage(content=ai_text)   # store ai only
        ]
    }


In [3]:
from langgraph.checkpoint.sqlite import SqliteSaver
import sqlite3
# ------------------  Checkpointer ------------------
conn = sqlite3.connect(database="ragDatabase.db", check_same_thread=False)
checkpointer = SqliteSaver(conn=conn)

# ------------------ Build Graph ------------------
graph = StateGraph(ChatState)
graph.add_node("chat_node", chat_node)
graph.add_edge(START, "chat_node")
graph.add_edge("chat_node", END)
chatbot = graph.compile(checkpointer=checkpointer)
# ------------------ Example Run ------------------
thread_id = "thread5519"

CONFIG = {"configurable": {"thread_id": thread_id}}

In [4]:
# ------------------ Load YouTube Transcript ------------------
youtube_input = "https://www.youtube.com/watch?v=s3KnSb9b4Pk&t"
youtube_captions = load_transcript(youtube_input)

save_embeddings_faiss(thread_id=thread_id , transcript= youtube_captions)

  from .autonotebook import tqdm as notebook_tqdm


✅ Embeddings for thread5519 saved at faiss_indexes/thread5519


In [5]:
retriever = load_embeddings_faiss(thread_id=thread_id)
retrieved_chunks = retriever.get_relevant_documents("how many days months does it takes to learn DataScience")
context = format_docs(retrieved_chunks)
context

  retrieved_chunks = retriever.get_relevant_documents("how many days months does it takes to learn DataScience")


"science I would basically say (211.36) okay go ahead and first learn data (213.92) science first. Okay so here I will go (216.4) ahead and write here you have to learn (218.239) data science first. Now what you really (221.68) need to learn in data science. Okay here (224.959) you need to master some amazing things (227.76) like what do you need to master? you (229.84) need to master (232.08) DS that is data science, ML, (233.92) CV, (238.56) NLP. Okay. So here you need to master (240.159) data science, machine learning, computer (243.36) vision, NLP, right? And in all the (244.959) specific topics you should be able to (247.76) understand that stats and linear algebra (250.319) is included a part of it. So in short, (252.959) you're mastering all these technologies (254.959) like data science, machine learning, CV (257.759) and LP. And whenever I'm talking about (259.6) mastering, I'm talking with respect to (261.519) development of a end to-end project. (263.759) Okay. So first of\n

In [6]:
retriever = load_embeddings_faiss(thread_id=thread_id)
while True : 
    user_input = input("User : ")
    if user_input == 'exit':
        break
    print("user :", user_input)
    result = chatbot.invoke(
            {'messages': [HumanMessage(content=user_input)]},
            config=CONFIG,
        )
    print("AI :" ,result['messages'][-1].content)


user : what is this video abot


Retrying langchain_google_genai.chat_models._chat_with_retry.<locals>._chat_with_retry in 2.0 seconds as it raised ResourceExhausted: 429 You exceeded your current quota, please check your plan and billing details. For more information on this error, head to: https://ai.google.dev/gemini-api/docs/rate-limits. [violations {
  quota_metric: "generativelanguage.googleapis.com/generate_content_free_tier_requests"
  quota_id: "GenerateRequestsPerDayPerProjectPerModel-FreeTier"
  quota_dimensions {
    key: "model"
    value: "gemini-1.5-flash"
  }
  quota_dimensions {
    key: "location"
    value: "global"
  }
  quota_value: 50
}
, links {
  description: "Learn more about Gemini API quotas"
  url: "https://ai.google.dev/gemini-api/docs/rate-limits"
}
, retry_delay {
  seconds: 16
}
].


AI : This video is about the roadmap to learn AI in 2025. It includes free resources, videos, and materials. The video covers generative AI and agentic AI applications. It provides a roadmap for learning AI within 8 months, even for those with 10+ years of experience and no coding background. The roadmap includes three stages with specific paths and free resources.
Timestamp: 2.56


In [7]:
conn = sqlite3.connect(database="ragDatabase.db", check_same_thread=False)
checkpointer = SqliteSaver(conn=conn)
cursor = conn.cursor()
cursor.execute("SELECT DISTINCT thread_id FROM checkpoints")
threads = [row[0] for row in cursor.fetchall()]
print(threads)

['thread1']


In [8]:
def load_conversation(chatbot , thread_id  ):
    return chatbot.get_state(config={'configurable': {'thread_id': thread_id}}).values['messages']
load_conversation(chatbot=chatbot , thread_id = "thread1")

[HumanMessage(content='what is this video about?', additional_kwargs={}, response_metadata={}, id='d12e1858-e6eb-4f3b-b3fb-661d8c2e5bdf'),
 AIMessage(content="This video is about the roadmap to learn AI in 2025. It provides free resources, videos, and materials. The video covers generative AI and agentic AI applications. It's designed for various backgrounds, including developers, program managers, and leadership positions. A three-stage roadmap is presented, with specific details for each stage. The roadmap includes machine learning, deep learning, NLP, and MLOps. It also covers tools like CircleCI, AI graphana, airflow, bento ML, AWS sagemaker, DVC, and dockers.\nTimestamp: 2.56", additional_kwargs={}, response_metadata={}, id='890cad82-a47b-4422-a7d9-3bdb74069839'),
 HumanMessage(content='when to learn DataScience', additional_kwargs={}, response_metadata={}, id='67d9860d-bf88-44bc-a73a-38ab37fc57a7'),
 AIMessage(content='Learn data science first To master data science, you need to 

In [None]:
import os
import shutil
def clear_faiss_indexes(base_dir: str = "faiss_indexes"):
    """
    Deletes all files and subfolders inside faiss_indexes,
    but keeps the faiss_indexes folder itself.
    """
    if os.path.exists(base_dir):
        for item in os.listdir(base_dir):
            item_path = os.path.join(base_dir, item)
            if os.path.isfile(item_path):
                os.remove(item_path)       # delete file
            elif os.path.isdir(item_path):
                shutil.rmtree(item_path)   # delete folder
        print(f"✅ Cleared all contents inside: {base_dir}")
    else:
        print(f"⚠️ Base folder not found: {base_dir}")

# Improving Rag Accuracy

In [4]:
from youtube_transcript_api import YouTubeTranscriptApi
import re
def load_transcript(url: str) -> str | None:
    pattern = r'(?:v=|\/)([0-9A-Za-z_-]{11})'
    match = re.search(pattern, url)
    if match:
        video_id = match.group(1)
        try:
            captions = YouTubeTranscriptApi().fetch(video_id,languages=['en','hi']).snippets
            # join text + start_time
            data = [f"{item.text} ({item.start})" for item in captions]
            return " ".join(data)
        except Exception as e:
            print(f"Error fetching transcript: {e}")
            return None
        
youtube_captions = load_transcript("https://www.youtube.com/watch?v=ddq8JIMhz7c")
youtube_captions

"welcome to the huberman Lab podcast (0.32) where we discuss science and (2.159) science-based tools for everyday (3.8) [Music] (5.65) life I'm Andrew huberman and I'm a (8.16) professor of neurobiology and (10.599) Opthalmology at Stanford school of (12.36) medicine today we are discussing how to (14.4) study and learn that is what the (17.16) scientific data say is the best way to (19.32) study in order to remember information (22.08) and to be able to use that information (24.48) effectively in different areas of your (26.439) life so for those of you that are still (28.599) in school this could be any stage of (30.32) school today's discussion will be very (32.399) useful for you however even if you are (34.64) not formally enrolled in any kind of (37.239) school at the moment today's discussion (38.76) will also be extremely effective for you (41.12) to be able to study and learn better (43.76) information from say the internet or (46.44) podcasts or any area of your life where (4

In [None]:
from youtube_transcript_api import YouTubeTranscriptApi
import re
from typing import List, Dict, Optional
from typing import List, Dict, Iterable, Optional, Tuple
import numpy as np

# Embedding libs
from sentence_transformers import SentenceTransformer

# FAISS
import faiss

# Chroma (optional)
import chromadb
from chromadb.config import Settings
def get_clean_transcript(url: str, languages: List[str] = ["en", "hi"]) -> Optional[List[Dict]]:
    # Extract video_id from URL
    pattern = r'(?:v=|\/)([0-9A-Za-z_-]{11})'
    match = re.search(pattern, url)
    if not match:
        print("Invalid YouTube URL.")
        return None

    video_id = match.group(1)

    try:
        # Fetch transcript snippets
        snippets = YouTubeTranscriptApi().fetch(video_id, languages=languages).snippets
    except Exception as e:
        print(f"Error fetching transcript: {e}")
        return None

    # Cleaning step
    cleaned = []
    filler_pattern = re.compile(r"\b(uh|um|you know|like)\b", re.IGNORECASE)

    for s in snippets:
        text = s.text
        text = re.sub(r"\[.*?\]", "", text)      # remove [Music], [Applause], etc.
        text = filler_pattern.sub("", text)      # remove filler words
        text = re.sub(r"\s+", " ", text).strip() # clean spaces

        if text:
            cleaned.append({
                "text": text,
                "start": float(s.start),
                "end": float(s.start) + float(s.duration)
            })
    
    return cleaned

def merge_windows(transcript: List[Dict], window: int = 15) -> List[Dict]:
    """
    Merges transcript into time windows.
    Args:
        transcript (List[Dict]): [{"text": str, "start": float, "end": float}]
        window (int): Window size in seconds.
    Returns:
        List[Dict]: [{"text": str, "start": float, "end": float}]
    """
    if not transcript:
        return []

    merged = []
    current_chunk = {
        "text": transcript[0]["text"],
        "start": transcript[0]["start"],
        "end": transcript[0]["end"]
    }

    for entry in transcript[1:]:
        if entry["start"] - current_chunk["start"] <= window:
            current_chunk["text"] += " " + entry["text"]
            current_chunk["end"] = entry["end"]
        else:
            merged.append(current_chunk)
            current_chunk = {"text": entry["text"], "start": entry["start"], "end": entry["end"]}
    
    merged.append(current_chunk)
    return merged

import tiktoken

def chunk_transcript(transcript: List[Dict], chunk_size: int = 200, overlap: int = 30) -> List[Dict]:
    enc = tiktoken.get_encoding("cl100k_base")  

    chunks = []
    buffer, buffer_tokens = [], 0
    chunk_start, chunk_end = None, None

    for entry in transcript:
        tokens = enc.encode(entry["text"])
        token_len = len(tokens)

        if buffer_tokens + token_len > chunk_size and buffer:
            chunk_text = " ".join([b["text"] for b in buffer])
            chunks.append({"text": chunk_text, "start": chunk_start, "end": chunk_end})

            # Start new chunk with overlap
            overlap_tokens = enc.encode(buffer[-1]["text"])[-overlap:] if buffer else []
            overlap_text = enc.decode(overlap_tokens) if overlap_tokens else ""
            buffer = [{"text": overlap_text, "start": entry["start"], "end": entry["end"]}]
            buffer_tokens = len(enc.encode(overlap_text))
            chunk_start, chunk_end = entry["start"], entry["end"]
        else:
            if not buffer:
                chunk_start = entry["start"]
            buffer.append(entry)
            buffer_tokens += token_len
            chunk_end = entry["end"]

    if buffer:
        chunk_text = " ".join([b["text"] for b in buffer])
        chunks.append({"text": chunk_text, "start": chunk_start, "end": chunk_end})

    return chunks



In [None]:
captions = get_clean_transcript("https://www.youtube.com/watch?v=ddq8JIMhz7c&t=558s")
merged_transcript = merge_windows(captions)
chunks = chunk_transcript(merged_transcript)


{'text': " Incredibly actionable it's incredibly interesting how the fields of Education the fields of psychology and the fields of Neuroscience have now come together to define the optimal theme I'd to thank the sponsors of today's podcast our first sponsor is eight sleep eight sleep makes Smart mattress covers with cooling Heating and sleep tracking capacity I've spoken many times before on this podcast about the critical need to get sleep both enough sleep and enough quality sleep now one of the key things to getting a great night's sleep is that your body temperature actually has to drop by about 1 to 3° in order for you to fall and stay deeply asleep and to wake up feeling refreshed your body temperature actually has to increase by about 1 to 3° one of the best ways to ensure all of",
 'start': 128.64,
 'end': 179.36}

In [None]:
"""parts = []
for chunk in chunks:
    parts.append(f"{chunk['text']} {chunk['start']} {chunk['end']}")
parts"""

["welcome to the huberman Lab podcast where we discuss science and science-based tools for everyday life I'm Andrew huberman and I'm a professor of neurobiology and Opthalmology at Stanford school of medicine today we are discussing how to study and learn that is what the scientific data say is the best way to study in order to remember information and to be able to use that information effectively in different areas of your life so for those of you that are still in school this could be any stage of school today's discussion will be very useful for you however even if you are not formally enrolled in any kind of school at the moment today's discussion will also be extremely effective for you to be able to study and learn better information from say the internet or podcasts or any area of your life where you are seeking to learn and use new knowledge now one of the most important things that you're going to learn today is that learning that is the best learning practices are not intuit

[{'text': "welcome to the huberman Lab podcast where we discuss science and science-based tools for everyday life I'm Andrew huberman and I'm a professor of neurobiology and Opthalmology at Stanford school of medicine today we are discussing how to study and learn that is what the scientific data say is the best way to study in order to remember information and to be able to use that information effectively in different areas of your life so for those of you that are still in school this could be any stage of school today's discussion will be very useful for you however even if you are not formally enrolled in any kind of school at the moment today's discussion will also be extremely effective for you to be able to study and learn better information from say the internet or podcasts or any area of your life where you are seeking to learn and use new knowledge now one of the most important things that you're going to learn today is that learning that is the best learning practices are n

In [None]:
{'text': "welcome to the huberman Lab podcast where we discuss science and science-based tools for everyday life I'm Andrew huberman and I'm a professor of neurobiology and Opthalmology at Stanford school of medicine today we are discussing how to study and learn that is what the scientific data say is the best way to study in order to remember information and to be able to use that information effectively in different areas of your life so for those of you that are still in school this could be any stage of school today's discussion will be very useful for you however even if you are not formally enrolled in any kind of school at the moment today's discussion will also be extremely effective for you to be able to study and learn better information from say the internet or podcasts or any area of your life where you are seeking to learn and use new knowledge now one of the most important things that you're going to learn today is that learning that is the best learning practices are not intuitive so", 'start': 0.32, 'end': 67.43900000000001}
{'text': " use new knowledge now one of the most important things that you're going to learn today is that learning that is the best learning practices are not intuitive so student for many years and in some sense still considers himself a student of science and health information because of this podcast and certainly somebody who still teaches University courses both to medical students and graduate students and to undergraduate students at Stanford I thought I understood the whole teaching and learning process but I too learned that it is anything but intuitive in fact most of what we believe about the best ways to study are absolutely false fortunately today you will learn the best ways to study turns out there's a rich literature on this dating back well over a hundred years and the data are absolutely fascinating and Incredibly actionable it's incredibly interesting how the fields of Education the fields of psychology and the fields of Neuroscience have now come together to define the optimal", 'start': 64.839, 'end': 131.84}

# Claude Code 

In [None]:
import os
import re
import json
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime
import logging

# Core libraries
import numpy as np
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api.formatters import TextFormatter

# LangChain imports
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS, Chroma
from langchain.llms import OpenAI
from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate, PromptTemplate
from langchain.schema import Document
from langchain.retrievers import BM25Retriever, EnsembleRetriever
from langchain.chains import RetrievalQA
from langchain.callbacks import StdOutCallbackHandler
from langchain.memory import ConversationBufferMemory

# Additional dependencies
import faiss
from sentence_transformers import SentenceTransformer
import chromadb

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class TranscriptChunk:
    """Represents a chunk of transcript with metadata"""
    text: str
    video_id: str
    start_time: float
    end_time: float
    chunk_id: str
    
class YouTubeTranscriptProcessor:
    """Handles YouTube transcript fetching and cleaning"""
    
    def __init__(self):
        self.formatter = TextFormatter()
        
    def extract_video_id(self, url: str) -> str:
        """Extract video ID from YouTube URL"""
        patterns = [
            r'(?:v=|\/)([0-9A-Za-z_-]{11}).*',
            r'(?:embed\/)([0-9A-Za-z_-]{11})',
            r'(?:youtu\.be\/)([0-9A-Za-z_-]{11})'
        ]
        
        for pattern in patterns:
            match = re.search(pattern, url)
            if match:
                return match.group(1)
        
        raise ValueError(f"Could not extract video ID from URL: {url}")
    
    def get_transcript(self, video_id: str) -> List[Dict]:
        """Fetch raw transcript from YouTube"""
        try:
            ytt_api = YouTubeTranscriptApi()
            captions = ytt_api.fetch(video_id).snippets
            transcript = list(captions)  # Convert to list if it's an iterator
            logger.info(f"Retrieved transcript for video {video_id}: {len(transcript)} segments")
            return transcript
        except Exception as e:
            logger.error(f"Error fetching transcript for {video_id}: {e}")
            raise
    
    def clean_text(self, text: str) -> str:
        """Clean transcript text"""
        # Remove common transcript artifacts
        text = re.sub(r'\[Music\]|\[Applause\]|\[Laughter\]', '', text)
        
        # Remove excessive filler words
        fillers = r'\b(?:uh|um|you know|like|so|well|actually|basically)\b'
        text = re.sub(fillers, '', text, flags=re.IGNORECASE)
        
        # Clean up whitespace
        text = re.sub(r'\s+', ' ', text).strip()
        
        # Remove very short segments
        if len(text.split()) < 3:
            return ""
            
        return text
    
    def merge_segments(self, segments: List[Dict], window_seconds: int = 15) -> List[Dict]:
        """Merge transcript segments into larger time windows"""
        if not segments:
            return []
            
        merged = []
        current_text = []
        current_start = segments[0]['start']
        current_end = segments[0]['start']
        
        for segment in segments:
            segment_text = self.clean_text(segment['text'])
            if not segment_text:
                continue
                
            # Check if we should start a new window
            if segment['start'] - current_start >= window_seconds:
                if current_text:
                    merged.append({
                        'text': ' '.join(current_text),
                        'start': current_start,
                        'duration': current_end - current_start
                    })
                current_text = [segment_text]
                current_start = segment['start']
                current_end = segment['start'] + segment['duration']
            else:
                current_text.append(segment_text)
                current_end = segment['start'] + segment['duration']
        
        # Add the last window
        if current_text:
            merged.append({
                'text': ' '.join(current_text),
                'start': current_start,
                'duration': current_end - current_start
            })
            
        return merged

class SmartChunker:
    """Handles intelligent chunking of transcript data"""
    
    def __init__(self, chunk_size: int = 250, chunk_overlap: int = 25):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            separators=[". ", "! ", "? ", "\n", " "]
        )
    
    def create_chunks(self, segments: List[Dict], video_id: str) -> List[TranscriptChunk]:
        """Create smart chunks from transcript segments"""
        chunks = []
        
        for i, segment in enumerate(segments):
            text = segment['text']
            start_time = segment['start']
            end_time = start_time + segment['duration']
            
            # Split text if it's too long
            if len(text.split()) > self.chunk_size // 4:  # Rough token estimation
                text_chunks = self.splitter.split_text(text)
                chunk_duration = segment['duration'] / len(text_chunks)
                
                for j, chunk_text in enumerate(text_chunks):
                    chunk_start = start_time + (j * chunk_duration)
                    chunk_end = chunk_start + chunk_duration
                    
                    chunk = TranscriptChunk(
                        text=f"At {self.format_timestamp(chunk_start)}: {chunk_text}",
                        video_id=video_id,
                        start_time=chunk_start,
                        end_time=chunk_end,
                        chunk_id=f"{video_id}_{i}_{j}"
                    )
                    chunks.append(chunk)
            else:
                chunk = TranscriptChunk(
                    text=f"At {self.format_timestamp(start_time)}: {text}",
                    video_id=video_id,
                    start_time=start_time,
                    end_time=end_time,
                    chunk_id=f"{video_id}_{i}"
                )
                chunks.append(chunk)
        
        return chunks
    
    def format_timestamp(self, seconds: float) -> str:
        """Format seconds into MM:SS format"""
        minutes = int(seconds // 60)
        seconds = int(seconds % 60)
        return f"{minutes:02d}:{seconds:02d}"

class HybridRetriever:
    """Combines semantic and keyword search for better retrieval"""
    
    def __init__(self, embedding_model: str = "all-MiniLM-L6-v2"):
        self.embeddings = HuggingFaceEmbeddings(
            model_name=embedding_model,
            model_kwargs={'device': 'cpu'},
            encode_kwargs={'normalize_embeddings': True}
        )
        self.vector_store = None
        self.bm25_retriever = None
        self.ensemble_retriever = None
        
    def build_index(self, chunks: List[TranscriptChunk]):
        """Build vector index and BM25 index"""
        # Create documents for LangChain
        documents = []
        for chunk in chunks:
            doc = Document(
                page_content=chunk.text,
                metadata={
                    "video_id": chunk.video_id,
                    "start_time": chunk.start_time,
                    "end_time": chunk.end_time,
                    "chunk_id": chunk.chunk_id,
                    "timestamp": self.format_timestamp(chunk.start_time)
                }
            )
            documents.append(doc)
        
        # Build FAISS vector store
        self.vector_store = FAISS.from_documents(documents, self.embeddings)
        
        # Build BM25 retriever
        self.bm25_retriever = BM25Retriever.from_documents(documents)
        self.bm25_retriever.k = 5
        
        # Create ensemble retriever
        self.ensemble_retriever = EnsembleRetriever(
            retrievers=[self.vector_store.as_retriever(search_kwargs={"k": 5}), 
                       self.bm25_retriever],
            weights=[0.7, 0.3]  # Favor semantic search slightly
        )
        
        logger.info(f"Built index with {len(documents)} documents")
    
    def retrieve_with_time_filter(self, query: str, time_range: Optional[Tuple[float, float]] = None, k: int = 5) -> List[Document]:
        """Retrieve documents with optional time filtering"""
        # Get initial results
        docs = self.ensemble_retriever.get_relevant_documents(query)
        
        # Apply time filter if specified
        if time_range:
            start_time, end_time = time_range
            filtered_docs = []
            for doc in docs:
                doc_start = doc.metadata.get('start_time', 0)
                doc_end = doc.metadata.get('end_time', 0)
                
                # Check for overlap with time range
                if (doc_start <= end_time and doc_end >= start_time):
                    filtered_docs.append(doc)
            
            docs = filtered_docs[:k]
        
        return docs[:k]
    
    def format_timestamp(self, seconds: float) -> str:
        """Format seconds into MM:SS format"""
        minutes = int(seconds // 60)
        seconds = int(seconds % 60)
        return f"{minutes:02d}:{seconds:02d}"

class QuestionRewriter:
    """Expands and improves user queries"""
    
    def __init__(self, llm):
        self.llm = llm
        self.rewrite_template = PromptTemplate(
            input_variables=["query"],
            template="""
            Rewrite the following query to be more specific and searchable for a YouTube video transcript:
            
            Original query: {query}
            
            Rewritten query (expand abbreviations, add context, make it more descriptive):
            """
        )
    
    def rewrite_query(self, query: str) -> str:
        """Rewrite user query for better retrieval"""
        try:
            chain = self.rewrite_template | self.llm
            result = chain.invoke({"query": query})
            return result.strip()
        except Exception as e:
            logger.warning(f"Query rewriting failed: {e}")
            return query

class YouTubeChatbot:
    """Main chatbot class that orchestrates the RAG pipeline"""
    
    def __init__(self, openai_api_key: str):
        os.environ["OPENAI_API_KEY"] = openai_api_key
        
        self.transcript_processor = YouTubeTranscriptProcessor()
        self.chunker = SmartChunker()
        self.retriever = HybridRetriever()
        
        # Initialize LLM
        self.llm = ChatOpenAI(
            model="gpt-3.5-turbo",
            temperature=0,
            max_tokens=500
        )
        
        self.query_rewriter = QuestionRewriter(self.llm)
        self.memory = ConversationBufferMemory(return_messages=True)
        self.video_metadata = {}
        
        # Response template
        self.response_template = ChatPromptTemplate.from_template("""
        You are a helpful YouTube video assistant. Answer the user's question based ONLY on the provided transcript excerpts.

        Rules:
        1. Answer only from the transcript content provided
        2. Include timestamps in your response
        3. If the answer isn't in the transcript, say "Sorry, this topic wasn't covered in this video"
        4. Format your response with bullet points and timestamps
        5. Be conversational and helpful

        Transcript excerpts:
        {context}

        Question: {question}

        Answer with timestamps:
        """)
    
    def process_video(self, video_url: str):
        """Process a YouTube video and build the index"""
        try:
            # Extract video ID
            video_id = self.transcript_processor.extract_video_id(video_url)
            
            # Get and process transcript
            raw_transcript = self.transcript_processor.get_transcript(video_id)
            merged_segments = self.transcript_processor.merge_segments(raw_transcript)
            
            # Create chunks
            chunks = self.chunker.create_chunks(merged_segments, video_id)
            
            # Build retrieval index
            self.retriever.build_index(chunks)
            
            # Store metadata
            self.video_metadata[video_id] = {
                'url': video_url,
                'processed_at': datetime.now().isoformat(),
                'total_chunks': len(chunks),
                'duration': max(chunk.end_time for chunk in chunks)
            }
            
            logger.info(f"Successfully processed video {video_id}")
            return True
            
        except Exception as e:
            logger.error(f"Error processing video {video_url}: {e}")
            return False
    
    def parse_time_reference(self, query: str) -> Optional[Tuple[float, float]]:
        """Parse time references in user query"""
        # Look for patterns like "at 5:30", "around 10 minutes", "between 2:00 and 3:00"
        time_patterns = [
            r'at (\d{1,2}):(\d{2})',
            r'around (\d+) minutes?',
            r'(\d{1,2}):(\d{2})',
        ]
        
        for pattern in time_patterns:
            match = re.search(pattern, query.lower())
            if match:
                if ':' in pattern:
                    minutes = int(match.group(1))
                    seconds = int(match.group(2))
                    timestamp = minutes * 60 + seconds
                    return (timestamp - 30, timestamp + 30)  # ±30 second window
                else:
                    minutes = int(match.group(1))
                    timestamp = minutes * 60
                    return (timestamp - 60, timestamp + 60)  # ±1 minute window
        
        return None
    
    def answer_question(self, question: str) -> str:
        """Answer user question using RAG"""
        try:
            # Rewrite query for better retrieval
            enhanced_query = self.query_rewriter.rewrite_query(question)
            
            # Parse time references
            time_range = self.parse_time_reference(question)
            
            # Retrieve relevant documents
            relevant_docs = self.retriever.retrieve_with_time_filter(
                enhanced_query, 
                time_range=time_range,
                k=5
            )
            
            if not relevant_docs:
                return "Sorry, I couldn't find any relevant information in the video transcript."
            
            # Prepare context
            context = "\n".join([doc.page_content for doc in relevant_docs])
            
            # Generate response
            chain = self.response_template | self.llm
            response = chain.invoke({
                "context": context,
                "question": question
            })
            
            return response.content
            
        except Exception as e:
            logger.error(f"Error answering question: {e}")
            return "Sorry, I encountered an error while processing your question."
    
    def get_video_stats(self) -> Dict:
        """Get statistics about processed videos"""
        return {
            'total_videos': len(self.video_metadata),
            'videos': self.video_metadata
        }

# Usage Example and Demo
class YouTubeChatbotDemo:
    """Demo class showing how to use the YouTube RAG chatbot"""
    
    def __init__(self, openai_api_key: str):
        self.chatbot = YouTubeChatbot(openai_api_key)
    
    def run_demo(self):
        """Run an interactive demo"""
        print("🎥 YouTube RAG Chatbot Demo")
        print("=" * 50)
        
        # Example video URL (replace with actual video)
        video_url = input("Enter YouTube video URL: ")
        
        print("\n📚 Processing video transcript...")
        if not self.chatbot.process_video(video_url):
            print("❌ Failed to process video")
            return
        
        print("✅ Video processed successfully!")
        print("\n💬 You can now ask questions about the video")
        print("Examples:")
        print("- What did they say about machine learning?")
        print("- Explain the concept discussed around 5:30")
        print("- What was covered in the first 10 minutes?")
        print("\nType 'quit' to exit\n")
        
        while True:
            question = input("❓ Your question: ")
            if question.lower() in ['quit', 'exit', 'q']:
                break
            
            print("\n🤔 Thinking...")
            answer = self.chatbot.answer_question(question)
            print(f"\n🤖 Answer: {answer}\n")
            print("-" * 50)
        
        # Show stats
        stats = self.chatbot.get_video_stats()
        print(f"\n📊 Session Stats: {stats}")

# Main execution
if __name__ == "__main__":

    

INFO:sentence_transformers.SentenceTransformer:Load pretrained SentenceTransformer: all-MiniLM-L6-v2
ERROR:__main__:Error processing video https://www.youtube.com/watch?v=ddq8JIMhz7c&t=558s: 'YouTubeTranscriptProcessor' object has no attribute 'fetch'
ERROR:__main__:Error answering question: 'NoneType' object has no attribute 'get_relevant_documents'
INFO:sentence_transformers.SentenceTransformer:Load pretrained SentenceTransformer: all-MiniLM-L6-v2


Sorry, I encountered an error while processing your question.
🎥 YouTube RAG Chatbot Demo


ERROR:__main__:Error processing video https://www.youtube.com/watch?v=ddq8JIMhz7c&t=558s: 'YouTubeTranscriptProcessor' object has no attribute 'fetch'



📚 Processing video transcript...
❌ Failed to process video
