
# Subword Tokenizer (BPE) + Mini Sentiment Demo
**Task:** 构建一个简单的BPE Tokenizer，并用它完成一个小型情感分类任务

**Step:**
1. 在一个小Corpus 上训练 BPE Tokenizer
2. 使用从Corpus里学习到的合并规则，对文本进行编码/解码
3. 将 BPE 分词结果作为特征，用于情感分类。
4. 简单评估并输出示例预测结果


## Setup

In [1]:

# Imports
from collections import Counter, defaultdict
import re
import random
from typing import List, Tuple, Dict

# ML bits
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.metrics import classification_report, accuracy_score



## 1) Tiny Training Corpus


In [2]:

train_corpus = [
    "i love this movie it is fantastic",
    "this film is terrible and boring",
    "what a wonderful day for cinema",
    "i dislike the ending of this movie",
    "great acting and a lovely soundtrack",
    "the plot is weak and predictable",
    "absolutely amazing visuals and pacing",
    "i will not recommend this film",
    "such a charming story and vibe",
    "the characters were flat and dull",
    "i enjoyed every minute of it",
    "waste of time and money",
    "brilliant performances and direction",
    "i hate how slow it becomes",
    "heartwarming scenes and solid writing",
    "the dialogue felt unnatural",
    "an inspiring and delightful experience",
    "poor editing ruined the tension",
    "i like the humor and style",
    "the soundtrack is annoying",
]
len(train_corpus)


20


## 2) Implement BPE
使用经典的 基于单词的 BPE，并添加单词结束标记 </w>：
1. 将每个单词表示为字符序列 + </w>
2. 统计整个语料中字符对的出现频率
3. 将出现频率最高的字符对合并为一个新符号
4. 重复上述步骤，直到完成 N 次合并
5. 分词时按照从高频到低频的顺序，贪心应用合并规则

In [3]:
import re
from collections import Counter
from typing import List, Tuple

# 按空白字符分词，返回单词列表
def whitespace_tokenize(text: str) -> List[str]:
    return re.findall(r"\S+", text.strip())  # \S+ 匹配非空白字符序列

# 准备 BPE 训练词表
def prepare_bpe_training(corpus: List[str]) -> Counter:
    # 将语料按行拆分为单词，并统一小写
    words = []
    for line in corpus:
        words.extend(whitespace_tokenize(line.lower()))

    # 将每个单词拆成字符 + 单词结束标记 '</w>'，并统计频率
    vocab = Counter(tuple(list(w) + ['</w>']) for w in words)
    return vocab

# 统计词表中所有相邻字符对的频率
def get_pair_stats(vocab: Counter) -> Counter:
    pairs = Counter()
    for symbols, freq in vocab.items():  # symbols 是字符元组
        for i in range(len(symbols)-1):
            pair = (symbols[i], symbols[i+1])  # 相邻字符对
            pairs[pair] += freq
    return pairs

# 合并词表中指定的高频字符对
def merge_vocab(vocab: Counter, pair: Tuple[str, str]) -> Counter:
    # 使用正则将指定字符对合并为一个新符号
    bigram = pair[0] + pair[1]  # 新符号
    new_vocab = Counter()
    for symbols, freq in vocab.items():
        s = ' '.join(symbols)  # 将元组转换为空格分隔的字符串方便替换
        # 将匹配的字符对替换为 bigram
        s = s.replace(pair[0] + ' ' + pair[1], bigram)
        # 再转回元组，并记录频率
        new_vocab[tuple(s.split(' '))] += freq
    return new_vocab

# 从语料中学习 BPE 合并规则
def learn_bpe(corpus: List[str], num_merges: int = 100):
    vocab = prepare_bpe_training(corpus)  # 准备词表
    merges = []  # 存储每一步的合并对
    for _ in range(num_merges):
        pairs = get_pair_stats(vocab)  # 统计所有字符对
        if not pairs:  # 如果没有字符对可合并，结束
            break
        best = max(pairs, key=pairs.get)  # 选择出现频率最高的字符对
        if pairs[best] < 2:  # 如果频率太低，也停止
            break
        vocab = merge_vocab(vocab, best)  # 合并选中的字符对
        merges.append(best)  # 记录合并
    return merges  # 返回所有学习到的合并规则


In [4]:
# 从训练语料中学习 80 次字符合并（可以根据语料大小或演示时间调整次数）
merges = learn_bpe(train_corpus, num_merges=80)

# 查看学习到的合并规则数量和前 10 条规则
len(merges), merges[:10]  # len(merges) 返回总共学习到的合并对数量
                         # merges[:10] 返回最前面 10 个高频合并对


(78,
 [('e', '</w>'),
  ('a', 'n'),
  ('d', '</w>'),
  ('i', 'n'),
  ('s', '</w>'),
  ('t', 'h'),
  ('an', 'd</w>'),
  ('in', 'g'),
  ('ing', '</w>'),
  ('t', '</w>')])

In [5]:
# 定义 BPE 分词器类
class BPETokenizer:
    def __init__(self, merges):
        # 将 merges 列表存储为字典 {字符对: 排名}，排名越小优先级越高
        self.rank = {pair: i for i, pair in enumerate(merges)}

    # 内部方法：对单个单词应用 BPE 贪心合并
    def _bpe(self, word: str):
        # 将单词拆成字符 + 单词结束标记 '</w>'
        symbols = list(word) + ['</w>']

        # 贪心合并：重复合并优先级最高的相邻字符对
        while True:
            if len(symbols) < 2:  # 如果少于 2 个符号，无法再合并
                break
            # 构建所有相邻字符对
            pairs = [(symbols[i], symbols[i+1]) for i in range(len(symbols)-1)]
            if not pairs:
                break  # 避免空列表传给 min() 报错

            # 为每个字符对查找排名，如果没有在 merges 中出现，则排名为 1e9（低优先级）
            ranked = [(self.rank.get(p, 1e9), i, p) for i, p in enumerate(pairs)]
            if all(r[0] == 1e9 for r in ranked):  # 如果没有可用的合并对，停止
                break

            # 选择排名最小（优先级最高）的字符对
            best_rank, idx, best_pair = min(ranked, key=lambda x: x[0])
            if best_rank == 1e9:
                break  # 没有可用的合并对

            # 在 idx 位置合并字符对
            symbols = symbols[:idx] + [symbols[idx] + symbols[idx+1]] + symbols[idx+2:]

        # 如果末尾还有 '</w>'，去掉它
        if symbols and symbols[-1] == '</w>':
            symbols = symbols[:-1]

        return symbols  # 返回 BPE 分词结果列表

    # 对文本进行 BPE 编码
    def encode(self, text: str):
        tokens = []
        for word in whitespace_tokenize(text.lower()):  # 按空格分词并转小写
            pieces = self._bpe(word)  # 对每个单词应用 BPE
            tokens.extend(pieces)
        return tokens

    # 对 BPE token 列表进行简单解码
    def decode(self, tokens):
        return ' '.join(tokens)  # 直接用空格连接 token

# 创建 BPE 分词器实例
tokenizer = BPETokenizer(merges)

# 测试示例
print("Example:", tokenizer.encode("Absolutely lovely movie!"))


Example: ['a', 'b', 'so', 'l', 'ut', 'ely</w>', 'lov', 'ely</w>', 'movi', 'e', '!']


其中，_bpe 是核心函数，对单词进行贪心合并，直到无法再合并为止

encode 将文本按空格拆成单词，并对每个单词调用 _bpe，而 decode 只是演示用，将 token 列表简单拼接成字符串

输出示例可以看到单词被分解成子词 token，例如 Absolutely 可能被分成 ['A', 'b', 'so', 'lutely']等等


### Quick sanity checks


In [6]:

for s in ["I loved the movie", "The plot was weak", "Charming visuals and story"]:
    print(s, "->", tokenizer.encode(s))


I loved the movie -> ['i</w>', 'lov', 'e', 'd</w>', 'the</w>', 'movie</w>']
The plot was weak -> ['the</w>', 'p', 'lo', 't</w>', 'wa', 's</w>', 'w', 'ea', 'k</w>']
Charming visuals and story -> ['cha', 'rming</w>', 'vi', 'su', 'a', 'l', 's</w>', 'and</w>', 'st', 'or', 'y</w>']



## 3) Use BPE Tokens as Features for Sentiment
接着在一个小数据集上训练一个简单的 多项式朴素贝叶斯 分类器，使用 BPE Token 作为情感特征



In [7]:
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
from sklearn.naive_bayes import MultinomialNB
from sklearn.feature_extraction.text import CountVectorizer

#小型带标签数据集（正面=1，负面=0
dataset = [
    ("i love this movie", 1),
    ("this film is great", 1),
    ("absolutely amazing visuals", 1),
    ("what a wonderful story", 1),
    ("i like the acting and music", 1),
    ("heartwarming and inspiring", 1),
    ("i enjoyed every minute", 1),
    ("a delightful experience", 1),

    ("this film is terrible", 0),
    ("i hate the ending", 0),
    ("boring and predictable plot", 0),
    ("waste of time", 0),
    ("annoying soundtrack", 0),
    ("characters were flat", 0),
    ("poor editing ruined it", 0),
    ("dull and weak writing", 0),
]

# 将文本和标签分别提取出来
texts = [t for t,_ in dataset]   # 文本列表
labels = [y for _,y in dataset]  # 标签列表

#使用 CountVectorizer 结合自定义 BPE 分词器
# 参数说明：
# - lowercase=True: 文本转为小写
# - tokenizer=tokenizer.encode: 使用之前定义的 BPE 分词器
# - preprocessor=None: 不进行额外预处理
vectorizer = CountVectorizer(lowercase=True, tokenizer=tokenizer.encode, preprocessor=None)

# 构建词频矩阵
X = vectorizer.fit_transform(texts)
y = labels

# 划分训练集和测试集
# test_size=0.35: 35% 数据作为测试集
# random_state=42: 保证可重复性
# stratify=y: 按标签比例分层采样
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.35, random_state=42, stratify=y)

# 训练多项式朴素贝叶斯分类器
clf = MultinomialNB()
clf.fit(X_train, y_train)

# 在测试集上进行预测
y_pred = clf.predict(X_test)

# 输出结果
print("Accuracy:", accuracy_score(y_test, y_pred))  # 准确率
print(classification_report(y_test, y_pred, digits=3))  # 精确率、召回率、F1 等详细指标


Accuracy: 0.3333333333333333
              precision    recall  f1-score   support

           0      0.333     0.333     0.333         3
           1      0.333     0.333     0.333         3

    accuracy                          0.333         6
   macro avg      0.333     0.333     0.333         6
weighted avg      0.333     0.333     0.333         6






### Try own sentences


In [8]:
from typing import List

# 定义一个函数，用于对新文本进行情感预测
def predict_sentiment(sentences: List[str]):
    # 将新文本转换为 BPE token 特征矩阵
    X_new = vectorizer.transform(sentences)

    # 使用训练好的朴素贝叶斯分类器进行预测
    preds = clf.predict(X_new)         # 预测类别（0=负面，1=正面）
    probs = clf.predict_proba(X_new)[:,1]  # 获取正面情感概率

    # 输出每条文本的预测结果和正面概率
    for s, p, pr in zip(sentences, preds, probs):
        sentiment = 'positive' if p == 1 else 'negative'
        print(f"{s!r} -> {sentiment} (p_pos={pr:.3f})")

# 测试预测函数
predict_sentiment([
    "i absolutely love this film",
    "the movie is dull and boring",
    "great pacing but weak ending",
    "not my type, but okay",
])


'i absolutely love this film' -> negative (p_pos=0.098)
'the movie is dull and boring' -> negative (p_pos=0.024)
'great pacing but weak ending' -> negative (p_pos=0.026)
'not my type, but okay' -> positive (p_pos=0.862)
