# Object Detection and Segmentation with YOLOv8

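This notebook runs a YOLOv8 segmentation model (pretrained on the COCO dataset classes) on keyframes extracted from videos, removes duplicate detections, and saves the bounding boxes, labels, and confidence scores of each video to a JSON file.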

## 1. Install required libraries

In [None]:
# Install required libraries
import sys
import subprocess

# Install packages using subprocess
!pip install -q torch torchvision
!pip install -q ultralytics
!pip install -qopencv-python-headless
!pip install -qgdown
!pip install -qmatplotlib
!pip install -qPillow

## 2. Download data from Google Drive

In [None]:
# Configure batch and Google Drive IDs
BATCH_NAME = "L01"  # Batch name (L01, L02, L03, ...)
BATCH_ID = "14MeYV2WBWwldMDGRrpG9s7vz8triwbWr"  # ID for L01.zip

import os
import zipfile

import gdown

# exist_ok=True already makes directory creation idempotent, so no separate
# existence check is needed
os.makedirs('data', exist_ok=True)
os.makedirs('detection_results', exist_ok=True)  # Directory for detection results

archive_path = f"data/{BATCH_NAME}.zip"

# Download batch (keyframes) from Google Drive
print(f"Downloading batch {BATCH_NAME}...")
downloaded_path = gdown.download(id=BATCH_ID, output=archive_path, quiet=True)

# Stop here with a clear message instead of failing later while unzipping
if downloaded_path is None or not os.path.exists(archive_path):
    raise RuntimeError(
        f"Download of batch {BATCH_NAME} failed (Google Drive id: {BATCH_ID})"
    )

# Unzip the downloaded file into the current directory
with zipfile.ZipFile(archive_path, 'r') as zip_ref:
    zip_ref.extractall('./')

print("Data downloaded successfully!")

## 3. Import libraries

In [None]:
# Import required libraries
import os
import json
import glob
import torch
import cv2
import numpy as np
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm
from PIL import Image
from ultralytics import YOLO
# Custom JSON encoder to handle numpy types
class NumpyEncoder(json.JSONEncoder):
    """JSON encoder that turns NumPy scalars and arrays into native Python values."""

    def default(self, obj):
        """Return a JSON-serializable equivalent of ``obj``.

        NumPy integers become ``int``, NumPy floats become ``float`` and
        arrays become (nested) lists. Any other object is handed to the base
        class implementation.
        """
        conversions = (
            (np.integer, int),
            (np.floating, float),
            (np.ndarray, lambda array: array.tolist()),
        )
        for numpy_type, to_native in conversions:
            if isinstance(obj, numpy_type):
                return to_native(obj)
        return super().default(obj)

# Handle Google Drive connection if running in Colab.
# Best-effort step: every failure path below only prints an [INFO] message and
# the notebook keeps running.
# NOTE(review): no visible cell reads from /content/drive (the batch is fetched
# with gdown) - confirm whether this mount is still needed.
try:
    # Raises ImportError when the google.colab package is not installed
    from google.colab import drive
    try:
        drive.mount('/content/drive')
        print("[INFO] Google Drive connected via Colab")
    except NotImplementedError:
        print("[INFO] Current environment does not support mounting Google Drive via Colab")
        # In Kaggle, data is accessed directly from the current directory
except ImportError:
    # Also reached if drive.mount itself raises ImportError, because the inner
    # try block sits inside this one
    print("[INFO] Not running in Colab, skipping Google Drive mount")

## 4. Check environment

In [None]:
# Report the CUDA devices that PyTorch can see
if not torch.cuda.is_available():
    print("CUDA not available. Check NVIDIA drivers and PyTorch CUDA installation.")
else:
    device_count = torch.cuda.device_count()
    current_device = torch.cuda.current_device()
    device_name = torch.cuda.get_device_name(current_device)

    gpu_report = (
        f"Number of GPUs: {device_count}",
        f"Current GPU: {current_device}",
        f"GPU name: {device_name}",
        f"CUDA version: {torch.version.cuda}",
    )
    for report_line in gpu_report:
        print(report_line)

## 5. Configure batch and video indices

In [None]:
# Configure batch processing
START_VIDEO_INDEX = 1  # 1-based position in the sorted video list (1 = first video)
BATCH_SIZE = 8  # Process 8 videos at a time

# A start index below 1 would turn into a negative slice index further down
# and silently select the wrong videos
if START_VIDEO_INDEX < 1 or BATCH_SIZE < 1:
    raise ValueError("START_VIDEO_INDEX and BATCH_SIZE must both be >= 1")

# Define paths
BATCH_PATH = BATCH_NAME  # Example: "L01"

# Get list of videos in the batch (sorted so the index above is stable)
videos = sorted(glob.glob(os.path.join(BATCH_PATH, "V*")))
print(f"Found {len(videos)} video directories in batch {BATCH_PATH}")

# Only process videos from START_VIDEO_INDEX to START_VIDEO_INDEX + BATCH_SIZE - 1
end_idx = min(START_VIDEO_INDEX + BATCH_SIZE - 1, len(videos))
selected_videos = videos[START_VIDEO_INDEX - 1:end_idx]
print(f"Processing {len(selected_videos)} videos: {[os.path.basename(v) for v in selected_videos]}")

## 6. Load YOLOv8 model

In [None]:
# Load YOLOv8 model
device = "cuda" if torch.cuda.is_available() else "cpu"

# Load the YOLOv8 model with segmentation capabilities
model = YOLO("yolov8x-seg.pt")  # Use x-seg for best performance with segmentation

# Move the weights to the selected device so the message below is accurate;
# previously `device` was computed but never applied to the model
model.to(device)

print(f"YOLOv8 model loaded on device: {device}")

# Display the model information and COCO classes
print(f"Model info: YOLOv8 using COCO dataset with {len(model.names)} classes")
print(f"COCO class names: {model.names}")

## 7–8. Helper functions for detection and filtering, and detection over the selected videos

In [None]:
# Function to detect objects with YOLOv8
def detect_objects(image_path, conf_threshold=0.35):
    """
    Detect objects in an image using YOLOv8 with COCO classes.

    Args:
        image_path: Path of the image handed to the module-level ``model``.
        conf_threshold: Confidence threshold passed to the model as ``conf``.

    Returns:
        Dictionary with three parallel lists:
            "boxes":  one ``[x1, y1, x2, y2]`` list per detection (xyxy format),
            "scores": one confidence value per detection, as a Python float,
            "labels": one class name per detection, looked up in ``model.names``.
        Segmentation masks are not part of the result. If any step fails, the
        error is printed and the three lists are empty; no exception escapes.
    """
    try:
        # Run YOLOv8 inference; a single image yields a single result
        result = model(image_path, conf=conf_threshold)[0]
        detections = result.boxes

        # tolist() converts NumPy values to plain Python numbers, so the
        # result can be serialized without special handling
        boxes = detections.xyxy.cpu().numpy().tolist()
        scores = detections.conf.cpu().numpy().tolist()
        class_names = [model.names[int(cls)] for cls in detections.cls.cpu().numpy()]

        return {
            "boxes": boxes,
            "scores": scores,
            "labels": class_names
        }

    except Exception as e:
        # Best effort: one unreadable keyframe must not abort the whole batch
        print(f"Error processing {os.path.basename(image_path)}: {e}")
        return {
            "boxes": [],
            "scores": [],
            "labels": []
        }

# Function to visualize detection results on an image
def visualize_detection(image_path, detection_results):
    """
    Draw detections on an image and display it with matplotlib.

    Args:
        image_path: Path of the image file to read.
        detection_results: Dictionary with parallel lists under the keys
            "boxes" ([x1, y1, x2, y2] each), "scores" and "labels".

    Returns:
        The annotated RGB image, or None when the file is missing, cannot be
        read, or drawing fails (a message is printed in each case).
    """
    # Check if the image file exists
    if not os.path.exists(image_path):
        print(f"Error: Image file does not exist: {image_path}")
        return None

    # cv2.imread signals failure by returning None instead of raising
    image = cv2.imread(image_path)
    if image is None:
        print(f"Error: Failed to load image: {image_path}")
        return None

    try:
        # OpenCV loads BGR; matplotlib expects RGB
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        detections = zip(
            detection_results["boxes"],
            detection_results["scores"],
            detection_results["labels"],
        )
        for box, score, label in detections:
            x1, y1, x2, y2 = map(int, box)

            # Draw bounding box
            cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2)

            # Draw label and score above the box; when the box touches the top
            # edge there is no room above it, so draw just inside the box
            text = f"{label}: {score:.2f}"
            text_y = y1 - 10 if y1 - 10 > 10 else y1 + 20
            cv2.putText(image, text, (x1, text_y), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

        plt.figure(figsize=(12, 8))
        plt.imshow(image)
        plt.axis('off')
        plt.show()

        return image
    except Exception as e:
        print(f"Error processing image {image_path}: {e}")
        return None
# Function to calculate IoU (Intersection over Union)
def calculate_iou(box1, box2):
    """
    Calculate IoU (Intersection over Union) between two bounding boxes.

    Args:
        box1: Sequence ``(x1, y1, x2, y2)``.
        box2: Sequence ``(x1, y1, x2, y2)``.

    Returns:
        Intersection area divided by union area. Returns 0.0 when the boxes do
        not intersect or when the union has no area (degenerate boxes).
    """
    x1_1, y1_1, x2_1, y2_1 = box1
    x1_2, y1_2, x2_2, y2_2 = box2

    # Width and height of the overlap rectangle; negative means no overlap
    inter_width = min(x2_1, x2_2) - max(x1_1, x1_2)
    inter_height = min(y2_1, y2_2) - max(y1_1, y1_2)
    if inter_width < 0 or inter_height < 0:
        return 0.0

    area_intersection = inter_width * inter_height
    area1 = (x2_1 - x1_1) * (y2_1 - y1_1)
    area2 = (x2_2 - x1_2) * (y2_2 - y1_2)
    area_union = area1 + area2 - area_intersection

    # Two coincident zero-area boxes would otherwise divide by zero
    if area_union <= 0:
        return 0.0

    return area_intersection / area_union

# Function to filter duplicate objects and keep only the highest scoring object
def filter_objects(objects, iou_threshold=0.7, confidence_threshold=0.5):
    """
    Drop low-confidence and duplicated detections.

    Detections scoring below ``confidence_threshold`` are discarded. The rest
    are visited from highest to lowest score; a detection is kept unless an
    already kept detection has the same "object" name and an IoU with it
    greater than ``iou_threshold``.

    Each element of ``objects`` is a dict with at least the keys "object",
    "box" and "score". Returns a new list; the input list is not modified.
    """
    if not objects:
        return []

    # Confidence filter first, then strongest detections first
    candidates = [det for det in objects if det["score"] >= confidence_threshold]
    candidates.sort(key=lambda det: det["score"], reverse=True)

    kept = []
    for det in candidates:
        # Name comparison comes first so IoU is only computed for same-class pairs
        overlaps_kept = any(
            det["object"] == winner["object"]
            and calculate_iou(det["box"], winner["box"]) > iou_threshold
            for winner in kept
        )
        if not overlaps_kept:
            kept.append(det)

    return kept

# Run detection on every keyframe of every selected video and write one JSON
# file per video. The `model` loaded in section 6 is reused; loading the same
# weights a second time only costs time and memory.

# Make sure the output directory exists even if this cell is run on its own
os.makedirs("detection_results", exist_ok=True)

# Holds the results of the most recently processed video. Defined before the
# loop so later cells can read it even when no video is processed.
detection_results = []

# Process each video
for video_dir in selected_videos:
    video_name = os.path.basename(video_dir)
    print(f"\nProcessing video: {video_name}")

    # Directly find keyframes from directories
    keyframe_dir = f"{BATCH_NAME}/{video_name}"
    print(f"Looking for keyframes in directory: {keyframe_dir}")

    # glob returns files in arbitrary order; sort so the JSON follows the
    # keyframe file names
    keyframe_files = sorted(glob.glob(os.path.join(keyframe_dir, "*.jpg")))

    if not keyframe_files:
        print(f"No keyframes found for {video_name} in {keyframe_dir}")
        continue

    print(f"Found {len(keyframe_files)} keyframes in {keyframe_dir}")

    # Initialize list to store results of this video
    detection_results = []

    # Process each keyframe
    for keyframe_path in keyframe_files:
        keyframe_name = os.path.basename(keyframe_path)

        print(f"Processing keyframe: {keyframe_name}")

        # Run YOLOv8 detection with COCO classes
        results = detect_objects(keyframe_path)

        # One record per raw detection
        raw_objects = [
            {
                "prompt": "COCO classes",  # Using all COCO classes instead of prompts
                "object": label,
                "box": box,
                "score": score
            }
            for box, score, label in zip(results["boxes"], results["scores"], results["labels"])
        ]

        keyframe_results = {
            "keyframe": keyframe_name,
            "caption": "",  # Empty caption since we're not using caption files
            # Remove duplicate objects and filter by score
            "objects": filter_objects(raw_objects)
        }

        print(f"Keyframe {keyframe_name}: {len(keyframe_results['objects'])} objects after filtering")

        # Add keyframe results to the main list
        detection_results.append(keyframe_results)

    # Save results to JSON file
    output_file = os.path.join("detection_results", f"{BATCH_NAME}_{video_name}_detection_yolov8.json")
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(detection_results, f, ensure_ascii=False, indent=4, cls=NumpyEncoder)

    print(f"\nSaved detection results for {video_name} to {output_file}")

print("\nObject detection completed for all videos!")

## 9. Visualize some results

In [None]:
# Visualize one randomly chosen keyframe that has at least one detection.
# The results are read back from the JSON files saved by the detection step
# instead of from variables left over by the processing loop. Those leftovers
# can disagree: when the last video had no keyframes, `video_name` points at it
# while `detection_results` still belongs to the previous video.
import random

result_suffix = "_detection_yolov8.json"
result_prefix = f"{BATCH_NAME}_"
result_files = sorted(
    glob.glob(os.path.join("detection_results", f"{result_prefix}*{result_suffix}"))
)

# Collect (video name, keyframe record) pairs for keyframes with objects
candidates = []
for result_file in result_files:
    # File names follow "<BATCH_NAME>_<video name>_detection_yolov8.json"
    file_name = os.path.basename(result_file)
    sample_video = file_name[len(result_prefix):-len(result_suffix)]
    with open(result_file, 'r', encoding='utf-8') as f:
        saved_keyframes = json.load(f)
    candidates.extend((sample_video, kf) for kf in saved_keyframes if kf['objects'])

if not result_files:
    print("No detection results available to visualize.")
elif not candidates:
    print("No keyframes with objects found.")
else:
    sample_video, sample = random.choice(candidates)
    keyframe_name = sample['keyframe']
    objects = sample['objects']

    # Full path to keyframe
    keyframe_dir = f"{BATCH_NAME}/{sample_video}"
    keyframe_path = os.path.join(keyframe_dir, keyframe_name)

    # Check if the image file exists
    if os.path.exists(keyframe_path):
        # Prepare detection results in the format needed by visualize_detection
        vis_results = {
            "boxes": [obj['box'] for obj in objects],
            "scores": [obj['score'] for obj in objects],
            "labels": [obj['object'] for obj in objects]
        }

        # Visualize
        print(f"Visualizing keyframe: {keyframe_name}")
        print(f"Objects detected: {[obj['object'] for obj in objects]}")

        image = visualize_detection(keyframe_path, vis_results)

        if image is None:
            print("Failed to visualize the image. Skipping visualization.")
    else:
        print(f"Keyframe file not found: {keyframe_path}")
        print("Skipping visualization.")
        print(f"Objects detected: {[obj['object'] for obj in objects]}")
        print("Try using a different keyframe or check file paths.")

## 10. Zip detection results for download

In [None]:
# Zip all detection results for easy download
import shutil
import os
from datetime import datetime

# Get timestamp for unique filename
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
zip_filename = f"yolov8_detection_results_{timestamp}.zip"

# Check if we have detection results
if os.path.isdir('detection_results') and os.listdir('detection_results'):
    file_count = len(os.listdir('detection_results'))
    print(f"Found {file_count} detection result files to zip")

    # make_archive appends ".zip" itself, so pass the name without extension
    shutil.make_archive(
        base_name=os.path.splitext(zip_filename)[0],
        format='zip',
        root_dir='.',
        base_dir='detection_results'
    )

    if os.path.exists(zip_filename):
        file_size = os.path.getsize(zip_filename) / (1024 * 1024)  # Size in MB
        print(f"\nZip file created: {zip_filename} ({file_size:.2f} MB)")

        # In Kaggle, files can be found in the output section
        print("\nIn Kaggle: Find this file in the 'Output' tab of this notebook.")

        # Show a clickable download link when running inside IPython/Jupyter.
        # Only the link depends on IPython, so only the link is guarded.
        try:
            from IPython.display import FileLink, display
            display(FileLink(zip_filename))
        except ImportError:
            print("IPython display module not available.")
    else:
        print("Failed to create zip file")
else:
    print("No detection results found to zip.")

print("\nDetection process complete!")