# Model Training - Driver Drowsiness Detection

This notebook implements the full training pipeline with **W&B Sweeps** for hyperparameter optimization:
- Data loading and preprocessing
- Train/validation/test splits
- PyTorch CNN model training
- **W&B Sweeps for automatic hyperparameter tuning**
- Model checkpointing
- Evaluation metrics


In [None]:
import os
import sys
from pathlib import Path
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, Subset
from torchvision import transforms
from PIL import Image
import numpy as np
from sklearn.metrics import precision_recall_fscore_support
import wandb
from dotenv import load_dotenv

# Make the project root and its src/ directory importable.
# Path().resolve() is the kernel's working directory, so PROJECT_ROOT is its parent —
# this assumes the notebook is run from a folder directly below the project root.
PROJECT_ROOT = Path().resolve().parent
sys.path.insert(0, str(PROJECT_ROOT))
sys.path.insert(0, str(PROJECT_ROOT / "src"))

# Load environment variables from the project's .env file.
# NOTE(review): presumably src.config.settings reads these at import time, which is
# why the project imports below come after load_dotenv — confirm against settings.py.
load_dotenv(PROJECT_ROOT / ".env")

from src.backend.models import DrowsinessCNN, create_model
from src.config.settings import WANDB_PROJECT, WANDB_API_KEY, MODEL_INPUT_SIZE

# Checkpoints are written to <project root>/models/best_model.pth
MODELS_DIR = PROJECT_ROOT / "models"
MODELS_DIR.mkdir(exist_ok=True)
MODEL_PATH = MODELS_DIR / "best_model.pth"

print(f"Project root: {PROJECT_ROOT}")
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
# Guard the attribute lookup the same way get_device() does: PyTorch builds without
# the MPS backend have no torch.backends.mps, and an unguarded access would abort
# this cell with AttributeError.
print(f"MPS available: {hasattr(torch.backends, 'mps') and torch.backends.mps.is_available()}")

## W&B Sweep Configuration

Define the hyperparameter search space for automatic tuning.

In [None]:
# Hyperparameter search space and optimisation target for the sweep
SWEEP_CONFIG = dict(
    method="bayes",  # Options: "grid", "random", "bayes"
    # The sweep maximises the "val_accuracy" value logged during training
    metric=dict(name="val_accuracy", goal="maximize"),
    parameters=dict(
        # Sampled log-uniformly between the two bounds
        learning_rate=dict(distribution="log_uniform_values", min=0.0001, max=0.01),
        # Everything below is chosen from a fixed list of candidates
        batch_size=dict(values=[16, 32, 64]),
        epochs=dict(values=[5, 10, 15]),
        dropout_rate=dict(values=[0.3, 0.5, 0.7]),
        optimizer=dict(values=["Adam", "SGD", "AdamW"]),
    ),
)

# One-glance summary of what will be searched
print(
    "Sweep Configuration:",
    f"  Method: {SWEEP_CONFIG['method']}",
    f"  Metric: {SWEEP_CONFIG['metric']}",
    f"  Parameters: {list(SWEEP_CONFIG['parameters'])}",
    sep="\n",
)

In [None]:
# Dataset class
class DrowsinessDataset(Dataset):
    """Two-class image dataset built from one folder per class.

    Every ``*.png`` file directly inside ``drowsy_dir`` gets label 1 and every
    ``*.png`` file directly inside ``non_drowsy_dir`` gets label 0 (sub-folders
    are not searched). Paths are stored in sorted order within each class, so an
    index always refers to the same file no matter how the filesystem happens to
    list the directory — index-based splits therefore stay reproducible.

    Args:
        drowsy_dir: Directory holding the drowsy images.
        non_drowsy_dir: Directory holding the non-drowsy images.
        transform: Optional callable applied to the RGB PIL image in ``__getitem__``.
    """

    def __init__(self, drowsy_dir, non_drowsy_dir, transform=None):
        self.transform = transform
        self.images = []  # file paths as strings
        self.labels = []  # 1 = drowsy, 0 = non-drowsy, aligned with self.images

        # Drowsy first, then non-drowsy. sorted() is what makes the ordering
        # deterministic: Path.glob itself yields entries in directory order.
        for class_dir, class_label in ((drowsy_dir, 1), (non_drowsy_dir, 0)):
            for img_path in sorted(Path(class_dir).glob("*.png")):
                self.images.append(str(img_path))
                self.labels.append(class_label)

        print(f"Loaded {len(self.images)} images")

    def __len__(self):
        """Number of images across both classes."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for position ``idx``; the image is converted to RGB."""
        img_path = self.images[idx]
        label = self.labels[idx]
        # convert() returns a new, fully loaded image, so the underlying file can be
        # closed right away instead of lingering until garbage collection.
        with Image.open(img_path) as raw_image:
            image = raw_image.convert("RGB")
        if self.transform:
            image = self.transform(image)
        return image, label


# Data transforms
# Both pipelines resize, convert to a tensor and normalise per channel with the same
# statistics; only the training pipeline adds random augmentation in between.
# NOTE(review): if MODEL_INPUT_SIZE is a single int, Resize scales the shorter side
# and keeps the aspect ratio — confirm the setting yields a fixed (H, W).
_NORM_MEAN = [0.485, 0.456, 0.406]
_NORM_STD = [0.229, 0.224, 0.225]

train_transform = transforms.Compose(
    [
        transforms.Resize(MODEL_INPUT_SIZE),
        # Random augmentation — applied to training images only
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomRotation(degrees=10),
        transforms.ColorJitter(brightness=0.2, contrast=0.2),
        transforms.ToTensor(),
        transforms.Normalize(mean=_NORM_MEAN, std=_NORM_STD),
    ]
)

# Deterministic pipeline used for validation and test images
val_transform = transforms.Compose(
    [
        transforms.Resize(MODEL_INPUT_SIZE),
        transforms.ToTensor(),
        transforms.Normalize(mean=_NORM_MEAN, std=_NORM_STD),
    ]
)

In [None]:
def get_device():
    """Pick the compute device: CUDA if available, otherwise MPS, otherwise CPU."""
    if torch.cuda.is_available():
        backend = "cuda"
    else:
        # torch.backends.mps is absent on some PyTorch builds, hence the getattr
        mps = getattr(torch.backends, "mps", None)
        backend = "mps" if mps is not None and mps.is_available() else "cpu"
    return torch.device(backend)


def create_data_loaders(batch_size, num_workers=4, seed=42):
    """Build train/validation/test loaders over disjoint subsets of the image folders.

    Images are read from ``PROJECT_ROOT / "Data" / "Drowsy"`` and
    ``PROJECT_ROOT / "Data" / "Non Drowsy"``. A seeded permutation of the indices is
    cut into 70% train, 15% validation and the remainder for test. The training
    subset uses ``train_transform``; validation and test use ``val_transform``.

    Args:
        batch_size: Batch size for all three loaders.
        num_workers: Worker processes per loader. Pass 0 to load in the main process —
            worth trying if workers fail to start, e.g. because the dataset class is
            defined in a notebook cell and the platform spawns its workers.
        seed: Seed for the index permutation that defines the split.

    Returns:
        ``(train_loader, val_loader, test_loader)``; only the training loader shuffles.

    Raises:
        ValueError: If there are too few images for every split to be non-empty.
    """
    import copy  # only needed here

    data_dir = PROJECT_ROOT / "Data"
    drowsy_dir = data_dir / "Drowsy"
    non_drowsy_dir = data_dir / "Non Drowsy"

    # Scan the folders once, then derive the evaluation view from the very same file
    # list. Two independent scans could disagree on order or content, and because
    # the split indices are shared, that would leak samples between the splits.
    train_full = DrowsinessDataset(drowsy_dir, non_drowsy_dir, transform=train_transform)
    val_full = copy.copy(train_full)  # shallow copy: shares the path and label lists
    val_full.transform = val_transform

    # Split sizes
    total_size = len(train_full)
    train_size = int(0.7 * total_size)
    val_size = int(0.15 * total_size)
    test_size = total_size - train_size - val_size
    if min(train_size, val_size, test_size) == 0:
        # Fail here with a clear message rather than with a ZeroDivisionError
        # deep inside training or validation.
        raise ValueError(
            f"Need enough images for non-empty train/val/test splits, but found "
            f"{total_size} in {drowsy_dir} and {non_drowsy_dir}"
        )

    generator = torch.Generator().manual_seed(seed)
    indices = torch.randperm(total_size, generator=generator).tolist()

    train_indices = indices[:train_size]
    val_indices = indices[train_size:train_size + val_size]
    test_indices = indices[train_size + val_size:]

    train_dataset = Subset(train_full, train_indices)
    val_dataset = Subset(val_full, val_indices)
    test_dataset = Subset(val_full, test_indices)

    loader_kwargs = dict(batch_size=batch_size, num_workers=num_workers)
    train_loader = DataLoader(train_dataset, shuffle=True, **loader_kwargs)
    val_loader = DataLoader(val_dataset, shuffle=False, **loader_kwargs)
    test_loader = DataLoader(test_dataset, shuffle=False, **loader_kwargs)

    return train_loader, val_loader, test_loader


def get_optimizer(model, name, lr):
    """Build the optimizer called ``name`` for ``model``'s parameters.

    Args:
        model: Module whose ``parameters()`` will be optimised.
        name: ``"Adam"``, ``"AdamW"`` or ``"SGD"`` (SGD uses momentum 0.9).
        lr: Learning rate.

    Returns:
        The optimizer instance. Any other ``name`` still falls back to Adam, as
        before, but now emits a ``UserWarning`` so that a misspelt name in a sweep
        or config no longer goes unnoticed.
    """
    import warnings  # only needed for the fallback path

    builders = {
        "Adam": lambda params: optim.Adam(params, lr=lr),
        "AdamW": lambda params: optim.AdamW(params, lr=lr),
        "SGD": lambda params: optim.SGD(params, lr=lr, momentum=0.9),
    }
    if name not in builders:
        warnings.warn(
            f"Unknown optimizer {name!r}; falling back to Adam "
            f"(expected one of {sorted(builders)})",
            stacklevel=2,
        )
        name = "Adam"
    return builders[name](model.parameters())

In [None]:
def train_epoch(model, train_loader, criterion, optimizer, device):
    """Run one optimisation pass over ``train_loader``.

    Returns:
        ``(loss, accuracy)`` where ``loss`` is the mean of the per-batch loss values
        (sum divided by the number of batches) and ``accuracy`` is the percentage of
        correctly predicted samples (0-100).
    """
    model.train()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0

    for batch_images, batch_labels in train_loader:
        batch_images = batch_images.to(device)
        batch_labels = batch_labels.to(device)

        # Standard step: clear gradients, forward, backward, update
        optimizer.zero_grad()
        logits = model(batch_images)
        batch_loss = criterion(logits, batch_labels)
        batch_loss.backward()
        optimizer.step()

        loss_sum += batch_loss.item()
        # Index of the largest output along dim 1 is the predicted class
        predicted = torch.max(logits.data, 1)[1]
        n_seen += batch_labels.size(0)
        n_correct += (predicted == batch_labels).sum().item()

    mean_batch_loss = loss_sum / len(train_loader)
    accuracy_pct = 100 * n_correct / n_seen
    return mean_batch_loss, accuracy_pct


def validate(model, val_loader, criterion, device):
    """Evaluate ``model`` on ``val_loader`` without updating any weights.

    Returns:
        Dict with keys ``"loss"`` (mean of the per-batch losses), ``"accuracy"``
        (percentage, 0-100) and ``"precision"``, ``"recall"``, ``"f1"`` as returned by
        ``precision_recall_fscore_support`` with ``average="weighted"``.
    """
    model.eval()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0
    predictions = []
    targets = []

    with torch.no_grad():
        for batch_images, batch_labels in val_loader:
            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device)

            logits = model(batch_images)
            loss_sum += criterion(logits, batch_labels).item()

            # Index of the largest output along dim 1 is the predicted class
            predicted = torch.max(logits.data, 1)[1]
            n_seen += batch_labels.size(0)
            n_correct += (predicted == batch_labels).sum().item()

            # Collect on the CPU for the scikit-learn metrics below
            predictions.extend(predicted.cpu().numpy())
            targets.extend(batch_labels.cpu().numpy())

    precision, recall, f1, _support = precision_recall_fscore_support(
        targets, predictions, average="weighted", zero_division=0
    )

    metrics = {
        "loss": loss_sum / len(val_loader),
        "accuracy": 100 * n_correct / n_seen,
    }
    metrics.update(precision=precision, recall=recall, f1=f1)
    return metrics

In [None]:
def train_model(config=None, save_best=True):
    """Train one model under a W&B run and return its best validation accuracy.

    W&B run handling:
      * ``config`` is None (the sweep-agent case) or no run is active: a run is
        opened with ``wandb.init(config=config)`` and finished on return.
      * ``config`` is given while a run is already active: that run is reused and
        left open, so a caller that wrapped this call in its own ``wandb.init``
        block can keep logging to it afterwards (e.g. upload an artifact).

    After the last epoch the weights with the best validation accuracy are restored,
    so the reported test accuracy belongs to the same model that is checkpointed.

    Args:
        config: Hyperparameters (mapping or ``wandb.config``). Missing keys default to
            learning_rate=0.001, batch_size=32, epochs=10, dropout_rate=0.5,
            optimizer="Adam".
        save_best: When True, write the best-so-far weights (as CPU tensors) together
            with the config to ``MODEL_PATH``.

    Returns:
        Best validation accuracy in percent over all epochs.
    """
    import contextlib  # only needed to reuse an already-open run

    # Reuse the caller's run only when hyperparameters were passed explicitly; with
    # config=None (sweeps) a fresh run is always opened, exactly as before.
    caller_run = wandb.run if config is not None else None
    if caller_run is None:
        run_context = wandb.init(config=config)
    else:
        if config is not wandb.config:
            # Record the supplied hyperparameters on the reused run
            wandb.config.update(dict(config), allow_val_change=True)
        # nullcontext: leaving the with-block must not finish a run we do not own
        run_context = contextlib.nullcontext(caller_run)

    with run_context:
        config = wandb.config

        # Get hyperparameters
        lr = config.get("learning_rate", 0.001)
        batch_size = config.get("batch_size", 32)
        epochs = config.get("epochs", 10)
        dropout_rate = config.get("dropout_rate", 0.5)
        optimizer_name = config.get("optimizer", "Adam")

        print(f"\nTraining: lr={lr:.6f}, batch={batch_size}, epochs={epochs}, dropout={dropout_rate}, opt={optimizer_name}")

        device = get_device()
        train_loader, val_loader, test_loader = create_data_loaders(batch_size)

        # Create the model, then set the requested probability on every nn.Dropout layer
        model = create_model(num_classes=2)
        for module in model.modules():
            if isinstance(module, nn.Dropout):
                module.p = dropout_rate
        model = model.to(device)

        criterion = nn.CrossEntropyLoss()
        optimizer = get_optimizer(model, optimizer_name, lr)

        best_val_acc = 0.0
        best_state = None  # CPU snapshot of the best weights seen so far

        for epoch in range(epochs):
            train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
            val_metrics = validate(model, val_loader, criterion, device)

            wandb.log({
                "epoch": epoch,
                "train_loss": train_loss,
                "train_accuracy": train_acc,
                "val_loss": val_metrics["loss"],
                "val_accuracy": val_metrics["accuracy"],
                "val_f1": val_metrics["f1"],
            })

            print(f"  Epoch {epoch+1}/{epochs}: val_acc={val_metrics['accuracy']:.2f}%")

            # The first epoch always counts as an improvement, so a snapshot (and, if
            # requested, a checkpoint file) exists even if accuracy never exceeds 0.
            if best_state is None or val_metrics["accuracy"] > best_val_acc:
                best_val_acc = val_metrics["accuracy"]
                best_state = {
                    key: tensor.detach().cpu().clone()
                    for key, tensor in model.state_dict().items()
                }
                if save_best:
                    torch.save({
                        "model_state_dict": best_state,
                        "config": dict(config),
                    }, MODEL_PATH)

        # Test evaluation on the best-validation weights, not on whatever the last
        # epoch happened to leave behind.
        if best_state is not None:
            model.load_state_dict(best_state)
        test_metrics = validate(model, test_loader, criterion, device)
        wandb.log({"test_accuracy": test_metrics["accuracy"], "best_val_accuracy": best_val_acc})

        print(f"  Best val_acc: {best_val_acc:.2f}%, Test acc: {test_metrics['accuracy']:.2f}%")
        return best_val_acc

## Option 1: Run Hyperparameter Sweep

This will automatically search for the best hyperparameters using Bayesian optimization.

In [None]:
# Login to W&B
wandb.login(key=WANDB_API_KEY)

# Number of sweep trials
SWEEP_COUNT = 10  # Adjust as needed

# Create and run sweep
sweep_id = wandb.sweep(SWEEP_CONFIG, project=WANDB_PROJECT)
print(f"Sweep ID: {sweep_id}")
print(f"Running {SWEEP_COUNT} trials...")

# Each trial calls train_model without writing a checkpoint (save_best=False).
# The project is passed explicitly so the agent looks the sweep up in the same
# project it was created in.
wandb.agent(
    sweep_id,
    function=lambda: train_model(save_best=False),
    project=WANDB_PROJECT,
    count=SWEEP_COUNT,
)

# Sweep pages live under <entity>/<project>; the link is broken without the entity.
# NOTE(review): falls back to the logged-in account's default entity — adjust if the
# sweep was created under a different team.
sweep_entity = os.environ.get("WANDB_ENTITY") or wandb.Api().default_entity
print(f"\n✓ Sweep complete! View results at: https://wandb.ai/{sweep_entity}/{WANDB_PROJECT}/sweeps/{sweep_id}")

## Option 2: Train with Specific Configuration

Use this to train with a specific set of hyperparameters (e.g., the best ones from the sweep).

In [None]:
# Hyperparameters for the final run — replace with the best values from the sweep
best_config = dict(
    learning_rate=0.001,
    batch_size=32,
    epochs=10,
    dropout_rate=0.5,
    optimizer="Adam",
)

wandb.login(key=WANDB_API_KEY)

# NOTE(review): the artifact upload below relies on the run opened here still being
# active after train_model() returns — verify, since train_model manages a W&B run
# of its own.
with wandb.init(project=WANDB_PROJECT, name="best_model_training", config=best_config):
    # save_best=True writes the checkpoint to MODEL_PATH
    best_val_acc = train_model(config=wandb.config, save_best=True)

    # Attach the checkpoint file to the run as a W&B model artifact
    artifact = wandb.Artifact("drowsiness_detection_model", type="model")
    artifact.add_file(str(MODEL_PATH))
    wandb.log_artifact(artifact)
    print("\n✓ Model saved to W&B as artifact")

print(f"\n✓ Training complete! Model saved to {MODEL_PATH}")