In [1]:
[ord(x) for x in "Hello"]

[72, 101, 108, 108, 111]

In [4]:
list("Hello".encode("utf-8"))

[72, 101, 108, 108, 111]

In [5]:
class Tokenizer:
    def __init__(self):
        # token -> id
        self.token2id = {}
        # id -> token
        self.id2token = {}
        # merge operations in order [(x, y), ...]
        self.merges = []

    def get_stats(self, corpus):
        """
        统计当前语料中所有相邻符号对的频次
        corpus: list of words, each word is list of symbols, e.g. ['t', 'h', 'e', '</w>']
        返回：dict mapping (sym_i, sym_{i+1}) -> count
        """
        pairs = {}
        for word in corpus:
            for i in range(len(word) - 1):
                pair = (word[i], word[i + 1])
                pairs[pair] = pairs.get(pair, 0) + 1
        return pairs

    def merge_pair(self, pair, corpus):
        """
        在所有词上将指定的符号对子合并为单一符号
        """
        merged_token = ''.join(pair)
        new_corpus = []
        for word in corpus:
            new_word = []
            i = 0
            while i < len(word):
                # 如果当前位置及下一位置是我们要合并的 pair，就替换为 merged_token
                if i < len(word) - 1 and word[i] == pair[0] and word[i + 1] == pair[1]:
                    new_word.append(merged_token)
                    i += 2
                else:
                    new_word.append(word[i])
                    i += 1
            new_corpus.append(new_word)
        return new_corpus

    def train(self, text, vocab_size):
        """
        训练 BPE 分词器
        text: 原始字符串
        vocab_size: 希望得到的词表大小（包含所有字符与子词）
        """
        # 1. 构建初始语料：按空格切词，每词末尾加 </w>，并拆成字符列表
        words = text.strip().split()
        corpus = [list(word) + ['</w>'] for word in words]

        # 2. 初始化 vocab 为所有单字符 + </w>
        vocab = set()
        for w in corpus:
            vocab.update(w)

        # 3. 迭代合并
        while len(vocab) < vocab_size:
            pairs = self.get_stats(corpus)
            if not pairs:
                break
            # 选出现频次最高的一对
            best_pair = max(pairs, key=pairs.get)
            self.merges.append(best_pair)
            # 合并语料中的该对子
            corpus = self.merge_pair(best_pair, corpus)
            # 添加新符号到 vocab
            merged_token = ''.join(best_pair)
            vocab.add(merged_token)

        # 4. 构建 token2id 与 id2token 映射
        for idx, token in enumerate(sorted(vocab)):
            self.token2id[token] = idx
            self.id2token[idx] = token

    def encode(self, text):
        """
        将输入字符串编码为 ID 列表
        """
        words = text.strip().split()
        output_ids = []
        for word in words:
            symbols = list(word) + ['</w>']
            # 按训练时 merges 的顺序依次尝试合并
            for pair in self.merges:
                i = 0
                while i < len(symbols) - 1:
                    if symbols[i] == pair[0] and symbols[i+1] == pair[1]:
                        symbols[i:i+2] = [''.join(pair)]
                    else:
                        i += 1
            # 将每个子词映射到 ID
            for sym in symbols:
                output_ids.append(self.token2id[sym])
        return output_ids

    def decode(self, ids):
        """
        将 ID 列表解码回字符串
        """
        tokens = [self.id2token[i] for i in ids]
        words = []
        current = []
        for t in tokens:
            if t == '</w>':
                # 遇到词结束符则将 current 拼成一个词
                words.append(''.join(current))
                current = []
            else:
                current.append(t)
        # 如果最后没有以 </w> 结束，补上剩余
        if current:
            words.append(''.join(current))
        return ' '.join(words)


In [None]:
if __name__ == "__main__":
    text = "low lower lowest low"
    tokenizer = Tokenizer()

    tokenizer.train(text, vocab_size=20)

    inds = tokenizer.encode("low lowest")
    print("Encoded IDs:", inds)

    s = tokenizer.decode(inds)
    print("Decoded text:", s)


Encoded IDs: [5, 11]
Decoded text: low</w>lowest</w>


In [7]:
class Tokenizer:
    def __init__(self):
        # BPE 合并规则：{ (p0, p1): new_id, ... }
        self.merges = {}
        # vocab 映射：token -> id
        self.token2id = {}
        self.id2token = {}

    def get_stats(self, corpus):
        """
        统计当前语料（list of list of symbols）中所有相邻符号对的出现次数
        返回 dict: { (sym_i, sym_{i+1}): count, ... }
        """
        stats = {}
        for word in corpus:
            for i in range(len(word) - 1):
                pair = (word[i], word[i+1])
                stats[pair] = stats.get(pair, 0) + 1
        return stats

    def merge_pair(self, pair, corpus):
        """
        在 corpus 中将所有相邻的 pair=('A','B') 合并为 ['AB']
        返回新的 corpus
        """
        merged = pair[0] + pair[1]
        new_corpus = []
        for word in corpus:
            new_word = []
            i = 0
            while i < len(word):
                # 如果当前位置和下一个是我们要合并的 pair，则替换为 merged
                if i < len(word)-1 and word[i] == pair[0] and word[i+1] == pair[1]:
                    new_word.append(merged)
                    i += 2
                else:
                    new_word.append(word[i])
                    i += 1
            new_corpus.append(new_word)
        return new_corpus

    def train(self, text, vocab_size):
        """
        使用 BPE 训练合并规则，生成从 token->id 的映射
        """
        # 1. 构建初始语料：按空格切词，每词末尾加 '</w>' 表示词边界
        words = text.strip().split()
        corpus = [list(w) + ['</w>'] for w in words]

        # 2. 初始 vocab：所有单字符 + '</w>'
        vocab = set()
        for w in corpus:
            vocab.update(w)

        # 3. 迭代合并，直到 vocab 大小达到 vocab_size
        next_id = 0
        while len(vocab) < vocab_size:
            stats = self.get_stats(corpus)
            if not stats:
                break
            # 选出现次数最多的 pair
            best_pair = max(stats, key=stats.get)
            # 给它分配一个新 id
            self.merges[best_pair] = vocab_size + len(self.merges)
            # 执行合并
            corpus = self.merge_pair(best_pair, corpus)
            # 更新 vocab
            merged_token = best_pair[0] + best_pair[1]
            vocab.add(merged_token)

        # 4. 构建最终的 token2id/id2token（先把单字符 sorted，再加上所有 merges 生成的 tokens）
        all_tokens = sorted([t for t in vocab if len(t)==1 or t == '</w>'])
        # merges 生成的新 token
        for (p0, p1), idx in sorted(self.merges.items(), key=lambda x: x[1]):
            all_tokens.append(p0+p1)

        # 映射编号
        for i, tok in enumerate(all_tokens):
            self.token2id[tok] = i
            self.id2token[i] = tok

    def encode(self, text):
        """
        将字符串编码为 token id 列表
        """
        words = text.strip().split()
        output_ids = []
        for w in words:
            symbols = list(w) + ['</w>']
            # 按训练时 merges 的顺序依次尝试合并
            # merges 按值排序即合并顺序
            for (p0, p1), _ in sorted(self.merges.items(), key=lambda x: x[1]):
                i = 0
                while i < len(symbols)-1:
                    if symbols[i] == p0 and symbols[i+1] == p1:
                        symbols[i:i+2] = [p0+p1]
                    else:
                        i += 1
            # 转成 id
            for s in symbols:
                output_ids.append(self.token2id[s])
        return output_ids

    def decode(self, ids):
        """
        将 token id 列表解码回字符串
        """
        tokens = [self.id2token[i] for i in ids]
        words = []
        cur = []
        for t in tokens:
            if t == '</w>':
                words.append(''.join(cur))
                cur = []
            else:
                cur.append(t)
        # 如果最后没有以 </w> 结尾，补充
        if cur:
            words.append(''.join(cur))
        return ' '.join(words)


# ==== 使用示例 ====
if __name__ == "__main__":
    sample = "low lower lowest low"
    tok = Tokenizer()
    tok.train(sample, vocab_size=20)

    seq = tok.encode("low lowest")
    print("Encoded IDs:", seq)

    txt = tok.decode(seq)
    print("Decoded text:", txt)


Encoded IDs: [10, 16]
Decoded text: low</w>lowest</w>


In [21]:
class Tokenizer:
    def __init__(self):
        # BPE 合并：{ (b1, b2): new_id, ... }
        self.merges = {}
        # token(id) -> byte sequence
        self.id2bytes = {}
        # byte sequence -> id
        self.bytes2id = {}

    def get_stats(self, seq):
        """统计一个字节序列中所有相邻对的次数"""
        stats = {}
        for i in range(len(seq) - 1):
            pair = (seq[i], seq[i+1])
            stats[pair] = stats.get(pair, 0) + 1
        return stats

    def merge_pair(self, pair, seq):
        """在整个序列里把 pair=('x','y') 全部替换成 merged"""
        merged = pair[0] + pair[1]
        out = []
        i = 0
        while i < len(seq):
            if i < len(seq)-1 and seq[i]==pair[0] and seq[i+1]==pair[1]:
                out.append(merged)
                i += 2
            else:
                out.append(seq[i])
                i += 1
        return out

    def train(self, text, vocab_size):
        # 1) 把全文当作一个“词”，初始序列是 UTF-8 bytes 的 list
        data = list(text.encode("utf-8"))
        # 2) 初始 vocab 是所有单字节
        vocab = set(data)
        # 3) 迭代合并最常见的 pair
        next_id = 256
        seq = [bytes([b]) for b in data]  # 把每个 int->bytes([int])
        # id2bytes / bytes2id 先填单字节
        for b in range(256):
            bt = bytes([b])
            self.id2bytes[b] = bt
            self.bytes2id[bt] = b

        while len(self.id2bytes) < vocab_size:
            stats = self.get_stats(seq)
            if not stats:
                break
            # 选出现最多的 pair
            best = max(stats, key=stats.get)
            merged = best[0] + best[1]
            # 加到 merges，并给它分配新的 ID
            self.merges[best] = next_id
            self.id2bytes[next_id] = merged
            self.bytes2id[merged] = next_id
            next_id += 1
            # 合并序列
            seq = self.merge_pair(best, seq)

    def encode(self, text):
        # 把 text->bytes list，再按 merges 顺序合并
        seq = [bytes([b]) for b in text.encode("utf-8")]
        for pair, idx in sorted(self.merges.items(), key=lambda x: x[1]):
            i = 0
            while i < len(seq)-1:
                if seq[i]==pair[0] and seq[i+1]==pair[1]:
                    seq[i:i+2] = [self.id2bytes[idx]]
                else:
                    i += 1
        # 最后把 bytes token 转成 ID
        return [ self.bytes2id[b] for b in seq ]

    def decode(self, ids):
        # 把 ID 列表还原到 bytes，再一次性 decode
        bseq = b"".join(self.id2bytes[i] for i in ids)
        return bseq.decode("utf-8", errors="strict")


# ==== 使用示例 ====
if __name__ == "__main__":
    sample = "low lower lowest low"
    tok = Tokenizer()
    tok.train(sample, vocab_size=20)

    seq = tok.encode("low lowest")
    print("Encoded IDs:", seq)

    txt = tok.decode(seq)
    print("Decoded text:", txt)


Encoded IDs: [108, 111, 119, 32, 108, 111, 119, 101, 115, 116]
Decoded text: low lowest


In [None]:
# -*- coding: utf-8 -*-
class Tokenizer:
    def __init__(self):
        self.merges = {}
        self.id2bytes = {}
        self.bytes2id = {}

    def _get_stats(self, seq):
        stats = {}
        for i in range(len(seq) - 1):
            pair = (seq[i], seq[i+1])
            stats[pair] = stats.get(pair, 0) + 1
        return stats

    def _merge_pair(self, pair, seq):
        merged = pair[0] + pair[1]
        out = []
        i = 0
        while i < len(seq):
            if i < len(seq) - 1 and seq[i] == pair[0] and seq[i+1] == pair[1]:
                out.append(merged)
                i += 2
            else:
                out.append(seq[i])
                i += 1
        return out

    def train(self, text, vocab_size):
        data = list(text.encode("utf-8"))
        seq = [bytes([b]) for b in data]

        for b in range(256):
            bt = bytes([b])
            self.id2bytes[b] = bt
            self.bytes2id[bt] = b
        next_id = 256

        while len(self.id2bytes) < vocab_size:
            stats = self._get_stats(seq)
            if not stats:
                break

            best = max(stats, key=stats.get)
            merged = best[0] + best[1]

            self.merges[best] = next_id
            self.id2bytes[next_id] = merged
            self.bytes2id[merged] = next_id
            next_id += 1

            seq = self._merge_pair(best, seq)

    def encode(self, text):
        seq = [bytes([b]) for b in text.encode("utf-8")]

        for pair, idx in sorted(self.merges.items(), key=lambda x: x[1]):
            i = 0
            while i < len(seq) - 1:
                if seq[i] == pair[0] and seq[i+1] == pair[1]:
                    seq[i:i+2] = [self.id2bytes[idx]]
                else:
                    i += 1

        return [self.bytes2id[b] for b in seq]

    def decode(self, ids):
        bseq = b"".join(self.id2bytes[i] for i in ids)
        return bseq.decode("utf-8", errors="strict")

def main():
    # 1. 读取原始文本
    input_path = "manual.txt"
    with open(input_path, "r", encoding="utf-8") as f:
        original = f.read()

    # 2. 训练 BPE tokenizer
    tok = Tokenizer()
    tok.train(original, vocab_size=1024)

    # 3. 用训练好的模型进行 encode & decode
    ids = tok.encode(original)
    decoded = tok.decode(ids)

    # 4. 检查一致性
    if decoded == original:
        print("✅ 解码后文本与原文完全一致！")
    else:
        print("❌ 存在差异，请检查合并逻辑或文本编码。")
        # 如需查看差异摘要，可以借助 difflib：
        import difflib
        diff = difflib.unified_diff(
            original.splitlines(True),
            decoded.splitlines(True),
            fromfile="original",
            tofile="decoded",
        )
        print("".join(diff))

if __name__ == "__main__":
    main()


✅ 解码后文本与原文完全一致！


In [24]:
# -*- coding: utf-8 -*-
class Tokenizer:
    def __init__(self):
        self.merges = {}
        self.id2bytes = {}
        self.bytes2id = {}

    def get_stats(self, seq):
        stats = {}
        for i in range(len(seq) - 1):
            pair = (seq[i], seq[i+1])
            stats[pair] = stats.get(pair, 0) + 1
        return stats

    def merge_pair(self, pair, seq):
        merged = pair[0] + pair[1]
        out, i = [], 0
        while i < len(seq):
            if i < len(seq)-1 and seq[i]==pair[0] and seq[i+1]==pair[1]:
                out.append(merged)
                i += 2
            else:
                out.append(seq[i])
                i += 1
        return out

    def train(self, text, vocab_size):
        data = list(text.encode("utf-8"))
        seq = [bytes([b]) for b in data]
        # init vocab 0-255
        for b in range(256):
            bt = bytes([b])
            self.id2bytes[b] = bt
            self.bytes2id[bt] = b
        next_id = 256

        while len(self.id2bytes) < vocab_size:
            stats = self.get_stats(seq)
            if not stats:
                break
            best = max(stats, key=stats.get)
            self.merges[best] = next_id
            merged = best[0] + best[1]
            self.id2bytes[next_id] = merged
            self.bytes2id[merged] = next_id
            next_id += 1
            seq = self.merge_pair(best, seq)

    def encode(self, text):
        seq = [bytes([b]) for b in text.encode("utf-8")]
        for pair, idx in sorted(self.merges.items(), key=lambda x: x[1]):
            i = 0
            while i < len(seq)-1:
                if seq[i]==pair[0] and seq[i+1]==pair[1]:
                    seq[i:i+2] = [self.id2bytes[idx]]
                else:
                    i += 1
        return [ self.bytes2id[b] for b in seq ]

    def decode(self, ids):
        bseq = b"".join(self.id2bytes[i] for i in ids)
        return bseq.decode("utf-8", errors="strict")

    def decode_bytes(self, ids):
        return b"".join(self.id2bytes[i] for i in ids)


if __name__ == "__main__":
    # 1. 原始 raw bytes
    with open("manual.txt", "rb") as f:
        raw = f.read()
    text = raw.decode("utf-8")  # 不做任何 newline 转换

    # 2. 训练、encode/decode
    tok = Tokenizer()
    tok.train(text, vocab_size=1024)
    ids = tok.encode(text)
    recon_raw = tok.decode_bytes(ids)
    recon_text = tok.decode(ids)

    # 3. 检查
    print("原始 bytes == 重构 bytes?", raw == recon_raw)
    print("原始文本 == 解码文本?", raw.decode("utf-8") == recon_text)


原始 bytes == 重构 bytes? True
原始文本 == 解码文本? True


In [29]:
# -*- coding: utf-8 -*-
class Tokenizer:
    def __init__(self):
        # 保存 BPE 合并规则：{ (b1, b2): new_id, ... }
        self.merges = {}
        # id -> bytes token
        self.id2bytes = {}
        # bytes token -> id
        self.bytes2id = {}

    def _get_stats(self, seq):
        """
        统计字节序列中所有相邻对的出现次数。
        seq: List[bytes]
        返回: Dict[(bytes, bytes), int]
        """
        stats = {}
        for i in range(len(seq) - 1):
            pair = (seq[i], seq[i+1])
            stats[pair] = stats.get(pair, 0) + 1
        return stats

    def _merge_pair(self, pair, seq):
        """
        在 seq 中将所有相邻的 pair=('x','y') 合并成 merged= x+y。
        seq: List[bytes]
        返回新的 List[bytes]
        """
        merged = pair[0] + pair[1]
        out = []
        i = 0
        while i < len(seq):
            if i < len(seq) - 1 and seq[i] == pair[0] and seq[i+1] == pair[1]:
                out.append(merged)
                i += 2
            else:
                out.append(seq[i])
                i += 1
        return out

    def train(self, text, vocab_size):
        """
        训练 BPE：将整个 text 作为一个 byte-stream，初始 vocab 为 0–255 字节，
        依次合并最频繁的 byte-pair，直到 vocab_size 为止。
        """
        # 将 text 转为 UTF-8 byte 列表
        data = list(text.encode("utf-8"))
        # 初始 seq：List[bytes([b])] 
        seq = [bytes([b]) for b in data]

        # 初始化单字节 vocab
        for b in range(256):
            bt = bytes([b])
            self.id2bytes[b] = bt
            self.bytes2id[bt] = b
        next_id = 256

        # 迭代合并最频繁的对
        while len(self.id2bytes) < vocab_size:
            stats = self._get_stats(seq)
            if not stats:
                break
            # 选出现次数最多的 pair
            best = max(stats, key=stats.get)
            merged = best[0] + best[1]
            # 记录合并规则并分配新 id
            self.merges[best] = next_id
            self.id2bytes[next_id] = merged
            self.bytes2id[merged] = next_id
            next_id += 1
            # 更新 seq
            seq = self._merge_pair(best, seq)

    def encode(self, text):
        """
        Encode 输入字符串为 token ID 列表。
        步骤：text -> UTF-8 bytes -> 按训练好的 merges 顺序合并 -> 映射为 ID
        """
        seq = [bytes([b]) for b in text.encode("utf-8")]
        # 按 merges 的 id 升序（即训练时的顺序）依次做合并
        for pair, idx in sorted(self.merges.items(), key=lambda x: x[1]):
            i = 0
            while i < len(seq) - 1:
                if seq[i] == pair[0] and seq[i+1] == pair[1]:
                    seq[i:i+2] = [self.id2bytes[idx]]
                else:
                    i += 1
        # 最后把每个 bytes token 映射为 ID
        return [self.bytes2id[b] for b in seq]

    def decode(self, ids):
        """
        Decode ID 列表回原始字符串。
        步骤：ID 列表 -> bytes token 列表拼接 -> UTF-8 decode
        """
        bseq = b"".join(self.id2bytes[i] for i in ids)
        return bseq.decode("utf-8", errors="strict")


In [30]:
if __name__ == "__main__":
    # 1. 读取原文（注意用二进制或禁用 newline 转换）
    with open("manual.txt", "rb") as f:
        raw = f.read()
    text = raw.decode("utf-8")

    # 2. 训练
    tok = Tokenizer()
    tok.train(text, vocab_size=1024)

    # 3. 编码 & 解码
    ids = tok.encode(text)
    out = tok.decode(ids)

    # 4. 检查
    print("一致吗？", out == text)  # 应该输出 True


一致吗？ True
