<a href="https://colab.research.google.com/github/Alex-Jung-HB/0731_python_ensemble-for-accurate-object-detection/blob/main/0731_python_ensemble_for_accurate_object_detection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Ensemble object detection using the most practical fusion method: Weighted Boxes Fusion (WBF)

In [None]:
# =====================================================
# ENSEMBLE OBJECT DETECTION FOR AUTONOMOUS DRIVING
# Google Colab Compatible Version
# =====================================================

# Install required dependencies
!pip install ultralytics supervision ensemble-boxes opencv-python-headless
!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118

import cv2
import numpy as np
import torch
import supervision as sv
from ultralytics import YOLO
import ensemble_boxes
from google.colab import files
import os
import zipfile
from IPython.display import Video, display, HTML
import matplotlib.pyplot as plt
from datetime import datetime
import json

# =====================================================
# ENSEMBLE DETECTION CLASSES AND CONFIGURATION
# =====================================================

class AutonomousDrivingEnsemble:
    """
    Ensemble Object Detection System for Autonomous Driving
    Combines multiple YOLO models using Weighted Boxes Fusion (WBF)

    Typical use: call ``load_ensemble_models()`` once, then either
    ``detect_frame(frame)`` per image or ``process_video(src, dst)`` for a
    whole file. Per-class detection counts accumulate in ``self.stats``.
    """

    def __init__(self):
        # Detection classes relevant for autonomous driving
        # (class id -> display name; ids not listed here are ignored everywhere)
        self.AD_CLASSES = {
            0: 'person',      # Pedestrians
            1: 'bicycle',     # Bicycles
            2: 'car',         # Cars
            3: 'motorcycle',  # Motorcycles
            5: 'bus',         # Buses
            7: 'truck',       # Trucks
            9: 'traffic_light', # Traffic lights
            # Additional custom classes can be added
        }

        # Color mapping for visualization (tuples are passed straight to cv2 drawing calls)
        self.COLORS = {
            'person': (0, 255, 255),      # Yellow
            'bicycle': (255, 0, 255),     # Magenta
            'car': (0, 0, 255),           # Red
            'motorcycle': (255, 255, 0),  # Cyan
            'bus': (0, 255, 0),           # Green
            'truck': (255, 0, 0),         # Blue
            'traffic_light': (128, 255, 0), # Light Green
        }

        # Detection statistics: class name -> number of fused detections seen
        self.stats = {class_name: 0 for class_name in self.AD_CLASSES.values()}

        # Models and their per-model WBF weights (parallel lists, same order)
        self.models = []
        self.model_weights = []

    def load_ensemble_models(self):
        """
        Load multiple YOLO models for ensemble detection
        Each model brings different strengths and perspectives

        The ensemble is rebuilt from scratch on every call, so calling this
        method twice does not duplicate models or weights. If any model fails
        to load, the ensemble falls back to a single YOLOv8n model.
        """
        print("🔄 Loading ensemble models for autonomous driving...")

        # (weights file, description used in the log line, WBF weight)
        model_specs = [
            ('yolov8n.pt', 'YOLOv8n (Speed optimized)', 1.0),        # Equal weight
            ('yolov8s.pt', 'YOLOv8s (Balanced performance)', 1.2),   # Slightly higher weight
            ('yolov8m.pt', 'YOLOv8m (Accuracy optimized)', 1.5),     # Highest weight
        ]

        # Build into locals first so a failure half-way never leaves
        # self.models and self.model_weights with mismatched lengths.
        loaded_models = []
        loaded_weights = []

        try:
            for weights_file, description, fusion_weight in model_specs:
                print(f"📦 Loading {description}...")
                loaded_models.append(YOLO(weights_file))
                loaded_weights.append(fusion_weight)

            print(f"✅ Successfully loaded {len(loaded_models)} models for ensemble")

        except Exception as e:
            print(f"❌ Error loading models: {e}")
            # Fallback to single model
            print("🔄 Loading fallback single model...")
            loaded_models = [YOLO('yolov8n.pt')]
            loaded_weights = [1.0]

        self.models = loaded_models
        self.model_weights = loaded_weights

    @staticmethod
    def _to_unit_interval(value, extent):
        """Scale a pixel coordinate by the image extent and clamp it to [0, 1]."""
        return min(max(float(value) / extent, 0.0), 1.0)

    def weighted_boxes_fusion(self, predictions_list, image_size, iou_threshold=0.6, skip_box_threshold=0.01):
        """
        Implement Weighted Boxes Fusion (WBF) algorithm
        Combines predictions from multiple models intelligently

        Args:
            predictions_list: List of predictions from different models; each
                entry is a sequence of [x1, y1, x2, y2, confidence, class_id]
                rows in pixel coordinates (or None / empty for no detections)
            image_size: (width, height) of the image
            iou_threshold: IoU threshold for box clustering
            skip_box_threshold: Confidence threshold for filtering boxes

        Returns:
            Tuple (boxes, scores, labels). Boxes are [x1, y1, x2, y2] integer
            pixel coordinates. Only classes listed in self.AD_CLASSES are
            returned, on both the fused path and the fallback path.
        """
        if not predictions_list:
            return [], [], []

        img_width, img_height = image_size
        if img_width <= 0 or img_height <= 0:
            # Cannot normalise against an empty image
            return [], [], []

        # Prepare data for ensemble-boxes WBF
        boxes_list = []
        scores_list = []
        labels_list = []

        # Convert each model's predictions to normalized format
        for predictions in predictions_list:
            boxes = []
            scores = []
            labels = []

            # None or empty predictions simply contribute empty lists
            for detection in (predictions if predictions is not None else []):
                confidence = float(detection[4])
                class_id = int(detection[5])

                # Filter by confidence and relevant classes
                if confidence <= skip_box_threshold or class_id not in self.AD_CLASSES:
                    continue

                # Normalize to 0-1 and clamp: detector boxes can overshoot the
                # frame by a fraction of a pixel, and WBF expects [0, 1] inputs.
                x1, y1, x2, y2 = detection[:4]
                boxes.append([
                    self._to_unit_interval(x1, img_width),
                    self._to_unit_interval(y1, img_height),
                    self._to_unit_interval(x2, img_width),
                    self._to_unit_interval(y2, img_height),
                ])
                scores.append(confidence)
                labels.append(class_id)

            boxes_list.append(boxes)
            scores_list.append(scores)
            labels_list.append(labels)

        # Apply Weighted Boxes Fusion
        try:
            fused_boxes, fused_scores, fused_labels = ensemble_boxes.weighted_boxes_fusion(
                boxes_list,
                scores_list,
                labels_list,
                weights=self.model_weights,
                iou_thr=iou_threshold,
                skip_box_thr=skip_box_threshold
            )

            # Convert back to pixel coordinates
            final_boxes = [
                [
                    int(box[0] * img_width),
                    int(box[1] * img_height),
                    int(box[2] * img_width),
                    int(box[3] * img_height),
                ]
                for box in fused_boxes
            ]

            return final_boxes, fused_scores, fused_labels

        except Exception as e:
            # Deliberate best-effort: a fusion failure must not abort the frame
            print(f"⚠️ WBF fusion failed: {e}. Using first model's predictions.")

            # Fallback to first model's predictions, filtered the same way as
            # the fused path (confidence threshold and AD_CLASSES membership).
            first_predictions = predictions_list[0]
            if first_predictions is None:
                return [], [], []

            boxes = []
            scores = []
            labels = []
            for det in first_predictions:
                if det[4] > skip_box_threshold and int(det[5]) in self.AD_CLASSES:
                    boxes.append([int(x) for x in det[:4]])
                    scores.append(float(det[4]))
                    labels.append(int(det[5]))
            return boxes, scores, labels

    def detect_frame(self, frame):
        """
        Run ensemble detection on a single frame

        Every model in self.models is run on the frame; a model that raises
        contributes an empty prediction set instead of aborting the frame.
        Fused detections are drawn on a copy of the frame and added to
        self.stats.

        Args:
            frame: Input image frame

        Returns:
            Annotated frame with detections
        """
        h, w = frame.shape[:2]
        predictions_list = []

        # Get predictions from each model in the ensemble
        for i, model in enumerate(self.models):
            detections = []
            try:
                # Run inference
                results = model(frame, verbose=False)

                # Extract detections
                if results and results[0].boxes is not None:
                    result_boxes = results[0].boxes
                    boxes = result_boxes.xyxy.cpu().numpy()  # x1, y1, x2, y2
                    confidences = result_boxes.conf.cpu().numpy()
                    class_ids = result_boxes.cls.cpu().numpy()

                    # Combine into [x1, y1, x2, y2, confidence, class_id] rows
                    detections = [
                        [box[0], box[1], box[2], box[3], confidence, class_id]
                        for box, confidence, class_id in zip(boxes, confidences, class_ids)
                    ]

            except Exception as e:
                print(f"⚠️ Model {i} inference failed: {e}")
                detections = []

            # Always append so predictions stay aligned with self.model_weights
            predictions_list.append(detections)

        # Apply Weighted Boxes Fusion
        fused_boxes, fused_scores, fused_labels = self.weighted_boxes_fusion(
            predictions_list, (w, h)
        )

        # Draw detections on frame
        annotated_frame = self.draw_detections(frame.copy(), fused_boxes, fused_scores, fused_labels)

        # Update statistics
        self.update_stats(fused_labels)

        return annotated_frame

    def draw_detections(self, frame, boxes, scores, labels):
        """
        Draw detection bounding boxes and labels on frame

        Detections whose label is not in self.AD_CLASSES are skipped. The
        frame is modified in place and also returned.
        """
        for box, score, label in zip(boxes, scores, labels):
            if int(label) not in self.AD_CLASSES:
                continue

            class_name = self.AD_CLASSES[int(label)]
            color = self.COLORS.get(class_name, (255, 255, 255))

            # Draw bounding box
            x1, y1, x2, y2 = map(int, box)
            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)

            # Prepare label text
            label_text = f"{class_name}: {score:.2f}"

            # Draw label background
            (text_width, text_height), _ = cv2.getTextSize(label_text, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
            label_top = y1 - text_height - 10
            if label_top < 0:
                # Box touches the top of the frame: put the label just inside
                # the box instead of drawing it off-screen.
                label_top = y1
            label_bottom = label_top + text_height + 10
            cv2.rectangle(frame, (x1, label_top), (x1 + text_width, label_bottom), color, -1)

            # Draw label text
            cv2.putText(frame, label_text, (x1, label_bottom - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)

            # Draw model ensemble indicator (small circle)
            cv2.circle(frame, (x2 - 10, y1 + 10), 5, (0, 255, 0), -1)

        return frame

    def update_stats(self, labels):
        """Update detection statistics (labels outside AD_CLASSES are ignored)"""
        for label in labels:
            class_name = self.AD_CLASSES.get(int(label))
            if class_name is not None:
                self.stats[class_name] += 1

    def process_video(self, input_path, output_path):
        """
        Process entire video with ensemble detection

        Resets self.stats, runs detect_frame on every frame, writes the
        annotated frames to output_path and prints progress every 30 frames.

        Args:
            input_path: Path to input video file
            output_path: Path for output video file

        Raises:
            Exception: If the input video or the output writer cannot be opened
        """
        print(f"🎬 Processing video: {input_path}")

        # Open video capture
        cap = cv2.VideoCapture(input_path)
        if not cap.isOpened():
            raise Exception(f"❌ Cannot open video file: {input_path}")

        out = None
        frame_count = 0

        try:
            # Get video properties. FPS is kept as a float: truncating to int
            # turns 29.97 into 29 and changes the output duration.
            fps = cap.get(cv2.CAP_PROP_FPS)
            if not fps > 0:
                # Missing or invalid FPS metadata (0, negative, NaN);
                # 30.0 is an arbitrary but playable default.
                fps = 30.0
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

            # Some containers do not report a frame count
            total_known = total_frames > 0
            total_label = total_frames if total_known else "?"

            print(f"📊 Video Info: {width}x{height}, {fps:g} FPS, {total_frames} frames")

            # Setup video writer
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
            if not out.isOpened():
                raise Exception(f"❌ Cannot open video writer for: {output_path}")

            # Reset statistics
            self.stats = {class_name: 0 for class_name in self.AD_CLASSES.values()}

            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                # Process frame with ensemble detection
                processed_frame = self.detect_frame(frame)

                # Add frame info overlay
                info_text = f"Frame: {frame_count}/{total_label} | Ensemble: {len(self.models)} models"
                cv2.putText(processed_frame, info_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

                # Write processed frame
                out.write(processed_frame)

                frame_count += 1

                # Progress indicator
                if frame_count % 30 == 0:  # Every 30 frames
                    if total_known:
                        progress = (frame_count / total_frames) * 100
                        print(f"🔄 Progress: {progress:.1f}% ({frame_count}/{total_frames})")
                    else:
                        # No percentage without a known total (avoids dividing by zero)
                        print(f"🔄 Progress: {frame_count} frames processed")

        finally:
            # Cleanup. No HighGUI windows are created by this class, so there
            # is nothing to destroy here; window functions may also be
            # unavailable in headless OpenCV builds.
            cap.release()
            if out is not None:
                out.release()

        print(f"✅ Video processing complete: {output_path}")
        print("📈 Detection Statistics:")
        for class_name, count in self.stats.items():
            if count > 0:
                print(f"   {class_name}: {count}")

# =====================================================
# MAIN EXECUTION FUNCTIONS
# =====================================================

def upload_video():
    """Ask the user to upload a video through Colab and return its filename.

    Returns:
        The name of the first uploaded file, or None when nothing was uploaded.
    """
    print("📤 Please upload your driving video file...")
    uploaded = files.upload()

    # Guard clause: nothing came back from the upload widget
    if not uploaded:
        print("❌ No file uploaded")
        return None

    # Only the first uploaded file is used
    filename = next(iter(uploaded.keys()))
    print(f"✅ Uploaded: {filename}")
    return filename

def create_sample_video():
    """Write a short synthetic clip to 'sample_driving.mp4' and return that path.

    The clip is 100 frames of 640x480 low-intensity noise at 20 FPS, with a
    rectangle sliding to the right and a fixed circle drawn on every frame.
    """
    print("🎥 Creating sample video for testing...")

    # Writer for the test clip: mp4v codec, 20 FPS, 640x480
    codec = cv2.VideoWriter_fourcc(*'mp4v')
    writer = cv2.VideoWriter('sample_driving.mp4', codec, 20.0, (640, 480))

    frame_index = 0
    while frame_index < 100:  # 5 seconds at 20 FPS
        # Dark random background
        canvas = np.random.randint(0, 100, (480, 640, 3), dtype=np.uint8)

        # Shapes standing in for real objects
        shift = frame_index * 2
        cv2.rectangle(canvas, (50 + shift, 200), (100 + shift, 250), (0, 255, 0), -1)  # Moving car
        cv2.circle(canvas, (300, 300), 20, (255, 0, 0), -1)  # Traffic light

        writer.write(canvas)
        frame_index += 1

    writer.release()
    print("✅ Sample video created: sample_driving.mp4")
    return 'sample_driving.mp4'

def main():
    """Main execution function

    Runs the interactive pipeline: load the ensemble, obtain an input video
    (uploaded or generated), process it, write a JSON summary, preview the
    result and download the output video, the summary and a zip of both.
    Errors during processing are printed with a traceback rather than raised.
    """
    print("=" * 60)
    print("🚗 ENSEMBLE OBJECT DETECTION FOR AUTONOMOUS DRIVING")
    print("=" * 60)

    # Initialize ensemble system
    ensemble = AutonomousDrivingEnsemble()

    # Load ensemble models
    ensemble.load_ensemble_models()

    # Get input video. Strip the answer so "1 " or " 1" is still treated as
    # option 1 instead of silently falling through to the sample video.
    choice = input("📹 Choose option:\n1. Upload your video\n2. Use sample video\nEnter choice (1 or 2): ").strip()

    if choice == "1":
        input_video = upload_video()
        if not input_video:
            print("❌ No video provided. Exiting.")
            return
    else:
        if choice != "2":
            # Keep the original default, but say so instead of doing it silently
            print(f"⚠️ Unrecognized choice {choice!r}. Using sample video.")
        input_video = create_sample_video()

    # Set output filename
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_video = f"autonomous_driving_detected_{timestamp}.mp4"

    try:
        # Process video with ensemble detection
        ensemble.process_video(input_video, output_video)

        # Create results summary
        results_summary = {
            'input_video': input_video,
            'output_video': output_video,
            'models_used': len(ensemble.models),
            'detection_stats': ensemble.stats,
            'processing_timestamp': timestamp
        }

        # Save results summary
        summary_file = f"detection_summary_{timestamp}.json"
        with open(summary_file, 'w') as f:
            json.dump(results_summary, f, indent=2)

        print("\n" + "=" * 60)
        print("🎉 PROCESSING COMPLETE!")
        print("=" * 60)
        print(f"📁 Output video: {output_video}")
        print(f"📊 Summary file: {summary_file}")

        # Display video preview
        if os.path.exists(output_video):
            display(Video(output_video, width=600))

        # Download files
        print("\n📥 Downloading processed files...")
        files.download(output_video)
        files.download(summary_file)

        # Create and download zip archive
        zip_filename = f"autonomous_driving_results_{timestamp}.zip"
        with zipfile.ZipFile(zip_filename, 'w') as zipf:
            zipf.write(output_video)
            zipf.write(summary_file)

        print(f"📦 Created archive: {zip_filename}")
        files.download(zip_filename)

    except Exception as e:
        # Top-level boundary of the notebook: report and show the traceback
        print(f"❌ Error during processing: {e}")
        import traceback
        traceback.print_exc()

# =====================================================
# UTILITY FUNCTIONS
# =====================================================

def show_detection_summary():
    """Print a static overview of the target classes, models and fusion settings."""
    summary_lines = (
        "\n📊 ENSEMBLE DETECTION SYSTEM SUMMARY",
        "-" * 40,
        # Classes the detector keeps
        "🎯 Target Classes for Autonomous Driving:",
        "   • Vehicles: car, truck, bus, motorcycle",
        "   • Pedestrians: person",
        "   • Cyclists: bicycle",
        "   • Infrastructure: traffic_light",
        # Models and their fusion weights
        "\n🧠 Ensemble Architecture:",
        "   • YOLOv8n: Real-time detection (Weight: 1.0)",
        "   • YOLOv8s: Balanced performance (Weight: 1.2)",
        "   • YOLOv8m: High accuracy (Weight: 1.5)",
        # Fusion parameters
        "\n⚡ Fusion Method:",
        "   • Weighted Boxes Fusion (WBF)",
        "   • IoU Threshold: 0.6",
        "   • Confidence Threshold: 0.01",
    )
    # One write; each entry still ends up on its own line
    print("\n".join(summary_lines))

# =====================================================
# EXECUTION
# =====================================================

# Entry point: runs when this code is executed as the main module
# (the captured cell output shows the pipeline ran this way in the notebook).
if __name__ == "__main__":
    # Print the static description of classes, models and fusion settings
    show_detection_summary()

    # Run the interactive detection pipeline (loads models, prompts for a video)
    main()

Collecting ultralytics
  Downloading ultralytics-8.3.170-py3-none-any.whl.metadata (37 kB)
Collecting supervision
  Downloading supervision-0.26.1-py3-none-any.whl.metadata (13 kB)
Collecting ensemble-boxes
  Downloading ensemble_boxes-1.0.9-py3-none-any.whl.metadata (728 bytes)
Collecting ultralytics-thop>=2.0.0 (from ultralytics)
  Downloading ultralytics_thop-2.0.14-py3-none-any.whl.metadata (9.4 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch>=1.8.0->ultra

100%|██████████| 6.25M/6.25M [00:00<00:00, 104MB/s]

📦 Loading YOLOv8s (Balanced performance)...
Downloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolov8s.pt to 'yolov8s.pt'...



100%|██████████| 21.5M/21.5M [00:00<00:00, 207MB/s]

📦 Loading YOLOv8m (Accuracy optimized)...
Downloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolov8m.pt to 'yolov8m.pt'...



100%|██████████| 49.7M/49.7M [00:00<00:00, 155MB/s]


✅ Successfully loaded 3 models for ensemble
📹 Choose option:
1. Upload your video
2. Use sample video
Enter choice (1 or 2): 1
📤 Please upload your driving video file...
