# Driver Distraction Detection Project
## Using Deep Learning and Raspberry Pi
(This project is part of the internship program in Wadi Makkah - ML track)

### Team members:
- Hanan Alharbi
- Ahmad Alkaf
- Neaam Hariri

### Team Mentor: 
- Sarah Khyyat

## Description:


## Step 1: Import Dependencies

In [None]:
#import dependencies
from collections import OrderedDict
import os
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.models as models
import torchvision.transforms as transforms
from torchvision import datasets
from PIL import Image
from PIL import ImageFile
import matplotlib.pyplot as plt                        
%matplotlib inline 

## Step 2: Import Datasets

In [None]:
data_dir = 'Images'

In [None]:
IMG_SIZE = 224

In [None]:
#data transformations
data_transforms = {
    'train': transforms.Compose([
        transforms.RandomRotation(45),
        transforms.RandomResizedCrop(IMG_SIZE),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], 
                             [0.229, 0.224, 0.225])
    ]),
    'valid': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(IMG_SIZE),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], 
                             [0.229, 0.224, 0.225])
    ]),
    'test': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(IMG_SIZE),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], 
                             [0.229, 0.224, 0.225])
    ]),
}


In [None]:
train_dataset = datasets.ImageFolder(data_dir + '/train', transform=data_transforms['train'])
valid_dataset = datasets.ImageFolder(data_dir + '/valid', transform=data_transforms['valid'])
test_dataset = datasets.ImageFolder(data_dir + '/test', transform=data_transforms['test'])

In [None]:
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=64, shuffle=True)
valid_loader = torch.utils.data.DataLoader(valid_dataset, batch_size=64, shuffle=False)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=64, shuffle=False)

In [None]:
loaders = {
    'train': train_loader,
    'valid': valid_loader,
    'test': test_loader
}

## Step 3: Train Using Transfer Learning (VGG16 - Resenet - DenseNet)

In [None]:
use_cuda = torch.cuda.is_available()

In [None]:
def chooseModel(model):
  
    if model == 'vgg':
        #Load vgg-16 pre-trained model
        model_transfer_vgg16 = models.vgg16(pretrained=True)
        save_transfer_vgg16 = 'transfer_vgg16'
        IMG_SIZE = 224
        return model_transfer_vgg16, save_transfer_vgg16
  
    elif model == 'resnet':
        #Load Resenet-50 pre-trained model
        model_transfer_resnet50 = models.resnet50(pretrained=True)
        save_transfer_resnet50 = 'transfer_resent50'
        IMG_SIZE = 229 #224
        print('resnet')
        # in - 2048
        return model_transfer_resnet50, save_transfer_resnet50
  
    elif model == 'densenet':
        #Load DenseNet-161 pre-trained model
        model_transfer_densenet = models.densenet161(pretrained=True)
        save_transfer_densenet = 'transfer_densenet'
        IMG_SIZE = 224 #
        print('densenet')
        # in - 2208
        return model_transfer_densenet, save_transfer_densenet
  
    else:
        return print('The model you chose does not exist')

In [None]:
#fine-tune model
def fineTuneModel(model, in_features, out_features, dropout):
  
    model_transfer = model
  
    # freeze the model parameters
    for param in model_transfer.parameters():
        param.require_grad = False

    # define the classifier layers
    classifier = nn.Sequential(OrderedDict([
        ('dropout1', nn.Dropout(dropout)),
        ('fc1', nn.Linear(in_features, 1000)),
        ('relu1', nn.ReLU()),
        ('dropout2', nn.Dropout(dropout)),
        ('fc2', nn.Linear(1000, out_features)),
        ('output', nn.LogSoftmax(dim=1))
    ]))
  
    # update the model classifier
    model_transfer.classifier = classifier
    
    if use_cuda:
        model_transfer = model_transfer.cuda()
    
    return model_transfer

In [None]:
# loss and optimizer
def lossAndOptimizer(opt, model, lr=0.5):
    criterion = nn.CrossEntropyLoss()
    if opt == 'adam':
        optimizer = optim.Adam(model.classifier.parameters(), lr)
    elif opt == 'SGD':
        optimizer = optim.SGD(model.classifier.parameters(), lr)
        
    return criterion, optimizer

In [None]:
# load the truncated images to prevent the error OSError: image file is truncated
ImageFile.LOAD_TRUNCATED_IMAGES = True

def train(n_epochs, loaders, model, optimizer, criterion, use_cuda, save_path):

    # initialize tracker for minimum validation loss
    valid_loss_min = np.Inf 
    
    for epoch in range(1, n_epochs+1):
        # initialize variables to monitor training and validation loss
        train_loss = 0.0
        valid_loss = 0.0

        # train the model
        model.train()
        for batch_idx, (data, target) in enumerate(loaders['train']):

            if use_cuda:
                data, target = data.cuda(), target.cuda()
                
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            train_loss += ((1 / (batch_idx + 1)) * (loss.data - train_loss))
          
          
        # validate the model 
        model.eval()
        for batch_idx, (data, target) in enumerate(loaders['valid']):

            if use_cuda:
                data, target = data.cuda(), target.cuda()
                
            # update the average validation loss
            output = model(data)
            loss = criterion(output, target)
            valid_loss += ((1 / (batch_idx + 1)) * (loss.data - valid_loss))

            
        # print training/validation statistics 
        print('Epoch: {} \tTraining Loss: {:.6f} \tValidation Loss: {:.6f}'.format(
            epoch, 
            train_loss,
            valid_loss
            ))
        
        # save the model if validation loss has decreased
        if valid_loss < valid_loss_min:
            print('Validation loss decreased ({:.6f} --> {:.6f}).  Saving model ...'.format(
            valid_loss_min,
            valid_loss))
            torch.save(model.state_dict(), save_path)
            valid_loss_min = valid_loss
             
    return model

In [None]:
def test(loaders, model, criterion, use_cuda):
  
    test_loss = 0.
    correct = 0.
    total = 0.

    model.eval()
    
    for batch_idx, (data, target) in enumerate(loaders['test']):
        
        if use_cuda:
            data, target = data.cuda(), target.cuda()
            
        output = model(data)
        loss = criterion(output, target)
        test_loss = test_loss + ((1 / (batch_idx + 1)) * (loss.data - test_loss))
                                                          
        # convert output probabilities to predicted class
        pred = output.data.max(1, keepdim=True)[1]
                                                          
        # compare predictions to true label
        correct += np.sum(np.squeeze(pred.eq(target.data.view_as(pred))).cpu().numpy())
        total += data.size(0)
            
    print('Test Loss: {:.6f}'.format(test_loss))
    print('Test Accuracy: %2d%% (%2d/%2d)' % (100. * correct / total, correct, total))

In [None]:
#define predict function

In [None]:
model, save_path = chooseModel('vgg')

In [None]:
model = fineTuneModel(model, 25088, 10, 0.25)

In [None]:
criterion, optimizer = lossAndOptimizer('adam', model, 0.2)

In [None]:
model = train(3, loaders, model, optimizer, criterion, use_cuda, save_path)

In [None]:
model.load_state_dict(torch.load(save_path))

In [None]:
test(loaders, model, criterion, use_cuda)