In [1]:
import sys
sys.path.append('D:\\Compute Science\\Machine Learning\\论文\\项目\\FairSPL\\venv_torch')
sys.path.append('D:\\Compute Science\\Machine Learning\\论文\\项目\\FairSPL\\venv_torch\\lib\\site-packages')

In [7]:
import torch
from torch import nn

In [8]:
# 定义一个简单的模型
class MLP(nn.Module):
    def __init__(self):
        super(MLP, self).__init__()
        self.fc1 = nn.Linear(4, 3)
        self.fc2 = nn.Linear(3, 2)

    def forward(self, x):
        x = self.fc1(x)
        x = self.fc2(x)

        return x

In [9]:
# 输出模型的参数
model = MLP()
for param in model.parameters():
    print(param.size())

torch.Size([3, 4])
torch.Size([3])
torch.Size([2, 3])
torch.Size([2])


In [10]:
# 定义数据
data = torch.tensor([1,2,3,4], dtype=torch.float)
label = torch.tensor([5,6], dtype=torch.float)
pred = model(data)
loss_fn = nn.MSELoss()
loss = loss_fn(pred, label)

计算雅克比矩阵（一阶导数）

In [14]:
def grad(model, y):
    """ 计算一阶导数.
    Returns: 
        grads, grads[i]: dy / dx_i
    """
    grads = torch.autograd.grad(loss, model.parameters(), retain_graph=True, create_graph=True)
    # for grad in grads:
    #     print(grad.size()) # 可以发现一共 4 个 Tensor，分别为损失函数对四个参数 Tensor（两层，每层都有权重和偏置）的梯度。
        
    grads = torch.cat([x.flatten() for x in grads], dim=0)
    return grads

grads = grad(model, loss)
print(grads.size())

torch.Size([23])


计算 Hessian 矩阵（二阶导数）

In [16]:
# 如果直接传入 model.parameters()，会报错，目前不知道原因
def hess(model, y, grads=None):
    """ 计算二阶导数.
    Returns: 
        he, he[i,j]: d^2y / (dx_i dx_j)
    """
    if grads is None:
        grads = grad(model, y)
        
    total_params = sum(p.numel() for p in model.parameters())
    he = torch.zeros(total_params, total_params)
    
    for i, g in enumerate(grads):
        second_order_grad = grad(model, g)
        he[i, :] = second_order_grad

    return he

he = hess(model, loss)
print(he.size())

torch.Size([23, 23])


In [20]:
# # 计算 hessian 矩阵
# grad = torch.autograd.grad(outputs=loss, inputs=model.parameters(), create_graph=True)
# grad = torch.cat([x.flatten() for x in grad], dim=0)
# total_params = sum(p.numel() for p in model.parameters())
# he2 = torch.zeros(total_params, total_params)
    
# for i, g in enumerate(grad):
#     second_order_grad = torch.autograd.grad(outputs=g, inputs=model.parameters(), retain_graph=True)
#     second_order_grad = torch.cat([x.flatten() for x in second_order_grad], dim=0)
#     he2[i, :] = second_order_grad

实现影响函数（一次性返回所有点的影响函数值）
$$
\mathcal{I}_{\text {up,loss }}\left(z, z_{\text {test }}\right) =-\nabla_{\theta} L\left(z_{\text {test }}, \hat{\theta}\right)^{\top} H_{\hat{\theta}}^{-1} \nabla_{\theta} L(z, \hat{\theta})
$$

In [28]:
# 数据准备
train_set = torch.tensor([[1,2,3,4],[5,6,7,8]], dtype=torch.float)
train_label = torch.tensor([[9,10],[11,12]], dtype=torch.float)
test_point = torch.tensor([5,6,7,7], dtype=torch.float)
test_label = torch.tensor([7,8], dtype=torch.float)

In [58]:
# Step1: 计算V第三项
def grad_z(z, t):
    """ Calculates the gradient z. One grad_z should be computed for each
    training sample.
    
    Arguments:
        z: torch tensor, training data points
            e.g. an image sample (batch_size, 3, 256, 256)
        t: torch tensor, training data labels
        model: torch NN, model used to evaluate the dataset
    Returns:
        grad_z: list of torch tensor, containing the gradients
            from model parameters to loss
    """
    model.eval()
    y = model(z)
    loss = loss_fn(y, t)
    params = [ p for p in model.parameters() if p.requires_grad ]
    grads = list(torch.autograd.grad(loss, params, create_graph=True))
    #for grad in grads:
    #   print(grad.size()) # 可以发现一共 4 个 Tensor，分别为损失函数对四个参数 Tensor（两层，每层都有权重和偏置）的梯度。
        
    return grads

grad = grad_z(train_set, train_label)

In [75]:
# 验证是否是每一个样本的loss的梯度的和
grad1, grad2 = [grad_z(z,t) for (z, t) in zip(train_set, train_label)]
g1 = torch.cat([x.flatten() for x in grad], dim=0)
g2 = torch.cat([x.flatten() for x in grad1], dim=0)
g3 = torch.cat([x.flatten() for x in grad2], dim=0)
g1.allclose((g2+g3)/2)

True

In [76]:
# Step2: 计算前两项，也就是 s_test=v^T H^{-1}
# 辅助函数：快速计算 Hv，其中 H 是 Hessian 矩阵
def hvp(y, w, v): # 计算 y 对 w 的二阶导 H，返回 Hv
    """Multiply the Hessians of y and w by v.
    Uses a backprop-like approach to compute the product between the Hessian
    and another vector efficiently, which even works for large Hessians.
    Example: if: y = 0.5 * w^T A x then hvp(y, w, v) returns and expression
    which evaluates to the same values as (A + A.t) v.

    Arguments:
        y: scalar/tensor, for example the output of the loss function
        w: list of torch tensors, tensors over which the Hessian
            should be constructed
        v: list of torch tensors, same shape as w,
            will be multiplied with the Hessian

    Returns:
        return_grads: list of torch tensors, contains product of Hessian and v.

    Raises:
        ValueError: `y` and `w` have a different length."""
    if len(w) != len(v):
        raise(ValueError("w and v must have the same length."))

    # First backprop
    first_grads = grad(y, w, retain_graph=True, create_graph=True)

    # Elementwise products
    elemwise_products = 0
    for grad_elem, v_elem in zip(first_grads, v):
        elemwise_products += torch.sum(grad_elem * v_elem)

    # Second backprop
    return_grads = grad(elemwise_products, w, create_graph=True)

    return return_grads


def s_test(z_test, t_test, model, z_loader, damp=0.01, scale=25.0,
       recursion_depth=5000):
    """s_test can be precomputed for each test point of interest, and then
    multiplied with grad_z to get the desired value for each training point.
    Here, strochastic estimation is used to calculate s_test. s_test is the
    Inverse Hessian Vector Product.

    Arguments:
        z_test: torch tensor, test data points, such as test images
        t_test: torch tensor, contains all test data labels
        model: torch NN, model used to evaluate the dataset
        z_loader: torch Dataloader, can load the training dataset
        damp: float, dampening factor
        scale: float, scaling factor
        recursion_depth: int, number of iterations aka recursion depth
            should be enough so that the value stabilises.

    Returns:
        h_estimate: list of torch tensors, s_test
    """
    v = grad_z(z_test, y_test, model)
    h_estimate = v.copy()
    for i in range(recursion_depth):
        for x, t in z_loader():
            y = model(x)
            loss = calc_loss(y, t)
            params = [ p for p in model.parameters() if p.requires_grad ]
            hv = hvp(loss, params, h_estimate) 
            h_estimate = [
                _v + (1 - damp) * _h_e - _hv / scale
                for _v, _h_e, _hv in zip(v, h_estimate, hv)]
            break
    return h_estimate

In [77]:
s_test(test_point, test_label, model, train_label)

NameError: name 'y_test' is not defined

In [None]:
# Step3: 合并所有的结果
def calc_influence_function(train_dataset_size, grad_z_vecs, e_s_test):
    """Calculates the influence function

    Arguments:
        train_dataset_size: int, total train dataset size
        grad_z_vecs: list of torch tensor, containing the gradients
            from model parameters to loss
        e_s_test: list of torch tensor, contains s_test vectors

    Returns:
        influence: list of float, influences of all training data samples
            for one test sample
        harmful: list of float, influences sorted by harmfulness
        helpful: list of float, influences sorted by helpfulness.
    """
    influences = []
    # 对第 i 个样本
    for i in range(train_dataset_size):
        tmp_influence = -sum(
            [
                torch.sum(k * j).data.cpu().numpy()
                for k, j in zip(grad_z_vecs[i], e_s_test)
            ]) / train_dataset_size
        influences.append(tmp_influence)

    harmful = np.argsort(influences)
    helpful = harmful[::-1]

    return influences, harmful.tolist(), helpful.tolist()