In [1]:
import torch
from torchvision import transforms, datasets
import torchvision
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import numpy as np
import matplotlib.pyplot as plt

import PIL

import time

# Notes
###### Reference paper: https://ietresearch.onlinelibrary.wiley.com/doi/full/10.1049/iet-its.2018.5392

The `mtcnn` library needs to be installed to locate the regions of interest (the eyes) that are fed to the network  \
GitHub repo link: https://github.com/ipazc/mtcnn/tree/master/mtcnn  \
Before running mtcnn, gamma correction needs to be applied (see https://www.mdpi.com/1999-5903/11/5/115)

One option is to train two 2D CNNs: the first on pictures of the eye, the second on the optical flow of the mouth  \
This is explained in the paper referenced above for gamma correction. 

The architecture is then two 2D CNNs (possibly based on ResNet-10, but this needs to be checked) followed by an LSTM layer  \
For the LSTM layer, the recommended number of frames is 16 (see the reference paper), but this could be experimented with  \
Something to try: use the 2D CNNs as feature extractors and the LSTM as the classifier  \
For help on LSTM: https://pytorch.org/tutorials/beginner/nlp/sequence_models_tutorial.html  

For help on Transfer Learning: https://pytorch.org/tutorials/beginner/transfer_learning_tutorial.html 

Res-Net is used for transfer learning: initialise the convolution weights using ResNet  \
For help on Res-Net: 
- https://github.com/harlan-zhao/Face-Detection-Identification  
- https://github.com/nullbyte91/computer-vision-tasks-and-algorithms 
- https://pytorch.org/vision/stable/models.html 
- https://pytorch.org/blog/introducing-torchvision-new-multi-weight-support-api/


<img src="Architecture diagram.png" width="300">
<img src="Architecture table.png" width="600">  

To use the GPUs at Imperial, see: 
- https://www.imperial.ac.uk/computing/people/csg/guides/hpcomputing/gpucluster/
- https://www.imperial.ac.uk/admin-services/ict/self-service/research-support/rcs/get-access/

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")


def train_model(model, criterion, optimizer, scheduler, num_epochs=25,
                dataloaders=None, dataset_sizes=None):
    """Train `model`, validating after every epoch, and return it with the best weights loaded.

    Each epoch runs a 'train' phase followed by a 'val' phase. The weights that
    achieve the highest validation accuracy are kept and loaded back into the
    model before it is returned.

    Args:
        model: the network to train. Batches are moved to the module-level
            `device`, so the model must already live on that device.
        criterion: loss callable, invoked as `criterion(outputs, labels)`.
        optimizer: optimizer stepped once per training batch.
        scheduler: learning-rate scheduler stepped once per epoch, after the
            training phase.
        num_epochs: number of epochs to run.
        dataloaders: mapping with 'train' and 'val' keys, each an iterable of
            `(inputs, labels)` batches. When omitted, a module-level variable
            named `dataloaders` is used (the original behaviour).
        dataset_sizes: mapping with 'train' and 'val' keys giving the number of
            samples per phase. When omitted, a module-level `dataset_sizes` is
            used if it exists; otherwise the samples actually seen are counted.

    Returns:
        The same `model` object, with the best validation weights loaded.

    Raises:
        ValueError: if no dataloaders are available, or a phase has no samples.
    """
    # Local import: `copy` is not part of the notebook's import cell.
    import copy

    # Explicit arguments win; otherwise fall back to notebook-level globals.
    if dataloaders is None:
        dataloaders = globals().get('dataloaders')
    if dataloaders is None:
        raise ValueError(
            "train_model needs `dataloaders` (pass it as an argument or define it globally)")
    if dataset_sizes is None:
        dataset_sizes = globals().get('dataset_sizes')

    since = time.time()

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            is_train = phase == 'train'
            model.train(is_train)  # train(False) is the same as eval()

            running_loss = 0.0
            running_corrects = 0
            samples_seen = 0

            # Each model in this notebook uses a different dataset, which is why
            # the loaders are a parameter rather than a fixed global.
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward; gradient history is only tracked in the train phase
                with torch.set_grad_enabled(is_train):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # backward + optimize only if in training phase
                    if is_train:
                        loss.backward()
                        optimizer.step()

                # statistics (kept as plain Python numbers so an empty phase
                # cannot leave a non-tensor where a tensor method is expected)
                batch_size = inputs.size(0)
                running_loss += loss.item() * batch_size
                running_corrects += torch.sum(preds == labels.data).item()
                samples_seen += batch_size
            if is_train:
                scheduler.step()

            n_samples = dataset_sizes[phase] if dataset_sizes is not None else samples_seen
            if n_samples == 0:
                raise ValueError(f"no samples available for phase '{phase}'")

            epoch_loss = running_loss / n_samples
            epoch_acc = running_corrects / n_samples

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            # deep copy the model whenever validation accuracy improves
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

        print()

    time_elapsed = time.time() - since
    print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    print(f'Best val Acc: {best_acc:.4f}')

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model

In [9]:
# Model for feature extraction for the eyes: a ResNet-18 loaded with its default
# pretrained weights and used as a frozen backbone.
model_efe = torchvision.models.resnet18(weights='DEFAULT')

# Freeze every pretrained parameter so that only the new head (created below,
# with requires_grad=True by default) is trained.
for params in model_efe.parameters():
    params.requires_grad = False

num_ftrs = model_efe.fc.in_features #the last layer was fully connected 512 to 1000 for initial ResNet 18 
# but we are changing this one with the next line of code

model_efe.fc = nn.Linear(num_ftrs, 2) #where 2 is the number of labels on the dataset (not sure about the number)
# we can also add a fully connected layer in between to extract the number of features we want for LSTM
# or have the two outputs as the features for the eye for LSTM 
# NOTE(review): train_model moves batches to `device`; the model presumably needs
# `model_efe.to(device)` before training on a GPU — confirm.

criterion_efe = nn.CrossEntropyLoss()

# Only the new head is optimised. Adam takes no `momentum` argument; its first
# beta already defaults to 0.9.
optimizer_efe = optim.Adam(model_efe.fc.parameters(), lr=0.001)

#decay learning rate by a factor of 0.1 every 7 epochs
efe_lr_scheduler = optim.lr_scheduler.StepLR(optimizer_efe, step_size=7, gamma=0.1)

#model_efe = train_model(model_efe, criterion_efe, optimizer_efe, efe_lr_scheduler,num_epochs=25)


512


In [12]:
class Mouth_Net(nn.Module):
    """Small CNN that maps a 2-channel mouth image to 2 class probabilities.

    Two conv + ReLU + 2x2 max-pool stages are followed by two linear layers.
    The first linear layer expects 64 feature maps of 7x7, so the input must be
    28x28 (each max-pool halves the spatial size: 28 -> 14 -> 7).
    """

    def __init__(self):
        super().__init__()
        # NOTE(review): the two input channels are presumably the x/y components
        # of the mouth optical flow rather than a grayscale image — confirm.
        self.conv1 = nn.Conv2d(in_channels = 2, out_channels = 32, kernel_size = 5, padding = 2)

        self.conv2 = nn.Conv2d(in_channels = 32, out_channels = 64, kernel_size = 5, padding = 2)

        self.fc1 = nn.Linear(64*7*7, 128)
        self.fc2 = nn.Linear(128, 2) # this is to have the same number of features from mouth as eye

    def Convolution(self, X):
        """Apply both conv -> ReLU -> 2x2 max-pool stages and return the feature maps."""
        X = F.max_pool2d(F.relu(self.conv1(X)), kernel_size = 2)
        X = F.max_pool2d(F.relu(self.conv2(X)), kernel_size = 2)
        return X

    # F.relu is used instead of nn.ReLU to avoid instantiating a module that
    # only forwards to the functional version.
    def forward(self, X):
        """Return softmax probabilities of shape (batch, 2).

        An unbatched (C, H, W) input is treated as a batch of one.
        """
        if X.dim() == 3:
            X = X.unsqueeze(0)

        X = self.Convolution(X)
        # Flatten per sample. Unlike view(-1, 64*7*7), this keeps the batch
        # dimension intact, so a wrong input size fails in fc1 instead of
        # silently producing extra "samples".
        X = X.flatten(start_dim=1)

        X = F.relu(self.fc1(X))
        X = self.fc2(X)
        # NOTE(review): nn.CrossEntropyLoss applies log-softmax itself; training
        # on this output with it would apply softmax twice — confirm intent.
        return F.softmax(X, dim = 1)
    
# Instantiate the mouth feature extractor.
model_mfe = Mouth_Net()

'''Instances below need to be changed to adapt to the current model'''

# Training setup template for the mouth network, left commented out.
# It mirrors the eye model setup, with two corrections so it runs once
# uncommented: optim.Adam takes no `momentum` argument, and StepLR is reached
# through optim.lr_scheduler because `lr_scheduler` is not imported on its own.
#criterion_mfe = nn.CrossEntropyLoss()

#optimizer_mfe = optim.Adam(model_mfe.parameters(), lr=0.001)

#decay learning rate by a factor of 0.1 every 7 epochs
#mfe_lr_scheduler = optim.lr_scheduler.StepLR(optimizer_mfe, step_size=7, gamma=0.1)

# Emits a single empty line as the cell output.
print()




In [14]:
class Drowsy_LSTM(nn.Module):
    """LSTM classifier that turns a sequence of feature vectors into 2 class probabilities.

    Args:
        input_size: number of features per time step.
        hidden_size: size of the LSTM hidden state.
        dropout_p: dropout probability applied to the LSTM output.
        num_classes: number of output classes (drowsy / not drowsy by default).
    """

    def __init__(self, input_size=4, hidden_size=512, dropout_p=0.1, num_classes=2):
        super().__init__()

        # batch_first is left at its default (False), so the LSTM expects
        # sequence-first input: (seq_len, batch, input_size).
        self.lstm_layer = nn.LSTM(input_size = input_size, hidden_size = hidden_size)
        self.dropout = nn.Dropout(p=dropout_p) #can play with this probability of dropout
        self.fc1 = nn.Linear(hidden_size, 128)
        self.fc2 = nn.Linear(128, num_classes) #classify between drowsy and not drowsy.

    def forward(self, X):
        """Return class probabilities computed from the last time step.

        X is expected as (seq_len, batch, input_size), giving (batch, num_classes);
        an unbatched (seq_len, input_size) input gives (num_classes,).
        """
        # nn.LSTM returns (output, (h_n, c_n)); only the per-step output is
        # needed. Passing the whole tuple on would make Dropout raise.
        seq_out, _ = self.lstm_layer(X)

        # No flattening is needed: the last time step is already one feature
        # vector per sequence.
        last_step = seq_out[-1]

        X = self.dropout(last_step)
        X = F.relu(self.fc1(X))
        X = self.fc2(X)
        # dim=-1 is the class dimension for both batched and unbatched input.
        return F.softmax(X, dim = -1)
        # this would be the final result of the 
        
# Instantiate the sequence classifier.
model_lstm = Drowsy_LSTM()

'''Instances below need to be changed to adapt to the current model'''

# Training setup template for the LSTM, left commented out.
# Corrected so it runs once uncommented: it refers to `model_lstm` (the variable
# defined above), optim.Adam takes no `momentum` argument, and StepLR is reached
# through optim.lr_scheduler because `lr_scheduler` is not imported on its own.
#criterion_DLSTM = nn.CrossEntropyLoss()

#optimizer_DLSTM = optim.Adam(model_lstm.parameters(), lr=0.001)

#decay learning rate by a factor of 0.1 every 7 epochs
#DLSTM_lr_scheduler = optim.lr_scheduler.StepLR(optimizer_DLSTM, step_size=7, gamma=0.1)

# Emits a single empty line as the cell output.
print()
        


