# Split Data

This notebook aims to split the data into a smaller dataset for training and testing.

It aims to avoid bias by:
- Stratified Sampling 
- Perceptual Hashing / Feature Embeddings + kmeans

### Import Libraries


In [3]:
import os
import shutil
import random
from sklearn.model_selection import train_test_split

### Check the path
And check that the folders exist

In [4]:
# Folder that contains the archive, relative to this notebook.
DATA_FOLDER_PATH = "../"

# Source dataset and the destination of the reduced copy produced by this notebook.
INPUT_DIR = f"{DATA_FOLDER_PATH}00_archive/data/"
OUTPUT_DIR = f"{DATA_FOLDER_PATH}00_archive/data_samples/"

# First directory level (dataset splits) and second directory level (class folders).
file_types = ["train", "test", "val"]
subdirectories = [
    "Coccidiosis",
    "Healthy",
    "New Castle Disease",
    "Salmonella",
]

In [5]:

def _ensure_directory(path, created_message, exists_message, verbose):
    """
    Create `path` when it does not exist yet and optionally report which case applied.

    Parameters:
        - path (str): Directory that must exist after the call.
        - created_message (str): Message prefix printed when the directory was created.
        - exists_message (str): Message prefix printed when the path was already there.
        - verbose (bool): If True, print the matching message followed by the path.

    Return :
        - None
    """
    if not os.path.exists(path):
        # exist_ok=True keeps the call safe if the directory appears between the check and here.
        os.makedirs(path, exist_ok=True)
        message = created_message
    else:
        message = exists_message
    if verbose:
        print(f"{message}: {path}")


def check_and_create_path(verbose = False):
    """
    Check that the input directory structure exists and create the output directory structure if it doesn't.

    The expected input layout is INPUT_DIR/<file_type>/<subdirectory> for every entry of
    `file_types` and `subdirectories`; the same layout is created under OUTPUT_DIR.

    Parameters:
        - verbose (bool): If True, print detailed information about the directory structure.

    Return :
        - None

    Raises:
        - FileNotFoundError: If an expected input directory or class subdirectory is missing.
    """
    # Check input structure: the input tree is never created, only validated.
    for file_type in file_types:
        input_path = os.path.join(INPUT_DIR, file_type)
        if not os.path.isdir(input_path):
            raise FileNotFoundError(f"❌ Input directory does not exist: {input_path}")
        if verbose:
            print(f"✅ Found directory: {input_path}")

        for subdirectory in subdirectories:
            sub_path = os.path.join(input_path, subdirectory)
            if not os.path.isdir(sub_path):
                raise FileNotFoundError(f"❌ Subdirectory missing: {sub_path}")
            if verbose:
                print(f"  ✅ Found subdirectory: {sub_path}")

    # Check/create output root directory
    _ensure_directory(
        OUTPUT_DIR,
        "📁 Output directory created",
        "✅ Output directory already exists",
        verbose,
    )

    # Create the output structure (split folders, then class folders) if it does not exist
    for file_type in file_types:
        output_path = os.path.join(OUTPUT_DIR, file_type)
        _ensure_directory(
            output_path,
            "📁 Created output directory",
            "✅ Output directory already exists",
            verbose,
        )

        for subdirectory in subdirectories:
            _ensure_directory(
                os.path.join(output_path, subdirectory),
                "📁 Created output subdirectory",
                "✅ Output subdirectory already exists",
                verbose,
            )

    print("✅ All folder structures are in place.")

check_and_create_path(verbose=False)

✅ All folder structures are in place.


In [6]:
def clean_directory(directory):
    """
    Empty a directory: delete every entry inside it while keeping the directory itself.

    Files and symbolic links are unlinked, sub-directories are removed recursively.
    A failure on one entry is printed and does not stop the remaining deletions.

    Parameters:
        - directory (str): The path to the directory to be cleaned.

    Return :
        - None
    """
    for entry_name in os.listdir(directory):
        entry_path = os.path.join(directory, entry_name)
        try:
            # A symlink (even one pointing at a directory) is unlinked, never followed.
            if os.path.islink(entry_path) or os.path.isfile(entry_path):
                os.unlink(entry_path)
                continue
            if os.path.isdir(entry_path):
                shutil.rmtree(entry_path)
        except Exception as error:
            print(f"Failed to delete {entry_path}. Reason: {error}")

def clean_output_samples():
    """
    Empty every class folder of the output samples tree.

    Each OUTPUT_DIR/<file_type>/<subdirectory> folder is emptied with `clean_directory`;
    the folders themselves are kept.

    Parameters:
        - None

    Return :
        - None
    """
    class_folders = [
        os.path.join(OUTPUT_DIR, file_type, subdirectory)
        for file_type in file_types
        for subdirectory in subdirectories
    ]
    for class_folder in class_folders:
        clean_directory(class_folder)
    print(f"Cleaned output samples directory: {OUTPUT_DIR}")

clean_output_samples()

Cleaned output samples directory: ../archive/data_samples/


### Sampled data

V1: basic stratified sample — a fixed number of randomly chosen images per class and per folder, drawn with `random.sample` from the standard library

In [None]:
# Let's see how much data we have
def count_files_in_directory(directory):
    """
    Count the files found under a directory, including those in nested folders.

    Parameters:
        - directory (str): The path to the directory to be counted.

    Return :
        - int: The number of files in the directory tree.
    """
    file_total = 0
    for _current_dir, _child_dirs, filenames in os.walk(directory):
        file_total += len(filenames)
    return file_total

def count_files_in_subdirectories(directory):
    """
    Add up the file counts of the class folders located in a given directory.

    Only the folders named in `subdirectories` are counted.

    Parameters:
        - directory (str): The path to the directory that holds the class folders.

    Return :
        - int: The total number of files in all class folders.
    """
    return sum(
        count_files_in_directory(os.path.join(directory, class_name))
        for class_name in subdirectories
    )

def count_files_in_all_directories():
    """
    Print the number of input files per split and per class.

    For every entry of `file_types`, prints the total over all class folders followed
    by one line per class folder.

    Parameters:
        - None

    Return :
        - None
    """
    for file_type in file_types:
        input_path = os.path.join(INPUT_DIR, file_type)
        # Walk each class folder once and reuse the counts for the total:
        # the dataset is large, so a second walk per folder is a noticeable cost.
        per_class_counts = [
            (subdirectory, count_files_in_directory(os.path.join(input_path, subdirectory)))
            for subdirectory in subdirectories
        ]
        total_files = sum(num_files for _, num_files in per_class_counts)
        print(f"Total files in {file_type}: {total_files}")
        for subdirectory, num_files in per_class_counts:
            print(f"  {subdirectory}: {num_files} files")

print("-----------------------------------------------------")
count_files_in_all_directories()
print("-----------------------------------------------------")

Total files in train: 400000
  Coccidiosis: 100000 files
  Healthy: 100000 files
  New Castle Disease: 100000 files
  Salmonella: 100000 files
Total files in test: 70677
  Coccidiosis: 18752 files
  Healthy: 17412 files
  New Castle Disease: 15888 files
  Salmonella: 18625 files
Total files in val: 40000
  Coccidiosis: 10000 files
  Healthy: 10000 files
  New Castle Disease: 10000 files
  Salmonella: 10000 files
-----------------------------------------------------


In [15]:
SAMPLES_PER_CLASS = 1_000  # images kept per class in each split (arbitrary choice)

In [16]:
def fixed_count_sample(verbose=False, samples_per_class=None, seed=42):
    """
    Sample a fixed number of images per class per folder and copy them to the output directory.

    Parameters:
        - verbose (bool): If True, print how many images were copied to each class folder.
        - samples_per_class (int | None): Maximum number of images to copy per class folder.
          When None, the notebook-level SAMPLES_PER_CLASS value is read at call time.
        - seed (int | None): Seed of the private random generator used for the draw, so the
          same files are selected on every run. Pass None for a different sample each run.

    Return :
        - None

    Raises:
        - ValueError: If `samples_per_class` is negative.
    """
    if samples_per_class is None:
        samples_per_class = SAMPLES_PER_CLASS
    if samples_per_class < 0:
        raise ValueError(f"samples_per_class must be >= 0, got {samples_per_class}")

    # Private generator: reproducible draws that leave the global `random` state untouched.
    rng = random.Random(seed)
    image_extensions = ('.jpg', '.jpeg', '.png')

    for file_type in file_types:
        for subdirectory in subdirectories:
            input_path = os.path.join(INPUT_DIR, file_type, subdirectory)
            output_path = os.path.join(OUTPUT_DIR, file_type, subdirectory)

            # os.listdir returns entries in arbitrary order; sorting is what makes the
            # seeded draw select the same files from one run to the next.
            images = sorted(
                f for f in os.listdir(input_path) if f.lower().endswith(image_extensions)
            )
            if not images:
                print(f"No images found in {input_path}")
                continue

            # Adjust if fewer images than the sample size
            sample_count = min(samples_per_class, len(images))
            sampled_images = rng.sample(images, sample_count)

            # Copy sampled images (the destination folder is created if it is missing)
            os.makedirs(output_path, exist_ok=True)
            for img in sampled_images:
                src = os.path.join(input_path, img)
                dst = os.path.join(output_path, img)
                shutil.copy2(src, dst)

            if verbose:
                print(f"{sample_count} images copied to {output_path}")

    print("Fixed-count sampling done!")

clean_output_samples()
fixed_count_sample(verbose=True)

1000 images copied to ../archive/data_samples/train\Coccidiosis
1000 images copied to ../archive/data_samples/train\Healthy
1000 images copied to ../archive/data_samples/train\New Castle Disease
1000 images copied to ../archive/data_samples/train\Salmonella
1000 images copied to ../archive/data_samples/test\Coccidiosis
1000 images copied to ../archive/data_samples/test\Healthy
1000 images copied to ../archive/data_samples/test\New Castle Disease
1000 images copied to ../archive/data_samples/test\Salmonella
1000 images copied to ../archive/data_samples/val\Coccidiosis
1000 images copied to ../archive/data_samples/val\Healthy
1000 images copied to ../archive/data_samples/val\New Castle Disease
1000 images copied to ../archive/data_samples/val\Salmonella
Fixed-count sampling done!


### Sampled data 

V2 : ResNet + KMeans Diversity Sampling


In [9]:
# Empty the output class folders again so the V1 samples copied earlier do not mix with the V2 selection.
clean_output_samples()

Cleaned output samples directory: ../archive/data_samples/


In [None]:
# WARNING: this cell takes too long to run; it will be launched later

import numpy as np
from tqdm import tqdm
from PIL import Image

from sklearn.cluster import KMeans
import torch
from torchvision import models, transforms

# Parameters
# NOTE: this rebinds the SAMPLES_PER_CLASS constant defined earlier for the V1 sampling;
# any code that reads the global after this cell has run sees 30.
SAMPLES_PER_CLASS = 30
IMAGE_SIZE = 224
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Load pretrained ResNet-18 and replace the final classification layer with an identity,
# so the model outputs the features that used to feed that layer.
# Recent torchvision versions deprecate `pretrained=True` in favour of `weights=`;
# fall back to the legacy flag when the weights enum is not available.
_resnet18_weights = getattr(models, "ResNet18_Weights", None)
if _resnet18_weights is not None:
    resnet = models.resnet18(weights=_resnet18_weights.IMAGENET1K_V1)
else:
    resnet = models.resnet18(pretrained=True)
resnet.fc = torch.nn.Identity()
resnet = resnet.to(DEVICE).eval()

# Preprocessing: resize to a square input, convert to a tensor, then normalise per channel
# (these are the mean/std values documented for torchvision's ImageNet-pretrained models).
preprocess = transforms.Compose([
    transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
])

def extract_embedding(image_path):
    """
    Compute the feature vector of one image with the module-level `resnet` model.

    Parameters:
        - image_path (str): Path of the image file to embed.

    Return :
        - numpy.ndarray | None: The squeezed model output for the image, or None when the
          image could not be opened or processed (the error is printed, not raised).
    """
    try:
        # The context manager closes the file handle even if decoding fails part-way;
        # convert() returns an independent image that stays usable after the file is closed.
        with Image.open(image_path) as img:
            rgb_img = img.convert("RGB")
        img_tensor = preprocess(rgb_img).unsqueeze(0).to(DEVICE)
        with torch.no_grad():
            embedding = resnet(img_tensor).squeeze().cpu().numpy()
        return embedding
    except Exception as e:
        # Best effort: a single unreadable image must not abort the whole sampling run.
        print(f"Error processing {image_path}: {e}")
        return None

def diverse_sample_with_kmeans(verbose=False, samples_per_class=None):
    """
    For each class in each file_type folder, extract embeddings, perform KMeans,
    and copy the image closest to each cluster center to the output directory.

    Fewer images than requested are copied when a folder holds fewer valid images,
    or when the same image is the closest one to several cluster centers.

    Parameters:
        - verbose (bool): If True, print how many images were copied to each class folder.
        - samples_per_class (int | None): Number of clusters (and so the maximum number of
          images) per class folder. When None, the notebook-level SAMPLES_PER_CLASS value
          is read at call time.

    Return :
        - None

    Raises:
        - ValueError: If `samples_per_class` is smaller than 1.
    """
    if samples_per_class is None:
        samples_per_class = SAMPLES_PER_CLASS
    if samples_per_class < 1:
        raise ValueError(f"samples_per_class must be >= 1, got {samples_per_class}")

    image_extensions = ('.jpg', '.jpeg', '.png')

    for file_type in file_types:
        for subdirectory in subdirectories:
            input_path = os.path.join(INPUT_DIR, file_type, subdirectory)
            output_path = os.path.join(OUTPUT_DIR, file_type, subdirectory)

            # List images (sorted: os.listdir order is arbitrary, and a stable order keeps
            # the clustering input identical from one run to the next)
            images = sorted(
                f for f in os.listdir(input_path) if f.lower().endswith(image_extensions)
            )
            image_paths = [os.path.join(input_path, f) for f in images]

            # Extract embeddings; images that fail to load are skipped
            embeddings = []
            valid_paths = []

            for path in tqdm(image_paths, desc=f"🔍 {file_type}/{subdirectory}", leave=False):
                emb = extract_embedding(path)
                if emb is not None:
                    embeddings.append(emb)
                    valid_paths.append(path)

            if len(valid_paths) == 0:
                print(f"No valid images found in {input_path}")
                continue

            # Build the embedding matrix once; it is reused for fitting and for every
            # distance computation below instead of being rebuilt per cluster center.
            embedding_matrix = np.asarray(embeddings)

            # KMeans clustering (never more clusters than available images)
            n_clusters = min(samples_per_class, len(valid_paths))
            kmeans = KMeans(n_clusters=n_clusters, random_state=42)
            kmeans.fit(embedding_matrix)

            # Find image closest to each cluster center
            selected_paths = []
            for center in kmeans.cluster_centers_:
                dists = np.linalg.norm(embedding_matrix - center, axis=1)
                selected_paths.append(valid_paths[int(np.argmin(dists))])

            # Remove duplicates while keeping the cluster order deterministic
            selected_paths = list(dict.fromkeys(selected_paths))

            # Copy selected images (the destination folder is created if it is missing)
            os.makedirs(output_path, exist_ok=True)
            for src in selected_paths:
                dst = os.path.join(output_path, os.path.basename(src))
                shutil.copy2(src, dst)

            if verbose:
                print(f"{len(selected_paths)} diverse images copied to {output_path}")

    print("🎯 Diversity-based sampling complete!")

# 🚀 Run it
diverse_sample_with_kmeans(verbose=True)


                                                                              

KeyboardInterrupt: 