# Install

In [31]:
# 安装 PyTorch
!pip install torch==2.2.1+cu121

[31mERROR: Could not find a version that satisfies the requirement torch==2.2.1+cu121 (from versions: 2.2.0, 2.2.1, 2.2.2, 2.3.0, 2.3.1, 2.4.0, 2.4.1, 2.5.0, 2.5.1, 2.6.0, 2.7.0, 2.7.1, 2.8.0)[0m[31m
[0m[31mERROR: No matching distribution found for torch==2.2.1+cu121[0m[31m
[0m

# Self-Attention

In [32]:
import torch
import torch.nn.functional as F

In [33]:
# Toy input: 3 tokens with 5 features each, sampled from a standard normal.
# Fixed alternative: torch.tensor([[0.1, 0.2, 0.3], [0.4, 0.5, 0.6], [0.7, 0.8, 0.9]])
input_seq = torch.randn((3, 5))
print(input_seq.size())

torch.Size([3, 5])


In [34]:
# Random projection weights for the Query, Key and Value matrices
l, d_in = input_seq.shape
d_model = 8
weight_q, weight_k, weight_v = (torch.randn(d_in, d_model) for _ in range(3))

# Project the same input three times
query = input_seq @ weight_q
key = input_seq @ weight_k
value = input_seq @ weight_v

# Scaled dot-product attention on a single, unbatched sequence:
# the score matrix here is (seq_len, seq_len), one row of weights per token
att_scores = (query @ key.t()) / d_model ** 0.5
att_scores = torch.softmax(att_scores, dim=-1)

# Weighted sum of the value rows
output = att_scores @ value
print(output.size())

torch.Size([3, 8])


# Define model

In [35]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

## Positional Embeddings

- Why `max_len`?
    - 位置编码只依赖位置和维度，与输入内容无关，因此在 `__init__` 中按 `max_len` 一次性预计算，并缓存到 `self.pe`。
    - x.size(1) 是当前 batch 的实际序列长度（不能超过 `max_len`）。
    - 从 self.pe 中截取前 x.size(1) 部分即可。
    - 这样不用每次重新算，只需要切片即可。
- `torch.arange(0, d_model, 2)` 就是取出所有偶数维度索引

In [36]:
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to a batch of embeddings.

    The encoding table is computed once for ``max_len`` positions and stored
    as a non-trainable buffer; ``forward`` only slices it.

    Args:
        d_model: Size of the embedding (last) dimension.
        max_len: Number of positions to precompute.
    """

    def __init__(self, d_model, max_len=512):
        super(PositionalEncoding, self).__init__()

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)  # (max_len, 1)

        # Even dimension indices 0, 2, 4, ...; the frequency for index 2i is
        # 10000^(-2i / d_model), i.e. exp(2i * (-log(10000) / d_model)).
        even_idx = torch.arange(0, d_model, 2, dtype=torch.float)
        div_term = torch.exp(even_idx * (-math.log(10000.0) / d_model))

        pe[:, 0::2] = torch.sin(position * div_term)
        # With an odd d_model there is one fewer odd column than even columns.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
        pe = pe.unsqueeze(0)  # (1, max_len, d_model)

        # Buffer: saved and moved with the module, but not a trainable parameter.
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return ``x`` plus the encoding of its first ``x.size(1)`` positions.

        Args:
            x: Tensor of shape (batch, seq_len, d_model); seq_len must not
                exceed the ``max_len`` the table was built with.
        """
        # Only max_len positions are precomputed, so slice to the actual length.
        return x + self.pe[:, :x.size(1)]

In [37]:
# Example usage
d_model, max_len, num_heads = 512, 100, 8

# Positional encoder with a table of max_len positions
pos_encoder = PositionalEncoding(d_model, max_len)

# Random batch of 5 sequences, passed straight through the positional encoder
input_sequence = pos_encoder(torch.randn(5, max_len, d_model))
print("输入序列的位置编码:")
print(input_sequence.size())

输入序列的位置编码:
torch.Size([5, 100, 512])


## Self-attention: scaled dot product

1. 注意力 (Attention) 的计算方式
在 Transformer 里，**自注意力 (self-attention)** 的计算核心是：
$$
\text{Attention}(Q, K, V) = \text{softmax}\left(\frac{QK^T}{\sqrt{d_k}} + \text{mask}\right)V
$$

- `Q` (queries)：每个位置要去“提问”  
- `K` (keys)：每个位置的“特征”  
- `QK^T`：得到一个 **[seq_len × seq_len] 的矩阵**，表示每个位置对所有位置的“相关性分数”。

2. mask为什么设置为-inf?
- 将被屏蔽位置的分数设为 -1e9 或 -inf 是为了确保在 softmax 操作后，这些位置的注意力权重为零，从而实现真正的屏蔽效果。直接将分数设为 0 并不能完全屏蔽这些位置，可能会导致模型在训练或推理时关注到不应关注的部分。

In [38]:
def att(query, key, value, mask=None, dropout=None):
    """Scaled dot-product attention.

    Args:
        query: Tensor whose last dimension is the feature dimension d_k.
        key: Tensor with the same last dimension as ``query``.
        value: Tensor whose second-to-last dimension matches that of ``key``.
        mask: Optional tensor broadcastable to the score tensor. Positions
            where ``mask == 0`` (or ``False``) are excluded from attention;
            every other position is kept.
        dropout: Optional callable applied to the attention weights.

    Returns:
        Tuple ``(output, att_scores)``: the weighted sum of ``value`` and the
        attention weights (after dropout, if given).
    """
    d_k = query.size(-1)  # feature dimension
    scores = torch.matmul(query, key.transpose(-1, -2)) / math.sqrt(d_k)

    if mask is not None:
        # Fill blocked positions with the most negative finite value of the
        # score dtype so they get zero weight after softmax. A hard-coded
        # -1e9 is not representable in float16.
        scores = scores.masked_fill(mask == 0, torch.finfo(scores.dtype).min)

    att_scores = F.softmax(scores, dim=-1)
    if dropout is not None:
        att_scores = dropout(att_scores)

    output = torch.matmul(att_scores, value)
    # Shape trace shown in the notebook outputs.
    print('output_att:', output.shape)
    return output, att_scores

## Multi-head attention

将输入张量 x 的形状从 [batch_size, seq_len, d_model] 重塑为 [batch_size, num_heads, seq_len, depth]

In [39]:
class MultiHeadAttention(nn.Module):
    """Multi-head attention built on the ``att`` scaled dot-product function.

    Args:
        d_model: Embedding size; must be divisible by ``num_heads``.
        num_heads: Number of attention heads.
        dropout: Dropout probability applied to the attention weights.

    Raises:
        ValueError: If ``d_model`` is not divisible by ``num_heads``.
    """

    def __init__(self, d_model, num_heads, dropout=0.1):
        super(MultiHeadAttention, self).__init__()
        if d_model % num_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by num_heads ({num_heads})"
            )
        self.d_model = d_model
        self.num_heads = num_heads
        self.depth = d_model // num_heads  # feature size of a single head

        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)

        self.dropout = nn.Dropout(dropout)

        self.w_o = nn.Linear(d_model, d_model)

    def split_head(self, x):
        """Reshape (batch, seq_len, d_model) into (batch, num_heads, seq_len, depth)."""
        batch_size, seq_len, _ = x.size()
        # Use the module's own head count, not a global that happens to share the name.
        return x.view(batch_size, seq_len, self.num_heads, self.depth).transpose(1, 2)

    def forward(self, query, key, value, mask=None):
        """Apply multi-head attention.

        Args:
            query, key, value: Tensors of shape (batch, seq_len, d_model).
            mask: Optional mask forwarded unchanged to ``att``; it must be
                broadcastable to (batch, num_heads, query_len, key_len).

        Returns:
            Tensor of shape (batch, query_len, d_model).
        """
        query = self.w_q(query)
        key = self.w_k(key)
        value = self.w_v(value)

        # Split the embedding dimension across the heads.
        query = self.split_head(query)
        key = self.split_head(key)
        value = self.split_head(value)

        # Scaled dot-product attention, computed independently per head.
        x, _ = att(query, key, value, mask, self.dropout)

        batch_size, _, seq_len, _ = x.size()
        # (batch, heads, seq, depth) -> (batch, seq, heads, depth), then merge
        # the last two dimensions back into d_model = heads * depth.
        x = x.transpose(1, 2).contiguous().view(batch_size, seq_len, self.d_model)

        return self.w_o(x)

In [40]:
# Example usage
d_model, max_len, num_heads = 512, 100, 8

# Multi-head attention module
multihead_attn = MultiHeadAttention(d_model, num_heads)

# Random batch of 5 sequences: (5, max_len, d_model)
input_sequence = torch.randn(5, max_len, d_model)

# Self-attention: the same tensor serves as query, key and value
att_output = multihead_attn(query=input_sequence, key=input_sequence, value=input_sequence)
print("output_MH_att:", att_output.size())

output_att: torch.Size([5, 8, 100, 64])
output_MH_att: torch.Size([5, 100, 512])


## Feed forward layer

In [41]:
# Position-wise feed-forward network
class FeedForward(nn.Module):
    """Two linear layers with a ReLU in between, applied to the last dimension."""

    def __init__(self, d_model, d_ff):
        super().__init__()
        # Expand d_model -> d_ff, then project d_ff -> d_model.
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Return linear2(relu(linear1(x)))."""
        hidden = self.linear1(x)
        activated = self.relu(hidden)
        return self.linear2(activated)

## Example

In [42]:
# Example usage
d_model, max_len, num_heads, d_ff = 512, 100, 8, 2048

# Build the two sub-layers: multi-head attention, then the feed-forward network
multihead_attn = MultiHeadAttention(d_model, num_heads)
ff_network = FeedForward(d_model, d_ff)

# Random batch of 5 sequences
input_sequence = torch.randn(5, max_len, d_model)

# Self-attention first ...
attention_output = multihead_attn(input_sequence, input_sequence, input_sequence)

# ... then the feed-forward network on its result
output_ff = ff_network(attention_output)
print('input_sequence', input_sequence.size())
print("output_ff", output_ff.size())

output_att: torch.Size([5, 8, 100, 64])
input_sequence torch.Size([5, 100, 512])
output_ff torch.Size([5, 100, 512])


## Encoder

In [43]:
class EncoderLayer(nn.Module):
    """One encoder block: self-attention and feed-forward, each followed by Add & Norm.

    Args:
        d_model: Embedding size.
        num_heads: Number of attention heads.
        d_ff: Hidden size of the feed-forward network.
        dropout: Dropout probability for the attention weights and for each
            sub-layer output. ``None`` (the default) means no dropout.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout=None):
        super(EncoderLayer, self).__init__()

        # nn.Dropout cannot take None, so map the None default to "no dropout".
        drop_p = 0.0 if dropout is None else dropout

        self.multihead_attn = MultiHeadAttention(d_model, num_heads, drop_p)
        self.feed_forward = FeedForward(d_model, d_ff)

        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)

        # Dropout on each sub-layer output, matching DecoderLayer.
        self.dropout = nn.Dropout(drop_p)

    def forward(self, x, mask=None):
        """Run the block on ``x``.

        Args:
            x: Input tensor; used as query, key and value of the self-attention.
            mask: Optional attention mask forwarded to the attention module.

        Returns:
            Tensor produced by the second Add & Norm step.
        """
        # Multi-head self-attention, then Add & Norm
        att_outputs = self.dropout(self.multihead_attn(x, x, x, mask))
        x = self.norm1(x + att_outputs)

        # Feed-forward, then Add & Norm
        ff_outputs = self.dropout(self.feed_forward(x))
        x = self.norm2(x + ff_outputs)

        return x

In [44]:
# Example usage of a single encoder layer
d_model, max_len, num_heads, d_ff = 512, 100, 8, 2048

encoder_layer = EncoderLayer(d_model, num_heads, d_ff, dropout=0.1)

# Random batch of 5 sequences
input_sequence = torch.randn(5, max_len, d_model)

# No mask: every position may attend to every other position
encoder_output = encoder_layer(input_sequence)
print("encoder output shape:", encoder_output.size())

output_att: torch.Size([5, 8, 100, 64])
encoder output shape: torch.Size([5, 100, 512])


## Decoder

In [45]:
# Decoder layer implementation
class DecoderLayer(nn.Module):
    """One decoder block: masked self-attention, cross-attention and feed-forward.

    Each of the three sub-layers is followed by dropout, a residual add and
    a LayerNorm.

    Args:
        d_model: Embedding size.
        num_heads: Number of attention heads.
        d_ff: Hidden size of the feed-forward network.
        dropout: Dropout probability, used for the attention weights of both
            attention modules and for every sub-layer output.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout):
        super(DecoderLayer, self).__init__()

        # Forward the configured rate so attention dropout is not stuck at
        # MultiHeadAttention's default.
        self.masked_self_attention = MultiHeadAttention(d_model, num_heads, dropout)
        self.norm1 = nn.LayerNorm(d_model)

        self.enc_dec_attention = MultiHeadAttention(d_model, num_heads, dropout)
        self.norm2 = nn.LayerNorm(d_model)

        self.feed_forward = FeedForward(d_model, d_ff)
        self.norm3 = nn.LayerNorm(d_model)

        self.dropout = nn.Dropout(dropout)

    def forward(self, x, encoder_output, src_mask, tgt_mask):
        """Run the block.

        Args:
            x: Decoder input; query, key and value of the masked self-attention.
            encoder_output: Encoder result; key and value of the cross-attention.
            src_mask: Mask passed to the cross-attention.
            tgt_mask: Mask passed to the masked self-attention.

        Returns:
            Tensor produced by the third Add & Norm step.
        """
        # Masked multi-head self-attention
        self_attention_output = self.masked_self_attention(x, x, x, tgt_mask)
        self_attention_output = self.dropout(self_attention_output)
        # Add & Norm
        x = x + self_attention_output
        x = self.norm1(x)

        # Encoder-decoder (cross) attention: q from the decoder, k and v from the encoder
        enc_dec_attention_output = self.enc_dec_attention(
            x, encoder_output, encoder_output, src_mask
        )
        enc_dec_attention_output = self.dropout(enc_dec_attention_output)
        # Add & Norm
        x = x + enc_dec_attention_output
        x = self.norm2(x)

        # Feed-forward
        feed_forward_output = self.feed_forward(x)
        feed_forward_output = self.dropout(feed_forward_output)
        # Add & Norm
        x = x + feed_forward_output
        x = self.norm3(x)

        return x

In [46]:
# DecoderLayer hyper-parameters
d_model = 512  # model (embedding) dimension
num_heads = 8  # number of attention heads
d_ff = 2048    # hidden size of the feed-forward network
dropout = 0.1  # dropout probability

batch_size = 1 # leading mask dimension; size 1 so the masks broadcast over batch and heads
max_len = 100  # sequence length

# Build the DecoderLayer
decoder_layer = DecoderLayer(d_model, num_heads, d_ff, dropout)

# Mask convention used by att(): non-zero / True = may attend, 0 / False = blocked.
src_mask = torch.rand(batch_size, max_len, max_len) > 0.5
# Causal mask: the lower triangle (including the diagonal) is True, so
# position i can attend only to positions <= i.
tgt_mask = torch.tril(torch.ones(max_len, max_len)).unsqueeze(0).bool()

# input_sequence and encoder_output come from the encoder example cell above.
output = decoder_layer(input_sequence, encoder_output, src_mask, tgt_mask)

# Output shape
print("Output shape:", output.shape)

output_att: torch.Size([5, 8, 100, 64])
output_att: torch.Size([5, 8, 100, 64])
Output shape: torch.Size([5, 100, 512])


## Transformer

In [51]:
# Transformer implementation
class Transformer(nn.Module):
    """Encoder-decoder Transformer over integer token ids.

    Token id 0 is treated as padding when the masks are built.

    Args:
        src_vocab_size: Size of the source vocabulary.
        tgt_vocab_size: Size of the target vocabulary.
        d_model: Embedding size.
        num_heads: Number of attention heads per layer.
        num_layers: Number of encoder layers and of decoder layers.
        d_ff: Hidden size of the feed-forward networks.
        max_len: Number of positions in the positional-encoding table.
        dropout: Dropout probability.
    """

    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff,
    max_len, dropout):
        super(Transformer, self).__init__()
        # Token embeddings for the encoder and the decoder
        self.encoder_embedding = nn.Embedding(src_vocab_size, d_model)
        self.decoder_embedding = nn.Embedding(tgt_vocab_size, d_model)

        # Positional encoding, shared by both sides
        self.positional_encoding = PositionalEncoding(d_model, max_len)

        # Stacks of encoder and decoder layers
        self.encoder_layers = nn.ModuleList(
            [EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)]
        )
        self.decoder_layers = nn.ModuleList(
            [DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)]
        )

        # Final projection onto the target vocabulary
        self.linear = nn.Linear(d_model, tgt_vocab_size)

        self.dropout = nn.Dropout(dropout)

    def generate_mask(self, src, tgt):
        """Build the boolean attention masks (True = may attend).

        Args:
            src: Source token ids, shape (batch, src_len).
            tgt: Target token ids, shape (batch, tgt_len).

        Returns:
            Tuple ``(src_mask, tgt_mask)`` with shapes (batch, 1, 1, src_len)
            and (batch, 1, tgt_len, tgt_len). ``src_mask`` hides padding;
            ``tgt_mask`` hides padding query rows and all future positions.
        """
        src_mask = (src != 0).unsqueeze(1).unsqueeze(2)  # (batch, 1, 1, src_len)
        tgt_mask = (tgt != 0).unsqueeze(1).unsqueeze(3)  # (batch, 1, tgt_len, 1)

        seq_length = tgt.size(1)

        # Lower triangle including the diagonal: position i sees positions <= i.
        # Created on tgt's device so the `&` below also works off-CPU.
        nopeak_mask = torch.tril(
            torch.ones(1, seq_length, seq_length, device=tgt.device)
        ).bool()
        tgt_mask = tgt_mask & nopeak_mask
        return src_mask, tgt_mask

    def forward(self, src, tgt):
        """Compute target-vocabulary logits.

        Args:
            src: Source token ids, shape (batch, src_len).
            tgt: Decoder input token ids, shape (batch, tgt_len).

        Returns:
            Tensor of shape (batch, tgt_len, tgt_vocab_size).
        """
        src_mask, tgt_mask = self.generate_mask(src, tgt)

        # Encoder input: token embedding + positional encoding, then dropout
        encoder_embedding = self.encoder_embedding(src)
        en_positional_encoding = self.positional_encoding(encoder_embedding)
        src_embedded = self.dropout(en_positional_encoding)

        # Decoder input: token embedding + positional encoding, then dropout
        decoder_embedding = self.decoder_embedding(tgt)
        de_positional_encoding = self.positional_encoding(decoder_embedding)
        tgt_embedded = self.dropout(de_positional_encoding)

        enc_output = src_embedded
        for enc_layer in self.encoder_layers:
            print('[ENC]')  # trace shown in the notebook outputs
            enc_output = enc_layer(enc_output, src_mask)

        dec_output = tgt_embedded
        for dec_layer in self.decoder_layers:
            print('[DEC]')  # trace shown in the notebook outputs
            dec_output = dec_layer(dec_output, enc_output, src_mask, tgt_mask)

        output = self.linear(dec_output)
        return output

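- 为什么是 0 ~ vocab_size-1
    - 我们会给词表（vocabulary）里的每一个 token 分配一个唯一的整数 ID，`nn.Embedding(vocab_size, d_model)` 接受的合法 ID 范围正是 0 ~ vocab_size-1。
    - 其中 ID 0 在 `generate_mask` 中被视为 padding，所以下面的示例数据从 1 开始采样。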

In [53]:
# Example usage
src_vocab_size = tgt_vocab_size = 5000

d_model, num_heads, num_layers, d_ff = 512, 8, 6, 2048
max_len, dropout = 100, 0.1

transformer = Transformer(
    src_vocab_size=src_vocab_size,
    tgt_vocab_size=tgt_vocab_size,
    d_model=d_model,
    num_heads=num_heads,
    num_layers=num_layers,
    d_ff=d_ff,
    max_len=max_len,
    dropout=dropout,
)

# Random example token ids of shape (batch_size, seq_length); the batch size
# is how many sentences are processed at once.
src_data = torch.randint(1, src_vocab_size, (5, max_len))
tgt_data = torch.randint(1, tgt_vocab_size, (5, max_len))

# If the raw tgt_data has length 100 (including <bos> and <eos>):
#   - the decoder input is tgt_data[:, :-1] (from <bos> up to the token before the last);
#   - the prediction target is tgt_data[:, 1:] (from the first real token up to <eos>).
transformer(src_data, tgt_data[:, :-1]).shape   # (batch_size, seq_length, tgt_vocab_size)

[ENC]
output_att: torch.Size([5, 8, 100, 64])
[ENC]
output_att: torch.Size([5, 8, 100, 64])
[ENC]
output_att: torch.Size([5, 8, 100, 64])
[ENC]
output_att: torch.Size([5, 8, 100, 64])
[ENC]
output_att: torch.Size([5, 8, 100, 64])
[ENC]
output_att: torch.Size([5, 8, 100, 64])
[DEC]
output_att: torch.Size([5, 8, 99, 64])
output_att: torch.Size([5, 8, 99, 64])
[DEC]
output_att: torch.Size([5, 8, 99, 64])
output_att: torch.Size([5, 8, 99, 64])
[DEC]
output_att: torch.Size([5, 8, 99, 64])
output_att: torch.Size([5, 8, 99, 64])
[DEC]
output_att: torch.Size([5, 8, 99, 64])
output_att: torch.Size([5, 8, 99, 64])
[DEC]
output_att: torch.Size([5, 8, 99, 64])
output_att: torch.Size([5, 8, 99, 64])
[DEC]
output_att: torch.Size([5, 8, 99, 64])
output_att: torch.Size([5, 8, 99, 64])


torch.Size([5, 99, 5000])