In [1]:
import torch
import torch.nn as nn
import math

class PositionalEncoder(nn.Module):

    def __init__(self, d_model, max_seq_len=80):
        super().__init__()
        # d_model 表示输入嵌入向量的维度，在 Transformer 模型中，通常是一个固定的值
        self.d_model = d_model
        # pe为待生成的位置编码矩阵，创建一个形状为 (max_seq_len, d_model) 的全零张量
        # max_seq_len 输入序列的最大长度，即句子的最大单词数。
        pe = torch.zeros(max_seq_len, d_model)
        # 遍历每个位置
        for pos in range(max_seq_len):
            # 遍历 d_model 中的偶数索引，因为位置编码在偶数和奇数位置上有不同的计算方式
            for i in range(0, d_model, 2):
                # 计算偶数位置的位置编码，使用正弦函数
                pe[pos, i] = math.sin(pos / (10000**((2 * i) / d_model)))
                # 计算奇数位置的位置编码，使用余弦函数
                # 注意这里要确保 i + 1 不超过 d_model 的范围
                if i + 1 < d_model:
                    pe[pos, i + 1] = math.cos(pos / (10000**((2 * (i + 1)) / d_model)))
        # 在第 0 维上增加一个维度，将 pe 的形状从 (max_seq_len, d_model) 变为 (1, max_seq_len, d_model)
        # 这是为了方便后续与输入的批量数据进行广播操作
        pe = pe.unsqueeze(0)
        # 将 pe 注册为缓冲区，这样它会成为模型状态的一部分，但不会被视为模型的参数（即不会被优化器更新）
        # 这样做的好处是在保存和加载模型时，位置编码信息也会被保存和加载
        self.register_buffer('pe', pe)

    def forward(self, x):
        # 在将输入的嵌入向量与位置编码相加之前，先将输入向量乘以 sqrt(d_model)
        # 这是为了使得输入的嵌入向量的数值相对大一些，避免位置编码的影响过小
        x = x * math.sqrt(self.d_model)
        # 获取输入序列的长度
        seq_len = x.size(1)
        # 将输入的嵌入向量与位置编码相加
        # 由于 pe 的形状是 (1, max_seq_len, d_model)，我们只取前 seq_len 个位置的编码信息
        # 这样可以确保位置编码的长度与输入序列的长度一致
        x = x + self.pe[:, :seq_len]
        # 返回添加了位置编码的输入向量
        return x

In [None]:
import torch
import torch.nn as nn
import math

class PositionalEncoder(nn.Module):

    def __init__(self, d_model, max_seq_len=80):
        super().__init__()
        # d_model 表示输入嵌入向量的维度，在 Transformer 模型中，通常是一个固定的值
        self.d_model = d_model
        # pe为待生成的位置编码矩阵，创建一个形状为 (max_seq_len, d_model) 的全零张量
        # max_seq_len 输入序列的最大长度，即句子的最大单词数。
        pe = torch.zeros(max_seq_len, d_model)
        # 遍历每个位置
        for pos in range(max_seq_len):
            # 遍历 d_model 中的偶数索引，因为位置编码在偶数和奇数位置上有不同的计算方式
            for i in range(0, d_model, 2):
                # 计算偶数位置的位置编码，使用正弦函数
                pe[pos, i] = math.sin(pos / (10000**((2 * i) / d_model)))
                # 计算奇数位置的位置编码，使用余弦函数
                # 注意这里要确保 i + 1 不超过 d_model 的范围
                if i + 1 < d_model:
                    pe[pos, i + 1] = math.cos(pos / (10000**((2 * (i + 1)) / d_model)))
        # 在第 0 维上增加一个维度，将 pe 的形状从 (max_seq_len, d_model) 变为 (1, max_seq_len, d_model)
        # 这是为了方便后续与输入的批量数据进行广播操作
        pe = pe.unsqueeze(0)
        # 将 pe 注册为缓冲区，这样它会成为模型状态的一部分，但不会被视为模型的参数（即不会被优化器更新）
        # 这样做的好处是在保存和加载模型时，位置编码信息也会被保存和加载
        self.register_buffer('pe', pe)

    def forward(self, x):
        # 在将输入的嵌入向量与位置编码相加之前，先将输入向量乘以 sqrt(d_model)
        # 这是为了使得输入的嵌入向量的数值相对大一些，避免位置编码的影响过小
        x = x * math.sqrt(self.d_model)
        # 获取输入序列的长度
        seq_len = x.size(1)
        # 将输入的嵌入向量与位置编码相加
        # 由于 pe 的形状是 (1, max_seq_len, d_model)，我们只取前 seq_len 个位置的编码信息
        # 这样可以确保位置编码的长度与输入序列的长度一致
        x = x + self.pe[:, :seq_len]
        # 返回添加了位置编码的输入向量
        return x

多头注意力机制

heads：它表示注意力头的数量。多头注意力机制通过将输入的特征向量分割成多个头，
每个头独立地计算注意力，从而可以捕捉到不同方面的信息。

例如，设置 heads = 8 表示将输入的特征向量分割成 8 个子空间进行处理。

### d_k 的计算和作用
计算方式：self.d_k = d_model // heads 这行代码使用整数除法

每个注意力头的维度 是d_k。

例如，如果 d_model = 512，heads = 8，那么 d_k = 512 // 8 = 64。

将多头注意力机制和掩码注意力机制写在一个类中
判断是否有掩码

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

# 定义多头注意力机制类，继承自 PyTorch 的 nn.Module 类
class MultiHeadAttention(nn.Module):

    def __init__(self, heads, d_model, dropout=0.1):
        """
        初始化多头注意力机制。

        参数:
        heads (int): 注意力头的数量。
        d_model (int): 输入嵌入向量的维度。
        dropout (float): 丢弃率，用于防止过拟合，默认值为 0.1。
        """
        # 调用父类 nn.Module 的构造函数
        super().__init__()
        # 保存输入嵌入向量的维度
        self.d_model = d_model
       
        # 保存注意力头的数量
        self.h = heads
        # 定义Q,K,V的线性变换层，将输入的 d_model 维度映射到 d_model 维度
        self.q_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)
        # 定义丢弃层，用于防止过拟合
        self.dropout = nn.Dropout(dropout)
        # 定义输出的线性变换层，将多头注意力的输出映射回 d_model 维度
        self.out = nn.Linear(d_model, d_model)

    def attention(self, q, k, v, d_k, mask=None, dropout=None):
        """
        计算注意力分数并应用注意力机制。

        参数:
        q (torch.Tensor): 查询张量，形状为 (batch_size, heads, seq_len, d_k)。
        k (torch.Tensor): 键张量，形状为 (batch_size, heads, seq_len, d_k)。
        v (torch.Tensor): 值张量，形状为 (batch_size, heads, seq_len, d_k)。
        d_k (int): 每个注意力头的维度。
        mask (torch.Tensor, 可选): 掩码张量，用于屏蔽某些位置的注意力分数，默认为 None。
        dropout (nn.Dropout, 可选): 丢弃层，用于防止过拟合，默认为 None。

        返回:
        torch.Tensor: 注意力机制的输出，形状为 (batch_size, heads, seq_len, d_k)。
        """
        # 计算查询和键的点积，得到注意力分数
        # k.transpose(-2, -1) 将 k 的最后两个维度交换，以便进行矩阵乘法
        # 除以 math.sqrt(d_k) 是为了缩放注意力分数，防止梯度消失或爆炸
        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(d_k)
        # 如果提供了掩码，这里的掩码是与------相同形状的矩阵。
        if mask is not None:
            # 在第 1 维上增加一个维度，以便与 scores 张量进行广播操作
            mask = mask.unsqueeze(1)
            # 使用掩码将需要屏蔽的位置的注意力分数设置为负无穷大
            # 这样在后续的 softmax 操作中，这些位置的概率将趋近于 0
            scores = scores.masked_fill(mask == 0, -1e9)
        # 对注意力分数应用 softmax 函数，得到注意力分布
        scores = F.softmax(scores, dim=-1)
        # 如果提供了丢弃层
        if dropout is not None:
            # 对注意力分布应用丢弃操作，防止过拟合
            scores = dropout(scores)
        # 将注意力分布与值张量相乘，得到注意力机制的输出
        output = torch.matmul(scores, v)
        return output

    def forward(self, q, k, v, mask=None):
        """
        前向传播函数，实现多头注意力机制的计算。

        参数:
        q (torch.Tensor): 查询输入，形状为 (batch_size, seq_len, d_model)。
        k (torch.Tensor): 键输入，形状为 (batch_size, seq_len, d_model)。
        v (torch.Tensor): 值输入，形状为 (batch_size, seq_len, d_model)。
        mask (torch.Tensor, 可选): 掩码张量，用于屏蔽某些位置的注意力分数，默认为 None。

        返回:
        torch.Tensor: 多头注意力机制的输出，形状为 (batch_size, seq_len, d_model)。
        """
        # 获取批量大小
        bs = q.size(0)
        # 对键输入进行线性变换，然后将其分割成多个头
        # view(bs, -1, self.h, self.d_k) 将张量重塑为 (batch_size, seq_len, heads, d_k)
        # transpose(1, 2) 将第 1 维和第 2 维交换，得到 (batch_size, heads, seq_len, d_k)
        k = self.k_linear(k).view(bs, -1, self.h, self.d_k).transpose(1, 2)
        # 对查询输入进行线性变换，然后将其分割成多个头
        q = self.q_linear(q).view(bs, -1, self.h, self.d_k).transpose(1, 2)
        # 对值输入进行线性变换，然后将其分割成多个头
        v = self.v_linear(v).view(bs, -1, self.h, self.d_k).transpose(1, 2)
        # 调用 attention 函数计算多头注意力分数
        scores = self.attention(q, k, v, self.d_k, mask, self.dropout)
        # 将多头注意力的输出进行合并
        # transpose(1, 2) 将第 1 维和第 2 维交换，得到 (batch_size, seq_len, heads, d_k)
        # contiguous() 确保张量在内存中是连续的，以便后续的 view 操作能够正常进行
        # view(bs, -1, self.d_model) 将张量重塑为 (batch_size, seq_len, d_model)
        concat = scores.transpose(1, 2).contiguous().view(bs, -1, self.d_model)
        # 对合并后的输出进行线性变换，得到最终的多头注意力机制的输出
        output = self.out(concat)
        return output

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# 定义前馈神经网络类，继承自 PyTorch 的 nn.Module 类
class FeedForward(nn.Module):

    def __init__(self, d_model, d_ff=2048, dropout=0.1):
        """
        初始化前馈神经网络层。

        参数:
        d_model (int): 输入和输出的特征维度，通常是模型的嵌入维度。
        d_ff (int): 前馈网络隐藏层的维度，默认值为 2048。
        dropout (float): 丢弃率，用于防止过拟合，默认值为 0.1。
        """
        # 调用父类 nn.Module 的构造函数
        super().__init__()
        # 定义第一个线性变换层，将输入的 d_model 维度映射到 d_ff 维度
        # 这里的 d_ff 是前馈网络隐藏层的维度，默认设置为 2048
        self.linear_1 = nn.Linear(d_model, d_ff)
        # 定义丢弃层，用于防止过拟合
        self.dropout = nn.Dropout(dropout)
        # 定义第二个线性变换层，将 d_ff 维度映射回 d_model 维度
        self.linear_2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        """
        前向传播函数，实现前馈神经网络的计算。

        参数:
        x (torch.Tensor): 输入张量，形状通常为 (batch_size, seq_len, d_model)。

        返回:
        torch.Tensor: 前馈神经网络的输出，形状为 (batch_size, seq_len, d_model)。
        """
        # 首先将输入 x 通过第一个线性变换层
        # 然后使用 ReLU 激活函数引入非线性
        # 最后应用丢弃层防止过拟合
        x = self.dropout(F.relu(self.linear_1(x)))
        # 将经过非线性变换和丢弃操作后的结果通过第二个线性变换层
        # 得到最终的输出，其维度与输入维度相同，都是 d_model
        x = self.linear_2(x)
        return x

In [None]:
import torch
import torch.nn as nn

# 定义层归一化层类，继承自 PyTorch 的 nn.Module 类
class NormLayer(nn.Module):

    def __init__(self, d_model, eps=1e-6):
        """
        初始化层归一化层。

        参数:
        d_model (int): 输入张量的特征维度，即每个样本的特征数量。
        eps (float): 一个小的常量，用于数值稳定性，防止分母为零，默认值为 1e-6。
        """
        # 调用父类 nn.Module 的构造函数
        super().__init__()
        # 保存输入张量的特征维度
        self.size = d_model
        # 层归一化包含两个可学习的参数
        # alpha 是缩放因子，初始化为全 1 的张量，形状为 (d_model,)
        # 使用 nn.Parameter 将其包装为可训练的参数，在训练过程中会自动更新
        self.alpha = nn.Parameter(torch.ones(self.size))
        # bias 是偏移量，初始化为全 0 的张量，形状为 (d_model,)
        # 同样使用 nn.Parameter 将其包装为可训练的参数
        self.bias = nn.Parameter(torch.zeros(self.size))
        # 保存用于数值稳定性的小常量
        self.eps = eps

    def forward(self, x):
        """
        前向传播函数，实现层归一化的计算。
 
        参数:
        x (torch.Tensor): 输入张量，形状通常为 (batch_size, seq_len, d_model)。

        返回:
        torch.Tensor: 层归一化后的输出张量，形状与输入相同，为 (batch_size, seq_len, d_model)。
        """
        # 计算输入张量在最后一个维度（特征维度）上的均值
        # dim=-1 表示在最后一个维度上计算，keepdim=True 表示保持维度不变
        # 这样得到的均值张量形状与输入张量相同，方便后续计算
        mean = x.mean(dim=-1, keepdim=True)
        # 计算输入张量在最后一个维度（特征维度）上的标准差
        # 同样在最后一个维度上计算，保持维度不变
        # 加上 self.eps 是为了防止标准差为零，保证数值稳定性
        std = x.std(dim=-1, keepdim=True) + self.eps
        # 进行层归一化操作
        # 先将输入张量减去均值，再除以标准差，得到归一化后的张量
        # 然后乘以可学习的缩放因子 alpha，再加上可学习的偏移量 bias
        norm = self.alpha * ((x - mean) / std) + self.bias
        return norm

In [None]:
import torch
import torch.nn as nn

# 假设 PositionalEncoder、EncoderLayer 和 NormLayer 类已经定义
# 这里只是使用它们，具体定义可参考之前的代码

# 定义编码器类，继承自 PyTorch 的 nn.Module 类
class Encoder(nn.Module):

    def __init__(self, vocab_size, d_model, N, heads, dropout):
        """
        初始化编码器。

        参数:
        vocab_size (int): 词汇表的大小，即不同单词的数量。
        d_model (int): 模型的嵌入维度，也就是每个单词的嵌入向量的维度。
        N (int): 编码器层的数量，即堆叠的 EncoderLayer 的数量。
        heads (int): 多头注意力机制中注意力头的数量。
        dropout (float): 丢弃率，用于防止过拟合。
        """
        # 调用父类 nn.Module 的构造函数
        super().__init__()
        # 保存编码器层的数量
        self.N = N
        # 定义词嵌入层，将输入的单词索引转换为对应的嵌入向量
        # vocab_size 是词汇表的大小，d_model 是嵌入向量的维度
        self.embed = nn.Embedding(vocab_size, d_model)
        # 创建位置编码器实例，用于为输入的嵌入向量添加位置信息
        self.pe = PositionalEncoder(d_model)
        # 创建一个 ModuleList，其中包含 N 个 EncoderLayer 实例
        # 每个 EncoderLayer 包含多头注意力机制和前馈神经网络
        self.layers = nn.ModuleList([EncoderLayer(d_model, heads, dropout) for _ in range(N)])
        # 创建层归一化层实例，用于对编码器的输出进行归一化处理
        self.norm = NormLayer(d_model)

    def forward(self, src, mask):
        """
        前向传播函数，实现编码器的计算。

        参数:
        src (torch.Tensor): 输入的源序列，通常是单词的索引序列，形状为 (batch_size, seq_len)。
        mask (torch.Tensor): 掩码张量，用于屏蔽某些位置的注意力分数，形状根据具体需求而定。

        返回:
        torch.Tensor: 编码器的输出，形状为 (batch_size, seq_len, d_model)。
        """
        # 通过词嵌入层将输入的单词索引转换为嵌入向量
        # 输出的形状为 (batch_size, seq_len, d_model)
        x = self.embed(src)
        # 为嵌入向量添加位置编码信息
        # 位置编码可以让模型感知到单词在序列中的位置
        x = self.pe(x)
        # 依次通过 N 个 EncoderLayer 进行处理
        for layer in self.layers:
            # 每个 EncoderLayer 都会对输入进行多头注意力和前馈神经网络的计算
            x = layer(x, mask)
        # 最后对编码器的输出进行层归一化处理
        return self.norm(x)

In [None]:
import torch
import torch.nn as nn

# 假设 MultiHeadAttention、FeedForward 和 NormLayer 类已经定义
# 这里直接使用它们

# 定义编码器层类，继承自 PyTorch 的 nn.Module 类
class EncoderLayer(nn.Module):

    def __init__(self, d_model, heads, dropout=0.1):
        """
        初始化编码器层。

        参数:
        d_model (int): 模型的嵌入维度，即输入和输出的特征维度。
        heads (int): 多头注意力机制中注意力头的数量。
        dropout (float): 丢弃率，用于防止过拟合，默认值为 0.1。
        """
        # 调用父类 nn.Module 的构造函数
        super().__init__()
        # 第一个层归一化层，用于在多头注意力机制前对输入进行归一化
        self.norm_1 = NormLayer(d_model)
        # 第二个层归一化层，用于在前馈神经网络前对输入进行归一化
        self.norm_2 = NormLayer(d_model)
        # 多头注意力机制模块
        self.attn = MultiHeadAttention(heads, d_model, dropout=dropout)
        # 前馈神经网络模块
        self.ff = FeedForward(d_model, dropout=dropout)
        # 第一个丢弃层，用于在多头注意力机制的输出上应用丢弃操作
        self.dropout_1 = nn.Dropout(dropout)
        # 第二个丢弃层，用于在前馈神经网络的输出上应用丢弃操作
        self.dropout_2 = nn.Dropout(dropout)

    def forward(self, x, mask):
        """
        前向传播函数，实现编码器层的计算。

        参数:
        x (torch.Tensor): 输入张量，形状通常为 (batch_size, seq_len, d_model)。
        mask (torch.Tensor): 掩码张量，用于屏蔽某些位置的注意力分数，形状根据具体需求而定。

        返回:
        torch.Tensor: 编码器层的输出，形状与输入相同，为 (batch_size, seq_len, d_model)。
        """
        # 首先对输入进行层归一化
        x2 = self.norm_1(x)
        # 将归一化后的输入传入多头注意力机制进行计算
        # 这里查询（Query）、键（Key）和值（Value）都使用归一化后的输入 x2
        attn_output = self.attn(x2, x2, x2, mask)
        # 对多头注意力机制的输出应用丢弃操作，防止过拟合
        attn_output = self.dropout_1(attn_output)
        # 将多头注意力机制的输出与原始输入进行残差连接
        # 残差连接有助于缓解梯度消失问题，使模型更容易训练
        x = x + attn_output
        # 对经过多头注意力机制和残差连接后的输出再次进行层归一化
        x2 = self.norm_2(x)
        # 将归一化后的输出传入前馈神经网络进行计算
        ff_output = self.ff(x2)
        # 对前馈神经网络的输出应用丢弃操作，防止过拟合
        ff_output = self.dropout_2(ff_output)
        # 将前馈神经网络的输出与经过多头注意力机制处理后的输入进行残差连接
        x = x + ff_output
        # 返回最终的输出
        return x

In [None]:
import torch
import torch.nn as nn

# 假设 PositionalEncoder、DecoderLayer 和 NormLayer 类已经定义，这里直接使用

# 定义解码器类，继承自 PyTorch 的 nn.Module 类
class Decoder(nn.Module):
    def __init__(self, vocab_size, d_model, N, heads, dropout):
        """
        初始化解码器。

        参数:
        vocab_size (int): 目标词汇表的大小，即不同目标单词的数量。
        d_model (int): 模型的嵌入维度，也就是每个单词的嵌入向量的维度。
        N (int): 解码器层的数量，即堆叠的 DecoderLayer 的数量。
        heads (int): 多头注意力机制中注意力头的数量。
        dropout (float): 丢弃率，用于防止过拟合。
        """
        # 调用父类的构造函数
        super().__init__()
        # 保存解码器层的数量
        self.N = N
        # 定义词嵌入层，将目标序列的单词索引转换为对应的嵌入向量
        # vocab_size 是目标词汇表的大小，d_model 是嵌入向量的维度
        self.embed = nn.Embedding(vocab_size, d_model)
        # 创建位置编码器实例，用于为目标序列的嵌入向量添加位置信息
        self.pe = PositionalEncoder(d_model)
        # 创建一个 ModuleList，其中包含 N 个 DecoderLayer 实例
        # 每个 DecoderLayer 包含自注意力机制、编码器 - 解码器注意力机制和前馈神经网络
        self.layers = nn.ModuleList([DecoderLayer(d_model, heads, dropout) for _ in range(N)])
        # 创建层归一化层实例，用于对解码器的输出进行归一化处理
        self.norm = NormLayer(d_model)

    def forward(self, trg, e_outputs, src_mask, trg_mask):
        """
        前向传播函数，实现解码器的计算。

        参数:
        trg (torch.Tensor): 输入的目标序列，通常是目标单词的索引序列，形状为 (batch_size, trg_seq_len)。
        e_outputs (torch.Tensor): 编码器的输出，形状为 (batch_size, src_seq_len, d_model)。
        src_mask (torch.Tensor): 源序列的掩码张量，用于屏蔽源序列中的填充位置等，形状根据源序列而定。
        trg_mask (torch.Tensor): 目标序列的掩码张量，用于屏蔽目标序列中的未来位置和填充位置等，形状根据目标序列而定。

        返回:
        torch.Tensor: 解码器的输出，形状为 (batch_size, trg_seq_len, d_model)。
        """
        # 通过词嵌入层将输入的目标序列单词索引转换为嵌入向量
        # 输出的形状为 (batch_size, trg_seq_len, d_model)
        x = self.embed(trg)
        # 为目标序列的嵌入向量添加位置编码信息
        # 位置编码可以让模型感知到目标单词在序列中的位置
        x = self.pe(x)
        # 依次通过 N 个 DecoderLayer 进行处理
        for layer in self.layers:
            # 每个 DecoderLayer 都会对输入进行自注意力、编码器 - 解码器注意力和前馈神经网络的计算
            x = layer(x, e_outputs, src_mask, trg_mask)
        # 最后对解码器的输出进行层归一化处理
        return self.norm(x)

In [None]:
import torch
import torch.nn as nn

# 假设 MultiHeadAttention、FeedForward 和 NormLayer 类已经定义
# 这里直接使用这些类

class DecoderLayer(nn.Module):
    def __init__(self, d_model, heads, dropout=0.1):
        """
        初始化解码器层。

        参数:
        d_model (int): 模型的嵌入维度，即输入和输出张量的特征维度。
        heads (int): 多头注意力机制中注意力头的数量。
        dropout (float): 丢弃率，用于防止过拟合，默认值为 0.1。
        """
        super().__init__()
        # 定义三个层归一化模块，分别用于不同子层的输入归一化
        # 第一个层归一化用于自注意力机制的输入
        self.norm_1 = NormLayer(d_model)
        # 第二个层归一化用于编码器 - 解码器注意力机制的输入
        self.norm_2 = NormLayer(d_model)
        # 第三个层归一化用于前馈神经网络的输入
        self.norm_3 = NormLayer(d_model)

        # 定义三个丢弃层，分别应用于不同子层的输出，防止过拟合
        self.dropout_1 = nn.Dropout(dropout)
        self.dropout_2 = nn.Dropout(dropout)
        self.dropout_3 = nn.Dropout(dropout)

        # 定义两个多头注意力模块
        # 第一个多头注意力模块是自注意力机制，用于处理目标序列内部的依赖关系
        self.attn_1 = MultiHeadAttention(heads, d_model, dropout=dropout)
        # 第二个多头注意力模块是编码器 - 解码器注意力机制，用于结合编码器输出和目标序列信息
        self.attn_2 = MultiHeadAttention(heads, d_model, dropout=dropout)

        # 定义前馈神经网络模块，用于对特征进行进一步的非线性变换
        self.ff = FeedForward(d_model, dropout=dropout)

    def forward(self, x, e_outputs, src_mask, trg_mask):
        """
        前向传播函数，实现解码器层的计算。

        参数:
        x (torch.Tensor): 输入的目标序列张量，形状为 (batch_size, trg_seq_len, d_model)。
        e_outputs (torch.Tensor): 编码器的输出张量，形状为 (batch_size, src_seq_len, d_model)。
        src_mask (torch.Tensor): 源序列的掩码张量，用于屏蔽源序列中的填充位置等，形状根据源序列而定。
        trg_mask (torch.Tensor): 目标序列的掩码张量，用于屏蔽目标序列中的未来位置和填充位置等，形状根据目标序列而定。

        返回:
        torch.Tensor: 解码器层的输出张量，形状为 (batch_size, trg_seq_len, d_model)。
        """
        # 1. 自注意力子层
        # 对输入的目标序列进行层归一化
        x2 = self.norm_1(x)
        # 计算自注意力机制的输出，这里查询（Query）、键（Key）和值（Value）都使用归一化后的目标序列
        # trg_mask 用于屏蔽目标序列中的未来位置和填充位置，防止模型看到未来信息
        attn_1_output = self.attn_1(x2, x2, x2, trg_mask)
        # 对自注意力机制的输出应用丢弃操作，防止过拟合
        attn_1_output = self.dropout_1(attn_1_output)
        # 进行残差连接，将自注意力机制的输出与原始输入相加
        # 残差连接有助于缓解梯度消失问题，使模型更容易训练
        x = x + attn_1_output

        # 2. 编码器 - 解码器注意力子层
        # 对经过自注意力子层处理后的目标序列再次进行层归一化
        x2 = self.norm_2(x)
        # 计算编码器 - 解码器注意力机制的输出
        # 查询（Query）使用归一化后的目标序列，键（Key）和值（Value）使用编码器的输出
        # src_mask 用于屏蔽源序列中的填充位置
        attn_2_output = self.attn_2(x2, e_outputs, e_outputs, src_mask)
        # 对编码器 - 解码器注意力机制的输出应用丢弃操作，防止过拟合
        attn_2_output = self.dropout_2(attn_2_output)
        # 进行残差连接，将编码器 - 解码器注意力机制的输出与经过自注意力子层处理后的输入相加
        x = x + attn_2_output

        # 3. 前馈神经网络子层
        # 对经过编码器 - 解码器注意力子层处理后的目标序列进行层归一化
        x2 = self.norm_3(x)
        # 计算前馈神经网络的输出
        ff_output = self.ff(x2)
        # 对前馈神经网络的输出应用丢弃操作，防止过拟合
        ff_output = self.dropout_3(ff_output)
        # 进行残差连接，将前馈神经网络的输出与经过编码器 - 解码器注意力子层处理后的输入相加
        x = x + ff_output

        return x


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

# ========================================
# 位置编码模块
# ========================================
class PositionalEncoder(nn.Module):
    def __init__(self, d_model, max_seq_len=80):
        super().__init__()
        self.d_model = d_model
        pe = torch.zeros(max_seq_len, d_model)
        for pos in range(max_seq_len):
            for i in range(0, d_model, 2):
                pe[pos, i] = math.sin(pos / (10000 ** ((2 * i) / d_model)))
                if i + 1 < d_model:
                    pe[pos, i + 1] = math.cos(pos / (10000 ** ((2 * (i + 1)) / d_model)))
        pe = pe.unsqueeze(0)  # 增加batch维度
        self.register_buffer('pe', pe)  # 注册为非训练参数

    def forward(self, x):
        x = x * math.sqrt(self.d_model)  # 缩放嵌入向量
        seq_len = x.size(1)
        x = x + self.pe[:, :seq_len]  # 广播机制自动匹配维度
        return x

# ========================================
# 多头注意力模块
# ========================================
class MultiHeadAttention(nn.Module):
    def __init__(self, heads, d_model, dropout=0.1):
        super().__init__()
        self.d_model = d_model
        self.d_k = d_model // heads
        self.h = heads
        self.q_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)
        self.out = nn.Linear(d_model, d_model)

    def attention(self, q, k, v, d_k, mask=None, dropout=None):
        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(d_k)
        if mask is not None:
            mask = mask.unsqueeze(1)
            scores = scores.masked_fill(mask == 0, -1e9)
        scores = F.softmax(scores, dim=-1)
        if dropout is not None:
            scores = dropout(scores)
        output = torch.matmul(scores, v)
        return output

    def forward(self, q, k, v, mask=None):
        bs = q.size(0)
        # 线性变换并分割多头
        k = self.k_linear(k).view(bs, -1, self.h, self.d_k).transpose(1, 2)
        q = self.q_linear(q).view(bs, -1, self.h, self.d_k).transpose(1, 2)
        v = self.v_linear(v).view(bs, -1, self.h, self.d_k).transpose(1, 2)
        # 计算注意力
        scores = self.attention(q, k, v, self.d_k, mask, self.dropout)
        # 合并多头
        concat = scores.transpose(1, 2).contiguous().view(bs, -1, self.d_model)
        output = self.out(concat)
        return output

# ========================================
# 前馈神经网络模块
# ========================================
class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff=2048, dropout=0.1):
        super().__init__()
        self.linear_1 = nn.Linear(d_model, d_ff)
        self.dropout = nn.Dropout(dropout)
        self.linear_2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        x = self.dropout(F.relu(self.linear_1(x)))
        x = self.linear_2(x)
        return x

# ========================================
# 层归一化模块
# ========================================
class NormLayer(nn.Module):
    def __init__(self, d_model, eps=1e-6):
        super().__init__()
        self.size = d_model
        self.alpha = nn.Parameter(torch.ones(self.size))
        self.bias = nn.Parameter(torch.zeros(self.size))
        self.eps = eps

    def forward(self, x):
        mean = x.mean(dim=-1, keepdim=True)
        std = x.std(dim=-1, keepdim=True) + self.eps
        return self.alpha * (x - mean) / std + self.bias

# ========================================
# 编码器层模块
# ========================================
class EncoderLayer(nn.Module):
    def __init__(self, d_model, heads, dropout=0.1):
        super().__init__()
        self.norm_1 = NormLayer(d_model)
        self.norm_2 = NormLayer(d_model)
        self.attn = MultiHeadAttention(heads, d_model, dropout=dropout)
        self.ff = FeedForward(d_model, dropout=dropout)
        self.dropout_1 = nn.Dropout(dropout)
        self.dropout_2 = nn.Dropout(dropout)

    def forward(self, x, mask):
        x = x + self.dropout_1(self.attn(self.norm_1(x), self.norm_1(x), self.norm_1(x), mask))
        x = x + self.dropout_2(self.ff(self.norm_2(x)))
        return x

# ========================================
# 编码器模块
# ========================================
class Encoder(nn.Module):
    def __init__(self, vocab_size, d_model, N, heads, dropout):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, d_model)
        self.pe = PositionalEncoder(d_model)
        self.layers = nn.ModuleList([EncoderLayer(d_model, heads, dropout) for _ in range(N)])
        self.norm = NormLayer(d_model)

    def forward(self, src, mask):
        x = self.pe(self.embed(src))
        for layer in self.layers:
            x = layer(x, mask)
        return self.norm(x)

# ========================================
# 解码器层模块
# ========================================
class DecoderLayer(nn.Module):
    def __init__(self, d_model, heads, dropout=0.1):
        super().__init__()
        self.norm_1 = NormLayer(d_model)
        self.norm_2 = NormLayer(d_model)
        self.norm_3 = NormLayer(d_model)
        self.attn_1 = MultiHeadAttention(heads, d_model, dropout=dropout)
        self.attn_2 = MultiHeadAttention(heads, d_model, dropout=dropout)
        self.ff = FeedForward(d_model, dropout=dropout)
        self.dropout_1 = nn.Dropout(dropout)
        self.dropout_2 = nn.Dropout(dropout)
        self.dropout_3 = nn.Dropout(dropout)

    def forward(self, x, e_outputs, src_mask, trg_mask):
        x = x + self.dropout_1(self.attn_1(self.norm_1(x), self.norm_1(x), self.norm_1(x), trg_mask))
        x = x + self.dropout_2(self.attn_2(self.norm_2(x), e_outputs, e_outputs, src_mask))
        x = x + self.dropout_3(self.ff(self.norm_3(x)))
        return x

# ========================================
# 解码器模块
# ========================================
class Decoder(nn.Module):
    def __init__(self, vocab_size, d_model, N, heads, dropout):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, d_model)
        self.pe = PositionalEncoder(d_model)
        self.layers = nn.ModuleList([DecoderLayer(d_model, heads, dropout) for _ in range(N)])
        self.norm = NormLayer(d_model)

    def forward(self, trg, e_outputs, src_mask, trg_mask):
        x = self.pe(self.embed(trg))
        for layer in self.layers:
            x = layer(x, e_outputs, src_mask, trg_mask)
        return self.norm(x)

# ========================================
# 完整Transformer模型
# ========================================
class Transformer(nn.Module):
    def __init__(self, src_vocab, trg_vocab, d_model, N, heads, dropout):
        super().__init__()
        self.encoder = Encoder(src_vocab, d_model, N, heads, dropout)
        self.decoder = Decoder(trg_vocab, d_model, N, heads, dropout)
        self.out = nn.Linear(d_model, trg_vocab)

    def forward(self, src, trg, src_mask, trg_mask):
        e_outputs = self.encoder(src, src_mask)
        d_output = self.decoder(trg, e_outputs, src_mask, trg_mask)
        output = self.out(d_output)
        try:
            # 为了避免在生成时进行softmax，这里分开处理
            return output
        except:
            return F.log_softmax(output, dim=-1)