# 1. 字节对编码（BPE）的主要思想

## 1.1 比特（bits）与字节（bytes）

In [5]:
# Encode a short sample string as UTF-8 and keep the raw bytes in a bytearray.
text = "This is some text"
byte_ary = bytearray(text.encode("utf-8"))
# Show the character count, then the byte sequence itself.
print(len(text))
print(byte_ary)

17
bytearray(b'This is some text')


In [4]:
# Iterating a bytearray yields one integer (0-255) per byte; use those as token IDs.
ids = list(byte_ary)
print(len(ids))
print(ids)

17
[84, 104, 105, 115, 32, 105, 115, 32, 115, 111, 109, 101, 32, 116, 101, 120, 116]


In [6]:
# Compare the character count with the byte-level token count for the sample
# (the recorded output shows both are 17 for this string).
print("Number of characters:", len(text))
print("Number of token IDs:", len(ids))

Number of characters: 17
Number of token IDs: 17


## 1.2 创建词汇表

## 1.3 BPE 算法概览

# 2. BPE的实现

In [17]:
from collections import Counter, deque
from functools import lru_cache
import json

class BPETokenizerSimple:
    """Small, educational byte-pair-encoding (BPE) tokenizer.

    State kept on the instance:
        vocab (dict[int, str]): token ID -> token string.
        inverse_vocab (dict[str, int]): token string -> token ID.
        bpe_merges (dict[tuple[int, int], int]): (left ID, right ID) -> merged ID.

    Spaces are represented by the marker character 'Ġ' inside token strings.
    """

    def __init__(self):
        # Maps token_id to token_str (e.g. {11246: "some"})
        self.vocab = {}
        # Maps token_str to token_id (e.g. {"some": 11246})
        self.inverse_vocab = {}
        # BPE merge table: {(token_id1, token_id2): merged_token_id}
        self.bpe_merges = {}

    def train(self, text, vocab_size, allowed_special={"<|endoftext|>"}):
        """
        Train the BPE tokenizer from scratch.

        Any previously learned vocabulary and merges are discarded.

        Args:
            text (str): Training text.
            vocab_size (int): Target vocabulary size. Training stops earlier
                if no adjacent token pair is left to merge.
            allowed_special (set): Special tokens to add to the vocabulary.
                The default set is only read, never mutated.
        """
        # Start from an empty merge table so that retraining an instance does
        # not mix merges from an earlier run into the new vocabulary.
        self.bpe_merges = {}

        # Preprocessing: replace every space with 'Ġ' (a space at index 0 is dropped).
        processed_text = []
        for i, char in enumerate(text):
            if char == " " and i != 0:
                processed_text.append("Ġ")
            if char != " ":
                processed_text.append(char)
        processed_text = "".join(processed_text)

        # Initial vocabulary: the first 256 code points ...
        unique_chars = [chr(i) for i in range(256)]
        known_chars = set(unique_chars)  # set membership instead of list scans

        # ... followed by any other character of the processed text, in sorted order.
        for char in sorted(set(processed_text)):
            if char not in known_chars:
                unique_chars.append(char)
                known_chars.add(char)

        # Make sure the space marker 'Ġ' is always available.
        if "Ġ" not in known_chars:
            unique_chars.append("Ġ")

        # Build the vocabulary and its inverse.
        self.vocab = {i: char for i, char in enumerate(unique_chars)}
        self.inverse_vocab = {char: i for i, char in self.vocab.items()}

        # Append the allowed special tokens after the character entries.
        if allowed_special:
            for token in allowed_special:
                if token not in self.inverse_vocab:
                    new_id = len(self.vocab)
                    self.vocab[new_id] = token
                    self.inverse_vocab[token] = new_id

        # Turn the processed text into a list of token IDs.
        token_ids = [self.inverse_vocab[char] for char in processed_text]

        # BPE steps 1-3: repeatedly find the most frequent adjacent pair and
        # replace it with a new ID until the target size is reached.
        for new_id in range(len(self.vocab), vocab_size):
            pair_id = self.find_freq_pair(token_ids, mode="most")
            if pair_id is None:
                # Fewer than two tokens are left, so nothing can be merged.
                break
            token_ids = self.replace_pair(token_ids, pair_id, new_id)
            self.bpe_merges[pair_id] = new_id

        # Add the merged tokens to the vocabulary. Merges are stored in
        # creation order, so both halves of a pair are already present.
        for (p0, p1), new_id in self.bpe_merges.items():
            merged_token = self.vocab[p0] + self.vocab[p1]
            self.vocab[new_id] = merged_token
            self.inverse_vocab[merged_token] = new_id

    def load_vocab_and_merges_from_openai(self, vocab_path, bpe_merges_path):
        """
        Load a pre-trained vocabulary and BPE merges from OpenAI's GPT-2 files.

        Existing vocabulary and merges on this instance are replaced.

        Args:
            vocab_path (str): Path to the vocabulary file (GPT-2 calls it 'encoder.json').
            bpe_merges_path (str): Path to the merges file (GPT-2 calls it 'vocab.bpe').
        """
        # Load the vocabulary; the file maps token_str to token_id.
        with open(vocab_path, "r", encoding="utf-8") as file:
            loaded_vocab = json.load(file)
            self.vocab = {int(v): k for k, v in loaded_vocab.items()}
            self.inverse_vocab = {k: int(v) for k, v in loaded_vocab.items()}

        # Drop merges from any earlier train/load call on this instance.
        self.bpe_merges = {}

        # Load the BPE merges.
        with open(bpe_merges_path, "r", encoding="utf-8") as file:
            lines = file.readlines()
            # Skip the header comment line if there is one.
            if lines and lines[0].startswith("#"):
                lines = lines[1:]

            for rank, line in enumerate(lines):
                pair = tuple(line.strip().split())
                if len(pair) != 2:
                    print(f"第 {rank+1} 行有超过 2 个条目：{line.strip()}")
                    continue
                token1, token2 = pair
                if token1 in self.inverse_vocab and token2 in self.inverse_vocab:
                    token_id1 = self.inverse_vocab[token1]
                    token_id2 = self.inverse_vocab[token2]
                    merged_token = token1 + token2
                    if merged_token in self.inverse_vocab:
                        merged_token_id = self.inverse_vocab[merged_token]
                        self.bpe_merges[(token_id1, token_id2)] = merged_token_id
                    else:
                        print(f"合并的令牌 '{merged_token}' 未在词汇表中找到，跳过。")
                else:
                    print(f"跳过配对 {pair}，因为其中一个令牌不在词汇表中。")

    def encode(self, text):
        """
        Encode the input text into a list of token IDs.

        Args:
            text (str): Text to encode.

        Returns:
            List[int]: Token IDs.

        Raises:
            ValueError: If a character of a word is missing from the vocabulary.
        """
        tokens = []
        # NOTE: split() without arguments splits on every whitespace character,
        # so the standalone newline entries introduced by replace() are dropped
        # again and newlines end up being treated like ordinary spaces.
        words = text.replace("\n", " \n ").split()

        for i, word in enumerate(words):
            if i > 0 and not word.startswith("\n"):
                tokens.append("Ġ" + word)  # every word after the first gets the space marker
            else:
                tokens.append(word)  # first word is kept without a marker

        token_ids = []
        for token in tokens:
            if token in self.inverse_vocab:
                # The whole token is already a vocabulary entry.
                token_ids.append(self.inverse_vocab[token])
            else:
                # Otherwise fall back to sub-word tokenization via BPE merges.
                token_ids.extend(self.tokenize_with_bpe(token))

        return token_ids

    def tokenize_with_bpe(self, token):
        """
        Tokenize a single token using the BPE merge table.

        Merges are applied greedily from left to right in repeated passes
        until a pass performs no merge; the order in which merges were
        learned is not consulted.

        Args:
            token (str): Token to split into sub-word IDs.

        Returns:
            List[int]: Token IDs after applying the merges.

        Raises:
            ValueError: If a character of ``token`` is not in the vocabulary.
        """
        # Start from the individual characters of the token.
        token_ids = [self.inverse_vocab.get(char, None) for char in token]
        if None in token_ids:
            missing_chars = [char for char, tid in zip(token, token_ids) if tid is None]
            raise ValueError(f"未在词汇表中找到的字符：{missing_chars}")

        can_merge = True
        while can_merge and len(token_ids) > 1:
            can_merge = False
            new_tokens = []
            i = 0
            while i < len(token_ids) - 1:
                pair = (token_ids[i], token_ids[i + 1])
                if pair in self.bpe_merges:
                    new_tokens.append(self.bpe_merges[pair])
                    i += 2  # both halves of the pair are consumed
                    can_merge = True
                else:
                    new_tokens.append(token_ids[i])
                    i += 1
            # Carry over the last ID when it was not part of a merged pair.
            if i < len(token_ids):
                new_tokens.append(token_ids[i])
            token_ids = new_tokens

        return token_ids

    def decode(self, token_ids):
        """
        Decode a list of token IDs back into a string.

        Args:
            token_ids (List[int]): Token IDs to decode.

        Returns:
            str: Decoded string.

        Raises:
            ValueError: If an ID is not present in the vocabulary.
        """
        pieces = []
        for token_id in token_ids:
            if token_id not in self.vocab:
                raise ValueError(f"未在词汇表中找到令牌 ID {token_id}。")
            token = self.vocab[token_id]
            if token.startswith("Ġ"):
                # A leading 'Ġ' stands for a space.
                pieces.append(" " + token[1:])
            else:
                pieces.append(token)
        return "".join(pieces)

    def save_vocab_and_merges(self, vocab_path, bpe_merges_path):
        """
        Save the vocabulary and BPE merges as JSON files.

        Args:
            vocab_path (str): Destination path for the vocabulary.
            bpe_merges_path (str): Destination path for the merges.
        """
        # Save the vocabulary (JSON turns the integer IDs into string keys).
        with open(vocab_path, "w", encoding="utf-8") as file:
            json.dump(dict(self.vocab), file, ensure_ascii=False, indent=2)

        # Save the merges as a list of {"pair": [id1, id2], "new_id": id} records.
        with open(bpe_merges_path, "w", encoding="utf-8") as file:
            merges_list = [{"pair": list(pair), "new_id": new_id}
                           for pair, new_id in self.bpe_merges.items()]
            json.dump(merges_list, file, ensure_ascii=False, indent=2)

    def load_vocab_and_merges(self, vocab_path, bpe_merges_path):
        """
        Load the vocabulary and BPE merges from JSON files written by
        ``save_vocab_and_merges``.

        Existing vocabulary and merges on this instance are replaced.

        Args:
            vocab_path (str): Path to the vocabulary file.
            bpe_merges_path (str): Path to the merges file.
        """
        # Load the vocabulary; keys come back as strings and are cast to int.
        with open(vocab_path, "r", encoding="utf-8") as file:
            loaded_vocab = json.load(file)
            self.vocab = {int(k): v for k, v in loaded_vocab.items()}
            self.inverse_vocab = {v: int(k) for k, v in loaded_vocab.items()}

        # Drop merges from any earlier train/load call on this instance.
        self.bpe_merges = {}

        # Load the BPE merges.
        with open(bpe_merges_path, "r", encoding="utf-8") as file:
            merges_list = json.load(file)
            for merge in merges_list:
                pair = tuple(merge['pair'])
                new_id = merge['new_id']
                self.bpe_merges[pair] = new_id

    def get_special_token_id(self, token):
        """
        Return the ID of ``token``, or None if it is not in the vocabulary.

        The lookup is a plain dict access and is intentionally not memoized:
        a cache keyed on the instance would keep returning the old answer
        after the vocabulary is retrained or reloaded.
        """
        return self.inverse_vocab.get(token, None)

    @staticmethod
    def find_freq_pair(token_ids, mode="most"):
        """
        Find the most or least frequent pair of adjacent token IDs.

        Args:
            token_ids (List[int]): Sequence of token IDs.
            mode (str): "most" or "least".

        Returns:
            tuple[int, int] | None: The selected pair (ties go to the pair
            seen first), or None when the sequence has fewer than two IDs.

        Raises:
            ValueError: If ``mode`` is neither "most" nor "least".
        """
        if mode not in ("most", "least"):
            raise ValueError("无效模式。选择 'most' 或 'least'。")

        pairs = Counter(zip(token_ids, token_ids[1:]))
        if not pairs:
            # No adjacent pair exists; callers treat None as "stop merging".
            return None

        select = max if mode == "most" else min
        return select(pairs.items(), key=lambda x: x[1])[0]

    @staticmethod
    def replace_pair(token_ids, pair_id, new_id):
        """
        Replace every non-overlapping occurrence of ``pair_id`` with ``new_id``.

        Args:
            token_ids (List[int]): Sequence of token IDs.
            pair_id (tuple[int, int]): Adjacent pair to replace.
            new_id (int): ID that substitutes the pair.

        Returns:
            List[int]: New list with the replacements applied left to right.
        """
        dq = deque(token_ids)
        replaced = []

        while dq:
            current = dq.popleft()
            if dq and (current, dq[0]) == pair_id:
                replaced.append(new_id)
                dq.popleft()  # discard the second half of the matched pair
            else:
                replaced.append(current)

        return replaced
        

### 3.1 训练、编码与解码

In [3]:
import os
import urllib.request

# Single source of truth for the local file name used by the existence check,
# the download, and the read below.
file_path = "the-verdict.txt"

# Fetch the sample text only when there is no local copy yet.
if not os.path.exists(file_path):
    url = ("https://raw.githubusercontent.com/rasbt/"
           "LLMs-from-scratch/main/ch02/01_main-chapter-code/"
           "the-verdict.txt")
    urllib.request.urlretrieve(url, file_path)

# Read the whole text; `text` is the training corpus used by the cells below.
with open(file_path, "r", encoding="utf-8") as f:
    text = f.read()

In [19]:
# Train a fresh tokenizer on the sample text with a target vocabulary of 1000 entries.
tokenizer = BPETokenizerSimple()
tokenizer.train(text, vocab_size=1000, allowed_special={"<|endoftext|>"})

In [20]:
# Vocabulary size after training: initial characters, special token, and learned merges.
print(len(tokenizer.vocab))

1000


In [21]:
# Number of merge rules learned during training.
print(len(tokenizer.bpe_merges))

742


In [22]:
# Encode a sample sentence with the trained tokenizer and show the resulting IDs.
input_text = "Jack embraced beauty through art and life."
token_ids = tokenizer.encode(input_text)
print(token_ids)

[424, 256, 654, 531, 302, 311, 256, 296, 97, 465, 121, 595, 841, 116, 287, 466, 256, 326, 972, 46]


In [23]:
# Compare the sentence length in characters with the number of BPE token IDs.
print("Number of characters:", len(input_text))
print("Number of token IDs:", len(token_ids))

Number of characters: 42
Number of token IDs: 20


In [24]:
# Show the token IDs again before decoding them.
print(token_ids)

[424, 256, 654, 531, 302, 311, 256, 296, 97, 465, 121, 595, 841, 116, 287, 466, 256, 326, 972, 46]


In [25]:
# Decode the IDs back into text.
print(tokenizer.decode(token_ids))

Jack embraced beauty through art and life.


In [26]:
# Decode each ID on its own to see which piece of text every token stands for.
for token_id in token_ids:
    print(f"{token_id} -> {tokenizer.decode([token_id])}")

424 -> Jack
256 ->  
654 -> em
531 -> br
302 -> ac
311 -> ed
256 ->  
296 -> be
97 -> a
465 -> ut
121 -> y
595 ->  through
841 ->  ar
116 -> t
287 ->  a
466 -> nd
256 ->  
326 -> li
972 -> fe
46 -> .


In [27]:
# Round trip: encode a sentence and decode the result straight away.
tokenizer.decode(tokenizer.encode("This is some text."))

'This is some text.'

In [28]:
# Save trained tokenizer
# NOTE: save_vocab_and_merges writes both files with json.dump, so the merges
# file contains JSON even though its name ends in .txt.
tokenizer.save_vocab_and_merges(vocab_path="vocab.json", bpe_merges_path="bpe_merges.txt")

In [29]:
# Load tokenizer
# Restore the saved vocabulary and merges into a second, independent tokenizer instance.
tokenizer2 = BPETokenizerSimple()
tokenizer2.load_vocab_and_merges(vocab_path="vocab.json", bpe_merges_path="bpe_merges.txt")

In [30]:
# The reloaded tokenizer should decode the earlier IDs to the same sentence.
print(tokenizer2.decode(token_ids))

Jack embraced beauty through art and life.


&nbsp;
### 3.3 加载来自 OpenAI 的原始 GPT-2 BPE 分词器

In [31]:
import os
import urllib.request

def download_file_if_absent(url, filename):
    """Download ``url`` to ``filename`` unless that file already exists.

    The payload is streamed into ``<filename>.part`` and renamed to
    ``filename`` only after the transfer finished, so an interrupted download
    never leaves a truncated file that a later call would mistake for a
    complete one. Failures are reported on stdout rather than raised.

    Args:
        url (str): Location to fetch.
        filename (str): Local destination path.
    """
    if os.path.exists(filename):
        print(f"{filename} already exists")
        return

    partial_path = f"{filename}.part"
    try:
        with urllib.request.urlopen(url) as response, open(partial_path, 'wb') as out_file:
            # Copy in chunks so large files are not held in memory at once.
            for chunk in iter(lambda: response.read(64 * 1024), b""):
                out_file.write(chunk)
        # Publish the finished file in one step.
        os.replace(partial_path, filename)
        print(f"Downloaded {filename}")
    except Exception as e:
        # Best-effort helper: clean up the incomplete file and report the error.
        if os.path.exists(partial_path):
            os.remove(partial_path)
        print(f"Failed to download {filename}. Error: {e}")

# Both GPT-2 tokenizer files live under the same public folder; build the
# URL -> local filename mapping from that shared prefix.
gpt2_base_url = "https://openaipublic.blob.core.windows.net/gpt-2/models/124M"
files_to_download = {
    f"{gpt2_base_url}/{name}": name
    for name in ("vocab.bpe", "encoder.json")
}

# Fetch whichever of the files is not available locally yet.
for url, filename in files_to_download.items():
    download_file_if_absent(url, filename)

Downloaded vocab.bpe
Downloaded encoder.json


In [32]:
# Load OpenAI's published GPT-2 vocabulary (encoder.json) and merges (vocab.bpe).
tokenizer_gpt2 = BPETokenizerSimple()
tokenizer_gpt2.load_vocab_and_merges_from_openai(
    vocab_path="encoder.json", bpe_merges_path="vocab.bpe"
)

In [33]:
# Number of entries in the loaded GPT-2 vocabulary.
len(tokenizer_gpt2.vocab)

50257

In [34]:
# Encode a short sample with the GPT-2 vocabulary and merges.
input_text = "This is some text"
token_ids = tokenizer_gpt2.encode(input_text)
print(token_ids)

[1212, 318, 617, 2420]


In [35]:
# Decode the GPT-2 token IDs back into text.
print(tokenizer_gpt2.decode(token_ids))

This is some text


In [36]:
import tiktoken

# Cross-check: encode the same text with tiktoken's "gpt2" encoding so the IDs
# can be compared with the ones produced by tokenizer_gpt2.
tik_tokenizer = tiktoken.get_encoding("gpt2")
tik_tokenizer.encode(input_text)



[1212, 318, 617, 2420]