Cannot wait to be building my own GPT tokenizer!

In [1]:
# Tokenizer for part 1

class BasicTokenizer:
    def _get_pair_rankings(self, text_bytes):
        """count the occurances for each pair in the input"""
         # generate a list of the most common byte-pairs
        encountered_pairs = {}
        for pair in zip(text_bytes, text_bytes[1:]): # neat way of iterating through the pairs
            encountered_pairs[pair] = encountered_pairs.get(pair, 0) + 1

        # sort the pairs
        encountered_pairs = list(sorted(encountered_pairs.items(), key=lambda x: x[1], reverse=True))
        return encountered_pairs
    

    def _merge_bp(self, text_bytes, encountered_pairs, next_token, verbose):
        """perform a merge on the most frequently occuring byte pair"""
        merges = {}

         # choose the pair that has the highest count and do compression
        top_pair_and_count = encountered_pairs[0]
        skip_next = False
        new_sequence = []

        for pair in zip(text_bytes, text_bytes[1:]):
            if skip_next:
                skip_next = False
                continue

            if pair == top_pair_and_count[0]:
                new_sequence.append(next_token)
                # since 2 tokens were compressed into 1, skip the next token
                skip_next = True

            else:
                new_sequence.append(pair[0])

        if verbose:
            print(f"Merged {top_pair_and_count[0]} into {next_token}.")

        merges[next_token] = top_pair_and_count[0]
        
        # account for the last token that was not zipped 
        new_sequence = new_sequence + [text_bytes[-1]] if not skip_next else []

        return new_sequence, merges

    def train(self, text, vocab_size, verbose=False):
        """Train the tokenizer on your own text sequence"""
        # start the tokenizer by changing the code to utf-8 encodings
        text_bytes = list(text.encode("utf-8"))
        merges = {}
        
        next_token = 256

        while next_token < vocab_size:
            encountered_pairs = self._get_pair_rankings(text_bytes)
            # important: after each round of encoding, update the series of text bytes and record the bpe that occured
            text_bytes, new_merge = self._merge_bp(text_bytes, encountered_pairs, next_token, verbose)
            merges = {**merges, **new_merge}
            next_token += 1
    
        return text_bytes

In [2]:
tokenizer = BasicTokenizer()
training_set = open("../taylorswift.txt", "r", encoding="utf-8").read()
tokenizer._get_pair_rankings(list((training_set).encode("utf-8")))
# r = tokenizer.train(training_set, 500, True)

[((101, 32), 2981),
 ((44, 32), 2961),
 ((32, 50), 2633),
 ((100, 32), 2617),
 ((46, 32), 2560),
 ((114, 32), 2428),
 ((50, 48), 2365),
 ((101, 114), 2359),
 ((104, 101), 2118),
 ((111, 114), 2076),
 ((115, 32), 2053),
 ((105, 110), 2006),
 ((101, 100), 1876),
 ((114, 105), 1862),
 ((32, 116), 1824),
 ((111, 110), 1815),
 ((116, 32), 1802),
 ((110, 32), 1768),
 ((118, 101), 1763),
 ((116, 104), 1737),
 ((32, 111), 1663),
 ((32, 83), 1633),
 ((97, 114), 1519),
 ((32, 97), 1495),
 ((97, 110), 1487),
 ((32, 65), 1335),
 ((121, 32), 1248),
 ((97, 108), 1164),
 ((115, 116), 1089),
 ((119, 105), 1086),
 ((32, 82), 1045),
 ((108, 32), 1026),
 ((48, 49), 989),
 ((98, 101), 988),
 ((116, 114), 978),
 ((114, 101), 972),
 ((32, 102), 967),
 ((110, 103), 966),
 ((48, 50), 960),
 ((105, 102), 955),
 ((101, 115), 938),
 ((105, 101), 938),
 ((32, 84), 934),
 ((102, 116), 934),
 ((97, 121), 900),
 ((82, 101), 887),
 ((32, 34), 882),
 ((83, 119), 873),
 ((108, 111), 872),
 ((116, 101), 866),
 ((101, 11

In [3]:
# tokenizer for part 2

import regex as re
GPT4_SPLIT_PATTERN = r"""'(?i:[sdmt]|ll|ve|re)|[^\r\n\p{L}\p{N}]?+\p{L}+|\p{N}{1,3}| ?[^\s\p{L}\p{N}]++[\r\n]*|\s*[\r\n]|\s+(?!\S)|\s+"""

"""
1. matches all '(smth) (?i...) means ignorecase
"""

training_set = open("../taylorswift.txt", "r", encoding="utf-8").read()
split_training_set = re.findall(GPT4_SPLIT_PATTERN, training_set)
training_bytes = [list(s.encode("utf-8")) for s in split_training_set]
# training_bytes

In [4]:
class BasicTokenizerWithRegex:
    def __init__(self):
         self.new_token_to_target = {}

    def _get_pair_rankings(self, text_bytes):
        """count the occurances for each pair in the input"""
         # generate a list of the most common byte-pairs
        encountered_pairs = {}
        for chunk in text_bytes:
            for pair in zip(chunk, chunk[1:]): # neat way of iterating through the pairs
                encountered_pairs[pair] = encountered_pairs.get(pair, 0) + 1

        # sort the pairs
        encountered_pairs = list(sorted(encountered_pairs.items(), key=lambda x: x[1], reverse=True))
        return encountered_pairs
    
    def _merge(self, text_bytes, target_pair, new_token):
        """Do a pure merge"""
        
        new_chunk = []
        skip_next = False

        for pair in zip(text_bytes, text_bytes[1:]):
            if skip_next:
                skip_next = False
                continue

            if pair == target_pair:
                new_chunk.append(new_token)
                # since 2 tokens were compressed into 1, skip the next token
                skip_next = True

            else:
                new_chunk.append(pair[0])

        new_chunk += [text_bytes[-1]] if not skip_next else []
        return new_chunk
    

    def _train_merge_bp(self, text_bytes, encountered_pairs, next_token, verbose):
        """perform a training merge on the most frequently occuring byte pair"""
        merges = {}

        # choose the pair that has the highest count and do compression
        top_pair_and_count = encountered_pairs[0]
        # break early if the remaining tokens do not appear more than once
        if top_pair_and_count[1] < 2:
            print("no more pairs that should be merged")
            return text_bytes, merges
        
        new_sequence = []

        for chunk in text_bytes:
            new_chunk = self._merge(chunk, top_pair_and_count[0], next_token)
            new_sequence.append(new_chunk)

        if verbose:
            print(f"Merged {top_pair_and_count[0]} into {next_token}.")

        merges[next_token] = top_pair_and_count[0]

        return new_sequence, merges

    def train(self, text, vocab_size, verbose=False):
        """Train the tokenizer on your own text sequence, now with regex"""
        # start the tokenizer by changing the code to utf-8 encodings
        if vocab_size < 256:
            raise Exception("Vocab size should not be lower than 255")

        text_chunks = re.findall(GPT4_SPLIT_PATTERN, text)
        text_bytes = [list(s.encode("utf-8")) for s in text_chunks]

        next_token = 256

        while next_token <= vocab_size:
            encountered_pairs = self._get_pair_rankings(text_bytes)
            # important: after each round of encoding, update the series of text bytes and record the bpe that occured
            text_bytes, new_merge = self._train_merge_bp(text_bytes, encountered_pairs, next_token, verbose)
            # print(text_bytes)
            self.new_token_to_target = {** self.new_token_to_target, **new_merge}
            next_token += 1

        self.target_to_new_token = {target:new_token for new_token, target in  self.new_token_to_target.items()}
        self.reversed_new_token_to_target = list(reversed(self.new_token_to_target.items()))
        return text_bytes
    
    def encode(self, text):
        "given text, encode it based on the merge dictionary"
        if not self.new_token_to_target:
            raise Exception("This tokenizer has not yet been trained!")

        # split the text up into chunks before doing the encoding (since that was how it was trained)
        txt_bytes = [list(chunk.encode("utf-8")) for chunk in re.findall(GPT4_SPLIT_PATTERN, text)]

        # follow the defined merges in order
        new_bytes = []
        for chunk in txt_bytes:
            for target, new_token in self.target_to_new_token.items():
                # print(f'target is {target}, new token is {new_token}')
                new_chunk = self._merge(chunk, target, new_token)
            
            new_bytes += new_chunk

        return new_bytes
    
    def decode(self, tokens):
        "given a set of encoded tokens, decode them into text"
        if not self.new_token_to_target:
            raise Exception("This tokenizer has not yet been trained!")

        for new_token, target in self.reversed_new_token_to_target:
            new_sequence = []
            for token in tokens:
                if token == new_token:
                    new_sequence.extend(target)

                else:
                    new_sequence.append(token)

            tokens = new_sequence
        # print(tokens)
        text = "".join([chr(t) for t in tokens])
        return text



In [5]:
gpt_4_copy = BasicTokenizerWithRegex()
r = gpt_4_copy.train(training_set, 260, True)

Merged (101, 114) into 256.
Merged (50, 48) into 257.
Merged (111, 114) into 258.
Merged (105, 110) into 259.
Merged (101, 100) into 260.


In [6]:
print(list("biggere rand cooler and bled".encode("utf-8")))
o = gpt_4_copy.encode("biggere rand cooler and bled")
gpt_4_copy.decode(o)

[98, 105, 103, 103, 101, 114, 101, 32, 114, 97, 110, 100, 32, 99, 111, 111, 108, 101, 114, 32, 97, 110, 100, 32, 98, 108, 101, 100]


'biggere rand cooler and bled'