In [7]:
import random
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from torch.cuda.amp import autocast, GradScaler
import sys
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, StandardScaler
import numpy as np
import time
import pickle
import matplotlib


import os
import sys
from pathlib import Path
# Put the parent of the current working directory on the import path.
# NOTE(review): presumably so project-local modules one level above the
# notebook can be imported — no such import is visible in this notebook; confirm.
sys.path.append(str(Path(os.getcwd()).parent))

In [2]:
%matplotlib inline
matplotlib.rcParams['figure.figsize'] = (13, 5)

seed = 678
torch.manual_seed(seed)
random.seed(seed)
np.random.seed(seed)

In [3]:
import torch.optim as optim
import torch.nn.functional as F
import torch
import torch.nn as nn
from sklearn.preprocessing import MinMaxScaler, StandardScaler
import lightning as L


# Landmark indices used as model input features (84 entries).
# Each index selects one (x, y) landmark from the flattened landmark array,
# so the feature vector has 2 * len(train_indices) = 168 values.
# The trailing comment on each row names the face region the row covers.
# NOTE(review): indices presumably follow the MediaPipe Face Mesh numbering
# (468-477 being the iris landmarks) — confirm against the capture code.
train_indices = [
    21, 54, 103, 67, 109, 10, 338, 297, 332, 284, 251,  # forehead
    108, 151, 337,  # forehead lower
    143, 156, 70, 63, 105, 66, 107,  # brow right outer
    336, 296, 334, 293, 300, 383, 372,  # brow left outer
    124, 46, 53, 52, 65, 55, 193,  # brow right middle
    285, 295, 282, 283, 276, 353, 417,  # brow left middle
    226, 247, 246, 221,  # around right eye
    446, 467, 466, 441,  # around left eye
    189, 190, 173, 133, 243, 244, 245, 233,  # right z
    # NOTE(review): the three entries before the last one in the next row are
    # the "right z" values + 220 (243->463, 244->464, 245->465), which would
    # make the final entry 453 rather than 153. Possible typo — confirm before
    # changing, since editing it alters the feature set of trained models.
    413, 414, 398, 362, 463, 464, 465, 153,  # left z
    58, 172, 136, 150,  # right cheek
    288, 397, 365, 379,  # left cheek
    468, 469, 470, 471, 472,  # right iris
    473, 474, 475, 476, 477,  # left iris
]


class GazePredictor(L.LightningModule):
    """Fully connected regressor trained with a mean-squared-error loss.

    The network is ``fc1 -> LeakyReLU -> [hiddenN -> LeakyReLU]* -> fc2``.

    Args:
        arch: Sequence of layer widths, from input size to output size, e.g.
            ``[168, 256, 64, 2]``. Consecutive pairs define the ``Linear``
            layers. At least three entries are expected; every width in the
            sequence is used, whatever its length.

    Attributes:
        scaler: A ``StandardScaler`` stored alongside the model. ``forward``
            never applies it; callers fit it and transform inputs themselves.
        learning_rate: Learning rate passed to Adam (0.001).
        arch: The ``arch`` argument, kept for building and traversing layers.

    Layer attribute names (``fc1``, ``relu``, ``hidden1``, ``relu2``, ...,
    ``fc2``) and their registration order are kept stable so parameter names
    in existing checkpoints still match.
    """

    def __init__(self, arch):
        super().__init__()
        self.scaler = StandardScaler()
        self.learning_rate = 0.001
        self.arch = arch
        self.fc1 = nn.Linear(*arch[0:1+1])
        self.relu = nn.LeakyReLU()
        # Hidden layer N maps arch[N] -> arch[N + 1] and is followed by
        # activation "relu{N + 1}". Building them in a loop supports any depth;
        # the previous hard-coded version stopped at three hidden layers and
        # produced a shape mismatch before fc2 for longer `arch` lists.
        for idx in self._hidden_indices():
            setattr(self, f"hidden{idx}", nn.Linear(arch[idx], arch[idx + 1]))
            setattr(self, f"relu{idx + 1}", nn.LeakyReLU())
        self.fc2 = nn.Linear(*arch[-2:])

    def _hidden_indices(self):
        """Return the 1-based indices N of the ``hiddenN`` layers implied by ``arch``."""
        # fc1 consumes arch[0:2] and fc2 consumes arch[-2:]; everything in
        # between is a hidden layer.
        return range(1, len(self.arch) - 2)

    def forward(self, x):
        """Run ``x`` through every layer and return the raw ``fc2`` output."""
        x = self.relu(self.fc1(x))
        for idx in self._hidden_indices():
            x = getattr(self, f"hidden{idx}")(x)
            x = getattr(self, f"relu{idx + 1}")(x)
        return self.fc2(x)

    def configure_optimizers(self):
        """Return an Adam optimizer over all parameters at ``self.learning_rate``."""
        return optim.Adam(self.parameters(), self.learning_rate)

    def _mse_step(self, batch, log_name):
        """Compute the batch MSE, log it under ``log_name`` and return it."""
        inputs, targets = batch
        predictions = self.forward(inputs)
        loss = torch.mean((predictions - targets) ** 2)
        self.log(log_name, loss, prog_bar=True, on_step=True, on_epoch=True)
        return loss

    def training_step(self, batch, batch_idx):
        """Return the MSE loss for one training batch (logged as ``train_loss``)."""
        return self._mse_step(batch, "train_loss")

    def validation_step(self, batch, batch_idx):
        """Return the MSE loss for one validation batch (logged as ``test_loss``)."""
        # The metric name "test_loss" is kept as-is even though this is the
        # validation hook; anything monitoring that key depends on it.
        return self._mse_step(batch, "test_loss")


In [4]:
# Load the (features, labels) pair from disk.
# NOTE(security): pickle.load can execute arbitrary code — only load dataset
# files that come from a trusted source.
with open('../data/big-dataset.pickle', 'rb') as dataset_file:
    X, y = pickle.load(dataset_file)

# View each sample as (landmark, 2), keep only the selected landmarks, then
# flatten back to one feature vector per sample.
X = X.reshape(len(X), -1, 2)[:,train_indices].reshape(len(X), len(train_indices) * 2) # each landmark has 2 coordinates (x and y)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=seed)

input_size = X.shape[1]
output_size = y.shape[1]
model = GazePredictor([input_size, 256, 64, output_size])

# Fit the scaler on the training split only, then apply it to both splits,
# so no statistics from the held-out data leak into training.
model.scaler.fit(X_train)
X_train = model.scaler.transform(X_train)
X_test = model.scaler.transform(X_test)

X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32)

# DataLoader defaults to batch_size=1 and shuffle=False, which means one
# optimizer step per sample in a fixed order. Use real mini-batches and
# reshuffle the training data every epoch; evaluation order stays fixed.
BATCH_SIZE = 256
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE)

# Note: 'mean xy' is computed over the full label array `y`, not just the
# training split.
print('train', X_train_tensor.size(), y_train_tensor.size(), 'mean xy', y.mean(axis=0))
print('test ', X_test_tensor.size(), y_test_tensor.size())
print(model.arch)

train torch.Size([458936, 168]) torch.Size([458936, 2]) mean xy [ 0.01054647 -0.09292439]
test  torch.Size([114734, 168]) torch.Size([114734, 2])
[168, 256, 64, 2]


In [5]:
# Train for up to 1000 epochs on whatever accelerator/devices Lightning
# selects, evaluating on the held-out split via the validation loop.
trainer = L.Trainer(
    accelerator='auto',
    devices='auto',
    max_epochs=1000,
)
trainer.fit(
    model,
    train_dataloaders=train_dataloader,
    val_dataloaders=test_dataloader,
)

GPU available: False, used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs

  | Name    | Type      | Params
--------------------------------------
0 | fc1     | Linear    | 43.3 K
1 | relu    | LeakyReLU | 0     
2 | hidden1 | Linear    | 16.4 K
3 | relu2   | LeakyReLU | 0     
4 | fc2     | Linear    | 130   
--------------------------------------
59.8 K    Trainable params
0         Non-trainable params
59.8 K    Total params
0.239     Total estimated model params size (MB)


Sanity Checking: 0it [00:00, ?it/s]

  rank_zero_warn(
  rank_zero_warn(


Training: 0it [00:00, ?it/s]

  rank_zero_warn("Detected KeyboardInterrupt, attempting graceful shutdown...")
