# A simple walkthrough for a minimal HuggingFace Tokenizer implementation

For the jupyter notebook fans out there, this is a scrappy walkthrough for all the different parts in the implementation for the `BPE` and the `MySlowTokenizer` classes implemented here. If you haven't gone through the chapter README yet, please do! The gist is that HuggingFace's [PreTokenizer class](https://huggingface.co/docs/transformers/main_classes/tokenizer) implements a "slow" tokenizer in python. `PreTokenizer` inherits from `PreTokenizerBase`, and a `SpecialTokensMixin` class. Thus, if you're trying to understand the slow tokenizer for GPT-2, you have a whole class genealogy to figure out what is implemented where:
```
PreTokenizerBase       SpecialTokensMixin
        \                   /
         \                 /
          PreTrainedTokenizer
                  |
                  |
            GPT2Tokenizer
```
This is not pretty to navigate. So, I've tried to make things simple with just 2 classes:

1. `BPE` : Meant to replicate what the BPE algorithm does in GPT-2's tokenizer. This is not a complete tokenizer in itself, it's got some weird kinks (with unicode symbols, etc) that we'll see soon.
2. `MySlowTokenizer` : This is meant to a minimal implementation that can match all the basic features for HuggingFace's GPT2 tokenizer (the slow version).

We're going to completely ignore the special tokens handling for now, as this is related to postprocessing that is in fact easier to understand later. 


# BPE
Let's see our implementation in action first.

In [1]:
from bpe import BPE 
bpe = BPE("vocab.json") # make sure you're in the current directory of the notebook

  from .autonotebook import tqdm as notebook_tqdm


## The `__call__` method
Let's see what the `__call__` method does

In [3]:
bpe(" wordaaa")



'Ġword aaa'

The output for " wordaaa" from the BPE tokenizer is the strange string "Ġword aaa". Well, here's what it does:
1. Converts input string into bytes and encodes each byte with a unicode symbol. Specifically, a space " " becomes "Ġ". This is how GPT-2 does it. 
2. Tokenizes the string into characters 
3. Applies the BPE merge algorithm to iteratively combine intermediate tokens until you can't reduce it further.
4. Join the list of tokens with a whitespace and return the resultant string

Before we dive into the implementation for the merge algorithm, let's see what the list of tokens are when you use GPT2's tokenizer from HuggingFace

In [4]:
from transformers import AutoTokenizer
gpt2_tokenizer = AutoTokenizer.from_pretrained("gpt2", use_fast=False)
text = " wordaaa"
hf_tokens = gpt2_tokenizer.batch_decode(gpt2_tokenizer.encode(text))
my_tokens = bpe(text).split(" ")
print("HF's tokens: ", hf_tokens)
print("My tokens: ", my_tokens)

HF's tokens:  [' word', 'aaa']
My tokens:  ['Ġword', 'aaa']




The two outputs should infact be exactly the same, except for special characters (whitespace, etc) which we'll not get into later. (these two classes aren't exactly comparable as I mentioned, but it's good to see that this work)

## The algorithm
Here's the complete merge algorithm with bpe, showing as a standalone function (with unicode symbols decoded)

In [21]:
from bpe import get_pairs
from transformers.models.gpt2.tokenization_gpt2 import bytes_to_unicode

byte_encoder = bytes_to_unicode()
byte_decoder = {v: k for k, v in byte_encoder.items()}
bpe_ranks = bpe.bpe_ranks

def split_into_tokens(word: str):
    word = "".join([byte_encoder[b] for b in word.encode("utf-8")])
    pairs = get_pairs(word) # "obobc" -> set([("o", "b"), ("b", "o"), ("b", "c")])
    iter = 0
    while True:
        # get pair of chars/tokens with lowest rank and merge
        bigram = min(pairs, key=lambda pair: bpe_ranks.get(pair, float("inf")))
        if bigram not in bpe_ranks:
            break # no more mergeable pairs
        first, second = bigram
        print(f"{iter=} : {first=} {second=}")
        new_word = []
        i = 0
        while i < len(word):
            try:
                j = word.index(first, i) # find index of occurence of `first` token in word[i:]
            except ValueError:
                print(f"\t -> {iter=} : {first=} not found in {word[i:]=}")
                new_word.extend(word[i:]) 
                break
            else:
                print(f"\t -> {iter=} : Found {first=} in {word[i:]=}. Skipping previous tokens {word[i:j]}")
                new_word.extend(word[i:j])
                i = j

            if word[i] == first and i < len(word) - 1 and word[i + 1] == second:
                print(f"\t -> {iter=} : Merging {first=} and {second=}")
                new_word.append(first + second)
                i += 2
            else:
                new_word.append(word[i])
                i += 1
            iter += 1
        new_word = tuple(new_word)
        word = new_word
        if len(word) == 1: # merged into a single token
            break
        else:
            pairs = get_pairs(word)
        print(f"{iter=} : Updated {word=}, {pairs=}")
    word = " ".join(word)
    return word

In [22]:
split_into_tokens(" wordaaa")

iter=0 : first='Ġ' second='w'
	 -> iter=0 : Found first='Ġ' in word[i:]='Ġwordaaa'. Skipping previous tokens 
	 -> iter=0 : Merging first='Ġ' and second='w'
	 -> iter=1 : first='Ġ' not found in word[i:]='ordaaa'
iter=1 : Updated word=('Ġw', 'o', 'r', 'd', 'a', 'a', 'a'), pairs={('d', 'a'), ('a', 'a'), ('r', 'd'), ('Ġw', 'o'), ('o', 'r')}
iter=1 : first='o' second='r'
	 -> iter=1 : Found first='o' in word[i:]=('Ġw', 'o', 'r', 'd', 'a', 'a', 'a'). Skipping previous tokens ('Ġw',)
	 -> iter=1 : Merging first='o' and second='r'
	 -> iter=2 : first='o' not found in word[i:]=('d', 'a', 'a', 'a')
iter=2 : Updated word=('Ġw', 'or', 'd', 'a', 'a', 'a'), pairs={('Ġw', 'or'), ('or', 'd'), ('d', 'a'), ('a', 'a')}
iter=2 : first='Ġw' second='or'
	 -> iter=2 : Found first='Ġw' in word[i:]=('Ġw', 'or', 'd', 'a', 'a', 'a'). Skipping previous tokens ()
	 -> iter=2 : Merging first='Ġw' and second='or'
	 -> iter=3 : first='Ġw' not found in word[i:]=('d', 'a', 'a', 'a')
iter=3 : Updated word=('Ġwor', 'd',

'Ġword aaa'

You can see how with each iteration, BPE merges the pair of tokens with the lowest rank / highest priority. It has to merge all occurences of the pair, and thus you have two while loops. The brief summary of what happened:
1. Obtain all unique bigrams (pairs of adjacent symbols) from the word. (before the while loop)
2. State: `word` is initially a string, but it is converted into a tuple in later iterations. `word` represents the current segmentation of the original word, represented as a tuple of tokens.
3. The algorithm then enters a loop where it looks for the lowest-ranked bigram (the first merge that BPE learnt while training). This represents the most frequent pair to be merged, or rather, the best compression step that BPE knows for this word.
4. If the bigram is not in `bpe_ranks`, it means that no more merges are possible, and the process terminates.
    - Why? Observe how the minimum has been calculated. If the bigram with minimum rank is not in `bpe_ranks`, then it has a rank of `inf`, which means none of the other bigrams are in `bpe_ranks` / merge list.
5. Otherwise, the function reconstructs the word by merging instances of the bigram.
    - This is the inner while loop. We iteratively find the first occurence of `first` in `word`, record the index, and break up the word into two at this index. Keep proceeding till the end of `word`.
6. This process continues until the word cannot be further merged, at which point the final tokenized word is returned by joining all the tokens with a whitespace.

What if we flipped the order of the tokens? How do the merges look then? Let's see:

In [18]:
split_into_tokens("aaa word")

iter=0 : first='Ġ' second='w'
	 -> iter=0 : Found first='Ġ' in word[i:]='aaaĠword'. Adding new token aaa
	 -> iter=1 : first='Ġ' not found in word[i:]='ord'
iter=1 : Updated word=('a', 'a', 'a', 'Ġw', 'o', 'r', 'd'), pairs={('a', 'a'), ('r', 'd'), ('Ġw', 'o'), ('o', 'r'), ('a', 'Ġw')}
iter=1 : first='o' second='r'
	 -> iter=1 : Found first='o' in word[i:]=('a', 'a', 'a', 'Ġw', 'o', 'r', 'd'). Adding new token ('a', 'a', 'a', 'Ġw')
	 -> iter=2 : first='o' not found in word[i:]=('d',)
iter=2 : Updated word=('a', 'a', 'a', 'Ġw', 'or', 'd'), pairs={('Ġw', 'or'), ('or', 'd'), ('a', 'a'), ('a', 'Ġw')}
iter=2 : first='Ġw' second='or'
	 -> iter=2 : Found first='Ġw' in word[i:]=('a', 'a', 'a', 'Ġw', 'or', 'd'). Adding new token ('a', 'a', 'a')
	 -> iter=3 : first='Ġw' not found in word[i:]=('d',)
iter=3 : Updated word=('a', 'a', 'a', 'Ġwor', 'd'), pairs={('a', 'a'), ('Ġwor', 'd'), ('a', 'Ġwor')}
iter=3 : first='Ġwor' second='d'
	 -> iter=3 : Found first='Ġwor' in word[i:]=('a', 'a', 'a', 'Ġwor'

'aaa Ġword'

It's almost exactly the same steps, except that the higest priority tokens seem to keep falling towards the end of the tuple `word`, and thus the tokens at the end keep getting merged (Notice how the first three tokens `('a','a','a')` keep getting skipped in the first few iterations)

# MySlowTokenizer

Let's now move on to the slow tokenizer's implementation. Let's see what you can _do_ with MySlowTokenizer first. Features implemented:
- `tokenizer(text)` : Tokenizes a piece of text and returns a list of token ids. Equivalent to `tokenizer.encode(text)`
- `tokenizer.decode(token_ids)`: Decodes a list of token ids and returns a stitched up string.
- `tokenizer.add_tokens(my_new_tokens)`: Add new tokens to the tokenizer's vocabulary. If a token is already present, this errors out. 
- `tokenizer.convert_token_to_id(token)` : Self-evident
- `tokenizer.convert_id_to_token(token_id)` : Self-evident
- `tokenizer.pre_tokenize(text)`: Pretokenizes a string by splitting at whitespaces, contractions (Ex: `don't`), etc. This isn't a method in HuggingFace, but I find this convenient. 
- `tokenizer.to_dict()` : Export tokenizer state into a dictionary. (Sadly, a `from_dict` hasn't been implemented, as I think this might be overkill)

In [8]:
from minimal_hf_tok import MySlowTokenizer
from transformers import AutoTokenizer
gpt2_tokenizer = AutoTokenizer.from_pretrained("gpt2", use_fast=False)
my_tokenizer = MySlowTokenizer("vocab.json")
my_tokenizer

MySlowTokenizer(vocab_size=50257, unk_token=<|endoftext|>, added_tokens={'<|endoftext|>'}, bpe=BPE(vocab_size=50257))

In [9]:
input_text = "This isn't<|myspecialtoken|> that   simple"
new_token = "<|myspecialtoken|>"
my_tokenizer.add_tokens(new_token)
gpt2_tokenizer.add_tokens(new_token)
my_enc = my_tokenizer.encode(input_text)
gpt2_enc = gpt2_tokenizer.encode(input_text)

print("Input text:", input_text)
print("My tokenizer encoding:", my_enc)
print("GPT2 tokenizer encoding:", gpt2_enc)

print("My tokenizer decoding:", my_tokenizer.decode(my_enc))
print("GPT2 tokenizer decoding:", gpt2_tokenizer.decode(gpt2_enc))

Added <|myspecialtoken|> to the vocabulary.
Input text: This isn't<|myspecialtoken|> that   simple
My tokenizer encoding: [1212, 2125, 470, 50257, 326, 220, 220, 2829]
GPT2 tokenizer encoding: [1212, 2125, 470, 50257, 326, 220, 220, 2829]
My tokenizer decoding: This isn't<|myspecialtoken|> that   simple
GPT2 tokenizer decoding: This isn't <|myspecialtoken|>  that   simple


Our tokenizer gives the same result as HuggingFace's tokenizer, and supports adding a new token!

# Pouring over the implementation
I'm copying over the code from `minimal_hf_tok.py` here, because, well, this wouldn't be much of a walkthrough otherwise

In [12]:
from typing import Dict, Tuple, Union, List, Any
import regex as re # regex is cooler than re
from bpe import BPE
from minimal_hf_tok import EOS_TOKEN, MyTrie

class MySlowTokenizer:
    """
    A minimal implementation of HF's slow tokenizer, based on GPT2's tokenizer
    References:
    https://github.com/huggingface/transformers/blob/8aca43bdb3cb9a5020f6d57589d85679dc873b1c/src/transformers/models/gpt2/tokenization_gpt2.py
    """
    def __init__(self, init_vocab_file: str = None):
        self.added_tokens_trie = MyTrie() # trie for added tokens only
        self.bpe = BPE(init_vocab_file)
        self.vocab = self.bpe.token_to_id # nice to have vocab accessible here
        self.byte_encoder = self.bpe.byte_encoder
        self.byte_decoder = {v: k for k, v in self.byte_encoder.items()}
        self.unk_token = EOS_TOKEN

        # Regex for pre-tokenization - breaking up a piece of text into words by splitting at whitespaces, contractions, etc. Borrowed from GPT-2
        self.pattern_for_splitting = re.compile(r"""'s|'t|'re|'ve|'m|'ll|'d| ?\p{L}+| ?\p{N}+| ?[^\s\p{L}\p{N}]+|\s+(?!\S)|\s+""")
        self._load_added_tokens()
    
    def _load_added_tokens(self):
        # loads added tokens from json and adds them to the trie
        # Hard coded for GPT2 demonstration
        self.added_tokens_trie.add(EOS_TOKEN)
    
    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        self.encode(*args, **kwargs)
    
    def encode(self, text: str, **kwargs: Any) -> Any:
        text, kwargs = self.prepare_for_tokenization(text, **kwargs)

        # 1. Split text into chunks at the boundaries of added_tokens. Can be thought of as a pre-tokenization step.
        # "This isn't<|endoftext|> what you think" -> ["This isn't", "<|endoftext|>", " what you think"] 
        chunks = self.added_tokens_trie.split(text)
        bpe_tokens = []
        for chunk in chunks:
            if chunk in self.added_tokens_trie._tokens:
                # if chunk is an added token, directly add it to bpe_tokens
                bpe_tokens.append(chunk)
            else:
                # 2. Tokenize each chunk
                tokens = self._tokenize(chunk)
                bpe_tokens.extend(tokens)
        # 3. Convert tokens to ids
        bpe_tokens = [self.convert_token_to_id(token) for token in bpe_tokens]
        return bpe_tokens

    def decode(self, ids: List[int], **kwargs: Any) -> str:
        # 1. Convert ids to tokens
        tokens = [self.convert_id_to_token(id_) for id_ in ids] 
        # 2. Join tokens
        text = "".join(tokens)
        # 3. Replace unicode symbols with normal characters
        text = bytearray([self.byte_decoder[c] for c in text]).decode("utf-8")
        return text
    
    def _tokenize(self, text: str) -> List[str]:
        all_tokens = []
        # Pre-tokenization: split text into words based on regex. "This isn't" -> ["This", " isn", "'t"]
        words = self.pre_tokenize(text)
        for word in words:
            # Unicode string encoding. " isn'" -> bytes object -> "Ġisn'"
            word = "".join([self.byte_encoder[b] for b in word.encode("utf-8")]) 
            tokens = self.bpe(word, dont_byte_encode=True).split(" ") # we already encoded the chunk to unicode strings
            all_tokens.extend(tokens)
        return all_tokens

    def pre_tokenize(self, text: str) -> List[str]:
        return self.pattern_for_splitting.findall(text)
    
    def prepare_for_tokenization(self, text: str, **kwargs: Any) -> Tuple[str, Dict[str, Any]]:
        """
        In HF, this method performs any pre-processing needed before tokenization. Dummy function for now.
        """
        # returns text and kwargs
        return (text, kwargs)

    def to_dict(self):
        dict1 = {}
        dict1["model"]["type"] = "BPE"
        dict1["model"]["vocab"] = self.vocab
        dict1["model"]["merges"] = self.bpe.merges
        dict1["added_tokens"] = list(self.added_tokens_trie._tokens)
        dict1["special_tokens_map"] = {"unk_token": self.unk_token}
        return dict1
    
    def add_tokens(self, new_tokens: Union[str, List[str]]):
        """
        Adds new tokens to the tokenizer.
        """
        if isinstance(new_tokens, str):
            new_tokens = [new_tokens]
        for token in new_tokens:
            self.bpe.add_token(token) # add to vocab first
            self.added_tokens_trie.add(token)
            print(f"Added {token} to the vocabulary.")
    
    def convert_token_to_id(self, token: str) -> int:
        """
        Converts a token to its id. Returns unk token id if token is not in vocab.
        Fancy word: Numericalization
        """
        return self.vocab.get(token, self.vocab[self.unk_token])

    def convert_id_to_token(self, index: int) -> str:
        """
        Converts an id to its token. Returns unk token if id is not in vocab.
        """
        return self.bpe.id_to_token.get(index, self.unk_token)

    def __repr__(self) -> str:
        string = "MySlowTokenizer("
        string += f"vocab_size={self.nvocab}, unk_token={self.unk_token}, added_tokens={str(self.added_tokens_trie._tokens)}, "
        string += f"bpe={str(self.bpe)})"
        return string
    
    # make vocab_size a property because it should change with added tokens
    @property
    def nvocab(self) -> int:
        return len(self.vocab)


## `__init__`

1. Initializes the tokenizer with a vocabulary file if provided.
2. Sets up the `MyTrie` structure for added tokens.
3. Prepares the byte pair encoding (BPE) mechanism.
4. Creates a byte encoder and decoder for character-level representation.
5. Sets a pattern for pre-tokenization, which splits text into chunks based on specified rules.

## `encode`

Let's break down how to implement the `encode` method. When you do `tok.encode(text)`, there are three operations:
1. Normalize and pre-tokenize input text. With GPT2, the pre-tokenization involves breaking up the text on whitespace, contractions, punctuations, etc.
2. Tokenize input string/strings to get a list of tokens for each word/chunk. This is handled by the `._tokenize()` method.
3. Convert tokens to token ids using `.convert_tokens_to_ids()` method.


Important detail on `added_tokens`: These are really tokens added to the BPE vocabulary after the model was trained -  can you really just let the BPE model tokenize your string directly? Think about this:
1. You added a new token `<|extraspecialtoken|>` to the vocabulary and gavae it a new ID.
2. You tokenize a string `Hello there <|extraspecialtoken|>`. 
What happens if you directly use the BPE model to tokenize this string? First, BPE will split this up into symbols, and then it will iteratively merge neighbouring pairs of tokens/characters until you can't merge anymore. So, we need to handle this in the pre-tokenization step - Along with splitting on whitespace, punctuations, etc, we will also split at the boundaries of `added_tokens`. 

## `decode`
When you run `tok.decode(token_ids)`, there are three operations:
1. Convert ids to tokens using the `id_to_token` mapping from `tok.bpe`. 
2. Join all the tokens
3. Replace unicode symbols with normal characters