In [2]:
!pip install -q langchain langchain-community langchain-openai youtube-transcript-api faiss-cpu gradio tiktoken python-dotenv

In [3]:

import re
from typing import List
from urllib.parse import urlparse, parse_qs

In [4]:
from youtube_transcript_api import YouTubeTranscriptApi
from langchain_core.documents import Document
from langchain_community.vectorstores import FAISS
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

### Ingest Module

In [5]:
def extract_video_url(youtube_url):
  """
  Extract the video ID from a YouTube URL.

  Despite its name, this returns the video *ID*, not a URL.

  Recognised forms:
    - https://youtu.be/<id>                (short link, optional query string)
    - https://www.youtube.com/watch?v=<id> (any URL carrying a ``v`` parameter)
    - .../embed/<id>, .../shorts/<id>, .../live/<id>

  Args:
    youtube_url: URL string; a missing scheme (e.g. "youtu.be/<id>") is tolerated.

  Returns:
    The video ID as a string.

  Raises:
    ValueError: if the input is not a non-empty string or no ID can be found.
  """
  if not isinstance(youtube_url, str) or not youtube_url.strip():
    raise ValueError("INVALID YOUTUBE URL")

  url = youtube_url.strip()

  # urlparse only populates the host when a scheme is present, so add one
  # for inputs such as "youtu.be/<id>" or "www.youtube.com/watch?v=<id>".
  if not re.match(r"^[A-Za-z][A-Za-z0-9+.\-]*://", url):
    url = "https://" + url

  parsed = urlparse(url)
  host = (parsed.hostname or "").lower()

  # Short links: decide by host, not by substring, so that URLs like
  # ".../watch?v=<id>&feature=youtu.be" are not mistaken for short links.
  if host == "youtu.be" or host.endswith(".youtu.be"):
    video_id = parsed.path.strip("/").split("/")[0]
    if video_id:
      return video_id
    raise ValueError("INVALID YOUTUBE URL")

  # Standard watch URLs carry the ID in the "v" query parameter.
  qs = parse_qs(parsed.query)
  if qs.get("v"):
    return qs["v"][0]

  # Path-based forms: /embed/<id>, /shorts/<id>, /live/<id>.
  match = re.search(r"/(embed|shorts|live)/([^/?]+)", parsed.path)
  if match:
    return match.group(2)

  raise ValueError("INVALID YOUTUBE URL")

### Fetch Transcript

In [6]:
def fetch_transcript(youtube_url):
  """
  Download the transcript of a YouTube video as raw segments.

  The video ID is resolved from ``youtube_url`` first; the transcript is then
  fetched through ``YouTubeTranscriptApi`` and converted with ``to_raw_data()``.

  Returns:
    List of dict with keys: text, start, duration
  """
  target_id = extract_video_url(youtube_url)

  client = YouTubeTranscriptApi()
  return client.fetch(target_id).to_raw_data()

### Chunking Module

In [7]:
def build_chunks(segments: List[dict], max_chars: int = 500):
  """
  Group consecutive transcript segments into LangChain documents.

  Segments are accumulated until their combined text length (separators not
  counted) reaches ``max_chars``; the group is then emitted as one document.
  Any segments left over at the end form a final, shorter document.

  Each document carries:
  - page_content: the segment texts joined by single spaces
  - metadata: ``start_time`` (start of the first segment in the group) and
    ``end_time`` (start + duration of the last segment in the group)
  """

  def _make_doc(texts, begin, finish):
    # Assemble one chunk document from its texts and time span.
    return Document(
        page_content=" ".join(texts),
        metadata={"start_time": begin, "end_time": finish},
    )

  chunks = []
  pending_texts = []
  pending_chars = 0      # running total of len(text) for pending_texts
  chunk_start = None     # start time of the first pending segment

  for segment in segments:
    if chunk_start is None:
      chunk_start = segment["start"]

    text = segment["text"]
    pending_texts.append(text)
    pending_chars += len(text)

    if pending_chars < max_chars:
      continue

    # Threshold reached: close the chunk at the end of this segment.
    chunk_end = segment["start"] + segment["duration"]
    chunks.append(_make_doc(pending_texts, chunk_start, chunk_end))
    pending_texts, pending_chars, chunk_start = [], 0, None

  # Flush whatever did not reach the threshold.
  if pending_texts:
    tail = segments[-1]
    chunks.append(
        _make_doc(pending_texts, chunk_start, tail["start"] + tail["duration"])
    )

  return chunks

### Embedding Module

In [8]:
def get_embeddings(api_key: str):
    """
    Build the embedding client used to vectorise transcript chunks.

    Configures ``OpenAIEmbeddings`` with the ``text-embedding-3-small`` model,
    the supplied API key, and the OpenRouter base URL.
    """
    settings = {
        "model": "text-embedding-3-small",
        "api_key": api_key,
        "base_url": "https://openrouter.ai/api/v1",
    }
    return OpenAIEmbeddings(**settings)

### Vector store module

In [9]:
def build_vectorstore(documents: List[Document], embeddings):
    """
    Index the given documents in a new FAISS vector store.

    ``documents`` and ``embeddings`` are passed straight to
    ``FAISS.from_documents``; the resulting store is returned.
    """
    return FAISS.from_documents(documents=documents, embedding=embeddings)

In [10]:
def get_retriever(vectorstore, k: int = 4):
    """
    Wrap a vector store in a retriever that is asked for ``k`` results.

    ``k`` is forwarded to ``vectorstore.as_retriever`` inside ``search_kwargs``.
    """
    search_options = {"k": k}
    return vectorstore.as_retriever(search_kwargs=search_options)

### QA module

In [11]:
# Prompt for the answer-generation step. It takes two template variables:
#   {context}  - retrieved transcript chunks, each preceded by a [MM:SS - MM:SS] range
#   {question} - the user's question
# The timestamp instruction is there because the UI advertises timestamped answers.
PROMPT = ChatPromptTemplate.from_template("""
You are answering questions about a YouTube video.

Use ONLY the retrieved transcript chunks below.
If the answer is not present, say: "Not found in this video."

For your answer:
1. Give a concise explanation
2. If explicitly asked for a brief explanation, give it!
3. Avoid answering in bullet points unless explicitly asked!
4. Mention the [MM:SS - MM:SS] timestamp range of each transcript chunk your answer relies on.

Transcript chunks:
{context}

Question:
{question}
""")

In [12]:
def format_docs(docs: List[Document]) -> str:
    """
    Render retrieved documents as timestamped context for the prompt.

    Each document becomes a ``[MM:SS - MM:SS]`` header line (from its
    ``start_time`` / ``end_time`` metadata, truncated to whole seconds)
    followed by its text. Documents are separated by a blank line.
    """

    def _clock(total_seconds: int) -> str:
        # Minutes are not wrapped at 60, so long videos show e.g. "75:30".
        minutes, seconds = divmod(total_seconds, 60)
        return f"{minutes:02d}:{seconds:02d}"

    def _render(doc) -> str:
        begin = int(doc.metadata["start_time"])
        finish = int(doc.metadata["end_time"])
        header = f"[{_clock(begin)} - {_clock(finish)}]"
        return f"{header}\n{doc.page_content}"

    return "\n\n".join(_render(doc) for doc in docs)

In [13]:
def get_llm(api_key: str):
    """
    Build the chat model used for answer generation.

    Configures ``ChatOpenAI`` for the ``openai/gpt-4o-mini`` model at the
    OpenRouter base URL, with temperature 0.
    """
    settings = {
        "model": "openai/gpt-4o-mini",
        "api_key": api_key,
        "base_url": "https://openrouter.ai/api/v1",
        "temperature": 0,
    }
    return ChatOpenAI(**settings)

In [14]:
def answer_question(question: str, retriever, api_key: str) -> str:
    """
    Answer a question from the transcript chunks the retriever returns.

    Steps: query the retriever, format the hits into timestamped context,
    fill the module-level ``PROMPT``, and send it to the chat model.
    Returns the ``content`` of the model's response.
    """
    retrieved_chunks = retriever.invoke(question)
    transcript_context = format_docs(retrieved_chunks)

    chat_model = get_llm(api_key)
    reply = chat_model.invoke(
        PROMPT.format_messages(context=transcript_context, question=question)
    )
    return reply.content

### Application

In [15]:
def yt_rag_qa(youtube_url: str, question: str, api_key: str) -> str:
    """
    Complete RAG pipeline for YouTube video QA.

    Pipeline: fetch transcript -> chunk -> embed -> build FAISS store ->
    retrieve -> prompt -> generate.

    Args:
        youtube_url: URL of the video whose transcript should be queried.
        question: The user's question about the video.
        api_key: OpenRouter API key used for both embeddings and the chat model.

    Returns:
        The generated answer, or a string starting with "Error: " describing
        what went wrong. This function is wired directly to the UI, so it
        never raises; every failure is reported as text.
    """
    # UI textboxes deliver empty strings when left blank; catch that here so
    # the user gets a clear message instead of an opaque library error.
    required_inputs = (
        ("YouTube video URL", youtube_url),
        ("question", question),
        ("OpenRouter API key", api_key),
    )
    for label, value in required_inputs:
        if not isinstance(value, str) or not value.strip():
            return f"Error: Missing required input: {label}."

    # Pasted values frequently carry stray surrounding whitespace.
    youtube_url = youtube_url.strip()
    question = question.strip()
    api_key = api_key.strip()

    try:
        # 1. Ingest
        segments = fetch_transcript(youtube_url)

        # 2. Chunk
        documents = build_chunks(segments)

        # An empty transcript yields no chunks; stop before trying to build
        # a vector index out of nothing.
        if not documents:
            return "Error: No transcript text was found for this video."

        # 3. Embed
        embeddings = get_embeddings(api_key)

        # 4. Store
        vectorstore = build_vectorstore(documents, embeddings)

        # 5. Retrieve
        retriever = get_retriever(vectorstore)

        # 6 & 7. Prompt + Generate
        return answer_question(
            question=question,
            retriever=retriever,
            api_key=api_key
        )

    except Exception as e:
        # Deliberately broad: any failure is shown to the user as text.
        return f"Error: {str(e)}"


### Gradio UI

In [16]:
!pip install gradio



In [17]:
import gradio as gr

In [20]:
def launch_app(share: bool = True, debug: bool = True):
    """
    Launch the Gradio interface for the YouTube RAG QA system.

    Builds a two-column layout (inputs on the left, answer on the right) and
    wires the "Ask" button to ``yt_rag_qa``.

    Args:
        share: Passed to ``demo.launch``; the default ``True`` requests a
            public link, which is what Colab needs.
        debug: Passed to ``demo.launch``; the default ``True`` keeps the cell
            running so errors and logs stay visible.
    """
    with gr.Blocks(title="YouTube Transcript RAG QA") as demo:
        gr.Markdown("""
        # 🎥 YouTube Transcript RAG QA

        **RAG answers from transcription only.**
        Answers are grounded in retrieved transcript chunks and include exact timestamps.

        ---

        ### How to use:
        1. Get your **OpenRouter API key** from [openrouter.ai](https://openrouter.ai)
        2. Paste a YouTube video URL
        3. Ask a question about the video content
        4. Click "Ask" to get timestamped answers
        """)

        with gr.Row():
            with gr.Column(scale=1):
                youtube_url = gr.Textbox(
                    label="YouTube Video URL",
                    placeholder="https://www.youtube.com/watch?v=..."
                )
                question = gr.Textbox(
                    label="Question",
                    placeholder="What does the speaker say about X?"
                )
                # type="password" masks the key in the browser.
                api_key = gr.Textbox(
                    label="OpenRouter API Key",
                    type="password",
                    placeholder="sk-or-..."
                )
                submit = gr.Button("Ask", variant="primary")

            with gr.Column(scale=1):
                output = gr.Textbox(
                    label="Answer (based on transcript)",
                    lines=14
                )

        # Input order must match the parameter order of yt_rag_qa.
        submit.click(
            fn=yt_rag_qa,
            inputs=[youtube_url, question, api_key],
            outputs=output
        )

        gr.Markdown("""
        ---
        ### RAG Pipeline:
        `Ingest → Chunk → Embed → Store → Retrieve → Prompt → Generate`

        Built with: LangChain, OpenRouter, FAISS, YouTube Transcript API
        """)

    # For Colab, use share=True to create a public link
    demo.launch(share=share, debug=debug)


In [None]:
# Entry point: start the Gradio app when this cell/script is executed directly.
# With the default debug=True the call keeps running until it is interrupted
# (see the cell output below).
if __name__ == "__main__":
  launch_app()

Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://678196d140a6fabb77.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)
