# In [1]:
# Fast Depth Pipeline - Jupyter Notebook Setup
# Run these cells in order in your Jupyter notebook

# Cell 1: Install packages (run once)
import sys
import subprocess

def install_packages():
    """Pip-install the notebook's dependencies into the running interpreter.

    Every package is installed with its own ``python -m pip install`` call
    using ``sys.executable``, so the packages land in the environment of the
    current kernel. A failing install is reported and the remaining packages
    are still attempted. Returns ``None``.
    """
    required = (
        'torch', 'torchvision', 'torchaudio',   # PyTorch stack
        'opencv-python', 'pillow', 'numpy',     # image handling
        'open3d', 'timm', 'matplotlib',         # 3D processing and MiDaS deps
    )
    pip_install = [sys.executable, '-m', 'pip', 'install']

    for package in required:
        try:
            subprocess.check_call(pip_install + [package])
        except subprocess.CalledProcessError:
            print(f"❌ Failed to install {package}")
        else:
            print(f"✅ {package} installed successfully")

# Uncomment and run this line once:
# install_packages()

# Cell 2: Optional ZoeDepth installation (run once if needed)
# !pip install git+https://github.com/isl-org/ZoeDepth.git

# Cell 3: Test Metal backend
import torch
# Report Metal (MPS) availability and the installed PyTorch version,
# one item per output line.
print(
    f"🔋 Metal backend available: {torch.backends.mps.is_available()}",
    f"🐍 PyTorch version: {torch.__version__}",
    sep="\n",
)

# Cell 4: Fast Depth Pipeline Implementation
import os
import time
import numpy as np
import cv2
import torch
import torch.nn.functional as F
from PIL import Image
import open3d as o3d
from pathlib import Path
import warnings
warnings.filterwarnings('ignore')

class FastDepthPipeline:
    """Single-image depth pipeline: load, letterbox, MiDaS, point cloud, shape label.

    The depth values are the model's prediction rescaled to [0, 1]; they are
    relative, not metric. MiDaS documents its output as relative *inverse*
    depth (larger means nearer); this class uses the rescaled values directly
    as the z coordinate without inverting them.
    """

    # model_type -> entry-point name in the 'intel-isl/MiDaS' torch.hub repo.
    _HUB_ENTRYPOINTS = {'midas_small': 'MiDaS_small'}

    # Per-channel RGB statistics used by the MiDaS small input transform.
    _NORM_MEAN = (0.485, 0.456, 0.406)
    _NORM_STD = (0.229, 0.224, 0.225)

    def __init__(self, model_type='midas_small', input_size=256, use_metal=True):
        """
        Initialize the fast depth estimation pipeline

        Args:
            model_type: Only 'midas_small' is supported.
            input_size: Side length of the square network input in pixels
                (e.g. 256, 384 or 512). NOTE(review): MiDaS' own transforms
                round sizes to multiples of 32, so other values may not be
                accepted by the network - confirm before using them.
            use_metal: Use Apple Metal backend for acceleration

        Raises:
            ValueError: If input_size is not positive or model_type is unknown.
        """
        if input_size <= 0:
            raise ValueError(f"input_size must be positive, got {input_size}")

        self.model_type = model_type
        self.input_size = input_size
        self.device = self._setup_device(use_metal)
        self.model = self._load_model()

        print(f"🚀 Pipeline initialized:")
        print(f"   Model: {model_type}")
        print(f"   Input size: {input_size}x{input_size}")
        print(f"   Device: {self.device}")

    def _setup_device(self, use_metal):
        """Pick the compute device: MPS (if requested and available), else CUDA, else CPU."""
        # Older PyTorch builds have no `torch.backends.mps` attribute at all.
        mps_backend = getattr(torch.backends, 'mps', None)
        if use_metal and mps_backend is not None and mps_backend.is_available():
            return torch.device('mps')
        if torch.cuda.is_available():
            return torch.device('cuda')
        return torch.device('cpu')

    def _load_model(self):
        """Load the depth network for ``self.model_type`` via torch.hub, in eval mode.

        Raises:
            ValueError: If ``self.model_type`` has no known hub entry point.
        """
        entrypoint = self._HUB_ENTRYPOINTS.get(self.model_type)
        if entrypoint is None:
            raise ValueError(f"Unknown model type: {self.model_type}")

        # 'MiDaS_small' is the lightweight variant; the much heavier
        # 'DPT_Hybrid' entry point is a different model.
        model = torch.hub.load('intel-isl/MiDaS', entrypoint, pretrained=True)
        model.to(self.device)
        model.eval()
        return model

    def _load_rgb(self, image_path):
        """Read an image file and return it as an RGB uint8 array (H, W, 3)."""
        img = cv2.imread(str(image_path))
        if img is None:
            raise ValueError(f"Could not load image: {image_path}")
        return cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    def _letterbox(self, img):
        """Resize so the longer side equals input_size, then zero-pad bottom/right to a square.

        Returns:
            (padded, (new_h, new_w), scale): the square uint8 image, the size
            of the real image content inside it, and the resize factor.
        """
        h, w = img.shape[:2]
        scale = self.input_size / max(h, w)
        # round() avoids losing a pixel to float error (e.g. 255.999 -> 255);
        # the clamps keep extreme aspect ratios inside [1, input_size].
        new_h = min(self.input_size, max(1, round(h * scale)))
        new_w = min(self.input_size, max(1, round(w * scale)))
        resized = cv2.resize(img, (new_w, new_h), interpolation=cv2.INTER_LINEAR)

        padded = np.pad(
            resized,
            ((0, self.input_size - new_h), (0, self.input_size - new_w), (0, 0)),
            mode='constant',
        )
        return padded, (new_h, new_w), scale

    def _to_tensor(self, padded):
        """Convert an RGB uint8 (H, W, 3) array to a normalised (1, 3, H, W) tensor on the device."""
        arr = padded.astype(np.float32) / 255.0
        mean = np.asarray(self._NORM_MEAN, dtype=np.float32)
        std = np.asarray(self._NORM_STD, dtype=np.float32)
        arr = (arr - mean) / std
        return torch.from_numpy(arr).permute(2, 0, 1).unsqueeze(0).to(self.device)

    def preprocess_image(self, image_path):
        """Load an image and turn it into a network-ready tensor.

        Args:
            image_path: Path of the image file to read.

        Returns:
            (tensor, (h, w), scale): the normalised (1, 3, input_size,
            input_size) tensor on ``self.device``, the original image size,
            and the resize factor that was applied.

        Raises:
            ValueError: If the image cannot be read.
        """
        img = self._load_rgb(image_path)
        h, w = img.shape[:2]
        padded, _, scale = self._letterbox(img)
        return self._to_tensor(padded), (h, w), scale

    def estimate_depth(self, image_tensor):
        """Run the model and return its prediction rescaled to [0, 1] as a NumPy array.

        A constant (or non-finite-range) prediction carries no relative depth
        information, so it is returned as all zeros instead of dividing by
        zero and producing NaNs.
        """
        with torch.no_grad():
            prediction = self.model(image_tensor)
        depth = prediction.squeeze().cpu().numpy()

        lowest = depth.min()
        span = depth.max() - lowest
        if not np.isfinite(span) or span <= 0:
            return np.zeros_like(depth)
        return (depth - lowest) / span

    def create_sparse_point_cloud(self, depth_map, rgb_image, sample_rate=4):
        """Back-project every ``sample_rate``-th pixel into a coloured 3D point.

        Uses a pinhole model with the principal point at the image centre and
        a focal length equal to the longer image side; z is the depth value
        times 10. These are fixed assumptions, not calibrated intrinsics.

        Args:
            depth_map: 2D array of depth values.
            rgb_image: uint8 colour image covering at least the same pixels
                as ``depth_map``.
            sample_rate: Pixel stride in both directions; must be >= 1.

        Returns:
            (points, colors): (N, 3) xyz coordinates and (N, 3) colours in [0, 1].

        Raises:
            ValueError: If ``sample_rate`` is smaller than 1.
        """
        if sample_rate < 1:
            raise ValueError(f"sample_rate must be >= 1, got {sample_rate}")

        h, w = depth_map.shape

        # Sample every Nth pixel for speed
        y_coords, x_coords = np.meshgrid(
            np.arange(0, h, sample_rate),
            np.arange(0, w, sample_rate),
            indexing='ij'
        )

        # Get depth and color values
        depths = depth_map[y_coords, x_coords]
        colors = rgb_image[y_coords, x_coords]

        # Create 3D coordinates
        focal_length = max(h, w)
        cx, cy = w // 2, h // 2

        z = depths * 10
        x = (x_coords - cx) * z / focal_length
        y = (y_coords - cy) * z / focal_length

        points = np.stack([x.flatten(), y.flatten(), z.flatten()], axis=1)
        colors = colors.reshape(-1, 3) / 255.0

        # Drop points with any NaN or infinite coordinate.
        valid_mask = np.isfinite(points).all(axis=1)
        return points[valid_mask], colors[valid_mask]

    def analyze_3d_shape(self, points, colors):
        """Label a point cloud with a coarse shape class using bounding-box heuristics.

        Args:
            points: (N, 3) array of xyz coordinates.
            colors: Accepted for interface symmetry; not used by the heuristic.

        Returns:
            dict with "type" ("flat_surface", "elongated_object", "3d_object"
            or "unknown") and a fixed "confidence" score for that class.
        """
        if len(points) < 10:
            return {"type": "unknown", "confidence": 0.0}

        # Calculate basic statistics
        bbox_size = np.max(points, axis=0) - np.min(points, axis=0)
        z_std = np.std(points[:, 2])

        # Degenerate extents fall back to a neutral ratio of 1.0.
        longest_xy = max(bbox_size[0], bbox_size[1])
        aspect_ratio_xy = bbox_size[0] / bbox_size[1] if bbox_size[1] > 0 else 1.0
        aspect_ratio_z = bbox_size[2] / longest_xy if longest_xy > 0 else 1.0

        # Simple classification
        if z_std < 0.5 and aspect_ratio_z < 0.1:
            return {"type": "flat_surface", "confidence": 0.8}
        elif aspect_ratio_xy > 3.0 or aspect_ratio_xy < 0.3:
            return {"type": "elongated_object", "confidence": 0.7}
        elif z_std > 1.0:
            return {"type": "3d_object", "confidence": 0.6}
        else:
            return {"type": "unknown", "confidence": 0.3}

    def process_image(self, image_path, sample_rate=4, show_results=True):
        """Run the full pipeline on one image and optionally plot the results.

        Args:
            image_path: Path of the image file to process.
            sample_rate: Pixel stride used when building the point cloud.
            show_results: If True, show the image, depth map and point cloud
                with matplotlib and print a summary.

        Returns:
            dict with 'analysis', 'points', 'colors', 'depth_map' (the full
            square map including the padded border) and 'processing_time'
            in seconds.

        Raises:
            ValueError: If the image cannot be read or sample_rate < 1.
        """
        start_time = time.time()

        # Decode the file once and reuse it for inference, colours and display.
        orig_img = self._load_rgb(image_path)
        padded_rgb, (new_h, new_w), _ = self._letterbox(orig_img)
        depth_map = self.estimate_depth(self._to_tensor(padded_rgb))

        # Keep depth and colour on the same pixel grid even if the network
        # returns a different resolution than it was given.
        if depth_map.shape != padded_rgb.shape[:2]:
            depth_map = cv2.resize(
                depth_map,
                (padded_rgb.shape[1], padded_rgb.shape[0]),
                interpolation=cv2.INTER_LINEAR,
            )

        # Build the cloud from real image content only: the padded border is
        # not part of the scene, and colours come from the same letterboxed
        # image the depth was predicted from.
        content_depth = depth_map[:new_h, :new_w]
        content_rgb = padded_rgb[:new_h, :new_w]
        points, colors = self.create_sparse_point_cloud(content_depth, content_rgb, sample_rate)
        analysis = self.analyze_3d_shape(points, colors)

        total_time = time.time() - start_time

        # Display results in notebook
        if show_results:
            import matplotlib.pyplot as plt

            fig, axes = plt.subplots(1, 3, figsize=(15, 5))

            # Original image
            axes[0].imshow(orig_img)
            axes[0].set_title('Original Image')
            axes[0].axis('off')

            # Depth map (content region, so it lines up with the original)
            axes[1].imshow(content_depth, cmap='plasma')
            axes[1].set_title('Depth Map')
            axes[1].axis('off')

            # Point cloud projected onto the image (x-y) plane, coloured by z.
            # That is the camera's view, not a top view; y is flipped so the
            # plot has the same orientation as the image.
            if len(points) > 0:
                axes[2].scatter(points[:, 0], points[:, 1], c=points[:, 2], cmap='viridis', s=1)
                axes[2].set_title('Point Cloud (Front View)')
                axes[2].set_aspect('equal')
                axes[2].invert_yaxis()

            plt.tight_layout()
            plt.show()

            # Print analysis
            print(f"📊 Analysis Results:")
            print(f"   Shape: {analysis['type']}")
            print(f"   Confidence: {analysis['confidence']:.2f}")
            print(f"   Processing time: {total_time:.2f}s")
            print(f"   Points generated: {len(points)}")

        return {
            'analysis': analysis,
            'points': points,
            'colors': colors,
            'depth_map': depth_map,
            'processing_time': total_time
        }

# Cell 5: Quick test function
def quick_test():
    """Smoke-test the pipeline on a synthetic image and return its result dict.

    A 400x400 random-noise image with a filled circle and a filled rectangle
    is written to a private temporary directory, pushed through a freshly
    constructed ``FastDepthPipeline`` and then removed again, even when the
    pipeline raises. Using a temporary directory means an existing
    ``test_image.jpg`` in the working directory is never overwritten or
    deleted.

    Returns:
        The dict returned by ``FastDepthPipeline.process_image``.

    Raises:
        RuntimeError: If the test image cannot be written.
    """
    import tempfile

    # Create a test image
    test_img = np.random.randint(0, 255, (400, 400, 3), dtype=np.uint8)
    # Add some structure
    cv2.circle(test_img, (200, 200), 80, (255, 0, 0), -1)
    cv2.rectangle(test_img, (50, 50), (150, 150), (0, 255, 0), -1)

    # The context manager removes the directory and the image on every exit path.
    with tempfile.TemporaryDirectory() as tmp_dir:
        test_path = os.path.join(tmp_dir, "test_image.jpg")
        # cv2.imwrite signals failure through its return value, not an exception.
        if not cv2.imwrite(test_path, test_img):
            raise RuntimeError(f"Could not write test image: {test_path}")

        # Run pipeline
        pipeline = FastDepthPipeline(input_size=256)
        return pipeline.process_image(test_path, sample_rate=8)

# Tell the user that every definition in this cell is now loaded and how
# to try the pipeline.
print(
    "🎯 Setup complete! Ready to use in Jupyter.",
    "📝 Run 'quick_test()' to test the pipeline.",
    sep="\n",
)

# Cell output:
# 🔋 Metal backend available: True
# 🐍 PyTorch version: 2.7.1
# 🎯 Setup complete! Ready to use in Jupyter.
# 📝 Run 'quick_test()' to test the pipeline.
