In [1]:
%%capture
# @title Install Prerequisites Packages

# Install yt-dlp to handle YouTube URLs
!pip install -q yt-dlp
# Install the FFmpeg Python wrapper
!pip install ffmpeg-python
# Install the command-line tool (essential for the Python wrapper)
!apt-get install -y ffmpeg

# Install whisperx
!pip3 install git+https://github.com/m-bain/whisperx.git -q
# Install Hugging Face Package
!pip3 install -u huggingface_hub -q
# Install libcudnn8
!apt-get install libcudnn8 -q

# Install ollama and PCI Utilities
!sudo apt-get install pciutils
!curl -fsSL https://ollama.com/install.sh | sh
# Install Ollama Python library
!pip install -q ollama

# Recommended installs for Knowledge Base
!pip install sentence-transformers chromadb langchain-text-splitters
!pip install duckduckgo-search beautifulsoup4 readability-lxml tldextract httpx

In [2]:
%%capture
# @title Load Libraries and Initialize Directories, ChromaDB & Embedder
import json, os, re, time, tldextract, hashlib, traceback, yt_dlp, requests, subprocess
from bs4 import BeautifulSoup
from readability import Document
from pathlib import Path
import httpx, chromadb
from chromadb.config import Settings
from sentence_transformers import SentenceTransformer
from duckduckgo_search import DDGS
from typing import List, Dict, Any, Tuple, List, Union
import warnings
# Keep DeprecationWarning messages out of the notebook output.
warnings.filterwarnings("ignore", category=DeprecationWarning)

# Working directories, one per pipeline stage; created up front so later cells can write into them.
KB_DIR = "kb_data"
AUDIO_DIR = "audio_data"
WHISPERX_DIR = "whisperx_data"
LLM_DIR = "llm_data"
for data_dir in (KB_DIR, AUDIO_DIR, WHISPERX_DIR, LLM_DIR):
    Path(data_dir).mkdir(parents=True, exist_ok=True)

# Sentence-transformers model used to embed every text that goes into (or queries) the knowledge base.
embedder = SentenceTransformer("sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2")

# Chroma database persisted on disk under KB_DIR, configured with allow_reset=True.
chroma_settings = Settings(allow_reset=True)
chroma_client = chromadb.PersistentClient(path=KB_DIR, settings=chroma_settings)

# Vector collection "yt_kb" (fetched if it exists, created otherwise), configured for cosine distance.
kb = chroma_client.get_or_create_collection(name="yt_kb", metadata={"hnsw:space": "cosine"})


In [None]:
# @title 1. Audio Extraction
import subprocess, os, ffmpeg

def extract_youtube_audio(youtube_url, output_dir):
    """Download the audio track of a YouTube video and convert it to WAV.

    The ``yt-dlp`` command-line tool resolves a direct audio stream URL, then
    FFmpeg reads that stream and writes ``<output_dir>/audio.wav``.

    Args:
        youtube_url: URL of the video to process.
        output_dir: Directory that receives ``audio.wav``; created if missing.

    Returns:
        The path of the WAV file as a string, or ``None`` when any step fails
        (yt-dlp error, no stream URL, missing executable, FFmpeg error).
    """
    ffmpeg_executable_path = '/usr/bin/ffmpeg'

    try:
        os.makedirs(output_dir, exist_ok=True)
        print(f"Output directory '{output_dir}' created.")

        result = subprocess.run(
            ["yt-dlp", "-f", "bestaudio[ext=m4a]/bestaudio", "--get-url", youtube_url],
            capture_output=True, text=True, check=True,
        )
        # yt-dlp prints one URL per line; FFmpeg needs exactly one, so keep the first.
        stream_urls = [line.strip() for line in result.stdout.splitlines() if line.strip()]
        if not stream_urls:
            print("Failed to get direct audio stream URL.")
            return None
        audio_url = stream_urls[0]
        print(f"Successfully extracted audio URL from {youtube_url}")

        audio_output = os.path.join(output_dir, 'audio.wav')
        (
            ffmpeg.input(audio_url)
            .output(audio_output, vn=None)  # vn=None emits the bare "-vn" flag (drop video)
            .global_args('-loglevel', 'error')
            .run(overwrite_output=True, cmd=ffmpeg_executable_path)
        )
        print("Successfully converted audio to WAV format")
        return audio_output

    except subprocess.CalledProcessError as e:
        print(f"yt-dlp failed to extract URL: {e}")
        if e.stderr:
            print(e.stderr.strip())
        return None
    except FileNotFoundError as e:
        # Raised when the yt-dlp or ffmpeg executable itself cannot be launched.
        print(f"Required executable not found: {e}")
        return None
    except ffmpeg.Error as e:
        print(f"FFmpeg failed to convert audio: {e}")
        return None

In [15]:
# @title 2. Audio Transcription
from pathlib import Path
from google.colab import userdata
import subprocess

def transcribe_audio(audio_file: str, output_dir: str, hf_token: str, chunk_sizes: list[int], output_format: str = "srt"):
    """Transcribe one audio file with the WhisperX CLI once per chunk size.

    Each run uses the ``large-v2`` model on CUDA with diarization, Tamil as the
    language and a Tamil wav2vec2 alignment model. WhisperX names its output
    after the input file's stem; after every successful run that file is moved
    to ``transcript_chunk<chunk_size>.<output_format>`` so runs do not
    overwrite each other.

    Args:
        audio_file: Path of the audio file to transcribe.
        output_dir: Directory for the transcripts; created if missing.
        hf_token: Hugging Face token passed to WhisperX via ``--hf_token``.
        chunk_sizes: Values for ``--chunk_size``, one WhisperX run each.
        output_format: Single WhisperX output format, also used as the file
            extension (default ``"srt"``).

    Returns:
        Dict mapping each chunk size that succeeded to the ``Path`` of its
        transcript. Failed runs are reported on stdout and left out.
    """
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    transcript_paths = {}
    # File WhisperX is expected to write: <stem of the input>.<format> inside output_dir.
    produced_file = output_dir / f"{Path(audio_file).stem}.{output_format}"

    for chunk_size in chunk_sizes:
        print(f"\n▶ Processing chunk size {chunk_size} sec")
        target_file = output_dir / f"transcript_chunk{chunk_size}.{output_format}"
        # Remove leftovers so a stale file from an earlier run is never mistaken for fresh output.
        produced_file.unlink(missing_ok=True)

        try:
            subprocess.run([
                "whisperx", "--model", "large-v2", "--device", "cuda",
                "--compute_type", "float16", "--chunk_size", str(chunk_size),
                "--diarize", "--hf_token", hf_token, "--language", "ta",
                "--align_model", "Amrrs/wav2vec2-large-xlsr-53-tamil",
                "--output_dir", str(output_dir), "--output_format", output_format, audio_file
            ], check=True)
        except (subprocess.CalledProcessError, FileNotFoundError) as e:
            print(f"Error for chunk size {chunk_size}: {e}")
            continue

        if produced_file.exists():
            produced_file.replace(target_file)
            print(f"Saved: {target_file}")
            transcript_paths[chunk_size] = target_file
        else:
            print("WhisperX output file not found.")

    return transcript_paths

In [None]:
# @title Start Ollama using nohup
# Launches `ollama serve` as a background shell job; stdout and stderr are redirected to ollama.log.
# The command returns immediately, so the server may still be starting when the next cell runs.
!nohup ollama serve > ollama.log 2>&1 &

In [None]:
# @title Pull LLM and Initiate Ollama Client
import ollama
# Model tag used by every later ollama.generate / ollama.chat call in this notebook.
# Defined once and reused for the pull so the two can never drift apart.
llm_model = 'gemma3:12b'
ollama.pull(llm_model)  # Pull the model


In [18]:
%%capture
# @title Helper functions
from typing import List, Dict, Any
import hashlib

_id_for = lambda text: hashlib.md5(text.encode("utf-8")).hexdigest()

def embed_texts(texts: List[str]) -> List[List[float]]:
    """Encode ``texts`` with the module-level ``embedder`` and return the vectors as nested lists.

    The embedder is asked for normalized embeddings (``normalize_embeddings=True``).
    """
    vectors = embedder.encode(texts, normalize_embeddings=True)
    return vectors.tolist()

def chunk_text(text: str, max_chars: int = 500) -> List[str]:
    """Cut ``text`` into consecutive slices of at most ``max_chars`` characters.

    Slicing is purely positional (words may be split). Empty text yields an empty list.
    """
    pieces = []
    for offset in range(0, len(text), max_chars):
        pieces.append(text[offset:offset + max_chars])
    return pieces

def upsert_to_kb(texts: List[str], metadatas: List[Dict[str, Any]], chunk_size: int = 500):
    """Chunk, embed and upsert documents into the module-level ``kb`` collection.

    Every text is split with ``chunk_text``; each chunk gets its own copy of
    the text's metadata. Record ids are MD5 hashes built from the metadata's
    ``source_url`` and ``entity``, the first 50 characters of the chunk and the
    chunk's position in this batch.

    Args:
        texts: Documents to store.
        metadatas: One metadata dict per entry of ``texts`` (same order).
        chunk_size: Maximum characters per chunk.

    Raises:
        ValueError: If ``texts`` and ``metadatas`` differ in length, or if the
            number of chunks, embeddings and metadata entries disagree.
    """
    if not texts:
        return
    # zip() below would silently drop the unmatched tail, so reject mismatches up front.
    if len(texts) != len(metadatas):
        raise ValueError(f"Length mismatch: texts={len(texts)}, metadatas={len(metadatas)}")

    all_chunks, all_metas = [], []
    for text, meta in zip(texts, metadatas):
        chunks = chunk_text(text, chunk_size)
        all_chunks.extend(chunks)
        all_metas.extend(meta.copy() for _ in chunks)

    # Only empty strings were supplied: nothing to embed, and no empty batch is sent to the collection.
    if not all_chunks:
        return

    embeddings = embed_texts(all_chunks)

    # `a != b != c` only means (a != b and b != c), which misses real mismatches; compare all three.
    if not (len(all_chunks) == len(embeddings) == len(all_metas)):
        raise ValueError(f"Length mismatch: chunks={len(all_chunks)}, embeddings={len(embeddings)}, metas={len(all_metas)}")

    ids = [_id_for(f"{m.get('source_url','')}|{m.get('entity','')}|{chunk[:50]}|{i}")
           for i, (chunk, m) in enumerate(zip(all_chunks, all_metas))]

    kb.upsert(ids=ids, documents=all_chunks, metadatas=all_metas, embeddings=embeddings)

In [19]:
%%capture
# @title 3. YouTube Page Scraping and NER Extraction

def default_yt_data(url):
    """Build the placeholder metadata record used when YouTube metadata extraction fails.

    ``url`` fills both the ``url`` and ``webpage_url`` fields; every other field gets a neutral default.
    """
    record = {"url": url, "type": "youtube_metadata"}
    record["title"] = "Unknown Title"
    record["uploader"] = "Unknown Channel"
    record["upload_date"] = ""
    record["description"] = ""
    record["tags"] = []
    record["view_count"] = 0
    record["like_count"] = 0
    record["categories"] = []
    record["webpage_url"] = url
    return record

def empty_ner():
    """Return a fresh NER result with every known category mapped to its own empty list."""
    categories = ("people", "organizations", "movies_or_shows", "awards", "places", "others")
    return dict((category, []) for category in categories)

def _normalize_ner(parsed):
    """Coerce the LLM's parsed JSON into ``{category: [unique, non-empty strings]}``."""
    ner = empty_ner()
    if not isinstance(parsed, dict):
        return ner
    for category, values in parsed.items():
        if isinstance(values, str):
            values = [values]  # a lone string would otherwise be iterated character by character
        elif not isinstance(values, (list, tuple)):
            continue
        bucket = ner.setdefault(str(category), [])
        for value in values:
            if isinstance(value, str) and value.strip() and value.strip() not in bucket:
                bucket.append(value.strip())
    return ner


def scrape_youtube_extract_ner(youtube_url: str):
    """Collect video metadata, extract named entities with the LLM, and gather web context.

    Steps:
      1. Read the video's metadata with yt-dlp (no download); fall back to
         ``default_yt_data`` on failure.
      2. Fetch the video web page and reduce it to plain text (best effort).
      3. Store a metadata summary in the knowledge base via ``upsert_to_kb``.
      4. Ask the LLM for named entities as JSON; fall back to ``empty_ner`` on failure.
      5. Search the web (DuckDuckGo, two results per entity), fetch each hit and
         store the first 2000 characters of its text in the knowledge base.

    Args:
        youtube_url: URL of the video.

    Returns:
        Tuple ``(yt_data, ner)``: the metadata dict and a dict mapping entity
        category to a list of entity strings.
    """
    yt_data = {"url": youtube_url, "type": "youtube_metadata"}
    try:
        with yt_dlp.YoutubeDL({"quiet": True, "skip_download": True}) as ydl:
            info = ydl.extract_info(youtube_url, download=False)
        # `or <default>` instead of a .get() default: a key can be present with a None value,
        # which would later break ', '.join(...) and string slicing.
        for key in ("title", "uploader", "upload_date", "description", "webpage_url"):
            yt_data[key] = info.get(key) or ""
        for key in ("tags", "categories"):
            yt_data[key] = list(info.get(key) or [])
        for key in ("view_count", "like_count"):
            yt_data[key] = info.get(key) or 0
    except Exception as e:
        print(f"YouTube metadata extraction failed: {e}"); yt_data = default_yt_data(youtube_url)

    title = yt_data.get("title", "")
    description = yt_data.get("description", "")
    tags_text = ', '.join(map(str, yt_data.get("tags", [])))
    categories_text = ', '.join(map(str, yt_data.get("categories", [])))
    # An empty webpage_url cannot be fetched; fall back to the URL the caller supplied.
    page_url = yt_data.get("webpage_url") or youtube_url

    try:
        resp = requests.get(page_url, timeout=10, headers={"User-Agent": "Mozilla/5.0"})
        soup = BeautifulSoup(Document(resp.text).summary() or resp.text, "html.parser")
        page_text = soup.get_text(" ", strip=True)
    except Exception as e:
        print(f"Webpage scraping failed: {e}"); page_text = ""

    if title or description:
        yt_text = (f"Title: {yt_data.get('title','N/A')}\nChannel: {yt_data.get('uploader','N/A')}\n"
                   f"Upload Date: {yt_data.get('upload_date','N/A')}\nViews: {yt_data.get('view_count','N/A')}\n"
                   f"Likes: {yt_data.get('like_count','N/A')}\nDescription: {description[:1000]}\n"
                   f"Tags: {tags_text or 'N/A'}\nCategories: {categories_text or 'N/A'}")
        # None values are filtered out before the metadata is handed to the knowledge base.
        metadata = {k: v for k, v in {
            "source_url": page_url,
            "entity": "__youtube_metadata__", "type": "youtube_metadata",
            "title": yt_data.get("title", "Unknown Title"), "uploader": yt_data.get("uploader"),
            "upload_date": yt_data.get("upload_date"), "view_count": yt_data.get("view_count"),
            "like_count": yt_data.get("like_count"), "tags": tags_text,
            "categories": categories_text
        }.items() if v is not None}
        upsert_to_kb([yt_text], [metadata])

    ner_prompt = f"""Extract UNIQUE named entities from this Tamil/English content as JSON with: people, organizations, movies_or_shows, awards, places, others.

CONTENT SOURCES:
TITLE: {title}
DESCRIPTION: {description[:3000]}
TAGS: {tags_text}
CATEGORIES: {categories_text}
PAGE_TEXT: {page_text[:8000]}

INSTRUCTIONS:
- Extract named entities from ALL content sources above
- Tags and categories may contain important entities
- Remove duplicates across different sources
- Return ONLY valid JSON without any additional text
- JSON format: {{"people": [], "organizations": [], "movies_or_shows": [], "awards": [], "places": [], "others": []}}"""

    try:
        resp = ollama.generate(model=llm_model, prompt=ner_prompt, options={'temperature': 0.1, 'num_predict': 1000})
        ner_response = resp['response']; print(f"Raw LLM Response: {ner_response[:200]}...")
        # The model may wrap the JSON in extra prose; keep the outermost {...} span only.
        json_match = re.search(r'\{[\s\S]*\}', ner_response)
        if json_match is None:
            raise ValueError("no JSON object found in LLM response")
        ner = _normalize_ner(json.loads(json_match.group(0))); print("NER extraction successful!")
    except Exception as e:
        print(f"NER extraction failed: {e}"); ner = empty_ner()

    entity_docs = []
    with DDGS() as ddgs:
        for cat, ents in ner.items():
            for ent in ents:
                try:
                    hits = list(ddgs.text(ent, max_results=2))
                except Exception as e:
                    # Best effort: a failed search for one entity must not stop the others.
                    print(f"Web search failed for '{ent}': {e}")
                    continue
                for res in hits:
                    url = res.get("href")
                    if not url:
                        continue
                    try:
                        r = requests.get(url, timeout=10, headers={"User-Agent": "Mozilla/5.0"})
                        text = BeautifulSoup(r.text, 'html.parser').get_text()
                    except Exception as e:
                        print(f"Skipping {url}: {e}")
                        continue
                    entity_docs.append((
                        f"Entity: {ent}\nCategory: {cat}\nSource: {url}\nContent: {text[:2000]}",
                        {"source_url": url, "entity": ent, "entity_category": cat, "type": "web_context", "title": res.get("title", "")}
                    ))
    if entity_docs:
        contents, metas = zip(*entity_docs)
        upsert_to_kb(list(contents), list(metas))
    return yt_data, ner


In [None]:
# @title 4. Agentic Transcript Processor
# NOTE: `chroma_client` is the chromadb.PersistentClient instance created in the
# initialization cell, not an importable module. It must not be imported here:
# `import chroma_client` would raise ModuleNotFoundError (or, if an unrelated
# module of that name were installed, rebind the name and hide the client).

def get_error_kb_collection():
    """Return the "error_kb" Chroma collection, creating it (cosine space) on first use."""
    return chroma_client.get_or_create_collection(name="error_kb", metadata={"hnsw:space": "cosine"})

def upsert_error_to_kb(error_text, language):
    """Store one subtitle-validation error in the error collection.

    The record id is derived from ``language`` and ``error_text``, so the same
    error for the same language maps to the same id.
    """
    collection = get_error_kb_collection()
    record_id = _id_for(f"{language}|{error_text}")
    record_meta = {"language": language, "type": "subtitle_error"}
    vectors = embed_texts([error_text])
    collection.upsert(
        ids=[record_id],
        documents=[error_text],
        metadatas=[record_meta],
        embeddings=vectors,
    )

def retrieve_relevant_errors(query_text, language, top_k=5):
    """Query the error collection for up to ``top_k`` stored errors of ``language``.

    Returns the list of matching documents for the single query, or an empty
    list when the query result carries no documents.
    """
    collection = get_error_kb_collection()
    query_vectors = embed_texts([query_text])
    r = collection.query(
        query_embeddings=query_vectors,
        n_results=top_k,
        where={"language": language},
        include=["documents"],
    )
    documents = r["documents"]
    if not documents:
        return []
    return documents[0]

def format_as_srt_from_segments(segments: list[dict]) -> str:
    """Render segments as SRT text.

    Args:
        segments: Dicts with ``start``, ``end`` and ``text``. ``start``/``end``
            are either seconds (number or numeric string) or an already
            formatted timestamp containing ``":"``, which is used unchanged.

    Returns:
        SRT text with cues numbered from 1 and separated by a blank line.
        Newlines inside a cue's text are replaced by spaces. An empty input
        yields an empty string.
    """
    def format_time(seconds):
        # Already a formatted timestamp (e.g. read back from an SRT file): keep as is.
        if ":" in str(seconds): return seconds
        # Work in whole milliseconds: truncating the float fraction turns 2.3 s into ",299".
        total_ms = round(float(seconds) * 1000)
        total_s, ms = divmod(total_ms, 1000)
        h, rem = divmod(total_s, 3600)
        m, s = divmod(rem, 60)
        return f"{h:02}:{m:02}:{s:02},{ms:03d}"

    srt_lines = []
    for idx, seg in enumerate(segments, 1):
        start, end = format_time(seg["start"]), format_time(seg["end"])
        text = str(seg["text"]).replace("\n", " ").strip()
        srt_lines.append(f"{idx}\n{start} --> {end}\n{text}\n")
    return "\n".join(srt_lines)

def ollama_chat(messages, temp=0.3):
    """Send ``messages`` to the module-level ``llm_model`` via ``ollama.chat`` and return the reply text.

    ``temp`` is passed as the ``temperature`` option.
    """
    response = ollama.chat(model=llm_model, messages=messages, options={'temperature': temp})
    return response['message']['content']

def validate_subtitles(lang, srt_text, kb_context):
    """Ask the LLM to judge an SRT document and return its verdict.

    Args:
        lang: ``"tamil"`` or ``"english"``; any other value is checked with the
            English rule set.
        srt_text: Subtitles to validate (only the first 4000 characters are sent).
        kb_context: Knowledge-base text the validator should compare entity
            names against (only the first 4000 characters are sent).

    Returns:
        The stripped LLM reply, requested in the form ``PASS`` or ``FAIL: <issues>``.
    """
    rule_sets = {"tamil": "- Entities must match KB. - No empty lines. - Valid SRT numbering/timestamps.",
                 "english": "- Numbering/timestamps intact. - No empty lines. - Entities must match KB."}
    rules = rule_sets.get(lang, rule_sets["english"])
    # The rules demand that entities match the KB, so the validator must actually see the KB.
    kb_excerpt = str(kb_context or "")[:4000] or "(none)"
    prompt = (f"Validate {lang.capitalize()} subtitles:\n{rules}\n"
              f"KB context:\n{kb_excerpt}\n"
              f"Reply: PASS or FAIL: <issues>.\n{srt_text[:4000]}")
    role = "subtitle validator" if lang == "tamil" else "subtitle QA agent"
    return ollama_chat([{"role": "system", "content": f"You are a {role}."},
                        {"role": "user", "content": prompt}], temp=0).strip()

def parse_srt(srt_file):
    """Parse an SRT file into a list of ``{"start", "end", "text"}`` dicts.

    Cues are separated by one or more blank lines. Within a cue, the line
    containing ``-->`` supplies ``start`` and ``end`` (kept as strings), and
    all following lines are joined with spaces to form ``text``. Blocks
    without a timestamp line or without any text are skipped.

    Args:
        srt_file: Path of the UTF-8 encoded SRT file.

    Returns:
        List of segment dicts in file order.
    """
    with open(srt_file, "r", encoding="utf-8") as f:
        content = f.read()

    segments = []
    # Split on any run of blank lines (spaces allowed) so stray extra newlines do not create broken blocks.
    for block in re.split(r"\n\s*\n", content.strip()):
        lines = [line.strip() for line in block.split("\n") if line.strip()]
        timing_idx = next((i for i, line in enumerate(lines) if "-->" in line), None)
        if timing_idx is None or timing_idx + 1 >= len(lines):
            continue
        start, _, end = lines[timing_idx].partition("-->")
        segments.append({"start": start.strip(), "end": end.strip(),
                         "text": " ".join(lines[timing_idx + 1:])})
    return segments

def retrieve_kb_context(kb_collection, query_text="video context + named entities", top_k=10):
    """Fetch up to ``top_k`` documents from ``kb_collection`` that match ``query_text``.

    Returns the documents joined by newlines, or an empty string when
    ``kb_collection`` is falsy.
    """
    if not kb_collection:
        return ""
    query_vectors = embed_texts([query_text])
    results = kb_collection.query(query_embeddings=query_vectors, n_results=top_k, include=["documents"])
    collected = []
    for docs_per_query in results.get("documents", []):
        collected.extend(docs_per_query)
    return "\n".join(collected)

def _extract_subtitle_texts(llm_output):
    """Return only the spoken-text entries of an LLM reply, one string per cue.

    If the reply contains SRT timestamp lines, the text lines following each
    timestamp (up to the next blank line or timestamp) form one entry; cue
    numbers and timestamps are dropped. Otherwise every non-empty line is one
    entry. Markdown code-fence lines are ignored.
    """
    lines = [line.strip() for line in llm_output.splitlines()]
    lines = [line for line in lines if not line.startswith("```")]
    if not any("-->" in line for line in lines):
        return [line for line in lines if line]

    texts, current = [], None
    for line in lines:
        if "-->" in line:
            if current is not None:
                # No blank separator was emitted: the trailing number is the next cue's index.
                if current and current[-1].isdigit():
                    current.pop()
                texts.append(" ".join(current))
            current = []
        elif not line:
            if current is not None:
                texts.append(" ".join(current))
                current = None
        elif current is not None:
            current.append(line)
    if current is not None:
        texts.append(" ".join(current))
    return texts


def process_subtitles(lang, kb_context, errors, base_segments=None, merged_texts=None, prev_srt=None, max_loops=3):
    """Generate subtitles with the LLM and retry until the validator passes them.

    For ``lang == "tamil"`` the LLM consolidates ``merged_texts`` into Tamil
    subtitles, and its text is re-attached to the timings of ``base_segments``.
    For any other ``lang`` the LLM translates ``prev_srt`` (first 4000
    characters) into English SRT.

    After each attempt ``validate_subtitles`` is consulted. A failing verdict
    is stored with ``upsert_error_to_kb`` and added to the issues shown to the
    LLM on the next attempt.

    Args:
        lang: ``"tamil"`` or ``"english"``.
        kb_context: Knowledge-base text inserted into the prompts.
        errors: Previously recorded issues for this language.
        base_segments: Segments (``start``/``end``/``text``) whose timings the
            Tamil output must follow.
        merged_texts: Transcript texts to consolidate (Tamil only).
        prev_srt: Tamil SRT to translate (English only).
        max_loops: Maximum number of attempts; values below 1 are treated as 1.

    Returns:
        The first result that passes validation, otherwise the last attempt.
    """
    # Copy so the caller's list is not mutated when validator feedback is appended.
    known_issues = list(errors) if isinstance(errors, (list, tuple)) else ([errors] if errors else [])

    for loop in range(max(1, max_loops)):
        print(f"Loop {loop+1}: Processing {lang.capitalize()} subtitles...")
        if lang == "tamil":
            prompt = f"""You are an autonomous AI subtitle agent.
Task: Create polished Tamil subtitles from noisy WhisperX transcripts.
Knowledge Context: - KB Entities: {kb_context} - Past Tamil issues: {known_issues}
Rules: - Follow transcript_chunk6.srt timestamps & speaker labels exactly.
- Compare 3s/6s/15s/30s transcripts. - Correct entities via KB+errors.
- Maintain numbering/order. - Output valid Tamil SRT only.
Transcripts:\n{merged_texts}"""
        else:
            prompt = f"""Translate Tamil subtitles into English SRT.
Knowledge Context: - KB Entities: {kb_context} - Past English issues: {known_issues}
Rules: - Follow transcript_chunk6.srt timestamps & speaker labels exactly.
- Preserve numbering/timestamps. - Keep colloquial tone. - Correct entities via KB+errors.
Tamil SRT:\n{(prev_srt or "")[:4000]}"""

        result = ollama_chat([{"role": "system", "content": f"You are a {lang} subtitle agent."},
                              {"role": "user", "content": prompt}])

        if lang == "tamil" and base_segments is not None:
            # The LLM is asked for SRT, so its reply contains cue numbers and timestamps;
            # only the text lines may be paired with the reference timings.
            texts = _extract_subtitle_texts(result)
            result = format_as_srt_from_segments([
                {"start": seg["start"], "end": seg["end"],
                 # Keep the original transcript text when the LLM returned too few (or empty) cues.
                 "text": texts[i] if i < len(texts) and texts[i] else seg.get("text", "")}
                for i, seg in enumerate(base_segments)
            ])

        verdict = validate_subtitles(lang, result, kb_context)
        print(f"{lang.capitalize()} validation verdict: {verdict}")
        if verdict.strip().upper().startswith("PASS"): return result
        print(f"Issues in {lang}, refining..."); upsert_error_to_kb(verdict, lang)
        known_issues.append(verdict)  # feed the validator's findings into the next attempt
    return result

def agentic_tamil_english_subtitles(kb_collection, transcript_srt_files, max_loops=3):
    """Produce Tamil and English subtitle files from several WhisperX transcripts.

    Timings come from the file named ``transcript_chunk6.srt`` (the file the
    LLM prompts refer to) when it is among ``transcript_srt_files``; otherwise
    from the last file in the list. Tamil subtitles are generated first and
    then translated to English. Both are written under ``LLM_DIR``.

    Args:
        kb_collection: Collection queried for knowledge-base context.
        transcript_srt_files: Paths of the transcript SRT files.
        max_loops: Maximum generate/validate attempts per language.

    Returns:
        Tuple ``(tamil_path, english_path)`` of the written files.

    Raises:
        ValueError: If ``transcript_srt_files`` is empty.
    """
    print("Agentic Tamil + English subtitle consolidation with KB-backed error memory...")
    transcript_srt_files = list(transcript_srt_files)
    if not transcript_srt_files:
        raise ValueError("No transcript SRT files were provided; cannot build subtitles.")

    kb_context = retrieve_kb_context(kb_collection)
    base_file = next((fp for fp in transcript_srt_files if Path(fp).name == "transcript_chunk6.srt"),
                     transcript_srt_files[-1])
    base_segments = parse_srt(base_file)
    merged_texts = [" ".join(seg["text"] for seg in parse_srt(fp)) for fp in transcript_srt_files]

    tamil_path = f"{LLM_DIR}/Tamil_subtitles.srt"
    english_path = f"{LLM_DIR}/English_subtitles.srt"

    tamil_srt = process_subtitles("tamil", kb_context, retrieve_relevant_errors("Tamil transcript errors", "tamil"),
                                  base_segments=base_segments, merged_texts=merged_texts, max_loops=max_loops)
    # Context managers make sure the files are flushed and closed before the paths are returned.
    with open(tamil_path, "w", encoding="utf-8") as tamil_file:
        tamil_file.write(tamil_srt)

    english_srt = process_subtitles("english", kb_context, retrieve_relevant_errors("English subtitle issues", "english"),
                                    prev_srt=tamil_srt, base_segments=base_segments, max_loops=max_loops)
    with open(english_path, "w", encoding="utf-8") as english_file:
        english_file.write(english_srt)

    return tamil_path, english_path

In [None]:
# @title 5. Agentic Tamil/English Subtitle Pipeline

def process_youtube_video(youtube_url):
    """Run the whole pipeline for one video: audio, transcripts, knowledge base, subtitles.

    Args:
        youtube_url: URL of the video to process.

    Returns:
        Tuple ``(tamil_srt_path, english_srt_path)`` from
        ``agentic_tamil_english_subtitles``.

    Raises:
        RuntimeError: If audio extraction fails or no transcript is produced.
    """
    print("\n▶ Step 1: Extract audio")
    audio_path = extract_youtube_audio(youtube_url, AUDIO_DIR)
    # extract_youtube_audio signals failure with None; stop here instead of passing None to WhisperX.
    if not audio_path:
        raise RuntimeError(f"Audio extraction failed for {youtube_url}")

    print("\n▶ Step 2: Transcribe audio with WhisperX")
    srt_file_list = list(transcribe_audio(
        audio_file=audio_path, output_dir=WHISPERX_DIR, hf_token=userdata.get("HF_TOKEN"),
        chunk_sizes=[3, 6, 15, 30], output_format="srt"
    ).values())
    if not srt_file_list:
        raise RuntimeError("WhisperX produced no transcripts; cannot build subtitles.")

    print("\n▶ Step 3: Scrape YouTube metadata + NER")
    # Called for its side effect of filling the knowledge base; the return values are not needed here.
    scrape_youtube_extract_ner(youtube_url)

    print("\n▶ Step 4: Agentic AI subtitle processing")
    return agentic_tamil_english_subtitles(kb_collection=kb, transcript_srt_files=srt_file_list)

if __name__ == "__main__":
    # Run the full pipeline on the sample video; the result is the pair of subtitle file paths.
    sample_video_url = "https://www.youtube.com/watch?v=Cz_9yG2FQ3E&ab_channel=ALWAYSREACTION"
    tamil_srt, english_srt = process_youtube_video(sample_video_url)

In [None]:
# @title 6. RAG Q&A function
def rag_query(question, language="english"):
    """Answer ``question`` from the knowledge base (retrieval-augmented generation).

    The five closest documents in the module-level ``kb`` collection are put
    into a prompt for the LLM.

    Args:
        question: Question to answer.
        language: ``"tamil"`` or ``"english"`` (case-insensitive); anything
            else falls back to the English prompt.

    Returns:
        The LLM's answer, or an ``"Error generating answer: ..."`` string if
        generation fails.
    """
    print(f"Answering question in {language}: {question}")
    results = kb.query(query_embeddings=embed_texts([question]), n_results=5, include=["documents","metadatas"])
    documents = (results.get("documents") or [[]])[0]
    context = "".join(f"[Source {i+1}]: {doc}\n\n" for i, doc in enumerate(documents))

    prompts = {
        "tamil": f"கீழ்க்கண்ட தகவல்களைப் பயன்படுத்தி கேள்விக்கு பதிலளிக்கவும். தமிழில் மட்டுமே பதிலளிக்கவும்.\nதகவல்: {context}\nகேள்வி: {question}\nபதில்:",
        "english": f"Use this information to answer the question in English only.\nContext: {context}\nQuestion: {question}\nAnswer:"
    }
    # Fall back to the English *prompt*; a plain "english" default would send that single word to the model.
    prompt = prompts.get(str(language or "english").lower(), prompts["english"])

    try:
        return ollama.generate(model=llm_model, prompt=prompt, options={'temperature':0.2,'top_p':0.9})['response']
    except Exception as e:
        return f"Error generating answer: {e}"

if __name__ == "__main__":
    # Sample question against the knowledge base built by the pipeline above.
    english_answer = rag_query("Summarize the video and list key people", language="english")
    print(english_answer)
    #print(rag_query("இந்த வீடியோவின் முக்கிய நபர்களை சொல்லுங்கள்", language="tamil"))