In [25]:
import pandas as pd
import torch
import torch.nn as nn
from torch import log_softmax
import copy
import math

# Encoder and Decoder

In [7]:
class EncoderDecoder(nn.Module):
   """
   标准的编码器-解码器架构。该架构是许多模型的基础。
   - encoder：编码器模块，用于处理输入（源）序列。
   - decoder：解码器模块，用于生成输出（目标）序列。
   - src_embed 和 tgt_embed：分别用于对源序列和目标序列进行嵌入（embedding）的模块，将输入序列转化为向量表示。
   - generator：通常是一个全连接层，用于将解码器的输出转化为最终的预测结果，例如词汇表中的概率分布。
   """

   def __init__(self, encoder, decoder, src_embed, tgt_embed, generator):
      super(EncoderDecoder, self).__init__()
      self.encoder = encoder
      self.decoder = decoder
      self.src_embed = src_embed
      self.tgt_embed = tgt_embed
      self.generator = generator

   def forward(self, src, tgt, src_mask, tgt_mask):
      # 接收并处理 marked 的 src 和目标序列。
      return self.decode(self.encode(src, src_mask), src_mask, tgt, tgt_mask)

   def encode(self, src, src_mask):
      # 将 src 序列进行编码
      return self.encoder(self.src_embed(src), src_mask)

   def decode(self, memory, src_mask, tgt, tgt_mask):
      # 将 memory 和 tgt 序列进行解码
      return self.decoder(self.tgt_embed(tgt), memory, src_mask, tgt_mask)


class Generator(nn.Module):
   "定义标准线性softmax生成步骤。"

   def __init__(self, d_model, vocab):
      super(Generator, self).__init__()
      # 这个线性层将输入从 d_model 维度投影到 vocab 维度
      self.proj = nn.Linear(d_model, vocab)

   def forward(self, x):
      return log_softmax(self.proj(x), dim=-1)

# Encoder   
将 N 个相同的子层堆叠在一起，形成一个编码器或解码器。

In [8]:
def clones(module, N):
    "生成 N 个相同的 layer。"
    return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])

In [9]:

class LayerNorm(nn.Module):
    "构造一个层归一化（LayerNorm）。"

    def __init__(self, features, eps=1e-6):
        super(LayerNorm, self).__init__()
        # 初始化3个可训练参数，用于缩放和平移
        self.a_2 = nn.Parameter(torch.ones(features))
        self.b_2 = nn.Parameter(torch.zeros(features))
        self.eps = eps

    def forward(self, x):
        mean = x.mean(-1, keepdim=True)
        std = x.std(-1, keepdim=True)
        return self.a_2 * (x - mean) / (std + self.eps) + self.b_2

In [10]:
class Encoder(nn.Module):
    "编码器由 N 个相同的层堆叠组成。"

    def __init__(self, layer, N):
        super(Encoder, self).__init__()
        # 将 layer 克隆 N 次
        self.layers = clones(layer, N)
        # 层归一化
        self.norm = LayerNorm(layer.size)

    def forward(self, x, mask):
        """
        依次通过每个层传递输入 (和掩码)。
        通过每个层运行 x，然后进行规范化。
        """
        for layer in self.layers:
            x = layer(x, mask)
        return self.norm(x)


In [11]:
class SublayerConnection(nn.Module):
    """
    A residual connection followed by a layer norm.
    Note for code simplicity the norm is first as opposed to last.
    """

    def __init__(self, size, dropout):
        super(SublayerConnection, self).__init__()
        self.norm = LayerNorm(size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, sublayer):
        "Apply residual connection to any sublayer with the same size."
        return x + self.dropout(sublayer(self.norm(x)))

In [12]:
class EncoderLayer(nn.Module):
    "编码器由自注意力机制和前馈网络 (定义如下) 组成"

    def __init__(self, size, self_attn, feed_forward, dropout):
        super(EncoderLayer, self).__init__()
        self.self_attn = self_attn # self_attn将在下面的代码实现
        self.feed_forward = feed_forward
        self.sublayer = clones(SublayerConnection(size, dropout), 2)
        self.size = size

    def forward(self, x, mask):
        "Follow Figure 1 (left) for connections."
        x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, mask))
        return self.sublayer[1](x, self.feed_forward)

# Decoder

In [13]:
class Decoder(nn.Module):
       "生成具有掩码的通用N层解码器。"
       def __init__(self, layer, N):
              super(Decoder, self).__init__()
              self.layers = clones(layer, N)
              self.norm = LayerNorm(layer.size)

       def forward(self, x, memory, src_mask, tgt_mask):
              """
              - x：目标序列的嵌入表示。
              - memory：来自编码器的输出，包含源序列的信息。
              - src_mask：源序列的掩码，用于源注意力机制。
              - tgt_mask：目标序列的掩码，用于自注意力机制，确保自回归性质。
              """
              for layer in self.layers:
                     x = layer(x, memory, src_mask, tgt_mask)
              return self.norm(x)

In [14]:
class DecoderLayer(nn.Module):
       """
       解码器层，由三个主要部分组成：自注意力、源注意力和前馈神经网络。
       """

       def __init__(self, size, self_attn, src_attn, feed_forward, dropout):
              """
              初始化解码器层。

              参数:
              - size: 模型的维度
              - self_attn: 自注意力机制
              - src_attn: 源注意力机制（也称为交叉注意力）
              - feed_forward: 前馈神经网络
              - dropout: Dropout 比率
              """
              super(DecoderLayer, self).__init__()
              self.size = size
              self.self_attn = self_attn  
              self.src_attn = src_attn   
              self.feed_forward = feed_forward  # 前馈神经网络
              # 创建三个子层连接，用于实现残差连接和层归一化
              self.sublayer = clones(SublayerConnection(size, dropout), 3)

       def forward(self, x, memory, src_mask, tgt_mask):
              """
              前向传播函数。

              参数:
              - x: 解码器的输入
              - memory: 编码器的输出
              - src_mask: 源序列的掩码
              - tgt_mask: 目标序列的掩码

              返回:
              - 经过完整解码器层处理后的张量
              """
              m = memory
              # 1. 自注意力子层
              # 这里的 x 为解码器的输入，x 被用作 query、key 和 value。
              x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, tgt_mask))
              # 2. 源注意力子层（交叉注意力）
              # 这里 x 是来自解码器的查询（query），而 m（即 memory）是来自编码器的键（key）和值（value）
              x = self.sublayer[1](x, lambda x: self.src_attn(x, m, m, src_mask))
              # 3. 前馈神经网络子层
              return self.sublayer[2](x, self.feed_forward)

In [15]:
def subsequent_mask(size):
    "创建一个掩码来屏蔽后续位置。"
    # 创建一个形状为 (1, size, size) 的三维张量
    attn_shape = (1, size, size)
    
    # 使用 torch.triu 创建一个上三角矩阵，对角线上移一位
    # diagonal=1 表示主对角线上方的第一条对角线
    subsequent_mask = torch.triu(torch.ones(attn_shape), diagonal=1).type(torch.uint8)
    
    # 将上三角矩阵取反，得到最终的掩码
    return subsequent_mask == 0

In [24]:
subsequent_mask(5)

tensor([[[ True, False, False, False, False],
         [ True,  True, False, False, False],
         [ True,  True,  True, False, False],
         [ True,  True,  True,  True, False],
         [ True,  True,  True,  True,  True]]])

# Scaled Dot-Product Attention

In [None]:
def attention(query, key, value, mask=None, dropout=None):
    "Compute 'Scaled Dot Product Attention'"
    d_k = query.size(-1)
    scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
    if mask is not None:
        scores = scores.masked_fill(mask == 0, -1e9)
    p_attn = scores.softmax(dim=-1)
    if dropout is not None:
        p_attn = dropout(p_attn)
    return torch.matmul(p_attn, value), p_attn