In [4]:
chr(0)

'\x00'

In [5]:
print(chr(0))

 


In [6]:
chr(0).__repr__()

"'\\x00'"

In [7]:
'this is a test' + chr(0) + 'string'

'this is a test\x00string'

In [8]:
print('this is a test' + chr(0) + 'string')

this is a test string


In [4]:
test_string = "hello! こんにちは!"
utf8_encoded = test_string.encode("utf-8")
print(utf8_encoded)
print(utf8_encoded.decode("utf-8"))
print(type(utf8_encoded))

b'hello! \xe3\x81\x93\xe3\x82\x93\xe3\x81\xab\xe3\x81\xa1\xe3\x81\xaf!'
hello! こんにちは!
<class 'bytes'>


In [7]:
# Index the string and its UTF-8 encoding with the same counter. The counter
# runs over characters (len(test_string)), so only the first len(test_string)
# bytes of utf8_encoded are visited.
# NOTE(review): from the first non-ASCII character onward the printed byte is
# NOT the encoding of the character next to it -- each Japanese character in
# test_string encodes to three bytes -- so read the pairs as a demonstration
# that character positions and byte positions diverge, not as a
# character -> byte table.
for i in range(len(test_string)):
    print(test_string[i], utf8_encoded[i])
    
# The complete byte sequence, shown as integers.
print(list(utf8_encoded))

h 104
e 101
l 108
l 108
o 111
! 33
  32
こ 227
ん 129
に 147
ち 227
は 130
! 147
[104, 101, 108, 108, 111, 33, 32, 227, 129, 147, 227, 130, 147, 227, 129, 171, 227, 129, 161, 227, 129, 175, 33]


In [24]:
print(len(test_string))
print(len(utf8_encoded))

13
23


In [103]:
max([("A", "B"), ("A", "C"), ("B", "ZZ"), ("BA", "A")])

('BA', 'A')

In [None]:
import regex as re
PAT = r"""'(?:[sdmt]|ll|ve|re)| ?\p{L}+| ?\p{N}+| ?[^\s\p{L}\p{N}]+|\s+(?!\S)|\s+"""
re.findall(PAT, "some text that i'll pre-tokenize")

['some', ' text', ' that', ' i', "'ll", ' pre', '-', 'tokenize']

In [None]:
# 下载数据并保存
from datasets import load_dataset
ds = load_dataset("roneneldan/TinyStories")
ds.save_to_disk('../data/TinyStories')

In [None]:
# 加载数据
from datasets import load_from_disk
dataset = load_from_disk('../data/TinyStories')

In [16]:
dataset

DatasetDict({
    train: Dataset({
        features: ['text'],
        num_rows: 2119719
    })
    validation: Dataset({
        features: ['text'],
        num_rows: 21990
    })
})

In [9]:
import os
from abc import ABC
from collections import defaultdict
from dataclasses import dataclass

import regex as re

class Tokenizer(ABC):
    """Common interface for tokenizers: subclasses provide `encode()` and `decode()`."""

    def encode(self, string: str) -> list[int]:
        """Turn text into a list of token ids. Must be overridden by subclasses."""
        raise NotImplementedError()

    def decode(self, indices: list[int]) -> str:
        """Turn a list of token ids back into text. Must be overridden by subclasses."""
        raise NotImplementedError()

@dataclass
class BPETokenizerParams:
    """Everything needed to build a BPE tokenizer: the vocabulary and the merge table.

    Declared as a dataclass so that it can be constructed with
    ``BPETokenizerParams(vocab=..., merges=...)`` (as ``train_bpe`` does).
    With bare annotations alone the class has no ``__init__`` accepting these
    arguments, and that call raises ``TypeError``.
    """
    vocab: dict[int, bytes]     # index -> bytes
    merges: dict[tuple[int, int], int]  # index1,index2 -> new_index

def merge(indices: list[int], pair: tuple[int, int], new_index: int) -> list[int]:
    """Return a new list in which each occurrence of `pair` in `indices` is replaced by `new_index`.

    The scan runs left to right and a matched pair is consumed as a whole, so
    overlapping candidates are resolved in favour of the leftmost one
    (e.g. [5, 5, 5] with pair (5, 5) becomes [new_index, 5]).
    The input list is not modified.
    """
    merged: list[int] = []
    total = len(indices)
    pos = 0
    while pos < total:
        # A match needs a right-hand neighbour, so the final position can
        # never start one.
        is_match = (
            pos + 1 < total
            and indices[pos] == pair[0]
            and indices[pos + 1] == pair[1]
        )
        if is_match:
            merged.append(new_index)
            pos += 2
            continue
        # No match here: carry the token over unchanged.
        merged.append(indices[pos])
        pos += 1
    return merged


In [22]:
# Worked example for merge()
indices = [1, 2, 3, 2, 3, 4, 2, 3]
pair = (2, 3)
new_index = 99

# All three (2, 3) occurrences are replaced, including the trailing one.
result = merge(indices, pair, new_index)
print(result)  # [1, 99, 99, 4, 99]

[1, 99, 99, 4, 99]


In [None]:
class BPETokenizer(Tokenizer):
    """Byte-level BPE tokenizer driven by a trained `BPETokenizerParams`.

    `params.vocab` maps token id -> bytes and `params.merges` maps a pair of
    token ids -> the id that replaces that pair.
    """

    def __init__(self, params: BPETokenizerParams):
        self.params = params

    def encode(self, string: str) -> list[int]:
        """Convert `string` into token ids.

        The text is first turned into its UTF-8 bytes (ids 0-255); every
        learned merge is then applied over the whole sequence, in the
        iteration order of `params.merges`.
        """
        # Iterating a bytes object already yields ints.
        indices = list(string.encode("utf-8"))
        # Note: this is a very slow implementation (one full pass per merge).
        for pair, new_index in self.params.merges.items():
            indices = merge(indices, pair, new_index)
        return indices

    def decode(self, indices: list[int]) -> str:
        """Convert token ids back into text.

        Each id is looked up in `params.vocab` and the byte strings are
        concatenated before decoding. An arbitrary id sequence does not have
        to be valid UTF-8 (it may stop in the middle of a multi-byte
        character), so malformed bytes are decoded to U+FFFD instead of
        raising `UnicodeDecodeError`.
        """
        token_bytes = b"".join(map(self.params.vocab.get, indices))
        return token_bytes.decode("utf-8", errors="replace")

In [None]:
def train_bpe(string: str, num_merges: int) -> BPETokenizerParams:
    """Train a byte-level BPE tokenizer on a single string.

    Args:
        string: Training text.
        num_merges: Maximum number of merges to learn. Training stops earlier
            when the sequence has fewer than two tokens left, because there is
            no adjacent pair to merge any more.

    Returns:
        BPETokenizerParams holding the vocabulary (the 256 single-byte tokens
        plus one entry per learned merge) and the merge table.
    """
    # Encode the text as UTF-8; iterating bytes yields ints in 0..255.
    indices = list(string.encode("utf-8"))

    # (index1, index2) -> id of the token created by merging that pair.
    merges: dict[tuple[int, int], int] = {}

    # The vocabulary starts with one token per possible byte value.
    vocab: dict[int, bytes] = {x: bytes([x]) for x in range(256)}

    for i in range(num_merges):
        # Count how often each adjacent token pair occurs.
        counts = defaultdict(int)
        for index1, index2 in zip(indices, indices[1:]):
            counts[(index1, index2)] += 1

        # With fewer than two tokens there are no pairs, and max() on an
        # empty mapping would raise ValueError.
        if not counts:
            break

        # Most frequent pair; on ties max() keeps the first one it sees, i.e.
        # the pair that was inserted into `counts` first.
        pair = max(counts, key=counts.get)
        index1, index2 = pair

        # Record the merge under a fresh id (new ids start right after the bytes).
        new_index = 256 + i
        merges[pair] = new_index

        # bytes + bytes concatenates; it does not add the values numerically.
        vocab[new_index] = vocab[index1] + vocab[index2]

        # Rewrite the sequence with the merged token.
        indices = merge(indices, pair, new_index)

    return BPETokenizerParams(vocab=vocab, merges=merges)

In [None]:
def run_train_bpe(
    input_path: str | os.PathLike,
    vocab_size: int,
    special_tokens: list[str],
    **kwargs,
) -> tuple[dict[int, bytes], list[tuple[bytes, bytes]]]:
    """Given the path to an input corpus, train a BPE tokenizer and
    output its vocabulary and merges.

    Work in progress: vocabulary initialisation and pre-token counting are
    implemented; the merge loop is not, so the function currently always ends
    by raising NotImplementedError.

    Args:
        input_path (str | os.PathLike): Path to BPE tokenizer training data.
        vocab_size (int): Total number of items in the tokenizer's vocabulary (including special tokens).
        special_tokens (list[str]): A list of string special tokens to be added to the tokenizer vocabulary.
            These strings will never be split into multiple tokens, and will always be
            kept as a single token. If these special tokens occur in the `input_path`,
            they are treated as any other string.
        **kwargs: Accepted but currently unused.

    Returns:
        tuple[dict[int, bytes], list[tuple[bytes, bytes]]]:
            vocab:
                The trained tokenizer vocabulary, a mapping from int (token ID in the vocabulary)
                to bytes (token bytes)
            merges:
                BPE merges. Each list item is a tuple of bytes (<token1>, <token2>),
                representing that <token1> was merged with <token2>.
                Merges are ordered by order of creation.

    Raises:
        NotImplementedError: Always, until the merge step is written.
    """
    # Bound locally under its own name: other cells in this notebook run
    # `import re`, which would rebind a module-level `re` alias to the
    # standard-library module, and that module rejects the \p{...} classes
    # used in PAT.
    import regex

    # 1. Vocabulary initialisation: 256 single-byte tokens, then the special tokens.
    vocab: dict[int, bytes] = {x: bytes([x]) for x in range(256)}  # ids 0-255
    next_token_id = 256
    for special_token in special_tokens:
        # Special tokens arrive as str; the vocabulary stores bytes.
        vocab[next_token_id] = special_token.encode("utf-8")
        next_token_id += 1

    # 2. Pre-tokenization
    with open(input_path, "r", encoding="utf-8") as f:
        content = f.read()

    # GPT-2 pre-tokenization pattern.
    PAT = r"""'(?:[sdmt]|ll|ve|re)| ?\p{L}+| ?\p{N}+| ?[^\s\p{L}\p{N}]+|\s+(?!\S)|\s+"""

    # Count how often each pre-token occurs. finditer yields matches one at a
    # time, so the full list of matches is never held in memory.
    pre_counts: dict[str, int] = defaultdict(int)
    for match in regex.finditer(PAT, content):
        pre_counts[match.group()] += 1

    # 3. BPE merges
    # TODO: learn merges from `pre_counts` until `vocab` holds `vocab_size`
    # entries, then return (vocab, merges).
    raise NotImplementedError

In [None]:
# Demonstration: the standard-library `re` module does not understand the
# \p{...} Unicode property classes used by the pre-tokenization pattern.
# It is imported under its own alias so that the name `re` keeps referring to
# the `regex` package the other cells rely on.
import re as stdlib_re

text = "Hello 123, world 456 " * 100

# Pre-tokenization pattern (requires the third-party `regex` package).
PAT = r"""'(?:[sdmt]|ll|ve|re)| ?\p{L}+| ?\p{N}+| ?[^\s\p{L}\p{N}]+|\s+(?!\S)|\s+"""

try:
    pre_token_matches = stdlib_re.findall(PAT, "Hello 123, world 456 ")
except stdlib_re.error as exc:
    # The failure is the point of this cell: display it without aborting Run All.
    print(f"error: {exc}")

error: bad escape \p at position 23

In [None]:
# Stream the matches with finditer instead of building the whole list at once.
# The `regex` package is named explicitly: PAT contains \p{...} classes, which
# the standard-library `re` module rejects (see the error above).
import regex

for match in regex.finditer(PAT, text):
    print(match.group(), match.start(), match.end())

In [21]:

import regex as re
PAT = r"""'(?:[sdmt]|ll|ve|re)| ?\p{L}+| ?\p{N}+| ?[^\s\p{L}\p{N}]+|\s+(?!\S)|\s+"""
re.findall(PAT, "Hello 123, world 456 " * 100)



['Hello',
 ' 123',
 ',',
 ' world',
 ' 456',
 ' Hello',
 ' 123',
 ',',
 ' world',
 ' 456',
 ' Hello',
 ' 123',
 ',',
 ' world',
 ' 456',
 ' Hello',
 ' 123',
 ',',
 ' world',
 ' 456',
 ' Hello',
 ' 123',
 ',',
 ' world',
 ' 456',
 ' Hello',
 ' 123',
 ',',
 ' world',
 ' 456',
 ' Hello',
 ' 123',
 ',',
 ' world',
 ' 456',
 ' Hello',
 ' 123',
 ',',
 ' world',
 ' 456',
 ' Hello',
 ' 123',
 ',',
 ' world',
 ' 456',
 ' Hello',
 ' 123',
 ',',
 ' world',
 ' 456',
 ' Hello',
 ' 123',
 ',',
 ' world',
 ' 456',
 ' Hello',
 ' 123',
 ',',
 ' world',
 ' 456',
 ' Hello',
 ' 123',
 ',',
 ' world',
 ' 456',
 ' Hello',
 ' 123',
 ',',
 ' world',
 ' 456',
 ' Hello',
 ' 123',
 ',',
 ' world',
 ' 456',
 ' Hello',
 ' 123',
 ',',
 ' world',
 ' 456',
 ' Hello',
 ' 123',
 ',',
 ' world',
 ' 456',
 ' Hello',
 ' 123',
 ',',
 ' world',
 ' 456',
 ' Hello',
 ' 123',
 ',',
 ' world',
 ' 456',
 ' Hello',
 ' 123',
 ',',
 ' world',
 ' 456',
 ' Hello',
 ' 123',
 ',',
 ' world',
 ' 456',
 ' Hello',
 ' 123',
 ',',
 ' world'

In [7]:
# NOTE(review): this rebinds the name `re` to the standard-library module.
# Other cells in this notebook use `import regex as re` with \p{...} patterns,
# so re-run that import before going back to them.
import re
import sys

# Pretend this is our "corpus"
text = "hello world! " * 1000

pattern = r"\w+"  # match words

# Approach 1: findall (puts every result into a list in one go)
tokens_findall = re.findall(pattern, text)
print("findall 结果数量:", len(tokens_findall))
# NOTE(review): sys.getsizeof is applied to the list object only; presumably
# the matched strings it references are not included -- confirm against the
# sys.getsizeof documentation before quoting this number.
print("findall 占用内存大概:", sys.getsizeof(tokens_findall), "字节")

# Approach 2: finditer (an iterator; matches are consumed one at a time in the loop)
tokens_finditer = re.finditer(pattern, text)
count = 0
for match in tokens_finditer:
    count += 1
print("finditer 结果数量:", count)
# Measured after the loop has consumed the iterator; this is the size of the
# iterator object itself.
print("finditer 占用内存大概:", sys.getsizeof(tokens_finditer), "字节")


findall 结果数量: 2000
findall 占用内存大概: 16184 字节
finditer 结果数量: 2000
finditer 占用内存大概: 48 字节


In [5]:
tokens_finditer

<callable_iterator at 0x1ddf01f5330>

In [68]:
special_tokens = ['<|endoftext|>']

vocab: dict[int, bytes] = {x: bytes([x]) for x in range(256)}  # 256个基础词，0-255
next_token_id = 256
for special_token in special_tokens:  # 特殊 tokens 
    # 注意这里所给的特殊 tokens 为 str 格式，vocab 中的是字节形式的
    vocab[next_token_id] = special_token.encode("utf-8")
    next_token_id += 1

vocab

{0: b'\x00',
 1: b'\x01',
 2: b'\x02',
 3: b'\x03',
 4: b'\x04',
 5: b'\x05',
 6: b'\x06',
 7: b'\x07',
 8: b'\x08',
 9: b'\t',
 10: b'\n',
 11: b'\x0b',
 12: b'\x0c',
 13: b'\r',
 14: b'\x0e',
 15: b'\x0f',
 16: b'\x10',
 17: b'\x11',
 18: b'\x12',
 19: b'\x13',
 20: b'\x14',
 21: b'\x15',
 22: b'\x16',
 23: b'\x17',
 24: b'\x18',
 25: b'\x19',
 26: b'\x1a',
 27: b'\x1b',
 28: b'\x1c',
 29: b'\x1d',
 30: b'\x1e',
 31: b'\x1f',
 32: b' ',
 33: b'!',
 34: b'"',
 35: b'#',
 36: b'$',
 37: b'%',
 38: b'&',
 39: b"'",
 40: b'(',
 41: b')',
 42: b'*',
 43: b'+',
 44: b',',
 45: b'-',
 46: b'.',
 47: b'/',
 48: b'0',
 49: b'1',
 50: b'2',
 51: b'3',
 52: b'4',
 53: b'5',
 54: b'6',
 55: b'7',
 56: b'8',
 57: b'9',
 58: b':',
 59: b';',
 60: b'<',
 61: b'=',
 62: b'>',
 63: b'?',
 64: b'@',
 65: b'A',
 66: b'B',
 67: b'C',
 68: b'D',
 69: b'E',
 70: b'F',
 71: b'G',
 72: b'H',
 73: b'I',
 74: b'J',
 75: b'K',
 76: b'L',
 77: b'M',
 78: b'N',
 79: b'O',
 80: b'P',
 81: b'Q',
 82: b'R',
 83: b'

In [53]:
# 加载数据
from datasets import load_from_disk
dataset = load_from_disk('../data/TinyStories')

for i in range(10):
    print(dataset['validation'][i])

{'text': 'Spot. Spot saw the shiny car and said, "Wow, Kitty, your car is so bright and clean!" Kitty smiled and replied, "Thank you, Spot. I polish it every day."\n\nAfter playing with the car, Kitty and Spot felt thirsty. They found a small pond with clear water. They drank the water and felt very happy. They played together all day and became best friends.'}
{'text': 'Once upon a time, in a big forest, there lived a rhinoceros named Roxy. Roxy loved to climb. She climbed trees, rocks, and hills. One day, Roxy found an icy hill. She had never seen anything like it before. It was shiny and cold, and she wanted to climb it.\n\nRoxy tried to climb the icy hill, but it was very slippery. She tried again and again, but she kept falling down. Roxy was sad. She wanted to climb the icy hill so much. Then, she saw a little bird named Billy. Billy saw that Roxy was sad and asked, "Why are you sad, Roxy?"\n\nRoxy told Billy about the icy hill and how she couldn\'t climb it. Billy said, "I have 

In [56]:
dataset['validation']

Dataset({
    features: ['text'],
    num_rows: 21990
})

In [63]:
os.getcwd()

'c:\\Users\\OneRaise\\Desktop\\CS336-Notes\\assignment1'

In [None]:
# text_train = "\n<|endoftext|>\n".join(p['text'].replace("\n\n", "\n") for p in dataset['train'])
# text_valid = "\n<|endoftext|>\n".join(p['text'].replace("\n\n", "\n") for p in dataset['validation'])

# # 写入到一个 txt 文件
# with open("../data/TinyStoriesV2-GPT4-train.txt", "w", encoding="utf-8") as f:
#     f.write(text_train)
    
# with open("../data/TinyStoriesV2-GPT4-valid.txt", "w", encoding="utf-8") as f:
#     f.write(text_valid)