In [None]:
from google_drive_downloader import GoogleDriveDownloader as gdd
import random, os, json, re, math, statistics
import numpy as np
from numpy import load
from numpy import save
import scipy as sp
from scipy import io
import cv2 as cv 
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
from keras.models import load_model
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, BatchNormalization, Conv2D, Dropout, Flatten, MaxPooling2D, Activation
from sklearn.metrics import classification_report, plot_confusion_matrix
from keras.utils.vis_utils import plot_model

In [None]:
# Mount Google Drive inside the Colab runtime
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

In [None]:
# Download the zipped dataset from Google Drive and unpack it in the working directory
DATASET_FILE_ID = '15M2oVwpl1UwUgreZDXj1QlLY8pv0cdSl'
DATASET_ZIP = './RetailDataset.zip'

gdd.download_file_from_google_drive(
    file_id=DATASET_FILE_ID,
    dest_path=DATASET_ZIP,
    unzip=True,
)
# unzip=True was requested above, so the archive itself is no longer needed
os.remove(DATASET_ZIP)

In [None]:
### Load Data
path = './drive/My Drive/MSc Project/'


def _read_lines(file_path):
  '''Return all lines of a text file (line endings kept) and close the file afterwards.'''
  with open(file_path) as handle:
    return handle.readlines()


# Image file names, one per line; the trailing newline is stripped later by the batch reader
x_train_name = _read_lines(path+'x_train_name.txt')
x_val_name = _read_lines(path+'x_val_name.txt')
x_test_name = _read_lines(path+'x_test_name.txt')

# Labels for every split
y_train = load(path+'y_train.npy')
y_val = load(path+'y_val.npy')
y_test = load(path+'y_test.npy')

# Only the validation images are loaded as an array here;
# training and test images are read from disk in batches
x_val = load(path+'x_val.npy')

In [None]:
### Data Generator
class InputData:
  '''
  Sequential reader that turns a list of image file names into image batches.

  Every image is read with OpenCV, cropped to the pixel window
  rows 172:1772 / columns 496:2096 (1600x1600 when the source image is at
  least that large), divided by 255 and resized to 256x256.
  '''

  # Crop window applied to every source image before resizing
  CROP_ROWS = slice(172, 1772)
  CROP_COLS = slice(496, 2096)
  # Size handed to cv.resize; each image in a batch ends up as (256,256,3)
  TARGET_SIZE = (256, 256)

  def __init__(self, img_folder_path, img_names, labels, batch_size):
    '''
    Args:
      img_folder_path: path to image folder; it is concatenated directly with
        each image name, so it must end with a path separator
      img_names: all image names (surrounding whitespace/newlines are stripped)
      labels: image labels, in the same order as img_names
      batch_size: number of images per batch, must be positive

    Raises:
      ValueError: if batch_size is not positive (the reader could never
        reach the end of the data)
    '''
    if batch_size <= 0:
      raise ValueError('batch_size must be a positive integer')
    self.lines = img_names
    self.batch_size = batch_size
    self.folder = img_folder_path
    self.y = labels
    self._index = 0     # position of the next unread image
    self._end = False   # becomes True once the last image has been returned

  @property
  def increment_index(self):
    # Kept as a property for backward compatibility: merely reading this
    # attribute advances the cursor by one batch. generate_batch no longer
    # relies on it.
    self._index += self.batch_size

  def generate_batch(self):
    '''
    Read the next batch of images.

    Output:
      a tuple of two elements:
        an array of an image batch with shape (n,256,256,3) and the
        corresponding slice of labels, where n <= batch_size.
      After the call that returns the last image, self._end is True. This also
      holds when the number of images is an exact multiple of batch_size, so
      callers looping on _end never receive an empty trailing batch.

    Raises:
      FileNotFoundError: if an image cannot be read by cv.imread
    '''
    total = len(self.lines)
    start = min(self._index, total)
    stop = min(start + self.batch_size, total)
    self._index = stop
    if stop >= total:
      self._end = True

    width, height = self.TARGET_SIZE
    result = np.zeros((stop-start, height, width, 3))
    for counter, img_name in enumerate(self.lines[start:stop]):
      img_path = self.folder+img_name.strip()
      img = cv.imread(img_path)
      if img is None:
        # cv.imread signals an unreadable or missing file by returning None
        raise FileNotFoundError('cannot read image: '+img_path)
      crop_img = img[self.CROP_ROWS, self.CROP_COLS]/255 # crop image to (1600,1600)
      result[counter] = cv.resize(crop_img, self.TARGET_SIZE) # resize image to (256,256)
    return result, self.y[start:stop]

def generate_batches(image_paths,labels,batch_size,epoch,img_folder='./train2019/'):
  '''
  Generator yielding (image batch, labels) tuples, making `epoch` full passes
  over the data.

  Args:
    image_paths: image file names, relative to img_folder
    labels: labels in the same order as image_paths
    batch_size: number of images per batch
    epoch: number of passes over the whole list
    img_folder: folder the image names are resolved against
      (defaults to the previously hard-coded './train2019/')

  Yields:
    whatever InputData.generate_batch returns, except empty batches
  '''
  for _ in range(epoch):
    reader = InputData(img_folder,image_paths,labels,batch_size)
    while not reader._end:
      images, targets = reader.generate_batch()
      if len(images) == 0:
        # A sequential reader that returns nothing has run out of data.
        # Never hand an empty batch to the consumer; move on to the next pass.
        break
      yield images, targets

def partial_train(length,percentage):
  '''
  Number of samples that make up `percentage` of a dataset of size `length`.

  Args:
    length: total number of samples
    percentage: fraction of the data to use, e.g. 0.6 for 60%

  Returns:
    the fraction rounded up to a whole sample, clamped to the range [0, length]
  '''
  # Round away binary floating-point noise before taking the ceiling:
  # 100*0.07 evaluates to 7.000000000000001, which would otherwise become 8.
  count = math.ceil(round(length*percentage, 9))
  return max(0, min(length, count))

In [None]:
### CNN training
def cnn_train_models(x_train_name, y_train, x_val, y_val, train_proportion,
               batch_size, num_epochs, num_conv, num_full):
  '''
  Build, train and save a CNN classifier with 17 output classes.

  Args:
    x_train_name: training image file names (images are read from disk in batches)
    y_train: training labels, integers in [0,16]
    x_val: validation images as an array
    y_val: validation labels
    train_proportion: fraction of the training data to use, e.g. 0.6
    batch_size: number of images per training batch
    num_epochs: number of training epochs
    num_conv: number of convolutional blocks to stack (1, 2 or 3);
      any other value builds a network without convolutional blocks
    num_full: only used in the saved model's name; the network built here
      always has one hidden dense layer followed by the softmax output layer

  Returns:
    a tuple (trained model, training history dict)
  '''
  model = Sequential()
  # The input shape has to be declared on the first layer of a Sequential
  # model; when given on a later layer it is ignored.
  model.add(BatchNormalization(input_shape=(256,256,3)))

  if num_conv in (1,2,3):
    model.add(Conv2D(32, kernel_size=(7,7), dilation_rate=2)) # a dilation layer
    model.add(Activation('relu'))
    model.add(BatchNormalization())
    model.add(MaxPooling2D(pool_size=(5,5)))

  if num_conv in (2,3):
    model.add(Conv2D(32, kernel_size=(7,7)))
    model.add(Activation('relu'))
    model.add(BatchNormalization())
    model.add(MaxPooling2D(pool_size=(5,5),strides=(2,2)))

  if num_conv==3:
    model.add(Conv2D(64, kernel_size=(7,7)))
    model.add(Activation('relu'))
    model.add(BatchNormalization())
    model.add(MaxPooling2D(pool_size=(5,5)))

  model.add(Flatten()) # Flattening the 2D arrays for fully connected layers

  model.add(Dense(128, activation=tf.nn.relu))
  model.add(BatchNormalization())

  model.add(Dense(17,activation=tf.nn.softmax))
  opt = keras.optimizers.Adam(learning_rate=0.0001) # Adam optimiser
  model.compile(loss='sparse_categorical_crossentropy', # since label is an integer in [0,16]
                optimizer=opt, metrics=['accuracy'])

  # Train model on the first `train_proportion` of the dataset
  partial_index = partial_train(len(x_train_name),train_proportion)
  train_batches = generate_batches(x_train_name[:partial_index],y_train[:partial_index],
                                   batch_size,num_epochs)
  # steps_per_epoch = number of training samples / batch size, rounded up
  history = model.fit(train_batches,
                      steps_per_epoch=math.ceil(partial_index/batch_size), verbose=1, epochs=num_epochs,
                      validation_data=(x_val,y_val))

  # Save model
  proportion = int(train_proportion*100)
  model_name = 'CNN_'+str(proportion)+'_'+str(num_conv)+'conv_'+str(num_full)+'full_'+str(batch_size)+'batch'
  model.save('./drive/My Drive/MSc Project/Models/'+model_name)
  return model, history.history

### CNN evaluation
def cnn_test_models(model,x_test_name,y_test,batch_size=128):
  '''
  Evaluate a trained model on the test images, read from disk in batches.

  Args:
    model: object exposing a Keras-style evaluate() method
    x_test_name: test image file names
    y_test: test labels in the same order as x_test_name
    batch_size: number of images per evaluation batch (default 128,
      the value that used to be hard-coded)

  Returns:
    whatever model.evaluate returns
  '''
  # Tell evaluate exactly how many batches make up one pass, so that a stray
  # empty batch at the end of the generator is never evaluated.
  # `or None` lets evaluate fall back to "run until exhausted" for an empty test set.
  steps = math.ceil(len(x_test_name)/batch_size) or None
  test_batches = generate_batches(x_test_name,y_test,batch_size,1)
  test_result = model.evaluate(test_batches, steps=steps, verbose=1)
  return test_result

In [None]:
### Summarise training history
def summarise_history(history, name):
  '''
  Plot loss and accuracy curves of a training run and save them as '<name>.png'.

  Args:
    history: dict with the per-epoch lists 'loss', 'val_loss', 'accuracy'
      and 'val_accuracy'
    name: output file name without the '.png' extension
  '''
  # One tick every two epochs, covering however many epochs were recorded
  # (gives 0,2,...,10 for a 10-epoch run)
  epoch_ticks = np.arange(0,len(history['loss'])+1,2)

  fig, axs = plt.subplots(2,1,figsize=(6,12))
  try:
    loss_ax, acc_ax = axs[0], axs[1]

    # summarise history for loss
    loss_ax.set_ylim([0,3.0])
    loss_ax.plot(history['loss'])
    loss_ax.plot(history['val_loss'])
    loss_ax.set_title('training loss',fontsize=16)
    loss_ax.set_xticks(epoch_ticks)
    loss_ax.legend(['train', 'validation'], loc='upper left')

    # summarise history for accuracy
    acc_ax.plot(history['accuracy'])
    acc_ax.plot(history['val_accuracy'])
    acc_ax.set_title('classification accuracy',fontsize=16)
    acc_ax.set_xticks(epoch_ticks)
    acc_ax.set_yticks(np.arange(0,1.1,0.1))
    acc_ax.legend(['train', 'validation'], loc='upper left')
    # the x label belongs to the bottom panel; set it explicitly instead of
    # relying on pyplot's notion of the "current" axes
    acc_ax.set_xlabel('epoch', fontsize=16)

    fig.savefig(name+'.png', bbox_inches='tight')
  finally:
    # release the figure even if plotting or saving fails
    plt.close(fig)

In [None]:
### Experiment on data quantity
# Train the same 1-conv-layer CNN on 20%, 40%, 60%, 80% and 100% of the
# training data and evaluate each model on the test set.

# (train proportion, plot the training history?)
# The history of the 80% run is not plotted because this model diverges.
quantity_runs = [(0.2, True), (0.4, True), (0.6, True), (0.8, False), (1.0, True)]

# Keeps the history and test result of every run, keyed by train proportion,
# so that later runs do not overwrite earlier results.
quantity_results = {}

for proportion, plot_history in quantity_runs:
  model, model_history = cnn_train_models(x_train_name,y_train,x_val,y_val,train_proportion=proportion,
                             batch_size=128,num_epochs=10,num_conv=1,num_full=2)
  print(model_history)
  if plot_history:
    summarise_history(model_history,str(proportion)+'_1conv') # e.g. '0.2_1conv'
  test_result = cnn_test_models(model,x_test_name,y_test)
  print(test_result)
  quantity_results[proportion] = (model_history, test_result)

In [None]:
### Experiment on CNN depth

# 1 convolutional layer, using 60% data:
# training has been done in the previous experiment

# 2 and 3 convolutional layers, using 60% data
# Keeps the history and test result of every run, keyed by number of conv layers
depth_results = {}

for depth in (2, 3):
  model, model_history = cnn_train_models(x_train_name,y_train,x_val,y_val,train_proportion=0.6,
                             batch_size=128,num_epochs=10,num_conv=depth,num_full=2)
  print(model_history)
  summarise_history(model_history,'0.6_'+str(depth)+'conv') # '0.6_2conv', '0.6_3conv'
  test_result = cnn_test_models(model,x_test_name,y_test)
  print(test_result)
  depth_results[depth] = (model_history, test_result)

In [None]:
### The CNN that yields the optimal result in this research
# Settings of the best run: 100% of the training data, 8 epochs
best_settings = dict(train_proportion=1.0, batch_size=128, num_epochs=8,
                     num_conv=1, num_full=2)

model, model_history = cnn_train_models(x_train_name, y_train, x_val, y_val, **best_settings)
print(model_history)

test_result = cnn_test_models(model, x_test_name, y_test)
print(test_result)