<a href="https://colab.research.google.com/github/filippomenegatti/NeuralNetworks_Classification/blob/main/Script_final.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Image classification with Neural Networks
Use `tensorflow` to train neural networks for the classification of fruit/vegetable types based on images from this dataset. Images must be transformed from JPG to RGB pixel values and scaled down (e.g., 32x32).  
Use fruit/vegetable types (as opposed to variety) as labels to predict and consider only the 10 most frequent types (apple, banana, plum, pepper, cherry, grape, tomato, potato, pear, peach).  
Experiment with different network architectures and training parameters, documenting their influence on the final predictive performance. While the training loss can be chosen freely, the reported test errors must be measured according to the zero-one loss for multiclass classification.

# Introduction

In this essay we analyse the dataset available on the Kaggle website [1] under the CC BY-SA 4.0 license, using a Deep Learning approach. The dataset contains 90380 images of 131 fruits and vegetables, divided into separate folders for the training and test sets. We select only a subsample of the available fruits, grouping them into 10 macro-categories that correspond to the most frequent types. Different Neural Network architectures are compared, starting from different configurations of Feedforward Neural Networks and concluding with two Convolutional Neural Network models.



# Setting up the environment

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
from google.colab import files

files.upload() #import the kaggle.json file

In [None]:
#install kaggle and download the data set in the desired path
!pip install -q kaggle
!mkdir ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json
!kaggle datasets download -d moltean/fruits
!mkdir ML_assignment
!unzip fruits.zip -d ML_assignment

In [None]:
#import all the libraries and functions
import os
import random
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV, KFold


import tensorflow as tf
from tensorflow import keras
from keras.preprocessing.image import array_to_img, img_to_array, load_img
from keras.utils import np_utils
from keras.models import Sequential, Model
from keras.layers import Dense, Flatten, Dropout, Conv2D, MaxPool2D, Activation, MaxPooling2D, Input, AveragePooling2D, GlobalAveragePooling2D
from keras.losses import categorical_crossentropy
from keras.optimizers import Adam, SGD, RMSprop, Adamax
from keras import regularizers
from keras.callbacks import LearningRateScheduler, History, EarlyStopping
from keras.wrappers.scikit_learn import KerasClassifier

print('The Tensorflow version used is: ' + tf.__version__)
print('The Keras version used is: ' + keras.__version__)

In [None]:
# Seed every random number generator used in the notebook so that runs are
# repeatable.
seed = 33
random.seed(seed)
# scikit-learn utilities called without `random_state` (shuffle,
# train_test_split, RandomizedSearchCV) draw from NumPy's global generator,
# so it has to be seeded as well.
np.random.seed(seed)
tf.random.set_seed(seed)

# Dataset preprocessing

In [None]:
# import of the dataset divided in the 10 categories requested with the target size of 32x32

types = ["Apple", "Banana", "Plum", "Pepper", "Cherry", "Grape", "Tomato", "Potato", "Pear", "Peach"]

fruits = {}

def load_dataset(dire, categories=None, target_size=(32, 32)):
    """Load the images of the selected fruit types found under ``dire``.

    A sub-directory is read when the first whitespace-separated word of its
    name equals one of the categories (``"Apple Red 1"`` -> ``"Apple"``).
    Every other entry, including plain files, is ignored.

    Parameters
    ----------
    dire : str
        Directory that contains one sub-directory per fruit variety.
    categories : sequence of str, optional
        Type names to keep; the label of an image is the index of its type
        in this sequence. Defaults to the notebook-level ``types`` list.
    target_size : tuple of int, optional
        Size passed to ``load_img`` as ``target_size``.

    Returns
    -------
    images_as_array : numpy.ndarray
        The ``img_to_array`` results of all loaded images, stacked.
    labels : numpy.ndarray
        Integer class index of each image, aligned with ``images_as_array``.
    """
    if categories is None:
        categories = types

    # First occurrence wins, matching what ``list.index`` would return.
    class_index = {}
    for position, name in enumerate(categories):
        class_index.setdefault(name, position)

    images_as_array = []
    labels = []
    # Sorted so that the sample order does not depend on the file system.
    for category in sorted(os.listdir(dire)):
        words = category.split()
        if not words or words[0] not in class_index:
            continue
        path = os.path.join(dire, category)
        # A stray file whose name happens to match a type must not be listed.
        if not os.path.isdir(path):
            continue
        class_num = class_index[words[0]]
        for img in sorted(os.listdir(path)):
            file = os.path.join(path, img)
            images_as_array.append(
                img_to_array(load_img(file, target_size=target_size)))
            labels.append(class_num)
    return np.array(images_as_array), np.array(labels)

In [None]:
# Locations of the extracted training and test folders.
train_path= '/content/ML_assignment/fruits-360/Training'
test_path= '/content/ML_assignment/fruits-360/Test'
# load_dataset returns an (images, labels) pair for each folder.
train = load_dataset(train_path)
test = load_dataset(test_path)
X_train, y_train = train
X_test, y_test = test
# Images and labels are shuffled together so that every image keeps its label.
# No random_state is passed, so the permutation comes from NumPy's global RNG.
X_train, y_train = shuffle(X_train, y_train)
X_test, y_test = shuffle(X_test, y_test)
print(X_train.shape)
print(X_test.shape)

In [None]:
n_classes = len(np.unique(y_train))
print(n_classes)

In [None]:
# Look at the distribution of the classes in the two sets to see whether
# they are balanced.

unique_train, counts_train = np.unique(y_train, return_counts=True)
plt.bar(unique_train, counts_train, label='y_train')

unique_test, counts_test = np.unique(y_test, return_counts=True)
plt.bar(unique_test, counts_test, label='y_test')

# Show the fruit names on the x axis instead of the bare integer codes.
plt.xticks(unique_train, [types[i] for i in unique_train], rotation=45)
# The labels are attached to the bars above, so the legend cannot get out
# of sync with the plotting order.
plt.legend()
plt.title('Class Frequency')
plt.xlabel('Class')
plt.ylabel('Frequency')

plt.show()

In [None]:
# Hold out 20% of the training data as the validation set.
# The outputs are unpacked in the order train_test_split returns them
# (train part first, held-out part second) and the split is seeded so that
# it is the same on every run.
X_train, X_val, y_train, y_val = train_test_split(
    X_train, y_train, test_size=0.20, random_state=seed)

In [None]:
# Normalization of the sets: cast to float32 and divide by 255.
X_train = X_train.astype('float32')/255
X_test = X_test.astype('float32')/255
X_val = X_val.astype('float32')/255

# Report the shape of each split (the "Validation" header was misspelled).
print('Training X:\n',X_train.shape)
print('\nValidation X:\n',X_val.shape)
print('\nTest X:\n',X_test.shape)

In [None]:
# Show the first n_rows * n_cols training images in a grid, each one titled
# with the name of its class.

n_rows = 3
n_cols = 6
plt.figure(figsize=(n_cols * 1.5, n_rows * 1.5))
for index in range(n_rows * n_cols):
    # Subplot positions are 1-based and run row by row.
    plt.subplot(n_rows, n_cols, index + 1)
    plt.imshow(X_train[index], cmap="binary", interpolation="nearest")
    plt.axis('off')
    plt.title(types[y_train[index]], fontsize=12)
plt.subplots_adjust(wspace=0.2, hspace=0.5)
plt.show()

In [None]:
# Convert the integer class labels of every split to categorical vectors
# with n_classes entries, as required by the categorical cross-entropy loss
# used by the models below.
y_train = np_utils.to_categorical(y_train, n_classes)
y_val = np_utils.to_categorical(y_val, n_classes)
y_test = np_utils.to_categorical(y_test, n_classes)

In [None]:
# definition of the zero-one loss function used for the calculation of the test error
def zo_loss(test, pred):
    """Return the mean zero-one loss between targets and predictions.

    The class of a sample is taken to be the position of the largest value
    in its row, both for the targets and for the predictions.

    Parameters
    ----------
    test : array-like
        One row per sample holding the true label (e.g. one-hot encoded).
    pred : array-like
        One row per sample holding the predicted class scores.

    Returns
    -------
    numpy.float64
        Fraction of samples whose predicted class differs from the true one.

    Raises
    ------
    ValueError
        If the inputs are empty or do not contain the same number of samples.
    """
    test = np.asarray(test)
    pred = np.asarray(pred)
    if len(pred) == 0:
        raise ValueError("zo_loss needs at least one sample")
    if len(test) != len(pred):
        raise ValueError(
            f"test and pred differ in length: {len(test)} != {len(pred)}")
    # Each sample is flattened before the argmax, exactly as a per-row
    # np.argmax call would do.
    y_t = test.reshape(len(test), -1).argmax(axis=1)
    y_hat = pred.reshape(len(pred), -1).argmax(axis=1)
    return np.mean(y_hat != y_t)

# Feedforward Deep Neural Networks

### First basic model

In [None]:
# Baseline feedforward network: the flattened 32x32x3 image goes through two
# ReLU hidden layers and a 10-way softmax output.
model1 = keras.Sequential([
    keras.layers.Flatten(input_shape=[32, 32, 3]),
    keras.layers.Dense(1000, activation="relu"),
    keras.layers.Dense(400, activation="relu"),
    keras.layers.Dense(10, activation="softmax"),
])

# Plain SGD with its default settings, trained on categorical cross-entropy.
model1.compile(
    optimizer="sgd",
    loss=keras.losses.categorical_crossentropy,
    metrics=["accuracy"],
)

model1.summary()

In [None]:
%%time

# Train for at most 30 epochs, evaluating on the validation set after each one.
history1 = model1.fit(X_train, y_train, epochs=30,
                    validation_data=(X_val, y_val), 
                    verbose = 1, 
                    # Stop once val_accuracy has not improved for 5 epochs and
                    # restore the weights of the best epoch.
                    callbacks = [EarlyStopping(monitor='val_accuracy', patience=5, restore_best_weights=True)]
                    )

In [None]:
model1.evaluate(X_train, y_train)
model1.evaluate(X_test, y_test)

In [None]:
pd.DataFrame(history1.history).plot(figsize=(8, 5))
plt.grid(True)
plt.gca().set_ylim(0, 1)
plt.show()

In [None]:
y_pred1 = model1.predict(X_test)
zo_loss(y_test, y_pred1)

### Nesterov and exponential decay

In [None]:
# Learning-rate schedule passed to LearningRateScheduler below.
def exp_decay(epoch):
    """Return ``learning_rate * exp(-decay_rate * epoch)``.

    ``learning_rate`` and ``decay_rate`` are the notebook-level variables,
    looked up each time the function is called.
    """
    return learning_rate * np.exp(-decay_rate*epoch)
# Hyper-parameters shared by the SGD experiments below.
epochs = 30
decay_rate = 1e-6
momentum = 0.9
learning_rate = 0.01

# SGD with Nesterov momentum. `learning_rate` is the supported keyword;
# `lr` is only a deprecated alias for it.
# NOTE(review): decay_rate is used both as the optimizer's own `decay` and
# inside exp_decay, where 1e-6 per epoch leaves the rate practically constant
# over 30 epochs - confirm that this is the intended schedule.
sgd = SGD(learning_rate=learning_rate, momentum=momentum, decay=decay_rate, nesterov=True)

# Callbacks shared by the fits below: history recording, the exp_decay
# schedule and early stopping on the validation accuracy.
early_stop = EarlyStopping(monitor='val_accuracy', patience=5, restore_best_weights=True)
loss_history = History()
lr_rate = LearningRateScheduler(exp_decay)
callbacks_list = [loss_history, lr_rate, early_stop]

In [None]:
# Same architecture as the baseline, trained with the Nesterov-momentum
# optimizer `sgd` configured above.
model2 = keras.Sequential([
    keras.layers.Flatten(input_shape=[32, 32, 3]),
    keras.layers.Dense(1000, activation="relu"),
    keras.layers.Dense(400, activation="relu"),
    keras.layers.Dense(10, activation="softmax"),
])

model2.compile(
    optimizer=sgd,
    loss=keras.losses.categorical_crossentropy,
    metrics=["accuracy"],
)

model2.summary()

In [None]:
%%time

model2_history = model2.fit(X_train, y_train, epochs=epochs, 
                            verbose=1, callbacks=callbacks_list,
                            validation_data=(X_val, y_val))

In [None]:
pd.DataFrame(model2_history.history).plot(figsize=(8, 5))
plt.grid(True)
plt.gca().set_ylim(0, 1)
plt.show()

In [None]:
model2.evaluate(X_train, y_train)
model2.evaluate(X_test, y_test)

In [None]:
y_pred2 = model2.predict(X_test)
zo_loss(y_test, y_pred2)

### Dropout  

In [None]:
# Baseline architecture with a Dropout layer after each hidden layer.
model3 = keras.Sequential()
model3.add(keras.layers.Flatten(input_shape=[32, 32, 3]))
model3.add(keras.layers.Dense(1000, activation="relu"))
model3.add(keras.layers.Dropout(0.1))
model3.add(keras.layers.Dense(400, activation="relu"))
model3.add(keras.layers.Dropout(0.2))
model3.add(keras.layers.Dense(10, activation="softmax"))

# Compile with a fresh optimizer that has the same configuration as `sgd`.
# An optimizer instance keeps its own step counter and per-variable state,
# so the instance already used to train another model must not be reused.
model3.compile(loss = keras.losses.categorical_crossentropy,
              optimizer = type(sgd).from_config(sgd.get_config()),
              metrics = ["accuracy"])

model3.summary()

In [None]:
%%time

history3 = model3.fit(X_train, y_train, epochs=epochs, 
                            verbose=1, callbacks=callbacks_list,
                            validation_data=(X_val, y_val))

In [None]:
print(model3.evaluate(X_train, y_train))
print(model3.evaluate(X_test, y_test))

In [None]:
pd.DataFrame(history3.history).plot(figsize=(8, 5))
plt.grid(True)
plt.gca().set_ylim(0, 1)
plt.show()

y_pred3 = model3.predict(X_test)
zo_loss(y_test, y_pred3)

### L1 and L2 regularizers

In [None]:
# Baseline architecture with combined L1 and L2 weight penalties on both
# hidden layers. The strengths are spelled out; 0.01 is what l1_l2() uses
# when it is called without arguments.
model4 = keras.Sequential()
model4.add(keras.layers.Flatten(input_shape=[32, 32, 3]))
model4.add(keras.layers.Dense(1000, activation="relu",
                              kernel_regularizer=regularizers.l1_l2(l1=0.01, l2=0.01)))
model4.add(keras.layers.Dense(400, activation="relu",
                              kernel_regularizer=regularizers.l1_l2(l1=0.01, l2=0.01)))
model4.add(keras.layers.Dense(10, activation="softmax"))

# Compile with a fresh optimizer that has the same configuration as `sgd`.
# An optimizer instance keeps its own step counter and per-variable state,
# so the instance already used to train another model must not be reused.
model4.compile(loss = keras.losses.categorical_crossentropy,
              optimizer = type(sgd).from_config(sgd.get_config()),
              metrics = ["accuracy"])

model4.summary()

In [None]:
%%time

history4 = model4.fit(X_train, y_train, epochs=epochs, 
                            verbose=1, callbacks=callbacks_list,
                            validation_data=(X_val, y_val))

In [None]:
pd.DataFrame(history4.history).plot(figsize=(8, 5))
plt.grid(True)
plt.gca().set_ylim(0, 1)
plt.show()

print(model4.evaluate(X_train, y_train))
print(model4.evaluate(X_test, y_test))

y_pred4 = model4.predict(X_test)
print(zo_loss(y_test, y_pred4))

# Hyperparameters Tuning

In [None]:
def create_model(optimizer = 'adam'):
    """Build and compile the baseline feedforward network.

    Parameters
    ----------
    optimizer : str or optimizer instance
        Passed unchanged to ``compile``; this is the parameter varied by the
        grid search.

    Returns
    -------
    The compiled ``Sequential`` model (1000 and 400 ReLU units, 10-way softmax).
    """
    network = Sequential([
        keras.layers.Flatten(input_shape=[32, 32, 3]),
        Dense(1000, activation=tf.nn.relu),
        Dense(400, activation=tf.nn.relu),
        Dense(10, activation=tf.nn.softmax),
    ])
    network.compile(
        optimizer=optimizer,
        loss='categorical_crossentropy',
        metrics=['accuracy'],
    )
    return network

In [None]:
# Number of training epochs for every model fitted during the search.
epochs = 30

# Wrap create_model so that scikit-learn can treat it as an estimator.
model_CV = KerasClassifier(build_fn=create_model, epochs=epochs, verbose=1)

# define the grid search parameters

# Only the optimizer is varied; each candidate is evaluated with 5-fold CV.
optimizer = ['adam', 'rmsprop', 'adamax', 'nadam']

param_grid = dict(optimizer=optimizer)
grid = GridSearchCV(estimator=model_CV, param_grid=param_grid, cv=5)
# NOTE(review): callbacks_list contains the LearningRateScheduler built around
# exp_decay, so presumably every optimizer is run with that schedule rather
# than with its own default learning rate - confirm that this is intended.
grid_result = grid.fit(X_train, y_train, callbacks=callbacks_list,
                            validation_data=(X_val, y_val))

In [None]:
# print results
print(f'Best Accuracy for {grid_result.best_score_} using {grid_result.best_params_}')
means = grid_result.cv_results_['mean_test_score']
stds = grid_result.cv_results_['std_test_score']
params = grid_result.cv_results_['params']
for mean, stdev, param in zip(means, stds, params):
    print(f' mean={mean:.5}, std={stdev:.5} using {param}')

In [None]:
def create_model_SGD(nl1=1, nl2=1, nl3=1,
                 nn1=200, nn2=100, nn3=50, l1=0.01, l2=0.01,
                 dropout=0, output_shape=10, opt=None, act='relu'):
    """Build and compile a feedforward network made of three groups of layers.

    Parameters
    ----------
    nl1, nl2, nl3 : int
        Number of Dense layers in the first, second and third group
        (0 leaves the group out).
    nn1, nn2, nn3 : int
        Number of units of every layer in the corresponding group.
    l1, l2 : float
        L1 and L2 penalties applied to the kernel of every hidden layer.
    dropout : float
        Dropout rate added after every hidden layer; 0 adds no Dropout layer.
    output_shape : int
        Number of units of the softmax output layer.
    opt : optimizer, optional
        Optimizer used to compile the model. When omitted, a new optimizer
        with the same configuration as the notebook-level ``sgd`` is created
        for each model, so that models built during a search never share
        optimizer state. An instance passed explicitly is used as it is.
    act : str
        Activation of the hidden layers.

    Returns
    -------
    The compiled ``Sequential`` model.
    """
    if opt is None:
        opt = type(sgd).from_config(sgd.get_config())

    reg = keras.regularizers.l1_l2(l1=l1, l2=l2)

    model = Sequential()
    model.add(Flatten(input_shape=[32, 32, 3]))

    # The three groups differ only in their depth and width.
    for n_layers, n_units in ((nl1, nn1), (nl2, nn2), (nl3, nn3)):
        for _ in range(n_layers):
            model.add(Dense(n_units, activation=act, kernel_regularizer=reg))
            if dropout != 0:
                model.add(Dropout(dropout))

    model.add(Dense(output_shape, activation='softmax'))
    model.compile(loss='categorical_crossentropy', optimizer=opt, metrics=['accuracy'])
    return model

In [None]:
model_SGD = KerasClassifier(build_fn=create_model_SGD, epochs=30, verbose=1)

In [None]:
# numbers of layers
nl1 = [0,1,2,3]
nl2 = [0,1,2,3]
nl3 = [0,1,2,3]

# neurons in each layer
nn1=[1000, 1500, 2000,]
nn2=[500,1000,1500]
nn3=[250,500,1000]

# dropout and regularisation
dropout = [0, 0.1, 0.2, 0.3]
l1 = [0, 0.01, 0.003, 0.001,0.0001]
l2 = [0, 0.01, 0.003, 0.001,0.0001]

# dictionary summary
param_grid = dict(nl1=nl1, nl2=nl2, nl3=nl3, nn1=nn1, nn2=nn2, nn3=nn3, 
                  l1=l1, l2=l2, dropout=dropout)

In [None]:
# Random search over param_grid: 10 sampled configurations, each evaluated
# with 5-fold cross-validation. random_state fixes which configurations are
# sampled, so the search can be repeated.
grid1 = RandomizedSearchCV(estimator=model_SGD, cv=KFold(5), param_distributions=param_grid,
                          verbose=20, n_iter=10, random_state=seed)

In [None]:
grid_result_SGD = grid1.fit(X_train, y_train, 
                            verbose=1, callbacks=callbacks_list,
                            validation_data=(X_val, y_val))

In [None]:
grid_result_SGD.best_params_

In [None]:
best_SGD = grid_result_SGD.best_estimator_

In [None]:
best_SGD.model.save("/content/drive/MyDrive/ML_NN/sgd")

In [None]:
tunedSGD = keras.models.load_model("/content/drive/MyDrive/ML_NN/sgd")

In [None]:
tunedSGD.summary()

In [None]:
# Train the reloaded model for up to 30 epochs with the shared callbacks.
# NOTE(review): the saved model comes from best_estimator_, which presumably
# was already fitted by the search - confirm that training it further (rather
# than from fresh weights) is intended.
historySGD = tunedSGD.fit(X_train, y_train,
  verbose=1, callbacks=callbacks_list,
  validation_data=(X_val, y_val),
  epochs = 30)

In [None]:
tunedSGD.evaluate(X_train, y_train)
tunedSGD.evaluate(X_test, y_test)

In [None]:
pd.DataFrame(historySGD.history).plot(figsize=(8, 5))
plt.grid(True)
plt.gca().set_ylim(0, 1)
plt.show()

In [None]:
y_pred = tunedSGD.predict(X_test)
print(zo_loss(y_test, y_pred))

In [None]:
def create_model_AM(nl1=1, nl2=1, nl3=1,
                 nn1=200, nn2=100, nn3=50, l1=0.01, l2=0.01,
                 dropout=0, output_shape=10, opt=None, act='relu'):
    """Build and compile a feedforward network trained with Adamax.

    Parameters
    ----------
    nl1, nl2, nl3 : int
        Number of Dense layers in the first, second and third group
        (0 leaves the group out).
    nn1, nn2, nn3 : int
        Number of units of every layer in the corresponding group.
    l1, l2 : float
        L1 and L2 penalties applied to the kernel of every hidden layer.
    dropout : float
        Dropout rate added after every hidden layer; 0 adds no Dropout layer.
    output_shape : int
        Number of units of the softmax output layer.
    opt : optimizer, optional
        Optimizer used to compile the model. When omitted, a new
        ``Adamax()`` is created for each model; a default built in the
        signature would be created once and shared by every model.
        An instance passed explicitly is used as it is.
    act : str
        Activation of the hidden layers.

    Returns
    -------
    The compiled ``Sequential`` model.
    """
    if opt is None:
        opt = keras.optimizers.Adamax()

    reg = keras.regularizers.l1_l2(l1=l1, l2=l2)

    model = Sequential()
    model.add(Flatten(input_shape=[32, 32, 3]))

    # The three groups differ only in their depth and width.
    for n_layers, n_units in ((nl1, nn1), (nl2, nn2), (nl3, nn3)):
        for _ in range(n_layers):
            model.add(Dense(n_units, activation=act, kernel_regularizer=reg))
            if dropout != 0:
                model.add(Dropout(dropout))

    model.add(Dense(output_shape, activation='softmax'))
    model.compile(loss='categorical_crossentropy', optimizer=opt, metrics=['accuracy'])
    return model

In [None]:
model_Adamax = KerasClassifier(build_fn=create_model_AM, epochs=30, verbose=1)

In [None]:
# Random search for the Adamax models: 10 sampled configurations, 5-fold CV.
# random_state fixes which configurations are sampled.
grid2 = RandomizedSearchCV(estimator=model_Adamax, cv=KFold(5), param_distributions=param_grid,
                           verbose=20, n_iter=10, random_state=seed)

In [None]:
grid_result_AM = grid2.fit(X_train, y_train, callbacks=callbacks_list,
                            validation_data=(X_val, y_val))

In [None]:
grid_result_AM.best_params_

In [None]:
best_AM = grid_result_AM.best_estimator_

In [None]:
best_AM.model.save('/content/drive/MyDrive/ML_NN/adamax')

In [None]:
tunedAdamax = keras.models.load_model("/content/drive/MyDrive/ML_NN/adamax")

In [None]:
tunedAdamax.summary()

In [None]:
historyAdamax = tunedAdamax.fit(X_train, y_train,
  verbose=1, callbacks=callbacks_list,
  validation_data=(X_val, y_val),
  epochs = 30)

In [None]:
pd.DataFrame(historyAdamax.history).plot(figsize=(8, 5))
plt.grid(True)
plt.gca().set_ylim(0, 1)
plt.show()

In [None]:
tunedAdamax.evaluate(X_train, y_train)
tunedAdamax.evaluate(X_test, y_test)

In [None]:
y_pred = tunedAdamax.predict(X_test)
print(zo_loss(y_test, y_pred))

# Convolutional Neural Network

### VGG16 Convolutional Network

In [None]:
# VGG16-style network: five blocks of 3x3 "same" convolutions, each closed by
# a 2x2 max pooling, followed by two 4096-unit dense layers and the softmax.
model = Sequential()

# First block: its first convolution also declares the input shape.
model.add(Conv2D(filters=64, kernel_size=(3, 3), padding="same",
                 activation="relu", input_shape=[32, 32, 3]))
model.add(Conv2D(filters=64, kernel_size=(3, 3), padding="same", activation="relu"))
model.add(MaxPooling2D(pool_size=(2, 2), strides=(2, 2)))

# Remaining blocks as (number of filters, number of convolutions).
for n_filters, n_convs in [(128, 2), (256, 3), (512, 3), (512, 3)]:
    for _ in range(n_convs):
        model.add(Conv2D(filters=n_filters, kernel_size=(3, 3),
                         padding="same", activation="relu"))
    model.add(MaxPooling2D(pool_size=(2, 2), strides=(2, 2)))

# Classifier head.
model.add(Flatten())
for n_units in (4096, 4096):
    model.add(Dense(units=n_units, activation="relu"))
model.add(Dense(units=10, activation="softmax"))

In [None]:
model.summary()

In [None]:
model.compile(loss='categorical_crossentropy',
              optimizer='adamax',
              metrics=['accuracy'])

In [None]:
history = model.fit(X_train,y_train,
        epochs=20,
        validation_data=(X_val, y_val),
        verbose=1, shuffle=True)

In [None]:
model.evaluate(X_train, y_train)
model.evaluate(X_test, y_test)

In [None]:
pd.DataFrame(history.history).plot(figsize=(8, 5))
plt.grid(True)
plt.gca().set_ylim(0, 1)
plt.show()

In [None]:
y_pred = model.predict(X_test)
print(zo_loss(y_test, y_pred))

### Res-Net 34

In [None]:
class ResidualUnit(keras.layers.Layer):
  """Residual unit: two 3x3 convolutions plus a skip connection.

  The main path is conv -> batch norm -> activation -> conv -> batch norm.
  The input is added to its output and the activation is applied to the sum.
  When ``strides`` is greater than 1 the skip path passes the input through a
  1x1 convolution with the same stride, followed by batch normalization.
  """

  def __init__(self, filters, strides=1, activation="relu", **kwargs):
    super().__init__(**kwargs)
    self.activation = keras.activations.get(activation)

    def conv(kernel_size, stride):
      # Every convolution of the unit is bias-free with "same" padding.
      return keras.layers.Conv2D(filters, kernel_size, strides=stride,
                                 padding="same", use_bias=False)

    self.main_layers = [
      conv(3, strides),
      keras.layers.BatchNormalization(),
      self.activation,
      conv(3, 1),
      keras.layers.BatchNormalization(),
    ]
    if strides > 1:
      self.skip_layers = [conv(1, strides), keras.layers.BatchNormalization()]
    else:
      self.skip_layers = []

  def call(self, inputs):
    main_out = inputs
    for step in self.main_layers:
      main_out = step(main_out)
    shortcut = inputs
    for step in self.skip_layers:
      shortcut = step(shortcut)
    return self.activation(main_out + shortcut)

In [None]:
# ResNet-34: convolutional stem, 16 residual units and a softmax classifier.
model = keras.models.Sequential([
  keras.layers.Conv2D(64, 7, strides=2, input_shape=[32, 32, 3],
                      padding="same", use_bias=False),
  keras.layers.BatchNormalization(),
  keras.layers.Activation("relu"),
  keras.layers.MaxPool2D(pool_size=3, strides=2, padding="same"),
])

# 3, 4, 6 and 3 units with 64, 128, 256 and 512 filters. The first unit of a
# wider stage uses stride 2, all the others stride 1.
prev_filters = 64
for filters in [64] * 3 + [128] * 4 + [256] * 6 + [512] * 3:
  strides = 2 if filters != prev_filters else 1
  model.add(ResidualUnit(filters, strides=strides))
  prev_filters = filters

model.add(keras.layers.GlobalAvgPool2D())
model.add(keras.layers.Flatten())
model.add(keras.layers.Dense(10, activation="softmax"))
model.summary()

In [None]:
model.compile(loss='categorical_crossentropy',
              optimizer='adamax',
              metrics=['accuracy'])

In [None]:
history = model.fit(X_train,y_train,
        epochs=20,
        validation_data=(X_val, y_val),
        verbose=1, shuffle=True)

In [None]:
model.evaluate(X_train, y_train)
model.evaluate(X_test, y_test)

In [None]:
pd.DataFrame(history.history).plot(figsize=(8, 5))
plt.grid(True)
plt.gca().set_ylim(0, 1)
plt.show()

In [None]:
y_pred = model.predict(X_test)
print(zo_loss(y_test, y_pred))