In [1]:
import torch
from torch.utils.tensorboard import SummaryWriter



In [40]:
import os

def delete_folder(folder_path):
    if os.path.exists(folder_path):
        for root, dirs, files in os.walk(folder_path, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                os.rmdir(os.path.join(root, name))
        os.rmdir(folder_path)
    else:
        print(f"The folder {folder_path} does not exist")

delete_folder('./runs')

In [3]:
from torchvision import datasets, transforms

train_dataset = datasets.MNIST(root='./data', train=True, download=True,
                               transform=transforms.ToTensor())
test_dataset = datasets.MNIST(root='./data', train=False, download=True,
                              transform=transforms.ToTensor(), )

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=64, shuffle=False)

In [39]:
import torch.nn as nn

class LogisticRegression(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(LogisticRegression, self).__init__()
        self.linear = nn.Linear(input_dim, output_dim - 1)
        # self.linear.weight = nn.Parameter(torch.zeros(output_dim - 1,input_dim))
        # self.linear.bias = nn.Parameter(torch.zeros(output_dim - 1,input_dim))
        
    def forward(self, x):
        x = x.view(-1, self.linear.in_features)
        # outputs = torch.nn.functional.softmax(self.linear(x))
        outputs = self.linear(x)
        zeros_for_last_class = torch.zeros(
            (outputs.shape[0], 1),
            device=x.device,
            dtype=x.dtype
        )
        output_with_zeros = torch.cat((outputs, zeros_for_last_class), dim=1)
        return output_with_zeros

In [42]:
writer = SummaryWriter('runs/logistic_regression_10_mnist') 

In [43]:
model = LogisticRegression(28*28, 10)
optimizer = torch.optim.Adam(model.parameters())
criterion = torch.nn.CrossEntropyLoss()

batch_num = 0
for epoch in range(5):
    model.train() # Set the model to training mode
    for batch_data, batch_labels in train_loader:
        optimizer.zero_grad()
        batch_data = batch_data.view(-1, 28*28)
        output = model(batch_data)
        loss = criterion(output, batch_labels)
        writer.add_scalar('training loss', loss, batch_num)
        loss.backward()
        optimizer.step()
        batch_num += 1

In [44]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

val_loss = 0
val_correct = 0
with torch.no_grad():  # Disable gradient calculation during validation
    for data, labels in test_loader:
        # Transfer data to the appropriate device (CPU or GPU)
        data, labels = data.to(device), labels.to(device)

        # Forward pass
        outputs = model(data.view(-1, 28*28))

        # Calculate loss
        loss = criterion(outputs, labels)

        # Update validation metrics (e.g., accuracy)
        val_loss += loss.item()
        _, predicted = torch.max(outputs.data, 1)
        val_correct += (predicted == labels).sum().item()

In [45]:
val_loss /= len(test_loader)
val_accuracy = 100 * val_correct / len(test_dataset)

print(f'Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.2f}%')

Validation Loss: 0.2777, Validation Accuracy: 92.26%


In [9]:
def uniform_sample_dataset(dataset, t):
    dataset_size = len(dataset)
    indices = list(range(dataset_size))
    
    # Shuffle the indices
    sampler = torch.utils.data.sampler.RandomSampler(dataset, num_samples=t)
    sampled_dataloader = torch.utils.data.DataLoader(dataset, batch_size=100, sampler=sampler)
    return sampled_dataloader

In [10]:
sample_loader = uniform_sample_dataset(train_dataset, 10)
for batch_data, batch_labels in sample_loader:
    print(batch_data.shape, batch_labels)

torch.Size([10, 1, 28, 28]) tensor([3, 8, 1, 8, 8, 6, 0, 3, 7, 9])


In [11]:
def calc_criterion_first_order_derivative(loss, model):
    # Set requires_grad to True for the data tensor to enable gradient computation
    # data_tensor.requires_grad = True
    # label_tensor.requires_grad = True
    model.zero_grad()
    # output = model(data_tensor)
    # loss = criterion(output, label_tensor)
    # Compute the first-order gradient
    loss.backward(create_graph=True)
    param_grads = [ p.grad.flatten() for p in model.parameters() if p.requires_grad ]
    param_grads = torch.cat(param_grads)
    # print(param_grads)
    # return torch.concat([data_tensor.grad.reshape(1, -1), label_tensor.grad.reshape(1, -1)], dim=1)
    return param_grads

In [12]:
def calc_loss_second_order_derivative_list(loss, model):
    model.zero_grad()
    # Compute the first-order gradient
    loss.backward(create_graph=True)
    first_grads = [ (p.grad.flatten(), p) for p in model.parameters() if p.requires_grad ]
    # Compute the second-order gradient
    second_grads = []
    for first_grad, p in first_grads:
        sub_matrix = []
        # print(first_grad.shape, p.shape)
        for i in range(first_grad.shape[0]):
            sub_matrix.append(torch.autograd.grad(first_grad[i], p, create_graph=True)[0].flatten())
        sub_matrix = torch.stack(sub_matrix)
        second_grads.append(sub_matrix)
    # second_order_grads = torch.cat(second_order_grads)
    print(second_grads)
    return second_grads

In [13]:
def calc_criterion_second_order_derivative(loss, model):
    # Set requires_grad to True for the data tensor to enable gradient computation
    # data_tensor.requires_grad = True
    # label_tensor.requires_grad = True
    # loss = criterion(output, label_tensor)
    model.zero_grad()
    # output = model(data_tensor)
    # loss = criterion(output, label_tensor)
    # Compute the first-order gradient
    loss.backward(create_graph=True)
    first_grads = [ p.grad.flatten() for p in model.parameters() if p.requires_grad ]
    # Compute the second-order gradient
    # first_order_derivative.requires_grad = True
    second_grads = []
    for first_grad in first_grads:
        row = []
        for p in model.parameters():
            sub_matrix = []
            # print(first_grad.shape, p.shape)
            for i in range(first_grad.shape[0]):
                sub_matrix.append(torch.autograd.grad(first_grad[i], p, create_graph=True)[0].flatten())
            sub_matrix = torch.stack(sub_matrix)
            # print(sub_matrix)
            row.append(sub_matrix)
        row = torch.cat(row, dim=1)
        # print(row)
        second_grads.append(row)
    second_grads = torch.cat(second_grads, dim=0)
    # print("sec: ", second_grads)
    # hessian_matrix = torch.autograd.functional.hessian(lambda x: criterion(model(x), label_tensor), data_tensor).reshape(data_tensor.shape[1], -1)
    # hessian_matrix = torch.autograd.functional.hessian(lambda x, y: criterion(model(x), y), (data_tensor, label_tensor))
    return second_grads
    # matrix_list = []
    # for row, var in zip(hessian_matrix, [data_tensor, label_tensor]):
    #     list_row = []
    #     for tensor in row:
    #         if len(var.shape) == 1:
    #             list_row.append(tensor.reshape(1, -1))
    #         else:
    #             list_row.append(tensor.reshape(var.shape[1], -1))
    #     matrix_list.append(list_row)
    # # Concatenate along the first dimension
    # concatenated_hessian = torch.cat([torch.cat(row, dim=1) for row in matrix_list], dim=0)

    # return concatenated_hessian

In [14]:
test_model = torch.nn.Linear(2, 2)
test_model = LogisticRegression(2, 2)

In [15]:
data_tensor = torch.tensor([[1.0, -2.0]], requires_grad=True)
label_tensor = torch.tensor([1.0, 0.0])
label_tensor_long = torch.tensor([1, 0])

print(test_model(data_tensor)-label_tensor)
# 2 * (test_model(data_tensor) - label_tensor).item() * data_tensor

tensor([[-0.0851, -0.5285]], grad_fn=<SubBackward0>)


In [16]:
print(calc_criterion_first_order_derivative(torch.nn.MSELoss()(test_model(data_tensor), label_tensor), test_model))
hessian = calc_criterion_second_order_derivative(criterion(test_model(data_tensor), label_tensor_long), test_model)
print(hessian, torch.all(torch.linalg.eigvals(hessian).real >= 0))
calc_loss_second_order_derivative_list(torch.nn.MSELoss()(test_model(data_tensor), label_tensor), test_model)


tensor([-0.0851,  0.1702, -0.5285,  1.0570, -0.0851, -0.5285],
       grad_fn=<CatBackward0>)


  return F.mse_loss(input, target, reduction=self.reduction)
  Variable._execution_engine.run_backward(  # Calls into the C++ engine to run the backward pass


ValueError: Expected input batch_size (1) to match target batch_size (2).

In [None]:
one_train_dataloader = uniform_sample_dataset(train_dataset, 1)
for batch_data, batch_labels in one_train_dataloader:
    print(calc_criterion_first_order_derivative(criterion(model(batch_data), batch_labels), model).shape)
    hessian = calc_criterion_second_order_derivative(torch.nn.MSELoss()(model(batch_data), batch_labels.to(torch.float32)), model)
    print(hessian.shape, torch.all(torch.linalg.eigvals(hessian).real >= 0))
    # hessian = calc_loss_second_order_derivative_list(criterion(model(batch_data), batch_labels), model)
    # hessian_1 = hessian[0]
    # hessian_2 = hessian[1]

torch.Size([7850])


  return F.mse_loss(input, target, reduction=self.reduction)


In [None]:
eigvals = torch.linalg.eigvals(hessian).real

In [17]:
def hvp(y, w, v):
    # First derivative
    first_grads = torch.autograd.grad(y, w, retain_graph=True, create_graph=True)

    first_grads = [g.flatten() for g in first_grads]
    first_grads = torch.cat(first_grads, dim=0)
    
    # Calculate the element-wise product between the first gradients and the vector v
    elemwise_products = torch.sum(first_grads * v)

    # Second derivative
    second_grads = torch.autograd.grad(elemwise_products, w, create_graph=True)
    second_grads = [g.flatten() for g in second_grads]
    second_grads = torch.cat(second_grads, dim=0)

    return second_grads

In [18]:
params = [p for p in test_model.parameters()]
vector = calc_criterion_first_order_derivative(torch.nn.MSELoss()(test_model(data_tensor), label_tensor), test_model)
print(vector)
# vector = torch.tensor([-3.0305, -6.0611, -3.0305])
expected = torch.matmul(calc_criterion_second_order_derivative(torch.nn.MSELoss()(test_model(data_tensor), label_tensor), test_model), vector.T)
vector._grad_fn = None
# print(vector)
test_model.zero_grad()
actual = hvp(torch.nn.MSELoss()(test_model(data_tensor), label_tensor), params, vector)
print(expected, actual)
assert torch.equal(expected, actual)

tensor([-0.0851,  0.1702, -0.5285,  1.0570, -0.0851, -0.5285],
       grad_fn=<CatBackward0>)
tensor([-0.5105,  1.0211, -3.1710,  6.3419, -0.5105, -3.1710],
       grad_fn=<MvBackward0>) tensor([-0.5105,  1.0211, -3.1710,  6.3419, -0.5105, -3.1710],
       grad_fn=<CatBackward0>)


  expected = torch.matmul(calc_criterion_second_order_derivative(torch.nn.MSELoss()(test_model(data_tensor), label_tensor), test_model), vector.T)


In [None]:
def ihvp(train_dataset, test_data, test_label, model, criterion, t, r, reg_factor=0.91, scale=0.04, unique_datapoint=None, ihvp_summary_writer=SummaryWriter('runs/ihvp_sum_summary')):
    ihvp_eval_avg = 0
    test_data = test_data
    def scaled_criterion(output, label):
        return criterion(output, label) * scale
    
    vector = calc_criterion_first_order_derivative(scaled_criterion(model(test_data), test_label), model)
    for i in range(r):
        if unique_datapoint is None:
            sampled_train_dataset = [(data, label) for data, label in uniform_sample_dataset(train_dataset, t)]
        else:
            sampled_train_dataset = []
            for _ in range(t):
                sampled_train_dataset.append(unique_datapoint)
        # Step 1. Initialize the evaluation of the Hessian-vector product
        ihvp_eval = vector
        data_number = 0
        for data, label in sampled_train_dataset:
            # Step 2. Compute the second order gradient of the loss w.r.t. the model parameters
            model.zero_grad()
            data_tensor = data.view(-1, 28*28)
            params = [p for p in model.parameters()]
            ihvp_eval._grad_fn = None
            # Step 3. Compute the inner product between the Hessian matrix and the test gradient vector using HVP
            second_grads = hvp(scaled_criterion(model(data_tensor), label), params, ihvp_eval)
            # hessian = calc_criterion_second_order_derivative(scaled_criterion(model(data_tensor), label), model)
            # hessian_list = calc_loss_second_order_derivative_list(scaled_criterion(model(data_tensor), label), model)
            # diff = torch.norm(torch.eye(hessian.shape[0])-hessian, p=2)
            # print(f"is semi positive definite: {torch.all(torch.linalg.eigvals(hessian).real >= 0)}, is semi positive definite list: {[torch.all(torch.linalg.eigvals(h).real >= 0) for h in hessian_list]}")
            # print(f"diff: {diff}, l2 for I: {torch.norm(torch.eye(hessian.shape[0]), p=2)}, hes: {hessian.sum()}, eig: {torch.max(torch.abs(torch.linalg.eigvals(hessian)))}")
            # return_grads_validation = torch.matmul(hessian, ihvp_eval.T)
            # print(f"diff: {torch.abs(return_grads.sum() - vector.sum())}, ihvp_eval: {ihvp_eval.sum()}, return_grads: {return_grads.sum()}, vector: {vector.sum()}")
            # print(f"validation: {return_grads_validation.sum()}, hessian: {hessian.sum()}")
            ihvp_eval = (1 - reg_factor) * ihvp_eval + vector - second_grads
            ihvp_summary_writer.add_scalar(f'ihvp_eval_sum_{i}', ihvp_eval.sum(), data_number)
            data_number += 1
        # print(f"ihvp iteration {i} done and ihvp sum is {ihvp_eval.sum()}")
        ihvp_eval_avg = i / (i + 1) * ihvp_eval_avg + 1 / (i + 1) * ihvp_eval         
    return ihvp_eval_avg
    

In [20]:
test_dataloader = uniform_sample_dataset(test_dataset, 1)
test_data_list = [(data, label) for data, label in test_dataloader]

In [21]:
uni_data_loader  = uniform_sample_dataset(train_dataset, 1)
uni_data_tuple = ()
for data, label in uni_data_loader:
    uni_data_tuple = (data, label)

In [27]:
ihvp_summary_writer = SummaryWriter('runs/ihvp_sum_summary') 
for test_data, test_label in test_data_list:
    print(f"Test data value sum: {test_data.sum()}, Test data label: {test_label}")
    model.zero_grad()
    ihvp_eval = ihvp(train_dataset, test_data, test_label, model, criterion, 5000, 10, reg_factor=0.96, scale=0.09, ihvp_summary_writer=ihvp_summary_writer)

Test data value sum: 73.66667175292969, Test data label: tensor([4])


In [28]:
ihvp_eval.sum()

tensor(1.0505e-06, grad_fn=<SumBackward0>)

In [None]:
def upweighting_loss_influence_function(train_dataset, upweighted_data, upweighted_label, test_data, test_label, model, criterion):
    # Step 1. Compute the Hessian-vector product
    ihvp_eval = ihvp(train_dataset, test_data, test_label, model, criterion)
    # Step 2. Compute the influence function
    influence = torch.dot(-ihvp_eval, calc_criterion_first_order_derivative(upweighted_data, upweighted_label, criterion, model))
    return influence

**Here is a validation**

In [None]:
import numpy as np

I = np.identity(2)
A = np.array([[2e-2,3e-2],[3e-2,4e-2]])

print(np.linalg.norm(I - A, ord=2))
A_inv = I

A_inv_ = np.linalg.inv(A)

for i in range(100):
    A_inv = A_inv + I - A@A_inv
    print(np.linalg.norm(I - A@A_inv, ord=2))
    
print(A_inv_)
print(A_inv)

1.0016227766016836
1.0032481866072664
1.0048762342901745
1.0065069239307687
1.0081402598163562
1.0097762462412017
1.011414887506538
1.0130561879205784
1.0147001517985268
1.0163467834625903
1.0179960872419902
1.019648067472972
1.021302728498819
1.022960074669863
1.024620110343494
1.0262828398841741
1.0279482676634477
1.0296163980599533
1.031287235459435
1.032960784254754
1.0346370488458994
1.0363160336400015
1.0379977430513423
1.0396821815013666
1.0413693534186945
1.043059263239133
1.044751915405687
1.0464473143685715
1.0481454645852237
1.0498463705203136
1.0515500366457566
1.0532564674407252
1.0549656673916599
1.0566776409922831
1.0583923927436076
1.0601099271539522
1.0618302487389504
1.0635533620215638
1.0652792715320947
1.067007981808196
1.068739497394884
1.070473822844552
1.072210962716979
1.0739509215793448
1.0756937040062404
1.0774393145796801
1.0791877578891147
1.0809390385314404
1.082693161111016
1.08445013023967
1.086209950536716
1.0879726266289629
1.0897381631507286
1.09150656

In [None]:
def is_semi_positive_definite(matrix):
    return torch.all(torch.linalg.eigvals(matrix).real >= 0)

In [253]:
logistic_test_model = LogisticRegression(3, 2)
logistic_test_model.linear.weight.data = torch.tensor([[1.0, 2.0, 3.0]])
logistic_test_model.linear.bias.data = torch.tensor([-1.0])
print(logistic_test_model.linear.weight, logistic_test_model.linear.bias)
# logistic_test_model.weight.data = torch.tensor([[1.0, 2.0], [3.0, 4.0]])
# logistic_test_model.bias.data = torch.tensor([1.0])

input = torch.tensor([[1.0, 2.0, 3.0]], requires_grad=True)
output = logistic_test_model(input)
label = torch.tensor([0])
label_float = torch.tensor([0.0])
print(output, label)

hessian_expected = torch.sigmoid(output[0][0]) * torch.sigmoid(1 - output[0][0]) * (input.T @ input)
print(f"hessain expected: {hessian_expected}, is semi positive definite: {is_semi_positive_definite(hessian_expected)}")

loss = torch.nn.functional.cross_entropy(output, label)
loss_MSE = torch.nn.MSELoss()(output, label_float)

test_hessian = calc_criterion_second_order_derivative(loss, logistic_test_model)
test_hessian_list = calc_loss_second_order_derivative_list(loss, logistic_test_model)
print(test_hessian, is_semi_positive_definite(test_hessian))
test_hessian_MSE = calc_criterion_second_order_derivative(loss_MSE, logistic_test_model)
print(test_hessian_MSE, is_semi_positive_definite(test_hessian_MSE))
print(torch.linalg.eigvals(test_hessian_list[0]).real)
print(torch.linalg.eigvals(hessian_expected).real)

Parameter containing:
tensor([[1., 2., 3.]], requires_grad=True) Parameter containing:
tensor([-1.], requires_grad=True)
tensor([[13.,  0.]], grad_fn=<CatBackward0>) tensor([0])
hessain expected: tensor([[6.1442e-06, 1.2288e-05, 1.8432e-05],
        [1.2288e-05, 2.4577e-05, 3.6865e-05],
        [1.8432e-05, 3.6865e-05, 5.5297e-05]], grad_fn=<MulBackward0>), is semi positive definite: False
[tensor([[2.2650e-06, 4.5300e-06, 6.7949e-06],
        [4.5300e-06, 9.0599e-06, 1.3590e-05],
        [6.6757e-06, 1.3351e-05, 2.0027e-05]], grad_fn=<StackBackward0>), tensor([[2.2650e-06]], grad_fn=<StackBackward0>)]
tensor([[2.2650e-06, 4.5300e-06, 6.7949e-06, 2.2650e-06],
        [4.5300e-06, 9.0599e-06, 1.3590e-05, 4.5300e-06],
        [6.6757e-06, 1.3351e-05, 2.0027e-05, 6.6757e-06],
        [2.2650e-06, 4.5300e-06, 6.7949e-06, 2.2650e-06]],
       grad_fn=<CatBackward0>) tensor(False)
tensor([[1., 2., 3., 1.],
        [2., 4., 6., 2.],
        [3., 6., 9., 3.],
        [1., 2., 3., 1.]], grad_fn

  return F.mse_loss(input, target, reduction=self.reduction)


In [201]:
from math import exp
exp(-6)/(1+exp(-6))*4

0.009890492626539099