In [1]:
# Install required packages
%pip install transformers torch torchvision pillow opencv-python matplotlib numpy gradio plotly


Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.2.1.3 (from torch)
  Downloading nvidia_cufft_cu12-11.2.1.3-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-curand-cu12==10.3.5.147 (from torch)
  Downloading nvidia_curand_cu12-10.3.5

In [2]:
import torch
import cv2
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import warnings
from typing import Tuple, Dict, List
import time
import json

# Transformers and models
from transformers import pipeline, DPTImageProcessor, DPTForDepthEstimation
from transformers import AutoImageProcessor, AutoModelForDepthEstimation

# Visualization
import gradio as gr
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots

# Create directories
import os
os.makedirs("drone_test_images", exist_ok=True)
os.makedirs("obstacle_analysis", exist_ok=True)
os.makedirs("calibration_data", exist_ok=True)

warnings.filterwarnings('ignore')
print("✅ All dependencies imported successfully!")


✅ All dependencies imported successfully!


In [3]:
class DroneDepthEstimator:
    """
    A class to handle multiple depth estimation models for drone obstacle avoidance
    """

    def __init__(self):
        self.models = {}
        self.current_model = None
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        print(f"🔧 Using device: {self.device}")

    def load_model(self, model_name: str, model_id: str):
        """Load a depth estimation model"""
        try:
            print(f"📥 Loading {model_name}...")

            if "dpt" in model_id.lower():
                # DPT models
                processor = DPTImageProcessor.from_pretrained(model_id)
                model = DPTForDepthEstimation.from_pretrained(model_id)
            else:
                # Other models
                processor = AutoImageProcessor.from_pretrained(model_id)
                model = AutoModelForDepthEstimation.from_pretrained(model_id)

            model.to(self.device)
            model.eval()

            self.models[model_name] = {
                'processor': processor,
                'model': model,
                'model_id': model_id
            }

            print(f"✅ {model_name} loaded successfully!")
            return True

        except Exception as e:
            print(f"❌ Failed to load {model_name}: {str(e)}")
            return False

    def set_current_model(self, model_name: str):
        """Set the current active model"""
        if model_name in self.models:
            self.current_model = model_name
            print(f"🎯 Current model set to: {model_name}")
        else:
            print(f"❌ Model {model_name} not found!")

    def list_available_models(self):
        """List all loaded models"""
        print("📋 Available models:")
        for name, info in self.models.items():
            status = "🎯 ACTIVE" if name == self.current_model else "⚪ INACTIVE"
            print(f"  - {name} ({info['model_id']}) {status}")

# Initialize the depth estimator
depth_estimator = DroneDepthEstimator()


🔧 Using device: cpu


In [4]:
# Load multiple depth estimation models for comparison
models_to_load = [
    ("DPT-Large", "Intel/dpt-large"),
    ("DPT-Hybrid", "Intel/dpt-hybrid-midas"),
    ("MiDaS", "Intel/midas-v2-1-small-256"),
]

print("🚀 Loading depth estimation models for drone navigation...\n")

for model_name, model_id in models_to_load:
    success = depth_estimator.load_model(model_name, model_id)
    if success and depth_estimator.current_model is None:
        depth_estimator.set_current_model(model_name)
    print()

print("🎉 Model loading complete!")
depth_estimator.list_available_models()


🚀 Loading depth estimation models for drone navigation...

📥 Loading DPT-Large...


preprocessor_config.json:   0%|          | 0.00/285 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/942 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/1.37G [00:00<?, ?B/s]

Some weights of DPTForDepthEstimation were not initialized from the model checkpoint at Intel/dpt-large and are newly initialized: ['neck.fusion_stage.layers.0.residual_layer1.convolution1.bias', 'neck.fusion_stage.layers.0.residual_layer1.convolution1.weight', 'neck.fusion_stage.layers.0.residual_layer1.convolution2.bias', 'neck.fusion_stage.layers.0.residual_layer1.convolution2.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


✅ DPT-Large loaded successfully!
🎯 Current model set to: DPT-Large

📥 Loading DPT-Hybrid...


preprocessor_config.json:   0%|          | 0.00/382 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/9.88k [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/490M [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/490M [00:00<?, ?B/s]

✅ DPT-Hybrid loaded successfully!

📥 Loading MiDaS...
❌ Failed to load MiDaS: Intel/midas-v2-1-small-256 is not a local folder and is not a valid model identifier listed on 'https://huggingface.co/models'
If this is a private repository, make sure to pass a token having permission to this repo either by logging in with `huggingface-cli login` or by passing `token=<your_token>`

🎉 Model loading complete!
📋 Available models:
  - DPT-Large (Intel/dpt-large) 🎯 ACTIVE
  - DPT-Hybrid (Intel/dpt-hybrid-midas) ⚪ INACTIVE


In [5]:
def predict_depth(image: np.ndarray, model_name: str = None) -> Tuple[np.ndarray, Dict]:
    """
    Predict depth map from image using specified model

    Args:
        image: Input RGB image (numpy array)
        model_name: Name of model to use (if None, uses current model)

    Returns:
        depth_map: Depth map as numpy array
        metadata: Processing metadata and statistics
    """
    if model_name and model_name in depth_estimator.models:
        model_info = depth_estimator.models[model_name]
    elif depth_estimator.current_model:
        model_info = depth_estimator.models[depth_estimator.current_model]
        model_name = depth_estimator.current_model
    else:
        raise ValueError("No model available for depth prediction!")

    start_time = time.time()

    # Convert numpy array to PIL Image
    if isinstance(image, np.ndarray):
        if image.shape[2] == 3:  # RGB
            pil_image = Image.fromarray(image)
        else:
            pil_image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
    else:
        pil_image = image

    # Process image
    processor = model_info['processor']
    model = model_info['model']

    inputs = processor(images=pil_image, return_tensors="pt").to(depth_estimator.device)

    # Predict depth
    with torch.no_grad():
        outputs = model(**inputs)
        predicted_depth = outputs.predicted_depth

    # Convert to numpy and normalize
    depth_map = predicted_depth.squeeze().cpu().numpy()

    processing_time = time.time() - start_time

    metadata = {
        'model_used': model_name,
        'processing_time': processing_time,
        'min_depth': float(depth_map.min()),
        'max_depth': float(depth_map.max()),
        'mean_depth': float(depth_map.mean()),
        'image_shape': image.shape,
        'depth_shape': depth_map.shape
    }

    return depth_map, metadata


In [6]:
def calibrate_depth_to_distance(depth_map: np.ndarray,
                               min_distance: float = 0.5,
                               max_distance: float = 10.0) -> np.ndarray:
    """
    Calibrate relative depth values to real-world distances (meters)

    Args:
        depth_map: Raw depth map from model
        min_distance: Minimum real-world distance in meters
        max_distance: Maximum real-world distance in meters

    Returns:
        distance_map: Calibrated distance map in meters
    """
    # Invert depth if needed (some models output inverse depth)
    if depth_map.min() < 0:
        depth_map = -depth_map

    # Normalize to 0-1 range
    depth_normalized = (depth_map - depth_map.min()) / (depth_map.max() - depth_map.min())

    # Map to distance range (inverse relationship: closer objects = smaller depth values)
    # For many models, smaller values represent closer objects
    distance_map = min_distance + (1 - depth_normalized) * (max_distance - min_distance)

    return distance_map


In [None]:
# Test with a single image
def test_single_image(image_path: str, save_results: bool = True):
    """
    Test the drone navigation system with a single image
    
    Args:
        image_path: Path to test image
        save_results: Whether to save analysis results
    """
    print(f"\n🔍 Analyzing: {image_path}")
    
    # Load image
    image = cv2.imread(image_path)
    if image is None:
        print(f"❌ Could not load image: {image_path}")
        return None
    
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    
    # Resize if too large (for faster processing)
    max_size = 800
    h, w = image.shape[:2]
    if max(h, w) > max_size:
        scale = max_size / max(h, w)
        new_w, new_h = int(w * scale), int(h * scale)
        image = cv2.resize(image, (new_w, new_h))
        print(f"📐 Resized image to {new_w}x{new_h}")
    
    # Process image
    results = process_drone_navigation(
        image,
        model_name=None,  # Use current model
        min_distance=0.5,
        max_distance=10.0,
        danger_threshold=1.5,
        warning_threshold=3.0,
        safe_threshold=5.0
    )
    
    # Display results
    print(f"\n📊 Analysis Results:")
    print(f"  ⏱️  Processing time: {results['processing_time']:.2f} seconds")
    print(f"  🎯 Model used: {results['depth_metadata']['model_used']}")
    print(f"  🚨 Closest obstacle: {results['flight_analysis']['overall']['closest_obstacle']:.2f} meters")
    print(f"  🧭 Navigation recommendation: {results['flight_analysis']['overall']['recommendation']}")
    print(f"  📊 Confidence: {results['flight_analysis']['overall']['confidence']:.1%}")
    
    # Zone statistics
    zone_stats = results['zone_statistics']
    print(f"\n🎨 Safety Zone Distribution:")
    print(f"  🔴 Danger zone: {zone_stats['danger_percentage']:.1f}%")
    print(f"  🟠 Warning zone: {zone_stats['warning_percentage']:.1f}%")
    print(f"  🟡 Caution zone: {zone_stats['caution_percentage']:.1f}%")
    print(f"  🟢 Safe zone: {zone_stats['safe_percentage']:.1f}%")
    
    # Show visualization
    plt.figure(figsize=(15, 10))
    plt.imshow(results['comprehensive_visualization'])
    plt.axis('off')
    plt.tight_layout()
    plt.show()
    
    # Show directional analysis
    results['directional_plot'].show()
    
    # Save results if requested
    if save_results:
        base_name = os.path.basename(image_path).split('.')[0]
        output_path = f"obstacle_analysis/{base_name}_analysis.jpg"
        cv2.imwrite(output_path, cv2.cvtColor(results['comprehensive_visualization'], cv2.COLOR_RGB2BGR))
        print(f"\n💾 Saved analysis to: {output_path}")
    
    return results

# Test with the first downloaded image
test_image_path = "drone_test_images/forest_path.jpg"
if os.path.exists(test_image_path):
    forest_results = test_single_image(test_image_path)


In [None]:
# Batch process all test images
def batch_process_images():
    """Process all test images and create a comparison report"""
    
    test_images_dir = "drone_test_images"
    image_files = [f for f in os.listdir(test_images_dir) if f.endswith('.jpg')]
    
    if not image_files:
        print("❌ No test images found!")
        return None
    
    results_summary = []
    
    print(f"🚁 Processing {len(image_files)} test images...\n")
    
    for img_file in image_files:
        img_path = os.path.join(test_images_dir, img_file)
        scenario_name = img_file.replace('.jpg', '').replace('_', ' ').title()
        
        print(f"\n{'='*60}")
        print(f"📸 Scenario: {scenario_name}")
        print(f"{'='*60}")
        
        try:
            results = test_single_image(img_path, save_results=True)
            
            if results:
                summary = {
                    'scenario': scenario_name,
                    'file': img_file,
                    'processing_time': results['processing_time'],
                    'closest_obstacle': results['flight_analysis']['overall']['closest_obstacle'],
                    'recommendation': results['flight_analysis']['overall']['recommendation'],
                    'confidence': results['flight_analysis']['overall']['confidence'],
                    'danger_percentage': results['zone_statistics']['danger_percentage'],
                    'safe_percentage': results['zone_statistics']['safe_percentage']
                }
                results_summary.append(summary)
                
        except Exception as e:
            print(f"❌ Error processing {img_file}: {str(e)}")
    
    return results_summary

# Run batch processing
all_results = batch_process_images()


In [None]:
# Create comparison visualization
if all_results:
    # Convert to DataFrame for easier analysis
    import pandas as pd
    df_results = pd.DataFrame(all_results)
    
    # Create comprehensive comparison plots
    fig = make_subplots(
        rows=2, cols=2,
        subplot_titles=('Processing Time by Scenario', 'Closest Obstacle Distance',
                       'Safety Zone Distribution', 'Navigation Recommendations'),
        specs=[[{'type': 'bar'}, {'type': 'bar'}],
               [{'type': 'bar'}, {'type': 'bar'}]]
    )
    
    # 1. Processing Time
    fig.add_trace(
        go.Bar(x=df_results['scenario'], y=df_results['processing_time'],
               name='Processing Time (s)', marker_color='lightblue'),
        row=1, col=1
    )
    
    # 2. Closest Obstacle Distance
    colors = ['red' if d < 1.5 else 'orange' if d < 3.0 else 'yellow' if d < 5.0 else 'green' 
              for d in df_results['closest_obstacle']]
    fig.add_trace(
        go.Bar(x=df_results['scenario'], y=df_results['closest_obstacle'],
               name='Distance (m)', marker_color=colors),
        row=1, col=2
    )
    
    # 3. Safety Zone Distribution
    fig.add_trace(
        go.Bar(x=df_results['scenario'], y=df_results['danger_percentage'],
               name='Danger %', marker_color='red'),
        row=2, col=1
    )
    fig.add_trace(
        go.Bar(x=df_results['scenario'], y=df_results['safe_percentage'],
               name='Safe %', marker_color='green'),
        row=2, col=1
    )
    
    # 4. Confidence Levels
    fig.add_trace(
        go.Bar(x=df_results['scenario'], y=df_results['confidence'] * 100,
               name='Confidence %', marker_color='purple'),
        row=2, col=2
    )
    
    # Update layout
    fig.update_layout(
        height=800,
        showlegend=True,
        title_text="🚁 Drone Navigation Analysis Summary",
        title_font_size=20
    )
    
    # Update axes
    fig.update_xaxes(tickangle=45)
    fig.update_yaxes(title_text="Time (s)", row=1, col=1)
    fig.update_yaxes(title_text="Distance (m)", row=1, col=2)
    fig.update_yaxes(title_text="Percentage", row=2, col=1)
    fig.update_yaxes(title_text="Confidence %", row=2, col=2)
    
    fig.show()
    
    # Print summary table
    print("\n📋 Summary Report:")
    print("="*100)
    print(f"{'Scenario':<20} {'Recommendation':<25} {'Closest Obstacle':<20} {'Confidence':<15}")
    print("="*100)
    
    for _, row in df_results.iterrows():
        print(f"{row['scenario']:<20} {row['recommendation']:<25} "
              f"{row['closest_obstacle']:.1f}m{'':<15} {row['confidence']:.1%}")
    print("="*100)


In [None]:
def gradio_process_image(input_image, 
                        min_distance, max_distance,
                        danger_threshold, warning_threshold, safe_threshold,
                        model_choice):
    """
    Process image for Gradio interface
    """
    if input_image is None:
        return None, None, "Please upload an image"
    
    # Set the model
    if model_choice in depth_estimator.models:
        depth_estimator.set_current_model(model_choice)
    
    try:
        # Process the image
        results = process_drone_navigation(
            input_image,
            model_name=model_choice,
            min_distance=min_distance,
            max_distance=max_distance,
            danger_threshold=danger_threshold,
            warning_threshold=warning_threshold,
            safe_threshold=safe_threshold
        )
        
        # Create status message
        status = f"""
        🚁 **Navigation Analysis Complete**
        
        **Flight Recommendation:** {results['flight_analysis']['overall']['recommendation'].replace('_', ' ')}
        
        **Safety Metrics:**
        - 🚨 Closest Obstacle: {results['flight_analysis']['overall']['closest_obstacle']:.1f} meters
        - 📊 Confidence: {results['flight_analysis']['overall']['confidence']:.1%}
        - 🔴 Danger Zone: {results['zone_statistics']['danger_percentage']:.1f}%
        - 🟢 Safe Zone: {results['zone_statistics']['safe_percentage']:.1f}%
        
        **Processing Time:** {results['processing_time']:.2f} seconds
        """
        
        # Convert visualization to RGB
        viz = cv2.cvtColor(results['comprehensive_visualization'], cv2.COLOR_BGR2RGB)
        
        # Create directional plot as image
        directional_fig = results['directional_plot']
        
        return viz, directional_fig, status
        
    except Exception as e:
        return None, None, f"Error: {str(e)}"

# Create Gradio interface
def create_gradio_interface():
    """Create and launch the Gradio interface"""
    
    with gr.Blocks(title="🚁 Drone Obstacle Avoidance System") as demo:
        gr.Markdown("""
        # 🚁 Drone Obstacle Avoidance System
        
        Upload an image or use your webcam to test the drone navigation system.
        Adjust the parameters to fine-tune the safety zones for your specific drone setup.
        """)
        
        with gr.Row():
            with gr.Column(scale=1):
                # Input controls
                input_image = gr.Image(label="Input Image", type="numpy")
                
                with gr.Group():
                    gr.Markdown("### 🎛️ Calibration Parameters")
                    min_distance = gr.Slider(0.1, 5.0, 0.5, 
                                           label="Min Distance (m)",
                                           info="Minimum detectable distance")
                    max_distance = gr.Slider(5.0, 50.0, 10.0,
                                           label="Max Distance (m)",
                                           info="Maximum detectable distance")
                
                with gr.Group():
                    gr.Markdown("### 🚨 Safety Thresholds")
                    danger_threshold = gr.Slider(0.5, 5.0, 1.5,
                                               label="Danger Threshold (m)",
                                               info="Distance for immediate stop")
                    warning_threshold = gr.Slider(1.0, 10.0, 3.0,
                                                label="Warning Threshold (m)",
                                                info="Distance for caution")
                    safe_threshold = gr.Slider(2.0, 15.0, 5.0,
                                             label="Safe Threshold (m)",
                                             info="Distance for safe navigation")
                
                model_choice = gr.Dropdown(
                    choices=list(depth_estimator.models.keys()),
                    value=depth_estimator.current_model,
                    label="Depth Estimation Model"
                )
                
                process_btn = gr.Button("🚀 Analyze Navigation", variant="primary")
                
            with gr.Column(scale=2):
                # Output displays
                output_viz = gr.Image(label="Navigation Analysis")
                directional_plot = gr.Plot(label="Directional Safety Analysis")
                status_text = gr.Markdown(label="Analysis Results")
        
        # Example images
        gr.Examples(
            examples=[
                ["drone_test_images/forest_path.jpg"],
                ["drone_test_images/urban_street.jpg"],
                ["drone_test_images/corridor.jpg"],
                ["drone_test_images/warehouse.jpg"],
                ["drone_test_images/cluttered_room.jpg"]
            ],
            inputs=input_image,
            label="Example Scenarios"
        )
        
        # Connect the processing function
        process_btn.click(
            fn=gradio_process_image,
            inputs=[input_image, min_distance, max_distance,
                   danger_threshold, warning_threshold, safe_threshold,
                   model_choice],
            outputs=[output_viz, directional_plot, status_text]
        )
    
    return demo

# Launch the interface
demo = create_gradio_interface()
demo.launch(share=True)


In [None]:
class DroneVideoProcessor:
    """
    Real-time video processing for drone navigation
    """
    
    def __init__(self, nav_analyzer, depth_estimator):
        self.nav_analyzer = nav_analyzer
        self.depth_estimator = depth_estimator
        self.frame_skip = 5  # Process every nth frame for performance
        self.frame_count = 0
        self.last_recommendation = "HOVER_AND_REASSESS"
        self.fps_tracker = []
        
    def process_frame(self, frame: np.ndarray) -> Tuple[np.ndarray, Dict]:
        """
        Process a single video frame
        
        Args:
            frame: Input frame (BGR)
            
        Returns:
            processed_frame: Frame with navigation overlay
            navigation_data: Navigation information
        """
        self.frame_count += 1
        start_time = time.time()
        
        # Skip frames for performance
        if self.frame_count % self.frame_skip != 0:
            # Return previous recommendation
            return self._add_overlay(frame, None, cached=True), {}
        
        # Convert BGR to RGB
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        
        # Resize for faster processing
        h, w = frame.shape[:2]
        processing_size = (640, 480)
        resized_frame = cv2.resize(rgb_frame, processing_size)
        
        try:
            # Predict depth
            depth_map, _ = predict_depth(resized_frame)
            
            # Calibrate distances
            distance_map = calibrate_depth_to_distance(depth_map, 0.5, 10.0)
            
            # Resize distance map back to original size
            distance_map = cv2.resize(distance_map, (w, h))
            
            # Create safety zones
            safety_zones, zone_stats = self.nav_analyzer.create_safety_zones(distance_map)
            
            # Analyze flight path
            flight_analysis = self.nav_analyzer.analyze_flight_path(distance_map)
            
            # Update recommendation
            self.last_recommendation = flight_analysis['overall']['recommendation']
            
            # Calculate FPS
            process_time = time.time() - start_time
            fps = 1.0 / process_time
            self.fps_tracker.append(fps)
            if len(self.fps_tracker) > 30:
                self.fps_tracker.pop(0)
            avg_fps = np.mean(self.fps_tracker)
            
            navigation_data = {
                'recommendation': self.last_recommendation,
                'closest_obstacle': flight_analysis['overall']['closest_obstacle'],
                'confidence': flight_analysis['overall']['confidence'],
                'fps': avg_fps,
                'zone_stats': zone_stats
            }
            
            # Add overlay to frame
            processed_frame = self._add_overlay(frame, safety_zones, navigation_data)
            
            return processed_frame, navigation_data
            
        except Exception as e:
            print(f"Error processing frame: {e}")
            return frame, {}
    
    def _add_overlay(self, frame: np.ndarray, safety_zones: np.ndarray = None, 
                    navigation_data: Dict = None, cached: bool = False) -> np.ndarray:
        """Add navigation overlay to frame"""
        
        overlay = frame.copy()
        h, w = frame.shape[:2]
        
        # Add safety zones overlay if available
        if safety_zones is not None:
            # Convert safety zones to BGR
            safety_zones_bgr = cv2.cvtColor(safety_zones, cv2.COLOR_RGB2BGR)
            overlay = cv2.addWeighted(overlay, 0.7, safety_zones_bgr, 0.3, 0)
        
        # Add navigation HUD
        if navigation_data or cached:
            # Background for text
            cv2.rectangle(overlay, (10, 10), (w-10, 100), (0, 0, 0), -1)
            cv2.rectangle(overlay, (10, 10), (w-10, 100), (0, 255, 0), 2)
            
            # Navigation recommendation
            if cached:
                rec_text = f"RECOMMENDATION: {self.last_recommendation.replace('_', ' ')}"
            else:
                rec_text = f"RECOMMENDATION: {navigation_data['recommendation'].replace('_', ' ')}"
            
            # Color based on recommendation
            if "EMERGENCY" in rec_text or "STOP" in rec_text:
                color = (0, 0, 255)  # Red
            elif "TURN" in rec_text or "CLIMB" in rec_text:
                color = (0, 165, 255)  # Orange
            else:
                color = (0, 255, 0)  # Green
            
            cv2.putText(overlay, rec_text, (20, 40), 
                       cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
            
            # Additional info if available
            if navigation_data and not cached:
                info_text = f"Closest: {navigation_data['closest_obstacle']:.1f}m | " \
                           f"Confidence: {navigation_data['confidence']:.1%} | " \
                           f"FPS: {navigation_data['fps']:.1f}"
                cv2.putText(overlay, info_text, (20, 70),
                           cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
            
            # Add directional indicators
            center_x, center_y = w // 2, h // 2
            
            if "LEFT" in self.last_recommendation:
                cv2.arrowedLine(overlay, (center_x, center_y), 
                               (center_x - 100, center_y), color, 5)
            elif "RIGHT" in self.last_recommendation:
                cv2.arrowedLine(overlay, (center_x, center_y), 
                               (center_x + 100, center_y), color, 5)
            elif "CLIMB" in self.last_recommendation:
                cv2.arrowedLine(overlay, (center_x, center_y), 
                               (center_x, center_y - 100), color, 5)
        
        return overlay

# Create video processor instance
video_processor = DroneVideoProcessor(nav_analyzer, depth_estimator)

print("🎥 Video processor initialized!")
print("   - Processing every 5th frame for performance")
print("   - Real-time navigation overlay enabled")
print("   - FPS tracking enabled")


In [None]:
def save_calibration_data(results_summary: List[Dict], 
                         calibration_params: Dict,
                         model_performance: Dict):
    """
    Save calibration data and model performance metrics
    """
    timestamp = time.strftime("%Y%m%d_%H%M%S")
    
    calibration_data = {
        'timestamp': timestamp,
        'calibration_parameters': calibration_params,
        'model_performance': model_performance,
        'test_results': results_summary,
        'system_info': {
            'device': depth_estimator.device,
            'models_available': list(depth_estimator.models.keys()),
            'current_model': depth_estimator.current_model
        }
    }
    
    # Save JSON file
    output_file = f"calibration_data/drone_calibration_{timestamp}.json"
    with open(output_file, 'w') as f:
        json.dump(calibration_data, f, indent=2)
    
    print(f"💾 Calibration data saved to: {output_file}")
    
    # Create summary report
    report = f"""
    # Drone Navigation Calibration Report
    
    Generated: {timestamp}
    
    ## System Configuration
    - Device: {calibration_data['system_info']['device']}
    - Current Model: {calibration_data['system_info']['current_model']}
    - Available Models: {', '.join(calibration_data['system_info']['models_available'])}
    
    ## Calibration Parameters
    - Min Distance: {calibration_params['min_distance']}m
    - Max Distance: {calibration_params['max_distance']}m
    - Danger Threshold: {calibration_params['danger_threshold']}m
    - Warning Threshold: {calibration_params['warning_threshold']}m
    - Safe Threshold: {calibration_params['safe_threshold']}m
    
    ## Performance Metrics
    - Average Processing Time: {model_performance['avg_processing_time']:.3f}s
    - Average FPS: {model_performance['avg_fps']:.1f}
    - Success Rate: {model_performance['success_rate']:.1%}
    
    ## Test Scenarios Summary
    Total Scenarios Tested: {len(results_summary)}
    """
    
    # Save report
    report_file = f"calibration_data/drone_calibration_report_{timestamp}.md"
    with open(report_file, 'w') as f:
        f.write(report)
    
    print(f"📄 Calibration report saved to: {report_file}")
    
    return calibration_data

# Calculate performance metrics
if all_results:
    model_performance = {
        'avg_processing_time': np.mean([r['processing_time'] for r in all_results]),
        'avg_fps': 1.0 / np.mean([r['processing_time'] for r in all_results]),
        'success_rate': len(all_results) / len(os.listdir("drone_test_images"))
    }
    
    calibration_params = {
        'min_distance': 0.5,
        'max_distance': 10.0,
        'danger_threshold': 1.5,
        'warning_threshold': 3.0,
        'safe_threshold': 5.0
    }
    
    # Save calibration
    calibration_data = save_calibration_data(all_results, calibration_params, model_performance)


In [None]:
# Example drone integration class
class DroneNavigationController:
    """
    Example integration for drone flight controller
    """
    
    def __init__(self, video_processor, calibration_data):
        self.video_processor = video_processor
        self.calibration = calibration_data
        self.emergency_stop_enabled = True
        self.max_speed = 5.0  # m/s
        self.turn_rate = 30  # degrees/s
        
    def process_navigation_command(self, frame: np.ndarray) -> Dict:
        """
        Process frame and return drone control commands
        
        Args:
            frame: Current camera frame
            
        Returns:
            control_commands: Dictionary with drone control parameters
        """
        # Process frame
        processed_frame, nav_data = self.video_processor.process_frame(frame)
        
        if not nav_data:
            return self._get_default_commands()
        
        recommendation = nav_data['recommendation']
        closest_obstacle = nav_data['closest_obstacle']
        confidence = nav_data['confidence']
        
        # Generate control commands based on recommendation
        commands = {
            'timestamp': time.time(),
            'raw_recommendation': recommendation,
            'confidence': confidence
        }
        
        # Emergency stop
        if recommendation == "EMERGENCY_STOP" and self.emergency_stop_enabled:
            commands.update({
                'action': 'STOP',
                'forward_speed': 0,
                'lateral_speed': 0,
                'vertical_speed': 0,
                'yaw_rate': 0,
                'emergency': True
            })
        
        # Turn commands
        elif recommendation == "TURN_LEFT":
            speed_factor = min(1.0, closest_obstacle / 3.0)  # Slow down near obstacles
            commands.update({
                'action': 'TURN_LEFT',
                'forward_speed': self.max_speed * speed_factor * 0.3,
                'lateral_speed': -self.max_speed * speed_factor * 0.5,
                'vertical_speed': 0,
                'yaw_rate': -self.turn_rate,
                'emergency': False
            })
        
        elif recommendation == "TURN_RIGHT":
            speed_factor = min(1.0, closest_obstacle / 3.0)
            commands.update({
                'action': 'TURN_RIGHT',
                'forward_speed': self.max_speed * speed_factor * 0.3,
                'lateral_speed': self.max_speed * speed_factor * 0.5,
                'vertical_speed': 0,
                'yaw_rate': self.turn_rate,
                'emergency': False
            })
        
        # Climb
        elif recommendation == "CLIMB":
            commands.update({
                'action': 'CLIMB',
                'forward_speed': 0,
                'lateral_speed': 0,
                'vertical_speed': self.max_speed * 0.5,
                'yaw_rate': 0,
                'emergency': False
            })
        
        # Continue forward
        elif recommendation == "CONTINUE_FORWARD":
            speed_factor = min(1.0, (closest_obstacle - 1.5) / 3.5)  # Speed based on distance
            commands.update({
                'action': 'FORWARD',
                'forward_speed': self.max_speed * speed_factor,
                'lateral_speed': 0,
                'vertical_speed': 0,
                'yaw_rate': 0,
                'emergency': False
            })
        
        # Hover
        else:
            commands.update({
                'action': 'HOVER',
                'forward_speed': 0,
                'lateral_speed': 0,
                'vertical_speed': 0,
                'yaw_rate': 0,
                'emergency': False
            })
        
        # Add safety limits
        commands = self._apply_safety_limits(commands)
        
        return commands
    
    def _get_default_commands(self) -> Dict:
        """Get default hover commands"""
        return {
            'action': 'HOVER',
            'forward_speed': 0,
            'lateral_speed': 0,
            'vertical_speed': 0,
            'yaw_rate': 0,
            'emergency': False,
            'timestamp': time.time()
        }
    
    def _apply_safety_limits(self, commands: Dict) -> Dict:
        """Apply safety limits to control commands"""
        # Limit speeds
        commands['forward_speed'] = np.clip(commands['forward_speed'], -self.max_speed, self.max_speed)
        commands['lateral_speed'] = np.clip(commands['lateral_speed'], -self.max_speed, self.max_speed)
        commands['vertical_speed'] = np.clip(commands['vertical_speed'], -self.max_speed * 0.5, self.max_speed * 0.5)
        commands['yaw_rate'] = np.clip(commands['yaw_rate'], -self.turn_rate, self.turn_rate)
        
        return commands

# Create drone controller
drone_controller = DroneNavigationController(video_processor, calibration_data if 'calibration_data' in locals() else {})

print("🚁 Drone Navigation Controller initialized!")
print("   - Emergency stop: ENABLED")
print("   - Max speed: 5.0 m/s")
print("   - Turn rate: 30 deg/s")
print("\n⚡ Ready for integration with flight controller!")


In [7]:
class DroneNavigationAnalyzer:
    """
    Analyze depth maps for drone navigation and obstacle avoidance
    """

    def __init__(self,
                 danger_distance: float = 1.5,
                 warning_distance: float = 3.0,
                 safe_distance: float = 5.0):
        """
        Initialize navigation analyzer with safety thresholds

        Args:
            danger_distance: Distance threshold for danger zone (meters)
            warning_distance: Distance threshold for warning zone (meters)
            safe_distance: Distance threshold for safe zone (meters)
        """
        self.danger_distance = danger_distance
        self.warning_distance = warning_distance
        self.safe_distance = safe_distance

    def create_safety_zones(self, distance_map: np.ndarray) -> Tuple[np.ndarray, Dict]:
        """
        Create color-coded safety zones for navigation

        Args:
            distance_map: Distance map in meters

        Returns:
            safety_visualization: RGB image with color-coded zones
            zone_statistics: Statistics for each zone
        """
        h, w = distance_map.shape
        safety_zones = np.zeros((h, w, 3), dtype=np.uint8)

        # Define zone masks
        danger_mask = distance_map < self.danger_distance
        warning_mask = (distance_map >= self.danger_distance) & (distance_map < self.warning_distance)
        caution_mask = (distance_map >= self.warning_distance) & (distance_map < self.safe_distance)
        safe_mask = distance_map >= self.safe_distance

        # Color-code zones (BGR format)
        safety_zones[danger_mask] = [0, 0, 255]      # Red - DANGER
        safety_zones[warning_mask] = [0, 165, 255]   # Orange - WARNING
        safety_zones[caution_mask] = [0, 255, 255]   # Yellow - CAUTION
        safety_zones[safe_mask] = [0, 255, 0]        # Green - SAFE

        # Calculate statistics
        total_pixels = h * w
        zone_stats = {
            'danger_pixels': np.sum(danger_mask),
            'warning_pixels': np.sum(warning_mask),
            'caution_pixels': np.sum(caution_mask),
            'safe_pixels': np.sum(safe_mask),
            'danger_percentage': (np.sum(danger_mask) / total_pixels) * 100,
            'warning_percentage': (np.sum(warning_mask) / total_pixels) * 100,
            'caution_percentage': (np.sum(caution_mask) / total_pixels) * 100,
            'safe_percentage': (np.sum(safe_mask) / total_pixels) * 100,
            'closest_obstacle': float(distance_map.min()),
            'average_distance': float(distance_map.mean())
        }

        return safety_zones, zone_stats

    def analyze_flight_path(self, distance_map: np.ndarray) -> Dict:
        """
        Analyze the image for safe flight directions

        Args:
            distance_map: Distance map in meters

        Returns:
            flight_analysis: Recommended flight directions and safety assessment
        """
        h, w = distance_map.shape

        # Divide image into sectors for directional analysis
        center_h, center_w = h // 2, w // 2

        # Define sectors
        sectors = {
            'forward': distance_map[center_h-h//4:center_h+h//4, center_w-w//4:center_w+w//4],
            'left': distance_map[:, :w//3],
            'right': distance_map[:, 2*w//3:],
            'up': distance_map[:h//3, :],
            'down': distance_map[2*h//3:, :],
            'forward_left': distance_map[center_h-h//4:center_h+h//4, :w//2],
            'forward_right': distance_map[center_h-h//4:center_h+h//4, w//2:]
        }

        # Analyze each sector
        analysis = {}
        for direction, sector in sectors.items():
            min_dist = float(sector.min())
            mean_dist = float(sector.mean())

            # Determine safety level
            if min_dist < self.danger_distance:
                safety_level = "DANGER"
                recommendation = "AVOID"
            elif min_dist < self.warning_distance:
                safety_level = "WARNING"
                recommendation = "CAUTION"
            elif min_dist < self.safe_distance:
                safety_level = "CAUTION"
                recommendation = "PROCEED_CAREFULLY"
            else:
                safety_level = "SAFE"
                recommendation = "CLEAR"

            analysis[direction] = {
                'min_distance': min_dist,
                'mean_distance': mean_dist,
                'safety_level': safety_level,
                'recommendation': recommendation,
                'safe_percentage': (np.sum(sector >= self.safe_distance) / sector.size) * 100
            }

        # Overall flight recommendation
        forward_safety = analysis['forward']['safety_level']
        closest_obstacle = float(distance_map.min())

        if closest_obstacle < self.danger_distance:
            overall_recommendation = "EMERGENCY_STOP"
        elif forward_safety == "SAFE":
            overall_recommendation = "CONTINUE_FORWARD"
        elif analysis['left']['safety_level'] == "SAFE":
            overall_recommendation = "TURN_LEFT"
        elif analysis['right']['safety_level'] == "SAFE":
            overall_recommendation = "TURN_RIGHT"
        elif analysis['up']['safety_level'] == "SAFE":
            overall_recommendation = "CLIMB"
        else:
            overall_recommendation = "HOVER_AND_REASSESS"

        analysis['overall'] = {
            'recommendation': overall_recommendation,
            'closest_obstacle': closest_obstacle,
            'confidence': self._calculate_confidence(analysis)
        }

        return analysis

    def _calculate_confidence(self, analysis: Dict) -> float:
        """Calculate confidence score for navigation recommendations"""
        # Simple confidence calculation based on distance margins
        min_distances = [data['min_distance'] for data in analysis.values() if isinstance(data, dict)]
        if not min_distances:
            return 0.5

        avg_min_distance = np.mean(min_distances)

        if avg_min_distance >= self.safe_distance:
            return 0.9
        elif avg_min_distance >= self.warning_distance:
            return 0.7
        elif avg_min_distance >= self.danger_distance:
            return 0.4
        else:
            return 0.2

# Initialize navigation analyzer
nav_analyzer = DroneNavigationAnalyzer()
print("🧭 Navigation analyzer initialized with default safety thresholds")


🧭 Navigation analyzer initialized with default safety thresholds


In [8]:
def create_comprehensive_visualization(image: np.ndarray,
                                     distance_map: np.ndarray,
                                     safety_zones: np.ndarray,
                                     flight_analysis: Dict,
                                     zone_stats: Dict) -> np.ndarray:
    """
    Create a comprehensive visualization for drone navigation

    Args:
        image: Original RGB image
        distance_map: Distance map in meters
        safety_zones: Color-coded safety zones
        flight_analysis: Flight path analysis results
        zone_stats: Zone statistics

    Returns:
        visualization: Combined visualization image
    """
    h, w = image.shape[:2]

    # Create a larger canvas for multiple views
    canvas_height = h * 2 + 100  # Extra space for text
    canvas_width = w * 2 + 100
    canvas = np.zeros((canvas_height, canvas_width, 3), dtype=np.uint8)

    # 1. Original image (top-left)
    canvas[50:50+h, 50:50+w] = image

    # 2. Distance heatmap (top-right)
    distance_normalized = cv2.normalize(distance_map, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8)
    distance_colored = cv2.applyColorMap(distance_normalized, cv2.COLORMAP_VIRIDIS)
    canvas[50:50+h, 50+w+50:50+w+50+w] = distance_colored

    # 3. Safety zones (bottom-left)
    canvas[50+h+50:50+h+50+h, 50:50+w] = safety_zones

    # 4. Combined overlay (bottom-right)
    overlay = cv2.addWeighted(image, 0.6, safety_zones, 0.4, 0)
    canvas[50+h+50:50+h+50+h, 50+w+50:50+w+50+w] = overlay

    # Add labels
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 0.8
    color = (255, 255, 255)
    thickness = 2

    cv2.putText(canvas, "Original Image", (50, 30), font, font_scale, color, thickness)
    cv2.putText(canvas, "Distance Map", (50+w+50, 30), font, font_scale, color, thickness)
    cv2.putText(canvas, "Safety Zones", (50, 50+h+30), font, font_scale, color, thickness)
    cv2.putText(canvas, "Navigation Overlay", (50+w+50, 50+h+30), font, font_scale, color, thickness)

    # Add navigation recommendation
    recommendation = flight_analysis['overall']['recommendation']
    confidence = flight_analysis['overall']['confidence']
    closest_dist = flight_analysis['overall']['closest_obstacle']

    # Color-code recommendation
    if recommendation == "EMERGENCY_STOP":
        rec_color = (0, 0, 255)  # Red
    elif recommendation in ["TURN_LEFT", "TURN_RIGHT", "CLIMB"]:
        rec_color = (0, 165, 255)  # Orange
    elif recommendation == "CONTINUE_FORWARD":
        rec_color = (0, 255, 0)  # Green
    else:
        rec_color = (0, 255, 255)  # Yellow

    # Add recommendation text
    rec_text = f"RECOMMENDATION: {recommendation.replace('_', ' ')}"
    cv2.putText(canvas, rec_text, (50, canvas_height - 60), font, 0.7, rec_color, 2)

    conf_text = f"Confidence: {confidence:.1%} | Closest: {closest_dist:.1f}m"
    cv2.putText(canvas, conf_text, (50, canvas_height - 30), font, 0.6, (255, 255, 255), 2)

    return canvas


In [9]:
def create_directional_analysis_plot(flight_analysis: Dict):
    """
    Create a plotly radar chart showing directional safety analysis
    """
    directions = ['forward', 'left', 'right', 'up', 'down', 'forward_left', 'forward_right']

    # Extract data for radar chart
    min_distances = [flight_analysis[d]['min_distance'] for d in directions]
    safety_percentages = [flight_analysis[d]['safe_percentage'] for d in directions]

    # Create radar chart
    fig = go.Figure()

    fig.add_trace(go.Scatterpolar(
        r=min_distances,
        theta=directions,
        fill='toself',
        name='Min Distance (m)',
        line_color='blue'
    ))

    fig.add_trace(go.Scatterpolar(
        r=[d/20 for d in safety_percentages],  # Normalize to similar scale
        theta=directions,
        fill='toself',
        name='Safety % (scaled)',
        line_color='green',
        opacity=0.6
    ))

    fig.update_layout(
        polar=dict(
            radialaxis=dict(
                visible=True,
                range=[0, max(min_distances) * 1.1]
            )),
        showlegend=True,
        title="🧭 Directional Safety Analysis"
    )

    return fig


In [10]:
def process_drone_navigation(image: np.ndarray,
                           model_name: str = None,
                           min_distance: float = 0.5,
                           max_distance: float = 10.0,
                           danger_threshold: float = 1.5,
                           warning_threshold: float = 3.0,
                           safe_threshold: float = 5.0) -> Dict:
    """
    Complete pipeline for drone navigation analysis

    Args:
        image: Input RGB image
        model_name: Depth estimation model to use
        min_distance: Minimum calibration distance (meters)
        max_distance: Maximum calibration distance (meters)
        danger_threshold: Danger zone threshold (meters)
        warning_threshold: Warning zone threshold (meters)
        safe_threshold: Safe zone threshold (meters)

    Returns:
        results: Complete analysis results with visualizations
    """
    start_time = time.time()

    # Step 1: Predict depth
    print("🔍 Predicting depth...")
    depth_map, depth_metadata = predict_depth(image, model_name)

    # Step 2: Calibrate to real-world distances
    print("📏 Calibrating distances...")
    distance_map = calibrate_depth_to_distance(depth_map, min_distance, max_distance)

    # Step 3: Update navigation analyzer thresholds
    nav_analyzer.danger_distance = danger_threshold
    nav_analyzer.warning_distance = warning_threshold
    nav_analyzer.safe_distance = safe_threshold

    # Step 4: Create safety zones
    print("🚨 Analyzing safety zones...")
    safety_zones, zone_stats = nav_analyzer.create_safety_zones(distance_map)

    # Step 5: Analyze flight paths
    print("🧭 Analyzing flight paths...")
    flight_analysis = nav_analyzer.analyze_flight_path(distance_map)

    # Step 6: Create visualizations
    print("🎨 Creating visualizations...")
    comprehensive_viz = create_comprehensive_visualization(
        image, distance_map, safety_zones, flight_analysis, zone_stats
    )

    # Create directional analysis plot
    directional_plot = create_directional_analysis_plot(flight_analysis)

    total_time = time.time() - start_time

    # Compile results
    results = {
        'processing_time': total_time,
        'depth_metadata': depth_metadata,
        'distance_map': distance_map,
        'safety_zones': safety_zones,
        'zone_statistics': zone_stats,
        'flight_analysis': flight_analysis,
        'comprehensive_visualization': comprehensive_viz,
        'directional_plot': directional_plot,
        'calibration_params': {
            'min_distance': min_distance,
            'max_distance': max_distance,
            'danger_threshold': danger_threshold,
            'warning_threshold': warning_threshold,
            'safe_threshold': safe_threshold
        }
    }

    print(f"✅ Analysis complete in {total_time:.2f} seconds")
    return results


In [11]:
# Download test images for different drone scenarios
test_images = [
    # Outdoor scenarios
    ("forest_path", "https://images.unsplash.com/photo-1441974231531-c6227db76b6e"),
    ("urban_street", "https://images.unsplash.com/photo-1506905925346-21bda4d32df4"),
    # ("mountain_view", "https://images.unsplash.com/photo-1464822759844-d150baef493e"),

    # Indoor scenarios
    ("corridor", "https://images.unsplash.com/photo-1586023492125-27b2c045efd7"),
    ("warehouse", "https://images.unsplash.com/photo-1553062407-98eeb64c6a62"),

    # Complex scenarios
    ("cluttered_room", "https://images.unsplash.com/photo-1555041469-a586c61ea9bc"),
]

print("📥 Downloading test images...")
import urllib.request

for name, url in test_images:
    try:
        filename = f"drone_test_images/{name}.jpg"
        urllib.request.urlretrieve(url, filename)
        print(f"✅ Downloaded: {name}")
    except Exception as e:
        print(f"❌ Failed to download {name}: {str(e)}")

print("\n🎉 Test images ready for drone navigation analysis!")


📥 Downloading test images...
✅ Downloaded: forest_path
✅ Downloaded: urban_street
✅ Downloaded: corridor
✅ Downloaded: warehouse
✅ Downloaded: cluttered_room

🎉 Test images ready for drone navigation analysis!


In [None]:
# Test with a single image
def test_single_image(image_path: str, save_results: bool = True):
    """
    Test the drone navigation system with a single image
    
    Args:
        image_path: Path to test image
        save_results: Whether to save analysis results
    """
    print(f"\n🔍 Analyzing: {image_path}")
    
    # Load image
    image = cv2.imread(image_path)
    if image is None:
        print(f"❌ Could not load image: {image_path}")
        return None
    
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    
    # Resize if too large (for faster processing)
    max_size = 800
    h, w = image.shape[:2]
    if max(h, w) > max_size:
        scale = max_size / max(h, w)
        new_w, new_h = int(w * scale), int(h * scale)
        image = cv2.resize(image, (new_w, new_h))
        print(f"📐 Resized image to {new_w}x{new_h}")
    
    # Process image
    results = process_drone_navigation(
        image,
        model_name=None,  # Use current model
        min_distance=0.5,
        max_distance=10.0,
        danger_threshold=1.5,
        warning_threshold=3.0,
        safe_threshold=5.0
    )
    
    # Display results
    print(f"\n📊 Analysis Results:")
    print(f"  ⏱️  Processing time: {results['processing_time']:.2f} seconds")
    print(f"  🎯 Model used: {results['depth_metadata']['model_used']}")
    print(f"  🚨 Closest obstacle: {results['flight_analysis']['overall']['closest_obstacle']:.2f} meters")
    print(f"  🧭 Navigation recommendation: {results['flight_analysis']['overall']['recommendation']}")
    print(f"  📊 Confidence: {results['flight_analysis']['overall']['confidence']:.1%}")
    
    # Zone statistics
    zone_stats = results['zone_statistics']
    print(f"\n🎨 Safety Zone Distribution:")
    print(f"  🔴 Danger zone: {zone_stats['danger_percentage']:.1f}%")
    print(f"  🟠 Warning zone: {zone_stats['warning_percentage']:.1f}%")
    print(f"  🟡 Caution zone: {zone_stats['caution_percentage']:.1f}%")
    print(f"  🟢 Safe zone: {zone_stats['safe_percentage']:.1f}%")
    
    # Show visualization
    plt.figure(figsize=(15, 10))
    plt.imshow(results['comprehensive_visualization'])
    plt.axis('off')
    plt.tight_layout()
    plt.show()
    
    # Show directional analysis
    results['directional_plot'].show()
    
    # Save results if requested
    if save_results:
        base_name = os.path.basename(image_path).split('.')[0]
        output_path = f"obstacle_analysis/{base_name}_analysis.jpg"
        cv2.imwrite(output_path, cv2.cvtColor(results['comprehensive_visualization'], cv2.COLOR_RGB2BGR))
        print(f"\n💾 Saved analysis to: {output_path}")
    
    return results

# Test with the first downloaded image
test_image_path = "drone_test_images/forest_path.jpg"
if os.path.exists(test_image_path):
    forest_results = test_single_image(test_image_path)


In [None]:
# Batch process all test images
def batch_process_images():
    """Process all test images and create a comparison report"""
    
    test_images_dir = "drone_test_images"
    image_files = [f for f in os.listdir(test_images_dir) if f.endswith('.jpg')]
    
    if not image_files:
        print("❌ No test images found!")
        return None
    
    results_summary = []
    
    print(f"🚁 Processing {len(image_files)} test images...\n")
    
    for img_file in image_files:
        img_path = os.path.join(test_images_dir, img_file)
        scenario_name = img_file.replace('.jpg', '').replace('_', ' ').title()
        
        print(f"\n{'='*60}")
        print(f"📸 Scenario: {scenario_name}")
        print(f"{'='*60}")
        
        try:
            results = test_single_image(img_path, save_results=True)
            
            if results:
                summary = {
                    'scenario': scenario_name,
                    'file': img_file,
                    'processing_time': results['processing_time'],
                    'closest_obstacle': results['flight_analysis']['overall']['closest_obstacle'],
                    'recommendation': results['flight_analysis']['overall']['recommendation'],
                    'confidence': results['flight_analysis']['overall']['confidence'],
                    'danger_percentage': results['zone_statistics']['danger_percentage'],
                    'safe_percentage': results['zone_statistics']['safe_percentage']
                }
                results_summary.append(summary)
                
        except Exception as e:
            print(f"❌ Error processing {img_file}: {str(e)}")
    
    return results_summary

# Run batch processing
all_results = batch_process_images()


In [None]:
# Create comparison visualization
if all_results:
    # Convert to DataFrame for easier analysis
    import pandas as pd
    df_results = pd.DataFrame(all_results)
    
    # Create comprehensive comparison plots
    fig = make_subplots(
        rows=2, cols=2,
        subplot_titles=('Processing Time by Scenario', 'Closest Obstacle Distance',
                       'Safety Zone Distribution', 'Navigation Recommendations'),
        specs=[[{'type': 'bar'}, {'type': 'bar'}],
               [{'type': 'bar'}, {'type': 'bar'}]]
    )
    
    # 1. Processing Time
    fig.add_trace(
        go.Bar(x=df_results['scenario'], y=df_results['processing_time'],
               name='Processing Time (s)', marker_color='lightblue'),
        row=1, col=1
    )
    
    # 2. Closest Obstacle Distance
    colors = ['red' if d < 1.5 else 'orange' if d < 3.0 else 'yellow' if d < 5.0 else 'green' 
              for d in df_results['closest_obstacle']]
    fig.add_trace(
        go.Bar(x=df_results['scenario'], y=df_results['closest_obstacle'],
               name='Distance (m)', marker_color=colors),
        row=1, col=2
    )
    
    # 3. Safety Zone Distribution
    fig.add_trace(
        go.Bar(x=df_results['scenario'], y=df_results['danger_percentage'],
               name='Danger %', marker_color='red'),
        row=2, col=1
    )
    fig.add_trace(
        go.Bar(x=df_results['scenario'], y=df_results['safe_percentage'],
               name='Safe %', marker_color='green'),
        row=2, col=1
    )
    
    # 4. Confidence Levels
    fig.add_trace(
        go.Bar(x=df_results['scenario'], y=df_results['confidence'] * 100,
               name='Confidence %', marker_color='purple'),
        row=2, col=2
    )
    
    # Update layout
    fig.update_layout(
        height=800,
        showlegend=True,
        title_text="🚁 Drone Navigation Analysis Summary",
        title_font_size=20
    )
    
    # Update axes
    fig.update_xaxes(tickangle=45)
    fig.update_yaxes(title_text="Time (s)", row=1, col=1)
    fig.update_yaxes(title_text="Distance (m)", row=1, col=2)
    fig.update_yaxes(title_text="Percentage", row=2, col=1)
    fig.update_yaxes(title_text="Confidence %", row=2, col=2)
    
    fig.show()
    
    # Print summary table
    print("\n📋 Summary Report:")
    print("="*100)
    print(f"{'Scenario':<20} {'Recommendation':<25} {'Closest Obstacle':<20} {'Confidence':<15}")
    print("="*100)
    
    for _, row in df_results.iterrows():
        print(f"{row['scenario']:<20} {row['recommendation']:<25} "
              f"{row['closest_obstacle']:.1f}m{'':<15} {row['confidence']:.1%}")
    print("="*100)


In [None]:
def gradio_process_image(input_image, 
                        min_distance, max_distance,
                        danger_threshold, warning_threshold, safe_threshold,
                        model_choice):
    """
    Process image for Gradio interface
    """
    if input_image is None:
        return None, None, "Please upload an image"
    
    # Set the model
    if model_choice in depth_estimator.models:
        depth_estimator.set_current_model(model_choice)
    
    try:
        # Process the image
        results = process_drone_navigation(
            input_image,
            model_name=model_choice,
            min_distance=min_distance,
            max_distance=max_distance,
            danger_threshold=danger_threshold,
            warning_threshold=warning_threshold,
            safe_threshold=safe_threshold
        )
        
        # Create status message
        status = f"""
        🚁 **Navigation Analysis Complete**
        
        **Flight Recommendation:** {results['flight_analysis']['overall']['recommendation'].replace('_', ' ')}
        
        **Safety Metrics:**
        - 🚨 Closest Obstacle: {results['flight_analysis']['overall']['closest_obstacle']:.1f} meters
        - 📊 Confidence: {results['flight_analysis']['overall']['confidence']:.1%}
        - 🔴 Danger Zone: {results['zone_statistics']['danger_percentage']:.1f}%
        - 🟢 Safe Zone: {results['zone_statistics']['safe_percentage']:.1f}%
        
        **Processing Time:** {results['processing_time']:.2f} seconds
        """
        
        # Convert visualization to RGB
        viz = cv2.cvtColor(results['comprehensive_visualization'], cv2.COLOR_BGR2RGB)
        
        # Create directional plot as image
        directional_fig = results['directional_plot']
        
        return viz, directional_fig, status
        
    except Exception as e:
        return None, None, f"Error: {str(e)}"

# Create Gradio interface
def create_gradio_interface():
    """Create and launch the Gradio interface"""
    
    with gr.Blocks(title="🚁 Drone Obstacle Avoidance System") as demo:
        gr.Markdown("""
        # 🚁 Drone Obstacle Avoidance System
        
        Upload an image or use your webcam to test the drone navigation system.
        Adjust the parameters to fine-tune the safety zones for your specific drone setup.
        """)
        
        with gr.Row():
            with gr.Column(scale=1):
                # Input controls
                input_image = gr.Image(label="Input Image", type="numpy")
                
                with gr.Group():
                    gr.Markdown("### 🎛️ Calibration Parameters")
                    min_distance = gr.Slider(0.1, 5.0, 0.5, 
                                           label="Min Distance (m)",
                                           info="Minimum detectable distance")
                    max_distance = gr.Slider(5.0, 50.0, 10.0,
                                           label="Max Distance (m)",
                                           info="Maximum detectable distance")
                
                with gr.Group():
                    gr.Markdown("### 🚨 Safety Thresholds")
                    danger_threshold = gr.Slider(0.5, 5.0, 1.5,
                                               label="Danger Threshold (m)",
                                               info="Distance for immediate stop")
                    warning_threshold = gr.Slider(1.0, 10.0, 3.0,
                                                label="Warning Threshold (m)",
                                                info="Distance for caution")
                    safe_threshold = gr.Slider(2.0, 15.0, 5.0,
                                             label="Safe Threshold (m)",
                                             info="Distance for safe navigation")
                
                model_choice = gr.Dropdown(
                    choices=list(depth_estimator.models.keys()),
                    value=depth_estimator.current_model,
                    label="Depth Estimation Model"
                )
                
                process_btn = gr.Button("🚀 Analyze Navigation", variant="primary")
                
            with gr.Column(scale=2):
                # Output displays
                output_viz = gr.Image(label="Navigation Analysis")
                directional_plot = gr.Plot(label="Directional Safety Analysis")
                status_text = gr.Markdown(label="Analysis Results")
        
        # Example images
        gr.Examples(
            examples=[
                ["drone_test_images/forest_path.jpg"],
                ["drone_test_images/urban_street.jpg"],
                ["drone_test_images/corridor.jpg"],
                ["drone_test_images/warehouse.jpg"],
                ["drone_test_images/cluttered_room.jpg"]
            ],
            inputs=input_image,
            label="Example Scenarios"
        )
        
        # Connect the processing function
        process_btn.click(
            fn=gradio_process_image,
            inputs=[input_image, min_distance, max_distance,
                   danger_threshold, warning_threshold, safe_threshold,
                   model_choice],
            outputs=[output_viz, directional_plot, status_text]
        )
    
    return demo

# Launch the interface
demo = create_gradio_interface()
demo.launch(share=True)


In [None]:
class DroneVideoProcessor:
    """
    Real-time video processing for drone navigation
    """
    
    def __init__(self, nav_analyzer, depth_estimator):
        self.nav_analyzer = nav_analyzer
        self.depth_estimator = depth_estimator
        self.frame_skip = 5  # Process every nth frame for performance
        self.frame_count = 0
        self.last_recommendation = "HOVER_AND_REASSESS"
        self.fps_tracker = []
        
    def process_frame(self, frame: np.ndarray) -> Tuple[np.ndarray, Dict]:
        """
        Process a single video frame
        
        Args:
            frame: Input frame (BGR)
            
        Returns:
            processed_frame: Frame with navigation overlay
            navigation_data: Navigation information
        """
        self.frame_count += 1
        start_time = time.time()
        
        # Skip frames for performance
        if self.frame_count % self.frame_skip != 0:
            # Return previous recommendation
            return self._add_overlay(frame, None, cached=True), {}
        
        # Convert BGR to RGB
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        
        # Resize for faster processing
        h, w = frame.shape[:2]
        processing_size = (640, 480)
        resized_frame = cv2.resize(rgb_frame, processing_size)
        
        try:
            # Predict depth
            depth_map, _ = predict_depth(resized_frame)
            
            # Calibrate distances
            distance_map = calibrate_depth_to_distance(depth_map, 0.5, 10.0)
            
            # Resize distance map back to original size
            distance_map = cv2.resize(distance_map, (w, h))
            
            # Create safety zones
            safety_zones, zone_stats = self.nav_analyzer.create_safety_zones(distance_map)
            
            # Analyze flight path
            flight_analysis = self.nav_analyzer.analyze_flight_path(distance_map)
            
            # Update recommendation
            self.last_recommendation = flight_analysis['overall']['recommendation']
            
            # Calculate FPS
            process_time = time.time() - start_time
            fps = 1.0 / process_time
            self.fps_tracker.append(fps)
            if len(self.fps_tracker) > 30:
                self.fps_tracker.pop(0)
            avg_fps = np.mean(self.fps_tracker)
            
            navigation_data = {
                'recommendation': self.last_recommendation,
                'closest_obstacle': flight_analysis['overall']['closest_obstacle'],
                'confidence': flight_analysis['overall']['confidence'],
                'fps': avg_fps,
                'zone_stats': zone_stats
            }
            
            # Add overlay to frame
            processed_frame = self._add_overlay(frame, safety_zones, navigation_data)
            
            return processed_frame, navigation_data
            
        except Exception as e:
            print(f"Error processing frame: {e}")
            return frame, {}
    
    def _add_overlay(self, frame: np.ndarray, safety_zones: np.ndarray = None, 
                    navigation_data: Dict = None, cached: bool = False) -> np.ndarray:
        """Add navigation overlay to frame"""
        
        overlay = frame.copy()
        h, w = frame.shape[:2]
        
        # Add safety zones overlay if available
        if safety_zones is not None:
            # Convert safety zones to BGR
            safety_zones_bgr = cv2.cvtColor(safety_zones, cv2.COLOR_RGB2BGR)
            overlay = cv2.addWeighted(overlay, 0.7, safety_zones_bgr, 0.3, 0)
        
        # Add navigation HUD
        if navigation_data or cached:
            # Background for text
            cv2.rectangle(overlay, (10, 10), (w-10, 100), (0, 0, 0), -1)
            cv2.rectangle(overlay, (10, 10), (w-10, 100), (0, 255, 0), 2)
            
            # Navigation recommendation
            if cached:
                rec_text = f"RECOMMENDATION: {self.last_recommendation.replace('_', ' ')}"
            else:
                rec_text = f"RECOMMENDATION: {navigation_data['recommendation'].replace('_', ' ')}"
            
            # Color based on recommendation
            if "EMERGENCY" in rec_text or "STOP" in rec_text:
                color = (0, 0, 255)  # Red
            elif "TURN" in rec_text or "CLIMB" in rec_text:
                color = (0, 165, 255)  # Orange
            else:
                color = (0, 255, 0)  # Green
            
            cv2.putText(overlay, rec_text, (20, 40), 
                       cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
            
            # Additional info if available
            if navigation_data and not cached:
                info_text = f"Closest: {navigation_data['closest_obstacle']:.1f}m | " \
                           f"Confidence: {navigation_data['confidence']:.1%} | " \
                           f"FPS: {navigation_data['fps']:.1f}"
                cv2.putText(overlay, info_text, (20, 70),
                           cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
            
            # Add directional indicators
            center_x, center_y = w // 2, h // 2
            
            if "LEFT" in self.last_recommendation:
                cv2.arrowedLine(overlay, (center_x, center_y), 
                               (center_x - 100, center_y), color, 5)
            elif "RIGHT" in self.last_recommendation:
                cv2.arrowedLine(overlay, (center_x, center_y), 
                               (center_x + 100, center_y), color, 5)
            elif "CLIMB" in self.last_recommendation:
                cv2.arrowedLine(overlay, (center_x, center_y), 
                               (center_x, center_y - 100), color, 5)
        
        return overlay

# Create video processor instance
video_processor = DroneVideoProcessor(nav_analyzer, depth_estimator)

print("🎥 Video processor initialized!")
print("   - Processing every 5th frame for performance")
print("   - Real-time navigation overlay enabled")
print("   - FPS tracking enabled")


In [None]:
def save_calibration_data(results_summary: List[Dict], 
                         calibration_params: Dict,
                         model_performance: Dict):
    """
    Save calibration data and model performance metrics
    """
    timestamp = time.strftime("%Y%m%d_%H%M%S")
    
    calibration_data = {
        'timestamp': timestamp,
        'calibration_parameters': calibration_params,
        'model_performance': model_performance,
        'test_results': results_summary,
        'system_info': {
            'device': depth_estimator.device,
            'models_available': list(depth_estimator.models.keys()),
            'current_model': depth_estimator.current_model
        }
    }
    
    # Save JSON file
    output_file = f"calibration_data/drone_calibration_{timestamp}.json"
    with open(output_file, 'w') as f:
        json.dump(calibration_data, f, indent=2)
    
    print(f"💾 Calibration data saved to: {output_file}")
    
    # Create summary report
    report = f"""
    # Drone Navigation Calibration Report
    
    Generated: {timestamp}
    
    ## System Configuration
    - Device: {calibration_data['system_info']['device']}
    - Current Model: {calibration_data['system_info']['current_model']}
    - Available Models: {', '.join(calibration_data['system_info']['models_available'])}
    
    ## Calibration Parameters
    - Min Distance: {calibration_params['min_distance']}m
    - Max Distance: {calibration_params['max_distance']}m
    - Danger Threshold: {calibration_params['danger_threshold']}m
    - Warning Threshold: {calibration_params['warning_threshold']}m
    - Safe Threshold: {calibration_params['safe_threshold']}m
    
    ## Performance Metrics
    - Average Processing Time: {model_performance['avg_processing_time']:.3f}s
    - Average FPS: {model_performance['avg_fps']:.1f}
    - Success Rate: {model_performance['success_rate']:.1%}
    
    ## Test Scenarios Summary
    Total Scenarios Tested: {len(results_summary)}
    """
    
    # Save report
    report_file = f"calibration_data/drone_calibration_report_{timestamp}.md"
    with open(report_file, 'w') as f:
        f.write(report)
    
    print(f"📄 Calibration report saved to: {report_file}")
    
    return calibration_data

# Calculate performance metrics
if 'all_results' in locals() and all_results:
    model_performance = {
        'avg_processing_time': np.mean([r['processing_time'] for r in all_results]),
        'avg_fps': 1.0 / np.mean([r['processing_time'] for r in all_results]),
        'success_rate': len(all_results) / len(os.listdir("drone_test_images"))
    }
    
    calibration_params = {
        'min_distance': 0.5,
        'max_distance': 10.0,
        'danger_threshold': 1.5,
        'warning_threshold': 3.0,
        'safe_threshold': 5.0
    }
    
    # Save calibration
    calibration_data = save_calibration_data(all_results, calibration_params, model_performance)
else:
    print("⚠️ No test results available. Run batch processing first to generate calibration data.")


In [None]:
# Example drone integration class
class DroneNavigationController:
    """
    Example integration for drone flight controller
    """
    
    def __init__(self, video_processor, calibration_data=None):
        self.video_processor = video_processor
        self.calibration = calibration_data or {}
        self.emergency_stop_enabled = True
        self.max_speed = 5.0  # m/s
        self.turn_rate = 30  # degrees/s
        
    def process_navigation_command(self, frame: np.ndarray) -> Dict:
        """
        Process frame and return drone control commands
        
        Args:
            frame: Current camera frame
            
        Returns:
            control_commands: Dictionary with drone control parameters
        """
        # Process frame
        processed_frame, nav_data = self.video_processor.process_frame(frame)
        
        if not nav_data:
            return self._get_default_commands()
        
        recommendation = nav_data['recommendation']
        closest_obstacle = nav_data['closest_obstacle']
        confidence = nav_data['confidence']
        
        # Generate control commands based on recommendation
        commands = {
            'timestamp': time.time(),
            'raw_recommendation': recommendation,
            'confidence': confidence
        }
        
        # Emergency stop
        if recommendation == "EMERGENCY_STOP" and self.emergency_stop_enabled:
            commands.update({
                'action': 'STOP',
                'forward_speed': 0,
                'lateral_speed': 0,
                'vertical_speed': 0,
                'yaw_rate': 0,
                'emergency': True
            })
        
        # Turn commands
        elif recommendation == "TURN_LEFT":
            speed_factor = min(1.0, closest_obstacle / 3.0)  # Slow down near obstacles
            commands.update({
                'action': 'TURN_LEFT',
                'forward_speed': self.max_speed * speed_factor * 0.3,
                'lateral_speed': -self.max_speed * speed_factor * 0.5,
                'vertical_speed': 0,
                'yaw_rate': -self.turn_rate,
                'emergency': False
            })
        
        elif recommendation == "TURN_RIGHT":
            speed_factor = min(1.0, closest_obstacle / 3.0)
            commands.update({
                'action': 'TURN_RIGHT',
                'forward_speed': self.max_speed * speed_factor * 0.3,
                'lateral_speed': self.max_speed * speed_factor * 0.5,
                'vertical_speed': 0,
                'yaw_rate': self.turn_rate,
                'emergency': False
            })
        
        # Climb
        elif recommendation == "CLIMB":
            commands.update({
                'action': 'CLIMB',
                'forward_speed': 0,
                'lateral_speed': 0,
                'vertical_speed': self.max_speed * 0.5,
                'yaw_rate': 0,
                'emergency': False
            })
        
        # Continue forward
        elif recommendation == "CONTINUE_FORWARD":
            speed_factor = min(1.0, (closest_obstacle - 1.5) / 3.5)  # Speed based on distance
            commands.update({
                'action': 'FORWARD',
                'forward_speed': self.max_speed * speed_factor,
                'lateral_speed': 0,
                'vertical_speed': 0,
                'yaw_rate': 0,
                'emergency': False
            })
        
        # Hover
        else:
            commands.update({
                'action': 'HOVER',
                'forward_speed': 0,
                'lateral_speed': 0,
                'vertical_speed': 0,
                'yaw_rate': 0,
                'emergency': False
            })
        
        # Add safety limits
        commands = self._apply_safety_limits(commands)
        
        return commands
    
    def _get_default_commands(self) -> Dict:
        """Get default hover commands"""
        return {
            'action': 'HOVER',
            'forward_speed': 0,
            'lateral_speed': 0,
            'vertical_speed': 0,
            'yaw_rate': 0,
            'emergency': False,
            'timestamp': time.time()
        }
    
    def _apply_safety_limits(self, commands: Dict) -> Dict:
        """Apply safety limits to control commands"""
        # Limit speeds
        commands['forward_speed'] = np.clip(commands['forward_speed'], -self.max_speed, self.max_speed)
        commands['lateral_speed'] = np.clip(commands['lateral_speed'], -self.max_speed, self.max_speed)
        commands['vertical_speed'] = np.clip(commands['vertical_speed'], -self.max_speed * 0.5, self.max_speed * 0.5)
        commands['yaw_rate'] = np.clip(commands['yaw_rate'], -self.turn_rate, self.turn_rate)
        
        return commands

# Create drone controller
drone_controller = DroneNavigationController(video_processor, 
                                           calibration_data if 'calibration_data' in locals() else None)

print("🚁 Drone Navigation Controller initialized!")
print("   - Emergency stop: ENABLED")
print("   - Max speed: 5.0 m/s")
print("   - Turn rate: 30 deg/s")
print("\n⚡ Ready for integration with flight controller!")
