In [2]:
import os
import shutil
import pandas as pd
import yaml
from collections import defaultdict, Counter
from sklearn.model_selection import train_test_split
import numpy as np
from pathlib import Path
from datetime import datetime

def load_class_names(data_yaml_path):
    """Return the ``names`` entry of a YOLO ``data.yaml`` file.

    Args:
        data_yaml_path: Path to the YAML file to read.

    Returns:
        Whatever is stored under the top-level ``names`` key.

    Raises:
        KeyError: If the document has no ``names`` key.
    """
    with open(data_yaml_path, 'r') as yaml_file:
        # safe_load only builds plain Python objects from the document.
        return yaml.safe_load(yaml_file)['names']

def parse_yolo_annotation(annotation_path):
    """Read a YOLO label file into a list of bounding-box dicts.

    Each non-blank line is split on whitespace; the first token is parsed as
    an integer class id and the next four as floats. Any tokens after the
    fifth are ignored.

    Args:
        annotation_path: Path to the ``.txt`` label file.

    Returns:
        A list of dicts with keys ``class_id``, ``x_center``, ``y_center``,
        ``width`` and ``height``, in file order.

    Raises:
        ValueError: If a token cannot be converted to a number.
        IndexError: If a non-blank line has fewer than five tokens.
    """
    coord_fields = ('x_center', 'y_center', 'width', 'height')
    boxes = []
    with open(annotation_path, 'r') as label_file:
        for raw_line in label_file:
            tokens = raw_line.split()
            if not tokens:
                # Blank or whitespace-only line.
                continue
            box = {'class_id': int(tokens[0])}
            for position, field in enumerate(coord_fields, start=1):
                box[field] = float(tokens[position])
            boxes.append(box)
    return boxes

def get_dataset_info(images_dir, labels_dir, class_names):
    """Collect one record per annotation for every labelled image.

    Images are the ``.jpg``/``.jpeg``/``.png`` files in ``images_dir``
    (extension match is case-insensitive). An image contributes records only
    if a label file with the same stem and a ``.txt`` extension exists in
    ``labels_dir``; images without a label file, or with an empty one, are
    silently absent from the result.

    Args:
        images_dir: Directory containing the image files.
        labels_dir: Directory containing the YOLO ``.txt`` label files.
        class_names: Container indexed by class id that yields the class name.

    Returns:
        A list of dicts with keys ``image_name``, ``label_name``,
        ``class_id``, ``class_name``, ``x_center``, ``y_center``, ``width``
        and ``height``, ordered by image file name.

    Raises:
        IndexError / KeyError: If a label references a class id that
            ``class_names`` does not contain.
    """
    # os.listdir() returns entries in arbitrary order. Sorting makes the
    # record order (and therefore any seeded split computed from it)
    # reproducible across machines and file systems.
    image_files = sorted(
        f for f in os.listdir(images_dir)
        if f.lower().endswith(('.jpg', '.jpeg', '.png'))
    )

    dataset_info = []
    for image_file in image_files:
        label_file = os.path.splitext(image_file)[0] + '.txt'
        label_path = os.path.join(labels_dir, label_file)

        if not os.path.exists(label_path):
            continue

        for ann in parse_yolo_annotation(label_path):
            dataset_info.append({
                'image_name': image_file,
                'label_name': label_file,
                'class_id': ann['class_id'],
                'class_name': class_names[ann['class_id']],
                'x_center': ann['x_center'],
                'y_center': ann['y_center'],
                'width': ann['width'],
                'height': ann['height'],
            })

    return dataset_info

def get_stratified_split_simple(dataset_info, train_ratio=0.8):
    """Split image names into train/test lists, bucketed by lowest class id.

    Every image is assigned to a single bucket keyed by the smallest class id
    among its annotations. Each bucket is then split independently with
    ``train_test_split`` using a fixed ``random_state`` of 42. A bucket that
    holds exactly one image goes entirely to the training list.

    Args:
        dataset_info: Iterable of records with ``image_name`` and ``class_id``.
        train_ratio: Value passed as ``train_size`` for each bucket.

    Returns:
        A ``(train_images, test_images)`` tuple of lists of image names.
    """
    classes_per_image = defaultdict(set)
    for record in dataset_info:
        classes_per_image[record['image_name']].add(record['class_id'])

    # Bucket each image under the smallest class id it contains.
    buckets = defaultdict(list)
    for name, class_ids in classes_per_image.items():
        buckets[min(class_ids)].append(name)

    train_images, test_images = [], []
    for bucket in buckets.values():
        if len(bucket) == 1:
            # A single image cannot be divided; keep it for training.
            train_images.extend(bucket)
            continue

        kept, held_out = train_test_split(
            bucket,
            train_size=train_ratio,
            random_state=42,
            stratify=None,
        )
        train_images.extend(kept)
        test_images.extend(held_out)

    return train_images, test_images

def create_directory_structure(output_dir, include_drift_accumulative=True):
    """Create the ``images``/``labels`` folders of each split under ``output_dir``.

    ``train`` and ``test_freeze`` are always created. ``test_accumulative``
    and ``test_drift`` are added when ``include_drift_accumulative`` is true.
    Directories that already exist are left as they are.

    Args:
        output_dir: Root directory that will hold the split folders.
        include_drift_accumulative: Whether to also create the accumulative
            and drift test splits.
    """
    split_names = ['train', 'test_freeze']
    if include_drift_accumulative:
        split_names += ['test_accumulative', 'test_drift']

    for split_name in split_names:
        for leaf in ('images', 'labels'):
            os.makedirs(os.path.join(output_dir, split_name, leaf), exist_ok=True)

def copy_files(image_list, source_images_dir, source_labels_dir, dest_images_dir, dest_labels_dir):
    """Copy each listed image and its same-stem ``.txt`` label file.

    Files missing from the source directories are skipped silently. The
    destination directories must already exist.

    Args:
        image_list: Image file names to copy.
        source_images_dir: Directory the images are read from.
        source_labels_dir: Directory the label files are read from.
        dest_images_dir: Directory the images are copied to.
        dest_labels_dir: Directory the label files are copied to.
    """
    for image_name in image_list:
        label_name = os.path.splitext(image_name)[0] + '.txt'
        # Image first, then its label; each is copied only if present.
        transfers = (
            (os.path.join(source_images_dir, image_name),
             os.path.join(dest_images_dir, image_name)),
            (os.path.join(source_labels_dir, label_name),
             os.path.join(dest_labels_dir, label_name)),
        )
        for src_path, dst_path in transfers:
            if os.path.exists(src_path):
                shutil.copy2(src_path, dst_path)

def load_existing_csv(csv_path):
    """Read ``csv_path`` into a DataFrame, or return ``None`` if it is absent.

    Args:
        csv_path: Path of the CSV file to load.

    Returns:
        The parsed ``pandas.DataFrame``, or ``None`` when the path does not
        exist.
    """
    if not os.path.exists(csv_path):
        return None
    return pd.read_csv(csv_path)

def identify_new_samples(current_dataset_info, existing_df):
    """Keep only the records whose image is not yet listed in ``existing_df``.

    Args:
        current_dataset_info: List of records, each with an ``image_name`` key.
        existing_df: DataFrame with an ``image_name`` column, or ``None``.

    Returns:
        ``current_dataset_info`` itself when ``existing_df`` is ``None``;
        otherwise a new list with the records of unseen images, in their
        original order.
    """
    if existing_df is None:
        return current_dataset_info

    already_known = set(existing_df['image_name'].unique())
    return [
        record for record in current_dataset_info
        if record['image_name'] not in already_known
    ]

def manage_test_drift_corrected(output_dir, new_test_images, source_images_dir, source_labels_dir, n_drift):
    """Maintain ``test_drift`` as a FIFO window of at most ``n_drift`` images.

    Existing images are evicted oldest-first until the new images fit. If the
    new batch alone exceeds the limit, every existing image is evicted and
    only the last ``n_drift`` entries of ``new_test_images`` are added.

    Args:
        output_dir: Root directory containing ``test_drift/images`` and
            ``test_drift/labels``.
        new_test_images: Image file names to add, in arrival order.
        source_images_dir: Directory the new images are copied from.
        source_labels_dir: Directory the new label files are copied from.
        n_drift: Maximum number of images kept in ``test_drift``. Values
            below zero are treated as zero.

    Returns:
        The image file names present in ``test_drift/images`` afterwards,
        oldest first.
    """
    image_exts = ('.jpg', '.jpeg', '.png')
    drift_images_dir = os.path.join(output_dir, 'test_drift', 'images')
    drift_labels_dir = os.path.join(output_dir, 'test_drift', 'labels')

    def _list_drift_images():
        """Return the image names currently in test_drift, oldest first."""
        if not os.path.exists(drift_images_dir):
            return []
        names = [f for f in os.listdir(drift_images_dir) if f.lower().endswith(image_exts)]
        # os.listdir() order is arbitrary, so it cannot drive FIFO eviction.
        # copy_files() uses shutil.copy2, which carries the source mtime over,
        # so mtime does not say when a file entered test_drift either.
        # NOTE(review): ctime is used as the closest available "added at"
        # stamp (creation time on Windows, inode change time on Unix);
        # confirm this is adequate for the target platform. The name breaks
        # ties for files copied within the same clock tick.
        return sorted(
            names,
            key=lambda f: (os.path.getctime(os.path.join(drift_images_dir, f)), f),
        )

    def _evict(image_names):
        """Delete the given images and their label files from test_drift."""
        for img in image_names:
            img_path = os.path.join(drift_images_dir, img)
            if os.path.exists(img_path):
                os.remove(img_path)

            label_path = os.path.join(drift_labels_dir, os.path.splitext(img)[0] + '.txt')
            if os.path.exists(label_path):
                os.remove(label_path)

    existing_drift_images = _list_drift_images()

    print(f"Current test_drift: {len(existing_drift_images)} images")
    print(f"New images to add: {len(new_test_images)}")
    print(f"n_drift limit: {n_drift}")

    limit = max(n_drift, 0)
    images_to_add = new_test_images
    num_to_remove = len(existing_drift_images) + len(new_test_images) - limit

    if num_to_remove > 0:
        if num_to_remove >= len(existing_drift_images):
            # The new batch fills (or overflows) the window on its own.
            _evict(existing_drift_images)

            if len(new_test_images) > limit:
                # seq[-0:] is the whole sequence, so a zero limit needs its
                # own branch to really add nothing.
                images_to_add = new_test_images[-limit:] if limit > 0 else []
                print(f"Too many new images. Adding only the last {n_drift}")
        else:
            images_to_remove = existing_drift_images[:num_to_remove]
            print(f"Removing {len(images_to_remove)} old images")
            _evict(images_to_remove)

    print(f"Adding {len(images_to_add)} images to test_drift")
    copy_files(images_to_add, source_images_dir, source_labels_dir,
               drift_images_dir, drift_labels_dir)

    final_drift_images = _list_drift_images()
    print(f"Final test_drift: {len(final_drift_images)} images")

    return final_drift_images

def sync_csv_with_physical_directories(csv_path, output_dir):
    """Rewrite the CSV's ``splits``/``split`` columns from what is on disk.

    For every row, ``splits`` becomes the comma-joined list of split folders
    (``train``, ``test_freeze``, ``test_accumulative``, ``test_drift``) whose
    ``images`` directory contains the row's image, and ``split`` becomes the
    first of them. A previous ``test_new`` tag is kept only when the image is
    in none of the three test folders. Rows found nowhere get ``unknown``.
    The CSV is overwritten in place.

    Args:
        csv_path: Path of the CSV to synchronize.
        output_dir: Root directory containing the split folders.

    Returns:
        The updated DataFrame, or ``None`` if ``csv_path`` does not exist.
    """
    if not os.path.exists(csv_path):
        return None

    df = pd.read_csv(csv_path)

    image_suffixes = ('.jpg', '.jpeg', '.png')
    tracked_splits = ('train', 'test_freeze', 'test_accumulative', 'test_drift')
    test_split_names = ('test_freeze', 'test_accumulative', 'test_drift')

    # Snapshot the image file names found in each split folder.
    physical_images = {}
    for split_name in tracked_splits:
        folder = os.path.join(output_dir, split_name, 'images')
        if os.path.exists(folder):
            physical_images[split_name] = {
                entry for entry in os.listdir(folder)
                if entry.lower().endswith(image_suffixes)
            }
        else:
            physical_images[split_name] = set()

    print("Synchronizing CSV with physical directories...")
    for split_name in tracked_splits:
        print(f"  - {split_name}: {len(physical_images[split_name])} images")

    for idx, row in df.iterrows():
        image_name = row['image_name']
        found_in = [
            split_name for split_name, files in physical_images.items()
            if image_name in files
        ]

        previous_tags = [
            token.strip() for token in str(row['splits']).split(',') if token.strip()
        ]
        in_any_test_split = any(name in test_split_names for name in found_in)
        if 'test_new' in previous_tags and not in_any_test_split:
            found_in.append('test_new')

        if found_in:
            df.at[idx, 'splits'] = ','.join(found_in)
            df.at[idx, 'split'] = found_in[0]
        else:
            df.at[idx, 'splits'] = 'unknown'
            df.at[idx, 'split'] = 'unknown'

    df.to_csv(csv_path, index=False)
    print("CSV synchronized with physical directories!")

    return df

def update_csv_with_splits_corrected(dataset_info, existing_df, train_images, test_images, 
                                    test_accumulative_images, test_drift_images, output_dir, 
                                    add_to_test_freeze, iteration):
    """Tag the new records with their splits and write the combined CSV.

    Each record in ``dataset_info`` is updated in place with ``iteration``,
    ``timestamp``, ``splits`` (comma-joined tags) and ``split`` (first tag,
    or ``unknown``). For rows of ``existing_df`` the ``test_drift`` tag is
    recomputed from ``test_drift_images``; other tags are left as they were.
    The result is written to ``<output_dir>/dataset_info.csv``.

    Args:
        dataset_info: List of record dicts for the new annotations.
        existing_df: DataFrame from a previous run, or ``None``. It is not
            modified; a copy is updated instead.
        train_images: Names of new images assigned to train.
        test_images: Names of new images assigned to test.
        test_accumulative_images: Names of images in ``test_accumulative``.
        test_drift_images: Names of images currently in ``test_drift``.
        output_dir: Directory the CSV is written to.
        add_to_test_freeze: Whether new test images are tagged ``test_freeze``.
        iteration: Iteration number stored on the new records.

    Returns:
        The combined DataFrame (existing rows followed by new rows).
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Membership is tested once per annotation; sets keep that O(1) instead
    # of scanning the image lists every time.
    train_set = set(train_images)
    test_set = set(test_images)
    accumulative_set = set(test_accumulative_images)
    drift_set = set(test_drift_images)

    for info in dataset_info:
        info['iteration'] = iteration
        info['timestamp'] = timestamp

        image_name = info['image_name']
        splits = []

        if image_name in train_set:
            splits.append('train')

        if image_name in test_set:
            if add_to_test_freeze:
                splits.append('test_freeze')
            splits.append('test_new')

        if image_name in accumulative_set:
            splits.append('test_accumulative')

        if image_name in drift_set:
            splits.append('test_drift')

        info['splits'] = ','.join(splits) if splits else 'unknown'
        info['split'] = splits[0] if splits else 'unknown'

    new_df = pd.DataFrame(dataset_info)

    def _without_drift(value):
        """Drop the test_drift tag; empty or missing values become 'unknown'."""
        if pd.isna(value):
            return 'unknown'
        kept = ','.join(s for s in str(value).split(',') if s.strip() != 'test_drift')
        return kept if kept else 'unknown'

    if existing_df is not None:
        # Work on a copy so the caller's DataFrame is left untouched.
        existing_df = existing_df.copy()

        # test_drift is a moving window: clear the old tag everywhere, then
        # re-apply it to the rows whose image is still in the window.
        existing_df['splits'] = existing_df['splits'].apply(_without_drift)

        for idx, row in existing_df.iterrows():
            if row['image_name'] in drift_set:
                current_splits = [
                    s.strip() for s in str(row['splits']).split(',')
                    if s.strip() and s.strip() != 'unknown'
                ]
                if 'test_drift' not in current_splits:
                    current_splits.append('test_drift')
                existing_df.at[idx, 'splits'] = ','.join(current_splits)

        combined_df = pd.concat([existing_df, new_df], ignore_index=True)
    else:
        combined_df = new_df

    csv_path = os.path.join(output_dir, 'dataset_info.csv')
    combined_df.to_csv(csv_path, index=False)

    return combined_df

def print_statistics(df, class_names, iteration=1):
    """Print image, split and class counts for the dataset CSV.

    Args:
        df: DataFrame with ``image_name``, ``iteration`` and ``class_name``
            columns, and optionally a ``splits`` column of comma-joined tags.
        class_names: Not used by this function.
        iteration: Iteration whose rows are reported as "new".
    """
    print(f"\n=== DATASET STATISTICS (Iteration {iteration}) ===")

    total_images = df['image_name'].nunique()
    total_annotations = len(df)

    print(f"Total images: {total_images}")
    print(f"Total annotations: {total_annotations}")

    this_round = df[df['iteration'] == iteration]
    if not this_round.empty:
        print(f"New images (iteration {iteration}): {this_round['image_name'].nunique()}")
        print(f"New annotations (iteration {iteration}): {len(this_round)}")

    print("\n--- Distribution by Split ---")
    if 'splits' in df.columns:
        # Gather every tag that appears in any non-'unknown' splits value.
        raw_tags = set()
        for value in df['splits'].dropna():
            text = str(value)
            if text != 'unknown':
                raw_tags.update(text.split(','))

        for raw_tag in sorted(raw_tags):
            tag = raw_tag.strip()
            if not tag or tag == 'unknown':
                continue
            # Rows are selected by pattern match of the tag within 'splits'.
            tagged_rows = df[df['splits'].str.contains(tag, na=False)]
            count = tagged_rows['image_name'].nunique()
            percentage = (count / total_images) * 100
            print(f"{tag}: {count} images ({percentage:.1f}%)")

    print("\n--- Distribution by Class ---")
    for class_name, count in df['class_name'].value_counts().items():
        percentage = (count / total_annotations) * 100
        print(f"{class_name}: {count} annotations ({percentage:.1f}%)")

def split_yolo_dataset(central_dir, output_dir, train_ratio=0.8, data_yaml_path=None,
                      csv_info=None, add_to_test_freeze=False, n_drift=100):
    """
    Main function to split YOLO dataset in a stratified and incremental way

    Images already listed in the previous CSV are skipped; only new images
    are split and copied. New train images go to ``train/``; new test images
    go to ``test_accumulative/`` and ``test_drift/`` (capped at ``n_drift``),
    and to ``test_freeze/`` on the first run or when ``add_to_test_freeze``
    is set. Returns ``None``; returns early when there are no new images.

    Args:
        central_dir: Directory containing images/ and labels/ subfolders
        output_dir: Output directory where train/ and test_freeze/ folders will be created
        train_ratio: Proportion of data for training (default: 0.8)
        data_yaml_path: Path to data.yaml file (optional)
        csv_info: Path to CSV with previous information (optional)
        add_to_test_freeze: Whether to add new test to test_freeze (default: False)
        n_drift: Maximum number of samples in test_drift (default: 100)
    """

    images_dir = os.path.join(central_dir, 'images')
    labels_dir = os.path.join(central_dir, 'labels')

    # NOTE(review): the updated CSV is always written to
    # <output_dir>/dataset_info.csv, so a custom csv_info is only ever read.
    if csv_info is None:
        csv_info = os.path.join(output_dir, 'dataset_info.csv')

    existing_df = load_existing_csv(csv_info)
    is_first_run = existing_df is None

    iteration = 1 if is_first_run else existing_df['iteration'].max() + 1

    print(f"=== {'FIRST EXECUTION' if is_first_run else f'INCREMENTAL EXECUTION (Iteration {iteration})'} ===")

    if data_yaml_path is None:
        data_yaml_path = os.path.join(central_dir, 'data.yaml')

    if os.path.exists(data_yaml_path):
        class_names = load_class_names(data_yaml_path)
    else:
        # Without data.yaml, derive class_0..class_N from the largest class
        # id found in the label files.
        print("Warning: data.yaml not found. Using generic class names.")
        max_class_id = 0
        for label_file in os.listdir(labels_dir):
            if label_file.endswith('.txt'):
                with open(os.path.join(labels_dir, label_file), 'r') as f:
                    for line in f:
                        if line.strip():
                            class_id = int(line.split()[0])
                            max_class_id = max(max_class_id, class_id)
        class_names = [f'class_{i}' for i in range(max_class_id + 1)]

    print(f"Classes found: {class_names}")

    print("Collecting dataset information...")
    current_dataset_info = get_dataset_info(images_dir, labels_dir, class_names)

    new_dataset_info = identify_new_samples(current_dataset_info, existing_df)

    if not new_dataset_info:
        print("No new samples found!")
        return

    print(f"Found {len({info['image_name'] for info in new_dataset_info})} new images")

    print("Performing stratified split of new samples...")
    train_images, test_images = get_stratified_split_simple(new_dataset_info, train_ratio)

    print(f"New samples - Train: {len(train_images)}, Test: {len(test_images)}")

    print("Creating/checking directory structure...")
    create_directory_structure(output_dir, include_drift_accumulative=True)

    print("Copying new files to train/...")
    copy_files(
        train_images,
        images_dir,
        labels_dir,
        os.path.join(output_dir, 'train', 'images'),
        os.path.join(output_dir, 'train', 'labels')
    )

    # test_freeze is seeded on the first run and only grows afterwards when
    # explicitly requested. One copy covers both cases, so nothing is copied
    # twice when both conditions hold.
    goes_to_test_freeze = add_to_test_freeze or is_first_run
    if goes_to_test_freeze:
        if is_first_run:
            print("Copying files to test_freeze/ (first execution)...")
        else:
            print("Adding new tests to test_freeze/...")
        copy_files(
            test_images,
            images_dir,
            labels_dir,
            os.path.join(output_dir, 'test_freeze', 'images'),
            os.path.join(output_dir, 'test_freeze', 'labels')
        )

    print("Adding to test_accumulative...")
    copy_files(
        test_images,
        images_dir,
        labels_dir,
        os.path.join(output_dir, 'test_accumulative', 'images'),
        os.path.join(output_dir, 'test_accumulative', 'labels')
    )

    print("Managing test_drift...")
    final_drift_images = manage_test_drift_corrected(
        output_dir, test_images, images_dir, labels_dir, n_drift
    )

    if is_first_run:
        print("Configuring test_accumulative and test_drift (first execution)...")
    test_accumulative_images = list(test_images)
    test_drift_images = final_drift_images

    print("Updating CSV report...")
    df = update_csv_with_splits_corrected(
        new_dataset_info, existing_df, train_images, test_images,
        test_accumulative_images, test_drift_images, output_dir,
        goes_to_test_freeze, iteration
    )

    # sync_csv_with_physical_directories prints its own progress header.
    df = sync_csv_with_physical_directories(
        os.path.join(output_dir, 'dataset_info.csv'),
        output_dir
    )

    print_statistics(df, class_names, iteration)

    print(f"\nSplit completed! Files saved in: {output_dir}")
    print(f"CSV report updated in: {os.path.join(output_dir, 'dataset_info.csv')}")
    print("Configuration used:")
    print(f"  - add_to_test_freeze: {add_to_test_freeze}")
    print(f"  - n_drift: {n_drift}")
    print(f"  - train_ratio: {train_ratio}")

def recreate_splits_from_csv(csv_path, source_data_dir, output_dir, iteration=None):
    """
    Recreate all splits from metadata CSV

    Every image is copied (with its label file) from ``source_data_dir`` into
    the folder of each split named in its ``splits`` column. Tags other than
    train, test_freeze, test_accumulative, test_drift and test_new are
    ignored.

    Args:
        csv_path: Path to dataset_info.csv
        source_data_dir: Directory with original data (images/ and labels/)
        output_dir: Directory where to recreate splits
        iteration: Specific iteration to recreate (None = all)
    """
    # NOTE(review): this function is defined a second time further down the
    # file; whichever definition runs last is the one in effect.

    df = pd.read_csv(csv_path)

    if iteration is not None:
        df = df[df['iteration'] == iteration]
        print(f"Recreating splits only from iteration {iteration}")
    else:
        print("Recreating splits from all iterations")

    create_directory_structure(output_dir, include_drift_accumulative=True)

    source_images_dir = os.path.join(source_data_dir, 'images')
    source_labels_dir = os.path.join(source_data_dir, 'labels')

    splits_dict = {
        'train': set(),
        'test_freeze': set(),
        'test_accumulative': set(),
        'test_drift': set(),
        'test_new': set()
    }

    for _, row in df.iterrows():
        image_name = row['image_name']
        splits = str(row['splits']).split(',') if pd.notna(row['splits']) else []

        for split in splits:
            split = split.strip()
            if split in splits_dict:
                splits_dict[split].add(image_name)

    for split_name, images in splits_dict.items():
        if images:
            dest_images_dir = os.path.join(output_dir, split_name, 'images')
            dest_labels_dir = os.path.join(output_dir, split_name, 'labels')

            # create_directory_structure() does not make test_new/, so make
            # sure the destination exists before copying into it.
            os.makedirs(dest_images_dir, exist_ok=True)
            os.makedirs(dest_labels_dir, exist_ok=True)

            print(f"Recreating {split_name}/: {len(images)} images")
            copy_files(list(images), source_images_dir, source_labels_dir, 
                      dest_images_dir, dest_labels_dir)

    print(f"\nSplits recreated successfully in: {output_dir}")

    print("\n=== RECREATED SPLITS STATISTICS ===")
    for split_name, images in splits_dict.items():
        if images:
            print(f"{split_name}: {len(images)} images")

In [3]:
# First run: with csv_info=None the previous-run CSV is looked up at
# <output_dir>/dataset_info.csv; n_drift caps test_drift at 50 images.
split_yolo_dataset(
    central_dir="./data",
    output_dir="./dataset",
    train_ratio=0.8,
    csv_info=None,
    add_to_test_freeze=False,
    n_drift=50
)

=== FIRST EXECUTION ===
Classes found: ['Paper', 'Rock', 'Scissors']
Collecting dataset information...
Found 858 new images
Performing stratified split of new samples...
New samples - Train: 685, Test: 173
Creating/checking directory structure...
Copying new files to train/...
Copying files to test_freeze/ (first execution)...
Adding to test_accumulative...
Managing test_drift...
Current test_drift: 0 images
New images to add: 173
n_drift limit: 50
Too many new images. Adding only the last 50
Adding 50 images to test_drift
Final test_drift: 50 images
Configuring test_accumulative and test_drift (first execution)...
Updating CSV report...
Synchronizing CSV with physical directories...
Synchronizing CSV with physical directories...
  - train: 685 images
  - test_freeze: 173 images
  - test_accumulative: 173 images
  - test_drift: 50 images
CSV synchronized with physical directories!

=== DATASET STATISTICS (Iteration 1) ===
Total images: 858
Total annotations: 999
New images (iteration 1

In [22]:
# Incremental run against the same output_dir: the dataset_info.csv left by
# the previous run is loaded, so only images not listed in it are split.
split_yolo_dataset(
    central_dir="./data",
    output_dir="./dataset",
    add_to_test_freeze=False,
    n_drift=50
)

=== INCREMENTAL EXECUTION (Iteration 2) ===
Classes found: ['Paper', 'Rock', 'Scissors']
Collecting dataset information...
Found 908 new images
Performing stratified split of new samples...
New samples - Train: 725, Test: 183
Creating/checking directory structure...
Copying new files to train/...
Adding to test_accumulative...
Managing test_drift...
Current test_drift: 50 images
New images to add: 183
n_drift limit: 50
Too many new images. Adding only the last 50
Adding 50 images to test_drift
Final test_drift: 50 images
Updating CSV report...
Synchronizing CSV with physical directories...
Synchronizing CSV with physical directories...
  - train: 1410 images
  - test_freeze: 173 images
  - test_accumulative: 356 images
  - test_drift: 50 images
CSV synchronized with physical directories!

=== DATASET STATISTICS (Iteration 2) ===
Total images: 1766
Total annotations: 2062
New images (iteration 2): 908
New annotations (iteration 2): 1063

--- Distribution by Split ---
test_accumulative: 

In [23]:
def recreate_splits_from_csv(csv_path, source_data_dir, output_dir, iteration=None):
    """
    Recreate all splits from metadata CSV

    Every image is copied (with its label file) from ``source_data_dir`` into
    the folder of each split named in its ``splits`` column. Tags other than
    train, test_freeze, test_accumulative, test_drift and test_new are
    ignored.

    Args:
        csv_path: Path to dataset_info.csv
        source_data_dir: Directory with original data (images/ and labels/)
        output_dir: Directory where to recreate splits
        iteration: Specific iteration to recreate (None = all)
    """

    df = pd.read_csv(csv_path)

    if iteration is not None:
        df = df[df['iteration'] == iteration]
        print(f"Recreating splits only from iteration {iteration}")
    else:
        print("Recreating splits from all iterations")

    create_directory_structure(output_dir, include_drift_accumulative=True)

    source_images_dir = os.path.join(source_data_dir, 'images')
    source_labels_dir = os.path.join(source_data_dir, 'labels')

    splits_dict = {
        'train': set(),
        'test_freeze': set(),
        'test_accumulative': set(),
        'test_drift': set(),
        'test_new': set()
    }

    for _, row in df.iterrows():
        image_name = row['image_name']
        # str() keeps a non-string cell from breaking .split().
        splits = str(row['splits']).split(',') if pd.notna(row['splits']) else []

        for split in splits:
            split = split.strip()
            if split in splits_dict:
                splits_dict[split].add(image_name)

    for split_name, images in splits_dict.items():
        if images:
            dest_images_dir = os.path.join(output_dir, split_name, 'images')
            dest_labels_dir = os.path.join(output_dir, split_name, 'labels')

            # create_directory_structure() does not make test_new/, so make
            # sure the destination exists before copying into it.
            os.makedirs(dest_images_dir, exist_ok=True)
            os.makedirs(dest_labels_dir, exist_ok=True)

            print(f"Recreating {split_name}/: {len(images)} images")
            copy_files(list(images), source_images_dir, source_labels_dir, 
                      dest_images_dir, dest_labels_dir)

    print(f"\nSplits recreated successfully in: {output_dir}")

    print("\n=== RECREATED SPLITS STATISTICS ===")
    for split_name, images in splits_dict.items():
        if images:
            print(f"{split_name}: {len(images)} images")

def recreate_specific_split(csv_path, source_data_dir, output_dir, split_name, iteration=None):
    """
    Rebuild a single split folder from the metadata CSV

    Images whose ``splits`` column lists ``split_name`` are copied, together
    with their label files, into ``<output_dir>/<split_name>/``. When no
    image matches, a message is printed and nothing is created.

    Args:
        csv_path: Path to dataset_info.csv
        source_data_dir: Directory with original data
        output_dir: Output directory
        split_name: Split name ('train', 'test_freeze', etc.)
        iteration: Specific iteration (None = all)
    """

    df = pd.read_csv(csv_path)

    if iteration is not None:
        df = df[df['iteration'] == iteration]

    split_images = set()
    for _, row in df.iterrows():
        raw_tags = row['splits']
        tags = [tag.strip() for tag in raw_tags.split(',')] if pd.notna(raw_tags) else []
        if split_name in tags:
            split_images.add(row['image_name'])

    if not split_images:
        print(f"No images found for split '{split_name}'")
        return

    split_root = os.path.join(output_dir, split_name)
    dest_images_dir = os.path.join(split_root, 'images')
    dest_labels_dir = os.path.join(split_root, 'labels')
    for folder in (dest_images_dir, dest_labels_dir):
        os.makedirs(folder, exist_ok=True)

    print(f"Recreating {split_name}/: {len(split_images)} images")
    copy_files(
        list(split_images),
        os.path.join(source_data_dir, 'images'),
        os.path.join(source_data_dir, 'labels'),
        dest_images_dir,
        dest_labels_dir,
    )

    print(f"Split '{split_name}' recreated successfully!")

In [24]:
# Rebuild every split under dataset_debug/ from the CSV in dataset/,
# copying the files from the original data/ directory.
recreate_splits_from_csv(
    csv_path="dataset/dataset_info.csv",
    source_data_dir="data",
    output_dir="dataset_debug"
)


Recreating splits from all iterations
Recreating train/: 1410 images
Recreating test_freeze/: 173 images
Recreating test_accumulative/: 356 images
Recreating test_drift/: 50 images

Splits recreated successfully in: dataset_debug

=== RECREATED SPLITS STATISTICS ===
train: 1410 images
test_freeze: 173 images
test_accumulative: 356 images
test_drift: 50 images


In [1]:
def validate_splits_consistency(csv_path, splits_dir):
    """
    Validate if physical splits are consistent with CSV

    For train, test_freeze, test_accumulative and test_drift, the set of
    image names tagged with the split in the CSV's ``splits`` column is
    compared with the set of image files in ``<splits_dir>/<split>/images``.
    A split is consistent only when both sets are identical, not merely the
    same size. A report line with both counts is printed per split.

    Args:
        csv_path: Path to dataset_info.csv
        splits_dir: Directory containing the split folders

    Returns:
        True if every split matches, False otherwise.
    """
    df = pd.read_csv(csv_path)

    split_names = ('train', 'test_freeze', 'test_accumulative', 'test_drift')

    # Image names per split according to the CSV.
    csv_images = {name: set() for name in split_names}
    for _, row in df.iterrows():
        raw_splits = row['splits']
        if pd.isna(raw_splits):
            continue
        for split in str(raw_splits).split(','):
            split = split.strip()
            if split in csv_images:
                csv_images[split].add(row['image_name'])

    # Image names per split according to the file system.
    physical_images = {}
    for split_name in split_names:
        split_images_dir = os.path.join(splits_dir, split_name, 'images')
        if os.path.isdir(split_images_dir):
            physical_images[split_name] = {
                f for f in os.listdir(split_images_dir)
                if f.lower().endswith(('.jpg', '.jpeg', '.png'))
            }
        else:
            physical_images[split_name] = set()

    print("=== CONSISTENCY VALIDATION ===")
    all_consistent = True
    for split_name in split_names:
        csv_count = len(csv_images[split_name])
        physical_count = len(physical_images[split_name])

        # Equal counts can hide swapped files, so compare the names.
        consistent = csv_images[split_name] == physical_images[split_name]
        status = "✅" if consistent else "❌"
        print(f"{split_name}: CSV={csv_count}, Physical={physical_count} {status}")

        if not consistent:
            all_consistent = False

    if all_consistent:
        print("\n🎯 All splits are consistent!")
    else:
        print("\n⚠️  Inconsistencies detected. Run recreate_splits_from_csv()")

    return all_consistent

In [26]:
# Check the splits recorded in the CSV against the image files under dataset/.
validate_splits_consistency(csv_path="dataset/dataset_info.csv", splits_dir="dataset")

=== CONSISTENCY VALIDATION ===
train: CSV=1410, Physical=1410 ✅
test_freeze: CSV=173, Physical=173 ✅
test_accumulative: CSV=356, Physical=356 ✅
test_drift: CSV=50, Physical=50 ✅

🎯 All splits are consistent!


True