# Building a Smart System Based on Deep Convolutional Neural Networks to Classify Trash

In [75]:
# data visualisation and manipulation
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib import style
import seaborn as sns
 

sns.set(style='whitegrid',color_codes=True)

#model selection

from sklearn.preprocessing import LabelEncoder

#preprocess.
from keras.preprocessing.image import ImageDataGenerator
from keras.applications.vgg16 import preprocess_input
from tensorflow.keras.utils import load_img
from tensorflow.keras.utils import img_to_array

#dl libraries
import tensorflow as tf
import random as rn

from tensorflow import keras

In [76]:
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten
from keras.applications import VGG16
from keras import models
from keras.optimizers import Adagrad
from keras.preprocessing.image import ImageDataGenerator
from keras.callbacks import EarlyStopping
import numpy as np
from glob import glob
import os
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
# for reproducibility: seed every random number generator the notebook relies on.
# Seeding NumPy alone leaves Python's `random` (imported as `rn`) and TensorFlow
# (imported as `tf`) unseeded, so weight initialisation and shuffling would still
# differ between runs.
SEED = 78
np.random.seed(SEED)
rn.seed(SEED)
tf.random.set_seed(SEED)

In [77]:
from tqdm import tqdm
# Module-level accumulators filled by load_data(): images in X, class names in Z.
X = []
Z = []

# Side length (pixels) every image is resized to while loading.
IMG_SIZE = 256


def load_data(document, DIR):
    """Load every file in ``DIR`` as a VGG16-preprocessed image.

    Each file is resized to ``IMG_SIZE`` x ``IMG_SIZE``, converted to an array,
    passed through ``preprocess_input`` and appended to the global list ``X``;
    ``str(document)`` is appended to the global list ``Z`` as its label.

    Parameters
    ----------
    document : label to record for every image found in ``DIR``.
    DIR : path of the directory to read.

    Returns
    -------
    tuple
        The global ``(X, Z)`` lists (they keep growing across calls).
    """
    class_name = str(document)
    for file_name in tqdm(os.listdir(DIR)):
        pixels = img_to_array(
            load_img(os.path.join(DIR, file_name), target_size=(IMG_SIZE, IMG_SIZE))
        )
        X.append(preprocess_input(pixels))
        Z.append(class_name)
    return X, Z

In [78]:
# Start from empty accumulators so re-running this cell does not duplicate data.
X = []
Z = []

# One directory per garbage class.
DIR_cardboard = 'Garbage_classification/cardboard'
DIR_glass = 'Garbage_classification/glass'
DIR_metal = 'Garbage_classification/metal'
DIR_paper = 'Garbage_classification/paper'
DIR_plastic = 'Garbage_classification/plastic'
DIR_trash = 'Garbage_classification/trash'

# Load the classes one after another, printing the running image count after each.
for class_name, class_dir in (
    ('cardboard', DIR_cardboard),
    ('glass', DIR_glass),
    ('metal', DIR_metal),
    ('paper', DIR_paper),
    ('plastic', DIR_plastic),
    ('trash', DIR_trash),
):
    load_data(class_name, class_dir)
    print(len(X))

100%|██████████| 403/403 [00:02<00:00, 187.71it/s]


403


100%|██████████| 501/501 [00:02<00:00, 218.53it/s]


904


100%|██████████| 410/410 [00:01<00:00, 218.90it/s]


1314


100%|██████████| 594/594 [00:02<00:00, 213.93it/s]


1908


100%|██████████| 482/482 [00:02<00:00, 210.75it/s]


2390


100%|██████████| 137/137 [00:00<00:00, 211.98it/s]

2527





In [79]:
# Training configuration read as module-level globals by the VGG-based train() in the next cell.
# Input image dimensions
# NOTE(review): load_data() above resizes images to IMG_SIZE x IMG_SIZE (256 x 256), which does
# not match the 384 x 512 shape declared here - confirm which data this configuration is meant for.
img_rows, img_cols, img_chans = 384, 512, 3
# Shape handed to VGG16(input_shape=...) when the convolutional base is built.
input_shape = (img_rows, img_cols, img_chans)
# Mini-batch size used for both the plain fit and the augmented generator.
batch_size = 8
# NOTE(review): the VGG-based train() hard-codes a 2-unit output layer and does not read this
# value; the dataset folders loaded above contain six classes - confirm the intended setup.
num_classes = 2
# Epoch count used only by the non-augmented branch (the augmented branch uses its own value).
epochs = 100
# Selects the ImageDataGenerator branch of train() when True.
data_augmentation = True

In [80]:
def _build_vgg_classifier():
    """Return a compiled Sequential model: frozen VGG16 base plus a small dense head.

    Reads the module-level ``input_shape`` for the convolutional base.
    """
    # Function-scope import so this cell does not rely on a later import cell.
    # The legacy optimizer is used because it still accepts ``decay``; this is the
    # same optimizer call the dense-model cell further down the notebook uses.
    from tensorflow.keras.optimizers import legacy

    # Loading the VGG model without its ImageNet classification head
    vgg_conv = VGG16(weights='imagenet', include_top=False, input_shape=input_shape)

    # The former ``vgg_conv.layers.pop()`` loop was dropped: in tf.keras
    # ``Model.layers`` returns a fresh list on every access, so popping from it
    # never removed anything from the network. To really truncate the base,
    # build a new Model from an intermediate layer's output instead.

    # Freezing all layers of the convolutional base
    for layer in vgg_conv.layers:
        layer.trainable = False

    # Building the classifier: VGG base followed by the new dense head
    model = models.Sequential()
    model.add(vgg_conv)
    model.add(Flatten())
    # ``input_shape`` is only meaningful on a model's first layer, so it is not
    # passed to this Dense layer any more.
    model.add(Dense(350, activation='relu'))
    model.add(Dropout(0.2))
    model.add(Dense(350, activation='relu'))
    model.add(Dropout(0.2))
    model.add(Dense(2, activation='sigmoid'))

    model.compile(loss='binary_crossentropy',
                  optimizer=legacy.Adagrad(learning_rate=1e-5, decay=1e-6),
                  metrics=['accuracy'])
    return model


def _plot_training_curves(history, acc_key):
    """Plot training/validation accuracy and loss stored in a Keras ``History``."""
    acc = history.history[acc_key]
    val_acc = history.history['val_' + acc_key]
    loss = history.history['loss']
    val_loss = history.history['val_loss']

    epoch = range(len(acc))

    plt.plot(epoch, acc, 'b', label='Training acc')
    plt.plot(epoch, val_acc, 'r', label='Validation acc')
    plt.title('Training and validation accuracy')
    plt.legend()
    plt.figure()

    plt.plot(epoch, loss, 'b', label='Training loss')
    plt.plot(epoch, val_loss, 'r', label='Validation loss')
    plt.title('Training and validation loss')
    plt.legend()
    plt.show()


def train(x_train, x_test, y_train, y_test):
    """Train the frozen-VGG16 two-class classifier and return the fitted model.

    Behaviour depends on the module-level ``data_augmentation`` flag:

    * False: plain ``model.fit`` for ``epochs`` epochs; nothing is saved or plotted.
    * True: fit on an augmenting ``ImageDataGenerator`` with early stopping, save
      the weights to an ``.hdf5`` file named after the final validation metrics,
      print the test loss/accuracy and plot the learning curves.

    Also reads the module-level ``batch_size`` and ``input_shape``.

    Parameters
    ----------
    x_train, y_train : training images and their labels (labels must match the
        2-unit sigmoid output).
    x_test, y_test : data used both as validation data during fitting and for the
        final evaluation.

    Returns
    -------
    The trained Keras model (the original version returned nothing).

    Note: a later cell of this notebook defines another ``train`` that replaces
    this one once it has been run.
    """
    model = _build_vgg_classifier()

    call = [
        EarlyStopping(monitor='val_loss', patience=20, verbose=1, mode='auto'),
    ]

    if not data_augmentation:
        print('Not using data augmentation.')
        model.fit(x_train, y_train,
                  batch_size=batch_size,
                  epochs=epochs,
                  validation_data=(x_test, y_test),
                  shuffle=True)
        return model

    print('Using real-time data augmentation.')
    # Only the non-default augmentation options are spelled out; every other
    # ImageDataGenerator argument keeps its documented default.
    datagen = ImageDataGenerator(
        rotation_range=30,          # random rotation, in degrees
        width_shift_range=0.1,      # horizontal shift, fraction of total width
        height_shift_range=0.1,     # vertical shift, fraction of total height
        shear_range=0.2,            # random shear
        zoom_range=0.3,             # random zoom
        channel_shift_range=0.2,    # random channel shift
        fill_mode='nearest',        # how points outside the input are filled
        horizontal_flip=True,
        vertical_flip=True)

    # NOTE: the value printed here is informational only; the fit below runs a
    # fixed 800 steps per epoch regardless of the dataset size.
    print("steps_per_epoch (nbr of samples per epoch):", int(len(x_train)/batch_size))

    # ``Model.fit`` accepts generators directly; ``fit_generator`` is deprecated.
    history = model.fit(datagen.flow(x_train, y_train, batch_size=batch_size),
                        steps_per_epoch=800,
                        epochs=50,
                        validation_data=(x_test, y_test),
                        workers=10, callbacks=call)

    # TF2 Keras records the metric as 'accuracy'; older releases used 'acc'.
    acc_key = 'accuracy' if 'accuracy' in history.history else 'acc'

    # The file name keeps the "val_loss=<value>" suffix so saved weights can be
    # ranked by validation loss by splitting the name on '='.
    weights = 'Model3_adagrad_val_acc:{} val_loss={}.hdf5'.format(
        round(history.history['val_' + acc_key][-1], 3),
        round(history.history['val_loss'][-1], 3))
    model.save_weights(weights)
    print ('Model saved.')

    score = model.evaluate(x_test, y_test, batch_size=10, verbose=0)
    print('Test loss:', score[0])
    print('Test accuracy:', score[1])

    _plot_training_curves(history, acc_key)
    return model

In [81]:
def test(x_test, model=None, index=58):
    """Show one test image and classify it as 'glass' or 'plastic'.

    Parameters
    ----------
    x_test : array of test images, in the same form the model was trained on
        (train() fits on its inputs as given, so no extra scaling is applied here).
    model : trained Keras model; when omitted, the notebook-level global
        ``model`` is used, as in the original version.
    index : position of the image to classify (defaults to the original 58).

    Returns
    -------
    tuple
        ``(out, label)`` - the arg-max class index and 'plastic' when it is 1,
        otherwise 'glass'.

    Raises
    ------
    NameError
        If no model is passed and no global ``model`` exists.

    Note: a later cell of this notebook defines another ``test`` that replaces
    this one once it has been run.
    """
    if model is None:
        if 'model' not in globals():
            raise NameError("name 'model' is not defined")
        model = globals()['model']

    sample = x_test[index]

    plt.imshow(sample)
    plt.show()

    # predict() expects a batch, so add a leading axis of length 1. The original
    # code passed the bare 3-D image and left its batched, standardised copy unused.
    out = model.predict(np.expand_dims(sample, axis=0))
    out = np.argmax(out)

    label = 'plastic' if out == 1 else 'glass'

    return out, label

In [82]:
# data visualisation and manipulation
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib import style
import seaborn as sns
 

sns.set(style='whitegrid',color_codes=True)

#model selection

from sklearn.preprocessing import LabelEncoder

#preprocess.
from keras.preprocessing.image import ImageDataGenerator
from keras.applications.vgg16 import preprocess_input
from tensorflow.keras.utils import load_img
from tensorflow.keras.utils import img_to_array

#dl libraries
import tensorflow as tf
import random as rn

from tensorflow import keras

from keras.models import Sequential
from keras.layers import Dense
from tensorflow.keras.optimizers import Adam,SGD,Adagrad,Adadelta,RMSprop
from tensorflow.keras.utils import to_categorical

# specifically for cnn
from keras.layers import Dropout, Flatten,Activation
from keras.layers import Conv2D, MaxPooling2D, BatchNormalization

# specifically for manipulating zipped images and getting numpy arrays of pixel values of images.
import cv2                  
from tqdm import tqdm
import os                   
from random import shuffle

In [1]:
import os
import numpy as np
import cv2
from sklearn.model_selection import train_test_split
from tensorflow.keras import layers, models
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.optimizers import Adagrad
from tensorflow.keras.optimizers import legacy

# Define the path to your image dataset
dataset_path = 'Garbage_classification'

# Define the desired image size
target_size = (256, 256)

# Load and preprocess the images
all_images = []
all_labels = []

# Iterate over the class folders and their files in sorted order: os.listdir()
# returns entries in arbitrary order, which would make the seeded train/test
# split differ between machines.
for label in sorted(os.listdir(dataset_path)):
    label_path = os.path.join(dataset_path, label)
    if not os.path.isdir(label_path):
        continue
    for image_file in sorted(os.listdir(label_path)):
        image_path = os.path.join(label_path, image_file)
        image = cv2.imread(image_path)
        if image is None:
            # cv2.imread returns None for files it cannot decode (hidden files,
            # corrupt images); cv2.resize would raise on that, so skip them.
            print('Skipping unreadable file:', image_path)
            continue
        image = cv2.resize(image, target_size)
        all_images.append(image)
        all_labels.append(label)

# Convert the lists to numpy arrays
all_images = np.array(all_images)
all_labels = np.array(all_labels)

# Stratified 80/20 train/test split with a fixed seed
X_train, X_test, y_train, y_test = train_test_split(
    all_images,
    all_labels,
    stratify=all_labels,
    test_size=0.2,
    random_state=42,
)

# Scale pixel values from [0, 255] to [0, 1] as float32
X_train, X_test = (
    split.astype("float32") / 255.0 for split in (X_train, X_test)
)

# Class name -> integer index, following the sorted order of np.unique
label_mapping = {name: idx for idx, name in enumerate(np.unique(all_labels))}

# Map each split's class names to indices, then pick the matching identity-matrix
# rows to obtain one-hot vectors
y_train, y_test = (
    np.eye(len(label_mapping))[np.array([label_mapping[name] for name in split])]
    for split in (y_train, y_test)
)

# Fully connected baseline: flatten the image, two hidden layers, softmax over the classes
model = models.Sequential([
    layers.Flatten(input_shape=(target_size[0], target_size[1], 3)),
    layers.Dense(64, activation="relu"),
    layers.Dense(32, activation="relu"),
    layers.Dense(len(label_mapping), activation="softmax"),
])

# Categorical cross-entropy with the legacy Adagrad optimizer (it accepts `decay`)
model.compile(
    optimizer=legacy.Adagrad(learning_rate=1e-5, decay=1e-6),
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)

# Stop once val_loss has not improved for 20 consecutive epochs
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=20,
    verbose=1,
    mode='auto',
)

# Fit for up to 100 epochs; the test split doubles as validation data
history = model.fit(
    X_train,
    y_train,
    batch_size=32,
    epochs=100,
    validation_data=(X_test, y_test),
    callbacks=[early_stopping],
)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100
Epoch 51/100
Epoch 52/100
Epoch 53/100
Epoch 54/100
Epoch 55/100
Epoch 56/100
Epoch 57/100
Epoch 58/100
Epoch 59/100
Epoch 60/100
Epoch 61/100
Epoch 62/100
Epoch 63/100
Epoch 64/100
Epoch 65/100
Epoch 66/100
Epoch 67/100
Epoch 68/100
Epoch 69/100
Epoch 70/100
Epoch 71/100
Epoch 72/100
Epoch 73/100
Epoch 74/100
Epoch 75/100
Epoch 76/100
Epoch 77/100
Epoch 78

In [2]:
import os
import cv2
import numpy as np
from sklearn.preprocessing import LabelEncoder
from tensorflow import keras

# Define the path to your image dataset directory
dataset_dir = 'Garbage_classification'

# Define the desired image size
target_size = (256, 256)

# Load and preprocess the images
all_images = []
all_labels = []

# Iterate over the class folders and their files in sorted order so the saved
# arrays are identical on every machine (os.listdir() order is arbitrary).
for label in sorted(os.listdir(dataset_dir)):
    label_path = os.path.join(dataset_dir, label)
    if not os.path.isdir(label_path):
        continue
    for image_file in sorted(os.listdir(label_path)):
        image_path = os.path.join(label_path, image_file)
        image = cv2.imread(image_path)
        if image is None:
            # cv2.imread returns None for files it cannot decode; skip them
            # instead of letting cv2.resize raise.
            print('Skipping unreadable file:', image_path)
            continue
        image = cv2.resize(image, target_size)
        all_images.append(image)
        all_labels.append(label)

# Convert the lists to numpy arrays
all_images_array = np.array(all_images)
all_labels_array = np.array(all_labels)

# Map string labels to integer values
label_encoder = LabelEncoder()
all_labels_encoded = label_encoder.fit_transform(all_labels_array)

# Save the class names (index i holds the name encoded as integer i)
np.save('label_encoder.npy', label_encoder.classes_)

# Save the image array and the integer-encoded labels
np.save('all_images_array.npy', all_images_array)
np.save('all_labels.npy', all_labels_encoded)

In [3]:
import numpy as np
from sklearn.model_selection import train_test_split
from tensorflow import keras
from sklearn.preprocessing import LabelEncoder

def train(x_train, x_test, y_train, y_test):
    """Train a small fully connected classifier and save it to 'your_model.h5'.

    The input shape and the number of output classes are taken from the data
    itself rather than from hard-coded values or a notebook-level global, so the
    function cannot pick up a stale ``num_classes`` left over from another cell.

    Parameters
    ----------
    x_train, x_test : image arrays shaped (samples, height, width, channels).
    y_train, y_test : one-hot label arrays shaped (samples, classes).

    Returns
    -------
    The fitted Keras model.
    """
    input_shape = x_train.shape[1:]   # per-sample image shape
    n_classes = y_train.shape[1]      # width of the one-hot labels

    model = keras.models.Sequential()
    model.add(keras.layers.Flatten(input_shape=input_shape))  # Flatten the input shape
    model.add(keras.layers.Dense(64, activation='relu'))
    model.add(keras.layers.Dense(64, activation='relu'))
    model.add(keras.layers.Dense(n_classes, activation='softmax'))

    model.compile(loss='categorical_crossentropy',
                  optimizer='adam',
                  metrics=['accuracy'])

    # The test split is used as validation data during fitting.
    model.fit(x_train, y_train,
              batch_size=32,
              epochs=10,
              verbose=1,
              validation_data=(x_test, y_test))

    model.save('your_model.h5')  # Save the trained model

    return model

def test(x_test):
    """Predict classes for ``x_test`` with the model stored in 'your_model.h5'.

    Returns
    -------
    tuple
        ``(predictions, labels)`` - the raw model output and, for each sample,
        the index of its largest value along axis 1.
    """
    trained_model = keras.models.load_model('your_model.h5')
    class_probabilities = trained_model.predict(x_test)
    return class_probabilities, np.argmax(class_probabilities, axis=1)

if __name__ == "__main__":
    # Load all images
    all_images_array = np.load('all_images_array.npy')

    # Load the integer-encoded class labels
    all_labels = np.load('all_labels.npy')

    # Restore the label encoder from the class names saved next to the arrays.
    # Re-fitting an encoder on the already-encoded integers (as before) would
    # lose the class names, making inverse_transform useless.
    label_encoder = LabelEncoder()
    label_encoder.classes_ = np.load('label_encoder.npy')
    all_labels_encoded = all_labels

    # Split the dataset into train and test sets, with a split ratio of 70:30
    x_train, x_test, y_train, y_test = train_test_split(
        all_images_array, all_labels_encoded, test_size=0.30, shuffle=True, random_state=78
    )

    # Data normalization to convert features to the same scale
    x_train = x_train.astype('float32') / 255.0
    x_test = x_test.astype('float32') / 255.0

    print('x_train shape:', x_train.shape)
    print(x_train.shape[0], 'train samples')
    print(x_test.shape[0], 'test samples')

    # Convert class vectors to one-hot encoding
    num_classes = len(label_encoder.classes_)
    y_train = keras.utils.to_categorical(y_train, num_classes)
    y_test = keras.utils.to_categorical(y_test, num_classes)

    # Keep the returned model: the evaluation below must score the network that
    # was just trained, not whatever `model` an earlier cell left in the kernel.
    model = train(x_train, x_test, y_train, y_test)

    predictions, labels = test(x_test)

    print('The predictions of the test set are:', predictions)
    print('The predicted labels are:', labels)

    # Evaluate the model
    _, accuracy = model.evaluate(x_test, y_test)
    print('Test accuracy:', accuracy)


x_train shape: (1768, 256, 256, 3)
1768 train samples
759 test samples
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
The predictions of the test set are: [[4.50083092e-02 3.66980344e-01 2.21607119e-01 6.56540394e-02
  2.31092528e-01 6.96576014e-02]
 [6.43486669e-03 5.92182219e-01 1.24495178e-01 2.61298735e-02
  2.50657737e-01 1.00156336e-04]
 [4.90745949e-03 4.83043008e-02 7.01037765e-01 5.42719811e-02
  1.90693349e-01 7.85150798e-04]
 ...
 [1.95864234e-02 8.32041651e-02 5.39829016e-01 1.65403709e-01
  1.90936342e-01 1.04029803e-03]
 [9.15093627e-03 1.98192805e-01 5.16510487e-01 4.00925763e-02
  2.35860854e-01 1.92341889e-04]
 [3.80101614e-02 4.31730837e-01 1.88954905e-01 5.93920127e-02
  2.64262348e-01 1.76497195e-02]]
The predicted labels are: [1 1 2 2 2 2 2 2 2 0 2 2 4 0 1 2 2 2 4 1 2 2 1 1 2 1 0 2 5 2 2 4 2 2 1 0 2
 2 2 1 4 1 1 1 2 2 2 1 2 1 2 1 0 1 2 2 2 1 4 1 1 4 0 2 1 2 2 0 1 4 2 2 2 0
 2 1 2 2 0 2 2 1 2 2 2 2 5 2 

In [4]:
# Reload the model that train() saved to 'your_model.h5' and print its layer-by-layer summary.
# This rebinds the notebook-level name `model`, which the following cells use.
model = keras.models.load_model('your_model.h5')  # Load your trained model
model.summary()

Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 flatten_1 (Flatten)         (None, 196608)            0         
                                                                 
 dense_3 (Dense)             (None, 64)                12582976  
                                                                 
 dense_4 (Dense)             (None, 64)                4160      
                                                                 
 dense_5 (Dense)             (None, 6)                 390       
                                                                 
Total params: 12,587,526
Trainable params: 12,587,526
Non-trainable params: 0
_________________________________________________________________


In [6]:
# Hyper-parameters for this additional training run
num_epochs = 10
batch_size = 32

# Keep fitting the current `model`; the test split serves as validation data
history = model.fit(
    x_train,
    y_train,
    batch_size=batch_size,
    epochs=num_epochs,
    validation_data=(x_test, y_test),
)

# Per-epoch validation accuracy recorded during the run
validation_accuracy = history.history['val_accuracy']

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [10]:
# Assumes a loaded or trained model named 'model' and a test set named 'x_test'.

# Run inference once. 'predictions' is a 2D array: one row per sample in 'x_test',
# one column per class, each entry being that class's predicted probability.
# (The original cell called model.predict twice on the same input.)
predictions = model.predict(x_test)

# Example: confidence score of the first sample for class 0
confidence_score = predictions[0][0]

# Predicted class label per sample = column with the highest probability.
# 'predicted_classes' is a 1D array with one label per sample in 'x_test'.
predicted_classes = predictions.argmax(axis=1)

# Example: predicted class label of the first sample
class_label = predicted_classes[0]


