In [1]:
!pip install pytorch-lightning gdown wandb --upgrade

In [2]:
# Haarcascades files.
!mkdir 'CheckPoints'
!mkdir 'Haarcascades'
!wget https://raw.githubusercontent.com/opencv/opencv/master/data/haarcascades/haarcascade_frontalface_default.xml
!wget https://raw.githubusercontent.com/opencv/opencv/master/data/haarcascades/haarcascade_eye.xml
!mv haarcascade_frontalface_default.xml 'Haarcascades/'
!mv haarcascade_eye.xml 'Haarcascades/'

In [3]:
#Dataset
!gdown --id 1lQBEo3mIgZ4-gBY8XMwxzbSa_iHImdYp

In [4]:
!unzip dataset_B_Eye_Images.zip -d Dataset

In [5]:
import torch
import torch.nn as nn
import torch.optim as optim  
import torchvision.transforms as transforms
import torchvision
import os
from os.path import dirname, join
import pandas as pd
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import datasets,models
from torchvision.transforms import ToTensor
import torchmetrics
from torch import nn
from torchvision.io import read_image
import matplotlib.pyplot as plt
from pytorch_lightning.loggers import WandbLogger
import pytorch_lightning as pl
import wandb
import random
import glob
from PIL import Image
import cv2

In [7]:
# Log in to Weights & Biases before the WandbLogger is created further down.
wandb.login()

In [8]:
# The setup cell places both XML files directly in Haarcascades/ (there is no
# haar_models/ sub-directory), so load them from there.
face_cascade = cv2.CascadeClassifier('Haarcascades/haarcascade_frontalface_default.xml')
eye_cascade = cv2.CascadeClassifier('Haarcascades/haarcascade_eye.xml')

# CascadeClassifier does not raise on a missing/invalid file; it just stays
# empty and then detects nothing. Fail loudly here instead.
assert not face_cascade.empty(), 'Failed to load Haarcascades/haarcascade_frontalface_default.xml'
assert not eye_cascade.empty(), 'Failed to load Haarcascades/haarcascade_eye.xml'

In [9]:
# --- Model / data settings ---
OUTPUT_UNITS = 2    # passed to Model as the number of output units
shape = (24, 24)    # target size used by resize()
val_ratio = 0.25    # fraction of the images held out for validation

# --- Optimisation settings ---
lr = 0.001
BATCH_SIZE = 64
epochs = 100

# --- Filesystem locations ---
Data_Path = 'Dataset/dataset_B_Eye_Images'
Save_Dir = 'CheckPoints/'

# Hyper-parameters handed to the experiment logger.
config = dict(
    learning_rate=lr,
    batch_size=BATCH_SIZE,
    epochs=epochs,
    input_shape=shape,
)

In [10]:
def resize(image, bbox):
    """Crop a bounding box out of `image` and return it resized as a PIL image.

    `bbox` is unpacked as (x, y, w, h); rows y..y+h and columns x..x+w are
    cut out of `image`, resized with OpenCV to the module-level `shape`, and
    wrapped with `Image.fromarray`.
    """
    left, top, width, height = bbox
    crop = image[top:top + height, left:left + width]
    resized = cv2.resize(crop, shape)
    return Image.fromarray(resized)

In [11]:
class CreateDataset(Dataset):
    """Dataset over parallel, indexable collections of images and labels.

    Args:
        images: indexable collection of samples.
        labels: indexable collection of targets, aligned with `images`.
        transform: optional callable applied to each sample on access; any
            falsy value (the default) returns samples unchanged.
    """

    def __init__(self, images, labels, transform=False):
        self.images, self.labels, self.transform = images, labels, transform

    def __len__(self):
        """Number of samples, taken from `images`."""
        return len(self.images)

    def __getitem__(self, index):
        """Return the `(sample, target)` pair stored at `index`."""
        sample = self.images[index]
        target = self.labels[index]
        return (self.transform(sample) if self.transform else sample), target

In [14]:
class DataSetFactory:
    """Load the eye images under `Data_Path` and build train/validation datasets.

    Images are read from the four class sub-folders, shuffled, converted to
    grayscale and split: the first `val_ratio` fraction becomes the validation
    set and the remainder becomes the training set.

    Attributes:
        training: CreateDataset over the training portion only.
        validation: CreateDataset over the held-out validation portion.

    Raises:
        FileNotFoundError: if no .jpg file is found under `Data_Path`.
        IOError: if an image file cannot be decoded by OpenCV.
    """

    # Sub-folder of Data_Path -> label assigned to every image inside it.
    _LABELLED_DIRS = (
        ('openRightEyes', 1),
        ('openLeftEyes', 1),
        ('closedLeftEyes', 0),
        ('closedRightEyes', 0),
    )

    def __init__(self):
        files = [
            {'img_path': path, 'label': label}
            for folder, label in self._LABELLED_DIRS
            for path in glob.glob(Data_Path + '/' + folder + '/*.jpg')
        ]
        if not files:
            raise FileNotFoundError('No .jpg images found under %s' % Data_Path)
        random.shuffle(files)

        images = []
        labels = []
        for file in files:
            img = cv2.imread(file['img_path'])
            if img is None:
                # cv2.imread signals failure by returning None instead of raising.
                raise IOError('Could not read image %s' % file['img_path'])
            images.append(cv2.cvtColor(img, cv2.COLOR_BGR2GRAY))
            labels.append(file['label'])

        val_length = int(len(images) * val_ratio)
        val_images = images[:val_length]
        val_labels = labels[:val_length]
        train_images = images[val_length:]
        train_labels = labels[val_length:]

        print('training size %d : val size %d' % (len(train_images), len(val_images)))

        train_transform = transforms.Compose([
            ToTensor(),
        ])
        val_transform = transforms.Compose([
            ToTensor(),
        ])

        # Train only on the portion not reserved for validation; feeding the
        # full image list here would leak validation samples into training.
        self.training = CreateDataset(images=train_images, labels=train_labels, transform=train_transform)
        self.validation = CreateDataset(images=val_images, labels=val_labels, transform=val_transform)

In [15]:
# Build both splits once, then wrap each in a DataLoader.
# Only the training loader reshuffles its samples.
dataset = DataSetFactory()
train_dataloader = DataLoader(
    dataset.training,
    shuffle=True,
    batch_size=BATCH_SIZE,
    num_workers=2,
)
val_dataloader = DataLoader(
    dataset.validation,
    shuffle=False,
    batch_size=BATCH_SIZE,
    num_workers=2,
)

In [16]:
class SeparableConv2d(nn.Module):
    """Depthwise-separable 2-D convolution.

    A per-channel convolution (``groups=in_channels``) is followed by a 1x1
    convolution that maps ``in_channels`` to ``out_channels``. ``kernel_size``,
    ``stride``, ``padding`` and ``dilation`` apply to the per-channel stage
    only; ``bias`` applies to both stages.
    """

    def __init__(self, in_channels, out_channels, kernel_size=1, stride=1, padding=0, dilation=1, bias=False):
        super().__init__()
        # Spatial filtering: every input channel gets its own kernel.
        self.depthwise = nn.Conv2d(
            in_channels=in_channels,
            out_channels=in_channels,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
            dilation=dilation,
            groups=in_channels,
            bias=bias,
        )
        # Channel mixing: plain 1x1 convolution across all channels.
        self.pointwise = nn.Conv2d(
            in_channels=in_channels,
            out_channels=out_channels,
            kernel_size=1,
            stride=1,
            padding=0,
            dilation=1,
            groups=1,
            bias=bias,
        )

    def forward(self, x):
        """Apply the depthwise stage, then the pointwise stage."""
        return self.pointwise(self.depthwise(x))

In [17]:
class ResidualBlock(nn.Module):
    """Down-sampling residual block built from separable convolutions.

    Main path: SeparableConv2d -> BatchNorm -> ReLU -> SeparableConv2d ->
    BatchNorm -> MaxPool (stride 2).
    Shortcut path: 1x1 convolution with stride 2 -> BatchNorm.
    The two paths are summed; both change the channel count from
    ``in_channels`` to ``out_channels`` and use stride 2.

    Args:
        in_channels: number of channels of the input tensor.
        out_channels: number of channels produced by the block.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()

        # NOTE: PyTorch's BatchNorm momentum is the weight given to the NEW
        # batch statistics (running = (1 - m) * running + m * batch). A value
        # of 0.99 would make the running statistics track only the latest
        # batch; 0.01 gives the slow moving average that a Keras-style
        # "momentum=0.99" describes.
        self.residual_conv = nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=1, stride=2,
                                       bias=False)
        self.residual_bn = nn.BatchNorm2d(out_channels, momentum=0.01, eps=1e-3)

        self.sepConv1 = SeparableConv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=3, bias=False,
                                        padding=1)
        self.bn1 = nn.BatchNorm2d(out_channels, momentum=0.01, eps=1e-3)
        self.relu = nn.ReLU()

        self.sepConv2 = SeparableConv2d(in_channels=out_channels, out_channels=out_channels, kernel_size=3, bias=False,
                                        padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels, momentum=0.01, eps=1e-3)
        self.maxp = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

    def forward(self, x):
        """Return shortcut(x) + main_path(x)."""
        # Shortcut branch: strided 1x1 projection so shapes match the main path.
        res = self.residual_bn(self.residual_conv(x))

        # Main branch.
        x = self.relu(self.bn1(self.sepConv1(x)))
        x = self.bn2(self.sepConv2(x))
        x = self.maxp(x)
        return res + x

In [18]:
class Model(pl.LightningModule):
    """Small residual CNN classifier for single-channel images.

    Architecture: two 3x3 convolutions (8 channels each, with BatchNorm and
    ReLU), four ResidualBlocks (16 -> 32 -> 64 -> 128 channels), a 3x3
    convolution to ``output_units`` channels and global average pooling, so
    ``forward`` returns logits of shape (batch, output_units).

    Args:
        output_units: number of classes.
        learning_rate: learning rate for Adam; also one bound of the cyclic
            learning-rate schedule (the other bound is 1e-4).
    """

    def __init__(self, output_units, learning_rate):
        super().__init__()

        # NOTE: PyTorch's BatchNorm momentum is the weight of the NEW batch
        # statistics, so 0.01 here is the equivalent of a Keras-style 0.99.
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=8, kernel_size=3, stride=1, bias=False)
        self.bn1 = nn.BatchNorm2d(8, affine=True, momentum=0.01, eps=1e-3)
        self.relu1 = nn.ReLU()
        self.conv2 = nn.Conv2d(in_channels=8, out_channels=8, kernel_size=3, stride=1, bias=False)
        self.bn2 = nn.BatchNorm2d(8, momentum=0.01, eps=1e-3)
        self.relu2 = nn.ReLU()

        self.module1 = ResidualBlock(in_channels=8, out_channels=16)
        self.module2 = ResidualBlock(in_channels=16, out_channels=32)
        self.module3 = ResidualBlock(in_channels=32, out_channels=64)
        self.module4 = ResidualBlock(in_channels=64, out_channels=128)

        self.last_conv = nn.Conv2d(in_channels=128, out_channels=output_units, kernel_size=3, padding=1)
        self.avgp = nn.AdaptiveAvgPool2d((1, 1))

        self.criterion = nn.CrossEntropyLoss()
        # Current torchmetrics requires the task to be stated explicitly.
        # One metric object per stage so their running state never mixes.
        self.train_acc = torchmetrics.Accuracy(task='multiclass', num_classes=output_units)
        self.val_acc = torchmetrics.Accuracy(task='multiclass', num_classes=output_units)
        self.test_acc = torchmetrics.Accuracy(task='multiclass', num_classes=output_units)

        self.learning_rate = learning_rate
        self.save_hyperparameters()

    def forward(self, input_data):
        """Return class logits of shape (batch, output_units)."""
        x = self.relu1(self.bn1(self.conv1(input_data)))
        x = self.relu2(self.bn2(self.conv2(x)))
        x = self.module1(x)
        x = self.module2(x)
        x = self.module3(x)
        x = self.module4(x)
        x = self.last_conv(x)
        x = self.avgp(x)
        # (batch, output_units, 1, 1) -> (batch, output_units)
        return torch.flatten(x, 1)

    def training_step(self, batch, batch_idx):
        """Compute and log the training loss and per-step accuracy."""
        input_data, targets = batch
        preds = self(input_data)
        loss = self.criterion(preds, targets)
        self.log('train_loss', loss)
        self.train_acc(preds, targets)
        self.log('train_acc', self.train_acc, on_step=True, on_epoch=False, prog_bar=True)
        return loss

    def validation_step(self, batch, batch_idx):
        """Log epoch-level `val_loss` / `val_acc`."""
        self._evaluate(batch, 'val')

    def test_step(self, batch, batch_nb):
        """Log epoch-level `test_loss` / `test_acc`."""
        self._evaluate(batch, 'test')

    def _evaluate(self, batch, name):
        """Shared evaluation logic; `name` is the metric prefix ('val' or 'test')."""
        input_data, targets = batch
        preds = self(input_data)
        loss = self.criterion(preds, targets)
        self.log(f'{name}_loss', loss, on_step=False, on_epoch=True, prog_bar=True)
        # Use the metric that belongs to this stage instead of sharing val_acc.
        accuracy = self.test_acc if name == 'test' else self.val_acc
        accuracy(preds, targets)
        self.log(f'{name}_acc', accuracy, on_step=False, on_epoch=True, prog_bar=True)

    def predict_step(self, batch, batch_idx):
        """Return the predicted class index for every sample in the batch.

        Accepts either an `(inputs, targets)` pair or a bare input tensor, so
        unlabelled data can be predicted as well.
        """
        input_data = batch[0] if isinstance(batch, (list, tuple)) else batch
        preds = self(input_data)
        return torch.argmax(preds, dim=1)

    def configure_optimizers(self):
        """Adam with a cyclic learning rate between 1e-4 and `learning_rate`."""
        optimizer = torch.optim.Adam(self.parameters(), lr=self.learning_rate)
        # CyclicLR expects base_lr to be the lower and max_lr the upper bound;
        # sort so the bounds are valid whichever side learning_rate falls on.
        base_lr, max_lr = sorted((self.learning_rate, 1e-4))
        scheduler = torch.optim.lr_scheduler.CyclicLR(
            optimizer, base_lr=base_lr, max_lr=max_lr, cycle_momentum=False)
        # CyclicLR is designed to be stepped after every batch; Lightning's
        # default interval is once per epoch, so request per-step updates.
        return {
            'optimizer': optimizer,
            'lr_scheduler': {'scheduler': scheduler, 'interval': 'step'},
        }

In [19]:
model = Model(OUTPUT_UNITS, lr)

# Keep the checkpoint with the best validation accuracy and stop early once
# val_acc has not improved for 20 consecutive validation runs.
callbacks = [
    pl.callbacks.ModelCheckpoint(monitor='val_acc', dirpath=Save_Dir, verbose=True, mode='max', filename='Drowsiness-{val_acc:.4f}'),
    pl.callbacks.EarlyStopping(monitor='val_acc', patience=20, verbose=True, mode='max'),
]

wandb_logger = WandbLogger(project="Driver-Drowsiness", config=config)
# `gpus=1` was removed from Trainer in Lightning 2.x; accelerator/devices is
# the equivalent way to request a single GPU.
trainer = pl.Trainer(max_epochs=epochs, callbacks=callbacks, accelerator='gpu', devices=1, logger=wandb_logger)
trainer.fit(model, train_dataloader, val_dataloader)

In [20]:
# Run one validation pass over val_dataloader with the trainer used for fitting.
# NOTE(review): no model is passed, so the trainer decides which weights to
# evaluate (last vs. best checkpoint) — confirm against the Lightning docs.
trainer.validate(dataloaders=val_dataloader)