In [None]:
!pip install opencv-python

In [19]:
import uuid
import time
import cv2
import os
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf

## 1. Collect Images

In [6]:
labels = ['awake', 'drowsy']

IMAGES_PATH_TRAIN = os.path.join('data', 'train', 'images')
IMAGES_PATH_TEST = os.path.join('data', 'test', 'images')


number_imgs = 20

### 1.1. Collect Training Images

In [7]:
cap = cv2.VideoCapture(0)
for label in labels:
    print('Collecting Images for {}'.format(label))
    time.sleep(5)
    
    for image in range(number_imgs):
        print('Collecting Images for {}, and image number {}'.format(label, image))
        
        ret, frame = cap.read()
        imgname = os.path.join(IMAGES_PATH_TRAIN, label+'.'+str(uuid.uuid1())+'.jpg')
        frame = cv2.flip(frame, 1)
        cv2.imshow("Image Collection", frame)   
        
        time.sleep(2)
        cv2.imwrite(imgname, frame)
    
    if cv2.waitKey(10) & 0xFF == ord('q'):
        break
cap.release()
cv2.destroyAllWindows()

Collecting Images for awake
Collecting Images for awake, and image number 0
Collecting Images for awake, and image number 1
Collecting Images for drowsy
Collecting Images for drowsy, and image number 0
Collecting Images for drowsy, and image number 1


### 1.2. Collect Testing Images

In [None]:
cap = cv2.VideoCapture(0)
for label in labels:
    print('Collecting Images for {}'.format(label))
    time.sleep(5)
    
    for image in range(number_imgs):
        print('Collecting Images for {}, and image number {}'.format(label, image))
        
        ret, frame = cap.read()
        imgname = os.path.join(IMAGES_PATH_TEST, label+'.'+str(uuid.uuid1())+'.jpg')
        frame = cv2.flip(frame, 1)
        cv2.imshow("Image Collection", frame)   
        
        time.sleep(2)
        cv2.imwrite(imgname, frame)
    
    if cv2.waitKey(10) & 0xFF == ord('q'):
        break
cap.release()
cv2.destroyAllWindows()

## 2. Labeling images

In [None]:
!pip install labelme

In [8]:
!labelme

## 3. Data Augmentation

In [1]:
import albumentations as alb

ModuleNotFoundError: No module named 'albumentations'

In [None]:
#Initialize image augmentator
transform = alb.Compose([alb.RandomCrop(width = 450, height = 450),
                         alb.HorizontalFlip(p=0.5),
                         alb.RandomBrightnessContrast(p=0.2),
                         alb.RandomGamma(p=0.2),
                         alb.RGBShift(p=0.2),
                         alb.VerticalFlip(p=0.5)],
                        bbox_params=alb.BboxParams(format='albumentations',
                                                   label_fields = ['class_labels']))

In [None]:
for partition in ['train', 'test']:
    for image in os.listdir(os.path.join('data', partition, 'images')):
        
        #Load full image name 
        img = cv2.imread(os.path.join('data', partition, 'images', image))
        #Get the image name without the .jpg
        img_name = image.split('.')[0] + '.' + image.split('.')[1]
        #Format label path with the respect image name
        label_path = os.path.join('data', partition, 'labels', f'{img_name}.json')
        
    #Load label
    with open(label_path, "r") as f:
        label = json.load(f)
        
    #Get bounding box's coordinates
    coords = np.array(label['shapes'][0]['points']).flatten()
    #Normalize the coordinates
    coords = list(np.divide(coords, [640, 480, 640, 480]))
    
    #Get the label
    label = label['shapes'][0]['label']
    
    #Generate 60 images from one base image using augmentation
    for x in range(60):
        transformed = transform(image = img, bboxes = [coords], class_labels=[label])
        if transformed['bboxes'] == []:
            break
        
        #Write transformed image
        cv2.imwrite(os.path.join('data', 'augmented', partition, 'images', f'{img_name}.{x}.jpg'), transformed['images'])
        
        #Create a dictionanry that contain label, bounding box's coordinates and name of the image
        annotation = {}
        #Name of the image
        annotation['image'] = image
        #Bounding box's coordinates
        annotation['bbox'] = transformed['bboxes'][0]
        
        #Onehot coding the label
        if label == 'awake':
            annotation['class'] = 0
        else:
            annotation['class'] = 1
        
        #Write the dict into a json file
        with open(os.path.join('data', 'augmented', partition, 'labels', f'{img_name}.{x}.json'), 'w') as f:
            json.dump(annotation, f)

## 4. Create Data Pipeline for Training

In [None]:
#Define two functions to load images and labels
def load_image(x):
    image = tf.io.read_file(x)
    image = tf.io.decode_jpeg(image)
    return image

def load_labels(label_path):
    with open(label_path.numpy(), "r", encoding='utf-8') as f:
        label = json.load(f)
    return [label['class'], label['bbox']]


In [None]:
#Image pipeline
train_images = tf.data.Dataset.list_files('data/augmented/train/images/*.jpg', shuffle=False)
train_images = train_images.map(load_image)
train_images = train_images.map(lambda x: tf.image.resize(x, (288,288)))

test_images = tf.data.Dataset.list_files('data/augmented/test/images/*.jpg', shuffle=False)
test_images = test_images.map(load_image)
test_images = test_images.map(lambda x: tf.image.resize(x, (288,288)))

In [None]:
#Labels pipeline
train_labels = tf.data.Dataset.list_files('data/augmented/train/labels/*.json', shuffle=False)
train_labels = train_labels.map(lambda x: tf.py_function(load_labels, [x], [tf.uint8, tf.float16]))

test_labels = tf.data.Dataset.list_files('data/augmented/test/labels/*.json', shuffle=False)
test_labels = test_labels.map(lambda x: tf.py_function(load_labels, [x], [tf.uint8, tf.float16]))

In [None]:
#Zip images and labels 
train = tf.data.Dataset.zip((train_images, train_labels))
train = train.shuffle(1000)
train = train.batch(16)
train = train.prefetch(tf.data.AUTOTUNE)

test = tf.data.Dataset.zip((test_images, test_labels))
test = test.shuffle(1000)
test = test.batch(16)
test = test.prefetch(tf.data.AUTOTUNE)

## 5. Model Training

### 5.1. Build Model Structure

In [18]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Dense, GlobalMaxPooling2D, Dropout, BatchNormalization, Activation
from tensorflow.keras.applications import efficientnet_v2
from tensorflow.keras import regularizers

In [10]:
def build_model():
    input_layer = Input(shape=(288,288,3))
    base = efficientnet_v2.EfficientNetV2S(input_shape = (288, 288, 3), include_top = False, pooling="max")(input_layer)

    #Classification
    hidden1 = Dense(2024, kernel_regularizer = regularizers.L2(0.0001), use_bias = False)(base)
    norm1 = BatchNormalization()(hidden1)
    act1 = Activation('relu')(norm1)
    hidden3 = Dense(512, kernel_regularizer = regularizers.L2(0.00001), use_bias = False)(act1)
    norm2 = BatchNormalization()(hidden3)
    act3 = Activation('relu')(norm2)
    hidden4 = Dense(256, kernel_regularizer = regularizers.L2(0.00001), use_bias = False)(act3)
    norm3 = BatchNormalization()(hidden4)
    act4 = Activation('relu')(norm3)
    hidden6 = Dense(64, activation = 'relu')(act4)

    class_output = Dense(1, activation = 'linear')(hidden6)

    #Regression for bounding boxes
    hidden12 = Dense(2048, kernel_regularizer = regularizers.L2(0.0001), use_bias = False)(base)
    norm12 = BatchNormalization()(hidden12)
    act12 = Activation('relu')(norm12)
    hidden22 = Dense(1024, kernel_regularizer = regularizers.L2(0.00001), use_bias = False)(act12)
    norm12 = BatchNormalization()(hidden22)
    act22 = Activation('relu')(norm12)
    hidden32 = Dense(512, kernel_regularizer = regularizers.L2(0.000001), use_bias = False)(act22)
    norm22 = BatchNormalization()(hidden32)
    act32 = Activation('relu')(norm22)
    hidden42 = Dense(256, kernel_regularizer = regularizers.L2(0.000001))(act32)
#     #norm32 = BatchNormalization()(hidden42)
    act42 = Activation('relu')(hidden42)
    
    reg_output = Dense(4, activation = 'sigmoid')(act42)

    model = Model(inputs=input_layer, outputs = [class_output, reg_output])
    return model

In [11]:
model_struct = build_model()
model_struct.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_5 (InputLayer)           [(None, 288, 288, 3  0           []                               
                                )]                                                                
                                                                                                  
 efficientnetv2-s (Functional)  (None, 1280)         20331360    ['input_5[0][0]']                
                                                                                                  
 dense_6 (Dense)                (None, 2048)         2621440     ['efficientnetv2-s[0][0]']       
                                                                                                  
 dense_1 (Dense)                (None, 2024)         2590720     ['efficientnetv2-s[0][0]']   

### 5.2. Define Loss Functions

In [None]:
#Loss function for bounding boxes
def box_loss(y_pred, y_true):
    coords_delta = tf.reduce_sum(tf.square(y_pred[:,:2] - y_true[:,:2]))

    h_true = y_true[:,3] - y_true[:,1]
    w_true = y_true[:,2] - y_true[:,0]

    h_pred = y_pred[:,3] - y_pred[:,1]
    w_pred = y_pred[:,2] - y_pred[:,0]

    frame_delta = tf.reduce_sum(tf.square(h_true - h_pred) + tf.square(w_true - w_pred))

    return coords_delta + frame_delta

class_losses = tf.keras.losses.BinaryCrossentropy(from_logits = True)
box_losses = box_loss

### 5.3. Define Optimizer

In [None]:
batch_per_epochs = len(train)
lr_decay = (1./0.75-1)/batch_per_epochs
opt = tf.keras.optimizers.legacy.Adam(learning_rate=0.001, decay = lr_decay)

### 5.4. Build Custom Model Class

In [None]:
class DrowsyDetector(Model):
    def __init__(self, model, **kargs):
        super().__init__(**kargs)
        self.model = model

    def compile(self, opt, classloss, regloss, **kargs):
        super().compile(**kargs)
        self.classloss = classloss
        self.regloss = regloss
        self.opt = opt

    def train_step(self, batch, **kargs):

        X, y = batch
        
        #Optimization
        with tf.GradientTape() as tape:
            classes, coords = self.model(X, training=True)

            batch_classloss = self.classloss(y[0], classes)
            batch_regloss = self.regloss(tf.cast(y[1], tf.float32), coords)

            total_loss = batch_classloss + 0.5*batch_regloss 

            diff = tape.gradient(total_loss, self.model.trainable_variables)

        opt.apply_gradients(zip(diff, self.model.trainable_variables))

        return {'Train Total Loss':total_loss, 'Train Regression Loss':batch_regloss, 'Train Classification Loss':batch_classloss}

    def test_step(self, batch, **kargs):
        X, y = batch

        classes, coords = self.model(X, training=False)

        batch_classloss = self.classloss(y[0], classes)
        batch_regloss = self.regloss(tf.cast(y[1], tf.float32), coords)

        total_loss = batch_classloss + 0.5*batch_regloss

        return {'Total Loss':total_loss, 'Regression Loss':batch_regloss, 'Classification Loss':batch_classloss}

    def call(self, X, **kargs):
        return self.model(X, **kargs)

In [None]:
model = DrowsyDetector(model_struct)
model.compile(opt = opt, classloss = class_losses, regloss = box_losses)

In [None]:
checkpoint_path = 'training/checkpoint/weight/cp.ckpt'
checkpoint_dir = os.path.dirname(checkpoint_path)

cp_callback = tf.keras.callbacks.ModelCheckpoint(checkpoint_path, save_weights_only=True, verbose=1)

In [None]:
hist = model.fit(train, epochs = 40, validation_data = test, verbose = 1, callbacks = [cp_callback])

In [None]:
test_data = test.as_numpy_iterator().next()
test_images = test_data[0]
test_labels = test_data[1][0]
pred = model.predict(test_images)

pred_labels = [1 if x > 0.5 else 0 for x in pred[0]]

In [10]:
from sklearn.metrics import recall_score, accuracy_score, precision_score, f1_score

In [61]:
print('Accuracy: ' + str(accuracy_score(test_labels, pred_labels)))
print('Recall: '+str(recall_score(test_labels, pred_labels)))
print('Precision: '+str(precision_score(test_labels, pred_labels)))
print('F1 Score: '+str(f1_score(test_labels, pred_labels)))

Accuracy: 0.9326171875
Recall: 0.9862204724409449
Precision: 0.8898756660746003
F1 Score: 0.9355742296918768


In [None]:
model.save('models/eff', save_format='tf')

In [12]:
import keras

In [25]:
model = keras.models.load_model('models/eff', compile = False)

In [26]:
model.summary()

Model: "drowsy_detector"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 model (Functional)          [(None, 1),               23425249  
                              (None, 4)]                         
                                                                 
Total params: 23,425,249
Trainable params: 23,299,569
Non-trainable params: 125,680
_________________________________________________________________


In [29]:
cap = cv2.VideoCapture(0)
while(True):
    _, frame = cap.read()
    frame = cv2.flip(frame, 1)
    frame = frame[50:500, 50:500,:]
    
    #Pre-proccessing
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    resized = tf.image.resize(rgb, (288,288))
    
    
    #Make real time prediction
    pred = model.predict(np.expand_dims(resized, axis = 0))
    
    if pred[0] < 0.5:
        label = 'awake'
    else:
        label = 'drowsy'
    coords = pred[1][0]
    
    #Show the bounding box

    
    #Show the label
    if label == 'awake':
        cv2.rectangle(frame, 
                        tuple(np.multiply(coords[:2], [450, 450]).astype(int)),
                        tuple(np.multiply(coords[2:], [450, 450]).astype(int)), 
                        (255,0,0), 2)
            
        cv2.rectangle(frame, 
                          tuple(np.add(np.multiply(coords[:2], [450,450]).astype(int), 
                                        [0,-30])),
                          tuple(np.add(np.multiply(coords[:2], [450,450]).astype(int),
                                        [100,0])), 
                                (255,0,0), -1)
        
        cv2.putText(frame, label, tuple(np.add(np.multiply(coords[:2], [450,450]).astype(int),
                                                       [0,-5])),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2, cv2.LINE_AA)
    else: 
        cv2.rectangle(frame, 
                        tuple(np.multiply(coords[:2], [450, 450]).astype(int)),
                        tuple(np.multiply(coords[2:], [450, 450]).astype(int)), 
                        (0,0,255), 2)
            
        cv2.rectangle(frame, 
                          tuple(np.add(np.multiply(coords[:2], [450,450]).astype(int), 
                                        [0,-30])),
                          tuple(np.add(np.multiply(coords[:2], [450,450]).astype(int),
                                        [110,0])), 
                                (0,0,255), -1)
        
        cv2.putText(frame, label, tuple(np.add(np.multiply(coords[:2], [450,450]).astype(int),
                                                       [0,-5])),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2, cv2.LINE_AA)
    
    cv2.imshow('c', frame)
    
    if cv2.waitKey(10) & 0xFF == ord('q'):
        break
cap.release()
cv2.destroyAllWindows()

