In [None]:
# Model Download
!wget -O face_landmarker_v2_with_blendshapes.task -q https://storage.googleapis.com/mediapipe-models/face_landmarker/face_landmarker/float16/1/face_landmarker.task

In [None]:
# Download packages
!pip install -r requirements.txt

In [None]:
# Import Packages
import cv2
import mediapipe as mp
import numpy as np
import time
import sqlite3

from mediapipe.tasks import python
from mediapipe.tasks.python import vision

In [2]:
# SQL Connection
conn = sqlite3.connect("fatigue_data.sqlite")
cursor = conn.cursor()

cursor.execute("""
CREATE TABLE IF NOT EXISTS fatigue_data (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    timestamp REAL,
    ear REAL,
    mar REAL,
    roll REAL,
    pitch REAL,
    label INTEGER
)
""")
conn.commit()

In [3]:
# ---------- Load Model ----------
base_options = python.BaseOptions(model_asset_path='face_landmarker_v2_with_blendshapes.task')

# Model Configuration(s)
options = vision.FaceLandmarkerOptions(
    base_options=base_options,
    running_mode=vision.RunningMode.VIDEO,
    num_faces=1,
    output_face_blendshapes=False,
    output_facial_transformation_matrixes=False
)

landmarker = vision.FaceLandmarker.create_from_options(options)


W0000 00:00:1771060890.263230  111699 face_landmarker_graph.cc:174] Sets FaceBlendshapesGraph acceleration to xnnpack by default.
INFO: Created TensorFlow Lite XNNPACK delegate for CPU.
W0000 00:00:1771060890.273121  111705 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1771060890.288486  111708 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


In [4]:
# Utility Functions
def calculate_angle(p1, p2):
    dx = p2[0] - p1[0]
    dy = p2[1] - p1[1]
    return np.degrees(np.arctan2(dy, dx))

def euclidean(p1, p2):
    return np.linalg.norm(np.array(p1) - np.array(p2))

def calculate_ear(eye_pts):
    p1, p2, p3, p4, p5, p6 = eye_pts
    return (euclidean(p2,p6) + euclidean(p3,p5)) / (2.0 * euclidean(p1,p4))

def calculate_mar(mouth_pts):
    p1, p2, p3, p4, p5, p6 = mouth_pts[:6]
    return (euclidean(p2,p6) + euclidean(p3,p5)) / (2.0 * euclidean(p1,p4))


In [5]:
# ---------------------- Landmark Indices ----------------------
LEFT_EYE = [33, 160, 158, 133, 153, 144]
RIGHT_EYE = [362, 385, 387, 263, 373, 380]
MOUTH = [13, 14, 78, 308, 82, 312, 87, 317]

In [6]:
cap = cv2.VideoCapture(0)
cv2.namedWindow("Fatigue Data Collector", cv2.WINDOW_NORMAL)

recording = False
waiting_for_label = False
recorded_data = []
record_start_time = None
message = "Press R to start recording | Q to quit"

global_start_time = time.time()

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    frame = cv2.flip(frame, 1)
    mesh_frame = frame.copy()

    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb_frame)
    timestamp = int((time.time() - global_start_time) * 1000)

    result = landmarker.detect_for_video(mp_image, timestamp)

    if result.face_landmarks:
        face_landmarks = result.face_landmarks[0]
        h, w, _ = frame.shape

        nose = face_landmarks[1]
        left_eye_outer = face_landmarks[33]
        right_eye_outer = face_landmarks[263]
        chin = face_landmarks[152]

        nose_pt = (int(nose.x * w), int(nose.y * h))
        left_eye_pt = (int(left_eye_outer.x * w), int(left_eye_outer.y * h))
        right_eye_pt = (int(right_eye_outer.x * w), int(right_eye_outer.y * h))
        chin_pt = (int(chin.x * w), int(chin.y * h))

        roll_angle = calculate_angle(left_eye_pt, right_eye_pt)
        pitch_distance = chin_pt[1] - nose_pt[1]

        left_eye_pts = [(int(face_landmarks[i].x*w), int(face_landmarks[i].y*h)) for i in LEFT_EYE]
        right_eye_pts = [(int(face_landmarks[i].x*w), int(face_landmarks[i].y*h)) for i in RIGHT_EYE]
        mouth_pts = [(int(face_landmarks[i].x*w), int(face_landmarks[i].y*h)) for i in MOUTH]

        ear = (calculate_ear(left_eye_pts) + calculate_ear(right_eye_pts)) / 2.0
        mar = calculate_mar(mouth_pts)
        
                # -------- Draw Eye Landmarks (Green) --------
        for pt in left_eye_pts + right_eye_pts:
            cv2.circle(mesh_frame, pt, 3, (0, 255, 0), -1)

        # -------- Draw Mouth Landmarks (Red) --------
        for pt in mouth_pts:
            cv2.circle(mesh_frame, pt, 3, (0, 0, 255), -1)

        # -------- Draw Head Tilt Lines --------
        cv2.line(mesh_frame, left_eye_pt, right_eye_pt, (255, 0, 0), 2)
        cv2.line(mesh_frame, nose_pt, chin_pt, (255, 255, 0), 2)

        # Optional: Show feature values
        cv2.putText(mesh_frame, f"EAR: {ear:.3f}", (30, 80),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)

        cv2.putText(mesh_frame, f"MAR: {mar:.3f}", (30, 110),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)

        cv2.putText(mesh_frame, f"Roll: {roll_angle:.2f}", (30, 140),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)


        if recording:
            recorded_data.append((timestamp, ear, mar, roll_angle, pitch_distance))

    # ---------------------- UI STATES ----------------------

    if recording:
        elapsed = int(time.time() - record_start_time)
        remaining = 5 - elapsed

        cv2.putText(mesh_frame, f"REC ‚óè {remaining}s", (30, 40),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 3)

        if remaining <= 0:
            recording = False
            waiting_for_label = True
            message = "Press 0 = Normal | 1 = Fatigue"

    elif waiting_for_label:
        cv2.putText(mesh_frame, message, (30, 40),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2)

    else:
        cv2.putText(mesh_frame, message, (30, 40),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)

    combined = np.hstack((frame, mesh_frame))
    cv2.imshow("Fatigue Data Collector", combined)

    key = cv2.waitKey(1) & 0xFF

    if key == ord('r') and not recording and not waiting_for_label:
        recording = True
        record_start_time = time.time()
        recorded_data = []
        message = ""

    elif key == ord('0') and waiting_for_label:
        for row in recorded_data:
            cursor.execute("INSERT INTO fatigue_data (timestamp, ear, mar, roll, pitch, label) VALUES (?, ?, ?, ?, ?, ?)",
                           (*row, 0))
        conn.commit()
        waiting_for_label = False
        message = "Saved as NORMAL | Press R to record again"

    elif key == ord('1') and waiting_for_label:
        for row in recorded_data:
            cursor.execute("INSERT INTO fatigue_data (timestamp, ear, mar, roll, pitch, label) VALUES (?, ?, ?, ?, ?, ?)",
                           (*row, 1))
        conn.commit()
        waiting_for_label = False
        message = "Saved as FATIGUE | Press R to record again"

    elif key == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()
conn.close()

QFontDatabase: Cannot find font directory /home/sonu/Desktop/fatigue/.venv/lib/python3.10/site-packages/cv2/qt/fonts.
Note that Qt no longer ships fonts. Deploy some (from https://dejavu-fonts.github.io/ for example) or switch to fontconfig.
QFontDatabase: Cannot find font directory /home/sonu/Desktop/fatigue/.venv/lib/python3.10/site-packages/cv2/qt/fonts.
Note that Qt no longer ships fonts. Deploy some (from https://dejavu-fonts.github.io/ for example) or switch to fontconfig.
QFontDatabase: Cannot find font directory /home/sonu/Desktop/fatigue/.venv/lib/python3.10/site-packages/cv2/qt/fonts.
Note that Qt no longer ships fonts. Deploy some (from https://dejavu-fonts.github.io/ for example) or switch to fontconfig.
QFontDatabase: Cannot find font directory /home/sonu/Desktop/fatigue/.venv/lib/python3.10/site-packages/cv2/qt/fonts.
Note that Qt no longer ships fonts. Deploy some (from https://dejavu-fonts.github.io/ for example) or switch to fontconfig.
QFontDatabase: Cannot find font 

In [9]:
import sqlite3
import pandas as pd

# Connect to database
conn = sqlite3.connect("fatigue_data.sqlite")

# Read full table into pandas dataframe
df = pd.read_sql_query("SELECT * FROM fatigue_data", conn)

conn.close()

# Show first 10 rows
df.head(10)


Unnamed: 0,id,timestamp,ear,mar,roll,pitch,label
0,1,13041.0,0.298366,0.519745,1.181189,76.0,0
1,2,13077.0,0.294782,0.523849,1.193489,76.0,0
2,3,13109.0,0.297783,0.520574,1.193489,76.0,0
3,4,13141.0,0.297694,0.520574,0.596809,76.0,0
4,5,13177.0,0.296279,0.542947,0.596809,76.0,0
5,6,13209.0,0.296279,0.544879,0.596809,76.0,0
6,7,13241.0,0.29507,0.544726,0.596809,76.0,0
7,8,13277.0,0.29507,0.520428,0.596809,76.0,0
8,9,13309.0,0.289809,0.520428,0.590657,76.0,0
9,10,13341.0,0.281253,0.546044,0.590657,76.0,0
