In [1]:
import torch
from torch import Tensor, nn

## MHA's input

Multi-Head Attention 的输入$h$是shape等于 $[B, S, D]$ 的 Tensor:
- B: batch_size
- S: content length
- D: embedding dimension

In [2]:
B, S, D = 10, 20, 256

x = torch.rand(B, S, D)
x.shape

torch.Size([10, 20, 256])

## MHA's output

Multi-Head Attention 的输出$mha(h)$是shape等于 $[B, S, D']$ 的 Tensor. 很多情况下都会保持$D=D'$, 也就是说这种情况下，数据经过MHA前后的shape是不变的(这样也方便后续加shortcut/skip-connection).

## MHA

In [3]:
class MHA(nn.Module):
    def __init__(
        self,
        d_in: int = D,
        d_out: int = D,
        n_head: int = 8,
        ctx_len: int = S,
        qkv_bias: bool = False,
        dropout_rate: float = 0.2,
    ):
        super().__init__()
        # 确保D'可以被n_head整除，否则不能进行多头分割
        if d_out % n_head != 0:
            raise ValueError(f"d_out % n_head = {d_out % n_head} != 0")
        self.n_head = n_head
        self.head_dim = d_out // n_head

        self.Wq = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.Wk = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.Wv = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.Wo = nn.Linear(d_out, d_out)

        self.register_buffer("mask", torch.full((ctx_len, ctx_len), -torch.inf).triu_(1))

        self.qkv_bias = qkv_bias
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, x: Tensor) -> Tensor:
        B, S, D = x.shape

        # [B, S, D] x [D, D'] -> [B, S, D']
        q = self.Wq(x)
        k = self.Wk(x)
        v = self.Wv(x)

        # [B, S, D'] -> [B, S, H, Dh]
        q = q.view(B, S, self.n_head, self.head_dim)
        k = k.view(B, S, self.n_head, self.head_dim)
        v = v.view(B, S, self.n_head, self.head_dim)

        # [B, S, H, Dh] -> [B, H, S, Dh]
        q = q.transpose(1, 2)
        k = k.transpose(1, 2)
        v = v.transpose(1, 2)

        # attention matrix: [B, H, S, S]
        attn_scores = q @ k.transpose(2, 3) / k.shape[-1] ** 0.5

        # attention mask
        attn_scores += self.mask[:S, :S]
        attn_weight = torch.softmax(attn_scores, -1)
        attn_weight = self.dropout(attn_weight)

        # [B, H, S, S] x [B, H, S, Dh] -> [B, H, S, Dh]
        h = attn_weight @ v
        #  [B, H, S, Dh] -> [B, S, H Dh]
        h = h.transpose(1, 2)
        h = h.contiguous().view(B, S, D)

        # output projection
        o = self.Wo(h)
        return o

In [4]:
mha = MHA()
o = mha(x)
(x.shape, o.shape)

(torch.Size([10, 20, 256]), torch.Size([10, 20, 256]))

## 温馨提示


### contiguous().view V.S. reshape

在 PyTorch 中，张量的**内存连续性**直接影响能否直接使用 `.view()` 或 `.reshape()`：

- **`.view()` 的限制**  
  `.view()` 方法要求张量在内存中是**连续存储**的（逻辑顺序与物理存储顺序一致）。如果张量经过转置（`transpose`）、切片（`slice`）等操作后变为非连续，直接调用 `.view()` 会抛出错误。此时必须显式调用 `.contiguous()` 将张量转换为连续布局，再使用 `.view()`。

- **`.reshape()` 的隐式处理**  
  `.reshape()` 方法会自动处理非连续张量：
  - 若张量**已连续**，`.reshape()` 等价于 `.view()`（无额外开销）。
  - 若张量**非连续**，`.reshape()` 会隐式调用 `.contiguous()` 生成连续副本，再调整形状。这会引入**潜在的性能损耗**（内存复制）。


看了下LLaMA2/3的模型实现，都是采用的`contiguous().view`, 所以这里我们也保留使用这种形式。

References:
- [What's the difference between torch.reshape vs. torch.view - PyTorch Forums](https://discuss.pytorch.org/t/whats-the-difference-between-torch-reshape-vs-torch-view/159172)
- [torch.reshape — PyTorch 2.6 documentation](https://pytorch.org/docs/stable/generated/torch.reshape.html#torch.reshape)
- [torch.Tensor.view — PyTorch 2.6 documentation](https://pytorch.org/docs/stable/generated/torch.Tensor.view.html#torch.Tensor.view)

### 另外一种mask的实现


这里causal mask的作用是通过相加的的方式实现的，在toyllm中gpt2的MHA是通过`masked_fill_`来实现的，两者都是可以的。
不过这里却不能直接将gpt2的实现换成这里相加的方式，因为原始实现中`mask`的实现为：

```python
self.register_buffer("mask", torch.triu(torch.ones(ctx_len, ctx_len), diagonal=1), persistent=True)
# ...
attn_scores = queries @ keys.transpose(2, 3)  # Dot product for each head
# Original mask truncated to the number of tokens and converted to boolean
mask_bool = self.mask.bool()[:num_tokens, :num_tokens]
# Use the mask to fill attention scores
attn_scores.masked_fill_(mask_bool, -torch.inf)
```

这里`persistent=True`会使得在存储`model.pt`时候会将`mask`的值一并存入，之后载入模型会随之一起载入，所以这里无法直接替换`mask`为新的形式。

不过我们可以通过忽略`self.mask`, 在推理的时候通过函数内定义`mask`来指定使用新的`mask`：

```python
attn_scores = queries @ keys.transpose(2, 3)  # Dot product for each head

mask = torch.triu(torch.full((num_tokens, num_tokens), -torch.inf), diagonal=1).to(attn_scores.device)
attn_scores = attn_scores + mask
```