In [31]:
import math
import struct
import inspect
from dataclasses import dataclass
from typing import Any, Optional, Tuple

import numpy as np
import torch
import torch.nn.functional as F
from torch import nn

In [32]:
@dataclass
class ModelArgs:
    # default hyperparameters for the Llama 7B model
    dim: int = 4096  # 模型维度
    n_layers: int = 32  # Transformer层数
    n_heads: int = 32  # 注意力机制的头数
    n_kv_heads: Optional[int] = None  # 键/值头数，如果未指定，则默认为n_heads
    vocab_size: int = 32000  # 词汇表大小
    hidden_dim: Optional[int] = None  # 隐藏层维度，如果未指定，则使用其他规则确定
    multiple_of: int = 256  # MLP隐藏层大小是这个数的倍数
    norm_eps: float = 1e-5  # 归一化层的epsilon值
    max_seq_len: int = 2048  # 最大序列长度
    dropout: float = 0.0  # 丢弃率

In [33]:
args = ModelArgs()

Llama2的RMSNorm层的公式如下：

$$\text{RMSNorm}(x) = \frac{x}{\sqrt{\frac{1}{n}\sum_{i=1}^{n}w_i^2 + \epsilon}}$$

其中：

- ( $x$ ) 是层的输入。
- ( $w_i$ ) 代表层的权重。
- ( $n$ ) 是权重的数量。
- ( $\epsilon$ ) 是一个小常数，用于数值稳定性（以避免除以零的情况）。

In [34]:
class RMSNorm(torch.nn.Module):
    def __init__(self, dim: int, eps: float):
        super().__init__()
        self.eps = eps
        self.weight = nn.Parameter(torch.ones(dim))

    def _norm(self, x):
        return x * torch.rsqrt(x.pow(2).mean(-1, keepdim=True) + self.eps)

    def forward(self, x):
        output = self._norm(x.float()).type_as(x)
        return output * self.weight

In [35]:
norm = RMSNorm(dim=args.dim, eps=args.norm_eps)

In [63]:
# 写一个关于norm的测试

x = torch.randn(1, 50, 4096) # bs, seq_len, dim
print(x.shape)
print(norm(x).shape)

torch.Size([1, 50, 4096])
torch.Size([1, 50, 4096])


假设 $\text{dim}$ 是输入维度，$\text{end}$ 是序列的长度，$\theta$ 是比例因子（默认为 10000.0）。

1. **频率计算**:
   $$\text{freqs} = \frac{1}{\theta^{\frac{2i}{\text{dim}}}}$$
   其中 $i = 0, 1, 2, ..., \frac{\text{dim}}{2} - 1$。

2. **时间序列与频率的外积**:
   创建一个从 0 到 $\text{end} - 1$ 的时间序列$t$，并计算 $t$ 和 $\text{freqs}$ 的外积得到频率矩阵。

3. **余弦和正弦值计算**:
   - 余弦值：$\text{freqs\_cos} = \cos(\text{freqs\_matrix})$
   - 正弦值：$\text{freqs\_sin} = \sin(\text{freqs\_matrix})$

其中，$\text{freqs\_matrix}$ 是时间序列 $t$ 和频率$ \text{freqs}$ 的外积的结果。


这个例子首先定义了函数 `precompute_freqs_cis`。然后，它设置了维度 `dim` 为 10，序列长度 `end` 为 5，并保持默认的比例因子$\theta = 10000.0$。通过调用这个函数并传入这些参数，它计算了序列中每个位置的余弦和正弦值。最后，这个例子打印了这些计算得到的余弦和正弦值矩阵。

这种预计算的余弦和正弦值可以用于例如 Transformer 模型中的位置编码，以提供位置信息，帮助模型理解输入数据中元素的顺序关系。

In [37]:
def precompute_freqs_cis(dim: int, end: int, theta: float = 10000.0):
    freqs = 1.0 / (theta ** (torch.arange(0, dim, 2)[: (dim // 2)].float() / dim))
    t = torch.arange(end, device=freqs.device)  # type: ignore
    freqs = torch.outer(t, freqs).float()  # type: ignore
    freqs_cos = torch.cos(freqs)  # real part
    freqs_sin = torch.sin(freqs)  # imaginary part
    return freqs_cos, freqs_sin

In [38]:
theta=10000.0
dim=4096
end=50

freqs = 1.0 / (theta ** (torch.arange(0, dim, 2)[: (dim // 2)].float() / dim))
t = torch.arange(end, device=freqs.device)
res = torch.outer(t, freqs).float() 
freqs.shape, t.shape, res.shape

(torch.Size([2048]), torch.Size([50]), torch.Size([50, 2048]))

In [39]:
cos, sin = precompute_freqs_cis(4096, 50)
cos.shape, sin.shape    

(torch.Size([50, 2048]), torch.Size([50, 2048]))

## reshape_for_broadcast 函数

主要作用： 该函数的目的是为了将频率的余弦（cos）和正弦（sin）张量重新塑形（reshape），使其能够在后续的旋转操作中通过广播机制与查询（query）或键（key）张量进行元素级的乘法操作。广播是一种在不同形状的张量之间进行数学运算的方式，能够自动扩展张量的形状以匹配操作的需求。

In [40]:
def reshape_for_broadcast(freqs_cis: torch.Tensor, x: torch.Tensor):
    ndim = x.ndim  # 获取x的维度数量
    assert 0 <= 1 < ndim  # 确保x至少有两个维度
    assert freqs_cis.shape == (x.shape[1], x.shape[-1])  # 确保频率张量的形状与x的第二个维度和最后一个维度匹配
    shape = [d if i == 1 or i == ndim - 1 else 1 for i, d in enumerate(x.shape)]  # 生成一个新的形状，除了第二个和最后一个维度，其他维度设置为1
    return freqs_cis.view(shape)  # 返回重新塑形的频率张量

## apply_rotary_emb 函数

主要作用： 该函数实现了旋转位置编码的应用过程。它首先将查询（query）和键（key）张量转换为复数形式（这里使用实部和虚部的形式分别表示），然后利用传入的余弦和正弦频率张量对它们进行旋转，最后将旋转后的结果转换回原来的形状。这个过程可以增强模型对每个位置信息的感知能力，从而提高处理序列数据的性能。

In [41]:
def apply_rotary_emb(
    xq: torch.Tensor,
    xk: torch.Tensor,
    freqs_cos: torch.Tensor,
    freqs_sin: torch.Tensor
) -> Tuple[torch.Tensor, torch.Tensor]:

    # 将查询和键张量转换为浮点数，并重塑形状以分离实部和虚部
    xq_r, xq_i = xq.float().reshape(xq.shape[:-1] + (-1, 2)).unbind(-1)
    xk_r, xk_i = xk.float().reshape(xk.shape[:-1] + (-1, 2)).unbind(-1)

    # 重新塑形频率张量以进行广播
    freqs_cos = reshape_for_broadcast(freqs_cos, xq_r)
    freqs_sin = reshape_for_broadcast(freqs_sin, xq_r)

    # 应用旋转，分别计算旋转后的实部和虚部
    xq_out_r = xq_r * freqs_cos - xq_i * freqs_sin
    xq_out_i = xq_r * freqs_sin + xq_i * freqs_cos
    xk_out_r = xk_r * freqs_cos - xk_i * freqs_sin
    xk_out_i = xk_r * freqs_sin + xk_i * freqs_cos

    # 将最后两个维度合并，并还原为原始张量的形状
    xq_out = torch.stack([xq_out_r, xq_out_i], dim=-1).flatten(3)
    xk_out = torch.stack([xk_out_r, xk_out_i], dim=-1).flatten(3)

    return xq_out.type_as(xq), xk_out.type_as(xk)

In [62]:
# 根据上述代码 为apply_rotary_emb函数写一个测试案例

xq = torch.randn(1, 50, 288) # bs, seq_len, dim
xk = torch.randn(1, 50, 288) # bs, seq_len, dim

# 使用 precompute_freqs_cis 函数获取 sin和cos

cos, sin = precompute_freqs_cis(288, 50)
print(cos.shape, sin.shape)
xq_out, xk_out = apply_rotary_emb(xq, xk, cos, sin)

xq_out.shape, xk_out.shape

torch.Size([50, 144]) torch.Size([50, 144])


(torch.Size([1, 50, 144, 2]), torch.Size([1, 50, 144, 2]))

## repeat_kv

根据 n_rep 参数的值重复每个键（key）和值（value）元素。如果 n_rep 为1，表示不需要重复，直接返回原始张量。如果 n_rep 大于1，函数会将输入张量 x 在特定的维度上重复 n_rep 次，然后重新组织张量的形状以适应重复后的结构。

输入张量 x 在键/值维度 (n_kv_heads) 上被重复了 n_rep 次，且这种重复是在不改变其他维度（如批量大小、序列长度、头的维度）的情况下实现的。这使得在 Transformer 模型中可以灵活地调整键和值的数量，以适应不同的模型架构或实验设置。

In [43]:
def repeat_kv(x: torch.Tensor, n_rep: int) -> torch.Tensor:
    # 获取输入张量的形状：批量大小、序列长度、键/值对头的数量、每个头的维度大小
    bs, slen, n_kv_heads, head_dim = x.shape
    
    # 如果重复次数为1，则不需要重复，直接返回原始张量
    if n_rep == 1:
        return x
    
    # 对张量进行扩展和重塑操作以重复键值对
    return (
        x[:, :, :, None, :]  # 在第四个维度（头的维度前）添加一个新的维度
        .expand(bs, slen, n_kv_heads, n_rep, head_dim)  # 将新添加的维度扩展到n_rep大小，实现重复的效果
        .reshape(bs, slen, n_kv_heads * n_rep, head_dim)  # 重新塑形，合并键/值对头的数量和重复次数的维度
    )


## Attention

In [44]:
class Attention(nn.Module):
    def __init__(self, args: ModelArgs):
        super().__init__()
        # 根据是否指定n_kv_heads，确定用于键（key）和值（value）的头的数量。
        self.n_kv_heads = args.n_heads if args.n_kv_heads is None else args.n_kv_heads
        # 确保总头数可以被键值头数整除。
        assert args.n_heads % self.n_kv_heads == 0

        # 模型并行处理大小，默认为1。
        model_parallel_size = 1
        # 本地计算头数，等于总头数除以模型并行处理大小。
        self.n_local_heads = args.n_heads // model_parallel_size
        # 本地键值头数，等于键值头数除以模型并行处理大小。
        self.n_local_kv_heads = self.n_kv_heads // model_parallel_size
        # 重复次数，用于扩展键和值的尺寸。
        self.n_rep = self.n_local_heads // self.n_local_kv_heads
        # 每个头的维度，等于模型维度除以头的总数。
        self.head_dim = args.dim // args.n_heads

        # 定义权重矩阵。
        self.wq = nn.Linear(args.dim, args.n_heads * self.head_dim, bias=False)
        self.wk = nn.Linear(args.dim, self.n_kv_heads * self.head_dim, bias=False)
        self.wv = nn.Linear(args.dim, self.n_kv_heads * self.head_dim, bias=False)
        # 输出权重矩阵。
        self.wo = nn.Linear(args.n_heads * self.head_dim, args.dim, bias=False)

        # 定义dropout。
        self.attn_dropout = nn.Dropout(args.dropout)
        self.resid_dropout = nn.Dropout(args.dropout)
        # 保存dropout概率。
        self.dropout = args.dropout

        # 检查是否使用Flash Attention（需要PyTorch >= 2.0）。
        self.flash = hasattr(torch.nn.functional, 'scaled_dot_product_attention')
        if not self.flash:
            # 若不支持Flash Attention，则使用手动实现的注意力机制，并设置mask。
            print("WARNING: using slow attention. Flash Attention requires PyTorch >= 2.0")
            # 创建一个上三角矩阵，用于遮蔽未来信息。
            mask = torch.full((1, 1, args.max_seq_len, args.max_seq_len), float("-inf"))
            mask = torch.triu(mask, diagonal=1)
            # 注册为模型的缓冲区
            self.register_buffer("mask", mask)

    def forward(self, x: torch.Tensor, freqs_cos: torch.Tensor, freqs_sin: torch.Tensor):
        # 获取批次大小和序列长度。
        bsz, seqlen, _ = x.shape

        # 计算查询（Q）、键（K）、值（V）。
        xq, xk, xv = self.wq(x), self.wk(x), self.wv(x)
        # 调整形状以适应头的维度。
        xq = xq.view(bsz, seqlen, self.n_local_heads, self.head_dim)
        xk = xk.view(bsz, seqlen, self.n_local_kv_heads, self.head_dim)
        xv = xv.view(bsz, seqlen, self.n_local_kv_heads, self.head_dim)

        # 应用旋转位置嵌入（RoPE）。
        xq, xk = apply_rotary_emb(xq, xk, freqs_cos, freqs_sin)

        # 对键和值进行扩展以适应重复次数。
        xk = repeat_kv(xk, self.n_rep)
        xv = repeat_kv(xv, self.n_rep)

        # 将头作为批次维度处理。
        xq = xq.transpose(1, 2)
        xk = xk.transpose(1, 2)
        xv = xv.transpose(1, 2)

        # 根据是否支持Flash Attention，选择实现方式。
        if self.flash:
            # 使用Flash Attention。
            output = torch.nn.functional.scaled_dot_product_attention(xq, xk, xv, attn_mask=None, dropout_p=self.dropout if self.training else 0.0, is_causal=True)
        else:
            # 使用手动实现的注意力机制。
            scores = torch.matmul(xq, xk.transpose(2, 3)) / math.sqrt(self.head_dim)
            assert hasattr(self, 'mask')
            scores = scores + self.mask[:, :, :seqlen, :seqlen]
            scores = F.softmax(scores.float(), dim=-1).type_as(xq)
            scores = self.attn_dropout(scores)
            output = torch.matmul(scores, xv)

        # 恢复时间维度并合并头。
        output = output.transpose(1, 2).contiguous().view(bsz, seqlen, -1)

        # 最终投影回残差流。
        output = self.wo(output)
        output = self.resid_dropout(output)
        return output

In [45]:
mask = torch.full((1, 1, 6, 6), float("-inf"))
print(mask)
mask = torch.triu(mask, diagonal=1)
print(mask)

tensor([[[[-inf, -inf, -inf, -inf, -inf, -inf],
          [-inf, -inf, -inf, -inf, -inf, -inf],
          [-inf, -inf, -inf, -inf, -inf, -inf],
          [-inf, -inf, -inf, -inf, -inf, -inf],
          [-inf, -inf, -inf, -inf, -inf, -inf],
          [-inf, -inf, -inf, -inf, -inf, -inf]]]])
tensor([[[[0., -inf, -inf, -inf, -inf, -inf],
          [0., 0., -inf, -inf, -inf, -inf],
          [0., 0., 0., -inf, -inf, -inf],
          [0., 0., 0., 0., -inf, -inf],
          [0., 0., 0., 0., 0., -inf],
          [0., 0., 0., 0., 0., 0.]]]])


In [56]:
class ModelArgs:
    def __init__(self, dim, n_heads, n_kv_heads, max_seq_len, dropout):
        self.dim = dim
        self.n_heads = n_heads
        self.n_kv_heads = n_kv_heads
        self.max_seq_len = max_seq_len
        self.dropout = dropout

args = ModelArgs(
    dim=288, 
    n_heads=6, 
    n_kv_heads=6, 
    max_seq_len=100,  # 假设序列的最大长度为100
    dropout=0.0
)

# 创建Attention实例
attention_model = Attention(args)

# 模拟输入数据
batch_size = 2
seq_len = 50  # 假设实际使用的序列长度为50
dim = args.dim
x = torch.rand(batch_size, seq_len, dim)  # 随机生成输入张量
# freqs_cos = torch.rand(seq_len, dim // 2)  # 模拟cos频率，用于RoPE
# freqs_sin = torch.rand(seq_len, dim // 2)  # 模拟sin频率，用于RoPE

freqs_cos, freqs_sin = precompute_freqs_cis(dim, seq_len)

print(freqs_cos.shape, freqs_sin.shape)

# 运行Attention模型
output = attention_model(x, freqs_cos, freqs_sin)

print("Output shape:", output.shape)

torch.Size([50, 144]) torch.Size([50, 144])


Traceback (most recent call last):
  File "f:\miniconda\install\envs\nlp\lib\site-packages\debugpy\_vendored\pydevd\_pydevd_bundle\pydevd_vars.py", line 624, in change_attr_expression
    value = eval(expression, frame.f_globals, frame.f_locals)
  File "<string>", line 1
    tensor([[[-0.6086, -0.0233, -0.4982,  ..., -0.0884,  0.3996, -0.0289],         [-0.4072,  0.1747, -0.3251,  ...,  0.1759,  0.5822, -0.3738],         [-0.5547,  0.1017, -0.6729,  ..., -0.0168, -0.1436, -0.0081],         ...,         [-0.6012, -0.1039, -0.6057,  ...,  0.0770,  0.2393,  0.1426],         [-0.6042,  0.3118, -0.3081,  ..., -0.0017,  0.1426, -0.0581],         [-0.2958,  0.4100, -0.6251,  ...,  0.0133,  0.4780,  0.2772]],        [[-0.7364,  0.1893, -0.6148,  ...,  0.1761,  0.1788,  0.1780],         [-0.5823,  0.3000, -0.2029,  ..., -0.0164,  0.1293, -0.0359],         [-0.5836, -0.0185, -0.4734,  ...,  0.0971,  0.2520,  0.2106],         ...,         [-0.8891, -0.1100, -0.3439,  ..., -0.0887, -0.0505, -0.302

AssertionError: 

In [47]:
x = torch.rand(batch_size, seq_len, dim)  # 假设输入张量
try:
    freqs_cos, freqs_sin = precompute_freqs_cis(dim=dim, end=seq_len)
    freqs_cos_b = reshape_for_broadcast(freqs_cos, x)
    freqs_sin_b = reshape_for_broadcast(freqs_sin, x)
    print("Broadcast shapes:", freqs_cos_b.shape, freqs_sin_b.shape)
except AssertionError as e:
    print("Assertion error:", e)


Assertion error: 
