# BPE

In [None]:
import re
from collections import Counter, defaultdict

class BPETokenizer:
    def __init__(self, vocab_size):
        self.vocab_size = vocab_size
        self.vocab = {}
        self.merges = {} # 合并规则，{key: (token1, token2), value: merged_token}

    def _get_stats(self, vocab):
        """
        统计所有相邻 token 对的出现频率
        :param vocab: 当前的语料库词汇表，格式为 {'l o w </w>': 5, ...}
        :return: 一个 Counter 对象，记录了每个 token 对的频率
        """
        pairs = Counter()
        for word, freq in vocab.items():
            tokens = word.split()
            # 遍历单词中的所有相邻 symbol 对
            for i in range(len(tokens) - 1):
                pairs[tokens[i], tokens[i+1]] += freq
        return pairs

    def _merge_vocab(self, pair, v_in):
        """
        在词汇表中执行一次合并操作
        :param pair: 需要合并的 token 对，例如 ('e', 's')
        :param v_in: 输入的词汇表
        :return: 合并后的新词汇表
        """
        v_out = {}
        bigram = re.escape(' '.join(pair)) # 将 ('e', 's') 拼接成 'e s'，用于在字符串中查找
        p = re.compile(r'(?<!\S)' + bigram + r'(?!\S)') # 替换模式：查找独立的 'e s' 对
        
        for word in v_in:
            # 将 'e s' 替换为 'es'
            w_out = p.sub(''.join(pair), word)
            v_out[w_out] = v_in[word]
        return v_out

    def fit(self, corpus):
        """
        训练 BPE 模型
        corpus: 文本语料
        """
        # 1. 初始化预分词词汇表
        #    将 'lowest' 变为 'l o w e s t </w>'，</w> 是bpe中的特殊词尾符号
        word_counts = Counter(corpus.split())
        vocab = {' '.join(word) + ' </w>': freq for word, freq in word_counts.items()}

        # 2. 获取初始词表（所有单个字符）
        alphabet = set()
        for word in vocab:
            alphabet.update(list(word.split()))
        
        # 初始词表就是这些基本字符
        self.vocab = {char: i for i, char in enumerate(alphabet)}
        
        num_merges = self.vocab_size - len(self.vocab)
        for i in range(num_merges):
            # 统计当前词汇表中所有相邻 token 对的频率
            pairs = self._get_stats(vocab)
            if not pairs:
                break
            
            # 找到频率最高的 token 对
            most_pair = pairs.most_common()[0][0]
            vocab = self._merge_vocab(most_pair, vocab)
            
            merged_token = ''.join(most_pair)
            self.merges[most_pair] = merged_token
            
            if merged_token not in self.vocab:
                self.vocab[merged_token] = len(self.vocab)
        

    def tokenize(self, text):
        """
        将输入的文本字符串进行分词
        :param text: 待分词的单词或句子，例如 "lowest"
        :return: token 列表
        """
        # 预处理：将单词拆分为字符，并添加词尾符号
        words=text.split()
        all_token_ids = []

        for word in words:
            tokens = list(word)
            tokens = ' '.join(tokens) + ' </w>'
            tokens = tokens.split()
            
            # 获取所有可能的 token 对
            def get_pairs(symbols):
                pairs = set()
                for i in range(len(symbols) - 1):
                    pairs.add((symbols[i], symbols[i+1]))
                return pairs

            while True:
                pairs = get_pairs(tokens)
                # 寻找在当前文本中可以合并的、优先级最高的（粒度最细，最早学会的）合并规则
                # 注意：这里需要按 self.merges 的学习顺序来查找，因为它是带优先级的
                best_pair_to_merge = None
                for pair in self.merges:
                    if pair in pairs:
                        best_pair_to_merge = pair
                        break # 找到第一个（优先级最高）就跳出
                
                if best_pair_to_merge is None:
                    break
                
                # 执行合并
                first, second = best_pair_to_merge
                new_tokens = []
                i = 0
                while i < len(tokens):
                    if i < len(tokens) - 1 and tokens[i] == first and tokens[i+1] == second:
                        new_tokens.append(first + second)
                        i += 2
                    else:
                        new_tokens.append(tokens[i])
                        i += 1
                tokens = new_tokens
            all_token_ids.extend([self.vocab[v] for v in tokens])
        return all_token_ids


In [14]:
from collections import Counter

counter=Counter()
counter.update(["Hello world"])
counter.update(["Hello world"])
counter.update(["Hello world"])
counter.update(["Hello"])
counter.update(["Hello wo"])

In [10]:
for item, freq in counter.items():
    print(item, freq)

Hello world 3


In [17]:
counter.most_common()[0]

('Hello world', 3)