In [32]:
import torch
from torch import nn
from d2l import torch as d2l
def custom_repr(self):
    return f'{{Tensor:{tuple(self.shape)}}} {original_repr(self)}'

original_repr = torch.Tensor.__repr__
torch.Tensor.__repr__ = custom_repr


In [33]:
def f(x):
    return 2 * torch.sin(x) + x**0.8

In [34]:
n_train = 50  # 训练样本数
x_train, _ = torch.sort(torch.rand(n_train) * 5)   # 排序后的训练样本 0~5 共50个
y_train = f(x_train) + torch.normal(0.0, 0.5, (n_train,))  # 训练样本的输出

In [35]:
x_test = torch.arange(0, 5, 0.1)  # 测试样本 # 0~5 50个
y_truth = f(x_test)  # 测试样本的真实输出
n_test = len(x_test)  # 测试样本数

In [36]:
def plot_kernel_reg(y_hat):
    d2l.plot(x_test, [y_truth, y_hat], 'x', 'y', legend=['Truth', 'Pred'],
             xlim=[0, 5], ylim=[-1, 5])
    d2l.plt.plot(x_train, y_train, 'o', alpha=0.5)

In [None]:
# 1 平均汇聚
y_hat = torch.repeat_interleave(y_train.mean(), n_test)
plot_kernel_reg(y_hat)

In [None]:
# 2 非参数化汇聚 Nadaraya-Watson核回归模型  采用高斯核
# X_repeat的形状:(n_test,n_train), 每一行都包含着相同的测试输入（例如：同样的查询）
X_repeat = x_test.repeat_interleave(n_train).reshape((-1, n_train))
# attention_weights的形状：(n_test,n_train), (i,j)为第i个query与第j个key的权重
# 每一行都包含着要在给定的某个查询的值与(key, value)之间分配的注意力权重
attention_weights = nn.functional.softmax(-(X_repeat - x_train)**2 / 2, dim=1)
y_hat = torch.matmul(attention_weights, y_train) #比如y0_hat = w00*y0 + w01*y1 + ...
plot_kernel_reg(y_hat)

In [None]:
d2l.show_heatmaps(attention_weights.unsqueeze(0).unsqueeze(0),
                  xlabel='Sorted training inputs',
                  ylabel='Sorted testing inputs')

In [None]:
# 小批量矩阵乘法
weights = torch.ones((2, 10)) * 0.1
values = torch.arange(20.0).reshape((2, 10))
weights_1 = weights.unsqueeze(1)
values_1 = values.unsqueeze(-1)
torch.bmm(weights_1, values_1) #(1,10)*(10,1)

In [41]:
# 3 参数化汇聚
class NWKernelRegression(nn.Module):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.w = nn.Parameter(torch.rand((1,),requires_grad=True)) #一维

    def forward(self, queries, keys, values):
        queries = queries.repeat_interleave(keys.shape[1]).reshape((-1, keys.shape[1])) #(50,49)
        self.attention_weights = nn.functional.softmax(-((queries - keys)*self.w)**2/2, dim=1) #(50,49)
        return torch.bmm(self.attention_weights.unsqueeze(1),      #(50,1,49)
                         values.unsqueeze(-1)).reshape(-1)         #(50,49,1)

In [47]:
# X_tile的形状:(n_train，n_train)，每一行都包含着相同的训练输入
X_tile = x_train.repeat((n_train, 1))
# Y_tile的形状:(n_train，n_train)，每一行都包含着相同的训练输出
Y_tile = y_train.repeat((n_train, 1))
# keys的形状:('n_train'，'n_train'-1)
keys = X_tile[(1 - torch.eye(n_train)).type(torch.bool)].reshape((n_train, -1)) 
# values的形状:('n_train'，'n_train'-1)
values = Y_tile[(1 - torch.eye(n_train)).type(torch.bool)].reshape((n_train, -1)) 
# 不知道为啥要去掉
# 可能是因为x_train和x_test都排过序了, 对角线上两者最为像近，因而权重很大，对于通过全局学习w不利，因而将对角线去掉
# 通过与非对角线元素的差异来学习权重w

In [None]:
net = NWKernelRegression()
loss = nn.MSELoss(reduction='none') #不求平均
trainer = torch.optim.SGD(net.parameters(), lr=0.5)
animator = d2l.Animator(xlabel='epoch', ylabel='loss', xlim=[1, 5])

for epoch in range(5):
    trainer.zero_grad()
    l = loss(net(x_train, keys, values), y_train)
    l.sum().backward()
    trainer.step()
    print(f'epoch {epoch + 1}, loss {float(l.sum()):.6f}')
    animator.add(epoch + 1, float(l.sum()))

In [None]:
# keys的形状:(n_test，n_train)，每一行包含着相同的训练输入（例如，相同的键）
keys = x_train.repeat((n_test, 1))
# value的形状:(n_test，n_train)
values = y_train.repeat((n_test, 1))
y_hat = net(x_test, keys, values).unsqueeze(1).detach()
plot_kernel_reg(y_hat)

In [None]:
d2l.show_heatmaps(net.attention_weights.unsqueeze(0).unsqueeze(0),   #(1,1,50,50)
                  xlabel='Sorted training inputs',
                  ylabel='Sorted testing inputs')