## transformer模型结构
![image.png](../add_pic/transformer结构.png)

In [122]:
import math
import torch
import torch.nn as nn
from labml_helpers.module import Module
from labml_nn.utils import clone_module_list
from typing import Optional, List
from torch.utils.data import DataLoader, TensorDataset
from torch import optim
import torch.nn.functional as F

In [123]:
class FeedForward(Module):
    def __init__(self, d_model: int, d_ff: int,
                 dropout: float = 0.1,
                 activation=nn.ReLU(),
                 is_gated: bool = False,
                 bias1: bool = True,
                 bias2: bool = True,
                 bias_gate: bool = True):
        super().__init__()
        # 初始化第一层线性变换，输入维度为 d_model，输出维度为 d_ff
        self.layer1 = nn.Linear(d_model, d_ff, bias=bias1)
        # 初始化第二层线性变换，输入维度为 d_ff，输出维度为 d_model
        self.layer2 = nn.Linear(d_ff, d_model, bias=bias2)
        # 初始化 dropout 层，用于在训练过程中随机关闭部分神经元
        self.dropout = nn.Dropout(dropout)
        # 激活函数，用于引入非线性
        self.activation = activation
        # 是否启用门控机制
        self.is_gated = is_gated
        if is_gated:
            # 如果启用了门控，初始化另一层线性变换，用于门控
            self.linear_v = nn.Linear(d_model, d_ff, bias=bias_gate)

    def forward(self, x: torch.Tensor):
        # 应用第一层线性变换和激活函数
        g = self.activation(self.layer1(x))

        # 如果启用了门控机制
        if self.is_gated:
            # 应用门控操作
            x = g * self.linear_v(x)
        else:
            x = g

        # 应用 dropout
        x = self.dropout(x)
        # 应用第二层线性变换
        return self.layer2(x)


![image.png](../add_pic/transformer多头注意力机制1.png)
![image.png](../add_pic/transformer多头注意力机制2.png)

In [124]:
class PrepareForMultiHeadAttention(nn.Module):
    def __init__(self, d_model: int, heads: int, d_k: int, bias: bool):
        super().__init__()
        # 初始化线性变换层，用于将输入转换为多头注意力所需的形状
        self.linear = nn.Linear(d_model, heads * d_k, bias=bias)
        # 注意力头的数量
        self.heads = heads
        # 每个头中向量的维度
        self.d_k = d_k

    def forward(self, x: torch.Tensor):
        # 获取输入的初始形状，用于之后的变形操作
        head_shape = x.shape[:-1]

        # 应用线性变换
        x = self.linear(x)

        # 将最后一个维度分割成多个头，并为每个头分配 d_k 维度
        x = x.view(*head_shape, self.heads, self.d_k)
        # 返回的形状是 [seq_len, batch_size, heads, d_k] 或 [batch_size, heads, d_k]
        return x


In [125]:
class MultiHeadAttention(nn.Module):
    def __init__(self, heads: int, d_model: int, dropout_prob: float = 0.1, bias: bool = True):
        super().__init__()
        # 计算每个注意力头的维度
        self.d_k = d_model // heads
        # 注意力头的数量
        self.heads = heads

        # 分别为 query, key 和 value 初始化线性层
        self.query = PrepareForMultiHeadAttention(d_model, heads, self.d_k, bias=bias)
        self.key = PrepareForMultiHeadAttention(d_model, heads, self.d_k, bias=bias)
        self.value = PrepareForMultiHeadAttention(d_model, heads, self.d_k, bias=True)

        # 初始化 softmax 层，用于注意力权重的计算
        self.softmax = nn.Softmax(dim=1)
        # 输出层，将多头注意力的输出合并并作为一个张量
        self.output = nn.Linear(d_model, d_model)
        # Dropout 层
        self.dropout = nn.Dropout(dropout_prob)
        # 注意力计算的缩放因子
        self.scale = 1 / math.sqrt(self.d_k)
        # 存储注意力权重，用于调试或可视化
        self.attn = None

    def get_scores(self, query: torch.Tensor, key: torch.Tensor):
        # 计算 query 和 key 的点积，用于注意力权重
        return torch.einsum('ibhd,jbhd->ijbh', query, key)

    def prepare_mask(self, mask: torch.Tensor, query_shape: List[int], key_shape: List[int]):
        """
        调整 mask 为 (seq_len_q, seq_len_k, batch, 1)，
        其中 query_shape 为 (seq_len_q, batch, d_model)。
        """
        # mask 已经是 3 维
        if mask.dim() == 3:
            # 如果 mask 的第一维等于 batch，则说明输入 mask 原本形状是 (batch, seq_len, seq_len)
            if mask.size(0) == query_shape[1]:
                # 转换为 (seq_len, seq_len, batch)
                mask = mask.permute(1, 2, 0)
            # 如果 mask 的第二维为 1（例如 src_mask 原始形状为 (seq_len, 1, batch)），则扩展该维度
            elif mask.size(1) == 1:
                mask = mask.expand(-1, query_shape[0], -1)
            # 否则，认为 mask 已经是 (seq_len, seq_len, batch)（例如 tgt_mask 经过转置后）
            mask = mask.unsqueeze(-1)  # 变为 (seq_len, seq_len, batch, 1)
        elif mask.dim() == 2:
            mask = mask.unsqueeze(0).unsqueeze(-1)  # (1, seq_len, seq_len, 1)
            mask = mask.expand(query_shape[0], -1, -1, -1)
        else:
            raise ValueError("Unsupported mask dimension: expected 2 or 3, got {}".format(mask.dim()))
        return mask


    def forward(self, *,
                query: torch.Tensor,
                key: torch.Tensor,
                value: torch.Tensor,
                mask: Optional[torch.Tensor] = None):
        # 获取序列长度和批次大小
        seq_len, batch_size, _ = query.shape

        # 如果提供掩码，则进行调整
        if mask is not None:
            mask = self.prepare_mask(mask, query.shape, key.shape)

        # 准备 query, key 和 value
        query = self.query(query)
        key = self.key(key)
        value = self.value(value)

        # 计算注意力得分
        scores = self.get_scores(query, key)
        # 应用缩放因子
        scores *= self.scale

        # 如果有掩码，应用掩码
        if mask is not None:
            scores = scores.masked_fill(mask == 0, float('-inf'))

        # 应用 softmax 得到注意力权重
        attn = self.softmax(scores)
        # 应用 dropout
        attn = self.dropout(attn)
        # 根据注意力权重组合 value
        x = torch.einsum("ijbh,jbhd->ibhd", attn, value)

        # 存储注意力权重
        self.attn = attn.detach()

        # 合并多头注意力的输出
        x = x.reshape(seq_len, batch_size, -1)
        # 应用输出层
        return self.output(x)


![image.png](../add_pic/transformer位置编码和嵌入.png)

In [126]:
def get_positional_encoding(d_model: int, max_len: int = 5000):
    # 初始化一个全零的位置信息矩阵，形状为 [max_len, d_model]
    encodings = torch.zeros(max_len, d_model)
    # 创建一个位置索引，形状为 [max_len, 1]
    position = torch.arange(0, max_len, dtype=torch.float32).unsqueeze(1)
    # 创建一个序列，用于在正弦和余弦函数中的分母，形状为 [d_model/2]
    two_i = torch.arange(0, d_model, 2, dtype=torch.float32)
    # 计算分母项
    div_term = torch.exp(two_i * -(math.log(10000.0) / d_model))

    # 填充位置信息的偶数部分 (2i) 使用正弦函数
    encodings[:, 0::2] = torch.sin(position * div_term)
    # 填充位置信息的奇数部分 (2i+1) 使用余弦函数
    encodings[:, 1::2] = torch.cos(position * div_term)

    # 增加一个批次维度，并禁止对位置编码本身计算梯度
    encodings = encodings.unsqueeze(0).requires_grad_(False)
    return encodings


In [127]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model: int, dropout_prob: float, max_len: int = 5000):
        super().__init__()
        # 初始化 dropout 层
        self.dropout = nn.Dropout(dropout_prob)
        # 创建位置信息并注册为 buffer（不作为可训练参数）
        self.register_buffer('positional_encodings',
                             get_positional_encoding(d_model, max_len),
                             persistent=False)

    def forward(self, x: torch.Tensor):
        # 从 buffer 中取出与输入长度相同的位置信息
        pe = self.positional_encodings[:x.shape[0]].detach().requires_grad_(False)
        # 将位置编码添加到输入中
        x = x + pe
        # 应用 dropout
        x = self.dropout(x)
        return x


In [128]:
class EmbeddingsWithPositionalEncoding(nn.Module):
    def __init__(self, d_model: int, n_vocab: int, max_len: int = 5000):
        super().__init__()
        self.linear = nn.Embedding(n_vocab, d_model)
        self.d_model = d_model
        # 注册位置编码，形状为 (1, max_len, d_model)
        self.register_buffer('positional_encodings',
                             get_positional_encoding(d_model, max_len),
                             persistent=False)

    def forward(self, x: torch.Tensor):
        # 假设 x 的形状是 (batch_size, seq_len)
        # 取出对应序列长度的部分，形状变为 (1, seq_len, d_model)
        pe = self.positional_encodings[:, :x.size(1)]
        # 进行嵌入和缩放，再加上位置编码（利用广播机制，pe 会自动扩展到 (batch_size, seq_len, d_model)）
        return self.linear(x) * math.sqrt(self.d_model) + pe


In [129]:
class EmbeddingsWithLearnedPositionalEncoding(nn.Module):
    def __init__(self, d_model: int, n_vocab: int, max_len: int = 5000):
        super().__init__()
        # 创建一个嵌入层
        self.linear = nn.Embedding(n_vocab, d_model)
        # 存储模型维度
        self.d_model = d_model
        # 初始化一个可学习的位置信息
        self.positional_encodings = nn.Parameter(
            torch.zeros(max_len, 1, d_model),
            requires_grad=True
        )

    def forward(self, x: torch.Tensor):
        # 获取可学习的位置信息
        pe = self.positional_encodings[:x.shape[0]]
        # 将嵌入向量以 sqrt(d_model) 进行缩放，并加上位置编码
        return self.linear(x) * math.sqrt(self.d_model) + pe


![image.png](../add_pic/Transformer层.png)

In [130]:
class TransformerLayer(nn.Module):
    def __init__(self, *,
                 d_model: int,
                 self_attn: MultiHeadAttention,
                 src_attn: MultiHeadAttention = None,
                 feed_forward: FeedForward,
                 dropout_prob: float):
        super().__init__()
        # 设置模型的维度
        self.size = d_model
        # 自注意力机制
        self.self_attn = self_attn
        # 源注意力机制（解码器用，用于关注编码器输出），可为空
        self.src_attn = src_attn
        # 前馈网络
        self.feed_forward = feed_forward
        # Dropout 层
        self.dropout = nn.Dropout(dropout_prob)

        # 对自注意力的输出进行层归一化
        self.norm_self_attn = nn.LayerNorm([d_model])
        # 如果存在源注意力，则对其输出进行层归一化
        if self.src_attn is not None:
            self.norm_src_attn = nn.LayerNorm([d_model])
        # 对前馈网络的输出进行层归一化
        self.norm_ff = nn.LayerNorm([d_model])

        # 用于保存前馈网络的输入（若需调试或其他用途）
        self.is_save_ff_input = False

    def forward(self, *,
                x: torch.Tensor,
                mask: torch.Tensor,
                src: torch.Tensor = None,
                src_mask: torch.Tensor = None):
        # 对输入进行层归一化后再进行自注意力计算
        z = self.norm_self_attn(x)
        self_attn = self.self_attn(query=z, key=z, value=z, mask=mask)
        # 将自注意力的输出加回到原始输入上
        x = x + self.dropout(self_attn)

        # 如果提供了源序列 (src)，则进行源注意力计算
        if src is not None:
            z = self.norm_src_attn(x)
            attn_src = self.src_attn(query=z, key=src, value=src, mask=src_mask)
            x = x + self.dropout(attn_src)

        # 对注意力的输出进行层归一化后通过前馈网络
        z = self.norm_ff(x)
        ff = self.feed_forward(z)
        x = x + self.dropout(ff)
        return x


![image.png](../add_pic/transformer%20Encoder.png)

In [131]:
class Encoder(nn.Module):
    def __init__(self, layer: TransformerLayer, n_layers: int):
        super().__init__()
        # 复制多个 Transformer 层
        self.layers = clone_module_list(layer, n_layers)
        # 对最终的输出进行层归一化
        self.norm = nn.LayerNorm([layer.size])

    def forward(self, x: torch.Tensor, mask: torch.Tensor):
        # 依次通过每个 Transformer 层
        for layer in self.layers:
            x = layer(x=x, mask=mask)
        # 对最终的输出进行层归一化
        return self.norm(x)


![image.png](../add_pic/transformer%20Decoder.png)

In [132]:
class Decoder(nn.Module):
    def __init__(self, layer: TransformerLayer, n_layers: int):
        super().__init__()
        # 复制多个 Transformer 层
        self.layers = clone_module_list(layer, n_layers)
        # 对最终的输出进行层归一化
        self.norm = nn.LayerNorm([layer.size])

    def forward(self,
                x: torch.Tensor,
                memory: torch.Tensor,
                src_mask: torch.Tensor,
                tgt_mask: torch.Tensor):
        # 依次通过每个 Transformer 层
        for layer in self.layers:
            x = layer(x=x, mask=tgt_mask, src=memory, src_mask=src_mask)
        # 对最终的输出进行层归一化
        return self.norm(x)


In [133]:
class Generator(nn.Module):
    def __init__(self, n_vocab: int, d_model: int):
        super().__init__()
        # 初始化线性层，用于将解码器的输出投影到词汇表大小的空间
        self.projection = nn.Linear(d_model, n_vocab)

    def forward(self, x: torch.Tensor):
        # 应用线性层，将解码器的输出转换为词汇表上的分布
        return self.projection(x)


In [134]:
class EncoderDecoder(nn.Module):
    def __init__(self,
                 encoder: Encoder,
                 decoder: Decoder,
                 src_embed: nn.Module,
                 tgt_embed: nn.Module,
                 generator: nn.Module):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.src_embed = src_embed
        self.tgt_embed = tgt_embed
        self.generator = generator

        # 使用 Xavier 初始化参数
        for p in self.parameters():
            if p.dim() > 1:
                nn.init.xavier_uniform_(p)

    def forward(self,
                src: torch.Tensor,
                tgt: torch.Tensor,
                src_mask: torch.Tensor,
                tgt_mask: torch.Tensor):
        # 假设 src 和 tgt 的形状为 (batch_size, seq_len)
        # 转换为 (seq_len, batch_size)
        src = src.transpose(0, 1)
        tgt = tgt.transpose(0, 1)

        # 对 mask 进行相应转置
        src_mask = src_mask.transpose(0, 2)  # 从 (batch_size, 1, seq_len) 变为 (seq_len, 1, batch_size)
        tgt_mask = tgt_mask.transpose(0, 1).transpose(1, 2)  # 得到 (seq_len, seq_len, batch_size)

        enc = self.encode(src, src_mask)
        dec_output = self.decode(enc, src_mask, tgt, tgt_mask)
        # 调用 Generator 层将 dec_output 投影到词汇表大小的维度
        return self.generator(dec_output)


    def encode(self, src: torch.Tensor, src_mask: torch.Tensor):
        # 将源序列嵌入并传递给编码器
        return self.encoder(self.src_embed(src), src_mask)

    def decode(self,
               memory: torch.Tensor,
               src_mask: torch.Tensor,
               tgt: torch.Tensor,
               tgt_mask: torch.Tensor):
        # 将目标序列嵌入并传递给解码器，同时提供编码器的输出作为上下文
        return self.decoder(self.tgt_embed(tgt), memory, src_mask, tgt_mask)


In [135]:
# 设置参数
n_vocab = 100    # 假设词汇表大小为 100
d_model = 512    # 编码/解码维度
n_layers = 3     # Transformer层数
heads = 8        # 多头注意力头数
d_ff = 2048      # 前馈网络隐藏层维度
dropout = 0.1    # Dropout 概率

# 检查设备
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")


Using device: cpu


In [136]:
def generate_random_data(seq_len, batch_size, n_vocab, dataset_size, device):
    # 生成随机整数序列并放到 GPU（如果可用）
    src = torch.randint(0, n_vocab, (dataset_size, seq_len), device=device)
    tgt = torch.randint(0, n_vocab, (dataset_size, seq_len), device=device)

    # 源序列掩码（形状 [batch_size, 1, seq_len]）
    src_mask = torch.ones(dataset_size, 1, seq_len, device=device)
    # 目标序列掩码（形状 [batch_size, seq_len, seq_len]），用于解码器防止看到未来位置
    tgt_mask = (torch.tril(torch.ones(seq_len, seq_len, device=device)) == 1)\
                .unsqueeze(0).repeat(dataset_size, 1, 1)

    return src, tgt, src_mask, tgt_mask

# 数据相关参数
seq_len = 10
batch_size = 32
dataset_size = 1000

# 生成训练数据和测试数据
src, tgt, src_mask, tgt_mask = generate_random_data(seq_len, batch_size, n_vocab, dataset_size, device)
test_src, test_tgt, test_src_mask, test_tgt_mask = generate_random_data(seq_len, batch_size, n_vocab, dataset_size, device)


In [137]:
# 创建 DataLoader
train_dataset = TensorDataset(src, tgt, src_mask, tgt_mask)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

test_dataset = TensorDataset(test_src, test_tgt, test_src_mask, test_tgt_mask)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

# Encoder 层只需要自注意力和前馈网络
encoder_self_attn = MultiHeadAttention(heads, d_model)
encoder_feed_forward = FeedForward(d_model, d_ff)
encoder_layer = TransformerLayer(d_model=d_model,
                                 self_attn=encoder_self_attn,
                                 feed_forward=encoder_feed_forward,
                                 dropout_prob=dropout)

# Decoder 层需要自注意力、cross attention 和前馈网络
decoder_self_attn = MultiHeadAttention(heads, d_model)
decoder_src_attn = MultiHeadAttention(heads, d_model)
decoder_feed_forward = FeedForward(d_model, d_ff)
decoder_layer = TransformerLayer(d_model=d_model,
                                 self_attn=decoder_self_attn,
                                 src_attn=decoder_src_attn,
                                 feed_forward=decoder_feed_forward,
                                 dropout_prob=dropout)


# 构建编码器和解码器
encoder = Encoder(encoder_layer, n_layers)
decoder = Decoder(decoder_layer, n_layers)

# 构建嵌入层和生成器
src_embed = EmbeddingsWithPositionalEncoding(d_model, n_vocab)
tgt_embed = EmbeddingsWithPositionalEncoding(d_model, n_vocab)
generator = Generator(n_vocab, d_model)

# 构建整个编码器-解码器模型
model = EncoderDecoder(encoder, decoder, src_embed, tgt_embed, generator)

# 指定设备并移动模型到该设备
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# 构建优化器
optimizer = optim.Adam(model.parameters(), lr=0.0001)


In [None]:
epochs = 10
model.train()
for epoch in range(epochs):
    total_loss = 0
    for src, tgt, src_mask, tgt_mask in train_dataloader:
        # 数据已经在 GPU 上（若 device=cuda），无需再次 .to(device)
        optimizer.zero_grad()
        output = model(src, tgt, src_mask, tgt_mask)
        # 使用交叉熵损失，忽略索引 0（可作为 <pad>）
        loss = F.cross_entropy(output.view(-1, n_vocab), tgt.view(-1), ignore_index=0)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    print(f"Epoch {epoch+1}, Loss: {total_loss/len(train_dataloader)}")

Epoch 1, Loss: 4.859693542122841
Epoch 2, Loss: 4.684530854225159
Epoch 3, Loss: 4.6604111939668655
Epoch 4, Loss: 4.657957315444946
Epoch 5, Loss: 4.65576434135437
Epoch 6, Loss: 4.643081501126289
Epoch 7, Loss: 4.643091395497322
Epoch 8, Loss: 4.640585109591484
Epoch 9, Loss: 4.6413600742816925
Epoch 10, Loss: 4.63554921746254


In [139]:
model.eval()
total_accuracy = 0
with torch.no_grad():
    for src, tgt, src_mask, tgt_mask in test_dataloader:
        output = model(src, tgt, src_mask, tgt_mask)
        # 将 output 的维度从 (seq_len, batch_size) 转换为 (batch_size, seq_len)
        output_max = output.argmax(dim=-1).transpose(0, 1)
        correct = (output_max == tgt).sum().item()
        total_accuracy += correct / (tgt.size(0) * tgt.size(1))

print(f"Accuracy: {total_accuracy/len(test_dataloader)}")

Accuracy: 0.004882812500000001
