<a href="https://colab.research.google.com/github/Ummmahek/Driver-drowsiness-detection/blob/main/YOLOFACEMARK.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install opencv-python



In [3]:
# YOLOFaceMark on NTHU-DDD Dataset (with YOLOFaceMark model and EAR/MAR classification)

import os
import cv2
import glob
import torch
import numpy as np
import matplotlib.pyplot as plt
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torchvision.transforms import ToTensor
from tqdm import tqdm
import requests
import zipfile

# --- 1. DOWNLOAD AND EXTRACT DATASET ---
# NOTE(review): this export link carries an access key in its query string.
# Prefer supplying it via an environment variable / secret before sharing.
data_url = "https://universe.roboflow.com/ds/p8jjDsbFGl?key=Bwh2h28aje"
dataset_path = "nthu_ddd_dataset.zip"
extract_path = "nthu_ddd_dataset"

# The extraction directory doubles as the "already downloaded" marker.
if not os.path.exists(extract_path):
    print("Downloading dataset...")
    # Stream the archive to disk. Fail fast on an HTTP error so that an error
    # page is never saved and later mistaken for a zip archive, and close the
    # connection deterministically via the context manager.
    with requests.get(data_url, stream=True, timeout=60) as response:
        response.raise_for_status()
        with open(dataset_path, "wb") as file:
            # 1 MiB chunks keep the Python-level loop overhead negligible.
            for chunk in response.iter_content(chunk_size=1 << 20):
                file.write(chunk)
    print("Extracting dataset...")
    with zipfile.ZipFile(dataset_path, 'r') as zip_ref:
        zip_ref.extractall(extract_path)
    print("Dataset extracted to:", extract_path)



Downloading dataset...
Extracting dataset...
Dataset extracted to: nthu_ddd_dataset


In [2]:
!pip install dlib



In [None]:
# YOLOFaceMark on NTHU-DDD Dataset (Complete Pipeline: YOLOFaceMark + Dlib Landmark Bootstrapping + EAR/MAR Drowsiness Classification)

import os
import cv2
import glob
import torch
import numpy as np
import matplotlib.pyplot as plt
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torchvision.transforms import ToTensor
from tqdm import tqdm
import requests
import zipfile
import dlib

# --- 1. DOWNLOAD AND EXTRACT DATASET ---
# NOTE(review): this export link carries an access key in its query string.
# Prefer supplying it via an environment variable / secret before sharing.
data_url = "https://universe.roboflow.com/ds/p8jjDsbFGl?key=Bwh2h28aje"
dataset_path = "nthu_ddd_dataset.zip"
extract_path = "nthu_ddd_dataset"

# The extraction directory doubles as the "already downloaded" marker.
if not os.path.exists(extract_path):
    print("Downloading dataset...")
    # Stream the archive to disk. Fail fast on an HTTP error so that an error
    # page is never saved and later mistaken for a zip archive, and close the
    # connection deterministically via the context manager.
    with requests.get(data_url, stream=True, timeout=60) as response:
        response.raise_for_status()
        with open(dataset_path, "wb") as file:
            # 1 MiB chunks keep the Python-level loop overhead negligible.
            for chunk in response.iter_content(chunk_size=1 << 20):
                file.write(chunk)
    print("Extracting dataset...")
    with zipfile.ZipFile(dataset_path, 'r') as zip_ref:
        zip_ref.extractall(extract_path)
    print("Dataset extracted to:", extract_path)

# --- 2. Dataset with TXT Annotations and Optional Landmark Bootstrapping ---
class NTHUDataset(Dataset):
    """Image dataset pairing every ``*.jpg`` with a same-named ``.txt`` annotation.

    Each item is ``(image_tensor, target, image_path)``. ``target`` is a dict
    with:

    * ``'bbox'``: the 2nd-5th values of the annotation file (cx, cy, w, h,
      stored as read from the file) as a float32 tensor of length 4;
    * ``'landmarks'``: a flattened 68x3 float32 tensor of
      (x / image_width, y / image_height, visibility). It is produced by dlib
      when ``use_dlib`` is true and a face is detected, and is all zeros
      otherwise.

    Args:
        img_dir: Directory searched (non-recursively) for ``*.jpg`` files.
        txt_dir: Directory containing the annotation ``.txt`` files.
        transform: Callable applied to the RGB image array. Defaults to
            ``ToTensor()``.
        use_dlib: When true, load dlib's frontal face detector and the
            68-point shape predictor, downloading the predictor if missing.

    Raises:
        OSError: from ``__getitem__`` when an image cannot be read.
        ValueError: from ``__getitem__`` when an annotation file holds fewer
            than five values.
    """

    PREDICTOR_PATH = "shape_predictor_68_face_landmarks.dat"
    PREDICTOR_URL = "http://dlib.net/files/shape_predictor_68_face_landmarks.dat.bz2"
    NUM_LANDMARKS = 68

    def __init__(self, img_dir, txt_dir, transform=None, use_dlib=False):
        self.img_paths = sorted(glob.glob(os.path.join(img_dir, '*.jpg')))
        self.txt_dir = txt_dir
        self.transform = transform if transform is not None else ToTensor()
        self.use_dlib = use_dlib
        if use_dlib:
            self.detector = dlib.get_frontal_face_detector()
            self._ensure_predictor(self.PREDICTOR_PATH)
            self.predictor = dlib.shape_predictor(self.PREDICTOR_PATH)

    @classmethod
    def _ensure_predictor(cls, predictor_path):
        """Download and decompress the dlib shape predictor if it is absent."""
        if os.path.exists(predictor_path):
            return
        import bz2
        print("Downloading dlib model...")
        r = requests.get(cls.PREDICTOR_URL, timeout=120)
        # Never decompress/save an HTTP error page as if it were the model.
        r.raise_for_status()
        # Write to a side file and rename, so an interrupted write cannot
        # leave a truncated model that later runs would accept as complete.
        partial_path = predictor_path + ".part"
        with open(partial_path, "wb") as f_out:
            f_out.write(bz2.decompress(r.content))
        os.replace(partial_path, predictor_path)

    def _label_path(self, img_path):
        """Return the annotation path in ``txt_dir`` matching ``img_path``."""
        # splitext swaps only the final extension; a plain str.replace would
        # also rewrite any '.jpg' occurring earlier in the file name.
        stem, _ext = os.path.splitext(os.path.basename(img_path))
        return os.path.join(self.txt_dir, stem + '.txt')

    @staticmethod
    def _read_bbox(txt_path):
        """Read values 2-5 (the bounding box) of the first annotation record."""
        with open(txt_path, 'r') as f:
            tokens = f.read().split()
        if len(tokens) < 5:
            raise ValueError(
                f"Annotation file {txt_path!r} must contain at least 5 values "
                f"(class cx cy w h); found {len(tokens)}"
            )
        return [float(tok) for tok in tokens[1:5]]

    def __len__(self):
        return len(self.img_paths)

    def __getitem__(self, idx):
        img_path = self.img_paths[idx]
        txt_path = self._label_path(img_path)

        img = cv2.imread(img_path)
        if img is None:
            # cv2.imread signals failure by returning None rather than raising.
            raise OSError(f"Could not read image: {img_path}")
        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        h, w = img.shape[:2]

        bbox = self._read_bbox(txt_path)

        # Default: no landmarks available (visibility column stays 0).
        landmarks = np.zeros((self.NUM_LANDMARKS, 3), dtype=np.float32)
        if self.use_dlib:
            gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
            faces = self.detector(gray)
            if len(faces) > 0:
                # Only the first detected face is used.
                shape = self.predictor(gray, faces[0])
                landmarks = np.array(
                    [[p.x / w, p.y / h, 1.0] for p in shape.parts()],
                    dtype=np.float32,
                )

        img_tensor = self.transform(img_rgb).float()
        target = {
            'bbox': torch.tensor(bbox, dtype=torch.float32),
            'landmarks': torch.tensor(landmarks.flatten(), dtype=torch.float32)
        }
        return img_tensor, target, img_path

# --- 3. YOLOFaceMark Model Definition ---
class ConvBlock(nn.Module):
    """Bias-free Conv2d followed by BatchNorm2d and a SiLU activation.

    Args:
        in_ch: Number of input channels.
        out_ch: Number of output channels.
        k: Convolution kernel size.
        s: Convolution stride.
        p: Convolution padding.
    """

    def __init__(self, in_ch, out_ch, k=3, s=1, p=1):
        super().__init__()
        # Layer order is kept as conv -> norm -> activation so the
        # sub-module indices (conv.0, conv.1, conv.2) stay the same.
        stages = [
            nn.Conv2d(in_ch, out_ch, kernel_size=k, stride=s, padding=p, bias=False),
            nn.BatchNorm2d(out_ch),
            nn.SiLU(),
        ]
        self.conv = nn.Sequential(*stages)

    def forward(self, x):
        """Apply the conv/norm/activation stack to ``x``."""
        activated = self.conv(x)
        return activated


class YOLOFaceMark(nn.Module):
    """Small convolutional network with a bounding-box head and a landmark head.

    The backbone stacks four ``ConvBlock`` stages with three 2x max-pools; the
    neck applies a ``ConvBlock``, a 2x upsample and another ``ConvBlock``.
    Both heads are 1x1 convolutions whose outputs are averaged over the
    spatial dimensions, so ``forward`` returns one vector per image.
    """

    def __init__(self):
        super().__init__()
        # Feature extractor: 3 -> 32 -> 64 -> 128 -> 256 channels.
        backbone_stages = [
            ConvBlock(3, 32),
            ConvBlock(32, 64),
            nn.MaxPool2d(2),
            ConvBlock(64, 128),
            nn.MaxPool2d(2),
            ConvBlock(128, 256),
            nn.MaxPool2d(2),
        ]
        self.backbone = nn.Sequential(*backbone_stages)

        # Neck: refine, upsample by 2, then reduce to 128 channels.
        neck_stages = [
            ConvBlock(256, 256),
            nn.Upsample(scale_factor=2),
            ConvBlock(256, 128),
        ]
        self.neck = nn.Sequential(*neck_stages)

        # Heads: 4 box values and 204 (= 68 * 3) landmark values per image.
        self.head_bbox = nn.Conv2d(128, 4, kernel_size=1)
        self.head_lmks = nn.Conv2d(128, 204, kernel_size=1)

    def forward(self, x):
        """Return ``(bbox, lmk)`` with shapes ``(N, 4)`` and ``(N, 204)``."""
        features = self.neck(self.backbone(x))
        # Averaging over height and width collapses each map to one value.
        bbox = self.head_bbox(features).mean(dim=(2, 3))
        lmk = self.head_lmks(features).mean(dim=(2, 3))
        return bbox, lmk


# --- 4. OKS Loss Function for Landmarks ---
class OKSLoss(nn.Module):
    """Keypoint-similarity loss: ``1 - mean(OKS)`` over the batch.

    For every landmark the similarity is ``exp(-d**2 / (2 * s**2 * k**2))``,
    where ``d`` is the Euclidean distance between predicted and ground-truth
    (x, y). Only landmarks whose ground-truth third component is > 0
    contribute; a sample with no such landmark gets a similarity of 0.

    Args:
        s: Scale term of the similarity denominator.
        k: Per-keypoint constant of the similarity denominator.
    """

    def __init__(self, s=1.0, k=0.1):
        super().__init__()
        self.s, self.k = s, k

    def forward(self, pred, gt):
        """Compute the loss for flattened ``(N, 204)`` predictions and targets."""
        pred_pts = pred.view(-1, 68, 3)
        gt_pts = gt.view(-1, 68, 3)

        # Per-landmark Euclidean distance on the (x, y) components only.
        offsets = pred_pts[..., :2] - gt_pts[..., :2]
        dist = torch.norm(offsets, dim=2)

        denom = 2 * self.s ** 2 * self.k ** 2
        similarity = torch.exp(-dist ** 2 / denom)

        # Third ground-truth component acts as the visibility flag.
        visible = (gt_pts[..., 2] > 0).float()
        visible_count = visible.sum(dim=1).clamp(min=1)  # avoid divide-by-zero
        per_sample = (similarity * visible).sum(dim=1) / visible_count
        return 1 - per_sample.mean()


# --- 5. EAR / MAR + Classification ---
def compute_ear(eye):
    """Return the eye aspect ratio for six eye landmarks.

    ``eye`` is indexed 0-5; points 0 and 3 span the eye horizontally, while
    the pairs (1, 5) and (2, 4) span it vertically. The result is the sum of
    the two vertical distances divided by twice the horizontal distance.
    """
    horizontal = np.linalg.norm(eye[0] - eye[3])
    vertical_a = np.linalg.norm(eye[1] - eye[5])
    vertical_b = np.linalg.norm(eye[2] - eye[4])
    vertical_total = vertical_a + vertical_b
    return vertical_total / (2.0 * horizontal)

def compute_mar(mouth):
    """Return the mouth aspect ratio for eight mouth landmarks.

    The ratio is the distance between points 2 and 6 (mouth opening) divided
    by the distance between points 0 and 4 (mouth width).
    """
    opening = np.linalg.norm(mouth[2] - mouth[6])
    width = np.linalg.norm(mouth[0] - mouth[4])
    ratio = opening / width
    return ratio

def classify_drowsiness(landmarks, ear_thresh=0.2, mar_thresh=0.5):
    """Flag drowsiness from 68 facial landmarks using EAR and MAR thresholds.

    Args:
        landmarks: Indexable collection of at least 68 points whose first two
            components are (x, y).
        ear_thresh: An eye aspect ratio strictly below this counts as drowsy.
        mar_thresh: A mouth aspect ratio strictly above this counts as drowsy.

    Returns:
        ``(drowsy, ear, mar)`` where ``drowsy`` is true when either threshold
        is crossed.

    Note:
        Only landmark indices 36-41 are used for the eye measurement and
        indices 60-67 for the mouth measurement.
    """
    eye_pts = range(36, 42)
    mouth_pts = range(60, 68)
    eye = np.array([landmarks[i][:2] for i in eye_pts])
    mouth = np.array([landmarks[i][:2] for i in mouth_pts])
    ear = compute_ear(eye)
    mar = compute_mar(mouth)
    return (ear < ear_thresh or mar > mar_thresh), ear, mar


# --- 6. Training + Evaluation Loop ---
def train(model, train_loader, val_loader, epochs=10, lr=1e-3):
    """Train ``model`` with Adam and return the mean training loss per epoch.

    The per-batch loss is the MSE between predicted and target boxes plus the
    ``OKSLoss`` between predicted and target landmarks.

    Args:
        model: Network returning ``(bbox_pred, lmk_pred)`` for an image batch.
        train_loader: Iterable of ``(imgs, targets, paths)`` batches, where
            ``targets`` has ``'bbox'`` and ``'landmarks'`` entries.
        val_loader: Accepted for interface compatibility; not used here.
        epochs: Number of passes over ``train_loader``.
        lr: Adam learning rate.

    Returns:
        List with one average loss value per epoch.
    """
    # Prefer the GPU when one is available.
    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")
    model = model.to(device)

    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    bbox_loss_fn = nn.MSELoss()
    lmk_loss_fn = OKSLoss()

    history = []
    for epoch in range(epochs):
        model.train()
        running = 0
        progress = tqdm(train_loader, desc=f"Epoch {epoch+1}")
        for batch_imgs, batch_targets, _paths in progress:
            batch_imgs = batch_imgs.to(device)
            gt_bbox = batch_targets['bbox'].to(device)
            gt_lmk = batch_targets['landmarks'].to(device)

            pred_bbox, pred_lmk = model(batch_imgs)
            bbox_term = bbox_loss_fn(pred_bbox, gt_bbox)
            lmk_term = lmk_loss_fn(pred_lmk, gt_lmk)
            batch_loss = bbox_term + lmk_term

            optimizer.zero_grad()
            batch_loss.backward()
            optimizer.step()
            running += batch_loss.item()

        # Average over the number of batches, not the number of samples.
        avg_loss = running / len(train_loader)
        print(f"Epoch {epoch+1} Loss: {avg_loss:.4f}")
        history.append(avg_loss)

    return history

def evaluate(model, dataloader):
    """Score EAR/MAR drowsiness predictions against the annotation labels.

    Every sample of every batch is classified from the model's predicted
    landmarks and compared with the class stored as the first value of the
    ``.txt`` file that sits next to the image (1 = drowsy, 0 = awake).
    Accuracy, precision and recall are printed.

    Args:
        model: Network returning ``(bbox_pred, lmk_pred)`` for an image batch.
        dataloader: Iterable of ``(imgs, targets, img_paths)`` batches.
    """
    # Prefer the GPU when available, and make sure the model lives on the
    # same device as the inputs even when train() was not called first.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    model.eval()
    TP, FP, TN, FN = 0, 0, 0, 0
    with torch.no_grad():
        for imgs, _targets, img_paths in tqdm(dataloader, desc="Evaluating"):
            imgs = imgs.to(device)
            _bbox_out, lmk_out = model(imgs)
            # (batch, 68, 2): keep only the x, y components of each landmark.
            batch_landmarks = lmk_out.view(-1, 68, 3)[..., :2].cpu().numpy()
            # Score each sample in the batch, not just the first one.
            for landmarks, img_path in zip(batch_landmarks, img_paths):
                drowsy, _ear, _mar = classify_drowsiness(landmarks)
                pred = int(drowsy)
                # Swap only the file extension to locate the annotation file.
                label_path = os.path.splitext(img_path)[0] + '.txt'
                with open(label_path, 'r') as f:
                    label = int(f.read().strip().split()[0])
                if pred == 1 and label == 1: TP += 1
                elif pred == 1 and label == 0: FP += 1
                elif pred == 0 and label == 0: TN += 1
                elif pred == 0 and label == 1: FN += 1
    # max(..., 1) guards against division by zero on empty denominators.
    accuracy = (TP + TN) / max(TP + TN + FP + FN, 1)
    precision = TP / max(TP + FP, 1)
    recall = TP / max(TP + FN, 1)
    print(f"Accuracy: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}")

# --- 7. Run the Complete Pipeline ---
if __name__ == '__main__':
    # Driver: build the datasets and loaders, train the model, evaluate it on
    # the validation split, then plot the per-epoch training loss.

    # Paths: the 'train' and 'valid' sub-directories of the extracted dataset.
    train_img_dir = os.path.join(extract_path, 'train')
    valid_img_dir = os.path.join(extract_path, 'valid')

    # Data: the same directory is passed as both image and annotation
    # directory, i.e. each .txt file is expected next to its .jpg.
    # use_dlib=True makes the dataset generate landmark targets with dlib.
    train_set = NTHUDataset(train_img_dir, train_img_dir, use_dlib=True)
    valid_set = NTHUDataset(valid_img_dir, valid_img_dir, use_dlib=True)
    train_loader = DataLoader(train_set, batch_size=16, shuffle=True) # shuffled each epoch
    valid_loader = DataLoader(valid_set, batch_size=16, shuffle=False) # fixed order for evaluation

    # Model
    model = YOLOFaceMark()

    # Train: returns one average loss per epoch. valid_loader is passed
    # through, but train() does not currently use it.
    train_losses = train(model, train_loader, valid_loader, epochs=10)

    # Evaluate: prints accuracy / precision / recall for the validation split.
    evaluate(model, valid_loader)

    # Plot the training loss curve (x axis is the zero-based epoch index).
    plt.plot(train_losses, label="Train Loss")
    plt.title("Training Loss vs. Epochs")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.legend()
    plt.show()

Downloading dataset...
Extracting dataset...
Dataset extracted to: nthu_ddd_dataset
Downloading dlib model...


Epoch 1:   0%|          | 0/98 [00:00<?, ?it/s]