In [2]:
!pip install ultralytics opencv-python torch scikit-learn

Collecting ultralytics
  Downloading ultralytics-8.3.108-py3-none-any.whl.metadata (37 kB)
Collecting ultralytics-thop>=2.0.0 (from ultralytics)
  Downloading ultralytics_thop-2.0.14-py3-none-any.whl.metadata (9.4 kB)
Downloading ultralytics-8.3.108-py3-none-any.whl (974 kB)
   ---------------------------------------- 0.0/974.8 kB ? eta -:--:--
   --------------------------------------- 974.8/974.8 kB 11.3 MB/s eta 0:00:00
Downloading ultralytics_thop-2.0.14-py3-none-any.whl (26 kB)
Installing collected packages: ultralytics-thop, ultralytics
Successfully installed ultralytics-8.3.108 ultralytics-thop-2.0.14


In [None]:
import os
import cv2
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from sklearn.model_selection import train_test_split
import time

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Paths to datasets
ddd_path = r"C:\Users\ommak\Drowsiness detection and alert system\Data\Driver Drowsiness Dataset (DDD)"
fi_path = r"C:\Users\ommak\Drowsiness detection and alert system\Data\0 FaceImages"
fi1_path = r"C:\Users\ommak\Drowsiness detection and alert system\Data\0 FaceImages 1"

# Enhanced CNN Model with Dropout and BatchNorm
class DrowsinessClassifier(nn.Module):
    def __init__(self):
        super(DrowsinessClassifier, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.dropout = nn.Dropout(0.5)
        self.fc1 = nn.Linear(128 * 8 * 8, 256)
        self.fc2 = nn.Linear(256, 2)  # 2 classes: drowsy/not-drowsy

    def forward(self, x):
        x = self.pool(torch.relu(self.bn1(self.conv1(x))))
        x = self.pool(torch.relu(self.bn2(self.conv2(x))))
        x = self.pool(torch.relu(self.bn3(self.conv3(x))))
        x = x.view(x.size(0), -1)
        x = self.dropout(torch.relu(self.fc1(x)))
        x = self.fc2(x)
        return x

# Enhanced Dataset class with data balancing
class DrowsinessDataset(Dataset):
    def __init__(self, dataset_path, transform=None, balance_classes=True):
        self.dataset_path = dataset_path
        self.transform = transform
        self.classes = sorted([d for d in os.listdir(dataset_path) 
                            if os.path.isdir(os.path.join(dataset_path, d))])
        self.images, self.labels = self.load_data(balance_classes)

    def load_data(self, balance_classes):
        images = []
        labels = []
        class_counts = {}
        
        # First pass to count samples per class
        for label, class_name in enumerate(self.classes):
            class_path = os.path.join(self.dataset_path, class_name)
            class_counts[label] = len(os.listdir(class_path))
        
        max_samples = max(class_counts.values()) if balance_classes else None
        
        for label, class_name in enumerate(self.classes):
            class_path = os.path.join(self.dataset_path, class_name)
            samples = []
            
            for image_name in os.listdir(class_path):
                image_path = os.path.join(class_path, image_name)
                try:
                    image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
                    if image is None:
                        continue
                    image = cv2.resize(image, (64, 64))
                    samples.append(image)
                except Exception as e:
                    print(f"Error loading {image_path}: {e}")
            
            # Apply class balancing if requested
            if balance_classes and len(samples) > max_samples:
                samples = samples[:max_samples]
            
            images.extend(samples)
            labels.extend([label] * len(samples))
        
        return np.array(images, dtype=np.float32), np.array(labels, dtype=np.int64)

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = self.images[idx]
        label = self.labels[idx]
        if self.transform:
            image = self.transform(image)
        return image, torch.tensor(label, dtype=torch.long)

# Advanced data augmentation
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(15),
    transforms.RandomAffine(0, translate=(0.1, 0.1)),
    transforms.RandomApply([transforms.GaussianBlur(3)], p=0.3),
])

# Load datasets with class balancing
print("Loading datasets...")
try:
    ddd_dataset = DrowsinessDataset(ddd_path, transform=transform, balance_classes=True)
    fi_dataset = DrowsinessDataset(fi_path, transform=transform, balance_classes=True)
    fi1_dataset = DrowsinessDataset(fi1_path, transform=transform, balance_classes=True)
    combined_dataset = torch.utils.data.ConcatDataset([ddd_dataset, fi_dataset, fi1_dataset])
    print(f"Total samples: {len(combined_dataset)}")
except Exception as e:
    print(f"Error loading datasets: {e}")
    exit()

# Split into train/val with stratification
train_indices, val_indices = train_test_split(
    range(len(combined_dataset)),
    test_size=0.2,
    random_state=42,
    stratify=[label for _, label in combined_dataset]
)

train_dataset = torch.utils.data.Subset(combined_dataset, train_indices)
val_dataset = torch.utils.data.Subset(combined_dataset, val_indices)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True, num_workers=4, pin_memory=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False, num_workers=4, pin_memory=True)

# Initialize model with better initialization
def init_weights(m):
    if isinstance(m, nn.Conv2d) or isinstance(m, nn.Linear):
        nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
        if m.bias is not None:
            nn.init.constant_(m.bias, 0)

model = DrowsinessClassifier().to(device)
model.apply(init_weights)

# Improved optimizer with learning rate scheduling
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=0.001, weight_decay=1e-4)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=3, factor=0.5, verbose=True)

# Early stopping
best_val_loss = float('inf')
patience = 5
no_improve = 0

# Training loop with enhanced metrics
print("Training classifier...")
for epoch in range(30):  # Increased max epochs
    model.train()
    train_loss, train_correct, train_total = 0.0, 0, 0
    
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        
        # Ensure input has channel dimension
        if inputs.dim() == 3:
            inputs = inputs.unsqueeze(1)
            
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        _, predicted = torch.max(outputs.data, 1)
        train_total += labels.size(0)
        train_correct += (predicted == labels).sum().item()
        train_loss += loss.item()
    
    # Validation
    model.eval()
    val_loss, val_correct, val_total = 0.0, 0, 0
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            if inputs.dim() == 3:
                inputs = inputs.unsqueeze(1)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            
            _, predicted = torch.max(outputs.data, 1)
            val_total += labels.size(0)
            val_correct += (predicted == labels).sum().item()
            val_loss += loss.item()
    
    # Update learning rate
    scheduler.step(val_loss/len(val_loader))
    
    # Early stopping check
    current_val_loss = val_loss/len(val_loader)
    if current_val_loss < best_val_loss:
        best_val_loss = current_val_loss
        no_improve = 0
        torch.save(model.state_dict(), "best_drowsiness_model.pth")
        print(f"New best model saved with val loss: {best_val_loss:.4f}")
    else:
        no_improve += 1
    
    print(f"Epoch {epoch+1}")
    print(f"Train Loss: {train_loss/len(train_loader):.4f} | Acc: {100*train_correct/train_total:.2f}%")
    print(f"Val Loss: {current_val_loss:.4f} | Acc: {100*val_correct/val_total:.2f}%")
    print(f"LR: {optimizer.param_groups[0]['lr']:.2e}")
    print("-" * 50)
    
    if no_improve >= patience:
        print(f"Early stopping after {epoch+1} epochs")
        break

print("Training complete. Best model saved to best_drowsiness_model.pth")

Using device: cuda
Loading datasets...
Total samples: 60033




Training classifier...
