CELL 1

In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import cv2
import numpy as np

# CELL 1: WFLW Dataset
class WFLWDataset(Dataset):
    """WFLW samples served as ``(image, landmarks, pose)`` float32 tensors.

    Every usable annotation line is split on whitespace: the first 196 values
    are read as 98 (x, y) landmark pairs, the next three values as the "pose"
    vector and the last token as the image path relative to ``img_dir``.
    Entries whose image file does not exist are skipped with a warning.

    NOTE(review): in the official WFLW ``list_98pt_rect_attr_*`` files the
    columns that follow the landmarks appear to be the face rectangle and
    binary attributes rather than Euler angles - confirm that
    ``parts[196:199]`` really holds pitch/yaw/roll for the file in use.

    Args:
        annotation_file: Path of the whitespace-separated annotation list.
        img_dir: Directory that the per-line image paths are relative to.
        transform: Optional callable applied to the CHW image array scaled
            to [0, 1] before it is converted to a tensor.
    """

    INPUT_SIZE = 112          # side length of the square network input
    NUM_LANDMARK_VALUES = 196  # 98 points * (x, y)
    NUM_POSE_VALUES = 3

    def __init__(self, annotation_file, img_dir, transform=None):
        self.img_dir = img_dir
        self.transform = transform
        self.data = []
        # Landmarks + pose values + at least the trailing image path.
        min_fields = self.NUM_LANDMARK_VALUES + self.NUM_POSE_VALUES + 1
        pose_end = self.NUM_LANDMARK_VALUES + self.NUM_POSE_VALUES
        with open(annotation_file, 'r') as f:
            for line_no, line in enumerate(f, start=1):
                parts = line.split()
                if not parts:
                    # Blank lines (e.g. a trailing newline) carry no sample.
                    continue
                if len(parts) < min_fields:
                    print(f"Warning: Skipping malformed annotation line {line_no}")
                    continue
                landmarks = np.array(
                    parts[:self.NUM_LANDMARK_VALUES], dtype=np.float32
                ).reshape(-1, 2)  # 98 points
                pose = np.array(
                    parts[self.NUM_LANDMARK_VALUES:pose_end], dtype=np.float32
                )
                img_path = parts[-1]
                full_img_path = os.path.join(self.img_dir, img_path)
                if os.path.exists(full_img_path):
                    self.data.append((img_path, landmarks, pose))
                else:
                    print(f"Warning: Image not found: {full_img_path}")

    @staticmethod
    def _normalize_landmarks(landmarks, width, height):
        """Scale pixel-space (x, y) landmarks to fractions of the image size."""
        return landmarks / np.array([width, height], dtype=np.float32)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """Load sample ``idx``.

        Returns:
            Tuple ``(image, landmarks, pose)``: image is a 3x112x112 tensor
            scaled to [0, 1]; landmarks is a flat tensor of 196 values
            expressed as fractions of the original image width/height; pose
            is the 3-value vector read from the annotation line.

        Raises:
            FileNotFoundError: If OpenCV cannot read the image.
        """
        img_path, landmarks, pose = self.data[idx]
        img = cv2.imread(os.path.join(self.img_dir, img_path))
        if img is None:
            raise FileNotFoundError(f"Image not found: {img_path}")
        # Remember the source size *before* resizing: the annotations are in
        # source-image pixels, so that is the size they must be divided by.
        orig_h, orig_w = img.shape[:2]
        img = cv2.resize(img, (self.INPUT_SIZE, self.INPUT_SIZE))
        img = img.transpose(2, 0, 1) / 255.0
        landmarks = self._normalize_landmarks(landmarks, orig_w, orig_h)
        if self.transform:
            img = self.transform(img)
        return (torch.tensor(img, dtype=torch.float32),
                torch.tensor(landmarks.flatten(), dtype=torch.float32),
                torch.tensor(pose, dtype=torch.float32))

CELL 2

In [None]:
# CELL 2: Visualize Sample WFLW Image
def visualize_sample_wflw(
    annotation_file='F:/human behaviour detection/widerface/WFLW/WFLW_annotations/list_98pt_rect_attr_train_test/list_98pt_rect_attr_train.txt',
    img_dir='F:/human behaviour detection/widerface/WFLW/WFLW_images',
):
    """Display the first annotated image with its landmarks and pose values.

    Reads only the first line of ``annotation_file``, draws the 98 landmark
    points and the three values stored after them, and shows the result in an
    OpenCV window until a key is pressed. Problems (empty annotation file,
    missing or unreadable image) are reported with ``print`` and the function
    returns without raising.

    Args:
        annotation_file: Path of the whitespace-separated annotation list.
        img_dir: Directory that the annotated image path is relative to.
    """
    # Read the line and close the file before any blocking GUI call.
    with open(annotation_file, 'r') as f:
        parts = f.readline().split()

    # 196 landmark values + 3 pose values + the image path.
    if len(parts) < 200:
        print(f"No usable annotation found in: {annotation_file}")
        return

    img_path = parts[-1]
    landmarks = np.array(parts[:196], dtype=np.float32).reshape(-1, 2)
    # NOTE(review): labelled pitch/yaw/roll below, but in the official WFLW
    # list files these columns appear to be part of the face rectangle -
    # confirm against the annotation format actually used.
    pose = np.array(parts[196:199], dtype=np.float32)

    full_img_path = os.path.join(img_dir, img_path)
    if not os.path.exists(full_img_path):
        print(f"Image not found: {full_img_path}")
        return

    img = cv2.imread(full_img_path)
    if img is None:
        print(f"Failed to load: {full_img_path}")
        return

    # The annotation values are used as absolute pixel coordinates of the
    # source image, so they are drawn as-is (no scaling by width/height).
    for (x, y) in landmarks:
        cv2.circle(img, (int(x), int(y)), 2, (0, 255, 0), -1)
    cv2.putText(img, f'Pitch: {pose[0]:.2f}', (10, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)
    cv2.putText(img, f'Yaw: {pose[1]:.2f}', (10, 40), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)
    cv2.putText(img, f'Roll: {pose[2]:.2f}', (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)
    cv2.imshow('Sample WFLW Image', img)
    cv2.waitKey(0)
    cv2.destroyAllWindows()
    print(f"Visualized: {full_img_path}")


cell 3

In [None]:
# CELL 3: Lightweight HRNet Model
class LightweightHRNet(nn.Module):
    """Small two-resolution CNN with separate landmark and pose heads.

    A stride-2 stem feeds a full-resolution branch and a stride-2
    low-resolution branch; the low branch is upsampled with a transposed
    convolution and added to the high branch. The fused map is globally
    average-pooled to a 16-value vector that drives two linear heads.

    Args:
        num_landmarks: Number of (x, y) points; the landmark head emits
            ``num_landmarks * 2`` values.
    """

    def __init__(self, num_landmarks=98):
        super().__init__()
        stem_channels, low_channels = 16, 32

        # Stem: 3 -> 16 channels at half the input resolution.
        self.conv1 = nn.Conv2d(3, stem_channels, kernel_size=3, stride=2, padding=1)
        self.bn1 = nn.BatchNorm2d(stem_channels)
        self.relu = nn.ReLU(inplace=True)
        self.stage1 = nn.Sequential(
            nn.Conv2d(stem_channels, stem_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(stem_channels),
            nn.ReLU(inplace=True),
        )

        # Parallel branches and the path that brings the low branch back up.
        self.downsample = nn.Conv2d(stem_channels, low_channels, kernel_size=3, stride=2, padding=1)
        self.stage2_high = nn.Conv2d(stem_channels, stem_channels, kernel_size=3, padding=1)
        self.stage2_low = nn.Conv2d(low_channels, low_channels, kernel_size=3, padding=1)
        self.upsample = nn.ConvTranspose2d(low_channels, stem_channels, kernel_size=4, stride=2, padding=1)

        # Fusion, pooling and the two regression heads.
        self.final_conv = nn.Conv2d(stem_channels, stem_channels, kernel_size=1)
        self.pool = nn.AdaptiveAvgPool2d(1)
        self.fc_landmarks = nn.Linear(stem_channels, num_landmarks * 2)
        self.fc_pose = nn.Linear(stem_channels, 3)

    def forward(self, x):
        """Return ``(landmarks, pose)`` for an image batch ``x``."""
        features = self.stage1(self.relu(self.bn1(self.conv1(x))))

        high_res = self.stage2_high(features)
        low_res = self.stage2_low(self.downsample(features))
        fused = high_res + self.upsample(low_res)

        pooled = self.pool(self.final_conv(fused))
        embedding = pooled.view(pooled.size(0), -1)
        return self.fc_landmarks(embedding), self.fc_pose(embedding)

cell 4

In [None]:
# CELL 4: Train HRNet on WFLW
def train_pose(
    train_annotation_file='F:/human behaviour detection/widerface/WFLW/WFLW_annotations/list_98pt_rect_attr_train_test/list_98pt_rect_attr_train.txt',
    val_annotation_file='F:/human behaviour detection/widerface/WFLW/WFLW_annotations/list_98pt_rect_attr_train_test/list_98pt_rect_attr_test.txt',
    img_dir='F:/human behaviour detection/widerface/WFLW/WFLW_images',
    num_epochs=30,
    batch_size=16,
    learning_rate=0.001,
    pose_loss_weight=0.1,
    output_path="landmark_pose_model.pt",
):
    """Train ``LightweightHRNet`` on WFLW and save its ``state_dict``.

    Each epoch runs one pass over the training loader, optimising
    ``landmark MSE + pose_loss_weight * pose MSE`` with Adam, followed by a
    gradient-free pass over the validation loader. Per-epoch mean losses are
    printed. All defaults reproduce the previously hard-coded configuration.

    Args:
        train_annotation_file: Annotation list used for training.
        val_annotation_file: Annotation list used for validation.
        img_dir: Image root shared by both annotation lists.
        num_epochs: Number of passes over the training data.
        batch_size: Batch size for both loaders.
        learning_rate: Adam learning rate.
        pose_loss_weight: Weight of the pose loss in the total loss.
        output_path: Where the trained ``state_dict`` is written.

    Raises:
        ValueError: If no training image could be found.
    """
    train_dataset = WFLWDataset(annotation_file=train_annotation_file, img_dir=img_dir)
    if len(train_dataset) == 0:
        # Fail early with a clear message instead of an opaque sampler or
        # division error further down.
        raise ValueError(f"No training images found for: {train_annotation_file}")
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0)

    val_dataset = WFLWDataset(annotation_file=val_annotation_file, img_dir=img_dir)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=0)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = LightweightHRNet().to(device)
    criterion_lm = nn.MSELoss()
    criterion_pose = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    num_train_batches = len(train_loader)
    # An empty validation set reports 0.0 rather than dividing by zero.
    num_val_batches = max(len(val_loader), 1)

    for epoch in range(num_epochs):
        model.train()
        train_loss_lm, train_loss_pose = 0.0, 0.0
        for images, landmarks, poses in train_loader:
            images, landmarks, poses = images.to(device), landmarks.to(device), poses.to(device)
            optimizer.zero_grad()
            pred_lm, pred_pose = model(images)
            loss_lm = criterion_lm(pred_lm, landmarks)
            loss_pose = criterion_pose(pred_pose, poses)
            loss = loss_lm + pose_loss_weight * loss_pose
            loss.backward()
            optimizer.step()
            train_loss_lm += loss_lm.item()
            train_loss_pose += loss_pose.item()

        model.eval()
        val_loss_lm, val_loss_pose = 0.0, 0.0
        with torch.no_grad():
            for images, landmarks, poses in val_loader:
                images, landmarks, poses = images.to(device), landmarks.to(device), poses.to(device)
                pred_lm, pred_pose = model(images)
                val_loss_lm += criterion_lm(pred_lm, landmarks).item()
                val_loss_pose += criterion_pose(pred_pose, poses).item()

        print(f"Epoch [{epoch+1}/{num_epochs}], Train LM Loss: {train_loss_lm/num_train_batches:.4f}, "
              f"Train Pose Loss: {train_loss_pose/num_train_batches:.4f}, "
              f"Val LM Loss: {val_loss_lm/num_val_batches:.4f}, "
              f"Val Pose Loss: {val_loss_pose/num_val_batches:.4f}")

    torch.save(model.state_dict(), output_path)
    print(f"Model saved as {output_path}")


cell 5

In [None]:
# CELL 5: Main Execution
# Script entry point: show the first annotated training sample, then train.
if __name__ == '__main__':
    visualize_sample_wflw()
    train_pose()

Cell 6 (testing the model)

In [None]:

import os
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import cv2
import numpy as np
from collections import defaultdict

# CELL 1: WFLW Dataset
class WFLWDataset(Dataset):
    """WFLW test samples as ``(image, landmarks, pose, img_path)``.

    Every usable annotation line is split on whitespace: the first 196 values
    are read as 98 (x, y) landmark pairs, the next three values as the "pose"
    vector and the last token as the image path relative to ``img_dir``.
    Entries whose image file does not exist are skipped with a warning. The
    first path component of each kept image is tallied in
    ``category_counts`` and a summary is printed after loading.

    NOTE(review): in the official WFLW ``list_98pt_rect_attr_*`` files the
    columns that follow the landmarks appear to be the face rectangle and
    binary attributes rather than Euler angles - confirm that
    ``parts[196:199]`` really holds pitch/yaw/roll for the file in use.

    Args:
        annotation_file: Path of the whitespace-separated annotation list.
        img_dir: Directory that the per-line image paths are relative to.
        transform: Optional callable applied to the CHW image array scaled
            to [0, 1] before it is converted to a tensor.
    """

    INPUT_SIZE = 112          # side length of the square network input
    NUM_LANDMARK_VALUES = 196  # 98 points * (x, y)
    NUM_POSE_VALUES = 3

    def __init__(self, annotation_file, img_dir, transform=None):
        self.img_dir = img_dir
        self.transform = transform
        self.data = []
        self.category_counts = defaultdict(int)
        # Landmarks + pose values + at least the trailing image path.
        min_fields = self.NUM_LANDMARK_VALUES + self.NUM_POSE_VALUES + 1
        pose_end = self.NUM_LANDMARK_VALUES + self.NUM_POSE_VALUES
        with open(annotation_file, 'r') as f:
            for line_no, line in enumerate(f, start=1):
                parts = line.split()
                if not parts:
                    # Blank lines (e.g. a trailing newline) carry no sample.
                    continue
                if len(parts) < min_fields:
                    print(f"Warning: Skipping malformed annotation line {line_no}")
                    continue
                landmarks = np.array(
                    parts[:self.NUM_LANDMARK_VALUES], dtype=np.float32
                ).reshape(-1, 2)  # 98 points
                pose = np.array(
                    parts[self.NUM_LANDMARK_VALUES:pose_end], dtype=np.float32
                )
                img_path = parts[-1]
                full_img_path = os.path.join(self.img_dir, img_path)
                if os.path.exists(full_img_path):
                    self.data.append((img_path, landmarks, pose))
                    category = img_path.split('/')[0]
                    self.category_counts[category] += 1
                else:
                    print(f"Warning: Image not found: {full_img_path}")
        print(f"Loaded {len(self.data)} images for testing.")
        print("Categories in test set:", {k: v for k, v in sorted(self.category_counts.items())})

    @staticmethod
    def _normalize_landmarks(landmarks, width, height):
        """Scale pixel-space (x, y) landmarks to fractions of the image size."""
        return landmarks / np.array([width, height], dtype=np.float32)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """Load sample ``idx``.

        Returns:
            Tuple ``(image, landmarks, pose, img_path)``: image is a
            3x112x112 tensor scaled to [0, 1]; landmarks is a flat tensor of
            196 values expressed as fractions of the original image
            width/height; pose is the 3-value vector from the annotation
            line; img_path is the path relative to ``img_dir``.

        Raises:
            FileNotFoundError: If OpenCV cannot read the image.
        """
        img_path, landmarks, pose = self.data[idx]
        img = cv2.imread(os.path.join(self.img_dir, img_path))
        if img is None:
            raise FileNotFoundError(f"Image not found: {img_path}")
        # Remember the source size *before* resizing: the annotations are in
        # source-image pixels, so that is the size they must be divided by.
        orig_h, orig_w = img.shape[:2]
        img = cv2.resize(img, (self.INPUT_SIZE, self.INPUT_SIZE))
        img = img.transpose(2, 0, 1) / 255.0
        landmarks = self._normalize_landmarks(landmarks, orig_w, orig_h)
        if self.transform:
            img = self.transform(img)
        return (torch.tensor(img, dtype=torch.float32),
                torch.tensor(landmarks.flatten(), dtype=torch.float32),
                torch.tensor(pose, dtype=torch.float32),
                img_path)

# CELL 2: Lightweight HRNet Model
class LightweightHRNet(nn.Module):
    """Small two-resolution CNN with separate landmark and pose heads.

    Layer attribute names match the training definition so that a saved
    ``state_dict`` loads directly. A stride-2 stem feeds a full-resolution
    branch and a stride-2 low-resolution branch; the low branch is upsampled
    with a transposed convolution, added to the high branch, pooled to a
    16-value vector and passed to two linear heads.

    Args:
        num_landmarks: Number of (x, y) points; the landmark head emits
            ``num_landmarks * 2`` values.
    """

    def __init__(self, num_landmarks=98):
        super().__init__()
        stem_channels, low_channels = 16, 32

        # Stem: 3 -> 16 channels at half the input resolution.
        self.conv1 = nn.Conv2d(3, stem_channels, kernel_size=3, stride=2, padding=1)
        self.bn1 = nn.BatchNorm2d(stem_channels)
        self.relu = nn.ReLU(inplace=True)
        self.stage1 = nn.Sequential(
            nn.Conv2d(stem_channels, stem_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(stem_channels),
            nn.ReLU(inplace=True),
        )

        # Parallel branches and the path that brings the low branch back up.
        self.downsample = nn.Conv2d(stem_channels, low_channels, kernel_size=3, stride=2, padding=1)
        self.stage2_high = nn.Conv2d(stem_channels, stem_channels, kernel_size=3, padding=1)
        self.stage2_low = nn.Conv2d(low_channels, low_channels, kernel_size=3, padding=1)
        self.upsample = nn.ConvTranspose2d(low_channels, stem_channels, kernel_size=4, stride=2, padding=1)

        # Fusion, pooling and the two regression heads.
        self.final_conv = nn.Conv2d(stem_channels, stem_channels, kernel_size=1)
        self.pool = nn.AdaptiveAvgPool2d(1)
        self.fc_landmarks = nn.Linear(stem_channels, num_landmarks * 2)
        self.fc_pose = nn.Linear(stem_channels, 3)

    def forward(self, x):
        """Return ``(landmarks, pose)`` for an image batch ``x``."""
        features = self.stage1(self.relu(self.bn1(self.conv1(x))))

        high_res = self.stage2_high(features)
        low_res = self.stage2_low(self.downsample(features))
        fused = high_res + self.upsample(low_res)

        pooled = self.pool(self.final_conv(fused))
        embedding = pooled.view(pooled.size(0), -1)
        return self.fc_landmarks(embedding), self.fc_pose(embedding)

# CELL 3: Compute NME for Landmarks
def compute_nme(pred_lm, gt_lm, inter_ocular_dist):
    """Return the mean point-to-point error divided by ``inter_ocular_dist``.

    Both landmark arrays are viewed as rows of (x, y); the Euclidean distance
    between matching rows is averaged and then normalised.
    """
    offsets = pred_lm.reshape(-1, 2) - gt_lm.reshape(-1, 2)
    per_point_error = np.sqrt(np.sum(np.square(offsets), axis=1))
    return np.mean(per_point_error) / inter_ocular_dist

# CELL 4: Visualize Test Prediction
def visualize_test_prediction(model, device, test_loader, img_dir=None):
    """Show ground truth and predictions for the first sample of the loader.

    Ground-truth landmarks are drawn in green and predicted landmarks in red
    on the original image, together with both pose vectors, in an OpenCV
    window that stays open until a key is pressed.

    Args:
        model: Network returning ``(landmarks, pose)`` for an image batch.
        device: Device the model lives on; the image batch is moved there.
        test_loader: Iterable yielding ``(images, landmarks, poses,
            img_paths)`` batches.
        img_dir: Directory the image paths are relative to. Defaults to
            ``test_loader.dataset.img_dir`` when available, otherwise to the
            original hard-coded WFLW image directory.
    """
    if img_dir is None:
        img_dir = getattr(
            getattr(test_loader, 'dataset', None),
            'img_dir',
            'F:/human behaviour detection/widerface/WFLW/WFLW_images',
        )

    model.eval()
    with torch.no_grad():
        images, landmarks, poses, img_paths = next(iter(test_loader))
        images = images.to(device)
        img_path = img_paths[0]
        gt_landmarks = landmarks[0].cpu().numpy().reshape(-1, 2)
        gt_pose = poses[0].cpu().numpy()

        pred_landmarks, pred_pose = model(images)
        # Outputs live on `device`; they must be moved to the CPU before
        # `.numpy()`, which raises for CUDA tensors.
        pred_landmarks = pred_landmarks[0].cpu().numpy().reshape(-1, 2)
        pred_pose = pred_pose[0].cpu().numpy()

    img = cv2.imread(os.path.join(img_dir, img_path))
    if img is None:
        print(f"Failed to load: {img_path}")
        return

    h, w = img.shape[:2]
    # Assumes landmarks are fractions of the image width/height - confirm
    # against the dataset's normalisation.
    gt_landmarks = gt_landmarks * [w, h]
    pred_landmarks = pred_landmarks * [w, h]

    for (x, y) in gt_landmarks:
        cv2.circle(img, (int(x), int(y)), 1, (0, 255, 0), -1)
    for (x, y) in pred_landmarks:
        cv2.circle(img, (int(x), int(y)), 1, (0, 0, 255), -1)

    cv2.putText(img, f'GT: P:{gt_pose[0]:.2f}, Y:{gt_pose[1]:.2f}, R:{gt_pose[2]:.2f}', (10, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0, 255, 0), 1)
    cv2.putText(img, f'Pred: P:{pred_pose[0]:.2f}, Y:{pred_pose[1]:.2f}, R:{pred_pose[2]:.2f}', (10, 40), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0, 0, 255), 1)

    cv2.imshow('Test Prediction', img)
    cv2.waitKey(0)
    cv2.destroyAllWindows()
    print(f"Visualized test image: {img_path}")

# CELL 5: Test Model on WFLW Test Dataset
def test_pose(
    annotation_file='F:/human behaviour detection/widerface/WFLW/WFLW_annotations/list_98pt_rect_attr_train_test/list_98pt_rect_attr_test.txt',
    img_dir='F:/human behaviour detection/widerface/WFLW/WFLW_images',
    model_path='landmark_pose_model.pt',
    batch_size=16,
):
    """Evaluate the saved model on the WFLW test list and print metrics.

    Reports the mean landmark NME (normalised by the ground-truth
    inter-ocular distance) and the mean absolute error of each pose
    component, then visualises the first test sample. Samples whose
    inter-ocular distance is zero are excluded from every metric. All
    defaults reproduce the previously hard-coded configuration.

    Args:
        annotation_file: Annotation list to evaluate on.
        img_dir: Image root for the annotation list.
        model_path: Path of the saved ``state_dict``.
        batch_size: Evaluation batch size.

    Raises:
        FileNotFoundError: If ``model_path`` does not exist.
    """
    # Outer eye corners in the WFLW 98-point layout (left eye 60-67, right
    # eye 68-75); their distance is the usual WFLW NME normaliser.
    LEFT_OUTER_EYE, RIGHT_OUTER_EYE = 60, 72

    test_dataset = WFLWDataset(annotation_file=annotation_file, img_dir=img_dir)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=0)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = LightweightHRNet().to(device)

    if not os.path.exists(model_path):
        raise FileNotFoundError(f"Model file not found: {model_path}")
    # weights_only=True restricts unpickling to tensors/containers, matching
    # how the real-time inference cell loads the same checkpoint.
    model.load_state_dict(torch.load(model_path, map_location=device, weights_only=True))
    model.eval()

    total_nme = 0.0
    total_abs_pose_err = np.zeros(3, dtype=np.float64)  # pitch, yaw, roll
    num_samples = 0

    with torch.no_grad():
        for images, landmarks, poses, _ in test_loader:
            pred_lm, pred_pose = model(images.to(device))
            # One device->host transfer per batch instead of per sample.
            pred_lm_np = pred_lm.cpu().numpy()
            pred_pose_np = pred_pose.cpu().numpy()
            gt_lm_np = landmarks.cpu().numpy()
            gt_pose_np = poses.cpu().numpy()

            for gt_flat, pred_flat, gt_pose_i, pred_pose_i in zip(
                gt_lm_np, pred_lm_np, gt_pose_np, pred_pose_np
            ):
                gt_lm = gt_flat.reshape(-1, 2)
                inter_ocular_dist = np.sqrt(
                    ((gt_lm[LEFT_OUTER_EYE] - gt_lm[RIGHT_OUTER_EYE]) ** 2).sum()
                )
                if inter_ocular_dist == 0:  # Avoid division by zero
                    continue
                total_nme += compute_nme(pred_flat.reshape(-1, 2), gt_lm, inter_ocular_dist)
                total_abs_pose_err += np.abs(pred_pose_i - gt_pose_i)
                num_samples += 1

    if num_samples == 0:
        # Nothing to average (empty dataset or only degenerate samples);
        # report it instead of dividing by zero.
        print("No test samples could be evaluated.")
        return

    avg_nme = total_nme / num_samples
    avg_mae_pitch, avg_mae_yaw, avg_mae_roll = total_abs_pose_err / num_samples
    print(f"Test Results (WFLW Metrics):")
    print(f"Average NME (Landmarks): {avg_nme:.4f}")
    print(f"Average MAE Pitch: {avg_mae_pitch:.2f} degrees")
    print(f"Average MAE Yaw: {avg_mae_yaw:.2f} degrees")
    print(f"Average MAE Roll: {avg_mae_roll:.2f} degrees")
    print(f"Total Samples Evaluated: {num_samples}")

    visualize_test_prediction(model, device, test_loader)

# CELL 6: Main Execution
# Script entry point: evaluate the saved model on the WFLW test list.
if __name__ == '__main__':
    test_pose()


Real-time inference

In [None]:
import os
import torch
import torch.nn as nn
import cv2
import numpy as np
import time
from collections import deque

# CELL 1: Lightweight HRNet Model
class LightweightHRNet(nn.Module):
    """Small two-resolution CNN with separate landmark and pose heads.

    Layer attribute names match the training definition so that a saved
    ``state_dict`` loads directly. A stride-2 stem feeds a full-resolution
    branch and a stride-2 low-resolution branch; the low branch is upsampled
    with a transposed convolution, added to the high branch, pooled to a
    16-value vector and passed to two linear heads.

    Args:
        num_landmarks: Number of (x, y) points; the landmark head emits
            ``num_landmarks * 2`` values.
    """

    def __init__(self, num_landmarks=98):
        super().__init__()
        stem_channels, low_channels = 16, 32

        # Stem: 3 -> 16 channels at half the input resolution.
        self.conv1 = nn.Conv2d(3, stem_channels, kernel_size=3, stride=2, padding=1)
        self.bn1 = nn.BatchNorm2d(stem_channels)
        self.relu = nn.ReLU(inplace=True)
        self.stage1 = nn.Sequential(
            nn.Conv2d(stem_channels, stem_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(stem_channels),
            nn.ReLU(inplace=True),
        )

        # Parallel branches and the path that brings the low branch back up.
        self.downsample = nn.Conv2d(stem_channels, low_channels, kernel_size=3, stride=2, padding=1)
        self.stage2_high = nn.Conv2d(stem_channels, stem_channels, kernel_size=3, padding=1)
        self.stage2_low = nn.Conv2d(low_channels, low_channels, kernel_size=3, padding=1)
        self.upsample = nn.ConvTranspose2d(low_channels, stem_channels, kernel_size=4, stride=2, padding=1)

        # Fusion, pooling and the two regression heads.
        self.final_conv = nn.Conv2d(stem_channels, stem_channels, kernel_size=1)
        self.pool = nn.AdaptiveAvgPool2d(1)
        self.fc_landmarks = nn.Linear(stem_channels, num_landmarks * 2)
        self.fc_pose = nn.Linear(stem_channels, 3)

    def forward(self, x):
        """Return ``(landmarks, pose)`` for an image batch ``x``."""
        features = self.stage1(self.relu(self.bn1(self.conv1(x))))

        high_res = self.stage2_high(features)
        low_res = self.stage2_low(self.downsample(features))
        fused = high_res + self.upsample(low_res)

        pooled = self.pool(self.final_conv(fused))
        embedding = pooled.view(pooled.size(0), -1)
        return self.fc_landmarks(embedding), self.fc_pose(embedding)

# CELL 2: Compute EAR, MAR, and Blink Rate
def compute_ear(eye_points):
    """Return the eye aspect ratio of six eye points.

    The ratio is the sum of the distances between points 1-5 and 2-4,
    divided by twice the distance between points 0-3.
    """
    p0, p1, p2, p3, p4, p5 = (eye_points[i] for i in range(6))
    vertical_sum = np.linalg.norm(p1 - p5) + np.linalg.norm(p2 - p4)
    horizontal = np.linalg.norm(p0 - p3)
    return vertical_sum / (2.0 * horizontal)

def compute_mar(mouth_points):
    """Return the mouth aspect ratio of eight mouth points.

    The ratio is the sum of the distances between points 1-7, 2-6 and 3-5,
    divided by twice the distance between points 0-4.
    """
    opening_pairs = ((1, 7), (2, 6), (3, 5))
    opening_sum = sum(
        np.linalg.norm(mouth_points[a] - mouth_points[b]) for a, b in opening_pairs
    )
    width = np.linalg.norm(mouth_points[0] - mouth_points[4])
    return opening_sum / (2.0 * width)

# CELL 3: Real-Time Sleep Behavior Classification
def _predict_face(model, device, frame, box):
    """Run the model on one face box; return ``(landmarks_px, pose)`` or None.

    ``landmarks_px`` is a (98, 2) array in frame pixel coordinates. ``None``
    is returned when the crop is empty.
    """
    x, y, w, h = box
    face = frame[y:y+h, x:x+w]
    if face.size == 0:
        return None
    face = cv2.resize(face, (112, 112))
    face = face.transpose(2, 0, 1) / 255.0
    face_tensor = torch.tensor(face, dtype=torch.float32).unsqueeze(0).to(device)

    with torch.no_grad():
        landmarks, pose = model(face_tensor)
    # Model outputs are treated as fractions of the crop; map to the frame.
    landmarks = landmarks[0].cpu().numpy().reshape(-1, 2) * [w, h] + [x, y]
    pose = pose[0].cpu().numpy()
    pose[0] = np.clip(pose[0], -90, 90)  # Pitch
    pose[1] = np.clip(pose[1], -90, 90)  # Yaw
    pose[2] = np.clip(pose[2], -180, 180)  # Roll
    return landmarks, pose


def realtime_sleep_behavior_classification(model_path='landmark_pose_model.pt', camera_index=0):
    """Classify a webcam subject as Active / Drowsy / Sleeping in real time.

    For every frame the largest Haar-cascade face is cropped and passed to
    the landmark/pose model. Eye aspect ratio (EAR), mouth aspect ratio
    (MAR), head pitch and a sliding-window blink rate are tracked over
    consecutive frames to choose the state, which is drawn on the frame
    together with the landmarks. Press 'q' to quit.

    Args:
        model_path: Path of the saved ``state_dict``.
        camera_index: OpenCV capture device index.

    Raises:
        FileNotFoundError: If ``model_path`` does not exist.
        RuntimeError: If the Haar cascade or the webcam cannot be opened.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = LightweightHRNet().to(device)

    if not os.path.exists(model_path):
        raise FileNotFoundError(f"Model file not found: {model_path}")
    model.load_state_dict(torch.load(model_path, map_location=device, weights_only=True))
    model.eval()

    face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
    if face_cascade.empty():
        raise RuntimeError("Failed to load Haar Cascade classifier")

    cap = cv2.VideoCapture(camera_index)
    if not cap.isOpened():
        cap.release()
        raise RuntimeError("Failed to open webcam")
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 320)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 240)

    # Behavior detection parameters
    ASSUMED_FPS = 30
    EAR_THRESHOLD = 0.25  # Eye closure threshold
    MAR_THRESHOLD = 0.5   # Yawning threshold
    PITCH_THRESHOLD = -30  # Head tilt down threshold (degrees)
    CONSECUTIVE_FRAMES = 90  # ~3 seconds at 30 FPS
    BLINK_EAR_THRESHOLD = 0.25
    BLINK_RATE_THRESHOLD = 2  # Blinks per second indicating drowsiness

    # The model predicts the 98-point WFLW layout (left eye 60-67, right eye
    # 68-75, inner lips 88-95), not the 68-point iBUG layout. Six of the
    # eight eye points are picked in the order compute_ear expects
    # (corner, two upper, corner, two lower); the eight inner-lip points map
    # one-to-one onto compute_mar's ordering.
    LEFT_EYE_IDX = [60, 61, 63, 64, 65, 67]
    RIGHT_EYE_IDX = [68, 69, 71, 72, 73, 75]
    MOUTH_IDX = list(range(88, 96))

    # Track states
    eyes_closed_counter = 0
    yawning_counter = 0
    head_down_counter = 0
    # One 0/1 flag per processed frame; old blinks fall out of the window so
    # the rate reflects roughly the last second instead of growing forever.
    blink_events = deque(maxlen=ASSUMED_FPS)
    blink_window_seconds = blink_events.maxlen / ASSUMED_FPS
    last_ear = None  # no previous frame yet, so no blink can be detected
    state = "Active"

    print("Press 'q' to exit the webcam feed.")
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                print("Failed to grab frame")
                break

            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            faces = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))

            # The counters describe a single subject, so only the largest
            # detection is used; extra faces would otherwise corrupt them.
            prediction = None
            if len(faces) > 0:
                box = tuple(int(v) for v in max(faces, key=lambda b: b[2] * b[3]))
                prediction = _predict_face(model, device, frame, box)

            if prediction is not None:
                landmarks, pose = prediction
                x, y, w, h = box

                # Compute features
                left_ear = compute_ear(landmarks[LEFT_EYE_IDX])
                right_ear = compute_ear(landmarks[RIGHT_EYE_IDX])
                avg_ear = (left_ear + right_ear) / 2.0
                mar = compute_mar(landmarks[MOUTH_IDX])
                pitch = pose[0]

                # Blink detection: a blink starts when EAR crosses the
                # threshold from above.
                blink_started = (
                    last_ear is not None
                    and last_ear > BLINK_EAR_THRESHOLD
                    and avg_ear <= BLINK_EAR_THRESHOLD
                )
                blink_events.append(1 if blink_started else 0)
                last_ear = avg_ear
                blink_rate = sum(blink_events) / blink_window_seconds  # Blinks per second

                # State tracking
                eyes_closed_counter = eyes_closed_counter + 1 if avg_ear < EAR_THRESHOLD else 0
                yawning_counter = yawning_counter + 1 if mar > MAR_THRESHOLD else 0
                head_down_counter = head_down_counter + 1 if pitch < PITCH_THRESHOLD else 0

                # Classify behavior
                if (eyes_closed_counter >= CONSECUTIVE_FRAMES and head_down_counter >= CONSECUTIVE_FRAMES):
                    state = "Sleeping"
                elif (eyes_closed_counter >= CONSECUTIVE_FRAMES or
                      yawning_counter >= CONSECUTIVE_FRAMES or
                      head_down_counter >= CONSECUTIVE_FRAMES or
                      blink_rate > BLINK_RATE_THRESHOLD):
                    state = "Drowsy"
                else:
                    state = "Active"

                # Visualize
                for (lx, ly) in landmarks:
                    cv2.circle(frame, (int(lx), int(ly)), 1, (0, 255, 0), -1)
                cv2.rectangle(frame, (x, y), (x+w, y+h), (255, 0, 0), 1)
                cv2.putText(frame, f'P:{pose[0]:.2f}, Y:{pose[1]:.2f}, R:{pose[2]:.2f}',
                            (x, y-50), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0, 0, 255), 1)
                cv2.putText(frame, f'EAR:{avg_ear:.2f}, MAR:{mar:.2f}',
                            (x, y-30), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0, 0, 255), 1)
                cv2.putText(frame, f'Blink Rate:{blink_rate:.2f}/s',
                            (x, y-10), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0, 0, 255), 1)
                color = (0, 255, 0) if state == "Active" else (0, 165, 255) if state == "Drowsy" else (0, 0, 255)
                cv2.putText(frame, state, (x, y+h+15), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

            cv2.imshow('Sleep Behavior Classification', frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Always free the camera and close the window, even if a frame fails.
        cap.release()
        cv2.destroyAllWindows()

# CELL 4: Main Execution
# Script entry point: start the webcam-based behavior classifier.
if __name__ == '__main__':
    realtime_sleep_behavior_classification()