In [28]:
# Colab-only: expose Google Drive under /content/gdrive. A later cell changes the
# working directory into a project folder below this mount point.
from google.colab import drive

drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [29]:
!pip install tensorboardX
!pip install --quiet pytorch-lightning>=1.5

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [30]:
import os
import collections
import numpy as np
import random

import pytorch_lightning as pl
from pytorch_lightning import Trainer, seed_everything
from pytorch_lightning.loggers.csv_logs import CSVLogger

%matplotlib inline
import matplotlib.pyplot as plt
import pandas as pd

import sys
from tensorboardX import SummaryWriter
import torch
import torch.nn as nn
import torch.utils.data as data
from torch.optim.lr_scheduler import ExponentialLR
import numpy as np
import time
import shutil
import time
import datetime
import argparse
import os
import torch.nn.init as init
import torch.nn.functional as F
from math import floor
from math import ceil
import math

In [31]:
# Work from the project folder on the mounted Drive so that the relative paths used
# later in the notebook (dataset directory, logger output, checkpoints) resolve there.
os.chdir('/content/gdrive/MyDrive/gaze_project')
# Last expression of the cell: echo the working directory as a sanity check.
os.getcwd()

'/content/gdrive/MyDrive/gaze_project'

In [32]:
# create dataset
class MyDataset(data.Dataset):
    """Map-style dataset pairing each feature row with the label at the same index."""

    def __init__(self, features, labels):
        # Both containers are stored as given; they are indexed in parallel.
        self.features, self.labels = features, labels

    def __len__(self):
        # The sample count is taken from the feature container only.
        return len(self.features)

    def __getitem__(self, index):
        # Return the (feature, target) pair stored at position `index`.
        return self.features[index], self.labels[index]

    
# load data.    
def LoadData(dataset_dir, batch_size, num_workers=8):
    """Build the train, test and validation data loaders from ``.npy`` files.

    Reads ``trainingX.npy``/``trainingY.npy`` and ``testX.npy``/``testY.npy`` from
    ``dataset_dir``. The held-out file is split in half along the first axis:
    the first half becomes the test set, the second half the validation set.

    Args:
        dataset_dir: Directory holding the four ``.npy`` files. A trailing path
            separator is optional.
        batch_size: Batch size used by all three loaders.
        num_workers: Worker processes per loader (defaults to the previously
            hard-coded value of 8).

    Returns:
        ``(train_loader, test_loader, validation_loader)``. Only the training
        loader shuffles.
    """

    def _load(file_name):
        # os.path.join keeps the path valid whether or not dataset_dir ends in '/'.
        return torch.from_numpy(np.load(os.path.join(dataset_dir, file_name))).float()

    def _loader(features, labels, shuffle):
        return data.DataLoader(dataset=MyDataset(features, labels),
                               num_workers=num_workers,
                               batch_size=batch_size,
                               shuffle=shuffle)

    print("\nLoading the training dataset")
    trainingX = _load('trainingX.npy')
    trainingY = _load('trainingY.npy')
    print('\nTraining Data Size: {}'.format(list(trainingX.size())))
    train_loader = _loader(trainingX, trainingY, shuffle=True)

    print("\nLoading the testing dataset")
    testX = _load('testX.npy')
    testY = _load('testY.npy')

    # First half of the held-out samples -> test set.
    test_new_size = testX.size(0) // 2
    test_X = testX[:test_new_size]
    test_Y = testY[:test_new_size]
    print('\nTest Data Size: {}'.format(list(test_X.size())))
    test_loader = _loader(test_X, test_Y, shuffle=False)

    # Second half of the held-out samples -> validation set.
    print("\nLoading the validation dataset")
    validationX = testX[test_new_size:]
    validationY = testY[test_new_size:]
    print('\nValidation Data Size: {}'.format(list(validationX.size())))
    validation_loader = _loader(validationX, validationY, shuffle=False)

    return train_loader, test_loader, validation_loader

In [33]:
class CosineWarmupScheduler(torch.optim.lr_scheduler._LRScheduler):
    """Cosine learning-rate decay with a linear warm-up.

    Each base learning rate is multiplied by
    ``0.5 * (1 + cos(pi * t / max_iters))`` and, while ``t <= warm_up``, additionally
    by ``t / warm_up``. ``t`` is the number of ``step()`` calls made so far, so its
    unit (batch or epoch) is whatever interval the scheduler is stepped at.

    Args:
        optimizer: Wrapped optimizer.
        warm_up: Number of scheduler steps in the linear warm-up. ``0`` disables
            the warm-up.
        max_iters: Number of scheduler steps over which the cosine goes from 1 to 0.
    """

    def __init__(self, optimizer, warm_up, max_iters):
        # Must be set before super().__init__(), which already calls get_lr() once.
        self.warm_up = warm_up
        self.max_n_iters = max_iters
        super().__init__(optimizer)

    def get_lr(self):
        lr_factor = self.get_lr_factor(epoch=self.last_epoch)
        return [base_lr * lr_factor for base_lr in self.base_lrs]

    def get_lr_factor(self, epoch):
        """Return the multiplier applied to the base learning rates at step ``epoch``."""
        lr_factor = 0.5 * (1 + np.cos(np.pi * epoch / self.max_n_iters))
        # Guard against warm_up == 0: the unguarded `epoch <= warm_up` branch would
        # divide by zero at step 0 (i.e. already while the scheduler is constructed).
        if self.warm_up > 0 and epoch <= self.warm_up:
            lr_factor *= epoch * 1.0 / self.warm_up
        return lr_factor

In [34]:
class HeadObjCNN(pl.LightningModule):
    """1-D CNN followed by a small MLP that regresses ``n_output`` gaze values.

    Only the first ``seq_length * seq_feature_num`` columns of each input row are
    used; they are reshaped to ``(batch, seq_length, seq_feature_num)`` and fed to
    the convolution with the per-step features as channels.

    Args:
        input_size: Number of columns per input row (used to flatten batches).
        seq_length: Number of time steps in the sequence part of a row.
        seq_feature_num: Number of features per time step.
        n_output: Number of regression outputs.
        lr: Base learning rate for Adam.
        warm_up: Warm-up length passed to ``CosineWarmupScheduler``.
        max_iters: Cosine period passed to ``CosineWarmupScheduler``. The
            scheduler is stepped once per optimizer step (``interval='step'``),
            so this is counted in training steps.
        criterion: Loss callable ``criterion(output, labels)``.
        dropout: Dropout probability used in both sub-networks.
    """

    def __init__(self, 
                 input_size, 
                 seq_length, 
                 seq_feature_num, 
                 n_output, 
                 lr, 
                 warm_up,
                 max_iters,
                 criterion, 
                 dropout):
        super().__init__()

        # model params
        self.input_size = input_size
        self.lr = lr
        self.n_output = n_output
        self.warm_up = warm_up

        self.max_iters = max_iters
        self.dropout = dropout

        self.criterion = criterion

        self.seq_length = seq_length
        self.seq_feature_num = seq_feature_num
        self.seq_size = self.seq_length * self.seq_feature_num

        # the model params
        cnn_1d_out = 128
        cnn_1d_pooling_rate = 2
        cnn_1d_kernal_size = 2
        # Flattened size after the conv (no padding, stride 1) and max-pooling.
        self.cnn_1d_output_size = floor((self.seq_length - cnn_1d_kernal_size + 1) / cnn_1d_pooling_rate) * cnn_1d_out

        # One hidden width for the whole head, so the Linear output and the
        # BatchNorm1d that follows it cannot drift apart. Value and layer
        # shapes are unchanged (128).
        prd_fc_hidden_size = 128

        self.cnn_1d = nn.Sequential(
            nn.Conv1d(in_channels=self.seq_feature_num, out_channels=cnn_1d_out, kernel_size=cnn_1d_kernal_size),
            nn.BatchNorm1d(cnn_1d_out),
            nn.ReLU(),
            nn.MaxPool1d(cnn_1d_pooling_rate),
            nn.Dropout(p=self.dropout),
        )

        self.prd_fc = nn.Sequential(
            nn.Linear(self.cnn_1d_output_size, prd_fc_hidden_size),
            nn.BatchNorm1d(prd_fc_hidden_size),
            nn.ReLU(),
            nn.Dropout(p=self.dropout),
            nn.Linear(prd_fc_hidden_size, self.n_output)
        )

    def forward(self, x):
        """Map a batch of flat feature rows to predictions of shape (batch, n_output)."""
        # Keep only the sequence part of each row.
        head_object_seq = x[:, 0:self.seq_size]

        head_object_seq = head_object_seq.reshape(-1, self.seq_length, self.seq_feature_num)
        # Conv1d expects (batch, channels, length): features become channels.
        head_object_seq = head_object_seq.permute(0, 2, 1)
        seq_out = self.cnn_1d(head_object_seq)
        seq_out = seq_out.reshape(-1, self.cnn_1d_output_size)
        gaze = self.prd_fc(seq_out)
        return gaze

    def configure_optimizers(self):
        """Adam plus a cosine warm-up schedule that is stepped after every batch."""
        optimizer = torch.optim.Adam(self.parameters(), lr=self.lr)

        lr_scheduler = CosineWarmupScheduler(optimizer,
                                             warm_up=self.warm_up,
                                             max_iters=self.max_iters)
        return [optimizer], [{'scheduler': lr_scheduler, 'interval': 'step'}]

    def _forward_batch(self, batch):
        """Flatten a (features, labels) batch, run the model, return (output, labels, loss)."""
        features, labels = batch

        # self.device is the device Lightning placed this module on; using it
        # removes the dependency on a notebook-level `device` global.
        features = features.reshape(-1, self.input_size).to(self.device)
        labels = labels.reshape(-1, self.n_output).to(self.device)

        output = self(features)
        return output, labels, self.criterion(output, labels)

    def training_step(self, batch, batch_idx):
        _, _, loss = self._forward_batch(batch)
        self.log('train_loss', loss)
        return loss

    def validation_step(self, batch, batch_idx):
        _, _, loss = self._forward_batch(batch)
        self.log("val_loss", loss, prog_bar=True)
        return loss

    def test_step(self, batch, batch_idx):
        """Log loss and mean errors for the batch and record one sample for plotting.

        Side effect: appends the pixel coordinates of the first sample of the
        batch (prediction and ground truth) to the notebook-level lists
        ``prd_x``, ``prd_y``, ``gth_x`` and ``gth_y``, which must exist before
        testing starts.
        """
        output, labels, loss = self._forward_batch(batch)
        n_samples = output.size(0)

        # The geometry helpers read scalars one at a time; copying to the CPU
        # once avoids a device synchronisation per element.
        output_cpu = output.detach().cpu()
        labels_cpu = labels.detach().cpu()

        prd_error = 0
        for i in range(n_samples):
            prd_error += CalAngularDist(labels_cpu[i, 0:2], output_cpu[i, 0:2])
        mean_prd_error = prd_error / n_samples

        # NOTE(review): index 0 is reported as "vertical" and index 1 as
        # "horizontal" here, while AngularCoord2ScreenCoord maps index 0 to the
        # screen width axis - confirm which naming is intended.
        mean_ver_error = (labels[:, 0] - output[:, 0]).abs().mean()
        mean_hor_error = (labels[:, 1] - output[:, 1]).abs().mean()

        pixel_pred = AngularCoord2PixelCoord(output_cpu[0])
        pixel_gth = AngularCoord2PixelCoord(labels_cpu[0])

        prd_x.append(pixel_pred[0])
        prd_y.append(pixel_pred[1])
        gth_x.append(pixel_gth[0])
        gth_y.append(pixel_gth[1])

        self.log("test_loss", loss, prog_bar=True)
        self.log("test_ang_error", mean_prd_error, prog_bar=True)
        self.log("mean_ver_error", mean_ver_error, prog_bar=True)
        self.log("mean_hor_error", mean_hor_error, prog_bar=True)

        return loss


In [35]:
def get_args(train=True):
    """Return the run configuration as a plain dict.

    ``train`` is accepted but does not influence the returned values.
    """
    return {
        'feature_num': 1702,
        'seq_length': 50,
        'seq_feature_num': 11,
        # the dropout rate of the model.
        'dropout_rate': 0.5,
        # the directory that saves the dataset.
        'dataset_dir': 'DGaze_TrainTest/',
        # the number of total epochs to run
        'epochs': 30,
        # the batch size
        'batch_size': 64,
        # the initial learning rate.
        'lr': 1e-2,
        'warm_up': 2,
    }

In [36]:

# Screen geometry shared by the three coordinate helpers below.
_SCREEN_W = 1080                       # screen width in pixels
_SCREEN_H = 1200                       # screen height in pixels
_VERTICAL_FOV = math.pi * 110 / 180    # 110 degrees, in radians
# Eye-to-screen distance in pixels, derived from the vertical field of view.
_SCREEN_DIST = 0.5 * _SCREEN_H / math.tan(_VERTICAL_FOV / 2)


def CalAngularDist(gth, prd):
    """Return the angle in degrees, seen from the eye, between two gaze positions.

    Args:
        gth: Ground-truth angular coordinate; indices 0 and 1 are read, in degrees.
        prd: Predicted angular coordinate, same layout as ``gth``.

    Returns:
        The angular distance as a Python float, in degrees.
    """
    gth_px = AngularCoord2PixelCoord(gth)
    prd_px = AngularCoord2PixelCoord(prd)

    screen_center_x = 0.5 * _SCREEN_W
    screen_center_y = 0.5 * _SCREEN_H

    # the distance between eye and gth.
    eye2gth = np.sqrt(np.square(_SCREEN_DIST)
                      + np.square(gth_px[0] - screen_center_x)
                      + np.square(gth_px[1] - screen_center_y))
    # the distance between eye and prd.
    eye2prd = np.sqrt(np.square(_SCREEN_DIST)
                      + np.square(prd_px[0] - screen_center_x)
                      + np.square(prd_px[1] - screen_center_y))
    # the distance between gth and prd.
    gth2prd = np.sqrt(np.square(prd_px[0] - gth_px[0]) + np.square(prd_px[1] - gth_px[1]))

    # Law of cosines. Rounding can push the ratio marginally outside [-1, 1]
    # (typically when both points nearly coincide), which would make math.acos
    # raise "math domain error", so clamp it first.
    cos_angle = (np.square(eye2gth) + np.square(eye2prd) - np.square(gth2prd)) / (2 * eye2gth * eye2prd)
    cos_angle = min(1.0, max(-1.0, cos_angle))

    # the angular distance between gth and prd.
    return 180 / math.pi * math.acos(cos_angle)


def AngularCoord2PixelCoord(angular_coord):
    """Convert an angular coordinate (degrees) to pixel coordinates.

    Returns:
        ``numpy`` array ``[x, y]`` in pixels.
    """
    screen_coord = AngularCoord2ScreenCoord(angular_coord)

    pixel_coord = np.zeros(2)
    pixel_coord[0] = screen_coord[0] * _SCREEN_W
    pixel_coord[1] = screen_coord[1] * _SCREEN_H
    return pixel_coord


def AngularCoord2ScreenCoord(angular_coord):
    """Convert an angular coordinate (degrees) to normalised screen coordinates.

    An input of ``(0, 0)`` maps to the screen centre ``(0.5, 0.5)``. The sign of
    the second component is flipped before projecting.

    Returns:
        ``numpy`` array ``[x, y]`` as fractions of the screen width and height.
    """
    screen_coord = np.zeros(2)
    screen_coord[0] = (_SCREEN_DIST * math.tan(math.pi * angular_coord[0] / 180) + 0.5 * _SCREEN_W) / _SCREEN_W
    screen_coord[1] = (_SCREEN_DIST * math.tan(-math.pi * angular_coord[1] / 180) + 0.5 * _SCREEN_H) / _SCREEN_H
    return screen_coord

In [37]:
def main(args, train=True):
    """Train the model and plot its loss curves, or evaluate a saved checkpoint.

    Args:
        args: Configuration dict. Reads ``dataset_dir``, ``batch_size``,
            ``epochs``, ``feature_num``, ``seq_length``, ``seq_feature_num``,
            ``lr``, ``warm_up`` and ``dropout_rate``. An optional
            ``checkpoint_path`` overrides the checkpoint used for testing.
        train: When true, fit on the training loader, validate on the validation
            loader and plot the logged losses. Otherwise load the checkpoint and
            run the test loop on the test loader.
    """
    # Load dataset
    train_loader, test_loader, validation_loader = LoadData(args['dataset_dir'], args['batch_size'])

    root_dir = os.getcwd()

    print('\n==> Starting...')

    # No trailing comma here: with one, `csv_logger` becomes a 1-tuple instead
    # of the logger object.
    csv_logger = CSVLogger('./', name='head_cnn', version='1')

    trainer = Trainer(
        default_root_dir=root_dir,
        max_epochs=args['epochs'],
        logger=csv_logger,
        # Fall back to the CPU instead of failing when no GPU is present.
        gpus=1 if torch.cuda.is_available() else None,
        log_every_n_steps=1,
        progress_bar_refresh_rate=1
    )

    # Shared by the constructor and load_from_checkpoint so they cannot diverge.
    model_kwargs = dict(
        input_size=args['feature_num'],
        seq_length=args['seq_length'],
        seq_feature_num=args['seq_feature_num'],
        n_output=2,
        lr=args['lr'],
        warm_up=args['warm_up'],
        # The model steps its LR scheduler once per training batch, so the
        # cosine period has to be the total number of training steps. Passing
        # the epoch count makes the learning rate oscillate every few batches.
        max_iters=args['epochs'] * len(train_loader),
        criterion=nn.L1Loss(),
        dropout=args['dropout_rate'],
    )

    if train:
        model = HeadObjCNN(**model_kwargs)

        print('\n==> Training...')
        trainer.fit(model, train_dataloaders=train_loader, val_dataloaders=validation_loader)

        # Read the metrics from the directory this run's logger wrote to, not
        # from a hard-coded version folder that may belong to another run.
        metrics = pd.read_csv(os.path.join(csv_logger.log_dir, 'metrics.csv'))
        train_loss = metrics[['train_loss', 'step', 'epoch']][metrics['train_loss'].notna()]
        val_loss = metrics[['val_loss', 'epoch']][metrics['val_loss'].notna()]

        fig, axes = plt.subplots(1, 2, figsize=(16, 5), dpi=100)
        axes[0].set_title('Train loss per batch')
        # Plot every 2000th logged row to keep the curve readable.
        axes[0].plot(train_loss['step'][::2000], train_loss['train_loss'][::2000])
        axes[1].set_title('Validation loss per epoch')
        axes[1].plot(val_loss['epoch'], val_loss['val_loss'], color='orange')
        plt.show(block=True)

        print(f"Train loss: {train_loss['train_loss'].iloc[-1]:.3f}")
        print(f"Val loss:   {val_loss['val_loss'].iloc[-1]:.3f}")

    else:
        print('\n==> Testing...')
        # NOTE(review): the default points at one specific checkpoint of an
        # earlier run; pass args['checkpoint_path'] to evaluate another one.
        chk_path = args.get('checkpoint_path',
                            "./head_cnn/0/checkpoints/epoch=6-step=114373.ckpt")
        # The constructor arguments are passed again when loading the checkpoint.
        model2 = HeadObjCNN.load_from_checkpoint(chk_path, **model_kwargs)
        trainer.test(model=model2, dataloaders=test_loader)

In [38]:
def plotComparisonGraph(gth_x, gth_y, prd_x, prd_y):
    """Scatter ground-truth and predicted gaze positions on one figure.

    All four arguments are sequences of pixel coordinates. The axes are fixed
    to 0..1080 horizontally and 0..1200 vertically.
    """
    # One marker size per predicted point; the same list is used for both scatters.
    marker_sizes = [5] * len(prd_x)

    plt.figure(figsize=(12, 9))
    truth_points = plt.scatter(gth_x, gth_y, marker_sizes, color='#88c999')
    predicted_points = plt.scatter(prd_x, prd_y, marker_sizes, color='hotpink')

    plt.xlim(0, 1080)
    plt.ylim(0, 1200)

    plt.title("Predicted gaze positions versus ground truth", fontsize=16)
    plt.xlabel("Horizontal /pixel", fontsize=16)
    plt.ylabel("Vertical /pixel", fontsize=16)

    legend_handles = (predicted_points, truth_points)
    legend_labels = ('Predicted', 'Ground Truth')
    plt.legend(legend_handles, legend_labels,
               scatterpoints=1, loc='lower left', ncol=3, fontsize=12)

    plt.show()

In [None]:
# Device configuration: prefer the GPU, otherwise run on the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print("Device:", device)

# Reproducibility: fixed global seed and deterministic cuDNN kernels.
seed_everything(42)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

# True -> train and plot the loss curves; False -> evaluate a checkpoint.
train = True

# Notebook-level buffers that HeadObjCNN.test_step appends to
# (one predicted / ground-truth pixel position per test batch).
prd_x, prd_y = [], []
gth_x, gth_y = [], []

args = get_args()
main(args, train)

# The scatter plot only has data after a test run.
if not train:
    plotComparisonGraph(gth_x, gth_y, prd_x, prd_y)

Global seed set to 42


Device: cuda

Loading the training dataset
