# 🚀 Depth Anything V2 - Pro Video Processor

**Robust. Hardware-Agnostic. Advanced.**

This notebook provides a complete pipeline for monocular depth estimation on videos, featuring:
- **Dual Modes**: Standard Relative Depth (Visuals) & Metric Depth (Measurements).
- **Hardware Smart**: Automatically uses GPU (FP16) for speed or CPU (FP32) for compatibility.
- **3D Snapshots**: Export high-quality 3D Point Clouds (.ply) from any frame.
- **Robust Engine**: Flicker reduction, high-quality FFmpeg encoding, and memory safety.

In [None]:
# @title 1. Install Dependencies
# @markdown Run this cell once to setup the environment.

import subprocess
import sys

def install_dependencies():
    """Install the notebook's Python requirements through pip.

    Runs ``<current interpreter> -m pip install -q`` with the package list
    below. A non-zero pip exit status is reported on stdout and otherwise
    ignored, so the function never raises ``CalledProcessError`` itself.
    """
    print("‚öôÔ∏è Installing system dependencies... (This may take 1-2 minutes)")

    # transformers is installed from the GitHub repository; the rest come
    # from the package index.
    requirements = (
        "git+https://github.com/huggingface/transformers.git",
        "accelerate",
        "opencv-python",
        "yt-dlp",
        "torch",
        "pillow",
        "numpy",
    )
    # Using sys.executable targets the interpreter that runs this cell.
    pip_install = [sys.executable, "-m", "pip", "install", "-q", *requirements]

    try:
        subprocess.check_call(pip_install)
    except subprocess.CalledProcessError as err:
        print(f"‚ùå Installation failed: {err}")
    else:
        print("‚úÖ Dependencies installed successfully.")

# Trigger the installation only when this code runs as the main module,
# not when it is imported from elsewhere.
if __name__ == "__main__":
    install_dependencies()

In [None]:
# @title 2. Initialize Depth Engine
# @markdown This cell defines the core processing logic. Run it to load the engine.

import os
import sys
import logging
import subprocess
import cv2
import torch
import numpy as np
import torch.nn.functional as F
from collections import deque
from datetime import datetime
from transformers import AutoImageProcessor, AutoModelForDepthEstimation
import yt_dlp

# --- Logging Setup (Restored Legacy Style) ---
def log(msg):
    """Write *msg* to standard output, followed by a newline.

    Single choke point for the engine's status messages; it is a plain
    ``print`` and does not go through the ``logging`` module.
    """
    print(msg)

class DepthVideoEngine:
    """Depth-estimation video pipeline built on a Hugging Face checkpoint.

    The constructor loads an image processor and a depth-estimation model once.
    ``process_video`` then turns a video file into a colorized depth video and
    can export one frame as an ASCII ``.ply`` point cloud via ``save_ply``.
    """

    def __init__(self, model_type="Relative", model_size="small"):
        """Select a checkpoint and load the processor and model.

        Args:
            model_type: 'Relative' (Visuals) or 'Metric' (Measurements).
            model_size: 'small', 'base', 'large' (Only for Relative).

        Raises:
            Exception: whatever ``from_pretrained`` raises is logged and
                re-raised unchanged.
        """
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.model_type = model_type

        # Select Checkpoint
        if model_type == "Metric":
            self.checkpoint = "depth-anything/Depth-Anything-V2-Metric-Hypersim-Small-hf"
            log(f"üìè Mode: Metric Depth ({self.checkpoint})")
        else:
            self.checkpoint = f"depth-anything/Depth-Anything-V2-{model_size.title()}-hf"
            log(f"üé® Mode: Relative Depth ({self.checkpoint})")

        log(f"üöÄ Acceleration: {self.device.upper()}")

        try:
            self.processor = AutoImageProcessor.from_pretrained(self.checkpoint)
            self.model = AutoModelForDepthEstimation.from_pretrained(self.checkpoint).to(self.device)
            log("‚úÖ Model loaded successfully.")
        except Exception as e:
            log(f"‚ùå Model load failed: {e}")
            # Bare raise keeps the original traceback intact.
            raise

    def process_video(self, video_path, output_resolution="480p", smooth_window=0, snapshot_time=None):
        """Render a colorized depth video from ``video_path``.

        Args:
            video_path: Path of the input video file.
            output_resolution: "Native", or a height such as "480p". Videos
                taller than the requested height are downscaled with the
                aspect ratio preserved; shorter ones keep their size. Both
                output dimensions are rounded down to even numbers.
            smooth_window: Number of most recent depth maps to average.
                Values of 1 or less disable smoothing.
            snapshot_time: Time in seconds of the frame to export as a point
                cloud, or None for no snapshot.

        Returns:
            ``(mp4_path, ply_path)``. ``ply_path`` is None when no snapshot
            was written. ``(None, None)`` is returned when the input is
            missing, cannot be opened, reports unusable dimensions, or yields
            no frames.

        Raises:
            subprocess.CalledProcessError: if the final ffmpeg encode fails.
        """
        if not os.path.exists(video_path):
            log(f"‚ùå File not found: {video_path}")
            return None, None

        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            # The file exists but no backend could decode it.
            cap.release()
            log(f"ERROR: Could not open video: {video_path}")
            return None, None

        fps = cap.get(cv2.CAP_PROP_FPS)
        if not np.isfinite(fps) or fps <= 0:
            # Some containers report no frame rate; the writer needs a positive one.
            log(f"WARNING: Invalid FPS reported ({fps}); falling back to 30.0")
            fps = 30.0
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        orig_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        orig_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Resolution Logic
        if output_resolution == "Native":
            target_h, target_w = orig_h, orig_w
        else:
            target_p = int(output_resolution.replace("p", ""))
            if orig_h > target_p:
                scale = target_p / orig_h
                target_h = target_p
                target_w = int(orig_w * scale)
            else:
                target_h, target_w = orig_h, orig_w

        # The final encode uses libx264 with yuv420p, which rejects odd
        # dimensions, so round both down to even in every branch.
        target_w -= target_w % 2
        target_h -= target_h % 2
        if target_w <= 0 or target_h <= 0:
            cap.release()
            log(f"ERROR: Unusable frame size {orig_w}x{orig_h} for: {video_path}")
            return None, None

        log(f"üé¨ Processing: {orig_w}x{orig_h} -> Resizing to: {target_w}x{target_h} @ {fps}fps")

        # Writers
        temp_out = "temp_depth.avi"
        out = cv2.VideoWriter(temp_out, cv2.VideoWriter_fourcc(*'MJPG'), fps, (target_w, target_h))
        if not out.isOpened():
            cap.release()
            out.release()
            log(f"ERROR: Could not open video writer for: {temp_out}")
            return None, None

        # Optimization: Only use buffer if smoothing is requested
        use_smoothing = smooth_window > 1
        buffer = deque(maxlen=smooth_window) if use_smoothing else None

        snapshot_frame_idx = int(snapshot_time * fps) if snapshot_time is not None else -1
        snapshot_ply_path = None

        # Mixed precision is only enabled on CUDA; on CPU the context is a no-op.
        use_autocast = self.device == "cuda"

        frame_idx = 0
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                # Resize Input (Critical Speedup). Compare against the real
                # frame shape rather than container metadata, because the
                # writer discards frames whose size differs from its own.
                if frame.shape[1] != target_w or frame.shape[0] != target_h:
                    frame_in = cv2.resize(frame, (target_w, target_h), interpolation=cv2.INTER_AREA)
                else:
                    frame_in = frame

                # Inference
                rgb = cv2.cvtColor(frame_in, cv2.COLOR_BGR2RGB)
                inputs = self.processor(images=rgb, return_tensors="pt").to(self.device)

                with torch.no_grad(), torch.autocast(device_type=self.device, enabled=use_autocast):
                    depth = self.model(**inputs).predicted_depth

                # Interpolate to the output size. Cast to float32 first so
                # resampling, smoothing and the PLY export are not done in
                # half precision after autocast.
                depth = F.interpolate(
                    depth.unsqueeze(1).float(),
                    size=(target_h, target_w),
                    mode="bicubic",
                    align_corners=False,
                )[0, 0].cpu().numpy()

                # 3D Snapshot (uses the raw, unsmoothed depth of this frame)
                if frame_idx == snapshot_frame_idx:
                    log(f"üì∏ Capturing 3D Snapshot at {snapshot_time}s...")
                    snapshot_ply_path = self.save_ply(frame_in, depth, f"snapshot_{frame_idx}.ply")

                # Smoothing vs Raw
                if use_smoothing:
                    buffer.append(depth)
                    final_depth = np.mean(buffer, axis=0)
                else:
                    final_depth = depth

                # Normalize & Colorize (per-frame min/max scaling)
                d_min, d_max = final_depth.min(), final_depth.max()
                if d_max - d_min > 1e-6:
                    depth_norm = (final_depth - d_min) / (d_max - d_min)
                else:
                    # Flat depth map: avoid dividing by ~0.
                    depth_norm = np.zeros_like(final_depth)

                depth_uint8 = (depth_norm * 255).astype(np.uint8)
                heatmap = cv2.applyColorMap(depth_uint8, cv2.COLORMAP_INFERNO)

                out.write(heatmap)
                frame_idx += 1

                if frame_idx % 50 == 0:
                    log(f"‚è≥ Processed {frame_idx} / {total_frames or '?'}")

        except KeyboardInterrupt:
            log("‚ö†Ô∏è Interrupted. Saving partial result...")
        finally:
            cap.release()
            out.release()

        if frame_idx == 0:
            # Nothing was written; encoding an empty AVI would only fail.
            log(f"ERROR: No frames could be decoded from: {video_path}")
            if os.path.exists(temp_out):
                os.remove(temp_out)
            return None, None

        # Encode
        log("‚öôÔ∏è Encoding final MP4...")
        final_mp4 = f"depth_output_{output_resolution}.mp4"
        self.encode_ffmpeg(temp_out, final_mp4)
        return final_mp4, snapshot_ply_path

    def save_ply(self, image, depth, filename):
        """Write one frame as an ASCII PLY point cloud and return ``filename``.

        Each pixel becomes a vertex: x/y are the pixel column/row, z is the
        depth value, and the color is taken from the same pixel of ``image``.

        Args:
            image: Color frame in BGR channel order (converted to RGB here),
                with the same height and width as ``depth``.
            depth: 2-D array of depth values.
            filename: Output path; an existing file is overwritten.
        """
        # Simple PLY writer to avoid heavy dependencies
        height, width = depth.shape
        # Pixel-coordinate grid, flattened row by row
        cols, rows = np.meshgrid(np.arange(width), np.arange(height))

        # Color: one (r, g, b) row per pixel, in the same order as the grid
        rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB).reshape(-1, 3)

        # Filter zeros or far points if needed, but keeping simple for now
        points = np.column_stack((cols.ravel(), rows.ravel(), depth.ravel(), rgb))

        header = f"""ply
format ascii 1.0
element vertex {len(points)}
property float x
property float y
property float z
property uchar red
property uchar green
property uchar blue
end_header
"""
        with open(filename, "w") as f:
            f.write(header)
            np.savetxt(f, points, fmt="%f %f %f %d %d %d")

        log(f"üíæ Saved 3D Snapshot: {filename}")
        return filename

    def encode_ffmpeg(self, input_file, output_file):
        """Re-encode ``input_file`` to H.264 ``output_file`` with ffmpeg.

        An existing ``output_file`` is replaced. ``input_file`` is deleted
        after a successful encode.

        Raises:
            subprocess.CalledProcessError: if ffmpeg exits with an error.
        """
        if os.path.exists(output_file):
            os.remove(output_file)
        cmd = [
            "ffmpeg", "-y", "-i", input_file,
            "-c:v", "libx264", "-pix_fmt", "yuv420p",
            "-crf", "23", "-preset", "fast",
            "-loglevel", "error", output_file
        ]
        subprocess.run(cmd, check=True)
        if os.path.exists(input_file):
            os.remove(input_file)

    def download_video(self, url):
        """Download ``url`` to ``input_video.mp4`` and return that filename.

        yt-dlp is tried first. If it fails, the URL is fetched directly with
        ``urllib.request.urlretrieve``; errors from that fallback propagate.
        """
        filename = "input_video.mp4"
        if os.path.exists(filename):
            os.remove(filename)

        ydl_opts = {'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]', 'outtmpl': filename, 'quiet': True}
        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                ydl.download([url])
        except Exception as e:
            # Catch Exception only, so Ctrl-C / SystemExit still abort the
            # run, and report why the fallback is being used.
            log(f"WARNING: yt-dlp download failed ({e}); trying direct download...")
            # Fallback direct
            import urllib.request
            urllib.request.urlretrieve(url, filename)
        return filename

In [None]:
# @title 3. Run Dashboard
# @markdown Configure your settings and run processing.

from google.colab import files
from IPython.display import display, Video

# --- Parameters ---
# Form fields read by run_dashboard():
#   VIDEO_SOURCE      - downloaded when it starts with "http", otherwise used as a local file path.
#   MODEL_TYPE        - passed to DepthVideoEngine as model_type.
#   MODEL_SIZE        - passed to DepthVideoEngine as model_size (only used for the Relative checkpoint).
#   RESOLUTION        - passed to process_video as the output resolution.
#   SMOOTHING         - passed as smooth_window; values of 1 or less leave frames unsmoothed.
#   SNAPSHOT_TIME     - time in seconds of the frame exported as a point cloud.
#   GENERATE_SNAPSHOT - when False, no snapshot time is passed and no point cloud is requested.
VIDEO_SOURCE = "https://videos.pexels.com/video-files/30982132/13244096_1080_1920_30fps.mp4" # @param {type:"string"}
MODEL_TYPE = "Relative" # @param ["Relative", "Metric"]
MODEL_SIZE = "small" # @param ["small", "base", "large"]
RESOLUTION = "480p" # @param ["Native", "720p", "480p", "360p"]
SMOOTHING = 0 # @param {type:"slider", min:0, max:10, step:1}
SNAPSHOT_TIME = 2.5 # @param {type:"number"}
GENERATE_SNAPSHOT = True # @param {type:"boolean"}

def run_dashboard():
    """Run the whole pipeline using the module-level form parameters.

    Steps: build the engine, obtain the input video (download or local path),
    preview it, process it, then preview and download the resulting depth
    video and, if one was produced, the point-cloud snapshot. Returns None;
    stops early with a message when the input video is not on disk.
    """
    print("üîß Initializing Engine...")
    engine = DepthVideoEngine(model_type=MODEL_TYPE, model_size=MODEL_SIZE)

    # Get Video: anything that is not an http(s) URL is taken as a local path.
    video_path = VIDEO_SOURCE
    if VIDEO_SOURCE.startswith("http"):
        print("‚¨áÔ∏è Downloading video...")
        video_path = engine.download_video(VIDEO_SOURCE)

    if not os.path.exists(video_path):
        print("‚ùå Video not found. Please upload or check URL.")
        return

    # Preview Input
    print(f"\nüé¨ Input Preview: {video_path}")
    input_preview = Video(video_path, embed=True, width=400)
    display(input_preview)

    # Process
    print("\n‚öôÔ∏è Processing...")
    if GENERATE_SNAPSHOT:
        snap_t = SNAPSHOT_TIME
    else:
        snap_t = None
    results = engine.process_video(
        video_path,
        RESOLUTION,
        smooth_window=SMOOTHING,
        snapshot_time=snap_t,
    )
    final_video, ply_file = results

    # Display Output
    if final_video:
        print(f"\n‚ú® Video Ready: {final_video}")
        print("üé¨ Output Preview:")
        output_preview = Video(final_video, embed=True, width=400)
        display(output_preview)

        print("‚¨áÔ∏è Downloading result...")
        files.download(final_video)

    if ply_file:
        print(f"\nüßä 3D Snapshot Ready: {ply_file}")
        files.download(ply_file)

# Start the dashboard only when this code runs as the main module,
# not when it is imported from elsewhere.
if __name__ == "__main__":
    run_dashboard()