# Byte-Pair Encoding Tokenization (BPE)

## 1 语料库

In [1]:
corpus = [
    "This is the Hugging Face Course.",
    "This chapter is about tokenization.",
    "This section shows several tokenizer algorithms.",
    "Hopefully, you will be able to understand how they are trained and generate tokens.",
]

## 2 预分词

BPE分词算法是被OpenAI用于预训练GPT系列模型而闻名的，由于我们将会手撸一个BPE分词器，所以我们需要先引入`gpt2`预训练模型对应的分词器来预分词。

In [2]:
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("gpt2")

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
words = []
for sent in corpus:
    for word, _ in tokenizer.backend_tokenizer.pre_tokenizer.pre_tokenize_str(sent):
        words.append(word)
words[:10]

['This',
 'Ġis',
 'Ġthe',
 'ĠHugging',
 'ĠFace',
 'ĠCourse',
 '.',
 'This',
 'Ġchapter',
 'Ġis']

注意，GPT-2的分词器采用字节级BPE(Byte-Level BPE)算法，将空格视为单词的一部分而非独立字符。为了区分“单词开头带空格”和“单词不带空格”的情况，分词器引入特殊符号 Ġ 作为空格的可视化表示，例如：
- "hello" → 编码为 "hello"（无空格前缀）；
- " hello" → 编码为 "Ġhello"（Ġ 表示前面的空格）。

## 3 统计词频

In [4]:
from collections import defaultdict

word_freqs = defaultdict(int)
for word in words:
    word_freqs[word] += 1
word_freqs

defaultdict(int,
            {'This': 3,
             'Ġis': 2,
             'Ġthe': 1,
             'ĠHugging': 1,
             'ĠFace': 1,
             'ĠCourse': 1,
             '.': 4,
             'Ġchapter': 1,
             'Ġabout': 1,
             'Ġtokenization': 1,
             'Ġsection': 1,
             'Ġshows': 1,
             'Ġseveral': 1,
             'Ġtokenizer': 1,
             'Ġalgorithms': 1,
             'Hopefully': 1,
             ',': 1,
             'Ġyou': 1,
             'Ġwill': 1,
             'Ġbe': 1,
             'Ġable': 1,
             'Ġto': 1,
             'Ġunderstand': 1,
             'Ġhow': 1,
             'Ġthey': 1,
             'Ġare': 1,
             'Ġtrained': 1,
             'Ġand': 1,
             'Ġgenerate': 1,
             'Ġtokens': 1})

## 4 初始化词汇表

按照BPE算法的约定，初始的词汇表由语料库中使用的所有字符以及预训练使用到的特殊字符组成：

In [5]:
alphabet = []
chars = set()

for word in words:
    for c in word:
        if c not in chars:
            chars.add(c)
            alphabet.append(c)

alphabet.sort()
# alphabet

In [6]:
expected_alphabet = [ ',', '.', 'C', 'F', 'H', 'T', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'k', 'l', 'm', 'n', 'o', 'p', 'r', 's',
  't', 'u', 'v', 'w', 'y', 'z', 'Ġ']
assert alphabet == expected_alphabet

In [7]:
special_tokens = ["<|endoftext|>"]

In [8]:
vocab = special_tokens + alphabet.copy()
vocab[:10]

['<|endoftext|>', ',', '.', 'C', 'F', 'H', 'T', 'a', 'b', 'c']

## 5 分割单词

接下来我们需要将语料库中的所有单词分割为首字母以及为非首字母都加上“##”前缀：

In [9]:
splits = {
    word: [c for c in word]
    for word in words
}
splits

{'This': ['T', 'h', 'i', 's'],
 'Ġis': ['Ġ', 'i', 's'],
 'Ġthe': ['Ġ', 't', 'h', 'e'],
 'ĠHugging': ['Ġ', 'H', 'u', 'g', 'g', 'i', 'n', 'g'],
 'ĠFace': ['Ġ', 'F', 'a', 'c', 'e'],
 'ĠCourse': ['Ġ', 'C', 'o', 'u', 'r', 's', 'e'],
 '.': ['.'],
 'Ġchapter': ['Ġ', 'c', 'h', 'a', 'p', 't', 'e', 'r'],
 'Ġabout': ['Ġ', 'a', 'b', 'o', 'u', 't'],
 'Ġtokenization': ['Ġ',
  't',
  'o',
  'k',
  'e',
  'n',
  'i',
  'z',
  'a',
  't',
  'i',
  'o',
  'n'],
 'Ġsection': ['Ġ', 's', 'e', 'c', 't', 'i', 'o', 'n'],
 'Ġshows': ['Ġ', 's', 'h', 'o', 'w', 's'],
 'Ġseveral': ['Ġ', 's', 'e', 'v', 'e', 'r', 'a', 'l'],
 'Ġtokenizer': ['Ġ', 't', 'o', 'k', 'e', 'n', 'i', 'z', 'e', 'r'],
 'Ġalgorithms': ['Ġ', 'a', 'l', 'g', 'o', 'r', 'i', 't', 'h', 'm', 's'],
 'Hopefully': ['H', 'o', 'p', 'e', 'f', 'u', 'l', 'l', 'y'],
 ',': [','],
 'Ġyou': ['Ġ', 'y', 'o', 'u'],
 'Ġwill': ['Ġ', 'w', 'i', 'l', 'l'],
 'Ġbe': ['Ġ', 'b', 'e'],
 'Ġable': ['Ġ', 'a', 'b', 'l', 'e'],
 'Ġto': ['Ġ', 't', 'o'],
 'Ġunderstand': ['Ġ', 'u', 'n'

## 6 计算分数

BPE通过合并出现频率最高的相邻字符对/子词对/词元对（这些词元对可以称为bigram pair）来不断扩充词汇表，直到词汇表的大小等于预先设置的期望大小：

In [10]:
def bigram():
    for word, tokens in splits.items():
        print(f"{word}:", end=" ")
        for i in range(len(tokens) - 1):
            print(f"{(tokens[i], tokens[i+1])}", end=", ")
        break

bigram()

This: ('T', 'h'), ('h', 'i'), ('i', 's'), 

In [11]:
def compute_bigram_freqs(word_freqs, splits):
    bigram_freqs = defaultdict(int)
    for word, freq in word_freqs.items():
        split = splits[word]
        if len(split) == 1:
            # bigram_freqs[split[0]] += freq  # 坑死了
            continue
        for i in range(len(split) - 1):
            bigram = split[i], split[i+1]
            bigram_freqs[bigram] += freq
    return bigram_freqs

In [12]:
freqs = compute_bigram_freqs(word_freqs, splits)
for i, key in enumerate(freqs.keys()):
    print(f"{key}: {freqs[key]}")
    if i == 5:
        break

('T', 'h'): 3
('h', 'i'): 3
('i', 's'): 5
('Ġ', 'i'): 2
('Ġ', 't'): 7
('t', 'h'): 3


## 7 合并子词

In [13]:
# def merge_pair(first, second, splits):
#     for word, split in splits.items():
#         if len(split) == 1:
#             continue
#         i = 0
#         while i < len(split) - 1:
#             if split[i] == first and split[i+1] == second:
#                 pair = first + second[2:] if second.startswith("##") else first + second
#                 split = split[:i] + [pair] + split[i+2:]
#             else:
#                 i += 1
#         splits[word] = split

def merge_pair(first, second, splits):
    for word, split in splits.items():
        if len(split) == 1:
            continue
        i = 0
        new_split = split.copy()
        while i < len(split) - 1:
            if split[i] == first and split[i+1] == second:
                pair = first + second[2:] if second.startswith("##") else first + second
                new_split = split[:i] + [pair] + split[i+2:]
                i += 2
            else:
                i += 1
        splits[word] = new_split

## 8 训练

找到最大分数的Pair：

In [14]:
sorted_freqs = sorted(freqs.items(), key=lambda x: x[-1], reverse=True)
sorted_freqs[:5]


[(('Ġ', 't'), 7),
 (('i', 's'), 5),
 (('e', 'r'), 5),
 (('Ġ', 'a'), 5),
 (('t', 'o'), 4)]

先合并('Ġ', 't')：

In [15]:
merge_rules = {("Ġ", "t"): "Ġt"}


In [16]:
vocab.append("Ġt")
merge_pair("Ġ", "t", splits)

expected_split = ['Ġt', 'r', 'a', 'i', 'n', 'e', 'd']
assert splits["Ġtrained"] == expected_split

print(vocab)

['<|endoftext|>', ',', '.', 'C', 'F', 'H', 'T', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'k', 'l', 'm', 'n', 'o', 'p', 'r', 's', 't', 'u', 'v', 'w', 'y', 'z', 'Ġ', 'Ġt']


In [17]:
vocab_size = 50
while len(vocab) < vocab_size:
    freqs = compute_bigram_freqs(word_freqs, splits)
    sorted_freqs = sorted(freqs.items(), key=lambda x: x[-1], reverse=True)
    best = sorted_freqs[0]
    bigram, _ = best
    print(bigram, len(bigram))
    merge_pair(*bigram, splits)
    token = bigram[0] + bigram[1]
    merge_rules[bigram] = token
    vocab.append(token)

('i', 's') 2
('e', 'r') 2
('Ġ', 'a') 2
('Ġt', 'o') 2
('e', 'n') 2
('T', 'h') 2
('Th', 'is') 2
('o', 'u') 2
('s', 'e') 2
('Ġto', 'k') 2
('Ġtok', 'en') 2
('n', 'd') 2
('Ġ', 'is') 2
('Ġt', 'h') 2
('Ġth', 'e') 2
('i', 'n') 2
('Ġa', 'b') 2
('Ġtoken', 'i') 2


In [18]:
expected_merge_rules = {('Ġ', 't'): 'Ġt', ('i', 's'): 'is', ('e', 'r'): 'er', ('Ġ', 'a'): 'Ġa', ('Ġt', 'o'): 'Ġto', ('e', 'n'): 'en',
 ('T', 'h'): 'Th', ('Th', 'is'): 'This', ('o', 'u'): 'ou', ('s', 'e'): 'se', ('Ġto', 'k'): 'Ġtok',
 ('Ġtok', 'en'): 'Ġtoken', ('n', 'd'): 'nd', ('Ġ', 'is'): 'Ġis', ('Ġt', 'h'): 'Ġth', ('Ġth', 'e'): 'Ġthe',
 ('i', 'n'): 'in', ('Ġa', 'b'): 'Ġab', ('Ġtoken', 'i'): 'Ġtokeni'}

In [19]:
assert merge_rules == expected_merge_rules, print(f"{merge_rules}\n{vocab}")

## 9 分词

In [113]:
# def tokenize(text, merge_rules):
#     words = [word for word, _ in tokenizer.backend_tokenizer.pre_tokenizer.pre_tokenize_str(text)]
#     splits = {word: [c for c in word] for word in words}
#     print(splits)
#     for bigram in merge_rules.keys():
#         first, second = bigram
#         merge_pair(first, second, splits)
#         print(f"{bigram}: {splits}")
#     tokens = [splits[word] for word in words]
#     return sum(tokens, [])

def tokenize(text, merge_rules):
    words = [word for word, _ in tokenizer.backend_tokenizer.pre_tokenizer.pre_tokenize_str(text)]
    splits = {word: [w for w in word] for word in words}
    print(splits)
    for word, split in splits.items():
        while True:
            more = False  # 是否发现可以合并的bigram
            i = 0
            while i < len(split) - 1:
                bigram = (split[i], split[i+1])
                if bigram in merge_rules:
                    token = merge_rules[bigram]
                    split = split[:i] + [token] + split[i+2:]
                    splits[word] = split
                    more = True  # 如果发现有合并的bigram，说明可能还可以合并
                else:
                    i += 1
                print(f"{bigram}: {splits}")
            if not more:  # 如果已经没有可以合并的bigram，就可以结束当前word的合并过程了
                break
    tokens = [splits[word] for word in words]
    return sum(tokens, [])

In [114]:
actual = tokenize("This is not a token.", merge_rules)
actual

{'This': ['T', 'h', 'i', 's'], 'Ġis': ['Ġ', 'i', 's'], 'Ġnot': ['Ġ', 'n', 'o', 't'], 'Ġa': ['Ġ', 'a'], 'Ġtoken': ['Ġ', 't', 'o', 'k', 'e', 'n'], '.': ['.']}
('T', 'h'): {'This': ['Th', 'i', 's'], 'Ġis': ['Ġ', 'i', 's'], 'Ġnot': ['Ġ', 'n', 'o', 't'], 'Ġa': ['Ġ', 'a'], 'Ġtoken': ['Ġ', 't', 'o', 'k', 'e', 'n'], '.': ['.']}
('Th', 'i'): {'This': ['Th', 'i', 's'], 'Ġis': ['Ġ', 'i', 's'], 'Ġnot': ['Ġ', 'n', 'o', 't'], 'Ġa': ['Ġ', 'a'], 'Ġtoken': ['Ġ', 't', 'o', 'k', 'e', 'n'], '.': ['.']}
('i', 's'): {'This': ['Th', 'is'], 'Ġis': ['Ġ', 'i', 's'], 'Ġnot': ['Ġ', 'n', 'o', 't'], 'Ġa': ['Ġ', 'a'], 'Ġtoken': ['Ġ', 't', 'o', 'k', 'e', 'n'], '.': ['.']}
('Th', 'is'): {'This': ['This'], 'Ġis': ['Ġ', 'i', 's'], 'Ġnot': ['Ġ', 'n', 'o', 't'], 'Ġa': ['Ġ', 'a'], 'Ġtoken': ['Ġ', 't', 'o', 'k', 'e', 'n'], '.': ['.']}
('Ġ', 'i'): {'This': ['This'], 'Ġis': ['Ġ', 'i', 's'], 'Ġnot': ['Ġ', 'n', 'o', 't'], 'Ġa': ['Ġ', 'a'], 'Ġtoken': ['Ġ', 't', 'o', 'k', 'e', 'n'], '.': ['.']}
('i', 's'): {'This': ['This'], 'Ġis

['This', 'Ġis', 'Ġ', 'n', 'o', 't', 'Ġa', 'Ġtoken', '.']

In [110]:
# merge_rules

In [115]:
expected = ['This', 'Ġis', 'Ġ', 'n', 'o', 't', 'Ġa', 'Ġtoken', '.']

In [116]:
assert actual == expected