# Video Semantic Search - Production Version

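Semantic search over YouTube video transcripts: Whisper (GPU-accelerated when CUDA is available) transcribes the audio, SentenceTransformers embeds the text, and FAISS indexes it for search.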

**Tech Stack:** Whisper (GPU), SentenceTransformers (CPU), FAISS vector search

## Installation

In [None]:
# Install dependencies
!pip install -q torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu124
!pip install -q transformers accelerate
!pip install -q yt-dlp librosa soundfile
!pip install -q sentence-transformers
!pip install -q faiss-cpu
!pip install -q static-ffmpeg

# Configure FFmpeg
import os
try:
    import static_ffmpeg
    static_ffmpeg.add_paths()
    print("FFmpeg configured")
except:
    print("FFmpeg auto-config failed")

## Imports

In [None]:
import os
import re
import glob
import pickle
from typing import List, Dict, Tuple

import torch
import numpy as np
import librosa
import faiss
import yt_dlp
from transformers import WhisperProcessor, WhisperForConditionalGeneration
from sentence_transformers import SentenceTransformer

# Set TOKENIZERS_PARALLELISM=false before any model is loaded.
# NOTE(review): presumably this silences the Hugging Face tokenizers
# parallelism/fork warning — confirm.
os.environ['TOKENIZERS_PARALLELISM'] = 'false'

## Configuration

In [None]:
# How many hits a search returns by default
TOP_K_RESULTS = 5

# Model identifiers (Whisper checkpoint and sentence-embedding model)
MODEL_SIZE = "openai/whisper-base"
EMBEDDING_MODEL_NAME = "all-MiniLM-L6-v2"

# Run on CUDA whenever PyTorch reports a usable GPU, otherwise on the CPU
DEVICE = "cpu"
if torch.cuda.is_available():
    DEVICE = "cuda"

# Working directories, created up front so later cells can write into them
AUDIO_DIR = "audio_cache"
FAISS_INDEX_DIR = "faiss_index"
os.makedirs(FAISS_INDEX_DIR, exist_ok=True)
os.makedirs(AUDIO_DIR, exist_ok=True)

# Echo the effective configuration
print(f"Device: {DEVICE}")
print(f"Whisper Model: {MODEL_SIZE}")
print(f"Embedding Model: {EMBEDDING_MODEL_NAME}")

## Load Models

In [None]:
# Load the Whisper processor and model, then move the model onto DEVICE
print("Loading Whisper model...")
processor = WhisperProcessor.from_pretrained(MODEL_SIZE)
model = WhisperForConditionalGeneration.from_pretrained(MODEL_SIZE)
model = model.to(DEVICE)

# Report what was loaded; size and memory figures are only shown on CUDA
if DEVICE != "cuda":
    print("Model loaded (CPU mode)")
else:
    param_count = sum(weight.numel() for weight in model.parameters())
    gpu_memory = torch.cuda.memory_allocated() / (1024 ** 3)
    print(f"Model loaded: {param_count/1e6:.1f}M parameters")
    print(f"GPU memory: {gpu_memory:.2f}GB")

## Transcription Function

In [None]:
def transcribe_audio(audio_path: str, chunk_duration: int = 30) -> Tuple[str, List[Dict]]:
    """Transcribe an audio file with Whisper in fixed-length windows.

    The audio is cut into consecutive windows of ``chunk_duration`` seconds.
    Each window is transcribed on its own and becomes one indexable chunk
    whose ``start``/``end`` are the actual boundaries of that window in the
    recording, rather than an estimate derived from the average speaking
    rate. Windows that yield no text are left out of the chunk list.

    Uses the module-level ``processor``, ``model`` and ``DEVICE``.

    Args:
        audio_path: Path to an audio file that ``librosa.load`` can read.
        chunk_duration: Window length in seconds; must be positive.

    Returns:
        ``(full_text, chunks)``. ``full_text`` is every window transcription
        joined with single spaces. ``chunks`` is a time-ordered list of
        ``{'start': float, 'end': float, 'text': str}`` dicts (times in
        seconds); it is empty when nothing was transcribed.

    Raises:
        ValueError: If ``chunk_duration`` is not positive.
    """
    if chunk_duration <= 0:
        raise ValueError(f"chunk_duration must be positive, got {chunk_duration}")

    # Load at 16 kHz; the same rate is passed to the feature extractor below.
    audio, sr = librosa.load(audio_path, sr=16000)
    total_duration = len(audio) / sr
    print(f"Audio loaded: {total_duration:.1f} seconds")

    chunk_size = chunk_duration * sr  # window length in samples
    all_text = []
    chunks = []

    for offset in range(0, len(audio), chunk_size):
        chunk_audio = audio[offset:offset + chunk_size]

        # Convert the raw samples to model input features on the target device
        input_features = processor(
            chunk_audio, sampling_rate=sr, return_tensors="pt"
        ).input_features.to(DEVICE)

        # Generate token ids without tracking gradients (inference only)
        with torch.no_grad():
            predicted_ids = model.generate(
                input_features,
                language="en",
                max_length=448
            )

        transcription = processor.batch_decode(predicted_ids, skip_special_tokens=True)[0]
        all_text.append(transcription)

        # Collapse whitespace; a window with no words (e.g. silence) is not indexed
        text = " ".join(transcription.split())
        if text:
            chunks.append({
                'start': offset / sr,
                # The last window is usually shorter than chunk_duration
                'end': min((offset + chunk_size) / sr, total_duration),
                'text': text,
            })

        # Release per-window tensors before processing the next window
        del input_features, predicted_ids
        if DEVICE == "cuda":
            torch.cuda.empty_cache()

    full_text = " ".join(all_text)

    print(f"Transcription complete: {len(chunks)} chunks")
    return full_text, chunks

## FAISS Index Setup

In [None]:
# Sentence-embedding model, pinned to the CPU so it does not compete with
# Whisper for GPU memory
print("Loading embedding model...")
embedding_model = SentenceTransformer(EMBEDDING_MODEL_NAME, device='cpu')
dimension = embedding_model.get_sentence_embedding_dimension()
print(f"Embedding dimension: {dimension}")

# Persisted state: the FAISS index plus a pickled metadata store next to it
index_path = os.path.join(FAISS_INDEX_DIR, "index.faiss")
metadata_path = os.path.join(FAISS_INDEX_DIR, "metadata.pkl")

if not (os.path.exists(index_path) and os.path.exists(metadata_path)):
    # Nothing (or only half of the pair) on disk: start from an empty index
    print("Creating new index...")
    index = faiss.IndexFlatL2(dimension)
    metadata_store = dict(documents=[], metadatas=[], ids=[])
    print("New index created")
else:
    # Both files are present: resume from the saved state
    print("Loading existing index...")
    index = faiss.read_index(index_path)
    # NOTE: unpickling can execute arbitrary code; only load a metadata file
    # that this notebook wrote itself.
    with open(metadata_path, 'rb') as f:
        metadata_store = pickle.load(f)
    print(f"Index loaded: {index.ntotal} documents")

## YouTube Download Function

In [None]:
def extract_video_id(url: str) -> str:
    """Extract the 11-character YouTube video ID from a URL.

    An ID is recognised either after ``v=`` (``watch?v=ID``) or as a path
    segment after ``/`` (``youtu.be/ID``, ``/embed/ID``, ``/shorts/ID``).
    The candidate must be exactly 11 ID characters long: a longer run such as
    a hostname label or a channel ID is skipped instead of being truncated to
    its first 11 characters.

    Args:
        url: The URL to inspect.

    Returns:
        The first 11-character video ID found in ``url``.

    Raises:
        ValueError: If no video ID can be found.
    """
    # The negative lookahead enforces the "exactly 11" rule: without it,
    # "youtube-nocookie.com" would yield the bogus ID "youtube-noc".
    match = re.search(r'(?:v=|\/)([0-9A-Za-z_-]{11})(?![0-9A-Za-z_-])', url)
    if match:
        return match.group(1)
    raise ValueError(f"Invalid YouTube URL: {url}")


def download_youtube_audio(youtube_url: str) -> Tuple[str, str]:
    """Download the audio track of a YouTube video, reusing a cached copy if present.

    Files are stored in ``AUDIO_DIR`` and named after the video ID. If a
    ``.wav``, ``.mp3`` or ``.m4a`` file for the ID already exists, it is
    returned without downloading. Otherwise yt-dlp fetches the best audio
    stream and its FFmpeg post-processor is asked to extract WAV.

    Args:
        youtube_url: URL of the video.

    Returns:
        ``(audio_path, video_id)``.

    Raises:
        ValueError: If ``extract_video_id`` finds no video ID in the URL.
        RuntimeError: If no audio file for the video exists after the download.
    """
    video_id = extract_video_id(youtube_url)
    print(f"Video ID: {video_id}")

    # Reuse a previously downloaded file when one exists
    for ext in ('.wav', '.mp3', '.m4a'):
        audio_path = os.path.join(AUDIO_DIR, f"{video_id}{ext}")
        if os.path.exists(audio_path):
            print(f"Using cached audio: {audio_path}")
            return audio_path, video_id

    print("Downloading audio...")
    audio_path_template = os.path.join(AUDIO_DIR, f'{video_id}.%(ext)s')

    ydl_opts = {
        'format': 'bestaudio/best',
        'outtmpl': audio_path_template,
        'quiet': True,
        'no_warnings': True,
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'wav',
        }],
    }

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.download([youtube_url])

    # WAV was requested, so prefer exactly that file over whatever glob
    # happens to list first.
    expected_path = os.path.join(AUDIO_DIR, f"{video_id}.wav")
    if os.path.exists(expected_path):
        print(f"Downloaded: {expected_path}")
        return expected_path, video_id

    # Fallback: any other file for this ID, ignoring incomplete-download
    # artefacts. Sorted so the choice is deterministic.
    candidates = sorted(
        path
        for path in glob.glob(os.path.join(AUDIO_DIR, f"{video_id}.*"))
        if not path.endswith(('.part', '.ytdl'))
    )
    if candidates:
        audio_path = candidates[0]
        print(f"Downloaded: {audio_path}")
        return audio_path, video_id

    raise RuntimeError(f"Download failed: no audio file found for video {video_id}")

## Ingestion Pipeline

In [None]:
def ingest_video(youtube_url: str):
    """Run the ingestion pipeline: download -> transcribe -> embed -> index.

    A video whose ID already appears in the metadata store is not ingested
    again, so re-running this on the same URL does not add duplicate vectors
    to the persisted index.

    Mutates the module-level ``index`` and ``metadata_store`` in place and
    writes both to ``index_path`` / ``metadata_path``.

    Args:
        youtube_url: URL of the video to ingest.

    Returns:
        The video ID when the video is indexed (now or previously), or
        ``None`` when transcription produced no chunks.

    Raises:
        ValueError: If no video ID can be extracted from ``youtube_url``.
    """
    print(f"\nIngesting: {youtube_url}")
    print("="*80)

    # Check for a previous ingestion before spending time on download/transcription
    video_id = extract_video_id(youtube_url)
    if any(meta.get('video_id') == video_id for meta in metadata_store['metadatas']):
        print(f"Video {video_id} is already indexed; skipping.")
        print("="*80)
        return video_id

    # Download
    audio_path, video_id = download_youtube_audio(youtube_url)

    # Transcribe
    full_text, chunks = transcribe_audio(audio_path)

    if not chunks:
        print("No transcription generated")
        return None

    # Prepare parallel lists: position i in each list describes the same chunk,
    # and matches the row FAISS assigns to that chunk's vector.
    documents = [chunk['text'] for chunk in chunks]
    metadatas = [
        {
            'video_id': video_id,
            'start_time': chunk['start'],
            'end_time': chunk['end'],
            'youtube_url': youtube_url
        }
        for chunk in chunks
    ]
    ids = [f"{video_id}_chunk_{i}" for i in range(len(chunks))]

    # Generate embeddings (FAISS expects float32)
    print("Generating embeddings...")
    embeddings = embedding_model.encode(documents, show_progress_bar=False)
    embeddings_np = np.array(embeddings).astype('float32')

    # Add to FAISS and keep the metadata store aligned with it
    index.add(embeddings_np)
    metadata_store['documents'].extend(documents)
    metadata_store['metadatas'].extend(metadatas)
    metadata_store['ids'].extend(ids)

    # Persist both halves of the state
    faiss.write_index(index, index_path)
    with open(metadata_path, 'wb') as f:
        pickle.dump(metadata_store, f)

    print("\nIngestion complete!")
    print(f"Total documents in index: {index.ntotal}")
    print("="*80)

    return video_id

## Search Function

In [None]:
def search(query: str, n_results: int = TOP_K_RESULTS):
    """Search the index for the transcript segments closest to ``query``.

    Reads the module-level ``index``, ``metadata_store`` and
    ``embedding_model``.

    Args:
        query: Natural-language query text.
        n_results: Maximum number of hits to return. It is capped at the
            number of indexed documents; a value <= 0 yields no results.

    Returns:
        A list of dicts, nearest first, with keys ``text``, ``start_time``,
        ``end_time``, ``video_id``, ``youtube_url``, ``confidence`` and
        ``distance``. ``distance`` is the value returned by ``index.search``
        (lower is closer). ``confidence`` is the heuristic
        ``1 - distance / 10`` clamped at 0, not a calibrated probability.
        Both are plain Python floats. The list is empty when the index is
        empty or ``n_results`` is not positive.
    """
    if index.ntotal == 0:
        print("Index is empty. Ingest videos first.")
        return []

    # FAISS needs k >= 1, so bail out before calling it with a non-positive k
    n_results = min(n_results, index.ntotal)
    if n_results <= 0:
        return []

    # Generate query embedding (FAISS expects a float32 matrix)
    query_embedding = embedding_model.encode([query])
    query_embedding_np = np.array(query_embedding).astype('float32')

    # Search
    distances, indices = index.search(query_embedding_np, n_results)

    # Format results
    results = []
    for idx, distance in zip(indices[0], distances[0]):
        # FAISS pads missing neighbours with -1
        if idx == -1:
            continue

        # Convert numpy scalars so results hold plain Python values
        idx = int(idx)
        distance = float(distance)

        metadata = metadata_store['metadatas'][idx]
        results.append({
            'text': metadata_store['documents'][idx],
            'start_time': metadata['start_time'],
            'end_time': metadata['end_time'],
            'video_id': metadata['video_id'],
            'youtube_url': metadata['youtube_url'],
            'confidence': max(0.0, 1.0 - distance / 10),
            'distance': distance
        })

    return results


def format_time(seconds: float) -> str:
    """Render a duration in seconds as ``M:SS``, or ``H:MM:SS`` from one hour up.

    Fractions of a second are truncated, not rounded.
    """
    hours = int(seconds // 3600)
    minutes = int(seconds % 3600 // 60)
    secs = int(seconds % 60)
    return (
        f"{hours}:{minutes:02d}:{secs:02d}"
        if hours > 0
        else f"{minutes}:{secs:02d}"
    )


def print_results(results: List[Dict]):
    """Print search results in a human-readable form.

    For each result this prints its rank, start timestamp, confidence as a
    percentage, video ID, a text preview of at most 200 characters, and a
    link that jumps to the segment's start time.

    Args:
        results: Result dicts with the keys ``start_time``, ``confidence``,
            ``video_id``, ``text`` and ``youtube_url``.
    """
    if not results:
        print("No results found.")
        return

    print(f"\nFound {len(results)} results:\n")
    print("="*80)

    for i, result in enumerate(results, 1):
        timestamp = format_time(result['start_time'])
        confidence = result['confidence'] * 100

        # Only mark the preview with an ellipsis when text was actually cut
        text = result['text']
        preview = text if len(text) <= 200 else text[:200] + "..."

        # A URL without a query string (e.g. youtu.be/<id>) needs '?t=',
        # not '&t=', for the time offset to be a valid parameter.
        url = result['youtube_url']
        separator = '&' if '?' in url else '?'

        print(f"\n[{i}] {timestamp} | Confidence: {confidence:.1f}%")
        print(f"Video: {result['video_id']}")
        print(f"Text: {preview}")
        print(f"URL: {url}{separator}t={int(result['start_time'])}s")

    print("\n" + "="*80)

## Usage Example

In [None]:
# Run the ingestion pipeline on one example video and keep the value it returns
YOUTUBE_URL = "https://www.youtube.com/watch?v=x7X9w_GIm1s"
current_video_id = ingest_video(youtube_url=YOUTUBE_URL)

In [None]:
# Ask a natural-language question and print up to five matching segments
query = "Where does he talk about numpy?"
results = search(query=query, n_results=5)
print_results(results=results)