In [None]:
import os
import numpy as np
import warnings

import matplotlib.pyplot as plt

from PIL import Image, ImageFile
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.preprocessing.image import ImageDataGenerator

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dropout

from tensorflow.keras.applications.vgg16 import VGG16, preprocess_input

from tensorflow.keras.layers import Dense, Dropout, BatchNormalization, GlobalAveragePooling2D
from tensorflow.keras.models import Model
from tensorflow.keras.models import load_model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import CategoricalCrossentropy
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.optimizers.schedules import ExponentialDecay

from tensorflow.keras.applications import ResNet50

from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay, precision_score, recall_score, f1_score
import seaborn as sns

from sklearn.metrics import roc_curve, auc
from sklearn.preprocessing import label_binarize
from sklearn.metrics import det_curve

##### Get the Data and Load them and labels

In [None]:
# allow partially corrupted images
ImageFile.LOAD_TRUNCATED_IMAGES = True
warnings.filterwarnings("ignore", category=UserWarning, module="PIL.TiffImagePlugin")

data_dir = '/Users/kaunli/Desktop/School/Machine Learning/Project/WasteClassData'
image_size = (224, 224)
categories = ['Hazardous', 'Non-Recyclable', 'Organic', 'Recyclable']
num_classes = len(categories)
allowed_extensions = ('.jpg', '.jpeg', '.png')

In [None]:
# load the images
images = []
labels = []

for idx, main_category in enumerate(categories):
    main_path = os.path.join(data_dir, main_category, main_category)
    if not os.path.exists(main_path):
        print(f"Path not found: {main_path}")
        continue
    for subfolder in os.listdir(main_path):
        subfolder_path = os.path.join(main_path, subfolder)
        if not os.path.isdir(subfolder_path):
            continue
        for img_file in os.listdir(subfolder_path):
            if img_file.startswith('.') or not img_file.lower().endswith(allowed_extensions):
                continue
            try:
                img_path = os.path.join(subfolder_path, img_file)
                img = Image.open(img_path).convert('RGBA')  # handle transparency
                img = img.convert('RGB')  # convert to RGB
                img = img.resize(image_size)
                img_array = np.array(img, dtype='float32')
                images.append(img_array)
                labels.append(idx)
            except Exception as e:
                print(f"Skipping corrupted or unsupported file: {img_path} ({e})")

images = np.array(images)
labels = np.array(labels)

print("Loaded images:", images.shape)
print("Loaded labels:", labels.shape)

Loaded images: (2884, 224, 224, 3)
Loaded labels: (2884,)


##### Check how many images within Each Class

In [None]:
unique, counts = np.unique(labels, return_counts=True)
for cls, count in zip(categories, counts):
    print(f"{cls}: {count}")

##### Split train, validation, and test sets with 80/10/10

In [None]:
X_train, X_temp, y_train, y_temp = train_test_split(
    images, labels, test_size=0.2, random_state=42, stratify=labels
)
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=0.5, random_state=42, stratify=y_temp
)


print("Train:", X_train.shape, y_train.shape)
print("Validation:", X_val.shape, y_val.shape)
print("Test:", X_test.shape, y_test.shape)

Train: (2307, 224, 224, 3) (2307,)
Validation: (288, 224, 224, 3) (288,)
Test: (289, 224, 224, 3) (289,)


##### One Hot Encoding

In [None]:
y_train = to_categorical(y_train, num_classes)
y_val = to_categorical(y_val, num_classes)
y_test = to_categorical(y_test, num_classes)

##### Compute Class Weights

In [None]:
from sklearn.utils import class_weight
from sklearn.utils.class_weight import compute_class_weight

class_weights = class_weight.compute_class_weight(
    'balanced',
    classes=np.unique(np.argmax(y_train, axis=1)),
    y=np.argmax(y_train, axis=1)
)
class_weights_dict = dict(enumerate(class_weights))
print("Class weights:", class_weights_dict)

Class weights: {0: 0.7741610738255034, 1: 1.1220817120622568, 2: 1.0944022770398483, 3: 1.1070057581573896}


In [None]:
# adjusted class weights based on evaluation
adjusted_class_weights = {0: 1.0, 1: 1.25, 2: 1.1, 3: 3.0}


##### Data Augmentation

In [None]:
train_datagen = ImageDataGenerator(
    preprocessing_function=preprocess_input,
    rotation_range=40,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.15,
    zoom_range=0.25,
    horizontal_flip=True,
    brightness_range=[0.8, 1.2],
    channel_shift_range=40,
    fill_mode='nearest'
)

val_datagen = ImageDataGenerator(preprocessing_function=preprocess_input)
test_datagen = ImageDataGenerator(preprocessing_function=preprocess_input)

train_generator = train_datagen.flow(X_train, y_train, batch_size=32)
val_generator = val_datagen.flow(X_val, y_val, batch_size=32)
test_generator = test_datagen.flow(X_test, y_test, batch_size=32, shuffle=False)

##### CNN Models

##### Baseline CNN (from scratch)

In [None]:
warnings.filterwarnings("ignore", category=UserWarning, module='keras')

baseline_model = Sequential([
    Input(shape=(224, 224, 3)),
    Conv2D(32, (3, 3), activation='relu'),
    MaxPooling2D(pool_size=(2, 2)),
    
    Conv2D(64, (3, 3), activation='relu'),
    MaxPooling2D(pool_size=(2, 2)),
    
    Flatten(),
    Dense(128, activation='relu'),
    Dropout(0.5),
    Dense(num_classes, activation='softmax')
])

# compile the model
baseline_model.compile(
    optimizer='adam',
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

2025-11-12 15:34:53.524121: I metal_plugin/src/device/metal_device.cc:1154] Metal device set to: Apple M3
2025-11-12 15:34:53.524409: I metal_plugin/src/device/metal_device.cc:296] systemMemory: 16.00 GB
2025-11-12 15:34:53.524423: I metal_plugin/src/device/metal_device.cc:313] maxCacheSize: 5.92 GB
2025-11-12 15:34:53.524716: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:305] Could not identify NUMA node of platform GPU ID 0, defaulting to 0. Your kernel may not have been built with NUMA support.
2025-11-12 15:34:53.524729: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:271] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 0 MB memory) -> physical PluggableDevice (device: 0, name: METAL, pci bus id: <undefined>)


In [None]:
# train the model
history = baseline_model.fit(
    train_generator,
    steps_per_epoch=len(X_train) // 32,
    epochs=20,
    validation_data=val_generator,
    validation_steps=len(X_val) // 32
)
# evaluate the model
test_loss, test_accuracy = baseline_model.evaluate(test_generator, steps=len(X_test) // 32)
print(f'Test accuracy: {test_accuracy:.4f}')

Epoch 1/20


2025-11-12 15:34:54.265744: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:117] Plugin optimizer for device_type GPU is enabled.


[1m72/72[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m12s[0m 158ms/step - accuracy: 0.2554 - loss: 323.9544 - val_accuracy: 0.3125 - val_loss: 1.3861
Epoch 2/20
[1m72/72[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3ms/step - accuracy: 0.4062 - loss: 1.4425 - val_accuracy: 0.3125 - val_loss: 1.3858
Epoch 3/20
[1m72/72[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 150ms/step - accuracy: 0.3121 - loss: 1.3900 - val_accuracy: 0.3056 - val_loss: 1.3783
Epoch 4/20
[1m72/72[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3ms/step - accuracy: 0.1875 - loss: 1.4059 - val_accuracy: 0.3056 - val_loss: 1.3784
Epoch 5/20
[1m72/72[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 147ms/step - accuracy: 0.3266 - loss: 1.3878 - val_accuracy: 0.3264 - val_loss: 1.4151
Epoch 6/20
[1m72/72[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3ms/step - accuracy: 0.2812 - loss: 1.4135 - val_accuracy: 0.3333 - val_loss: 1.4197
Epoch 7/20
[1m72/72[0m [32m━━━━━━━━━━

##### VGG16 Model

In [1]:
# define the VGG16 model
vgg_base = VGG16(weights='imagenet', include_top=False, input_shape=(224, 224, 3))
vgg_base.trainable = False  # freeze the base model

x = GlobalAveragePooling2D()(vgg_base.output)
x = BatchNormalization()(x)
x = Dense(128, activation='relu', kernel_regularizer='l2')(x)
x = Dropout(0.5)(x)
output = Dense(num_classes, activation='softmax')(x)

# create the model
model = Model(inputs=vgg_base.input, outputs=output)

NameError: name 'VGG16' is not defined

In [None]:
# define callbacks and learning rate scheduler
early_stop = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
checkpoint = ModelCheckpoint('best_vgg16_ecosort.keras', monitor='val_accuracy', save_best_only=True)

# learning rate schedule
initial_lr = 2e-5
lr_schedule = ExponentialDecay(
    initial_learning_rate=initial_lr,
    decay_steps=len(train_generator),
    decay_rate=0.9,
    staircase=True
)

In [None]:
# compile the model
model.compile(
    optimizer=Adam(learning_rate=lr_schedule),
    loss=CategoricalCrossentropy(label_smoothing=0.1),
    metrics=['accuracy']
)
# train the model
history = model.fit(
    train_generator,
    epochs=20,
    validation_data=val_generator,
    #class_weight=class_weights_dict,
    class_weight=adjusted_class_weights,
    callbacks=[early_stop, checkpoint]
)

# evaluate the model
test_loss, test_accuracy = model.evaluate(test_generator, steps=len(X_test) // 32)
print(f'VGG16 Model Test Accuracy: {test_accuracy:.4f}')

Epoch 1/20
[1m73/73[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m26s[0m 349ms/step - accuracy: 0.2436 - loss: 4.8097 - val_accuracy: 0.2743 - val_loss: 5.0203
Epoch 2/20
[1m73/73[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m27s[0m 365ms/step - accuracy: 0.2865 - loss: 4.5961 - val_accuracy: 0.2882 - val_loss: 4.4901
Epoch 3/20
[1m73/73[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m28s[0m 384ms/step - accuracy: 0.3082 - loss: 4.5246 - val_accuracy: 0.3090 - val_loss: 4.2637
Epoch 4/20
[1m73/73[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m28s[0m 378ms/step - accuracy: 0.3195 - loss: 4.3985 - val_accuracy: 0.3472 - val_loss: 4.1060
Epoch 5/20
[1m73/73[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m28s[0m 384ms/step - accuracy: 0.3242 - loss: 4.3447 - val_accuracy: 0.3681 - val_loss: 4.0210
Epoch 6/20
[1m73/73[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m28s[0m 377ms/step - accuracy: 0.3338 - loss: 4.3214 - val_accuracy: 0.3889 - val_loss: 3.9347
Epoch 7/20
[1m73/73[

In [None]:
vgg_base.trainable = True
for layer in vgg_base.layers[:-30]:
    layer.trainable = False

# recompile with smaller LR
fine_tune_lr = ExponentialDecay(
    initial_learning_rate=1e-5,
    decay_steps=len(train_generator),
    decay_rate=0.9,
    staircase=True
)

In [None]:
# compile the model
model.compile(
    optimizer=Adam(learning_rate=fine_tune_lr),
    loss=CategoricalCrossentropy(label_smoothing=0.1),
    metrics=['accuracy']
)

# train the model
history = model.fit(
    train_generator,
    epochs=15,
    validation_data=val_generator,
    #class_weight=class_weights_dict,
    class_weight=adjusted_class_weights,
    callbacks=[early_stop, checkpoint]
)

# evaluate the model
test_loss, test_accuracy = model.evaluate(test_generator, steps=len(X_test) // 32)
print(f"Fine-tuned VGG16 Model Test Accuracy: {test_accuracy:.4f}")

##### Plot Training vs Validation Accuracy/Loss Curve

In [None]:
# Plot training & validation accuracy values
plt.figure(figsize=(10, 4))

plt.subplot(1, 2, 1)
plt.plot(history.history['accuracy'], label='Training Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.title('Model Accuracy Over Epochs')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.grid(True)

# Plot training & validation loss values
plt.subplot(1, 2, 2)
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Model Loss Over Epochs')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.grid(True)

plt.tight_layout()
plt.show()

In [None]:
# evaluate the best saved VGG16 model on test data
best_vgg_model = load_model('best_vgg16_ecosort.keras')
test_loss, test_accuracy = best_vgg_model.evaluate(test_generator)
print("Best VGG16 model Test Accuracy:", test_accuracy)

##### ResNet50 Model

In [None]:
# Define ResNet50 model
resnet_base = ResNet50(weights='imagenet', include_top=False, input_shape=(224, 224, 3))
resnet_base.trainable = False  # freeze the base model

x = GlobalAveragePooling2D()(resnet_base.output)
x = BatchNormalization()(x)
x = Dense(128, activation='relu', kernel_regularizer='l2')(x)
x = Dropout(0.5)(x)
output = Dense(num_classes, activation='softmax')(x)

# create the model
model = Model(inputs=resnet_base.input, outputs=output)

In [None]:
# compile the model with the initial learning rate schedule
initial_lr = 2e-5
lr_schedule = ExponentialDecay(
    initial_learning_rate=initial_lr,
    decay_steps=len(train_generator),
    decay_rate=0.9,
    staircase=True
)

# compile the model
model.compile(
    optimizer=Adam(learning_rate=lr_schedule),
    loss=CategoricalCrossentropy(),
    metrics=['accuracy']
)

In [None]:
# callbacks
early_stop = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
checkpoint = ModelCheckpoint('best_resnet50_ecosort.keras', monitor='val_accuracy', save_best_only=True)

# train the model
history = model.fit(
    train_generator,
    epochs=20,
    validation_data=val_generator,
    #class_weight=class_weights_dict,
    class_weight=adjusted_class_weights,
    callbacks=[early_stop, checkpoint]
)

# evaluate the model
test_loss, test_accuracy = model.evaluate(test_generator, steps=len(X_test) // 32)
print(f'ResNet50 Model Test Accuracy: {test_accuracy:.4f}')

##### Fine-tune ResNet50

In [None]:
resnet_base.trainable = True
for layer in resnet_base.layers[:-30]:
    layer.trainable = False

# recompile with smaller LR (continue decay)
fine_tune_lr = ExponentialDecay(
    initial_learning_rate=1e-6,
    decay_steps=len(train_generator),
    decay_rate=0.9,
    staircase=True
)

In [None]:
# compile the model
model.compile(
    optimizer=Adam(learning_rate=lr_schedule),
    loss=CategoricalCrossentropy(label_smoothing=0.1),
    metrics=['accuracy']
)

# train the model
history = model.fit(
    train_generator,
    epochs=15,
    validation_data=val_generator,
    #class_weight=class_weights_dict,
    class_weight=adjusted_class_weights,
    callbacks=[early_stop, checkpoint]
)

# evaluate the model
test_loss, test_accuracy = model.evaluate(test_generator, steps=len(X_test) // 32)
print(f"Fine-tuned ResNet50 Model Test Accuracy: {test_accuracy:.4f}")

##### Plot the Accuracy and Loss

In [None]:
# Plot training & validation accuracy values
plt.figure(figsize=(10, 4))

plt.subplot(1, 2, 1)
plt.plot(history.history['accuracy'], label='Training Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.title('Model Accuracy Over Epochs')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.grid(True)

# Plot training & validation loss values
plt.subplot(1, 2, 2)
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Model Loss Over Epochs')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.grid(True)

plt.tight_layout()
plt.show()

In [None]:
# evaluate the best saved ResNet50 model on test data
best_resnet_model = load_model('best_resnet50_ecosort.keras')
test_loss, test_accuracy = best_resnet_model.evaluate(test_generator)
print("Best ResNet50 model Test Accuracy:", test_accuracy)

##### Evaluation

In [None]:
# function to evaluate model performance
def evaluate_model(model, test_generator):
    # Get true labels and predicted labels
    y_true = np.argmax(y_test, axis=1)
    y_pred_probs = model.predict(test_generator)
    y_pred = np.argmax(y_pred_probs, axis=1)

    # Classification report
    print("Classification Report:")
    print(classification_report(y_true, y_pred, target_names=categories))

    # Confusion matrix
    cm = confusion_matrix(y_true, y_pred)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=categories)
    disp.plot(cmap=plt.cm.Blues)
    plt.title("Confusion Matrix")
    plt.show()

    # Precision, Recall, F1-Score
    precision = precision_score(y_true, y_pred, average='weighted')
    recall = recall_score(y_true, y_pred, average='weighted')
    f1 = f1_score(y_true, y_pred, average='weighted')

    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1-Score: {f1:.4f}")

In [None]:
# call the function to evaluate VGG16 model
print("\nEvaluating VGG16 Model:")
evaluate_model(best_vgg_model, test_generator)

In [None]:
# call the function to evaluate the ResNet50 model
print("Evaluating ResNet50 Model:")
evaluate_model(best_resnet_model, test_generator)

##### ROC curves

In [None]:
y_true = np.argmax(y_test, axis=1)  # true labels
y_score = model.predict(test_generator)  # predicted probabilities

# Binarize labels for multi-class
y_true_bin = label_binarize(y_true, classes=np.arange(len(categories)))

# Compute ROC curve and AUC for each class
plt.figure(figsize=(8,6))
for i, category in enumerate(categories):
    fpr, tpr, thresholds = roc_curve(y_true_bin[:, i], y_score[:, i])
    roc_auc = auc(fpr, tpr)

    plt.plot(fpr, tpr, lw=2, label=f'{category} (AUC = {roc_auc:.2f})')

plt.plot([0, 1], [0, 1], color='navy', lw=1, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Multi-class ROC Curve')
plt.legend(loc="lower right")
plt.show()

##### DET Curve

In [None]:
plt.figure(figsize=(8,6))
for i, category in enumerate(categories):
    fnr, fpr, thresholds = det_curve(y_true_bin[:, i], y_score[:, i])

    plt.plot(fpr, fnr, lw=2, label=f'{category}')

plt.xlabel('False Positive Rate')
plt.ylabel('False Negative Rate')
plt.title('Multi-class DET Curve')
plt.legend()
plt.show()

### Test Our Image

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras.preprocessing import image
from tensorflow.keras.models import load_model
from tensorflow.keras.applications.vgg16 import preprocess_input as vgg_preprocess
from tensorflow.keras.applications.resnet50 import preprocess_input as resnet_preprocess

In [None]:
# load models and print recommendations
vgg_model = load_model('best_vgg16_ecosort.keras')
resnet_model = load_model('best_resnet50_ecosort.keras')

categories = ['Hazardous', 'Non-Recyclable', 'Organic', 'Recyclable']

category_instructions = {
    'Hazardous': "Must be handled through specialized disposal programs for environmental and personal safety.",
    'Non-Recyclable': "Dispose in regular trash.",
    'Organic': "Suitable for composting.",
    'Recyclable': "Place in the recycling bin."
}

In [None]:
# Function to load, preprocess, and predict an image
def predict_image(model, img_path, target_size=(224, 224), model_type='vgg'):
    # Load image
    img = image.load_img(img_path, target_size=target_size)
    img_array = image.img_to_array(img)
    img_array = np.expand_dims(img_array, axis=0)

    # Preprocess for the specific model
    if model_type.lower() == 'vgg':
        img_array = vgg_preprocess(img_array)
    elif model_type.lower() == 'resnet':
        img_array = resnet_preprocess(img_array)
    
    # Predict
    preds = model.predict(img_array)
    class_idx = np.argmax(preds)
    confidence = preds[0][class_idx]
    
    return categories[class_idx], confidence, img


In [None]:
def classify_waste(img_path):
    # Predictions from both models
    vgg_pred, vgg_conf, img = predict_image(vgg_model, img_path, model_type='vgg')
    resnet_pred, resnet_conf, _ = predict_image(resnet_model, img_path, model_type='resnet')

    # Final combined decision
    if vgg_pred == resnet_pred:
        final_pred = vgg_pred
        final_text = f"Final Decision: {final_pred}\n{category_instructions[final_pred]}"
    else:
        final_text = (f"Models disagree.\n"
                      f"VGG16 → {vgg_pred}\n"
                      f"ResNet50 → {resnet_pred}\n"
                      f"⚠️ Do manual checking.")

    # Display image + results
    plt.figure(figsize=(7, 7))
    plt.imshow(img)
    plt.axis('off')

    plt.title(
        f"VGG16: {vgg_pred} ({vgg_conf:.2f})\n"
        f"{category_instructions[vgg_pred]}\n\n"
        f"ResNet50: {resnet_pred} ({resnet_conf:.2f})\n"
        f"{category_instructions[resnet_pred]}\n\n"
        f"{final_text}"
    )
    plt.show()