In [1]:
import torch
import torch.nn.functional as F
import cv2
import numpy as np
import os
import time
from depth_anything_v2.dpt import DepthAnythingV2

xFormers not available
xFormers not available


In [None]:
import torch

print(torch.__version__)

2.5.1+cu121


In [2]:
class UltraOptimizedDepthAnythingV2:
    def __init__(self, encoder='vits', device=None):
        """Ultra-optimized version targeting sub-30ms on RTX 3050"""
        
        if device is None:
            self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        else:
            self.device = torch.device(device)
            
        # Model configurations
        self.model_configs = {
            'vits': {'encoder': 'vits', 'features': 64, 'out_channels': [48, 96, 192, 384]},
            'vitb': {'encoder': 'vitb', 'features': 128, 'out_channels': [96, 192, 384, 768]},
            'vitl': {'encoder': 'vitl', 'features': 256, 'out_channels': [256, 512, 1024, 1024]},
        }
        
        self.encoder = encoder
        self.model = self._load_model()
        
        # Pre-compute normalization tensors
        self.mean = torch.tensor([0.485, 0.456, 0.406], device=self.device).view(1, 3, 1, 1)
        self.std = torch.tensor([0.229, 0.224, 0.225], device=self.device).view(1, 3, 1, 1)
        
        # Pre-allocate tensors for common sizes to avoid memory allocation overhead
        self._tensor_cache = {}
        
    def _load_model(self):
        """Load model with maximum optimizations"""
        print(f"Loading {self.encoder} model with ultra optimizations...")
        
        model = DepthAnythingV2(**self.model_configs[self.encoder])
        
        checkpoint_path = f'checkpoints/depth_anything_v2_{self.encoder}.pth'
        model.load_state_dict(torch.load(checkpoint_path, map_location='cpu', weights_only=True))
        model = model.to(self.device).eval()
        
        if self.device.type == 'cuda':
            # Maximum performance settings
            torch.backends.cudnn.benchmark = True
            torch.backends.cudnn.deterministic = False
            torch.backends.cudnn.enabled = True
            torch.backends.cuda.matmul.allow_tf32 = True
            torch.backends.cudnn.allow_tf32 = True
            
            # Enable graph capture for even faster inference
            torch.cuda.empty_cache()
            
            # Compile with aggressive optimization
            try:
                model = torch.compile(model, mode='max-autotune', dynamic=False)
                print("Model compiled with maximum optimization")
            except:
                try:
                    model = torch.compile(model, mode='reduce-overhead')
                    print("Model compiled with reduce-overhead mode")
                except:
                    print("Compilation not available, using standard optimization")
        
        return model
    
    def _get_cached_tensor(self, shape, dtype=torch.float32):
        """Get pre-allocated tensor to avoid memory allocation overhead"""
        key = (shape, dtype)
        if key not in self._tensor_cache:
            self._tensor_cache[key] = torch.empty(shape, device=self.device, dtype=dtype)
        return self._tensor_cache[key]
    
    def infer_image_ultra_fast(self, raw_image, input_size=518):
        """Ultra-fast inference targeting sub-30ms"""
        
        original_height, original_width = raw_image.shape[:2]
        
        with torch.cuda.device(self.device):
            # Use pre-allocated tensor if possible
            input_tensor = self._get_cached_tensor((1, 3, input_size, input_size))
            
            # Direct GPU upload and processing
            image_gpu = torch.from_numpy(raw_image).to(self.device, non_blocking=True)
            
            # Optimized preprocessing pipeline
            image_gpu = image_gpu.flip(-1).permute(2, 0, 1).unsqueeze(0).float()
            image_gpu.div_(255.0)  # In-place division
            
            # Resize with minimal overhead
            F.interpolate(image_gpu, (input_size, input_size), 
                         mode='bilinear', align_corners=False, 
                         antialias=False, out=input_tensor)
            
            # Normalize in-place
            input_tensor.sub_(self.mean).div_(self.std)
            
            # Inference timing
            torch.cuda.synchronize()
            start_time = time.perf_counter()
            
            with torch.no_grad():
                depth = self.model(input_tensor)
            
            torch.cuda.synchronize()
            inference_time = time.perf_counter() - start_time
            
            # Fast resize back
            depth = F.interpolate(depth, (original_height, original_width), 
                                mode='bilinear', align_corners=False, antialias=False)
            
            depth = depth.squeeze().cpu().numpy()
            
        return depth, inference_time
    
    def warmup_ultra(self, input_size=518, num_runs=10):
        """Extended warmup for ultra-stable performance"""
        print(f"Ultra warmup with {num_runs} runs...")
        
        dummy_image = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
        
        # Progressive warmup
        for i in range(num_runs):
            _, time_taken = self.infer_image_ultra_fast(dummy_image, input_size)
            print(f"Warmup {i+1}/{num_runs}: {time_taken*1000:.2f}ms")
            
        torch.cuda.empty_cache()
        print("Ultra warmup complete")
    
    def benchmark_ultra(self, image_path, num_runs=20):
        """Extended benchmark for statistical accuracy"""
        raw_image = cv2.imread(image_path)
        if raw_image is None:
            raise ValueError(f"Could not read image: {image_path}")
            
        print(f"Ultra benchmark with {num_runs} runs...")
        
        # Extended warmup first
        self.warmup_ultra(num_runs=15)
        
        # Benchmark runs
        times = []
        for i in range(num_runs):
            _, inference_time = self.infer_image_ultra_fast(raw_image)
            times.append(inference_time * 1000)  # Convert to ms
            print(f"Run {i+1:2d}/{num_runs}: {inference_time*1000:5.2f}ms")
            
        times = np.array(times)
        
        # Detailed statistics
        print(f"\n📊 ULTRA BENCHMARK RESULTS:")
        print(f"{'Mean:':<12} {times.mean():5.2f}ms")
        print(f"{'Median:':<12} {np.median(times):5.2f}ms")
        print(f"{'Std Dev:':<12} {times.std():5.2f}ms")
        print(f"{'Min:':<12} {times.min():5.2f}ms")
        print(f"{'Max:':<12} {times.max():5.2f}ms")
        print(f"{'95th %ile:':<12} {np.percentile(times, 95):5.2f}ms")
        print(f"{'5th %ile:':<12} {np.percentile(times, 5):5.2f}ms")
        
        # Performance classification
        mean_time = times.mean()
        if mean_time < 30:
            print("🚀 EXCELLENT: Sub-30ms performance!")
        elif mean_time < 50:
            print("✅ GREAT: Sub-50ms performance!")
        elif mean_time < 100:
            print("👍 GOOD: Sub-100ms performance")
        else:
            print("⚠️  Could be optimized further")
            
        return times


def compare_input_sizes(model, image_path):
    """Test different input sizes for speed vs accuracy"""
    sizes = [384, 448, 518, 588]
    
    print("🔬 INPUT SIZE COMPARISON:")
    print("Size  | Time (ms) | Speed Gain")
    print("-" * 35)
    
    baseline_time = None
    
    for size in sizes:
        raw_image = cv2.imread(image_path)
        
        # Warm up for this size
        for _ in range(3):
            model.infer_image_ultra_fast(raw_image, size)
            
        # Benchmark this size
        times = []
        for _ in range(10):
            _, t = model.infer_image_ultra_fast(raw_image, size)
            times.append(t * 1000)
            
        avg_time = np.mean(times)
        
        if baseline_time is None:
            baseline_time = avg_time
            
        speed_gain = baseline_time / avg_time
        print(f"{size:4d}  | {avg_time:7.2f}   | {speed_gain:.2f}x")

In [3]:
def main():
    """Ultra-optimized testing"""
    model = UltraOptimizedDepthAnythingV2(encoder='vits')
    
    image_path = r"C:\Codes\Python\obstacle_detection\Depth-Anything-V2\batch_input\left15.jpg"   # Update this
    
    try:
        print("🚀 ULTRA-OPTIMIZED DEPTH ANYTHING V2")
        print("=" * 50)
        
        # Extended benchmark
        times = model.benchmark_ultra(image_path, num_runs=20)
        
        # Test different input sizes
        print("\n" + "=" * 50)
        compare_input_sizes(model, image_path)
        
    except Exception as e:
        print(f"Error: {e}")


if __name__ == "__main__":
    main()

Loading vits model with ultra optimizations...
🚀 ULTRA-OPTIMIZED DEPTH ANYTHING V2
Ultra benchmark with 20 runs...
Ultra warmup with 15 runs...
Error: Expected a cuda device, but got: cpu
