In [4]:
# Install dependencies into the environment of the running kernel.
# `%pip` targets the kernel's interpreter, whereas `!pip` runs whichever pip is
# first on the shell PATH, which may belong to a different environment.
# ultralytics is pinned to the version this notebook was last executed with.
%pip install -q ultralytics==8.3.229 opencv-python pandas numpy matplotlib

Collecting ultralytics
  Using cached ultralytics-8.3.229-py3-none-any.whl.metadata (37 kB)
Collecting ultralytics-thop<=2.0.18 (from ultralytics)
  Downloading ultralytics_thop-2.0.18-py3-none-any.whl.metadata (14 kB)
Downloading ultralytics-8.3.229-py3-none-any.whl (1.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m11.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading ultralytics_thop-2.0.18-py3-none-any.whl (28 kB)
Installing collected packages: ultralytics-thop, ultralytics
Successfully installed ultralytics-8.3.229 ultralytics-thop-2.0.18


In [6]:
import cv2
import numpy as np
from ultralytics import YOLO
import pandas as pd
import json
from pathlib import Path

class VideoObjectDetector:
    """Detect one object class in a video with a YOLO model and score the result.

    The class bundles three steps: running the model frame by frame while
    writing an annotated copy of the video (`process_video`), computing box
    overlap (`calculate_iou`), and comparing detections against per-frame
    ground-truth boxes (`evaluate_detections`).
    """

    def __init__(self, model_name='yolov8n.pt', conf_threshold=0.5):
        """
        Initialize the object detector

        Args:
            model_name: YOLO model to use
            conf_threshold: Confidence threshold for detections
        """
        self.model = YOLO(model_name)
        self.conf_threshold = conf_threshold
        # Lookup from numeric class id to class name, as exposed by the model.
        self.class_names = self.model.names

    def calculate_iou(self, box1, box2):
        """
        Calculate Intersection over Union (IoU) between two bounding boxes

        Args:
            box1: [x1, y1, x2, y2] format
            box2: [x1, y1, x2, y2] format

        Returns:
            iou: IoU value between 0 and 1 (0 when the union has no area)
        """
        # Corners of the overlap rectangle (may be inverted if boxes are disjoint).
        x1_inter = max(box1[0], box2[0])
        y1_inter = max(box1[1], box2[1])
        x2_inter = min(box1[2], box2[2])
        y2_inter = min(box1[3], box2[3])

        # Clamp at zero so disjoint boxes contribute no intersection area.
        inter_area = max(0, x2_inter - x1_inter) * max(0, y2_inter - y1_inter)

        box1_area = (box1[2] - box1[0]) * (box1[3] - box1[1])
        box2_area = (box2[2] - box2[0]) * (box2[3] - box2[1])
        union_area = box1_area + box2_area - inter_area

        # Degenerate (zero-area or inverted) boxes: avoid dividing by zero or
        # returning a negative ratio.
        if union_area <= 0:
            return 0.0

        return inter_area / union_area

    def process_video(self, input_video_path, output_video_path, target_class='person'):
        """
        Run detection on every frame and write an annotated copy of the video

        Args:
            input_video_path: Path to input video
            output_video_path: Path to save output video (written with the 'mp4v' codec)
            target_class: Class to detect (e.g., 'person', 'car', etc.)

        Returns:
            results: Dictionary with
                'detections': list of dicts with keys 'frame' (0-based frame
                    index), 'class', 'confidence' and 'bbox' ([x1, y1, x2, y2])
                'video_info': dict with 'fps', 'width', 'height' and
                    'total_frames' as reported by the input container

        Raises:
            OSError: If the input video cannot be opened or the output video
                cannot be opened for writing.
        """
        cap = cv2.VideoCapture(str(input_video_path))
        if not cap.isOpened():
            cap.release()
            raise OSError(f"Could not open input video: {input_video_path}")

        out = None
        all_detections = []
        frame_count = 0

        try:
            # Keep the frame rate as a float: truncating e.g. 29.97 to 29 makes
            # the output play at a different speed than the input.
            fps = cap.get(cv2.CAP_PROP_FPS)
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(str(output_video_path), fourcc, fps, (width, height))
            if not out.isOpened():
                raise OSError(f"Could not open output video for writing: {output_video_path}")

            print(f"Processing video: {input_video_path}")
            print(f"Total frames: {total_frames}")

            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                results = self.model(frame, conf=self.conf_threshold, verbose=False)

                for result in results:
                    boxes = result.boxes
                    if boxes is None:
                        continue
                    for box in boxes:
                        class_id = int(box.cls[0].cpu().numpy())
                        class_name = self.class_names[class_id]

                        # Only the requested class is recorded and drawn.
                        if class_name != target_class:
                            continue

                        x1, y1, x2, y2 = (float(v) for v in box.xyxy[0].cpu().numpy())
                        conf = float(box.conf[0].cpu().numpy())

                        all_detections.append({
                            'frame': frame_count,
                            'class': class_name,
                            'confidence': conf,
                            'bbox': [x1, y1, x2, y2]
                        })

                        # Draw the box and its label directly onto the frame
                        # that is about to be written out.
                        cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2)
                        label = f"{class_name} {conf:.2f}"
                        cv2.putText(frame, label, (int(x1), int(y1)-10),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

                out.write(frame)

                frame_count += 1
                if frame_count % 30 == 0:
                    print(f"Processed {frame_count}/{total_frames} frames")
        finally:
            # Release the handles even if detection or writing fails midway.
            cap.release()
            if out is not None:
                out.release()

        return {
            'detections': all_detections,
            'video_info': {
                'fps': fps,
                'width': width,
                'height': height,
                'total_frames': total_frames
            }
        }

    @staticmethod
    def _frame_key(frame_id):
        """Return `frame_id` as an int when possible so "12" and 12 compare equal."""
        try:
            return int(frame_id)
        except (TypeError, ValueError):
            return frame_id

    def evaluate_detections(self, detections, ground_truth, iou_threshold=0.5):
        """
        Evaluate detection results against ground truth

        Only frames that appear in `ground_truth` are scored; detections on
        other frames are ignored. Within a frame each ground-truth box is
        matched to the not-yet-matched detection with the highest IoU, provided
        that IoU reaches `iou_threshold`.

        Args:
            detections: List of detected objects (dicts with 'frame' and 'bbox')
            ground_truth: Dictionary mapping frame number to a list of
                [x1, y1, x2, y2] boxes. Keys may be ints or numeric strings
                (JSON round-trips turn int keys into strings).
            iou_threshold: IoU threshold for considering a detection as correct

        Returns:
            evaluation_metrics: Dictionary with 'true_positives',
                'false_positives', 'false_negatives', 'precision', 'recall',
                'f1_score', 'mean_iou' and 'total_iou_scores'
        """
        true_positives = 0
        false_positives = 0
        false_negatives = 0
        iou_scores = []

        # Group detections by normalised frame number.
        detections_by_frame = {}
        for det in detections:
            detections_by_frame.setdefault(self._frame_key(det['frame']), []).append(det)

        for frame_num, gt_boxes in ground_truth.items():
            # Normalise the key: ground truth reloaded from JSON has string keys
            # while detections carry int frame numbers.
            frame_detections = detections_by_frame.get(self._frame_key(frame_num), [])
            det_matched = [False] * len(frame_detections)

            for gt_box in gt_boxes:
                best_idx = -1
                best_iou = 0.0
                for j, det in enumerate(frame_detections):
                    if det_matched[j]:
                        continue
                    iou = self.calculate_iou(gt_box, det['bbox'])
                    # Prefer the best-overlapping candidate instead of the
                    # first one that merely clears the threshold.
                    if iou >= iou_threshold and (best_idx < 0 or iou > best_iou):
                        best_idx = j
                        best_iou = iou

                if best_idx >= 0:
                    true_positives += 1
                    det_matched[best_idx] = True
                    iou_scores.append(best_iou)
                else:
                    false_negatives += 1

            # Detections left unmatched in an annotated frame are false positives.
            false_positives += det_matched.count(False)

        predicted = true_positives + false_positives
        actual = true_positives + false_negatives
        precision = true_positives / predicted if predicted > 0 else 0.0
        recall = true_positives / actual if actual > 0 else 0.0
        f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0.0
        # Plain float keeps the result JSON-serialisable.
        mean_iou = float(np.mean(iou_scores)) if iou_scores else 0.0

        return {
            'true_positives': true_positives,
            'false_positives': false_positives,
            'false_negatives': false_negatives,
            'precision': precision,
            'recall': recall,
            'f1_score': f1_score,
            'mean_iou': mean_iou,
            'total_iou_scores': len(iou_scores)
        }

def create_sample_ground_truth(video_path, output_path, num_frames=3):
    """
    Create sample ground truth data by annotating evenly spaced frames

    Each selected frame is shown in an OpenCV window and the function waits
    for key presses: 'a' adds a placeholder box covering the central half of
    the frame, 'r' clears the boxes of the current frame, 'q' finishes the
    frame. Mouse drawing is not implemented. A GUI-capable OpenCV build and a
    display are required.

    Args:
        video_path: Path to input video
        output_path: Path to save ground truth data (JSON)
        num_frames: Number of frames to annotate

    Returns:
        ground_truth: Dictionary mapping int frame index to a list of
            [x1, y1, x2, y2] boxes. Note that the JSON file stores the keys
            as strings, because JSON object keys are always strings.

    Raises:
        OSError: If the video cannot be opened. Nothing is written in that
            case, so a later run does not pick up an empty ground-truth file.
    """
    cap = cv2.VideoCapture(str(video_path))
    if not cap.isOpened():
        cap.release()
        raise OSError(f"Could not open video: {video_path}")

    ground_truth = {}

    try:
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Evenly spaced interior frames: for N frames, positions k/(N+1) of the video.
        frame_indices = [int(total_frames * (i+1) / (num_frames+1)) for i in range(num_frames)]

        for frame_idx in frame_indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
            ret, frame = cap.read()
            if not ret:
                # Frame could not be decoded; leave it out of the ground truth.
                continue

            print(f"Annotating frame {frame_idx}")
            cv2.imshow('Frame', frame)
            cv2.waitKey(1)

            # Simple keyboard-driven annotation (in a real scenario, use a proper annotation tool)
            print("Press 'a' to add a sample box, 'r' to reset, 'q' when done.")

            boxes = []
            while True:
                # Redraw on a copy so resetting does not leave stale rectangles.
                clone = frame.copy()
                for box in boxes:
                    cv2.rectangle(clone, (box[0], box[1]), (box[2], box[3]), (0, 255, 0), 2)

                cv2.imshow('Annotation', clone)
                key = cv2.waitKey(1) & 0xFF

                if key == ord('q'):
                    break
                elif key == ord('r'):
                    # Reset current frame annotations
                    boxes = []
                elif key == ord('a'):
                    # Placeholder box spanning the central half of the frame.
                    h, w = frame.shape[:2]
                    sample_box = [w//4, h//4, 3*w//4, 3*h//4]
                    boxes.append(sample_box)
                    print(f"Added box: {sample_box}")

            ground_truth[frame_idx] = boxes
            cv2.destroyWindow('Annotation')
    finally:
        # Release the capture even if annotation is interrupted.
        cap.release()

    cv2.destroyAllWindows()

    # Save ground truth
    with open(output_path, 'w') as f:
        json.dump(ground_truth, f, indent=2)

    print(f"Ground truth saved to {output_path}")
    return ground_truth

def generate_report(evaluation_results, output_path):
    """
    Render the evaluation metrics as a plain-text report and save it

    Args:
        evaluation_results: Dictionary with the keys 'true_positives',
            'false_positives', 'false_negatives', 'precision', 'recall',
            'f1_score', 'mean_iou' and 'total_iou_scores'
        output_path: Path of the text file to write

    Returns:
        The report text that was written to `output_path`
    """
    m = evaluation_results
    indent = "    "

    # Report body, one entry per line; empty entries become blank lines.
    body = [
        "OBJECT DETECTION EVALUATION REPORT",
        "==================================",
        "",
        "Performance Metrics:",
        f"- True Positives: {m['true_positives']}",
        f"- False Positives: {m['false_positives']}",
        f"- False Negatives: {m['false_negatives']}",
        f"- Precision: {m['precision']:.3f}",
        f"- Recall: {m['recall']:.3f}",
        f"- F1-Score: {m['f1_score']:.3f}",
        f"- Mean IoU: {m['mean_iou']:.3f}",
        f"- Total IoU Scores: {m['total_iou_scores']}",
        "",
        "Interpretation:",
        "- Precision: Proportion of correct detections among all detections",
        "- Recall: Proportion of ground truth objects that were detected",
        "- F1-Score: Harmonic mean of precision and recall",
        "- IoU: Measure of overlap between predicted and ground truth boxes",
    ]

    # Non-empty lines are indented; the text starts with a newline and ends
    # with a newline followed by one indent.
    rendered = [indent + line if line else line for line in body]
    report_text = "\n" + "\n".join(rendered) + "\n" + indent

    with open(output_path, 'w') as report_file:
        report_file.write(report_text)

    print(f"Report saved to {output_path}")
    return report_text

def main(input_video="/content/input_video.mp4",
         output_video="output_video.mp4",
         ground_truth_file="ground_truth.json",
         report_file="evaluation_report.txt",
         target_class='person'):
    """
    Run the complete pipeline: ground truth, detection, evaluation, report

    Args:
        input_video: Path of the video to analyse
        output_video: Path of the annotated video to write
        ground_truth_file: JSON file with ground truth; created interactively
            if it does not exist yet
        report_file: Path of the text report to write
        target_class: Class name to detect and evaluate
    """
    def _as_frame_index(key):
        # JSON object keys are always strings; turn them back into the int
        # frame indices that the detections use.
        try:
            return int(key)
        except (TypeError, ValueError):
            return key

    # Initialize detector
    detector = VideoObjectDetector(model_name='yolov8n.pt', conf_threshold=0.5)

    # Step 1: Create sample ground truth (if not exists)
    if not Path(ground_truth_file).exists():
        print("Creating sample ground truth...")
        ground_truth = create_sample_ground_truth(input_video, ground_truth_file)
    else:
        with open(ground_truth_file, 'r') as f:
            loaded = json.load(f)
        # Without this conversion a cached ground-truth file never matches any
        # detection, because "703" != 703.
        ground_truth = {_as_frame_index(frame): boxes for frame, boxes in loaded.items()}

    # Step 2: Process video and detect objects
    print("Processing video for object detection...")
    results = detector.process_video(input_video, output_video, target_class=target_class)

    # Step 3: Evaluate results
    print("Evaluating detection results...")
    evaluation_results = detector.evaluate_detections(
        results['detections'],
        ground_truth
    )

    # Step 4: Generate report
    print("Generating evaluation report...")
    generate_report(evaluation_results, report_file)

    # Print summary
    print("\n" + "="*50)
    print("PROCESSING COMPLETE")
    print("="*50)
    print(f"Input video: {input_video}")
    print(f"Output video: {output_video}")
    print(f"Total frames processed: {results['video_info']['total_frames']}")
    print(f"Total detections: {len(results['detections'])}")
    print(f"Precision: {evaluation_results['precision']:.3f}")
    print(f"Recall: {evaluation_results['recall']:.3f}")
    print(f"F1-Score: {evaluation_results['f1_score']:.3f}")
    print(f"Mean IoU: {evaluation_results['mean_iou']:.3f}")

# Run the pipeline with the default paths when executed as a script or cell.
if __name__ == "__main__":
    main()

Processing video for object detection...
Processing video: /content/input_video.mp4
Total frames: 1407
Processed 30/1407 frames
Processed 60/1407 frames
Processed 90/1407 frames
Processed 120/1407 frames
Processed 150/1407 frames
Processed 180/1407 frames
Processed 210/1407 frames
Processed 240/1407 frames
Processed 270/1407 frames
Processed 300/1407 frames
Processed 330/1407 frames
Processed 360/1407 frames
Processed 390/1407 frames
Processed 420/1407 frames
Processed 450/1407 frames
Processed 480/1407 frames
Processed 510/1407 frames
Processed 540/1407 frames
Processed 570/1407 frames
Processed 600/1407 frames
Processed 630/1407 frames
Processed 660/1407 frames
Processed 690/1407 frames
Processed 720/1407 frames
Processed 750/1407 frames
Processed 780/1407 frames
Processed 810/1407 frames
Processed 840/1407 frames
Processed 870/1407 frames
Processed 900/1407 frames
Processed 930/1407 frames
Processed 960/1407 frames
Processed 990/1407 frames
Processed 1020/1407 frames
Processed 1050/