In [None]:
# Mount Google Drive at /content/drive (Google Colab only).
# The dataset archive configured further down is read from this mount point.
from google.colab import drive
drive.mount('/content/drive')

# imports
# standard library
import csv
import math
import os
import random
import shutil
import zipfile

# third-party
import numpy as np
from sklearn.metrics import confusion_matrix, classification_report
import tensorflow as tf
from tensorflow import keras
from keras.callbacks import TensorBoard
from keras.preprocessing import image
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt
from tensorflow.keras.preprocessing.image import ImageDataGenerator, img_to_array, load_img
from google.colab import files
from IPython.display import Image
from termcolor import colored
from matplotlib import patheffects

# extract the zip file and load the data

To manage the data set, we will do the following:

1. First, we unzip the dataset and store its contents in a known location.
2. Next, we create two groups of data: a training set and a test set. Since the test set provided in the zip file is not labelled, we need to create our own test set from the labelled data.
3. After creating the datasets, we load them into the program so we can use them with our models.

In [None]:
# Path of the dataset archive to extract (here: a file on the mounted Google Drive).
zip_file = '/content/drive/MyDrive/data/state-farm-distracted-driver-detection.zip'

# Directory the archive is extracted into. The trailing slash matters: the
# cells below build sub-paths by plain string concatenation onto this value.
path_extracted_files = './extracted_files/'

Step 1: extract the files if they haven't been extracted yet.

In [None]:
# Extract the dataset archive once. An existing target directory is taken to
# mean that a previous run already extracted the files.
if os.path.exists(path_extracted_files):
    print(colored('Files have already been extracted', 'green'))
else:
    print(colored('Extracting files...', 'yellow'))
    # `zip_file` is the archive path set in the configuration cell
    # (the previous name `path_zip` was never defined).
    with zipfile.ZipFile(zip_file, 'r') as zip_ref:
        zip_ref.extractall(path_extracted_files)

    print(colored('Files have been extracted!', 'green'))

Clear unnecessary files to save disk space.

In [None]:
# Delete the parts of the extracted dataset that this notebook never uses.
# Every removal is guarded by an existence check, so re-running the cell is safe.
print(colored('Removing unnecessary files...', 'yellow'))

# the two CSV files shipped with the dataset
for csv_name in ('driver_imgs_list.csv', 'sample_submission.csv'):
    csv_path = path_extracted_files + csv_name
    if os.path.exists(csv_path):
        os.remove(csv_path)

# the unlabelled test set that is inside the provided dataset
unlabelled_test_dir = path_extracted_files + 'imgs/test'
if os.path.exists(unlabelled_test_dir):
    shutil.rmtree(unlabelled_test_dir)

print(colored('Unnecessary files have been removed!', 'green'))

step 2: create the test set from the labelled data.

In [None]:
# Build the `test/<class>` and `train/<class>` directory trees, mirroring the
# class folders found in the original `imgs/train` directory.
folders = os.listdir(path_extracted_files + 'imgs/train')

# Every directory the split needs: the two roots plus one sub-folder per class.
split_dirs = [path_extracted_files + split for split in ('test', 'train')]
split_dirs += [path_extracted_files + f'{split}/{folder}'
               for split in ('test', 'train')
               for folder in folders]

# Check the whole tree rather than only `test/`, so that a run interrupted
# half-way through creation is repaired instead of being skipped.
if all(os.path.isdir(directory) for directory in split_dirs):
    print(colored('Directories already exist', 'green'))
else:
    print(colored('Creating class directories...', 'yellow'))

    # exist_ok makes the creation idempotent when part of the tree is present
    for directory in split_dirs:
        os.makedirs(directory, exist_ok=True)

    print(colored('Class directories have been created!', 'green'))

In [None]:
# From the initial train directory, generate new train and test directories,
# comprising 85% and 15% of the images of every class respectively.
SPLIT_SEED = 42        # fixed seed so the same split is produced on every fresh run
TEST_FRACTION = 0.15   # share of each class that is moved to the test set

source_dir = path_extracted_files + 'imgs/train'

# Sorted so the classes are always processed in the same order.
folders = sorted(os.listdir(source_dir))

# A class folder that still contains images has not been split yet. Checking
# every class (not only `c0`) lets an interrupted run resume on the rest.
pending_folders = [folder for folder in folders
                   if os.listdir(f'{source_dir}/{folder}')]

if pending_folders:
    print(colored('Moving images to test and train directories...', 'yellow'))

    # Dedicated generator: the split does not depend on the global random state.
    rng = random.Random(SPLIT_SEED)

    for folder in pending_folders:
        # os.listdir returns entries in arbitrary order; sorting before the
        # seeded shuffle is what makes the split deterministic.
        images = sorted(os.listdir(f'{source_dir}/{folder}'))
        rng.shuffle(images)

        # Number of images that go to the test directory
        n = int(len(images) * TEST_FRACTION)

        # The first n shuffled images become test images, the rest training images
        for position, img in enumerate(images):
            split = 'test' if position < n else 'train'
            shutil.move(f'{source_dir}/{folder}/{img}',
                        path_extracted_files + f'{split}/{folder}/{img}')

    print(colored('Images have been moved!', 'green'))
else:
    print(colored('Images have already been moved', 'green'))

step 3: prepare the data for the model

In [None]:
# Build the data pipelines. Two variants are prepared:
#   * "unaltered": images at the generator's default size, no augmentation
#   * "augmented": images resized to 96x96, with random rotation/zoom on the training data
#
# All generators rescale pixel values to [0, 1]: the single-image inference
# cell at the end of this notebook divides the pixels by 255, and the model
# must be trained and evaluated on the same value range it is queried with.
PIXEL_SCALE = 1.0 / 255

# NOTE(review): validation_split is set on the training datagens, but no
# `subset=` is passed to flow_from_directory - confirm whether a separate
# validation subset was intended.

# create an unaltered version of the data for training the model
unaltered_train_datagen = ImageDataGenerator(rescale=PIXEL_SCALE,
                                             validation_split=0.1)

unaltered_train_generator = unaltered_train_datagen.flow_from_directory(
    path_extracted_files + 'train',
    batch_size=128,
    class_mode='categorical',
    color_mode='rgb')

# create an unaltered version of the data for testing the model
unaltered_test_datagen = ImageDataGenerator(rescale=PIXEL_SCALE)
unaltered_test_generator = unaltered_test_datagen.flow_from_directory(
    path_extracted_files + 'test',
    class_mode='categorical',
    color_mode='rgb')

# create a data generator for the augmented training data
augmented_train_datagen = ImageDataGenerator(rescale=PIXEL_SCALE,
                                             validation_split=0.1,
                                             rotation_range=20,
                                             zoom_range=0.15)

# `target_size` is an argument of flow_from_directory, not of the
# ImageDataGenerator constructor.
augmented_train_generator = augmented_train_datagen.flow_from_directory(
    path_extracted_files + 'train',
    target_size=(96, 96),
    batch_size=128,
    class_mode='categorical',
    color_mode='rgb')

# create a data generator for the test data matching the augmented pipeline
# (same 96x96 size, but no random transformations)
augmented_test_datagen = ImageDataGenerator(rescale=PIXEL_SCALE)

augmented_test_generator = augmented_test_datagen.flow_from_directory(
    path_extracted_files + 'test',
    target_size=(96, 96),
    class_mode='categorical',
    color_mode='rgb')

# Define the model

select what data we want to use

In [None]:
# set the data generator to use the training data
train_datagen = unaltered_train_datagen
test_datagen = unaltered_test_datagen

Define the inputs

In [None]:
# Define the input size of the images
input_width = train_datagen.target_size[0]
input_height = train_datagen.target_size[1]
num_colors = 3

Define the structure of our model

In [None]:
# Mode switch for the notebook: when False, the cells guarded by
# `if not allow_testing` run (the fixed model); when True, the cell guarded by
# `if allow_testing` runs (the hyperparameter search).
allow_testing = False

In [None]:
# Adam optimizer with a learning rate of 0.001. Created outside the `if` so the
# name `opt` exists whichever mode is selected.
opt = keras.optimizers.Adam(learning_rate=0.001)

# if we are not testing the model, create a predefined model with the following layers
if not allow_testing:

    # Definition of the network layers
    layers = [
        # First convolution with 16 filters of size 9 x 9 and relu activation function
        keras.layers.Conv2D(16,
                            (9, 9),
                            activation='relu',
                            input_shape=(input_width, input_height, num_colors)),

        keras.layers.MaxPooling2D(2, 2),

        # Second convolution with 32 filters of size 3 x 3 and relu activation function
        keras.layers.Conv2D(32,
                            (3, 3),
                            activation='relu'),

        keras.layers.MaxPooling2D(2, 2),

        # Third convolution with 64 filters of size 2 x 2 and relu activation function
        keras.layers.Conv2D(64,
                            (2, 2),
                            activation='relu'),

        keras.layers.MaxPooling2D(2, 2),

        # Flattening to transform the information into a vector
        keras.layers.Flatten(),
        # Two dense layers with 512 neurons each and relu activation function
        keras.layers.Dense(512, activation='relu'),
        keras.layers.Dense(512, activation='relu'),
        # Output layer: one unit per class, softmax activation function
        keras.layers.Dense(10, activation=tf.nn.softmax),
    ]

    # Create the model with the layers defined above.
    # The name avoids spaces: some TensorFlow versions reject them in name scopes.
    fixed_model = keras.Sequential(layers, name='fixed_model')

    # compile the model and show the summary
    fixed_model.compile(optimizer=opt,
                        loss='categorical_crossentropy',
                        metrics=['accuracy'])

    # Network structure
    fixed_model.summary()

In [None]:
# Fixed-model mode only: fit the predefined network, then score it on the test set.
if not allow_testing:
    # 50 epochs of 4 batches each, drawn from the training generator
    history = fixed_model.fit(train_generator, steps_per_epoch=4, epochs=50)

    # evaluate() returns the loss followed by the compiled metrics
    results = fixed_model.evaluate(test_generator)

    # show the results
    print(results)


In the following code block we define the possible configurations that will be used when testing the model's hyperparameters.

In [None]:
# configure the testing parameters

# Every configuration uses the same filter progression and the same kernels
# for all layers after the first one; only the first kernel size varies.
_first_kernel_sizes = [(11, 11), (9, 9), (7, 7)]
_later_kernel_sizes = [(5, 5), (3, 3), (3, 3)]
_filter_counts = [16, 32, 64, 128]


def _configs_for_depth(depth):
    """Return the kernel/filter configurations for `depth` convolutional layers.

    One configuration is produced per first-layer kernel size. Each one is a
    dict with a "kernels" list and a "filters" list, both of length `depth`.
    """
    return [
        {
            "kernels": [first_kernel] + _later_kernel_sizes[:depth - 1],
            "filters": _filter_counts[:depth],
        }
        for first_kernel in _first_kernel_sizes
    ]


# configurations for 3, 2 and 4 convolutional layers
kernels_3_layers = _configs_for_depth(3)
kernels_2_layers = _configs_for_depth(2)
kernels_4_layers = _configs_for_depth(4)

# consolidate the kernel configurations into a single dictionary,
# keyed by the number of convolutional layers
kernels = {
    2: kernels_2_layers,
    3: kernels_3_layers,
    4: kernels_4_layers,
}

# define the number of convolutional layers
convolutional_layers = [2, 3, 4]

# define the number of dense layers before the softmax output layer
Dense_layers = [1, 2, 3]

Once the different hyperparameter testing configurations have been defined, we will try them and store the resulting models so that we can compare them.

In [None]:
# if we do allow testing, we will test the model with the previously defined configurations:
# one model is built, trained, evaluated and saved for every combination of
# (number of conv layers, number of dense layers, kernel configuration).
if allow_testing:
    stored_models_dir = './stored_models/'
    results_file = './results.csv'
    results_path = os.path.normpath(os.path.join(stored_models_dir, results_file))

    # create the directory if it does not exist
    os.makedirs(stored_models_dir, exist_ok=True)

    # if the results file does not exist, create it and write the header
    if not os.path.exists(results_path):
        with open(results_path, 'w', newline='') as file:
            csv.writer(file).writerow(['Convolutional Layers', 'Dense Layers',
                                       'Kernels', 'Filters', 'Accuracy', 'Loss'])

    # iterate through the number of convolutional layers
    for num_conv_layers in convolutional_layers:
        # iterate through the number of dense layers
        for num_dense_layers in Dense_layers:
            # iterate through the kernel configurations
            for kernel_config in kernels[num_conv_layers]:
                # create the layers for the model
                layers = []
                for i in range(num_conv_layers):
                    # only the first layer of the network declares the input shape
                    first_layer_args = {}
                    if i == 0:
                        first_layer_args['input_shape'] = (input_width, input_height, num_colors)
                    layers.append(keras.layers.Conv2D(kernel_config["filters"][i],
                                                      kernel_config["kernels"][i],
                                                      activation='relu',
                                                      **first_layer_args))
                    layers.append(keras.layers.MaxPooling2D(2, 2))
                # add the flatten layer
                layers.append(keras.layers.Flatten())
                # add the dense layers
                for i in range(num_dense_layers):
                    layers.append(keras.layers.Dense(512, activation='relu'))
                # add the output layer
                layers.append(keras.layers.Dense(10, activation=tf.nn.softmax))

                # create the model (name without spaces: some TensorFlow
                # versions reject spaces in name scopes)
                model = keras.Sequential(layers,
                                         name=f'conv{num_conv_layers}_dense{num_dense_layers}')

                # compile the model with its own optimizer: an optimizer
                # instance keeps per-variable state, so it must not be shared
                # between different models
                model.compile(optimizer=keras.optimizers.Adam(learning_rate=0.001),
                              loss='categorical_crossentropy',
                              metrics=['accuracy'])

                # train the model
                history = model.fit(train_generator,
                                    steps_per_epoch=4,
                                    epochs=50)

                # evaluate the model
                results = model.evaluate(test_generator)

                # get the loss and accuracy
                accuracy = results[1]
                loss = results[0]

                # set a name for the model; the directory must come first so
                # that the file is actually written inside `stored_models_dir`
                name = os.path.join(
                    stored_models_dir,
                    f'acc:{accuracy}, loss:{loss}, {num_conv_layers} conv layers, '
                    f'{num_dense_layers} dense layers, {kernel_config["kernels"]}, '
                    f'{kernel_config["filters"]}.h5')

                # save the model
                model.save(name)
                print(f'Saved: {name}')

                # Write the results to a csv file so we can print it later as a table.
                # csv.writer quotes the kernel/filter lists, whose text contains
                # commas, so every row keeps exactly six columns.
                with open(results_path, 'a', newline='') as file:
                    csv.writer(file).writerow([num_conv_layers,
                                               num_dense_layers,
                                               kernel_config["kernels"],
                                               kernel_config["filters"],
                                               accuracy,
                                               loss])
                    

# Create the confusion matrix for a chosen model

In [None]:
# Choose the model to analyse: either one loaded from disk or the fixed model
# trained earlier in this notebook.
use_saved_model = False

if not use_saved_model:
    model = fixed_model
else:
    # set the path to the model
    model_path = './stored_models/3 conv layers, 2 dense layers, (9, 9, 3), [16, 32].h5'

    # load the model
    model = keras.models.load_model(model_path)
    print(f'Loaded model from {model_path}')

# predicted and true class indices, accumulated batch by batch
predictions = []
true_labels = []

# The generator is indexed explicitly, one batch at a time
for batch_index in range(len(test_generator)):
    batch_data, batch_labels = test_generator[batch_index]
    batch_predictions = model.predict(batch_data, verbose=0)

    # one-hot labels / softmax outputs -> class indices
    predictions += list(np.argmax(batch_predictions, axis=1))
    true_labels += list(np.argmax(batch_labels, axis=1))

cm = confusion_matrix(true_labels, predictions)
labels = [f'C{class_id}' for class_id in range(10)]
disp = ConfusionMatrixDisplay(cm, display_labels=labels)

# Plot the confusion matrix; the cell counts are drawn in bold white with a
# black outline
outline = patheffects.withStroke(linewidth=5, foreground='black')
cell_text_style = {'size': 20, 'weight': 'bold', 'color': 'white', 'path_effects': [outline]}
disp.plot(cmap='viridis', text_kw=cell_text_style)

plt.title('Confusion Matrix')
plt.xlabel('Predicted')
plt.ylabel('True')
plt.show()

# Testing an uploaded image

In [None]:
# Classify images uploaded by the user with the currently selected `model`.

# Human-readable names of the ten classes, in class-index order
label_names = ['safe driving', 'texting - right', 'talking on the phone - right', 'texting - left', 'talking on the phone - left', 'operating the radio', 'drinking', 'reaching behind', 'hair and makeup', 'talking to passenger']

# Minimum probability for a prediction to count as a classification
minimum_value = 0.78

# We use the file insertion system of Colab
uploaded = files.upload()

# Resize uploads to whatever spatial size the model was built for, instead of
# a hard-coded 96x96 that only fits models trained on 96x96 images.
# input_shape is (batch, rows, cols, channels).
model_input_size = model.input_shape[1:3]

for filename, filedata in uploaded.items():
    # Display the image using IPython.display.Image
    display(Image(data=filedata, width=300))

    # Image path configuration
    image_path = '/content/' + filename

    # Preprocess the image: load, resize, and add the batch dimension
    img = image.load_img(image_path, target_size=model_input_size)
    image_tensor = np.expand_dims(image.img_to_array(img), axis=0)

    # Scale the tensor values between 0 and 1.
    # NOTE: this must match the preprocessing the model was trained with.
    image_tensor = image_tensor / 255.0

    # Inference execution
    classes = model.predict(image_tensor)
    probabilities = classes[0]

    # Get the indices of top three classes (highest probability first)
    top_three_indices = np.argsort(probabilities)[::-1][:3]

    # Print the top three classes and their probabilities,
    # truncated (not rounded) to nine decimal places
    print(f"{filename} is classified as:")
    for i in top_three_indices:
        probability = math.trunc(probabilities[i]*1000000000)/1000000000
        print(f"   - {label_names[i]} with probability {probability}")

    # If no class meets the threshold
    if not np.any(probabilities > minimum_value):
        print(filename + ' is not classified in any class.')