# Real-Time Head Pose Monitoring (Webcam) — Improved Prototype

This notebook contains a **cleaner, more robust** version of your project:
- Correct **GPU/CPU device handling**
- **MediaPipe** face detection (more robust than Haar cascades)
- **Temporal smoothing** (EMA) for stable angles
- **Flagging logic** with grace period + sustained-looking-away duration
- **Multi-face handling**
- **Event logging** to CSV (timestamps, reason, angles)

> **Note:** The head-pose model requires weights (e.g., `hopenet_robust_alpha1.pkl`).  
Place the file in the notebook directory or update the path in the config cell.


## 0) Setup

If you're running this locally, install dependencies (uncomment and run).



In [None]:
# Uncomment if you need installs in a clean environment.
# !pip install opencv-python torch torchvision pillow mediapipe pandas

## 1) Configuration

In [None]:
import os
from dataclasses import dataclass

@dataclass
class Config:
    """Runtime settings for one head-pose monitoring session.

    Every field has a default, so ``Config()`` is a working configuration.
    Values are validated on construction so that a typo fails immediately
    instead of silently producing unstable smoothing or per-frame flags.

    Raises:
        ValueError: If ``ema_alpha`` is outside ``[0, 1]``, ``num_bins`` is
            smaller than 1, or any threshold/duration is negative.
    """

    # Camera
    camera_id: int = 0                  # index passed to cv2.VideoCapture

    # Model weights
    model_path: str = "hopenet_robust_alpha1.pkl"
    num_bins: int = 66                  # number of classification bins per angle

    # Flag thresholds (degrees); compared against the absolute smoothed angle
    yaw_thr: float = 20.0
    pitch_thr: float = 15.0
    roll_thr: float = 10.0

    # Temporal logic
    ema_alpha: float = 0.2              # smoothing factor (0..1), higher = less smoothing
    no_face_grace_s: float = 1.0        # seconds allowed without face before warning
    away_required_s: float = 1.2        # looking-away must persist this long to flag

    # Logging
    log_dir: str = "logs"
    session_name: str = "session"

    def __post_init__(self):
        # The chained comparison is False for NaN, so NaN is rejected as well.
        if not 0.0 <= self.ema_alpha <= 1.0:
            raise ValueError(
                f"ema_alpha must be within [0, 1], got {self.ema_alpha!r}"
            )
        if self.num_bins < 1:
            raise ValueError(f"num_bins must be >= 1, got {self.num_bins!r}")
        non_negative = (
            "yaw_thr",
            "pitch_thr",
            "roll_thr",
            "no_face_grace_s",
            "away_required_s",
        )
        for field_name in non_negative:
            value = getattr(self, field_name)
            # `not value >= 0` (rather than `value < 0`) also catches NaN.
            if not value >= 0:
                raise ValueError(f"{field_name} must be >= 0, got {value!r}")

# Build the default configuration and make sure the log directory exists
# before any later cell tries to write a CSV into it.
cfg = Config()
os.makedirs(cfg.log_dir, exist_ok=True)
cfg  # bare expression at the end of the cell: the notebook echoes the config

## 2) Imports

In [None]:
import cv2
import time
import math
import csv
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms
from PIL import Image

# Robust face detection
import mediapipe as mp

## 3) Hopenet model loader (with correct device handling)

In [None]:
# NOTE:
# This assumes you have a `hopenet.py` file providing the `Hopenet` module.
# If your repo already contains hopenet.py, keep it next to this notebook.
# Otherwise, copy your existing Hopenet implementation into hopenet.py.

from hopenet import Hopenet  # expects a local file: hopenet.py

class Bottleneck(nn.Module):
    """Residual bottleneck block: 1x1 -> 3x3 -> 1x1 convolutions plus a skip path.

    The block outputs ``planes * expansion`` channels. ``stride`` is applied
    by the 3x3 convolution. ``downsample``, when given, is applied to the
    input to produce the skip-path tensor that is added to the main path.

    The attribute names (``conv1`` ... ``bn3``, ``downsample``) are the keys
    used in the module's state_dict, so they are kept as-is.
    """

    expansion = 4

    def __init__(self, inplanes, planes, stride=1, downsample=None):
        super().__init__()
        widened = planes * self.expansion

        # 1x1 channel reduction
        self.conv1 = nn.Conv2d(inplanes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        # 3x3 spatial convolution (carries the stride)
        self.conv2 = nn.Conv2d(
            planes, planes, kernel_size=3, stride=stride, padding=1, bias=False
        )
        self.bn2 = nn.BatchNorm2d(planes)
        # 1x1 channel expansion
        self.conv3 = nn.Conv2d(planes, widened, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(widened)

        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample

    def forward(self, x):
        """Run the block on ``x`` and return the activated residual sum."""
        shortcut = x if self.downsample is None else self.downsample(x)

        y = self.conv1(x)
        y = self.bn1(y)
        y = self.relu(y)

        y = self.conv2(y)
        y = self.bn2(y)
        y = self.relu(y)

        y = self.conv3(y)
        y = self.bn3(y)

        # Add the skip path before the final activation.
        y += shortcut
        return self.relu(y)

def load_hopenet(model_path: str, num_bins: int, device: torch.device):
    """Build a Hopenet, load its weights from ``model_path`` and return it in eval mode.

    Args:
        model_path: Path to a file that ``torch.load`` deserializes into a
            state_dict.
        num_bins: Number of output bins, forwarded to the ``Hopenet`` constructor.
        device: Device the weights are mapped to and the model is moved to.

    Returns:
        The model, on ``device``, switched to evaluation mode.

    Raises:
        ValueError: If the loaded checkpoint is not a ``dict``.
    """
    net = Hopenet(Bottleneck, [3, 4, 6, 3], num_bins)

    # NOTE: torch.load unpickles the file; only load weights from a trusted source.
    state = torch.load(model_path, map_location=device)
    if not isinstance(state, dict):
        raise ValueError("Invalid checkpoint format. Expected a state_dict dict.")

    net.load_state_dict(state)
    # Module.to() and Module.eval() both return the module itself.
    return net.to(device).eval()

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)
# Load the weights once; later cells reuse this `model` for every frame.
model = load_hopenet(cfg.model_path, cfg.num_bins, device)

## 4) Preprocessing + MediaPipe face detection

In [None]:
# Input pipeline for the pose network: resize the face crop to 224x224,
# convert it to a tensor, then normalize each channel.
# NOTE(review): the mean/std values look like the usual ImageNet statistics —
# confirm they match what the Hopenet weights were trained with.
preprocess = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
])

# A single detector instance is created here and reused for every frame.
# NOTE(review): model_selection=0 is presumably MediaPipe's short-range model —
# confirm against the MediaPipe docs if subjects sit far from the camera.
mp_face = mp.solutions.face_detection
face_detector = mp_face.FaceDetection(model_selection=0, min_detection_confidence=0.5)

def _clamp(v, lo, hi):
    return max(lo, min(hi, v))

def detect_faces_mediapipe(frame_bgr):
    """Detect faces in a BGR frame and return their boxes in pixel coordinates.

    The frame is converted to RGB before it is handed to the module-level
    ``face_detector``. Each detection's relative bounding box is scaled by
    the frame size and then clipped to the frame.

    Both corners of the box are clipped, not just the origin. If only the
    origin were clipped, a face partly outside the left or top edge would
    keep its full width/height and the box would extend past the real face.

    Args:
        frame_bgr: Image array whose first two dimensions are height and width.

    Returns:
        A list of ``(x, y, w, h)`` integer tuples, one per detection, each
        lying inside the frame with ``w >= 1`` and ``h >= 1``. Empty when no
        face is detected.
    """
    h, w = frame_bgr.shape[:2]
    frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
    results = face_detector.process(frame_rgb)

    boxes = []
    # `detections` is falsy when nothing was found.
    for det in results.detections or ():
        bb = det.location_data.relative_bounding_box

        # Unclipped corners in pixels; these can lie outside the frame.
        left = int(bb.xmin * w)
        top = int(bb.ymin * h)
        right = left + int(bb.width * w)
        bottom = top + int(bb.height * h)

        # Clip the top-left corner to a valid pixel, then clip the
        # bottom-right corner so the box keeps at least one pixel per axis.
        x1 = _clamp(left, 0, w - 1)
        y1 = _clamp(top, 0, h - 1)
        x2 = _clamp(right, x1 + 1, w)
        y2 = _clamp(bottom, y1 + 1, h)

        boxes.append((x1, y1, x2 - x1, y2 - y1))
    return boxes

def crop_preprocess_face(frame_bgr, bbox):
    """Cut the ``bbox`` region out of a BGR frame and turn it into a model input.

    Args:
        frame_bgr: Full camera frame in BGR channel order.
        bbox: ``(x, y, w, h)`` in pixel coordinates.

    Returns:
        The result of the module-level ``preprocess`` pipeline applied to the
        RGB face crop.
    """
    left, top, width, height = bbox
    crop_bgr = frame_bgr[top:top + height, left:left + width]
    # The frame is BGR; the PIL/torch pipeline expects RGB.
    crop_rgb = cv2.cvtColor(crop_bgr, cv2.COLOR_BGR2RGB)
    return preprocess(Image.fromarray(crop_rgb))

## 5) Head pose estimation + smoothing + flagging + drawing

In [None]:
def estimate_head_pose(model, face_tensor, device: torch.device, num_bins=66):
    """Run the pose model on one face and decode yaw, pitch and roll.

    Each of the three outputs is treated as logits over ``num_bins`` bins.
    The angle is the softmax-weighted mean bin index mapped to degrees as
    ``3 * index - 99``.

    Args:
        model: Callable returning ``(yaw_logits, pitch_logits, roll_logits)``
            for a batched input.
        face_tensor: Single preprocessed face; a batch dimension is added here.
        device: Device the input and the bin-index tensor are placed on.
        num_bins: Number of bins per angle; must match the logits' width.

    Returns:
        ``(yaw, pitch, roll)`` as Python floats.
    """
    batch = face_tensor.unsqueeze(0).to(device)  # add the batch dimension
    with torch.no_grad():
        yaw_logits, pitch_logits, roll_logits = model(batch)

    bin_index = torch.arange(
        num_bins, dtype=torch.float32, device=device
    ).unsqueeze(0)

    def expected_degrees(logits):
        # Expectation of the bin index under the softmax distribution.
        probabilities = F.softmax(logits, dim=1)
        expectation = torch.sum(probabilities * bin_index, dim=1)
        return float((expectation * 3 - 99).item())

    return (
        expected_degrees(yaw_logits),
        expected_degrees(pitch_logits),
        expected_degrees(roll_logits),
    )

class EMASmoother:
    """Exponential moving average over a fixed-length sequence of values.

    ``alpha`` is the weight given to each new sample; the previous state
    keeps ``1 - alpha``. ``state`` is ``None`` until the first update.
    """

    def __init__(self, alpha=0.2):
        self.alpha = alpha
        self.state = None  # (yaw,pitch,roll)

    def update(self, values):
        """Blend ``values`` into the running state and return the new state.

        The first call stores ``values`` unchanged. Later calls return a
        tuple of element-wise blended values.
        """
        previous = self.state
        if previous is None:
            # Nothing to blend with yet: seed the average with the sample.
            self.state = values
            return self.state

        new_weight = self.alpha
        old_weight = 1 - new_weight
        blended = []
        for old, new in zip(previous, values):
            blended.append(old_weight * old + new_weight * new)
        self.state = tuple(blended)
        return self.state

def is_looking_away(yaw, pitch, roll, yaw_thr, pitch_thr, roll_thr):
    """Return True when any angle's magnitude strictly exceeds its threshold."""
    angle_limits = (
        (yaw, yaw_thr),
        (pitch, pitch_thr),
        (roll, roll_thr),
    )
    return any(abs(angle) > limit for angle, limit in angle_limits)

def draw_overlay(frame, bbox, yaw, pitch, roll, warning_text=None):
    """Draw the face box, the three angle read-outs and an optional warning.

    Drawing happens in place on ``frame``. The angle labels are stacked
    above the box's top-left corner. The warning, if any, is drawn at a
    fixed position near the top-left of the frame.
    """
    left, top, width, height = bbox
    font = cv2.FONT_HERSHEY_SIMPLEX

    cv2.rectangle(frame, (left, top), (left + width, top + height), (255, 0, 0), 2)

    # (label, pixels above the top edge of the box)
    readouts = (
        (f"Yaw: {yaw:6.2f}", 50),
        (f"Pitch:{pitch:6.2f}", 30),
        (f"Roll: {roll:6.2f}", 10),
    )
    for label, lift in readouts:
        cv2.putText(frame, label, (left, top - lift), font, 0.65, (0, 255, 0), 2)

    if warning_text:
        cv2.putText(frame, warning_text, (10, 60), font, 1.0, (0, 0, 255), 2)

## 6) CSV event logging

In [None]:
def make_log_path(cfg):
    """Return a timestamped CSV path inside ``cfg.log_dir``.

    The file name is ``<cfg.session_name>_<YYYYmmdd_HHMMSS>.csv``, using the
    local time at the moment of the call.
    """
    # Imported here because no other cell of the notebook imports `datetime`;
    # without it this function raises NameError on first use.
    from datetime import datetime

    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    return os.path.join(cfg.log_dir, f"{cfg.session_name}_{ts}.csv")

# Fix the log file name once per session; the monitoring loop and the log
# preview cell both read `log_path`.
log_path = make_log_path(cfg)
print("Logging to:", log_path)

def log_event(writer, t, event, yaw=None, pitch=None, roll=None, extra=""):
    """Write one event row through ``writer.writerow``.

    Args:
        writer: Object with a ``writerow(dict)`` method (e.g. ``csv.DictWriter``).
        t: Timestamp in seconds, written with three decimals.
        event: Event name, written as-is.
        yaw, pitch, roll: Optional angles; written with three decimals, or
            as an empty string when ``None``.
        extra: Free-form text stored in the ``extra`` column.
    """
    row = {"timestamp_s": f"{t:.3f}", "event": event}
    for column, angle in (("yaw", yaw), ("pitch", pitch), ("roll", roll)):
        row[column] = f"{angle:.3f}" if angle is not None else ""
    row["extra"] = extra
    writer.writerow(row)

## 7) Run real-time monitoring

In [None]:
# Real-time loop
#
# Reads frames from the camera until 'q' is pressed or the camera stops
# delivering frames. Per frame it detects faces and handles three cases
# (several faces / no face / exactly one face), draws the overlay and appends
# events to the CSV log at `log_path`.
#
# NOTE(review): MULTI_FACE, NO_FACE and AWAY_FLAG are logged on every frame
# for which the condition holds, so long episodes produce many rows.
cap = cv2.VideoCapture(cfg.camera_id)
if not cap.isOpened():
    cap.release()
    raise RuntimeError(f"Could not open camera_id={cfg.camera_id}")

smoother = EMASmoother(alpha=cfg.ema_alpha)

start_t = time.time()
last_face_seen_t = start_t

away_start_t = None   # when the current looking-away episode began
is_currently_away = False

# try/finally so the camera and the preview window are released even when
# detection, inference or logging raises in the middle of the loop.
try:
    with open(log_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=["timestamp_s","event","yaw","pitch","roll","extra"])
        writer.writeheader()
        log_event(writer, 0.0, "SESSION_START", extra=f"device={device}")

        while True:
            ret, frame = cap.read()
            if not ret:
                # The camera stopped delivering frames: still close the
                # session in the log so every log has a SESSION_END row.
                log_event(writer, time.time() - start_t, "SESSION_END", extra="camera_read_failed")
                break

            now = time.time()
            t = now - start_t

            bboxes = detect_faces_mediapipe(frame)

            warning = None

            # Multi-face handling
            if len(bboxes) > 1:
                # Faces are visible, so the no-face grace period must not
                # keep running while several people are in view.
                last_face_seen_t = now

                warning = "CRITICAL: Multiple faces detected!"
                log_event(writer, t, "MULTI_FACE", extra=f"count={len(bboxes)}")
                # Draw all boxes
                for bb in bboxes:
                    cv2.rectangle(frame, (bb[0], bb[1]), (bb[0]+bb[2], bb[1]+bb[3]), (0, 0, 255), 2)
                cv2.putText(frame, warning, (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 0, 255), 2)

            elif len(bboxes) == 0:
                # No face: allow grace period to avoid false alerts from detection misses
                if (now - last_face_seen_t) > cfg.no_face_grace_s:
                    warning = "Warning: No face detected!"
                    log_event(writer, t, "NO_FACE")
                cv2.putText(frame, warning or "No face (within grace period)", (10, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 0, 255) if warning else (0, 255, 255), 2)

                # Reset away tracking if face disappears (policy choice)
                away_start_t = None
                is_currently_away = False

            else:
                # Exactly one face
                bbox = bboxes[0]
                last_face_seen_t = now

                face_tensor = crop_preprocess_face(frame, bbox)
                yaw, pitch, roll = estimate_head_pose(model, face_tensor, device, num_bins=cfg.num_bins)

                # Smooth angles
                yaw_s, pitch_s, roll_s = smoother.update((yaw, pitch, roll))

                # Away logic
                away_now = is_looking_away(yaw_s, pitch_s, roll_s, cfg.yaw_thr, cfg.pitch_thr, cfg.roll_thr)

                if away_now and not is_currently_away:
                    # Just started deviating; start timer
                    away_start_t = now
                    is_currently_away = True
                    log_event(writer, t, "AWAY_START", yaw_s, pitch_s, roll_s)

                if away_now and is_currently_away:
                    # Has it lasted long enough to flag?
                    if away_start_t is not None and (now - away_start_t) >= cfg.away_required_s:
                        warning = "Warning: Looking away!"
                        log_event(writer, t, "AWAY_FLAG", yaw_s, pitch_s, roll_s, extra=f"duration_s={now-away_start_t:.2f}")

                if (not away_now) and is_currently_away:
                    # Returned to normal
                    log_event(writer, t, "AWAY_END", yaw_s, pitch_s, roll_s,
                              extra=f"duration_s={now-away_start_t:.2f}" if away_start_t is not None else "")
                    away_start_t = None
                    is_currently_away = False

                draw_overlay(frame, bbox, yaw_s, pitch_s, roll_s, warning_text=warning)

            # Every branch ends by showing the annotated frame.
            cv2.imshow("Head Pose Monitoring", frame)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                log_event(writer, time.time()-start_t, "SESSION_END")
                break
finally:
    cap.release()
    cv2.destroyAllWindows()

print("Done. Log saved to:", log_path)

## 8) Quick log preview

Run the next cell to see the first 20 log entries (requires `pandas`).


In [None]:
import pandas as pd
# Load the CSV written by the monitoring loop and show its first 20 rows.
df = pd.read_csv(log_path)
df.head(20)