# Notebook cell In [1]:
"""
Driver Drowsiness Detection using EAR / I-EAR style approach
- Works with webcam or pre-recorded video
- Uses MediaPipe Face Mesh for facial landmarks
- Triggers alarm if eyes closed for too long OR blink rate too low
- Low-latency yawning detection (fires DURING a sustained mouth-open)
"""

import cv2
import mediapipe as mp
import numpy as np
import time
from collections import deque
import threading
import os

# -------------------- Config --------------------
# Input source: set USE_VIDEO_FILE to True to read VIDEO_FILE instead of the camera.
USE_VIDEO_FILE = False
VIDEO_FILE = "sample_driver.mp4"  # path to your test video

# Camera settings (width/height are only applied when reading from the camera).
CAMERA_INDEX = 0
FRAME_WIDTH = 640
FRAME_HEIGHT = 480
FPS = 15  # fallback value; main() overwrites it if the capture reports a positive fps

# Drowsiness thresholds (from paper)
LONG_FRAME_THRESHOLD = 40        # frames eyes continuously closed
BLINKS_PER_MIN_THRESHOLD = 8     # blinks per minute below this = drowsy

SMOOTHING_WINDOW = 5                  # number of recent EAR samples averaged per frame
BLINK_WINDOW_SECONDS = 60             # sliding window over which blinks are counted
BLINK_DETECTION_EAR_THRESHOLD = 0.21  # smoothed EAR below this counts as "eye closed"
MIN_BEEP_INTERVAL = 5  # seconds between alarms (general)
# ------------------------------------------------

# ------------- Low-latency yawn detection ---------------
MAR_SMOOTHING_WINDOW = 3  # number of recent MAR samples averaged per frame
MAR_BASELINE_WINDOW = 150  # number of non-yawn MAR samples kept for the median baseline
MAR_DELTA = 0.14          # margin over baseline
MAR_FLOOR = 0.50          # absolute minimum threshold
MAR_HYST = 0.03           # hysteresis to avoid flicker
YAWN_MIN_DURATION = 0.5   # seconds of sustained open to trigger
MIN_BEEP_INTERVAL_YAWN = 2  # seconds between yawn alarms (faster)
# ---------------------------------------------------------

# Shortcut to the MediaPipe Face Mesh solution module.
mp_face_mesh = mp.solutions.face_mesh

# MediaPipe indices for eyes
# Order matters: eye_aspect_ratio() reads them as p1..p6, using p1-p4 as the
# horizontal span and p2-p6 / p3-p5 as the two vertical spans.
LEFT_EYE_IDX = [33, 160, 158, 133, 153, 144]
RIGHT_EYE_IDX = [263, 387, 385, 362, 380, 373]

# MediaPipe indices for mouth (corners + inner lips)
MOUTH_LEFT_CORNER = 61
MOUTH_RIGHT_CORNER = 291
MOUTH_UPPER_INNER = 13
MOUTH_LOWER_INNER = 14

def dist(p, q):
    """Return the Euclidean distance between points *p* and *q*.

    Both arguments are coordinate sequences of equal length (the callers in
    this file pass ``(x, y)`` pixel tuples).
    """
    delta = np.asarray(p) - np.asarray(q)
    return np.linalg.norm(delta)

def eye_aspect_ratio(landmarks, eye_idx):
    """Compute the eye aspect ratio (EAR) for one eye.

    Args:
        landmarks: indexable collection of points, looked up by landmark index.
        eye_idx: six landmark indices, read in order as p1..p6.

    Returns:
        ``(|p2-p6| + |p3-p5|) / (2 * |p1-p4|)``, or ``0.0`` when the
        horizontal span ``|p1-p4|`` is zero.
    """
    p1, p2, p3, p4, p5, p6 = [landmarks[eye_idx[k]] for k in range(6)]

    width = dist(p1, p4)
    if width == 0:
        # Degenerate eye (both horizontal points coincide): avoid dividing by zero.
        return 0.0

    height_a = dist(p2, p6)
    height_b = dist(p3, p5)
    return (height_a + height_b) / (2.0 * width)

def mouth_aspect_ratio(landmarks):
    """Compute the mouth aspect ratio (MAR): inner-lip gap over mouth width.

    Args:
        landmarks: indexable collection of points, looked up by landmark index.

    Returns:
        Distance between the upper and lower inner-lip landmarks divided by
        the distance between the two mouth-corner landmarks, or ``0.0`` when
        the corner distance is zero.
    """
    left_corner = landmarks[MOUTH_LEFT_CORNER]
    right_corner = landmarks[MOUTH_RIGHT_CORNER]
    upper_lip = landmarks[MOUTH_UPPER_INNER]
    lower_lip = landmarks[MOUTH_LOWER_INNER]

    width = dist(left_corner, right_corner)
    if width == 0:
        # Corners coincide: avoid dividing by zero.
        return 0.0
    return dist(upper_lip, lower_lip) / width

def play_alarm_thread(_=None):
    """Play the audible drowsiness alert; fall back to a console message.

    Intended as a ``threading.Thread`` target because the speech command
    blocks until it has finished speaking.

    The previous implementation used ``os.system``, which never raises when
    the command is missing or fails, so the fallback message could not be
    reached. ``subprocess.run(..., check=True)`` raises in both cases, which
    makes the alert visible on hosts without the macOS ``say`` command.

    Args:
        _: ignored; accepted so the function can be used with callers that
            pass one positional argument.
    """
    # Local import keeps this block self-contained; the file's top-level
    # imports do not include subprocess.
    import subprocess

    try:
        # macOS voice alert (argument list, no shell involved).
        # Alternative system sound: afplay /System/Library/Sounds/Glass.aiff
        subprocess.run(["say", "You are drowsy"], check=True)
    except (OSError, subprocess.CalledProcessError) as e:
        # OSError: command not found / not executable.
        # CalledProcessError: command ran but exited with a non-zero status.
        print("[ALARM] YOU ARE DROWSY!", e)

def _run_detection_loop(cap, face_mesh):
    """Process frames from *cap* until the stream ends or 'q' is pressed.

    Args:
        cap: an opened ``cv2.VideoCapture``.
        face_mesh: a MediaPipe ``FaceMesh`` instance used to find landmarks.

    Per frame with a detected face this computes the smoothed EAR and MAR,
    updates blink / eye-closure / yawn state, draws the overlay text and
    starts the alarm thread when a condition holds (rate-limited).
    """
    # yawn early trigger needs FPS -> compute frames threshold now
    # NOTE(review): at typical frame rates this 0.35 s frame count is reached
    # before YAWN_MIN_DURATION (0.5 s), so the duration test below rarely
    # decides; kept as in the original.
    yawn_min_frames = max(3, int(0.35 * FPS))

    ear_buffer = deque(maxlen=SMOOTHING_WINDOW)
    closed_frame_counter = 0
    blink_timestamps = deque()
    # -inf so the very first alarm is never suppressed by the rate limit,
    # whatever the monotonic clock's starting value is.
    last_alarm_time = float("-inf")
    # Time the first face was seen; the blink-rate rule stays disabled until
    # one full BLINK_WINDOW_SECONDS has been observed since then.
    first_face_time = None

    # yawn state
    mar_buffer = deque(maxlen=MAR_SMOOTHING_WINDOW)
    mar_baseline_ring = deque(maxlen=MAR_BASELINE_WINDOW)
    yawn_open_frames = 0
    last_yawn_alarm_time = float("-inf")

    print("Press 'q' to quit.")
    while True:
        ret, frame = cap.read()
        if not ret:
            print("End of video or camera error.")
            break

        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = face_mesh.process(frame_rgb)
        h, w = frame.shape[:2]

        if results.multi_face_landmarks:
            # Monotonic clock: intervals are unaffected by wall-clock changes.
            # NOTE(review): time-based windows follow processing time, not
            # video time, when reading from a file.
            current_time = time.monotonic()
            if first_face_time is None:
                first_face_time = current_time

            lm = results.multi_face_landmarks[0].landmark
            # Landmarks are normalised; scale to pixel coordinates.
            landmarks = [(l.x * w, l.y * h) for l in lm]

            # ------------ EAR ------------
            left_ear = eye_aspect_ratio(landmarks, LEFT_EYE_IDX)
            right_ear = eye_aspect_ratio(landmarks, RIGHT_EYE_IDX)
            ear_val = (left_ear + right_ear) / 2.0

            ear_buffer.append(ear_val)
            ear_smoothed = float(np.mean(ear_buffer))

            for idx in LEFT_EYE_IDX + RIGHT_EYE_IDX:
                (x, y) = landmarks[idx]
                cv2.circle(frame, (int(x), int(y)), 1, (0, 255, 0), -1)

            cv2.putText(frame, f"EAR: {ear_smoothed:.3f}", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)

            eye_closed = ear_smoothed < BLINK_DETECTION_EAR_THRESHOLD

            if eye_closed:
                closed_frame_counter += 1
            elif closed_frame_counter > 0:
                # Eyes just reopened: count the closure as a blink only if
                # its duration is plausible for one.
                blink_duration_seconds = closed_frame_counter / FPS
                if 0.05 < blink_duration_seconds < 1.5:
                    blink_timestamps.append(current_time)
                closed_frame_counter = 0

            # Drop blinks that have left the sliding window.
            while blink_timestamps and (current_time - blink_timestamps[0] > BLINK_WINDOW_SECONDS):
                blink_timestamps.popleft()
            blinks_per_min = len(blink_timestamps) * (60.0 / BLINK_WINDOW_SECONDS)

            cv2.putText(frame, f"Blinks/min: {blinks_per_min:.1f}", (10, 60),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 0), 2)

            # ------------ LOW-LATENCY YAWN ------------
            mar_val = mouth_aspect_ratio(landmarks)
            mar_buffer.append(mar_val)
            mar_smoothed = float(np.mean(mar_buffer))

            # learn baseline only when clearly not yawning
            provisional_thr = max(
                (np.median(mar_baseline_ring) + MAR_DELTA) if mar_baseline_ring else 0.0,
                MAR_FLOOR,
            )
            if mar_smoothed < provisional_thr:
                mar_baseline_ring.append(mar_smoothed)

            mar_baseline = np.median(mar_baseline_ring) if mar_baseline_ring else mar_smoothed
            enter_thr = max(mar_baseline + MAR_DELTA, MAR_FLOOR)
            exit_thr = max(enter_thr - MAR_HYST, MAR_FLOOR - MAR_HYST)

            # hysteresis to avoid flicker: a lower threshold applies once open
            mouth_open = mar_smoothed > (enter_thr if yawn_open_frames == 0 else exit_thr)

            if mouth_open:
                yawn_open_frames += 1
                # fire DURING the yawn (no need to wait for mouth to close)
                if (yawn_open_frames >= yawn_min_frames) or ((yawn_open_frames / FPS) >= YAWN_MIN_DURATION):
                    # Show the banner for as long as the yawn lasts; only the
                    # audible alarm is rate-limited (same pattern as the
                    # drowsy banner below).
                    cv2.putText(frame, "YAWN DETECTED!", (10, 150),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 0, 255), 3)
                    if current_time - last_yawn_alarm_time > MIN_BEEP_INTERVAL_YAWN:
                        last_yawn_alarm_time = current_time
                        threading.Thread(target=play_alarm_thread, daemon=True).start()
            else:
                yawn_open_frames = 0

            cv2.putText(frame, f"MAR: {mar_smoothed:.3f} thr:{enter_thr:.2f}", (10, 90),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (200, 200, 255), 2)

            # ------------ Drowsy decision (existing) ------------
            is_drowsy_frames = closed_frame_counter >= LONG_FRAME_THRESHOLD
            # With no history, 0 blinks/min would trip the rule on the first
            # frame; require a full observation window first.
            blink_rate_ready = (current_time - first_face_time) >= BLINK_WINDOW_SECONDS
            is_drowsy_blinks = blink_rate_ready and blinks_per_min < BLINKS_PER_MIN_THRESHOLD

            if is_drowsy_frames or is_drowsy_blinks:
                cv2.putText(frame, "YOU ARE DROWSY!", (10, 110),
                            cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 0, 255), 3)
                if current_time - last_alarm_time > MIN_BEEP_INTERVAL:
                    last_alarm_time = current_time
                    threading.Thread(target=play_alarm_thread, daemon=True).start()

        else:
            # No face this frame: reset per-face state so stale samples do
            # not leak into the next detection.
            ear_buffer.clear()
            closed_frame_counter = 0
            mar_buffer.clear()
            mar_baseline_ring.clear()
            yawn_open_frames = 0

        cv2.imshow("Drowsiness Detection", frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break


def main():
    """Open the configured video source and run drowsiness detection.

    Reads from ``VIDEO_FILE`` when ``USE_VIDEO_FILE`` is true, otherwise from
    camera ``CAMERA_INDEX`` at the requested frame size. Updates the
    module-level ``FPS`` when the capture reports a positive frame rate.
    The capture, the FaceMesh instance and all OpenCV windows are released
    even if processing raises.
    """
    global FPS

    source = VIDEO_FILE if USE_VIDEO_FILE else CAMERA_INDEX
    cap = cv2.VideoCapture(source)
    try:
        if not cap.isOpened():
            # Fail early with a specific message instead of building the
            # FaceMesh model only to hit a failed read.
            print(f"Could not open video source: {source!r}")
            return

        if not USE_VIDEO_FILE:
            cap.set(cv2.CAP_PROP_FRAME_WIDTH, FRAME_WIDTH)
            cap.set(cv2.CAP_PROP_FRAME_HEIGHT, FRAME_HEIGHT)

        actual_fps = cap.get(cv2.CAP_PROP_FPS)
        if actual_fps and actual_fps > 0:
            FPS = actual_fps

        # Context manager closes the FaceMesh graph on exit.
        with mp_face_mesh.FaceMesh(
            static_image_mode=False,
            max_num_faces=1,
            refine_landmarks=True,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5,
        ) as face_mesh:
            _run_detection_loop(cap, face_mesh)
    finally:
        cap.release()
        cv2.destroyAllWindows()

# Run the detector only when executed as a script, not when imported.
# (The dunder names need double underscores; `_name_` raised NameError.)
if __name__ == "__main__":
    main()

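# Notebook output from the run above:
#   ModuleNotFoundError: No module named 'mediapipe'
# The MediaPipe dependency was not installed; install it with `pip install mediapipe`.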