In [19]:
# Cell 1: imports and model setup
from typing import List, Any, Dict, Annotated, TypedDict, Optional
from operator import add
from pydantic import BaseModel, HttpUrl, ValidationError
import os, json, time
from urllib.parse import urlparse, parse_qs, urlunparse
import shutil

In [20]:
# external libs (these do the real work inside nodes)
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound
from pytube import Playlist
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_huggingface.embeddings import HuggingFaceEmbeddings
# from langchain_community.vectorstores import Chroma
from langchain_chroma import Chroma
from sentence_transformers import CrossEncoder
from langchain_ollama.chat_models import ChatOllama
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langgraph.graph import StateGraph, START, END, add_messages, MessagesState
from langchain_core.documents import Document

In [21]:
# LLM client (adjust if your Ollama endpoint differs)
llm = ChatOllama(model="llama3.1:latest", base_url="http://localhost:11434", reasoning=False, streaming=False, request_timeout=600.0)
# Cross-encoder for rerank
cross_encoder = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-12-v2")

In [22]:
# languages you care about
COMMON_LANGUAGES = ['en', 'hi', 'es', 'zh-Hans', 'ar', 'fr', 'ru', 'pt', 'bn', 'de']

In [23]:
# ==========================================================
# üß† Cell 2 ‚Äî State schema
# ==========================================================
class VideoRAGState(TypedDict):
    video_url: str
    query: str
    video_id: str
    transcripts: Dict[str, List[Dict[str, Any]]]
    docs_with_meta: List[Dict[str, Any]]
    chunks: List[Any]
    unique_chunks: List[Any]
    vector_store: Optional[Any]
    retriever: Optional[Any]
    retrieved_docs: List[Any]
    reranked_docs: List[Any]
    context_text: str
    answer: str


In [24]:
# ==========================================================
# üßæ Utility: more robust YouTube URL cleaning (handles youtu.be)
# ==========================================================
def clean_video_url(video_url: str):
    """
    Returns (cleaned_url, video_id).
    Handles both standard and short YouTube URLs.
    """
    parsed = urlparse(video_url)
    qs = parse_qs(parsed.query)

    # Standard watch?v=
    if 'v' in qs and qs['v']:
        vid = qs['v'][0]
        cleaned = parsed._replace(query=f"v={vid}")
        return urlunparse(cleaned), vid

    # Short youtu.be/<id>
    if parsed.netloc.endswith("youtu.be"):
        vid = parsed.path.lstrip("/")
        if vid:
            cleaned = parsed._replace(query=f"v={vid}")
            return urlunparse(cleaned), vid

    raise ValueError("Invalid YouTube URL: could not extract video ID")

In [25]:
# ==========================================================
# üßæ Cell 3: transcript fetcher node (fixed)
# ==========================================================
def fetch_multilingual_transcripts_node(state: VideoRAGState):
    clean_url, vid = clean_video_url(state["video_url"])
    transcripts_data = {}

    os.makedirs("transcripts", exist_ok=True)
    for lang in COMMON_LANGUAGES:
        fpath = f"transcripts/{vid}_{lang}.json"
        if os.path.exists(fpath):
            with open(fpath, "r", encoding="utf-8") as f:
                transcripts_data[lang] = json.load(f)
            continue

        try:
            # Fixed: Use the correct API method
            api = YouTubeTranscriptApi()
            transcript_list = api.fetch(vid)
            try:
                transcript = transcript_list.find_transcript([lang])
                translated_transcript = transcript.translate(lang)
                tlist = translated_transcript.fetch()
            except:
                # Try to get any available transcript and translate
                try:
                    transcript = transcript_list.find_transcript(['en'])
                    translated_transcript = transcript.translate(lang)
                    tlist = translated_transcript.fetch()
                except:
                    continue

            structured = [
                {"start": s["start"], "duration": s.get("duration", 0), "text": s["text"].strip()}
                for s in tlist
            ]
            transcripts_data[lang] = structured
            with open(fpath, "w", encoding="utf-8") as f:
                json.dump(structured, f, ensure_ascii=False, indent=2)
        except Exception as e:
            print(f"Error fetching {lang} transcript:", e)
            continue

    print("Fetched languages:", list(transcripts_data.keys()))
    return {"video_id": vid, "transcripts": transcripts_data}

In [26]:
# ==========================================================
# üß© Cell 4 ‚Äî Build docs with metadata (fixed)
# ==========================================================
def build_docs_with_meta_node(state: VideoRAGState):
    docs = []
    for lang, snippets in state["transcripts"].items():
        for sn in snippets:
            meta = {
                "video_id": state["video_id"],
                "language": lang,
                "start_time": sn["start"],
                "end_time": sn["start"] + sn["duration"],
            }
            docs.append({"text": sn["text"].strip(), "metadata": meta})
    print("‚úÖ Prepared docs_with_meta:", len(docs))
    return {"docs_with_meta": docs}

In [27]:
# ==========================================================
# ‚úÇÔ∏è Cell 5 ‚Äî Split into chunks (FIXED: Use Document objects properly)
# ==========================================================
def split_into_chunks_node(state: VideoRAGState):
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=500, chunk_overlap=100,
        separators=["\n\n", "\n", ".", "‡•§", "ÿü", "!", "„ÄÇ", "Ôºå"]
    )
    
    all_chunks = []
    for doc in state["docs_with_meta"]:
        # Create Document objects for splitting
        doc_obj = Document(page_content=doc["text"], metadata=doc["metadata"])
        split_docs = splitter.split_documents([doc_obj])
        all_chunks.extend(split_docs)

    # Filter + deduplicate
    filtered = [c for c in all_chunks if c.page_content.strip()]
    seen, unique = set(), []
    for c in filtered:
        if c.page_content not in seen:
            seen.add(c.page_content)
            unique.append(c)

    # Convert to dict format for state
    chunk_dicts = [
        {"text": c.page_content, "metadata": c.metadata}
        for c in all_chunks
    ]
    unique_dicts = [
        {"text": c.page_content, "metadata": c.metadata}
        for c in unique
    ]

    print(f"‚úÖ Chunks: {len(chunk_dicts)}, Unique: {len(unique_dicts)}")
    return {"chunks": chunk_dicts, "unique_chunks": unique_dicts}

In [28]:
# ==========================================================
# üß† Cell 6 ‚Äî Build or load vector store (FIXED: Proper Chroma usage)
# ==========================================================
def build_vector_store_node(state: VideoRAGState):
    vid = state["video_id"]
    db_dir = f"chroma_db/{vid}"
    os.makedirs("chroma_db", exist_ok=True)

    embeddings = HuggingFaceEmbeddings(
        model_name="sentence-transformers/distiluse-base-multilingual-cased-v2",
        model_kwargs={"trust_remote_code": True},
    )

    texts = [c["text"] for c in state["unique_chunks"]]
    metadatas = [c["metadata"] for c in state["unique_chunks"]]

    if os.path.exists(db_dir) and os.listdir(db_dir):
        print(f"‚úÖ Using cached vector store for {vid}")
        vectordb = Chroma(persist_directory=db_dir, embedding_function=embeddings)
    else:
        print(f"üöÄ Creating new vector store for {vid}")
        vectordb = Chroma.from_texts(
            texts=texts,
            embedding=embeddings,
            metadatas=metadatas,
            persist_directory=db_dir,
        )

    retriever = vectordb.as_retriever(search_type="similarity", search_kwargs={"k": 10})
    return {"vector_store": vectordb, "retriever": retriever}

In [29]:
# ==========================================================
# üîç Cell 7 ‚Äî Retrieve (FIXED: Use the vector_store from state)
# ==========================================================
def retrieve_node(state: VideoRAGState):
    if "retriever" not in state or state["retriever"] is None:
        # Fallback: recreate retriever if not in state
        embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/distiluse-base-multilingual-cased-v2",
            model_kwargs={"trust_remote_code": True},
        )
        vectordb = Chroma(persist_directory=f"chroma_db/{state['video_id']}", embedding_function=embeddings)
        retriever = vectordb.as_retriever(search_type="similarity", search_kwargs={"k": 10})
    else:
        retriever = state["retriever"]

    docs = retriever.invoke(state["query"])
    print("‚úÖ Retrieved:", len(docs))

    # Convert Document objects ‚Üí dicts
    docs_dict = [{"text": d.page_content, "metadata": d.metadata} for d in docs]
    return {"retrieved_docs": docs_dict}

In [30]:
# ==========================================================
# üìä Cell 8 ‚Äî Rerank (keeps your cross-encoder usage)
# ==========================================================
def rerank_node(state: VideoRAGState, top_n=5):
    docs = state["retrieved_docs"]
    if not docs:
        return {"reranked_docs": []}

    pairs = [[state["query"], d["text"]] for d in docs]
    scores = cross_encoder.predict(pairs)
    ranked = sorted(zip(scores, docs), key=lambda x: x[0], reverse=True)
    top_docs = [d for _, d in ranked[:top_n]]

    print("‚úÖ Reranked top_n:", len(top_docs))
    return {"reranked_docs": top_docs}

In [31]:
# ==========================================================
# üß± Cell 9 ‚Äî Format context (FIXED: Handle empty case)
# ==========================================================
def format_context_node(state: VideoRAGState):
    if not state["reranked_docs"]:
        return {"context_text": "No relevant context found."}

    context_text = "\n\n".join(
        f"[{d['metadata'].get('start_time', 0):.2f}s - {d['metadata'].get('end_time', 0):.2f}s] {d['text']}"
        for d in state["reranked_docs"]
    )
    return {"context_text": context_text}

In [32]:
# ==========================================================
# üìù Cell 10 ‚Äî Summarize full video (FIXED: Use dict access)
# ==========================================================
def summarize_video_node(state: VideoRAGState, max_chars=30000):
    if not state["unique_chunks"]:
        return {"answer": "No content available for summarization."}
    
    all_text = " ".join([c["text"] for c in state["unique_chunks"]])[:max_chars]
    prompt_text = f"""
You are an expert video summarizer.
Summarize the following YouTube transcript clearly and in detail.

Transcript:
{all_text}
"""
    resp = llm.invoke(prompt_text)
    print("‚úÖ Full video summary generated.")
    return {"answer": resp.content}

In [33]:
# ==========================================================
# üí¨ Cell 11 ‚Äî LLM answer node (FIXED: Proper chain invocation)
# ==========================================================
prompt = PromptTemplate(
    input_variables=["context_text", "query"],
    template="Context:\n{context_text}\n\nQuestion: {query}\n\nAnswer succinctly with timestamps if possible."
)

def llm_answer_node(state: VideoRAGState):
    chain = prompt | llm | StrOutputParser()
    resp = chain.invoke({"context_text": state["context_text"], "query": state["query"]})
    print("‚úÖ LLM answer ready.")
    return {"answer": resp}

In [34]:
# ==========================================================
# üéØ Router function to decide summary vs Q&A
# ==========================================================
def route_question(state: VideoRAGState):
    """Route to summary or Q&A based on query"""
    query = state["query"].lower().strip()
    if query == "summarize" or "summary" in query:
        return "summarize"
    else:
        return "retrieve"

In [35]:
# ==========================================================
# üîó Cell 12 ‚Äî Build graph (FIXED: Proper routing)
# ==========================================================
graph = StateGraph(VideoRAGState)

graph.add_node("fetch_transcripts", fetch_multilingual_transcripts_node)
graph.add_node("build_docs", build_docs_with_meta_node)
graph.add_node("split", split_into_chunks_node)
graph.add_node("vectorize", build_vector_store_node)
graph.add_node("retrieve", retrieve_node)
graph.add_node("rerank", rerank_node)
graph.add_node("format", format_context_node)
graph.add_node("answer", llm_answer_node)
graph.add_node("summarize", summarize_video_node)

graph.add_edge(START, "fetch_transcripts")
graph.add_edge("fetch_transcripts", "build_docs")
graph.add_edge("build_docs", "split")
graph.add_edge("split", "vectorize")

# Add conditional routing after vectorize
graph.add_conditional_edges(
    "vectorize",
    route_question,
    {
        "summarize": "summarize",
        "retrieve": "retrieve",
    }
)

graph.add_edge("retrieve", "rerank")
graph.add_edge("rerank", "format")
graph.add_edge("format", "answer")
graph.add_edge("summarize", END)
graph.add_edge("answer", END)

app = graph.compile()
print("‚úÖ Graph compiled successfully!")

‚úÖ Graph compiled successfully!


In [None]:
# ==========================================================
# üöÄ Runner (FIXED: Proper state initialization)
# ==========================================================
if __name__ == "__main__":
    # Gather inputs once (avoid double prompt)
    video_url = input("Enter YouTube URL: ").strip()
    query = input("Enter your question: ").strip()

    # Build initial TypedDict state
    state: VideoRAGState = {
        "video_url": video_url,
        "query": query,
        "video_id": "",
        "transcripts": {},
        "docs_with_meta": [],
        "chunks": [],
        "unique_chunks": [],
        "vector_store": None,
        "retriever": None,
        "retrieved_docs": [],
        "reranked_docs": [],
        "context_text": "",
        "answer": ""
    }

    try:
        # Run the compiled graph
        final_state = app.invoke(state)
    except Exception as e:
        print(f"‚ùå Error: {e}")
        print("Please check the YouTube URL and try again.")

Fetched languages: ['en']
‚úÖ Prepared docs_with_meta: 1104
‚úÖ Chunks: 1104, Unique: 1103
‚úÖ Using cached vector store for bdpyQm5l78o
‚úÖ Retrieved: 10
‚úÖ Reranked top_n: 5
‚úÖ LLM answer ready.

üéØ Final Answer:
 The video discusses empathy and understanding by sharing personal stories, aiming to open up one's heart to comprehend others' perspectives. 

Possible timestamp ranges for key points:
- 1253.48s - 1258.84s: Opening hearts to understanding
- 2243.72s - 2247.64s: Understanding others
- 2231.52s - 2236.08s: Recognizing one's own understanding


In [37]:
print("\nüéØ Final Answer:\n", final_state["answer"])


üéØ Final Answer:
 The video discusses empathy and understanding by sharing personal stories, aiming to open up one's heart to comprehend others' perspectives. 

Possible timestamp ranges for key points:
- 1253.48s - 1258.84s: Opening hearts to understanding
- 2243.72s - 2247.64s: Understanding others
- 2231.52s - 2236.08s: Recognizing one's own understanding
