# Stratified k-fold splits generation
Generate stratified k-fold splits in `datasets/k_fold_cv`.

In [None]:
import sklearn
from sklearn.model_selection import StratifiedKFold
from pathlib import Path
import yaml
import shutil
import random
import numpy as np
from collections import Counter

# Where the source dataset lives and where the generated folds are written.
DATASET_DIR = Path("datasets/roboflow")
OUTPUT_DIR = Path("datasets/k_fold_cv")

# Only the "train" split of the source dataset is re-partitioned into folds.
train_set_path = DATASET_DIR.joinpath("train", "images")
train_labels_path = DATASET_DIR.joinpath("train", "labels")
data_yaml_path = DATASET_DIR.joinpath("data.yaml")

# Fail fast when an expected input is missing: images first, then labels,
# then the dataset description file.
if not train_set_path.exists():
    raise FileNotFoundError(f"Train set path {train_set_path} does not exist.")
if not train_labels_path.exists():
    raise FileNotFoundError(f"Train labels path {train_labels_path} does not exist.")
if not data_yaml_path.exists():
    raise FileNotFoundError(f"Data YAML file {data_yaml_path} does not exist.")

# Parse the dataset description; it is the template for every fold's data.yaml.
with data_yaml_path.open('r') as file:
    data_yaml = yaml.safe_load(file)

print("Analyzing dataset for stratified splitting...")

# Collect the training images once. Sorting fixes their order, which keeps the
# folds reproducible between runs.
image_files = sorted(train_set_path.glob("*.jpg"))
print(f"Found {len(image_files)} training images")


def _dominant_class(label_path):
    """Return the most frequent class id found in one label file.

    Rows with fewer than five whitespace-separated fields are ignored. Class 0
    is the fallback when the file is missing or holds no usable row.
    """
    if not label_path.exists():
        return 0
    with open(label_path, 'r') as handle:
        rows = [row.split() for row in handle if row.strip()]
    class_ids = [int(float(fields[0])) for fields in rows if len(fields) >= 5]
    if not class_ids:
        return 0
    return Counter(class_ids).most_common(1)[0][0]


# One stratification label per image: the dominant class of its annotations.
image_labels = [
    _dominant_class(train_labels_path / f"{image_path.stem}.txt")
    for image_path in image_files
]
class_distribution = Counter(image_labels)

print("Class distribution in original dataset:")
total_images = len(image_files)
for class_id in sorted(class_distribution):
    count = class_distribution[class_id]
    percentage = (count / total_images) * 100
    print(f"  Class {class_id}: {count} images ({percentage:.1f}%)")

# Convert to numpy arrays so the integer index arrays produced by the splitter
# can be used to look up images and labels directly.
image_files = np.array(image_files)
image_labels = np.array(image_labels)

# Cross-validation parameters. The seed is named once so the splitter and the
# metadata written into each fold's data.yaml always agree.
k_folds = 5
random_state = 17
skf = StratifiedKFold(n_splits=k_folds, shuffle=True, random_state=random_state)

# create the output directory if it does not exist
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# Label files are always read from the source train split; build the path once.
source_labels_dir = DATASET_DIR / "train" / "labels"


def _report_split(title, indices):
    """Print the size of one split and its per-class share of images.

    Classes are the per-image stratification labels stored in `image_labels`.
    """
    print(f"  {title}: {len(indices)} images")
    distribution = Counter(image_labels[idx] for idx in indices)
    for class_id, count in sorted(distribution.items()):
        percentage = (count / len(indices)) * 100
        print(f"    Class {class_id}: {count} images ({percentage:.1f}%)")


def _copy_split(indices, images_dir, labels_dir):
    """Copy the images selected by *indices* and, when present, their labels."""
    for idx in indices:
        src = image_files[idx]
        shutil.copy(src, images_dir / src.name)
        label_file = source_labels_dir / f"{src.stem}.txt"
        # Images without a label file are copied without one.
        if label_file.exists():
            shutil.copy(label_file, labels_dir / label_file.name)


print(f"\nCreating {k_folds} stratified folds...")

# iterate over the folds
for fold, (train_index, val_index) in enumerate(skf.split(image_files, image_labels)):
    # `fold` is zero-based because it names the directory; the progress
    # counter is shown one-based so the last fold reads "k/k".
    print(f"\nProcessing fold {fold + 1}/{k_folds} (fold_{fold})")
    fold_dir = OUTPUT_DIR / f"fold_{fold}"
    fold_dir.mkdir(parents=True, exist_ok=True)

    # Start from empty split directories. Files left over from an earlier run
    # (different seed, k or dataset) would otherwise stay in place and could
    # put the same image in both train and val of this fold.
    for split_name in ("train", "val"):
        split_dir = fold_dir / split_name
        if split_dir.exists():
            shutil.rmtree(split_dir)

    # Create train/val images and labels directories
    train_images_dir = fold_dir / "train" / "images"
    train_labels_dir = fold_dir / "train" / "labels"
    val_images_dir = fold_dir / "val" / "images"
    val_labels_dir = fold_dir / "val" / "labels"
    for directory in (train_images_dir, train_labels_dir, val_images_dir, val_labels_dir):
        directory.mkdir(parents=True, exist_ok=True)

    # Report the class distribution of this fold
    _report_split("Train set", train_index)
    _report_split("Val set", val_index)

    # Copy images and labels
    _copy_split(train_index, train_images_dir, train_labels_dir)
    _copy_split(val_index, val_images_dir, val_labels_dir)

    # Write a data.yaml for the current fold. Every other key of the source
    # data.yaml is carried over unchanged (shallow copy).
    fold_data_yaml = data_yaml.copy()
    fold_data_yaml['train'] = 'train/images'
    fold_data_yaml['val'] = 'val/images'

    # Metadata about the fold. NOTE: '# Fold Info' is an ordinary mapping key,
    # not a YAML comment; it is kept as-is for existing readers.
    fold_data_yaml['# Fold Info'] = {
        'fold_index': fold,
        'total_folds': k_folds,
        'train_images': len(train_index),
        'val_images': len(val_index),
        'stratified': True,
        'random_state': random_state,
    }

    with open(fold_dir / "data.yaml", 'w') as file:
        yaml.dump(fold_data_yaml, file, default_flow_style=False)

print(f"\n✅ Successfully created {k_folds} stratified folds!")
print(f"Output directory: {OUTPUT_DIR}")
print("\nEach fold maintains the same class distribution as the original training set.")

In [None]:
import matplotlib.pyplot as plt
import cv2
import random

def _load_annotated_image(image_path, label_file):
    """Read an image and draw the boxes of its YOLO label file onto it.

    Returns the BGR image (with boxes drawn when *label_file* exists), or
    None when OpenCV cannot read the image.
    """
    img = cv2.imread(str(image_path))
    if img is None:
        return None
    if not label_file.exists():
        return img

    img_height, img_width = img.shape[:2]
    with open(label_file, 'r') as f:
        for line in f:
            parts = line.split()
            # Only plain "class cx cy w h" rows are drawn. Blank lines and
            # rows with another layout (e.g. polygons) are skipped because
            # their fields do not describe a centre/size box.
            if len(parts) != 5:
                continue
            class_id = int(float(parts[0]))
            x_center, y_center, width, height = map(float, parts[1:])
            # Coordinates are normalised; scale them to pixel corners.
            x1 = int((x_center - width / 2) * img_width)
            y1 = int((y_center - height / 2) * img_height)
            x2 = int((x_center + width / 2) * img_width)
            y2 = int((y_center + height / 2) * img_height)
            cv2.rectangle(img, (x1, y1), (x2, y2), (255, 0, 0), 2)
            cv2.putText(img, str(class_id), (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)
    return img


def plot_fold_examples(fold_dir, num_examples=5):
    """Show random annotated train and val images from one fold.

    The figure has two rows (train on top, val below) and *num_examples*
    columns. Images are sampled without replacement from the ``*.jpg`` files
    of each split; cells with no image to show are left blank.

    Args:
        fold_dir: Fold directory containing ``train/`` and ``val/``, each with
            ``images/`` and ``labels/`` subdirectories.
        num_examples: Number of images to show per split (at least 1).

    Raises:
        ValueError: If *num_examples* is smaller than 1.
    """
    if num_examples < 1:
        raise ValueError("num_examples must be at least 1")

    # squeeze=False keeps `axes` two-dimensional even for a single column.
    _, axes = plt.subplots(2, num_examples, figsize=(20, 5), squeeze=False)

    splits = (("Train", fold_dir / "train"), ("Val", fold_dir / "val"))
    for row, (title, split_dir) in enumerate(splits):
        labels_dir = split_dir / "labels"
        candidates = sorted((split_dir / "images").glob("*.jpg"))
        # Randomly sample images (without replacement)
        samples = random.sample(candidates, min(num_examples, len(candidates)))

        for col in range(num_examples):
            ax = axes[row, col]
            ax.axis('off')
            if col >= len(samples):
                continue

            image_path = samples[col]
            img = _load_annotated_image(image_path, labels_dir / f"{image_path.stem}.txt")
            if img is None:
                # Leave the cell empty but say why, instead of crashing.
                ax.set_title(f"{title} Image {col+1} (unreadable)")
                continue

            ax.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
            ax.set_title(f"{title} Image {col+1}")

    plt.tight_layout()
    plt.show()

# Visual sanity check: show sample images from fold 0.
first_fold_dir = OUTPUT_DIR.joinpath("fold_0")
plot_fold_examples(first_fold_dir, 5)

In [None]:
import matplotlib.pyplot as plt
import pandas as pd
from collections import Counter

def _count_annotations(labels_dir):
    """Count annotations per class id over every ``*.txt`` file in *labels_dir*.

    Rows with fewer than five whitespace-separated fields are ignored. A
    missing directory yields an empty counter.
    """
    counts = Counter()
    if labels_dir.exists():
        for label_file in labels_dir.glob("*.txt"):
            with open(label_file, 'r') as f:
                for line in f:
                    parts = line.split()
                    if len(parts) >= 5:
                        counts[int(float(parts[0]))] += 1
    return counts


def _share(count, total):
    """Return *count* as a percentage of *total*, or 0 when *total* is 0."""
    return (count / total * 100) if total > 0 else 0


def _fold_percentages(fold_stats, class_id, counts_key, total_key):
    """Return, fold by fold, the percentage of one class within one split."""
    return [
        _share(stats[counts_key].get(class_id, 0), stats[total_key])
        for stats in fold_stats.values()
    ]


def _distribution_frame(fold_stats, all_classes, counts_key, total_key):
    """Build a folds-by-classes DataFrame of class percentages for one split."""
    rows = []
    for fold_idx, stats in fold_stats.items():
        row = {
            f'Class {class_id}': _share(stats[counts_key].get(class_id, 0), stats[total_key])
            for class_id in all_classes
        }
        row['Fold'] = f'Fold {fold_idx}'
        rows.append(row)
    return pd.DataFrame(rows).set_index('Fold')


def analyze_fold_stratification():
    """Analyze and visualize the class distribution across all folds to verify stratification.

    Reads the label files of every ``fold_<i>`` directory under the
    module-level ``OUTPUT_DIR`` (for ``i`` in ``range(k_folds)``), counts
    annotations per class for the train and val splits, prints a summary and
    shows a 2x3 figure of diagnostics.

    The quality score of a class is ``1 - CV`` (clamped at 0), where CV is
    the coefficient of variation of the class's share of annotations across
    the *validation* splits. The validation splits are used because they are
    the disjoint parts of the partition. The train+val union of every fold is
    the whole dataset, so its class shares are identical in all folds and
    cannot reveal an imbalance.

    Returns:
        dict: Maps each fold index to a dict with ``'train_counts'`` and
        ``'val_counts'`` (``Counter`` of annotations per class id) and
        ``'train_total'`` and ``'val_total'`` (annotation counts).
    """
    print("🔍 ANALYZING FOLD STRATIFICATION")
    print("=" * 50)

    fold_stats = {}

    # Analyze each fold
    for fold_idx in range(k_folds):
        fold_dir = OUTPUT_DIR / f"fold_{fold_idx}"
        train_class_counts = _count_annotations(fold_dir / "train" / "labels")
        val_class_counts = _count_annotations(fold_dir / "val" / "labels")
        train_total = sum(train_class_counts.values())
        val_total = sum(val_class_counts.values())

        fold_stats[fold_idx] = {
            'train_counts': train_class_counts,
            'val_counts': val_class_counts,
            'train_total': train_total,
            'val_total': val_total
        }

        print(f"\nFold {fold_idx}:")
        print(f"  Train: {train_total} annotations")
        print(f"  Val: {val_total} annotations")

    # Get all unique classes
    all_classes = set()
    for stats in fold_stats.values():
        all_classes.update(stats['train_counts'].keys())
        all_classes.update(stats['val_counts'].keys())
    all_classes = sorted(all_classes)

    # Without any annotation there is nothing to plot; the bar plots below
    # would only fail with an unrelated "no numeric data" error.
    if not all_classes:
        print("\nNo annotations found in any fold; nothing to plot.")
        return fold_stats

    # Create visualization
    fig, axes = plt.subplots(2, 3, figsize=(18, 10))
    fig.suptitle('Stratified K-Fold Class Distribution Analysis', fontsize=16, fontweight='bold')

    # 1. Train set class distribution across folds
    train_df = _distribution_frame(fold_stats, all_classes, 'train_counts', 'train_total')
    train_df.plot(kind='bar', ax=axes[0,0], title='Train Set Class Distribution (%)')
    axes[0,0].set_ylabel('Percentage')
    axes[0,0].legend(bbox_to_anchor=(1.05, 1), loc='upper left')
    axes[0,0].tick_params(axis='x', rotation=45)

    # 2. Val set class distribution across folds
    val_df = _distribution_frame(fold_stats, all_classes, 'val_counts', 'val_total')
    val_df.plot(kind='bar', ax=axes[0,1], title='Val Set Class Distribution (%)')
    axes[0,1].set_ylabel('Percentage')
    axes[0,1].legend(bbox_to_anchor=(1.05, 1), loc='upper left')
    axes[0,1].tick_params(axis='x', rotation=45)

    # 3. Dataset sizes across folds
    train_sizes = [fold_stats[i]['train_total'] for i in range(k_folds)]
    val_sizes = [fold_stats[i]['val_total'] for i in range(k_folds)]

    x = range(k_folds)
    width = 0.35
    axes[0,2].bar([i - width/2 for i in x], train_sizes, width, label='Train', alpha=0.8)
    axes[0,2].bar([i + width/2 for i in x], val_sizes, width, label='Val', alpha=0.8)
    axes[0,2].set_xlabel('Fold')
    axes[0,2].set_ylabel('Number of Annotations')
    axes[0,2].set_title('Dataset Sizes by Fold')
    axes[0,2].legend()
    axes[0,2].set_xticks(x)
    axes[0,2].set_xticklabels([f'Fold {i}' for i in x])

    # 4. Class distribution consistency (coefficient of variation, train splits)
    class_cv_data = {}
    for class_id in all_classes:
        train_percentages = _fold_percentages(fold_stats, class_id, 'train_counts', 'train_total')
        mean_percentage = np.mean(train_percentages)
        # Coefficient of variation (std/mean)
        cv = np.std(train_percentages) / mean_percentage if mean_percentage > 0 else 0
        class_cv_data[f'Class {class_id}'] = cv

    axes[1,0].bar(list(class_cv_data), list(class_cv_data.values()), alpha=0.8)
    axes[1,0].set_title('Class Distribution Consistency\n(Lower CV = More Consistent)')
    axes[1,0].set_ylabel('Coefficient of Variation')
    axes[1,0].tick_params(axis='x', rotation=45)

    # 5. Overall statistics
    total_train = sum(train_sizes)
    total_val = sum(val_sizes)
    # Guard the ratio: folds may contain no validation annotations at all.
    ratio_text = f'{total_train/total_val:.2f}' if total_val > 0 else 'n/a'

    axes[1,1].text(0.1, 0.9, f'Total Training Annotations: {total_train:,}', transform=axes[1,1].transAxes, fontsize=12)
    axes[1,1].text(0.1, 0.8, f'Total Validation Annotations: {total_val:,}', transform=axes[1,1].transAxes, fontsize=12)
    axes[1,1].text(0.1, 0.7, f'Average per fold (train): {total_train/k_folds:.0f}', transform=axes[1,1].transAxes, fontsize=12)
    axes[1,1].text(0.1, 0.6, f'Average per fold (val): {total_val/k_folds:.0f}', transform=axes[1,1].transAxes, fontsize=12)
    axes[1,1].text(0.1, 0.5, f'Train/Val ratio: {ratio_text}', transform=axes[1,1].transAxes, fontsize=12)

    # Overall class distribution, accumulated over both splits of every fold
    overall_class_counts = Counter()
    for stats in fold_stats.values():
        overall_class_counts.update(stats['train_counts'])
        overall_class_counts.update(stats['val_counts'])

    axes[1,1].text(0.1, 0.35, 'Overall Class Distribution:', transform=axes[1,1].transAxes, fontsize=12, fontweight='bold')
    total_overall = sum(overall_class_counts.values())
    y_pos = 0.25
    for class_id in all_classes:
        percentage = _share(overall_class_counts.get(class_id, 0), total_overall)
        axes[1,1].text(0.1, y_pos, f'  Class {class_id}: {percentage:.1f}%', transform=axes[1,1].transAxes, fontsize=10)
        y_pos -= 0.05

    axes[1,1].set_xlim(0, 1)
    axes[1,1].set_ylim(0, 1)
    axes[1,1].axis('off')
    axes[1,1].set_title('Summary Statistics')

    # 6. Stratification quality score, measured on the validation splits
    quality_scores = []
    for class_id in all_classes:
        val_percentages = _fold_percentages(fold_stats, class_id, 'val_counts', 'val_total')
        mean_percentage = np.mean(val_percentages)
        # Quality score: 1 - coefficient_of_variation (higher is better)
        cv = np.std(val_percentages) / mean_percentage if mean_percentage > 0 else 1
        quality_scores.append(max(0, 1 - cv))

    avg_quality = np.mean(quality_scores)

    axes[1,2].bar([f'Class {i}' for i in all_classes], quality_scores, alpha=0.8,
                  color=['green' if s > 0.9 else 'orange' if s > 0.7 else 'red' for s in quality_scores])
    axes[1,2].axhline(y=avg_quality, color='red', linestyle='--',
                     label=f'Average: {avg_quality:.3f}')
    axes[1,2].set_title('Stratification Quality Score\n(1.0 = Perfect Stratification)')
    axes[1,2].set_ylabel('Quality Score')
    axes[1,2].legend()
    axes[1,2].tick_params(axis='x', rotation=45)

    plt.tight_layout()
    plt.show()

    # Print summary
    print(f"\n📊 STRATIFICATION ANALYSIS SUMMARY:")
    print(f"Average quality score: {avg_quality:.3f}")
    if avg_quality > 0.9:
        print("✅ Excellent stratification! Class distributions are very consistent across folds.")
    elif avg_quality > 0.7:
        print("⚠️  Good stratification. Minor variations in class distribution across folds.")
    else:
        print("❌ Poor stratification. Significant variations in class distribution across folds.")

    return fold_stats

# Run the stratification analysis over the generated folds and keep the
# per-fold statistics it returns for further inspection.
fold_analysis_stats = analyze_fold_stratification()