In [None]:
#import necessary libraries
import os
from collections import Counter
import cv2
import numpy as np
import pandas as pd
import random
import math
from tqdm import tqdm
from pathlib import Path
from sklearn.model_selection import train_test_split
import albumentations as A
from joblib import Parallel, delayed
from matplotlib import pyplot as plt


In [None]:
# Declare configuration variables shared by the cells below.
dataset_path = "../dataset"  # root folder holding one sub-folder per class
# NOTE(review): `augmented_path` is read by the final processing cell but was
# never assigned, so a fresh "Restart & Run All" stopped with a NameError.
# The location chosen here is an assumption -- confirm the intended folder.
augmented_path = "../augmented"
classes = ["glass", "paper", "cardboard", "plastic", "metal", "trash"]  # label index == position in this list
img_size = (64, 64)  # size tuple handed to cv2.resize
target_size = 500  # desired number of training images per class after augmentation

In [None]:
# Split dataset into train and test sets.
all_files = []
all_labels = []

for idx, cls in enumerate(classes):
    class_folder = os.path.join(dataset_path, cls)
    if not os.path.isdir(class_folder):
        print(f"Warning: Folder not found: {class_folder}")
        continue

    # os.listdir returns entries in arbitrary order; sorting makes the input
    # order stable so the fixed random_state below yields the same split on
    # every machine and every run.
    for f in sorted(os.listdir(class_folder)):
        if not f.endswith((".jpg", ".png")):
            continue
        file_path = os.path.join(class_folder, f)

        # Decode each file once up front so unreadable images never reach the split.
        img = cv2.imread(file_path)
        if img is None:
            print(f"Warning: Cannot open image {file_path}, skipping.")
            continue

        all_files.append(file_path)
        all_labels.append(idx)

# Stratify on the labels so both subsets keep the per-class proportions.
train_files, test_files, train_labels, test_labels = train_test_split(
    all_files, all_labels, test_size=0.35, random_state=42, stratify=all_labels
)

print(f"Train: {len(train_files)} images")
print(f"Test: {len(test_files)} images")


In [None]:
# Define augmentation pipeline
# Seeding with None re-initialises both generators from the system's
# randomness source, so augmentation output differs from run to run.
random.seed(None)
np.random.seed(None)


def _random_resize(image, **kwargs):
    """Resize `image` to a width and height each drawn from [64, 256] on every call."""
    new_width = random.randint(64, 256)
    new_height = random.randint(64, 256)
    return cv2.resize(image, (new_width, new_height))


# Previously the last entry was A.Resize(width=random.randint(...), ...): the
# randint calls ran once while this list was being built, so every image was
# resized to the same fixed size. A.Lambda defers the draw to call time.
# NOTE(review): CenterCrop(128) may receive an image smaller than 128 px when a
# resize/scale step runs before it -- verify the installed albumentations
# version tolerates that.
transform = A.Compose([
    A.SomeOf([
        A.HorizontalFlip(p=0.7),
        A.VerticalFlip(p=.7),
        A.Rotate(limit=50, p=.8),
        A.RandomScale(scale_limit=0.2, p=.5),
        A.RandomBrightnessContrast(p=0.5),
        A.GaussianBlur(p=0.3),
        A.GaussNoise(p=0.1),
        A.ColorJitter(p=1),
        A.CenterCrop(width=128, height=128, p=.1),
        A.Lambda(image=_random_resize, p=1.0),
    ], n=4)
])


In [None]:
# Function to perform data augmentation
def data_autgmentation(img_path, output_folder, aug_times=5):
    """Write augmented variants of one image into `output_folder`.

    Each variant comes from the module-level `transform` pipeline and is saved
    as ``<original stem>_aug_<k>.jpg`` with k counting from 1.

    Args:
        img_path: Path of the source image, read with cv2.imread.
        output_folder: Directory that receives the augmented files; created
            if it does not exist yet.
        aug_times: Number of augmented copies to generate.

    Returns:
        None. Progress and failures are reported on stdout.
    """
    img_name = os.path.basename(img_path)
    img = cv2.imread(img_path)
    if img is None:
        print(f"❌ ERROR: Cannot load image {img_path}. Skipping.")
        return

    # cv2.imwrite does not create missing directories, it just reports failure.
    os.makedirs(output_folder, exist_ok=True)
    stem = os.path.splitext(img_name)[0]

    written = 0
    for i in range(aug_times):
        aug_img = transform(image=img)["image"]
        aug_img_path = os.path.join(output_folder, f"{stem}_aug_{i+1}.jpg")
        # imwrite returns False on failure; count only files that really exist
        # so the summary below cannot claim more than was written.
        if cv2.imwrite(aug_img_path, aug_img):
            written += 1
        else:
            print(f"❌ ERROR: Cannot write image {aug_img_path}.")
    print(f"✅ Augmented {written} images for {img_name}")
    


In [None]:
# Top up every under-represented class to `target_size` training images.
# NOTE(review): files left in an "augmented" folder by earlier runs are not
# removed here.
class_counts = Counter(train_labels)

for cls_idx, cls_name in enumerate(classes):

    cls_images = [path for path, label in zip(train_files, train_labels) if label == cls_idx]
    current_count = class_counts[cls_idx]
    aug_needed = max(0, target_size - current_count)

    if aug_needed == 0:
        continue
    if not cls_images:
        # No source images for this class (e.g. its folder was missing);
        # skipping also avoids a division by zero below.
        print(f"Warning: No training images for class {cls_name}, skipping augmentation.")
        continue

    output_folder = os.path.join(dataset_path, cls_name, "augmented")
    os.makedirs(output_folder, exist_ok=True)

    # Spread exactly `aug_needed` augmentations over the available images:
    # every image gets `base` copies and the first `extra` images one more.
    # Rounding up per image instead would overshoot the target by up to
    # len(cls_images) - 1 files.
    base, extra = divmod(aug_needed, len(cls_images))

    for position, img_path in enumerate(tqdm(cls_images, desc=f"Augmenting {cls_name}")):
        times_for_image = base + (1 if position < extra else 0)
        if times_for_image == 0:
            continue
        data_autgmentation(img_path, output_folder, times_for_image)


In [None]:
# Register every augmented image as a training sample with its class label.
# Paths already present are skipped, so re-running this cell does not append
# the same files a second time.
known_files = set(train_files)
for cls_idx, cls in enumerate(classes):
    aug_folder = os.path.join(dataset_path, cls, "augmented")
    if not os.path.isdir(aug_folder):
        continue
    # Sorted so the appended order does not depend on the file system.
    for f in sorted(os.listdir(aug_folder)):
        if not f.endswith((".jpg", ".png")):
            continue
        file_path = os.path.join(aug_folder, f)
        if file_path in known_files:
            continue
        known_files.add(file_path)
        train_files.append(file_path)
        train_labels.append(cls_idx)

In [None]:
# Create a preprocessing step that resizes an image to a fixed size and normalizes it.
def preprocess_image(img_path):
    """Load an image, resize it to `img_size`, and scale its pixels by 1/255.

    Args:
        img_path: Path of the image file, read with cv2.imread.

    Returns:
        A float32 array resized with the module-level `img_size`.

    Raises:
        ValueError: If OpenCV cannot read or decode `img_path`.
    """
    img = cv2.imread(img_path)
    if img is None:
        # cv2.imread signals failure by returning None; without this check the
        # resize below fails with an opaque OpenCV assertion error.
        raise ValueError(f"Cannot load image {img_path}")
    img = cv2.resize(img, img_size)
    return img.astype('float32') / 255.0  # Normalize to [0, 1]

In [None]:
# Tabulate the training set: one row per image path with its integer class label.
df = pd.DataFrame(dict(image_path=train_files, image_label=train_labels))

In [None]:
# Show how many training images each class label has.
df['image_label'].value_counts()

In [None]:
def process_image(img_path, output_folder, aug_times):
    """Save a resized grayscale copy of one image plus augmented variants of it.

    The image is resized with the module-level `img_size`, converted to
    grayscale and written as ``<stem>_proc.png``. Then `aug_times` outputs of
    the module-level `transform` pipeline are written as ``<stem>_aug<i>.png``
    with i counting from 0.

    Args:
        img_path: Path of the source image, read with cv2.imread.
        output_folder: Directory that receives the PNG files; created if it
            does not exist yet.
        aug_times: Number of augmented variants to write.

    Returns:
        None. Unreadable images are skipped silently.
    """
    img = cv2.imread(img_path)
    if img is None:
        return

    # cv2.imwrite does not create missing directories; exist_ok keeps this
    # safe when several workers process the same class at once.
    os.makedirs(output_folder, exist_ok=True)

    stem = Path(img_path).stem
    img_gray = cv2.cvtColor(cv2.resize(img, img_size), cv2.COLOR_BGR2GRAY)

    # Write the uint8 grayscale image directly. The earlier /255.0 -> *255 ->
    # astype(uint8) round trip was a no-op at best and a truncation risk at
    # worst, because the cast floors any product that lands just below an
    # integer.
    cv2.imwrite(os.path.join(output_folder, f"{stem}_proc.png"), img_gray)

    for i in range(aug_times):
        aug_img = transform(image=img_gray)["image"]
        cv2.imwrite(os.path.join(output_folder, f"{stem}_aug{i}.png"), aug_img)


# Build a processed copy of every class folder, enlarged by about 30 % through
# augmentation. `augmented_path` (the output root) must be defined in the
# configuration cell before this cell runs.
growth_factor = 1.3

for cls in classes:
    input_folder = os.path.join(dataset_path, cls)
    output_folder = os.path.join(augmented_path, cls)
    if not os.path.isdir(input_folder):
        print(f"Warning: Folder not found: {input_folder}")
        continue

    # Keep image files only: the class folder can also hold an "augmented"
    # sub-folder, which must not be counted as an original image.
    images = [
        os.path.join(input_folder, f)
        for f in sorted(os.listdir(input_folder))
        if f.endswith((".jpg", ".png"))
    ]
    n_original = len(images)
    if n_original == 0:
        # Nothing to process; also avoids dividing by zero below.
        print(f"Warning: No images found in {input_folder}, skipping.")
        continue

    # cv2.imwrite inside process_image does not create missing directories.
    os.makedirs(output_folder, exist_ok=True)

    n_target = int(np.ceil(n_original * growth_factor))
    n_aug_needed = n_target - n_original

    # Hand out exactly `n_aug_needed` augmentations. Rounding up to a uniform
    # per-image count gave every image one augmentation, which doubled the
    # class instead of growing it by `growth_factor`.
    base, extra = divmod(n_aug_needed, n_original)
    aug_counts = [base + (1 if i < extra else 0) for i in range(n_original)]

    print(f"Processing class: {cls} ({n_original} images, {n_aug_needed} augmentations in total)")

    Parallel(n_jobs=-1)(
        delayed(process_image)(img_path, output_folder, n_aug)
        for img_path, n_aug in zip(tqdm(images), aug_counts)
    )
