# 1.1 从零实现BPE？

## 一、BPE 分词器实现

### 1.1 什么是BPE？

BPE（Byte Pair Encoding，字节对编码）是一种常用的子词分词算法，广泛应用于自然语言处理任务中，尤其是在大规模预训练模型（如GPT、BERT等）中。
BPE的核心思想是通过统计训练语料中出现频率最高的相邻字符对，并将其合并为新的符号，逐步构建出一个高效的子词词表。这样可以有效减少词表大小，同时兼顾词汇覆盖率和表示能力。

BPE的主要流程如下：
1. 将文本按字符切分，初始时每个字符为一个token。
2. 统计所有相邻token对的出现频率，找到出现频率最高的token对。
3. 将该token对合并为一个新的token，更新语料。
4. 重复步骤2-3，直到达到预设的词表大小或没有高频token对可合并。

BPE的优点在于能够处理未登录词（OOV），并且在词表规模和分词粒度之间取得平衡。它既能保留常见词的整体性，又能将罕见词拆分为更小的子词单元，从而提升模型的泛化能力。

### 1.2 BPE算法示例：从字符到子词的合并过程  


In [13]:
# 实现BPE算法，输入case为low_ lower_ lowest_，用_标记结束

from typing import Dict, Tuple, List, Set

def get_stats(vocab: Dict[str, int]) -> Dict[Tuple[str, str], int]:
    """
    统计词表中所有相邻符号对的频率

    输入:
        vocab: Dict[str, int]
            词表，key为以空格分隔的token序列字符串，value为该词出现的频数

            示例：
            {
                'l o w _': 1,
                'l o w e r _': 1,
                'l o w e s t _': 1
            }
    输出:
        pairs: Dict[Tuple[str, str], int]
            所有相邻token对及其在词表中的总出现频率

            示例输出：
            {
                ('l', 'o'): 3,
                ('o', 'w'): 3,
                ('w', '_'): 1,
                ('w', 'e'): 2,
                ('e', 'r'): 1,
                ('e', 's'): 1,
                ('s', 't'): 1,
                ('t', '_'): 1,
                ('r', '_'): 1
            }
    """
    pairs: Dict[Tuple[str, str], int] = {}
    for word, freq in vocab.items():
        symbols: List[str] = word.split()
        for i in range(len(symbols)-1):
            pair = (symbols[i], symbols[i+1])
            pairs[pair] = pairs.get(pair, 0) + freq
    return pairs

def merge_vocab(pair: Tuple[str, str], vocab: Dict[str, int]) -> Dict[str, int]:
    """
    将词表中所有出现pair的地方合并为新符号

    输入:
        pair: Tuple[str, str]
            需要合并的token对
            示例：('l', 'o')
        vocab: Dict[str, int]
            当前词表，key为以空格分隔的token序列字符串，value为该词出现的频数

            示例：
            {
                'l o w _': 1,
                'l o w e r _': 1,
                'l o w e s t _': 1
            }

    输出:
        new_vocab: Dict[str, int]
            合并pair后的新词表

            示例输出（合并('l', 'o')后）：
            {
                'lo w _': 1,
                'lo w e r _': 1,
                'lo w e s t _': 1
            }
    """
    new_vocab: Dict[str, int] = {}
    bigram: str = ' '.join(pair)
    replacement: str = ''.join(pair)
    for word in vocab:
        # 用空格分隔，保证只合并相邻的pair
        new_word: str = word.replace(bigram, replacement)
        new_vocab[new_word] = vocab[word]
    return new_vocab

# 输入case
corpus: List[str] = ['low_', 'lower_', 'lowest_']

# 初始化词表（以字符为单位，空格分隔）
vocab: Dict[str, int] = {}
for word in corpus:
    chars: str = ' '.join(list(word))
    vocab[chars] = vocab.get(chars, 0) + 1

print("初始词表：")
for k, v in vocab.items():
    print(f"{k}: {v}")

# vocab此时的示例内容：
# l o w _: 1
# l o w e r _: 1
# l o w e s t _: 1

num_merges: int = 3  # 最大合并次数，可根据需要调整
for i in range(num_merges):
    pairs: Dict[Tuple[str, str], int] = get_stats(vocab)
    if not pairs:
        break
    # 选择出现频率最高的pair
    best: Tuple[str, str] = max(pairs, key=lambda x: pairs[x])
    print(f"\n第{i+1}轮合并: 合并{best}, 频率={pairs[best]}")
    vocab = merge_vocab(best, vocab)
    print("当前词表：")
    for k, v in vocab.items():
        print(f"{k}: {v}")

print("\n最终BPE词表：")
tokens: Set[str] = set()
for word in vocab:
    tokens.update(word.split())
print(tokens)

初始词表：
l o w _: 3
l o w e r _: 1
l o w e s t _: 1

第1轮合并: 合并('l', 'o'), 频率=5
当前词表：
lo w _: 3
lo w e r _: 1
lo w e s t _: 1

第2轮合并: 合并('lo', 'w'), 频率=5
当前词表：
low _: 3
low e r _: 1
low e s t _: 1

第3轮合并: 合并('low', '_'), 频率=3
当前词表：
low_: 3
low e r _: 1
low e s t _: 1

最终BPE词表：
{'low_', 'e', 't', 'r', 'low', '_', 's'}


### 1.3 BPE与字符压缩的关系
- BPE（Byte Pair Encoding）本质上是一种数据压缩算法，其核心目标是通过合并高频子串（字符或子词）​来减少文本的存储空间或编码长度，从而提升后续处理（如NLP模型训练）的效率。​字符压缩率可以直观反映BPE的效果——即原始文本经过BPE编码后，平均每个字符或 token 所占用的空间是否减少。

- 理论上，BPE（Byte Pair Encoding）分词的目标就是用词表（tokens）对单词进行最少分割，也就是让token数最少。这其实就是一个最优分割问题，经典做法是用动态规划（DP）/Bellman Principle（贝尔曼原理）来求解最小分割次数。下面代码是用“贪心法”——每次都尽量匹配最长的token。这种方法简单高效，但不一定总能得到最优（最少token数）分割。有些情况下，贪心法会错过全局最优解。

In [14]:
# 计算每个单词BPE编码后的token数与原始字符数的压缩占比
print("\n每个单词的字符压缩占比：")
for word in corpus:
    # 原始字符数
    orig_len = len(word)
    # BPE分词：从最终tokens集合中，尽可能长地匹配
    chars = list(word)
    bpe_tokens = []
    i = 0
    while i < len(chars):
        matched = None
        # 尝试最长匹配
        for j in range(len(chars), i, -1):
            sub = ' '.join(chars[i:j])
            if sub.replace(' ', '') in tokens or sub in tokens:
                matched = sub
                bpe_tokens.append(sub)
                i = j - 1
                break
        if matched is None:
            # 单字符
            bpe_tokens.append(chars[i])
        i += 1
    bpe_token_count = len(bpe_tokens)
    compress_ratio = bpe_token_count / orig_len
    print(f"{word}: 原始字符数={orig_len}, BPE token数={bpe_token_count}, 压缩占比={compress_ratio:.2f}")


每个单词的字符压缩占比：
low_: 原始字符数=4, BPE token数=1, 压缩占比=0.25
low_: 原始字符数=4, BPE token数=1, 压缩占比=0.25
low_: 原始字符数=4, BPE token数=1, 压缩占比=0.25
lower_: 原始字符数=6, BPE token数=4, 压缩占比=0.67
lowest_: 原始字符数=7, BPE token数=5, 压缩占比=0.71


## 二、作业说明

### 2.1 代码结构

1. 下载`https://github.com/stanford-cs336/assignment1-basics`代码
2. 阅读`readme.md`安装数据集、环境 

``` text 
assignment1-basics/
├── README.md
├── pyproject.toml / requirements.txt   # 依赖管理
├── data/                               # 存放数据集
│   ├── TinyStoriesV2-GPT4-train.txt
│   ├── TinyStoriesV2-GPT4-valid.txt
│   ├── owt_train.txt
│   └── owt_valid.txt
├── tests/
│   ├── adapters.py                     # 适配器接口（你要实现的函数）
│   ├── conftest.py                     # pytest 配置文件
│   └── test_*.py                       # 具体测试文件（如 test_bpe.py、test_tokenizer.py 等）
├── your_code.py / my_bpe.py / ...      # 你自己的实现文件（可选）
└── ...                                 # 其他辅助文件
```

### 2.2 测试函数
运行`uv run pytest`的输出，目前由于所有的核心函数都没有实现，抛出的错误都用`NotImplementedError`，作业1的BPE的部分，需要通过tests/test_tokenizer.py的所有测试函数。

```text 
================================================ short test summary info =================================================
FAILED tests/test_data.py::test_get_batch - NotImplementedError
FAILED tests/test_model.py::test_linear - NotImplementedError
FAILED tests/test_model.py::test_embedding - NotImplementedError
FAILED tests/test_model.py::test_swiglu - NotImplementedError
FAILED tests/test_model.py::test_scaled_dot_product_attention - NotImplementedError
FAILED tests/test_model.py::test_4d_scaled_dot_product_attention - NotImplementedError
FAILED tests/test_model.py::test_multihead_self_attention - NotImplementedError
FAILED tests/test_model.py::test_multihead_self_attention_with_rope - NotImplementedError
FAILED tests/test_model.py::test_transformer_lm - NotImplementedError
FAILED tests/test_model.py::test_transformer_lm_truncated_input - NotImplementedError
FAILED tests/test_model.py::test_transformer_block - NotImplementedError
FAILED tests/test_model.py::test_rmsnorm - NotImplementedError
FAILED tests/test_model.py::test_rope - NotImplementedError
FAILED tests/test_model.py::test_silu_matches_pytorch - NotImplementedError
FAILED tests/test_nn_utils.py::test_softmax_matches_pytorch - NotImplementedError
FAILED tests/test_nn_utils.py::test_cross_entropy - NotImplementedError
FAILED tests/test_nn_utils.py::test_gradient_clipping - NotImplementedError
FAILED tests/test_optimizer.py::test_adamw - NotImplementedError
FAILED tests/test_optimizer.py::test_get_lr_cosine_schedule - NotImplementedError
FAILED tests/test_serialization.py::test_checkpointing - NotImplementedError
FAILED tests/test_tokenizer.py::test_roundtrip_empty - NotImplementedError
FAILED tests/test_tokenizer.py::test_empty_matches_tiktoken - NotImplementedError
FAILED tests/test_tokenizer.py::test_roundtrip_single_character - NotImplementedError
FAILED tests/test_tokenizer.py::test_single_character_matches_tiktoken - NotImplementedError
FAILED tests/test_tokenizer.py::test_roundtrip_single_unicode_character - NotImplementedError
FAILED tests/test_tokenizer.py::test_single_unicode_character_matches_tiktoken - NotImplementedError
FAILED tests/test_tokenizer.py::test_roundtrip_ascii_string - NotImplementedError
FAILED tests/test_tokenizer.py::test_ascii_string_matches_tiktoken - NotImplementedError
FAILED tests/test_tokenizer.py::test_roundtrip_unicode_string - NotImplementedError
FAILED tests/test_tokenizer.py::test_unicode_string_matches_tiktoken - NotImplementedError
FAILED tests/test_tokenizer.py::test_roundtrip_unicode_string_with_special_tokens - NotImplementedError
FAILED tests/test_tokenizer.py::test_unicode_string_with_special_tokens_matches_tiktoken - NotImplementedError
FAILED tests/test_tokenizer.py::test_overlapping_special_tokens - NotImplementedError
FAILED tests/test_tokenizer.py::test_address_roundtrip - NotImplementedError
FAILED tests/test_tokenizer.py::test_address_matches_tiktoken - NotImplementedError
FAILED tests/test_tokenizer.py::test_german_roundtrip - NotImplementedError
FAILED tests/test_tokenizer.py::test_german_matches_tiktoken - NotImplementedError
FAILED tests/test_tokenizer.py::test_tinystories_sample_roundtrip - NotImplementedError
FAILED tests/test_tokenizer.py::test_tinystories_matches_tiktoken - NotImplementedError
FAILED tests/test_tokenizer.py::test_encode_special_token_trailing_newlines - NotImplementedError
FAILED tests/test_tokenizer.py::test_encode_special_token_double_newline_non_whitespace - NotImplementedError
FAILED tests/test_tokenizer.py::test_encode_iterable_tinystories_sample_roundtrip - NotImplementedError
FAILED tests/test_tokenizer.py::test_encode_iterable_tinystories_matches_tiktoken - NotImplementedError
FAILED tests/test_train_bpe.py::test_train_bpe_speed - NotImplementedError
FAILED tests/test_train_bpe.py::test_train_bpe - NotImplementedError
FAILED tests/test_train_bpe.py::test_train_bpe_special_tokens - NotImplementedError
============================================= 46 failed, 2 skipped in 2.51s ==============================================
```

### 环境配置
`vscode`相关的编辑器需要在`.vscode`下新建`setting.json`配置pytest的相关配置，可以对每个测试函数单独进行调试，如下图所示
```json
{
    "python.testing.pytestArgs": [
        "tests"
    ],
    "python.testing.unittestEnabled": false,
    "python.testing.pytestEnabled": true
}
```

![VSCODE测试图](./image/1.1_1.png)

### 2.3 作业实现

请实现如下图所示的一个Tokenizer类，注意合并阶段需要按照merge中给定的优先级合并，完成后即

![Encode和Decode设计](./image/1.1_2.png)

``` python 
class Tokenizer:
    def __init__(self, vocab: Dict[int, bytes], merges: List[Tuple[bytes, bytes]], special_tokens: Optional[List[str]] = None):
        """
        初始化分词器。

        参数:
            vocab: 字典，键为token id（int），值为token对应的字节串（bytes）。
            merges: BPE合并规则列表，每个元素为一个二元组（bytes, bytes）。
            special_tokens: 可选，特殊token的字符串列表。

        示例:
            vocab = {0: b'a', 1: b'b', 2: b'ab'} 
            merges = [(b'a', b'b')]
            special_tokens = ['<PAD>', '<EOS>']
            tokenizer = Tokenizer(vocab, merges, special_tokens)
        """
        pass  # 这里只是示例，实际实现略

    def encode(self, text: str) -> List[int]:
        """
        将输入文本编码为token id的列表。

        参数:
            text: 需要编码的字符串。

        返回:
            token id的列表（List[int]）。

        示例:
            假设tokenizer已经初始化好:
            tokenizer.encode("ab")
            可能返回: [2]
        """
        pass  # 这里只是示例，实际实现略

    def decode(self, ids: List[int]) -> str:
        """
        将token id的列表解码为原始字符串。

        参数:
            ids: token id的列表。

        返回:
            解码后的字符串。

        示例:
            假设tokenizer已经初始化好:
            tokenizer.decode([2])
            可能返回: "ab"
        """
        pass  # 这里只是示例，实际实现略
```
