# Settings

In [None]:
%pip install mediapipe opencv-python pandas scikit-learn

In [19]:
import mediapipe as mp
import cv2
import time
import csv
import os
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score #accuracy score
import pickle #library to save the model

In [33]:
# Short aliases for the MediaPipe drawing helpers and the holistic solution;
# both are used by the detection loop.
mp_drawing = mp.solutions.drawing_utils
mp_holistic = mp.solutions.holistic # Holistic model that predicts 543 landmarks

# Load the trained classifier from 'fatigue.pkl' in the working directory.
# NOTE(review): pickle.load can execute arbitrary code stored in the file, so
# only load a fatigue.pkl that you produced yourself or otherwise trust.
with open('fatigue.pkl', 'rb') as f:
    model = pickle.load(f) #load the model
model  # last expression of the cell: shows the loaded model as the cell output

In [32]:
# Module-level state shared with (and mutated by) the detection loop.
# Re-run this cell before restarting the loop to reset every counter.

# Variables for eyes detection
eye_closed = False  # True while the eyes are currently considered closed
eye_closed_start_time = 0  # time.time() stamp taken when the eyes were last seen closing
eye_closed_duration = 0  # seconds the eyes stayed closed during the last completed closure
blink_counter = 0  # closures counted since the current rate window started
blinks_rate_array = []  # history of blink rates (blinks per minute), oldest first
blinks_times = []  # durations in seconds of completed eye closures, oldest first

# Variables for average blinks per minute
window_size = 5  # number of most recent rate samples averaged by calculate_moving_average
threshold_blinks = 5  # moving-average rate (blinks per minute) above which an anomaly is reported
start_time = time.time()  # start of the current rate window; the loop resets it once 5 s have elapsed

# Variables for mouth detection
is_mouth_open = False
lips_distance_threshold = 1  # running minimum of the lip gap; mouth_is_open lowers it as smaller gaps are seen

# Variables for face color detection
face_is_red = False
# 255/255/255 is the "baseline not measured yet" sentinel: the loop replaces it
# with the first measured mean face colour.
base_red = 255
base_green = 255
base_blue = 255

# Variables for score calculation
# The score itself is not stored here; compute_score returns it for each frame.

# Functions

In [31]:
def calculate_moving_average(blinks_rate, window_size):
    """Return the mean of the last ``window_size`` entries of ``blinks_rate``.

    Args:
        blinks_rate: Sequence of numeric blink-rate samples, oldest first.
        window_size: Number of most recent samples to average.

    Returns:
        The arithmetic mean of the trailing window, or ``None`` when no
        average can be computed: the window size is not positive, or fewer
        than ``window_size`` samples are available.
    """
    # A zero window used to divide by zero, and a negative one sliced from the
    # front of the list and produced a sign-flipped value. Neither describes a
    # usable window, so both are reported as "no average".
    if window_size <= 0 or len(blinks_rate) < window_size:
        return None
    window = blinks_rate[-window_size:]  # the last window_size samples
    return sum(window) / window_size

In [30]:
def detect_anomalies(blinks_rate, window_size, threshold):
    """Report whether the recent average blink rate exceeds ``threshold``.

    Returns ``True`` (the person is considered fatigued) only when a moving
    average over the last ``window_size`` samples is available and is strictly
    greater than ``threshold``; in every other case returns ``False``.
    """
    recent_average = calculate_moving_average(blinks_rate, window_size)
    if recent_average is not None and recent_average > threshold:
        return True
    return False

In [29]:
def mouth_is_open(results, threshold):
    """Decide whether the mouth is open and update the closed-mouth baseline.

    The gap is the vertical distance between face landmarks 13 (upper lip
    centre) and 14 (bottom lip centre). ``threshold`` acts as a running
    minimum of that gap: any positive gap smaller than it becomes the new
    baseline. The mouth counts as open when the gap is more than ten times
    the baseline.

    Returns:
        ``(is_open, threshold)`` with the possibly lowered baseline.
    """
    lips = results.face_landmarks.landmark
    upper_lip, bottom_lip = lips[13], lips[14]
    gap = bottom_lip.y - upper_lip.y

    # Only a strictly positive gap can become the new closed-mouth baseline.
    if 0 < gap < threshold:
        threshold = gap

    return gap > (threshold * 10), threshold

In [28]:
def is_face_red(frame, results, base_red, base_green, base_blue):
    """Estimate whether the face in ``frame`` looks red.

    The mean colour is measured over the convex hull of the face landmarks
    and tested in two ways: relative to the supplied baseline, and against
    fixed absolute limits.

    Args:
        frame: Image array indexed (rows, cols, channels); the channel means
            are read in blue, green, red order.
        results: Detection results exposing ``face_landmarks.landmark``, whose
            x / y values are scaled by the frame width / height.
        base_red: Baseline red mean. Accepted for a symmetric signature; the
            tests below do not read it.
        base_green: Baseline green mean.
        base_blue: Baseline blue mean.

    Returns:
        ``(is_red, red, green, blue)``: the verdict followed by the measured
        channel means.
    """
    height, width = frame.shape[:2]

    # OpenCV's polygon functions require 32-bit integer points. Building the
    # array with the platform default integer type fails where that is int64.
    points = np.array(
        [(int(lm.x * width), int(lm.y * height)) for lm in results.face_landmarks.landmark],
        dtype=np.int32,
    )

    # The landmarks arrive in mesh-index order, not as an outline, so filling
    # them directly as one polygon gives a self-intersecting patchwork. The
    # convex hull covers the whole face region instead.
    mask = np.zeros((height, width), dtype=np.uint8)
    cv2.fillConvexPoly(mask, cv2.convexHull(points), 255)

    blue, green, red, _ = cv2.mean(frame, mask=mask)

    # Relative test: green and blue both dropped more than 40 below baseline.
    below_baseline = (base_green - green) > 40 and (base_blue - blue) > 40
    # Absolute test: strong red with weak green and blue.
    plainly_red = red > 150 and green < 100 and blue < 100

    return bool(below_baseline or plainly_red), red, green, blue
    

In [35]:
def compute_score(model_class, is_mouth_open, face_is_red, blinks_rate_array, window_size, threshold_blinks, blinks_times):
    """Combine the individual fatigue indicators into a score from 0 to 100.

    Args:
        model_class: Label predicted by the classifier; ``'fatigue'`` adds 40.
        is_mouth_open: Truthy when the mouth is open; adds 20.
        face_is_red: Truthy when the face looks red; adds 20.
        blinks_rate_array: Blink-rate history passed to ``detect_anomalies``.
        window_size: Moving-average window passed to ``detect_anomalies``.
        threshold_blinks: Rate threshold passed to ``detect_anomalies``.
        blinks_times: Durations in seconds of completed eye closures, oldest
            first. May be empty.

    Returns:
        The integer score. The eye indicator adds 20 when the most recent
        closure lasted more than 5 seconds or the blink rate is anomalous.
    """
    score = 0

    if model_class == 'fatigue':
        score += 40
    if is_mouth_open:
        score += 20
    if face_is_red:
        score += 20

    # Until the first blink completes there is no closure to inspect. Indexing
    # [-1] unconditionally raised IndexError, so no score was produced at all.
    long_closure = len(blinks_times) > 0 and blinks_times[-1] > 5
    if long_closure or detect_anomalies(blinks_rate_array, window_size, threshold_blinks):
        score += 20

    return score

# Detection


In [34]:
# Webcam detection loop: reads frames from camera 0, draws the detected
# landmarks, classifies each frame and prints 'Alert!' when the combined
# fatigue score exceeds 50. Press 'q' in the preview window to stop.
cap = cv2.VideoCapture(0)

# repr() of the last per-frame failure that was reported, so the loop warns
# once per distinct problem instead of once per frame.
last_frame_error = None

try:
    # Initiate holistic model
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # Convert the BGR capture to RGB for detection; the array is
            # marked read-only while it is being processed.
            image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            image.flags.writeable = False

            # Make Detections
            results = holistic.process(image)

            # Back to a writeable BGR image for drawing and display
            image.flags.writeable = True
            image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

            # 1. Draw face landmarks
            mp_drawing.draw_landmarks(image, results.face_landmarks, mp_holistic.FACEMESH_TESSELATION,
                                     mp_drawing.DrawingSpec(color=(80,110,10), thickness=1, circle_radius=1),
                                     mp_drawing.DrawingSpec(color=(80,255,121), thickness=1, circle_radius=1)
                                     )

            # 2. Right hand
            mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
                                     mp_drawing.DrawingSpec(color=(80,22,10), thickness=2, circle_radius=4),
                                     mp_drawing.DrawingSpec(color=(80,44,121), thickness=2, circle_radius=2)
                                     )

            # 3. Left Hand
            mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
                                     mp_drawing.DrawingSpec(color=(121,22,76), thickness=2, circle_radius=4),
                                     mp_drawing.DrawingSpec(color=(121,44,250), thickness=2, circle_radius=2)
                                     )

            # 4. Pose Detections
            mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
                                     mp_drawing.DrawingSpec(color=(245,117,66), thickness=2, circle_radius=4),
                                     mp_drawing.DrawingSpec(color=(245,66,230), thickness=2, circle_radius=2)
                                     )

            # The classifier and the heuristics need both pose and face
            # landmarks. When either is missing the frame is only displayed.
            if results.pose_landmarks is not None and results.face_landmarks is not None:
                try:
                    # Build the feature row: pose landmarks followed by face landmarks
                    pose = results.pose_landmarks.landmark
                    pose_row = list(np.array([[landmark.x, landmark.y, landmark.z, landmark.visibility] for landmark in pose]).flatten())
                    face = results.face_landmarks.landmark
                    face_row = list(np.array([[landmark.x, landmark.y, landmark.z, landmark.visibility] for landmark in face]).flatten())
                    elapsed_time = time.time() - start_time
                    row = pose_row + face_row

                    # Classify the frame
                    x = pd.DataFrame([row])
                    model_class = model.predict(x)[0]  # class of the model
                    model_prob = model.predict_proba(x)[0]  # probability of each class

                    # Status box
                    cv2.rectangle(image, (0,0), (250, 60), (245, 117, 16), -1)

                    # Display Class
                    cv2.putText(image, 'CLASS'
                                , (95,12), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, cv2.LINE_AA)
                    cv2.putText(image, model_class.split(' ')[0]
                                , (90,40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

                    # Display Probability
                    cv2.putText(image, 'PROB'
                                , (15,12), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, cv2.LINE_AA)
                    cv2.putText(image, str(round(model_prob[np.argmax(model_prob)],2))
                                , (10,40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

                    # Eye landmarks used for the aspect ratio.
                    # NOTE(review): the vertical distance runs from list entry 0
                    # (landmarks 33 / 362) to entry 3 (145 / 374) and the
                    # horizontal one between entries 4 and 5 (153-157 /
                    # 380-382). Confirm these are the intended lid and corner
                    # points; the 0.2 threshold below depends on this choice.
                    left_eye_landmarks = [face[index] for index in (33, 133, 159, 145, 153, 157)]
                    right_eye_landmarks = [face[index] for index in (362, 263, 386, 374, 380, 382)]

                    # Vertical extent of each eye
                    left_eye_vertical_distance = left_eye_landmarks[3].y - left_eye_landmarks[0].y
                    right_eye_vertical_distance = right_eye_landmarks[3].y - right_eye_landmarks[0].y
                    # Horizontal extent of each eye
                    left_eye_horizontal_distance = left_eye_landmarks[5].x - left_eye_landmarks[4].x
                    right_eye_horizontal_distance = right_eye_landmarks[5].x - right_eye_landmarks[4].x
                    # Aspect ratio per eye, then the average of both
                    left_eye_aspect_ratio = left_eye_vertical_distance / left_eye_horizontal_distance
                    right_eye_aspect_ratio = right_eye_vertical_distance / right_eye_horizontal_distance
                    average_eye_aspect_ratio = (left_eye_aspect_ratio + right_eye_aspect_ratio) / 2

                    # Detect if eyes are closed
                    if average_eye_aspect_ratio < 0.2:
                        if not eye_closed:  # open -> closed: start timing and count a blink
                            eye_closed_start_time = time.time()
                            blink_counter += 1
                        eye_closed = True
                        cv2.putText(image, 'Eyes Closed', (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)
                    else:
                        if eye_closed:  # closed -> open: record how long the closure lasted
                            eye_closed_duration = time.time() - eye_closed_start_time
                            blinks_times.append(eye_closed_duration)
                        cv2.putText(image, 'Eyes Open', (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)
                        eye_closed = False

                    # Once at least 5 seconds have elapsed, convert the blink
                    # count to blinks per minute and start a new window.
                    if elapsed_time >= 5:
                        blinking_rate = blink_counter / elapsed_time * 60
                        blinks_rate_array.append(blinking_rate)
                        blink_counter = 0
                        start_time = time.time()

                    # Keep at most 20 rate samples. The closure history is
                    # trimmed alongside, but only when it has something to
                    # drop: popping an empty list raised IndexError and
                    # aborted the rest of the frame.
                    if len(blinks_rate_array) > 20:
                        blinks_rate_array.pop(0)
                        if blinks_times:
                            blinks_times.pop(0)

                    is_mouth_open, lips_distance_threshold = mouth_is_open(results, lips_distance_threshold)

                    # Mean face colour and redness verdict
                    face_is_red, red, green, blue = is_face_red(frame, results, base_red, base_green, base_blue)
                    if base_red == 255 and base_green == 255 and base_blue == 255:
                        # First measurement: adopt it as the baseline. The
                        # verdict above was computed against the 255 sentinel,
                        # so it is re-evaluated against the real baseline.
                        base_red, base_green, base_blue = red, green, blue
                        face_is_red, red, green, blue = is_face_red(frame, results, base_red, base_green, base_blue)

                    # Compute score
                    score = compute_score(model_class, is_mouth_open, face_is_red, blinks_rate_array, window_size, threshold_blinks, blinks_times)

                    if score > 50:
                        print('Alert!')

                except Exception as exc:
                    # Best effort: a failed analysis must not stop the video
                    # feed, but it should not vanish silently either.
                    # KeyboardInterrupt is not caught here, so interrupting
                    # the kernel still ends the loop.
                    failure = repr(exc)
                    if failure != last_frame_error:
                        print('Frame analysis skipped:', failure)
                        last_frame_error = failure

            cv2.imshow('Raw Webcam Feed', image)

            if cv2.waitKey(10) & 0xFF == ord('q'):
                break
finally:
    # Always free the camera and close the preview window, including when the
    # loop ends because of an error or a kernel interrupt.
    cap.release()
    cv2.destroyAllWindows()

20
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
60
60
60
60
60
60
60
60
60
60
60
60
60
60
60
40
40
40
40
40
40
40
40
40
40
40
40
40
40
40
40
60
60
60
60
60
60
60
60
60
60
60
20
20
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
20
0
0
0
0
0
20
20
0
0
20
0
0
20
20
20
20
20
20
20
20
20
20
20
20
20
20
20
20
40
40
40
40
40
20
20
20
20
20
20
20
20
20
20
20
40
20
20
20
20
20
20
20
20
20
20
20
20
20
20
20
20
40
20
20
20
20
40
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
80
