# **Imports**

In [None]:
from __future__ import print_function, division
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
from torch.autograd import Variable
import numpy as np
import torchvision
import torch.utils.data
from torchvision import datasets, models, transforms
import matplotlib.pyplot as plt
import time
import os
import copy

In [None]:

class TransferLearning(nn.Module):
    """VGG16-BN with a 2-way classification head, plus train/evaluate helpers.

    The wrapped network lives in ``self.model``. ``retrain`` fine-tunes it,
    ``test`` evaluates it on the held-out split, and ``load_data`` builds the
    train/dev/test dataloaders from a single image folder.
    """

    def __init__(self):
        """Build the network, replace its last layer and load the saved weights."""
        super(TransferLearning, self).__init__()
        # The checkpoint loaded below overwrites every parameter and buffer
        # (load_state_dict is strict by default), so fetching the ImageNet
        # weights first would only cost a download.
        self.model = models.vgg16_bn(pretrained=False)

        num_features = self.model.classifier[6].in_features
        layers = list(self.model.classifier.children())[:-1]  # Remove last layer
        layers.append(nn.Linear(num_features, 2))  # Add our layer with 2 outputs
        self.model.classifier = nn.Sequential(*layers)  # Replace the model classifier
        self.device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
        print("Current device is: ",self.device)
        state_dic = torch.load('../input/trainedmodel/TrainedModels/VGG16_bn-2_class.pt', map_location=torch.device(self.device))
        self.model.load_state_dict(state_dic)

    def forward(self, x):
        """Delegate the forward pass to the wrapped VGG16-BN network."""
        return self.model(x)

    # Loading and applying transformations to the dataset
    def load_data(self, seed=42):
        """Build the train/dev/test dataloaders.

        The split is 20% test, then 80/20 train/dev on the remainder. The
        shuffling of indices is seeded, so every call (from ``retrain``,
        ``test`` or a notebook cell) sees the same partition; an unseeded
        split would let test images leak into a later training run.

        Random augmentation is applied to the training subset only; the dev
        and test subsets use a deterministic resize + centre crop so that
        their metrics are repeatable.

        Args:
            seed: seed for the index permutation that defines the split.

        Returns:
            ``[train_dataloader, dev_dataloader, test_dataloader]``.

        Side effects:
            Sets ``self.train_size``, ``self.dev`` and ``self.test_size`` and
            prints the class names and split sizes.
        """
        normalize = transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        train_transform = transforms.Compose([
            transforms.Resize(248),
            transforms.RandomCrop(size=(224, 224)),
            transforms.RandomHorizontalFlip(),
            transforms.RandomRotation(10),
            transforms.RandomApply(torch.nn.ModuleList([
                transforms.ColorJitter(brightness=.5, hue=.3),
                # kernel_size must be a pair of odd ints; the former "(5.9)"
                # was a float typo for the tuple (5, 9).
                transforms.GaussianBlur(kernel_size=(5, 9), sigma=(0.1, 2.0)),
            ]), p=0.2),
            transforms.ToTensor(),
            normalize,
        ])
        eval_transform = transforms.Compose([
            transforms.Resize(248),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            normalize,
        ])

        data_dir = os.path.join('../input/yawning/Dataset/train')
        # Two views over the same files so each subset gets its own transform.
        train_view = datasets.ImageFolder(data_dir, train_transform)
        eval_view = datasets.ImageFolder(data_dir, eval_transform)
        print(train_view.classes)

        size = len(train_view)
        print("Size of dataset: ", size)
        self.test_size = int(size * 0.2)
        self.train_size = int((size - self.test_size) * 0.8)
        self.dev = size - self.train_size - self.test_size

        generator = torch.Generator().manual_seed(seed)
        order = torch.randperm(size, generator=generator).tolist()
        train_end = self.train_size
        dev_end = train_end + self.dev
        train_set = torch.utils.data.Subset(train_view, order[:train_end])
        dev_set = torch.utils.data.Subset(eval_view, order[train_end:dev_end])
        test_set = torch.utils.data.Subset(eval_view, order[dev_end:])

        train_dataloader = torch.utils.data.DataLoader(train_set, batch_size=32, shuffle=True)
        dev_dataloader = torch.utils.data.DataLoader(dev_set, batch_size=4, shuffle=True)
        test_dataloader = torch.utils.data.DataLoader(test_set, batch_size=16, shuffle=True)
        print("Train data size", self.train_size)
        print("Dev data size", self.dev)
        print("test data size", self.test_size)

        return [train_dataloader, dev_dataloader,test_dataloader]

    def _run_epoch(self, loader, criterion, optimizer=None):
        """Run one pass over ``loader`` and return ``(mean_loss, accuracy)``.

        When ``optimizer`` is given the model is put in training mode and
        updated after every batch; otherwise the model is put in eval mode
        and gradients are disabled. Both results are plain Python floats.
        The per-sample mean assumes ``criterion`` averages over the batch
        (the default reduction of ``nn.CrossEntropyLoss``).
        """
        training = optimizer is not None
        self.model.train(training)
        total_loss = 0.0
        total_correct = 0
        total_seen = 0
        for inputs, labels in loader:
            inputs = inputs.to(self.device)
            labels = labels.to(self.device)
            if training:
                optimizer.zero_grad()
            with torch.set_grad_enabled(training):
                outputs = self.model(inputs)
                _, preds = torch.max(outputs, 1)
                loss = criterion(outputs, labels)
                if training:
                    loss.backward()
                    optimizer.step()
            batch_size = inputs.size(0)
            # Undo the batch mean so batches of different size weigh correctly.
            total_loss += loss.item() * batch_size
            total_correct += (preds == labels).sum().item()
            total_seen += batch_size
        if total_seen == 0:
            return 0.0, 0.0
        return total_loss / total_seen, total_correct / total_seen

    #Training the network
    def retrain(self,num_of_epochs = 25,learning_rate = 0.001):
        """Fine-tune all parameters with SGD and keep the best dev-accuracy weights.

        Args:
            num_of_epochs: number of passes over the training subset.
            learning_rate: initial SGD learning rate (momentum is 0.9).

        Side effects:
            Fills ``self.train_acc`` and ``self.dev_acc`` with one float per
            epoch, prints progress, and leaves ``self.model`` holding the
            weights of the epoch with the highest dev accuracy.
        """
        self.model.to(self.device)
        criterion = nn.CrossEntropyLoss()
        self.train_acc = []
        self.dev_acc = []
        # Observe that all parameters are being optimized
        optimizer = optim.SGD(self.model.parameters(), lr=learning_rate, momentum=0.9)

        # Decay LR by a factor of 0.1 every 7 epochs
        scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)
        train_loader, dev_loader, _ = self.load_data()
        since = time.time()
        best_model_wts = copy.deepcopy(self.model.state_dict())
        best_acc = 0.0
        for epoch in range(num_of_epochs):
            print('Epoch {}/{}'.format(epoch, num_of_epochs - 1))
            print('-' * 10)

            epoch_loss, epoch_acc = self._run_epoch(train_loader, criterion, optimizer)
            # Step once per epoch; stepping per batch would decay the LR
            # every 7 batches instead of every 7 epochs.
            scheduler.step()
            # Plain floats keep the history plottable even when training on GPU.
            self.train_acc.append(epoch_acc)
            print('{} Loss: {:.4f} Acc: {:.4f}'.format(
                'Train', epoch_loss, epoch_acc))

            epoch_loss, epoch_acc = self._run_epoch(dev_loader, criterion)
            self.dev_acc.append(epoch_acc)
            print('{} Loss: {:.4f} Acc: {:.4f}'.format(
                'Dev', epoch_loss, epoch_acc))
            if epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(self.model.state_dict())

        time_elapsed = time.time() - since
        print('Training complete in {:.0f}m {:.0f}s'.format(
            time_elapsed // 60, time_elapsed % 60))
        print('Best val Acc: {:.4f}'.format(best_acc))
        self.model.load_state_dict(best_model_wts)

    def calc_confusion(self,tp,tn,fp,fn):
        """Print and return accuracy, precision and sensitivity.

        Args:
            tp, tn, fp, fn: confusion-matrix counts.

        Returns:
            ``(accuracy, precision, sensitivity)``; a metric whose
            denominator is zero is reported as 0.0 instead of raising.
        """
        def ratio(numerator, denominator):
            return numerator / denominator if denominator else 0.0

        accuarcy = ratio(tp + tn, tp + tn + fp + fn)
        precision = ratio(tp, tp + fp)
        sensitivity = ratio(tp, tp + fn)
        print()
        print("Accuarcy is: ",accuarcy)
        print("Precision is: ",precision)
        print("Sensitivity is: ",sensitivity)
        return accuarcy, precision, sensitivity

    #Testing the network
    def test(self,criterion = nn.CrossEntropyLoss()):
        """Evaluate the model on the test subset and print the metrics.

        Args:
            criterion: loss function; the per-sample average assumes it
                returns the batch mean (the ``nn.CrossEntropyLoss`` default).

        Prints the data-loading time, the confusion-matrix metrics (class 1
        is treated as the positive class), the elapsed time, and the average
        loss and accuracy over the test subset.
        """
        print(self.device)
        self.model.to(self.device)
        a = time.time()
        DataLoaders = self.load_data()
        b = time.time() -a
        print(b // 60,"min", b % 60,"second")
        tp, tn, fp, fn = 0,0,0,0
        since = time.time()
        loss_test = 0.0

        test_loader = DataLoaders[2]
        test_batches = len(test_loader)
        print("Evaluating model")
        print('-' * 10)

        self.model.eval()
        # No gradients are needed for evaluation; this also avoids keeping
        # the autograd graph of every batch alive.
        with torch.no_grad():
            for i, (inputs, labels) in enumerate(test_loader):
                if i % 100 == 0:
                    print("\rTest batch {}/{}".format(i, test_batches), end='', flush=True)

                inputs = inputs.to(self.device)
                labels = labels.to(self.device)

                outputs = self.model(inputs)
                _, preds = torch.max(outputs, 1)
                loss = criterion(outputs, labels)

                # Weight the batch-mean loss by batch size so that dividing
                # by the number of samples gives a true per-sample average.
                loss_test += loss.item() * inputs.size(0)

                correct = preds == labels
                positive = preds == 1
                tp += (correct & positive).sum().item()
                tn += (correct & ~positive).sum().item()
                fp += (~correct & positive).sum().item()
                fn += (~correct & ~positive).sum().item()

        torch.cuda.empty_cache()

        acc_test = tp + tn
        avg_loss = loss_test / self.test_size if self.test_size else 0.0
        avg_acc = acc_test / self.test_size if self.test_size else 0.0

        elapsed_time = time.time() - since
        self.calc_confusion(tp,tn,fp,fn)
        print()
        print("Evaluation completed in {:.0f}m {:.0f}s".format(elapsed_time // 60, elapsed_time % 60))
        print("Avg loss (test): {:.4f}".format(avg_loss))
        print("Avg acc (test): {:.4f}".format(avg_acc))
        print('-' * 10)

In [None]:
# Build the model wrapper; the constructor loads the saved 2-class checkpoint.
Trained_model = TransferLearning()
# Show which torchvision architecture is wrapped.
print(Trained_model.model.__class__)


# Baseline evaluation on the test split before any further fine-tuning.
Trained_model.test()

In [None]:
# Number of fine-tuning epochs; also used below as the x-axis length of the plots.
num_epoch = 30
Trained_model.retrain(num_of_epochs = num_epoch)

In [None]:
# Per-epoch training accuracy recorded by retrain().
x  = np.arange(0, num_epoch)
plt.title("Training Accuarcy")
plt.xlabel("Epochs")
plt.ylabel("Accuracy")
# Coerce to Python floats first: the history may hold 0-dim (possibly CUDA)
# tensors, which matplotlib cannot convert to a NumPy array.
plt.plot(x, [float(acc) for acc in Trained_model.train_acc], color ="green")
plt.show()

In [None]:
# Per-epoch dev accuracy recorded by retrain(); reuses `x` from the training plot cell.
plt.title("Dev Accuarcy")
plt.xlabel("Epochs")
plt.ylabel("Accuracy")
# Coerce to Python floats first: the history may hold 0-dim (possibly CUDA)
# tensors, which matplotlib cannot convert to a NumPy array.
plt.plot(x, [float(acc) for acc in Trained_model.dev_acc], color ="red")
plt.show()

In [None]:
# Re-evaluate on the test split after fine-tuning.
Trained_model.test()

In [None]:
# Save only the wrapped network's weights (state_dict), not the wrapper object.
torch.save(Trained_model.model.state_dict(), 'VGG16_bn-2_class(Version2)4.pt')

In [None]:
def imshow(inp, title=None):
    """Display a normalized CHW image tensor with matplotlib.

    The dataloaders normalize images with the ImageNet mean/std, so the
    tensor is de-normalized and clipped to [0, 1] before plotting; showing
    the normalized values directly gives distorted colours.

    Args:
        inp: CPU tensor laid out as (channels, height, width).
        title: optional title drawn above the image.
    """
    inp = inp.numpy().transpose((1, 2, 0))
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    inp = np.clip(std * inp + mean, 0, 1)
    # plt.figure(figsize=(10, 10))
    plt.axis('off')
    plt.imshow(inp)
    if title is not None:
        plt.title(title)
    plt.pause(0.001)
def show_databatch(inputs, classes):
    """Show a batch of images as one grid, titled with their class names."""
    labels = []
    for class_index in classes:
        labels.append(class_names[class_index])
    grid = torchvision.utils.make_grid(inputs)
    imshow(grid, title=labels)
# Index-to-label mapping used by show_databatch for plot titles.
class_names = ['no_yawn','yawn']    
# Take the first batch of the test dataloader (index 2) and preview it.
inputs, classes = next(iter(Trained_model.load_data()[2]))
show_databatch(inputs, classes)    

In [None]:
def visualize_model(vgg, num_images=6):
    """Show test batches with their ground-truth and predicted labels.

    Batches are drawn from the test dataloader until at least ``num_images``
    images have been shown. The model's training/eval mode is restored on
    exit, even if plotting or inference raises.

    Args:
        vgg: the network to run.
        num_images: minimum number of images to display.
    """
    was_training = vgg.training
    loaders = Trained_model.load_data()
    # Run on whichever device the model currently lives on instead of
    # assuming CUDA is available.
    device = next(vgg.parameters()).device
    # Set model for evaluation
    vgg.eval()

    images_so_far = 0
    try:
        # Replaces the removed `volatile=True` Variable flag: no autograd
        # graph is built while predicting.
        with torch.no_grad():
            for inputs, labels in loaders[2]:
                size = inputs.size(0)

                outputs = vgg(inputs.to(device))
                _, preds = torch.max(outputs, 1)

                print("Ground truth:")
                show_databatch(inputs, labels)
                print("Prediction:")
                show_databatch(inputs, preds.cpu())

                images_so_far += size
                if images_so_far >= num_images:
                    break
    finally:
        torch.cuda.empty_cache()
        vgg.train(mode=was_training) # Revert model back to original training state

In [None]:
# Show ground truth vs. predictions for at least 16 test images.
visualize_model(Trained_model.model,num_images = 16)

<a href="./VGG16_bn-2_class(Version2)4.pt"> Download File </a>