<a href="https://colab.research.google.com/github/AryanRaj315/HeadPoseEstimation/blob/master/Headpose_300wlp.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Run Once

In [0]:
# from google.colab import drive
# drive.mount('/content/drive')

Extracting the .zip file 

In [0]:
# !unzip -qq '/content/drive/My Drive/300W-LP.zip'

# Getting the dataset and installing necessary libraries

In [0]:
import os
import cv2
from math import atan2, asin 
import numpy as np
import pandas as pd
import dlib
import math
import matplotlib.pyplot as plt
from tqdm import tqdm_notebook as tqdm
from torch.utils.data import DataLoader, Dataset, sampler
from torch.optim.lr_scheduler import ReduceLROnPlateau
from albumentations.pytorch import ToTensor
from albumentations import (HorizontalFlip, ShiftScaleRotate, Normalize, Resize, Compose, GaussNoise)
import torch
from torchvision import transforms
import torch.nn as nn
from torch.nn import functional as F
import torch.optim as optim
import torch.backends.cudnn as cudnn
import torchvision.models as models
import time
import scipy.io

# Useful functions

Function to get the path

In [0]:
def PathList(path):
  Img_Path = []
  for root, dirs, files in os.walk(path, topdown=False):
      for filename in files:
        if(filename[-3:] == 'jpg'):
          Img_Path.append(root+'/'+filename)
  return Img_Path        

Here ImagesPath is a list that contains all the .png files from the dataset

In [0]:
ImagesPath = PathList("300W_LP/")

In [6]:
len(ImagesPath)

122450

This function is used to convert the transformation matrix from 
all the .txt files to Yaw, Pitch and Roll 

In [0]:
def getAngles(path):
    mat = scipy.io.loadmat(path)
    angle_mat = mat['Pose_Para'][0]
    roll = angle_mat[0]*180/math.pi
    pitch = angle_mat[1]*180/math.pi
    yaw = angle_mat[2]*180/math.pi
    return np.array([yaw, pitch, roll]) 

# Dataset Class

In [0]:
class WLP300Dataset(Dataset):
    def __init__(self, image_path_list, phase):
        self.phase = phase
        self.transform = ToTensor()
        self.path = image_path_list
        self.train = image_path_list[:int(len(image_path_list)*0.8)]
        self.val = image_path_list[int(len(image_path_list)*0.8):]

    def __getitem__(self, idx):
        if(self.phase == 'train'):
          Image = cv2.imread(self.train[idx])
          Angle = getAngles(self.train[idx][:-3]+'mat')
        elif(self.phase == 'val'):
          Image = cv2.imread(self.val[idx])
          Angle = getAngles(self.val[idx][:-3]+'mat')
        Image = cv2.resize(Image,(450,450))
        Image = Image.transpose(2,0,1)  # Making Channel First 
        Image = torch.from_numpy(Image).type(torch.FloatTensor)
        Angle = torch.from_numpy(Angle).type(torch.FloatTensor)  
        return Image, Angle

    def __len__(self):
        if self.phase == 'train':
            return len(self.train)
        else:
            return len(self.val)

This function provides the training and validation dataloader accordingly

In [0]:
def provider(phase, image_list,  batch_size=8, num_workers=0):    
    image_dataset = WLP300Dataset(image_list, phase)
    dataloader = DataLoader(
        image_dataset,
        batch_size=batch_size,
        num_workers=num_workers,
        pin_memory=True,
        shuffle=True,   
    )
    return dataloader

# Trainer Class

In [0]:
class Trainer(object):
    '''This class takes care of training and validpathation of our model'''
    def __init__(self, model, image_list, bs, lr, epochs):
        self.num_workers = 0
        self.batch_size = bs
        self.accumulation_steps = 32 // self.batch_size
        self.lr = lr
        self.num_epochs = epochs
        self.best_loss = float("inf")
        self.phases = ["train", "val"]
        self.device = torch.device("cuda:0")
        torch.set_default_tensor_type("torch.cuda.FloatTensor")
        self.net = model
        self.image_list = image_list
        self.criterion = torch.nn.L1Loss()
        self.optimizer = optim.Adam(self.net.parameters(), lr=self.lr)
        self.scheduler = ReduceLROnPlateau(self.optimizer, mode="min", patience=3, verbose=True,factor = 0.5,min_lr = 1e-5)
        self.net = self.net.to(self.device)
        cudnn.benchmark = True
        self.dataloaders = {
            phase: provider(
                phase=phase,
                image_list = self.image_list,
                batch_size=self.batch_size,
                num_workers=self.num_workers
            )
            for phase in self.phases
        }
        self.losses = {phase: [] for phase in self.phases}
        
    def forward(self, images, targets):
        images = images.to(self.device)
        target = targets.to(self.device)
        outputs = self.net(images)
        loss = self.criterion(outputs, target)
        return loss, outputs

    def iterate(self, epoch, phase):
        start = time.strftime("%H:%M:%S")
        print(f"Starting epoch: {epoch} | phase: {phase} | ⏰: {start}")
        batch_size = self.batch_size
        self.net.train(phase == "train")
        dataloader = self.dataloaders[phase]
        running_loss = 0.0
        total_batches = len(dataloader)
        tk0 = tqdm(dataloader, total=total_batches)
        self.optimizer.zero_grad()
        for itr, batch in enumerate(tk0): # replace `dataloader` with `tk0` for tqdm
            images, targets = batch
            loss, outputs = self.forward(images, targets)
            loss = loss / self.accumulation_steps
            if phase == "train":
                loss.backward()
                if (itr + 1 ) % self.accumulation_steps == 0:
                    self.optimizer.step()
                    self.optimizer.zero_grad()
            running_loss += loss.item()
            outputs = outputs.detach().cpu()
            tk0.set_postfix(loss=(running_loss / ((itr + 1))))
        epoch_loss = (running_loss * self.accumulation_steps) / total_batches
        torch.cuda.empty_cache()
        print(f'loss:{epoch_loss}')
        return epoch_loss

    def train_end(self):
        train_loss = self.losses["train"]
        val_loss = self.losses["val"]
        df_data=np.array([train_loss,val_loss]).T
        df = pd.DataFrame(df_data,columns = ['train_loss','val_loss'])
        df.to_csv("Training_log.csv")

    def start(self):
        for epoch in range(self.num_epochs):
            train_loss = self.iterate(epoch, "train")
            self.losses["train"].append(train_loss)
            state = {
                "epoch": epoch,
                "best_loss": self.best_loss,
                "state_dict": self.net.state_dict(),
                "optimizer": self.optimizer.state_dict(),
            }
            with torch.no_grad():
                val_loss = self.iterate(epoch, "val")
                self.losses["val"].append(val_loss)
                self.scheduler.step(val_loss)
            if val_loss < self.best_loss:
                print("******** New optimal found, saving state ********")
                state["best_loss"] = self.best_loss = val_loss
                torch.save(state, "./model.pth")
            print()
            self.train_end()    


#Getting the model and Training

In [11]:
!pip install efficientnet_pytorch



In [0]:
from efficientnet_pytorch import EfficientNet

In [13]:
model = EfficientNet.from_pretrained('efficientnet-b3')

Loaded pretrained weights for efficientnet-b3


In [0]:
# model = models.resnet50(pretrained = True)

In [0]:
num_ftrs = model._fc.in_features
model._fc = nn.Linear(num_ftrs, 3)

In [0]:
model_trainer = Trainer(model, ImagesPath,14,1e-3,10)
model_trainer.start()

Starting epoch: 0 | phase: train | ⏰: 08:13:42


HBox(children=(IntProgress(value=0, max=6998), HTML(value='')))

In [0]:
model.eval()