In [1]:
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [None]:
import os

folder_path = "/content/drive/MyDrive/Colab Notebooks/tools & technique/resume & porfolio projects/drowniss_project/Drowninss-dataset/train"

image_count = 0

for root, dirs, files in os.walk(folder_path):
    for file in files:
        if file.lower().endswith((".jpg", ".jpeg", ".png")):
            image_count += 1

print("Total images in all classes:", image_count)


Total images in all classes: 2900


In [2]:
pip install opencv-python mediapipe tensorflow numpy matplotlib scikit-learn playsound


Collecting mediapipe
  Downloading mediapipe-0.10.21-cp312-cp312-manylinux_2_28_x86_64.whl.metadata (9.7 kB)
Collecting playsound
  Downloading playsound-1.3.0.tar.gz (7.7 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
INFO: pip is looking at multiple versions of mediapipe to determine which version is compatible with other requirements. This could take a while.
Collecting mediapipe
  Downloading mediapipe-0.10.20-cp312-cp312-manylinux_2_28_x86_64.whl.metadata (9.7 kB)
  Downloading mediapipe-0.10.18-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.7 kB)
  Downloading mediapipe-0.10.15-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.7 kB)
  Downloading mediapipe-0.10.14-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.7 kB)
Collecting protobuf<5,>=4.25.3 (from mediapipe)
  Downloading protobuf-4.25.8-cp37-abi3-manylinux2014_x86_64.whl.metadata (541 bytes)
Collecting sounddevice>=0.4.4 (from mediapipe)
  Downlo

In [None]:
# train_dual_models.py
import os, numpy as np, matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.layers import GlobalAveragePooling2D, Dropout, Dense
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import classification_report, confusion_matrix

# -------- config --------
DATASET_ROOT = "/content/drive/MyDrive/Colab Notebooks/tools & technique/resume & porfolio projects/drowniss_project/Drowninss-dataset/train"       # change if needed
IMG_SIZE     = 224
BATCH        = 32
EPOCHS       = 12
LR           = 1e-4
VAL_SPLIT    = 0.20
OUTDIR       = "/content/drive/MyDrive/Colab Notebooks/tools & technique/resume & porfolio projects/drowniss_project"
os.makedirs(OUTDIR, exist_ok=True)

def build_base():
    base = MobileNetV2(input_shape=(IMG_SIZE,IMG_SIZE,3), include_top=False, weights='imagenet')
    base.trainable = False
    x = GlobalAveragePooling2D()(base.output)
    x = Dropout(0.4)(x)
    out = Dense(1, activation='sigmoid')(x)
    model = Model(base.input, out)
    model.compile(optimizer=Adam(LR), loss='binary_crossentropy', metrics=['accuracy'])
    return model

def make_gens(parent_dir, classes, val_split=VAL_SPLIT):
    dg = ImageDataGenerator(
        rescale=1./255,
        validation_split=val_split,
        rotation_range=15,
        width_shift_range=0.1, height_shift_range=0.1,
        zoom_range=0.2, horizontal_flip=True,
    )
    train_gen = dg.flow_from_directory(
        parent_dir, classes=classes, target_size=(IMG_SIZE,IMG_SIZE),
        batch_size=BATCH, class_mode='binary', subset='training', shuffle=True)
    val_gen = dg.flow_from_directory(
        parent_dir, classes=classes, target_size=(IMG_SIZE,IMG_SIZE),
        batch_size=BATCH, class_mode='binary', subset='validation', shuffle=False)
    return train_gen, val_gen

def get_class_weights(gen):
    y = gen.classes
    classes = np.unique(y)
    weights = compute_class_weight(class_weight='balanced', classes=classes, y=y)
    return {int(c): float(w) for c, w in zip(classes, weights)}

def evaluate(model, val_gen, title):
    val_gen.reset()
    y_true = val_gen.classes
    y_prob = model.predict(val_gen, verbose=0).ravel()
    y_pred = (y_prob >= 0.5).astype(int)
    print(f"\n=== {title} – Classification Report ===")
    print(classification_report(y_true, y_pred, digits=3))
    print("Confusion matrix:\n", confusion_matrix(y_true, y_pred))
    # small accuracy plot not required; kept simple
    return y_true, y_pred

def to_tflite(h5_path):
    try:
        m = tf.keras.models.load_model(h5_path)
        conv = tf.lite.TFLiteConverter.from_keras_model(m)
        conv.optimizations = [tf.lite.Optimize.DEFAULT]
        tfl = conv.convert()
        tpath = h5_path.replace(".h5", ".tflite")
        open(tpath, "wb").write(tfl)
        print("Saved:", tpath)
    except Exception as e:
        print("TFLite conversion failed:", e)

if __name__ == "__main__":
    # Eye model (IMPORTANT order): ["Closed","Open"] → class 0=Closed, 1=Open
    eye_classes = ["Closed", "Open"]
    eye_train, eye_val = make_gens(DATASET_ROOT, eye_classes)
    print("Eye class_indices:", eye_train.class_indices)  # {'Closed':0,'Open':1}
    eye_cw = get_class_weights(eye_train)
    eye_model = build_base()
    eye_hist = eye_model.fit(eye_train, validation_data=eye_val, epochs=EPOCHS,
                             class_weight=eye_cw, verbose=1)
    eye_h5 = os.path.join(OUTDIR, "eye_open_closed.h5")
    eye_model.save(eye_h5); print("Saved:", eye_h5)
    evaluate(eye_model, eye_val, "Eye (Closed vs Open)")
    to_tflite(eye_h5)

    # Yawn model (IMPORTANT order): ["no_yawn","yawn"] → class 0=no_yawn, 1=yawn
    yawn_classes = ["no_yawn", "yawn"]
    yawn_train, yawn_val = make_gens(DATASET_ROOT, yawn_classes)
    print("Yawn class_indices:", yawn_train.class_indices)  # {'no_yawn':0,'yawn':1}
    yawn_cw = get_class_weights(yawn_train)
    yawn_model = build_base()
    yawn_hist = yawn_model.fit(yawn_train, validation_data=yawn_val, epochs=EPOCHS,
                               class_weight=yawn_cw, verbose=1)
    yawn_h5 = os.path.join(OUTDIR, "yawn_no_yawn.h5")
    yawn_model.save(yawn_h5); print("Saved:", yawn_h5)
    evaluate(yawn_model, yawn_val, "Yawn (no_yawn vs yawn)")
    to_tflite(yawn_h5)

    print("\nAll done. Models in:", OUTDIR)


Found 1162 images belonging to 2 classes.
Found 290 images belonging to 2 classes.
Eye class_indices: {'Closed': 0, 'Open': 1}


  self._warn_if_super_not_called()


Epoch 1/12
[1m37/37[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m100s[0m 2s/step - accuracy: 0.5163 - loss: 1.0392 - val_accuracy: 0.6138 - val_loss: 0.6491
Epoch 2/12
[1m37/37[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m90s[0m 2s/step - accuracy: 0.6025 - loss: 0.7202 - val_accuracy: 0.7690 - val_loss: 0.5337
Epoch 3/12
[1m37/37[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m90s[0m 2s/step - accuracy: 0.7020 - loss: 0.5783 - val_accuracy: 0.8379 - val_loss: 0.4663
Epoch 4/12
[1m37/37[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m91s[0m 2s/step - accuracy: 0.7543 - loss: 0.5010 - val_accuracy: 0.8862 - val_loss: 0.3891
Epoch 5/12
[1m37/37[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m89s[0m 2s/step - accuracy: 0.7949 - loss: 0.4494 - val_accuracy: 0.9172 - val_loss: 0.3498
Epoch 6/12
[1m37/37[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m89s[0m 2s/step - accuracy: 0.8194 - loss: 0.4175 - val_accuracy: 0.9276 - val_loss: 0.3119
Epoch 7/12
[1m37/37[0m [32m━━━━━━━━━



Saved: /content/drive/MyDrive/Colab Notebooks/tools & technique/resume & porfolio projects/drowniss_project/eye_open_closed.h5

=== Eye (Closed vs Open) – Classification Report ===
              precision    recall  f1-score   support

           0      0.900     0.993     0.944       145
           1      0.992     0.890     0.938       145

    accuracy                          0.941       290
   macro avg      0.946     0.941     0.941       290
weighted avg      0.946     0.941     0.941       290

Confusion matrix:
 [[144   1]
 [ 16 129]]




Saved artifact at '/tmp/tmpdxeaum3p'. The following endpoints are available:

* Endpoint 'serve'
  args_0 (POSITIONAL_ONLY): TensorSpec(shape=(None, 224, 224, 3), dtype=tf.float32, name='input_layer_2')
Output Type:
  TensorSpec(shape=(None, 1), dtype=tf.float32, name=None)
Captures:
  137706520461584: TensorSpec(shape=(), dtype=tf.resource, name=None)
  137706430545744: TensorSpec(shape=(), dtype=tf.resource, name=None)
  137707070996304: TensorSpec(shape=(), dtype=tf.resource, name=None)
  137707070994576: TensorSpec(shape=(), dtype=tf.resource, name=None)
  137706520461776: TensorSpec(shape=(), dtype=tf.resource, name=None)
  137707070993040: TensorSpec(shape=(), dtype=tf.resource, name=None)
  137707067139728: TensorSpec(shape=(), dtype=tf.resource, name=None)
  137707067141072: TensorSpec(shape=(), dtype=tf.resource, name=None)
  137707067144912: TensorSpec(shape=(), dtype=tf.resource, name=None)
  137707067145488: TensorSpec(shape=(), dtype=tf.resource, name=None)
  1377070671364

  self._warn_if_super_not_called()


Epoch 1/12
[1m37/37[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m131s[0m 3s/step - accuracy: 0.5054 - loss: 0.8329 - val_accuracy: 0.5433 - val_loss: 0.7135
Epoch 2/12
[1m37/37[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m90s[0m 2s/step - accuracy: 0.5103 - loss: 0.7936 - val_accuracy: 0.5087 - val_loss: 0.7139
Epoch 3/12
[1m37/37[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m91s[0m 2s/step - accuracy: 0.5446 - loss: 0.7680 - val_accuracy: 0.5606 - val_loss: 0.6728
Epoch 4/12
[1m37/37[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m89s[0m 2s/step - accuracy: 0.5538 - loss: 0.7350 - val_accuracy: 0.5190 - val_loss: 0.6981
Epoch 5/12
[1m37/37[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m89s[0m 2s/step - accuracy: 0.5423 - loss: 0.7613 - val_accuracy: 0.5571 - val_loss: 0.6647
Epoch 6/12
[1m37/37[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m89s[0m 2s/step - accuracy: 0.5490 - loss: 0.7198 - val_accuracy: 0.5779 - val_loss: 0.6550
Epoch 7/12
[1m37/37[0m [32m━━━━━━━━━



Saved: /content/drive/MyDrive/Colab Notebooks/tools & technique/resume & porfolio projects/drowniss_project/yawn_no_yawn.h5

=== Yawn (no_yawn vs yawn) – Classification Report ===
              precision    recall  f1-score   support

           0      0.569     0.566     0.567       145
           1      0.566     0.569     0.567       144

    accuracy                          0.567       289
   macro avg      0.567     0.567     0.567       289
weighted avg      0.567     0.567     0.567       289

Confusion matrix:
 [[82 63]
 [62 82]]




Saved artifact at '/tmp/tmpjxp6020t'. The following endpoints are available:

* Endpoint 'serve'
  args_0 (POSITIONAL_ONLY): TensorSpec(shape=(None, 224, 224, 3), dtype=tf.float32, name='input_layer_3')
Output Type:
  TensorSpec(shape=(None, 1), dtype=tf.float32, name=None)
Captures:
  137706385772560: TensorSpec(shape=(), dtype=tf.resource, name=None)
  137706300880464: TensorSpec(shape=(), dtype=tf.resource, name=None)
  137706300879888: TensorSpec(shape=(), dtype=tf.resource, name=None)
  137706300882768: TensorSpec(shape=(), dtype=tf.resource, name=None)
  137706300878160: TensorSpec(shape=(), dtype=tf.resource, name=None)
  137706300882192: TensorSpec(shape=(), dtype=tf.resource, name=None)
  137706305283920: TensorSpec(shape=(), dtype=tf.resource, name=None)
  137706305284496: TensorSpec(shape=(), dtype=tf.resource, name=None)
  137706300879312: TensorSpec(shape=(), dtype=tf.resource, name=None)
  137706300882000: TensorSpec(shape=(), dtype=tf.resource, name=None)
  1377063052850

In [None]:
!pip uninstall -y mediapipe protobuf absl-py
!pip cache purge

# Versions known to work together on Colab
!pip install --no-cache-dir mediapipe==0.10.14 protobuf==3.20.3 absl-py==1.4.0 opencv-python==4.9.0.80


[0mFound existing installation: absl-py 1.4.0
Uninstalling absl-py-1.4.0:
  Successfully uninstalled absl-py-1.4.0
Files removed: 39
Collecting mediapipe==0.10.14
  Downloading mediapipe-0.10.14-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.7 kB)
Collecting protobuf==3.20.3
  Downloading protobuf-3.20.3-py2.py3-none-any.whl.metadata (720 bytes)
Collecting absl-py==1.4.0
  Downloading absl_py-1.4.0-py3-none-any.whl.metadata (2.3 kB)
Collecting opencv-python==4.9.0.80
  Downloading opencv_python-4.9.0.80-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (20 kB)
INFO: pip is looking at multiple versions of mediapipe to determine which version is compatible with other requirements. This could take a while.
[31mERROR: Cannot install mediapipe==0.10.14 and protobuf==3.20.3 because these package versions have conflicting dependencies.[0m[31m
[0m
The conflict is caused by:
    The user requested protobuf==3.20.3
    mediapipe 0.10.14 depends on pro

In [None]:
# Base face landmarker (468 landmarks)
!wget -q -O face_landmarker.task \
  https://storage.googleapis.com/mediapipe-models/face_landmarker/face_landmarker/float16/1/face_landmarker.task


In [3]:
!pip install --upgrade opencv-python mediapipe==0.10.14 tensorflow==2.15.0 pillow
# playsound is only useful locally; in Colab we’ll use IPython Audio automatically
!pip install playsound==1.3.0


[31mERROR: Could not find a version that satisfies the requirement tensorflow==2.15.0 (from versions: 2.16.0rc0, 2.16.1, 2.16.2, 2.17.0rc0, 2.17.0rc1, 2.17.0, 2.17.1, 2.18.0rc0, 2.18.0rc1, 2.18.0rc2, 2.18.0, 2.18.1, 2.19.0rc0, 2.19.0, 2.19.1, 2.20.0rc0, 2.20.0)[0m[31m
[0m[31mERROR: No matching distribution found for tensorflow==2.15.0[0m[31m


In [5]:
# drowsiness_realtime_fast_fix_video_10s40f_yawncal.py
# Colab: capture 10 s in the browser, then uniformly sample 40 frames.
# Local: webcam stream. Saves continuous annotated MP4 + IMAGES ONLY on alerts.
# Fixes: robust yawn detection via normalized MAR, auto-thresholds, auto-CNN polarity.

import os, sys, time, threading, urllib.request, csv, collections, base64
import numpy as np
import cv2
import mediapipe as mp
from io import BytesIO
from PIL import Image

# ===================== USER PATHS ======================
EYE_MODEL_H5   = "/content/drive/MyDrive/Colab Notebooks/tools & technique/resume & porfolio projects/drowniss_project/eye_open_closed.h5"
YAWN_MODEL_H5  = "/content/drive/MyDrive/Colab Notebooks/tools & technique/resume & porfolio projects/drowniss_project/yawn_no_yawn.h5"

USE_TFLITE     = False
EYE_TFLITE     = "/content/drive/MyDrive/Colab Notebooks/tools & technique/resume & porfolio projects/drowniss_project/eye_open_closed.tflite"
YAWN_TFLITE    = "/content/drive/MyDrive/Colab Notebooks/tools & technique/resume & porfolio projects/drowniss_project/yawn_no_yawn.tflite"

ALARM_WAV      = "/content/drive/MyDrive/Colab Notebooks/tools & technique/resume & porfolio projects/drowniss_project/mixkit-classic-alarm-995.wav"

ROOT_SAVE      = "/content/drive/MyDrive/Colab Notebooks/tools & technique/resume & porfolio projects/drowniss_project/images"
os.makedirs(ROOT_SAVE, exist_ok=True)

# Session folder
SESSION_TS   = time.strftime("%Y%m%d_%H%M%S")
SAVE_DIR     = os.path.join(ROOT_SAVE, f"session_{SESSION_TS}")
os.makedirs(SAVE_DIR, exist_ok=True)

# Save policy
SAVE_ALERT_IMAGES_ONLY = True
VIDEO_FPS_DISK         = 30

# ===================== CAPTURE SETTINGS (Colab) =========
TARGET_W, TARGET_H, TARGET_FPS = 640, 480, 30
CAPTURE_SECONDS  = 10          # browser capture length
SAMPLED_FRAMES   = 40          # uniformly sampled frames from those 10 s

# ===================== THRESHOLDS (EYES/HEAD) ===========
IMG_SIZE = 224
EAR_THRESH      = 0.22           # will be refined by auto-calib of p_open
HEAD_PITCH_DEG  = 25.0

# Instant triggers (per-frame)
INSTANT_EAR        = 0.16
INSTANT_POPEN_MAX  = 0.40

# Voting window for alerts (works with 40 frames)
WIN = 5
VOTES_TO_ALERT = 3

# ===================== ENV / UI HELPERS =================
def in_colab():
    try:
        import google.colab  # type: ignore
        return True
    except Exception:
        return False

IN_COLAB = in_colab()

def show_frame(frame, win="Drowsiness Detector"):
    if IN_COLAB:
        from google.colab.patches import cv2_imshow
        cv2_imshow(frame)
    else:
        cv2.imshow(win, frame)

def want_stop():
    if IN_COLAB:
        return False
    k = cv2.waitKey(1) & 0xFF
    return k in (27, ord('q'))

def safe_alarm(path):
    try:
        if IN_COLAB:
            from IPython.display import Audio, display
            if os.path.exists(path):
                display(Audio(path, autoplay=True))
            else:
                print("\a"); time.sleep(0.2); print("\a")
        else:
            from playsound import playsound
            if os.path.exists(path):
                playsound(path)
            else:
                print("\a"); time.sleep(0.2); print("\a")
    except Exception:
        for _ in range(3):
            print("\a"); time.sleep(0.2)

# ===================== MODELS ===========================
def load_models():
    try:
        if USE_TFLITE:
            import tensorflow as tf
            eye_inter  = tf.lite.Interpreter(model_path=EYE_TFLITE);  eye_inter.allocate_tensors()
            yawn_inter = tf.lite.Interpreter(model_path=YAWN_TFLITE); yawn_inter.allocate_tensors()
            return dict(mode="tflite",
                        eye=eye_inter,  eye_in=eye_inter.get_input_details(),  eye_out=eye_inter.get_output_details(),
                        mouth=yawn_inter, mouth_in=yawn_inter.get_input_details(), mouth_out=yawn_inter.get_output_details())
        else:
            from tensorflow.keras.models import load_model
            eye_model  = load_model(EYE_MODEL_H5,  compile=False)
            yawn_model = load_model(YAWN_MODEL_H5, compile=False)
            return dict(mode="keras", eye=eye_model, mouth=yawn_model)
    except Exception as e:
        raise RuntimeError(f"[ModelLoad] Failed to load CNNs: {e}")

def predict_prob(model_pack, roi, eye_model=True):
    x = cv2.resize(roi, (IMG_SIZE, IMG_SIZE)).astype("float32")/255.0
    x = np.expand_dims(x, 0)
    if model_pack["mode"] == "tflite":
        inter  = model_pack["eye"] if eye_model else model_pack["mouth"]
        in_d   = model_pack["eye_in"] if eye_model else model_pack["mouth_in"]
        out_d  = model_pack["eye_out"] if eye_model else model_pack["mouth_out"]
        inter.set_tensor(in_d[0]["index"], x)
        inter.invoke()
        return float(inter.get_tensor(out_d[0]["index"])[0][0])
    else:
        model = model_pack["eye"] if eye_model else model_pack["mouth"]
        return float(model.predict(x, verbose=0)[0][0])

# ===================== MEDIAPIPE TASK: LANDMARKER ======
from mediapipe.tasks import python as mp_tasks
from mediapipe.tasks.python import vision as mp_vision

TASK_PATH = "face_landmarker.task"
TASK_URL  = "https://storage.googleapis.com/mediapipe-models/face_landmarker/face_landmarker/float16/1/face_landmarker.task"

def ensure_task_model():
    if not os.path.exists(TASK_PATH):
        urllib.request.urlretrieve(TASK_URL, TASK_PATH)

def build_landmarker():
    ensure_task_model()
    BaseOptions = mp_tasks.BaseOptions
    FaceLandmarkerOptions = mp_vision.FaceLandmarkerOptions
    FaceLandmarker = mp_vision.FaceLandmarker
    VisionRunningMode = mp_vision.RunningMode
    options = FaceLandmarkerOptions(
        base_options=BaseOptions(model_asset_path=TASK_PATH),
        running_mode=VisionRunningMode.IMAGE,
        num_faces=1
    )
    return FaceLandmarker.create_from_options(options)

def detect_landmarks_468(face_landmarker, frame_bgr):
    h, w = frame_bgr.shape[:2]
    mp_image = mp.Image(image_format=mp.ImageFormat.SRGB,
                        data=cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB))
    res = face_landmarker.detect(mp_image)
    if not res.face_landmarks:
        return None
    pts = [(int(p.x*w), int(p.y*h)) for p in res.face_landmarks[0]]
    return np.array(pts, dtype=np.int32)

# ===================== GEOMETRY / INDICES ===============
LEFT_EYE  = [33,160,158,133,153,144]
RIGHT_EYE = [263,387,385,362,380,373]
MOUTH_4PT = [13,14,78,308]   # top, bottom, left, right
LIPS      = [61,291,0,17,13,14,78,308,81,311,402,318,82,312,87,317,178,88,95,185]

HP_IDX = {"nose":1,"chin":199,"l_eye":33,"r_eye":263,"l_mouth":78,"r_mouth":308}
MODEL_3D = np.array([
    (0,0,0),(0,-330,-65),(-225,170,-135),(225,170,-135),(-150,-150,-125),(150,-150,-125)
], dtype=np.float64)

def euclid(a,b): return np.linalg.norm(np.array(a)-np.array(b))
def EAR(eye_pts):
    p1,p2,p3,p4,p5,p6 = eye_pts
    A = euclid(p2,p6); B = euclid(p3,p5); C = euclid(p1,p4)
    return 0.0 if C==0 else (A+B)/(2.0*C)

# Robust, scale-normalized mouth opening (MAR_N) using inter-ocular width
def MAR_N(mouth4, left_eye_outer, right_eye_outer, eps=1e-6):
    top,bottom,left,right = mouth4
    v = euclid(top,bottom)
    face_w = euclid(left_eye_outer, right_eye_outer)
    return v / (face_w + eps)

# ===================== VIDEO IO HELPERS ==================
def _set_cam_props(cap):
    try:
        cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"MJPG"))
        cap.set(cv2.CAP_PROP_FRAME_WIDTH,  TARGET_W)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, TARGET_H)
        cap.set(cv2.CAP_PROP_FPS,          TARGET_FPS)
    except Exception:
        pass

def open_camera_robust(preferred=(0,1,2,3)):
    backend_flags = []
    if sys.platform.startswith("win"):
        backend_flags = [cv2.CAP_DSHOW, cv2.CAP_MSMF]
    elif sys.platform == "darwin":
        backend_flags = [cv2.CAP_AVFOUNDATION]
    else:
        backend_flags = [cv2.CAP_V4L2]
    for idx in preferred:
        cap = cv2.VideoCapture(idx)
        if cap.isOpened():
            _set_cam_props(cap);  return cap
        cap.release()
        for be in backend_flags:
            cap = cv2.VideoCapture(idx, be)
            if cap.isOpened():
                _set_cam_props(cap);  return cap
            cap.release()
    return None

# ===================== COLAB 10s -> 40f CAPTURE =========
def capture_10s_then_sample_40(quality=0.8):
    from IPython.display import Javascript, display
    from google.colab.output import eval_js

    js = Javascript(r"""
      async function robustCapture(seconds, targetW, targetH, targetFps, quality){
        const stream = await navigator.mediaDevices.getUserMedia({
          video: { width:{ideal:targetW}, height:{ideal:targetH},
                   frameRate:{ideal:targetFps, max:targetFps} },
          audio: false
        });
        const video = document.createElement('video');
        video.style.display = 'none';
        document.body.appendChild(video);
        video.srcObject = stream;
        await video.play();
        let tries = 0;
        while ((video.videoWidth === 0 || video.videoHeight === 0) && tries < 60) {
          await new Promise(r => setTimeout(r, 100));
          tries++;
        }
        if (video.videoWidth === 0 || video.videoHeight === 0) {
          stream.getTracks().forEach(t=>t.stop());
          video.remove();
          throw new Error("Camera not ready (no dimensions).");
        }
        const canvas = document.createElement('canvas');
        canvas.width = video.videoWidth;
        canvas.height = video.videoHeight;
        const ctx = canvas.getContext('2d');
        const frames = [];
        const start = performance.now();
        const hardStop = start + (seconds + 2)*1000;
        let last = start;
        const hasRVFC = ('requestVideoFrameCallback' in HTMLVideoElement.prototype);
        function grab(){ ctx.drawImage(video, 0, 0); frames.push(canvas.toDataURL('image/jpeg', quality)); }
        async function loop(){
          while (true){
            const now = performance.now();
            if (now - start >= seconds*1000) break;
            if (now > hardStop) break;
            if (now - last >= 33) { grab(); last = now; }
            if (hasRVFC) await new Promise(res => video.requestVideoFrameCallback(()=>res()));
            else         await new Promise(res => setTimeout(res, 10));
          }
        }
        try { await loop(); } finally { stream.getTracks().forEach(t=>t.stop()); video.remove(); }
        return {frames: frames, w: canvas.width, h: canvas.height};
      }
    """)
    display(js)

    data = eval_js(f"robustCapture({CAPTURE_SECONDS}, {TARGET_W}, {TARGET_H}, {TARGET_FPS}, {quality})")
    raw = []
    for d in data['frames']:
        b = d.split(',')[1]
        img = Image.open(BytesIO(base64.b64decode(b))).convert('RGB')
        raw.append(cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR))
    if len(raw) == 0:
        raise RuntimeError("No frames captured.")
    idxs = np.linspace(0, len(raw)-1, num=SAMPLED_FRAMES, dtype=int)
    return [raw[i] for i in idxs]

# ===================== LIGHT ENHANCEMENT =================
def enhance_full(frame):
    if frame.mean() < 60:
        yuv = cv2.cvtColor(frame, cv2.COLOR_BGR2YUV)
        yuv[:,:,0] = cv2.equalizeHist(yuv[:,:,0])
        return cv2.cvtColor(yuv, cv2.COLOR_YUV2BGR)
    return frame

def enhance_eye_roi(roi_bgr):
    return roi_bgr

class EMA:
    def __init__(self, alpha=0.7):
        self.alpha = alpha; self.v = None
    def update(self, x):
        self.v = x if self.v is None else self.alpha*self.v + (1-self.alpha)*x
        return self.v

# ===================== LOGGING ==========================
def init_csv_log(csv_path):
    if not os.path.exists(csv_path):
        with open(csv_path, "w", newline="") as f:
            w = csv.writer(f)
            w.writerow(["time", "frame_idx", "EAR", "MAR_N", "Pitch", "p_open_s", "p_yawn_s", "reason", "saved_path"])

# ===================== CORE LOOP ========================
def process_frames_iter(frames_iter, models, face_landmarker, win="Drowsiness Detector"):
    last_alarm = 0.0
    p_open_ema = EMA(0.7); p_yawn_ema = EMA(0.7)

    csv_path = os.path.join(SAVE_DIR, "session_metrics.csv")
    init_csv_log(csv_path)
    frame_idx = 0

    video_path = os.path.join(SAVE_DIR, f"session_{SESSION_TS}_annotated.mp4")
    video_writer = None
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")

    # rolling votes
    eye_buf  = collections.deque(maxlen=WIN)
    yawn_buf = collections.deque(maxlen=WIN)
    tilt_buf = collections.deque(maxlen=WIN)

    # --- Auto-calibration over early frames (for yawn) ---
    CAL_FRAMES = 15
    yawn_raw_samples = []
    marN_samples_closed = []
    mouth_polarity = +1  # +1 means model outputs P(yawn); -1 means P(no-yawn) -> we invert
    pyawn_thresh = 0.55  # will be updated after calibration
    marN_thresh  = 0.07  # default; will be updated after calibration

    LAST = {"ear":None, "marN":None, "pitch":0.0, "popen":None, "pyawn":0.0}

    for frame in frames_iter:
        if frame is None: break
        frame_idx += 1
        frame = enhance_full(cv2.flip(frame, 1))
        h, w = frame.shape[:2]

        if video_writer is None:
            video_writer = cv2.VideoWriter(video_path, fourcc, VIDEO_FPS_DISK, (w, h))
            print(f"[Video] Writing annotated MP4 to: {video_path}")

        pts = detect_landmarks_468(face_landmarker, frame)
        ear_val = marN_val = None
        pitch = 0.0
        eye_closed_now = False
        yawn_now = False
        tilt_now = False
        p_open_s = None; p_yawn_s = 0.0

        if pts is not None:
            # EAR
            le = [tuple(pts[i]) for i in LEFT_EYE]
            re = [tuple(pts[i]) for i in RIGHT_EYE]
            ear_val = (EAR(le) + EAR(re))/2.0
            cv2.putText(frame, f"EAR:{ear_val:.2f}", (10,26), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,0),2)

            # Head pitch
            try:
                ip = np.array([pts[HP_IDX[k]] for k in ["nose","chin","l_eye","r_eye","l_mouth","r_mouth"]], dtype=np.float64)
                fx = fy = 1.0 * w; cx, cy = w/2.0, h/2.0
                cam = np.array([[fx,0,cx],[0,fy,cy],[0,0,1]], dtype="double")
                ok, rvec, _ = cv2.solvePnP(MODEL_3D, ip, cam, np.zeros((4,1)), flags=cv2.SOLVEPNP_ITERATIVE)
                if ok:
                    R, _ = cv2.Rodrigues(rvec); sy = np.sqrt(R[0,0]**2 + R[1,0]**2)
                    pitch = np.degrees(np.arctan2(-R[2,0], sy))
            except Exception:
                pass
            cv2.putText(frame, f"Pitch:{pitch:.1f}", (10,48), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,0),2)

            # ROIs and probabilities
            def crop_box(points, pad=6):
                xs = [p[0] for p in points]; ys = [p[1] for p in points]
                x1,x2 = max(0,min(xs)-pad), min(w,max(xs)+pad)
                y1,y2 = max(0,min(ys)-pad), min(h,max(ys)+pad)
                return frame[y1:y2, x1:x2]

            eye_roi   = crop_box(le+re, pad=10)
            lips_pts  = [tuple(pts[i]) for i in LIPS]
            mouth_roi = crop_box(lips_pts, pad=6)

            if eye_roi.size != 0:
                p_open = predict_prob(models, enhance_eye_roi(eye_roi), eye_model=True)
                p_open_s = p_open_ema.update(p_open)

            # --- normalized MAR using outer eye corners (33, 263) as scale ---
            mouth4 = [tuple(pts[i]) for i in MOUTH_4PT]
            marN_val = MAR_N(mouth4, tuple(pts[33]), tuple(pts[263]))
            cv2.putText(frame, f"MAR_N:{marN_val:.3f}", (10,70), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,0),2)

            if mouth_roi.size != 0:
                p_raw = predict_prob(models, mouth_roi, eye_model=False)
                yawn_raw_samples.append(p_raw)
                # polarity is decided after CAL_FRAMES using frames with small MAR_N
                if mouth_polarity == +1:
                    p_yawn = p_raw
                else:
                    p_yawn = 1.0 - p_raw
                p_yawn_s = p_yawn_ema.update(p_yawn)

            cv2.putText(frame, f"Eye(Open)={0.0 if p_open_s is None else p_open_s:.2f}", (220,26), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255,0,0),2)
            cv2.putText(frame, f"Yawn={p_yawn_s:.2f}", (220,48), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255,0,0),2)

            # ---------- auto-calibration during first CAL_FRAMES ----------
            if frame_idx <= CAL_FRAMES:
                # collect MAR_N for presumed-closed mouth frames (small MAR_N)
                if marN_val is not None:
                    marN_samples_closed.append(marN_val)
            elif frame_idx == CAL_FRAMES + 1:
                # decide polarity: when mouth likely closed, P(yawn) should be low
                if len(yawn_raw_samples) > 5 and len(marN_samples_closed) > 3:
                    mean_raw = float(np.mean(yawn_raw_samples))
                    # if mean_raw is high with closed mouth, invert
                    mouth_polarity = -1 if mean_raw > 0.5 else +1
                # set thresholds: closed median + delta; and a moderate pyawn threshold
                med_marN = float(np.median(marN_samples_closed)) if marN_samples_closed else 0.02
                marN_thresh = np.clip(med_marN + 0.02, 0.04, 0.12)
                pyawn_thresh = 0.55  # conservative default
            # --------------------------------------------------------------

            # Eyes fusion + instant
            if (ear_val is not None) and (p_open_s is not None):
                eye_closed_now = ((ear_val < EAR_THRESH and p_open_s < 0.45) or
                                  (ear_val < INSTANT_EAR and p_open_s < INSTANT_POPEN_MAX))
            elif ear_val is not None:
                eye_closed_now = ear_val < INSTANT_EAR
            elif p_open_s is not None:
                eye_closed_now = p_open_s < INSTANT_POPEN_MAX

            # Yawn fusion using calibrated thresholds
            # If calibration hasn't run yet, fall back to safe defaults.
            mar_thr = locals().get("marN_thresh", 0.08)
            py_thr  = locals().get("pyawn_thresh", 0.55)
            yawn_now = ((marN_val is not None and marN_val > mar_thr) or
                        (p_yawn_s is not None and p_yawn_s > py_thr))

            tilt_now = abs(pitch) > HEAD_PITCH_DEG

            LAST.update({"ear":ear_val, "marN":marN_val, "pitch":pitch, "popen":p_open_s, "pyawn":p_yawn_s})
        else:
            # brief fallback using last good state
            ear_val = LAST["ear"]; marN_val = LAST["marN"]; pitch = LAST["pitch"]
            p_open_s = LAST["popen"]; p_yawn_s = LAST["pyawn"]
            mar_thr = locals().get("marN_thresh", 0.08)
            py_thr  = locals().get("pyawn_thresh", 0.55)
            eye_closed_now = (ear_val is not None and p_open_s is not None and ear_val < EAR_THRESH and p_open_s < 0.45)
            yawn_now = ((p_yawn_s is not None and p_yawn_s > py_thr) or
                        (marN_val is not None and marN_val > mar_thr))
            tilt_now = abs(pitch) > HEAD_PITCH_DEG
            cv2.putText(frame, "Face lost… using last state", (10,92), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,255), 2)

        # votes
        eye_buf.append(bool(eye_closed_now))
        yawn_buf.append(bool(yawn_now))
        tilt_buf.append(bool(tilt_now))

        reason = None
        if sum(eye_buf)  >= VOTES_TO_ALERT: reason = "Eyes closed"
        elif sum(yawn_buf) >= VOTES_TO_ALERT: reason = "Yawning"
        elif sum(tilt_buf) >= VOTES_TO_ALERT: reason = "Head tilt"

        if reason:
            cv2.putText(frame, f"DROWSINESS ALERT: {reason}", (40,140),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0,0,255), 2)
            now = time.time()
            if now - last_alarm > 5.0:
                last_alarm = now
                threading.Thread(target=safe_alarm, args=(ALARM_WAV,), daemon=True).start()

        # write video every frame
        if video_writer is not None:
            video_writer.write(frame)

        # save alert frame
        img_path = ""
        if SAVE_ALERT_IMAGES_ONLY and reason:
            ts = time.strftime("%Y%m%d_%H%M%S")
            img_path = os.path.join(SAVE_DIR, f"alert_{reason.replace(' ','_')}_{ts}_{frame_idx:06d}.jpg")
            cv2.imwrite(img_path, frame, [cv2.IMWRITE_JPEG_QUALITY, 85])

        # CSV log
        ts_now = time.strftime("%Y%m%d_%H%M%S")
        with open(csv_path, "a", newline="") as f:
            w = csv.writer(f)
            w.writerow([ts_now, frame_idx,
                        None if ear_val is None else f"{ear_val:.3f}",
                        None if marN_val is None else f"{marN_val:.3f}",
                        None if pitch   is None else f"{pitch:.1f}",
                        None if p_open_ema.v is None else f"{p_open_ema.v:.3f}",
                        None if p_yawn_ema.v is None else f"{p_yawn_ema.v:.3f}",
                        "" if reason is None else reason,
                        img_path])

        show_frame(frame)
        if want_stop():
            break

    if video_writer is not None:
        video_writer.release()
        print(f"[Video] Saved: {video_path}")
    if not IN_COLAB:
        cv2.destroyAllWindows()

# ===================== MAIN =============================
def main():
    global TARGET_W, TARGET_H, TARGET_FPS, CAPTURE_SECONDS, SAMPLED_FRAMES
    models = load_models()
    landmarker = build_landmarker()

    cap = None if IN_COLAB else open_camera_robust()
    if (cap is not None) and cap.isOpened():
        def webcam_frames():
            while True:
                ok, f = cap.read()
                if not ok: break
                yield f
        try:
            process_frames_iter(webcam_frames(), models, landmarker)
        finally:
            cap.release()
        return

    if IN_COLAB:
        print("[Info] Colab: capturing 10 s then sampling 40 frames …")
        frames = capture_10s_then_sample_40(quality=0.8)
        process_frames_iter(frames, models, landmarker)
        return

    raise RuntimeError("Cannot open webcam. If local, check camera permissions or indices/backends.")

if __name__ == "__main__":
    main()


Output hidden; open in https://colab.research.google.com to view.

In [None]:
# Colab webcam capture helper (grabs N frames through JS)
from IPython.display import Javascript, display
from google.colab.output import eval_js
import numpy as np, cv2, base64
from PIL import Image
from io import BytesIO

def capture_frames_colab(n_frames=120, quality=0.8):
    js = Javascript('''
    async function captureFrames(n, quality) {
      const stream = await navigator.mediaDevices.getUserMedia({video: true});
      const video = document.createElement('video');
      document.body.appendChild(video);
      video.srcObject = stream;
      await video.play();
      const canvas = document.createElement('canvas');
      canvas.width = video.videoWidth;
      canvas.height = video.videoHeight;
      const ctx = canvas.getContext('2d');

      let frames = [];
      for (let i=0; i<n; i++) {
        ctx.drawImage(video, 0, 0);
        const dataURL = canvas.toDataURL('image/jpeg', quality);
        frames.push(dataURL);
        await new Promise(r => setTimeout(r, 33)); // ~30 fps
      }
      stream.getTracks().forEach(t => t.stop());
      video.remove();
      return frames;
    }''')
    display(js)
    data = eval_js(f'captureFrames({n_frames}, {quality})')
    frames = []
    for d in data:
        b = d.split(',')[1]
        img = Image.open(BytesIO(base64.b64decode(b))).convert('RGB')
        frames.append(cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR))
    return frames

# Example usage:
frames = capture_frames_colab(180, 0.8)   # capture ~6 seconds
len(frames), frames[0].shape


<IPython.core.display.Javascript object>

(180, (480, 640, 3))

In [None]:
# drowsiness_realtime_fast_fix_video_10s40f.py
# Colab: capture 10 s in the browser, then uniformly sample 40 frames.
# Local: webcam stream. Saves continuous annotated MP4 + images ONLY on alerts.

import os, sys, time, threading, urllib.request, csv, collections, base64
import numpy as np
import cv2
import mediapipe as mp
from io import BytesIO
from PIL import Image

# ===================== USER PATHS ======================
EYE_MODEL_H5   = "/content/drive/MyDrive/Colab Notebooks/tools & technique/resume & porfolio projects/drowniss_project/eye_open_closed.h5"
YAWN_MODEL_H5  = "/content/drive/MyDrive/Colab Notebooks/tools & technique/resume & porfolio projects/drowniss_project/yawn_no_yawn.h5"

USE_TFLITE     = False
EYE_TFLITE     = "/content/drive/MyDrive/Colab Notebooks/tools & technique/resume & porfolio projects/drowniss_project/eye_open_closed.tflite"
YAWN_TFLITE    = "/content/drive/MyDrive/Colab Notebooks/tools & technique/resume & porfolio projects/drowniss_project/yawn_no_yawn.tflite"

ALARM_WAV      = "/content/drive/MyDrive/Colab Notebooks/tools & technique/resume & porfolio projects/drowniss_project/mixkit-classic-alarm-995.wav"

ROOT_SAVE      = "/content/drive/MyDrive/Colab Notebooks/tools & technique/resume & porfolio projects/drowniss_project/images"
os.makedirs(ROOT_SAVE, exist_ok=True)

# Session folder
SESSION_TS   = time.strftime("%Y%m%d_%H%M%S")
SAVE_DIR     = os.path.join(ROOT_SAVE, f"session_{SESSION_TS}")
os.makedirs(SAVE_DIR, exist_ok=True)

# Save policy
SAVE_ALERT_IMAGES_ONLY = True
VIDEO_FPS_DISK         = 30

# ===================== CAPTURE SETTINGS (Colab) =========
TARGET_W, TARGET_H, TARGET_FPS = 640, 480, 30
CAPTURE_SECONDS  = 10          # browser capture length
SAMPLED_FRAMES   = 40          # uniformly sampled frames from those 10 s

# ===================== THRESHOLDS ======================
IMG_SIZE = 224
EAR_THRESH      = 0.22
MAR_THRESH      = 0.60
HEAD_PITCH_DEG  = 25.0

# Instant triggers
INSTANT_EAR        = 0.16
INSTANT_POPEN_MAX  = 0.40
INSTANT_MAR        = 0.80
INSTANT_PYAWN_MIN  = 0.75

# Short rolling vote window (works with 40 frames)
WIN = 5
VOTES_TO_ALERT = 3

MOUTH_MODEL_RETURNS_P_YAWN = False

# ===================== ENV / UI HELPERS =================
def in_colab():
    try:
        import google.colab  # type: ignore
        return True
    except Exception:
        return False

IN_COLAB = in_colab()

def show_frame(frame, win="Drowsiness Detector"):
    if IN_COLAB:
        from google.colab.patches import cv2_imshow
        cv2_imshow(frame)
    else:
        cv2.imshow(win, frame)

def want_stop():
    if IN_COLAB:
        return False
    k = cv2.waitKey(1) & 0xFF
    return k in (27, ord('q'))

def safe_alarm(path):
    try:
        if IN_COLAB:
            from IPython.display import Audio, display
            if os.path.exists(path):
                display(Audio(path, autoplay=True))
            else:
                print("\a"); time.sleep(0.2); print("\a")
        else:
            from playsound import playsound
            if os.path.exists(path):
                playsound(path)
            else:
                print("\a"); time.sleep(0.2); print("\a")
    except Exception:
        for _ in range(3):
            print("\a"); time.sleep(0.2)

# ===================== MODELS ===========================
def load_models():
    try:
        if USE_TFLITE:
            import tensorflow as tf
            eye_inter  = tf.lite.Interpreter(model_path=EYE_TFLITE);  eye_inter.allocate_tensors()
            yawn_inter = tf.lite.Interpreter(model_path=YAWN_TFLITE); yawn_inter.allocate_tensors()
            return dict(mode="tflite",
                        eye=eye_inter,  eye_in=eye_inter.get_input_details(),  eye_out=eye_inter.get_output_details(),
                        mouth=yawn_inter, mouth_in=yawn_inter.get_input_details(), mouth_out=yawn_inter.get_output_details())
        else:
            from tensorflow.keras.models import load_model
            eye_model  = load_model(EYE_MODEL_H5,  compile=False)
            yawn_model = load_model(YAWN_MODEL_H5, compile=False)
            return dict(mode="keras", eye=eye_model, mouth=yawn_model)
    except Exception as e:
        raise RuntimeError(f"[ModelLoad] Failed to load CNNs: {e}")

def predict_prob(model_pack, roi, eye_model=True):
    x = cv2.resize(roi, (IMG_SIZE, IMG_SIZE)).astype("float32")/255.0
    x = np.expand_dims(x, 0)
    if model_pack["mode"] == "tflite":
        inter  = model_pack["eye"] if eye_model else model_pack["mouth"]
        in_d   = model_pack["eye_in"] if eye_model else model_pack["mouth_in"]
        out_d  = model_pack["eye_out"] if eye_model else model_pack["mouth_out"]
        inter.set_tensor(in_d[0]["index"], x)
        inter.invoke()
        return float(inter.get_tensor(out_d[0]["index"])[0][0])
    else:
        model = model_pack["eye"] if eye_model else model_pack["mouth"]
        return float(model.predict(x, verbose=0)[0][0])

# ===================== MEDIAPIPE TASK: LANDMARKER ======
from mediapipe.tasks import python as mp_tasks
from mediapipe.tasks.python import vision as mp_vision

TASK_PATH = "face_landmarker.task"
TASK_URL  = "https://storage.googleapis.com/mediapipe-models/face_landmarker/face_landmarker/float16/1/face_landmarker.task"

def ensure_task_model():
    if not os.path.exists(TASK_PATH):
        urllib.request.urlretrieve(TASK_URL, TASK_PATH)

def build_landmarker():
    ensure_task_model()
    BaseOptions = mp_tasks.BaseOptions
    FaceLandmarkerOptions = mp_vision.FaceLandmarkerOptions
    FaceLandmarker = mp_vision.FaceLandmarker
    VisionRunningMode = mp_vision.RunningMode
    options = FaceLandmarkerOptions(
        base_options=BaseOptions(model_asset_path=TASK_PATH),
        running_mode=VisionRunningMode.IMAGE,
        num_faces=1
    )
    return FaceLandmarker.create_from_options(options)

def detect_landmarks_468(face_landmarker, frame_bgr):
    h, w = frame_bgr.shape[:2]
    mp_image = mp.Image(image_format=mp.ImageFormat.SRGB,
                        data=cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB))
    res = face_landmarker.detect(mp_image)
    if not res.face_landmarks:
        return None
    pts = [(int(p.x*w), int(p.y*h)) for p in res.face_landmarks[0]]
    return np.array(pts, dtype=np.int32)

# ===================== GEOMETRY / INDICES ===============
LEFT_EYE  = [33,160,158,133,153,144]
RIGHT_EYE = [263,387,385,362,380,373]
MOUTH_4PT = [13,14,78,308]
LIPS      = [61,291,0,17,13,14,78,308,81,311,402,318,82,312,87,317,178,88,95,185]

HP_IDX = {"nose":1,"chin":199,"l_eye":33,"r_eye":263,"l_mouth":78,"r_mouth":308}
MODEL_3D = np.array([
    (0,0,0),(0,-330,-65),(-225,170,-135),(225,170,-135),(-150,-150,-125),(150,-150,-125)
], dtype=np.float64)

def euclid(a,b): return np.linalg.norm(np.array(a)-np.array(b))
def EAR(eye_pts):
    p1,p2,p3,p4,p5,p6 = eye_pts
    A = euclid(p2,p6); B = euclid(p3,p5); C = euclid(p1,p4)
    return 0.0 if C==0 else (A+B)/(2.0*C)
def MAR(m4):
    top,bottom,left,right = m4
    v,h = euclid(top,bottom), euclid(left,right)
    return 0.0 if h==0 else v/h

# ===================== VIDEO IO HELPERS ==================
def _set_cam_props(cap):
    try:
        cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"MJPG"))
        cap.set(cv2.CAP_PROP_FRAME_WIDTH,  TARGET_W)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, TARGET_H)
        cap.set(cv2.CAP_PROP_FPS,          TARGET_FPS)
    except Exception:
        pass

def open_camera_robust(preferred=(0,1,2,3)):
    backend_flags = []
    if sys.platform.startswith("win"):
        backend_flags = [cv2.CAP_DSHOW, cv2.CAP_MSMF]
    elif sys.platform == "darwin":
        backend_flags = [cv2.CAP_AVFOUNDATION]
    else:
        backend_flags = [cv2.CAP_V4L2]
    for idx in preferred:
        cap = cv2.VideoCapture(idx)
        if cap.isOpened():
            _set_cam_props(cap);  return cap
        cap.release()
        for be in backend_flags:
            cap = cv2.VideoCapture(idx, be)
            if cap.isOpened():
                _set_cam_props(cap);  return cap
            cap.release()
    return None

# ===================== COLAB 10s -> 40f CAPTURE =========
def capture_10s_then_sample_40(quality=0.8):
    """Robust 10 s browser capture; uniformly sample exactly 40 frames."""
    from IPython.display import Javascript, display
    from google.colab.output import eval_js

    js = Javascript(r"""
      async function robustCapture(seconds, targetW, targetH, targetFps, quality){
        const stream = await navigator.mediaDevices.getUserMedia({
          video: { width:{ideal:targetW}, height:{ideal:targetH},
                   frameRate:{ideal:targetFps, max:targetFps} },
          audio: false
        });

        const video = document.createElement('video');
        video.style.display = 'none';
        document.body.appendChild(video);
        video.srcObject = stream;
        await video.play();

        // Wait for real dimensions
        let tries = 0;
        while ((video.videoWidth === 0 || video.videoHeight === 0) && tries < 60) {
          await new Promise(r => setTimeout(r, 100));
          tries++;
        }
        if (video.videoWidth === 0 || video.videoHeight === 0) {
          stream.getTracks().forEach(t=>t.stop());
          video.remove();
          throw new Error("Camera not ready (no dimensions).");
        }

        const canvas = document.createElement('canvas');
        canvas.width = video.videoWidth;
        canvas.height = video.videoHeight;
        const ctx = canvas.getContext('2d');

        const frames = [];
        const start = performance.now();
        const hardStop = start + (seconds + 2)*1000;
        let last = start;
        const hasRVFC = ('requestVideoFrameCallback' in HTMLVideoElement.prototype);

        function grab(){
          ctx.drawImage(video, 0, 0);
          frames.push(canvas.toDataURL('image/jpeg', quality));
        }

        async function loop(){
          while (true){
            const now = performance.now();
            if (now - start >= seconds*1000) break;
            if (now > hardStop) break;

            const elapsed = now - last;
            if (elapsed >= 1000/30 - 1) { grab(); last = now; }

            if (hasRVFC) {
              await new Promise(res => video.requestVideoFrameCallback(()=>res()));
            } else {
              await new Promise(res => setTimeout(res, 10));
            }
          }
        }

        try { await loop(); } finally {
          stream.getTracks().forEach(t=>t.stop());
          video.remove();
        }
        return {frames: frames, w: canvas.width, h: canvas.height};
      }
    """)
    display(js)

    data = eval_js(f"robustCapture({CAPTURE_SECONDS}, {TARGET_W}, {TARGET_H}, {TARGET_FPS}, {quality})")
    raw = []
    for d in data['frames']:
        b = d.split(',')[1]
        img = Image.open(BytesIO(base64.b64decode(b))).convert('RGB')
        raw.append(cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR))

    if len(raw) == 0:
        raise RuntimeError("No frames captured. Keep the Colab tab focused; allow camera; close other apps using camera.")

    # Uniform downsample to SAMPLED_FRAMES
    idxs = np.linspace(0, len(raw)-1, num=SAMPLED_FRAMES, dtype=int)
    return [raw[i] for i in idxs]

# ===================== LIGHT ENHANCEMENT =================
def enhance_full(frame):
    if frame.mean() < 60:
        yuv = cv2.cvtColor(frame, cv2.COLOR_BGR2YUV)
        yuv[:,:,0] = cv2.equalizeHist(yuv[:,:,0])
        return cv2.cvtColor(yuv, cv2.COLOR_YUV2BGR)
    return frame

def enhance_eye_roi(roi_bgr):
    return roi_bgr

class EMA:
    def __init__(self, alpha=0.7):
        self.alpha = alpha; self.v = None
    def update(self, x):
        self.v = x if self.v is None else self.alpha*self.v + (1-self.alpha)*x
        return self.v

# ===================== LOGGING ==========================
def init_csv_log(csv_path):
    if not os.path.exists(csv_path):
        with open(csv_path, "w", newline="") as f:
            w = csv.writer(f)
            w.writerow(["time", "frame_idx", "EAR", "MAR", "Pitch", "p_open_s", "p_yawn_s", "reason", "saved_path"])

# ===================== CORE LOOP ========================
def process_frames_iter(frames_iter, models, face_landmarker, win="Drowsiness Detector"):
    last_alarm = 0.0
    p_open_ema = EMA(0.7); p_yawn_ema = EMA(0.7)

    csv_path = os.path.join(SAVE_DIR, "session_metrics.csv")
    init_csv_log(csv_path)
    frame_idx = 0

    # Annotated video writer
    video_path = os.path.join(SAVE_DIR, f"session_{SESSION_TS}_annotated.mp4")
    video_writer = None
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")

    # rolling votes
    eye_buf  = collections.deque(maxlen=WIN)
    yawn_buf = collections.deque(maxlen=WIN)
    tilt_buf = collections.deque(maxlen=WIN)

    LAST = {"ear":None, "mar":None, "pitch":0.0, "popen":None, "pyawn":0.0}

    for frame in frames_iter:
        if frame is None: break
        frame_idx += 1
        frame = enhance_full(cv2.flip(frame, 1))
        h, w = frame.shape[:2]

        if video_writer is None:
            video_writer = cv2.VideoWriter(video_path, fourcc, VIDEO_FPS_DISK, (w, h))
            print(f"[Video] Writing annotated MP4 to: {video_path}")

        pts = detect_landmarks_468(face_landmarker, frame)
        ear_val = mar_val = None
        pitch = 0.0
        eye_closed_now = False
        yawn_now = False
        tilt_now = False

        if pts is not None:
            le = [tuple(pts[i]) for i in LEFT_EYE]
            re = [tuple(pts[i]) for i in RIGHT_EYE]
            ear_val = (EAR(le) + EAR(re))/2.0
            cv2.putText(frame, f"EAR:{ear_val:.2f}", (10,26), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,0),2)

            mouth4 = [tuple(pts[i]) for i in MOUTH_4PT]
            mar_val = MAR(mouth4)
            cv2.putText(frame, f"MAR:{mar_val:.2f}", (10,48), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,0),2)

            try:
                ip = np.array([pts[HP_IDX[k]] for k in ["nose","chin","l_eye","r_eye","l_mouth","r_mouth"]], dtype=np.float64)
                fx = fy = 1.0 * w; cx, cy = w/2.0, h/2.0
                cam = np.array([[fx,0,cx],[0,fy,cy],[0,0,1]], dtype="double")
                ok, rvec, _ = cv2.solvePnP(MODEL_3D, ip, cam, np.zeros((4,1)), flags=cv2.SOLVEPNP_ITERATIVE)
                if ok:
                    R, _ = cv2.Rodrigues(rvec)
                    sy = np.sqrt(R[0,0]**2 + R[1,0]**2)
                    pitch = np.degrees(np.arctan2(-R[2,0], sy))
                cv2.putText(frame, f"Pitch:{pitch:.1f}", (10,70), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,0),2)
            except Exception:
                pass

            def crop_box(points, pad=6):
                xs = [p[0] for p in points]; ys = [p[1] for p in points]
                x1,x2 = max(0,min(xs)-pad), min(w,max(xs)+pad)
                y1,y2 = max(0,min(ys)-pad), min(h,max(ys)+pad)
                return frame[y1:y2, x1:x2]

            eye_roi   = crop_box(le+re, pad=10)
            lips_pts  = [tuple(pts[i]) for i in LIPS]
            mouth_roi = crop_box(lips_pts, pad=6)

            p_open_s = None
            if eye_roi.size != 0:
                p_open = predict_prob(models, enhance_eye_roi(eye_roi), eye_model=True)
                p_open_s = p_open_ema.update(p_open)
                cv2.putText(frame, f"Eye(Open)={p_open_s:.2f}", (220,26), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255,0,0),2)

            p_yawn_s = 0.0
            if mouth_roi.size != 0:
                p_raw = predict_prob(models, mouth_roi, eye_model=False)
                p_yawn = (p_raw if MOUTH_MODEL_RETURNS_P_YAWN else (1.0 - p_raw))
                if mar_val is not None and mar_val < 0.20:
                    p_yawn = 0.0
                p_yawn_s = p_yawn_ema.update(p_yawn)
                cv2.putText(frame, f"Yawn={p_yawn_s:.2f}", (220,48), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255,0,0),2)

            # Instant rules for this frame
            if ear_val is not None and p_open_s is not None:
                eye_closed_now = ((ear_val < EAR_THRESH and p_open_s < 0.45) or
                                  (ear_val < INSTANT_EAR and p_open_s < INSTANT_POPEN_MAX))
            elif ear_val is not None:
                eye_closed_now = ear_val < INSTANT_EAR
            elif p_open_s is not None:
                eye_closed_now = p_open_s < INSTANT_POPEN_MAX

            yawn_now = ((mar_val is not None and mar_val > MAR_THRESH) or (p_yawn_s > 0.5) or
                        (mar_val is not None and mar_val > INSTANT_MAR) or (p_yawn_s > INSTANT_PYAWN_MIN))

            tilt_now = abs(pitch) > HEAD_PITCH_DEG

            LAST.update({"ear":ear_val, "mar":mar_val, "pitch":pitch, "popen":p_open_s, "pyawn":p_yawn_s})
        else:
            # brief fallback
            ear_val = LAST["ear"]; mar_val = LAST["mar"]; pitch = LAST["pitch"]
            p_open_s = LAST["popen"]; p_yawn_s = LAST["pyawn"]
            eye_closed_now = (ear_val is not None and p_open_s is not None and ear_val < EAR_THRESH and p_open_s < 0.45)
            yawn_now = (p_yawn_s is not None and p_yawn_s > 0.5) or (mar_val is not None and mar_val > MAR_THRESH)
            tilt_now = abs(pitch) > HEAD_PITCH_DEG
            cv2.putText(frame, "Face lost… using last state", (10,92), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,255), 2)

        # votes
        eye_buf.append(eye_closed_now)
        yawn_buf.append(yawn_now)
        tilt_buf.append(tilt_now)

        reason = None
        if sum(eye_buf)  >= VOTES_TO_ALERT: reason = "Eyes closed"
        elif sum(yawn_buf) >= VOTES_TO_ALERT: reason = "Yawning"
        elif sum(tilt_buf) >= VOTES_TO_ALERT: reason = "Head tilt"

        if reason:
            cv2.putText(frame, f"DROWSINESS ALERT: {reason}", (40,140),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0,0,255), 2)
            now = time.time()
            if now - last_alarm > 5.0:
                last_alarm = now
                threading.Thread(target=safe_alarm, args=(ALARM_WAV,), daemon=True).start()

        # write video every frame
        if video_writer is not None:
            video_writer.write(frame)

        # save alert frame
        img_path = ""
        if SAVE_ALERT_IMAGES_ONLY and reason:
            ts = time.strftime("%Y%m%d_%H%M%S")
            img_path = os.path.join(SAVE_DIR, f"alert_{reason.replace(' ','_')}_{ts}_{frame_idx:06d}.jpg")
            cv2.imwrite(img_path, frame, [cv2.IMWRITE_JPEG_QUALITY, 85])

        # CSV log
        ts_now = time.strftime("%Y%m%d_%H%M%S")
        with open(csv_path, "a", newline="") as f:
            w = csv.writer(f)
            w.writerow([ts_now, frame_idx,
                        None if ear_val is None else f"{ear_val:.3f}",
                        None if mar_val is None else f"{mar_val:.3f}",
                        None if pitch   is None else f"{pitch:.1f}",
                        None if p_open_ema.v is None else f"{p_open_ema.v:.3f}",
                        None if p_yawn_ema.v is None else f"{p_yawn_ema.v:.3f}",
                        "" if reason is None else reason,
                        img_path])

        show_frame(frame)
        if want_stop():
            break

    if video_writer is not None:
        video_writer.release()
        print(f"[Video] Saved: {video_path}")
    if not IN_COLAB:
        cv2.destroyAllWindows()

# ===================== MAIN =============================
def main():
    models = load_models()
    landmarker = build_landmarker()

    cap = None if IN_COLAB else open_camera_robust()
    if (cap is not None) and cap.isOpened():
        def webcam_frames():
            while True:
                ok, f = cap.read()
                if not ok: break
                yield f
        try:
            process_frames_iter(webcam_frames(), models, landmarker)
        finally:
            cap.release()
        return

    if IN_COLAB:
        print("[Info] Colab: capturing 10 s then sampling 40 frames …")
        frames = capture_10s_then_sample_40(quality=0.8)
        process_frames_iter(frames, models, landmarker)
        return

    raise RuntimeError("Cannot open webcam. If local, check camera permissions or indices/backends.")

if __name__ == "__main__":
    main()


Output hidden; open in https://colab.research.google.com to view.