In [1]:
import mediapipe as mp
import cv2
import numpy as np
import os

In [2]:
actions = np.array(['neutral', 'drowsy', 'sleep'])

In [3]:
def extract_keypoints(results):
    '''
    Extract Face Keypoints
    '''
    face_keypoints = np.array([[res.x, res.y, res.z] for res in results.multi_face_landmarks[0].landmark]).flatten() if results.multi_face_landmarks else np.zeros(468*3)
    return face_keypoints

In [4]:
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(
max_num_faces=1,
min_detection_confidence=0.3,
min_tracking_confidence=0.8
)

mp_drawing = mp.solutions.drawing_utils 
drawing_spec = mp_drawing.DrawingSpec(thickness=1, circle_radius=1)

In [5]:
def draw_Mesh(results):
    if results.multi_face_landmarks:
        for face_landmarks in results.multi_face_landmarks:
            mp_drawing.draw_landmarks(
                    image=image,
                    landmark_list = face_landmarks,
                    connections = mp_face_mesh.FACEMESH_TESSELATION,
                    landmark_drawing_spec = drawing_spec,
                    connection_drawing_spec = drawing_spec)

INFO: Created TensorFlow Lite XNNPACK delegate for CPU.


In [6]:
import tensorflow as tf

In [7]:
# CNN Model - 99.7% Accurate
interpreter = tf.lite.Interpreter(model_path="Drowsiness Detection/model.tflite")
interpreter.allocate_tensors()

# Get input and output tensors.
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

INFO: Created TensorFlow Lite delegate for select TF ops.
2022-03-04 20:36:32.351422: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  SSE4.1 SSE4.2
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2022-03-04 20:36:32.354153: I tensorflow/core/common_runtime/process_util.cc:146] Creating new thread pool with default inter op setting: 2. Tune using inter_op_parallelism_threads for best performance.
INFO: TfLiteFlexDelegate delegate: 9 nodes delegated out of 36 nodes with 6 partitions.

INFO: TfLiteFlexDelegate delegate: 0 nodes delegated out of 1 nodes with 0 partitions.

INFO: TfLiteFlexDelegate delegate: 2 nodes delegated out of 18 nodes with 2 partitions.

INFO: TfLiteFlexDelegate delegate: 0 nodes delegated out of 1 nodes with 0 partitions.

INFO: TfLiteFlexDelegate delegate: 2 n

In [8]:
input_details

[{'name': 'serving_default_lstm_input:0',
  'index': 0,
  'shape': array([   1,    5, 1404], dtype=int32),
  'shape_signature': array([  -1,    5, 1404], dtype=int32),
  'dtype': numpy.float32,
  'quantization': (0.0, 0),
  'quantization_parameters': {'scales': array([], dtype=float32),
   'zero_points': array([], dtype=int32),
   'quantized_dimension': 0},
  'sparsity_parameters': {}}]

In [9]:
output_details

[{'name': 'StatefulPartitionedCall:0',
  'index': 74,
  'shape': array([1, 3], dtype=int32),
  'shape_signature': array([-1,  3], dtype=int32),
  'dtype': numpy.float32,
  'quantization': (0.0, 0),
  'quantization_parameters': {'scales': array([], dtype=float32),
   'zero_points': array([], dtype=int32),
   'quantized_dimension': 0},
  'sparsity_parameters': {}}]

In [10]:
# Detection Function
def detect(sequence):
    features = np.expand_dims(sequence, axis=0)
    features = features.astype("float32")
    interpreter.set_tensor(input_details[0]['index'], features)
    interpreter.invoke()
    prediction = interpreter.get_tensor(output_details[0]['index'])[0]
    
    return actions[np.argmax(prediction)], prediction[np.argmax(prediction)]

In [12]:
cap = cv2.VideoCapture(0)

sequence = []
prediction = ""
threshold = 0.8
thres = 0.0

while cap.isOpened():
    ret, frame = cap.read()
    image = cv2.flip(frame, 1)
    img_h, img_w, channels = frame.shape
    
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    img_h, img_w = frame.shape[:2]
    rgb_frame.flags.writeable = False
    results = face_mesh.process(rgb_frame)
    rgb_frame.flags.writeable = True
    draw_Mesh(results)
    
    keypoints = extract_keypoints(results)
    sequence.append(keypoints)
    sequence = sequence[-5:]
    
    if len(sequence) == 5:
        prediction, thres = detect(sequence)
    
    if thres > threshold:
        if prediction != "":
            print(prediction)
            cv2.putText(image, prediction, (3,30), 
                       cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
        
    cv2.imshow("Feed", image)
    
    if cv2.waitKey(1) & 0xFF == ord("q"):
        break

cap.release()
cv2.destroyAllWindows()

neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
drowsy
drowsy
drowsy
drowsy
drowsy
drowsy
drowsy
drowsy
drowsy
drowsy
drowsy
drowsy
drowsy
drowsy
drowsy
drowsy
drowsy
drowsy
drowsy
drowsy
drowsy
drowsy
drowsy
drowsy
drowsy
drowsy
drowsy
drowsy
drowsy
drowsy
drowsy
drowsy
drowsy
drowsy
drowsy
drowsy
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neutral
neut

In [13]:
cap.release()
cv2.destroyAllWindows()

'neutral'