# YOLO Dataset Preparation and Training Notebook

This notebook provides a comprehensive workflow for:
1. **Dataset Preparation**: Prepare two types of YOLO datasets - one with background fusion and one with original images
2. **Data Conversion**: Convert JSON annotations to YOLO format
3. **Model Training**: Fine-tune an Ultralytics YOLO model (the configured checkpoint is `yolo12x.pt`) on both datasets
4. **Comparison**: Compare performance between original and background-fused datasets

## Prerequisites
- Ultralytics installed (`pip install ultralytics`), which provides the `YOLO` class used throughout this notebook
- Your data directory containing images and JSON annotation files
- Background-fused images (ending with `_fused.jpg`) if using background fusion

## 1. Import Required Libraries

In [2]:
import json
import shutil
import random
import os
import sys
from pathlib import Path
from typing import List, Tuple, Optional, Dict, Any
import matplotlib.pyplot as plt
import pandas as pd
from IPython.display import Image, display
import warnings
warnings.filterwarnings('ignore')

# Check if ultralytics (and the PyTorch backend imported with it) is available.
# DL_STACK_AVAILABLE records the outcome so that the seeding step below never
# touches `torch` after a failed import (which would raise NameError).
DL_STACK_AVAILABLE = False
try:
    from ultralytics import YOLO
    import torch
    DL_STACK_AVAILABLE = True
    print("✅ Ultralytics YOLO imported successfully")
    print(f"✅ PyTorch version: {torch.__version__}")
    print(f"✅ CUDA available: {torch.cuda.is_available()}")
    if torch.cuda.is_available():
        print(f"✅ GPU: {torch.cuda.get_device_name()}")
except ImportError:
    print("❌ Ultralytics not found. Install with: pip install ultralytics")

# Set random seed for reproducibility
RANDOM_SEED = 42
random.seed(RANDOM_SEED)

if DL_STACK_AVAILABLE:
    # Only seed PyTorch when it was actually imported above.
    torch.manual_seed(RANDOM_SEED)
    print("✅ All libraries imported successfully!")
else:
    # Do not claim success when the import above failed.
    print("⚠️  Deep-learning libraries are missing: dataset preparation still works, training will not.")

✅ Ultralytics YOLO imported successfully
✅ PyTorch version: 2.5.1+cu121
✅ CUDA available: True
✅ GPU: NVIDIA GeForce RTX 3080 Laptop GPU
✅ All libraries imported successfully!


## 2. Configuration and Setup

Configure paths and parameters for dataset preparation and training.

In [3]:
# -----------------------------------------------------------------
# NOTEBOOK CONFIGURATION - every tunable value lives in this cell
# -----------------------------------------------------------------

# Filesystem locations
SOURCE_DIR = Path("out_pro_plus")                      # flat folder holding the images and their JSON files
OUTPUT_BASE_DIR = Path("datasets")                     # prepared YOLO datasets are written below this folder
PRETRAINED_MODEL_PATH = Path("yolo_model/yolo12x.pt")  # checkpoint that gets fine-tuned

# Train/validation split
TRAIN_RATIO = 0.8  # share of images used for training; the remainder is validation
RANDOM_SEED = 42

# Hyper-parameters forwarded to the trainer
TRAINING_CONFIG = dict(
    epochs=100,
    batch_size=8,
    image_size=1024,
    learning_rate=0.001,
    patience=20,
    save_period=10,
    workers=0,  # Use 0 on Windows to avoid DataLoader issues
)

# Echo the effective configuration so it is captured in the cell output
print(
    "\n".join(
        [
            "📁 Configuration:",
            f"   Source Directory: {SOURCE_DIR}",
            f"   Output Directory: {OUTPUT_BASE_DIR}",
            f"   Pretrained Model: {PRETRAINED_MODEL_PATH}",
            f"   Train/Val Split: {TRAIN_RATIO:.1%}/{1-TRAIN_RATIO:.1%}",
            f"   Training Epochs: {TRAINING_CONFIG['epochs']}",
            f"   Batch Size: {TRAINING_CONFIG['batch_size']}",
            f"   Image Size: {TRAINING_CONFIG['image_size']}",
        ]
    )
)

# Inventory of the source folder (skipped with a warning when it is missing)
if not SOURCE_DIR.exists():
    print(f"⚠️  Source directory {SOURCE_DIR} does not exist!")
    print("   Please update SOURCE_DIR variable or create the directory.")
else:
    # "*.jpg" also matches the fused images, hence the subtraction below
    jpg_files = list(SOURCE_DIR.glob("*.jpg"))
    fused_files = list(SOURCE_DIR.glob("*_fused.jpg"))
    json_files = list(SOURCE_DIR.glob("*.json"))

    print("\n📊 Source Directory Analysis:")
    print(f"   Original images: {len(jpg_files) - len(fused_files)}")
    print(f"   Fused images: {len(fused_files)}")
    print(f"   JSON files: {len(json_files)}")

📁 Configuration:
   Source Directory: out_pro_plus
   Output Directory: datasets
   Pretrained Model: yolo_model\yolo12x.pt
   Train/Val Split: 80.0%/20.0%
   Training Epochs: 100
   Batch Size: 8
   Image Size: 1024

📊 Source Directory Analysis:
   Original images: 7202
   Fused images: 7121
   JSON files: 7121


## 3. YOLO Dataset Preparation Class

This class combines all the functionality from the three original files:
- `prep_dataset.py` - Dataset organization and splitting
- `json_to_yolo.py` - JSON to YOLO format conversion
- `clean_json.py` - JSON file cleanup

In [4]:
class YOLODatasetPreparator:
    """
    Build YOLO-format datasets from a flat folder of ``.jpg`` images and JSON labels.

    Per the original notebook, this consolidates the former prep_dataset.py,
    json_to_yolo.py and clean_json.py scripts.

    Source layout handled by this class (all files in one directory):
      * ``<name>.jpg``        original image
      * ``<name>_fused.jpg``  background-fused variant of ``<name>.jpg``
      * ``<name>.json``       labels with keys ``img_width``, ``img_height`` and
                              ``annotations`` (each entry has ``box = [x1, y1, x2, y2]``)

    Images without a JSON file are kept as negative examples (empty label file).
    Every annotation is written as class 0 ("person").
    """

    # Marker appended to the stem of background-fused images.
    FUSED_SUFFIX = "_fused"

    def __init__(self, seed: int = 42):
        """Store ``seed`` and seed the global ``random`` module (kept for existing callers)."""
        random.seed(seed)
        self.seed = seed

    def _rng(self) -> random.Random:
        """Return a fresh generator seeded with ``self.seed``.

        Using a private generator per operation makes every shuffle depend only on
        its input, not on how many random numbers were drawn earlier in the kernel.
        Re-running a cell therefore reproduces the same split instead of scattering
        files differently into an already populated output folder.
        """
        return random.Random(self.seed)

    def find_images_and_labels(self, source_dir: Path, use_fused: bool = False) -> List[Tuple[Path, Optional[Path]]]:
        """Collect ``(image, json_or_None)`` pairs from ``source_dir``.

        Args:
            source_dir: Folder containing the images and JSON files.
            use_fused: When True, pair each ``*_fused.jpg`` with the JSON of its
                original image and additionally add original images that have no
                JSON as negatives. When False, pair each original image with its
                own JSON.

        Returns:
            Pairs in sorted (deterministic) order; the second element is ``None``
            for images without a label file.
        """
        fused_marker = f"{self.FUSED_SUFFIX}.jpg"
        # Sorted so the result does not depend on filesystem enumeration order.
        original_images = sorted(
            img for img in source_dir.glob("*.jpg") if not img.name.endswith(fused_marker)
        )
        pairs: List[Tuple[Path, Optional[Path]]] = []

        if use_fused:
            fused_images = sorted(source_dir.glob(f"*{fused_marker}"))
            print(f"Found {len(fused_images)} fused images in {source_dir}")

            for image_path in fused_images:
                # Strip only the trailing marker; str.replace would also remove
                # "_fused" occurring elsewhere in the name and miss the JSON file.
                base_name = image_path.stem.removesuffix(self.FUSED_SUFFIX)
                json_path = source_dir / f"{base_name}.json"
                pairs.append((image_path, json_path if json_path.exists() else None))

            # Original images without JSON are added as negatives so the fused
            # dataset contains the same negative examples as the original one.
            negative_originals = [
                img for img in original_images
                if not (source_dir / f"{img.stem}.json").exists()
            ]
            pairs.extend((img, None) for img in negative_originals)

            if negative_originals:
                print(f"Added {len(negative_originals)} original images without detections as negative examples")
        else:
            print(f"Found {len(original_images)} original images in {source_dir}")

            for image_path in original_images:
                json_path = source_dir / f"{image_path.stem}.json"
                pairs.append((image_path, json_path if json_path.exists() else None))

        positive_pairs = sum(1 for _, json_path in pairs if json_path is not None)
        negative_pairs = len(pairs) - positive_pairs

        print(f"Found {len(pairs)} total images:")
        print(f"  • {positive_pairs} images with detections (positive examples)")
        print(f"  • {negative_pairs} images without detections (negative examples)")
        return pairs

    def split_train_val(self, pairs: List[Tuple[Path, Optional[Path]]], train_ratio: float = 0.8) -> Tuple[List[Tuple[Path, Optional[Path]]], List[Tuple[Path, Optional[Path]]]]:
        """Shuffle ``pairs`` reproducibly and split them into train and validation lists.

        Args:
            pairs: Image/label pairs; the input list is not modified.
            train_ratio: Fraction (0..1) of the pairs assigned to training.

        Returns:
            ``(train_pairs, val_pairs)``.

        Raises:
            ValueError: If ``train_ratio`` is outside ``[0, 1]``.
        """
        if not 0.0 <= train_ratio <= 1.0:
            raise ValueError(f"train_ratio must be between 0 and 1, got {train_ratio}")

        shuffled_pairs = list(pairs)
        self._rng().shuffle(shuffled_pairs)

        train_count = int(len(shuffled_pairs) * train_ratio)
        train_pairs = shuffled_pairs[:train_count]
        val_pairs = shuffled_pairs[train_count:]

        print(f"Split: {len(train_pairs)} train, {len(val_pairs)} validation")
        return train_pairs, val_pairs

    def setup_yolo_directories(self, yolo_dir: Path):
        """Create ``images/{train,val}`` and ``labels/{train,val}`` below ``yolo_dir``.

        Existing folders (and any files already inside them) are left untouched.
        """
        for kind in ("images", "labels"):
            for split_name in ("train", "val"):
                (yolo_dir / kind / split_name).mkdir(parents=True, exist_ok=True)

        print(f"✅ Created YOLO directory structure in {yolo_dir}")

    def convert_json_to_yolo(self, json_path: Optional[Path], output_file: Path) -> int:
        """Write the YOLO label file for one image.

        Args:
            json_path: Annotation file, or ``None`` / a missing path for an image
                without detections (an empty label file is written).
            output_file: Destination ``.txt`` file.

        Returns:
            Number of label lines written.

        Raises:
            ValueError: If the JSON declares a non-positive image size.
            KeyError: If a required JSON key is missing.
        """
        if json_path is None or not json_path.exists():
            # Empty label file marks the image as a negative example.
            output_file.write_text("", encoding="utf-8")
            return 0

        with open(json_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        img_width = data['img_width']
        img_height = data['img_height']
        if img_width <= 0 or img_height <= 0:
            raise ValueError(f"Invalid image size {img_width}x{img_height} in {json_path}")

        yolo_lines = []
        for annotation in data['annotations']:
            x1, y1, x2, y2 = annotation['box']

            # Order the corners and clamp them to the image so the normalized
            # values always fall inside [0, 1].
            left, right = sorted((x1, x2))
            top, bottom = sorted((y1, y2))
            left, right = max(0, left), min(img_width, right)
            top, bottom = max(0, top), min(img_height, bottom)

            # A box with no area after clamping carries no usable label.
            if right <= left or bottom <= top:
                continue

            # Convert to YOLO format (normalized xywh)
            x_center = (left + right) / 2.0 / img_width
            y_center = (top + bottom) / 2.0 / img_height
            width = (right - left) / img_width
            height = (bottom - top) / img_height

            # Class ID (0 for person)
            class_id = 0

            yolo_lines.append(f"{class_id} {x_center:.6f} {y_center:.6f} {width:.6f} {height:.6f}")

        output_file.write_text('\n'.join(yolo_lines), encoding="utf-8")
        return len(yolo_lines)

    def copy_and_convert_pairs(self, pairs: List[Tuple[Path, Optional[Path]]], yolo_dir: Path, split_name: str):
        """Copy the images of ``pairs`` into a split and write their label files.

        Args:
            pairs: Image/label pairs belonging to this split.
            yolo_dir: Dataset root created by ``setup_yolo_directories``.
            split_name: ``"train"`` or ``"val"``.

        Returns:
            ``(copied_images, total_annotations)``.
        """
        images_dir = yolo_dir / "images" / split_name
        labels_dir = yolo_dir / "labels" / split_name

        copied_images = 0
        total_annotations = 0
        empty_labels = 0

        for image_path, json_path in pairs:
            shutil.copy2(image_path, images_dir / image_path.name)
            copied_images += 1

            # The label file shares the image's stem, as the images/ and labels/
            # folders are matched by file name.
            dest_label = labels_dir / f"{image_path.stem}.txt"
            annotations_count = self.convert_json_to_yolo(json_path, dest_label)

            if annotations_count == 0:
                empty_labels += 1
            total_annotations += annotations_count

        print(f"✅ Copied {copied_images} images and converted {total_annotations} annotations to {split_name} set")
        if empty_labels > 0:
            print(f"   📝 Created {empty_labels} empty label files for images without detections (negative examples)")
        return copied_images, total_annotations

    def create_yaml_config(self, yolo_dir: Path, dataset_name: str):
        """Write ``<yolo_dir.name>.yaml`` describing the dataset and return its path.

        The dataset root is written as an absolute, double-quoted, forward-slash
        path so that characters with a YAML meaning (``#``, ``: ``) or Windows
        backslashes cannot change how the value is parsed.
        """
        # Use absolute path to avoid path resolution issues
        absolute_path = yolo_dir.resolve()
        # A JSON string literal is also a valid YAML double-quoted scalar.
        quoted_path = json.dumps(absolute_path.as_posix(), ensure_ascii=False)

        yaml_lines = [
            f"# YOLO dataset configuration for {dataset_name}",
            f"path: {quoted_path}",
            "train: images/train",
            "val: images/val",
            "",
            "names:",
            "  0: person",
            "",
            "# Dataset info",
            "nc: 1  # number of classes",
            "",
        ]

        yaml_file = yolo_dir / f"{yolo_dir.name}.yaml"
        yaml_file.write_text("\n".join(yaml_lines), encoding="utf-8")

        print(f"✅ Created YAML config: {yaml_file}")
        print(f"   📁 Dataset path: {absolute_path}")
        return yaml_file

    def _build_dataset(self, pairs: List[Tuple[Path, Optional[Path]]], output_dir: Path,
                       dataset_name: str, train_ratio: float) -> Dict[str, Any]:
        """Split ``pairs``, write both splits plus the YAML file, and return the summary dict."""
        train_pairs, val_pairs = self.split_train_val(pairs, train_ratio)

        self.setup_yolo_directories(output_dir)

        train_images, train_annotations = self.copy_and_convert_pairs(train_pairs, output_dir, "train")
        val_images, val_annotations = self.copy_and_convert_pairs(val_pairs, output_dir, "val")

        yaml_file = self.create_yaml_config(output_dir, dataset_name)

        print(f"\n✅ {dataset_name} dataset prepared!")
        print(f"   📊 Train: {train_images} images, {train_annotations} annotations")
        print(f"   📊 Val: {val_images} images, {val_annotations} annotations")
        print(f"   📄 Config: {yaml_file}")

        return {
            'train_images': train_images,
            'train_annotations': train_annotations,
            'val_images': val_images,
            'val_annotations': val_annotations,
            'yaml_file': yaml_file,
            'output_dir': output_dir
        }

    def prepare_dataset(self, source_dir: Path, output_dir: Path, dataset_name: str,
                       use_fused: bool = False, train_ratio: float = 0.8):
        """Prepare a complete YOLO dataset from every image in ``source_dir``.

        Args:
            source_dir: Folder with images and JSON files.
            output_dir: Dataset root to create.
            dataset_name: Human-readable name used in messages and the YAML header.
            use_fused: Use ``*_fused.jpg`` images instead of the originals.
            train_ratio: Fraction of images assigned to training.

        Returns:
            Summary dict (image/annotation counts, ``yaml_file``, ``output_dir``),
            or ``None`` when no image was found.
        """
        print(f"\n{'='*60}")
        print(f"🚀 Preparing {dataset_name} dataset")
        print(f"📁 Source: {source_dir}")
        print(f"📁 Output: {output_dir}")
        print(f"🖼️  Using {'fused' if use_fused else 'original'} images")
        print(f"📊 Train/Val split: {train_ratio:.1%}/{1-train_ratio:.1%}")
        print(f"{'='*60}")

        pairs = self.find_images_and_labels(source_dir, use_fused)
        if not pairs:
            print(f"❌ No valid image-label pairs found in {source_dir}")
            return None

        return self._build_dataset(pairs, output_dir, dataset_name, train_ratio)

    def subsample_dataset(self, source_dir: Path, output_dir: Path, dataset_name: str,
                         sample_size: int, use_fused: bool = False, train_ratio: float = 0.8):
        """Prepare a dataset from a random subset of ``sample_size`` images.

        The subset keeps roughly the positive/negative proportion of the full
        data; if one group is too small the other fills the gap.

        Args:
            source_dir: Folder with images and JSON files.
            output_dir: Dataset root to create.
            dataset_name: Human-readable name used in messages and the YAML header.
            sample_size: Total number of images (train + val) to keep; capped at
                the number of available images.
            use_fused: Use ``*_fused.jpg`` images instead of the originals.
            train_ratio: Fraction of the subset assigned to training.

        Returns:
            The ``prepare_dataset`` summary dict extended with ``sample_size``,
            ``positive_samples`` and ``negative_samples``; ``None`` when no image
            was found or ``sample_size`` is not positive.
        """
        print(f"\n{'='*60}")
        print(f"🎯 Preparing {dataset_name} dataset (subsample: {sample_size} images)")
        print(f"📁 Source: {source_dir}")
        print(f"📁 Output: {output_dir}")
        print(f"🖼️  Using {'fused' if use_fused else 'original'} images")
        print(f"📊 Train/Val split: {train_ratio:.1%}/{1-train_ratio:.1%}")
        print(f"{'='*60}")

        if sample_size <= 0:
            print(f"❌ sample_size must be positive, got {sample_size}")
            return None

        all_pairs = self.find_images_and_labels(source_dir, use_fused)
        if not all_pairs:
            print(f"❌ No valid image-label pairs found in {source_dir}")
            return None

        if len(all_pairs) < sample_size:
            print(f"⚠️  Warning: Only {len(all_pairs)} images available, using all of them")
            sample_size = len(all_pairs)

        # Separate positive and negative examples for balanced subsampling
        positive_pairs = [pair for pair in all_pairs if pair[1] is not None]
        negative_pairs = [pair for pair in all_pairs if pair[1] is None]

        print(f"📊 Available data: {len(positive_pairs)} positive, {len(negative_pairs)} negative examples")

        # Keep the positive share of the full data, bounded by what is available.
        positive_ratio = len(positive_pairs) / len(all_pairs)
        target_positive = min(int(sample_size * positive_ratio), len(positive_pairs))
        target_negative = min(sample_size - target_positive, len(negative_pairs))

        # If we don't have enough negatives, take more positives
        if target_negative < sample_size - target_positive:
            target_positive = min(sample_size - target_negative, len(positive_pairs))

        print(f"🎯 Subsampling: {target_positive} positive + {target_negative} negative = {target_positive + target_negative} total")

        # Randomly sample from each category with a dedicated, seeded generator
        rng = self._rng()
        rng.shuffle(positive_pairs)
        rng.shuffle(negative_pairs)

        subsampled_pairs = positive_pairs[:target_positive] + negative_pairs[:target_negative]

        summary = self._build_dataset(subsampled_pairs, output_dir, dataset_name, train_ratio)
        summary.update(
            sample_size=target_positive + target_negative,
            positive_samples=target_positive,
            negative_samples=target_negative,
        )
        return summary

# Initialize the dataset preparator.
# A single instance, seeded with the notebook-wide RANDOM_SEED, is shared by the
# dataset-preparation and subsampling cells.
preparator = YOLODatasetPreparator(seed=RANDOM_SEED)
print("✅ YOLODatasetPreparator initialized!")

✅ YOLODatasetPreparator initialized!


## 4. Prepare Both Datasets

Now we'll prepare both datasets:
1. **Original Dataset**: Using original images without background fusion
2. **Fused Dataset**: Using background-fused images

In [None]:
# Prepare both datasets.
# `datasets` maps "original" / "fused" to the summary dict returned by
# preparator.prepare_dataset; a dataset that failed to prepare gets no entry.
datasets = {}

# One row per dataset: (key, step headline, output folder, display name, use fused images?)
dataset_jobs = [
    ("original", "🎯 Step 1: Preparing Original Dataset...", "yolo_original", "Original Images", False),
    ("fused", "\n🎯 Step 2: Preparing Background-Fused Dataset...", "yolo_fused", "Background Fused", True),
]

preparation_results = {}
for job_key, headline, folder_name, display_name, job_uses_fused in dataset_jobs:
    print(headline)
    job_result = preparator.prepare_dataset(
        source_dir=SOURCE_DIR,
        output_dir=OUTPUT_BASE_DIR / folder_name,
        dataset_name=display_name,
        use_fused=job_uses_fused,
        train_ratio=TRAIN_RATIO
    )
    preparation_results[job_key] = job_result

    if job_result:
        datasets[job_key] = job_result
        print(f"✅ {job_key.title()} dataset prepared successfully!")
    else:
        print(f"❌ Failed to prepare {job_key} dataset")

# Keep the per-dataset names the notebook has always exposed.
original_result = preparation_results["original"]
fused_result = preparation_results["fused"]

# Display summary
print(f"\n{'='*60}")
print("📋 DATASET PREPARATION SUMMARY")
print(f"{'='*60}")

for dataset_name, result in datasets.items():
    if result:
        print(f"\n{dataset_name.upper()} Dataset:")
        print(f"  📊 Train: {result['train_images']} images, {result['train_annotations']} annotations")
        print(f"  📊 Val: {result['val_images']} images, {result['val_annotations']} annotations")
        print(f"  📄 Config: {result['yaml_file']}")
        print(f"  📁 Directory: {result['output_dir']}")

# Only announce success when every requested dataset was actually prepared.
if len(datasets) == len(dataset_jobs):
    print("🎉 All datasets ready for YOLO training!")
else:
    failed_keys = [job[0] for job in dataset_jobs if job[0] not in datasets]
    print(f"⚠️  {len(datasets)}/{len(dataset_jobs)} datasets ready; failed: {', '.join(failed_keys)}")

## 4.1. Create Subsampled Datasets for Testing

For testing and experimentation, we'll create smaller subsampled versions of both datasets.
This allows for faster training iterations to test different configurations.

In [None]:
# =========================
# SUBSAMPLING CONFIGURATION
# =========================

# Define the subsample sizes you want to test.
# Each value is a total image count (train + val combined); sizes larger than a
# dataset are filtered out by get_available_subsample_sizes.
SUBSAMPLE_SIZES = [500, 1000, 2000, 5000]

# Only create subsamples if we have enough data
def get_available_subsample_sizes(dataset_info, sizes=None):
    """Determine which subsample sizes are feasible based on available data.

    Args:
        dataset_info: Summary dict with ``'train_images'`` and ``'val_images'``
            counts, or a falsy value when the dataset was not prepared.
        sizes: Candidate sizes to check. Defaults to the notebook-level
            ``SUBSAMPLE_SIZES``.

    Returns:
        The positive candidate sizes that do not exceed the dataset's total image
        count, ascending and without duplicates. If no candidate fits but the
        dataset is non-empty, a single-element list with the total count.
        An empty list when ``dataset_info`` is falsy or the dataset is empty.
    """
    if not dataset_info:
        return []

    candidates = SUBSAMPLE_SIZES if sizes is None else sizes
    total_images = dataset_info['train_images'] + dataset_info['val_images']

    # Duplicates would rebuild the same subset folder; non-positive sizes are meaningless.
    feasible_sizes = sorted({size for size in candidates if 0 < size <= total_images})

    if not feasible_sizes and total_images > 0:
        # If none of the preset sizes work, use the maximum available
        feasible_sizes = [total_images]

    return feasible_sizes

# Report, for every prepared dataset, which preset sizes can actually be built
print("📊 Checking feasible subsample sizes...")
for dataset_name, dataset_info in datasets.items():
    feasible_sizes = get_available_subsample_sizes(dataset_info)
    if dataset_info:
        total_available = dataset_info['train_images'] + dataset_info['val_images']
    else:
        total_available = 0
    print(f"{dataset_name.upper()}: {feasible_sizes} (total available: {total_available})")

In [None]:
# -----------------------------------------------------------------
# BUILD THE SUBSAMPLED DATASETS
# -----------------------------------------------------------------
# subsampled_datasets[dataset_type][sample_size] holds the summary dict returned
# by preparator.subsample_dataset for each subset that was created.

subsampled_datasets = {}
section_rule = "=" * 60

for dataset_type in ("original", "fused"):
    dataset_info = datasets.get(dataset_type)
    if not dataset_info:
        print(f"⚠️  Skipping {dataset_type} dataset - not available")
        continue

    feasible_sizes = get_available_subsample_sizes(dataset_info)
    if not feasible_sizes:
        print(f"⚠️  No feasible subsample sizes for {dataset_type} dataset")
        continue

    subsampled_datasets[dataset_type] = {}

    print(f"\n{section_rule}")
    print(f"🎯 Creating subsampled {dataset_type.upper()} datasets")
    print(section_rule)
    print(f"📊 Feasible sizes: {feasible_sizes}")
    print(section_rule)

    for sample_size in feasible_sizes:
        print(f"\n🔸 Creating {sample_size}-image subset...")

        # Each subset gets its own folder, named after dataset type and size
        subsample_dir = OUTPUT_BASE_DIR / f"yolo_{dataset_type}_sub{sample_size}"

        result = preparator.subsample_dataset(
            source_dir=SOURCE_DIR,
            output_dir=subsample_dir,
            dataset_name=f"{dataset_type.title()} ({sample_size} images)",
            sample_size=sample_size,
            use_fused=(dataset_type == 'fused'),
            train_ratio=TRAIN_RATIO
        )

        if not result:
            print(f"❌ Failed to create {sample_size}-image {dataset_type} subset")
            continue

        subsampled_datasets[dataset_type][sample_size] = result
        print(f"✅ {sample_size}-image {dataset_type} subset created!")

# Overview of everything that was created
summary_rule = "=" * 80
print(f"\n{summary_rule}")
print("📋 SUBSAMPLED DATASETS SUMMARY")
print(f"{summary_rule}\n")

for dataset_type, subsamples in subsampled_datasets.items():
    if not subsamples:
        continue

    print(f"{dataset_type.upper()} Subsamples:")
    for sample_size, result in subsamples.items():
        print(
            "\n".join(
                [
                    f"  📊 {sample_size} images:",
                    f"      Train: {result['train_images']} images, {result['train_annotations']} annotations",
                    f"      Val:   {result['val_images']} images, {result['val_annotations']} annotations",
                    f"      Positive/Negative: {result['positive_samples']}/{result['negative_samples']}",
                    f"      Config: {result['yaml_file']}",
                ]
            )
        )
    print()

total_subsamples = sum(map(len, subsampled_datasets.values()))
print(f"\n🎉 Created {total_subsamples} subsampled datasets ready for testing!")


## 5. YOLO Training Class

This class handles model training, evaluation, and comparison between datasets.

In [5]:
import os
from pathlib import Path
from typing import Any, Dict

import torch
import pandas as pd
from ultralytics import YOLO
from PIL import Image
from IPython.display import display


class YOLOTrainer:
    """YOLO model training and evaluation class."""

    def __init__(self, pretrained_model_path: Path):
        """Initialize trainer with pretrained model."""
        self.pretrained_model_path = pretrained_model_path
        self.models: Dict[str, YOLO] = {}
        self.training_results: Dict[str, Dict[str, Any]] = {}

    # ------------------------------------------------------------------ #
    # 1) TRAIN
    # ------------------------------------------------------------------ #
    def train_model(
        self,
        dataset_config: Dict[str, Any],
        dataset_name: str,
        training_config: Dict[str, Any],
    ) -> Dict[str, Any] | None:
        """Train YOLO model on specified dataset."""
        print(f"\n{'=' * 60}")
        print(f"🚀 Training YOLO model on {dataset_name} dataset")
        print(f"{'=' * 60}")

        # Fix OpenMP duplicate symbol issue on Windows
        os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"

        model = YOLO(str(self.pretrained_model_path))
        yaml_file = dataset_config["yaml_file"]
        project_dir = f"runs/{dataset_name.lower().replace(' ', '_')}"

        print(f"📄 Dataset config: {yaml_file}")
        print(f"📁 Results will be saved to: {project_dir}")
        print("⚙️  Training parameters:")
        for key, value in training_config.items():
            print(f"   {key}: {value}")

        try:
            # -------------------- start training -------------------- #
            results = model.train(
                data=str(yaml_file),
                epochs=training_config["epochs"],
                batch=training_config["batch_size"],
                imgsz=training_config["image_size"],
                lr0=training_config["learning_rate"],
                patience=training_config["patience"],
                save_period=training_config["save_period"],
                workers=training_config["workers"],
                project=project_dir,
                plots=True,
                optimizer="auto",
                warmup_epochs=5,
                cos_lr=True,
                hsv_h=0.015,
                hsv_s=0.7,
                hsv_v=0.4,
                degrees=10.0,
                translate=0.1,
                scale=0.5,
                fliplr=0.5,
                dropout=0.1,
                device=0 if torch.cuda.is_available() else "cpu",
                verbose=True,
            )

            # -------------------- bookkeeping -------------------- #
            self.models[dataset_name] = model
            self.training_results[dataset_name] = {
                "model": model,
                "results": results,
                "project_dir": project_dir,
                "best_model_path": model.ckpt_path,
            }

            print(f"✅ Training completed for {dataset_name}!")
            print(f"📄 Best model saved to: {model.ckpt_path}")

            return self.training_results[dataset_name]

        except Exception as e:
            print(f"❌ Training failed for {dataset_name}: {e}")
            return None

    # ------------------------------------------------------------------ #
    # 2) EVALUATE
    # ------------------------------------------------------------------ #
    def evaluate_model(self, dataset_name: str) -> Dict[str, float] | None:
        """Evaluate trained model performance."""
        if dataset_name not in self.training_results:
            print(f"❌ No training results found for {dataset_name}")
            return None

        results = self.training_results[dataset_name]["results"]

        metrics = {
            "mAP50": float(results.results_dict.get("metrics/mAP50(B)", 0)),
            "mAP50-95": float(results.results_dict.get("metrics/mAP50-95(B)", 0)),
            "precision": float(results.results_dict.get("metrics/precision(B)", 0)),
            "recall": float(results.results_dict.get("metrics/recall(B)", 0)),
            "box_loss": float(results.results_dict.get("train/box_loss", 0)),
            "cls_loss": float(results.results_dict.get("train/cls_loss", 0)),
            "dfl_loss": float(results.results_dict.get("train/dfl_loss", 0)),
        }

        print(f"\n📊 {dataset_name} Model Performance:")
        for metric, value in metrics.items():
            print(f"   {metric}: {value:.4f}")

        return metrics

    # ------------------------------------------------------------------ #
    # 3) COMPARE MANY MODELS
    # ------------------------------------------------------------------ #
    def compare_models(self) -> pd.DataFrame | None:
        """Compare performance between all trained models."""
        if len(self.training_results) < 2:
            print("❌ Need at least 2 trained models for comparison")
            return None

        comparison_data = []

        for dataset_name in self.training_results.keys():
            metrics = self.evaluate_model(dataset_name)
            if metrics:
                metrics["Dataset"] = dataset_name
                comparison_data.append(metrics)

        df = pd.DataFrame(comparison_data)

        if not df.empty:
            cols = ["Dataset"] + [c for c in df.columns if c != "Dataset"]
            df = df[cols]

            print(f"\n{'=' * 60}")
            print("📊 MODEL COMPARISON")
            print(f"{'=' * 60}")
            print(df.to_string(index=False, float_format="%.4f"))

            best_map50 = df.loc[df["mAP50"].idxmax()]
            print(
                f"\n🏆 Best performing model (mAP50): "
                f"{best_map50['Dataset']} ({best_map50['mAP50']:.4f})"
            )

        return df

    # ------------------------------------------------------------------ #
    # 4) VISUALIZE TRAINING CURVES
    # ------------------------------------------------------------------ #
    def visualize_training_curves(self, dataset_name: str):
        """Display training curves for a specific model."""
        if dataset_name not in self.training_results:
            print(f"❌ No training results found for {dataset_name}")
            return

        project_dir = Path(self.training_results[dataset_name]["project_dir"])
        results_img = project_dir / "train" / "results.png"

        if results_img.exists():
            print(f"\n📈 Training curves for {dataset_name}:")
            display(Image(filename=str(results_img)))
        else:
            print(f"❌ Training curves not found at {results_img}")

    # ------------------------------------------------------------------ #
    # 5) QUICK INFERENCE TEST
    # ------------------------------------------------------------------ #
    def test_inference(
        self, dataset_name: str, test_image_path: Path, conf_threshold: float = 0.25
    ):
        """Test model inference on a sample image."""
        if dataset_name not in self.models:
            print(f"❌ No trained model found for {dataset_name}")
            return

        if not test_image_path.exists():
            print(f"❌ Test image not found: {test_image_path}")
            return

        model = self.models[dataset_name]

        print(f"\n🔍 Testing {dataset_name} model on {test_image_path.name}")

        results = model.predict(source=str(test_image_path), conf=conf_threshold, save=True)

        if results and len(results) > 0:
            result = results[0]
            if hasattr(result, "save_dir"):
                result_image = Path(result.save_dir) / test_image_path.name
                if result_image.exists():
                    print("\n📸 Inference result:")
                    display(Image(filename=str(result_image)))
                else:
                    print(f"❌ Result image not found at {result_image}")

            if hasattr(result, "boxes") and result.boxes is not None:
                print(f"\n🎯 Detected {len(result.boxes)} objects")
                for i, box in enumerate(result.boxes):
                    conf = float(box.conf)
                    cls = int(box.cls)
                    print(
                        f"   Detection {i + 1}: class={cls} (person), "
                        f"confidence={conf:.3f}"
                    )
        else:
            print("❌ No inference results returned")


# Example usage
# NOTE(review): this reassignment replaces the PRETRAINED_MODEL_PATH set in the
# configuration cell (Path("yolo_model/yolo12x.pt")) with a bare file name, so the
# path printed by that cell is not the one the trainer loads. Confirm which
# location is intended.
PRETRAINED_MODEL_PATH = Path("yolo12x.pt")
trainer = YOLOTrainer(PRETRAINED_MODEL_PATH)
print("✅ YOLOTrainer initialized!")

✅ YOLOTrainer initialized!


## 6. Train Models on Both Datasets



In [None]:
# Train models on both datasets
# Maps dataset key ("original" / "fused") to the value returned by
# trainer.train_model for every run that succeeded.
training_results = {}

# One entry per full dataset, in training order:
# (key in `datasets`, banner text, dataset name passed to the trainer).
FULL_TRAINING_RUNS = [
    ("original", "Training Model 1: Original Images Dataset", "Original Images"),
    ("fused", "Training Model 2: Background-Fused Dataset", "Background Fused"),
]

# --------------------------------------------------
# Check that datasets have been prepared
# --------------------------------------------------
if not datasets:
    print("❌ No datasets prepared. Please run the dataset preparation cells first.")
else:
    print(f"🎯 Starting training on {len(datasets)} datasets...")

    # --------------------------------------------------
    # Train on each available dataset with the shared TRAINING_CONFIG
    # --------------------------------------------------
    attempted_runs = 0
    for dataset_key, banner, display_name in FULL_TRAINING_RUNS:
        if dataset_key not in datasets:
            continue

        attempted_runs += 1
        print("\n" + "=" * 60)
        print(f"🚀 {banner}")
        print("=" * 60)

        run_result = trainer.train_model(
            dataset_config=datasets[dataset_key],
            dataset_name=display_name,
            training_config=TRAINING_CONFIG,
        )

        if run_result:
            training_results[dataset_key] = run_result
            print(f"✅ {dataset_key.title()} dataset training completed!")
        else:
            # Previously a failed run was silent at this level.
            print(f"❌ {dataset_key.title()} dataset training failed!")

    # Kept so any cell that reads these names keeps working.
    original_result = training_results.get("original")
    fused_result = training_results.get("fused")

    # --------------------------------------------------
    # Summary
    # --------------------------------------------------
    failed_runs = attempted_runs - len(training_results)
    print(f"\n{'=' * 60}")
    if failed_runs == 0:
        print("🎉 All training completed!")
    else:
        print(f"⚠️ Training finished with {failed_runs} failed run(s)")
    print(f"✅ Trained {len(training_results)}/{attempted_runs} models successfully")
    print(f"{'=' * 60}")


## 7. Train Models on Subsampled Datasets


In [None]:
# =========================
# SUBSAMPLED TRAINING CONFIGURATION PER SIZE
# =========================

# Training hyper-parameters keyed by subsample size (number of images).
# Each value is the `training_config` dict handed to trainer.train_model.
# NOTE(review): only the 2000-image entry defines "accumulate" — confirm that
# trainer.train_model actually consumes that key.
SUBSAMPLE_TRAINING_CONFIGS = {
    500: {
        "epochs": 200,
        "batch_size": 6,
        "image_size": 640,
        "learning_rate": 0.0005,
        "patience": 40,
        "save_period": 5,
        "workers": 8,
    },
    1000: {
        "epochs": 150,
        "batch_size": 8,
        "image_size": 640,
        "learning_rate": 0.0007,
        "patience": 30,
        "save_period": 5,
        "workers": 8,
    },
    2000: {
        "epochs": 100,
        "batch_size": 8,
        "accumulate": 2,
        "image_size": 640,
        "learning_rate": 0.001,
        "patience": 25,
        "save_period": 5,
        "workers": 12,
    },
    5000: {
        "epochs": 100,
        "batch_size": 16,
        "image_size": 1024,
        "learning_rate": 0.001,
        "patience": 20,
        "save_period": 10,
        "workers": 8,
    },
}

# Echo every configuration so the values used for a run are visible in the output.
print("🎯 Subsampled Training Configurations:")
for size in SUBSAMPLE_TRAINING_CONFIGS:
    cfg = SUBSAMPLE_TRAINING_CONFIGS[size]
    print(f"  • {size} images:")
    for k in cfg:
        v = cfg[k]
        print(f"      {k}: {v}")
print()

🎯 Subsampled Training Configurations:
  • 500 images:
      epochs: 200
      batch_size: 6
      image_size: 640
      learning_rate: 0.0005
      patience: 40
      save_period: 5
      workers: 8
  • 1000 images:
      epochs: 150
      batch_size: 8
      image_size: 640
      learning_rate: 0.0007
      patience: 30
      save_period: 5
      workers: 8
  • 2000 images:
      epochs: 100
      batch_size: 16
      image_size: 768
      learning_rate: 0.001
      patience: 25
      save_period: 5
      workers: 12
  • 5000 images:
      epochs: 100
      batch_size: 16
      image_size: 1024
      learning_rate: 0.001
      patience: 20
      save_period: 10
      workers: 8



In [None]:
# Train one model per (dataset type, subsample size) pair.
# Results are stored as subsample_training_results[dataset_type][sample_size].
subsample_training_results = {}

if not subsampled_datasets:
    print("❌ No subsampled datasets prepared. Please run the subsampling cells first.")
else:
    total_models_to_train = sum(len(sub) for sub in subsampled_datasets.values())
    print(f"🎯 Starting training on {total_models_to_train} subsampled datasets...\n")

    # Counts attempts (used for the "[i/N]" progress prefix), not successes.
    trained_count = 0

    for dataset_type, subsamples in subsampled_datasets.items():
        print(f"\n{'='*70}")
        print(f"🚀 Training {dataset_type.upper()} subsampled models")
        print(f"{'='*70}\n")

        subsample_training_results[dataset_type] = {}

        for sample_size, dataset_info in subsamples.items():
            trained_count += 1
            model_name = f"{dataset_type}_{sample_size}"
            cfg = SUBSAMPLE_TRAINING_CONFIGS.get(sample_size)

            print(f"🔸 [{trained_count}/{total_models_to_train}] Training {model_name}")
            print(f"   Dataset: {sample_size} images "
                  f"({dataset_info['positive_samples']} pos, {dataset_info['negative_samples']} neg)")

            # A subsample size without an entry in SUBSAMPLE_TRAINING_CONFIGS
            # used to crash on cfg.items(); report it and move on instead.
            if cfg is None:
                print(f"❌ No training configuration defined for {sample_size} images; "
                      f"skipping {model_name}\n")
                continue

            print(f"   Using config for {sample_size} images:")
            for k, v in cfg.items():
                print(f"      {k}: {v}")

            # Train the model with the size-specific config
            result = trainer.train_model(
                dataset_config=dataset_info,
                dataset_name=model_name,
                training_config=cfg,
            )

            if result:
                subsample_training_results[dataset_type][sample_size] = result
                print(f"✅ {model_name} training completed!\n")
            else:
                print(f"❌ {model_name} training failed!\n")

    # Summary
    total_successful = sum(len(r) for r in subsample_training_results.values())
    print(f"\n{'='*70}")
    print("🎉 Subsampled dataset training completed!")
    print(f"✅ Successfully trained {total_successful}/{total_models_to_train} models")
    print(f"{'='*70}")

In [None]:
import torch

# Report what CUDA hardware this kernel can see before starting long runs.
print("CUDA available:", torch.cuda.is_available())
print("CUDA devices:", torch.cuda.device_count())
if torch.cuda.is_available():
    # Look the name up for the *current* device so the printed index and name
    # always refer to the same GPU (previously the name of device 0 was shown).
    current_device = torch.cuda.current_device()
    print("Current device:", current_device, "-", torch.cuda.get_device_name(current_device))


In [None]:
import os

# Suggested DataLoader worker count: half of the logical CPUs, never below 1.
# os.cpu_count() returns None when the count cannot be determined, so fall
# back to 1 instead of failing on `None // 2`.
# NOTE(review): n_workers is not referenced by the visible training cells; the
# SUBSAMPLE_TRAINING_CONFIGS entries carry their own "workers" values.
n_workers = max(1, (os.cpu_count() or 1) // 2)
print(f"Setting DataLoader workers = {n_workers}")


## 7.1. Selective Training - Run Specific Subsample Sizes

In [None]:
# =========================
# SELECTIVE TRAINING ON 2000 IMAGES
# =========================

# Specify which sizes to train (start with 2000)
SIZES_TO_TRAIN = [2000]  # Add more sizes like [2000, 1000, 500] as needed

# Reuse results collected by earlier cells; start empty only if none exist yet
if 'subsample_training_results' not in locals():
    subsample_training_results = {}

if not subsampled_datasets:
    print("❌ No subsampled datasets prepared. Please run the subsampling cells first.")
else:
    # Keep only the (dataset type, size) pairs that were actually prepared
    models_to_train = [
        (dataset_type, sample_size)
        for dataset_type in ['original', 'fused']
        if dataset_type in subsampled_datasets
        for sample_size in SIZES_TO_TRAIN
        if sample_size in subsampled_datasets[dataset_type]
    ]

    if not models_to_train:
        print(f"❌ None of the specified sizes {SIZES_TO_TRAIN} are available in subsampled datasets.")
        print("Available sizes:")
        for dataset_type, subsamples in subsampled_datasets.items():
            print(f"  {dataset_type}: {list(subsamples.keys())}")
    else:
        print(f"🎯 Training on {len(models_to_train)} models with sizes: {SIZES_TO_TRAIN}")
        print(f"📋 Models to train: {[f'{dt}_{size}' for dt, size in models_to_train]}")

        # Models trained successfully in THIS run. Tracking them here keeps
        # results left over from an earlier run from being reported as
        # successes when the current attempt failed.
        successful_models = []

        # Train each model
        for i, (dataset_type, sample_size) in enumerate(models_to_train, 1):
            model_name = f"{dataset_type}_{sample_size}"
            dataset_info = subsampled_datasets[dataset_type][sample_size]
            cfg = SUBSAMPLE_TRAINING_CONFIGS.get(sample_size)

            print(f"\n{'='*70}")
            print(f"🚀 [{i}/{len(models_to_train)}] Training {model_name}")
            print(f"{'='*70}")
            print(f"📊 Dataset: {sample_size} images "
                  f"({dataset_info['positive_samples']} pos, {dataset_info['negative_samples']} neg)")

            # A size without an entry in SUBSAMPLE_TRAINING_CONFIGS used to
            # crash on cfg.items(); report it and continue with the next model.
            if cfg is None:
                print(f"❌ No training configuration defined for {sample_size} images; "
                      f"skipping {model_name}")
                print()
                continue

            print(f"⚙️  Training configuration:")
            for k, v in cfg.items():
                print(f"      {k}: {v}")
            print()

            # Initialize nested dict if needed
            subsample_training_results.setdefault(dataset_type, {})

            # Train the model
            result = trainer.train_model(
                dataset_config=dataset_info,
                dataset_name=model_name,
                training_config=cfg,
            )

            if result:
                subsample_training_results[dataset_type][sample_size] = result
                successful_models.append(model_name)
                print(f"✅ {model_name} training completed successfully!")

                # Show quick evaluation
                metrics = trainer.evaluate_model(model_name)
                if metrics:
                    print(f"📊 Quick Results: mAP50={metrics['mAP50']:.4f}, "
                          f"Precision={metrics['precision']:.4f}, "
                          f"Recall={metrics['recall']:.4f}")
            else:
                print(f"❌ {model_name} training failed!")

            print()

        # Summary
        print(f"{'='*70}")
        print("🎉 Selective Training Summary")
        print(f"{'='*70}")
        print(f"✅ Successfully trained: {len(successful_models)}/{len(models_to_train)} models")
        print(f"📋 Completed models: {successful_models}")

        if len(successful_models) >= 2:
            print("\n💡 Next steps:")
            print("1. Run the evaluation section to compare results")
            print("2. Add more sizes to SIZES_TO_TRAIN if satisfied with results")
            print("3. Use the analysis section to visualize performance")

In [6]:
# =========================
# RE-RUN TRAINING FOR FUSED 2000-IMAGE DATASET (DIRECT LOAD)
# =========================

from pathlib import Path

# The dataset location is given directly instead of being read from
# `subsampled_datasets`.
# NOTE(review): hard-coded relative path — confirm it matches where the
# subsampling cells write their output.
fused_sub_dir = Path("datasets/yolo_fused_sub2000")
yaml_file = fused_sub_dir / "yolo_fused_sub2000.yaml"

# Minimal dataset_info dict for trainer.train_model
dataset_info = {
    "yaml_file": yaml_file,
    "output_dir": fused_sub_dir,
    # Placeholders: the real sample counts are not known in this cell
    "positive_samples": "?",
    "negative_samples": "?",
}

# Load config for 2000 images
cfg = SUBSAMPLE_TRAINING_CONFIGS.get(2000)
model_name = "fused_2000"

# Stays None unless training is actually attempted below.
result = None

if not yaml_file.exists():
    # Fail fast with a clear message instead of starting a run that cannot
    # find its dataset description.
    print(f"❌ Dataset config not found: {yaml_file}")
    print("💡 Prepare the subsampled dataset first or correct fused_sub_dir above.")
elif cfg is None:
    print("❌ No training configuration defined for 2000 images in SUBSAMPLE_TRAINING_CONFIGS.")
else:
    print(f"🚀 Re-running training for {model_name}")
    print(f"   Dataset directory: {fused_sub_dir}")
    print(f"   Config file: {yaml_file}")
    print("   Training config:")
    for k, v in cfg.items():
        print(f"      {k}: {v}")
    print()

    # Train the model
    result = trainer.train_model(
        dataset_config=dataset_info,
        dataset_name=model_name,
        training_config=cfg,
    )

    if result:
        # Record the result next to those of the other subsample runs.
        if "subsample_training_results" not in locals():
            subsample_training_results = {}
        subsample_training_results.setdefault("fused", {})[2000] = result
        print(f"✅ {model_name} training completed successfully!")
        metrics = trainer.evaluate_model(model_name)
        if metrics:
            print(f"📊 Quick Results: mAP50={metrics['mAP50']:.4f}, "
                  f"Precision={metrics['precision']:.4f}, "
                  f"Recall={metrics['recall']:.4f}")
    else:
        print(f"❌ {model_name} training failed!")

🚀 Re-running training for fused_2000
   Dataset directory: datasets\yolo_fused_sub2000
   Config file: datasets\yolo_fused_sub2000\yolo_fused_sub2000.yaml
   Training config:
      epochs: 100
      batch_size: 16
      image_size: 768
      learning_rate: 0.001
      patience: 25
      save_period: 5
      workers: 12


🚀 Training YOLO model on fused_2000 dataset
📄 Dataset config: datasets\yolo_fused_sub2000\yolo_fused_sub2000.yaml
📁 Results will be saved to: runs/fused_2000
⚙️  Training parameters:
   epochs: 100
   batch_size: 16
   image_size: 768
   learning_rate: 0.001
   patience: 25
   save_period: 5
   workers: 12
New https://pypi.org/project/ultralytics/8.3.176 available  Update with 'pip install -U ultralytics'
Ultralytics 8.3.174  Python-3.10.18 torch-2.5.1+cu121 CUDA:0 (NVIDIA GeForce RTX 3080 Laptop GPU, 16384MiB)
[34m[1mengine\trainer: [0magnostic_nms=False, amp=True, augment=False, auto_augment=randaugment, batch=16, bgr=0.0, box=7.5, cache=False, cfg=None, classes=N

[34m[1mtrain: [0mScanning C:\auto-labeling\src\datasets\yolo_fused_sub2000\labels\train.cache... 1600 images, 19 backgrounds, 0 corrupt: 100%|██████████| 1600/1600 [00:00<?, ?it/s]


[34m[1mval: [0mFast image access  (ping: 0.10.0 ms, read: 361.4146.4 MB/s, size: 348.6 KB)


[34m[1mval: [0mScanning C:\auto-labeling\src\datasets\yolo_fused_sub2000\labels\val.cache... 400 images, 4 backgrounds, 0 corrupt: 100%|██████████| 400/400 [00:00<?, ?it/s]


Plotting labels to runs\fused_2000\train5\labels.jpg... 
[34m[1moptimizer:[0m 'optimizer=auto' found, ignoring 'lr0=0.001' and 'momentum=0.937' and determining best 'optimizer', 'lr0' and 'momentum' automatically... 
[34m[1moptimizer:[0m AdamW(lr=0.002, momentum=0.9) with parameter groups 205 weight(decay=0.0), 214 weight(decay=0.0005), 211 bias(decay=0.0)
Image sizes 768 train, 768 val
Using 12 dataloader workers
Logging results to [1mruns\fused_2000\train5[0m
Starting training for 100 epochs...

      Epoch    GPU_mem   box_loss   cls_loss   dfl_loss  Instances       Size


  0%|          | 0/100 [00:30<?, ?it/s]


❌ Training failed for fused_2000: CUDA out of memory. Tried to allocate 486.00 MiB. GPU 0 has a total capacity of 16.00 GiB of which 0 bytes is free. Of the allocated memory 23.97 GiB is allocated by PyTorch, and 404.86 MiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation.  See documentation for Memory Management  (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)
❌ fused_2000 training failed!


In [None]:
# =========================
# QUICK EVALUATION OF 2000-IMAGE MODELS
# =========================

# Names of the 2000-image models the trainer currently holds
trained_2000_models = []
for dataset_type in ['original', 'fused']:
    model_name = f"{dataset_type}_2000"
    if model_name in trainer.models:
        trained_2000_models.append(model_name)

if trained_2000_models:
    print(f"📊 Quick evaluation of {len(trained_2000_models)} trained 2000-image models")
    print("=" * 70)

    comparison_data = []

    for model_name in trained_2000_models:
        print(f"\n🔍 Evaluating {model_name}...")
        metrics = trainer.evaluate_model(model_name)
        if metrics:
            # Build a new row instead of adding a key to the dict returned by
            # the trainer, so whatever the trainer keeps is left untouched.
            comparison_data.append({**metrics, 'Model': model_name})

    if comparison_data:
        import pandas as pd
        df_2000 = pd.DataFrame(comparison_data)

        # Reorder columns so the model name comes first
        cols = ['Model'] + [c for c in df_2000.columns if c != 'Model']
        df_2000 = df_2000[cols]

        print(f"\n📋 2000-Image Model Comparison:")
        print("=" * 70)
        print(df_2000.to_string(index=False, float_format='%.4f'))

        # Find best model
        if len(df_2000) > 1:
            best_model = df_2000.loc[df_2000['mAP50'].idxmax()]
            print(f"\n🏆 Best 2000-image model: {best_model['Model']} (mAP50: {best_model['mAP50']:.4f})")

            # Compare original vs fused
            original_row = df_2000[df_2000['Model'].str.contains('original')]
            fused_row = df_2000[df_2000['Model'].str.contains('fused')]

            if not original_row.empty and not fused_row.empty:
                orig_map50 = original_row['mAP50'].iloc[0]
                fused_map50 = fused_row['mAP50'].iloc[0]
                improvement = fused_map50 - orig_map50

                # A relative change is undefined for a zero baseline.
                if orig_map50:
                    relative_change = f"{improvement/orig_map50*100:+.1f}%"
                else:
                    relative_change = "n/a"

                print(f"\n📈 Background Fusion Impact:")
                print(f"   Original: {orig_map50:.4f} mAP50")
                print(f"   Fused:    {fused_map50:.4f} mAP50")
                print(f"   Change:   {improvement:+.4f} mAP50 ({relative_change})")

        # Save results (create the output directory if it is missing)
        OUTPUT_BASE_DIR.mkdir(parents=True, exist_ok=True)
        results_file = OUTPUT_BASE_DIR / "2000_image_model_comparison.csv"
        df_2000.to_csv(results_file, index=False)
        print(f"\n💾 Results saved to: {results_file}")

else:
    print("❌ No 2000-image models found. Please run the training cell above first.")
    print("💡 Make sure SIZES_TO_TRAIN includes 2000 and run the training cell.")

## 8. Evaluate and Compare Models

Let's evaluate the performance of both models and compare their results.

In [None]:
# --------------------------------------------------
# Evaluate model performance for all trained models
# --------------------------------------------------
if trainer.training_results:
    print("📊 Evaluating model performance...")

    # Build a comparison DataFrame
    comparison_df = trainer.compare_models()

    if comparison_df is not None and not comparison_df.empty:
        import matplotlib.pyplot as plt

        # Create a 2 × 2 subplot grid
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        fig.suptitle("Model Performance Comparison", fontsize=16)

        # --------------------------------------------------
        # 1) mAP@0.5
        # --------------------------------------------------
        axes[0, 0].bar(comparison_df["Dataset"], comparison_df["mAP50"])
        axes[0, 0].set_title("mAP@0.5")
        axes[0, 0].set_ylabel("mAP50")
        axes[0, 0].tick_params(axis="x", rotation=45)

        # --------------------------------------------------
        # 2) mAP@0.5:0.95
        # --------------------------------------------------
        axes[0, 1].bar(comparison_df["Dataset"], comparison_df["mAP50-95"])
        axes[0, 1].set_title("mAP@0.5:0.95")
        axes[0, 1].set_ylabel("mAP50-95")
        axes[0, 1].tick_params(axis="x", rotation=45)

        # --------------------------------------------------
        # 3) Precision vs Recall
        # --------------------------------------------------
        axes[1, 0].scatter(
            comparison_df["recall"],
            comparison_df["precision"],
            s=100,
            zorder=3,
        )
        for i, dataset in enumerate(comparison_df["Dataset"]):
            axes[1, 0].annotate(
                dataset,
                (comparison_df["recall"].iloc[i], comparison_df["precision"].iloc[i]),
                xytext=(5, 5),
                textcoords="offset points",
            )
        axes[1, 0].set_xlabel("Recall")
        axes[1, 0].set_ylabel("Precision")
        axes[1, 0].set_title("Precision vs Recall")
        axes[1, 0].grid(True, linestyle="--", alpha=0.4)

        # --------------------------------------------------
        # 4) Losses (box, cls, dfl) as grouped bars
        # --------------------------------------------------
        loss_cols = ["box_loss", "cls_loss", "dfl_loss"]
        loss_labels = ["Box Loss", "Class Loss", "DFL Loss"]
        x = range(len(comparison_df))
        width = 0.25

        # The three bars of a group sit at -width, 0 and +width around the tick.
        for slot, (loss_col, loss_label) in enumerate(zip(loss_cols, loss_labels)):
            offset = (slot - 1) * width
            axes[1, 1].bar(
                [i + offset for i in x], comparison_df[loss_col], width, label=loss_label
            )

        axes[1, 1].set_xlabel("Dataset")
        axes[1, 1].set_ylabel("Loss")
        axes[1, 1].set_title("Training Losses")
        axes[1, 1].set_xticks(list(x))
        axes[1, 1].set_xticklabels(comparison_df["Dataset"])
        axes[1, 1].legend()

        # Improve layout (leave room for the suptitle)
        plt.tight_layout(rect=[0, 0.03, 1, 0.95])
        plt.show()

        # --------------------------------------------------
        # Save the comparison table (create the directory if it is missing)
        # --------------------------------------------------
        OUTPUT_BASE_DIR.mkdir(parents=True, exist_ok=True)
        comparison_file = OUTPUT_BASE_DIR / "model_comparison.csv"
        comparison_df.to_csv(comparison_file, index=False)
        print(f"\n💾 Model comparison saved to: {comparison_file}")
    else:
        # Previously an empty comparison produced no output at all.
        print("❌ Model comparison returned no data to plot.")

else:
    print("❌ No trained models found. Please run the training cells first.")


## 9. Visualize Training Curves

Display training curves for both models to understand training progress.

In [None]:
# Show the training curves of every model the trainer has results for.
if not trainer.training_results:
    print("❌ No training results available for visualization.")
else:
    for dataset_name in trainer.training_results:
        trainer.visualize_training_curves(dataset_name)


## 10. Test Model Inference

Test both models on sample images to see their detection performance.

In [None]:
# Test inference on sample images

# Images taken from each validation set, and the total number of images every
# model is tested on.
SAMPLES_PER_DATASET = 2
MAX_TEST_IMAGES = 2

if trainer.models and datasets:
    print("🔍 Testing model inference...")

    # Find sample test images from validation sets
    test_images = []

    for dataset_name, dataset_info in datasets.items():
        val_images_dir = dataset_info['output_dir'] / 'images' / 'val'
        if val_images_dir.exists():
            # Sort so the same images are picked on every run (glob order is
            # not guaranteed), then take the first few as test samples.
            sample_images = sorted(val_images_dir.glob('*.jpg'))[:SAMPLES_PER_DATASET]
            for img in sample_images:
                test_images.append((dataset_name, img))

    # Only this many images are actually tested; trim here so the count
    # reported below matches what is run.
    test_images = test_images[:MAX_TEST_IMAGES]

    if test_images:
        print(f"\n📸 Testing on {len(test_images)} sample images...")

        # Test each model on sample images
        conf_threshold = 0.3

        for model_name in trainer.models.keys():
            print(f"\n{'='*50}")
            print(f"🤖 Testing {model_name} model")
            print(f"{'='*50}")

            for i, (dataset_source, test_image) in enumerate(test_images):
                print(f"\n📷 Test image {i+1}: {test_image.name} (from {dataset_source} dataset)")
                trainer.test_inference(model_name, test_image, conf_threshold)

                # Add some spacing between results
                print("\n" + "-"*40)

    else:
        print("❌ No test images found in validation sets.")
        print("💡 You can manually specify a test image path in the next cell.")

else:
    print("❌ No trained models or datasets available for testing.")
    print("💡 Please run the dataset preparation and training cells first.")


## 10.1. Custom Inference Testing (Optional)

Test your models on custom images by specifying the path below.

In [None]:
# Custom inference testing: point the path below at an image of your own.
CUSTOM_TEST_IMAGE_PATH = Path("path/to/your/test/image.jpg")  # Update this path
CONF_THRESHOLD = 0.25  # Confidence threshold for detections

# Explain what is missing first; run inference only when both an image and at
# least one trained model are available.
if not CUSTOM_TEST_IMAGE_PATH.exists() or not trainer.models:
    if not CUSTOM_TEST_IMAGE_PATH.exists():
        print("💡 To test a custom image:")
        print("   1. Update CUSTOM_TEST_IMAGE_PATH variable above")
        print("   2. Make sure the image file exists")
        print("   3. Re-run this cell")

    if not trainer.models:
        print("❌ No trained models available for testing.")
else:
    print(f"🔍 Testing custom image: {CUSTOM_TEST_IMAGE_PATH}")

    # Run every trained model on the same custom image
    for model_name in trainer.models:
        print(f"\n{'='*50}")
        print(f"🤖 {model_name} Model Results")
        print(f"{'='*50}")

        trainer.test_inference(model_name, CUSTOM_TEST_IMAGE_PATH, CONF_THRESHOLD)


## 11. Analyze Subsampled Model Performance

Let's analyze how dataset size affects model performance by comparing results across different subsample sizes.

In [None]:
# =========================
# ANALYZE SUBSAMPLED MODEL PERFORMANCE
# =========================

if subsample_training_results:
    print("📊 Analyzing subsampled model performance...")

    # Collect one metrics row per subsampled model
    subsample_performance_data = []

    for dataset_type, results in subsample_training_results.items():
        for sample_size in results:
            model_name = f"{dataset_type}_{sample_size}"

            # Evaluate this model
            metrics = trainer.evaluate_model(model_name)
            if metrics:
                # Build a new row instead of updating the dict returned by the
                # trainer, so whatever the trainer keeps is left untouched.
                subsample_performance_data.append({
                    **metrics,
                    'dataset_type': dataset_type,
                    'sample_size': sample_size,
                    'model_name': model_name,
                })

    if subsample_performance_data:
        import matplotlib.pyplot as plt

        df_subsample = pd.DataFrame(subsample_performance_data)

        # One panel per metric, in row-major grid order:
        # (column in df_subsample, label used for the title and the y axis).
        panel_specs = [
            ('mAP50', 'mAP@0.5'),
            ('mAP50-95', 'mAP@0.5:0.95'),
            ('precision', 'Precision'),
            ('recall', 'Recall'),
            ('box_loss', 'Box Loss'),
            ('cls_loss', 'Class Loss'),
        ]
        # Line colour per dataset type
        series_colors = {'original': 'blue', 'fused': 'red'}

        # Create comprehensive analysis plots
        fig, axes = plt.subplots(2, 3, figsize=(18, 12))
        fig.suptitle('Impact of Dataset Size on YOLO Performance', fontsize=16)

        for ax, (metric_col, metric_label) in zip(axes.flat, panel_specs):
            # One line per dataset type that has results
            for dataset_type, color in series_colors.items():
                subset = df_subsample[df_subsample['dataset_type'] == dataset_type].sort_values('sample_size')
                if subset.empty:
                    continue
                ax.plot(subset['sample_size'], subset[metric_col],
                        marker='o', label=f'{dataset_type.title()}', color=color)

            ax.set_title(f'{metric_label} vs Dataset Size')
            ax.set_xlabel('Dataset Size')
            ax.set_ylabel(metric_label)
            ax.legend()
            ax.grid(True, alpha=0.3)

        plt.tight_layout()
        plt.show()

        # Print detailed comparison table
        print(f"\n{'='*100}")
        print("📊 DETAILED SUBSAMPLED MODEL COMPARISON")
        print(f"{'='*100}")

        # Sort by dataset type and sample size
        df_display = df_subsample.sort_values(['dataset_type', 'sample_size'])

        # Select key columns for display
        display_cols = ['dataset_type', 'sample_size', 'mAP50', 'mAP50-95', 'precision', 'recall']
        print(df_display[display_cols].to_string(index=False, float_format='%.4f'))

        # Find best performing models at each size
        print(f"\n🏆 BEST MODELS BY DATASET SIZE:")
        for size in sorted(df_subsample['sample_size'].unique()):
            size_data = df_subsample[df_subsample['sample_size'] == size]
            best_idx = size_data['mAP50'].idxmax()
            best_model = size_data.loc[best_idx]
            print(f"   📊 {size} images: {best_model['model_name']} (mAP50: {best_model['mAP50']:.4f})")

        # Analyze performance trends: mAP50 at the largest size minus mAP50 at
        # the smallest size, per dataset type
        print(f"\n📈 PERFORMANCE TRENDS:")
        for dataset_type in ['original', 'fused']:
            if dataset_type in df_subsample['dataset_type'].values:
                subset = df_subsample[df_subsample['dataset_type'] == dataset_type].sort_values('sample_size')
                if len(subset) > 1:
                    map50_improvement = subset['mAP50'].iloc[-1] - subset['mAP50'].iloc[0]
                    size_range = f"{subset['sample_size'].iloc[0]}-{subset['sample_size'].iloc[-1]}"
                    print(f"   🎯 {dataset_type.title()}: mAP50 change from {size_range} images: {map50_improvement:+.4f}")

        # Save detailed results (create the output directory if it is missing)
        OUTPUT_BASE_DIR.mkdir(parents=True, exist_ok=True)
        subsample_results_file = OUTPUT_BASE_DIR / "subsample_performance_analysis.csv"
        df_subsample.to_csv(subsample_results_file, index=False)
        print(f"\n💾 Detailed subsampled results saved to: {subsample_results_file}")

    else:
        print("❌ No subsampled model performance data available")

else:
    print("❌ No subsampled models trained. Please run the subsampled training cells first.")

## 12. Summary and Next Steps

This notebook has provided a complete workflow for YOLO dataset preparation and model training.

In [None]:
# Final summary and recommendations
print("📋 COMPREHENSIVE WORKFLOW SUMMARY")
print("=" * 70)

# Dataset summary
if datasets:
    print("✅ Full Dataset Preparation:")
    for dataset_name, info in datasets.items():
        print(f"   • {dataset_name.title()} dataset: {info['train_images']}T + {info['val_images']}V images")
        print(f"     Config: {info['yaml_file']}")
else:
    print("❌ No full datasets prepared")

# Subsampled dataset summary
if subsampled_datasets:
    total_subsamples = sum(len(subsamples) for subsamples in subsampled_datasets.values())
    print(f"\n✅ Subsampled Dataset Preparation:")
    print(f"   • Created {total_subsamples} subsampled datasets for testing")
    for dataset_type, subsamples in subsampled_datasets.items():
        sizes = list(subsamples.keys())
        print(f"   • {dataset_type.title()} subsamples: {sizes} images")
else:
    print("\n❌ No subsampled datasets prepared")

# Full model training summary: best weights and results directory for each trained model
if not trainer.training_results:
    print("\n❌ No full models trained")
else:
    print("\n✅ Full Model Training:")
    # Walk name/result pairs directly rather than looking each result up by key
    for model_name, result in trainer.training_results.items():
        print(f"   • {model_name} model trained")
        print(f"     Best weights: {result['best_model_path']}")
        print(f"     Results: {result['project_dir']}")

# Subsampled model training summary: total model count, then the trained subsets per dataset type
if not subsample_training_results:
    print("\n❌ No subsampled models trained")
else:
    # Add up the number of training results held under every dataset type
    total_subsample_models = sum(map(len, subsample_training_results.values()))
    print("\n✅ Subsampled Model Training:")
    print(f"   • Trained {total_subsample_models} models on different dataset sizes")
    for dataset_type, results in subsample_training_results.items():
        # Keys are presumably the subset sizes the models were trained on — verify against the training cell
        sizes = list(results.keys())
        print(f"   • {dataset_type.title()}: {sizes} image subsets")

# Next steps and recommendations: static guidance, emitted one entry per line
print(
    "\n🔍 RECOMMENDED ANALYSIS:",
    "1. 📊 Compare full dataset models to determine: Original vs Background-Fused",
    "2. 📈 Analyze subsampled results to understand: Impact of dataset size",
    "3. 🎯 Choose optimal approach based on:",
    "   - Performance requirements (mAP targets)",
    "   - Training time constraints",
    "   - Data availability",
    sep="\n",
)

# Deployment options: print a copy-paste loading snippet for every model that has results
print("\n🚀 DEPLOYMENT OPTIONS:")
if trainer.models:
    print("Choose your best performing model for deployment:")
    for model_name in trainer.models.keys():
        # A model may be registered without a recorded training result;
        # report it and move on instead of failing the whole summary with a KeyError.
        if model_name not in trainer.training_results:
            print(f"\n   ⚠️ {model_name} model: no training results recorded, skipping")
            continue
        result = trainer.training_results[model_name]
        # repr() turns the path into a valid Python string literal, so the printed
        # snippet still works when the path contains backslashes or quotes.
        weights_literal = repr(str(result['best_model_path']))
        print(f"\n   📦 {model_name} model:")
        print("   ```python")
        print("   from ultralytics import YOLO")
        print(f"   model = YOLO({weights_literal})")
        print("   results = model.predict('your_image.jpg', conf=0.25)")
        print("   ```")
else:
    # Mirror the other sections, which always state when there is nothing to show
    print("   ❌ No trained models available for deployment")

# Output files summary: list only the artifacts that actually exist on disk
print("\n📁 Generated Files:")
if OUTPUT_BASE_DIR.exists():
    print(f"   • Datasets: {OUTPUT_BASE_DIR}")
    print("   • Training runs: runs/")
    # Report files written by the comparison / subsample-analysis cells.
    # Paths are built with the Path "/" operator so the printed location matches
    # the one that is checked for existence (and uses the platform separator).
    report_files = (
        ("Comparison", OUTPUT_BASE_DIR / "model_comparison.csv"),
        ("Subsample analysis", OUTPUT_BASE_DIR / "subsample_performance_analysis.csv"),
    )
    for report_label, report_path in report_files:
        if report_path.exists():
            print(f"   • {report_label}: {report_path}")

# Only announce success when at least one model was actually trained;
# otherwise the closing banner would contradict the "No ... models trained" lines above.
if trainer.training_results or subsample_training_results:
    print("\n🎉 Workflow completed successfully!")
    print("💡 You can now use your trained YOLO models for person detection.")
else:
    print("\n⚠️ Workflow finished, but no models were trained. Run the training cells first.")
