# Installation of Software and Packages

Note: Importing mediapipe may cause some issues. If it does, please restart the runtime and run all cells again.

In [None]:
import cv2
import os
import numpy as np
from datetime import datetime
import os
import math
from joblib import dump, load
from google.colab import drive
import time
# Mount Google Drive at /content/drive; the input frames and saved models are read from there.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
!git clone https://github.com/AssemblyAI-Examples/mediapipe-python.git
!pip uninstall -y mediapipe
!pip install mediapipe --upgrade

fatal: destination path 'mediapipe-python' already exists and is not an empty directory.
Found existing installation: mediapipe 0.10.21
Uninstalling mediapipe-0.10.21:
  Successfully uninstalled mediapipe-0.10.21
Collecting mediapipe
  Using cached mediapipe-0.10.21-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (9.7 kB)
Using cached mediapipe-0.10.21-cp311-cp311-manylinux_2_28_x86_64.whl (35.6 MB)
Installing collected packages: mediapipe
Successfully installed mediapipe-0.10.21


In [None]:
import mediapipe as mp

# Data Preparation

Here, we prepare the data that is fed to the models. We first extract the features into the eye, head, and pose streams. Then, we process the data to extract the coordinates for each data point.


In [None]:
# Directory holding the frame images of the single sample to classify.
# NOTE(review): the name `input` shadows Python's built-in input(); it is kept
# as-is because the feature-extraction cell further down reads this variable.
input = '/content/drive/My Drive/multimodal/test/drowsy/d_109'

In [None]:
# Wall-clock start of the pipeline; the last cells subtract this to report the time to alarm.
start_time = time.time()

# MediaPipe


In [None]:
# RATIO = 0.8 # Test-train ratio
# DATA_SET_SIZE = 100
# VALIDATION_SET_SIZE = 100
# FRAMES = 240
# TRAIN_PATH = "/content/drive/My Drive/multimodal/train/"
# TEST_PATH = "/content/drive/My Drive/multimodal/test/"
# VALIDATION_PATH = "/content/drive/My Drive/multimodal/validate/"
# mapping = {'d': "drowsy", 'n':"nondrowsy"}

In [None]:
# Parameters of the fusion step applied in the Evaluation section:
#   sigmoid(probabilities @ weights + offset) > THRESHOLD
# `weights` holds one coefficient per stream, in the order [eye, head, pose].
# NOTE(review): the values are presumably the result of an earlier fitting run — confirm their origin.
weights = np.array([1.87858007, 3.86422247, 3.73796147])
offset = -4.839405413695547
# A fused score strictly above this value is reported as drowsy.
THRESHOLD = 0.59

In [None]:
# Short aliases for the MediaPipe solution modules used in this notebook.
mp_drawing = mp.solutions.drawing_utils
mp_face_mesh = mp.solutions.face_mesh
mp_pose = mp.solutions.pose

In [None]:
# Collect every face-mesh landmark index that belongs to an eye (irises plus
# both eye contours). Each connection is a pair of landmark indices; both
# endpoints are kept.
EYE_IDX = {
    idx
    for connections in (mp_face_mesh.FACEMESH_IRISES,
                        mp_face_mesh.FACEMESH_LEFT_EYE,
                        mp_face_mesh.FACEMESH_RIGHT_EYE)
    for connection in connections
    for idx in (connection[0], connection[1])
}

# Upper-body pose landmarks only: nose, ears, mouth corners and shoulders.
POSE_IDX = [
    mp_pose.PoseLandmark.NOSE,
    mp_pose.PoseLandmark.LEFT_EAR,
    mp_pose.PoseLandmark.RIGHT_EAR,
    mp_pose.PoseLandmark.MOUTH_LEFT,
    mp_pose.PoseLandmark.MOUTH_RIGHT,
    mp_pose.PoseLandmark.LEFT_SHOULDER,
    mp_pose.PoseLandmark.RIGHT_SHOULDER,
]

# Number of face-mesh landmarks per frame that are NOT eye landmarks.
# NOTE(review): presumably 478 refined face-mesh landmarks minus the 40 eye indices — confirm.
FACE_NO_EYES_LEN = 438

print(EYE_IDX)
print(POSE_IDX)

{384, 385, 386, 387, 388, 133, 390, 263, 7, 398, 144, 145, 153, 154, 155, 157, 158, 159, 160, 33, 161, 163, 173, 466, 469, 470, 471, 472, 474, 475, 476, 477, 362, 373, 374, 246, 249, 380, 381, 382}
[<PoseLandmark.NOSE: 0>, <PoseLandmark.LEFT_EAR: 7>, <PoseLandmark.RIGHT_EAR: 8>, <PoseLandmark.MOUTH_LEFT: 9>, <PoseLandmark.MOUTH_RIGHT: 10>, <PoseLandmark.LEFT_SHOULDER: 11>, <PoseLandmark.RIGHT_SHOULDER: 12>]


 # Feature Extraction


In [None]:
def extract_features_by_sample(sample):
  """
  Extract the per-frame EYE (incl. iris), HEAD and POSE landmarks of one sample.

  sample: path to a directory of frame images. Every file name must start with
    an integer followed by '.', e.g. "12.jpg"; frames are processed in
    ascending numeric order of that integer.

  Returns (eye, head, pose): three lists with one entry per file in the
  directory. Each entry is a list of [x, y, z] landmark coordinates, or an
  empty list when nothing was detected in that frame or the file could not be
  read as an image.

  Raises ValueError if a file name does not start with an integer.
  """
  eye = []
  head = []
  pose = []
  sorted_sample = sorted(os.listdir(sample), key=lambda x: int(x.split('.')[0]))
  with mp_face_mesh.FaceMesh(
      static_image_mode=False,
      max_num_faces=1,
      refine_landmarks=True) as face_mesh, \
      mp_pose.Pose(
      static_image_mode=False,
      model_complexity=1) as pose_mesh:
    for img in sorted_sample:
      eye_features = []
      head_features = []
      pose_features = []

      frame = cv2.imread(os.path.join(sample, img))
      # cv2.imread signals an unreadable/corrupt file by returning None rather
      # than raising. Such a frame is recorded as "no detection" (empty lists)
      # so the frame count stays aligned instead of crashing inside MediaPipe.
      # NOTE(review): cv2.imread yields BGR while MediaPipe documents RGB input.
      # The channel order is deliberately left untouched here because the saved
      # classifiers were presumably fit on features extracted this way —
      # confirm before adding a BGR->RGB conversion.
      if frame is not None:
        face_results = face_mesh.process(frame)
        if face_results.multi_face_landmarks:
          # max_num_faces=1, so only the first face is relevant.
          landmarks = face_results.multi_face_landmarks[0].landmark
          for idx, landmark in enumerate(landmarks):
            coords = [landmark.x, landmark.y, landmark.z]
            # Face-mesh landmarks are split into eye vs. everything else (head).
            if idx in EYE_IDX:
              eye_features.append(coords)
            else:
              head_features.append(coords)

        pose_results = pose_mesh.process(frame)
        if pose_results.pose_landmarks:
          for i, landmark in enumerate(pose_results.pose_landmarks.landmark):
            # Keep only the upper-body landmarks listed in POSE_IDX.
            if i in POSE_IDX:
              pose_features.append([landmark.x, landmark.y, landmark.z])

      eye.append(eye_features)
      head.append(head_features)
      pose.append(pose_features)
  return eye, head, pose


In [None]:
def extract_all_features(path):
  """
  Run feature extraction on the sample directory at `path`.

  Returns (eyes, heads, poses). Each is a one-element list wrapping the
  per-frame features of that stream, i.e. the result has a leading sample axis
  of length 1.
  """
  eye_frames, head_frames, pose_frames = extract_features_by_sample(path)
  print(" done")
  return [eye_frames], [head_frames], [pose_frames]

In [None]:
input_eye, input_head, input_pose = extract_all_features(input)
# Print a compact summary instead of dumping every landmark coordinate,
# which floods the notebook output.
head_frames = input_head[0]
frames_with_face = sum(1 for frame in head_frames if len(frame))
print(f"samples: {len(input_head)}, frames: {len(head_frames)}, frames with a detected face: {frames_with_face}")

 done
[[[], [[0.4512869715690613, 0.6349046230316162, -0.03132558614015579], [0.43519943952560425, 0.6175229549407959, -0.05271286144852638], [0.4316452741622925, 0.6256382465362549, -0.03144863620400429], [0.4222494065761566, 0.5967892408370972, -0.03805587440729141], [0.4354110360145569, 0.6108482480049133, -0.05453617125749588], [0.43396100401878357, 0.602906346321106, -0.049317825585603714], [0.42776548862457275, 0.5836719870567322, -0.021135112270712852], [0.42370209097862244, 0.5719061493873596, -0.011227778159081936], [0.42309466004371643, 0.5647886991500854, -0.010210135951638222], [0.42195361852645874, 0.5335099697113037, 0.007817345671355724], [0.44987574219703674, 0.6375243663787842, -0.030271610245108604], [0.44656312465667725, 0.6406583189964294, -0.02796309068799019], [0.4427623152732849, 0.6430901885032654, -0.023353273048996925], [0.43402335047721863, 0.659969687461853, -0.017222382128238678], [0.43487077951431274, 0.6620771288871765, -0.01837898977100849], [0.434579938

# Data Splitting

In [None]:
def prepare_data(inputs, features):
  """
  Turn the extracted landmarks of one stream into a 2-D feature matrix.

  inputs: list of samples; each sample is a list of frames; each frame is a
    list of [x, y, z] landmark coordinates, or empty when nothing was detected.
  features: number of landmarks a frame of this stream is expected to hold.

  Frames without detections are NOT removed: they are replaced by an
  all-zero block of shape (features, 3) so that every frame has the same size.

  Returns an array with one row per sample, each row being that sample's
  frames flattened to frames * features * 3 values. An empty `inputs` yields
  an array of shape (0, 0).
  """
  padding = np.zeros((features, 3))
  samples = [
      [frame if len(frame) else padding for frame in sample]
      for sample in inputs
  ]
  if not samples:
    # reshape(0, -1) cannot infer the second axis of an empty array.
    return np.empty((0, 0))
  stacked = np.array(samples)
  return stacked.reshape(stacked.shape[0], -1)

In [None]:

# Zero-pad frames without detections and flatten every stream to one row per sample.
eye_data_x = prepare_data(input_eye, len(EYE_IDX))
head_data_x = prepare_data(input_head, FACE_NO_EYES_LEN)
pose_data_x = prepare_data(input_pose, len(POSE_IDX))

[[0.         0.         0.         ... 0.46199095 0.57626629 0.02282696]]


# Calculate Predictions

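Now that the three models are trained, in this section we load them and collect each model's predicted probability for the sample. These probabilities are combined in the Evaluation section using the fixed weights defined earlier.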

In [None]:
def load_models():
  """
  Load the three saved classifiers from Google Drive.

  Returns (eye, head, pose) in that order.
  """
  model_paths = (
      '/content/drive/My Drive/multimodal/eye_model-100-0.1-spec2.joblib',
      '/content/drive/My Drive/multimodal/head_model-spec.joblib',
      '/content/drive/My Drive/multimodal/upperbodypose_model-spec.joblib',
  )
  # joblib files are pickle-based: only load files from a trusted location.
  eye_model, head_model, pose_model = (load(path) for path in model_paths)
  return eye_model, head_model, pose_model

In [None]:
def get_probs(sample_eye, sample_head, sample_pose):
  """
  Query the eye, head and pose models for the class probabilities of one sample.

  Uses the module-level models `e`, `h` and `p`. Each sample is reshaped to a
  single row before prediction, and its original shape is printed.

  Returns (eye_pred, head_pred, pose_pred), the `predict_proba` outputs.
  """
  def _predict(model, sample):
    # One sample -> one row, as predict_proba is called on a 2-D input here.
    probabilities = model.predict_proba(sample.reshape(1,-1))
    print((sample.shape))
    return probabilities

  return _predict(e, sample_eye), _predict(h, sample_head), _predict(p, sample_pose)

In [None]:
e, h, p = load_models()
# Collect one row per sample with the columns [eye, head, pose]. The rows are
# gathered in a list so the matrix is sized by the data rather than hard-coded
# to a single sample.
prob_rows = []
for sample_eye, sample_head, sample_pose in zip(eye_data_x, head_data_x, pose_data_x):
  eye_p, head_p, pose_p = get_probs(sample_eye, sample_head, sample_pose)
  # NOTE(review): column 1 of predict_proba is presumably the drowsy class
  # (label 1) — confirm against each model's classes_.
  prob_rows.append([eye_p[0][1], head_p[0][1], pose_p[0][1]])
X = np.array(prob_rows, dtype=float).reshape(-1, 3)

(28800,)
(315360,)
(5040,)


# Evaluation
Determine whether the current sample shows drowsy driving.



In [None]:
def alarm():
  """Placeholder for the drowsiness alarm; it currently does nothing."""

In [None]:
def sigmoid(z):
    """Return the logistic function 1 / (1 + exp(-z)) of a scalar or array `z`."""
    denominator = 1 + np.exp(-z)
    return 1 / denominator

In [None]:
# Fuse the per-stream probabilities: weighted sum plus offset, squashed by the
# sigmoid, then thresholded into 0/1. The intermediate results get their own
# names so that X is left untouched and this cell can be re-run safely.
fused_scores = X @ weights + offset
fused_probs = sigmoid(fused_scores)
X_bin = (fused_probs > THRESHOLD).astype(int)
print(X_bin)

[1]


In [None]:
# Only the first entry of X_bin is checked; a value of 1 triggers the alarm.
if X_bin[0] == 1:
  alarm()

In [None]:
# Seconds elapsed since start_time was recorded.
t = time.time() - start_time

In [None]:
# Report the elapsed wall-clock time in seconds.
print("TIME TO ALARM: ", t)

TIME TO ALARM:  27.88985300064087
