# [Tokenizers库](https://huggingface.co/learn/nlp-course/zh-CN/chapter6)
当我们需要微调模型时，我们需要使用与模型预训练相同的`tokenizer`

## 基于已有的 tokenizer 训练新的 tokenizer
大多数`Transformer 模型`使用`子词分词算法`。为了找到语料库中的常见子词，`tokenizer`需要深入统计语料库中的所有文本——这个过程我们称之为`训练（training）`。具体的训练规则取决于使用的`tokenizer`类型。
> 训练 tokenizer 与训练模型不同！模型训练使用随机梯度下降使每个 batch 的 loss 小一点。它本质上是随机的（这意味着即使两次训练的参数和算法完全相同，你也必须设置相同的随机数种子才能获得相同的结果）。训练 tokenizer 是一个统计过程，它试图确定对于给定的语料库，哪些子词最值得选择，具体的确定过程取决于分词算法。它是确定性的，这意味着在相同的语料库上使用相同的算法进行训练时，得到的结果总是相同的。

In [None]:
# step1 准备语料库
from datasets import load_dataset

# # 数据集下载失败时，可以设置网络代理
# import os
# os.environ["http_proxy"] = "http://127.0.0.1:port"
# os.environ["https_proxy"] = "http://127.0.0.1:port"

# # 加载数据集
raw_datasets = load_dataset("code_search_net", "python", trust_remote_code=True)
print(raw_datasets["train"])

In [None]:
# 我们仅使用 whole_func_string 列来训练我们的 tokenizer 
# 我们可以通过索引来查看其中一个函数的示例
print(raw_datasets["train"][123456]["whole_func_string"])


In [None]:
# 创建迭代器
# 将数据集转换为一个文本列表的iterator（迭代器），这样就可以以batch的方式进行训练了
# 使用迭代器可以不把所有内容都加载到内存中
# 通过 python 的 generator 来实现
## version 1 基于列表的 generator
def get_training_corpus():
    """Build a lazy generator over the training split in batches of 1000 rows.

    Reads the module-level ``raw_datasets``. Each item the generator produces
    is the ``"whole_func_string"`` column of one 1000-row slice of
    ``raw_datasets["train"]``; a slice is only fetched when the generator is
    advanced, so the whole column is never held in memory at once.

    Returns:
        A generator expression (single-use iterator) over the batches.
    """
    # Start offset of every batch: 0, 1000, 2000, ...
    batch_starts = range(0, len(raw_datasets["train"]), 1000)
    return (
        raw_datasets["train"][lo : lo + 1000]["whole_func_string"]
        for lo in batch_starts
    )


training_corpus = get_training_corpus()
print(training_corpus)

## version 2 基于 yield 的 generator
def get_training_corpus_v2(dataset=None, batch_size=1000):
    """Yield the ``"whole_func_string"`` column of a dataset batch by batch.

    Args:
        dataset: Object supporting ``len()`` and slice indexing, where a slice
            returns a mapping with a ``"whole_func_string"`` key. When omitted,
            the module-level ``raw_datasets["train"]`` is used.
        batch_size: Number of rows per yielded batch. Defaults to 1000, the
            value that used to be hard-coded.

    Yields:
        The ``"whole_func_string"`` value of each consecutive slice.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1 (raised when the
            generator is first advanced).
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")
    if dataset is None:
        dataset = raw_datasets["train"]
    for start_idx in range(0, len(dataset), batch_size):
        samples = dataset[start_idx : start_idx + batch_size]
        yield samples["whole_func_string"]
    
training_corpus_v2 = get_training_corpus_v2()
print(training_corpus_v2)

In [None]:
# step2 训练一个新的 tokenizer
from transformers import AutoTokenizer
# 使用旧的 tokenizer，训练一个新的 tokenizer
# 新的 tokenizer 将与 GPT-2 完全相同，唯一的区别是词汇表，这将由我们的语料库通过训练来重新确定。
old_tokenizer = AutoTokenizer.from_pretrained("gpt2")
example = '''def add_numbers(a, b):
    """Add the two numbers `a` and `b`."""
    return a + b'''

tokens = old_tokenizer.tokenize(example)
# 展示在旧的 tokenizer 下的 token
print(tokens)
# 使用code_search_net中的Python数据集，训练一个新的 tokenizer ，词汇表大小为52000
tokenizer = old_tokenizer.train_new_from_iterator(training_corpus, 52000)
tokens = tokenizer.tokenize(example)
print(tokens)
print(len(tokens))
print(len(old_tokenizer.tokenize(example)))

## Token-classification
`Token-classification`是自然语言处理（NLP）中的核心任务，旨在为文本中的每个 token（如单词、子词或字符）分配一个特定的标签。它的核心目标是通过对文本的细粒度分析，提取结构化信息或理解语言单元的语义角色，广泛应用于命名实体识别（NER）、分词、词性标注等场景。    


In [None]:
# 使用pipeline，默认基于 dbmdz/bert-large-cased-finetuned-conll03-english 模型对句子进行 NER
from transformers import pipeline

token_classifier = pipeline("token-classification")
print(token_classifier("My name is Sylvain and I work at Hugging Face in Brooklyn."))
# 可以使用将同一实体的 token 组合在一起
## aggregation_strategy 通过设置，可以设置组合后实体计算的策略
token_classifier = pipeline("token-classification", aggregation_strategy="simple")
print(token_classifier("My name is Sylvain and I work at Hugging Face in Brooklyn."))

In [None]:
# 自己构建pipeline
from transformers import AutoTokenizer, AutoModelForTokenClassification

model_checkpoint = "dbmdz/bert-large-cased-finetuned-conll03-english"
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)
model = AutoModelForTokenClassification.from_pretrained(model_checkpoint)

example = "My name is Sylvain and I work at Hugging Face in Brooklyn."
inputs = tokenizer(example, return_tensors="pt")
outputs = model(**inputs)
# 输入的token的维度
print(inputs["input_ids"].shape)
# 输出的维度
print(outputs.logits.shape)
# 打印索引到标签的映射
print(model.config.id2label)

import torch
# 使用 softmax 函数将这些 logits 转化为概率，并取 argmax 来得到预测
probabilities = torch.nn.functional.softmax(outputs.logits, dim=-1)[0].tolist()
predictions = outputs.logits.argmax(dim=-1)[0].tolist()
print(predictions)

# 格式化输出 token 的得分和标签
results = []
# 获取偏移映射
inputs_with_offsets = tokenizer(example, return_offsets_mapping=True)
tokens = inputs_with_offsets.tokens()
offsets = inputs_with_offsets["offset_mapping"]

for idx, pred in enumerate(predictions):
    label = model.config.id2label[pred]
    if label != "O":
        start, end = offsets[idx]
        results.append(
            {
                "entity": label,
                "score": probabilities[idx][pred],
                "word": tokens[idx],
                "start": start,
                "end": end,
            }
        )

print(results)

# Format the output as grouped entities: consecutive tokens belonging to the
# same entity are merged into one result with an averaged score.
import numpy as np

results = []
idx = 0
while idx < len(predictions):
    pred = predictions[idx]
    label = model.config.id2label[pred]
    if label == "O":
        idx += 1
        continue

    # Strip the "B-" / "I-" prefix to get the bare entity type
    label = label[2:]
    # The first token always belongs to the entity, whether it is tagged B- or
    # I-, so its score and character span seed the group.
    start, end = offsets[idx]
    all_scores = [probabilities[idx][pred]]
    idx += 1

    # Extend the group over every following token tagged I-<same type>
    while (
        idx < len(predictions)
        and model.config.id2label[predictions[idx]] == f"I-{label}"
    ):
        all_scores.append(probabilities[idx][predictions[idx]])
        _, end = offsets[idx]
        idx += 1

    # The group score is the mean of the scores of all its tokens
    score = np.mean(all_scores).item()
    word = example[start:end]
    results.append(
        {
            "entity_group": label,
            "score": score,
            "word": word,
            "start": start,
            "end": end,
        }
    )
    # No extra increment here: idx already points at the first token after the
    # group, which may itself start a new entity.

print(results)

## question-answering
question-answering pipeline 用于执行问答任务。具体来说，它基于一个预训练模型，从给定的文本中提取问题的答案。


In [None]:
from transformers import pipeline

question_answerer = pipeline("question-answering")
context = """
🤗 Transformers is backed by the three most popular deep learning libraries — Jax, PyTorch, and TensorFlow — with a seamless integration
between them. It's straightforward to train your models with one before loading them for inference with the other.
"""
long_context = """
🤗 Transformers: State of the Art NLP

🤗 Transformers provides thousands of pretrained models to perform tasks on texts such as classification, information extraction,
question answering, summarization, translation, text generation and more in over 100 languages.
Its aim is to make cutting-edge NLP easier to use for everyone.

🤗 Transformers provides APIs to quickly download and use those pretrained models on a given text, fine-tune them on your own datasets and
then share them with the community on our model hub. At the same time, each python module defining an architecture is fully standalone and
can be modified to enable quick research experiments.

Why should I use transformers?

1. Easy-to-use state-of-the-art models:
  - High performance on NLU and NLG tasks.
  - Low barrier to entry for educators and practitioners.
  - Few user-facing abstractions with just three classes to learn.
  - A unified API for using all our pretrained models.
  - Lower compute costs, smaller carbon footprint:

2. Researchers can share trained models instead of always retraining.
  - Practitioners can reduce compute time and production costs.
  - Dozens of architectures with over 10,000 pretrained models, some in more than 100 languages.

3. Choose the right framework for every part of a model's lifetime:
  - Train state-of-the-art models in 3 lines of code.
  - Move a single model between TF2.0/PyTorch frameworks at will.
  - Seamlessly pick the right framework for training, evaluation and production.

4. Easily customize a model or an example to your needs:
  - We provide examples for each architecture to reproduce the results published by its original authors.
  - Model internals are exposed as consistently as possible.
  - Model files can be used independently of the library for quick experiments.

🤗 Transformers is backed by the three most popular deep learning libraries — Jax, PyTorch and TensorFlow — with a seamless integration
between them. It's straightforward to train your models with one before loading them for inference with the other.
"""

question = "Which deep learning libraries back 🤗 Transformers?"
print(question_answerer(question=question, context=context))
# 长文本也能处理
print(question_answerer(question=question, context=long_context))

In [None]:
# 自己构建pipeline
from transformers import AutoTokenizer, AutoModelForQuestionAnswering

model_checkpoint = "distilbert-base-cased-distilled-squad"
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)
model = AutoModelForQuestionAnswering.from_pretrained(model_checkpoint)

# 输入格式是：[CLS] question [SEP] context [SEP] 
inputs = tokenizer(question, context, return_tensors="pt")
outputs = model(**inputs)
# 问答模型训练的目标是来预测答案开始的 token 的索引和答案结束的 token 的索引。
# 所以返回值是两个logits：一个对应于答案的开始 token 的 logits，另一个对应于答案的结束 token 的 logits。
start_logits = outputs.start_logits
end_logits = outputs.end_logits
print(start_logits.shape, end_logits.shape)

# 使用 softmax 函数将这些 logits 转化为概率，并取 argmax 来得到预测
"""
注意：
由于输入格式是 [CLS] question [SEP] context [SEP] ，所以我们需要屏蔽 question 的 tokens 以及 [SEP] token 。
不过，我们将保留 [CLS] ，因为某些模型使用它来表示答案不在上下文中。
"""
import torch

sequence_ids = inputs.sequence_ids()
# 屏蔽除 context 之外的所有内容
mask = [i != 1 for i in sequence_ids]
# 不屏蔽 [CLS] token
mask[0] = False
mask = torch.tensor(mask)[None]
print(mask)
# 将想要屏蔽的 logits 替换为一个大的负数
start_logits[mask] = -10000
end_logits[mask] = -10000
start_probabilities = torch.nn.functional.softmax(start_logits, dim=-1)[0]
end_probabilities = torch.nn.functional.softmax(end_logits, dim=-1)[0]
print(start_probabilities.shape, end_probabilities.shape)

# 在满足 start_index <= end_index 的前提下，计算每个可能的 start_index 和 end_index 的概率，
# 然后取概率最高的 (start_index, end_index) 元组。
# start_probabilities[start_index] × end_probabilities[end_index] 就是这个 (start_index, end_index) 元组的概率。
scores = start_probabilities[:, None] * end_probabilities[None, :]
print(scores.shape)
import numpy as np
# 获取上三角部分
scores = torch.triu(scores)

# argmax 将返回展平（flattened）后张量中的索引
max_index = scores.argmax().item()
start_index = max_index // scores.shape[1]
end_index = max_index % scores.shape[1]
print(start_index, end_index)
print(scores[start_index, end_index])

# With start_index and end_index known, the answer can be sliced out of context.
# Tokenize again with offsets to map token indices back to character positions.
inputs_with_offsets = tokenizer(question, context, return_offsets_mapping=True)
offsets = inputs_with_offsets["offset_mapping"]

start_char, _ = offsets[start_index]
_, end_char = offsets[end_index]
answer = context[start_char:end_char]
# Format the output
result = {
    "answer": answer,
    "start": start_char,
    "end": end_char,
    # .item() turns the 0-dim tensor into a plain Python float, so the result
    # prints cleanly and matches the long-context cell.
    "score": scores[start_index, end_index].item(),
}
print(result)

In [None]:
# 处理长文本
"""
question-answering pipeline 
通过设置max_length，限制tokens数量的最大值为384
当超过这个限制时，我们需要将长文本分割成多个小段，然后在小段中寻找答案
为了防止答案被分割，我们需要在各块之间重叠一些内容

通过使用以下参数实现：
1. return_overflowing_tokens=True，当输入文本长度超过模型支持的最大长度时，
   该参数会将文本分割成多个子片段（chunks），并返回所有溢出片段。
2. stride 定义分割后的子片段之间的重叠token数量。
"""
inputs = tokenizer(
    question,
    long_context,
    stride=128,
    max_length=384,
    padding="longest",
    truncation="only_second",
    return_overflowing_tokens=True,
    return_offsets_mapping=True,
)
_ = inputs.pop("overflow_to_sample_mapping")
offsets = inputs.pop("offset_mapping")

inputs = inputs.convert_to_tensors("pt")
print(inputs["input_ids"].shape)
# sequence_ids 用于标识输入中不同句子的边界（常见于多序列输入场景）。
# 例如在问答任务中，输入由问题（标记为0）和上下文（标记为1）组成
# 参数: batch_index (int, optional, defaults to 0) — The index to access in the batch.
# 不指定 batch_index 时，默认返回第一个 batch 的 sequence_ids
sequence_ids = inputs.sequence_ids()
print(sequence_ids)
print(len(sequence_ids))

outputs = model(**inputs)
start_logits = outputs.start_logits
end_logits = outputs.end_logits
print(start_logits.shape, end_logits.shape)

# 屏蔽 context tokens 之外的所有内容
mask = [ i != 1 for i in sequence_ids]
# 取消对 [CLS] token 的屏蔽
mask[0] = False
# 屏蔽所有的 [PAD] tokens
mask = torch.logical_or(torch.tensor(mask)[None], (inputs["attention_mask"] == 0))

start_logits[mask] = -10000
end_logits[mask] = -10000

start_probabilities = torch.nn.functional.softmax(start_logits, dim=-1)
end_probabilities = torch.nn.functional.softmax(end_logits, dim=-1)
print(start_probabilities.shape, end_probabilities.shape)

candidates = []
for start_probs, end_probs in zip(start_probabilities, end_probabilities):
    scores = start_probs[:, None] * end_probs[None, :]
    idx = torch.triu(scores).argmax().item()

    start_idx = idx // scores.shape[1]
    end_idx = idx % scores.shape[1]
    score = scores[start_idx, end_idx].item()
    candidates.append((start_idx, end_idx, score))

print(candidates)

for candidate, offset in zip(candidates, offsets):
    start_token, end_token, score = candidate
    start_char, _ = offset[start_token]
    _, end_char = offset[end_token]
    answer = long_context[start_char:end_char]
    result = {"answer": answer, "start": start_char, "end": end_char, "score": score}
    print(result)

## 标准化和预分词
tokenization 的步骤:
1. 标准化（任何认为必要的文本清理，例如删除空格或重音符号、Unicode 规范化等）
2. 预分词（将输入拆分为单词）
3. 通过模型处理输入（使用预先拆分的词来生成一系列 tokens ）
4. 后处理（添加 tokenizer 的特殊 tokens，并生成注意力掩码和 token 类型 ID）


In [None]:
from transformers import AutoTokenizer

# BERT tokenizer 基于WordPiece
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
print(type(tokenizer.backend_tokenizer))
# 通过 normalize_str 方法来规范化字符串
print(tokenizer.backend_tokenizer.normalizer.normalize_str("Héllò hôw are ü?"))

# pre_tokenize_str() 方法将字符串拆分为单词
tokenizer.backend_tokenizer.pre_tokenizer.pre_tokenize_str("Hello, how are  you?")

In [None]:
# 不同的分词器，分词规则存在差异，比如上面 BERT tokenizer 会将两个空格看做一个空格，
# 而 GPT-2 tokenizer 不会
# GPT-2 tokenizer 基于 Byte-Pair Encoding (BPE)
tokenizer = AutoTokenizer.from_pretrained("gpt2")
tokenizer.backend_tokenizer.pre_tokenizer.pre_tokenize_str("Hello, how are  you?")

In [None]:
# 基于 SentencePiece 算法的 T5 tokenizer
tokenizer = AutoTokenizer.from_pretrained("t5-small")
tokenizer.backend_tokenizer.pre_tokenizer.pre_tokenize_str("Hello, how are  you?")

## BPE tokenization 算法
BPE（Byte Pair Encoding，字节对编码） 是一种基于统计的子词分词算法。其核心思想是通过逐步合并高频字符对，将文本拆分为更小的子词单元（subword units），从而平衡词汇表大小与模型对罕见词的处理能力。
1. **从字符到子词**：BPE 从字符级别开始，逐步合并出现频率最高的相邻字符对，形成更长的子词单元，直到达到预设的词汇表大小。
2. **高频优先**：高频的字符组合（如英文中的 "ing"、"ed"，中文中的常见字对）会被优先合并，形成有语义意义的子词。

In [None]:
# 手动实现 BPE 算法
# 1. 提供一个简单的语料库
corpus = [
    "This is the Hugging Face Course.",
    "This chapter is about tokenization.",
    "This section shows several tokenizer algorithms.",
    "Hopefully, you will be able to understand how they are trained and generate tokens.",
]

# 2. 加载 gpt2 分词器
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained("gpt2")

# 3. 使用gpt2分词器进行预分词，并计算语料库中每个单词的频率
from collections import defaultdict
word_freqs = defaultdict(int)
for text in corpus:
    words_with_offsets = tokenizer.backend_tokenizer.pre_tokenizer.pre_tokenize_str(text)
    new_words = [word for word, offset in words_with_offsets]
    for word in new_words:
        word_freqs[word] += 1
print(word_freqs)

# 4. 产出基础词汇表（字符串->字符）
alphabet = []
for word in word_freqs.keys():
    for letter in word:
        if letter not in alphabet:
            alphabet.append(letter)
alphabet.sort()
print(alphabet)

# 5. 词汇表开头增加一个特殊标识
vocab = ["<|endoftext|>"] + alphabet.copy()
print(vocab)

# 6. 将每个单词拆分成单独的字符
splits = {word: [c for c in word] for word in word_freqs.keys()}
print(splits)

# 7. 定义一个 统计每对字符的频率 的函数
def compute_pair_freqs(splits):
    """Count adjacent symbol pairs, weighted by how often each word occurs.

    Iterates over the module-level ``word_freqs``.

    Args:
        splits: Mapping from each word to its current list of symbols.

    Returns:
        A ``defaultdict(int)`` mapping ``(left, right)`` symbol tuples to their
        weighted frequency.
    """
    pair_counts = defaultdict(int)
    for word, count in word_freqs.items():
        symbols = splits[word]
        # Pairing the sequence with its one-step shift yields every adjacent
        # pair; a single-symbol word yields none, so nothing is merged for it.
        for neighbours in zip(symbols, symbols[1:]):
            pair_counts[neighbours] += count
    return pair_counts

# 8. 合并字符对，并加入词汇表
def merge_pair(a, b, splits):
    """Replace every adjacent ``a``, ``b`` occurrence with the symbol ``a + b``.

    Walks the words of the module-level ``word_freqs`` and rewrites their
    entries in ``splits``.

    Args:
        a: Left symbol of the pair to merge.
        b: Right symbol of the pair to merge.
        splits: Mapping from word to its list of symbols; updated in place.

    Returns:
        The same ``splits`` mapping, after the merge.
    """
    fused = a + b
    for word in word_freqs:
        symbols = splits[word]
        if len(symbols) == 1:
            # Already a single symbol: nothing to merge
            continue
        pos = 0
        while pos < len(symbols) - 1:
            if (symbols[pos], symbols[pos + 1]) == (a, b):
                # Do not advance: the next comparison starts at the fused symbol
                symbols = symbols[:pos] + [fused] + symbols[pos + 2 :]
            else:
                pos += 1
        splits[word] = symbols
    return splits

"""
将 7、8 放到循环内，便可以获取到所有想要的合并了
"""
# Target vocabulary size
vocab_size = 50
# Learned merges, in the order they were learned: (left, right) -> merged token
merges = dict()
while len(vocab) < vocab_size:
    # Recount adjacent-pair frequencies under the current splits
    pair_freqs = compute_pair_freqs(splits)
    if not pair_freqs:
        # Every word is already a single token, so nothing is left to merge.
        # Stop here instead of unpacking an empty pair into merge_pair().
        break
    # Pick the most frequent pair. max() returns the first maximal key on ties,
    # which is the same pair a strict "<" scan would select.
    best_pair = max(pair_freqs, key=pair_freqs.get)
    max_freq = pair_freqs[best_pair]
    splits = merge_pair(*best_pair, splits)
    merges[best_pair] = best_pair[0] + best_pair[1]
    vocab.append(best_pair[0] + best_pair[1])
print(merges)
print(vocab)


In [None]:
# 测试上面生成的分词器
def tokenize(text):
    """Tokenize ``text`` by replaying the learned BPE merges.

    The text is pre-tokenized with the module-level ``tokenizer``, every word
    is split into single characters, and each entry of the module-level
    ``merges`` is then applied in the order it was learned.

    Args:
        text: Raw string to tokenize.

    Returns:
        A flat list of token strings.
    """
    # Use the public ``backend_tokenizer`` accessor, as the rest of this file
    # does, rather than the private ``_tokenizer`` attribute.
    pre_tokenize_result = tokenizer.backend_tokenizer.pre_tokenizer.pre_tokenize_str(text)
    splits = [list(word) for word, _offset in pre_tokenize_result]
    for (left, right), merge in merges.items():
        for idx, split in enumerate(splits):
            i = 0
            while i < len(split) - 1:
                if split[i] == left and split[i + 1] == right:
                    split = split[:i] + [merge] + split[i + 2 :]
                else:
                    i += 1
            splits[idx] = split
    # Flatten in a single pass; sum(splits, []) re-copies the accumulator for
    # every word.
    return [token for split in splits for token in split]

tokenize("This is not a token.")

## WordPiece tokenization 算法
WordPiece 是另一种基于子词的分词算法，由 Google 提出并广泛应用于 BERT 等预训练模型。与 BPE（Byte Pair Encoding）类似，WordPiece 通过将文本分解为更小的子词单元来解决未登录词（OOV）问题，但它在合并策略和训练目标上与 BPE 有显著差异。
- 核心思想：通过最大化语言模型的似然概率，选择合并后能最大程度提升句子概率的字符对。（与 BPE 的“高频优先”合并不同，WordPiece 的合并策略更关注子词组合对整体语义的贡献。）

计算合并候选对的得分公式：    
$$
score=(freq\_of\_pair)/(freq\_of\_first\_element \times freq\_of\_second\_element)
$$
通过将两部分合在一起的频率除以其中各部分的频率的乘积，该算法优先合并那些在词汇表中单独出现频率较低的对。得分越高，说明合并后对模型越有利。  
      
注：WordPiece 和 BPE 的分词方式有所不同，WordPiece 只保存最终词汇表，而不保存学习到的合并规则。



In [None]:
# 手动实现 WordPiece 算法
corpus = [
    "This is the Hugging Face Course.",
    "This chapter is about tokenization.",
    "This section shows several tokenizer algorithms.",
    "Hopefully, you will be able to understand how they are trained and generate tokens.",
]

# 加载用于预分词的分词器
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")

# 统计语料库中每个单词出现的频率
from collections import defaultdict
word_freqs = defaultdict(int)
for text in corpus:
    words_with_offsets = tokenizer.backend_tokenizer.pre_tokenizer.pre_tokenize_str(text)
    new_words = [word for word, offset in words_with_offsets]
    for word in new_words:
        word_freqs[word] += 1
print(word_freqs)

# 生成字母表，注意除了单词的第一个字母外，其他字符前面都得加上前缀“##”
alphabet = []
for word in word_freqs.keys():
    if word[0] not in alphabet:
        alphabet.append(word[0])
    for letter in word[1:]:
        if f"##{letter}" not in alphabet:
            alphabet.append(f"##{letter}")
alphabet.sort()
print(alphabet)

# 在词汇表的开头添加了特殊 tokens
vocab = ["[PAD]", "[UNK]", "[CLS]", "[SEP]", "[MASK]"] + alphabet.copy()
print(vocab)

# 将每个单词进行分割，除了第一个字母外，其他字母都需要以 "##" 为前缀
splits = {
    word: [c if i == 0 else f"##{c}" for i, c in enumerate(word)]
    for word in word_freqs.keys()
}
print(splits)

# 计算每对的分数
def compute_pair_scores(splits):
    """Score every adjacent symbol pair with the WordPiece criterion.

    ``score(pair) = freq(pair) / (freq(first) * freq(second))``, where all
    frequencies are weighted by the counts in the module-level ``word_freqs``.

    Args:
        splits: Mapping from each word to its current list of symbols.

    Returns:
        A dict mapping ``(first, second)`` symbol tuples to their float score.
    """
    symbol_counts = defaultdict(int)
    pair_counts = defaultdict(int)
    for word, count in word_freqs.items():
        pieces = splits[word]
        # Every symbol of the word contributes to the per-symbol frequency
        for piece in pieces:
            symbol_counts[piece] += count
        # Every adjacent pair contributes to the pair frequency
        for adjacent in zip(pieces, pieces[1:]):
            pair_counts[adjacent] += count

    pair_scores = {}
    for (first, second), count in pair_counts.items():
        denominator = symbol_counts[first] * symbol_counts[second]
        pair_scores[(first, second)] = count / denominator
    return pair_scores

# 对 splits 字典进行合并
def merge_pair(a, b, splits):
    """Merge every adjacent ``a``, ``b`` occurrence in ``splits``.

    The merged symbol is ``a`` followed by ``b`` with a leading ``"##"``
    removed from ``b``, if it has one. Walks the words of the module-level
    ``word_freqs``.

    Args:
        a: Left symbol of the pair.
        b: Right symbol of the pair.
        splits: Mapping from word to its list of symbols; updated in place.

    Returns:
        The same ``splits`` mapping, after the merge.
    """
    # The merged symbol is the same for every occurrence, so build it once
    if b.startswith("##"):
        joined = a + b[2:]
    else:
        joined = a + b

    for word in word_freqs:
        pieces = splits[word]
        if len(pieces) == 1:
            continue
        pos = 0
        while pos < len(pieces) - 1:
            if pieces[pos] == a and pieces[pos + 1] == b:
                pieces = pieces[:pos] + [joined] + pieces[pos + 2 :]
            else:
                pos += 1
        splits[word] = pieces
    return splits

# Target vocabulary size: 70
vocab_size = 70
while len(vocab) < vocab_size:
    scores = compute_pair_scores(splits)
    if not scores:
        # Every word is already a single token, so no pair is left to merge.
        # Stop here instead of unpacking an empty pair into merge_pair().
        break
    # Pick the highest-scoring pair. max() returns the first maximal key on
    # ties, which is the same pair a strict "<" scan would select.
    best_pair = max(scores, key=scores.get)
    max_score = scores[best_pair]
    splits = merge_pair(*best_pair, splits)
    # The new vocabulary entry drops the "##" prefix of the second part
    new_token = (
        best_pair[0] + best_pair[1][2:]
        if best_pair[1].startswith("##")
        else best_pair[0] + best_pair[1]
    )
    vocab.append(new_token)
print(vocab)

In [None]:
# 要对新文本进行分词，我们先预分词，再进行分割，然后在每个词上使用分词算法。也就是说，
# 我们寻找从第一个词开始的最大子词并将其分割，然后我们对第二部分重复此过程，以此类推，
# 对该词以及文本中的后续词进行分割
def encode_word(word):
    """Split one word into WordPiece tokens by greedy longest-prefix matching.

    Looks tokens up in the module-level ``vocab``. After each match the rest of
    the word is prefixed with ``"##"`` before matching continues.

    Args:
        word: A single pre-tokenized word.

    Returns:
        The list of tokens, or ``["[UNK]"]`` for the whole word as soon as some
        remainder has no prefix in the vocabulary.
    """
    pieces = []
    remainder = word
    while remainder:
        # Try the longest prefix first and shrink it one character at a time
        for cut in range(len(remainder), 0, -1):
            if remainder[:cut] in vocab:
                break
        else:
            # No prefix of any length is known
            return ["[UNK]"]
        pieces.append(remainder[:cut])
        remainder = remainder[cut:]
        if remainder:
            remainder = f"##{remainder}"
    return pieces

print(encode_word("Hugging"))
print(encode_word("HOgging"))

# 对文本进行分词
def tokenize(text):
    """Tokenize ``text`` with the WordPiece vocabulary built above.

    The text is pre-tokenized with the module-level ``tokenizer`` and each
    resulting word is encoded with ``encode_word``.

    Args:
        text: Raw string to tokenize.

    Returns:
        A flat list of token strings.
    """
    # Use the public ``backend_tokenizer`` accessor, as the rest of this file
    # does, rather than the private ``_tokenizer`` attribute.
    pre_tokenize_result = tokenizer.backend_tokenizer.pre_tokenizer.pre_tokenize_str(text)
    # Flatten in a single pass; sum(lists, []) re-copies the accumulator for
    # every word.
    return [
        token
        for word, _offset in pre_tokenize_result
        for token in encode_word(word)
    ]
print(tokenize("This is the Hugging Face course!"))


## Unigram tokenization 算法
Unigram 分词算法 是一种基于概率模型的子词分词方法，与 BPE 和 WordPiece 不同，它通过**最大化句子概率**来确定最优分词方式。Unigram 假设每个子词的出现是独立的，并通过迭代优化词汇表和子词概率，适用于灵活处理多语言和复杂文本场景。   
- 概率驱动：每个子词有一个概率值，句子的分词概率是其所有子词概率的乘积。Unigram 的目标是找到使句子概率最大的分词方式。
- 动态词汇表：从一个大词汇表开始，逐步删除对整体似然贡献最小的子词，最终得到目标大小的词汇表。

In [None]:
# 手动实现 Unigram tokenization 算法
corpus = [
    "This is the Hugging Face Course.",
    "This chapter is about tokenization.",
    "This section shows several tokenizer algorithms.",
    "Hopefully, you will be able to understand how they are trained and generate tokens.",
]

from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained("xlnet-base-cased")

# 统计单词的频率
from collections import defaultdict
word_freqs = defaultdict(int)
for text in corpus:
    words_with_offsets = tokenizer.backend_tokenizer.pre_tokenizer.pre_tokenize_str(text)
    new_words = [word for word, offset in words_with_offsets]
    for word in new_words:
        word_freqs[word] += 1
print(word_freqs)

# 统计字符的频率
char_freqs = defaultdict(int)
# 生成每个单词的全部子串，并统计子串的频率
subwords_freqs = defaultdict(int)
for word, freq in word_freqs.items():
    for i in range(len(word)):
        char_freqs[word[i]] += freq
        # 循环遍历长度至少为2的子词
        for j in range(i + 2, len(word) + 1):
            subwords_freqs[word[i:j]] += freq
# 按频率对子词排序
sorted_subwords = sorted(subwords_freqs.items(), key=lambda x: x[1], reverse=True)
print(char_freqs)
print(sorted_subwords[:20])

# 将频率靠前的子词 和 单字符 融合在一起，得到一个大小为300的初始词汇表
token_freqs = list(char_freqs.items()) + sorted_subwords[: 300 - len(char_freqs)]
token_freqs = {token: freq for token, freq in token_freqs}
print(list(token_freqs.items())[:10])

# 计算所有频率的总和，将频率转化为概率
# 相较于小数相乘，对数相加在数值上更稳定，而且这将简化模型损失的计算
from math import log
total_sum = sum([freq for token, freq in token_freqs.items()])
print(total_sum)
model = {token: -log(freq / total_sum) for token, freq in token_freqs.items()}
print(list(model.items())[:10])

def encode_word(word, model):
    """Find the most probable segmentation of ``word`` under a unigram model.

    Dynamic programming over end positions: for every prefix ``word[:i]`` the
    cheapest segmentation is kept, where the cost of a segmentation is the sum
    of the ``model`` values of its tokens.

    Args:
        word: The string to segment.
        model: Mapping from token to its cost. The lowest total cost wins, so
            the values are expected to be negative log-probabilities.

    Returns:
        A ``(tokens, score)`` tuple: the best token list and its total cost.
        ``([], 0.0)`` for an empty word, and ``(["<unk>"], None)`` when the
        word cannot be covered by tokens of ``model``.
    """
    if not word:
        # Nothing to segment; avoid emitting a spurious empty-string token
        return [], 0.0

    # best_segmentations[i] describes the cheapest way to segment word[:i]:
    # "start" is where its last token begins, "score" is the total cost.
    # The empty prefix costs 0.0 so the result is exactly the sum of costs.
    best_segmentations = [{"start": 0, "score": 0.0}] + [
        {"start": None, "score": None} for _ in range(len(word))
    ]
    # Fill the table from left to right
    for start_idx in range(len(word)):
        best_score_at_start = best_segmentations[start_idx]["score"]
        if best_score_at_start is None:
            # word[:start_idx] cannot be segmented, so no token can start here
            continue
        for end_idx in range(start_idx + 1, len(word) + 1):
            token = word[start_idx:end_idx]
            if token not in model:
                continue
            score = model[token] + best_score_at_start
            current_best = best_segmentations[end_idx]["score"]
            # Lower cost is better: keep the cheaper path ending at end_idx
            if current_best is None or current_best > score:
                best_segmentations[end_idx] = {"start": start_idx, "score": score}

    segmentation = best_segmentations[-1]
    if segmentation["score"] is None:
        # No sequence of known tokens covers the whole word
        return ["<unk>"], None
    score = segmentation["score"]

    # Backtrack from the end of the word through the stored start positions,
    # collecting tokens right-to-left, then restore reading order.
    tokens = []
    end = len(word)
    while end > 0:
        start = best_segmentations[end]["start"]
        tokens.append(word[start:end])
        end = start
    tokens.reverse()
    return tokens, score

print(encode_word("Hopefully", model))
print(encode_word("This", model))

def compute_loss(model):
    """Return the corpus loss under ``model``.

    The loss is the sum, over every word of the module-level ``word_freqs``,
    of the word's frequency times the score ``encode_word`` returns for it.

    Args:
        model: Mapping from token to its score, passed on to ``encode_word``.

    Returns:
        The frequency-weighted total score.
    """
    return sum(
        freq * encode_word(word, model)[1] for word, freq in word_freqs.items()
    )

import copy
def compute_scores(model):
    """Measure how much the corpus loss changes when each token is removed.

    Args:
        model: Mapping from token to its score.

    Returns:
        A dict mapping every token longer than one character to
        ``compute_loss(model without that token) - compute_loss(model)``.
        Single-character tokens are never scored, so they are always kept.
    """
    scores = {}
    model_loss = compute_loss(model)
    for token in model:
        # Single characters stay in the vocabulary unconditionally
        if len(token) == 1:
            continue
        # The model is a flat token -> number mapping, so a shallow copy is
        # enough; no deep copy is needed for each candidate token.
        model_without_token = dict(model)
        del model_without_token[token]
        scores[token] = compute_loss(model_without_token) - model_loss
    return scores

# Prune the vocabulary down to a final size of 100 tokens
target_vocab_size = 100
percent_to_remove = 0.1
while len(model) > target_vocab_size:
    scores = compute_scores(model)
    sorted_scores = sorted(scores.items(), key=lambda x: x[1])
    # Remove the percent_to_remove lowest-scoring tokens per round, but:
    #   - at least one, so the loop always makes progress,
    #   - never so many that the vocabulary drops below the target size,
    #   - never more than the number of removable (scored) tokens.
    num_to_remove = min(
        max(1, int(len(model) * percent_to_remove)),
        len(model) - target_vocab_size,
        len(sorted_scores),
    )
    if num_to_remove == 0:
        # Only unscored single-character tokens are left; nothing can be pruned
        break
    for token, _ in sorted_scores[:num_to_remove]:
        token_freqs.pop(token)
    # Re-normalize the remaining frequencies into negative log-probabilities
    total_sum = sum([freq for token, freq in token_freqs.items()])
    model = {token: -log(freq / total_sum) for token, freq in token_freqs.items()}

# 对新文本进行 tokenization
def tokenize(text, model):
    """Tokenize ``text`` with the unigram ``model``.

    The text is pre-tokenized with the module-level ``tokenizer`` and each word
    is segmented with ``encode_word``.

    Args:
        text: Raw string to tokenize.
        model: Mapping from token to its score, passed on to ``encode_word``.

    Returns:
        A flat list of token strings.
    """
    pre_tokenized = tokenizer.backend_tokenizer.pre_tokenizer.pre_tokenize_str(text)
    tokens = []
    for word, _offset in pre_tokenized:
        # encode_word returns (tokens, score); only the tokens are needed here
        word_tokens = encode_word(word, model)[0]
        tokens.extend(word_tokens)
    return tokens


print(tokenize("This is the Hugging Face course.", model))

## 模块化构建 tokenizer
`Tokenizers`库旨在为 tokenization 的每个步骤提供多个选项，你可以任意搭配这些选项。接下来我们将从零开始构建`tokenizer`（而不是基于旧的`tokenizer`训练）。

In [None]:
# 获取语料库
from datasets import load_dataset
dataset = load_dataset("wikitext", name="wikitext-2-raw-v1", split="train")
def get_training_corpus():
    """Yield the ``"text"`` column of the module-level ``dataset`` in batches.

    Yields:
        The ``"text"`` value of each consecutive 1000-row slice of ``dataset``.
    """
    batch_size = 1000
    for offset in range(0, len(dataset), batch_size):
        batch = dataset[offset : offset + batch_size]
        yield batch["text"]

# Keep an offline copy of the corpus in a local text file, one row per write
with open("wikitext-2.txt", "w", encoding="utf-8") as f:
    f.writelines(dataset[i]["text"] + "\n" for i in range(len(dataset)))

from tokenizers import (
    decoders,
    models,
    normalizers,
    pre_tokenizers,
    processors,
    trainers,
    Tokenizer,
)

# 创建一个使用 WordPiece 模型的 Tokenizer 
tokenizer = Tokenizer(models.WordPiece(unk_token="[UNK]"))

# step1 标准化
tokenizer.normalizer = normalizers.Sequence(
    [normalizers.NFD(), normalizers.Lowercase(), normalizers.StripAccents()]
)
print(tokenizer.normalizer.normalize_str("Héllò hôw are ü?"))

# step2 预分词
# Whitespace 会使用空格和所有不是字母、数字或下划线的字符进行分割
tokenizer.pre_tokenizer = pre_tokenizers.Whitespace()
print(tokenizer.pre_tokenizer.pre_tokenize_str("Let's test my pre-tokenizer."))
# WhitespaceSplit 只使用空格进行分割
pre_tokenizer = pre_tokenizers.WhitespaceSplit()
print(pre_tokenizer.pre_tokenize_str("Let's test my pre-tokenizer."))
# 和 normalizer 一样，也可以使用 Sequence 来组合几个预分词的步骤
pre_tokenizer = pre_tokenizers.Sequence(
    [pre_tokenizers.WhitespaceSplit(), pre_tokenizers.Punctuation()]
)
print(pre_tokenizer.pre_tokenize_str("Let's test my pre-tokenizer."))

# step3 通过模型处理输入
special_tokens = ["[UNK]", "[PAD]", "[CLS]", "[SEP]", "[MASK]"]
trainer = trainers.WordPieceTrainer(vocab_size=25000, special_tokens=special_tokens)
tokenizer.train_from_iterator(get_training_corpus(), trainer=trainer)
# # 如果基于本地文件的话可以按下面的方式训练模型
# tokenizer.model = models.WordPiece(unk_token="[UNK]")
# tokenizer.train(["wikitext-2.txt"], trainer=trainer)
encoding = tokenizer.encode("Let's test this tokenizer.")
print(encoding.tokens)

# step4 后处理
cls_token_id = tokenizer.token_to_id("[CLS]")
sep_token_id = tokenizer.token_to_id("[SEP]")
print(cls_token_id, sep_token_id)
tokenizer.post_processor = processors.TemplateProcessing(
    single=f"[CLS]:0 $A:0 [SEP]:0",
    pair=f"[CLS]:0 $A:0 [SEP]:0 $B:1 [SEP]:1",
    special_tokens=[("[CLS]", cls_token_id), ("[SEP]", sep_token_id)],
)
encoding = tokenizer.encode("Let's test this tokenizer.")
print(encoding.tokens)
encoding = tokenizer.encode("Let's test this tokenizer...", "on a pair of sentences.")
print(encoding.tokens)
print(encoding.type_ids)

# step5 指定一个解码器
tokenizer.decoder = decoders.WordPiece(prefix="##")
print(tokenizer.decode(encoding.ids))
