# Phase 3: Visual Frame Extraction & Analysis
## Extract frames at visual reference timestamps

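This notebook:
- Loads the Phase 1 transcript metadata and the Phase 2 references (timestamped visual references)
- Extracts video frames at, and at small offsets around, those timestamps
- Filters out frames that are too dark, too bright, or too blurry
- Detects duplicates using perceptual hashing
- Optional: OCR text extraction (Tesseract or PaddleOCR)
- Optional: AI description (Google Gemini API)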

**Note:** The OCR and AI integrations are optional — basic frame extraction works without any API keys.

In [None]:
# ====================================================================
# CELL 1: Install Dependencies
# ====================================================================
print("üì¶ Installing dependencies...")

# Core dependencies (always needed)
!pip install -q opencv-python pillow imagehash

# Optional: OCR (uncomment if you want text extraction)
# !pip install -q pytesseract
# !apt-get install -y tesseract-ocr

# Optional: Better OCR for equations/diagrams
# !pip install -q paddlepaddle paddleocr

# Optional: Google Gemini for AI descriptions
# !pip install -q google-generativeai

print("‚úÖ Dependencies installed!")

In [None]:
# ====================================================================
# CELL 2: Import Libraries
# ====================================================================
import os
import json
import cv2
import numpy as np
from PIL import Image
import imagehash
from pathlib import Path
from typing import Dict, List, Any, Tuple, Optional
import shutil

# Optional backends. Each import is best-effort: on failure the matching
# *_AVAILABLE flag is set to False and the feature is disabled downstream.
# `except Exception` (instead of a bare `except:`) keeps that behaviour for
# any import-time error while no longer swallowing KeyboardInterrupt/SystemExit.
try:
    import pytesseract
    TESSERACT_AVAILABLE = True
except Exception:
    TESSERACT_AVAILABLE = False
    print("‚ö†Ô∏è  Tesseract not available (OCR disabled)")

try:
    from paddleocr import PaddleOCR
    PADDLE_AVAILABLE = True
except Exception:
    PADDLE_AVAILABLE = False
    print("‚ö†Ô∏è  PaddleOCR not available")

try:
    import google.generativeai as genai
    GEMINI_AVAILABLE = True
except Exception:
    GEMINI_AVAILABLE = False
    print("‚ö†Ô∏è  Google Gemini not available")

# Summary of which optional features can be used in this session
print("‚úÖ Libraries imported")
print(f"   OCR (Tesseract): {'‚úì' if TESSERACT_AVAILABLE else '‚úó'}")
print(f"   OCR (Paddle): {'‚úì' if PADDLE_AVAILABLE else '‚úó'}")
print(f"   AI (Gemini): {'‚úì' if GEMINI_AVAILABLE else '‚úó'}")

In [None]:
# ====================================================================
# CELL 3: Configuration
# ====================================================================

# Central settings dictionary read by the later cells of this notebook.
CONFIG = {
    # Input files (JSON outputs of the earlier phases, loaded in Cell 4)
    "phase1_file": "/kaggle/working/output/transcript_XXXXX.json",  # UPDATE: point at your Phase 1 output
    "phase2_file": "/kaggle/working/output/phase2_references_XXXXX.json",  # UPDATE: point at your Phase 2 output

    # Frame extraction
    "frame_offsets_seconds": [-1, 0, 1, 2],  # Extract at -1s, 0s, +1s, +2s from reference
    # NOTE(review): no cell visible in this notebook reads this key - confirm whether it is still needed
    "max_frames_per_reference": 4,

    # Duplicate detection
    "enable_dedup": True,
    "perceptual_hash_threshold": 5,  # Hamming distance (0-64, lower=stricter)

    # Quality filtering (brightness = mean grayscale pixel value,
    # sharpness = variance of the Laplacian; see assess_frame_quality)
    "min_brightness": 20,   # Skip very dark frames
    "max_brightness": 250,  # Skip very bright/washed out frames
    "min_sharpness": 50,    # Skip blurry frames

    # OCR settings
    "enable_ocr": False,  # Set True to enable
    "ocr_engine": "tesseract",  # "tesseract" or "paddle"
    "ocr_languages": "eng",  # Language code(s) handed to the selected OCR engine

    # AI description (Gemini)
    "enable_ai_description": False,  # Set True to enable
    "gemini_api_key": None,  # Set your API key or use env var (GEMINI_API_KEY)
    "gemini_model": "gemini-1.5-flash",

    # Output
    "output_dir": "/kaggle/working/output",  # JSON, text report and plots
    "frames_dir": "/kaggle/working/frames",  # one sub-directory per reference
    "save_thumbnails": True,  # Save smaller versions
    "thumbnail_size": (640, 360)  # bounding box passed to create_thumbnail
}

# Create directories (no error if they already exist)
os.makedirs(CONFIG['output_dir'], exist_ok=True)
os.makedirs(CONFIG['frames_dir'], exist_ok=True)

# Echo the settings that most affect this run
print("‚úÖ Configuration loaded")
print(f"   Frame offsets: {CONFIG['frame_offsets_seconds']}")
print(f"   Deduplication: {'Enabled' if CONFIG['enable_dedup'] else 'Disabled'}")
print(f"   OCR: {'Enabled (' + CONFIG['ocr_engine'] + ')' if CONFIG['enable_ocr'] else 'Disabled'}")
print(f"   AI Description: {'Enabled' if CONFIG['enable_ai_description'] else 'Disabled'}")

In [None]:
# ====================================================================
# CELL 4: Load Phase 1 & 2 Data
# ====================================================================

print("üìÇ Loading previous phase outputs...")

# Phase 1 output (transcript): supplies the video path and video ID
phase1_data = json.loads(Path(CONFIG['phase1_file']).read_text(encoding='utf-8'))

# Phase 2 output: supplies the list of visual references
phase2_data = json.loads(Path(CONFIG['phase2_file']).read_text(encoding='utf-8'))

video_path, video_id = phase1_data['video_path'], phase1_data['video_id']
references = phase2_data['references']

print(f"‚úÖ Data loaded:")
print(f"   Video: {Path(video_path).name}")
print(f"   Video ID: {video_id}")
print(f"   References: {len(references)}")

# Show a preview of the first reference, or explain why there is nothing to do
if references:
    first_reference = references[0]
    print(f"\nüìç First reference: {first_reference['timestamp_ms']/1000:.1f}s")
    print(f"   Text: {first_reference['text'][:100]}...")
else:
    print("\n‚ö†Ô∏è  No references found! Phase 2 didn't detect any visual references.")
    print("   Try lowering the similarity threshold in Phase 2.")

In [None]:
# ====================================================================
# CELL 5: Frame Extraction Functions
# ====================================================================

class FrameExtractor:
    """Extract still frames from a video at specified timestamps.

    One ``cv2.VideoCapture`` is held for the lifetime of the object. Call
    :meth:`close` when done, or use the instance as a context manager
    (``with FrameExtractor(path) as extractor: ...``).
    """

    def __init__(self, video_path: str):
        """Open ``video_path`` and cache its frame rate and frame count.

        Raises:
            ValueError: if OpenCV cannot open the video.
        """
        self.video_path = video_path
        self.cap = cv2.VideoCapture(video_path)

        # Check the capture before querying it, and release it on failure so
        # a bad path does not leave a dangling handle.
        if not self.cap.isOpened():
            self.cap.release()
            raise ValueError(f"Could not open video: {video_path}")

        self.fps = self.cap.get(cv2.CAP_PROP_FPS)
        self.total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))

    def __enter__(self) -> "FrameExtractor":
        """Return the extractor itself for use in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Release the capture when leaving a ``with`` block."""
        self.close()

    def extract_frame_at_timestamp(self, timestamp_ms: int, output_path: str) -> bool:
        """Write the frame at ``timestamp_ms`` to ``output_path``.

        Returns:
            True only if the frame was decoded AND written; False when the
            timestamp is outside the video, the frame rate is unusable, the
            frame cannot be read, or the image cannot be written.
        """
        # Without a positive frame rate the timestamp cannot be mapped to a
        # frame index (this comparison is also False for NaN).
        if not (self.fps > 0):
            return False

        # Convert timestamp to frame number
        frame_num = int((timestamp_ms / 1000.0) * self.fps)

        if frame_num < 0:
            return False
        # Only enforce the upper bound when a frame count was reported; when
        # it is <= 0 the read below decides whether the frame exists.
        if self.total_frames > 0 and frame_num >= self.total_frames:
            return False

        # Seek to frame and decode it
        self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
        ret, frame = self.cap.read()
        if not ret or frame is None:
            return False

        # cv2.imwrite reports failure through its return value, not an exception
        return bool(cv2.imwrite(output_path, frame))

    def extract_frames_with_offsets(self, base_timestamp_ms: int,
                                    offsets_seconds: List[float],
                                    output_dir: str,
                                    base_name: str) -> List[Dict[str, Any]]:
        """Extract one frame per offset around ``base_timestamp_ms``.

        Files are named ``{base_name}_offset_{offset:+.1f}s.jpg`` inside
        ``output_dir``. Offsets whose frame could not be extracted are
        omitted from the result.

        Returns:
            A list of dicts with keys ``path``, ``timestamp_ms`` and
            ``offset_seconds``, in the order of ``offsets_seconds``.
        """
        frames = []

        for offset in offsets_seconds:
            timestamp_ms = base_timestamp_ms + int(offset * 1000)
            output_path = os.path.join(output_dir, f"{base_name}_offset_{offset:+.1f}s.jpg")

            if self.extract_frame_at_timestamp(timestamp_ms, output_path):
                frames.append({
                    "path": output_path,
                    "timestamp_ms": timestamp_ms,
                    "offset_seconds": offset
                })

        return frames

    def close(self):
        """Release video capture."""
        self.cap.release()

print("‚úÖ Frame extractor defined")

In [None]:
# ====================================================================
# CELL 6: Quality Assessment & Deduplication
# ====================================================================

def assess_frame_quality(image_path: str) -> Dict[str, Any]:
    """Assess frame quality (brightness, sharpness).

    Brightness is the mean grayscale pixel value; sharpness is the variance
    of the Laplacian. ``quality_score`` blends the two (30% brightness
    closeness to mid-gray, 70% sharpness capped at 500) into a 0-1 value.

    Returns:
        Dict with ``brightness``, ``sharpness`` and ``quality_score`` as
        Python floats and ``is_good_quality`` as a Python bool, so the
        result is JSON-serializable. If the image cannot be read, all
        metrics are 0.0 and ``is_good_quality`` is False.
    """
    img = cv2.imread(image_path)

    # cv2.imread returns None (no exception) for a missing/unreadable file;
    # report the frame as unusable instead of crashing in cvtColor.
    if img is None:
        return {
            "brightness": 0.0,
            "sharpness": 0.0,
            "quality_score": 0.0,
            "is_good_quality": False
        }

    # Convert to grayscale
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # Convert NumPy scalars to plain floats up front so every derived value
    # (including the comparison result below) is a built-in Python type.
    brightness = float(np.mean(gray))
    sharpness = float(cv2.Laplacian(gray, cv2.CV_64F).var())

    # Overall quality score (0-1)
    brightness_score = 1 - abs(brightness - 127.5) / 127.5  # Prefer mid-range
    sharpness_score = min(sharpness / 500, 1.0)  # Normalize
    quality_score = (brightness_score * 0.3 + sharpness_score * 0.7)

    # Thresholds are exclusive on both ends
    within_thresholds = (
        CONFIG['min_brightness'] < brightness < CONFIG['max_brightness'] and
        sharpness > CONFIG['min_sharpness']
    )

    return {
        "brightness": brightness,
        "sharpness": sharpness,
        "quality_score": float(quality_score),
        # bool(): a numpy.bool_ here would make json.dump raise TypeError
        "is_good_quality": bool(within_thresholds)
    }

def calculate_perceptual_hash(image_path: str) -> str:
    """Calculate perceptual hash for duplicate detection.

    Returns:
        The string form of ``imagehash.phash`` for the image, suitable for
        ``imagehash.hex_to_hash`` later on.
    """
    # The context manager closes the underlying file once hashing is done;
    # the image was previously left open for every extracted frame.
    with Image.open(image_path) as img:
        return str(imagehash.phash(img))

def is_duplicate(phash: str, seen_hashes: List[str], threshold: int = 5) -> bool:
    """Check if frame is duplicate based on perceptual hash.

    Args:
        phash: Hex string of the candidate frame's hash.
        seen_hashes: Hex strings of hashes already accepted.
        threshold: Maximum hash distance (inclusive) that still counts as
            a duplicate.

    Returns:
        True if any seen hash is within ``threshold`` of ``phash``.
    """
    if not seen_hashes:
        return False

    # Parse the candidate once instead of once per comparison
    candidate = imagehash.hex_to_hash(phash)

    # Subtracting two hashes yields their distance; any() stops at the first match
    return any(
        candidate - imagehash.hex_to_hash(seen_hash) <= threshold
        for seen_hash in seen_hashes
    )

def create_thumbnail(image_path: str, output_path: str, size: Tuple[int, int]):
    """Create thumbnail of image.

    Args:
        image_path: Source image file.
        output_path: Destination path; always written as JPEG (quality 85).
        size: ``(max_width, max_height)`` bounding box. ``Image.thumbnail``
            shrinks the image to fit inside it.
    """
    # Pillow moved the filter constants to Image.Resampling; fall back to the
    # module-level constant when that enum is not present.
    resampling = getattr(Image, "Resampling", Image)

    # Context manager closes the source file after the thumbnail is written
    with Image.open(image_path) as img:
        # JPEG cannot store alpha/palette modes; convert those so save() does not fail
        if img.mode not in ("RGB", "L", "CMYK"):
            img = img.convert("RGB")
        img.thumbnail(size, resampling.LANCZOS)
        img.save(output_path, "JPEG", quality=85)

print("‚úÖ Quality assessment functions defined")

In [None]:
# ====================================================================
# CELL 7: OCR Functions (Optional)
# ====================================================================

def extract_text_tesseract(image_path: str, lang: str = "eng") -> Dict[str, Any]:
    """Extract text using Tesseract OCR.

    Args:
        image_path: Image file to read.
        lang: Tesseract language code(s), used for both the text pass and
            the confidence pass.

    Returns:
        On success ``{"text", "confidence", "engine"}`` where ``confidence``
        is the mean of the non-negative per-box confidences (0.0 if there
        are none). On failure, or if pytesseract is not installed,
        ``{"text": "", "error": <message>}``.
    """
    if not TESSERACT_AVAILABLE:
        return {"text": "", "error": "Tesseract not available"}

    try:
        # Context manager closes the image file after both OCR passes
        with Image.open(image_path) as img:
            text = pytesseract.image_to_string(img, lang=lang)
            # Pass lang here too so confidences describe the same recognition
            # as the text above rather than the default language.
            data = pytesseract.image_to_data(
                img, lang=lang, output_type=pytesseract.Output.DICT
            )

        # Calculate average confidence. Values may arrive as ints, floats or
        # numeric strings depending on the pytesseract version; negative
        # values (-1) mark boxes without a confidence and are skipped.
        confs = []
        for raw_conf in data.get('conf', []):
            try:
                value = float(raw_conf)
            except (TypeError, ValueError):
                continue
            if value >= 0:
                confs.append(value)
        avg_conf = float(np.mean(confs)) if confs else 0.0

        return {
            "text": text.strip(),
            "confidence": avg_conf,
            "engine": "tesseract"
        }
    except Exception as e:
        # Best-effort: OCR problems are reported in the result, not raised
        return {"text": "", "error": str(e)}

# PaddleOCR engines keyed by language. Building an engine is far more
# expensive than running it, so each language's engine is created once and
# reused for every frame.
_PADDLE_OCR_CACHE: Dict[str, Any] = {}


def _get_paddle_ocr(lang: str):
    """Return the cached PaddleOCR engine for ``lang``, creating it on first use."""
    if lang not in _PADDLE_OCR_CACHE:
        _PADDLE_OCR_CACHE[lang] = PaddleOCR(use_angle_cls=True, lang=lang, show_log=False)
    return _PADDLE_OCR_CACHE[lang]


def extract_text_paddle(image_path: str, lang: str = "en") -> Dict[str, Any]:
    """Extract text using PaddleOCR (better for diagrams/equations).

    Args:
        image_path: Image file to read.
        lang: PaddleOCR language code.

    Returns:
        On success ``{"text", "confidence", "engine"}`` where ``text`` joins
        the recognized lines with newlines and ``confidence`` is their mean
        score (0 if nothing was recognized). On failure, or if PaddleOCR is
        not installed, ``{"text": "", "error": <message>}``.
    """
    if not PADDLE_AVAILABLE:
        return {"text": "", "error": "PaddleOCR not available"}

    try:
        ocr = _get_paddle_ocr(lang)
        result = ocr.ocr(image_path, cls=True)

        # Extract text and confidence
        texts = []
        confidences = []

        # result[0] holds the lines for this image; each line's second item
        # is read as a (text, score) pair.
        if result and result[0]:
            for line in result[0]:
                recognized_text, score = line[1][0], line[1][1]
                texts.append(recognized_text)
                confidences.append(score)

        return {
            "text": "\n".join(texts),
            "confidence": float(np.mean(confidences)) if confidences else 0,
            "engine": "paddle"
        }
    except Exception as e:
        # Best-effort: OCR problems are reported in the result, not raised
        return {"text": "", "error": str(e)}

# CONFIG['ocr_languages'] uses Tesseract-style codes (default "eng"), but
# PaddleOCR has its own code names. Codes not listed are passed through
# unchanged. NOTE(review): verify entries against the installed PaddleOCR version.
_TESSERACT_TO_PADDLE_LANG = {
    "eng": "en",
    "chi_sim": "ch",
    "fra": "french",
    "deu": "german",
    "jpn": "japan",
    "kor": "korean",
}


def extract_text_from_frame(image_path: str) -> Optional[Dict[str, Any]]:
    """Extract text using configured OCR engine.

    Returns:
        None when ``CONFIG['enable_ocr']`` is false; otherwise the result
        dict of the engine selected by ``CONFIG['ocr_engine']`` ("paddle"
        uses PaddleOCR, anything else uses Tesseract).
    """
    if not CONFIG['enable_ocr']:
        return None

    languages = CONFIG['ocr_languages']

    if CONFIG['ocr_engine'] == 'paddle':
        # Tesseract accepts "a+b" combinations; the Paddle helper takes a
        # single code, so use the first one and translate it.
        primary = languages.split('+')[0]
        paddle_lang = _TESSERACT_TO_PADDLE_LANG.get(primary, primary)
        return extract_text_paddle(image_path, paddle_lang)

    return extract_text_tesseract(image_path, languages)

print("‚úÖ OCR functions defined")

In [None]:
# ====================================================================
# CELL 8: AI Description (Gemini - Optional)
# ====================================================================

def _strip_code_fence(text: str) -> str:
    """Return ``text`` without a surrounding Markdown code fence, if it has one."""
    stripped = text.strip()
    if len(stripped) >= 6 and stripped.startswith("```") and stripped.endswith("```"):
        inner = stripped[3:-3]
        # The opening fence line may carry a language tag (e.g. "json"); drop
        # that line unless it already looks like the start of the payload.
        first_line, newline, rest = inner.partition("\n")
        if newline and not first_line.strip().startswith(("{", "[")):
            inner = rest
        return inner.strip()
    return stripped


def generate_ai_description(image_path: str) -> Optional[Dict[str, Any]]:
    """Generate AI description using Google Gemini.

    Returns:
        * None when ``CONFIG['enable_ai_description']`` is false.
        * ``{"description": "", "error": <message>}`` when the Gemini
          package or an API key is missing, or the request fails.
        * Otherwise a dict. If the model's reply is a JSON object (optionally
          wrapped in a Markdown code fence) that object is returned;
          any other reply is returned as
          ``{"description": <raw reply>, "type": "unknown", "concept": ""}``.
    """
    if not CONFIG['enable_ai_description']:
        return None

    if not GEMINI_AVAILABLE:
        return {"description": "", "error": "Gemini not available"}

    try:
        # Configure API: explicit config value wins, then the environment
        api_key = CONFIG['gemini_api_key'] or os.getenv('GEMINI_API_KEY')
        if not api_key:
            return {"description": "", "error": "No API key provided"}

        genai.configure(api_key=api_key)
        model = genai.GenerativeModel(CONFIG['gemini_model'])

        # Generate description
        prompt = """
        Analyze this educational video frame and provide:
        1. A concise description of what's shown (1-2 sentences)
        2. Any text, equations, or diagrams visible
        3. The main educational concept being illustrated
        
        Format as JSON: {"description": "...", "type": "diagram/text/equation/mixed", "concept": "..."}
        """

        # Keep the image open only for the duration of the request
        with Image.open(image_path) as img:
            response = model.generate_content([prompt, img])
        reply_text = response.text

        # Try to parse JSON response; a fenced reply is unwrapped first
        try:
            parsed = json.loads(_strip_code_fence(reply_text))
        except (ValueError, TypeError):
            parsed = None

        # Callers use .get() on the result, so anything that is not a JSON
        # object falls back to the raw-text form.
        if not isinstance(parsed, dict):
            parsed = {
                "description": reply_text,
                "type": "unknown",
                "concept": ""
            }

        return parsed

    except Exception as e:
        # Best-effort: API problems are reported in the result, not raised
        return {"description": "", "error": str(e)}

print("‚úÖ AI description function defined")
# Warn only when no key is available from either source. The key lookup in
# generate_ai_description falls back to the GEMINI_API_KEY environment
# variable, so a key supplied that way must not trigger this warning.
if CONFIG['enable_ai_description'] and not (CONFIG['gemini_api_key'] or os.getenv('GEMINI_API_KEY')):
    print("‚ö†Ô∏è  To use Gemini, set CONFIG['gemini_api_key'] or GEMINI_API_KEY env var")

In [None]:
# ====================================================================
# CELL 9: Main Processing Loop
# ====================================================================

print("="*70)
print("üöÄ STARTING PHASE 3: VISUAL EXTRACTION")
print("="*70)

# Initialize extractor
extractor = FrameExtractor(video_path)

# Run-level accumulators (reset each time this cell runs):
all_frames = []          # metadata dicts for every frame that is kept
seen_hashes = []         # perceptual hashes of kept frames, for dedup
total_extracted = 0      # frames written to disk, kept or not
duplicates_skipped = 0
low_quality_skipped = 0

print(f"\nProcessing {len(references)} references...\n")

# try/finally guarantees the video capture is released even if processing
# one of the frames raises part-way through the run.
try:
    for ref_idx, reference in enumerate(references, 1):
        ref_id = reference['reference_id']
        timestamp_ms = reference['timestamp_ms']

        print(f"[{ref_idx}/{len(references)}] {ref_id} @ {timestamp_ms/1000:.1f}s")
        print(f"  Text: {reference['text'][:80]}...")

        # Create reference directory
        ref_dir = os.path.join(CONFIG['frames_dir'], ref_id)
        os.makedirs(ref_dir, exist_ok=True)

        # Extract frames with offsets
        frames = extractor.extract_frames_with_offsets(
            timestamp_ms,
            CONFIG['frame_offsets_seconds'],
            ref_dir,
            ref_id
        )

        print(f"  Extracted {len(frames)} frames")

        # Process each frame: quality gate -> dedup gate -> enrich -> record
        for frame in frames:
            total_extracted += 1
            frame_path = frame['path']

            # Quality assessment
            quality = assess_frame_quality(frame_path)

            if not quality['is_good_quality']:
                print(f"    ‚ö†Ô∏è  Low quality (brightness={quality['brightness']:.0f}, sharpness={quality['sharpness']:.0f})")
                low_quality_skipped += 1
                continue

            # Duplicate detection (the hash is stored with the frame even
            # when dedup is disabled)
            phash = calculate_perceptual_hash(frame_path)
            is_dup = is_duplicate(phash, seen_hashes, CONFIG['perceptual_hash_threshold']) if CONFIG['enable_dedup'] else False

            if is_dup:
                print(f"    üîÑ Duplicate detected (skipping)")
                duplicates_skipped += 1
                continue

            seen_hashes.append(phash)

            # Create thumbnail. Only the file extension is touched when
            # building the name, so a ".jpg" elsewhere in the path (e.g. in
            # a directory name) is left alone.
            if CONFIG['save_thumbnails']:
                path_root, path_ext = os.path.splitext(frame_path)
                thumb_path = f"{path_root}_thumb{path_ext}"
                create_thumbnail(frame_path, thumb_path, CONFIG['thumbnail_size'])
            else:
                thumb_path = None

            # OCR extraction (None when OCR is disabled)
            ocr_data = extract_text_from_frame(frame_path)
            if ocr_data and ocr_data.get('text'):
                print(f"    üìù OCR: {ocr_data['text'][:60]}...")

            # AI description (None when AI description is disabled)
            ai_desc = generate_ai_description(frame_path)
            if ai_desc and ai_desc.get('description'):
                print(f"    ü§ñ AI: {ai_desc['description'][:60]}...")

            # Store frame info; the numeric suffix of frame_id is the
            # frame's index in all_frames, so it is unique across the run.
            frame_info = {
                "frame_id": f"{ref_id}_F{len(all_frames)}",
                "reference_id": ref_id,
                "reference_text": reference['text'],
                "timestamp_ms": frame['timestamp_ms'],
                "offset_seconds": frame['offset_seconds'],
                "frame_path": frame_path,
                "thumbnail_path": thumb_path,
                "perceptual_hash": phash,
                "quality": quality,
                "ocr_data": ocr_data,
                "ai_description": ai_desc
            }

            all_frames.append(frame_info)
            print(f"    ‚úÖ Processed (quality={quality['quality_score']:.2f})")

        print()
finally:
    # Cleanup
    extractor.close()

print("="*70)
print("‚úÖ EXTRACTION COMPLETE")
print("="*70)
print(f"üìä Statistics:")
print(f"   Total extracted: {total_extracted}")
print(f"   Duplicates skipped: {duplicates_skipped}")
print(f"   Low quality skipped: {low_quality_skipped}")
print(f"   Unique frames kept: {len(all_frames)}")
print("="*70)

In [None]:
# ====================================================================
# CELL 10: Save Results
# ====================================================================

def _json_default(value):
    """Fallback encoder for ``json.dump``: unwrap NumPy scalars and arrays.

    Values computed with NumPy (for example the result of comparing NumPy
    floats) are NumPy scalar types, which the json module cannot encode.
    """
    if isinstance(value, np.generic):
        return value.item()
    if isinstance(value, np.ndarray):
        return value.tolist()
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


# Everything Phase 3 produced, plus the settings that shaped it
result = {
    "video_id": video_id,
    "video_path": video_path,
    "frames": all_frames,
    "frame_count": len(all_frames),
    "statistics": {
        "total_extracted": total_extracted,
        "duplicates_skipped": duplicates_skipped,
        "low_quality_skipped": low_quality_skipped,
        "unique_frames": len(all_frames)
    },
    "config": {
        "frame_offsets": CONFIG['frame_offsets_seconds'],
        "dedup_enabled": CONFIG['enable_dedup'],
        "ocr_enabled": CONFIG['enable_ocr'],
        "ai_enabled": CONFIG['enable_ai_description']
    }
}

# Save JSON. `default` keeps the dump from failing with TypeError if any
# frame metadata still holds a NumPy scalar.
output_file = f"{CONFIG['output_dir']}/phase3_frames_{video_id}.json"
with open(output_file, 'w', encoding='utf-8') as f:
    json.dump(result, f, ensure_ascii=False, indent=2, default=_json_default)

# Save text report: one stanza per kept frame
report_file = f"{CONFIG['output_dir']}/phase3_report_{video_id}.txt"
with open(report_file, 'w', encoding='utf-8') as f:
    f.write("Visual Frame Extraction Report\n")
    f.write(f"Video ID: {video_id}\n")
    f.write("="*70 + "\n\n")

    for i, frame in enumerate(all_frames, 1):
        f.write(f"[{i}] Frame ID: {frame['frame_id']}\n")
        f.write(f"    Time: {frame['timestamp_ms']/1000:.1f}s (offset: {frame['offset_seconds']:+.1f}s)\n")
        f.write(f"    Reference: {frame['reference_text'][:80]}...\n")
        f.write(f"    Quality: {frame['quality']['quality_score']:.2f}\n")
        f.write(f"    Path: {frame['frame_path']}\n")

        # OCR / AI lines appear only when that step produced non-empty text
        if frame.get('ocr_data') and frame['ocr_data'].get('text'):
            f.write(f"    OCR: {frame['ocr_data']['text'][:100]}...\n")

        if frame.get('ai_description') and frame['ai_description'].get('description'):
            f.write(f"    AI: {frame['ai_description']['description'][:100]}...\n")

        f.write("\n")

file_size = os.path.getsize(output_file) / 1024  # KB

print("\nüíæ Files saved:")
print(f"   JSON: {output_file} ({file_size:.1f} KB)")
print(f"   Report: {report_file}")
print(f"   Frames: {CONFIG['frames_dir']}/")
print("\n‚úÖ Phase 3 complete!")

In [None]:
# ====================================================================
# CELL 11: Display Sample Frames
# ====================================================================

import matplotlib.pyplot as plt
from matplotlib import gridspec

if not all_frames:
    print("‚ö†Ô∏è  No frames to display")
else:
    # Preview at most six frames: the grid below is fixed at 2 rows x 3 columns
    num_samples = min(6, len(all_frames))

    fig = plt.figure(figsize=(16, 8))
    grid = gridspec.GridSpec(2, 3, hspace=0.3, wspace=0.2)

    for slot, sample in enumerate(all_frames[:num_samples]):
        ax = fig.add_subplot(grid[slot])

        # Frames are read with OpenCV and converted BGR -> RGB before display
        bgr_pixels = cv2.imread(sample['frame_path'])
        ax.imshow(cv2.cvtColor(bgr_pixels, cv2.COLOR_BGR2RGB))

        # Caption: frame ID, then timestamp in seconds and quality score
        seconds = sample['timestamp_ms'] / 1000
        score = sample['quality']['quality_score']
        ax.set_title(f"{sample['frame_id']}\n{seconds:.1f}s | Q={score:.2f}", fontsize=10)
        ax.axis('off')

    plt.suptitle(f"Sample Extracted Frames ({num_samples}/{len(all_frames)})",
                 fontsize=14, fontweight='bold')

    # Save before show() so the written PNG contains the rendered figure
    samples_png = f"{CONFIG['output_dir']}/phase3_samples_{video_id}.png"
    plt.savefig(samples_png, dpi=150, bbox_inches='tight')
    plt.show()

    print(f"\nüì∏ Sample visualization saved!")

In [None]:
# ====================================================================
# CELL 12: Timeline Visualization
# ====================================================================

if all_frames:
    # One point per kept frame: x = time in seconds, y (and colour) = quality score
    frame_times_s = [entry['timestamp_ms'] / 1000 for entry in all_frames]
    frame_scores = [entry['quality']['quality_score'] for entry in all_frames]

    fig, ax = plt.subplots(figsize=(16, 4))
    points = ax.scatter(
        frame_times_s,
        frame_scores,
        c=frame_scores,
        cmap='viridis',
        s=100,
        alpha=0.6,
        edgecolors='black',
        linewidth=0.5,
    )

    # Dashed red vertical line at every reference timestamp, including
    # references that ended up with no kept frame
    reference_times_s = [ref['timestamp_ms'] / 1000 for ref in references]
    for ref_time in reference_times_s:
        ax.axvline(x=ref_time, color='red', alpha=0.3, linestyle='--', linewidth=1)

    ax.set_title('Extracted Frames Timeline\n(Red lines = reference timestamps)',
                 fontsize=14, fontweight='bold')
    ax.set_xlabel('Time (seconds)', fontsize=12)
    ax.set_ylabel('Frame Quality Score', fontsize=12)
    ax.grid(True, alpha=0.3)

    plt.colorbar(points, label='Quality Score', ax=ax)
    plt.tight_layout()

    # Save before show() so the written PNG contains the rendered figure
    timeline_png = f"{CONFIG['output_dir']}/phase3_timeline_{video_id}.png"
    plt.savefig(timeline_png, dpi=150, bbox_inches='tight')
    plt.show()

    print("üìä Timeline visualization saved!")

# Printed whether or not there were frames to plot
print("\n‚úÖ All visualizations complete!")
print(f"\nüìÅ Download all files from: {CONFIG['output_dir']}/")