In [1]:
#Files
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
import cv2
import glob
import pickle

#DATA
from keras.preprocessing.sequence import pad_sequences
from keras.preprocessing.text import one_hot
from keras.utils.np_utils import to_categorical
from sklearn.model_selection import train_test_split
import matplotlib.image as mpimg

#CNN
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from keras.models import Sequential
from keras.layers import Convolution2D,MaxPooling2D,Flatten,Dense
from tensorflow.keras.optimizers import Adam
from keras.losses import CategoricalCrossentropy
from tensorflow.keras.applications.vgg16 import preprocess_input
from tensorflow.keras.applications.vgg16 import VGG16

#VIS
from keras.utils.vis_utils import plot_model

In [2]:
def _prepareData(path, image_size=(224, 224), test_size=0.2, random_state=None, num_classes=10):
    '''
    Load the labelled images found under `path` and split them into
    training and test sets.

    `path` must contain one sub-directory per class whose name is the class
    index prefixed with "c" (e.g. "c0", "c1", ...); every "*.jpg" inside a
    sub-directory is loaded with OpenCV and resized to `image_size`.

    params:
        path(string): root directory holding the class sub-directories
        image_size(tuple): (width, height) passed to cv2.resize
        test_size(float): fraction of the samples held out for testing
        random_state(int or None): seed forwarded to train_test_split;
            None keeps the split non-deterministic
        num_classes(int): width of the one-hot encoded label vectors
    return: X_Train, X_Test (numpy arrays of resized images) and
            Y_Train, Y_Test (one-hot encoded labels)
    '''
    labelsList = []
    listOfimg = []
    # os.path.basename instead of split("/") so the lookup does not depend
    # on the platform's path separator.
    class_dirs = sorted(glob.glob(os.path.join(path, '*')), key=os.path.basename)
    for directory in class_dirs:
        image_paths = glob.glob(os.path.join(directory, '*.jpg'))
        if not image_paths:
            # Entries without images never contributed a label; skip them
            # before trying to parse their name.
            continue
        label = int(os.path.basename(directory).replace('c', ''))
        for img in image_paths:
            imgcv = cv2.imread(img)
            if imgcv is None:
                # cv2.imread signals an unreadable/corrupt file by returning
                # None; passing that to cv2.resize would abort the whole load.
                print("Skipping unreadable image: {}".format(img))
                continue
            # NOTE(review): images are kept in the channel order produced by
            # cv2.imread (BGR per the OpenCV docs) while vgg16.preprocess_input
            # documents RGB input -- confirm whether a conversion is needed.
            listOfimg.append(cv2.resize(imgcv, image_size))
            labelsList.append(label)

    X_Train, X_Test, Y_Train, Y_Test = train_test_split(
        listOfimg, labelsList, test_size=test_size, random_state=random_state)
    Y_Train = tf.keras.utils.to_categorical(Y_Train, num_classes=num_classes)
    Y_Test = tf.keras.utils.to_categorical(Y_Test, num_classes=num_classes)

    return np.array(X_Train), np.array(X_Test), Y_Train, Y_Test

In [3]:
# Dataset locations on Kaggle (labelled training images / unlabelled images)
pathTrain_Images = "/kaggle/input/state-farm-distracted-driver-detection/imgs/train/"
pathPropagate_Images =  "/kaggle/input/state-farm-distracted-driver-detection/imgs/test/"

# Load the labelled images and split them into train / test subsets
X_Train, X_Test, Y_Train, Y_Test = _prepareData(pathTrain_Images)

print("Size X_Train: {}, Size Y_Train: {}".format(len(X_Train),len(Y_Train)))
print("Size X_Test: {}, Size Y_Test: {}".format(len(X_Test),len(Y_Test)))

In [6]:
# Training configuration shared by every model in this notebook.
INPUT_SHAPE = (224, 224, 3)  # height, width, channels of the resized images
NUM_CLASSES = 10             # size of the one-hot label vectors
BATCH_SIZE = 16
NUM_EPOCHS = 10
LEARNING_RATE = 1e-4         # passed to the Adam optimizer in compile_model

def compile_model(model, learning_rate):
    """Compile `model` for multi-class classification and print its summary.

    The model is compiled with the Adam optimizer, categorical cross-entropy
    computed on raw logits (`from_logits=True`) and the accuracy metric.

    params:
        model: Keras model to compile (compiled in place).
        learning_rate: learning rate handed to Adam.
    return: the same `model` object, now compiled.
    """
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
        loss=tf.keras.losses.CategoricalCrossentropy(from_logits=True),
        metrics=['accuracy'],
    )
    # summary() prints the table itself and returns None, so wrapping it in
    # print() only appended a stray "None" line to the output.
    model.summary()
    return model


def fit_model(model, initial_epochs, batch_size, train_data, train_labels):
    """Train `model` on the given samples and return the Keras history.

    20% of the supplied training data is held out for validation through
    `validation_split`, and progress is reported with `verbose=1`.
    """
    fit_options = {
        'x': train_data,
        'y': train_labels,
        'epochs': initial_epochs,
        'batch_size': batch_size,
        'verbose': 1,
        'validation_split': 0.2,
    }
    return model.fit(**fit_options)

def plot_accuracy(history, model_name):
    """Plot training and validation accuracy per epoch.

    params:
        history: object returned by `model.fit`; its `history` dict must
            contain the 'accuracy' and 'val_accuracy' series.
        model_name: label used as the prefix of the figure title.
    """
    plt.plot(history.history['accuracy'])
    plt.plot(history.history['val_accuracy'])

    plt.title(model_name+' Accuracy')
    plt.ylabel('accuracy')
    plt.xlabel('epoch')
    # The second curve is 'val_accuracy' (the validation split used during
    # fitting), not the held-out test set, so label it accordingly.
    plt.legend(['train','validation'], loc='upper left')
    plt.show()
    
def plot_loss(history, model_name):
    """Plot training and validation loss per epoch.

    params:
        history: object returned by `model.fit`; its `history` dict must
            contain the 'loss' and 'val_loss' series.
        model_name: label used as the prefix of the figure title.
    """
    plt.plot(history.history['loss'])
    plt.plot(history.history['val_loss'])

    plt.title(model_name+' Loss')
    plt.ylabel('loss')
    plt.xlabel('epoch')
    # The second curve is 'val_loss' (the validation split used during
    # fitting), not the held-out test set, so label it accordingly.
    plt.legend(['train','validation'], loc='upper left')
    plt.show()

# CNN

We build our own custom CNN from the standard Conv => Activation => MaxPool block, repeated three times and followed by a dense classification head.

In [7]:
def create_model(input_shape, num_classes):
    """Build the baseline CNN as an (uncompiled) Keras Sequential model.

    The network is three Conv2D(32, 3, relu) + MaxPooling2D stages followed
    by Flatten, Dense(128, relu) and a final Dense(num_classes) layer with
    no activation.
    """
    keras_layers = tf.keras.layers
    stack = [tf.keras.Input(shape=input_shape)]
    for _ in range(3):
        stack.append(keras_layers.Conv2D(32, 3, activation='relu'))
        stack.append(keras_layers.MaxPooling2D())
    stack.append(keras_layers.Flatten())
    stack.append(keras_layers.Dense(128, activation='relu'))
    stack.append(keras_layers.Dense(num_classes))
    return tf.keras.Sequential(stack)

Next, we add data augmentation layers and a normalization (rescaling) layer to the previously built CNN.

In [8]:
def create_augmented(input_shape, num_classes):
    """Build the baseline CNN preceded by augmentation and rescaling layers.

    The input first goes through RandomFlip("horizontal"),
    RandomRotation(0.1) and Rescaling(1/255); the rest of the network is
    three Conv2D(32, 3, relu) + MaxPooling2D stages, Flatten,
    Dense(128, relu) and a final Dense(num_classes) with no activation.
    The returned Sequential model is not compiled.
    """
    keras_layers = tf.keras.layers
    stack = [
        tf.keras.Input(shape=input_shape),
        keras_layers.RandomFlip("horizontal"),
        keras_layers.RandomRotation(0.1),
        keras_layers.Rescaling(1./255),
    ]
    for _ in range(3):
        stack.append(keras_layers.Conv2D(32, 3, activation='relu'))
        stack.append(keras_layers.MaxPooling2D())
    stack.extend([
        keras_layers.Flatten(),
        keras_layers.Dense(128, activation='relu'),
        keras_layers.Dense(num_classes),
    ])
    return tf.keras.Sequential(stack)

In [None]:
# Build, compile and train the baseline CNN.
custom = create_model(input_shape=INPUT_SHAPE, num_classes=NUM_CLASSES)
custom_model = compile_model(model=custom, learning_rate=LEARNING_RATE)
cus_history = fit_model(
    model=custom_model,
    initial_epochs=NUM_EPOCHS,
    batch_size=BATCH_SIZE,
    train_data=X_Train,
    train_labels=Y_Train,
)

In [None]:
# Learning curves of the baseline CNN.
plot_loss(history=cus_history, model_name='Custom-CNN')
plot_accuracy(history=cus_history, model_name='Custom-CNN')

In [None]:
# Score the baseline CNN on the held-out test split.
test_loss, test_acc = custom_model.evaluate(x=X_Test, y=Y_Test, verbose=1)

In [None]:
# Build, compile and train the CNN variant with augmentation + rescaling.
custom_aug = create_augmented(input_shape=INPUT_SHAPE, num_classes=NUM_CLASSES)
custom_aug_model = compile_model(model=custom_aug, learning_rate=LEARNING_RATE)
aug_history = fit_model(
    model=custom_aug_model,
    initial_epochs=NUM_EPOCHS,
    batch_size=BATCH_SIZE,
    train_data=X_Train,
    train_labels=Y_Train,
)

In [None]:
# Learning curves of the augmented CNN.
plot_loss(history=aug_history, model_name='Custom-Augmentation-CNN')
plot_accuracy(history=aug_history, model_name='Custom-Augmentation-CNN')

In [None]:
# Score the augmented CNN on the held-out test split.
test_loss, test_acc = custom_aug_model.evaluate(x=X_Test, y=Y_Test, verbose=1)

# TRANSFER LEARNING
Now, we build models using transfer learning approaches to improve results.

In [None]:
def vgg(input_shape, num_classes, mode):
    """Build an ImageNet-pretrained VGG16 convolutional base (no classifier top).

    params:
        input_shape: input shape forwarded to VGG16.
        num_classes: kept for call compatibility. It is not forwarded: the
            base is built with `include_top=False`, and the Keras docs state
            that `classes` / `classifier_activation` only apply when the
            top is included.
        mode: 'feature-extraction' freezes the whole base;
            'fine-tune' freezes the first 16 layers and leaves the
            remaining ones trainable.
    return: the configured VGG16 base model (its summary is printed).
    raises: ValueError if `mode` is not one of the two supported values.
    """
    supported_modes = ('feature-extraction', 'fine-tune')
    if mode not in supported_modes:
        # Previously an unknown mode silently returned a fully trainable
        # base; fail loudly (and before any weights are loaded) instead.
        raise ValueError(
            "Unknown mode {!r}; expected one of {}".format(mode, supported_modes))

    # Named base_model so the local no longer shadows this function's name.
    base_model = VGG16(
        include_top=False, weights='imagenet', input_tensor=None,
        input_shape=input_shape, pooling=None
        )
    if mode == 'feature-extraction':
        base_model.trainable = False
    else:
        base_model.trainable = True
        # Let's take a look to see how many layers are in the base model
        print("Number of layers in the base model: ", len(base_model.layers))

        # Fine-tune from this layer onwards
        fine_tune_at = 16

        # Freeze all the layers before the `fine_tune_at` layer
        for layer in base_model.layers[:fine_tune_at]:
            layer.trainable = False
    # summary() prints by itself and returns None; print(summary()) emitted
    # an extra "None" line.
    base_model.summary()
    return base_model

In [None]:
def transfer_learn(base_model, input_shape, num_classes):
    """Wrap a pretrained base in a functional model with a new classifier head.

    The input is passed through the VGG16 `preprocess_input`, then through
    `base_model` (called with `training=False`), global average pooling,
    Dropout(0.2) and a final Dense(num_classes) layer with no activation.
    Returns the uncompiled `tf.keras.Model`.
    """
    keras_layers = tf.keras.layers
    inputs = tf.keras.Input(shape=input_shape)
    features = base_model(preprocess_input(inputs), training=False)
    pooled = keras_layers.GlobalAveragePooling2D()(features)
    regularized = keras_layers.Dropout(0.2)(pooled)
    outputs = keras_layers.Dense(num_classes)(regularized)
    return tf.keras.Model(inputs, outputs)

## Feature Extraction
Extracting bottleneck features from a pretrained VGG16

In [None]:
# Frozen VGG16 base + new classifier head, trained as a feature extractor.
vgg16 = vgg(input_shape=INPUT_SHAPE, num_classes=NUM_CLASSES, mode='feature-extraction')
fe_model = compile_model(
    model=transfer_learn(base_model=vgg16, input_shape=INPUT_SHAPE, num_classes=NUM_CLASSES),
    learning_rate=LEARNING_RATE,
)
fe_history = fit_model(
    model=fe_model,
    initial_epochs=NUM_EPOCHS,
    batch_size=BATCH_SIZE,
    train_data=X_Train,
    train_labels=Y_Train,
)

In [None]:
# Learning curves of the feature-extraction model.
plot_loss(history=fe_history, model_name='Feature-Extraction')
plot_accuracy(history=fe_history, model_name='Feature-Extraction')

In [None]:
# Score the feature-extraction model on the held-out test split.
test_loss, test_acc = fe_model.evaluate(x=X_Test, y=Y_Test, verbose=1)

## Fine Tuning
Fine-tune some layers of VGG16, i.e. retrain some of the weights of the top layers of the pre-trained model alongside the classifier head that we previously trained, to increase performance even further.

In [19]:
# Train for the feature-extraction epochs plus additional fine-tuning epochs.
fine_tune_epochs = 10
total_epochs =  NUM_EPOCHS + fine_tune_epochs
# NOTE: vgg(..., 'fine-tune') and transfer_learn() build a fresh base and a
# fresh classifier head here; training does not continue from fe_model.
vgg16_fine = vgg(INPUT_SHAPE, NUM_CLASSES, 'fine-tune')
fine_model = compile_model(transfer_learn(vgg16_fine, INPUT_SHAPE, NUM_CLASSES), LEARNING_RATE)
# fit_model takes (model, epochs, batch_size, data, labels); the batch size
# was missing, which raised a TypeError for the missing positional argument.
ft_history = fit_model(fine_model, total_epochs, BATCH_SIZE, X_Train, Y_Train)

In [20]:
# Learning curves of the fine-tuned model.
plot_loss(history=ft_history, model_name='Fine-Tuner')
plot_accuracy(history=ft_history, model_name='Fine-Tuner')

In [21]:
# Score the fine-tuned model on the held-out test split.
test_loss, test_acc = fine_model.evaluate(x=X_Test, y=Y_Test, verbose=1)