This notebook has been imported from Kaggle. Please note that the dataset used in this notebook is not included here. However, you can find the dataset [here](https://www.kaggle.com/datasets/joebeachcapital/realwaste).


In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the Kaggle input mount and print the full path of every file found.
for root, _subdirs, files in os.walk('/kaggle/input'):
    for name in files:
        print(os.path.join(root, name))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
# Data Loading and Preprocessing

In [None]:
import tensorflow as tf, os, subprocess, sys

# Report the TensorFlow version and the GPUs TensorFlow can see.
print("TF:", tf.__version__)
print("Physical GPUs:", tf.config.list_physical_devices("GPU"))

# Optional: shell out to nvidia-smi (IPython `!` escape) to see driver details.
!nvidia-smi

In [None]:
# Turn on memory growth for every visible GPU.
gpus = tf.config.list_physical_devices('GPU')
for gpu_device in gpus:
    tf.config.experimental.set_memory_growth(gpu_device, True)

In [None]:
# Mixed precision for T4 Tensor Cores
from tensorflow.keras import mixed_precision

# Global Keras dtype policy: every layer created after this call uses "mixed_float16"
# unless it is given an explicit dtype.
mixed_precision.set_global_policy("mixed_float16")
# Distribution strategy; models below are built and compiled inside `strategy.scope()`.
strategy = tf.distribute.MirroredStrategy()

### Data Loading

In [None]:
# Seed passed to the dataset loader so the train/holdout split is reproducible.
SEED = 42
# Height and width every image is resized to when loaded.
IMG_SIZE = (224, 224)
# Number of images per batch produced by the loader.
BATCH = 64
# Let tf.data choose parallelism / prefetch buffer sizes.
AUTOTUNE = tf.data.AUTOTUNE
# Root folder of the dataset; passed to the directory loader below.
DATA_DIR = "/kaggle/input/realwaste/realwaste-main/RealWaste" 

In [None]:
# Arguments shared by both loader calls: the same split fraction and seed are
# used for the "training" and "validation" subsets.
_loader_args = dict(
    validation_split=0.30,
    seed=SEED,
    image_size=IMG_SIZE,
    batch_size=BATCH,
    shuffle=True,
)

# 70% of the images: training data.
train_raw = tf.keras.utils.image_dataset_from_directory(
    DATA_DIR, subset="training", **_loader_args
)

# Remaining 30%: holdout pool that is later divided into validation and test.
temp_raw = tf.keras.utils.image_dataset_from_directory(
    DATA_DIR, subset="validation", **_loader_args
)

# Class names as reported by the loader.
class_names = train_raw.class_names
num_classes = len(class_names)
print("Classes:", class_names)

In [None]:
# `temp_raw` was created with shuffle=True, so it yields its batches in a new
# order on every pass. Slicing it directly with take()/skip() would give the
# validation and test sets a different mix of images each time they are
# iterated, letting the two sets leak into each other. Read the holdout data
# exactly once, freeze that order, and split the frozen copy instead.
# NOTE: this keeps the whole 30% holdout pool in host memory.
holdout_x, holdout_y = [], []
for batch_x, batch_y in temp_raw:
    holdout_x.append(batch_x.numpy())
    holdout_y.append(batch_y.numpy())
holdout_x = np.concatenate(holdout_x, axis=0)
holdout_y = np.concatenate(holdout_y, axis=0)

temp_batches = temp_raw.cardinality().numpy()  # should be known for this dataset
# Validation keeps the first half of the batches (same sizes as a take/skip split).
val_size = int(temp_batches // 2) * BATCH

val_raw = tf.data.Dataset.from_tensor_slices(
    (holdout_x[:val_size], holdout_y[:val_size])
).batch(BATCH)
test_raw = tf.data.Dataset.from_tensor_slices(
    (holdout_x[val_size:], holdout_y[val_size:])
).batch(BATCH)

# The datasets hold their own copy of the data; drop the NumPy staging arrays.
del holdout_x, holdout_y

### Data Preprocessing

In [None]:
import matplotlib.pyplot as plt

In [None]:
def count_labels(ds, num_classes):
    """Count how many samples of each class a batched dataset contains.

    Args:
        ds: Dataset of (image, label) batches; it is unbatched and walked one
            sample at a time.
        num_classes: Length of the returned count vector.

    Returns:
        np.ndarray of int64 where entry i is the number of samples of class i.
    """
    def _class_index(label):
        # A scalar label is already the class id; anything else is treated as
        # one-hot and reduced with argmax.
        value = label.numpy()
        if len(label.shape) == 0:
            return int(value)
        return int(np.argmax(value))

    tally = np.zeros(num_classes, dtype=np.int64)
    for _image, label in ds.unbatch():
        tally[_class_index(label)] += 1
    return tally

# Per-class sample counts for each of the three splits.
train_counts, val_counts, test_counts = (
    count_labels(split, len(class_names))
    for split in (train_raw, val_raw, test_raw)
)

def bar_counts(title, counts):
    """Show a bar chart of per-class image counts.

    Args:
        title: Text used as the figure title.
        counts: One value per entry of the module-level `class_names`,
            in the same order.
    """
    fig, ax = plt.subplots(figsize=(10, 4))
    ax.set_title(title)
    ax.bar(class_names, counts)
    # Slant the class labels so long names do not run into each other.
    plt.setp(ax.get_xticklabels(), rotation=45, ha='right')
    ax.set_ylabel("images")
    fig.tight_layout()
    plt.show()

# One chart per split, in train / validation / test order.
for chart_title, split_counts in (
    ("Train class distribution", train_counts),
    ("Validation class distribution", val_counts),
    ("Test class distribution", test_counts),
):
    bar_counts(chart_title, split_counts)

In [None]:
# Pull up to 2000 individual training labels and print basic facts about them
# (dtype, shape, value range, distinct values) as a sanity check.
ys = np.array([label.numpy() for _, label in train_raw.unbatch().take(2000)])

print("dtype:", ys.dtype, "shape:", ys.shape)
print("min/max:", ys.min(), ys.max())
print("unique:", np.unique(ys)[:20])

#### Augment and Normalize data

In [None]:
from tensorflow import keras
from tensorflow.keras import layers

In [None]:
# Random image transforms used for the training split only; they are applied
# in the listed order.
_augment_steps = [
    layers.RandomFlip("horizontal"),
    layers.RandomRotation(0.12),
    layers.RandomTranslation(0.1, 0.1),
    layers.RandomZoom(0.15),
    layers.RandomContrast(0.2),
]
augment = keras.Sequential(_augment_steps, name="augment")

# Multiplies pixel values by 1/255.
normalize = layers.Rescaling(1./255)

def to_float32(x, y):
    """Cast the image tensor `x` to float32 and pass the label `y` through unchanged."""
    image = tf.cast(x, tf.float32)
    return image, y

In [None]:
# TRAIN: augment → normalize.
# Augmentation runs on the unscaled pixels, before rescaling to [0, 1]:
# RandomContrast clips its output to the 0–255 range by default, which only
# matches the data while it is still unscaled.
train_ds = (train_raw
            .map(to_float32, num_parallel_calls=AUTOTUNE)
            .map(lambda x, y: (augment(x, training=True), y), num_parallel_calls=AUTOTUNE)
            .map(lambda x, y: (normalize(x), y), num_parallel_calls=AUTOTUNE)
            .prefetch(AUTOTUNE))

# VALIDATION / TEST: normalize only, no augmentation.
val_ds = (val_raw
          .map(to_float32, num_parallel_calls=AUTOTUNE)
          .map(lambda x, y: (normalize(x), y), num_parallel_calls=AUTOTUNE)
          .prefetch(AUTOTUNE))

test_ds = (test_raw
           .map(to_float32, num_parallel_calls=AUTOTUNE)
           .map(lambda x, y: (normalize(x), y), num_parallel_calls=AUTOTUNE)
           .prefetch(AUTOTUNE))

# Training

### Model

In [None]:
def conv_bn_act(x, filters, k, stride=1, act="relu"):
    """Apply Conv2D (no bias, "same" padding) -> BatchNormalization -> activation.

    Args:
        x: Input tensor.
        filters: Number of convolution filters.
        k: Convolution kernel size.
        stride: Convolution stride.
        act: Activation passed to `layers.Activation`.

    Returns:
        The activated output tensor.
    """
    conv = layers.Conv2D(filters, k, strides=stride, padding="same", use_bias=False)
    norm = layers.BatchNormalization()
    activation = layers.Activation(act)
    return activation(norm(conv(x)))

def residual_block(x, filters, k, act="relu"):
    """Residual unit: two convolutions on the main path, added to a skip path.

    The skip path is the input itself, or a 1x1 conv + batch norm projection of
    it when the input's channel count differs from `filters`. The activation
    is applied after the addition.
    """
    # Main path: conv-bn-act, then conv-bn with no activation before the add.
    main = conv_bn_act(x, filters, k, act=act)
    main = layers.Conv2D(filters, k, padding="same", use_bias=False)(main)
    main = layers.BatchNormalization()(main)

    # Skip path: project only when the channel counts disagree.
    skip = x
    if skip.shape[-1] != filters:
        skip = layers.Conv2D(filters, 1, padding="same", use_bias=False)(skip)
        skip = layers.BatchNormalization()(skip)

    merged = layers.Add()([skip, main])
    return layers.Activation(act)(merged)

def build_resnet(input_shape, x1, m1, x2, m2, x3, m3, x4, x5,d, K, act="relu"):
    """Build the residual CNN classifier.

    Layout, in order:
      - three convolutional stages; stage i is
        conv_bn_act(x_i filters, m_i kernel) -> residual_block(x_i, m_i) -> 2x2 max pool
      - global average pooling
      - Dense(x4) + Dropout(d), then Dense(x5) + Dropout(d)
      - Dense(K) logits followed by a softmax, both created with dtype float32

    Args:
        input_shape: Shape of one input image, without the batch dimension.
        x1, x2, x3: Filter counts of the three convolutional stages.
        m1, m2, m3: Kernel sizes of the three convolutional stages.
        x4, x5: Units of the two hidden dense layers.
        d: Dropout rate applied after each hidden dense layer.
        K: Number of output classes.
        act: Activation used in the conv stages and hidden dense layers.

    Returns:
        A `keras.Model` named "Model" whose output is the softmax over K classes.
    """
    inputs = layers.Input(shape=input_shape)

    # ---- Convolutional stages ----
    features = inputs
    for filters, kernel in ((x1, m1), (x2, m2), (x3, m3)):
        features = conv_bn_act(features, filters, kernel, act=act)
        features = residual_block(features, filters, kernel, act=act)
        features = layers.MaxPooling2D(pool_size=2)(features)

    # ---- Head ----
    features = layers.GlobalAveragePooling2D()(features)
    for units in (x4, x5):
        features = layers.Dense(units, activation=act, kernel_initializer="he_normal")(features)
        features = layers.Dropout(d)(features)

    logits = layers.Dense(K, dtype="float32", name="logits")(features)
    probabilities = layers.Activation("softmax", dtype="float32", name="softmax")(logits)

    return keras.Model(inputs, probabilities, name="Model")

In [None]:
# Model input shape = loader image size + 3 colour channels. Derived from
# IMG_SIZE so the network input cannot drift from the size the images are
# resized to when loaded.
INPUT_SHAPE = tuple(IMG_SIZE) + (3,)
# Number of output classes.
K = len(class_names)

# Model and optimizer are both created inside the distribution strategy scope.
with strategy.scope():
    model = build_resnet(
        input_shape=INPUT_SHAPE,
        x1=32, m1=3,
        x2=64, m2=3,
        x3=128, m3=3,
        x4=256, x5=64, d=0.4,
        K=K, act="relu"
    )
    optimizer = tf.keras.optimizers.Adam(
        learning_rate=1e-3,
        clipnorm=1.0
    )

    # Labels are integer class ids, hence the sparse cross-entropy loss.
    model.compile(
        optimizer=optimizer,
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"]
    )

model.summary()

### Train Model

In [None]:
# ---------------- Callbacks ----------------
# Halve the learning rate (down to 1e-5) after 4 epochs without val_accuracy improvement.
lr_on_plateau = keras.callbacks.ReduceLROnPlateau(
    monitor="val_accuracy", factor=0.5, patience=4, min_lr=1e-5, verbose=1
)
# Stop after 10 epochs without val_accuracy improvement and restore the best weights.
early_stop = keras.callbacks.EarlyStopping(
    monitor="val_accuracy", mode="max", patience=10, restore_best_weights=True, verbose=1
)
# Save the model to disk whenever val_accuracy reaches a new best.
best_checkpoint = keras.callbacks.ModelCheckpoint(
    "best_rescnn.keras", monitor="val_accuracy", mode="max", save_best_only=True, verbose=1
)

callbacks = [
    lr_on_plateau,
    early_stop,
    best_checkpoint,
    keras.callbacks.TerminateOnNaN(),
]

In [None]:
# ---------------- Train ----------------
# Upper bound on training epochs; EarlyStopping may end the run sooner.
EPOCHS = 20

history = model.fit(
    train_ds,
    epochs=EPOCHS,
    validation_data=val_ds,
    callbacks=callbacks,
    verbose=1,
)


In [None]:
# List the metric names recorded during training.
print("Keys:", list(history.history))

In [None]:
H = history.history
epochs = np.arange(1, len(H["loss"]) + 1)

# The per-epoch learning rate is logged as "learning_rate" by recent Keras
# releases and as "lr" by older ones; accept either, and skip the LR output
# entirely if neither key was recorded.
lr_history = H.get("learning_rate", H.get("lr"))

# ---- Print summaries ----
print("\nFinal epoch:")
print(f"  accuracy     : {H['accuracy'][-1]:.5f}")
print(f"  val_accuracy : {H['val_accuracy'][-1]:.5f}")
print(f"  loss         : {H['loss'][-1]:.5f}")
print(f"  val_loss     : {H['val_loss'][-1]:.5f}")
if lr_history is not None:
    print(f"  learning_rate: {lr_history[-1]:.6g}")

# nanargmax / nanargmin ignore NaN entries; +1 converts the index to a 1-based epoch.
best_va_ep = int(np.nanargmax(H["val_accuracy"])) + 1
best_vl_ep = int(np.nanargmin(H["val_loss"])) + 1
print("\nBest epochs:")
print(f"  best val_accuracy at epoch {best_va_ep}: {H['val_accuracy'][best_va_ep-1]:.5f}")
print(f"  best val_loss     at epoch {best_vl_ep}: {H['val_loss'][best_vl_ep-1]:.5f}")

# ---- Plot: Loss ----
plt.figure()
plt.plot(epochs, H["loss"], label="train loss")
plt.plot(epochs, H["val_loss"], label="val loss")
plt.xlabel("Epoch"); plt.ylabel("Loss"); plt.title("Training vs Validation Loss")
plt.grid(True); plt.legend(); plt.tight_layout()

# ---- Plot: Accuracy ----
plt.figure()
plt.plot(epochs, H["accuracy"], label="train acc")
plt.plot(epochs, H["val_accuracy"], label="val acc")
plt.xlabel("Epoch"); plt.ylabel("Accuracy"); plt.title("Training vs Validation Accuracy")
plt.grid(True); plt.legend(); plt.tight_layout()

# ---- Plot: Learning Rate ----
if lr_history is not None:
    plt.figure()
    plt.plot(epochs, lr_history, label="learning rate")
    plt.xlabel("Epoch"); plt.ylabel("LR"); plt.title("Learning Rate Schedule")
    plt.grid(True); plt.legend(); plt.tight_layout()

plt.show()

## Optimizer Comparison: Adam vs SGD vs SGD+Momentum
We compare three optimizers on the same architecture and dataset:
- Adam (current choice)
- SGD (no momentum)
- SGD with momentum (momentum = 0.9 in this comparison)

> Metrics: validation accuracy (primary), validation loss (secondary). We prefer val accuracy to capture generalization on held‑out data, and val loss to reveal calibration/overfitting trends.

In [None]:
from copy import deepcopy
from sklearn.metrics import confusion_matrix, classification_report
import itertools

# Utility to build a fresh model and compile with a given optimizer
def make_compiled_model(optimizer):
    """Build a new, untrained network and compile it with `optimizer`.

    The architecture hyper-parameters are fixed here; the loss is sparse
    categorical cross-entropy and accuracy is the only tracked metric.

    Returns:
        The compiled model.
    """
    architecture = dict(
        input_shape=INPUT_SHAPE,
        x1=32, m1=3,
        x2=64, m2=3,
        x3=128, m3=3,
        x4=256, x5=64, d=0.4,
        K=K, act="relu",
    )
    fresh_model = build_resnet(**architecture)
    fresh_model.compile(
        optimizer=optimizer,
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )
    return fresh_model

# Train helper
def train_with_optimizer(optimizer, epochs=EPOCHS):
    """Train a freshly built model using the given optimizer's configuration.

    The optimizer passed in is treated as a template: an identical optimizer
    is re-created from its config inside `strategy.scope()`, so it is created
    under the same distribution strategy as the model it will update. This
    also means the caller's optimizer object is never mutated by training.

    Args:
        optimizer: A Keras optimizer instance describing the optimizer class
            and hyper-parameters to train with.
        epochs: Maximum number of epochs passed to `fit`.

    Returns:
        (model, history): the trained model and the object returned by `fit`.
    """
    with strategy.scope():
        scoped_optimizer = type(optimizer).from_config(optimizer.get_config())
        m = make_compiled_model(scoped_optimizer)
    # NOTE(review): `callbacks` is the shared module-level list, so every run
    # reuses the same callback objects (including the checkpoint file name) —
    # confirm that sharing state across runs is intended.
    hist = m.fit(
        train_ds, validation_data=val_ds, epochs=epochs, callbacks=callbacks, verbose=1
    )
    return m, hist

In [None]:
# Define optimizers to compare
adam_opt = tf.keras.optimizers.Adam(learning_rate=1e-3, clipnorm=1.0)
sgd_opt  = tf.keras.optimizers.SGD(learning_rate=1e-2)
sgdm_opt = tf.keras.optimizers.SGD(learning_rate=1e-2, momentum=0.9)

# Display name -> optimizer, trained in this order.
_candidates = {"Adam": adam_opt, "SGD": sgd_opt, "SGD+Momentum": sgdm_opt}

# For each optimizer: the trained model, its full history, and the last-epoch
# validation metrics (NaN if a metric was not recorded).
results = {}
for name, opt in _candidates.items():
    print(f"\n==== Training with {name} ====")
    trained_model, fit_history = train_with_optimizer(opt, epochs=EPOCHS)
    run_log = fit_history.history
    last_val_acc = run_log.get("val_accuracy", [float('nan')])[-1]
    last_val_loss = run_log.get("val_loss", [float('nan')])[-1]
    results[name] = dict(
        model=trained_model,
        history=run_log,
        final_val_acc=float(last_val_acc),
        final_val_loss=float(last_val_loss),
    )

print("\nSummary (final epoch):")
for opt_name, run in results.items():
    print(f"{opt_name:>14} | val_acc: {run['final_val_acc']:.4f} | val_loss: {run['final_val_loss']:.4f}")

In [None]:
# Plot validation accuracy/loss across optimizers
# One figure per validation metric, one curve per optimizer.
_panels = (
    ("val_accuracy", "val_acc", "Val Accuracy", "Optimizer Comparison: Validation Accuracy"),
    ("val_loss", "val_loss", "Val Loss", "Optimizer Comparison: Validation Loss"),
)
for metric_key, curve_tag, y_label, fig_title in _panels:
    plt.figure(figsize=(10,4))
    for name, run in results.items():
        plt.plot(run["history"][metric_key], label=f"{name} {curve_tag}")
    plt.xlabel("Epoch")
    plt.ylabel(y_label)
    plt.title(fig_title)
    plt.grid(True)
    plt.legend()
    plt.tight_layout()
    plt.show()

## Momentum Impact Discussion
We vary the momentum parameter for SGD and observe its effect on convergence and generalization.
- Momentum helps smooth noisy gradients and accelerates optimization along consistent directions.
- Too low (≈0.0): behaves like vanilla SGD, with slower convergence.
- Moderate (0.9) often improves stability and val accuracy.
- Too high (>0.95–0.99) may overshoot minima and cause oscillations without careful LR tuning.

In [None]:
# Sweep different momentum values
momentums = [0.0, 0.5, 0.9]

# momentum value -> trained model, full history and last-epoch validation metrics.
mom_results = {}
for momentum in momentums:
    print(f"\n==== Training with SGD momentum={momentum} ====")
    sweep_opt = tf.keras.optimizers.SGD(learning_rate=1e-2, momentum=momentum)
    model_m, hist_m = train_with_optimizer(sweep_opt, epochs=EPOCHS)
    Hm = hist_m.history
    mom_results[momentum] = dict(
        model=model_m,
        history=Hm,
        final_val_acc=float(Hm.get("val_accuracy", [float('nan')])[-1]),
        final_val_loss=float(Hm.get("val_loss", [float('nan')])[-1]),
    )

print("\nMomentum sweep summary (final epoch):")
for momentum, run in mom_results.items():
    print(f"m={momentum:.2f} | val_acc: {run['final_val_acc']:.4f} | val_loss: {run['final_val_loss']:.4f}")

# Validation-accuracy curve for each momentum value.
plt.figure(figsize=(10,4))
for momentum, run in mom_results.items():
    plt.plot(run["history"]["val_accuracy"], label=f"momentum={momentum}")
plt.xlabel("Epoch")
plt.ylabel("Val Accuracy")
plt.title("SGD Momentum Sweep: Validation Accuracy")
plt.grid(True)
plt.legend()
plt.tight_layout()
plt.show()

## Final Evaluation on Test Set
We report train/validation accuracy from history and compute test accuracy, confusion matrix, precision, and recall on `test_ds`.

In [None]:
# Choose the best model among the optimizer comparison by val_accuracy
# Pick the optimizer-comparison run with the highest last-epoch val accuracy.
best_name = max(results, key=lambda run_name: results[run_name]["final_val_acc"])
best_model = results[best_name]["model"]
print(f"Best by val_accuracy: {best_name}")

# Evaluate on test set
test_metrics = best_model.evaluate(test_ds, verbose=1)
print("Test metrics (loss, accuracy):", test_metrics)

# Collect true and predicted class ids batch by batch, so both lists come from
# the same pass over the test set and stay aligned.
y_true, y_pred = [], []
for x_batch, y_batch in test_ds:
    batch_probs = best_model.predict(x_batch, verbose=0)
    y_true += y_batch.numpy().tolist()
    y_pred += np.argmax(batch_probs, axis=1).tolist()

cm = confusion_matrix(y_true, y_pred)
print("Confusion Matrix:\n", cm)

print("\nClassification Report:")
print(classification_report(y_true, y_pred, target_names=class_names))