

# Drowsiness Detection ResNet

This notebook trains a convolutional neural network with a ResNet architecture [(He et al., 2016)](https://arxiv.org/abs/1512.03385) for drowsiness detection.

In [None]:
# When True, the training and model-saving cells below are skipped and a
# previously saved model is loaded from disk instead.
skip_training = False  # Set this flag to True before validation and submission

In [None]:
# NOTE(review): the next cell imports `google.colab`, which presumably ships with
# the Colab runtime - confirm that installing the PyPI `google` package is needed.
!pip install google

In [None]:
# Mount Google Drive so that the project files under /content/drive/ are reachable.
from google.colab import drive
drive.mount('/content/drive/')

# Put the project's source folder first on the import path; the `tools` and
# `tests` modules imported later are presumably located there - TODO confirm.
import sys
sys.path.insert(0,'/content/drive/MyDrive/DrowsinessDetection/source')

In [None]:
import os
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import torch
import torchvision
import torchvision.transforms as transforms
import torchvision.datasets as datasets

import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import tools
import tests

In [None]:
# When running on your own computer, you can specify the data directory by:
# (the returned value is whatever `tools.select_data_dir` resolves it to)
data_dir = tools.select_data_dir('/content/drive/MyDrive/DrowsinessDetection')
# Folder in which the trained model file is written by the saving cell below.
source_dir = '/content/drive/MyDrive/DrowsinessDetection/source'
print(data_dir)

In [None]:
# Select the device for training: use the first GPU when one is available and
# fall back to the CPU otherwise, so the notebook also runs on CPU-only machines.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

In [None]:
# Folder handed to `datasets.ImageFolder` below: the `data/all_data`
# sub-directory of the selected data directory.
print(data_dir)
data_dir1 = os.path.join(data_dir,'data/all_data')
print(data_dir1)

## Combined Dataset

Classes: 'closed_eyes', 'open_eyes', 'alert', 'non_vigilant', 'tired', 'no_yawn', 'yawn'.


In [None]:
# Preprocessing applied to every image: resize to 28x28, convert to a single
# channel, turn into a tensor and normalise.
data_transform = transforms.Compose([
    transforms.Resize((28, 28)),
    transforms.Grayscale(),
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,)),  # Scale images to [-1, 1]
])

# ImageFolder assigns one integer label per sub-directory of `data_dir1`.
all_dataset = datasets.ImageFolder(data_dir1, transform=data_transform)

print(len(all_dataset))

# Hold out a fixed number of images for testing. The generator is seeded so
# that the split is the same on every run; with an unseeded split, a model
# reloaded from disk would be evaluated on images it had been trained on.
n_test = 780
trainset, testset = torch.utils.data.random_split(
    all_dataset,
    [len(all_dataset) - n_test, n_test],
    generator=torch.Generator().manual_seed(0),
)

# Class names in label-index order, read from the dataset rather than typed by
# hand so that they cannot drift from the folder names on disk.
classes = all_dataset.classes

trainloader = torch.utils.data.DataLoader(trainset, batch_size=32, shuffle=True)
testloader = torch.utils.data.DataLoader(testset, batch_size=5, shuffle=False)

print(len(trainset))
print(len(testset))

In [None]:
# Fetch one batch from the training loader and print the shape of one sample.
# The built-in next() is used because DataLoader iterators follow the standard
# iterator protocol; the Python-2 style `.next()` method is not available on
# them in current PyTorch releases.
images, labels = next(iter(trainloader))
#tests.plot_images(images[:8], n_rows=2)
print(images[0].shape)

## ResNet

We create a network with an architecture inspired by [ResNet](https://arxiv.org/pdf/1512.03385.pdf).

### ResNet block
Our ResNet consists of blocks with two convolutional layers and a skip connection.

In the most general case, our implementation should have:

<img src="https://drive.google.com/uc?id=1yHFVSWDnE_4N8zjLZtNIFCZ374L887rh" width=220 style="float: right;">


* Two convolutional layers with:
    * 3x3 kernel
    * no bias terms
    * padding with one pixel on both sides
    * 2d batch normalization after each convolutional layer.

* **The first convolutional layer also (optionally) has:**
    * different number of input channels and output channels
    * change of the resolution with stride.

* The skip connection:
    * simply copies the input if the resolution and the number of channels do not change.
    * if either the resolution or the number of channels change, the skip connection should have one convolutional layer with:
        * 1x1 convolution **without bias**
        * change of the resolution with stride (optional)
        * different number of input channels and output channels (optional)
    * if either the resolution or the number of channels change, the 1x1 convolutional layer is followed by 2d batch normalization.

* The ReLU nonlinearity is applied after the first convolutional layer and at the end of the block.

<div class="alert alert-block alert-warning">
<b>Note:</b> Batch normalization is expected to be right after a convolutional layer.
</div>

<img src="https://drive.google.com/uc?id=1M_PC7w7mRVrp8bbW4hIUpNH67iYJtTWQ" width=650 style="float: top;">



The implementation should also handle specific cases such as:

Left: The number of channels and the resolution do not change.
There are no computations in the skip connection.

Middle: The number of channels changes, the resolution does not change.

Right: The number of channels does not change, the resolution changes.

Your task is to implement this block. You should use the implementations of layers in `nn.Conv2d`, `nn.BatchNorm2d` as the tests rely on those implementations.

In [None]:
class Block(nn.Module):
    """Residual block: two 3x3 conv + batch-norm layers with a skip connection."""

    def __init__(self, in_channels, out_channels, stride=1):
        """
        Args:
          in_channels (int):  Number of input channels.
          out_channels (int): Number of output channels.
          stride (int):       Stride of the first convolution (and of the
                              1x1 convolution on the skip path, if present).
        """
        super().__init__()

        # Main path: conv -> BN -> ReLU -> conv -> BN. Only the first
        # convolution may change the channel count or the resolution.
        main_path = [
            nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride,
                      padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
        ]
        self.block_layers = nn.Sequential(*main_path)

        # Skip path: an empty Sequential (identity) when the tensor shape is
        # unchanged, otherwise a 1x1 convolution followed by batch norm.
        skip_path = []
        if in_channels != out_channels or stride != 1:
            skip_path = [
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            ]
        self.block_skip = nn.Sequential(*skip_path)

        # Final non-linearity, applied after the two paths are summed.
        self.block_relu = nn.Sequential(nn.ReLU())

    def forward(self, x):
        """Return ReLU(main_path(x) + skip_path(x))."""
        transformed = self.block_layers(x)
        shortcut = self.block_skip(x)
        return self.block_relu(transformed + shortcut)

In [None]:



def test_Block_shapes():
    """Check the output shape of `Block` for the four channel/stride combinations."""
    batch_size = 20
    x = torch.zeros(batch_size, 16, 28, 28)

    # (Block keyword arguments, expected channels, expected height and width)
    cases = [
        # The number of channels and resolution do not change
        (dict(in_channels=16, out_channels=16), 16, 28),
        # Increase the number of channels
        (dict(in_channels=16, out_channels=32), 32, 28),
        # Decrease the resolution
        (dict(in_channels=16, out_channels=16, stride=2), 16, 14),
        # Increase the number of channels and decrease the resolution
        (dict(in_channels=16, out_channels=32, stride=2), 32, 14),
    ]
    for block_kwargs, channels, size in cases:
        y = Block(**block_kwargs)(x)
        expected = torch.Size([batch_size, channels, size, size])
        assert y.shape == expected, "Bad shape of y: y.shape={}".format(y.shape)

    print('Success')

test_Block_shapes()

In [None]:
# Run the provided checks from the `tests` module against our Block class.
for check in (tests.test_Block, tests.test_Block_relu, tests.test_Block_batch_norm):
    check(Block)

### Group of blocks

ResNet consists of several groups of blocks. The first block in a group may change the number of channels (often multiplying it by 2) and subsample (using strides).

<img src="https://drive.google.com/uc?id=1EhVWIAmPSgook2W_m1l_Ig1PmYRwmxmr" width=200 style="float: right;">

In [None]:
# We implement a group of blocks in this cell
class GroupOfBlocks(nn.Module):
    """A stack of `n_blocks` residual blocks applied one after another."""

    def __init__(self, in_channels, out_channels, n_blocks, stride=1):
        """
        Args:
          in_channels (int):  Number of channels entering the group.
          out_channels (int): Number of channels produced by every block.
          n_blocks (int):     Number of blocks in the group.
          stride (int):       Stride of the first block; later blocks use stride 1.
        """
        super().__init__()

        # Only the first block may change the channel count or the resolution.
        blocks = [Block(in_channels, out_channels, stride)]
        blocks.extend(Block(out_channels, out_channels) for _ in range(n_blocks - 1))
        self.group = nn.Sequential(*blocks)

    def forward(self, x):
        """Run `x` through every block of the group in order."""
        return self.group(x)

In [None]:
# Build a small example group (10 -> 20 channels, three blocks) and show its layers
group = GroupOfBlocks(10, 20, n_blocks=3)
print(group)

### ResNet

Next we implement a ResNet with the following architecture. It contains three groups of blocks, each group having two basic blocks.

<img src="https://drive.google.com/uc?id=1th0iWvYPHjW9eh5O6-Hqu1LOpg4urwxZ" width=200 style="float: left;">




The cell below contains the implementation of our ResNet.

In [None]:
class ResNet(nn.Module):
    """ResNet-style classifier for single-channel images.

    Layout: 5x5 convolution -> batch norm -> ReLU -> max pooling, then three
    groups of residual blocks (the second and third groups use stride 2 and
    double the channel count), global average pooling and a linear classifier.
    """

    def __init__(self, n_blocks, n_channels=64, num_classes=7):
        """
        Args:
          n_blocks (list):   A list with three elements which contains the number of blocks in
                             each of the three groups of blocks in ResNet.
                             For instance, n_blocks = [2, 4, 6] means that the first group has two blocks,
                             the second group has four blocks and the third one has six blocks.
          n_channels (int):  Number of channels in the first group of blocks.
          num_classes (int): Number of classes.
        """
        assert len(n_blocks) == 3, "The number of groups should be three."
        super(ResNet, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=n_channels, kernel_size=5, stride=1, padding=2, bias=False)
        self.bn1 = nn.BatchNorm2d(n_channels)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        self.group1 = GroupOfBlocks(n_channels, n_channels, n_blocks[0])
        self.group2 = GroupOfBlocks(n_channels, 2*n_channels, n_blocks[1], stride=2)
        self.group3 = GroupOfBlocks(2*n_channels, 4*n_channels, n_blocks[2], stride=2)

        # Global average pooling to 1x1. For 28x28 inputs the last group yields
        # 4x4 feature maps, so this equals the former AvgPool2d(kernel_size=4);
        # unlike a fixed kernel it also works for other input resolutions.
        self.avgpool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Linear(4*n_channels, num_classes)

        # Initialize weights: zero-mean normal with std sqrt(2 / (k*k*out_channels))
        # for convolutions, identity scale and zero shift for batch norm.
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
                nn.init.normal_(m.weight, mean=0.0, std=np.sqrt(2. / n))
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.ones_(m.weight)
                nn.init.zeros_(m.bias)

    def forward(self, x, verbose=False):
        """
        Args:
          x of shape (batch_size, 1, height, width): Input images (28x28 in this notebook).
          verbose: True if you want to print the shapes of the intermediate variables.

        Returns:
          y of shape (batch_size, num_classes): Outputs of the network (unnormalised class scores).
        """
        if verbose: print(x.shape)
        x = self.conv1(x)
        if verbose: print('conv1:  ', x.shape)
        x = self.bn1(x)
        if verbose: print('bn1:    ', x.shape)
        x = self.relu(x)
        if verbose: print('relu:   ', x.shape)
        x = self.maxpool(x)
        if verbose: print('maxpool:', x.shape)

        x = self.group1(x)
        if verbose: print('group1: ', x.shape)
        x = self.group2(x)
        if verbose: print('group2: ', x.shape)
        x = self.group3(x)
        if verbose: print('group3: ', x.shape)

        x = self.avgpool(x)
        if verbose: print('avgpool:', x.shape)

        # Flatten everything except the batch dimension. Unlike view(-1, ...),
        # this can never fold spatial positions into the batch dimension.
        x = torch.flatten(x, 1)
        if verbose: print('x.view: ', x.shape)
        x = self.fc(x)
        if verbose: print('out:    ', x.shape)

        return x

In [None]:
def test_ResNet_shapes():
    """Feed one training batch through a small ResNet and check the output shape."""
    # Create a network with 2 block in each of the three groups
    n_blocks = [2, 2, 2]  # number of blocks in the three groups
    net = ResNet(n_blocks, n_channels=10)
    net.to(device)

    # Feed a batch of images from the training data to test the network
    with torch.no_grad():
        # Built-in next(): DataLoader iterators do not provide `.next()` in
        # current PyTorch releases.
        images, labels = next(iter(trainloader))
        images = images.to(device)
        print('Shape of the input tensor:', images.shape)

        y = net(images, verbose=True)
        print(y.shape)
        # Compare against the actual batch length so the check also holds when
        # the first batch is smaller than `trainloader.batch_size`.
        expected = torch.Size([images.shape[0], 7])
        assert y.shape == expected, "Bad shape of y: y.shape={}".format(y.shape)

    print('Success')

test_ResNet_shapes()

# Train the network

In [None]:
# This function computes the accuracy on the test dataset
def compute_accuracy(net, testloader):
    """Return the fraction of samples in `testloader` that `net` classifies correctly.

    The network is switched to evaluation mode and is left in that mode, so
    callers that continue training must call `net.train()` afterwards.

    Args:
      net (nn.Module): Classifier returning one score per class for each sample.
      testloader:      Iterable of (images, labels) batches.

    Returns:
      float: Number of correct predictions divided by the number of samples.
    """
    net.eval()
    # Run on whatever device the network lives on instead of relying on a
    # module-level `device` variable being defined and matching the model.
    first_param = next(net.parameters(), None)
    target_device = None if first_param is None else first_param.device

    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in testloader:
            if target_device is not None:
                images, labels = images.to(target_device), labels.to(target_device)
            outputs = net(images)
            # Index of the largest score along the class dimension.
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    return correct / total

### Training loop

In the cell below, implement the training loop. The recommended hyperparameters:
* Adam optimizer with learning rate 0.01.
* Cross-entropy loss. Note that we did not use softmax nonlinearity in the final layer of our network. Therefore, we need to use a loss function with log_softmax implemented, such as [`nn.CrossEntropyLoss`](https://pytorch.org/docs/stable/nn.html#torch.nn.CrossEntropyLoss).
* Number of epochs: 10

We recommend using the function `compute_accuracy()` defined above to track the accuracy during training. The test accuracy should be above 0.9.

**Note: function `compute_accuracy()` sets the network into the evaluation mode which changes the way the batch statistics are computed in batch normalization. You need to set the network into the training mode (by calling `net.train()`) when you want to perform training.**

In [None]:
# Create the network
n_blocks = [2, 2, 2]  # number of blocks in the three groups
net = ResNet(n_blocks, n_channels=16)
net.to(device)

In [None]:
if not skip_training:
    # Per-iteration history (one entry per optimisation step).
    iteration = []
    train_accu = []  # NOTE: despite the name, this is accuracy on `testloader`
    losses = []
    # Per-epoch history (one entry per epoch).
    epochs_arr = []
    train_accu_per_epoch = []
    loss_per_epoch = []

    optimizer = optim.Adam(net.parameters(), lr=0.01)
    criterion = nn.CrossEntropyLoss()
    epochs = 30
    for epoch in range(epochs):
        print("Epoch number:  ", epoch)
        for i, (images, labels) in enumerate(trainloader):
            images, labels = images.to(device), labels.to(device)

            # compute_accuracy() below leaves the network in evaluation mode,
            # so training mode has to be restored before every step; otherwise
            # batch normalisation would use its running statistics while training.
            net.train()
            optimizer.zero_grad()
            output = net(images)
            loss = criterion(output, labels)
            loss.backward()
            optimizer.step()

            # NOTE: evaluating the whole test set after every step is costly;
            # it is kept so that the recorded history has one accuracy value
            # per iteration.
            acc = compute_accuracy(net, testloader)

            iteration.append(i)
            train_accu.append(acc)
            # Store a plain float: keeping the loss tensor would keep its
            # autograd graph (and device memory) alive for the whole run.
            losses.append(loss.item())
            if i % 32 == 31:
                print("Accuracy:  ", acc)

        # Values of the last iteration of the epoch.
        epochs_arr.append(epoch)
        train_accu_per_epoch.append(acc)
        loss_per_epoch.append(loss.item())

In [None]:
import csv
from itertools import zip_longest

# The history lists only exist when the training cell has actually run.
if not skip_training:
    print(iteration)
    print(train_accu)
    print(losses)
    print(epochs_arr)
    print(train_accu_per_epoch)
    print(loss_per_epoch)

    # float() turns single-element tensors as well as plain numbers into Python
    # floats, so the file contains numbers rather than tensor reprs.
    history = {
        'iteration': iteration,
        'train_accu': train_accu,
        'losses': [float(value) for value in losses],
        'epochs_arr': epochs_arr,
        'train_accu_per_epoch': train_accu_per_epoch,
        'loss_per_epoch': [float(value) for value in loss_per_epoch],
    }

    # The per-iteration and per-epoch lists have different lengths. zip_longest
    # pads the shorter columns with empty cells instead of truncating every
    # column to the number of epochs, and the csv writer keeps the fractional
    # part of the accuracy and loss values.
    with open('training_data.csv', 'w', newline='') as csv_file:
        writer = csv.writer(csv_file)
        writer.writerow(history.keys())
        writer.writerows(zip_longest(*history.values(), fillvalue=''))

In [None]:
# Save the model to disk (the pth-files will be submitted automatically together with your notebook)
# Set confirm=False if you do not want to be asked for confirmation before saving.
if not skip_training:
    # Write the trained network next to the project sources.
    path = os.path.join(source_dir, 'resnet_all.pth')
    tools.save_model(net, path, confirm=True)

In [None]:
if skip_training:
    net = ResNet(n_blocks, n_channels=16)
    # Load from the location the saving cell writes to (`source_dir`), not from
    # the current working directory.
    tools.load_model(net, os.path.join(source_dir, 'resnet_all.pth'), device)

In [None]:
# Compute the accuracy on the test set
accuracy = compute_accuracy(net, testloader)
print('Accuracy of the network on the test images: %.3f' % accuracy)

# Count the residual blocks under a separate name: `n_blocks` is the list that
# other cells pass to ResNet(...), so it must not be overwritten with an int.
block_count = sum(isinstance(m, Block) for m in net.modules())
assert block_count == 6, f"Wrong number ({block_count}) of blocks used in the network."

assert accuracy > 0.9, "Poor accuracy ({:.3f})".format(accuracy)
print('Success')

# Plotting

In [None]:
from matplotlib import pyplot as plt 
# for reference :- https://howtothink.readthedocs.io/en/latest/PvL_H.html

In [None]:
# Matplotlib demo with hard-coded sample values (not the training history):
# two labelled curves and a legend.
plt.plot([0.1, 0.2, 0.3, 0.4], [1, 2, 3, 4], label='first plot')
plt.plot([0.1, 0.2, 0.3, 0.4], [1, 4, 9, 16], label='second plot')
plt.legend()

In [None]:
# Matplotlib demo with hard-coded sample values (not the training history):
# the same two curves with axis labels and explicit axis limits.
plt.plot([0.1, 0.2, 0.3, 0.4], [1, 2, 3, 4])
plt.plot([0.1, 0.2, 0.3, 0.4], [1, 4, 9, 16])
plt.xlabel("Time (s)")
plt.ylabel("Scale (Bananas)")
plt.xlim(0, 1)
plt.ylim(-5, 20)

# Hyperparameter tuning

In [None]:
# Ray provides the `tune` module used for the hyperparameter search below.
# NOTE(review): the code below calls `tune.checkpoint_dir` and `tune.report`;
# presumably these need an older Ray release - confirm and pin the version.
!pip install ray

In [None]:
from ray import tune
from ray.tune import CLIReporter
from ray.tune.schedulers import ASHAScheduler



In [None]:
# 1. I have to make a function for data loading
def load_data(data_dir1, batch_size=32, test_size=780):
    """Build the training and test data loaders from an image folder.

    Args:
      data_dir1 (str):  Folder with one sub-directory of images per class.
      batch_size (int): Batch size of the training loader.
      test_size (int):  Number of images held out for the test loader.

    Returns:
      (trainloader, testloader): shuffled training loader and unshuffled test
      loader (batch size 5) over a random split of the dataset.

    Raises:
      ValueError: If the dataset holds fewer than `test_size` images.
    """
    data_transform = transforms.Compose([
        transforms.Resize((28, 28)),
        transforms.Grayscale(),
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,)),  # Scale images to [-1, 1]
    ])
    all_dataset = datasets.ImageFolder(data_dir1, transform=data_transform)
    print(len(all_dataset))

    n_train = len(all_dataset) - test_size
    if n_train < 0:
        raise ValueError(
            "Dataset has {} images, fewer than the requested test size {}".format(
                len(all_dataset), test_size))
    trainset, testset = torch.utils.data.random_split(all_dataset, [n_train, test_size])

    trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=True)
    testloader = torch.utils.data.DataLoader(testset, batch_size=5, shuffle=False)

    print(len(trainset))
    print(len(testset))
    return trainloader, testloader


In [None]:
# 2. I have to make a configurable net, which is already there ResNet()
def train_Res(config, checkpoint_dir=None, data_dir=None):
    """Train a ResNet for one Ray Tune trial and report validation metrics.

    Args:
      config (dict):        Trial hyperparameters. Reads "lr" (required),
                            "n_blocks" (optional, defaults to [2, 2, 2]) and
                            "batch_size" (optional training batch size).
      checkpoint_dir (str): Folder holding a "checkpoint" file with the
                            (model state, optimizer state) pair to resume from.
      data_dir (str):       Image folder passed on to `load_data`.

    After every epoch a checkpoint is saved and the mean validation loss and
    the validation accuracy are reported through `tune.report`.
    """
    net = ResNet(config.get("n_blocks", [2, 2, 2]), n_channels=16)
    device = "cpu"
    if torch.cuda.is_available():
        device = "cuda:0"
        if torch.cuda.device_count() > 1:
            net = nn.DataParallel(net)
    net.to(device)

    # The optimizer has to exist before a checkpoint can be restored into it.
    optimizer = optim.Adam(net.parameters(), lr=config["lr"])
    criterion = nn.CrossEntropyLoss()

    if checkpoint_dir:
        model_state, optimizer_state = torch.load(
            os.path.join(checkpoint_dir, "checkpoint"), map_location=device)
        net.load_state_dict(model_state)
        optimizer.load_state_dict(optimizer_state)

    trainloader, valloader = load_data(data_dir)
    # Apply the sampled batch size by re-wrapping the training split.
    batch_size = config.get("batch_size")
    if batch_size is not None:
        trainloader = torch.utils.data.DataLoader(
            trainloader.dataset, batch_size=batch_size, shuffle=True)

    epochs = 30
    for epoch in range(epochs):  # loop over the dataset multiple times
        # The validation pass below switches to evaluation mode, so training
        # mode is restored at the start of every epoch.
        net.train()
        running_loss = 0.0
        epoch_steps = 0
        print("Epoch number:  ", epoch)
        for i, (inputs, labels) in enumerate(trainloader):
            inputs, labels = inputs.to(device), labels.to(device)

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = net(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # print statistics
            running_loss += loss.item()
            epoch_steps += 1
            if i % 2000 == 1999:  # print every 2000 mini-batches
                print("[%d, %5d] loss: %.3f" % (epoch + 1, i + 1,
                                                running_loss / epoch_steps))
                # Reset both accumulators so the next average covers only the
                # mini-batches seen since this print.
                running_loss = 0.0
                epoch_steps = 0

        # Validation loss and accuracy, computed in evaluation mode so that
        # batch normalisation uses (and does not update) its running statistics.
        net.eval()
        val_loss = 0.0
        val_steps = 0
        total = 0
        correct = 0
        with torch.no_grad():
            for inputs, labels in valloader:
                inputs, labels = inputs.to(device), labels.to(device)

                outputs = net(inputs)
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

                val_loss += criterion(outputs, labels).item()
                val_steps += 1

        with tune.checkpoint_dir(epoch) as epoch_checkpoint_dir:
            path = os.path.join(epoch_checkpoint_dir, "checkpoint")
            torch.save((net.state_dict(), optimizer.state_dict()), path)

        tune.report(loss=(val_loss / val_steps), accuracy=correct / total)
    print("Finished Training")

In [None]:
def main(num_samples=10, max_num_epochs=10, gpus_per_trial=2):
    """Run a Ray Tune hyperparameter search for the ResNet and evaluate the best trial.

    Args:
      num_samples (int):    Number of hyperparameter configurations to try.
      max_num_epochs (int): Maximum number of epochs per trial (ASHA `max_t`).
      gpus_per_trial (int): Number of GPUs reserved for each trial.
    """
    # Local import: `partial` is not imported at the top of the file.
    from functools import partial

    data_dir = tools.select_data_dir('/content/drive/MyDrive/DrowsinessDetection')
    # The images live in the `data/all_data` sub-folder, not in `data_dir` itself.
    all_data_dir = os.path.join(data_dir, 'data/all_data')
    # Loading once up front fails fast when the dataset is missing and provides
    # the loader used for the final evaluation.
    # NOTE: load_data() draws a new random split on every call, so these test
    # images may overlap with the images a trial was trained on.
    _, testloader = load_data(all_data_dir)

    config = {
        "lr": tune.loguniform(1e-4, 1e-1),
        "batch_size": tune.choice([2, 4, 8, 16]),
        # Fixed architecture; train_Res reads this key to build the network.
        "n_blocks": [2, 2, 2],
    }
    scheduler = ASHAScheduler(
        metric="loss",
        mode="min",
        max_t=max_num_epochs,
        grace_period=1,
        reduction_factor=2)
    reporter = CLIReporter(
        metric_columns=["loss", "accuracy", "training_iteration"])
    result = tune.run(
        partial(train_Res, data_dir=all_data_dir),
        resources_per_trial={"cpu": 2, "gpu": gpus_per_trial},
        config=config,
        num_samples=num_samples,
        scheduler=scheduler,
        progress_reporter=reporter)

    best_trial = result.get_best_trial("loss", "min", "last")
    print("Best trial config: {}".format(best_trial.config))
    print("Best trial final validation loss: {}".format(
        best_trial.last_result["loss"]))
    print("Best trial final validation accuracy: {}".format(
        best_trial.last_result["accuracy"]))

    # Rebuild the same architecture the trials trained (see train_Res).
    best_trained_model = ResNet(best_trial.config["n_blocks"], n_channels=16)
    device = "cpu"
    if torch.cuda.is_available():
        device = "cuda:0"
        if gpus_per_trial > 1:
            best_trained_model = nn.DataParallel(best_trained_model)
    best_trained_model.to(device)

    best_checkpoint_dir = best_trial.checkpoint.value
    model_state, optimizer_state = torch.load(
        os.path.join(best_checkpoint_dir, "checkpoint"), map_location=device)
    best_trained_model.load_state_dict(model_state)

    test_acc = compute_accuracy(best_trained_model, testloader)
    print("Best trial test set accuracy: {}".format(test_acc))


# Start the hyperparameter search when this code is executed as the main module.
if __name__ == "__main__":
    # You can change the number of GPUs per trial here:
    main(num_samples=10, max_num_epochs=10, gpus_per_trial=0)