<a href="https://colab.research.google.com/github/Keerthana123-coder/Automated-Waste-Segregation-with-Computer-Vision/blob/main/AI_Powered_Waste_Classification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## **Waste classification Model**

In [None]:
# Install Keras into the notebook runtime (this bare `pip` line is not Python; it only runs inside IPython/Colab).
# NOTE(review): the version is not pinned, so the installed release may differ between runs.
pip install keras



## ***Import packages***

In [None]:
import pandas as pd
import numpy as np
import logging
import tensorflow as tf
import warnings
import glob
import tqdm
import os

from tqdm import tqdm
from IPython import display
import matplotlib.pyplot as plt
import seaborn as sns
from seaborn import heatmap

from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report

from skimage.io import imread, imshow
from skimage.transform import resize

from keras.models import Sequential, load_model
from keras.layers import Conv2D, Lambda, MaxPooling2D, Dense, Dropout, Flatten # convolution layers & core layers

from keras.layers import BatchNormalization
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.utils import to_categorical


from tensorflow import keras
from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras.preprocessing import image_dataset_from_directory
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, History

In [None]:
# Mount Google Drive at /content/drive (only available when running on Colab).
# NOTE(review): the dataset paths used later are relative ("Resources/..."), so confirm
# the working directory actually contains that folder after mounting.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# **Data structure**

In [None]:
# Render the image file waste_data_structure.jpeg from the project resources at 550x250
display.Image('Resources/Images/waste_data_structure.jpeg', width = 550, height = 250)

In [None]:
# Root folder of the dataset and its "Train" / "Test" sub-folders
base_dir = "Resources/Dataset"
train_dir, test_dir = (os.path.join(base_dir, split) for split in ("Train", "Test"))

In [None]:
# List the .jpg files inside the 'O' and 'R' folders of the training directory
train_o, train_r = (glob.glob(os.path.join(train_dir, label, '*.jpg')) for label in ('O', 'R'))

# Per-folder counts; their sum is the reported training-set size
a, b = len(train_o), len(train_r)

print("Number of training samples: {}".format(a+b))

In [None]:
# List the .jpg files inside the 'O' and 'R' folders of the test directory
test_o, test_r = (glob.glob(os.path.join(test_dir, label, '*.jpg')) for label in ('O', 'R'))

# Per-folder counts; their sum is the reported test-set size
a, b = len(test_o), len(test_r)

print("Number of test samples: {}".format(a+b))

# **Data Augmentation**

In [None]:
# Image generators. All three rescale pixel values by 1/255.

# Test images: rescaling only
test_datagen = ImageDataGenerator(rescale=1.0 / 255.0)

# Validation images: rescaling only, with the same 0.2 validation split as the training generator
valid_datagen = ImageDataGenerator(
    rescale=1.0 / 255.0,
    validation_split=0.2,
)

# Training images: rescaling plus random zoom, rotation and horizontal/vertical flips
train_datagen = ImageDataGenerator(
    rescale=1.0 / 255.0,
    validation_split=0.2,
    rotation_range=10,
    zoom_range=0.4,
    horizontal_flip=True,
    vertical_flip=True,
)

# **Training Dataset**

In [None]:
# Train dataset: 'training' subset of the train directory, read through the augmenting generator
train_ds = train_datagen.flow_from_directory(
    directory=train_dir,
    subset='training',
    target_size=(180, 180),
    batch_size=32,
    class_mode='categorical',
)

# **Validation Dataset**

In [None]:
# Validation dataset: 'validation' subset of the same train directory, read through the rescale-only generator
valid_ds = valid_datagen.flow_from_directory(
    directory=train_dir,
    subset='validation',
    target_size=(180, 180),
    batch_size=32,
    class_mode='categorical',
)

# **Testing Dataset**

In [None]:
# Test dataset: read from the test directory; shuffle is off so the batch order is fixed
test_ds = test_datagen.flow_from_directory(
    directory=test_dir,
    shuffle=False,
    target_size=(180, 180),
    batch_size=32,
    class_mode='categorical',
)

In [None]:
# Print the class-name -> index mapping of the train and test iterators so they can be compared
for split_ds in (train_ds, test_ds):
    print(split_ds.class_indices)

In [None]:
# Viewing images: 2 x 5 grid of randomly drawn training samples, titled with their class

fig, ax = plt.subplots(nrows = 2, ncols = 5, figsize = (12,6))

for i in range(2):
    for j in range(5):
        # Load the randomly chosen batch once and read both the image and its
        # label from it, instead of loading the batch once per lookup.
        batch_images, batch_labels = train_ds[np.random.randint(len(train_ds))]

        # Draw the sample position from the batch's real size: a batch can hold
        # fewer than 32 images, so a fixed randint(32) could index out of range.
        sample_idx = np.random.randint(len(batch_images))

        ax[i,j].imshow(batch_images[sample_idx])
        ax[i,j].axis('off')

        label = batch_labels[sample_idx]
        # Labels are one-hot vectors; a 0 in the first position is titled as recycle waste.
        # NOTE(review): this assumes class_indices maps 'O' -> 0 and 'R' -> 1 — confirm
        # against the class_indices printed earlier in the notebook.
        if label[0] == 0:
            ax[i,j].set_title('Recycle Waste')
        else:
            ax[i,j].set_title('Organic Waste')

# tight_layout must be called; the bare attribute access did nothing
plt.tight_layout()
plt.show()

# **Building The Model**

In [None]:
# Defining callbacks

# Write the checkpoint to the same location the prediction section reads it from
# (load_model('Resources/Model/final_model_weights.hdf5')); previously it was saved
# to './final_model_weights.hdf5', so the later load pointed at a different file.
# NOTE(review): some Keras releases only accept '.keras' or '.h5' checkpoint names —
# confirm that '.hdf5' is accepted by the installed version.
filepath = 'Resources/Model/final_model_weights.hdf5'
os.makedirs(os.path.dirname(filepath), exist_ok=True)

# Stop training after 5 epochs without improvement of the validation AUC
earlystopping = EarlyStopping(monitor = 'val_auc',
                              mode = 'max' ,
                              patience = 5,
                              verbose = 1)

# Save the model only when the validation AUC reaches a new maximum
checkpoint = ModelCheckpoint(filepath,
                                monitor = 'val_auc',
                                mode='max',
                                save_best_only=True,
                                verbose = 1)


callback_list = [earlystopping, checkpoint]

# **Base Model(VGG16)**

In [None]:
# Base model: VGG16 with ImageNet weights, without its top layers, for 180x180 RGB input
base_model = VGG16(
    weights="imagenet",
    include_top=False,
    input_shape=(180, 180, 3),
)

In [None]:
# Freezing layers: mark every layer of the base model as non-trainable
for frozen_layer in base_model.layers:
    frozen_layer.trainable = False

In [None]:
# Print the layer-by-layer summary of the VGG16 base model
base_model.summary()

# **Visualizing Base Model Layers With Test Images**

In [None]:
# Plot the kernels of every convolution layer of the base model

for layer in base_model.layers:
    if 'conv' in layer.name:
        weights, bias= layer.get_weights()
        print(layer.name)

        # Normalize filter values between 0 and 1 for visualization
        f_min, f_max = weights.min(), weights.max()
        filters = (weights - f_min) / (f_max - f_min)

        # Last axis indexes the filters; axis 2 indexes the input channels
        n_filters = filters.shape[3]
        print(n_filters)

        # Plot at most the first three input channels of each filter. The loop used to
        # run over filters.shape[0] (the kernel height), which only yields the intended
        # three columns when the kernel happens to be 3 pixels high.
        n_channels = min(3, filters.shape[2])

        filter_cnt=1
        # One subplot row per filter, one column per plotted channel
        for i in range(n_filters):
            filt=filters[:,:,:, i]
            for j in range(n_channels):
                ax = plt.subplot(n_filters, n_channels, filter_cnt)
                ax.set_xticks([])
                ax.set_yticks([])
                plt.imshow(filt[:,:, j])
                filter_cnt+=1
        plt.show()

In [None]:
# Visualize feature maps for an organic image

# Image path
img_path=test_dir + '/O' + '/O_12825.jpg'

# Build a model that maps the input image to the output of every layer after the input layer
feature_layers = base_model.layers[1:]
successive_outputs = [layer.output for layer in feature_layers]
visualization_model = tf.keras.models.Model(inputs = base_model.input, outputs = successive_outputs)

# Load the input image
img = load_img(img_path, target_size=(180, 180))

# Convert the image to an array and add a leading batch axis -> (1, 180, 180, 3)
x = img_to_array(img)
x = x.reshape((1,) + x.shape)

# Rescale by 1/255
x /= 255.0

# Run the image through the visualization network to obtain all intermediate representations
successive_feature_maps = visualization_model.predict(x)

# Take the names from the same layers that produced the outputs. Using all of
# base_model.layers here paired each feature map with the name of the previous layer,
# because the input layer is skipped in successive_outputs.
layer_names = [layer.name for layer in feature_layers]
for layer_name, feature_map in zip(layer_names, successive_feature_maps):
  print(feature_map.shape)
  if len(feature_map.shape) == 4:

    # Only 4-D outputs (1, size, size, n_features) are plotted
    n_features = feature_map.shape[-1]  # number of features in the feature map
    size = feature_map.shape[ 1]  # feature map shape (1, size, size, n_features)

    # Tile the images in a matrix
    display_grid = np.zeros((size, size * n_features))

    # Postprocess each feature to be visually palatable
    for i in range(n_features):
      fmap  = feature_map[0, :, :, i]
      fmap -= fmap.mean()
      # A constant feature map has zero std; skip the division so it becomes a flat
      # mid-grey tile instead of NaNs and a RuntimeWarning
      fmap_std = fmap.std()
      if fmap_std > 0:
        fmap /= fmap_std
      fmap *=  64
      fmap += 128
      fmap  = np.clip(fmap, 0, 255).astype('uint8')

      # Tile each filter into a horizontal grid
      display_grid[:, i * size : (i + 1) * size] = fmap

    # Display the grid
    scale = 20. / n_features
    plt.figure(figsize=(scale * n_features*5, scale*4))
    plt.title(layer_name)
    plt.grid(False)
    plt.imshow(display_grid, aspect='auto', cmap='bwr' )

# Disable tf warning (silences the 'tensorflow' logger for the rest of the session)
logging.getLogger('tensorflow').disabled = True

In [None]:
# Visualize feature maps for a recycled image

# Image path
img_path=test_dir + '/R' + '/R_11107.jpg'

# Build a model that maps the input image to the output of every layer after the input layer
feature_layers = base_model.layers[1:]
successive_outputs = [layer.output for layer in feature_layers]
visualization_model = tf.keras.models.Model(inputs = base_model.input, outputs = successive_outputs)

# Load the input image
img = load_img(img_path, target_size=(180, 180))

# Convert the image to an array and add a leading batch axis -> (1, 180, 180, 3)
x = img_to_array(img)
x = x.reshape((1,) + x.shape)

# Rescale by 1/255
x /= 255.0

# Run the image through the visualization network to obtain all intermediate representations
successive_feature_maps = visualization_model.predict(x)

# Take the names from the same layers that produced the outputs. Using all of
# base_model.layers here paired each feature map with the name of the previous layer,
# because the input layer is skipped in successive_outputs.
layer_names = [layer.name for layer in feature_layers]
for layer_name, feature_map in zip(layer_names, successive_feature_maps):
  print(feature_map.shape)
  if len(feature_map.shape) == 4:

    # Only 4-D outputs (1, size, size, n_features) are plotted
    n_features = feature_map.shape[-1]  # number of features in the feature map
    size = feature_map.shape[ 1]  # feature map shape (1, size, size, n_features)

    # Tile the images in a matrix
    display_grid = np.zeros((size, size * n_features))

    # Postprocess each feature to be visually palatable
    for i in range(n_features):
      fmap  = feature_map[0, :, :, i]
      fmap -= fmap.mean()
      # A constant feature map has zero std; skip the division so it becomes a flat
      # mid-grey tile instead of NaNs and a RuntimeWarning
      fmap_std = fmap.std()
      if fmap_std > 0:
        fmap /= fmap_std
      fmap *=  64
      fmap += 128
      fmap  = np.clip(fmap, 0, 255).astype('uint8')

      # Tile each filter into a horizontal grid
      display_grid[:, i * size : (i + 1) * size] = fmap

    # Display the grid
    scale = 20. / n_features
    plt.figure( figsize=(scale * n_features*5, scale*4) )
    plt.title ( layer_name )
    plt.grid  ( False )
    plt.imshow( display_grid, aspect='auto', cmap='bwr' )

# Ignore warnings (note: this filter hides every warning category for the rest of the session)
warnings.filterwarnings('ignore')

## Adding to the Base Model -  Building Dense Layers

In [None]:
# Defining Layers: the VGG16 base followed by a dense head that ends in a
# 2-unit softmax output
model = Sequential([
    base_model,
    Dropout(0.2),
    Flatten(),

    # Dense head: 5000 -> 1000 -> 500 units, with batch normalization and dropout in between
    BatchNormalization(),
    Dense(5000, activation="relu", kernel_initializer='he_uniform'),
    BatchNormalization(),
    Dropout(0.2),
    Dense(1000, activation="relu", kernel_initializer='he_uniform'),
    BatchNormalization(),
    Dropout(0.2),
    Dense(500, activation="relu", kernel_initializer='he_uniform'),
    Dropout(0.2),

    # Output layer
    Dense(2, activation="softmax"),
])

In [None]:
# Print the summary of the full model: the VGG16 base plus the layers added on top of it

model.summary()

# **Training Model**

In [None]:
# Model fit (training): categorical cross-entropy with Adam, tracking AUC under the
# name 'auc' (the callbacks monitor the matching 'val_auc')
model.compile(
    optimizer="adam",
    loss="categorical_crossentropy",
    metrics=[tf.keras.metrics.AUC(name = 'auc')],
)

# Train for up to 20 epochs, validating on valid_ds after each epoch
model_history = model.fit(
    train_ds,
    validation_data=valid_ds,
    epochs=20,
    callbacks=callback_list,
    verbose=1,
)

In [None]:
# Put the per-epoch values recorded by model.fit into a DataFrame so they can be
# saved and plotted later

# Keep the fit result under a second name
history = model_history

# One column per recorded quantity, one row per epoch
history_df = pd.DataFrame(data=history.history)
history_df

In [None]:
# Save as csv. The target folder is created first so the write does not fail
# when 'Resources/Model' does not exist yet.
history_csv_path = 'Resources/Model/model_history.csv'
os.makedirs(os.path.dirname(history_csv_path), exist_ok=True)
history_df.to_csv(history_csv_path, index=False)

In [None]:
# Read the training history back from the csv file.
# Note: this rebinds `model_history` to a DataFrame; the plotting cells below index
# it by column name ('loss', 'val_loss', 'auc', 'val_auc').
model_history = pd.read_csv ('Resources/Model/model_history.csv')
model_history

In [None]:
# Plot model loss: training and validation loss per epoch

plt.figure(figsize=(12,7))

# Training curve first, validation second, to match the legend order
for column, color in (('loss', 'deeppink'), ('val_loss', 'dodgerblue')):
    plt.plot(model_history[column], color=color, linewidth=4)

bold_14 = dict(fontsize=14, fontweight='bold')
plt.title('Model Loss', **bold_14)
plt.ylabel('Loss', **bold_14)
plt.xlabel('Epoch', **bold_14)

# Legend placed outside the axes, at the upper right corner
plt.legend(['Train', 'Validation'], loc='upper left', bbox_to_anchor=(1,1), fontsize=14)
plt.show()

In [12]:
# Plot the model AUC per epoch. The plotted columns are 'auc' and 'val_auc', so the
# title and y-axis are labelled AUC rather than accuracy.

plt.figure(figsize=(12,7))
plt.plot(model_history['auc'], color='deeppink', linewidth=4)
plt.plot(model_history['val_auc'], color='dodgerblue', linewidth=4)
plt.title('Model AUC', fontsize=14, fontweight='bold')
plt.ylabel('AUC', fontsize=14, fontweight='bold')
plt.xlabel('Epoch', fontsize=14, fontweight='bold')
# Legend placed outside the axes, at the upper right corner
plt.legend(['Train', 'Validation'], loc='upper left', bbox_to_anchor=(1,1), fontsize=14)
plt.show()

NameError: name 'model_history' is not defined

<Figure size 1200x700 with 0 Axes>

# **Model Evaluation**

In [None]:
# Evaluate the model on the test data. The model is compiled with a single metric
# named 'auc', so the reported values are the loss and the AUC (not accuracy).
model.evaluate(test_ds)

In [None]:
# Store final values as variables

# Take the numbers from an evaluation of the current model instead of pasting values
# from an earlier run, so they cannot go stale after retraining. The model is compiled
# with one metric (AUC), so evaluate returns exactly [loss, auc].
loss_final, auc_final = model.evaluate(test_ds, verbose=0)
print(f"The final loss was {loss_final}, and the final AUC was {auc_final}.")

# **Classification Report**

In [None]:
# Confusion Matrix and Classification Report

# Read the sizes from the iterator instead of hard-coding them
num_of_test_samples = test_ds.samples
batch_size = test_ds.batch_size

# Predict over the whole test iterator. The step count used to be passed as the
# second positional argument of predict, which is `batch_size`, not `steps`.
Y_pred = model.predict(test_ds, steps=len(test_ds))
y_pred = np.argmax(Y_pred, axis=1)

# test_ds was created with shuffle=False, so test_ds.classes is in the same order
# as the predictions
print('Confusion Matrix')
print(confusion_matrix(test_ds.classes, y_pred))
print('Classification Report')
# NOTE(review): assumes class index 0 is organic ('O') and 1 is recycled ('R') —
# confirm against test_ds.class_indices.
target_names = ['Organic', 'Recycled']
print(classification_report(test_ds.classes, y_pred, target_names=target_names))

In [None]:
# Plot the classification report

yticklabels = ['Precision', 'Recall', 'F1']
xticklabels = ['Organic', 'Recycled']

# Build the 3 x 2 score table from the predictions (y_pred) rather than from numbers
# typed in by hand, so the heatmap always matches the current model.
report = classification_report(test_ds.classes, y_pred,
                               target_names=xticklabels, output_dict=True)
data = np.array([[report[class_name][score] for class_name in xticklabels]
                 for score in ('precision', 'recall', 'f1-score')])

# Plot the heatmap: one row per score, one column per class
fig, ax = plt.subplots(figsize=(12,8))
ax = sns.heatmap(data, xticklabels=xticklabels, yticklabels=yticklabels,
                 annot=True, fmt='.2f', cmap='Blues', ax=ax)
ax.set_title("Classification Report", fontsize=14, fontweight='bold')

# **True & False Positives**

In [None]:
# Configure how NumPy prints arrays: no scientific notation for small numbers, and
# arrays are summarized only above 2000 elements. This affects printed arrays only;
# it does not change the text drawn inside the heatmaps.

np.set_printoptions(suppress=True, threshold=2000)

In [None]:
# Confusion matrix plot

class_names = ['Organic', 'Recycled']

# Row-normalized confusion matrix in percent, computed from the predictions (y_pred)
# instead of hand-typed counts. Rows are the actual class, columns the predicted class,
# so every row sums to 100.
data = confusion_matrix(test_ds.classes, y_pred, normalize='true') * 100

# Cell captions follow the same layout: row = actual class, column = predicted class
text = np.array([['% of Organic Predicted as Organic', '% of Organic Predicted as Recycled'],
                ['% of Recycled Predicted as Organic', '% of Recycled Predicted as Recycled']])

# Combine text with values
formatted_text = (np.asarray(["{0}\n{1:.2f}".format(cell_text, cell_value)
    for cell_text, cell_value in zip(text.flatten(), data.flatten())])).reshape(2, 2)

# Plot heatmap (the undefined name `data_percent` used here before raised a NameError)
fig, ax = plt.subplots(figsize=(12,8))
sns.set(font_scale=1.5)
ax = sns.heatmap(data, annot=formatted_text, fmt="", cmap='Blues',
                 xticklabels=class_names, yticklabels=class_names,
                 annot_kws={"fontsize":14, "weight":'bold'}, ax=ax)
ax.set_title("Confusion Matrix", fontsize=14, fontweight='bold')
# The heatmap draws matrix columns along x and rows along y
ax.set_xlabel("Predicted", fontsize=14, fontweight='bold')
ax.set_ylabel("Actual", fontsize=14, fontweight='bold')


# **Predicting Test Images**

In [None]:
# Load the saved model from Resources/Model. load_model restores a complete model
# (not only weights) and rebinds `model`, replacing the one trained above.
# NOTE(review): confirm the checkpoint file is actually present at this path.

model = load_model('Resources/Model/final_model_weights.hdf5')

In [None]:
def getprediction(img):
    """Show an image and print the waste category the global `model` predicts for it.

    Args:
        img: An image accepted by `img_to_array` (the notebook passes images loaded
            with `load_img(..., target_size=(180, 180))`).

    Returns:
        None. The image is drawn with matplotlib and the predicted category and
        its probability are printed.
    """
    # Scale pixels by 1/255, the same rescaling the data generators apply
    img = img_to_array(img)
    img = img / 255
    plt.imshow(img)
    plt.axis('off')

    # The model expects a batch, so add a leading batch axis
    img = np.expand_dims(img,axis=0)

    # predict_classes / predict_proba no longer exist on Keras models; a single
    # predict call returns the class probabilities and argmax gives the class.
    probability = model.predict(img)
    answer = int(np.argmax(probability[0]))

    # Index 1 is reported as recycle waste, index 0 as organic waste
    if answer == 1:
        print(f"The image belongs to Recycle waste category, probability: {probability[0][1]}.")
    else:
        print(f"The image belongs to Organic waste category, probability: {probability[0][0]}.")

In [None]:
# Test Case 1 - ORGANIC: image O_12568.jpg from the 'O' folder of the test directory
test_case1 = load_img(f"{test_dir}/O/O_12568.jpg", target_size=(180, 180))
getprediction(test_case1)

In [None]:
# Test Case 2 - ORGANIC: image O_13185.jpg from the 'O' folder of the test directory
test_case2 = load_img(f"{test_dir}/O/O_13185.jpg", target_size=(180, 180))
getprediction(test_case2)

In [None]:
# Test Case 3 - ORGANIC: image O_13905.jpg from the 'O' folder of the test directory
test_case3 = load_img(f"{test_dir}/O/O_13905.jpg", target_size=(180, 180))
getprediction(test_case3)

In [None]:
# Test Case 4 - RECYCLED: image R_10000.jpg from the 'R' folder of the test directory
test_case4 = load_img(f"{test_dir}/R/R_10000.jpg", target_size=(180, 180))
getprediction(test_case4)

In [None]:
# Test Case 5 - RECYCLED: image R_10398.jpg from the 'R' folder of the test directory
test_case5 = load_img(f"{test_dir}/R/R_10398.jpg", target_size=(180, 180))
getprediction(test_case5)

In [None]:
# Test Case 6 - RECYCLED: image R_10714.jpg from the 'R' folder of the test directory
test_case6 = load_img(f"{test_dir}/R/R_10714.jpg", target_size=(180, 180))
getprediction(test_case6)

In [None]:
# Test Case 7 - RECYCLED: image R_11107.jpg from the 'R' folder of the test directory
test_case7 = load_img(f"{test_dir}/R/R_11107.jpg", target_size=(180, 180))
getprediction(test_case7)

In [None]:
# Test Case 8 - RECYCLED: image R_10005.jpg from the 'R' folder of the test directory
test_case8 = load_img(f"{test_dir}/R/R_10005.jpg", target_size=(180, 180))
getprediction(test_case8)