This is an implementation of the GPT tokenizer, following [this video](https://www.youtube.com/watch?v=zduSFxRajkE) by Andrej Karpathy.

In [145]:
def get_bigram(ids, bigram: dict):
    """Count the adjacent pairs of `ids` into `bigram`.

    `bigram` maps a `(left, right)` tuple to how many times it has been seen.
    It is updated in place, so counts from several sequences can be
    accumulated by passing the same dict repeatedly, and it is also returned.
    """
    for left, right in zip(ids, ids[1:]):
        key = (left, right)
        if key in bigram:
            bigram[key] += 1
        else:
            bigram[key] = 1
    return bigram

def merge(ids, new_pair, new_code):
    """Return a copy of `ids` with each occurrence of `new_pair` replaced by `new_code`.

    The scan runs left to right and skips past a replaced pair, so
    overlapping occurrences are not merged twice (three equal ids in a row
    give one merged id followed by one original id). `ids` is not modified.
    """
    merged = []
    last = len(ids) - 1
    pos = 0
    while pos <= last:
        # A pair can only start where there is still a right-hand neighbour.
        if pos < last and (ids[pos], ids[pos + 1]) == new_pair:
            merged.append(new_code)
            pos += 2
        else:
            merged.append(ids[pos])
            pos += 1
    return merged
class BasicTokenizer:
    """
    This is a simple implementation of a byte-level BPE tokenizer.

    A fresh instance only knows the 256 single-byte tokens, so `.encode`
    returns the raw UTF-8 bytes; train it with `.train` to learn merges.

    Attributes:
        merges: maps a pair of token ids to the id that replaces it. `train`
            hands ids out in increasing order, so a smaller id means the
            merge was learned earlier.
        decode_map: maps every token id to the bytes it stands for.
        vocab_size: number of token ids currently defined.
    """
    def __init__(self):
        super().__init__()
        # Start from the plain byte vocabulary so encode/decode work (as a
        # byte-level identity) instead of raising before training.
        self.merges = {}
        self.decode_map = {i: bytes([i]) for i in range(256)}
        self.vocab_size = 256

    def train(self, text, vocab_size, verbose=False):
        """Train the BPE algorithm on text, by performing merges until the vocab size == vocab_size

        Any previously learned merges are discarded. Training stops early
        when the text has been squeezed into a single token, in which case
        `self.vocab_size` ends up smaller than the requested `vocab_size`.

        With `verbose=True` every merge and the final compression rate
        (bytes before / tokens after) are printed.
        """
        assert vocab_size >= 256, "The vocab_size has to be at least 256"
        self.merges = {}  # A history of all the merges that happened
        self.decode_map = {i: bytes([i]) for i in range(256)}
        ids = list(text.encode('utf-8'))
        len_ids_before = len(ids)
        curr_vocab_size = 256
        while curr_vocab_size < vocab_size and len(ids) > 1:
            bigram = get_bigram(ids, {})
            # Most frequent adjacent pair; ties go to the pair seen first.
            new_pair = max(bigram, key=bigram.get)
            if verbose:
                print(f'Merging {new_pair} into id {curr_vocab_size}')
            # The new token decodes to the concatenation of its two halves.
            self.decode_map[curr_vocab_size] = self.decode_map[new_pair[0]] + self.decode_map[new_pair[1]]
            ids = merge(ids, new_pair, curr_vocab_size)
            self.merges[new_pair] = curr_vocab_size
            curr_vocab_size += 1
        self.vocab_size = curr_vocab_size
        # Empty text has no tokens, so there is no rate to report.
        if verbose and ids:
            print(f'The compression rate is {len_ids_before/len(ids):.2f}')

    def encode(self, text):
        """Turn `text` into a list of token ids.

        Merges are replayed in the order they were learned: on every round
        the adjacent pair with the lowest merged id is replaced everywhere,
        until no adjacent pair is a known merge. This matches what `train`
        did to its own text, which a plain left-to-right greedy scan does not.
        """
        tokens = list(text.encode('utf-8'))
        merges = self.merges
        never = float('inf')  # rank for pairs that were never learned
        while len(tokens) > 1:
            pair = min(zip(tokens, tokens[1:]), key=lambda p: merges.get(p, never))
            if pair not in merges:
                break  # nothing left to merge
            new_id = merges[pair]
            # Replace every occurrence of `pair`, scanning left to right.
            merged = []
            last = len(tokens) - 1
            i = 0
            while i <= last:
                if i < last and (tokens[i], tokens[i + 1]) == pair:
                    merged.append(new_id)
                    i += 2
                else:
                    merged.append(tokens[i])
                    i += 1
            tokens = merged
        return tokens

    def decode(self, ids):
        """Turn a list of token ids back into text.

        Raises KeyError for an id that is not in `decode_map`. Byte sequences
        that are not valid UTF-8 (possible when `ids` is an arbitrary slice
        of an encoding) are replaced with U+FFFD rather than raising.
        """
        decoded_ids = b''.join(self.decode_map[idx] for idx in ids)
        return decoded_ids.decode('utf-8', errors='replace')
                

In [146]:
BasicTokenizer.train?

Signature: BasicTokenizer.train(self, text, vocab_size, verbose=False)
Docstring: Train the BPE algorithm on text, by performing merges until the vocab size == vocab_size
File:      /tmp/ipykernel_193194/1854094153.py
Type:      function

In [89]:
tok = BasicTokenizer()
# Close the file promptly and do not depend on the platform's default
# encoding (assumes the data file is UTF-8 -- TODO confirm).
with open('data/taylorswift.txt', 'r', encoding='utf-8') as f:
    taylor = f.read()
small_test = 'Hello world this is a test!\n'
tok.train(taylor, 500, verbose=False)
# Basic sanity check tests: encoding then decoding must give the text back,
# both for text the tokenizer was not trained on ...
assert tok.decode(tok.encode(small_test)) == small_test
tok = BasicTokenizer()
tok.train(small_test, 500)
# ... and when the training text was tiny compared to the encoded text.
assert tok.decode(tok.encode(taylor)) == taylor

In [84]:
import regex as re
# Regex used to cut text into pieces before BPE. The alternatives, in order:
#   '(?i:[sdmt]|ll|ve|re)        an apostrophe contraction ('s, 'd, 'll, 've, ...), case-insensitive
#   [^\r\n\p{L}\p{N}]?+\p{L}+    a run of letters, optionally led by one char that is
#                                not a letter, digit, CR or LF
#   \p{N}{1,3}                   one to three digits
#    ?[^\s\p{L}\p{N}]++[\r\n]*   a run of non-space, non-letter, non-digit chars with an
#                                optional leading space and any trailing CR/LF
#   \s*[\r\n]                    whitespace ending in a CR or LF
#   \s+(?!\S)                    whitespace not directly followed by a non-space char
#   \s+                          any other whitespace
# The \p{...} Unicode property classes are not supported by the stdlib `re`
# module, which is why the third-party `regex` module is imported as `re`.
GPT4_SPLIT_PATTERN = r"""'(?i:[sdmt]|ll|ve|re)|[^\r\n\p{L}\p{N}]?+\p{L}+|\p{N}{1,3}| ?[^\s\p{L}\p{N}]++[\r\n]*|\s*[\r\n]|\s+(?!\S)|\s+"""

In [142]:
class RegexTokenizer:
    """A BPE tokenizer that splits the training/encoding text, before performing training/tokenizing using BPE

    The text is cut into chunks with `pattern` and BPE is run inside each
    chunk independently, so a merge never spans a chunk boundary. Only the
    substrings the pattern matches are tokenized; text a custom pattern
    leaves unmatched is dropped.

    A fresh instance only knows the 256 single-byte tokens; train it with
    `.train` to learn merges.

    Attributes:
        pattern: the compiled splitting regex.
        merges: maps a pair of token ids to the id that replaces it. `train`
            hands ids out in increasing order, so a smaller id means the
            merge was learned earlier.
        decode_map: maps every token id to the bytes it stands for.
        vocab_size: number of token ids currently defined.
    """
    def __init__(self, pattern=GPT4_SPLIT_PATTERN):
        super().__init__()
        self.pattern = re.compile(pattern)
        # Start from the plain byte vocabulary so encode/decode work (as a
        # byte-level identity) instead of raising before training.
        self.merges = {}
        self.decode_map = {i: bytes([i]) for i in range(256)}
        self.vocab_size = 256

    def train(self, text, vocab_size, verbose=False):
        """Learn merges from `text` until the vocab size == vocab_size

        Any previously learned merges are discarded. Training stops early
        when every chunk has been squeezed into a single token, in which case
        `self.vocab_size` ends up smaller than the requested `vocab_size`.

        With `verbose=True` every merge and the final compression rate
        (bytes before / tokens after) are printed.
        """
        assert vocab_size >= 256, "The vocab_size has to be at least 256"
        self.merges = {}  # A history of all the merges that happened
        self.decode_map = {i: bytes([i]) for i in range(256)}
        list_ids = [list(t.encode('utf-8')) for t in self.pattern.findall(text)]
        len_ids_before = len(text.encode('utf-8'))  # To calculate the compression ratio
        curr_vocab_size = 256
        while curr_vocab_size < vocab_size:
            # Pair counts are pooled over all chunks.
            bigram = {}
            for ids in list_ids:
                get_bigram(ids, bigram)
            if not bigram:  # every chunk is down to a single token
                break
            # Most frequent adjacent pair; ties go to the pair seen first.
            new_pair = max(bigram, key=bigram.get)
            if verbose:
                print(f'Merging {new_pair} into id {curr_vocab_size}')
            # The new token decodes to the concatenation of its two halves.
            self.decode_map[curr_vocab_size] = self.decode_map[new_pair[0]] + self.decode_map[new_pair[1]]
            list_ids = [merge(ids, new_pair, curr_vocab_size) for ids in list_ids]
            self.merges[new_pair] = curr_vocab_size
            curr_vocab_size += 1
        self.vocab_size = curr_vocab_size
        len_ids_after = sum(len(ids) for ids in list_ids)
        # Empty text has no tokens, so there is no rate to report.
        if verbose and len_ids_after:
            print(f'The compression rate is {len_ids_before/len_ids_after:.2f}')

    def encode(self, text):
        """Turn `text` into a flat list of token ids.

        The text is split with `self.pattern`, every chunk is encoded on its
        own, and the per-chunk results are concatenated in order.
        """
        encoded_tokens = []
        for chunk in self.pattern.findall(text):
            encoded_tokens.extend(self._encode_chunk(list(chunk.encode('utf-8'))))
        return encoded_tokens

    def _encode_chunk(self, ids):
        """Apply the learned merges to one chunk of byte ids, earliest-learned merge first."""
        merges = self.merges
        never = float('inf')  # rank for pairs that were never learned
        # Keep going until no adjacent pair is a known merge; a single pass
        # is not enough because merged ids can themselves be merged again.
        while len(ids) > 1:
            pair = min(zip(ids, ids[1:]), key=lambda p: merges.get(p, never))
            if pair not in merges:
                break
            ids = merge(ids, pair, merges[pair])
        return ids

    def decode(self, ids):
        """Turn a list of token ids back into text.

        Raises KeyError for an id that is not in `decode_map`. Byte sequences
        that are not valid UTF-8 (possible when `ids` is an arbitrary slice
        of an encoding) are replaced with U+FFFD rather than raising.
        """
        decoded_ids = b''.join(self.decode_map[idx] for idx in ids)
        return decoded_ids.decode('utf-8', errors='replace')

In [144]:
tok = RegexTokenizer()
# Close the file promptly and do not depend on the platform's default
# encoding (assumes the data file is UTF-8 -- TODO confirm).
with open('data/taylorswift.txt', 'r', encoding='utf-8') as f:
    taylor = f.read()
small_test = 'Hello world this is a test!\n'
tok.train(small_test, 500, verbose=False)
# Round trip must hold when the training text was tiny compared to the
# encoded text ...
assert taylor == tok.decode(tok.encode(taylor))
tok = RegexTokenizer()
tok.train(taylor, 500, verbose=False)
# ... and for text the tokenizer was not trained on.
assert small_test == tok.decode(tok.encode(small_test))

After finishing this I have to move it to its own .py file(s).

This was made mostly for educational purposes because running a transformer on tokens is impossible for me due to compute limitations.

I might expand this in the future to include more implementations.

Andrej Karpathy's [Implementation](https://github.com/karpathy/minbpe/blob/master/minbpe/gpt4.py)