<a href="https://colab.research.google.com/github/MizeroR/realwaste-classifier/blob/main/notebooks/Summative_assignment_MLOP.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Pipeline Overview**

*  Data acquisition: downloaded RealWaste zip, extracted and discovered class folders.
*  Data processing: split into train/val/test, ImageDataGenerator augmentation and normalization.
*  Model creation: MobileNetV2 & EfficientNetB0 with 2-phase transfer learning (frozen → fine-tune).
*  Evaluation: classification report, confusion matrix, saving metrics JSON.
*  Versioning: models saved to models/ with timestamped versions and _current.h5 pointer copies.
*  Retraining: retrain_keras_model() detects new data and retrains, promotes model if improved.
*  API: prediction helpers plus a FastAPI skeleton, included in the api/ folder.

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
import os
import warnings
warnings.filterwarnings('ignore')

# Traditional ML
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from sklearn.metrics import precision_recall_fscore_support
from sklearn.utils.class_weight import compute_class_weight

# Deep Learning
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, models, callbacks
from tensorflow.keras.applications import MobileNetV2, EfficientNetB0
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Seed the TensorFlow and NumPy random generators with the same fixed value.
tf.random.set_seed(42)
np.random.seed(42)

# Report the runtime: TensorFlow version and whether at least one GPU device is listed.
print(f"TensorFlow Version: {tf.__version__}")
print(f"GPU Available: {bool(tf.config.list_physical_devices('GPU'))}")

In [None]:
# Waste categories, listed in alphabetical order.
CLASS_LABELS = [
    'Cardboard', 'Food Organics', 'Glass',
    'Metal', 'Miscellaneous Trash', 'Paper',
    'Plastic', 'Textile Trash', 'Vegetation',
]

# Square input resolution for the image generators and model inputs.
IMG_HEIGHT = IMG_WIDTH = 224
# Number of images per generator batch.
BATCH_SIZE = 32

# Derived from the label list so the two cannot drift apart.
NUM_CLASSES = len(CLASS_LABELS)
# Hard-coded image count; in the visible code it is only printed, never enforced.
DATASET_SIZE = 4752

In [None]:
# Download the RealWaste archive into the current working directory.
print("DOWNLOADING REALWASTE DATASET")

DATASET_URL = "https://archive.ics.uci.edu/static/public/908/realwaste.zip"
DATASET_ZIP = "realwaste.zip"  # local file name the archive is saved under
EXTRACT_DIR = "realwaste_data"  # target directory used when the archive is unpacked

print(f"\nDownloading dataset from: {DATASET_URL}")

# IPython shell escape: {NAME} is replaced by the value of the Python variable NAME.
!wget -q --show-progress {DATASET_URL} -O {DATASET_ZIP}

print("\n Download complete!")

In [None]:
# Unpack the downloaded archive into EXTRACT_DIR.
# unzip flags: -o overwrites existing files without prompting, -q suppresses the file listing.
print(f"\nExtracting dataset to: {EXTRACT_DIR}/")
!unzip -o -q {DATASET_ZIP} -d {EXTRACT_DIR}

print("Extraction complete!")

# Show the top level of the extracted tree so the folder layout can be checked by eye.
print("\nInspecting dataset structure...")
!ls -la {EXTRACT_DIR}

def find_data_root(extract_dir, class_labels=None):
    """Locate the directory that directly contains the image class folders.

    Walks ``extract_dir`` top-down and returns the first directory that has at
    least one sub-directory named after a class label. Names are compared
    case-insensitively, with the label's spaces either kept as-is, replaced by
    ``_`` or ``-``, or removed.

    Args:
        extract_dir: Directory the dataset archive was extracted into.
        class_labels: Labels to look for. Defaults to the module-level
            ``CLASS_LABELS`` when omitted.

    Returns:
        Path of the first matching directory, or ``extract_dir`` itself when
        no directory in the tree contains a class folder.
    """
    labels = CLASS_LABELS if class_labels is None else class_labels

    # Pre-compute every accepted folder spelling once, instead of per directory.
    # The plain ' ' separator covers folders named exactly like the label
    # (e.g. "Food Organics"), which the other three spellings do not match.
    accepted_names = set()
    for label in labels:
        lowered = label.lower()
        accepted_names.update(lowered.replace(' ', sep) for sep in (' ', '_', '-', ''))

    for root, dirs, _files in os.walk(extract_dir):
        # Sorting in place makes os.walk descend in a deterministic order, so
        # the returned root does not depend on filesystem listing order.
        dirs.sort()
        if any(name.lower() in accepted_names for name in dirs):
            return root
    return extract_dir

# Resolve the folder that holds the per-class image directories inside the extracted tree.
DATA_ROOT = find_data_root(EXTRACT_DIR)
print(f"\nData root directory: {DATA_ROOT}")
print("\nDirectories found:")
# IPython shell escape; {DATA_ROOT} is substituted with the Python variable's value.
!ls -la {DATA_ROOT}

In [None]:
# The discovered data root is the directory all later loading code reads from.
DATA_DIR = DATA_ROOT

# Assemble the configuration summary first, then emit it in a single print.
config_lines = [
    "\nDataset Configuration:",
    f"Number of Classes: {NUM_CLASSES}",
    f"Total Instances: {DATASET_SIZE}",
    f"Image Size: {IMG_HEIGHT}x{IMG_WIDTH}",
    f"Classes: {', '.join(CLASS_LABELS)}",
    f"Data Directory: {DATA_DIR}",
]
print("\n".join(config_lines))

In [None]:

def load_dataset_info(data_dir, class_labels=None):
    """Collect image paths and per-class image counts from a class-folder layout.

    For every label, the first existing directory among these spellings is
    used: the label as written, then with spaces replaced by ``_``, ``-`` or
    removed, then the same four spellings lower-cased. Images are the regular
    files directly inside that directory whose suffix is ``.jpg``, ``.jpeg``
    or ``.png``, compared case-insensitively.

    Args:
        data_dir: Directory expected to contain one sub-directory per class.
        class_labels: Labels to look for. Defaults to the module-level
            ``CLASS_LABELS`` when omitted.

    Returns:
        A ``(class_counts, all_files)`` tuple. ``class_counts`` maps each label
        to its image count (0 when no directory was found). ``all_files`` is a
        list of ``(filepath_str, label)`` pairs, sorted by path within a class.
    """
    labels = CLASS_LABELS if class_labels is None else class_labels
    image_suffixes = {'.jpg', '.jpeg', '.png'}
    separators = (' ', '_', '-', '')
    base_dir = Path(data_dir)

    class_counts = {}
    all_files = []

    for class_name in labels:
        lowered = class_name.lower()
        # Same priority order as before: original-case spellings first, then
        # lower-case. dict.fromkeys drops duplicates (single-word labels) while
        # preserving that order.
        candidates = list(dict.fromkeys(
            [class_name.replace(' ', sep) for sep in separators]
            + [lowered.replace(' ', sep) for sep in separators]
        ))

        class_path = next(
            (base_dir / name for name in candidates if (base_dir / name).is_dir()),
            None,
        )
        if class_path is None:
            class_counts[class_name] = 0
            print(f"⚠️ Warning: No directory found for '{class_name}'")
            continue

        # One pass with a case-insensitive suffix test: each file is counted
        # exactly once (separate '*.jpg' and '*.JPG' globs can return the same
        # file twice on case-insensitive filesystems) and mixed-case suffixes
        # such as '.Jpg' are included. Sorting makes the file order, and thus
        # any seeded split built on it, independent of directory listing order.
        files = sorted(
            entry for entry in class_path.iterdir()
            if entry.is_file() and entry.suffix.lower() in image_suffixes
        )

        class_counts[class_name] = len(files)
        all_files.extend((str(path), class_name) for path in files)
        print(f"✓ Found {len(files)} images for '{class_name}' in directory '{class_path.name}'")

    return class_counts, all_files

# Load dataset information
class_counts, file_list = load_dataset_info(DATA_DIR)

df = pd.DataFrame(file_list, columns=['filepath', 'class'])
if df.empty:
    # Stop here with an actionable message: the bar plot and the max/min
    # imbalance ratio below are undefined for an empty frame.
    raise FileNotFoundError(
        f"No images were found under '{DATA_DIR}'. Check that the archive was "
        "extracted and that the class folder names match CLASS_LABELS."
    )

# Count once and reuse for both the printout and the plot.
class_distribution = df['class'].value_counts()

print(f"\nDataset loaded: {len(df)} images")
print("\nClass Distribution:")
print(class_distribution)

plt.figure(figsize=(12, 6))
class_distribution.plot(kind='bar', color='steelblue', edgecolor='black')
plt.title('RealWaste Dataset: Class Distribution', fontsize=14, fontweight='bold')
plt.xlabel('Waste Category', fontsize=12)
plt.ylabel('Number of Images', fontsize=12)
plt.xticks(rotation=45, ha='right')
plt.grid(axis='y', alpha=0.3)
plt.tight_layout()
plt.show()

print("\nClass Balance Analysis:")
class_percentages = df['class'].value_counts(normalize=True) * 100
for cls, pct in class_percentages.items():
    print(f"{cls}: {pct:.2f}%")

# Ratio of the largest to the smallest class share. Classes with zero images
# do not appear in value_counts, so they are not part of this ratio.
imbalance_ratio = class_percentages.max() / class_percentages.min()
print(f"\nImbalance Ratio: {imbalance_ratio:.2f}x")
if imbalance_ratio > 2:
    print("Significant class imbalance detected - consider using class weights")

In [None]:
# Split dataset: 70% train, then the remaining 30% halved into validation and test.
# Both splits are stratified on the class column and use a fixed random_state.
train_df, temp_df = train_test_split(df, test_size=0.3, stratify=df['class'], random_state=42)
val_df, test_df = train_test_split(temp_df, test_size=0.5, stratify=temp_df['class'], random_state=42)

print(f"\nData Split:")
print(f"Training: {len(train_df)} images ({len(train_df)/len(df)*100:.1f}%)")
print(f"Validation: {len(val_df)} images ({len(val_df)/len(df)*100:.1f}%)")
print(f"Testing: {len(test_df)} images ({len(test_df)/len(df)*100:.1f}%)")

# Data augmentation for training
train_datagen = ImageDataGenerator(
    rescale=1./255,
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode='nearest'
)

# Validation and test data (no augmentation)
val_test_datagen = ImageDataGenerator(rescale=1./255)

# One explicit class list shared by all three generators. Left to itself, each
# generator infers its class->index mapping from its own dataframe, so a class
# missing from one split would silently shift that split's label indices.
# Sorting reproduces the alphanumeric order Keras infers by default.
generator_classes = sorted(df['class'].unique())

# Arguments common to the train/validation/test generators.
flow_kwargs = dict(
    x_col='filepath',
    y_col='class',
    target_size=(IMG_HEIGHT, IMG_WIDTH),
    batch_size=BATCH_SIZE,
    class_mode='categorical',
    classes=generator_classes,
)

# Create data generators
train_generator = train_datagen.flow_from_dataframe(
    train_df,
    shuffle=True,
    seed=42,
    **flow_kwargs
)

# Validation and test batches keep dataframe order (shuffle=False).
val_generator = val_test_datagen.flow_from_dataframe(
    val_df,
    shuffle=False,
    **flow_kwargs
)

test_generator = val_test_datagen.flow_from_dataframe(
    test_df,
    shuffle=False,
    **flow_kwargs
)

print("\nData generators created successfully")

In [None]:
# Class-name -> integer index mapping used by the training generator.
label_to_index = train_generator.class_indices

df['label_id'] = df['class'].map(label_to_index)

# Estimate the weights from the training split only, so the validation and
# test label distributions do not influence what the model is trained with.
train_label_ids = train_df['class'].map(label_to_index).to_numpy()
present_label_ids = np.unique(train_label_ids)

class_weights_array = compute_class_weight(
    class_weight='balanced',
    classes=present_label_ids,
    y=train_label_ids
)

# Key each weight by its actual label id rather than by array position, so the
# mapping stays correct even if some index has no training samples.
class_weights = {
    int(label_id): float(weight)
    for label_id, weight in zip(present_label_ids, class_weights_array)
}

print("FINAL Class Weights:", class_weights)


In [None]:
# Visualize sample augmented images
def visualize_augmentation(generator, num_images=9):
    """Plot a grid of images from the next batch of ``generator``.

    Args:
        generator: Iterator whose next item is indexable as ``(images, labels)``,
            with ``labels`` one-hot encoded (the class is taken via argmax).
            If it exposes a ``class_indices`` mapping, titles are taken from it.
        num_images: Maximum number of images to show; fewer are shown when the
            batch is smaller.
    """
    batch = next(generator)
    images, labels = batch[0], batch[1]
    shown = min(num_images, len(images))

    # Prefer the generator's own name->index mapping so titles always match the
    # labels it emits; fall back to CLASS_LABELS order when it has none.
    class_indices = getattr(generator, 'class_indices', None)
    if class_indices:
        index_to_label = {index: name for name, index in class_indices.items()}
    else:
        index_to_label = dict(enumerate(CLASS_LABELS))

    # Three columns and as many rows as needed (ceil division), so num_images
    # above 9 no longer indexes past a fixed 3x3 grid. With the default of 9
    # this is the same 3x3, 12x12-inch figure as before.
    n_cols = 3
    n_rows = max(1, -(-shown // n_cols))
    fig, axes = plt.subplots(n_rows, n_cols, figsize=(12, 4 * n_rows), squeeze=False)
    axes = axes.ravel()

    # Hide every axis up front so unused cells do not render as empty plots.
    for ax in axes:
        ax.axis('off')

    for ax, image, one_hot in zip(axes, images[:shown], labels[:shown]):
        ax.imshow(image)
        class_idx = int(np.argmax(one_hot))
        ax.set_title(index_to_label.get(class_idx, str(class_idx)), fontsize=10)

    plt.suptitle('Sample Augmented Training Images', fontsize=14, fontweight='bold')
    plt.tight_layout()
    plt.show()

# Preview one batch from the training generator; this consumes that batch from the iterator.
visualize_augmentation(train_generator)


# Experiment 1: MobileNetV2 with Fine-tuning (Functional API)

In [None]:
def create_mobilenet_model():
    """Build a MobileNetV2 transfer-learning classifier with a frozen backbone.

    The head is: global average pooling -> batch normalization ->
    Dense(256, relu) -> Dropout(0.5) -> Dense(NUM_CLASSES, softmax).

    Returns:
        A ``(model, base)`` tuple: the full Keras model and the MobileNetV2
        backbone, returned so the caller can unfreeze layers for fine-tuning.
    """
    inputs = layers.Input(shape=(IMG_HEIGHT, IMG_WIDTH, 3))

    base = MobileNetV2(
        include_top=False,
        weights='imagenet',
        input_shape=(IMG_HEIGHT, IMG_WIDTH, 3)
    )
    base.trainable = False  # Freeze base initially

    # The data generators rescale pixels to [0, 1], while the ImageNet weights
    # of MobileNetV2 were trained on inputs scaled to [-1, 1]. Map the range
    # inside the model so training and inference code keep feeding [0, 1].
    x = layers.Rescaling(scale=2.0, offset=-1.0)(inputs)

    # training=False keeps the backbone's BatchNormalization layers in
    # inference mode even after layers are unfrozen for fine-tuning, matching
    # how the EfficientNet model in this notebook is built.
    x = base(x, training=False)

    x = layers.GlobalAveragePooling2D()(x)
    x = layers.BatchNormalization()(x)
    x = layers.Dense(256, activation='relu')(x)
    x = layers.Dropout(0.5)(x)
    outputs = layers.Dense(NUM_CLASSES, activation='softmax')(x)

    model = models.Model(inputs=inputs, outputs=outputs)
    return model, base

# Create model
mobilenet_model, mobilenet_base = create_mobilenet_model()
mobilenet_model.summary()

# Training callbacks. These names were referenced by the fit() calls but never
# defined, which raised NameError. The settings below are reviewer-chosen
# defaults: stop after 5 epochs without val_loss improvement and restore the
# best weights; halve the learning rate after 3 stagnant epochs.
early_stopping = callbacks.EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True
)
reduce_lr = callbacks.ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.5,
    patience=3,
    min_lr=1e-7
)

# Phase 1: Train frozen base
mobilenet_model.compile(
    optimizer='adam',
    loss='categorical_crossentropy',
    metrics=['accuracy']
)
history_mobilenet_phase1 = mobilenet_model.fit(
    train_generator,
    epochs=15,
    validation_data=val_generator,
    class_weight=class_weights,
    callbacks=[early_stopping, reduce_lr]
)

# Phase 2: Fine-tune top layers
mobilenet_base.trainable = True
for layer in mobilenet_base.layers[:-20]:
    layer.trainable = False
# Keep BatchNormalization layers frozen inside the unfrozen tail: updating
# their statistics on a small dataset tends to undo the pretrained features.
for layer in mobilenet_base.layers[-20:]:
    if isinstance(layer, layers.BatchNormalization):
        layer.trainable = False

# Re-compile so the changed trainable flags take effect, with a lower learning rate.
mobilenet_model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=0.0001),
    loss='categorical_crossentropy',
    metrics=['accuracy']
)
history_mobilenet_phase2 = mobilenet_model.fit(
    train_generator,
    epochs=20,
    validation_data=val_generator,
    class_weight=class_weights,
    callbacks=[early_stopping, reduce_lr]
)

# Experiment 2: EfficientNetB0 with Fine-tuning (Functional API)

In [None]:
def create_efficientnet_model(trainable_layers=20):
    """Build an EfficientNetB0 transfer-learning classifier with a frozen backbone.

    The head is: global average pooling -> batch normalization ->
    Dense(512, relu) -> Dropout(0.5) -> Dense(256, relu) -> Dropout(0.3) ->
    Dense(NUM_CLASSES, softmax).

    Args:
        trainable_layers: Currently unused; kept so existing calls that pass it
            keep working. The backbone is returned fully frozen and the caller
            decides which layers to unfreeze.

    Returns:
        A ``(model, base)`` tuple: the full Keras model and the EfficientNetB0
        backbone.
    """
    base = EfficientNetB0(
        include_top=False,
        weights='imagenet',
        input_shape=(IMG_HEIGHT, IMG_WIDTH, 3)
    )

    # Freeze all layers initially
    base.trainable = False

    inputs = layers.Input(shape=(IMG_HEIGHT, IMG_WIDTH, 3))

    # Keras EfficientNet models contain their own input rescaling and expect
    # raw pixel values in [0, 255]. The data generators already divide by 255,
    # so undo that here; otherwise the backbone receives doubly-scaled inputs.
    x = layers.Rescaling(255.0)(inputs)

    # training=False keeps the backbone's BatchNormalization layers in inference mode.
    x = base(x, training=False)
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.BatchNormalization()(x)
    x = layers.Dense(512, activation='relu')(x)
    x = layers.Dropout(0.5)(x)
    x = layers.Dense(256, activation='relu')(x)
    x = layers.Dropout(0.3)(x)
    outputs = layers.Dense(NUM_CLASSES, activation='softmax')(x)

    model = models.Model(inputs=inputs, outputs=outputs)
    return model, base

efficientnet_model, efficientnet_base = create_efficientnet_model()

# Training callbacks. These names were referenced by the fit() calls but never
# defined, which raised NameError. The settings below are reviewer-chosen
# defaults: stop after 5 epochs without val_loss improvement and restore the
# best weights; halve the learning rate after 3 stagnant epochs.
early_stopping = callbacks.EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True
)
reduce_lr = callbacks.ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.5,
    patience=3,
    min_lr=1e-7
)

# Phase 1: Train with frozen base
print("\nPhase 1: Training with frozen EfficientNet base...")
efficientnet_model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=0.001),
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

history_eff_phase1 = efficientnet_model.fit(
    train_generator,
    epochs=20,
    class_weight=class_weights,
    validation_data=val_generator,
    callbacks=[early_stopping, reduce_lr],
    verbose=1
)

# Phase 2: Fine-tune top layers
print("\nPhase 2: Fine-tuning top layers...")
efficientnet_base.trainable = True

# Freeze all layers except the last 50
for layer in efficientnet_base.layers[:-50]:
    layer.trainable = False
# Within the unfrozen tail, keep BatchNormalization layers frozen so their
# pretrained scale/offset parameters are not disturbed by fine-tuning.
for layer in efficientnet_base.layers[-50:]:
    if isinstance(layer, layers.BatchNormalization):
        layer.trainable = False

# Re-compile so the changed trainable flags take effect, with a lower learning rate.
efficientnet_model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=0.0001),
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

history_eff_phase2 = efficientnet_model.fit(
    train_generator,
    epochs=30,
    class_weight=class_weights,
    validation_data=val_generator,
    callbacks=[early_stopping, reduce_lr],
    verbose=1
)

efficientnet_test_loss, efficientnet_test_acc = efficientnet_model.evaluate(test_generator, verbose=0)
print(f"\nEfficientNetB0 (Fine-tuned) Test Accuracy: {efficientnet_test_acc:.4f}")

In [None]:
# Stitch the two training phases of each model into one continuous accuracy curve.

# MobileNetV2
phase1_end_mobilenet = len(history_mobilenet_phase1.history['accuracy'])
train_acc_mobilenet = (history_mobilenet_phase1.history['accuracy']
                       + history_mobilenet_phase2.history['accuracy'])
val_acc_mobilenet = (history_mobilenet_phase1.history['val_accuracy']
                     + history_mobilenet_phase2.history['val_accuracy'])
epochs_mobilenet = range(1, len(train_acc_mobilenet) + 1)

# EfficientNetB0
phase1_end_efficientnet = len(history_eff_phase1.history['accuracy'])
train_acc_efficientnet = (history_eff_phase1.history['accuracy']
                          + history_eff_phase2.history['accuracy'])
val_acc_efficientnet = (history_eff_phase1.history['val_accuracy']
                        + history_eff_phase2.history['val_accuracy'])
epochs_efficientnet = range(1, len(train_acc_efficientnet) + 1)

# --- Plot ---
plt.figure(figsize=(14,7))

# One entry per model: colour code, legend prefix, x values, train/val curves,
# and the epoch at which phase 1 ended.
curve_specs = [
    ('g', 'MobileNetV2', epochs_mobilenet,
     train_acc_mobilenet, val_acc_mobilenet, phase1_end_mobilenet),
    ('b', 'EfficientNet', epochs_efficientnet,
     train_acc_efficientnet, val_acc_efficientnet, phase1_end_efficientnet),
]

for colour, legend_name, epoch_axis, train_curve, val_curve, phase1_end in curve_specs:
    # Solid line = training accuracy, dashed = validation accuracy.
    plt.plot(epoch_axis, train_curve, colour + '-', label=legend_name + ' Train')
    plt.plot(epoch_axis, val_curve, colour + '--', label=legend_name + ' Val')
    # Dotted vertical line at the last epoch of phase 1.
    plt.axvline(x=phase1_end, color=colour, linestyle=':',
                label=legend_name + ' Fine-tuning Start')

plt.title('Train and Validation Accuracy with Phase Separation')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()
plt.grid(True)
plt.show()

In [None]:
# Score both trained models on the held-out test generator.
# evaluate() output is unpacked as (loss, accuracy), matching the single
# 'accuracy' metric the models were compiled with.

# MobileNetV2
mobilenet_scores = mobilenet_model.evaluate(test_generator, verbose=0)
mobilenet_test_loss, mobilenet_test_acc = mobilenet_scores
print("MobileNetV2 Test Accuracy:", mobilenet_test_acc)

# EfficientNetB0
efficientnet_scores = efficientnet_model.evaluate(test_generator, verbose=0)
efficientnet_test_loss, efficientnet_test_acc = efficientnet_scores
print("EfficientNetB0 Test Accuracy:", efficientnet_test_acc)


In [None]:
# Candidate models keyed by display name, each with its test-set accuracy.
models_dict = {
    'MobileNetV2': {'model': mobilenet_model, 'test_acc': mobilenet_test_acc},
    'EfficientNetB0': {'model': efficientnet_model, 'test_acc': efficientnet_test_acc},
}

# Pick the entry with the highest test accuracy (the first one wins a tie).
# NOTE(review): selecting on test accuracy makes the reported test score
# optimistic; consider selecting on validation accuracy instead.
best_model_name, best_entry = max(
    models_dict.items(),
    key=lambda item: item[1]['test_acc'],
)
best_model = best_entry['model']
best_acc = best_entry['test_acc']

print(f"\nBest Model: {best_model_name} with Test Accuracy: {best_acc:.4f}")

# Save model
# The winner is written as an HDF5 file named after the model.
save_path = f"{best_model_name}_best_model.h5"
best_model.save(save_path)
print(f"Saved best model to: {save_path}")
