In [None]:
import math
from typing import Tuple

import torch
import torch.nn.functional as F
from torch import nn

In [None]:
def apply_scaling(freqs: torch.Tensor):
    # Values obtained from grid search
    scale_factor = 8
    low_freq_factor = 1
    high_freq_factor = 4
    old_context_len = 8192  # original llama3 length

    low_freq_wavelen = old_context_len / low_freq_factor
    high_freq_wavelen = old_context_len / high_freq_factor
    new_freqs = []
    for freq in freqs:
        wavelen = 2 * math.pi / freq
        if wavelen < high_freq_wavelen:
            new_freqs.append(freq)
        elif wavelen > low_freq_wavelen:
            new_freqs.append(freq / scale_factor)
        else:
            assert low_freq_wavelen != high_freq_wavelen
            smooth = (old_context_len / wavelen - low_freq_factor) / (
                high_freq_factor - low_freq_factor
            )
            new_freqs.append((1 - smooth) * freq / scale_factor + smooth * freq)
    return torch.tensor(new_freqs, dtype=freqs.dtype, device=freqs.device)

这个函数对频率进行缩放，目的是对频率进行调整，使得它们在不同的范围内平滑过渡。这对于处理长序列时的频率调整非常有用。

```
scale_factor：缩放因子。
low_freq_factor 和 high_freq_factor：用于计算频率的上下界。
old_context_len：原始上下文长度。
low_freq_wavelen 和 high_freq_wavelen：低频和高频的波长。
函数通过一个平滑函数在上下界之间进行过渡，确保频率在不同范围内的平滑性。
```

In [None]:
def precompute_freqs_cis(
    dim: int, end: int, theta: float = 500000.0, use_scaled: bool = False
):
    freqs = 1.0 / (theta ** (torch.arange(0, dim, 2)[: (dim // 2)].float() / dim))
    t = torch.arange(end, device=freqs.device, dtype=torch.float32)
    if use_scaled:
        freqs = apply_scaling(freqs)
    freqs = torch.outer(t, freqs)
    freqs_cis = torch.polar(torch.ones_like(freqs), freqs)  # complex64
    return freqs_cis

这个函数预计算频率的复数表示。

```
dim：频率维度。是原论文中的theta
end：时间步数长度，即序列的长度，最大token数。
theta：控制频率的基数。是原论文中的10000。
use_scaled：是否使用缩放。
频率的计算方式是基于指数函数的，生成的频率通过torch.polar函数转成复数表示。
```

在这里，freqs对应原论文中的theta，而theta则对应原论文中的10000。这是需要注意的地方。

cis是cosine和sine的缩写。具体来说，cis θ表示 cosθ + i*sinθ，其中 i 是虚数单位，θ是角度。这种表示方法在复数的极坐标形式中非常常见。

t对应了原论文中的m。

freqs = torch.outer(t, freqs)，将freqs通过外积计算，变成。new_freqs[i][j] = t[i]*freqs[j]。新的freqs矩阵在原论文中没有对应。

torch.polar将极坐标形式的数值转换为复数。它接受两个参数：幅度和相位角，并返回一个复数张量。所以freqs_cis是幅度为1，相位角为freqs对应位置的复数张量。

过一个具体的例子来展示如何计算 `freqs_cis`。

假设我们有以下参数：
- `dim = 4`
- `end = 3`
- `theta = 10000.0`
- `use_scaled = False`

### 1. 计算频率 `freqs`

首先，生成频率向量 `freqs`：

dim = 4
theta = 10000.0
freqs = 1.0 / (theta ** (torch.arange(0, dim, 2)[: (dim // 2)].float() / dim))


具体步骤如下：
- `torch.arange(0, dim, 2)[: (dim // 2)]` 生成序列 `[0, 2]`。
- `theta ** (torch.arange(0, dim, 2)[: (dim // 2)].float() / dim)` 计算：
  - `theta ** (0 / 4) = 10000 ** 0 = 1.0`
  - `theta ** (2 / 4) = 10000 ** 0.5 = 100.0`
- 取倒数：
  - `1.0 / 1.0 = 1.0`
  - `1.0 / 100.0 = 0.01`
  
所以 `freqs` 为 `[1.0, 0.01]`。

### 2. 生成时间步 `t`

接下来生成时间步 `t`：

end = 3
t = torch.arange(end, device=freqs.device, dtype=torch.float32)


- `torch.arange(end)` 生成序列 `[0, 1, 2]`。
  
所以 `t` 为 `[0, 1, 2]`。

### 3. 计算外积 `freqs`

计算时间步 `t` 和频率 `freqs` 的外积：

freqs = torch.outer(t, freqs)


具体计算如下：
- `t` 为 `[0, 1, 2]`
- `freqs` 为 `[1.0, 0.01]`
- 外积结果为：
  
  [[0 * 1.0, 0 * 0.01],
   [1 * 1.0, 1 * 0.01],
   [2 * 1.0, 2 * 0.01]]
  
  即：
  
  [[0.0, 0.0],
   [1.0, 0.01],
   [2.0, 0.02]]
  

### 4. 生成复数表示 `freqs_cis`

最后生成复数表示 `freqs_cis`：

freqs_cis = torch.polar(torch.ones_like(freqs), freqs)  # complex64


- `torch.ones_like(freqs)` 生成一个与 `freqs` 形状相同的全1张量：
  
  [[1.0, 1.0],
   [1.0, 1.0],
   [1.0, 1.0]]
  
- `torch.polar(torch.ones_like(freqs), freqs)` 将幅度为1，相位为 `freqs` 的复数表示出来：
  - 对于 `freqs` 的每个元素，计算对应的复数：
    - `cis(0.0) = cos(0.0) + i * sin(0.0) = 1.0 + 0.0i`
    - `cis(0.0) = cos(0.0) + i * sin(0.0) = 1.0 + 0.0i`
    - `cis(1.0) = cos(1.0) + i * sin(1.0) ≈ 0.5403 + 0.8415i`
    - `cis(0.01) = cos(0.01) + i * sin(0.01) ≈ 0.99995 + 0.0099998i`
    - `cis(2.0) = cos(2.0) + i * sin(2.0) ≈ -0.4161 + 0.9093i`
    - `cis(0.02) = cos(0.02) + i * sin(0.02) ≈ 0.9998 + 0.0199987i`

  所以 `freqs_cis` 为：
  
  [[1.0 + 0.0i,       1.0 + 0.0i],
   [0.5403 + 0.8415i, 0.99995 + 0.0099998i],
   [-0.4161 + 0.9093i, 0.9998 + 0.0199987i]]

In [None]:
def reshape_for_broadcast(freqs_cis: torch.Tensor, x: torch.Tensor):
    ndim = x.ndim
    assert 0 <= 1 < ndim
    assert freqs_cis.shape == (x.shape[1], x.shape[-1])
    shape = [d if i == 1 or i == ndim - 1 else 1 for i, d in enumerate(x.shape)]
    return freqs_cis.view(*shape)

In [None]:
def apply_rotary_emb(
    xq: torch.Tensor,
    xk: torch.Tensor,
    freqs_cis: torch.Tensor,
) -> Tuple[torch.Tensor, torch.Tensor]:
    xq_ = torch.view_as_complex(xq.float().reshape(*xq.shape[:-1], -1, 2))
    xk_ = torch.view_as_complex(xk.float().reshape(*xk.shape[:-1], -1, 2))
    freqs_cis = reshape_for_broadcast(freqs_cis, xq_)
    xq_out = torch.view_as_real(xq_ * freqs_cis).flatten(3)
    xk_out = torch.view_as_real(xk_ * freqs_cis).flatten(3)
    return xq_out.type_as(xq), xk_out.type_as(xk)

这个函数将旋转位置编码应用于query和key张量。

```
xq和xk：q和k张量。
freqs_cis：预计算的复数频率张量。
```

torch.view_as_complex 将一个实数张量视为复数张量，其形状最后一维的大小必须是2，这意味着该张量的最后一维包含了复数的实部和虚部。函数返回一个新的复数张量，其形状与输入张量的形状相同，除了最后一维的大小从2变为1。

torch.view_as_real(xq_ * freqs_cis).flatten(3)则对应了截图中橙色叉下面的公式。

In [None]:
xq, xk = apply_rotary_emb(xq, xk, freqs_cis=freqs_cis)