# YOLO Model Comparison Experiments

## Automated Testing Across All YOLOv8 Models

This notebook automatically runs parking detection experiments across all YOLOv8 model variants:
- **yolov8n.pt** (Nano) - Fastest, least accurate
- **yolov8s.pt** (Small) - Balanced speed/accuracy
- **yolov8m.pt** (Medium) - Good accuracy
- **yolov8l.pt** (Large) - High accuracy
- **yolov8x.pt** (Extra Large) - Highest accuracy, slowest

## Features:
- Automatic model downloading if missing
- Comprehensive performance comparison
- Tracking issue analysis
- Consolidated results export

In [None]:
import cv2
import numpy as np
from ultralytics import YOLO
import json
import os
import time
import pandas as pd
from datetime import datetime
from collections import defaultdict
import matplotlib.pyplot as plt

# Import the ParkingDetector class from the main notebook.
#
# A .ipynb file is a JSON document, not Python source, so exec'ing its raw
# text cannot define anything (and normally fails on JSON literals such as
# `null`). The notebook is parsed instead and only the definitions found in
# its code cells are executed.
def _load_notebook_definitions(notebook_path, namespace):
    """Execute the imports, functions and classes of a notebook's code cells.

    Only top-level ``import``, ``def`` and ``class`` statements are executed,
    so loading the notebook does not re-run whatever experiment code it also
    contains. Module-level assignments are deliberately skipped; if the
    definitions rely on notebook-level constants those must be provided
    separately.

    Args:
        notebook_path: Path to the ``.ipynb`` file.
        namespace: Dict the definitions are executed into (e.g. ``globals()``).

    Raises:
        OSError: If the notebook cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    import ast

    with open(notebook_path, encoding='utf-8') as notebook_file:
        notebook = json.load(notebook_file)

    definition_nodes = (
        ast.Import,
        ast.ImportFrom,
        ast.FunctionDef,
        ast.AsyncFunctionDef,
        ast.ClassDef,
    )

    for cell in notebook.get('cells', []):
        if cell.get('cell_type') != 'code':
            continue

        # The notebook format stores cell source either as one string or as a
        # list of lines.
        source = cell.get('source', '')
        if isinstance(source, list):
            source = ''.join(source)

        try:
            tree = ast.parse(source)
        except SyntaxError:
            # Cells using IPython magics / shell escapes are not plain Python.
            continue

        tree.body = [node for node in tree.body if isinstance(node, definition_nodes)]
        if tree.body:
            exec(compile(tree, notebook_path, 'exec'), namespace)


_load_notebook_definitions('parking-detection.ipynb', globals())

if 'ParkingDetector' not in globals():
    raise ImportError("ParkingDetector was not found in 'parking-detection.ipynb'")

In [None]:
class ModelComparison:
    """Run the parking-detection experiment once per YOLOv8 variant and rank them.

    Each experiment is delegated to ``ParkingDetector``, which must already be
    defined in the notebook namespace. One flat result dict per model is
    collected in ``self.results``; failed runs are recorded with
    ``success=False`` and an ``error`` message instead of aborting the series.

    Attributes set by ``__init__``:
        video_path: Path of the video every model is run on.
        video_name: File name of the video without directory or extension.
        models: Weight file names to test, smallest to largest.
        model_dir: Directory the weight files are looked up in.
        results: Result dicts of the most recent ``run_all_experiments`` call.
        comparison_id: Identifier built from the video name and creation time;
            used as the output directory name.
    """

    def __init__(self, video_path):
        """Record the comparison settings and print a short summary.

        Args:
            video_path: Path of the video to analyse with every model.
        """
        self.video_path = video_path
        self.video_name = os.path.splitext(os.path.basename(video_path))[0]

        # Define all YOLOv8 models to test
        self.models = [
            'yolov8n.pt',  # Nano
            'yolov8s.pt',  # Small
            'yolov8m.pt',  # Medium
            'yolov8l.pt',  # Large
            'yolov8x.pt'   # Extra Large
        ]

        self.model_dir = '../yolo-model'
        self.results = []
        self.comparison_id = f"comparison_{self.video_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

        print(f"Model Comparison Setup:")
        print(f"Video: {self.video_path}")
        print(f"Models to test: {len(self.models)}")
        print(f"Comparison ID: {self.comparison_id}")

    def download_missing_models(self):
        """Fetch every weight file that is not yet present in ``model_dir``.

        Download failures are reported and skipped so that one unavailable
        model does not prevent the remaining ones from being prepared.
        """
        import shutil

        print("\nChecking for missing models...")

        for model_name in self.models:
            model_path = os.path.join(self.model_dir, model_name)

            if os.path.exists(model_path):
                print(f"Found {model_name}")
                continue

            print(f"Downloading {model_name}...")
            try:
                # Create model directory if it doesn't exist
                os.makedirs(self.model_dir, exist_ok=True)

                # Instantiating YOLO with a bare weight name is relied on to
                # trigger the download; the instance itself is not needed.
                YOLO(model_name)

                # The file is only relocated when it landed in the current
                # working directory; otherwise it stays wherever the library
                # stored it.
                source_path = os.path.join(os.getcwd(), model_name)
                if os.path.exists(source_path):
                    shutil.move(source_path, model_path)
                    print(f"Downloaded and moved {model_name} to {model_path}")
                else:
                    print(f"Model {model_name} downloaded to default location")

            except Exception as e:
                # Best effort: report and continue with the next model.
                print(f"Failed to download {model_name}: {e}")

    @staticmethod
    def _failure_result(model_name, model_path, error):
        """Build the result dict recorded for a model whose run did not succeed."""
        return {
            'model_name': model_name.replace('.pt', ''),
            'model_path': model_path,
            'success': False,
            'error': error
        }

    def run_single_experiment(self, model_name, show_video=False):
        """Run the detector with one model and summarise the outcome.

        Args:
            model_name: Weight file name, e.g. ``'yolov8n.pt'``.
            show_video: Forwarded to ``ParkingDetector.run_detection``.

        Returns:
            A dict that always contains ``model_name``, ``model_path`` and
            ``success``. Successful runs additionally carry the FPS,
            detection, tracking and score fields; unsuccessful runs carry
            ``error``. Exceptions raised by the detector are caught and turned
            into an unsuccessful result.
        """
        model_path = os.path.join(self.model_dir, model_name)

        print(f"\n{'='*50}")
        print(f"Testing Model: {model_name}")
        print(f"{'='*50}")

        try:
            detector = ParkingDetector(model_path, self.video_path)

            start_time = time.time()
            success = detector.run_detection(show_video=show_video)
            total_experiment_time = time.time() - start_time

            if not success:
                print(f"FAILED: {model_name}")
                return self._failure_result(model_name, model_path, 'Detection failed')

            # Save individual results
            output_dir = detector.save_results()

            # Extract key metrics for comparison
            stats = detector.results['detection_stats']
            perf = detector.results['performance']

            result = {
                'model_name': model_name.replace('.pt', ''),
                'model_path': model_path,
                'experiment_id': detector.results['experiment_id'],
                'output_dir': output_dir,
                'total_experiment_time': total_experiment_time,

                # Performance metrics
                'avg_fps': perf['avg_fps'],
                'min_fps': perf['min_fps'],
                'max_fps': perf['max_fps'],

                # Detection stats
                'total_frames': stats['total_frames'],
                'total_detections': stats['total_detections'],
                'parked_vehicles': stats['unique_parked'],
                'processing_time': stats['processing_time'],

                # Tracking quality (the gap counters are optional in the stats)
                'tracking_gaps': stats.get('tracking_gaps_total', 0),
                'vehicles_with_issues': stats.get('vehicles_with_tracking_issues', 0),
                'tracking_quality_score': self.calculate_tracking_quality(stats),

                # Overall score
                'overall_score': self.calculate_overall_score(perf, stats),
                'success': True
            }

            print(f"SUCCESS: {model_name}")
            print(f"  Avg FPS: {perf['avg_fps']:.1f}")
            print(f"  Parked Vehicles: {stats['unique_parked']}")
            print(f"  Tracking Issues: {stats.get('tracking_gaps_total', 0)}")
            return result

        except Exception as e:
            # Boundary of a single experiment: one broken model must not stop
            # the remaining models from being tested.
            print(f"ERROR: {model_name} - {e}")
            return self._failure_result(model_name, model_path, str(e))

    def calculate_tracking_quality(self, stats):
        """Calculate tracking quality score (0-100); higher means fewer gaps.

        Args:
            stats: Mapping with optional ``total_detections`` and
                ``tracking_gaps_total`` entries.

        Returns:
            ``100 - 100 * gaps / detections`` clamped at 0 and rounded to two
            decimals, or 0 when there were no detections.
        """
        total_detections = stats.get('total_detections', 1)
        tracking_gaps = stats.get('tracking_gaps_total', 0)

        if total_detections == 0:
            return 0

        gap_ratio = tracking_gaps / total_detections
        return round(max(0, 100 - (gap_ratio * 100)), 2)

    def calculate_overall_score(self, perf, stats):
        """Calculate overall model performance score (0-100).

        Weighted sum of three sub-scores, each capped at 100:
        FPS (30%), tracking quality (40%) and detections per frame (30%).

        Args:
            perf: Mapping containing ``avg_fps``.
            stats: Mapping with optional ``total_detections``,
                ``total_frames`` and ``tracking_gaps_total`` entries.

        Returns:
            The weighted score rounded to two decimals.
        """
        fps_score = min(100, (perf['avg_fps'] / 30) * 100)  # 30 FPS or more earns full marks
        tracking_score = self.calculate_tracking_quality(stats)

        # Detection efficiency (detections per frame); 0.2 per frame or more earns full marks
        detection_efficiency = stats.get('total_detections', 0) / max(1, stats.get('total_frames', 1))
        detection_score = min(100, detection_efficiency * 500)

        overall = (fps_score * 0.3) + (tracking_score * 0.4) + (detection_score * 0.3)
        return round(overall, 2)

    def run_all_experiments(self, show_video=False):
        """Run experiments with all models.

        Results of any previous call are discarded first, so re-running the
        comparison (e.g. re-executing the notebook cell) does not leave
        duplicate entries in ``self.results``.

        Args:
            show_video: Forwarded to every single experiment.

        Returns:
            ``self.results``: one result dict per entry of ``self.models``.
        """
        print(f"Starting comparison with {len(self.models)} models...")
        print(f"Video display: {'Enabled' if show_video else 'Disabled (faster)'}")

        # Rebind instead of clearing so a list returned by an earlier call is
        # left untouched.
        self.results = []

        # Download missing models first
        self.download_missing_models()

        # Run experiments
        start_time = time.time()

        for i, model_name in enumerate(self.models, 1):
            print(f"\n[{i}/{len(self.models)}] Testing {model_name}...")
            self.results.append(self.run_single_experiment(model_name, show_video))

        total_time = time.time() - start_time

        print(f"\n{'='*60}")
        print(f"ALL EXPERIMENTS COMPLETED in {total_time:.1f}s")
        print(f"{'='*60}")

        return self.results

    def save_comparison_results(self):
        """Save consolidated comparison results.

        Writes ``comparison_results.json`` (every result, including failed
        runs) and, when at least one run succeeded, ``model_comparison.csv``
        (key metrics sorted by overall score, best first).

        Returns:
            ``(output_dir, comparison_df)``; ``comparison_df`` is ``None``
            when no experiment succeeded.
        """
        output_dir = f"../output/comparisons/{self.comparison_id}"
        os.makedirs(output_dir, exist_ok=True)

        # Save detailed JSON results
        json_file = f"{output_dir}/comparison_results.json"
        with open(json_file, 'w', encoding='utf-8') as f:
            # default=str stringifies any value json cannot encode natively.
            json.dump({
                'comparison_id': self.comparison_id,
                'video_path': self.video_path,
                'video_name': self.video_name,
                'timestamp': datetime.now().isoformat(),
                'models_tested': len(self.models),
                'results': self.results
            }, f, indent=2, default=str)

        # Create comparison CSV
        successful_results = [r for r in self.results if r.get('success', False)]

        if not successful_results:
            print(f"No successful experiments to save comparison")
            return output_dir, None

        # Select key columns for comparison
        comparison_cols = [
            'model_name', 'avg_fps', 'total_detections', 'parked_vehicles',
            'tracking_gaps', 'vehicles_with_issues', 'tracking_quality_score',
            'overall_score', 'processing_time', 'total_experiment_time'
        ]

        comparison_df = pd.DataFrame(successful_results)[comparison_cols].copy()
        comparison_df = comparison_df.sort_values('overall_score', ascending=False)

        csv_file = f"{output_dir}/model_comparison.csv"
        comparison_df.to_csv(csv_file, index=False)

        print(f"\nComparison results saved:")
        print(f"  Directory: {output_dir}")
        print(f"  Files: comparison_results.json, model_comparison.csv")

        return output_dir, comparison_df

    def display_results(self):
        """Print a ranking table of the successful runs and the top performer.

        Returns:
            The successful result dicts sorted by ``overall_score`` (best
            first); an empty list when no experiment succeeded.
        """
        successful_results = [r for r in self.results if r.get('success', False)]

        if not successful_results:
            print("No successful experiments to display")
            return []

        print(f"\nMODEL COMPARISON RESULTS")
        print(f"{'='*80}")
        print(f"Video: {self.video_name}")
        print(f"Models tested: {len(successful_results)}/{len(self.models)}")
        print()

        # Sort by overall score
        sorted_results = sorted(successful_results, key=lambda x: x['overall_score'], reverse=True)

        # Display ranking
        print(f"{'Rank':<4} {'Model':<8} {'Score':<6} {'FPS':<6} {'Parked':<7} {'Gaps':<5} {'Quality':<8}")
        print(f"{'-'*50}")

        for i, result in enumerate(sorted_results, 1):
            print(f"{i:<4} {result['model_name']:<8} {result['overall_score']:<6} "
                  f"{result['avg_fps']:<6.1f} {result['parked_vehicles']:<7} "
                  f"{result['tracking_gaps']:<5} {result['tracking_quality_score']:<8.1f}")

        # Show detailed stats for top performer
        best_model = sorted_results[0]
        print(f"\nBEST PERFORMER: {best_model['model_name']}")
        print(f"  Overall Score: {best_model['overall_score']}")
        print(f"  Average FPS: {best_model['avg_fps']:.1f}")
        print(f"  Total Detections: {best_model['total_detections']}")
        print(f"  Parked Vehicles: {best_model['parked_vehicles']}")
        print(f"  Tracking Quality: {best_model['tracking_quality_score']:.1f}")
        print(f"  Processing Time: {best_model['processing_time']:.1f}s")

        return sorted_results

In [None]:
# COMPARISON CONFIGURATION

# Clip that every model variant is evaluated on
video_path = '../dataset/video_1_cut_03-57_13-47.mov'

# Building the comparison object only stores the settings and prints a
# summary; no experiment is started here.
comparison = ModelComparison(video_path)

print(
    "Ready to run model comparison!",
    "\nTo start the comparison, run the next cell.",
    "Note: This will take significant time as it tests all 5 models.",
    sep="\n",
)

In [None]:
# RUN MODEL COMPARISON

print(
    "Starting automated model comparison...",
    "This will test all YOLOv8 models from nano to extra-large",
    "",
    sep="\n",
)

# One experiment per model; pass show_video=True to watch each run
results = comparison.run_all_experiments(show_video=False)

# Write the consolidated JSON/CSV; comparison_df is None when no run succeeded
output_dir, comparison_df = comparison.save_comparison_results()

# Print the ranking table and keep the sorted results
ranking = comparison.display_results()

In [None]:
# DETAILED RESULTS ANALYSIS

def print_detailed_analysis(comparison_df):
    """Print the per-metric winners, trade-offs, tracking gaps and recommendations.

    Args:
        comparison_df: Non-empty DataFrame with one row per successful model
            and the columns ``model_name``, ``avg_fps``, ``total_detections``,
            ``tracking_gaps``, ``tracking_quality_score`` and
            ``overall_score``.
    """
    print("\nDETAILED ANALYSIS")
    print("="*60)

    # Performance analysis
    # NOTE: "accuracy" here is the raw detection count used as a proxy; no
    # ground truth is involved.
    print("\nPERFORMANCE METRICS:")
    print(f"Fastest Model: {comparison_df.loc[comparison_df['avg_fps'].idxmax(), 'model_name']} "
          f"({comparison_df['avg_fps'].max():.1f} FPS)")
    print(f"Most Accurate: {comparison_df.loc[comparison_df['total_detections'].idxmax(), 'model_name']} "
          f"({comparison_df['total_detections'].max()} detections)")
    print(f"Best Tracking: {comparison_df.loc[comparison_df['tracking_quality_score'].idxmax(), 'model_name']} "
          f"({comparison_df['tracking_quality_score'].max():.1f} quality score)")

    # Speed vs Accuracy trade-off
    print(f"\nSPEED vs ACCURACY TRADE-OFF:")
    mean_detections = comparison_df['total_detections'].mean()  # same for every row
    for _, row in comparison_df.iterrows():
        if row['avg_fps'] > 15:
            speed_category = "Fast"
        elif row['avg_fps'] > 10:
            speed_category = "Medium"
        else:
            speed_category = "Slow"
        accuracy_category = "High" if row['total_detections'] > mean_detections else "Low"
        print(f"  {row['model_name']}: {speed_category} speed, {accuracy_category} accuracy")

    # Tracking issues analysis
    print(f"\nTRACKING ISSUES ANALYSIS:")
    total_gaps = comparison_df['tracking_gaps'].sum()

    if total_gaps > 0:
        print(f"Total tracking gaps across all models: {total_gaps}")
        print(f"Models with fewest tracking issues:")
        sorted_by_gaps = comparison_df.sort_values('tracking_gaps')
        for _, row in sorted_by_gaps.head(3).iterrows():
            print(f"  {row['model_name']}: {row['tracking_gaps']} gaps")
    else:
        print("No tracking issues detected in any model!")

    # Recommendations
    print(f"\nRECOMMENDATIONS:")
    best_overall = comparison_df.loc[comparison_df['overall_score'].idxmax()]
    fastest = comparison_df.loc[comparison_df['avg_fps'].idxmax()]
    most_accurate = comparison_df.loc[comparison_df['total_detections'].idxmax()]

    print(f"  Best Overall: {best_overall['model_name']} (score: {best_overall['overall_score']})")
    print(f"  For Real-time: {fastest['model_name']} ({fastest['avg_fps']:.1f} FPS)")
    print(f"  For Accuracy: {most_accurate['model_name']} ({most_accurate['total_detections']} detections)")


# The table only exists after the comparison cell has run; an empty table has
# no winners to report, so it is treated like a missing one.
if globals().get('comparison_df') is not None and not comparison_df.empty:
    print_detailed_analysis(comparison_df)
else:
    print("No successful experiments to analyze")
    print("Make sure to run the comparison first!")

In [None]:
# EXPORT RESULTS FOR FURTHER ANALYSIS

# Nothing to report until the comparison cell has produced at least one result.
if 'comparison' not in globals() or not comparison.results:
    print("No comparison data available")
    print("Run the model comparison first!")
else:
    print("EXPORT OPTIONS:")
    print("="*40)

    # Show available data
    successful_runs = [entry for entry in comparison.results if entry.get('success', False)]
    successful_count = len(successful_runs)
    print(f"Successfully tested: {successful_count}/{len(comparison.models)} models")

    if successful_count <= 0:
        print("No successful experiments to export")
    else:
        # Location and names of the consolidated files
        print(f"\nResults saved to: {output_dir}")
        print(
            "Files available:",
            "  - comparison_results.json (detailed data)",
            "  - model_comparison.csv (summary table)",
            sep="\n",
        )

        # Per-model output directories of the successful runs
        print("\nIndividual experiment results:")
        for result in successful_runs:
            print(f"  {result['model_name']}: {result.get('output_dir', 'N/A')}")

        print(
            "\nYou can now:",
            "  1. Open CSV files in Excel/Google Sheets",
            "  2. Use JSON data for custom analysis",
            "  3. Compare individual experiment details",
            "  4. Create custom visualizations",
            sep="\n",
        )

        # Comma-separated summary that can be pasted elsewhere
        print("\nQUICK STATS (copy-friendly):")
        if 'comparison_df' in globals() and comparison_df is not None:
            print("Model,FPS,Detections,Parked,TrackingGaps,OverallScore")
            for _, row in comparison_df.iterrows():
                fields = (
                    row['model_name'],
                    f"{row['avg_fps']:.1f}",
                    row['total_detections'],
                    row['parked_vehicles'],
                    row['tracking_gaps'],
                    row['overall_score'],
                )
                print(",".join(f"{field}" for field in fields))