<a href="https://colab.research.google.com/github/NatalieGergov/Fatigue_Detection/blob/main/Sequence_Model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Feature Extraction

In [None]:
# Install libraries and packages
!pip install opencv-python mediapipe torch torchvision scikit-learn optuna

import cv2
import numpy as np
import mediapipe as mp

Collecting mediapipe
  Downloading mediapipe-0.10.21-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (9.7 kB)
Collecting optuna
  Downloading optuna-4.4.0-py3-none-any.whl.metadata (17 kB)
INFO: pip is looking at multiple versions of mediapipe to determine which version is compatible with other requirements. This could take a while.
Collecting mediapipe
  Downloading mediapipe-0.10.20-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (9.7 kB)
  Downloading mediapipe-0.10.18-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.7 kB)
  Downloading mediapipe-0.10.15-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.7 kB)
  Downloading mediapipe-0.10.14-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.7 kB)
Collecting protobuf<5,>=4.25.3 (from mediapipe)
  Downloading protobuf-4.25.8-cp37-abi3-manylinux2014_x86_64.whl.metadata (541 bytes)
Collecting sounddevice>=0.4.4 (from mediapipe)
  Downloading sounddevice-0.5.2-py3-none-any.whl.

In [None]:
def dist(a, b):
    """Return the Euclidean distance between two landmarks in the x-y plane.

    Only the ``x`` and ``y`` attributes of ``a`` and ``b`` are read; any ``z``
    component is ignored.
    """
    delta = np.array([a.x - b.x, a.y - b.y])
    return np.linalg.norm(delta)

def calculate_ear(landmarks):
    """Return the eye aspect ratio (EAR) averaged over both eyes.

    For each eye six landmarks p1..p6 are looked up by index and the ratio
    (|p2 - p6| + |p3 - p5|) / (2 * |p1 - p4|) is computed with ``dist``.
    The result is the mean of the left-eye and right-eye ratios.

    Args:
        landmarks: Indexable collection of landmarks exposing ``x`` and ``y``.
    """
    def _eye_ratio(indices):
        # p1/p4 span the eye horizontally; (p2, p6) and (p3, p5) are the
        # two vertical pairs.
        p1, p2, p3, p4, p5, p6 = (landmarks[i] for i in indices)
        vertical = dist(p2, p6) + dist(p3, p5)
        return vertical / (2.0 * dist(p1, p4))

    left_ratio = _eye_ratio((33, 160, 158, 133, 153, 144))
    right_ratio = _eye_ratio((362, 385, 387, 263, 373, 380))
    return (left_ratio + right_ratio) / 2.0

def calculate_mar(landmarks):
    """Return the mouth aspect ratio (MAR).

    The ratio is the distance between landmarks 13 and 14 divided by the
    distance between landmarks 61 and 291, both measured with ``dist``.
    """
    mouth_opening = dist(landmarks[13], landmarks[14])
    mouth_width = dist(landmarks[61], landmarks[291])
    return mouth_opening / mouth_width


In [None]:
def extract_face_mesh(video_path, frame_skip=1, max_frames=3000):
    """Extract a per-frame feature sequence from a video with MediaPipe Face Mesh.

    Every processed frame contributes one feature vector: the flattened
    (x, y, z) coordinates of all landmarks of the first detected face,
    followed by the eye aspect ratio and the mouth aspect ratio. A frame with
    no detected face contributes an all-zero vector of length 1436, so the
    sequence keeps one row per processed frame.

    Args:
        video_path: Path of the video, passed to ``cv2.VideoCapture``.
        frame_skip: A frame is processed only when the capture position
            reported after reading it (``CAP_PROP_POS_FRAMES``) is a multiple
            of this value. The default of 1 processes every frame.
        max_frames: Maximum number of processed frames (rows) to collect.

    Returns:
        Tuple ``(features, fps)`` where ``features`` is a NumPy array with one
        row per processed frame (empty if nothing could be read) and ``fps``
        is the frame rate reported by the capture.
    """
    # 478 refined landmarks * 3 coordinates + EAR + MAR.
    feature_dim = 1436

    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    features = []
    frame_index = 0

    try:
        # The context manager closes the Face Mesh graph when the loop ends
        # or raises, instead of leaking it for every processed video.
        with mp.solutions.face_mesh.FaceMesh(
            max_num_faces=1,
            refine_landmarks=True,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5
        ) as face_mesh:
            while cap.isOpened() and frame_index < max_frames:
                ret, frame = cap.read()
                if not ret:
                    break

                # Skipped frames do not count towards max_frames.
                if int(cap.get(cv2.CAP_PROP_POS_FRAMES)) % frame_skip != 0:
                    continue

                # OpenCV decodes frames as BGR; Face Mesh expects RGB.
                img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                result = face_mesh.process(img_rgb)

                if result.multi_face_landmarks:
                    landmarks_list = result.multi_face_landmarks[0].landmark

                    # Raw landmark coords
                    coords = np.array(
                        [[lm.x, lm.y, lm.z] for lm in landmarks_list]
                    ).flatten()

                    ear = calculate_ear(landmarks_list)
                    mar = calculate_mar(landmarks_list)

                    features.append(np.concatenate([coords, [ear, mar]]))
                else:
                    # Placeholder row keeps the sequence aligned with the
                    # processed frames when no face is detected.
                    features.append(np.zeros(feature_dim))

                frame_index += 1
    finally:
        # Always release the video handle, even if processing fails.
        cap.release()

    return np.array(features), fps

In [None]:
# Build the classification dataset, with assigned binary labels
def build_classification_dataset(video_paths, kss_scores, chunk_size=150):
    """Build a binary-labelled sequence dataset from videos and KSS scores.

    Each video is converted to a feature sequence with ``extract_face_mesh``
    and labelled 1 when its KSS score is greater than 6, otherwise 0. Videos
    that yield no frames are skipped together with their label.

    Sequences of different lengths cannot be stacked into one array, so all
    sequences are truncated to the length of the shortest one before
    stacking. When every video yields the same number of frames this is a
    no-op.

    Args:
        video_paths: Iterable of video paths.
        kss_scores: Iterable of KSS scores, paired positionally with
            ``video_paths``.
        chunk_size: Currently unused; kept so existing callers keep working.

    Returns:
        Tuple ``(X, y)``: ``X`` has shape (videos, frames, features) and ``y``
        holds the 0/1 labels. Both are empty arrays when no video is usable.
    """
    sequences, labels = [], []

    for path, score in zip(video_paths, kss_scores):
        print(f"Processing video: {path}")

        tensor, _fps = extract_face_mesh(path)

        if tensor.shape[0] > 0:
            sequences.append(tensor)
            labels.append(int(score > 6))

    if sequences:
        # Align ragged sequences so np.array can build a regular 3-D array.
        shortest = min(seq.shape[0] for seq in sequences)
        sequences = [seq[:shortest] for seq in sequences]

    return np.array(sequences), np.array(labels)

### Model

In [None]:
# Define LSTM and Transformer models
import torch
import torch.nn as nn

class LSTMClassifier(nn.Module):
    """LSTM sequence classifier that outputs one probability per sequence.

    The final hidden state of the last LSTM layer is passed through dropout
    and a linear layer, then squashed with a sigmoid.

    Args:
        input_dim: Number of features per time step.
        hidden_dim: Size of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
    """

    def __init__(self, input_dim=1436, hidden_dim=128, num_layers=1):
        super().__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, batch_first=True, num_layers=num_layers)
        self.fc = nn.Linear(hidden_dim, 1)
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        """Map a (batch, seq, input_dim) tensor to probabilities of shape (batch,)."""
        _, (hn, _) = self.lstm(x)
        # hn[-1] is the final hidden state of the top LSTM layer.
        out = self.dropout(hn[-1])
        # Feed the dropped-out state to the head; previously the raw hidden
        # state was used, so dropout never took effect.
        return torch.sigmoid(self.fc(out)).squeeze(1)

class TransformerClassifier(nn.Module):
    """Transformer-encoder sequence classifier producing one probability per sequence.

    Inputs are projected to ``d_model`` features, run through a stack of
    encoder layers, and the encoding of the last time step is fed to a linear
    head followed by a sigmoid.

    Args:
        input_dim: Number of features per time step.
        d_model: Width of the encoder.
        nhead: Requested number of attention heads. If it does not divide
            ``d_model``, the largest smaller value that does is used instead.
        num_layers: Number of encoder layers.
    """

    def __init__(self, input_dim=1436, d_model=256, nhead=8, num_layers=2):
        super().__init__()

        if d_model % nhead != 0:
            # Walk downwards from the requested head count to the first
            # divisor of d_model; keep the request if none is found.
            nhead = next(
                (h for h in range(nhead, 0, -1) if d_model % h == 0),
                nhead,
            )

        self.proj = nn.Linear(input_dim, d_model)
        self.encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead),
            num_layers=num_layers,
        )
        self.fc = nn.Linear(d_model, 1)

    def forward(self, x):
        """Map a (batch, seq, input_dim) tensor to probabilities of shape (batch,)."""
        # The encoder layer is sequence-first, so swap batch and seq axes.
        seq_first = self.proj(x).transpose(0, 1)  # (seq, batch, d_model)
        encoded = self.encoder(seq_first)
        last_step = encoded[-1]
        return torch.sigmoid(self.fc(last_step)).squeeze(1)

In [None]:
from sklearn.metrics import f1_score, accuracy_score

def train_classifier(model, train_loader, val_loader, epochs=5, lr=1e-3, device='cuda'):
    """Train a binary classifier with BCE loss and evaluate it once.

    The model must output probabilities in [0, 1] (``nn.BCELoss`` is applied
    to its raw output). After training, predictions on ``val_loader`` are
    thresholded at 0.5 and scored.

    Args:
        model: Module mapping a batch of inputs to per-sample probabilities.
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        val_loader: Iterable of ``(inputs, labels)`` validation batches.
        epochs: Number of passes over ``train_loader``.
        lr: Adam learning rate.
        device: Device to train on. A CUDA device is replaced by the CPU when
            CUDA is not available, so the default works on CPU-only runtimes.

    Returns:
        Tuple ``(accuracy, f1, model)`` with validation metrics and the
        trained model.
    """
    device = torch.device(device)
    if device.type == "cuda" and not torch.cuda.is_available():
        # Fall back instead of crashing when no GPU is attached.
        device = torch.device("cpu")

    model.to(device)
    loss_fn = nn.BCELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    for _ in range(epochs):
        model.train()
        for xb, yb in train_loader:
            # BCELoss needs float targets.
            xb, yb = xb.to(device), yb.float().to(device)
            pred = model(xb)
            loss = loss_fn(pred, yb)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

    # Evaluation
    model.eval()
    y_true, y_pred = [], []
    with torch.no_grad():
        for xb, yb in val_loader:
            xb = xb.to(device)
            preds = model(xb).cpu().numpy()
            y_true.extend(yb.numpy())
            # Probabilities above 0.5 count as the positive class.
            y_pred.extend((preds > 0.5).astype(int))

    acc = accuracy_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred)
    return acc, f1, model

## Data Acquisition

In [None]:
# Mount Google Drive at /content/drive so that files stored under
# /content/drive/MyDrive can be read from this Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
video_folder = "/content/drive/MyDrive/DROZY/videos_i8"
# One path per file named "<i>-<j>.mp4", for i = 1..14 and j = 1..3,
# ordered by i first and then j.
video_paths = [
    f"{video_folder}/{i}-{j}.mp4"
    for i in range(1, 15)
    for j in range(1, 4)
]

kss_file_path = '/content/drive/MyDrive/DROZY/KSS.txt'

# Read every whitespace-separated number in the file, line by line, and
# flatten them into one list of ints in file order.
with open(kss_file_path, 'r') as f:
    kss_scores = [
        int(float(token))
        for line in f
        for token in line.strip().split()
    ]
print(kss_scores)

[3, 6, 7, 3, 7, 6, 2, 3, 4, 4, 8, 9, 3, 7, 8, 2, 3, 7, 0, 4, 9, 2, 6, 8, 2, 6, 8, 3, 6, 7, 4, 7, 7, 2, 5, 6, 6, 3, 7, 5, 7, 8]


In [None]:
# Data setup before tuning
from sklearn.model_selection import train_test_split
import torch

# Extract features for every video and attach the binary labels.
X, y = build_classification_dataset(video_paths, kss_scores, chunk_size=150)

X_tensor = torch.tensor(X, dtype=torch.float32)
y_tensor = torch.tensor(y, dtype=torch.long)

# Fix the split seed so the train/validation partition (and therefore every
# metric reported below) is the same on each run. The split is stratified on
# the labels to keep the class ratio similar in both parts.
SPLIT_SEED = 42
X_train, X_val, y_train, y_val = train_test_split(
    X_tensor,
    y_tensor,
    test_size=0.2,
    stratify=y,
    random_state=SPLIT_SEED,
)

Processing video: /content/drive/MyDrive/DROZY/videos_i8/1-1.mp4




Processing video: /content/drive/MyDrive/DROZY/videos_i8/1-2.mp4
Processing video: /content/drive/MyDrive/DROZY/videos_i8/1-3.mp4
Processing video: /content/drive/MyDrive/DROZY/videos_i8/2-1.mp4
Processing video: /content/drive/MyDrive/DROZY/videos_i8/2-2.mp4
Processing video: /content/drive/MyDrive/DROZY/videos_i8/2-3.mp4
Processing video: /content/drive/MyDrive/DROZY/videos_i8/3-1.mp4
Processing video: /content/drive/MyDrive/DROZY/videos_i8/3-2.mp4
Processing video: /content/drive/MyDrive/DROZY/videos_i8/3-3.mp4
Processing video: /content/drive/MyDrive/DROZY/videos_i8/4-1.mp4
Processing video: /content/drive/MyDrive/DROZY/videos_i8/4-2.mp4
Processing video: /content/drive/MyDrive/DROZY/videos_i8/4-3.mp4
Processing video: /content/drive/MyDrive/DROZY/videos_i8/5-1.mp4
Processing video: /content/drive/MyDrive/DROZY/videos_i8/5-2.mp4
Processing video: /content/drive/MyDrive/DROZY/videos_i8/5-3.mp4
Processing video: /content/drive/MyDrive/DROZY/videos_i8/6-1.mp4
Processing video: /conten

## Hyperparameter Tuning

In [None]:
def objective(trial):
    """Optuna objective: train one sampled configuration and return 1 - F1.

    Samples the model type, hidden size, layer count, learning rate and batch
    size, trains the model for 5 epochs on the training split and scores it
    on the validation split.

    Args:
        trial: Optuna trial used to sample hyperparameters.

    Returns:
        ``1 - f1`` on the validation split, so that minimizing the objective
        maximizes F1.

    Raises:
        optuna.TrialPruned: For transformer trials whose ``hidden_dim`` is not
            divisible by the fixed number of heads.
    """
    model_type = trial.suggest_categorical("model_type", ["lstm", "transformer"])
    hidden_dim = trial.suggest_int("hidden_dim", 64, 512)
    num_layers = trial.suggest_int("num_layers", 1, 4)
    # suggest_loguniform is deprecated; suggest_float(log=True) samples the
    # same log-uniform range without emitting a warning on every trial.
    lr = trial.suggest_float("lr", 1e-5, 1e-2, log=True)
    batch_size = trial.suggest_categorical("batch_size", [8, 16, 32])

    # Transformer-specific constraint
    if model_type == "transformer":
        num_heads = 6  # CHANGE THIS if your model uses a different number of heads
        if hidden_dim % num_heads != 0:
            # d_model must be divisible by the head count; skip the trial.
            raise optuna.TrialPruned()

        model = TransformerClassifier(
            input_dim=1436,
            d_model=hidden_dim,
            num_layers=num_layers,
            nhead=num_heads
        )
    else:
        model = LSTMClassifier(
            input_dim=1436,
            hidden_dim=hidden_dim,
            num_layers=num_layers
        )

    train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(TensorDataset(X_val, y_val), batch_size=batch_size)

    # Choose the device explicitly so the search also runs without a GPU.
    run_device = "cuda" if torch.cuda.is_available() else "cpu"
    acc, f1, _ = train_classifier(
        model, train_loader, val_loader, epochs=5, lr=lr, device=run_device
    )
    return 1 - f1  # minimize (to maximize F1)

In [None]:
# Select the compute device: the GPU when CUDA is available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


In [None]:
# Run Optuna search
import optuna
from torch.utils.data import DataLoader, TensorDataset

# Seed the sampler and torch so the sequence of proposed hyperparameters and
# the model initialisation can be repeated. GPU kernels may still introduce
# small run-to-run differences in the scores.
SEARCH_SEED = 42
torch.manual_seed(SEARCH_SEED)

study = optuna.create_study(
    direction="minimize",
    sampler=optuna.samplers.TPESampler(seed=SEARCH_SEED),
)
# Pruned trials (see objective) still count towards n_trials.
study.optimize(objective, n_trials=20)

print("Best F1 score:", 1 - study.best_value)
print("Best params:", study.best_params)

[I 2025-07-28 22:11:52,367] A new study created in memory with name: no-name-479eafa9-c37b-45e5-8c16-097a7c187261
  lr = trial.suggest_loguniform("lr", 1e-5, 1e-2) #trial.suggest_float("lr", 1e-5, 1e-2, log=True)
[I 2025-07-28 22:11:58,093] Trial 0 finished with value: 0.4545454545454546 and parameters: {'model_type': 'transformer', 'hidden_dim': 222, 'num_layers': 1, 'lr': 0.000828333923548531, 'batch_size': 16}. Best is trial 0 with value: 0.4545454545454546.
  lr = trial.suggest_loguniform("lr", 1e-5, 1e-2) #trial.suggest_float("lr", 1e-5, 1e-2, log=True)
[I 2025-07-28 22:12:00,366] Trial 1 finished with value: 1.0 and parameters: {'model_type': 'lstm', 'hidden_dim': 100, 'num_layers': 1, 'lr': 0.0025034876603867956, 'batch_size': 16}. Best is trial 0 with value: 0.4545454545454546.
  lr = trial.suggest_loguniform("lr", 1e-5, 1e-2) #trial.suggest_float("lr", 1e-5, 1e-2, log=True)
[I 2025-07-28 22:12:00,368] Trial 2 pruned. 
[I 2025-07-28 22:12:00,371] Trial 3 pruned. 
[I 2025-07-28 

Best F1 score: 0.5454545454545454
Best params: {'model_type': 'transformer', 'hidden_dim': 222, 'num_layers': 1, 'lr': 0.000828333923548531, 'batch_size': 16}


## Run and evaluate model

In [None]:
# Final retraining with best model
best = study.best_params
if best["model_type"] == "lstm":
    model = LSTMClassifier(
        input_dim=1436,
        hidden_dim=best["hidden_dim"],
        num_layers=best["num_layers"],
    )
else:
    # The search evaluated transformers with 6 attention heads, so rebuild
    # with the same head count rather than the class default of 8.
    model = TransformerClassifier(
        input_dim=1436,
        d_model=best["hidden_dim"],
        nhead=6,
        num_layers=best["num_layers"],
    )

train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=best["batch_size"], shuffle=True)
val_loader = DataLoader(TensorDataset(X_val, y_val), batch_size=best["batch_size"])

# NOTE: the validation split was also used to pick the hyperparameters, so
# the metrics printed here are optimistic; a held-out test split would be
# needed for an unbiased estimate.
acc, f1, model = train_classifier(
    model, train_loader, val_loader, epochs=10, lr=best["lr"], device=device
)
print(f"Retrained Accuracy: {acc:.3f}, F1: {f1:.3f}")



Retrained Accuracy: 0.375, F1: 0.545


In [None]:
# Class balance of the training split: position k of the printed array is the
# number of training samples whose label equals k.
print("Label distribution:", np.bincount(y_train))

Label distribution: [16 12]


In [None]:
from sklearn.metrics import confusion_matrix
import torch

# Collect rounded (0/1) predictions and the true labels over the whole
# validation loader, then print the confusion matrix
# (rows: true label, columns: predicted label).
model.eval()
eval_device = next(model.parameters()).device  # run on the model's own device
all_preds, all_labels = [], []

with torch.no_grad():
    for xb, yb in val_loader:
        batch_preds = model(xb.to(eval_device)).round()
        all_preds.extend(batch_preds.cpu().numpy())
        all_labels.extend(yb.cpu().numpy())

print(confusion_matrix(all_labels, all_preds))

[[0 5]
 [0 3]]
