# Machine Learning for Drowsiness Detection

First, download the data from this link:
http://vlm1.uta.edu/~athitsos/projects/drowsiness/

Note that there is NOT enough space in a standard Colab instance to fit the whole dataset.
We therefore recommend doing feature extraction, face alignment, and feature engineering on a local machine and uploading the processed data to Google Drive for training.

## Data Preprocessing

### Face Extraction and Alignment

In [None]:
"""face_detection.ipynb

Automatically generated by Colaboratory.

Original file is located at
    https://colab.research.google.com/github/dortmans/ml_notebooks/blob/master/face_detection.ipynb

# Face detection using pre-trained model

We use following blog as a reference:
[Face detection with OpenCV and deep learning](https://www.pyimagesearch.com/2018/02/26/face-detection-with-opencv-and-deep-learning/)

Import required Python libraries
"""

import imutils
import numpy as np
import cv2
import os
import logging

def detectFace(image, model, min_confidence=0.5):
  """Detect faces in `image` with an OpenCV DNN detector.

  The image is resized to a width of 400 pixels, fed to `model` as a
  300x300 blob, and every detection whose confidence exceeds
  `min_confidence` is converted to pixel coordinates of the resized image.

  Args:
    image: image array as returned by `cv2.imread`.
    model: network object exposing `setInput` / `forward`
      (e.g. the result of `cv2.dnn.readNetFromCaffe`).
    min_confidence: detections at or below this score are discarded.

  Returns:
    (faces_bbox, im_resize): a list of `[startX, startY, endX, endY]` boxes,
    clipped to the bounds of `im_resize`, and the resized image the boxes
    refer to.
  """
  # resize it to have a maximum width of 400 pixels
  im_resize = imutils.resize(image, width=400)
  (h, w) = im_resize.shape[:2]

  # Build the input blob: fixed 300x300 input with per-channel mean subtraction.
  blob = cv2.dnn.blobFromImage(cv2.resize(im_resize, (300, 300)), 1.0, (300, 300), (104.0, 177.0, 123.0))

  # Pass the blob through the neural network and obtain the detections
  model.setInput(blob)
  detections = model.forward()

  # Box coordinates are scaled by (w, h, w, h) to get pixels in im_resize.
  scale = np.array([w, h, w, h])

  faces_bbox = []
  for i in range(detections.shape[2]):
    # confidence (i.e., probability) associated with the prediction
    confidence = detections[0, 0, i, 2]
    if confidence <= min_confidence:
      continue

    # Clip to the image: the detector can report boxes that extend past the
    # border, and negative indices would wrap around when the caller slices.
    box = np.clip(detections[0, 0, i, 3:7] * scale, 0, scale)
    (startX, startY, endX, endY) = box.astype("int")
    faces_bbox.append([startX, startY, endX, endY])

  return faces_bbox, im_resize

def detectAndSaveFace(image, model, image_dir, image_name, transpose=False, flip=False):
  """Detect faces in `image` and write each one as a 224x224 crop.

  Crops are written to `./FacesDataSet/<image_dir>/` and named
  `<stem>_detected_face_<i><ext>`, where `<stem>`/`<ext>` come from
  `image_name` and `i` is the detection index.

  Args:
    image: image array as returned by `cv2.imread`.
    model: face detector passed through to `detectFace`.
    image_dir: output sub-directory below `./FacesDataSet/`.
    image_name: file name of the source image (used to build output names).
    transpose: swap the first two axes before detection.
    flip: flip the image around both axes before detection.
  """
  if transpose:
    image = image.transpose(1, 0, 2)

  if flip:
    image = cv2.flip(image, -1)

  faces, img = detectFace(image, model)

  save_dir = './FacesDataSet/' + image_dir
  stem, extension = os.path.splitext(image_name)
  save_name = stem + '_detected_face_'

  # makedirs also creates missing parents and tolerates an existing directory.
  os.makedirs(save_dir, exist_ok=True)

  height, width = img.shape[:2]
  for i, face in enumerate(faces):
    # Clamp the box to the image so slicing never wraps or comes back empty.
    x0, y0 = max(int(face[0]), 0), max(int(face[1]), 0)
    x1, y1 = min(int(face[2]), width), min(int(face[3]), height)
    if x1 <= x0 or y1 <= y0:
      logging.warning("Skipping empty face box %d in %s", i, image_name)
      continue

    face_im = cv2.resize(img[y0:y1, x0:x1, :], (224, 224))
    cv2.imwrite(save_dir + '/' + save_name + str(i) + extension, face_im)

if __name__ == "__main__":
  # Batch driver: walk the sampled-frame folders, detect faces in every frame
  # and save the crops under ./FacesDataSet/ in a mirrored folder layout.

  prototxt = 'deploy.prototxt'
  model = 'res10_300x300_ssd_iter_140000.caffemodel'
  model = cv2.dnn.readNetFromCaffe(prototxt, model)

  # subject id -> labels whose frames are transposed before detection
  tp_images = {
    "01":['0'],
    "02":['5','10'],
    "03":['0'],
    "05":['0','5','10'],
    "07":['0','5'],
    "08":['0','5','10'],
    "10":['0','5','10'],
    "13":['0','5','10'],
    "14":['5'],
    "19":['0','5','10'],
    "20":['0'],
    "21":['0','5','10'],
    "22":['0','5','10'],
    "25":['0','5','10'],
    "26":['5'],
    "28":['0','5','10'],
    "30":['10'],
    "33":['0','5','10'],
    "38":['0','5','10'],
    "41":['5'],
    "42":['0','5','10'],
    "43":['0','5','10'],
    "47":['0','5','10'],
    "49":['0','5','10'],
    "50":['0','5','10'],
    "52":['0','5','10'],
    "53":['0','5','10'],
    "56":['0','5','10'],
    "57":['0','5','10'],
    "58":['0','5','10'],
    "59":['0','5','10']
  }

  # subject id -> labels whose frames are flipped around both axes before detection
  flip_images = {
    "07":['0','5'],
    "08":['0','5','10'],
    "13":['0','5','10'],
    "14":['5'],
    "20":['0'],
    "25":['0','5','10'],
    "26":['5','10'],
    "30":['10'],
    "33":['0','5','10'],
    "38":['0','5','10'],
    "42":['0','5','10'],
    "45":['0','5','10'],
    "47":['0','5','10'],
    "49":['0','5','10'],
    "58":['0','5','10']
  }

  list_of_folders = []
  # NOTE(review): the i / k ranges and the label list below are narrowed to a
  # single value each, presumably to process one subject at a time - widen to
  # cover the full dataset.
  for i in range(2,3):
    frames_root = './DataSet/' + 'Sampled_frames' + str(i+1)
    faces_root = 'Sampled_faces' + str(i+1)
    for j in range(2):
      folder_dir = frames_root + '_part' + str(j+1)
      # Build the name from the root each time; appending to a running string
      # produced "..._part1_part2" on the second pass.
      sub_dir = faces_root + '_part' + str(j+1)

      os.makedirs('./FacesDataSet/' + sub_dir, exist_ok=True)

      for k in range(2,3):
        sample_num = (i)*12 + j*6 + k
        print("Working on " + str(sample_num) + "...")
        # Blocks until the user presses Enter before each subject.
        input()
        sample_dir = "{:02d}".format(sample_num)
        list_of_folders.append(folder_dir + "/" + sample_dir)
        person_sub_dir = sub_dir +  '/' + sample_dir

        os.makedirs('./FacesDataSet/' + person_sub_dir, exist_ok=True)

        tp_status = tp_images.get(sample_dir)
        flip_status = flip_images.get(sample_dir)

        for label in [10]:#[0, 5, 10]:
          images_path = folder_dir + '/' + sample_dir + '/' + str(label)
          image_sub_dir = person_sub_dir +  '/' + str(label)

          # Orientation fixes depend only on (subject, label), not on the frame.
          transpose = tp_status is not None and str(label) in tp_status
          flip = flip_status is not None and str(label) in flip_status

          try:
            image_names = os.listdir(images_path)
          except OSError:
            logging.error(images_path + " does not exist!")
            continue

          for image_name in image_names:
            image_dir = images_path + '/' + image_name
            img = cv2.imread(image_dir)
            if img is None:
              continue
            try:
              detectAndSaveFace(img, model, image_sub_dir, image_name, transpose, flip)
            except Exception:
              # Keep going on a bad frame, but report the real failure instead
              # of claiming the folder is missing.
              logging.exception("Failed to process " + image_dir)
        print("Completed " + str(sample_num))
  print("LEN", len(list_of_folders))

### Optional: Downsample the Dataset (fewer frames)

In [None]:
# Collect, per class folder, the files whose names start with a two-digit
# subject number from 01 to 48 followed by a space.
train_image_paths = []
for class_dir in glob.glob(dataset_path + '/*'):
    # Last path component, tolerating both '/' and '\\' separators.
    classes.append(class_dir.split('/')[-1].split('\\')[-1])
    for subject in range(1, 49):
        pattern = class_dir + '/' + '{:02d}'.format(subject) + ' *'
        train_image_paths.append(glob.glob(pattern))

print("Before Flattening :",len(train_image_paths))
train_image_paths = list(flatten(train_image_paths))
print("After Flattening :",len(train_image_paths))

In [None]:
# Collect, per class folder, the files whose names start with a subject
# number from 49 to 60 followed by a space.
valid_image_paths = []
for classes_path in glob.glob(dataset_path + '/*'):
    # Last path component, tolerating both '/' and '\\' separators.
    classes.append(classes_path.split('/')[-1].split('\\')[-1])
    for valid_idx in range(49, 61):
        # Two-digit formatting keeps the pattern consistent with the training cell.
        pattern = classes_path + '/' + '{:02d}'.format(valid_idx) + ' *'
        valid_image_paths.append(glob.glob(pattern))

print("Before Flattening :",len(valid_image_paths))
valid_image_paths = list(flatten(valid_image_paths))
print("After Flattening :",len(valid_image_paths))

In [None]:
import shutil
import time
# Copy every 10th validation frame into the downsampled "val" tree, keeping
# the name of the frame's parent folder as the destination sub-folder.
destination_train = "./Data/downsampled_clean/val/"
for each_img in valid_image_paths:
    posix_path = each_img.replace("\\", "/")
    # The frame number is the text between the last '(' and the next ')'.
    file_number = posix_path.split('(')[-1].split(')')[0]
    if int(file_number) % 10 != 0:
        continue
    shutil.copy(posix_path, destination_train + posix_path.split("/")[-2])

In [None]:
# Copy every 10th training frame into the downsampled "train" tree, keeping
# the name of the frame's parent folder as the destination sub-folder.
destination_train = "./Data/downsampled_clean/train/"
for each_img in train_image_paths:
    posix_path = each_img.replace("\\", "/")
    # The frame number is the text between the last '(' and the next ')'.
    file_number = posix_path.split('(')[-1].split(')')[0]
    if int(file_number) % 10 != 0:
        continue
    shutil.copy(posix_path, destination_train + posix_path.split("/")[-2])

### Splitting the Data into Training and Testing

In [None]:
####################################################
#       Create Train, Valid and Test sets
####################################################
dataset_path = 'FacesDataSet/Clean_Split_Data/'

train_to_test_ratio = 0.8
train_to_validation_ratio = 0.8

image_paths = []  # one list of file paths per class folder, flattened below
classes = []      # class folder names

for class_dir in glob.glob(dataset_path + '/*'):
    class_name = class_dir.split('/')[-1].split('\\')[-1]
    classes.append(class_name)
    image_paths.append(glob.glob(class_dir + '/*'))

image_paths = list(flatten(image_paths))
# NOTE: the paths are not shuffled, so the split below follows listing order.

print('train_image_path example: ', image_paths[-1])
print('class example: ', classes[-1])

# First `train_to_test_ratio` of the paths go to train, the remainder to test.
split_at = int(train_to_test_ratio*len(image_paths))
train_image_paths = image_paths[:split_at]
test_image_paths = image_paths[split_at:]

print("\nTrain size: {}\nValid size: {}\n".format(len(train_image_paths), len(test_image_paths)))

dataset_sizes = {'train': len(train_image_paths),
                'val' : len(test_image_paths)}

### Feature Engineering

In [None]:
def eye_aspect_ratio(eye):
    """Return (|p1-p5| + |p2-p4|) / (2 * |p0-p3|) for the points in `eye`.

    `eye` is an indexable sequence of 2-D points; indices 0-5 are read.
    """
    first_pair = distance.euclidean(eye[1], eye[5])
    second_pair = distance.euclidean(eye[2], eye[4])
    span = distance.euclidean(eye[0], eye[3])
    return (first_pair + second_pair) / (2.0 * span)
def mouth_aspect_ratio(mouth):
    """Return |p14-p18| / |p12-p16| for the points in `mouth`.

    `mouth` is an indexable sequence of 2-D points; indices 12, 14, 16 and 18
    are read.
    """
    opening = distance.euclidean(mouth[14], mouth[18])
    span = distance.euclidean(mouth[12], mouth[16])
    return opening / span
def circularity(eye):
    """Return 4*pi*Area / perimeter**2 for the six points `eye[0..5]`.

    Area is that of a circle whose diameter is |p1-p4|; the perimeter is the
    length of the closed polygon p0 -> p1 -> ... -> p5 -> p0.
    """
    diameter = distance.euclidean(eye[1], eye[4])
    radius = diameter/2.0
    area = math.pi * (radius ** 2)

    perimeter = 0
    for k in range(6):
        # (k + 1) % 6 wraps the last point back to the first.
        perimeter += distance.euclidean(eye[k], eye[(k + 1) % 6])

    return 4 * math.pi * area /(perimeter**2)
def mouth_over_eye(eye):
    """Return mouth_aspect_ratio(eye) divided by eye_aspect_ratio(eye).

    The same point sequence is passed to both ratio functions.
    """
    eye_ratio = eye_aspect_ratio(eye)
    mouth_ratio = mouth_aspect_ratio(eye)
    return mouth_ratio / eye_ratio
def getFrame(sec):
    """Seek the global `vidcap` to 180000 ms + `sec` seconds and read a frame.

    Returns the `(hasFrames, image)` pair produced by `vidcap.read()`.
    """
    position_ms = 180000 + sec*1000
    vidcap.set(cv2.CAP_PROP_POS_MSEC, position_ms)
    grabbed, frame = vidcap.read()
    return grabbed, frame

In [None]:
import numpy as np
import os.path
import imageio
import matplotlib.pyplot as plt
from mlxtend.image import extract_face_landmarks
import os
from scipy.spatial import distance
from PIL import Image


data = []
labels = []
failed_images=[]
break_flag = False
pre_path = '/content/drive/MyDrive/CIS 520 Project/Data_file_for_cis520_project'
working_path = '/content/drive/MyDrive/CIS 520 Project/Data_file_for_cis520_project/FacesDataSet/Sampled_faces1_part1'
save_path = pre_path + '/Feature Extraction'
count_fail=0
count_success=0


def _has_landmarks(landmarks):
  """Return True when the landmark detector produced a usable result.

  Treats both `None` and an all-zero array as "no face found".
  """
  return landmarks is not None and np.sum(landmarks) != 0


# For every subject folder and every label (0, 5, 10): extract landmarks from
# each face image, derive four features per image, and save features/labels
# as CSV under `save_path/<subject>/`.
for i in os.listdir(working_path):
  # new directory location
  path = save_path + "/"+ str(i)
  # exist_ok lets the cell be re-run without crashing on existing folders.
  os.makedirs(path, exist_ok=True)

  for num in [0, 5, 10]:
    data = []
    labels = []
    features = []
    count_fail=0
    count_success=0

    full_dir = working_path + "/" + str(i) + "/" + str(num)
    for image in os.listdir(full_dir):
      # Close the file handle as soon as the pixels are copied out.
      with Image.open(full_dir + "/" + image) as handle:
        img = np.asarray(handle)

      landmarks = extract_face_landmarks(img)
      if not _has_landmarks(landmarks):
        # Retry on the transposed image only when the first attempt fails.
        landmarks = extract_face_landmarks(img.transpose((1, 0, 2)))

      if _has_landmarks(landmarks):
        data.append(landmarks)
        labels.append(num)
        count_success+=1
      else:
        count_fail+=1
        failed_images.append([image,i,num])

    for d in data:
      # Rows 36..67 of the landmark array; the helpers index relative to row 36.
      eye = d[36:68]
      ear = eye_aspect_ratio(eye)
      mar = mouth_aspect_ratio(eye)
      cir = circularity(eye)
      mouth_eye = mouth_over_eye(eye)
      features.append([ear, mar, cir, mouth_eye])
    features = np.array(features)
    labels = np.array(labels)
    print("Faces not detected =",count_fail)

    np.savetxt(path + '/' + str(num) + "_features" + ".csv", features, delimiter = ",")
    np.savetxt(path + '/' + str(num) +"_labels" + ".csv", labels, delimiter = ",")

### Data Imputation - Mean Data Imputation

In [None]:
import numpy as np
import numpy.ma as ma
from numpy import linalg as LA
import matplotlib.pyplot as plt
from scipy.spatial import distance
import random
import pandas
%matplotlib inline

In [None]:
def meanImpute(X_miss):
  '''
  Replace every NaN in a 2-D array with the mean of its column.

  The column means are computed over the non-NaN entries only. The input
  array is left untouched.

  Returns :
    X_imputed, a copy of X_miss with the same shape in which each missing
    value has been replaced by the mean of the corresponding column.
  '''
  X_imputed = X_miss.copy()

  column_mean = np.nanmean(X_imputed, axis=0)
  missing_rows, missing_cols = np.nonzero(np.isnan(X_imputed))
  # Each missing cell receives the mean of the column it sits in.
  X_imputed[missing_rows, missing_cols] = column_mean[missing_cols]

  assert X_imputed.shape == X_miss.shape
  return X_imputed

In [None]:
import os
# Mean-impute the NaNs in every person's landmark feature file and write the
# result, one folder per person, under "Landmarks without NANS".
path = "/content/drive/MyDrive/CIS 520 Project/Data_file_for_cis520_project/Landmarks with NANs Extract/Landmarks with NANs"
output_root = "/content/drive/MyDrive/CIS 520 Project/Data_file_for_cis520_project/Landmarks without NANS"
for eachPerson in os.listdir(path):
  # Keep one output folder per person; writing all persons to the same
  # "<state>_features.csv" meant each person overwrote the previous one.
  person_out = output_root + "/" + eachPerson
  os.makedirs(person_out, exist_ok=True)

  for eachState in [0, 5, 10]:
    filePath = path + "/" + eachPerson + "/" + str(eachState) + "_features.csv"

    # NOTE(review): read_csv treats the first row as a header by default; if
    # these CSVs have no header row, pass header=None - confirm against the
    # files on disk.
    data_csv_X = pandas.read_csv(filePath).to_numpy()
    X_imputed = meanImpute(data_csv_X)

    savePath = person_out + "/" + str(eachState) + "_features.csv"
    print(savePath)
    np.savetxt(savePath, X_imputed.astype(int))

## Drowsiness MNIST (DNIST) Creation

In [None]:
import cv2
import matplotlib.pyplot as plt
from pandas.core.common import flatten
from sklearn.model_selection import train_test_split
import copy
import numpy as np
import random
import time, os
import glob

def _draw_segment(img, pts, a, b):
    """Draw one line (colour 1, thickness 2) from pts[a] to pts[b]."""
    return cv2.line(img, pts[a], pts[b], 1, 2)


def _draw_closed_outline(img, pts, first, last):
    """Connect pts[first..last] in order and close the loop last -> first."""
    for idx in range(first, last):
        img = _draw_segment(img, pts, idx, idx + 1)
    return _draw_segment(img, pts, last, first)


def draw_landmarks(img, landmark_tuple):
    """Draw landmark outlines onto `img` in place.

    Four closed outlines are drawn over the points of `landmark_tuple`:
    indices 0-5, 6-11, 12-23 and 24-end, plus one extra segment joining
    point 12 to point 24. Presumably these are the two eyes and the outer and
    inner mouth contours - confirm against the landmark layout in use.

    Args:
        img: image array drawn on via `cv2.line`.
        landmark_tuple: sequence of (x, y) points.

    Returns:
        None.
    """
    last = len(landmark_tuple) - 1

    # The first outline now closes 5 -> 0 like the others; it previously drew
    # 4 -> 0, which left the polygon open.
    img = _draw_closed_outline(img, landmark_tuple, 0, 5)
    img = _draw_closed_outline(img, landmark_tuple, 6, 11)
    img = _draw_closed_outline(img, landmark_tuple, 12, 23)

    # Bridge between the third and fourth outlines.
    img = _draw_segment(img, landmark_tuple, 12, 24)

    img = _draw_closed_outline(img, landmark_tuple, 24, last)
  
X = []
y = []

save_folder = "DNIST"

# Render each row of landmark coordinates as a 224x224 binary image.
# Subjects 01-48 go to DNIST/train/<label>/, 49-60 to DNIST/val/<label>/.
for j in range(1, 61):
    s = "{:02d}".format(j)
    split_name = "train" if j < 49 else "val"

    for i in [0, 5, 10]:
        # ndmin=2 keeps a single-row file two-dimensional for the slicing below.
        loaded_X = np.loadtxt("Adi_Features/Feature Extraction/"+s+"/"+str(i)+"_features.csv", delimiter=',', ndmin=2)
        if(len(loaded_X) == 0):
            continue

        # cv2.imwrite does not create directories and fails silently without them.
        out_dir = save_folder + "/" + split_name + "/" + str(i)
        os.makedirs(out_dir, exist_ok=True)

        # Columns from 72 onward, read as alternating x, y values.
        important_x = loaded_X[:,72:]
        x_locs = np.int64(important_x[:,::2])
        y_locs = np.int64(important_x[:,1::2])
        images = np.zeros((x_locs.shape[0],224,224))
        # Loop names differ from the module-level X / y so they are not clobbered.
        for k, (xs, ys) in enumerate(zip(x_locs, y_locs)):
            images[k][ys, xs] = 1
            # Plain Python ints are the safest point type for cv2 drawing calls.
            points = [(int(px), int(py)) for px, py in zip(xs, ys)]
            draw_landmarks(images[k], points)

            save_dir = out_dir + "/" + s + "_" + str(k) + ".jpeg"
            cv2.imwrite(save_dir, images[k]*255)

## Generate Time Series Data

In [None]:
X = []
y = []

# Each offset defines one set of 30-row window boundaries; using several
# offsets yields overlapping windows over the same recording.
window_offsets = (30, 5, 10, 15, 20, 25)

for j in range(1, 61):
    s = "{:02d}".format(j)
    for i in [0, 5, 10]:
        loaded_X = np.loadtxt("/content/drive/MyDrive/Data_file_for_cis520_project/All_Landmarks/"+s+"/"+str(i)+"_features.csv", delimiter=',')
        for offset in window_offsets:
            boundaries = np.arange(offset, loaded_X.shape[0], 30)
            pieces = np.split(loaded_X, boundaries)
            if len(pieces) < 2:
                continue
            # Drop the leading and trailing pieces; the rest are exactly 30 rows.
            windows = pieces[1:-1]
            X += windows
            # Labels 0 / 5 / 10 become classes 0 / 1 / 2.
            y += [int(i/5)] * len(windows)

In [None]:
# Sanity check: number of windows, shape of one window, stacked shape, label count.
window_count = len(X)
print(window_count)
first_window_shape = X[0].shape
print(first_window_shape)
stacked_shape = np.array(X).shape
print(stacked_shape)
label_count = len(y)
print(label_count)

In [None]:
from sklearn.model_selection import train_test_split
from sklearn import preprocessing

# Keep feature columns 74..119 of every window and split 80/20 in order
# (no shuffling at split time).
selected_columns = np.array(X)[:,:,74:120]
X_train, X_test, y_train, y_test = train_test_split(selected_columns, np.array(y), test_size=0.2, shuffle=False)

# Standardise each window on its own: even-indexed and odd-indexed columns
# are scaled separately, each with a scaler fitted on that window only.
column_groups = (slice(None, None, 2), slice(1, None, 2))
for split_windows in (X_train, X_test):
    for window in split_windows:
        for cols in column_groups:
            values = window[:, cols]
            window[:, cols] = preprocessing.StandardScaler().fit(values).transform(values)

# Shuffle samples and labels together, independently for each split.
shuffler1 = np.random.permutation(len(X_train))
shuffler2 = np.random.permutation(len(X_test))
X_train, y_train = X_train[shuffler1], y_train[shuffler1]
X_test, y_test = X_test[shuffler2], y_test[shuffler2]

# Deep Learning Approaches

## Transfer Learning

In [None]:
import tensorflow as tf 

# Report the default GPU device, or ask for a GPU build when none is found.
gpu_name = tf.test.gpu_device_name()
if not gpu_name:
    print("Please install GPU version of TF")
else:
    print('Default GPU Device:{}'.format(gpu_name))

In [None]:
# Show the installed TensorFlow version.
print(tf.__version__)

In [None]:
from tensorflow.keras.layers import Input, Lambda, Dense, Flatten
from tensorflow.keras.models import Model
from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras.applications.vgg16 import preprocess_input
from tensorflow.keras.applications.resnet import ResNet50
# from tensorflow.keras.applications.resnet import preprocess_input
from tensorflow.keras.preprocessing import image
from tensorflow.keras.preprocessing.image import ImageDataGenerator,load_img
from tensorflow.keras.models import Sequential
import numpy as np
import glob
import random
import shutil
# from glob import glob
# Two image dimensions; the channel count is appended where the backbone is built.
IMAGE_SIZE = [224, 224]
import os

Here, you can uncomment your model of choice and use it for transfer learning.

In [None]:
# Build the VGG16 backbone with ImageNet weights and without its classifier head
# (include_top=False); the input is IMAGE_SIZE with 3 channels.
# The commented alternatives both construct ResNet50 (including the one named res18).

vgg16 = VGG16(input_shape=IMAGE_SIZE + [3], weights='imagenet', include_top=False)
# res50 = ResNet50(input_shape=IMAGE_SIZE + [3], weights='imagenet', include_top=False)
# res18 = ResNet50(input_shape=IMAGE_SIZE + [3], weights='imagenet', include_top=False)

You can also choose between freezing the pretrained model weights for faster training, or alternatively training the whole model for potentially better accuracy.

In [None]:
# don't train existing weights
# for layer in vgg16.layers:
#     layer.trainable = False

# don't train existing weights
# for layer in res50.layers:
#     layer.trainable = False

# don't train existing weights
# for layer in res18.layers:
#     layer.trainable = False

In [None]:
# Use the Image Data Generator to import the images from the dataset
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Training generator: random shear, zoom and horizontal flips.
train_datagen = ImageDataGenerator(shear_range = 0.2,
                                   zoom_range = 0.2,
                                   horizontal_flip = True)

# Evaluation generator: no augmentation arguments are passed.
test_datagen = ImageDataGenerator()

Here, you can uncomment your dataset of choice:


1.   Downsampled dataset with fewer frames.
2.   Original Dataset with ~ 600 frames per video
3.   DNIST - Drowsiness MNIST Dataset



In [None]:
# Make sure the target size matches the image size the model was initialised with.
# class_mode='categorical' is requested, and batches of 64 are shuffled.
# Swap the directory for one of the commented alternatives to change dataset.
training_set = train_datagen.flow_from_directory("Data/downsampled_clean/train",
                                                #  "FacesDataSet/clean_test_train_split/train",
                                                #  "DNIST/train",
                                                 target_size = (224, 224),
                                                 batch_size = 64,
                                                 class_mode = 'categorical', 
                                                 shuffle = True)

In [None]:
# Validation data, loaded with the same target size, batch size and class mode
# as the training set. Swap the directory to match the chosen training dataset.
# NOTE(review): shuffle=True is also set here - confirm shuffling the
# evaluation data is intended.
test_set = test_datagen.flow_from_directory("Data/downsampled_clean/val",
                                            #  "FacesDataSet/clean_test_train_split/val",
                                            #  "DNIST/val",
                                            target_size = (224, 224),
                                            batch_size = 64,
                                            class_mode = 'categorical', 
                                            shuffle = True)

Again, uncomment your model of choice

In [None]:
import tensorflow_hub as hub
# Wrap the chosen backbone as a non-trainable hub layer.
feature_extractor_layer = hub.KerasLayer(
    # res18,
    # res50,
    vgg16,
    input_shape=(224, 224, 3),
    trainable=False)

# Backbone followed by a 3-unit dense layer (no activation specified).
model = tf.keras.Sequential([
  feature_extractor_layer,
  tf.keras.layers.Dense(3)
])

# Augmentation applied inside the model graph: horizontal flip and rotation.
data_augmentation = tf.keras.Sequential([
  tf.keras.layers.RandomFlip('horizontal'),
  tf.keras.layers.RandomRotation(0.2),
])

In [None]:
# Run one training batch through the model to inspect the output shape.
image_batch, label_batch = next(iter(training_set))
feature_batch = model(image_batch)
print(feature_batch.shape)

In [None]:
# NOTE: despite the variable name, this layer is a global *max* pool.
global_average_layer = tf.keras.layers.GlobalMaxPool2D()
feature_batch_average = global_average_layer(feature_batch)
print(feature_batch_average.shape)

In [None]:
# Final 3-unit dense layer (no activation specified); check its output shape
# on the pooled batch.
prediction_layer = tf.keras.layers.Dense(3)
prediction_batch = prediction_layer(feature_batch_average)
print(prediction_batch.shape)

In [None]:
# Assemble the end-to-end model:
# augmentation -> VGG16 preprocessing -> backbone model -> global max pool
# -> dropout -> prediction layer.
inputs = tf.keras.Input(shape=(224, 224, 3))
x = data_augmentation(inputs)
x = preprocess_input(x)
# x = model(x, training=False)
# The inner model is called with training=True regardless of the outer mode.
x = model(x, training=True)
x = global_average_layer(x)
x = tf.keras.layers.Dropout(0.2)(x)
outputs = prediction_layer(x)
# Rebinds `model` from the inner Sequential to the full functional model.
model = tf.keras.Model(inputs, outputs)

In [None]:
base_learning_rate = 0.0001
# The generators are created with class_mode='categorical' and the final Dense(3)
# layer has no activation, so use categorical cross-entropy on logits. Binary
# cross-entropy would score each of the three outputs as an independent
# yes/no problem.
model.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=base_learning_rate,momentum=0.9),
              loss=tf.keras.losses.CategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])

In [None]:
# Print the model's layer summary.
model.summary()

Initial evaluation of the model before training

In [None]:
# Number of epochs used by the fit call in the next cell.
initial_epochs = 50

# Baseline loss/accuracy on the validation data before any training.
loss0, accuracy0 = model.evaluate(test_set)

Fit the model and obtain results

In [None]:
# Train for `initial_epochs` epochs, validating on `test_set`.
history = model.fit(training_set,
                    epochs=initial_epochs,
                    validation_data=test_set)

## LSTM Model

In [None]:
# Hyperparameters for the TF1-style LSTM classifier.
n_input = len(X_train[0][0])  # length of one row of the first training window
n_hidden = 17                 # passed to BasicLSTMCell and used as the width of the input projection
n_classes = 3
decaying_learning_rate = True
learning_rate = 0.0025        # constant rate; replaced by an exponential decay schedule when decaying_learning_rate is True
init_learning_rate = 0.05     # starting value of the decay schedule
decay_rate = 0.96
decay_steps = 100000
global_step = tf.Variable(0, trainable=False)  # step counter handed to the optimizer's minimize call
lambd = 0.0015                # multiplier on the summed L2 loss of the trainable variables
epochs = 500
batch_size = 128
# NOTE(review): drop_out is not referenced in the visible code; LSTM_RNN
# hard-codes keep probabilities of 0.6 instead.
drop_out = 0.6

In [None]:
def LSTM_RNN(_X, _weights, _biases):
    """Build the two-layer LSTM classifier graph and return its logits.

    The input is moved to time-major order, flattened to rows of `n_input`
    values, passed through a ReLU projection (`_weights['hidden']`,
    `_biases['hidden']`) and split into `n_steps` tensors. These feed two
    stacked `BasicLSTMCell`s, each wrapped in dropout with input/output keep
    probability 0.6. The last step's output is mapped to class scores with
    `_weights['out']` / `_biases['out']`.

    Reads the module-level `n_input`, `n_steps` and `n_hidden`.
    """
    keep_prob = 0.6

    time_major = tf.transpose(_X, [1, 0, 2])
    flat_rows = tf.reshape(time_major, [-1, n_input])
    projected = tf.nn.relu(tf.matmul(flat_rows, _weights['hidden']) + _biases['hidden'])
    step_inputs = tf.split(projected, n_steps, 0)

    # Create both cells first, then wrap them, matching the original creation order.
    base_cells = [
        tf.contrib.rnn.BasicLSTMCell(n_hidden, forget_bias=1.0, state_is_tuple=True)
        for _ in range(2)
    ]
    dropout_cells = [
        tf.contrib.rnn.DropoutWrapper(cell, input_keep_prob=keep_prob, output_keep_prob=keep_prob)
        for cell in base_cells
    ]

    stacked = tf.contrib.rnn.MultiRNNCell(dropout_cells, state_is_tuple=True)
    outputs, _final_states = tf.contrib.rnn.static_rnn(stacked, step_inputs, dtype=tf.float32)

    # Only the output of the final time step is used for classification.
    return tf.matmul(outputs[-1], _weights['out']) + _biases['out']

def extract_batch_size(_train, _labels, _unsampled, batch_size):
    """Draw a random batch without replacement from the unsampled indices.

    Args:
        _train: array of samples; the batch has the same shape with the first
            dimension replaced by `batch_size`.
        _labels: labels indexable by the same indices as `_train`.
        _unsampled: iterable of indices that are still available.
        batch_size: number of samples to draw.

    Returns:
        (batch_s, batch_labels, remaining): the sampled data, the labels as a
        `(batch_size, 1)` array, and the list of indices still unsampled.
    """
    shape = list(_train.shape)
    shape[0] = batch_size
    batch_s = np.empty(shape)
    batch_labels = np.empty((batch_size,1))

    # Copy once, outside the loop, so that removals persist between draws.
    # Rebuilding the list on every iteration made each removal a no-op.
    pool = list(_unsampled)
    for i in range(batch_size):
        if not pool:
            # Fewer candidates than batch_size: start a fresh pass over all rows.
            pool = list(range(len(_train)))
        # Swap the chosen slot with the tail and pop it, so removal is O(1).
        pos = random.randrange(len(pool))
        pool[pos], pool[-1] = pool[-1], pool[pos]
        index = pool.pop()
        batch_s[i] = _train[index]
        batch_labels[i] = _labels[index]

    return batch_s, batch_labels, pool

def one_hot(y_, n_values=None):
    """One-hot encode integer class labels.

    Args:
        y_: array of labels with `len(y_)` elements (e.g. shape (N,) or (N, 1)).
        n_values: number of classes (output columns). Defaults to
            `max(y_) + 1`; pass it explicitly when a batch may not contain
            the highest class.

    Returns:
        Array of shape (N, n_values) with a single 1.0 per row.
    """
    y_ = y_.reshape(len(y_))
    if n_values is None:
        n_values = int(np.max(y_)) + 1
    return np.eye(n_values)[np.array(y_, dtype=np.int32)]

In [None]:
# Human-readable class names, used as tick labels on the confusion-matrix plot.
LABELS = [
    'alert',
    'low vigilant',
    'drowsy',
]

# Number of time steps per window (second dimension of the input placeholder).
n_steps = 30

In [None]:
# Graph inputs: a batch of windows and their one-hot labels.
# NOTE: this rebinds the module-level names `x` and `y` to placeholders.
x = tf.placeholder(tf.float32, [None, n_steps, n_input])
y = tf.placeholder(tf.float32, [None, n_classes])

# Randomly initialised parameters for the input projection ('hidden') and the
# output layer ('out').
weights = {
    'hidden': tf.Variable(tf.random_normal([n_input, n_hidden])), 
    'out': tf.Variable(tf.random_normal([n_hidden, n_classes], mean=1.0))
}
biases = {
    'hidden': tf.Variable(tf.random_normal([n_hidden])),
    'out': tf.Variable(tf.random_normal([n_classes]))
}

pred = LSTM_RNN(x, weights, biases)

# L2 penalty summed over every trainable variable, scaled by lambd.
l2 = lambd * sum(tf.nn.l2_loss(tf_var) for tf_var in tf.trainable_variables())
cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(labels=y, logits=pred)) + l2
if decaying_learning_rate:
    # Staircase exponential decay driven by global_step * batch_size.
    learning_rate = tf.train.exponential_decay(init_learning_rate, global_step*batch_size, decay_steps, decay_rate, staircase=True)
optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(cost,global_step=global_step)
# Accuracy: fraction of samples whose arg-max prediction matches the label.
correct_pred = tf.equal(tf.argmax(pred,1), tf.argmax(y,1))
accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))

In [None]:
# Train the LSTM graph with mini-batches, evaluating on the test set on the
# first step and every 10th step, and checkpointing every 100 steps.
test_losses = []
test_accuracies = []
train_losses = []
train_accuracies = []
sess = tf.InteractiveSession(config=tf.ConfigProto(log_device_placement=True))
saver = tf.train.Saver(max_to_keep=3)
init = tf.global_variables_initializer()
sess.run(init)
iters = epochs * len(X_train)

step = 1
time_start = time.time()
unsampled_indices = range(0,len(X_train))
while step * batch_size <= iters:
    # Start a new pass once too few unsampled indices remain for a full batch.
    if len(unsampled_indices) < batch_size:
        unsampled_indices = range(0,len(X_train))
    # Store the result under the name checked above; a misspelled target
    # meant the remaining indices were never carried into the next step.
    batch_xs, raw_labels, unsampled_indices = extract_batch_size(X_train, y_train, unsampled_indices, batch_size)
    batch_ys = one_hot(raw_labels)
    # Pad with zero columns when the batch does not contain the highest class.
    if len(batch_ys[0]) < n_classes:
        temp_ys = np.zeros((batch_size, n_classes))
        temp_ys[:batch_ys.shape[0],:batch_ys.shape[1]] = batch_ys
        batch_ys = temp_ys

    _, loss, acc = sess.run([optimizer, cost, accuracy],feed_dict={x: batch_xs, y: batch_ys})
    train_losses.append(loss)
    train_accuracies.append(acc)

    # The loop condition guarantees step * batch_size <= iters here, so only
    # the first step and every 10th step trigger an evaluation.
    if (step % 10 == 0) or (step == 1):
        print("Step" + str(step) +  ": learning_rate = " + "{:.6f}".format(sess.run(learning_rate)) + "\n" + "Training" + ":   batch_loss = " + "{:.6f}".format(loss) + ", accuracy = " + "{:.6f}".format(acc))
        loss, acc = sess.run([cost, accuracy],feed_dict={x: X_test,y: one_hot(y_test)})
        test_losses.append(loss)
        test_accuracies.append(acc)
        print("Test" + ":   batch_loss = " + "{:.6f}".format(loss) + ", accuracy = " + "{:.6f}".format(acc))

    step += 1
    if step % 100 == 0:
        saver.save(sess, "/content/drive/MyDrive/Data_file_for_cis520_project/Siming_LSTM_Model/lstm_asl.ckpt-" + str(step))

print("Optimization Finished!")

writer = tf.summary.FileWriter("/content/drive/MyDrive/Data_file_for_cis520_project/Siming_LSTM_Model",sess.graph)
writer.close()

# NOTE: this rebinds `accuracy` from the graph tensor to its evaluated value;
# the reporting code that follows formats it as a number.
one_hot_predictions, accuracy, final_loss = sess.run([pred, accuracy, cost],feed_dict={x:X_test, y: one_hot(y_test)})
test_losses.append(final_loss)
test_accuracies.append(accuracy)

print("Final result: " + "batch_loss = " + "{:.6f}".format(final_loss) + ", accuracy = " + "{:.6f}".format(accuracy))
time_stop = time.time()
print("Total time:  {}".format(time_stop - time_start))

In [None]:
# Plot train vs. test accuracy against the index of each test evaluation.
k_values = range(1, len(test_accuracies)+1, 1)
# NOTE(review): training accuracies are subsampled every 10th step and padded
# with two placeholder 0.3 values, presumably so the length matches
# len(test_accuracies) - confirm the lengths agree for the step count used.
t_accuracies = [0.3] + [0.3] + train_accuracies[::10]
plt.plot(k_values, t_accuracies, color='r', label = 'train accuracies')
plt.plot(k_values, test_accuracies, color='g', label='test accuracies')
plt.xlabel("Steps")
plt.ylabel("accuracies")
plt.title("The accuracies of Model 1")
plt.legend()
plt.show()

In [None]:
# Plot train vs. test loss against the index of each test evaluation.
k_values = range(1, len(test_losses)+1, 1)
# NOTE(review): training losses are subsampled every 10th step and padded with
# two placeholder 1.5 values, presumably so the length matches
# len(test_losses) - confirm the lengths agree for the step count used.
t_loss = [1.5] + [1.5] + train_losses[::10]
plt.plot(k_values, t_loss, color='r', label = 'train loss')
plt.plot(k_values, test_losses, color='g', label='test loss')
plt.xlabel("Steps")
plt.ylabel("Loss")
plt.title("The loss of Model 1")
plt.legend()
plt.show()

In [None]:
%matplotlib inline

# `matplotlib.rc` needs the top-level package; importing only
# `matplotlib.pyplot as plt` does not bind the name `matplotlib`.
import matplotlib

font = {
    'family' : 'Bitstream Vera Sans',
    'weight' : 'bold',
    'size'   : 18
}
matplotlib.rc('font', **font)

width = 12
height = 12
plt.figure(figsize=(width, height))

# x-axis for the training curve: cumulative sample count after each step.
indep_train_axis = np.array(range(batch_size, (len(train_losses)+1)*batch_size, batch_size))
plt.plot(indep_train_axis, np.array(train_accuracies), "g--", label="Train accuracies")

# x-axis for the test curve; the last point is pinned to `iters`.
# NOTE(review): the batch_size*8 stride does not obviously match the
# every-10-steps evaluation cadence of the training loop - confirm.
indep_test_axis = np.append(np.array(range(batch_size, len(test_losses)*batch_size*8, batch_size*8)[:-1]), [iters])
plt.plot(indep_test_axis, np.array(test_accuracies), "b-", linewidth=2.0, label="Test accuracies")
print(len(test_accuracies))
print(len(train_accuracies))

plt.title("Training performance over Iterations")
plt.legend(loc='lower right', shadow=True)
plt.ylabel('Training Performance')
plt.xlabel('Training Iteration')

plt.show()

# Predicted class index = position of the largest network output per sample.
predictions = one_hot_predictions.argmax(1)

print("Testing Accuracy: {}%".format(100*accuracy))

# Support-weighted precision / recall / F1 over all classes.
print("")
print("Precision: {}%".format(100*metrics.precision_score(y_test, predictions, average="weighted")))
print("Recall: {}%".format(100*metrics.recall_score(y_test, predictions, average="weighted")))
print("f1_score: {}%".format(100*metrics.f1_score(y_test, predictions, average="weighted")))

print("")
print("Confusion Matrix:")
# The matrix below is divided by its grand total, so every cell is a percentage of
# the whole test set; the message states exactly that (it matches the plot title).
print("Created using test set of {} datapoints, normalised to % of total test data".format(len(y_test)))
confusion_matrix = metrics.confusion_matrix(y_test, predictions)

normalised_confusion_matrix = np.array(confusion_matrix, dtype=np.float32)/np.sum(confusion_matrix)*100

# Heat map of the normalised matrix with class names on both axes.
width = 12
height = 12
plt.figure(figsize=(width, height))
plt.imshow(
    normalised_confusion_matrix,
    interpolation='nearest',
    cmap=plt.cm.Blues
)
plt.title("Confusion matrix \n(normalised to % of total test data)")
plt.colorbar()
tick_marks = np.arange(n_classes)
plt.xticks(tick_marks, LABELS, rotation=90)
plt.yticks(tick_marks, LABELS)
plt.tight_layout()
plt.ylabel('True label')
plt.xlabel('Predicted label')
plt.show()


## Basic Models

In [None]:
from sklearn.base import BaseEstimator, TransformerMixin
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_curve, roc_auc_score, f1_score, precision_score, recall_score
from sklearn.naive_bayes import GaussianNB
from sklearn.linear_model import LogisticRegression
from sklearn.neural_network import MLPClassifier
from sklearn.naive_bayes import BernoulliNB
from sklearn.tree import DecisionTreeClassifier
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix
from sklearn import metrics
import warnings
from numpy import mean
from numpy import std
from sklearn.datasets import make_classification
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import RepeatedStratifiedKFold
# tune regularization for multinomial logistic regression
from numpy import mean
from numpy import std
from sklearn.datasets import make_classification
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import RepeatedStratifiedKFold
from sklearn.linear_model import LogisticRegression
from matplotlib import pyplot

In [None]:
# Mount Google Drive at /content/drive; the data paths used in the following cells live under it.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Quick look at one normalized feature file (folder 01, file 0.csv): contents, then schema.
feature_df = pd.read_csv(
    '/content/drive/MyDrive/Data_file_for_cis520_project/Feature Extraction (Normalized)/01/0.csv'
)
print(feature_df)
feature_df.info()

In [None]:
# Quick look at the matching label file (folder 01, file 0_labels.csv): contents, then schema.
label_df = pd.read_csv(
    '/content/drive/MyDrive/Data_file_for_cis520_project/Feature Extraction/01/0_labels.csv'
)
print(label_df)
label_df.info()

In [None]:
import os 
# Entry names inside the normalized-feature folder (presumably one sub-folder per subject - confirm).
# os.listdir returns them in arbitrary order.
my_list = os.listdir('/content/drive/MyDrive/Data_file_for_cis520_project/Feature Extraction (Normalized)')

In [None]:
# Assemble the combined feature matrix X and label matrix Y from every folder in my_list.
feature_root = '/content/drive/MyDrive/Data_file_for_cis520_project/Feature Extraction (Normalized)'
label_root = '/content/drive/MyDrive/Data_file_for_cis520_project/Feature Extraction'

feature_parts = []
label_parts = []
# Every folder (including '01') is read exactly once here; loading any folder ahead of
# the loop as well would put its rows into X and Y twice.
# Sorted because os.listdir gives no ordering guarantee, and the row order decides which
# rows end up in the sequential train/test split.
for i in sorted(my_list):
    for j in [0, 5, 10]:
        feature_parts.append(pd.read_csv(feature_root + '/' + i + '/' + str(j) + '.csv', header=None).values)
        label_parts.append(pd.read_csv(label_root + '/' + i + '/' + str(j) + '_labels.csv', header=None).values)

# A single stack at the end: stacking after every file would re-copy all earlier rows each time.
X = np.vstack(feature_parts)
Y = np.vstack(label_parts).astype(int)

# Persist the combined arrays so later runs can skip the per-file loading.
np.savetxt("/content/drive/MyDrive/Data_file_for_cis520_project/FeatureCombined/X.csv", X, delimiter=",")
np.savetxt("/content/drive/MyDrive/Data_file_for_cis520_project/FeatureCombined/Y.csv", Y, delimiter=",")

### Logistic Regression

In [None]:
def get_models():
    """Build the logistic-regression candidates for the regularization sweep.

    Returns:
        dict mapping the value formatted with '%.4f' to a multinomial, lbfgs-solved
        LogisticRegression. The 0.0 entry has no penalty; every other entry uses an
        L2 penalty with C set to that value.
    """
    candidate_values = (0.0, 0.0001, 0.001, 0.01, 0.1, 1.0)
    shared = dict(multi_class='multinomial', solver='lbfgs')

    def build(p):
        # 0.0 stands for "no regularization" rather than C=0
        if p == 0.0:
            return LogisticRegression(penalty='none', **shared)
        return LogisticRegression(penalty='l2', C=p, **shared)

    return {'%.4f' % p: build(p) for p in candidate_values}

In [None]:
def evaluate_model(model, X, y):
    """Cross-validate `model` on (X, y) and return the per-fold accuracy scores.

    Uses stratified 10-fold splitting repeated 3 times with random_state=1,
    evaluated with n_jobs=-1.
    """
    splitter = RepeatedStratifiedKFold(n_repeats=3, n_splits=10, random_state=1)
    return cross_val_score(model, X, y, cv=splitter, scoring='accuracy', n_jobs=-1)

In [None]:
# Sequential 80/20 split: rows are taken in their stored order (no shuffling is applied).
size = len(Y)
train_size = int(size * 0.8)

X_train = X[:train_size, :]
y_train = Y[:train_size, :]
# The test part starts at train_size itself; starting at train_size + 1 would leave
# the row at index train_size out of both splits.
X_test = X[train_size:, :]
y_test = Y[train_size:, :]

In [None]:
# Cross-validate each logistic-regression candidate on the training split.
models = get_models()
results = []
names = []
for name, model in models.items():
    scores = evaluate_model(model, X_train, y_train)
    names.append(name)
    results.append(scores)
    # progress line: mean and standard deviation of the fold accuracies
    print('>%s %.3f (%.3f)' % (name, mean(scores), std(scores)))

# One box per candidate, showing the spread of its fold accuracies.
pyplot.boxplot(results, showmeans=True, labels=names)
pyplot.show()

In [None]:
# Unpenalized multinomial logistic regression; the cross-validation cells below score this `model`.
model = LogisticRegression(multi_class='multinomial', penalty='none')

In [None]:
from sklearn import model_selection
# Accuracy of `model` on the full data: stratified 10-fold CV repeated 3 times.
cv = RepeatedStratifiedKFold(n_repeats=3, n_splits=10, random_state=1)
results = cross_val_score(
    model,
    X,
    Y,
    cv=cv,
    scoring='accuracy',
    n_jobs=-1,
)
print("Accuracy: %.3f (%.3f)" % (results.mean(), results.std()))

In [None]:
from sklearn.metrics import make_scorer
# Macro-averaged F1 of `model`: stratified 10-fold CV repeated 3 times.
cv = RepeatedStratifiedKFold(n_splits=10, n_repeats=3, random_state=1)
# No pos_label: it only applies to average='binary', and the labels here are integers
# (Y is cast with astype(int)), so a "yes" label would never match anyway.
f1_scorer = make_scorer(f1_score, average='macro')
# error_score="raise" surfaces a failing fold instead of silently recording NaN.
results = cross_val_score(model, pd.DataFrame(X), pd.DataFrame(Y), scoring=f1_scorer, cv=cv, n_jobs=-1, error_score="raise")
print("f1: %.3f (%.3f)" % (results.mean(), results.std()))

In [None]:
from sklearn.metrics import make_scorer
# Macro-averaged precision of `model`: stratified 10-fold CV repeated 3 times.
cv = RepeatedStratifiedKFold(n_splits=10, n_repeats=3, random_state=1)
# The scorer gets its own name so the imported precision_score function stays usable
# (and this cell can be re-run). pos_label is omitted: it only applies to
# average='binary' and the labels here are integers.
precision_scorer = make_scorer(precision_score, average='macro')
results = cross_val_score(model, X, Y, scoring=precision_scorer, cv=cv, n_jobs=-1)
print("precision: %.3f (%.3f)" % (results.mean(), results.std()))

In [None]:
from sklearn.metrics import make_scorer
# Macro-averaged recall of `model`: stratified 10-fold CV repeated 3 times.
cv = RepeatedStratifiedKFold(n_splits=10, n_repeats=3, random_state=1)
# The scorer gets its own name so the imported recall_score function stays usable
# (and this cell can be re-run). pos_label is omitted: it only applies to
# average='binary' and the labels here are integers.
recall_scorer = make_scorer(recall_score, average='macro')
results = cross_val_score(model, X, Y, scoring=recall_scorer, cv=cv, n_jobs=-1)
print("recall: %.3f (%.3f)" % (results.mean(), results.std()))

In [None]:
# Macro-averaged one-vs-rest ROC AUC of `model`: stratified 10-fold CV repeated 3 times.
cv = RepeatedStratifiedKFold(n_splits=10, n_repeats=3, random_state=1)
# The built-in 'roc_auc_ovr' scorer is used instead of make_scorer(roc_auc_score, ...):
# multiclass AUC needs predicted probabilities and an explicit multi-class strategy,
# and roc_auc_score has no pos_label argument. The imported roc_auc_score function is
# deliberately not rebound.
# np.ravel passes the labels as a 1-D array rather than a column vector.
results = cross_val_score(model, X, np.ravel(Y), scoring='roc_auc_ovr', cv=cv, n_jobs=-1)
print("roc_auc: %.3f (%.3f)" % (results.mean(), results.std()))

### Dummy Classification

In [None]:
from sklearn.dummy import DummyClassifier
# Baseline with the "uniform" strategy, scored with the same CV protocol as the real models.
dummy_model = DummyClassifier(strategy="uniform")
cv = RepeatedStratifiedKFold(n_repeats=3, n_splits=10, random_state=1)
results = cross_val_score(
    dummy_model,
    X,
    Y,
    cv=cv,
    scoring='accuracy',
    n_jobs=-1,
)
print("Accuracy: %.3f (%.3f)" % (results.mean(), results.std()))

In [None]:
from sklearn.dummy import DummyClassifier
# Baseline with the "most_frequent" strategy, scored with the same CV protocol as the real models.
dummy_model = DummyClassifier(strategy="most_frequent")
cv = RepeatedStratifiedKFold(n_repeats=3, n_splits=10, random_state=1)
results = cross_val_score(
    dummy_model,
    X,
    Y,
    cv=cv,
    scoring='accuracy',
    n_jobs=-1,
)
print("Accuracy: %.3f (%.3f)" % (results.mean(), results.std()))

### Naive Bayes

In [None]:
from sklearn.naive_bayes import GaussianNB
# Fit Gaussian naive Bayes on the training split; note that this rebinds `model`.
model = GaussianNB()
# The trailing ';' keeps the notebook from echoing the fitted estimator.
model.fit(X_train, y_train);

In [None]:
# Predictions of the current `model` on the held-out split; reused by the plots and the accuracy cell below.
y_pred = model.predict(X_test)

In [None]:
# Training rows (first two feature columns) coloured by their label ...
plt.scatter(*X_train[:, :2].T, c=y_train, s=50, cmap='RdBu')
# ... then the test rows coloured by predicted label, faint and small, on the same axis limits.
lim = plt.axis()
plt.scatter(*X_test[:, :2].T, c=y_pred, s=20, cmap='RdBu', alpha=0.1)
plt.axis(lim);

In [None]:
from mpl_toolkits import mplot3d

%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt

idx = np.squeeze(y_train) == 0

arr = np.squeeze(y_train)
arr[np.squeeze(y_train) == 0] = 1
arr[np.squeeze(y_train) == 5] = 5
arr[np.squeeze(y_train) == 10] = 10


ax = plt.axes(projection='3d')

# Data for three-dimensional scattered points
zdata = X_train[:, 0]
xdata = X_train[:, 1]
ydata = X_train[:, 2]
output = X_train
ax.scatter3D(ydata, xdata, zdata, s=arr, c=X_train[:, 3], cmap='nipy_spectral');

In [None]:
from mpl_toolkits import mplot3d

%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt

idx = np.squeeze(y_train) == 0

ax = plt.axes(projection='3d')

# Data for three-dimensional scattered points
zdata = X_train[idx, 0]
xdata = X_train[idx, 1]
ydata = X_train[idx, 2]
output = X_train
ax.scatter3D(ydata, xdata, zdata, c=X_train[idx, 3], cmap='nipy_spectral');

In [None]:
from mpl_toolkits import mplot3d

%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt

idx = np.squeeze(y_train) == 5

ax = plt.axes(projection='3d')

# Data for three-dimensional scattered points
zdata = X_train[idx, 0]
xdata = X_train[idx, 1]
ydata = X_train[idx, 2]
output = X_train
ax.scatter3D(ydata, xdata, zdata, c=X_train[idx, 3], cmap='nipy_spectral');

In [None]:
from mpl_toolkits import mplot3d

%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt

idx = np.squeeze(y_train) == 10

ax = plt.axes(projection='3d')

# Data for three-dimensional scattered points
zdata = X_train[idx, 0]
xdata = X_train[idx, 1]
ydata = X_train[idx, 2]
output = X_train
ax.scatter3D(ydata, xdata, zdata, c=X_train[idx, 3], cmap='nipy_spectral');

In [None]:
from sklearn.metrics import zero_one_loss
# One minus the zero-one loss of y_pred against the (squeezed) test labels; as the
# last expression of the cell, the notebook displays the value.
1 - zero_one_loss(np.squeeze(y_test), y_pred)

### KNN

In [None]:
# import libraries
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report
from sklearn import datasets
from skimage import exposure
import matplotlib.pyplot as plt
import numpy as np
import cv2
import seaborn as sns

In [None]:
# Held-out accuracy of a k-nearest-neighbour classifier for every odd k from 1 to 49.
accuracies = []
for k in range(1, 50, 2):
    # fit() returns the estimator itself, so construction and fitting can be chained
    model = KNeighborsClassifier(n_neighbors=k).fit(X_train, y_train)
    score = model.score(X_test, y_test)
    accuracies.append(score)

In [None]:
# Accuracy per k, using the same k grid as the sweep above.
k_values = range(1, 50, 2)
plt.plot(k_values, accuracies, color='g')
plt.ylabel("Validation Accuracy")
plt.xlabel("K values")
plt.show()

### Decision Tree

In [None]:
from sklearn.tree import DecisionTreeClassifier

In [None]:
# Grid over tree depth and random seed; prints held-out accuracy for each pair.
depth = [2, 4, 8, 16, 32, 64, 128]
random = [10, 20, 40, 80, 160]

for d in depth:
    for r in random:
        tree_clf = DecisionTreeClassifier(max_depth=d, random_state=r).fit(X_train, y_train)
        print(f"D: {d} , R: {r} , A: {tree_clf.score(X_test, y_test)}")

### Random Forest

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification

# Grid over forest depth and random seed; prints held-out accuracy for each pair.
depth = [2, 4, 8, 16, 32, 64, 128]
random = [0, 10, 20, 40, 80, 160]

for d in depth:
    for r in random:
        clf = RandomForestClassifier(max_depth=d, random_state=r).fit(X_train, y_train)
        print(f"D: {d} , R: {r} , A: {clf.score(X_test, y_test)}")

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification

# Two-class variant: keep only the rows labelled 0 or 10 in each split.
train_keep = np.squeeze((y_train == 0) | (y_train == 10))
test_keep = np.squeeze((y_test == 0) | (y_test == 10))
X_bi_train, y_bi_train = X_train[train_keep, :], y_train[train_keep]
X_bi_test, y_bi_test = X_test[test_keep, :], y_test[test_keep]

# Same depth / seed grid as the three-class forest.
depth = [2, 4, 8, 16, 32, 64, 128]
random = [0, 10, 20, 40, 80, 160]

for d in depth:
    for r in random:
        clf = RandomForestClassifier(max_depth=d, random_state=r).fit(X_bi_train, y_bi_train)
        print(f"D: {d} , R: {r} , A: {clf.score(X_bi_test, y_bi_test)}")

### AutoML Model with Majority Vote

In [None]:
%pip install auto-sklearn

In [None]:
import numpy as np
import math
import matplotlib.pyplot as plt
import operator
from sklearn.metrics import accuracy_score

import autosklearn.classification

# Read the landmark feature / label files of folders 01..60 (files 0, 5 and 10 in each).
loaded_X = []
loaded_y = []

for j in range(1, 61):
    s = "%02d" % j  # folder names are zero-padded to two digits
    for i in [0, 5, 10]:
        prefix = "/content/drive/MyDrive/Data_file_for_cis520_project/All_Landmarks/" + s + "/" + str(i)
        loaded_X.append(np.loadtxt(prefix + "_features.csv", delimiter=','))
        loaded_y.append(np.loadtxt(prefix + "_labels.csv", delimiter=','))

# Flatten the per-file arrays into one list of rows and one list of labels.
loaded_new_X = []
loaded_new_y = []
for file_X, file_y in zip(loaded_X, loaded_y):
    for row in range(len(file_X)):
        loaded_new_X.append(file_X[row])
        loaded_new_y.append(file_y[row])

# Splitting the data: the first 70000 rows train, the remainder tests.
all_X = np.array(loaded_new_X)
all_y = np.array(loaded_new_y)
train_X, test_X = all_X[:70000, :], all_X[70000:, :]
train_y, test_y = all_y[:70000], all_y[70000:]

# Shuffle each split on its own, with one permutation shared by features and labels.
shuffler1 = np.random.permutation(len(train_X))
shuffler2 = np.random.permutation(len(test_X))
train_X, train_y = train_X[shuffler1], train_y[shuffler1]
test_X, test_y = test_X[shuffler2], test_y[shuffler2]

import sklearn.metrics
import autosklearn.classification

# Auto-sklearn search with a total and a per-run time limit, using 4-fold
# cross-validation as the resampling strategy.
# NOTE(review): the two limits are presumably in seconds - confirm in the auto-sklearn docs.
automl = autosklearn.classification.AutoSklearnClassifier(
    time_left_for_this_task=600,
    per_run_time_limit=120, 
    resampling_strategy="cv",
    resampling_strategy_arguments={'folds': 4},
    )

# The test split is handed to fit() as well.
# NOTE(review): presumably only used for progress tracking, not for training - confirm.
automl.fit(train_X,
          train_y, test_X, test_y, dataset_name="Drowsiness_detection_automl_model")

import sklearn.metrics
# Row-level accuracy on the held-out split.
predictions = automl.predict(test_X)
print("Accuracy score", sklearn.metrics.accuracy_score(test_y, predictions))

In [None]:
# Cut the feature files of folders 50..60 into windows for majority voting.
# batch_X collects the windows; batch_y holds the file's label (0, 5 or 10) once per window.
batch_X = []
batch_y = []

for j in range(50, 61):
    s = str(j)  # folders 50..60 never need zero padding
    for i in [0, 5, 10]:
        loaded_X = np.loadtxt("/content/drive/MyDrive/Data_file_for_cis520_project/All_Landmarks/"+s+"/"+str(i)+"_features.csv", delimiter=',')
        length = loaded_X.shape[0] // 3
        for k in range(5, length + 1, 5):
            # Cut points at k, k + length, k + 2*length, ...; np.split returns the pieces
            # between consecutive cut points (the first piece is rows [0, k)).
            idx = np.arange(k, loaded_X.shape[0], length)
            time_series = np.split(loaded_X, idx)
            if len(time_series) >= 2:
                # the trailing piece is discarded
                del time_series[-1]
                batch_X += time_series
                # label each window directly with the file label
                batch_y += [i] * len(time_series)

from collections import Counter

def pipline(model, train_data):
    """Return the majority-vote prediction of `model` over the rows of `train_data`.

    Args:
        model: any fitted estimator exposing ``predict``.
        train_data: the rows of one window, passed to ``model.predict`` as a whole.

    Returns:
        The most frequent predicted value; on a tie, the value that Counter
        encountered first.

    Raises:
        IndexError: if ``model.predict`` returns no predictions.
    """
    # Use the estimator that was passed in rather than a module-level one,
    # so the function works for any model.
    votes = Counter(model.predict(train_data))
    return votes.most_common(1)[0][0]

# One majority-vote prediction per window.
prediction = [pipline(automl, bx) for bx in batch_X]

from sklearn.metrics import accuracy_score
from sklearn import metrics

# Printed explicitly: a bare expression in the middle of a cell is evaluated but
# never displayed, so the score would otherwise be lost.
print("Majority-vote accuracy:", accuracy_score(batch_y, prediction))

n_classes = 3
LABELS = [
    'alert',
    'low vigilant',
    'drowsy',
]

# Stored under its own name so the `confusion_matrix` function imported from
# sklearn.metrics earlier in the notebook is not overwritten by an array.
vote_confusion = metrics.confusion_matrix(batch_y, prediction)

# Divide by the grand total so every cell is a percentage of all windows.
normalised_confusion_matrix = np.array(vote_confusion, dtype=np.float32)/np.sum(vote_confusion)*100

# Heat map of the normalised matrix with class names on both axes.
width = 12
height = 12
plt.figure(figsize=(width, height))
plt.imshow(
    normalised_confusion_matrix,
    interpolation='nearest',
    cmap=plt.cm.Blues
)
plt.title("Confusion matrix \n(normalised to % of total test data)")
plt.colorbar()
tick_marks = np.arange(n_classes)
plt.xticks(tick_marks, LABELS, rotation=90)
plt.yticks(tick_marks, LABELS)
plt.tight_layout()
plt.ylabel('True label')
plt.xlabel('Predicted label')
plt.show()

### AutoML Train and Test Accuracy

In [None]:
%pip install auto-sklearn

In [None]:
# Row-level accuracy of the fitted AutoML model on each split.
import sklearn.metrics

# Each score compares a split's labels with the predictions made for that same split.
predictions_train = automl.predict(train_X)
print("Accuracy score for training set", sklearn.metrics.accuracy_score(train_y, predictions_train))

predictions_test = automl.predict(test_X)
print("Accuracy score for testing set", sklearn.metrics.accuracy_score(test_y, predictions_test))