# Byte Pair Encoding (BPE) Tokenizer From Scratch

## Bits and bytes

In [1]:
# Bits and bytes
text = "So far, I had"
byte_ary = bytearray(text, "utf-8")
print(byte_ary)

bytearray(b'So far, I had')


In [2]:
# Get byte values
ids = list(byte_ary)
print(ids)

[83, 111, 32, 102, 97, 114, 44, 32, 73, 32, 104, 97, 100]


In [3]:
# This is creating an ID for each character, not each word
print("Number of characters:", len(text))
print("Number of token IDs:", len(ids))

Number of characters: 13
Number of token IDs: 13


In [4]:
# Check how it is encoded with tiktoken
import tiktoken
gpt2_tokenizer = tiktoken.get_encoding("gpt2")
gpt2_tokenizer.encode("So far, I had")

[2396, 1290, 11, 314, 550]

In [5]:
# check the first 256 single-character tokens
for i in range(260):
    decoded = gpt2_tokenizer.decode([i])
    print(f"{i}: {decoded}")

0: !
1: "
2: #
3: $
4: %
5: &
6: '
7: (
8: )
9: *
10: +
11: ,
12: -
13: .
14: /
15: 0
16: 1
17: 2
18: 3
19: 4
20: 5
21: 6
22: 7
23: 8
24: 9
25: :
26: ;
27: <
28: =
29: >
30: ?
31: @
32: A
33: B
34: C
35: D
36: E
37: F
38: G
39: H
40: I
41: J
42: K
43: L
44: M
45: N
46: O
47: P
48: Q
49: R
50: S
51: T
52: U
53: V
54: W
55: X
56: Y
57: Z
58: [
59: \
60: ]
61: ^
62: _
63: `
64: a
65: b
66: c
67: d
68: e
69: f
70: g
71: h
72: i
73: j
74: k
75: l
76: m
77: n
78: o
79: p
80: q
81: r
82: s
83: t
84: u
85: v
86: w
87: x
88: y
89: z
90: {
91: |
92: }
93: ~
94: �
95: �
96: �
97: �
98: �
99: �
100: �
101: �
102: �
103: �
104: �
105: �
106: �
107: �
108: �
109: �
110: �
111: �
112: �
113: �
114: �
115: �
116: �
117: �
118: �
119: �
120: �
121: �
122: �
123: �
124: �
125: �
126: �
127: �
128: �
129: �
130: �
131: �
132: �
133: �
134: �
135: �
136: �
137: �
138: �
139: �
140: �
141: �
142: �
143: �
144: �
145: �
146: �
147: �
148: �
149: �
150: �
151: �
152: �
153: �
154: �
155: �
156: �
157: �
158:

## A simple BPE implementation

In [6]:
from collections import Counter, deque
from functools import lru_cache

class BPETokenizerSimple:
    def __init__(self):
        # Maps token_id to token_str (e.g., {11246: "some"})
        self.vocab = {}
        # Maps token_str to token_id (e.g., {"some": 11246})
        self.inverse_vocab = {}
        # Dictionary of BPE merges: {(token_id1, token_id2): merged_token_id}
        self.bpe_merges = {}

    def train(self, text, vocab_size, allowed_special={"<|endoftext|>"}):
        """
        Train the BPE tokenizer from scratch.

        Args:
            text (str): The training text.
            vocab_size (int): The desired vocabulary size.
            allowed_special (set): A set of special tokens to include.
        """

        # Preprocess: Replace spaces with 'Ġ'
        # Note that Ġ is a particularity of the GPT-2 BPE implementation
        # E.g., "Hello world" might be tokenized as ["Hello", "Ġworld"]
        # (GPT-4 BPE would tokenize it as ["Hello", " world"])
        processed_text = []
        for i, char in enumerate(text):
            if char == " " and i != 0:
                processed_text.append("Ġ")
            if char != " ":
                processed_text.append(char)
        processed_text = "".join(processed_text)

        # Initialize vocab with unique characters, including 'Ġ' if present
        # Start with the first 256 ASCII characters
        unique_chars = [chr(i) for i in range(256)]

        # Extend unique_chars with characters from processed_text that are not already included
        unique_chars.extend(char for char in sorted(set(processed_text)) if char not in unique_chars)

        # Optionally, ensure 'Ġ' is included if it is relevant to your text processing
        if 'Ġ' not in unique_chars:
            unique_chars.append('Ġ')

        # Now create the vocab and inverse vocab dictionaries
        self.vocab = {i: char for i, char in enumerate(unique_chars)}
        self.inverse_vocab = {char: i for i, char in self.vocab.items()}

        # Add allowed special tokens
        if allowed_special:
            for token in allowed_special:
                if token not in self.inverse_vocab:
                    new_id = len(self.vocab)
                    self.vocab[new_id] = token
                    self.inverse_vocab[token] = new_id

        # Tokenize the processed_text into token IDs
        token_ids = [self.inverse_vocab[char] for char in processed_text]

        # BPE steps 1-3: Repeatedly find and replace frequent pairs
        for new_id in range(len(self.vocab), vocab_size):
            pair_id = self.find_freq_pair(token_ids, mode="most")
            if pair_id is None:  # No more pairs to merge. Stopping training.
                break
            token_ids = self.replace_pair(token_ids, pair_id, new_id)
            self.bpe_merges[pair_id] = new_id

        # Build the vocabulary with merged tokens
        for (p0, p1), new_id in self.bpe_merges.items():
            merged_token = self.vocab[p0] + self.vocab[p1]
            self.vocab[new_id] = merged_token
            self.inverse_vocab[merged_token] = new_id

    def encode(self, text):
        """
        Encode the input text into a list of token IDs.

        Args:
            text (str): The text to encode.

        Returns:
            List[int]: The list of token IDs.
        """
        tokens = []
        # Split text into tokens, keeping newlines intact
        words = text.replace("\n", " \n ").split()  # Ensure '\n' is treated as a separate token

        for i, word in enumerate(words):
            if i > 0 and not word.startswith("\n"):
                tokens.append("Ġ" + word)  # Add 'Ġ' to words that follow a space or newline
            else:
                tokens.append(word)  # Handle first word or standalone '\n'

        token_ids = []
        for token in tokens:
            if token in self.inverse_vocab:
                # token is contained in the vocabulary as is
                token_id = self.inverse_vocab[token]
                token_ids.append(token_id)
            else:
                # Attempt to handle subword tokenization via BPE
                sub_token_ids = self.tokenize_with_bpe(token)
                token_ids.extend(sub_token_ids)

        return token_ids

    def tokenize_with_bpe(self, token):
        """
        Tokenize a single token using BPE merges.

        Args:
            token (str): The token to tokenize.

        Returns:
            List[int]: The list of token IDs after applying BPE.
        """
        # Tokenize the token into individual characters (as initial token IDs)
        token_ids = [self.inverse_vocab.get(char, None) for char in token]
        if None in token_ids:
            missing_chars = [char for char, tid in zip(token, token_ids) if tid is None]
            raise ValueError(f"Characters not found in vocab: {missing_chars}")

        can_merge = True
        while can_merge and len(token_ids) > 1:
            can_merge = False
            new_tokens = []
            i = 0
            while i < len(token_ids) - 1:
                pair = (token_ids[i], token_ids[i + 1])
                if pair in self.bpe_merges:
                    merged_token_id = self.bpe_merges[pair]
                    new_tokens.append(merged_token_id)
                    # Uncomment for educational purposes:
                    # print(f"Merged pair {pair} -> {merged_token_id} ('{self.vocab[merged_token_id]}')")
                    i += 2  # Skip the next token as it's merged
                    can_merge = True
                else:
                    new_tokens.append(token_ids[i])
                    i += 1
            if i < len(token_ids):
                new_tokens.append(token_ids[i])
            token_ids = new_tokens

        return token_ids

    def decode(self, token_ids):
        """
        Decode a list of token IDs back into a string.

        Args:
            token_ids (List[int]): The list of token IDs to decode.

        Returns:
            str: The decoded string.
        """
        decoded_string = ""
        for token_id in token_ids:
            if token_id not in self.vocab:
                raise ValueError(f"Token ID {token_id} not found in vocab.")
            token = self.vocab[token_id]
            if token.startswith("Ġ"):
                # Replace 'Ġ' with a space
                decoded_string += " " + token[1:]
            else:
                decoded_string += token
        return decoded_string

    @lru_cache(maxsize=None)
    def get_special_token_id(self, token):
        return self.inverse_vocab.get(token, None)

    @staticmethod
    def find_freq_pair(token_ids, mode="most"):
        pairs = Counter(zip(token_ids, token_ids[1:]))

        if mode == "most":
            return max(pairs.items(), key=lambda x: x[1])[0]
        elif mode == "least":
            return min(pairs.items(), key=lambda x: x[1])[0]
        else:
            raise ValueError("Invalid mode. Choose 'most' or 'least'.")

    @staticmethod
    def replace_pair(token_ids, pair_id, new_id):
        dq = deque(token_ids)
        replaced = []

        while dq:
            current = dq.popleft()
            if dq and (current, dq[0]) == pair_id:
                replaced.append(new_id)
                # Remove the 2nd token of the pair, 1st was already removed
                dq.popleft()
            else:
                replaced.append(current)

        return replaced

## Simple BPE implementation walkthrough

In [7]:
# Import training text
with open("pit-and-pendulum.txt", "r", encoding="utf-8") as f:
    text = f.read()

In [8]:
# Initialize and train the BPE tokenizer with a vocab size of 1000
tokenizer = BPETokenizerSimple()
tokenizer.train(text, vocab_size=1000, allowed_special="<|endoftext|>")
print(tokenizer.vocab)
print(len(tokenizer.vocab))
print(len(tokenizer.bpe_merges))

{0: '\x00', 1: '\x01', 2: '\x02', 3: '\x03', 4: '\x04', 5: '\x05', 6: '\x06', 7: '\x07', 8: '\x08', 9: '\t', 10: '\n', 11: '\x0b', 12: '\x0c', 13: '\r', 14: '\x0e', 15: '\x0f', 16: '\x10', 17: '\x11', 18: '\x12', 19: '\x13', 20: '\x14', 21: '\x15', 22: '\x16', 23: '\x17', 24: '\x18', 25: '\x19', 26: '\x1a', 27: '\x1b', 28: '\x1c', 29: '\x1d', 30: '\x1e', 31: '\x1f', 32: ' ', 33: '!', 34: '"', 35: '#', 36: '$', 37: '%', 38: '&', 39: "'", 40: '(', 41: ')', 42: '*', 43: '+', 44: ',', 45: '-', 46: '.', 47: '/', 48: '0', 49: '1', 50: '2', 51: '3', 52: '4', 53: '5', 54: '6', 55: '7', 56: '8', 57: '9', 58: ':', 59: ';', 60: '<', 61: '=', 62: '>', 63: '?', 64: '@', 65: 'A', 66: 'B', 67: 'C', 68: 'D', 69: 'E', 70: 'F', 71: 'G', 72: 'H', 73: 'I', 74: 'J', 75: 'K', 76: 'L', 77: 'M', 78: 'N', 79: 'O', 80: 'P', 81: 'Q', 82: 'R', 83: 'S', 84: 'T', 85: 'U', 86: 'V', 87: 'W', 88: 'X', 89: 'Y', 90: 'Z', 91: '[', 92: '\\', 93: ']', 94: '^', 95: '_', 96: '`', 97: 'a', 98: 'b', 99: 'c', 100: 'd', 101: 'e'

In [9]:
# Test of encoding
input_text = "So far, I had"
token_ids = tokenizer.encode(input_text)
print(token_ids)

[83, 111, 408, 287, 44, 256, 73, 256, 280, 100]


In [10]:
# Print length information
print("Number of characters:", len(input_text))
print("Number of token IDs:", len(token_ids))

Number of characters: 13
Number of token IDs: 10


In [11]:
# Test of decoding
print(token_ids)
print(tokenizer.decode(token_ids))

[83, 111, 408, 287, 44, 256, 73, 256, 280, 100]
So far, I had


In [12]:
# Better understanding of decoding
for token_id in token_ids:
    print(f"{token_id} -> {tokenizer.decode([token_id])}")

83 -> S
111 -> o
408 ->  f
287 -> ar
44 -> ,
256 ->  
73 -> I
256 ->  
280 -> ha
100 -> d


In [13]:
# Same text reproducible
tokenizer.decode(tokenizer.encode("Hello everyone!"))

'Hello everyone!'

## An improved BPE implementation

In [14]:
from collections import Counter, deque
from functools import lru_cache
import re
import json


class BPETokenizerImproved:
    def __init__(self):
        # Maps token_id to token_str (e.g., {11246: "some"})
        self.vocab = {}
        # Maps token_str to token_id (e.g., {"some": 11246})
        self.inverse_vocab = {}
        # Dictionary of BPE merges: {(token_id1, token_id2): merged_token_id}
        self.bpe_merges = {}

        # For the official OpenAI GPT-2 merges, use a rank dict:
        #  of form {(string_A, string_B): rank}, where lower rank = higher priority
        self.bpe_ranks = {}

    def train(self, text, vocab_size, allowed_special={"<|endoftext|>"}):
        """
        Train the BPE tokenizer from scratch.

        Args:
            text (str): The training text.
            vocab_size (int): The desired vocabulary size.
            allowed_special (set): A set of special tokens to include.
        """

        # Preprocess: Replace spaces with "Ġ"
        # Note that Ġ is a particularity of the GPT-2 BPE implementation
        # E.g., "Hello world" might be tokenized as ["Hello", "Ġworld"]
        # (GPT-4 BPE would tokenize it as ["Hello", " world"])
        processed_text = []
        for i, char in enumerate(text):
            if char == " " and i != 0:
                processed_text.append("Ġ")
            if char != " ":
                processed_text.append(char)
        processed_text = "".join(processed_text)

        # Initialize vocab with unique characters, including "Ġ" if present
        # Start with the first 256 ASCII characters
        unique_chars = [chr(i) for i in range(256)]
        unique_chars.extend(
            char for char in sorted(set(processed_text))
            if char not in unique_chars
        )
        if "Ġ" not in unique_chars:
            unique_chars.append("Ġ")

        self.vocab = {i: char for i, char in enumerate(unique_chars)}
        self.inverse_vocab = {char: i for i, char in self.vocab.items()}

        # Add allowed special tokens
        if allowed_special:
            for token in allowed_special:
                if token not in self.inverse_vocab:
                    new_id = len(self.vocab)
                    self.vocab[new_id] = token
                    self.inverse_vocab[token] = new_id

        # Tokenize the processed_text into token IDs
        token_ids = [self.inverse_vocab[char] for char in processed_text]

        # BPE steps 1-3: Repeatedly find and replace frequent pairs
        for new_id in range(len(self.vocab), vocab_size):
            pair_id = self.find_freq_pair(token_ids, mode="most")
            if pair_id is None:
                break
            token_ids = self.replace_pair(token_ids, pair_id, new_id)
            self.bpe_merges[pair_id] = new_id

        # Build the vocabulary with merged tokens
        for (p0, p1), new_id in self.bpe_merges.items():
            merged_token = self.vocab[p0] + self.vocab[p1]
            self.vocab[new_id] = merged_token
            self.inverse_vocab[merged_token] = new_id

    def load_vocab_and_merges_from_openai(self, vocab_path, bpe_merges_path):
        """
        Load pre-trained vocabulary and BPE merges from OpenAI's GPT-2 files.

        Args:
            vocab_path (str): Path to the vocab file (GPT-2 calls it 'encoder.json').
            bpe_merges_path (str): Path to the bpe_merges file  (GPT-2 calls it 'vocab.bpe').
        """
        # Load vocabulary
        with open(vocab_path, "r", encoding="utf-8") as file:
            loaded_vocab = json.load(file)
            # encoder.json is {token_str: id}; we want id->str and str->id
            self.vocab = {int(v): k for k, v in loaded_vocab.items()}
            self.inverse_vocab = {k: int(v) for k, v in loaded_vocab.items()}
    
        # Must have GPT-2's printable newline character 'Ċ' (U+010A) at id 198
        if "Ċ" not in self.inverse_vocab or self.inverse_vocab["Ċ"] != 198:
            raise KeyError("Vocabulary missing GPT-2 newline glyph 'Ċ' at id 198.")
    
        # Must have <|endoftext|> at 50256
        if "<|endoftext|>" not in self.inverse_vocab or self.inverse_vocab["<|endoftext|>"] != 50256:
            raise KeyError("Vocabulary missing <|endoftext|> at id 50256.")
    
        # Provide a convenience alias for '\n' -> 198
        # Keep printable character 'Ċ' in vocab so BPE merges keep working
        if "\n" not in self.inverse_vocab:
            self.inverse_vocab["\n"] = self.inverse_vocab["Ċ"]

        if "\r" not in self.inverse_vocab:
            if 201 in self.vocab:
                self.inverse_vocab["\r"] = 201
            else:
                raise KeyError("Vocabulary missing carriage return token at id 201.")

        # Load GPT-2 merges and store ranks
        self.bpe_ranks = {}
        with open(bpe_merges_path, "r", encoding="utf-8") as file:
            lines = file.readlines()
            if lines and lines[0].startswith("#"):
                lines = lines[1:]
    
            rank = 0
            for line in lines:
                token1, *rest = line.strip().split()
                if len(rest) != 1:
                    continue
                token2 = rest[0]
                if token1 in self.inverse_vocab and token2 in self.inverse_vocab:
                    self.bpe_ranks[(token1, token2)] = rank
                    rank += 1
                else:
                    # Safe to skip pairs whose symbols are not in vocab
                    pass


    def encode(self, text, allowed_special=None):
        """
        Encode the input text into a list of token IDs, with tiktoken-style handling of special tokens.
    
        Args:
            text (str): The input text to encode.
            allowed_special (set or None): Special tokens to allow passthrough. If None, special handling is disabled.
    
        Returns:
            List of token IDs.
        """
    
        # ---- This section is to mimic tiktoken in terms of allowed special tokens ----
        specials_in_vocab = [
            tok for tok in self.inverse_vocab
            if tok.startswith("<|") and tok.endswith("|>")
        ]
        if allowed_special is None:
            # Nothing is allowed
            disallowed = [tok for tok in specials_in_vocab if tok in text]
            if disallowed:
                raise ValueError(f"Disallowed special tokens encountered in text: {disallowed}")
        else:
            # Some spefic tokens are allowed (e.g., we use this for <|endoftext|>)
            disallowed = [tok for tok in specials_in_vocab if tok in text and tok not in allowed_special]
            if disallowed:
                raise ValueError(f"Disallowed special tokens encountered in text: {disallowed}")
        # -----------------------------------------------------------------------------

        token_ids = []
        # If some specials are allowed, split around them and passthrough those ids
        if allowed_special is not None and len(allowed_special) > 0:
            special_pattern = "(" + "|".join(
                re.escape(tok) for tok in sorted(allowed_special, key=len, reverse=True)
            ) + ")"
    
            last_index = 0
            for match in re.finditer(special_pattern, text):
                prefix = text[last_index:match.start()]
                token_ids.extend(self.encode(prefix, allowed_special=None))  # encode prefix normally
    
                special_token = match.group(0)
                if special_token in self.inverse_vocab:
                    token_ids.append(self.inverse_vocab[special_token])
                else:
                    raise ValueError(f"Special token {special_token} not found in vocabulary.")
                last_index = match.end()
    
            text = text[last_index:]  # remainder to process normally
    
            # Extra guard for any other special literals left over
            disallowed = [
                tok for tok in self.inverse_vocab
                if tok.startswith("<|") and tok.endswith("|>") and tok in text and tok not in allowed_special
            ]
            if disallowed:
                raise ValueError(f"Disallowed special tokens encountered in text: {disallowed}")

    
        # ---- Newline and carriage return handling ----
        tokens = []
        parts = re.split(r'(\r\n|\r|\n)', text)
        for part in parts:
            if part == "":
                continue
            if part == "\r\n":
                tokens.append("\r")
                tokens.append("\n")
                continue
            if part == "\r":
                tokens.append("\r")
                continue
            if part == "\n":
                tokens.append("\n")
                continue
    
            # Normal chunk without line breaks:
            # - If spaces precede a word, prefix the first word with 'Ġ' and
            #   add standalone 'Ġ' for additional spaces
            # - If spaces trail the chunk (e.g., before a newline) add
            #   standalone 'Ġ' tokens (tiktoken produces id 220 for 'Ġ')
            pending_spaces = 0
            for m in re.finditer(r'( +)|(\S+)', part):
                if m.group(1) is not None:
                    pending_spaces += len(m.group(1))
                else:
                    word = m.group(2)
                    if pending_spaces > 0:
                        tokens.append("Ġ" + word) # one leading space
                        for _ in range(pending_spaces - 1):
                            tokens.append("Ġ")  # remaining spaces as standalone
                        pending_spaces = 0
                    else:
                        tokens.append(word)
            # Trailing spaces (no following word): add standalone 'Ġ' tokens
            for _ in range(pending_spaces):
                tokens.append("Ġ")
        # ---------------------------------------------------------------
    
        # Map tokens -> ids (BPE if needed)
        for tok in tokens:
            if tok in self.inverse_vocab:
                token_ids.append(self.inverse_vocab[tok])
            else:
                token_ids.extend(self.tokenize_with_bpe(tok))
    
        return token_ids

    def tokenize_with_bpe(self, token):
        """
        Tokenize a single token using BPE merges.

        Args:
            token (str): The token to tokenize.

        Returns:
            List[int]: The list of token IDs after applying BPE.
        """
        # Tokenize the token into individual characters (as initial token IDs)
        token_ids = [self.inverse_vocab.get(char, None) for char in token]
        if None in token_ids:
            missing_chars = [char for char, tid in zip(token, token_ids) if tid is None]
            raise ValueError(f"Characters not found in vocab: {missing_chars}")

        # If we haven't loaded OpenAI's GPT-2 merges, use my approach
        if not self.bpe_ranks:
            can_merge = True
            while can_merge and len(token_ids) > 1:
                can_merge = False
                new_tokens = []
                i = 0
                while i < len(token_ids) - 1:
                    pair = (token_ids[i], token_ids[i + 1])
                    if pair in self.bpe_merges:
                        merged_token_id = self.bpe_merges[pair]
                        new_tokens.append(merged_token_id)
                        # Uncomment for educational purposes:
                        # print(f"Merged pair {pair} -> {merged_token_id} ('{self.vocab[merged_token_id]}')")
                        i += 2  # Skip the next token as it's merged
                        can_merge = True
                    else:
                        new_tokens.append(token_ids[i])
                        i += 1
                if i < len(token_ids):
                    new_tokens.append(token_ids[i])
                token_ids = new_tokens
            return token_ids

        # Otherwise, do GPT-2-style merging with the ranks:
        # 1) Convert token_ids back to string "symbols" for each ID
        symbols = [self.vocab[id_num] for id_num in token_ids]

        # Repeatedly merge all occurrences of the lowest-rank pair
        while True:
            # Collect all adjacent pairs
            pairs = set(zip(symbols, symbols[1:]))
            if not pairs:
                break

            # Find the pair with the best (lowest) rank
            min_rank = float("inf")
            bigram = None
            for p in pairs:
                r = self.bpe_ranks.get(p, float("inf"))
                if r < min_rank:
                    min_rank = r
                    bigram = p

            # If no valid ranked pair is present, we're done
            if bigram is None or bigram not in self.bpe_ranks:
                break

            # Merge all occurrences of that pair
            first, second = bigram
            new_symbols = []
            i = 0
            while i < len(symbols):
                # If we see (first, second) at position i, merge them
                if i < len(symbols) - 1 and symbols[i] == first and symbols[i+1] == second:
                    new_symbols.append(first + second)  # merged symbol
                    i += 2
                else:
                    new_symbols.append(symbols[i])
                    i += 1
            symbols = new_symbols

            if len(symbols) == 1:
                break

        # Finally, convert merged symbols back to IDs
        merged_ids = [self.inverse_vocab[sym] for sym in symbols]
        return merged_ids

    def decode(self, token_ids):
        """
        Decode a list of token IDs back into a string.

        Args:
            token_ids (List[int]): The list of token IDs to decode.

        Returns:
            str: The decoded string.
        """
        out = []
        for tid in token_ids:
            if tid not in self.vocab:
                raise ValueError(f"Token ID {tid} not found in vocab.")
            tok = self.vocab[tid]

            # Map GPT-2 special chars back to real chars
            if tid == 198 or tok == "\n":
                out.append("\n")
            elif tid == 201 or tok == "\r":
                out.append("\r")
            elif tok.startswith("Ġ"):
                out.append(" " + tok[1:])
            else:
                out.append(tok)
        return "".join(out)

    def save_vocab_and_merges(self, vocab_path, bpe_merges_path):
        """
        Save the vocabulary and BPE merges to JSON files.

        Args:
            vocab_path (str): Path to save the vocabulary.
            bpe_merges_path (str): Path to save the BPE merges.
        """
        # Save vocabulary
        with open(vocab_path, "w", encoding="utf-8") as file:
            json.dump(self.vocab, file, ensure_ascii=False, indent=2)

        # Save BPE merges as a list of dictionaries
        with open(bpe_merges_path, "w", encoding="utf-8") as file:
            merges_list = [{"pair": list(pair), "new_id": new_id}
                           for pair, new_id in self.bpe_merges.items()]
            json.dump(merges_list, file, ensure_ascii=False, indent=2)

    def load_vocab_and_merges(self, vocab_path, bpe_merges_path):
        """
        Load the vocabulary and BPE merges from JSON files.

        Args:
            vocab_path (str): Path to the vocabulary file.
            bpe_merges_path (str): Path to the BPE merges file.
        """
        # Load vocabulary
        with open(vocab_path, "r", encoding="utf-8") as file:
            loaded_vocab = json.load(file)
            self.vocab = {int(k): v for k, v in loaded_vocab.items()}
            self.inverse_vocab = {v: int(k) for k, v in loaded_vocab.items()}

        # Load BPE merges
        with open(bpe_merges_path, "r", encoding="utf-8") as file:
            merges_list = json.load(file)
            for merge in merges_list:
                pair = tuple(merge["pair"])
                new_id = merge["new_id"]
                self.bpe_merges[pair] = new_id

    @lru_cache(maxsize=None)
    def get_special_token_id(self, token):
        return self.inverse_vocab.get(token, None)

    @staticmethod
    def find_freq_pair(token_ids, mode="most"):
        pairs = Counter(zip(token_ids, token_ids[1:]))

        if not pairs:
            return None

        if mode == "most":
            return max(pairs.items(), key=lambda x: x[1])[0]
        elif mode == "least":
            return min(pairs.items(), key=lambda x: x[1])[0]
        else:
            raise ValueError("Invalid mode. Choose 'most' or 'least'.")

    @staticmethod
    def replace_pair(token_ids, pair_id, new_id):
        dq = deque(token_ids)
        replaced = []

        while dq:
            current = dq.popleft()
            if dq and (current, dq[0]) == pair_id:
                replaced.append(new_id)
                # Remove the 2nd token of the pair, 1st was already removed
                dq.popleft()
            else:
                replaced.append(current)

        return replaced

## BPE implementation walkthrough

In [15]:
# Import training text
with open("pit-and-pendulum.txt", "r", encoding="utf-8") as f:
    text = f.read()

In [16]:
# Initialize and train the BPE tokenizer with a vocab size of 1000
tokenizer = BPETokenizerImproved()
tokenizer.train(text, vocab_size=1000, allowed_special={"<|endoftext|>"})
print(tokenizer.vocab)
print(len(tokenizer.vocab))
print(len(tokenizer.bpe_merges))

{0: '\x00', 1: '\x01', 2: '\x02', 3: '\x03', 4: '\x04', 5: '\x05', 6: '\x06', 7: '\x07', 8: '\x08', 9: '\t', 10: '\n', 11: '\x0b', 12: '\x0c', 13: '\r', 14: '\x0e', 15: '\x0f', 16: '\x10', 17: '\x11', 18: '\x12', 19: '\x13', 20: '\x14', 21: '\x15', 22: '\x16', 23: '\x17', 24: '\x18', 25: '\x19', 26: '\x1a', 27: '\x1b', 28: '\x1c', 29: '\x1d', 30: '\x1e', 31: '\x1f', 32: ' ', 33: '!', 34: '"', 35: '#', 36: '$', 37: '%', 38: '&', 39: "'", 40: '(', 41: ')', 42: '*', 43: '+', 44: ',', 45: '-', 46: '.', 47: '/', 48: '0', 49: '1', 50: '2', 51: '3', 52: '4', 53: '5', 54: '6', 55: '7', 56: '8', 57: '9', 58: ':', 59: ';', 60: '<', 61: '=', 62: '>', 63: '?', 64: '@', 65: 'A', 66: 'B', 67: 'C', 68: 'D', 69: 'E', 70: 'F', 71: 'G', 72: 'H', 73: 'I', 74: 'J', 75: 'K', 76: 'L', 77: 'M', 78: 'N', 79: 'O', 80: 'P', 81: 'Q', 82: 'R', 83: 'S', 84: 'T', 85: 'U', 86: 'V', 87: 'W', 88: 'X', 89: 'Y', 90: 'Z', 91: '[', 92: '\\', 93: ']', 94: '^', 95: '_', 96: '`', 97: 'a', 98: 'b', 99: 'c', 100: 'd', 101: 'e'

In [17]:
# Test of encoding
input_text = "So far, I had"
token_ids = tokenizer.encode(input_text)
print(token_ids)

[83, 111, 409, 288, 44, 256, 73, 256, 281, 100]


In [18]:
# Test of encoding with allowed_special
input_text = "So far, I had<|endoftext|>"
token_ids = tokenizer.encode(input_text, allowed_special={"<|endoftext|>"})
print(token_ids)

[83, 111, 409, 288, 44, 256, 73, 256, 281, 100, 258]


In [19]:
# Print length information
print("Number of characters:", len(input_text))
print("Number of token IDs:", len(token_ids))

Number of characters: 26
Number of token IDs: 11


In [20]:
# Test of decoding
print(token_ids)
print(tokenizer.decode(token_ids))

[83, 111, 409, 288, 44, 256, 73, 256, 281, 100, 258]
So far, I had<|endoftext|>


In [21]:
# Better understanding of decoding
for token_id in token_ids:
    print(f"{token_id} -> {tokenizer.decode([token_id])}")

83 -> S
111 -> o
409 ->  f
288 -> ar
44 -> ,
256 ->  
73 -> I
256 ->  
281 -> ha
100 -> d
258 -> <|endoftext|>


In [22]:
# Same text reproducible
tokenizer.decode(tokenizer.encode("Hello everyone with \n newline characters!"))

'Hello everyone with \n newline characters!'

## Saving and loading the tokenizer

In [23]:
# Save trained tokenizer
tokenizer.save_vocab_and_merges(vocab_path="vocab.json", bpe_merges_path="bpe_merges.txt")

In [25]:
# Load the tokenizer
tokenizer2 = BPETokenizerImproved()
tokenizer2.load_vocab_and_merges(vocab_path="vocab.json", bpe_merges_path="bpe_merges.txt")

In [27]:
# Same results as before
print(tokenizer2.decode(token_ids))

So far, I had<|endoftext|>


In [26]:
# Same text reproducible
tokenizer.decode(tokenizer.encode("Hello everyone with \n newline characters!"))

'Hello everyone with \n newline characters!'

## Load the original GPT-2 BPE tokenizer from OpenAI

In [29]:
# Function for downloading file
import os
import requests

def download_file_if_absent(url, filename, search_dirs):
    for directory in search_dirs:
        file_path = os.path.join(directory, filename)
        if os.path.exists(file_path):
            print(f"{filename} already exists in {file_path}")
            return file_path

    target_path = os.path.join(search_dirs[0], filename)
    try:
        response = requests.get(url, stream=True, timeout=60)
        response.raise_for_status()
        with open(target_path, "wb") as out_file:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    out_file.write(chunk)
        print(f"Downloaded {filename} to {target_path}")
    except Exception as e:
        print(f"Failed to download {filename}. Error: {e}")

    return target_path

In [30]:
# Download files if not already present in this directory
files_to_download = {
    "https://openaipublic.blob.core.windows.net/gpt-2/models/124M/vocab.bpe": "vocab.bpe",
    "https://openaipublic.blob.core.windows.net/gpt-2/models/124M/encoder.json": "encoder.json"
}
paths = {}
for url, filename in files_to_download.items():
    paths[filename] = download_file_if_absent(url, filename, ".")

Downloaded vocab.bpe to ./vocab.bpe
Downloaded encoder.json to ./encoder.json


In [None]:
# Load the GPT-2 files
tokenizer_gpt2 = BPETokenizerImproved()
tokenizer_gpt2.load_vocab_and_merges_from_openai(vocab_path=paths["encoder.json"], bpe_merges_path=paths["vocab.bpe"])

In [32]:
# Check the length
print(len(tokenizer_gpt2.vocab))

50257


In [33]:
# Use the GPT-2 tokenizer
input_text = "So far, I had"
token_ids = tokenizer_gpt2.encode(input_text)
print(token_ids)

[2396, 1290, 11, 314, 550]


In [34]:
# Test decode with GPT-2 tokenizer
print(tokenizer_gpt2.decode(token_ids))

So far, I had


In [35]:
# Check for correctness
import tiktoken
gpt2_tokenizer = tiktoken.get_encoding("gpt2")
gpt2_tokenizer.encode(input_text)

[2396, 1290, 11, 314, 550]