In [1]:
import numpy as np
import torch
import torch.autograd as autograd
import matplotlib.pyplot as plt

In [2]:
sample_size = 10000
sample_dim = 10
noise_scale = 1.0
train_split = 0.8
theta_range = 1.0
data_range = 500
epoch = 50000
lr = 1e-5
dim_per_task = 7
task_num = 10
shift_scale = 1e-1
tasks = [np.random.randint(0, sample_dim, dim_per_task) for _ in range(task_num)]

In [3]:
class DataGen:
    def __init__(self, sample_size, sample_dim, noise_scale, train_split, theta_range, data_range, tasks):
        self.sample_size = sample_size
        self.sample_dim = sample_dim
        self.noise_scale = noise_scale
        self.train_split = train_split
        self.theta_range = theta_range
        self.data_range = data_range
        self.theta = np.random.uniform(-theta_range, theta_range, (sample_dim, 1))
        self.tasks = tasks
    
    def get_data(self):
        for task in self.tasks:
            self.theta += shift_scale*self.noise_scale*np.random.normal(0, 1, (self.sample_dim, 1))
            X = np.zeros((self.sample_size, self.sample_dim))
            X[:, task] = np.random.uniform(-self.data_range, self.data_range, (self.sample_size, self.sample_dim))[:, task]
            y = np.dot(X, self.theta).squeeze() + self.noise_scale * np.random.normal(0, 1, self.sample_size)
            train_size = int(self.sample_size * self.train_split)
            X_train, X_test = torch.tensor(X[:train_size], dtype=torch.float), torch.tensor(X[train_size:], dtype=torch.float)
            y_train, y_test = torch.tensor(y[:train_size], dtype=torch.float), torch.tensor(y[train_size:], dtype=torch.float)
            yield X_train, y_train, X_test, y_test

In [4]:
class Model(torch.nn.Module):
    def __init__(self, sample_dim, lr):
        super().__init__()
        self.theta = torch.nn.Parameter(torch.randn(sample_dim, 1))
        self.optimizer = torch.optim.SGD(self.parameters(), lr=lr)
        self.loss_fn = torch.nn.MSELoss()
    
    def set_data(self, X, y):
        self.X, self.y = X.detach().clone(), y.detach().clone()

    def forward(self, X):
        return torch.matmul(X, self.theta).squeeze()
    
    def train(self, X, y, epoch, lr):
        for _ in range(epoch):
            self.optimizer.zero_grad()
            loss = self.loss_fn(self(X), y)
            loss.backward()
            self.optimizer.step()
    
    def test(self, X, y):
        return self.loss_fn(self(X), y)
    
    def get_loss_with_theta(self, theta):
        return self.loss_fn(torch.matmul(self.X, theta).squeeze(), self.y)
    
    def get_gradient(self, X, y, theta):
        self.set_data(X, y)
        return autograd.functional.jacobian(self.get_loss_with_theta, theta.squeeze()).detach().clone()
    
    def get_hessian(self, X, y, theta):
        self.set_data(X, y)
        return autograd.functional.hessian(self.get_loss_with_theta, theta.squeeze()).detach().clone()
    
    def prepare_estimation(self, X, y, old_theta):
        self.gradient = self.get_gradient(X, y, old_theta).reshape(1, -1)
        self.hessian = self.get_hessian(X, y, old_theta)
        self.old_theta = old_theta.detach().clone()
        self.old_ans = self.get_loss_with_theta(old_theta).detach().clone()
    
    def estimate(self, theta):
        dtheta = theta - self.old_theta
        ans = self.old_ans.detach().clone()
        ans += torch.matmul(self.gradient, dtheta).squeeze()
        ans += 0.5 * torch.matmul(torch.matmul(dtheta.T, self.hessian), dtheta).squeeze()
        return ans
    
    def diag_estimate(self, theta):
        dtheta = theta - self.old_theta
        ans = self.old_ans.detach().clone()
        ans += torch.matmul(self.gradient, dtheta).squeeze()
        ans += 0.5 * torch.matmul(torch.matmul(dtheta.T, torch.diag(torch.diag(self.hessian))), dtheta).squeeze()
        return ans

In [5]:
model = Model(sample_dim, lr)
Data = DataGen(sample_size, sample_dim, noise_scale, train_split, theta_range, data_range, tasks)
first_task = True
for X_train, y_train, X_test, y_test in Data.get_data():
    model.train(X_train, y_train, epoch, lr)
    if first_task:
        first_task = False
    else:
        estimation = float(model.estimate(model.theta))
        diag_estimation = float(model.diag_estimate(model.theta))
        actual = float(model.test(model.X, model.y))
        delta = float(np.abs(estimation - actual))
        diag_delta = float(np.abs(diag_estimation - estimation))
        print(f"Taylor Expansion Error Ratio: {delta/actual:.2e}, Diagonal Approximation Error Ratio: {diag_delta/actual:.2e}")
        print(f"Dominance Ratio: {'INF' if delta == 0 else diag_delta/delta}")
    model.prepare_estimation(X_train, y_train, model.theta)

Taylor Expansion Error Ratio: 6.95e-07, Diagonal Approximation Error Ratio: 4.96e-03
Dominance Ratio: 7136.666666666667
Taylor Expansion Error Ratio: 8.31e-07, Diagonal Approximation Error Ratio: 1.51e-02
Dominance Ratio: 18113.5
Taylor Expansion Error Ratio: 6.06e-07, Diagonal Approximation Error Ratio: 4.74e-03
Dominance Ratio: 7827.5
Taylor Expansion Error Ratio: 0.00e+00, Diagonal Approximation Error Ratio: 0.00e+00
Dominance Ratio: INF
Taylor Expansion Error Ratio: 6.10e-08, Diagonal Approximation Error Ratio: 0.00e+00
Dominance Ratio: 0.0
Taylor Expansion Error Ratio: 5.93e-07, Diagonal Approximation Error Ratio: 8.33e-03
Dominance Ratio: 14038.444444444445
Taylor Expansion Error Ratio: 3.74e-07, Diagonal Approximation Error Ratio: 7.24e-03
Dominance Ratio: 19376.25
Taylor Expansion Error Ratio: 1.33e-07, Diagonal Approximation Error Ratio: 7.86e-03
Dominance Ratio: 59095.0
Taylor Expansion Error Ratio: 5.75e-07, Diagonal Approximation Error Ratio: 1.84e-02
Dominance Ratio: 31903