In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.layers import Conv2D, MaxPool2D, AveragePooling2D, Dense, Flatten, Dropout, Rescaling, RandomTranslation, RandomRotation, RandomZoom, RandomContrast, LeakyReLU, BatchNormalization
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam, Adadelta
from tensorflow.keras.applications import ResNet50V2, Xception, EfficientNetB0, DenseNet121
import pathlib
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import os
import cv2
import PIL
from PIL import Image
import pathlib

In [None]:
#Set constants for image size and processing
img_height = 32
img_width = 32
num_classes = 43

In [None]:
#Set directory for dataset
from google.colab import drive
drive.mount('/content/gdrive')

#unzip the folder to a directory
!unzip "/content/gdrive/MyDrive/Colab Notebooks/archive.zip" -d path_original

In [None]:
#set data paths
data_dir = 'path_original'
train_path = 'path_original/Train'
test_path = 'path_original/'

In [None]:
#read in the training data
image_data = []
image_labels = []

for i in range(num_classes):
    path = data_dir + '/Train/' + str(i)
    images = os.listdir(path)

    for img in images:
        try:
            image = cv2.imread(path + '/' + img)
            image_fromarray = Image.fromarray(image, 'RGB')
            resize_image = image_fromarray.resize((img_height, img_width))
            image_data.append(np.array(resize_image))
            image_labels.append(i)
        except:
            print("Error in " + img)

# convert lists to numpy arrays
image_data = np.array(image_data)
image_labels = np.array(image_labels)

In [None]:
#shuffling the training data
idx = np.arange(image_data.shape[0])
np.random.shuffle(idx)
image_data = image_data[idx]
image_labels = image_labels[idx]

#Creating training and validation sets
from sklearn.model_selection import train_test_split
X_train, X_val, y_train, y_val = train_test_split(image_data, image_labels, train_size=0.8, random_state=101, shuffle=True)

#normalize the images
#As the pixel values range from 0 to 256, apart from 0 the range is 255. So dividing all the values by 255 will convert it to range from 0 to 1.
X_train = X_train/255 
X_val = X_val/255

#One-hot encoding labels
y_train = keras.utils.to_categorical(y_train, 43)
y_val = keras.utils.to_categorical(y_val, 43)

In [None]:
#read in the test data
test = pd.read_csv(data_dir + '/Test.csv')

labels = test["ClassId"].values
imgs = test["Path"].values

data =[]

for img in imgs:
    try:
        image = cv2.imread(data_dir + '/' +img)
        image_fromarray = Image.fromarray(image, 'RGB')
        resize_image = image_fromarray.resize((img_height, img_width))
        data.append(np.array(resize_image))
    except:
        print("Error in " + img)
X_test = np.array(data)
y_test = labels

#normalize by 255
X_test = X_test/255
y_test = keras.utils.to_categorical(y_test, 43)

In [None]:
#Data augmentation for alternative models
data_augment = Sequential([RandomTranslation(height_factor=0.1,width_factor=0.1),
                           RandomRotation(0.1),
                           RandomZoom(0.1),RandomContrast(0.1)])

In [None]:
#Specifying and training ResNet
i = tf.keras.layers.Input([None, None, 3], dtype = tf.uint8)
x = tf.cast(i, tf.float32)
x = data_augment(i)
x = tf.keras.applications.resnet_v2.preprocess_input(x)
core = ResNet50V2(include_top=False,weights=None,pooling='avg',input_shape=(64,64,3))
x = core(x)
base_model = tf.keras.Model(inputs=[i], outputs=[x])
resnet = Sequential()
resnet.add(base_model)
resnet.add(Dropout(0.2))
resnet.add(Dense(43, activation='softmax'))
resnet.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
resnet.fit(x=X_train,y = y_train, batch_size=32, epochs=30, validation_data=(X_val, y_val))

In [None]:
#Specifying and training DenseNet
i = tf.keras.layers.Input([None, None, 3], dtype = tf.uint8)
x = tf.cast(i, tf.float32)
x = data_augment(i)
x = tf.keras.applications.densenet.preprocess_input(x)
core = DenseNet121(include_top=False,weights=None,pooling='avg',input_shape=(64,64,3))
x = core(x)
base_model = tf.keras.Model(inputs=[i], outputs=[x])
densenet = Sequential()
densenet.add(base_model)
densenet.add(Dropout(0.2))
densenet.add(Dense(43, activation='softmax'))
densenet.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
densenet.fit(x=X_train,y = y_train, batch_size=32, epochs=30, validation_data=(X_val, y_val))

In [None]:
#Specifying and training EfficientNet
i = tf.keras.layers.Input([None, None, 3], dtype = tf.uint8)
x = tf.cast(i, tf.float32)
x = data_augment(i)
x = tf.keras.applications.efficientnet.preprocess_input(x)
core = EfficientNetB0(include_top=False,weights=None,pooling='avg',input_shape=(64,64,3))
x = core(x)
base_model = tf.keras.Model(inputs=[i], outputs=[x])
efficientnet = Sequential()
efficientnet.add(base_model)
efficientnet.add(Dropout(0.2))
efficientnet.add(Dense(43, activation='softmax'))
efficientnet.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
efficientnet.fit(x=X_train,y = y_train, batch_size=32, epochs=30, validation_data=(X_val, y_val))

In [None]:
#Specifying and training Xception
i = tf.keras.layers.Input([None, None, 3], dtype = tf.uint8)
x = tf.cast(i, tf.float32)
x = data_augment(i)
x = tf.keras.applications.xception.preprocess_input(x)
core = Xception(include_top=False,weights=None,pooling='avg',input_shape=(71,71,3))
x = core(x)
base_model = tf.keras.Model(inputs=[i], outputs=[x])
xception = Sequential()
xception.add(base_model)
xception.add(Dropout(0.2))
xception.add(Dense(43, activation='softmax'))
xception.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
xception.fit(x=X_train,y = y_train, batch_size=32, epochs=30, validation_data=(X_val, y_val))

In [None]:
#Set up our custom model architecture
#Trying simple expanded LeNet architecture with C-C-P-C-C-P layering
#Using batchnormalization after each convolutional layer

model = Sequential()

#CNN layer 1
model.add(Conv2D(filters=25, kernel_size=(5,5), input_shape=(img_height,img_width,3)))
model.add(LeakyReLU(alpha=0.01))
model.add(Conv2D(filters=25, kernel_size=(5,5)))
model.add(LeakyReLU(alpha=0.01))
model.add(MaxPool2D(pool_size=(2,2)))
model.add(BatchNormalization(axis=-1))

#CNN layer 2
model.add(Conv2D(filters=50, kernel_size=(3,3)))
model.add(LeakyReLU(alpha=0.01))
model.add(Conv2D(filters=50, kernel_size=(3,3)))
model.add(LeakyReLU(alpha=0.01))
model.add(MaxPool2D(pool_size=(2,2)))
model.add(BatchNormalization(axis=-1))
model.add(Dropout(rate=0.3))

model.add(Flatten())
model.add(Dense(512))
model.add(LeakyReLU(alpha=0.01))
keras.layers.BatchNormalization()
model.add(Dropout(rate=0.5))
model.add(Dense(43, activation='softmax'))


In [None]:
#Load saved model
model = tf.keras.models.load_model('/content/gdrive/MyDrive/Colab Notebooks/cnn6')

In [None]:
#Get summary table of model
model.build(input_shape=(None, 32, 32, 3))
model.summary()

In [None]:
#Compile model
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

In [None]:
#Create data augmentation
augment = ImageDataGenerator(
    rotation_range=10,
    zoom_range=0.15,
    width_shift_range=0.1,
    height_shift_range=0.1,
    shear_range=0.1,
    horizontal_flip=False,
    vertical_flip=False,
    fill_mode="nearest")

#Fit model
history = model.fit(x=X_train,y = y_train, batch_size=32, epochs=30, validation_data=(X_val, y_val))
#model.fit(augment.flow(X_train, y_train, batch_size=32), epochs=10, validation_data=(X_val, y_val))

In [None]:
#Plot training and validation acc through epochs
plt.plot(history.history['accuracy'],label="Training Accuracy")
plt.plot(history.history['val_accuracy'],label="Validation Accuracy")
plt.title("Learning Curve of Our Model Through 30 Epochs")
plt.legend()
plt.savefig("learning_curve.png")

In [None]:
#Save and download model
model.save('cnn6')
!zip -r /content/cnn6.zip /content/cnn6
from google.colab import files
files.download("/content/cnn6.zip")

In [None]:
#Evaluate model to get test accuracy
model.evaluate(x=X_test,y=y_test)

In [None]:
#Get predictions from model
preds = np.argmax(model.predict(x=X_test),axis=1)

In [None]:
#Get classification report on test predictions

from sklearn.metrics import classification_report, confusion_matrix, f1_score

print(classification_report(test['ClassId'], preds))

In [None]:
#Plotting model complexity plots

import pandas as pd
from io import StringIO
times = StringIO("""    train    classif    trainable
Custom-Simple    15    0.41    630539
ResNet50V2    67    0.69    23607467
DenseNet121    77    1.34    6997931
EfficientNetB0    64    0.75    4062631
Xception    99    0.81    20895059
""")
times_df = pd.read_csv(times, index_col=0, delimiter=' ', skipinitialspace=True)

#Set figure and axes
fig = plt.figure() 
ax = fig.add_subplot(111)
ax2 = ax.twinx()

#Plot both times on different axes
width = 0.3
times_df.train.plot(kind='bar', color='teal', ax=ax, width=width, position=1)
times_df.classif.plot(kind='bar', color='orange', ax=ax2, width=width, position=0)

ax.set_ylabel('Training Time (sec/epoch)')
ax2.set_ylabel('Classification Time (ms/image)')
plt.title("Training and Classification Times for Different CNN Architectures")

plt.savefig('CNN_runtimes.png', bbox_inches="tight")
plt.show()

#Plot trainable parameters barplot
times_df.trainable.plot(kind='bar', color='orange', width=width)
plt.title("Number of Trainable Parameters for CNN Architectures", fontsize=10.5)
plt.ylabel("Trainable Paramters (in 10 millions)")
plt.savefig('CNN_params.png', bbox_inches="tight")

In [None]:
#Plotting F1 scores vs training data

f1_scores = f1_score(test['ClassId'], preds, average=None)

train_counts = np.unique(traindf['ClassId'],return_counts=True)[1]

plt.scatter(train_counts, f1_scores, color="teal")
plt.xlabel("Number of Training Images")
plt.ylabel("F1-Score")
plt.title("F1-Score compared with Amount of Data in Class")
plt.savefig('f1_scores.png', bbox_inches="tight")

In [None]:
#Getting class 27 images
class_27 = np.where(np.argmax(y_test,axis=1) == 27)[0]
class_27_wrong = np.zeros(0,dtype=int)
for i in range(len(class_27)):
  if preds[class_27[i]] != 27:
    class_27_wrong = np.append(class_27_wrong,class_27[i])

In [None]:
confusion_matrix(test['ClassId'],preds)[27,11]
confusion_matrix(test['ClassId'],preds)[27,21]

In [None]:
plt.figure(figsize = (25, 5))

#Print out some wrong predictions for class 27 which had worst performance
start_index = 0
for i in range(5):
    plt.subplot(1, 5, i + 1)
    plt.grid(False)
    plt.xticks([])
    plt.yticks([])
    prediction = preds[class_27_wrong[i]]
    truth = labels[class_27_wrong[i]]
    col = 'g'
    if prediction != truth:
        col = 'r'
    plt.xlabel('Actual={} | Pred={}'.format(truth, prediction), color = col, fontsize=20)
    plt.imshow(X_test[class_27_wrong[i]])
plt.savefig("class27_preds.png")
plt.show()



In [None]:
#Print out some images from class 21 and 11 to see what could be cause
class_11 = np.where(np.argmax(y_test,axis=1)==11)[0]
class_21 = np.where(np.argmax(y_test,axis=1)==21)[0]

fig = plt.figure(figsize=(25,5))
ax1 = fig.add_subplot(1,5,1)
ax1.imshow(X_test[class_11[0]])
ax1.set_xlabel("Class 11", fontsize=20)
ax1.set_xticks([])
ax1.set_yticks([])
ax2 = fig.add_subplot(1,5,2)
ax2.imshow(X_test[class_11[1]])
ax2.set_xlabel("Class 11", fontsize=20)
ax2.set_xticks([])
ax2.set_yticks([])
ax3 = fig.add_subplot(1, 5, 3)
ax3.imshow(X_test[class_21[0]])
ax3.set_xlabel("Class 21", fontsize=20)
ax3.set_xticks([])
ax3.set_yticks([])
ax4 = fig.add_subplot(1, 5, 4)
ax4.set_xlabel("Class 21", fontsize=20)
ax4.set_xticks([])
ax4.set_yticks([])
ax4.imshow(X_test[class_21[1]])

fig.savefig("class11_21.png")