# This BPE from scratch implementation is a result of following Sebastian Raschka's tutorial
The link to his notebook is https://sebastianraschka.com/blog/2025/bpe-from-scratch.html
# Bits and bytes
Converting text into a byte array:

In [1]:
text = "This is some text yes"
byte_ary = bytearray(text, "utf-8")
print(byte_ary)

bytearray(b'This is some text yes')


If we call `list()` on a `bytearray`, each byte is treated as an individual element, and we get a list of integers corresponding to the byte values:

In [3]:
ids = list(byte_ary)
print(ids)
print("the number of tokens: ",len(ids))

[84, 104, 105, 115, 32, 105, 115, 32, 115, 111, 109, 101, 32, 116, 101, 120, 116, 32, 121, 101, 115]
the number of tokens:  21
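
As a quick sanity check, we can turn the list of integers back into the original string with `bytes(ids).decode("utf-8")`. The sketch below also shows that a non-ASCII character occupies more than one byte in UTF-8, so the number of byte-level IDs can be larger than the number of characters. The string `"héllo"` is only an illustration and is not part of the tutorial.

```python
text = "This is some text yes"
ids = list(bytearray(text, "utf-8"))

# Round trip: integers -> bytes -> str
restored = bytes(ids).decode("utf-8")
print(restored == text)

# Non-ASCII characters need more than one byte in UTF-8
sample = "h\u00e9llo"  # "héllo", an illustrative string
sample_ids = list(sample.encode("utf-8"))
print(len(sample), len(sample_ids))  # characters vs. bytes
print(sample_ids)
```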


This is one way to turn text into a token ID representation. However, creating one ID per byte results in too many IDs.

Instead of one ID per byte, BPE tokenizers use a vocabulary with a token ID for each word or subword.
For example, the GPT-2 tokenizer encodes "This is some text yes" into 5 tokens instead of 21.

In [5]:
import tiktoken
gpt2_tokenizer = tiktoken.get_encoding("gpt2")
gpt2ids = list(gpt2_tokenizer.encode("This is some text yes"))
print(gpt2ids)
print("the number of tokens: ",len(gpt2ids))

[1212, 318, 617, 2420, 3763]
the number of tokens:  5


Since one byte can represent only $2^8 = 256$ distinct values (0 through 255), `bytearray(range(0, 257))` raises `ValueError: byte must be in range(0, 256)`.
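
A minimal way to confirm this limit is to build a `bytearray` from all 256 valid values and then catch the exception raised when 256 is included. The variable names are chosen for this illustration only.

```python
# 0..255 are all valid byte values
all_bytes = bytearray(range(256))
print(len(all_bytes))

# 256 does not fit into a single byte, so this raises ValueError
try:
    bytearray(range(257))
    error_message = None
except ValueError as err:
    error_message = str(err)
print(error_message)
```
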
A BPE tokenizer usually uses these 256 values as its first 256 single-character tokens. We can check this by running the following code:

In [2]:
import tiktoken
gpt2_tokenizer = tiktoken.get_encoding("gpt2")

for i in range(300):
    decoded = gpt2_tokenizer.decode([i])
    if 10 < i < 250 or 265 < i < 290:
        continue  # skip most IDs; printing all 300 would be unreadable
    print(f"{i}: {decoded}")

0: !
1: "
2: #
3: $
4: %
5: &
6: '
7: (
8: )
9: *
10: +
250: �
251: �
252: �
253: �
254: �
255: �
256:  t
257:  a
258: he
259: in
260: re
261: on
262:  the
263: er
264:  s
265: at
290:  and
291: ic
292: as
293: le
294:  th
295: ion
296: om
297: ll
298: ent
299:  n


As we can see, some of the decoded tokens from ID 256 onward start with a whitespace and are treated as distinct tokens (for example, `'t'` is different from `' t'`). This has been improved in the GPT-4 tokenizer.

# Building the vocabulary

The purpose of the BPE tokenization algorithm is to build a vocabulary of commonly occurring subwords like `298: ent` (from the words *entity, entertain, entrance, ...*) or words like
```
318: is
617: some
1212: This
2420: text
3763: yes
```

The general structure of the BPE algorithm is as follows:

## BPE algorithm outline

### 1. Identify frequent pairs
- In each iteration, scan the text to find the most commonly occurring pair of adjacent bytes (characters)
### 2. Replace and record
- Replace that pair with a new placeholder ID that is not already in use (if we start with 0,...,255, the first placeholder is 256)
- Record this mapping in a lookup table
- The size of the lookup table is a hyperparameter, also called the "vocabulary size" (50,257 for GPT-2)
### 3. Repeat until no gains
- Keep repeating steps 1 and 2, merging the most common pairs, until no pair occurs more than once
### Decompression (decoding)
- To restore the original text, reverse the process by substituting each ID with the corresponding pair from the lookup table


# BPE algorithm example
## Concrete example of the 1st and 2nd step (encoding)
- Let's say we want to build a vocabulary out of the sentence `the cat in the hat`, which will be our training dataset
#### Iteration 1
1. Identifying the frequent pairs
- In the text, `th` appears 2 times
2. Replace and record
- Replace the `th` with the first token not in use, e.g., `256`
- The new text is `<256>e cat in <256>e hat`
- the new vocabulary is:
```
  0: ...
  ...
  256: "th"
```
#### Iteration 2
1. Identifying the frequent pairs
- In the text, `<256>e` appears 2 times
2. Replace and record
- Replace the `<256>e` with the first token not in use, e.g., `257`
- The new text is `<257> cat in <257> hat`
- the new vocabulary is:
```
  0: ...
  ...
  256: "th"
  257: "<256>e"
```
#### Iteration 3
1. Identifying the frequent pairs
- In the text, `<257> ` appears 2 times
2. Replace and record
- Replace the `<257> ` with the first token not in use, e.g., `258`
- The new text is `<258>cat in <258>hat`
- the new vocabulary is:
```
  0: ...
  ...
  256: "th"
  257: "<256>e"
  258: "<257> "
```
- and so on...
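
The three iterations above can be reproduced with a few lines of Python. This is a minimal sketch, not the class implemented later. The helper names `most_frequent_pair`, `merge_pair`, and `render` are made up for this illustration. It works on character code points rather than raw bytes, which is equivalent for this ASCII-only sentence. Ties between equally frequent pairs are broken by first occurrence, which happens to match the walkthrough.

```python
from collections import Counter

def most_frequent_pair(ids):
    """Return the most frequent adjacent pair, or None if no pair repeats."""
    counts = Counter(zip(ids, ids[1:]))
    if not counts:
        return None
    pair, count = max(counts.items(), key=lambda kv: kv[1])
    return pair if count > 1 else None

def merge_pair(ids, pair, new_id):
    """Replace every non-overlapping occurrence of `pair` with `new_id`."""
    out, i = [], 0
    while i < len(ids):
        if i < len(ids) - 1 and (ids[i], ids[i + 1]) == pair:
            out.append(new_id)
            i += 2
        else:
            out.append(ids[i])
            i += 1
    return out

def render(ids):
    """Show base IDs as characters and merged IDs as <id> placeholders."""
    return "".join(chr(i) if i < 256 else f"<{i}>" for i in ids)

ids = [ord(c) for c in "the cat in the hat"]
merges = {}    # (id_a, id_b) -> new_id
history = []   # rendered text after each iteration

for new_id in range(256, 259):  # three iterations, as in the walkthrough
    pair = most_frequent_pair(ids)
    if pair is None:
        break
    ids = merge_pair(ids, pair, new_id)
    merges[pair] = new_id
    history.append(render(ids))
    print(new_id, pair, history[-1])
```

Each printed line shows the new ID, the pair it replaces, and the text after the replacement, so the output can be compared directly with the iterations listed above.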


## Concrete example of the last step (decoding)

To restore the original text, we reverse the process by substituting each token ID with its corresponding pair, in the reverse order they were introduced:
- Start with the final compressed text: `<258>cat in <258>hat`
- Substitute `<258>` → `<257> `: `<257> cat in <257> hat`
- Substitute `<257>` → `<256>e`: `<256>e cat in <256>e hat`
- Substitute `<256>` → `th`: `the cat in the hat`
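
A minimal sketch of this reverse substitution, using the lookup table from the example. The `merges` dictionary is written out by hand here, and the `decode` helper is hypothetical and unrelated to the `decode()` method of the class below.

```python
# Lookup table from the walkthrough: new_id -> (left_id, right_id)
merges = {
    256: (ord("t"), ord("h")),  # "th"
    257: (256, ord("e")),       # "<256>e"
    258: (257, ord(" ")),       # "<257> "
}

def decode(ids, merges):
    """Undo the merges in reverse order of creation (highest ID first)."""
    for new_id in sorted(merges, reverse=True):
        left, right = merges[new_id]
        expanded = []
        for i in ids:
            if i == new_id:
                expanded.extend((left, right))
            else:
                expanded.append(i)
        ids = expanded
    return "".join(chr(i) for i in ids)

# "<258>cat in <258>hat" as a list of IDs
compressed = [258] + [ord(c) for c in "cat in "] + [258] + [ord(c) for c in "hat"]
print(decode(compressed, merges))
```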



# A simple BPE implementation

- Below is an implementation of the algorithm described above as a Python class that resembles the `tiktoken` Python interface
- The encoding steps described so far are carried out by the `train()` method, which is similar to the `encode()` method but less complicated (`encode()` additionally handles special tokens)

To encode text, we will:
1. Split the text into individual bytes
2. Repeatedly find and replace adjacent tokens when they match any pair in the learned BPE merges (from highest to lowest "rank", i.e., in the order they were learned)
3. Continue merging until no more merges can be applied
4. The final list of token IDs is the encoded output
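
The four steps can be sketched without the class. This is a simplified illustration that assumes the merges were learned in the order shown in the `the cat in the hat` example. The `encode_with_merges` helper is hypothetical, and it ignores special tokens and the `Ġ` preprocessing used by the class below.

```python
def encode_with_merges(text, merges):
    """Encode `text` by applying learned merges, earliest-learned first.

    `merges` maps (id_a, id_b) -> new_id. New IDs are assumed to be assigned
    in increasing order during training, so a smaller new_id means the merge
    was learned earlier and has higher priority.
    """
    ids = list(text.encode("utf-8"))  # step 1: split into bytes
    while len(ids) > 1:
        # step 2: among adjacent pairs, pick the one that was learned first
        candidates = [p for p in zip(ids, ids[1:]) if p in merges]
        if not candidates:
            break  # step 3: no more merges can be applied
        pair = min(candidates, key=lambda p: merges[p])
        new_id, out, i = merges[pair], [], 0
        while i < len(ids):
            if i < len(ids) - 1 and (ids[i], ids[i + 1]) == pair:
                out.append(new_id)
                i += 2
            else:
                out.append(ids[i])
                i += 1
        ids = out
    return ids  # step 4: the final list of token IDs

# Merges from the walkthrough: "th" -> 256, "<256>e" -> 257, "<257> " -> 258
merges = {(116, 104): 256, (256, 101): 257, (257, 32): 258}
print(encode_with_merges("the hat", merges))
print(encode_with_merges("then", merges))
```

Text that was not part of the training sentence, such as `then`, still benefits from the learned merges wherever the same byte pairs occur.
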

In [None]:
from collections import Counter, deque
from functools import lru_cache
import json

class BPETokenizerSimple:
    def __init__(self):
        # maps token_id to token_str
        self.vocab = {}
        # maps token_str to token_id
        self.inverse_vocab = {}
        # dictionary of BPE merges
        self.bpe_merges = {}

        # we are using a rank dict like the official OpenAI GPT-2
        # {(string_A, string_B): rank}, lower rank = higher priority
        self.bpe_ranks = {}

    def train(self, text, vocab_size, allowed_special={"<|endoftext|>"}):
        """
        Train the BPE tokenizer from scratch.

        Args:
            text (str): The training text.
            vocab_size (int): The desired vocabulary size.
            allowed_special (set): A set of special tokens to include.
        """

        # Preprocess: Replace spaces with "Ġ"
        # The "Ġ" is a particularity of the GPT-2 BPE implementation
        # E.g., "Hello world" might be tokenized as ["Hello", "Ġworld"]
        # GPT-4 BPE would tokenize it as ["Hello", " world"]
        processed_text = []
        for i, char in enumerate(text):
            if char == " " and i !=0:
                processed_text.append("Ġ")
            if char != " ":
                processed_text.append(char)
        processed_text = "".join(processed_text)

        # Initialize vocab with unique characters, including "Ġ" if present
        # Start with the first 256 Unicode code points (chr(0) to chr(255))
        unique_chars = [chr(i) for i in range(256)]
        unique_chars.extend(
            char for char in sorted(set(processed_text)) if char not in allowed_special
            if char not in unique_chars
        )
        if "Ġ" not in unique_chars:
            unique_chars.append("Ġ")

        self.vocab = {i: char for i, char in enumerate(unique_chars)}
        # Must be stored as inverse_vocab, since the rest of the class reads that attribute
        self.inverse_vocab = {char: i for i, char in self.vocab.items()}

        # Add allowed special tokens
        if allowed_special:
            for token in allowed_special:
                if token not in self.inverse_vocab:
                    new_id = len(self.vocab)
                    self.vocab[new_id] = token
                    self.inverse_vocab[token] = new_id

        # Tokenize the processed_text into token IDs
        token_ids = [self.inverse_vocab[char] for char in processed_text]

        # BPE steps 1-3: Repeatedly find and replace frequent pairs
        for new_id in range(len(self.vocab), vocab_size):
            pair_id = self.find_freq_pair(token_ids, mode="most")
            if pair_id is None:
                break
            token_ids = self.replace_pair(token_ids, pair_id, new_id)
            self.bpe_merges[pair_id] = new_id

        # Build the vocabulary with merged tokens
        for (p0, p1), new_id in self.bpe_merges.items():
            merged_token = self.vocab[p0] + self.vocab[p1]
            self.vocab[new_id] = merged_token
            self.inverse_vocab[merged_token] = new_id

    def load_vocab_and_merges_from_openai(self, vocab_path, bpe_merges_path):
        """
        Load pre-trained vocabulary and BPE merges from OpenAI's GPT-2 files.

        Args:
            vocab_path (str): Path to the vocab file (GPT-2 calls it 'encoder.json')
            bpe_merges_path (str): Path to the bpe_merges file (GPT-2 calls it 'vocab.bpe')
        """

        # Load vocabulary
        with open(vocab_path, "r", encoding="utf-8") as file:
            loaded_vocab = json.load(file)
            # Convert loaded vocabulary to correct format
            self.vocab = {int(v): k for k, v in loaded_vocab.items()}
            self.inverse_vocab = {k: int(v) for k, v in loaded_vocab.items()}

        # Handle newline character without adding a new token
        if "\n" not in self.inverse_vocab:
            # Use an existing token ID as a placeholder for '\n'
            # Preferentially use "<|endoftext|>" if available
            fallback_token = next((token for token in ["<|endoftext|>", "Ġ", ""] if token in self.inverse_vocab), None)
            if fallback_token is not None:
                newline_token_id = self.inverse_vocab[fallback_token]
            else:
                # If no fallback token is available, raise an error
                raise KeyError("No suitable token found in vocabulary to map '\\n'.")

            self.inverse_vocab["\n"] = newline_token_id
            self.vocab[newline_token_id] = "\n"

        # Load GPT-2 merges and store them with an assigned "rank"
        self.bpe_ranks = {}
        with open(bpe_merges_path, "r", encoding="utf-8") as file:
            lines = file.readlines()
            if lines and lines[0].startswith("#"):
                lines = lines[1:]

            rank = 0
            for line in lines:
                pair = tuple(line.strip().split())
                if len(pair) == 2:
                    token1, token2 = pair
                    # If token1 or token2 not in vocab, skip
                    if token1 in self.inverse_vocab and token2 in self.inverse_vocab:
                        self.bpe_ranks[(token1, token2)] = rank
                        rank += 1
                    else:
                        print(f"Skipping pair: {pair} as one token is not in the vocabulary.")

    def encode(self, text, allowed_special=None):
        """
        Encode the input text into a list of token IDs, with tiktoken-style handling of special tokens.

        Args:
            text (str): The input text to encode.
            allowed_special (set or None): Special tokens to allow passthrough. If None, special handling is disabled.

        Returns:
            List of token IDs.
        """

        import re

        token_ids = []

        # If special token handling is enabled
        if allowed_special is not None and len(allowed_special) > 0:
            # Build regex to match allowed special tokens
            special_pattern = (
                "(" + "|".join(re.escape(tok) for tok in sorted(allowed_special, key=len, reverse=True)) + ")"
            )

            last_index = 0
            for match in re.finditer(special_pattern, text):
                prefix = text[last_index : match.start()]
                token_ids.extend(self.encode(prefix, allowed_special=None)) # Encode prefix without special handling

                special_token = match.group(0)
                if special_token in self.inverse_vocab:
                    token_ids.append(self.inverse_vocab[special_token])
                else:
                    raise ValueError(f"Special token: {special_token} not found in vocabulary.")
                last_index = match.end()

            text = text[last_index:]

            # Check if any disallowed special tokens are in the remainder
            disallowed = [
                tok for tok in self.inverse_vocab
                if tok.startswith("<|") and tok.endswith("|>") and tok in text and tok not in allowed_special
            ]
            if disallowed:
                raise ValueError(f"Disallowed special tokens encountered in text: {disallowed}")

        # If no special tokens, or remaining text after special token split:
        tokens = []
        lines = text.split("\n")
        for i, line in enumerate(lines):
            if i > 0:
                tokens.append("\n")
            words = line.split()
            for j, word in enumerate(words):
                if j == 0 and i > 0:
                    tokens.append("Ġ" + word)
                elif j == 0:
                    tokens.append(word)
                else:
                    tokens.append("Ġ" + word)

        for token in tokens:
            if token in self.inverse_vocab:
                token_ids.append(self.inverse_vocab[token])
            else:
                token_ids.extend(self.tokenize_with_bpe(token))

        return token_ids

    def tokenize_with_bpe(self, token):
        """
        Tokenize a single token using BPE merges.

        Args:
            token (str): The token to tokenize.

        Returns:
            List[int]: The list of token IDs after applying BPE.
        """
        # Tokenize the token into individual characters (as initial token IDs)
        token_ids = [self.inverse_vocab.get(char, None) for char in token]
        if None in token_ids:
            missing_chars = [char for char, tid in zip(token, token_ids) if tid is None]
            raise ValueError(f"Token {token} is missing token IDs: {missing_chars}")

        # If we haven't loaded OpenAI's GPT-2 merges, we use Sebastian's approach
        if not self.bpe_ranks:
            can_merge = True
            while can_merge and len(token_ids) > 1:
                can_merge = False
                new_tokens = []
                i = 0
                while i < len(token_ids) - 1:
                    pair = (token_ids[i], token_ids[i + 1])
                    if pair in self.bpe_merges:
                        merged_token_id = self.bpe_merges[pair]
                        new_tokens.append(merged_token_id)
                        print(f"Merged pair {pair} -> {merged_token_id} ('{self.vocab[merged_token_id]}')")
                        i += 2  # Skip the next token as it's merged
                        can_merge = True
                    else:
                        new_tokens.append(token_ids[i])
                        i += 1
                if i < len(token_ids):
                    new_tokens.append(token_ids[i])
                token_ids = new_tokens
            return token_ids

        # Otherwise, do GPT-2-style merging with the ranks:
        # Convert token_ids back to string "symbols" for each ID
        symbols = [self.vocab[id_num] for id_num in token_ids]

        # Repeatedly merge all occurrences of the lowest-rank pair
        while True:
            # Collect all adjacent pairs
            pairs = set(zip(symbols, symbols[1:]))
            if not pairs:
                break

            # Find the pair with the best (lowest) rank
            min_rank = float("inf")
            bigram = None
            for p in pairs:
                r = self.bpe_ranks.get(p, float("inf"))
                if r < min_rank:
                    min_rank = r
                    bigram = p

            # If no valid ranked pair is present, we are done
            if bigram is None or bigram not in self.bpe_ranks:
                break

            # Merge all occurrences of that pair
            first, second = bigram
            new_symbols = []
            i = 0
            while i < len(symbols):
                # If we see (first, second) at position i, merge them
                if i < len(symbols) - 1 and symbols[i] == first and symbols[i + 1] == second:
                    new_symbols.append(first + second) # merged symbol
                    i += 2
                else:
                    new_symbols.append(symbols[i])
                    i += 1
            symbols = new_symbols

            if len(symbols) == 1:
                break

        # Finally, convert merged symbols back to IDs
        merged_ids = [self.inverse_vocab[sym] for sym in symbols]
        return merged_ids

    def decode(self, token_ids):
        """
        Decode a list of token IDs back into a string.

        Args:
            token_ids (List[int]): The list of token IDs to decode.

        Returns:
            str: The decoded string.
        """
        decoded_string = ""
        for i, token_id in enumerate(token_ids):
            if token_id not in self.vocab:
                raise ValueError(f"Token ID {token_id} not found in vocab.")
            token = self.vocab[token_id]
            if token == "\n":
                if decoded_string and not decoded_string.endswith(" "):
                    decoded_string += " " # Add space if not present before a newline
                decoded_string += token
            elif token.startswith("Ġ"):
                decoded_string += " " + token[1:]
            else:
                decoded_string += token
        return decoded_string

    def load_vocab_and_merges(self, vocab_path, bpe_merges_path):
        """
        Load the vocabulary and BPE merges from JSON files.

        Args:
            vocab_path (str): Path to the vocabulary file.
            bpe_merges_path (str): Path to the BPE merges file.
        """
        # Load vocabulary
        with open(vocab_path, "r", encoding="utf-8") as file:
            loaded_vocab = json.load(file)
            self.vocab = {int(k): v for k, v in loaded_vocab.items()}
            self.inverse_vocab = {v: int(k) for k, v in loaded_vocab.items()}
        # Load bpe merges
        with open(bpe_merges_path, "r", encoding="utf-8") as file:
            merges_list = json.load(file)
            for merge in merges_list:
                pair = tuple(merge["pair"])
                new_id = merge["new_id"]
                self.bpe_merges[pair] = new_id

    @lru_cache(maxsize=None)
    def get_special_token_id(self, token):
        return self.inverse_vocab.get(token, None)

    @staticmethod
    def find_freq_pair(token_ids, mode="most"):
        pairs = Counter(zip(token_ids, token_ids[1:]))

        if not pairs:
            return None

        match(mode):
            case "most":
                return max(pairs.items(), key=lambda x: x[1])[0]
            case "least":
                return min(pairs.items(), key=lambda x: x[1])[0]
            case _:
                raise ValueError("Invalid mode. Choose 'most' or 'least'.")

    @staticmethod
    def replace_pair(token_ids, pair_id, new_id):
        dq = deque(token_ids)
        replaced = []

        while dq:
            current = dq.popleft()
            if dq and (current, dq[0]) == pair_id:
                replaced.append(new_id)
                # Remove the 2nd token of the pair, 1st was already removed
                dq.popleft()
            else:
                replaced.append(current)

        return replaced

- There is too much code in the class above to discuss in detail in this notebook. However, the next section offers an overview of its usage to make the class methods easier to understand