In [14]:
import numpy as np
import pandas as pd
import os
import cv2
from pathlib import Path
import random
from tqdm import tqdm
from pathlib import Path
import os



In [15]:
# Project layout: images live under <working directory>/dataset/<category>/
BASE_DIR = Path.cwd()
DATASET_PATH = BASE_DIR.joinpath('dataset')

# Class names; each one is also the name of its sub-directory in DATASET_PATH
CATEGORIES = [
    'cardboard',
    'glass',
    'metal',
    'paper',
    'plastic',
    'trash',
]

# Number of images every category is topped up to by the augmentation step
TARGET_COUNT = 500


In [16]:
def getImages(categoryPath):
    """Return the ``.jpg`` files directly inside ``categoryPath``, sorted by path.

    Args:
        categoryPath: ``pathlib.Path`` of a category directory.

    Returns:
        list[Path]: The matching paths in ascending order. ``glob`` yields
        entries in a filesystem-dependent order; sorting makes the result the
        same on every machine, so code that samples from or iterates over the
        list is reproducible.
    """
    return sorted(categoryPath.glob('*.jpg'))


def printCategoryCounts():
    """Print a two-column table: category name and number of images found.

    Categories are taken from ``CATEGORIES`` in order; the count for each is
    the length of ``getImages`` applied to its sub-directory of
    ``DATASET_PATH``.
    """
    print("\nImage counts per category:")
    print("-" * 30)
    countsByCategory = [
        (categoryName, len(getImages(DATASET_PATH / categoryName)))
        for categoryName in CATEGORIES
    ]
    for categoryName, imageCount in countsByCategory:
        print(f"{categoryName:15s}: {imageCount} images")


def validateAndCleanDataset():
    """Delete unreadable files from every category directory and report counts.

    Every sub-directory of ``DATASET_PATH`` is scanned. Each regular file in it
    is opened with ``cv2.imread``; a file for which that returns ``None`` is
    treated as corrupted and removed from disk. A per-directory summary is
    printed, followed by the ``printCategoryCounts()`` table.

    WARNING: this is destructive. Files are deleted in place, and *every*
    regular file in a category directory is tested, not only ``.jpg`` images,
    so any file OpenCV cannot decode is removed.

    Raises:
        FileNotFoundError: If ``DATASET_PATH`` does not exist.
    """
    if not DATASET_PATH.exists():
        raise FileNotFoundError(f"Dataset not found at: {DATASET_PATH}")
    print("Validating dataset for corrupted images...")
    categoryDirectories = [directory for directory in DATASET_PATH.iterdir() if directory.is_dir()]
    print(f"Found {len(categoryDirectories)} categories: {[category.name for category in categoryDirectories]}\n")
    totalRemovedImages = 0
    for categoryDirectory in sorted(categoryDirectories):
        imageFiles = [file for file in categoryDirectory.iterdir() if file.is_file()]
        corruptedCount = 0  # files that failed to decode
        removedCount = 0    # corrupted files that were actually deleted
        for imagePath in imageFiles:
            if cv2.imread(str(imagePath)) is not None:
                continue
            corruptedCount += 1
            print(f"Removing: {imagePath.name} as it is corrupted.")
            try:
                os.remove(imagePath)
                removedCount += 1
            except OSError as error:
                print(f"Warning: Could not remove: {error}")
        # Valid means "decoded successfully". Subtracting the removed count
        # instead would report a corrupted file as valid whenever its deletion
        # failed.
        numberOfValidImages = len(imageFiles) - corruptedCount
        print(f"{categoryDirectory.name}: {numberOfValidImages}/{len(imageFiles)} valid images")
        totalRemovedImages += removedCount
    print(f"\nValidation complete. Removed {totalRemovedImages} corrupted file(s).")
    printCategoryCounts()


def augmentImage(image):
    """Return nine augmented variants of ``image``.

    The variants are, in order: horizontal flip, vertical flip, rotation by
    90, 180 and 270 degrees (counter-clockwise), a brighter/higher-contrast
    copy, a darker/lower-contrast copy, a 5x5 Gaussian blur, and a 1.2x centre
    zoom cropped back to the original size.

    The right-angle rotations use ``cv2.rotate``, which only rearranges
    pixels. Rotating with ``warpAffine`` into a ``(width, height)`` canvas
    would crop a non-square image and pad it with black, and would shift the
    180-degree result by one pixel. As a consequence, the 90- and 270-degree
    variants of a non-square input have height and width swapped.

    Args:
        image: Image array as produced by ``cv2.imread``; the first two axes
            are taken as height and width.

    Returns:
        list: The nine augmented arrays, in the order listed above.
    """
    height, width = image.shape[:2]
    augmentations = [
        cv2.flip(image, 1),  # Horizontal flip
        cv2.flip(image, 0),  # Vertical flip
        cv2.rotate(image, cv2.ROTATE_90_COUNTERCLOCKWISE),  # 90 degrees
        cv2.rotate(image, cv2.ROTATE_180),                  # 180 degrees
        cv2.rotate(image, cv2.ROTATE_90_CLOCKWISE),         # 270 degrees
        cv2.convertScaleAbs(image, alpha=1.3, beta=30),   # Contrast x1.3, brightness +30
        cv2.convertScaleAbs(image, alpha=0.7, beta=-30),  # Contrast x0.7, brightness -30
        cv2.GaussianBlur(image, (5, 5), 0),               # Gaussian blur 5x5
    ]
    # Zoom crop: enlarge by `scale`, then cut the central height x width window
    scale = 1.2
    newHeight, newWidth = int(height * scale), int(width * scale)
    resized = cv2.resize(image, (newWidth, newHeight))
    startHeight, startWidth = (newHeight - height) // 2, (newWidth - width) // 2
    augmentations.append(resized[startHeight:startHeight + height, startWidth:startWidth + width])
    return augmentations


def augmentDataset():
    """Top up every category to ``TARGET_COUNT`` images with augmented copies.

    For each name in ``CATEGORIES`` the ``.jpg`` files returned by
    ``getImages`` are counted. If fewer than ``TARGET_COUNT`` exist, source
    images are picked at random, expanded with ``augmentImage`` and written
    next to the originals as
    ``<stem>_augmented_<n>_<augmentationIndex>.jpg`` until the shortfall has
    been attempted. Progress and a final ``printCategoryCounts()`` table are
    printed.

    Source files that ``cv2.imread`` cannot read are ignored, a category with
    no readable source image is skipped, and failed writes are reported and
    not counted as generated.

    NOTE: the selection uses the global ``random`` state, which is not seeded
    in this function, so the set of generated files differs between runs.
    """
    print(f"\nAugmenting images to reach {TARGET_COUNT} per category...")
    for categoryName in CATEGORIES:
        categoryPath = DATASET_PATH / categoryName
        imagePaths = getImages(categoryPath)
        currentImageCount = len(imagePaths)
        imagesNeeded = TARGET_COUNT - currentImageCount
        print(f"\n{categoryName}: {currentImageCount} images", end="")
        if imagesNeeded <= 0:
            print(" - Already sufficient")
            continue
        print(f" (Need {imagesNeeded} more)")
        # Load original images using tqdm for progress; cv2.imread returns
        # None for a file it cannot read, and such entries must not reach
        # augmentImage.
        originalImages = []
        for imageFile in tqdm(imagePaths, desc="Loading"):
            image = cv2.imread(str(imageFile))
            if image is not None:
                originalImages.append((image, imageFile.stem))
        if not originalImages:
            # random.choice would raise IndexError on an empty list.
            print("No readable source images - skipping")
            continue
        # Generate augmented images. `generatedCount` counts attempts (and
        # keeps file names unique); `writtenCount` counts successful writes.
        generatedCount = 0
        writtenCount = 0
        while generatedCount < imagesNeeded:
            image, imageName = random.choice(originalImages)
            for augmentationIndex, augmentedImage in enumerate(augmentImage(image)):
                if generatedCount >= imagesNeeded:
                    break
                savePath = categoryPath / f"{imageName}_augmented_{generatedCount}_{augmentationIndex}.jpg"
                # cv2.imwrite reports failure through its return value.
                if cv2.imwrite(str(savePath), augmentedImage):
                    writtenCount += 1
                else:
                    print(f"Warning: Could not write {savePath.name}")
                generatedCount += 1
        print(f"Generated {writtenCount} augmented images")
    print("\n" + "=" * 50)
    print("Augmentation complete.")
    printCategoryCounts()



In [17]:
# Scan the dataset and delete every file OpenCV cannot read (modifies the
# dataset directory on disk), then print the per-category image counts.
validateAndCleanDataset()


Validating dataset for corrupted images...
Found 6 categories: ['cardboard', 'glass', 'metal', 'paper', 'plastic', 'trash']

cardboard: 500/500 valid images
glass: 500/500 valid images
metal: 500/500 valid images
paper: 500/500 valid images
plastic: 500/500 valid images
trash: 500/500 valid images

Validation complete. Removed 0 corrupted file(s).

Image counts per category:
------------------------------
cardboard      : 500 images
glass          : 500 images
metal          : 500 images
paper          : 500 images
plastic        : 500 images
trash          : 500 images


In [18]:
# Top up each category to TARGET_COUNT images by writing augmented copies
# into its directory; categories already at the target are left untouched.
augmentDataset()



Augmenting images to reach 500 per category...

cardboard: 500 images - Already sufficient

glass: 500 images - Already sufficient

metal: 500 images - Already sufficient

paper: 500 images - Already sufficient

plastic: 500 images - Already sufficient

trash: 500 images - Already sufficient

Augmentation complete.

Image counts per category:
------------------------------
cardboard      : 500 images
glass          : 500 images
metal          : 500 images
paper          : 500 images
plastic        : 500 images
trash          : 500 images


In [19]:
# Build an index of the dataset: one row per entry found inside each category
# directory, holding its path (as a string) and the directory's name as label.
rows = [
    {"filename": str(img), "category": category_dir.name}
    for category_dir in DATASET_PATH.iterdir()
    if category_dir.is_dir()
    for img in category_dir.iterdir()
]

df = pd.DataFrame(rows)
df

Unnamed: 0,filename,category
0,c:\Users\AFAQE\Documents\GitHub\ML-Project\dat...,cardboard
1,c:\Users\AFAQE\Documents\GitHub\ML-Project\dat...,cardboard
2,c:\Users\AFAQE\Documents\GitHub\ML-Project\dat...,cardboard
3,c:\Users\AFAQE\Documents\GitHub\ML-Project\dat...,cardboard
4,c:\Users\AFAQE\Documents\GitHub\ML-Project\dat...,cardboard
...,...,...
2995,c:\Users\AFAQE\Documents\GitHub\ML-Project\dat...,trash
2996,c:\Users\AFAQE\Documents\GitHub\ML-Project\dat...,trash
2997,c:\Users\AFAQE\Documents\GitHub\ML-Project\dat...,trash
2998,c:\Users\AFAQE\Documents\GitHub\ML-Project\dat...,trash


In [None]:
def extract_features(image_path):
    """Compute a normalised 3-D colour histogram for one image file.

    The image is read with ``cv2.imread``, resized to 128x128, and a joint
    histogram over its three channels is built with 8 bins per channel
    (8 * 8 * 8 = 512 values), normalised with ``cv2.normalize`` and flattened.

    Args:
        image_path: Location of the image, as a ``str`` or ``pathlib.Path``.

    Returns:
        The flattened histogram, or ``None`` when the file cannot be read,
        so that callers can skip it.
    """
    img = cv2.imread(str(image_path))
    if img is None:
        # cv2.imread signals an unreadable file by returning None rather than
        # raising; passing that on to cv2.resize would raise instead.
        return None
    img = cv2.resize(img, (128, 128))

    hist = cv2.calcHist([img], [0, 1, 2], None,
                        [8, 8, 8], [0, 256, 0, 256, 0, 256])

    return cv2.normalize(hist, hist).flatten()




In [21]:


# Build the feature matrix X and label vector y from the images on disk.
BASE_DIR = Path.cwd()
DATASET_PATH = BASE_DIR / "dataset"

CATEGORIES = ["cardboard", "glass", "metal", "paper", "plastic", "trash"]

X = []
y = []

# Integer label for each category = its position in CATEGORIES
label_map = {name: idx for idx, name in enumerate(CATEGORIES)}

skipped = 0
for category in CATEGORIES:
    folder = DATASET_PATH / category
    print("Processing:", folder)

    # os.listdir returns entries in arbitrary order; sorting fixes the sample
    # order so that a seeded split of X / y is the same on every machine.
    for filename in sorted(os.listdir(folder)):
        file_path = folder / filename
        if not file_path.is_file():
            # Sub-directories and other non-file entries are not images.
            continue
        features = extract_features(str(file_path))

        if features is not None:
            X.append(features)
            y.append(label_map[category])
        else:
            skipped += 1

# Make dropped samples visible instead of silently shrinking the dataset.
if skipped:
    print(f"Skipped {skipped} unreadable file(s)")


Processing: c:\Users\AFAQE\Documents\GitHub\ML-Project\dataset\cardboard
Processing: c:\Users\AFAQE\Documents\GitHub\ML-Project\dataset\glass
Processing: c:\Users\AFAQE\Documents\GitHub\ML-Project\dataset\metal
Processing: c:\Users\AFAQE\Documents\GitHub\ML-Project\dataset\paper
Processing: c:\Users\AFAQE\Documents\GitHub\ML-Project\dataset\plastic
Processing: c:\Users\AFAQE\Documents\GitHub\ML-Project\dataset\trash


In [22]:
from sklearn.model_selection import train_test_split

# Hold out 20% of the samples for validation. stratify=y keeps the class
# proportions the same in both parts, and random_state pins the shuffle.
X_train, X_val, y_train, y_val = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)


In [23]:
from sklearn.preprocessing import StandardScaler

# Fit the scaler on the training split only and reuse those statistics for the
# validation split, so validation data does not influence the scaling.
# Both names are overwritten with their scaled versions.
# NOTE(review): the fitted `scaler` is not saved in the cells visible here;
# scoring new images later needs the same scaler - confirm it is persisted
# elsewhere.
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train) 
X_val = scaler.transform(X_val)          


In [24]:
# Persist the four splits as .npy files in the current working directory.
for outputName, split in (
    ("X_train.npy", X_train),
    ("X_val.npy", X_val),
    ("y_train.npy", y_train),
    ("y_val.npy", y_val),
):
    np.save(outputName, split)
