<a href="https://colab.research.google.com/github/EintsWaveX/IWSS_EL4705-04_TrainingWorkspace/blob/main/IWSS_EL4705_04_TrainingWorkspace.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **WASTE CLASSIFICATION TRAINING**

_*Complete training notebook for ESP32-CAM waste sorter.*_

This section prepares the libraries needed to build the Tiny CNN model for the waste-classification training dataset, i.e. TensorFlow (v2.10.0), NumPy, Matplotlib, and OpenCV for Python. All necessary data can be found in [TrashNet/feyzazkefe](https://www.kaggle.com/datasets/feyzazkefe/trashnet).


In [None]:
!pip install tensorflow
!pip install numpy matplotlib opencv-python
!pip install ai-edge-litert

import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import os
import shutil
import zipfile
import cv2
import binascii
from ai_edge_litert.interpreter import Interpreter
from google.colab import files
print("TensorFlow version:", tf.__version__)

# **STEP 1: UPLOAD THE DATASET**
### Create directory structure.

In [None]:
# Every train/test class directory for both dataset variants. This single list
# drives both the directory creation and the optional clean-up below, so the
# two can no longer drift apart.
folders = [
    "dataset_320x80/test/plastic",
    "dataset_320x80/test/paper",
    "dataset_320x80/test/metal",
    "dataset_320x80/train/plastic",
    "dataset_320x80/train/paper",
    "dataset_320x80/train/metal",

    "dataset_340x60/test/plastic",
    "dataset_340x60/test/paper",
    "dataset_340x60/test/metal",
    "dataset_340x60/train/plastic",
    "dataset_340x60/train/paper",
    "dataset_340x60/train/metal"
]

# Create the directory tree; directories that already exist are left as-is
# (same effect as `mkdir -p`, but without depending on the IPython shell).
for folder in folders:
    os.makedirs(folder, exist_ok=True)

print("Please upload images to the corresponding folders")
print("Required: 320/340 images per class for training, 80/60 for testing")
print("Dataset required:\n\tPlastic, Paper, and Metal.\n")
print("Following the process below, if no need to re-extract the ZIP file for")
print("the dataset training and validation, just press the Cancel Upload button")
print("and the program will proceed to whatever in the available dataset anyway.")

# Workflow switches:
#   delete_folders   - empty every dataset folder before (re)populating it.
#   reupload_folders - rebuild the train/test split from the extracted archive.
#   upload_once      - prompt for the ZIP upload and extract it first.
delete_folders = False
reupload_folders = True
upload_once = True

if delete_folders:
    for folder in folders:
        if not os.path.exists(folder):
            print(f"Folder not found: {folder}")
            continue

        # Remove the folder's contents but keep the folder itself.
        for item in os.listdir(folder):
            item_path = os.path.join(folder, item)
            if os.path.isfile(item_path) or os.path.islink(item_path):
                os.remove(item_path)
            elif os.path.isdir(item_path):
                shutil.rmtree(item_path)
        print(f"Cleared contents of: {folder}")

if reupload_folders:
    archive_name = "IWSS_WasteClassificationDataset.zip"

    if upload_once:
        uploaded = files.upload()

        # The instructions printed to the user promise that cancelling the
        # upload falls back to whatever dataset is already present, so a
        # missing archive must not abort the cell.
        # NOTE(review): assumes a cancelled upload returns without raising -
        # confirm against the Colab runtime in use.
        if os.path.exists(archive_name):
            with zipfile.ZipFile(archive_name, "r") as zip_ref:
                zip_ref.extractall("")
        else:
            print(f"[WARN] {archive_name} not found; using the existing dataset.")

    source_base = "IWSS_WasteClassificationDataset"
    classes = ["metal", "plastic", "paper"]
    # (target dataset folder, images per class for train, images per class for test)
    configs = [
        ("dataset_320x80", 320, 80),
        ("dataset_340x60", 340, 60)
    ]

    for dataset_name, train_count, test_count in configs:
        for cls in classes:
            src_dir = os.path.join(source_base, cls)

            # Deliberately NOT named `files`: that would overwrite the
            # google.colab `files` module and break files.upload() /
            # files.download() on the next run of this or a later cell.
            # Only regular files are kept, so stray sub-directories cannot
            # make shutil.copy() fail.
            image_names = sorted(
                name for name in os.listdir(src_dir)
                if os.path.isfile(os.path.join(src_dir, name))
            )

            needed = train_count + test_count
            if len(image_names) < needed:
                print(
                    f"[WARN] {src_dir} has {len(image_names)} images, "
                    f"expected at least {needed}."
                )

            # Deterministic split on the sorted names: the first `train_count`
            # go to train, the following `test_count` go to test.
            splits = (
                ("train", image_names[:train_count]),
                ("test", image_names[train_count:needed]),
            )
            for split, names in splits:
                for name in names:
                    shutil.copy(
                        os.path.join(src_dir, name),
                        f"{dataset_name}/{split}/{cls}/{name}"
                    )

        print(f"Successfully re-uploaded three {dataset_name} for `train` and `test`.")

# **STEP 2: LOAD AND PREPARE DATA**

In [None]:
# NOTE: Keras' `image_size` argument is (height, width). The tuple
# (IMG_SIZE_W, IMG_SIZE_H) = (240, 320) used below therefore loads images that
# are 240 px high and 320 px wide (QVGA landscape), despite what the W/H
# suffixes suggest. The names are kept because other cells reference them.
IMG_SIZE_W, IMG_SIZE_H = 240, 320 # Available: [96, 128, 160, 240] | 320x240 for QVGA
BATCH_SIZE = 32
DATASET_PATH = "dataset_320x80" # Another variation: dataset_340x60
AUTOTUNE = tf.data.AUTOTUNE

# Random distortions applied to training batches only.
data_augmentation = tf.keras.Sequential([
    tf.keras.layers.RandomFlip("horizontal"),
    tf.keras.layers.RandomBrightness(0.15),
    tf.keras.layers.RandomContrast(0.2),
])


def augment(image, label):
    """Cast a batch to float32 and run it through the augmentation layers.

    Args:
        image: Image batch produced by the training dataset.
        label: Matching integer labels; returned unchanged.

    Returns:
        The ``(augmented_image, label)`` pair.
    """
    image = tf.cast(image, tf.float32)
    image = data_augmentation(image, training=True)
    return image, label


# Labels are integer class indices (label_mode='int'), matching the
# SparseCategoricalCrossentropy loss used when the model is compiled.
train_ds = tf.keras.utils.image_dataset_from_directory(
    f'{DATASET_PATH}/train',
    image_size=(IMG_SIZE_W, IMG_SIZE_H),
    batch_size=BATCH_SIZE,
    label_mode='int'
)
val_ds = tf.keras.utils.image_dataset_from_directory(
    f'{DATASET_PATH}/test',
    image_size=(IMG_SIZE_W, IMG_SIZE_H),
    batch_size=BATCH_SIZE,
    label_mode='int'
)

# Cache the decoded images BEFORE the random augmentation. Caching after the
# map would store the first epoch's augmented images and replay exactly those
# on every later epoch, which silently turns the augmentation off.
train_ds_aug = (
    train_ds
    .cache()
    .map(augment, num_parallel_calls=AUTOTUNE)
    .prefetch(AUTOTUNE)
)
val_ds = val_ds.cache().prefetch(AUTOTUNE)


# **STEP 3: CREATE TINY CNN MODEL**

In [None]:
# Changes:
# 1. Removed the Rescaling(1./255) layer, so the model now expects raw pixel
#    values in [0, 255].
# 2. Channel counts are now multiples of 16 instead of 8 and/or 12.
# 3. Added BatchNormalization, dropped the bias from the Conv layers, and
#    split ReLU out of Conv into its own layer.
# 4. Settled on four convolutional blocks for better training and validation
#    results. The earlier baseline was three Conv2D layers (8/16/32 filters,
#    fused ReLU) followed by GlobalAveragePooling2D and the same Dense head.

def _conv_block(filters, pool=True):
    """Return the layers of one block: Conv2D -> BatchNorm -> ReLU [-> MaxPool].

    Args:
        filters: Number of 3x3 convolution filters in the block.
        pool: Append a MaxPooling2D layer when True.

    Returns:
        A list of freshly created Keras layers, in application order.
    """
    block = [
        tf.keras.layers.Conv2D(filters, 3, padding='same', use_bias=False),
        tf.keras.layers.BatchNormalization(momentum=0.9),
        tf.keras.layers.ReLU(),
    ]
    if pool:
        block.append(tf.keras.layers.MaxPooling2D())
    return block


model = tf.keras.Sequential([
    tf.keras.Input(shape=(IMG_SIZE_W, IMG_SIZE_H, 3)),
    *_conv_block(16),              # Block 1: edge & color
    *_conv_block(32),              # Block 2: texture
    *_conv_block(48),              # Block 3: better abstraction and class separation
    *_conv_block(64, pool=False),  # Block 4: material patterns (no pooling)
    tf.keras.layers.GlobalAveragePooling2D(),       # global aggregation
    tf.keras.layers.Dense(3, activation='softmax')  # classification head
])

# Integer labels go with the sparse categorical cross-entropy loss.
model.compile(
    optimizer='adam',
    loss=tf.keras.losses.SparseCategoricalCrossentropy(),
    metrics=['accuracy']
)

model.summary()

# **STEP 4: TRAIN THE MODEL**

In [None]:
# Changes:
# 1. Added EarlyStopping to the training process for a better model output.
# 2. Added the safe_training flag. Setting it to False trains for the full
#    number of EPOCHS without any callback, which is "unsafe" in the sense
#    that underfitting or overfitting may go unchecked.
# 3. Added ReduceLROnPlateau.
#
# Epochs configuration:
#   recommended starting/default range: 30-60 epochs
#   avoid < 20 -> likely underfitting
#   avoid > 80 -> likely overfitting (especially without augmentation)

safe_training = True

# Pick the number of epochs by indexing into the candidate list.
EPOCHS = [30, 40, 50, 60][-1]

# Optional class weighting (currently unused). To enable it, define the dict
# and pass class_weight=class_weight to model.fit below.
# NOTE(review): the earlier draft labelled indices 0/1/2 as
# Plastic/Paper/Metal; confirm the index order against train_ds.class_names
# before enabling.
# class_weight = {0: 1.2, 1: 1.0, 2: 1.3}

callbacks, history = None, None

if safe_training:
    print("[INFO] Safe training is ENABLED!")
    print("[INFO] EarlyStopping and ReduceLROnPlateau is APPLIED...")

    # Stop once val_loss has not improved for 8 epochs (restoring the best
    # weights), and halve the learning rate after 5 stagnant epochs.
    callbacks = [
        tf.keras.callbacks.EarlyStopping(
            monitor='val_loss',
            patience=8,
            restore_best_weights=True
        ),
        tf.keras.callbacks.ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.5,
            patience=5,
            min_lr=1e-5
        )
    ]
else:
    print("[INFO] Safe training is DISABLED!")

# A single fit call serves both modes: callbacks stays None when
# safe_training is off, which is the same as not passing it at all.
history = model.fit(
    train_ds_aug,
    validation_data=val_ds,
    epochs=EPOCHS,
    callbacks=callbacks
)

# --- TRAINING PROCESS ---

# Plot the training history: accuracy on the left, loss on the right, each
# with its training and validation curve.
plt.figure(figsize=(12, 4))
for position, (metric, axis_label) in enumerate(
    (('accuracy', 'Accuracy'), ('loss', 'Loss')), start=1
):
    plt.subplot(1, 2, position)
    plt.plot(history.history[metric], label=f'Training {axis_label}')
    plt.plot(history.history[f'val_{metric}'], label=f'Validation {axis_label}')
    plt.xlabel('Epoch')
    plt.ylabel(axis_label)
    plt.legend()
plt.show()

# **STEP 5: CONVERT TO TFLITE FOR ESP32**

In [None]:
# Changes:
# 1. Confirming the representative dataset must be float32 in [0, 255].
# 2. Removed Rescaling from previous blocks, ensuring everything in raw pixels.
# 3. Fixed ds_taken into taking total amount of training samples over batch
#    size.
# 4. Removed ds_taken logic, just iterate over train_ds directly.

# >>> import math
# >>> num_samples = 0
# >>> for images, labels in train_ds:
# >>>     num_samples += images.shape[0]
# >>> ds_taken = 150
# >>> ds_taken = math.ceil(num_samples / BATCH_SIZE)

def representative_dataset():
    """Yield calibration samples for post-training quantization.

    Walks the NON-augmented ``train_ds`` batch by batch and yields every image
    on its own as a float32 tensor that keeps a leading batch dimension of 1,
    wrapped in a one-element list (one entry per model input).
    """
    for batch, _ in train_ds:
        batch = tf.cast(batch, tf.float32)
        batch_len = batch.shape[0]  # the last batch may be smaller
        for idx in range(batch_len):
            # Slice instead of indexing so the sample keeps its batch axis:
            # shape (1, IMG_SIZE_W, IMG_SIZE_H, 3).
            yield [batch[idx:idx + 1]]

# Post-training quantization of the trained Keras model, calibrated with
# representative_dataset, with INT8 input and output tensors.
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = representative_dataset
converter.target_spec.supported_types = [tf.int8]
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8

tflite_model = converter.convert()

# Persist the serialized model next to the notebook.
with open('waste_classifier.tflite', 'wb') as model_file:
    model_file.write(tflite_model)

model_size_kb = len(tflite_model) / 1024
print(f"Model saved! Size: {model_size_kb:.2f} KB")

# **STEP 6: [SAMPLE] TEST THE TFLITE MODEL**

### NOTE: Single random sample only.

In [None]:
# Changes:
# 1. Fixed the Input Preprocess (from ignoring INT8 scaling process before,
#    resulting in wrong prediction and such) with the correct INT8 mapping.
# 2. Used np.expand_dims instead of list wrapping.
# 3. The INT8 output tensor is now dequantized back to float32 with the
#    output scale and zero point before it is interpreted.
# 4. Even though the model ends in a softmax layer, its INT8 output is still
#    quantized. Therefore predicted_class and confidence are computed from
#    the probability vector returned by infer_int8().
# 5. Removed interpreter.get_tensor() logic, can cause confusion and it does
#    nothing to the algorithm.
# 6. Updated the np.clip() logic inside the INPUT block.

# Load the converted model into the LiteRT interpreter.
interpreter = Interpreter(model_content=tflite_model)
interpreter.allocate_tensors()

# Tensor metadata (index, dtype, quantization parameters) for the model I/O.
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

# --------------------------------------------------
# Get one validation sample: first image of the first batch
# --------------------------------------------------
for batch_images, batch_labels in val_ds.take(1):
    sample_image = batch_images[0].numpy()
    sample_label = batch_labels[0].numpy()

def infer_int8(interpreter, image, input_details, output_details,
               apply_softmax=False):
    """Run one image through the TFLite interpreter and return class scores.

    The image is quantized with the input tensor's scale and zero point, the
    interpreter is invoked, and the output tensor is dequantized to float32.

    The model in this notebook already ends in a softmax layer, so the
    dequantized output is a probability vector. Applying softmax to it a
    second time (as this function used to do) flattens the distribution and
    under-reports the confidence, so by default the probabilities are only
    clipped to be non-negative and renormalized to sum to 1.

    Args:
        interpreter: Allocated interpreter exposing ``set_tensor``,
            ``invoke`` and ``get_tensor``.
        image: Single image without a batch dimension, in the value range the
            model was trained on.
        input_details: Result of ``interpreter.get_input_details()``.
        output_details: Result of ``interpreter.get_output_details()``.
        apply_softmax: Set to True only for models whose output is raw
            logits (no softmax layer); the softmax is then applied here.

    Returns:
        1-D float array of per-class probabilities.
    """
    input_spec = input_details[0]
    output_spec = output_details[0]

    # --------------------------------------------------
    # INPUT: Quantize float32 -> integer input dtype
    # --------------------------------------------------
    input_scale, input_zero_point = input_spec['quantization']
    input_dtype = input_spec['dtype']

    tensor = np.asarray(image, dtype=np.float32)
    if input_scale:
        # Round to the nearest quantization step (a bare astype() would
        # truncate toward zero), then saturate to the dtype's range.
        limits = np.iinfo(input_dtype)
        tensor = np.clip(
            np.round(tensor / input_scale + input_zero_point),
            limits.min, limits.max
        )
    # A scale of 0 is treated as "tensor is not quantized": values pass
    # through unchanged apart from the dtype cast.
    tensor = tensor.astype(input_dtype)

    interpreter.set_tensor(
        input_spec['index'],
        np.expand_dims(tensor, axis=0)  # add the batch dimension
    )
    interpreter.invoke()

    # --------------------------------------------------
    # OUTPUT: Dequantize integer output -> float32
    # --------------------------------------------------
    output_scale, output_zero_point = output_spec['quantization']
    scores = interpreter.get_tensor(output_spec['index'])[0].astype(np.float32)
    if output_scale:
        scores = output_scale * (scores - output_zero_point)

    if apply_softmax:
        # Numerically stable softmax for models that emit logits.
        exp = np.exp(scores - np.max(scores))
        return exp / np.sum(exp)

    # Softmax already happened inside the model: just undo the small
    # quantization error so the vector is a proper distribution.
    probs = np.clip(scores, 0.0, None)
    total = np.sum(probs)
    return probs / total if total > 0 else probs

# Classify the single validation sample picked above.
probs = infer_int8(
    interpreter,
    sample_image,
    input_details,
    output_details
)

predicted_class = np.argmax(probs)
confidence = probs[predicted_class] * 100

# Label indices are assigned by image_dataset_from_directory in alphanumeric
# order of the class sub-directories (metal, paper, plastic), NOT in the order
# Plastic/Paper/Metal that was hard-coded here before. Taking the names from
# the dataset keeps the printed labels (and any mapping copied into the ESP32
# firmware) aligned with what the model was actually trained on.
class_names = [name.capitalize() for name in train_ds.class_names]

print("Test Result:")
print(f"True      : {class_names[sample_label]}")
print(f"Predicted : {class_names[predicted_class]}")
print(f"Confidence: {confidence:.1f}%")

# **STEP 7: [FULL] TEST THE TFLITE MODEL**

### NOTE: Entire waste dataset (Plastic, Paper, and Metal).

In [None]:
# ---- STATISTICS ----
# Counters for the overall and per-class accuracy of the TFLite model.
total = 0
correct = 0
num_classes = len(class_names)
per_class_total = np.zeros(num_classes, dtype=int)
per_class_correct = np.zeros(num_classes, dtype=int)

# Confusion matrix: rows are true labels, columns are predicted labels.
confusion = np.zeros((num_classes, num_classes), dtype=int)

# Print one line per evaluated sample when True.
display_all_samples = True

# ---- DATASET LOOP ----
# The interpreter is fed one image at a time.
for images, labels in val_ds:
    images = images.numpy()
    labels = labels.numpy()

    for i in range(images.shape[0]):
        probs = infer_int8(
            interpreter,
            images[i],
            input_details,
            output_details
        )

        predicted_class = np.argmax(probs)
        confidence = probs[predicted_class] * 100
        true_label = labels[i]

        # ---- METRICS ----
        total += 1
        per_class_total[true_label] += 1
        confusion[true_label, predicted_class] += 1

        if predicted_class == true_label:
            correct += 1
            per_class_correct[true_label] += 1

        if display_all_samples:
            print(f"[True, Predicted, Confidence]: [{class_names[true_label]}, {class_names[predicted_class]}, {confidence:.1f}%]")

# ---- SUMMARY ----
# Guard every ratio: an empty validation set, or a class without samples,
# must not end the cell with a ZeroDivisionError or print "nan%".
if total:
    print(f"\nOverall Accuracy: {correct / total * 100:.2f}%\n")
else:
    print("\nOverall Accuracy: n/a (no validation samples)\n")

print("Per-Class Accuracy:")
for i, name in enumerate(class_names):
    if per_class_total[i]:
        acc = per_class_correct[i] / per_class_total[i] * 100
        print(f"{name:8s}: {acc:.2f}% ({per_class_correct[i]}/{per_class_total[i]})")
    else:
        print(f"{name:8s}: n/a (0/0)")

# ---- CONFUSION MATRIX GRAPHICAL DISPLAY ----
# Heat map of the confusion matrix with the raw count written in each cell.
fig, ax = plt.subplots(figsize=(6, 5))
im = ax.imshow(confusion, cmap="Blues")

# One tick per class on both axes, labelled with the class names.
tick_positions = np.arange(num_classes)
ax.set_xticks(tick_positions)
ax.set_yticks(tick_positions)
ax.set_xticklabels(class_names)
ax.set_yticklabels(class_names)

ax.set_title("Confusion Matrix")
ax.set_xlabel("Predicted Label")
ax.set_ylabel("True Label")

# Rotate x-axis labels
plt.setp(ax.get_xticklabels(), rotation=45, ha="right")

# Annotate each cell; cells darker than half the maximum get white text so
# the number stays readable.
half_max = confusion.max() / 2
for row, col in np.ndindex(num_classes, num_classes):
    count = confusion[row, col]
    text_color = "white" if count > half_max else "black"
    ax.text(col, row, count, ha="center", va="center", color=text_color)

fig.colorbar(im, ax=ax)
plt.tight_layout()
plt.show()

# **STEP 8: CONVERT TO C ARRAY FOR ESP32**

In [None]:
import binascii
from google.colab import files # Re-import files to ensure it's the module

def tflite_to_c_array(tflite_model, array_name='waste_model'):
    """Render a serialized TFLite model as C source text.

    The result declares ``const unsigned char <array_name>[]`` holding the
    model bytes (16 per row, lowercase hex) followed by
    ``const int <array_name>_len`` with the byte count.

    Args:
        tflite_model: Bytes-like object containing the model.
        array_name: Identifier used for the generated array and the prefix of
            the length constant.

    Returns:
        The C source as a single string (no trailing newline).

    Raises:
        ValueError: If ``tflite_model`` is empty, since that would otherwise
            produce an array with no elements.
    """
    data = bytes(tflite_model)
    if not data:
        raise ValueError("tflite_model is empty; nothing to convert")

    bytes_per_row = 16
    # Build the rows with join() instead of growing one string byte by byte;
    # this also avoids the trailing space the old loop left on every row.
    rows = [
        ", ".join(f"0x{byte:02x}" for byte in data[start:start + bytes_per_row])
        for start in range(0, len(data), bytes_per_row)
    ]
    body = ",\n    ".join(rows)

    return (
        "// TensorFlow Lite model for waste classification\n"
        f"// Size: {len(data)} bytes\n\n"
        f"const unsigned char {array_name}[] = {{\n    {body}\n}};\n\n"
        f"const int {array_name}_len = {len(data)};"
    )

# Generate the C header from the converted model and store it as model.h.
c_array_code = tflite_to_c_array(tflite_model)
with open('model.h', 'w') as header_file:
    header_file.write(c_array_code)

print("\nC array saved to 'model.h'")
print("Download this file for ESP32: files.download('model.h')")

# Download both artifacts (the .tflite model first, then the header) when
# the flag is set.
download_flagged = True
if download_flagged:
    for artifact in ('waste_classifier.tflite', 'model.h'):
        files.download(artifact)