In [None]:
#@title Mount google drive

from google.colab import drive
drive.mount("/content/drive", force_remount=True)

%cd './drive/MyDrive/gaze_estimation'

In [None]:
#@title Import required modules

import os
import time
import copy
import random
import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

from tqdm.auto import tqdm
from sklearn.linear_model import SGDRegressor, LinearRegression

from util import make_reproducibility, TensorDataset, convert_to_xyz, mae, convert_to_spherical
from networks import *
from posthoc import fitrlinear

In [None]:
def chm_ut_3fold_posthoc(fold, seed, path = '.', experiment_name = 'ut_3fold', patience=10, device=torch.device('cuda:0'), data_dir = '../ut_dataset/3-fold') :
    """Compute a post-hoc adjustment to the saved test prediction for one fold.

    The function reads the arrays saved under
    ``{path}/prediction/corr_prec_{experiment_name}_{fold}_*.npy`` together with
    the fold's train ids and test 2D gazes from ``data_dir``. It then:

    1. selects the epoch index ``ind`` as (last epoch whose ``v_list[:, 0, 0, 0]``
       is non-zero) minus ``patience`` -- presumably the early-stopping
       checkpoint; confirm against the training script;
    2. expands the per-cluster values ``v_list[ind][i][:, k]`` to one row per
       training sample of cluster ``i``, for each of the ``K = 2`` outputs;
    3. fits one ``fitrlinear`` model per output that maps ``train_Gamma[:, 1:]``
       to those rows, and predicts ``nu`` for ``test_Gamma[:, 1:]``;
    4. forms the adjustment for output ``k`` as
       ``sum(test_Gamma * nu_k, dim=1)`` and prints the test MAE with and
       without it.

    Parameters
    ----------
    fold : int
        Fold index used in every file name.
    seed : int
        Base seed; ``make_reproducibility`` is called with ``seed + fold``.
    path : str
        Directory that contains the ``prediction`` folder.
    experiment_name : str
        Experiment tag embedded in the saved file names.
    patience : int
        Number of epochs subtracted from the last non-zero epoch of ``v_list``.
    device : torch.device
        Device handed to ``fitrlinear`` and used for the fitted models.
    data_dir : str
        Directory holding the ``fold_{fold}_*.npy`` dataset files.

    Returns
    -------
    tuple
        ``(test_Gamma, test_nu_list, w_list, test_new_adj_1)``: the test
        features, the list of ``K`` predicted ``nu`` tensors (on CPU), the list
        of ``K`` fitted models, and the adjustment tensor shaped like the test
        gazes.

    Raises
    ------
    ValueError
        If ``v_list`` has no non-zero epoch, or if the selected epoch index
        would be negative (which would otherwise wrap around to the end of the
        saved arrays and silently pick the wrong epoch).
    """
    make_reproducibility(seed + fold)

    # Only the arrays that the post-hoc step actually consumes are loaded.
    train_ids = np.load(f'{data_dir}/fold_{fold}_train_ids.npy').reshape(-1)
    test_gazes = torch.as_tensor(np.load(f'{data_dir}/fold_{fold}_test_2d_gazes.npy'), dtype=torch.float)
    test_y = test_gazes * 180 / np.pi  # scale by 180/pi (radians -> degrees)

    # Load trained parameters
    prefix = f'{path}/prediction/corr_prec_{experiment_name}_{fold}'
    v_list = np.load(f'{prefix}_v_list.npy')

    # Epochs whose first entry is non-zero are treated as "written" epochs.
    written_epochs = np.flatnonzero(v_list[:, 0, 0, 0] != 0)
    if written_epochs.size == 0 :
        raise ValueError(f'{prefix}_v_list.npy contains no non-zero epoch; cannot select a checkpoint.')
    ind = int(written_epochs.max()) - patience
    if ind < 0 :
        # A negative index would wrap around to the tail of the arrays.
        raise ValueError(
            f'Selected epoch index is negative ({ind}): last non-zero epoch is '
            f'{int(written_epochs.max())} but patience is {patience}.'
        )
    best_v_list = torch.as_tensor(v_list[ind], dtype=torch.float)

    # Entry [2] of the saved Gamma arrays is used -- NOTE(review): confirm what
    # the leading axis of these files enumerates.
    test_Gamma = torch.as_tensor(np.load(f'{prefix}_test_Gamma.npy')[2], dtype=torch.float)
    train_Gamma = torch.as_tensor(np.load(f'{prefix}_train_Gamma.npy')[2], dtype=torch.float)

    # Load prediction
    prediction = torch.as_tensor(np.load(f'{prefix}_pred.npy'), dtype=torch.float)
    test_fixed = convert_to_spherical(prediction[ind], deg=True)

    K = 2
    m = len(best_v_list)
    # Row indices of the training samples belonging to each unique id (sorted id order).
    train_cluster = [np.where(train_ids == ids)[0] for ids in np.unique(train_ids)]

    # The first column of Gamma is excluded from the regression inputs
    # (presumably an intercept column -- TODO confirm).
    train_z = train_Gamma[:,1:]
    test_z = test_Gamma[:,1:]

    # Per-sample regression targets: every sample of cluster i receives that
    # cluster's k-th column of v.
    train_v_list = [torch.zeros_like(train_Gamma) for _ in range(K)]
    for i in range(m) :
        rows = train_cluster[i]
        for k in range(K) :
            train_v_list[k][rows] = best_v_list[i][:,k].repeat(len(rows), 1)

    # One linear model per output dimension.
    w_list = []
    for k in range(K) :
        w_list.append(fitrlinear(X = train_z.detach().cpu(), y = train_v_list[k].cpu(), device=device).to(device))
        w_list[k].fit(max_epoch=50)

    test_nu_list = [w_list[k].predict(test_z).detach().cpu() for k in range(K)]

    # Adjustment for output k is the row-wise inner product of Gamma and nu_k.
    test_new_adj_1 = torch.zeros_like(test_gazes)
    for k in range(K) :
        test_new_adj_1[:,k] = torch.sum(test_Gamma * test_nu_list[k], dim = 1)

    print(f'Test MAE (fixed) : {mae(test_fixed, test_y, is_3d=False, deg=True)}')
    print(f'Test MAE (new adjustment) : {mae(test_fixed + test_new_adj_1, test_y, is_3d=False, deg=True)}')

    return test_Gamma, test_nu_list, w_list, test_new_adj_1

In [None]:
# Configuration for the post-hoc run over all folds.
seed = 100
path = '.'
experiment_name = 'ut_3fold'
patience=10
# Prefer the first GPU, but fall back to CPU so the cell still runs on a
# CPU-only runtime instead of failing inside the model code.
# NOTE(review): assumes fitrlinear also works on CPU -- confirm.
device=torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

# One result tuple per fold, in fold order.
posthoc_list = []
for fold in range(3) :
    posthoc_list.append(chm_ut_3fold_posthoc(fold, seed, path = path, experiment_name = experiment_name, patience=patience, device=device))

In [None]:
# For each of the three folds, stack the list of predicted nu tensors
# (element 1 of the fold's result tuple) into a single tensor.
nu_list = [torch.stack(posthoc_list[fold_idx][1]) for fold_idx in range(3)]

# Persist the per-fold tensors; the output directory is created on demand.
os.makedirs('./posthoc_results', exist_ok=True)
torch.save(nu_list, './posthoc_results/chm_ut_3fold_nu.pt')