In [2]:
# 计算注意力分数
# 导入库
import torch
import torch.nn.functional as F

# 示例输入序列
input_sequence = torch.tensor([[0.1, 0.2, 0.3], [0.4, 0.5, 0.6], [0.7, 0.8, 0.9]])

# 生成 Key、Query 和 Value 矩阵的随机权重
random_weights_key = torch.randn(input_sequence.size(-1), input_sequence.size(-1))
random_weights_query = torch.randn(input_sequence.size(-1), input_sequence.size(-1))
random_weights_value = torch.randn(input_sequence.size(-1), input_sequence.size(-1))

# 计算 Key、Query 和 Value 矩阵
key = torch.matmul(input_sequence, random_weights_key)
query = torch.matmul(input_sequence, random_weights_query)
value = torch.matmul(input_sequence, random_weights_value)

# 计算注意力分数
attention_scores = torch.matmul(query, key.T) / torch.sqrt(torch.tensor(query.size(-1), dtype=torch.float32))

# 使用 softmax 函数获得注意力权重
attention_weights = F.softmax(attention_scores, dim=-1)

# 计算 Value 向量的加权和
output = torch.matmul(attention_weights, value)

print("自注意力机制后的输出:")
print(output)


自注意力机制后的输出:
tensor([[ 1.3634, -0.1174, -0.2121],
        [ 1.4749, -0.1411, -0.2295],
        [ 1.5752, -0.1625, -0.2451]])


In [3]:
# 位置编码的实现
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F


class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        # 继承nn.Module的初始化方法
        super(PositionalEncoding, self).__init__()
        
        # 计算位置编码
        # 初始化一个(max_len, d_model)的零张量用于存储位置编码
        pe = torch.zeros(max_len, d_model)
        # 创建一个从0到max_len-1的张量，用于表示每个位置的索引，并在第1维增加一个尺寸
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        # 计算缩放因子，用于调整正弦波和余弦波的频率
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-torch.log(torch.tensor(10000.0)) / d_model))
        # 根据位置编码公式填充偶数列（正弦部分）
        pe[:, 0::2] = torch.sin(position * div_term)
        # 填充奇数列（余弦部分）
        pe[:, 1::2] = torch.cos(position * div_term)
        # 为位置编码增加一个批次维度，以便于后续与输入数据相加
        pe = pe.unsqueeze(0)
        # 使用register_buffer将位置编码缓存为模型的一部分，不会被梯度更新
        self.register_buffer('pe', pe)

    def forward(self, x):
        # 在前向传播中，将位置编码直接加到输入x上，注意重复或裁剪以匹配x的序列长度
        x = x + self.pe[:, :x.size(1)]
        return x

# 示例用法
d_model = 512  # 嵌入维度
max_len = 100  # 序列最大长度

# 位置编码实例化
pos_encoder = PositionalEncoding(d_model, max_len)

# 创建一个示例输入序列，形状为(批量大小, 序列长度, 嵌入维度)
input_sequence = torch.randn(5, max_len, d_model)

# 应用位置编码到输入序列上
input_sequence = pos_encoder(input_sequence)

print("输入序列的位置编码:")
print(input_sequence.shape)



输入序列的位置编码:
torch.Size([5, 100, 512])


In [3]:
# 多头注意力的代码实现

# 定义一个多头注意力（Multi-Head Attention）类，继承自nn.Module
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        # 调用父类初始化方法
        super(MultiHeadAttention, self).__init__()
        # 设置头的数量
        self.num_heads = num_heads
        # 设置模型的嵌入维度
        self.d_model = d_model
        # 断言检查d_model是否能被num_heads整除，确保可以平均分配给每个头
        assert d_model % num_heads == 0
        # 计算每个头的维度
        self.depth = d_model // num_heads
        
        # 定义线性变换层，用于查询、键、值的转换
        self.query_linear = nn.Linear(d_model, d_model)
        self.key_linear = nn.Linear(d_model, d_model)
        self.value_linear = nn.Linear(d_model, d_model)
        
        # 定义一个输出的线性变换层，用于最终的注意力输出
        self.output_linear = nn.Linear(d_model, d_model)
    
    def split_heads(self, x):
        # 接收一个张量并将其按头分割
        batch_size, seq_length, d_model = x.size()
        # 调整张量形状以便于分头，然后转置以适应多头注意力的计算
        return x.view(batch_size, seq_length, self.num_heads, self.depth).transpose(1, 2)
    
    def forward(self, query, key, value, mask=None):
        # 对查询、键、值应用线性变换
        query = self.query_linear(query)
        key = self.key_linear(key)
        value = self.value_linear(value)
        
        # 将变换后的查询、键、值张量按头部进行分割
        query = self.split_heads(query)
        key = self.split_heads(key)
        value = self.split_heads(value)
        
        # 计算缩放点积注意力分数，除以深度的平方根是为了缩放，防止softmax函数因太大或太小的值而饱和
        scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(self.depth)
        
        # 如果提供了掩码，用负无穷大填充那些需要被屏蔽的位置，以便在softmax中忽略这些位置
        if mask is not None:
            scores += scores.masked_fill(mask == 0, -1e9)
        
        # 应用softmax函数得到注意力权重
        attention_weights = torch.softmax(scores, dim=-1)
        
        # 使用注意力权重加权求和值得到注意力输出
        attention_output = torch.matmul(attention_weights, value)
        
        # 将多头注意力的结果合并回原始形状
        batch_size, _, seq_length, d_k = attention_output.size()
        attention_output = attention_output.transpose(1, 2).contiguous().view(batch_size, seq_length, self.d_model)
        
        # 对最终的注意力输出做线性变换
        attention_output = self.output_linear(attention_output)
        
        # 返回注意力输出结果
        return attention_output

# 示例代码：使用定义的多头注意力类
# 设置模型的嵌入维度、序列最大长度、头的数量以及潜在的前馈网络维度（尽管这里未直接使用）
d_model = 512
max_len = 100
num_heads = 8
d_ff = 2048

# 创建一个多头注意力实例
multihead_attn = MultiHeadAttention(d_model, num_heads)

# 随机生成一个示例输入序列张量
input_sequence = torch.randn(5, max_len, d_model)

# 应用多头注意力于输入序列自身（查询=键=值）
attention_output= multihead_attn(input_sequence, input_sequence, input_sequence)

# 打印输出注意力结果的形状
print("attention_output shape:", attention_output.shape)



attention_output shape: torch.Size([5, 100, 512])


In [4]:
# 前馈网络的代码实现
# 定义一个前馈网络（FeedForward Network）模块，作为Transformer模型的一部分，用于在多头注意力之后增加网络的非线性表达能力。
class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        # 继承nn.Module的初始化方法，并定义前馈网络的结构
        super(FeedForward, self).__init__()
        # 第一层线性变换，将输入维度d_model映射到一个更大的维度d_ff，有助于模型学习更复杂的表示
        self.linear1 = nn.Linear(d_model, d_ff)
        # 第二层线性变换，将中间维度d_ff映射回原始的输入维度d_model，保持输出与输入维度一致
        self.linear2 = nn.Linear(d_ff, d_model)
        # 使用ReLU激活函数，为网络引入非线性
        self.relu = nn.ReLU()

    def forward(self, x):
        # 线性变换1后接ReLU激活，增加网络的非线性表达能力
        x = self.relu(self.linear1(x))
        
        # 第二次线性变换，将激活后的特征映射回d_model维度
        x = self.linear2(x)
        
        # 返回前馈网络的输出
        return x

# 定义模型使用的超参数
d_model = 512         # 模型的嵌入维度
max_len = 100          # 序列的最大长度
num_heads = 8          # 多头注意力中头的数量
d_ff = 2048           # 前馈网络中间层的维度

# 实例化多头注意力模块
multihead_attn = MultiHeadAttention(d_model, num_heads)

# 实例化前馈网络模块
ff_network = FeedForward(d_model, d_ff)

# 生成一个随机的输入序列，用于演示
input_sequence = torch.randn(5, max_len, d_model) # 形状为(批次大小, 序列长度, 嵌入维度)

# 应用多头注意力机制到输入序列上
attention_output = multihead_attn(input_sequence, input_sequence, input_sequence)

# 将多头注意力的输出传入前馈网络
output_ff = ff_network(attention_output)

# 打印输入序列和前馈网络输出的形状，验证数据流正确无误
print('input_sequence shape:', input_sequence.shape)
print("output_ff shape:", output_ff.shape)



input_sequence shape: torch.Size([5, 100, 512])
output_ff shape: torch.Size([5, 100, 512])


In [5]:
# 编码器的代码实现
# 导入必要的库
import torch
import torch.nn as nn

# 定义多头注意力模块（Multi-Head Attention）和前馈网络模块（FeedForward），
# 这里假设它们已被正确定义在代码的其他部分，因为它们的具体实现未给出。
# 注意：实际使用中，你需要实现这些类。

# 位置编码类，为输入序列中的每个位置生成一个固定的向量表示
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=100):
        super(PositionalEncoding, self).__init__()
        # 初始化位置编码矩阵
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-torch.log(torch.tensor(10000.0)) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)
    
    def forward(self, x):
        # 将位置编码加到输入序列上
        x = x + self.pe[:x.size(0), :]
        return x

# 编码器层，结合自注意力机制和前馈网络，同时包含Layer Normalization和Dropout
class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super(EncoderLayer, self).__init__()
        # 初始化自注意力层、前馈网络层、归一化层以及Dropout层
        self.self_attention = MultiHeadAttention(d_model, num_heads)  # 多头注意力层
        self.feed_forward = FeedForward(d_model, d_ff)              # 前馈网络层
        self.norm1 = nn.LayerNorm(d_model)                          # 第一层归一化
        self.norm2 = nn.LayerNorm(d_model)                          # 第二层归一化
        self.dropout = nn.Dropout(dropout)                          # Dropout层
        self.positional_encoding = PositionalEncoding(d_model)       # 位置编码
        
    def forward(self, x, mask):
        # 应用位置编码
        x = self.positional_encoding(x)
        # 自注意力层处理并添加残差连接
        attention_output = self.self_attention(x, x, x, mask)
        attention_output = self.dropout(attention_output)
        x = x + attention_output
        x = self.norm1(x)
        # 前馈网络处理并添加残差连接
        feed_forward_output = self.feed_forward(x)
        feed_forward_output = self.dropout(feed_forward_output)
        x = x + feed_forward_output
        x = self.norm2(x)
        return x 

# 设置模型超参数
d_model = 512         # 模型维度
max_len = 100         # 序列最大长度
num_heads = 8         # 多头注意力中的头数
d_ff = 2048           # 前馈网络的隐藏层尺寸

# 实例化编码器层和位置编码模块
encoder_layer = EncoderLayer(d_model, num_heads, d_ff, 0.1)
pos_encoder = PositionalEncoding(d_model)

# 创建一个随机的输入序列张量，模拟数据输入
input_sequence = torch.randn(5, max_len, d_model)  # 形状为(批量大小, 序列长度, 模型维度)

# 使用位置编码增强输入序列
input_with_pe = pos_encoder(input_sequence)

# 通过编码器层处理带位置编码的输入序列，此处未使用遮罩（mask）
encoder_output = encoder_layer(input_with_pe, None)

# 打印输出的形状
print("encoder output shape：", encoder_output.shape)  # 预期输出为 torch.Size([5, 100, 512])



encoder output shape： torch.Size([5, 100, 512])


In [6]:
# 解码器的代码实现
import torch
import torch.nn as nn


class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        """
        初始化解码器层所需的所有子层和归一化层。
        
        :param d_model: 模型的维度
        :param num_heads: 多头注意力中的头数
        :param d_ff: 前馈网络中间层的维度
        :param dropout: Dropout比率
        """
        super(DecoderLayer, self).__init__()
        self.masked_self_attention = MultiHeadAttention(d_model, num_heads)  # 掩码自注意力层
        self.enc_dec_attention = MultiHeadAttention(d_model, num_heads)      # 编码器-解码器注意力层
        self.feed_forward = FeedForward(d_model, d_ff)                      # 前馈网络层
        self.norm1 = nn.LayerNorm(d_model)                                  # 第一层归一化
        self.norm2 = nn.LayerNorm(d_model)                                  # 第二层归一化
        self.norm3 = nn.LayerNorm(d_model)                                  # 第三层归一化
        self.dropout = nn.Dropout(dropout)                                  # Dropout层

    
    def forward(self, x, encoder_output, src_mask, tgt_mask):
        """
        解码器层的前向传播方法，执行自注意力、编码器-解码器注意力和前馈网络操作。
        
        :param x: 解码器的输入
        :param encoder_output: 编码器的输出
        :param src_mask: 源序列的遮罩
        :param tgt_mask: 目标序列的遮罩
        :return: 经过解码器层处理后的输出
        """       
        
        # 掩码自注意力层
        self_attention_output = self.masked_self_attention(x, x, x, tgt_mask)
        self_attention_output = self.dropout(self_attention_output)
        x = x + self_attention_output 
        x = self.norm1(x)
        
        # 编码器-解码器注意力层
        enc_dec_attention_output = self.enc_dec_attention(x, encoder_output,
                                                          encoder_output, src_mask)
        enc_dec_attention_output = self.dropout(enc_dec_attention_output)
        x = x + enc_dec_attention_output
        x = self.norm2(x)
        
        # 前馈层
        feed_forward_output = self.feed_forward(x)
        feed_forward_output = self.dropout(feed_forward_output)
        x = x + feed_forward_output
        x = self.norm3(x)
        
        return x
    
    
# 定义DecoderLayer的参数
max_len = 100
d_model = 512
num_heads = 8
d_ff = 2048
dropout = 0.1
batch_size = 1
    

encoder_layer = EncoderLayer(d_model, num_heads, d_ff, 0.1)
pos_encoder = PositionalEncoding(d_model)
# 创建一个随机的输入序列张量，模拟数据输入
input_sequence = torch.randn(5, max_len, d_model)  # 形状为(批量大小, 序列长度, 模型维度)
# 使用位置编码增强输入序列
input_with_pe = pos_encoder(input_sequence)
# 通过编码器层处理带位置编码的输入序列，此处未使用遮罩（mask）
encoder_output = encoder_layer(input_with_pe, None)


# 定义DecoderLayer实例
decoder_layer = DecoderLayer(d_model, num_heads, d_ff, dropout)      

''' 
1. torch.rand(batch_size, max_len, max_len) 生成一个形状为(batch_size, max_len, max_len)的张量，
其中每个元素都是从0到1之间的随机数。
2. > 0.5 是一个布尔操作，将张量中的每个元素与0.5比较，如果元素大于0.5，
则结果为True（通常在PyTorch中表示为1），否则为False（通常表示为0）。
因此，这行代码实际上生成了一个二值掩码，其中大部分情况下会是随机分布的True和False
，但实际上在Transformer的上下文中，src_mask常用于表示源序列中的padding部分或者
在某些情境下对序列进行特殊处理，此处的生成方式可能不直接对应标准实践，
标准情况下src_mask应依据实际的padding情况精确构建。
'''
src_mask = torch.rand(batch_size, max_len, max_len) > 0.5
''' 
1. torch.ones(max_len, max_len) 创建一个形状为(max_len, max_len)的全1张量，
这里的max_len和max_len应当理解为同一概念，表示序列长度。
2. torch.tril(...) 是一个三角函数，它返回下三角矩阵的下三角部分，其中下三角部分的所有元素为1，
其余为0。在这个上下文中，它生成了一个下三角掩码，
用于确保自注意力（在解码器的自注意力层）中只关注当前位置及之前的位置，符合自回归原则。
3. unsqueeze(0) 在张量的第一个维度（批维度）添加一个维度，使得掩码可以广播到任何批次大小，
最终形状变为(1, max_len, max_len)。
4. == 0 这个操作将下三角矩阵转换为掩码形式，其中下三角为False（0，表示可访问），
上三角为True（1，经过后续处理后变为0，表示不可访问），以确保自注意力不会看到未来的信息。
'''
tgt_mask = torch.tril(torch.ones(max_len, max_len)).unsqueeze(0) == 0

#将输入张量传递到DecoderLayer
output = decoder_layer(input_sequence, encoder_output, src_mask, tgt_mask)  

print("Output shape:", output.shape)  # torch.Size([5, 100, 512])    
 


Output shape: torch.Size([5, 100, 512])


In [7]:
# Transformer的实现

import torch.nn as nn

class Transformer(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers,
                 d_ff, max_len, dropout):
        super(Transformer, self).__init__()
        
        # 定义编码器和解码器的词嵌入层
        self.encoder_embedding = nn.Embedding(src_vocab_size, d_model)
        self.decoder_embedding = nn.Embedding(tgt_vocab_size, d_model)
        
        # 定义位置编码层
        self.positional_encoding = PositionalEncoding(d_model, max_len)
        
        # 定义编码器和解码器的多层堆叠
        self.encoder_layers = nn.ModuleList(
            [EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)]
        )
        self.decoder_layers = nn.ModuleList(
            [DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)]
        )
        
        # 定义线性层
        self.linear = nn.Linear(d_model, tgt_vocab_size)
        self.dropout = nn.Dropout(dropout)
        
    # 生成掩码
    def generate_mask(self, src, tgt):
        
        ''' 
        通过比较src（源序列）与0，创建一个布尔掩码，其中1表示有效 token（非填充值，假设0为填充标记），
        0表示填充部分。然后，使用.unsqueeze(1)和.unsqueeze(2)分别在第1维和第2维增加一维，
        以便该掩码能够正确广播并与后续操作中的注意力权重矩阵相乘。
        这主要用于排除填充部分在自注意力计算中的影响。
        '''
        src_mask = (src != 0).unsqueeze(1).unsqueeze(2)
        tgt_mask = (tgt != 0).unsqueeze(1).unsqueeze(2)
        
        seq_length = tgt.size(1)
        
        ''' 
        这里计算了一个“nopeek”掩码，用于阻止解码器中的一个时间步访问之后的时间步，
        这是Transformer解码器中的典型做法，以维持自回归特性。
        它通过创建一个上三角矩阵（torch.triu）并取其补（减去自身并取1减去结果）来实现，
        然后转换为布尔类型。seq_length是从目标序列tgt中获取的实际序列长度。
        '''
        nopeak_mask = (1 - torch.triu(torch.ones(1, seq_length, seq_length), diagonal=1)).bool()
        ''' 
        将之前初始化的目标序列掩码（排除了padding）与刚计算的“nopeek”掩码进行按位与操作（&），
        这样得到的tgt_mask既排除了padding，
        又阻止了每个位置看到未来的信息，符合Transformer解码器的需要。
        '''
        tgt_mask = tgt_mask & nopeak_mask
        
        
        ''' 
        ，函数返回两个掩码：一个是源序列掩码，用于在编码器中屏蔽padding；
        另一个是经过调整的目标序列掩码，用于在解码器中同时屏蔽padding和未来信息。
        '''
        return src_mask, tgt_mask
    
    # 前向传播
    def forward(self, src, tgt):
        src_mask, tgt_mask = self.generate_mask(src, tgt)
        
        # 编码器输入的词嵌入和位置编码
        encoder_embedding = self.encoder_embedding(src)
        en_positional_encoding = self.positional_encoding(encoder_embedding)
        src_embedded = self.dropout(en_positional_encoding)
        
        # 解码器输入的词嵌入和位置编码
        decoder_embedding = self.decoder_embedding(tgt)
        de_positional_encoding = self.positional_encoding(decoder_embedding)
        tgt_embedded = self.dropout(de_positional_encoding)
        
        enc_output = src_embedded
        for enc_layer in self.encoder_layers:
            enc_output = enc_layer(enc_output, src_mask)
        
        dec_output = tgt_embedded
        for dec_layer in self.decoder_layers:
            dec_output = dec_layer(dec_output, enc_output, src_mask, tgt_mask)
            
        output = self.linear(dec_output)
        return output
        

# 示例用法
src_vocab_size = 5000
tgt_vocab_size = 5000
d_model = 512
num_heads = 8
num_layers = 6
d_ff = 2048 
max_len = 100
dropout = 0.1 

transformer = Transformer(src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, 
                          d_ff, max_len, dropout)

# 生成随机示例数据
''' 
这行代码的作用是生成一个形状为(5, max_len)的张量，其中的每个元素都是在1到src_vocab_size之间（不包括src_vocab_size）
的随机整数。这个张量可以用来模拟一个包含5个样本、每个样本长度为max_len的源序列数据，
适用于训练如机器翻译、文本生成等序列到序列（sequence-to-sequence）的学习任务中。
每个元素可以被视作词汇表中一个单词的索引，用于后续的词嵌入等处理。
'''
src_data = torch.randint(1, src_vocab_size, (5, max_len))  # (batch_size, seq_length)
tgt_data = torch.randint(1, tgt_vocab_size, (5, max_len))  # (batch_size, seq_length)

transformer(src_data, tgt_data[:, :-1]).shape  # torch.Size([5, 99, 5000])



torch.Size([5, 99, 5000])

In [8]:
# Transformer 模型的训练和评估流程实现

# 导入必要的库
import torch
import torch.nn as nn
import torch.optim as optim

# 初始化损失函数 CrossEntropyLoss，设置忽略索引为0，通常用于padding部分的mask处理
criterion = nn.CrossEntropyLoss(ignore_index=0)

# 初始化优化器 Adam，用于更新Transformer模型参数
# 学习率为0.0001，betas=(0.9, 0.98)为动量项系数，eps为防止除零异常的最小分母值
optimizer = optim.Adam(transformer.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)

# 开始训练循环
transformer.train()  # 设置模型为训练模式

# 进行多个epoch的训练
for epoch in range(10):  # 假设训练10轮
    # 在每个批次开始前清零梯度，避免梯度累积
    optimizer.zero_grad()
    
    # 通过Transformer模型得到输出，src_data为源序列，tgt_data[:, :-1]为目标序列去掉最后一个词作为输入
    output = transformer(src_data, tgt_data[:, :-1])
    
    # 计算损失，输出和目标序列都调整为一维向量形式以匹配CrossEntropyLoss要求
    ''' 
    output.contiguous().view(-1, tgt_vocab_size):
        .contiguous()确保张量的内存是连续的。在某些操作后（如transpose），张量可能在内存中变得非连续，
        这会影响之后的.view()操作。虽然在现代PyTorch版本中这一步有时不是必需的，但在涉及改变张量形状时
        保持习惯性地使用可以避免潜在问题。
        .view(-1, tgt_vocab_size) 改变output的形状。-1是一个占位符，表示该维度的大小由其他维度自动推断得出，
        以保证整体元素数量不变。tgt_vocab_size是你目标词汇表的大小。
        这意味着将输出张量重塑为形状(batch_size * sequence_length, tgt_vocab_size)，
        其中每一行对应一个单词的预测概率分布。
    '''
    loss = criterion(output.contiguous().view(-1, tgt_vocab_size), tgt_data[:, :-1].contiguous().view(-1))
    
    # 反向传播计算梯度
    loss.backward()
    
    # 更新模型参数
    optimizer.step()
    
    # 打印当前epoch的损失信息
    print(f"{epoch + 1} 轮：损失= {loss.item():.4f}")

# 准备虚拟数据用于演示，模拟实际的输入输出数据
src_data= torch.randint(1, src_vocab_size, (5, max_len))  # 源序列数据
tgt_data= torch.randint(1, tgt_vocab_size, (5, max_len))  # 目标序列数据

# 开始评估循环，设置模型为评估模式以关闭dropout等训练时特有的操作
transformer.eval()

# 使用torch.no_grad()上下文管理器，避免在此阶段计算和存储梯度，节省内存并加速评估过程
with torch.no_grad():
    # 对虚拟数据进行前向传播得到预测输出
    output = transformer(src_data, tgt_data[:, :-1])
    
    # 计算评估阶段的损失，注意这里tgt_data切片变化，与训练阶段对应未来词预测
    ''' 
    切片操作[:, 1:]意味着取所有行（样本），但是从每个序列的第二个元素到最后一个元素，这是因为序列到序列任务中，
    模型在训练时通常需要预测下一个词，所以标签序列相比输入序列少一个元素（第一个词不参与预测）。
    '''
    loss = criterion(output.contiguous().view(-1, tgt_vocab_size), tgt_data[:, 1:].contiguous().view(-1))
    
    # 输出评估损失
    print(f"\n虚拟数据的评估损失= {loss.item():.4f}")



1 轮：损失= 8.7046
2 轮：损失= 8.2527
3 轮：损失= 8.0162
4 轮：损失= 7.8507
5 轮：损失= 7.6828
6 轮：损失= 7.5305
7 轮：损失= 7.3640
8 轮：损失= 7.2285
9 轮：损失= 7.0601
10 轮：损失= 6.8446

虚拟数据的评估损失= 8.7007


In [5]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import math
import nltk
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction

# 定义位置编码类
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=100):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-torch.log(torch.tensor(10000.0)) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)
    
    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return x

# 定义多头注意力类
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model
        assert d_model % num_heads == 0
        self.depth = d_model // num_heads
        self.query_linear = nn.Linear(d_model, d_model)
        self.key_linear = nn.Linear(d_model, d_model)
        self.value_linear = nn.Linear(d_model, d_model)
        self.output_linear = nn.Linear(d_model, d_model)
    
    def split_heads(self, x):
        batch_size, seq_length, d_model = x.size()
        return x.view(batch_size, seq_length, self.num_heads, self.depth).transpose(1, 2)
    
    def forward(self, query, key, value, mask=None):
        query = self.query_linear(query)
        key = self.key_linear(key)
        value = self.value_linear(value)
        query = self.split_heads(query)
        key = self.split_heads(key)
        value = self.split_heads(value)
        scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(self.depth)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
        attention_weights = torch.softmax(scores, dim=-1)
        attention_output = torch.matmul(attention_weights, value)
        batch_size, _, seq_length, d_k = attention_output.size()
        attention_output = attention_output.transpose(1, 2).contiguous().view(batch_size, seq_length, self.d_model)
        attention_output = self.output_linear(attention_output)
        return attention_output

# 定义前馈网络类
class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        super(FeedForward, self).__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.relu(self.linear1(x))
        x = self.linear2(x)
        return x

# 定义编码器层类
class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super(EncoderLayer, self).__init__()
        self.self_attention = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = FeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
        self.positional_encoding = PositionalEncoding(d_model)
        
    def forward(self, x, mask):
        x = self.positional_encoding(x)
        attention_output = self.self_attention(x, x, x, mask)
        attention_output = self.dropout(attention_output)
        x = x + attention_output
        x = self.norm1(x)
        feed_forward_output = self.feed_forward(x)
        feed_forward_output = self.dropout(feed_forward_output)
        x = x + feed_forward_output
        x = self.norm2(x)
        return x 

# 定义解码器层类
class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super(DecoderLayer, self).__init__()
        self.masked_self_attention = MultiHeadAttention(d_model, num_heads)
        self.enc_dec_attention = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = FeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, encoder_output, src_mask, tgt_mask):
        self_attention_output = self.masked_self_attention(x, x, x, tgt_mask)
        self_attention_output = self.dropout(self_attention_output)
        x = x + self_attention_output 
        x = self.norm1(x)
        enc_dec_attention_output = self.enc_dec_attention(x, encoder_output,
                                                          encoder_output, src_mask)
        enc_dec_attention_output = self.dropout(enc_dec_attention_output)
        x = x + enc_dec_attention_output
        x = self.norm2(x)
        feed_forward_output = self.feed_forward(x)
        feed_forward_output = self.dropout(feed_forward_output)
        x = x + feed_forward_output
        x = self.norm3(x)
        return x

# 定义Transformer类
class Transformer(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers,
                 d_ff, max_len, dropout):
        super(Transformer, self).__init__()
        self.encoder_embedding = nn.Embedding(src_vocab_size, d_model)
        self.decoder_embedding = nn.Embedding(tgt_vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, max_len)
        self.encoder_layers = nn.ModuleList(
            [EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)]
        )
        self.decoder_layers = nn.ModuleList(
            [DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)]
        )
        self.linear = nn.Linear(d_model, tgt_vocab_size)
        self.dropout = nn.Dropout(dropout)
        
    def generate_mask(self, src, tgt):
        src_mask = (src != 0).unsqueeze(1).unsqueeze(2)
        tgt_mask = (tgt != 0).unsqueeze(1).unsqueeze(2)
        seq_length = tgt.size(1)
        nopeak_mask = (1 - torch.triu(torch.ones(1, seq_length, seq_length), diagonal=1)).bool()
        tgt_mask = tgt_mask & nopeak_mask
        return src_mask, tgt_mask
    
    def encode(self, src, src_mask):
        src = self.encoder_embedding(src)
        src = self.positional_encoding(src)
        for layer in self.encoder_layers:
            src = layer(src, src_mask)
        return src
    
    def decode(self, tgt, memory, src_mask, tgt_mask):
        tgt = self.decoder_embedding(tgt)
        tgt = self.positional_encoding(tgt)
        for layer in self.decoder_layers:
            tgt = layer(tgt, memory, src_mask, tgt_mask)
        return tgt
    
    def forward(self, src, tgt):
        src_mask, tgt_mask = self.generate_mask(src, tgt)
        memory = self.encode(src, src_mask)
        output = self.decode(tgt, memory, src_mask, tgt_mask)
        output = self.linear(output)
        return output

# 定义模型参数
src_vocab_size = 10000
tgt_vocab_size = 10000
d_model = 512
num_heads = 8
num_layers = 6
d_ff = 2048
max_len = 100
dropout = 0.1

# 创建Transformer模型实例
model = Transformer(src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers,
                    d_ff, max_len, dropout)

# 生成随机的输入数据
src = torch.randint(0, src_vocab_size, (32, 10))  # 模拟源序列
tgt = torch.randint(0, tgt_vocab_size, (32, 10))  # 模拟目标序列

# 前向传播
output = model(src, tgt)

# 示例评估翻译质量的函数
def evaluate_bleu(reference, candidate):
    """
    评估BLEU分数的辅助函数
    :param reference: 参考翻译
    :param candidate: 模型生成的翻译
    :return: BLEU分数
    """
    smooth = SmoothingFunction().method4
    return sentence_bleu([reference], candidate, smoothing_function=smooth)
reference = ["this", "is", "a", "test"]
candidate = ["this", "is", "a","a","test"]



# 计算BLEU分数
bleu_score = evaluate_bleu(reference, candidate)
print(f"BLEU Score: {bleu_score:.4f}")




BLEU Score: 0.3562
