In [None]:
import torch
import os
from UnsupervisedGaze_model import *
from torchvision import transforms
from PIL import Image
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
import math
import time
from sklearn.utils import shuffle
from progress.bar import Bar
import sys
import random

def progressbar(it, prefix="", size=60, out=sys.stdout):
    """Yield the items of ``it`` while drawing a one-line text progress bar.

    The bar is redrawn in place (each redraw ends with a carriage return) once
    before the first item and once after every item the consumer has finished
    with. After the last item two newlines are written so later output starts
    on a fresh line.

    Args:
        it: A sized iterable (``len(it)`` is called up front).
        prefix: Text printed in front of the bar.
        size: Width of the bar in characters.
        out: Text stream the bar is written to.

    Yields:
        The items of ``it``, unchanged and in order.
    """
    count = len(it)

    def show(j):
        # An empty sequence is treated as already complete; this also avoids
        # the division by zero the ratio below would otherwise hit.
        x = size if count == 0 else int(size*j/count)
        print("{}[{}{}] {}/{}".format(prefix, "#"*x, "."*(size-x), j, count),
              end='\r', file=out, flush=True)

    show(0)
    for i, item in enumerate(it):
        yield item
        show(i+1)
    print("\n", flush=True, file=out)

In [None]:
# Base transform applied to every image: convert to a tensor, then normalise
# each of the three channels with mean 0.5 and std 0.5.
preprocess = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
])


# Six values in [0.3, 0.7), drawn ONCE when this cell runs from the unseeded
# global NumPy RNG. The first three are used as the augmentation mean and the
# last three as its std, so all augmented images in a run share the same
# statistics, and those statistics change from run to run.
randseed = 0.4 * np.random.rand(6) + 0.3

# Augmentation transform: a random crop resized to 224x224 followed by a second
# normalisation with the values drawn above. Data_loader.read_data applies it
# to the OUTPUT of `preprocess`, i.e. to an already-normalised tensor.
# NOTE(review): augmented samples are always 224x224 while unaugmented ones keep
# their source size; the later torch.stack needs equal shapes, so this presumably
# relies on the "*_new.jpg" files already being 224x224 — confirm.
preprocess_augmentation = transforms.Compose([
    transforms.RandomResizedCrop((224, 224), scale=(0.5, 1.0), ratio=(0.95, 1.05)),
    transforms.Normalize(mean=randseed[:3], std=randseed[3:]),
])

class Data_loader():
    """Index per-person image folders on disk and load them into train/test lists.

    Every sub-directory of ``path`` whose name starts with ``'00'`` is treated
    as one person. The first ``round(train_test_ratio * n_persons)`` persons
    (in sorted name order) form the training split, the rest the test split,
    so no person appears in both.

    Images are transformed with the module-level ``preprocess`` pipeline;
    training images may additionally get an augmented copy produced by the
    module-level ``preprocess_augmentation`` pipeline.
    """

    def __init__(self, path, train_test_ratio = 0.7, augmentation = True, plot_sample=False):
        """Scan ``path`` and record the image files of every person folder.

        Args:
            path: Dataset root directory (with or without a trailing slash).
            train_test_ratio: Fraction of persons assigned to the training split.
            augmentation: Whether ``read_data`` may add augmented training copies.
            plot_sample: Accepted for compatibility; not used by this class.
        """
        self.dirs = []
        self.files_path = []
        # One entry per directory listing entry: 1 if it was taken as a person
        # folder, 0 otherwise.
        self.count = []
        self.path = path
        # os.listdir / os.scandir return entries in arbitrary order; sorting
        # makes the person-level split (and the sample order) reproducible.
        for each in sorted(os.listdir(path)):
            self.count.append(0)
            person_dir = os.path.join(path, each)
            # Skip stray files that merely start with '00'; scandir would fail on them.
            if each[:2] == '00' and os.path.isdir(person_dir):
                self.dirs.append(each)
                with os.scandir(person_dir) as entries:
                    self.files_path.append(sorted(
                        f.path for f in entries
                        if f.is_file() and f.path[-9:] == 'H_new.jpg'
                    ))
                self.count[-1] += 1
        self.split_index = round(train_test_ratio * len(self.files_path))
        print(f"[Data_loader] training set: {self.split_index} person |")
        print(f"[Data_loader] test set: {len(self.files_path) - self.split_index} person |")
        self.images_train = []
        self.gts_train = []
        self.images_test = []
        self.gts_test = []
        self.augmentation = augmentation

    def read_angles(self, name):
        """Parse the two integer angles encoded in an image file name.

        The base name is split on ``'_'``; the fourth field with its last
        character removed gives the first angle, and the fifth field with every
        ``'H'`` removed gives the second.

        NOTE(review): the second field carries an 'H' suffix, which may stand
        for "horizontal"; if so the yaw/pitch names could be swapped — confirm
        against the dataset's naming scheme.

        Args:
            name: Path (or bare file name) of the image.

        Returns:
            ``(yaw, pitch)`` as ints.
        """
        fields = os.path.basename(name).split("_")[3:]
        yaw = int(fields[0][:-1])
        pitch = int(fields[1].replace("H", ''))
        return yaw, pitch

    def read_data(self, person, mode="TRAIN"):
        """Load every image of one person and append it to the chosen split.

        Args:
            person: Iterable of image file paths belonging to one person.
            mode: ``"TRAIN"`` appends to the training lists; any other value
                appends to the test lists.
        """
        temp = []
        ang = []
        for file_name in person:
            yaw, pitch = self.read_angles(file_name)
            with Image.open(file_name) as image:
                image_matrix = preprocess(image)
            # randint(0, 100) yields 0..99, so "> 63" adds an augmented copy
            # (placed before the original) for roughly 36% of training images.
            if self.augmentation and mode=="TRAIN" and np.random.randint(0,100) > 63:
                temp.append(preprocess_augmentation(image_matrix))
                ang.append([yaw, pitch])
            temp.append(image_matrix)
            ang.append([yaw, pitch])
        if mode == "TRAIN":
            self.images_train.extend(temp)
            self.gts_train.extend(ang)
        else:
            self.images_test.extend(temp)
            self.gts_test.extend(ang)

    def get_data(self):
        """Load all persons and return the accumulated split lists.

        Persons with index below ``split_index`` go to the training split, the
        others to the test split only; held-out persons must never be loaded
        into the training lists as well.

        Returns:
            ``(images_train, gts_train, images_test, gts_test)`` as lists.
        """
        for i, person in enumerate(self.files_path):
            if i >= self.split_index:
                self.read_data(person, mode="TEST")
            else:
                self.read_data(person, mode="TRAIN")
        return self.images_train, self.gts_train, self.images_test, self.gts_test

# Index the dataset on disk and load both splits as Python lists.
ColumbiaGazeDataset = Data_loader(path="/root/Columbia Gaze Data Set/")
images_train, gts_train, images_test, gts_test = ColumbiaGazeDataset.get_data()

# Use the first CUDA device when one is available, otherwise the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print("Running on : ", device)
print(f"Number of training images : {len(images_train)} test: {len(images_test)}")

# Replace the lists with tensors on `device`. Each pair is fully built before
# either name is rebound, so a failure leaves the original lists in place.
images_train, images_test = (
    torch.stack(split).to(device) for split in (images_train, images_test)
)
gts_train, gts_test = (
    torch.tensor(split).to(device) for split in (gts_train, gts_test)
)

In [None]:
def find_abs_angle_difference(a, b):
    """Merge two per-axis angle offsets (degrees) into one absolute angle (degrees).

    Element-wise, both inputs are converted from degrees to radians, the
    product of their cosines is passed through ``acos``, and the result is
    converted back to degrees and made non-negative.

    Args:
        a: Tensor of angle offsets in degrees.
        b: Tensor of angle offsets in degrees, broadcastable with ``a``.

    Returns:
        Tensor of absolute angles in degrees.
    """
    cos_a = torch.cos(a / 180 * math.pi)
    cos_b = torch.cos(b / 180 * math.pi)
    return torch.abs(torch.acos(cos_a * cos_b) * 180 / math.pi)

def test_baseline(data, number_of_epoch = 250, lr=1e-5, weight_decay=1e-3, batch_size = 16, show_images=False, samples_per_epoch=3840):
    """Train the full-face baseline model and report its mean angular test error.

    The model is trained with Adam and an L1 loss on consecutive,
    non-shuffled mini-batches, checkpointed after every epoch, then evaluated
    on the test split. Tensors are expected to live on the module-level
    ``device`` already; the model is moved there.

    Args:
        data: Sequence ``(images_train, gts_train, images_test, gts_test)`` of
            tensors. Each training batch is viewed as
            ``(batch_size, 3, H, W)``; labels are indexed as ``[:, 0]`` and
            ``[:, 1]``.
        number_of_epoch: Number of passes over the training samples.
        lr: Initial Adam learning rate; divided by 4 at epochs 25, 75, 125, ...
        weight_decay: Adam weight decay.
        batch_size: Mini-batch size for training and evaluation.
        show_images: If true, plot the first channel of two random training images.
        samples_per_epoch: Upper bound on the training samples used per epoch
            (the default keeps the previously hard-coded 3840); ``None`` uses
            all of them. Only whole batches are used.

    Raises:
        ValueError: If there is not even one full training batch, or the test
            split is empty.
    """
    images = data[0]
    gts = data[1]
    images_test = data[2]
    gts_test = data[3]

    # Clamp to the data actually available so a small training set cannot
    # produce short slices that break the .view() below.
    usable = len(images) if samples_per_epoch is None else min(samples_per_epoch, len(images))
    n_batches = usable // batch_size
    if n_batches == 0:
        raise ValueError(f"need at least {batch_size} training images, got {len(images)}")
    if len(images_test) == 0:
        raise ValueError("test split is empty")

    if show_images:
        for _ in range(2):
            num = np.random.randint(0, len(images))
            # matplotlib cannot read GPU memory; hand it a CPU array.
            plt.imshow(images[num, 0, :, :].detach().cpu().numpy())
            plt.title(f"{gts[num]}")
            plt.show()

    model = GazeRepresentationLearning_fullface()
    model = model.to(device)
    torch.backends.cudnn.benchmark=True
    criterion = torch.nn.L1Loss()
    optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)

    # Fail before training, not after the first epoch, if the directory is missing.
    os.makedirs("/root/_KD/UnsupervisedBaseline_log_5/full_log", exist_ok=True)

    loss_hist = []
    model.train()
    for epoch in range(number_of_epoch):
        prev_time = time.time()
        if epoch % 50 == 25:
            for g in optimizer.param_groups:
                g['lr'] /= 4
            print("reduce lr")
        epoch_losses = []
        for i in range(n_batches):
            batch = slice(batch_size*i, batch_size*(i+1))
            image_1 = images[batch].view(batch_size, 3, images.size(-2), images.size(-1))
            gt = gts[batch]
            # Without this, gradients from all earlier steps keep accumulating.
            optimizer.zero_grad()
            outputs = model(image_1)
            # Labels built from Python ints are integer tensors; give the loss
            # a target of the same floating dtype as the prediction.
            loss = criterion(outputs, gt.to(outputs.dtype))
            loss.backward()
            optimizer.step()
            epoch_losses.append(float(loss))
        loss_hist.append(epoch_losses)
        if epoch%20 == 0:
            print(f'epoch: {epoch+1} / {number_of_epoch}, loss: {sum(epoch_losses)/len(epoch_losses)}, time: {time.time() - prev_time} s')
        torch.save(model, f"/root/_KD/UnsupervisedBaseline_log_5/full_log/baseline_mixup_epoch={epoch}_loss={float(loss)}_{batch_size=}_{weight_decay=}.pt")

    # Evaluate in inference mode, without autograd bookkeeping, and in batches
    # so the whole test split never has to pass through the model at once.
    model.eval()
    with torch.no_grad():
        outputs = torch.cat([
            model(images_test[start:start + batch_size])
            for start in range(0, len(images_test), batch_size)
        ])
    dif = gts_test - outputs
    yaw = dif[:, 0]
    pitch = dif[:, 1]
    val = find_abs_angle_difference(yaw, pitch)
    error = val.mean()
    print(f": got mean angle error: {error}")
    torch.save(model, f"/root/_KD/UnsupervisedBaseline_log_5/_final_baseline_mixup_error={float(error)}.pt")