# **Waste Material Segregation for Improving Waste Management**

## **Objective**

The objective of this project is to implement an effective waste material segregation system using convolutional neural networks (CNNs) that categorises waste into distinct groups. This process enhances recycling efficiency, minimises environmental pollution, and promotes sustainable waste management practices.

The key goals are:

* Accurately classify waste materials into categories like cardboard, glass, paper, and plastic.
* Improve waste segregation efficiency to support recycling and reduce landfill waste.
* Understand the properties of different waste materials to optimise sorting methods for sustainability.

## **Data Understanding**

The dataset consists of images of common waste materials, grouped into seven classes:

1. Food Waste
2. Metal
3. Paper
4. Plastic
5. Other
6. Cardboard
7. Glass


**Data Description**

* The dataset consists of multiple folders, each representing a specific class, such as `Cardboard`, `Food_Waste`, and `Metal`.
* Within each folder, there are images of objects that belong to that category.
* However, these items are not further subcategorised. <br> For instance, the `Food_Waste` folder may contain images of items like coffee grounds, teabags, and fruit peels, without explicitly stating that they are actually coffee grounds or teabags.
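
The folder-per-class layout described above can be inspected before any image is decoded. The following is a minimal sketch, assuming the extracted dataset root contains one sub-folder per class; the helper name `count_images_per_class` and the extension set are hypothetical and not part of the original notebook.

```python
from collections import Counter
from pathlib import Path

# Hypothetical set of extensions; adjust to whatever the dataset actually contains.
IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".bmp"}


def count_images_per_class(root_dir):
    """Return {class_folder_name: number_of_image_files} for a folder-per-class dataset."""
    counts = Counter()
    for class_dir in sorted(Path(root_dir).iterdir()):
        if not class_dir.is_dir():
            continue  # ignore stray files next to the class folders
        counts[class_dir.name] = sum(
            1
            for f in class_dir.iterdir()
            if f.is_file() and f.suffix.lower() in IMAGE_EXTENSIONS
        )
    return dict(counts)
```

Calling `count_images_per_class(data_dir)` once the archive is extracted gives an early view of class imbalance without loading any pixels.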

## **1. Load the data**

Load and unzip the dataset zip file.

**Import Necessary Libraries**

In [65]:
# Recommended versions:

# numpy version: 1.26.4
# pandas version: 2.2.2
# seaborn version: 0.13.2
# matplotlib version: 3.10.0
# PIL version: 11.1.0
# tensorflow version: 2.18.0
# keras version: 3.8.0
# sklearn version: 1.6.1 

In [66]:
# !pip install numpy==1.26.4 pandas==2.2.2 seaborn==0.13.2 matplotlib==3.10.0 Pillow==11.1.0 tensorflow==2.18.0 keras==3.8.0 scikit-learn==1.6.1

In [80]:
# Import essential libraries
import os
import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
import seaborn as sns
import tensorflow as tf
import cv2

from PIL import Image
from collections import Counter

from tensorflow import keras
from tensorflow.keras import layers, models, regularizers
from keras.models import Sequential, Model
from keras.layers import Dense, Dropout, GlobalAveragePooling2D, Conv2D, MaxPooling2D, Flatten, BatchNormalization
from keras.applications import MobileNetV2
from keras.applications.mobilenet_v2 import preprocess_input
from keras.preprocessing import image_dataset_from_directory
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau

import warnings
warnings.filterwarnings('ignore')

from sklearn.metrics import classification_report, confusion_matrix
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

Load the dataset.

In [68]:
# Load and unzip the dataset
import zipfile

# Raw strings keep single backslashes literal, so they must not be doubled
zip_path = r"F:\DS C71\Waste_Management_Case_Study\data.zip"

extract_dir = r"F:\DS C71\Waste_Management_Case_Study\data"

os.makedirs(extract_dir, exist_ok=True)
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(extract_dir)
    
print(f"Dataset extracted to: {extract_dir}")

data_dir = os.path.join(extract_dir, "data")

## **2. Data Preparation** <font color=red> [25 marks] </font><br>


### **2.1 Load and Preprocess Images** <font color=red> [8 marks] </font><br>

Let us first create a function to load the images. We can then use this function to load and resize the images of every category in a single step.

#### **2.1.1** <font color=red> [3 marks] </font><br>
Create a function to load the images.

In [None]:
# Create a function to load the raw images

import os

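def load_images_from_folder(data_dir, image_size=(150, 150)):
    """Load every readable image under data_dir/<class>/ as an RGB array resized to image_size.

    Returns the image array, the integer label array, and the sorted class names.
    """
    X = []
    y = []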
    class_names = sorted([d for d in os.listdir(data_dir) if os.path.isdir(os.path.join(data_dir, d))])
    for label_index, class_name in enumerate(class_names):
        class_path = os.path.join(data_dir, class_name)
        for image_name in os.listdir(class_path):
            image_path = os.path.join(class_path, image_name)
            try:
                with Image.open(image_path) as img:
                    img = img.convert('RGB')
                    img = img.resize(image_size)
                    X.append(np.array(img))
                    y.append(label_index)
            except Exception as e:
                print(f"Error loading {image_path}: {e}")
    return np.array(X), np.array(y), class_names

In [None]:
X, y, class_names = load_images_from_folder(data_dir)
print("Loaded images:", X.shape)
print("Loaded labels:", y.shape)
print("Unique Labels: ", class_names)

#### **2.1.2** <font color=red> [5 marks] </font><br>
Load images and labels.

Load the images from the dataset directory. Labels of images are present in the subdirectories.

Verify if the images and labels are loaded correctly.
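
Before plotting, a few structural checks can confirm that images and labels line up. This is a sketch under the assumption that the loader returns a 4D RGB array and integer labels, as `load_images_from_folder` does; the helper `check_loaded_data` is a hypothetical name introduced for illustration, and the demo arrays are synthetic.

```python
import numpy as np


def check_loaded_data(X, y, class_names):
    """Validate array shapes and label range, then return per-class image counts."""
    if X.ndim != 4 or X.shape[-1] != 3:
        raise ValueError("expected an array of shape (n, height, width, 3)")
    if len(X) != len(y):
        raise ValueError("each image needs exactly one label")
    if y.min() < 0 or y.max() >= len(class_names):
        raise ValueError("label index outside the range of class_names")
    counts = np.bincount(y, minlength=len(class_names))
    return {name: int(n) for name, n in zip(class_names, counts)}


# Synthetic demo data standing in for the real X, y, class_names
X_demo = np.zeros((4, 150, 150, 3), dtype=np.uint8)
y_demo = np.array([0, 0, 1, 2])
print(check_loaded_data(X_demo, y_demo, ["Cardboard", "Glass", "Metal"]))
```

In the notebook, the same call would be `check_loaded_data(X, y, class_names)`.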

In [None]:
# Verify the loaded images and labels: show the first image found for each class
plt.figure(figsize=(15, 6))
shown = set()
for idx in range(len(X)):
    if y[idx] not in shown:
        plt.subplot(1, len(class_names), y[idx] + 1)
        plt.imshow(X[idx])
        plt.title(class_names[y[idx]])
        plt.axis('off')
        shown.add(y[idx])
        if len(shown) == len(class_names):
            break
plt.tight_layout()
plt.show()

In [None]:
counts = Counter(y)
plt.figure(figsize=(10, 6))
plt.bar([class_names[i] for i in counts.keys()], counts.values(), color='skyblue')
plt.title("Class Distribution")
plt.xlabel("Class")
plt.ylabel("Number of Images")
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()       # check if labels look consistent

In [None]:
from collections import Counter

label_counts = Counter(y)
for label_idx, count in label_counts.items():
    print(f"{class_names[label_idx]}: {count} images")

Perform any operations, if needed, on the images and labels to get them into the desired format.

### **2.2 Data Visualisation** <font color=red> [9 marks] </font><br>

#### **2.2.1** <font color=red> [3 marks] </font><br>
Create a bar plot to display the class distribution

In [None]:
# Visualise Data Distribution

# Count the number of samples per class
class_counts = np.bincount(y)

# Plot the bar chart
plt.figure(figsize=(10, 6))
bars = plt.bar(class_names, class_counts, color='skyblue', edgecolor='black')

# Add title and axis labels
plt.title("Class Distribution of Waste Images", fontsize=14)
plt.xlabel("Waste Material Class", fontsize=12)
plt.ylabel("Number of Images", fontsize=12)

# Annotate each bar with the count
for bar in bars:
    height = bar.get_height()
    plt.text(bar.get_x() + bar.get_width() / 2, height + 10, str(height), ha='center', va='bottom')

# Rotate x-axis labels if needed
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

#### **2.2.2** <font color=red> [3 marks] </font><br>
Visualise some sample images

In [None]:
# Visualise Sample Images (across different labels)

samples_per_class = 5
num_classes = len(class_names)
plt.figure(figsize=(samples_per_class * 3, num_classes * 3))

for class_idx in range(num_classes):
    class_name = class_names[class_idx]
    indices = np.where(y == class_idx)[0][:samples_per_class]
    
    for i, idx in enumerate(indices):
        plt_idx = class_idx * samples_per_class + i + 1
        plt.subplot(num_classes, samples_per_class, plt_idx)
        plt.imshow(X[idx].astype('uint8'))
        plt.title(class_name if i == 0 else "")
        plt.axis('off')

plt.tight_layout()
plt.suptitle("Sample Images per Class", fontsize=16)
plt.subplots_adjust(top=0.92)
plt.show()

#### **2.2.3** <font color=red> [3 marks] </font><br>
Based on the smallest and largest image dimensions, resize the images.

In [None]:
# Find the smallest and largest image dimensions from the data set

sizes = []
for class_name in class_names:
    for img_name in os.listdir(os.path.join(data_dir, class_name)):
        try:
            with Image.open(os.path.join(data_dir, class_name, img_name)) as img:
                sizes.append(img.size)
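        except Exception:
            # Skip files that PIL cannot open as images
            continue
widths, heights = zip(*sizes)
# Note: the minimum width and the minimum height may come from different images
print(f"Smallest width x height: {min(widths)} x {min(heights)}")
print(f"Largest width x height: {max(widths)} x {max(heights)}")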

In [None]:
# Resize the image dimensions

resized_images = []

for img in X:
    resized = cv2.resize(img, (256, 256))  # Images were loaded at 150x150, so this upsamples them to 256x256
    resized_images.append(resized)

X = np.array(resized_images)
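
Both `img.resize(image_size)` in the loader and `cv2.resize` above map every image to a fixed square, which stretches non-square originals. One possible alternative, not used in this notebook, is letterboxing: scale the longer side to the target and pad the shorter side. The sketch below only works out the geometry; `letterbox_geometry` is a hypothetical helper name.

```python
def letterbox_geometry(width, height, target=256):
    """Return (new_width, new_height, pad_left, pad_top) for an aspect-preserving fit.

    The longer side is scaled to `target`; the shorter side is centred with padding.
    """
    scale = target / max(width, height)
    new_w = max(1, round(width * scale))
    new_h = max(1, round(height * scale))
    pad_left = (target - new_w) // 2
    pad_top = (target - new_h) // 2
    return new_w, new_h, pad_left, pad_top


# A 512x384 image would be scaled to 256x192 and padded by 32 pixels above and below
print(letterbox_geometry(512, 384))
```

Whether letterboxing helps depends on how varied the original aspect ratios are, which the smallest and largest dimensions printed earlier give a first indication of.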

### **2.3 Encoding the classes** <font color=red> [3 marks] </font><br>

There are seven classes present in the data.

We have extracted the images and their labels, and visualised their distribution. Now, we need to perform encoding on the labels. Encode the labels suitably.

#### **2.3.1** <font color=red> [3 marks] </font><br>
Encode the target class labels.

In [69]:
# Encode the labels suitably

from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.utils import to_categorical

unique = np.unique(y)
classes = [class_names[label] for label in unique]
print("Unique Labels: ", classes)

le = LabelEncoder()
y_encoded = le.fit_transform(y)
y_cat = to_categorical(y_encoded)

# Check the results
print("Original class labels:", le.classes_)
print("Encoded labels:\n", y_cat)  

Unique Labels:  ['Cardboard', 'Food_Waste', 'Glass', 'Metal', 'Other', 'Paper', 'Plastic']
Original class labels: [0 1 2 3 4 5 6]
Encoded labels:
 [[1. 0. 0. ... 0. 0. 0.]
 [1. 0. 0. ... 0. 0. 0.]
 [1. 0. 0. ... 0. 0. 0.]
 ...
 [0. 0. 0. ... 0. 0. 1.]
 [0. 0. 0. ... 0. 0. 1.]
 [0. 0. 0. ... 0. 0. 1.]]


In [77]:
num_classes = len(np.unique(y))

# One-hot encode the labels
y_encoded = to_categorical(y, num_classes=num_classes)
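
Because the labels derived from the folder order are already the integers 0 to 6, `LabelEncoder` leaves them unchanged (as the printed `le.classes_` shows), and the one-hot step does the actual encoding. The following NumPy sketch illustrates what that step produces; `one_hot` is a hypothetical helper and the labels are synthetic.

```python
import numpy as np


def one_hot(labels, num_classes):
    """Return a (n_samples, num_classes) float32 matrix with a single 1 per row."""
    labels = np.asarray(labels)
    # Row i of the identity matrix is the one-hot vector for class i
    return np.eye(num_classes, dtype=np.float32)[labels]


y_demo = np.array([0, 2, 1, 2])
y_demo_one_hot = one_hot(y_demo, num_classes=3)

# argmax along the class axis inverts the encoding
assert np.array_equal(np.argmax(y_demo_one_hot, axis=1), y_demo)
```

The same `argmax` inversion is what the evaluation cells later apply to `y_val` and to the predicted probabilities.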

### **2.4 Data Splitting** <font color=red> [5 marks] </font><br>

#### **2.4.1** <font color=red> [5 marks] </font><br>
Split the dataset into training and validation sets

In [74]:
# Assign specified parts of the dataset to train and validation sets

from sklearn.model_selection import train_test_split

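# Scale pixel values to [0, 1]; float16 halves memory use compared with float32
X = X.astype('float16') / 255.0
# Stratify on the integer labels so both splits keep the class proportions
X_train, X_val, y_train, y_val = train_test_split(X, y_cat, test_size=0.2, random_state=42, stratify=y)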
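
To make the effect of stratification concrete, here is a simplified NumPy sketch of a stratified split. It is an illustration only and does not reproduce scikit-learn's exact allocation or shuffling; `stratified_split_indices` is a hypothetical name.

```python
import numpy as np


def stratified_split_indices(labels, test_size=0.2, seed=42):
    """Split sample indices so each class contributes about `test_size` to validation."""
    labels = np.asarray(labels)
    rng = np.random.default_rng(seed)
    train_idx, val_idx = [], []
    for cls in np.unique(labels):
        # Shuffle the indices of this class, then cut off its validation share
        cls_idx = rng.permutation(np.flatnonzero(labels == cls))
        n_val = int(round(test_size * len(cls_idx)))
        val_idx.extend(cls_idx[:n_val])
        train_idx.extend(cls_idx[n_val:])
    return np.array(train_idx, dtype=int), np.array(val_idx, dtype=int)


# Imbalanced synthetic labels: 50 samples of class 0 and 10 of class 1
labels_demo = np.array([0] * 50 + [1] * 10)
train_demo, val_demo = stratified_split_indices(labels_demo)
print(np.bincount(labels_demo[train_demo]), np.bincount(labels_demo[val_demo]))
```

Without stratification, a random 20% sample of a small class could end up with very few or no validation images, which would make per-class metrics unreliable.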

In [75]:
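from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Augment only the training data; pixel values were already scaled to [0, 1] above
train_datagen = ImageDataGenerator(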
    rotation_range=20,
    width_shift_range=0.1,
    height_shift_range=0.1,
    shear_range=0.1,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode='nearest')

val_datagen = ImageDataGenerator()

train_generator = train_datagen.flow(X_train, y_train, batch_size=32)
val_generator = val_datagen.flow(X_val, y_val, batch_size=32)

## **3. Model Building and Evaluation** <font color=red> [20 marks] </font><br>

### **3.1 Model building and training** <font color=red> [15 marks] </font><br>

#### **3.1.1** <font color=red> [10 marks] </font><br>
Build and compile the model. Use 3 convolutional layers. Add suitable normalisation, dropout, and fully connected layers to the model.

Test out different configurations and report the results in conclusions.

In [81]:
# Build the model
num_classes = y_encoded.shape[1]
input_shape = X.shape[1:]

model = models.Sequential([
    # Declare the input shape with an explicit Input layer (the idiomatic form in Keras 3)
    layers.Input(shape=input_shape),
    layers.Conv2D(32, (3, 3), padding='same', kernel_regularizer=regularizers.l2(0.001)),
    layers.BatchNormalization(),
    layers.Activation('relu'),
    layers.MaxPooling2D((2, 2)),
    layers.Dropout(0.25),

    layers.Conv2D(64, (3, 3), padding='same', kernel_regularizer=regularizers.l2(0.001)),
    layers.BatchNormalization(),
    layers.Activation('relu'),
    layers.MaxPooling2D((2, 2)),
    layers.Dropout(0.25),

    layers.Conv2D(128, (3, 3), padding='same', kernel_regularizer=regularizers.l2(0.001)),
    layers.BatchNormalization(),
    layers.Activation('relu'),
    layers.MaxPooling2D((2, 2)),
    layers.Dropout(0.25),

    layers.Flatten(),
    layers.Dense(128, activation='relu'),
    layers.Dropout(0.5),
    layers.Dense(num_classes, activation='softmax')
])

In [82]:
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
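
It can help to work out the size of this architecture by hand before training. The sketch below assumes a 256x256x3 input (the size produced by the resize step) and seven output classes; the helper names are hypothetical, and the arithmetic follows the standard parameter formulas for `Conv2D`, `BatchNormalization`, and `Dense` layers.

```python
def conv2d_params(kernel, in_channels, filters):
    """Weights plus one bias per filter for a square-kernel Conv2D layer."""
    return (kernel * kernel * in_channels + 1) * filters


def dense_params(in_features, units):
    """Weights plus one bias per unit for a Dense layer."""
    return (in_features + 1) * units


def summarise_cnn(input_size=256, in_channels=3, filters=(32, 64, 128),
                  dense_units=128, num_classes=7):
    """Parameter count for three conv blocks followed by Flatten and two Dense layers."""
    size, channels, total = input_size, in_channels, 0
    for f in filters:
        total += conv2d_params(3, channels, f)
        total += 4 * f   # BatchNormalization: gamma, beta, moving mean, moving variance
        size //= 2       # 'same' convolution keeps the size; 2x2 max-pooling halves it
        channels = f
    flattened = size * size * channels
    first_dense = dense_params(flattened, dense_units)
    total += first_dense + dense_params(dense_units, num_classes)
    return {
        "flattened_features": flattened,
        "first_dense_params": first_dense,
        "total_params": total,
    }


print(summarise_cnn())
```

Under these assumptions the first dense layer holds almost all of the parameters, because `Flatten` hands it 32 x 32 x 128 features. Swapping `Flatten` for `GlobalAveragePooling2D` (already imported above) is one configuration worth trying, since it would reduce that layer to roughly 16.5 thousand parameters.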

#### **3.1.2** <font color=red> [5 marks] </font><br>
Train the model.

Use appropriate metrics and callbacks as needed.

In [85]:
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau

In [86]:
# Define Callbacks

early_stop = EarlyStopping(monitor='val_accuracy', patience=5, restore_best_weights=True)
reduce_lr = ReduceLROnPlateau(monitor='val_accuracy', factor=0.5, patience=3, min_lr=1e-6, verbose=1)
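
The patience logic behind `EarlyStopping` can be illustrated with a small simulation. This is a simplified sketch, not Keras's implementation: it assumes a metric that should increase, a `min_delta` of zero, and synthetic validation accuracies; `simulate_early_stopping` is a hypothetical name.

```python
def simulate_early_stopping(val_accuracy, patience):
    """Return (stop_epoch, best_epoch), both zero-based, for a metric to maximise."""
    best = float("-inf")
    best_epoch = None
    wait = 0
    for epoch, acc in enumerate(val_accuracy):
        if acc > best:
            # Strict improvement resets the patience counter
            best, best_epoch, wait = acc, epoch, 0
        else:
            wait += 1
            if wait >= patience:
                return epoch, best_epoch
    return len(val_accuracy) - 1, best_epoch


# Synthetic validation accuracies: the best value is reached at epoch 2
scores = [0.30, 0.40, 0.50, 0.49, 0.48, 0.50, 0.47, 0.46]
print(simulate_early_stopping(scores, patience=3))
```

With `restore_best_weights=True`, the model ends up with the weights from the best epoch rather than the last one. Because `ReduceLROnPlateau` uses a patience of 3 and `EarlyStopping` a patience of 5, the learning rate is halved at least once before training can stop.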

In [None]:
# Training
history = model.fit(
    train_generator,
    validation_data=val_generator,
    epochs=30,
    callbacks=[early_stop, reduce_lr]
)


Epoch 1/30
101/191 ━━━━━━━━━━━━━━━━━━━━ 7:14 5s/step - accuracy: 0.2630 - loss: 26.6278

In [None]:
# Evaluate on validation data

train_acc = history.history['accuracy'][-1]
print(f"Final Training Accuracy: {train_acc:.2f}")

val_loss, val_acc = model.evaluate(val_generator)
print(f"Validation Accuracy: {val_acc:.2f}")

In [None]:
# Plot accuracy

plt.figure(figsize=(12, 5))

plt.subplot(1, 2, 1)
plt.plot(history.history['accuracy'], label='Train')
plt.plot(history.history['val_accuracy'], label='Val')
plt.title('Accuracy')
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(history.history['loss'], label='Train')
plt.plot(history.history['val_loss'], label='Val')
plt.title('Loss')
plt.legend()

plt.show()


### **3.2 Model Testing and Evaluation** <font color=red> [5 marks] </font><br>

#### **3.2.1** <font color=red> [5 marks] </font><br>
Evaluate the model on test dataset. Derive appropriate metrics.

In [None]:
# No separate test set was held out, so the validation split serves as the evaluation set

val_loss, val_acc = model.evaluate(X_val, y_val, verbose=1)
print(f"Validation Loss: {val_loss:.4f}")
print(f"Validation Accuracy: {val_acc:.4f}")

In [None]:
# Get predicted probabilities
y_pred = model.predict(X_val)
y_pred_labels = np.argmax(y_pred, axis=1)
y_true_labels = np.argmax(y_val, axis=1)

print("Classification Report:")
print(classification_report(y_true_labels, y_pred_labels, target_names=class_names))

In [None]:
# Step 1: Get predictions
y_pred_probs = model.predict(X_val)
y_pred = np.argmax(y_pred_probs, axis=1)  # predicted class indices
y_true = np.argmax(y_val, axis=1)         # true class indices if one-hot encoded

# Step 2: Calculate metrics
accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred, average='weighted')
recall = recall_score(y_true, y_pred, average='weighted')
f1 = f1_score(y_true, y_pred, average='weighted')

# Step 3: Print results
print(f"Accuracy       : {accuracy:.2f}")
print(f"Precision      : {precision:.2f}")
print(f"Recall         : {recall:.2f}")
print(f"F1-Score       : {f1:.2f}")

In [None]:
cm = confusion_matrix(y_true_labels, y_pred_labels)
labels = class_names  # folder-derived class names, in label-index order
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=labels, yticklabels=labels)
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.title('Confusion Matrix')
plt.show()
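
The weighted precision, recall, and F1 scores reported above can all be derived from the confusion matrix. The sketch below shows the arithmetic with NumPy on a small synthetic matrix (rows are true classes, columns are predicted classes, matching the heatmap axes); `weighted_metrics_from_confusion` is a hypothetical helper, not a scikit-learn function.

```python
import numpy as np


def weighted_metrics_from_confusion(cm):
    """Support-weighted precision, recall, and F1 from a square confusion matrix."""
    cm = np.asarray(cm, dtype=float)
    true_pos = np.diag(cm)
    predicted = cm.sum(axis=0)   # column sums: how often each class was predicted
    support = cm.sum(axis=1)     # row sums: how many true samples each class has
    zeros = np.zeros_like(true_pos)
    precision = np.divide(true_pos, predicted, out=zeros.copy(), where=predicted > 0)
    recall = np.divide(true_pos, support, out=zeros.copy(), where=support > 0)
    denom = precision + recall
    f1 = np.divide(2 * precision * recall, denom, out=zeros.copy(), where=denom > 0)
    weights = support / support.sum()
    return {
        "accuracy": float(true_pos.sum() / cm.sum()),
        "precision": float(np.sum(weights * precision)),
        "recall": float(np.sum(weights * recall)),
        "f1": float(np.sum(weights * f1)),
    }


# Synthetic two-class example
cm_demo = [[5, 1],
           [2, 4]]
print(weighted_metrics_from_confusion(cm_demo))
```

Support-weighted recall always equals overall accuracy, so the two values printed by the metrics cell are expected to match; the per-class rows of the classification report are more informative for spotting weak classes.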

## **4. Data Augmentation** <font color=red> [optional] </font><br>

#### **4.1 Create a Data Augmentation Pipeline**

##### **4.1.1**
Define augmentation steps for the datasets.

In [None]:
# Define augmentation steps to augment images

from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Define augmentation steps for training data.
# No rescale here: X was already divided by 255 before the train/validation split,
# so rescaling again would shrink the pixel values a second time.
train_datagen = ImageDataGenerator(
    rotation_range=20,            # Randomly rotate images by up to 20 degrees
    width_shift_range=0.1,        # Shift width by up to 10%
    height_shift_range=0.1,       # Shift height by up to 10%
    shear_range=0.1,              # Shear angle
    zoom_range=0.2,               # Zoom in/out by up to 20%
    horizontal_flip=True,         # Randomly flip images horizontally
    fill_mode='nearest'           # Fill in missing pixels with the nearest value
)

# Validation data is already scaled, so it needs no further processing
val_datagen = ImageDataGenerator()

Augment and resample the images.
In case of class imbalance, you can also undersample the majority class and augment the under-represented classes so that the class sizes in the input dataset are comparable.

Augment the images.
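
One way to decide how much to augment each class is to count how many extra images it would need to match the largest class. This is a sketch of that bookkeeping only, assuming integer labels; `copies_needed_per_class` is a hypothetical helper and the labels are synthetic.

```python
import numpy as np


def copies_needed_per_class(labels, num_classes):
    """Return {class_index: extra images needed to match the largest class}."""
    counts = np.bincount(np.asarray(labels), minlength=num_classes)
    target = counts.max()
    return {cls: int(target - n) for cls, n in enumerate(counts)}


# Synthetic labels: class 0 has 3 images, class 1 has 1, class 2 has 2
labels_demo = [0, 0, 0, 1, 2, 2]
print(copies_needed_per_class(labels_demo, num_classes=3))
```

For the one-hot `y_train` used in this notebook, the integer labels can be recovered first with `np.argmax(y_train, axis=1)`.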

In [None]:
# Create a function to augment the images

from tensorflow.keras.preprocessing.image import ImageDataGenerator
import numpy as np

def augment_images(X, y, augment_count=2):
    """
    Augment each image `augment_count` times using basic transformations.

    Parameters:
    - X: np.array of images (4D: samples, height, width, channels)
    - y: corresponding labels (1D: integers or strings)
    - augment_count: number of augmented copies to create per image

    Returns:
    - X_aug: augmented images
    - y_aug: corresponding labels
    """
    datagen = ImageDataGenerator(
        rotation_range=20,
        width_shift_range=0.1,
        height_shift_range=0.1,
        zoom_range=0.2,
        shear_range=0.1,
        horizontal_flip=True,
        fill_mode='nearest'
    )

    X_aug, y_aug = [], []
    
    for i in range(len(X)):
        x = X[i].reshape((1,) + X[i].shape)  # reshape for ImageDataGenerator
        label = y[i]
        gen = datagen.flow(x, batch_size=1)
        for _ in range(augment_count):
            aug_img = next(gen)[0]
            X_aug.append(aug_img)
            y_aug.append(label)

    return np.array(X_aug), np.array(y_aug)

In [None]:
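# Cast to float32 before augmenting (X_train was stored as float16).
# X_aug does not exist yet at this point, so it cannot be cast here.
X_train = X_train.astype('float32')

X_aug, y_aug = augment_images(X_train, y_train, augment_count=1)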

# Combine original + augmented data
X_train_combined = np.concatenate([X_train, X_aug])
y_train_combined = np.concatenate([y_train, y_aug])

In [None]:
# Create the augmented training dataset, reusing augment_images defined above
X_aug, y_aug = augment_images(X_train, y_train, augment_count=2)  # Augment each image 2 times

In [None]:
X_train_combined = np.concatenate([X_train, X_aug])
y_train_combined = np.concatenate([y_train, y_aug])

In [None]:
from sklearn.utils import shuffle

X_train_combined, y_train_combined = shuffle(X_train_combined, y_train_combined, random_state=42)

In [None]:
train_generator = train_datagen.flow(X_train_combined, y_train_combined, batch_size=32)

##### **4.1.2**

Train the model on the new augmented dataset.

In [None]:
# Train the model using augmented images

model.fit(train_generator, validation_data=(X_val, y_val), epochs=30)

## **5. Conclusions** <font color = red> [5 marks]</font>

#### **5.1 Conclude with outcomes and insights gained** <font color =red> [5 marks] </font>

* Report your findings about the data
* Report model training results