# CISC/CMPE 452/COGS 400 Group Project - Distracted Driver Detection

Please put your name and student ID below.

    Jared Taylor, 20075820


1. Create model
2. Train model
3. Test model

In [None]:
import os
import cv2
import glob
import numpy as np
from tensorflow import keras
from keras.applications.inception_v3 import InceptionV3
from sklearn.metrics import accuracy_score, precision_score, recall_score, confusion_matrix

# Data Preprocessing

In [None]:
#preprocessing class
class PreProcessing:
    '''
    Loads the driver images found under ``base_path``, preprocesses them into
    fixed-size arrays and splits them into train/test sets.
    '''

    def __init__(self, base_path):
        '''
        base_path: directory that contains ``driver_imgs_list.csv`` and the
        ``imgs/`` folder.
        '''
        self.base_path = base_path
        # 3x3 kernel whose weights sum to zero: flat regions filter to 0 and
        # only local intensity changes survive. Applied in preprocess_image.
        self.kernel = np.array([[-1, -1, -1],
                                [-1, 8, -1],
                                [-1, -1, -1]])
        print(self.base_path)

    def get_colour_type(self, img_path):
        '''
        Returns 1 when the image file is single-channel, otherwise 3.

        Raises ValueError when the file cannot be read as an image.
        '''
        # IMREAD_UNCHANGED keeps the file's own channel layout. The default
        # flag (IMREAD_COLOR) always produces a 3-channel array, so a
        # grayscale file could never be detected with it.
        image = cv2.imread(img_path, cv2.IMREAD_UNCHANGED)
        if image is None:
            raise ValueError(f'Could not read image: {img_path}')
        return 3 if image.ndim == 3 else 1

    def preprocess_image(self, img_path, height, width):
        '''
        Takes the path to an image and applies the preprocessing.

        The kernel-filtered image is added to an Otsu-truncated grayscale
        copy, the sum is resized to (height, width) and expanded back to three
        identical channels.

        Returns an array of shape (height, width, 3).
        Raises ValueError when the file cannot be read as an image.
        '''
        color_type = self.get_colour_type(img_path)

        if color_type == 1:
            gray = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
            image_sharp = cv2.filter2D(src=gray, ddepth=-1, kernel=self.kernel)
        else:
            img = cv2.imread(img_path)
            gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
            # The colour image is filtered first and reduced to one channel
            # afterwards, so every channel contributes to the filtered result.
            image_sharp = cv2.filter2D(src=img, ddepth=-1, kernel=self.kernel)
            image_sharp = cv2.cvtColor(image_sharp, cv2.COLOR_BGR2GRAY)

        # cv2.threshold returns (threshold_used, image); only the image is
        # needed. With THRESH_OTSU the threshold is chosen automatically and
        # THRESH_TRUNC caps every brighter pixel at that threshold.
        _, truncated = cv2.threshold(
            gray, 0, 255, cv2.THRESH_TRUNC + cv2.THRESH_OTSU)

        combined = cv2.add(image_sharp, truncated)
        # cv2.resize takes its target size as (width, height).
        dst = cv2.resize(combined, (width, height))
        return cv2.cvtColor(dst, cv2.COLOR_GRAY2BGR)

    def get_driver_data(self):
        '''
        Returns a dictionary with the image name as the key and a
        (driver, class) tuple as the value.

        Reads ``driver_imgs_list.csv`` from ``base_path``; the first line is
        treated as a header and blank lines are ignored.
        '''
        driver_data = {}
        path = os.path.join(self.base_path, 'driver_imgs_list.csv')

        print('Read drivers data')

        with open(path, 'r') as file:
            next(file, None)  # skip the header row
            for line in file:
                stripped = line.strip()
                if not stripped:
                    # A blank (e.g. trailing) line carries no record.
                    continue
                arr = stripped.split(',')
                driver_data[arr[2]] = (arr[0], arr[1])

        return driver_data

    def load_train_data(self, height, width):
        '''
        Loads and preprocesses every training image.

        Images are read from ``imgs/train/c0`` ... ``imgs/train/c9``; the
        folder index is used as the class label.

        Returns three parallel lists: images, class labels and driver ids.
        Raises KeyError when an image is not listed in the driver CSV.
        '''
        x_train = []
        y_train = []
        driver_ids = []

        driver_data = self.get_driver_data()

        print('Read train images')
        for class_number in range(10):
            print(f'Load folder c{class_number}')
            class_number_str = 'c' + str(class_number)
            path = os.path.join(
                self.base_path, 'imgs/train', class_number_str, '*.jpg')
            for file_path in glob.glob(path):
                file_name = os.path.basename(file_path)
                x_train.append(self.preprocess_image(file_path, height, width))
                y_train.append(class_number)
                driver_ids.append(driver_data[file_name][0])

        return x_train, y_train, driver_ids

    # Not used: the images under imgs/test come without class labels.
    def load_test_data(self, height, width):
        '''
        Loads and preprocesses every image under ``imgs/test``.

        Returns two parallel lists: images and their file names.
        '''
        x_test = []
        x_test_ids = []

        print('Read test images')

        path = os.path.join(self.base_path, 'imgs/test/*.jpg')
        file_paths = glob.glob(path)
        number_of_files = len(file_paths)

        for count, file_path in enumerate(file_paths):
            x_test.append(self.preprocess_image(file_path, height, width))
            x_test_ids.append(os.path.basename(file_path))
            if count % 1000 == 0:
                print(f"Read {count} images from {number_of_files}")

        return x_test, x_test_ids

    def split_train_data_on_class(self, x_train, y_train, train_fraction=0.75):
        '''
        Splits the data into new train and test sets so that every class
        contributes the same fraction of its samples to the train set.

        Within a class the first ``floor(n * train_fraction)`` samples (in
        input order) go to train and the rest go to test. Classes are emitted
        in ascending label order.

        Returns (train_x, train_y, test_x, test_y) as numpy arrays.
        '''
        # Group the samples by class label, keeping input order per class.
        by_class = {}
        for sample, label in zip(x_train, y_train):
            by_class.setdefault(int(label), []).append(sample)

        newTrain_x = []
        newTrain_y = []
        newTest_x = []
        newTest_y = []
        for label in sorted(by_class):
            samples = by_class[label]
            # Exactly split_point samples go to train. The earlier
            # `ind <= splitPoint` comparison sent one extra sample per class
            # to train and could leave small classes with no test samples.
            split_point = int(len(samples) * train_fraction)
            newTrain_x.extend(samples[:split_point])
            newTrain_y.extend([label] * split_point)
            newTest_x.extend(samples[split_point:])
            newTest_y.extend([label] * (len(samples) - split_point))

        return (np.array(newTrain_x), np.array(newTrain_y),
                np.array(newTest_x), np.array(newTest_y))

    def split_train_data_on_driver(self, x_train, y_train, driver_ids,
                                   num_train_drivers=20):
        '''
        Divides the data into new train and test sets based on driver ids, so
        that no driver appears in both sets.

        The first ``num_train_drivers`` distinct drivers (in order of first
        appearance) form the train set; all remaining drivers form the test
        set.

        Returns (train_x, train_y, test_x, test_y) as numpy arrays.
        '''
        # dict.fromkeys removes duplicates while keeping first-seen order.
        unique_drivers = list(dict.fromkeys(driver_ids))
        # A set gives O(1) membership tests in the per-sample loop below.
        train_drivers = set(unique_drivers[:num_train_drivers])

        newTrain_x = []
        newTrain_y = []
        newTest_x = []
        newTest_y = []
        for sample, label, driver in zip(x_train, y_train, driver_ids):
            if driver in train_drivers:
                newTrain_x.append(sample)
                newTrain_y.append(label)
            else:
                newTest_x.append(sample)
                newTest_y.append(label)

        return (np.array(newTrain_x), np.array(newTrain_y),
                np.array(newTest_x), np.array(newTest_y))


In [None]:
def shuffle_data(x, y):
    '''
    Shuffle x and y together so that every x[i] stays paired with its y[i].

    x and y are expected to have the same length. The global numpy random
    state drives the shuffle. Returns the shuffled x and y as numpy arrays.
    '''
    # Pair each sample with its label so both move as one unit.
    paired = [(x[idx], y[idx]) for idx in range(len(x))]
    np.random.shuffle(paired)

    # Unzip the shuffled pairs back into two parallel sequences.
    shuffled_x = [sample for sample, _ in paired]
    shuffled_y = [label for _, label in paired]
    return np.array(shuffled_x), np.array(shuffled_y)

In [None]:
# Build the preprocessor over the dataset directory and load every training
# image, preprocessed to 112x112.
PATH = 'data'
p = PreProcessing(PATH)
x_data, y_data, driver_ids = p.load_train_data(height=112, width=112)

In [None]:
# Split per class into train/test sets, then shuffle only the training set;
# finally report the shape of both image arrays.
x_train, y_train, x_test, y_test = p.split_train_data_on_class(x_data, y_data)
x_train, y_train = shuffle_data(x_train, y_train)
print(x_train.shape, x_test.shape, sep='\n')

In [None]:
# data visualization: show the first test image until a key is pressed
temp = x_test[0]
cv2.imshow('image', temp)
cv2.waitKey(0)
# The parentheses are required: without them the function is only referenced,
# never called, and the window is not closed.
cv2.destroyAllWindows()

# Model Implementation

In [None]:
class ICV3:
    '''
    Image classifier built on a frozen, pre-trained InceptionV3 base (cut at
    layer 'mixed7') followed by a small dense head with 10 softmax outputs.

    The compiled Keras model is stored in ``self.ICV3``.
    '''

    def __init__(self, width, height):
        '''
        width, height: size of the input images in pixels.

        Builds and compiles the model and prints its summary.
        '''
        # Keras expects image tensors as (rows, cols, channels), i.e.
        # (height, width, 3). Building (width, height, 3) only worked because
        # the images used so far are square.
        inputShape = (height, width, 3)
        initLearningRate = 0.01
        # NOTE(review): decay_steps=526 is presumably about one epoch of steps
        # at batch size 32 for the current training split -- confirm.
        self.lrSchedule = keras.optimizers.schedules.ExponentialDecay(
            initLearningRate, decay_steps=526, decay_rate=0.95, staircase=True)
        self.ICV3 = self.buildModel(inputShape)
        self.ICV3.summary()

    def fit(self, numEpochs, x_train, y_train, batchSize):
        '''
        Trains the model on (x_train, y_train) for ``numEpochs`` epochs.

        Returns the Keras History object so callers can inspect or plot the
        per-epoch loss and accuracy.
        '''
        return self.ICV3.fit(x_train, y_train, epochs=numEpochs,
                             batch_size=batchSize, verbose=1)

    def buildModel(self, inputShape):
        '''
        Creates and compiles the model for inputs of shape ``inputShape``.

        Downloads (or reuses from the Keras cache) the InceptionV3 "no top"
        weights, freezes every InceptionV3 layer and attaches the dense head
        to the output of layer 'mixed7'.
        '''
        preTrainedModel = InceptionV3(
            input_shape=inputShape, include_top=False, weights=None)
        WEIGHTS_PATH_NO_TOP = (
            'https://github.com/fchollet/deep-learning-models/'
            'releases/download/v0.5/'
            'inception_v3_weights_tf_dim_ordering_tf_kernels_notop.h5')
        iv3Weights = keras.utils.get_file(
            'inception_v3_weights_tf_dim_ordering_tf_kernels_notop.h5',
            WEIGHTS_PATH_NO_TOP,
            cache_subdir='models',
            file_hash='bcbd6486424b2319ff4ef7d526e38f63')
        preTrainedModel.load_weights(iv3Weights)

        # Freeze the base so that only the new head is trained.
        for layer in preTrainedModel.layers:
            layer.trainable = False

        # The head is attached to 'mixed7' rather than to the final
        # InceptionV3 layer.
        lastOutput = preTrainedModel.get_layer('mixed7').output

        x = keras.layers.Flatten()(lastOutput)
        x = keras.layers.Dense(1024, activation='relu')(x)
        x = keras.layers.BatchNormalization()(x)
        x = keras.layers.Dropout(0.2)(x)

        # No second Flatten here: the tensor is already flat after the Dense
        # block, so flattening again would be a no-op.
        x = keras.layers.Dense(512, activation='relu')(x)
        x = keras.layers.BatchNormalization()(x)
        x = keras.layers.Dropout(0.2)(x)
        x = keras.layers.Dense(10, activation='softmax')(x)

        model = keras.Model(preTrainedModel.input, x)
        # Sparse loss: the labels are integer class indices, not one-hot rows.
        model.compile(
            optimizer=keras.optimizers.Adam(learning_rate=self.lrSchedule),
            loss='sparse_categorical_crossentropy',
            metrics=['accuracy'])
        return model
        

In [None]:
# Training configuration, model construction for 112x112 inputs, and training.
epochs, batchSize = 20, 32
model1 = ICV3(width=112, height=112)
model1.fit(numEpochs=epochs, x_train=x_train, y_train=y_train, batchSize=batchSize)


In [None]:
# Accuracy and Scores
# Evaluate on the held-out split, then print a confusion matrix
# (rows: true class, columns: predicted class).
test_loss, test_acc = model1.ICV3.evaluate(x_test, y_test)
print(f'\nTest loss: {test_loss} -- Test accuracy: {test_acc}')
y_pred = model1.ICV3.predict(x_test, batch_size = 32, verbose = 1)

# Each prediction row holds one softmax score per class; take the best class.
predicted = np.argmax(y_pred, axis=1)
cm = confusion_matrix(y_test, predicted)
print('Confusion matrix:')
for row in cm:
    print(row)
