In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from PIL import Image
import os
import pandas as pd

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=32, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
        self.conv4 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, padding=1)
        self.conv5 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, padding=1)
        self.conv6 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, padding=1)
        self.conv7 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, padding=1)

        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.relu = nn.ReLU()

        self.dropout1 = nn.Dropout(0.2)
        self.dropout2 = nn.Dropout(0.3)
        self.dropout3 = nn.Dropout(0.4)
        self.dropout4 = nn.Dropout(0.5)

        self.adaptive_pool = nn.AdaptiveAvgPool2d((2, 2))

        
        flattened_size = self._calculate_output_size()

        self.fc1 = nn.Linear(flattened_size, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, 5) 

    def _calculate_output_size(self):
        
        x = torch.zeros(1, 3, 32, 32)  
        x = self.relu(self.conv1(x))
        x = self.relu(self.conv2(x))
        x = self.pool(x)
        x = self.dropout1(x)

        x = self.relu(self.conv3(x))
        x = self.relu(self.conv4(x))
        x = self.pool(x)
        x = self.dropout2(x)

        x = self.relu(self.conv5(x))
        x = self.relu(self.conv6(x))
        x = self.relu(self.conv7(x))
        x = self.pool(x)
        x = self.dropout3(x)

        x = self.adaptive_pool(x)
        flattened_size = x.view(1, -1).size(1)
        return flattened_size

    def forward(self, x):
        x = self.relu(self.conv1(x))
        x = self.relu(self.conv2(x))
        x = self.pool(x)
        x = self.dropout1(x)

        x = self.relu(self.conv3(x))
        x = self.relu(self.conv4(x))
        x = self.pool(x)
        x = self.dropout2(x)

        x = self.relu(self.conv5(x))
        x = self.relu(self.conv6(x))
        x = self.relu(self.conv7(x))
        x = self.pool(x)
        x = self.dropout3(x)

        x = self.adaptive_pool(x)
        x = x.view(x.size(0), -1)  

        x = self.relu(self.fc1(x))
        x = self.dropout4(x)
        x = self.relu(self.fc2(x))
        x = self.dropout4(x)
        x = self.fc3(x)

        return x

In [3]:
class DrowsinessDataset(Dataset):
    def __init__(self, folder_path, classes_df, transform=None):
        self.folder_path = folder_path
        self.classes_df = classes_df
        self.transform = transform
        self.image_paths = classes_df['filename'].values
        self.labels = classes_df[['awake', 'drowsy', 'phone', 'seatbelt', 'yawn']].values

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image_path = os.path.join(self.folder_path, self.image_paths[idx])
        image = Image.open(image_path).convert('RGB')

        if self.transform:
            image = self.transform(image)

        # The label is a multi-label binary array, e.g., [1, 0, 0, 1, 0]
        label = torch.FloatTensor(self.labels[idx])
        return image, label

In [4]:
def load_classes_file(csv_file_path):
  
    df = pd.read_csv(csv_file_path)
    df.columns = df.columns.str.strip()  
    return df

In [5]:
train_transform = transforms.Compose([
    transforms.Resize((145, 145)),
    transforms.RandomHorizontalFlip(p=0.25),
    transforms.RandomRotation(60),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

test_transform = transforms.Compose([
    transforms.Resize((145, 145)),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

In [6]:
train_folder = 'dataset/train'
test_folder = 'dataset/test'
valid_folder = 'dataset/valid'

In [7]:
train_classes_file = os.path.join(train_folder, '_classes.csv')
test_classes_file = os.path.join(test_folder, '_classes.csv')
valid_classes_file = os.path.join(valid_folder, '_classes.csv')

In [8]:
train_classes_df = load_classes_file(train_classes_file)
print("Train Classes Columns:", train_classes_df.columns.tolist()) 
test_classes_df = load_classes_file(test_classes_file)

Train Classes Columns: ['filename', 'awake', 'drowsy', 'phone', 'seatbelt', 'yawn']


In [9]:
train_dataset = DrowsinessDataset(train_folder, train_classes_df, transform=train_transform)
test_dataset = DrowsinessDataset(test_folder, test_classes_df, transform=test_transform)

In [10]:
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [11]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CNN().to(device)
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [12]:
num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.float().to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output.squeeze(), target)
        loss.backward()
        optimizer.step()

        if batch_idx % 100 == 0:
            print(f'Epoch {epoch+1}/{num_epochs}, Batch {batch_idx}/{len(train_loader)}, Loss: {loss.item():.4f}')

    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for data, target in test_loader: 
            data, target = data.to(device), target.float().to(device)

            outputs = model(data)

        
            predicted = (torch.sigmoid(outputs) > 0.5).float() 

            correct += (predicted == target).sum().item()
            total += target.numel()  


    accuracy = 100 * correct / total
    print(f'Validation Accuracy: {accuracy:.2f}%')

Epoch 1/10, Batch 0/25, Loss: 0.6984
Validation Accuracy: 67.89%
Epoch 2/10, Batch 0/25, Loss: 0.5536
Validation Accuracy: 67.89%
Epoch 3/10, Batch 0/25, Loss: 0.5927
Validation Accuracy: 67.89%
Epoch 4/10, Batch 0/25, Loss: 0.5529
Validation Accuracy: 73.68%
Epoch 5/10, Batch 0/25, Loss: 0.5422
Validation Accuracy: 67.89%
Epoch 6/10, Batch 0/25, Loss: 0.4777
Validation Accuracy: 72.11%
Epoch 7/10, Batch 0/25, Loss: 0.5521
Validation Accuracy: 72.63%
Epoch 8/10, Batch 0/25, Loss: 0.5191
Validation Accuracy: 67.89%
Epoch 9/10, Batch 0/25, Loss: 0.4743
Validation Accuracy: 72.11%
Epoch 10/10, Batch 0/25, Loss: 0.5396
Validation Accuracy: 68.95%


In [13]:
torch.save(model.state_dict(), 'drowsiness_model.pth')