# 多头注意力机制结合数据加载

完整的章节代码位于 [ch03.ipynb](./ch03.ipynb)。

这个 notebook 包含了主要的学习点，即多头注意力的实现（以及第二章中的数据加载流程）。

## 第二章中的数据加载器

In [1]:
# 导入必要的库
import tiktoken  # 假设这是一个自定义的分词库
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

# 定义 GPT 数据集类
class GPTDatasetV1(Dataset):
    def __init__(self, txt, tokenizer, max_length, stride):
        # 初始化分词器
        self.tokenizer = tokenizer
        # 初始化输入和目标 ID 列表
        self.input_ids = []
        self.target_ids = []

        # 对整个文本进行分词
        token_ids = tokenizer.encode(txt, allowed_special={'<|endoftext|>'})

        # 使用滑动窗口将文本分割成重叠的 max_length 长度的序列
        for i in range(0, len(token_ids) - max_length, stride):
            input_chunk = token_ids[i:i + max_length]
            target_chunk = token_ids[i + 1: i + max_length + 1]
            # 将分词 ID 转换为 PyTorch 张量并添加到列表
            self.input_ids.append(torch.tensor(input_chunk))
            self.target_ids.append(torch.tensor(target_chunk))

    def __len__(self):
        # 返回数据集中的样本数量
        return len(self.input_ids)

    def __getitem__(self, idx):
        # 根据索引返回对应的输入和目标张量
        return self.input_ids[idx], self.target_ids[idx]

# 创建数据加载器的函数
def create_dataloader(txt, batch_size=4, max_length=256, stride=128, shuffle=True):
    # 初始化分词器
    tokenizer = tiktoken.get_encoding("gpt2")

    # 创建数据集
    dataset = GPTDatasetV1(txt, tokenizer, max_length, stride)

    # 创建数据加载器
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)

    return dataloader

# 读取文本文件
with open("small-text-sample.txt", "r", encoding="utf-8") as f:
    raw_text = f.read()

# 初始化分词器
tokenizer = tiktoken.get_encoding("gpt2")
# 对原始文本进行编码
encoded_text = tokenizer.encode(raw_text)

# 定义词汇表大小、输出维度、最大长度和块大小
vocab_size = 50257
output_dim = 256
max_len = 1024
block_size = max_len

# 创建词嵌入层和位置嵌入层
token_embedding_layer = nn.Embedding(vocab_size, output_dim)
pos_embedding_layer = torch.nn.Embedding(block_size, output_dim)

# 设置最大长度为 4，这可能是一个错误，因为通常最大长度会大于 4
max_length = 4
# 创建数据加载器
dataloader = create_dataloader(raw_text, batch_size=8, max_length=max_length, stride=5)

In [2]:
# 遍历数据加载器中的每个批次
for batch in dataloader:
    # 从当前批次中解包输入和目标数据
    x, y = batch

    # 使用词嵌入层计算输入序列的词嵌入
    token_embeddings = token_embedding_layer(x)
    # 使用位置嵌入层计算位置嵌入，这里使用 torch.arange 创建一个与 max_length 相同长度的序列
    pos_embeddings = pos_embedding_layer(torch.arange(max_length))

    # 将词嵌入和位置嵌入相加，得到最终的输入嵌入
    input_embeddings = token_embeddings + pos_embeddings

    # 跳出循环，这里可能是为了演示目的，实际训练中通常会继续循环直到遍历完所有数据
    break

In [3]:
print(input_embeddings.shape)

torch.Size([8, 4, 256])


# 第三章中的多头注意力

## 方式A：简单实现

In [4]:
# 定义因果自注意力模块
class CausalSelfAttention(nn.Module):
    def __init__(self, d_in, d_out, block_size, dropout, qkv_bias=False):
        # 调用父类构造函数
        super().__init__()
        # 输出维度
        self.d_out = d_out
        # 查询、键和值的线性变换层
        self.W_query = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_key = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_value = nn.Linear(d_in, d_out, bias=qkv_bias)
        # Dropout层，用于正则化
        self.dropout = nn.Dropout(dropout)
        # 注册一个缓冲区，用于存储上三角掩码，用于因果自注意力
        self.register_buffer('mask', torch.triu(torch.ones(block_size, block_size), diagonal=1))

    def forward(self, x):
        # 获取输入张量的批次大小、序列长度和输入维度
        b, n_tokens, d_in = x.shape
        # 分别计算查询、键和值
        keys = self.W_key(x)
        queries = self.W_query(x)
        values = self.W_value(x)

        # 计算注意力分数，这里使用了转置操作
        attn_scores = queries @ keys.transpose(1, 2)
        # 使用掩码将未来位置的注意力分数置为负无穷，实现因果自注意力
        attn_scores.masked_fill_(self.mask.bool()[:n_tokens, :n_tokens], -torch.inf)
        # 归一化注意力分数
        attn_weights = torch.softmax(attn_scores / keys.shape[-1]**0.5, dim=1)
        # 应用dropout
        attn_weights = self.dropout(attn_weights)

        # 计算上下文向量
        context_vec = attn_weights @ values
        return context_vec

# 定义多头注意力包装器模块
class MultiHeadAttentionWrapper(nn.Module):
    def __init__(self, d_in, d_out, block_size, dropout, num_heads, qkv_bias=False):
        # 调用父类构造函数
        super().__init__()
        # 创建多个因果自注意力模块
        self.heads = nn.ModuleList([
            CausalSelfAttention(d_in, d_out, block_size, dropout, qkv_bias) 
            for _ in range(num_heads)
        ])
        # 输出投影层，用于将多头的输出合并
        self.out_proj = nn.Linear(d_out*num_heads, d_out*num_heads)

    def forward(self, x):
        # 将所有头的输出沿着最后一个维度拼接
        context_vec = torch.cat([head(x) for head in self.heads], dim=-1)
        # 通过输出投影层
        return self.out_proj(context_vec)

In [5]:
# 设置随机种子以确保结果的可重复性
torch.manual_seed(123)

# 定义块大小，这里与最大长度相同
block_size = max_length
# 输入维度
d_in = output_dim

# 定义多头注意力中每个头的输出维度
num_heads = 2
# 输出维度是输入维度除以头数
d_out = d_in // num_heads

# 初始化多头注意力包装器
mha = MultiHeadAttentionWrapper(d_in, d_out, block_size, 0.0, num_heads)

# 假设 input_embeddings 是之前准备好的输入数据
batch = input_embeddings
# 使用多头注意力模块处理输入数据
context_vecs = mha(batch)

# 打印上下文向量的维度
print("context_vecs.shape:", context_vecs.shape)

context_vecs.shape: torch.Size([8, 4, 256])


## 方式B：替代实现

In [6]:
# 定义多头自注意力模块
class MultiHeadAttention(nn.Module):
    def __init__(self, d_in, d_out, block_size, dropout, num_heads, qkv_bias=False):
        # 调用父类构造函数
        super().__init__()
        # 确保输出维度可以被头数整除
        assert d_out % num_heads == 0, "d_out must be divisible by n_heads"

        # 初始化模块的属性
        self.d_out = d_out
        self.num_heads = num_heads
        self.head_dim = d_out // num_heads  # 计算每个头的维度
        self.W_query = nn.Linear(d_in, d_out, bias=qkv_bias)  # 查询线性层
        self.W_key = nn.Linear(d_in, d_out, bias=qkv_bias)    # 键线性层
        self.W_value = nn.Linear(d_in, d_out, bias=qkv_bias)  # 值线性层
        self.out_proj = nn.Linear(d_out, d_out)  # 输出投影层
        self.dropout = nn.Dropout(dropout)  # Dropout层
        # 注册一个缓冲区，用于存储上三角掩码，用于因果自注意力
        self.register_buffer('mask', torch.triu(torch.ones(block_size, block_size), diagonal=1))

    def forward(self, x):
        # 获取输入张量的批次大小、序列长度和输入维度
        b, num_tokens, d_in = x.shape

        # 分别计算查询、键和值
        keys = self.W_key(x)
        queries = self.W_query(x)
        values = self.W_value(x)

        # 将矩阵按头数分割，并添加一个维度
        keys = keys.view(b, num_tokens, self.num_heads, self.head_dim)
        values = values.view(b, num_tokens, self.num_heads, self.head_dim)
        queries = queries.view(b, num_tokens, self.num_heads, self.head_dim)

        # 转置以匹配多头注意力的维度
        keys = keys.transpose(1, 2)
        queries = queries.transpose(1, 2)
        values = values.transpose(1, 2)

        # 计算缩放点积注意力分数，并应用因果掩码
        attn_scores = queries @ keys.transpose(2, 3)  # 对每个头进行点积
        # 将掩码截断到与序列长度相匹配，并转换为布尔值
        mask_bool = self.mask.bool()[:num_tokens, :num_tokens]
        # 扩展掩码以匹配维度
        mask_unsqueezed = mask_bool.unsqueeze(0).unsqueeze(0)
        # 使用扩展的掩码填充注意力分数
        attn_scores.masked_fill_(mask_unsqueezed, -torch.inf)
        
        # 归一化注意力分数
        attn_weights = torch.softmax(attn_scores / keys.shape[-1]**0.5, dim=-1)
        attn_weights = self.dropout(attn_weights)

        # 计算上下文向量
        context_vec = (attn_weights @ values).transpose(1, 2)
        
        # 合并头的输出
        context_vec = context_vec.contiguous().view(b, num_tokens, self.d_out)
        # 可选的输出投影
        context_vec = self.out_proj(context_vec)

        return context_vec

In [7]:
# 设置随机种子以确保结果的可重复性
torch.manual_seed(123)

# 定义块大小，这里与最大长度相同
block_size = max_length
# 输入和输出维度
d_in = output_dim
# 输出维度设置为与输入维度相同
d_out = d_in

# 初始化多头自注意力模块
mha = MultiHeadAttention(d_in, d_out, block_size, dropout=0.0, num_heads=2)

# 假设 input_embeddings 是之前准备好的输入数据
batch = input_embeddings
# 使用多头自注意力模块处理输入数据
context_vecs = mha(batch)

# 打印上下文向量的维度
print("context_vecs.shape:", context_vecs.shape)

context_vecs.shape: torch.Size([8, 4, 256])
