In [13]:
import os
import shutil
from typing import Optional
import pickle
import numpy as np

class ConfigError(Exception):
    """Raised when a tokenizer class is constructed with an invalid user configuration."""

class base_tokenizer:
    """
        Base class for the tokenizers. It provides the shared building blocks:
        counting adjacent token pairs, merging a pair into a new token, and
        loading previously saved training artifacts from disk.

        Attributes:
            _base_vocab: dict mapping every byte value 0..255 to its single-byte `bytes` object
            _base_vocab_size: int, number of entries in the base vocabulary (256)
    """
    def __init__(self):
        # initialize base vocabulary dictionary: one entry per possible byte value,
        # which is what UTF-8 encoded text is made of
        self._base_vocab = {i: bytes([i]) for i in range(256)}
        self._base_vocab_size = 256

    def _get_pair_counts(self,tokens):
        """
            Traverse the entire encoded text and produce a dictionary with the
            number of occurrences of each pair of adjacent tokens
                key: token pair, e.g., (106, 32)
                value: count of occurrences of key, e.g., 300
                meaning: token pair (106, 32) occurred 300 times in the text
            Args:
                tokens: sliceable sequence of tokens (e.g. a list of ints)
            Returns:
                dict mapping (token, next_token) -> count; empty when tokens has
                fewer than two elements
        """

        count_dict = {}
        for (c1, c2) in zip(tokens[:-1],tokens[1:]):
            count_dict[(c1,c2)] = count_dict.get((c1,c2),0) + 1
        return count_dict

    def _merge_pair(self,tokens,pair,new_token):
        """
            Replace all occurrences of pair in tokens by new_token
            Args:
                tokens: indexable sequence of tokens
                pair: 2-tuple of adjacent tokens to replace
                new_token: token written in place of each occurrence of pair
            Returns:
                a new list; the input sequence is not modified. Matching is done
                left to right without overlap, e.g. [1, 1, 1] with pair (1, 1)
                becomes [new_token, 1]
        """
        new_tokens = []
        i = 0
        while i < len(tokens):
            # only look ahead when a right-hand neighbour exists
            if i < len(tokens) - 1 and (tokens[i],tokens[i+1]) == pair:
                new_tokens.append(new_token)
                i += 2
            else:
                new_tokens.append(tokens[i])
                i += 1
        return new_tokens

    def _retrieve_training_history(self,title,vocab_size):
        """
            Load the saved vocabulary, merge history and tokens of a previous training run
            Files are looked up in "<title>_tok_folder" under the names
                "<title>_vocab_dict_size<vocab_size>.pkl"
                "<title>_merge_history_size<vocab_size>.pkl"
                "<title>_tokens_size<vocab_size>.npy"
            Args:
                title: str, name of the material the tokenizer was trained on
                vocab_size: value inserted verbatim (via str) into the file names
            Returns:
                (vocab, merges, past_tokens): the two unpickled objects and the loaded numpy array
            Raises:
                FileNotFoundError: if any of the three files cannot be opened or loaded;
                    the underlying error is attached as __cause__
        """
        folder = title + "_tok_folder"
        vocab_path = os.path.join(folder, title + "_vocab_dict_size"+str(vocab_size)+".pkl")
        merges_path = os.path.join(folder, title + "_merge_history_size"+str(vocab_size)+".pkl")
        tokens_path = os.path.join(folder, title + "_tokens_size"+str(vocab_size)+".npy")
        try:
            # NOTE(security): pickle.load can execute arbitrary code; only load
            # files that were produced by a trusted training run.
            with open(vocab_path,"rb") as f:
                vocab = pickle.load(f)
            with open(merges_path,"rb") as f:
                merges = pickle.load(f)
            with open(tokens_path,"rb") as f:
                past_tokens = np.load(f)
        except Exception as err:
            # `Exception` (not a bare except) so KeyboardInterrupt / SystemExit still
            # propagate; chaining keeps the real cause visible in the traceback.
            m = f"Dictionary files do not exit, tokenizer requires training with {title}. Or provided with inconsistent vocab_size, use os.listdir to inspect dictionary files."
            m_more = " Or past_tokens cannot be retreived, if this is the case, encode text first"
            raise FileNotFoundError(m+m_more) from err
        else:
            return vocab, merges, past_tokens

class TrainTokenizer(base_tokenizer):
    """
        This class implements the compression algorithm described in
            https://en.wikipedia.org/wiki/Byte_pair_encoding#:~:text=Byte%20pair%20encoding%20(also%20known,for%20use%20in%20downstream%20modeling
        It takes text and title and trains a tokenizer by repeatedly merging the
        most frequent pair of adjacent tokens into a new token.
        Args:
            text: str, actual text; encoded to UTF-8 bytes on a fresh start
            title: str, name of the materials that the tokenizer is training on
            fresh_start: bool, whether to train from scratch or continue training/compressing, default=True
            final_vocab_size: int, final vocabulary size after compression - determines how many merges to perform
            last_vocab_size: int, if continue training, what is the last final_vocab_size in thousands: 10 -> 10,000

        Attributes:
            vocab: dict, token id -> bytes that the token stands for
            merge_history: dict, (left_token, right_token) -> token id created by merging that pair
            tokens: current (compressed) token sequence of the training text

        Raises:
            ConfigError: when fresh_start is False and last_vocab_size is missing
                or not smaller than final_vocab_size
            FileNotFoundError: when fresh_start is False and the saved files cannot be loaded

        Folder/Title Naming Convention:
            "book_title_tok_folder" - all lower case, connected with underscore

        Sub-files (in the _tok_folder) Naming Convention:
            "book_title_vocab_dict_size10.pkl" stores encoding dictionary, where 10 means 10,000 vocabulary size
            "book_title_merge_history_size10.pkl" stores merging history, where 10 means 10,000 vocabulary size
            "book_title_tokens_size10.npy" stores tokens from last compression, with tokenization with size 10,000

        NOTE(review): writing these files is not implemented in this class yet; only
        loading (via _retrieve_training_history) exists.
    """

    def __init__(self, text: str, title: str, final_vocab_size: int =6000, fresh_start: bool =True, last_vocab_size: Optional[int] =None):
        super(TrainTokenizer,self).__init__()
        self.title = title
        self.final_vocab_size = final_vocab_size
        # initialize training vocabulary and merge history dictionaries
        if fresh_start:
            # copy, so that adding merged tokens never mutates the base byte table
            self.vocab = dict(self._base_vocab)
            self.merge_history = {}
            # training starts from the raw UTF-8 byte values of the text
            self.tokens = list(text.encode("utf-8"))
        else:
            if last_vocab_size is None: raise ConfigError("for continue training (fresh_start == False), last final_vocab_size must be provided")
            # NOTE(review): last_vocab_size is documented in thousands while
            # final_vocab_size is an absolute count - confirm the intended units.
            if final_vocab_size <= last_vocab_size: raise ConfigError("unable to perform tokenizer training, because new vocabulary size must be larger than previous vocabulary size")
            self.vocab, self.merge_history, self.tokens = self._retrieve_training_history(self.title,last_vocab_size)


    def _perform_merge(self):
        """
            Training loop compression process:
                1. identify top pair
                2. swap the occurrences of top pair in the original tokens by new token
                3. update merge_history and vocab
            Runs until the vocabulary reaches final_vocab_size, or stops early
            when no adjacent pair is left to merge. Updates self.tokens,
            self.merge_history and self.vocab in place; returns None.
        """
        vocab_size = len(self.merge_history) + self._base_vocab_size
        num_merges = self.final_vocab_size - vocab_size
        for _ in range(num_merges):
            pair_counts = self._get_pair_counts(self.tokens)
            if not pair_counts:
                # fewer than two tokens left: nothing more can be merged
                break
            # 1. most frequent pair (first one encountered wins ties)
            top_pair = max(pair_counts, key=pair_counts.get)
            # plain ints keep the dictionary keys uniform even when the tokens
            # were loaded from a numpy array
            left, right = int(top_pair[0]), int(top_pair[1])
            # new ids continue right after the ids already in use
            new_token = vocab_size
            # 2. replace every occurrence of the pair
            self.tokens = self._merge_pair(self.tokens, (left, right), new_token)
            # 3. record the merge and the bytes the new token stands for
            self.merge_history[(left, right)] = new_token
            self.vocab[new_token] = self.vocab[left] + self.vocab[right]
            vocab_size += 1

    



In [14]:
# Smoke test: build a fresh-start tokenizer on a placeholder string and
# display its byte-level base vocabulary.
a = TrainTokenizer(text="text", title="text")
a._base_vocab

{0: b'\x00',
 1: b'\x01',
 2: b'\x02',
 3: b'\x03',
 4: b'\x04',
 5: b'\x05',
 6: b'\x06',
 7: b'\x07',
 8: b'\x08',
 9: b'\t',
 10: b'\n',
 11: b'\x0b',
 12: b'\x0c',
 13: b'\r',
 14: b'\x0e',
 15: b'\x0f',
 16: b'\x10',
 17: b'\x11',
 18: b'\x12',
 19: b'\x13',
 20: b'\x14',
 21: b'\x15',
 22: b'\x16',
 23: b'\x17',
 24: b'\x18',
 25: b'\x19',
 26: b'\x1a',
 27: b'\x1b',
 28: b'\x1c',
 29: b'\x1d',
 30: b'\x1e',
 31: b'\x1f',
 32: b' ',
 33: b'!',
 34: b'"',
 35: b'#',
 36: b'$',
 37: b'%',
 38: b'&',
 39: b"'",
 40: b'(',
 41: b')',
 42: b'*',
 43: b'+',
 44: b',',
 45: b'-',
 46: b'.',
 47: b'/',
 48: b'0',
 49: b'1',
 50: b'2',
 51: b'3',
 52: b'4',
 53: b'5',
 54: b'6',
 55: b'7',
 56: b'8',
 57: b'9',
 58: b':',
 59: b';',
 60: b'<',
 61: b'=',
 62: b'>',
 63: b'?',
 64: b'@',
 65: b'A',
 66: b'B',
 67: b'C',
 68: b'D',
 69: b'E',
 70: b'F',
 71: b'G',
 72: b'H',
 73: b'I',
 74: b'J',
 75: b'K',
 76: b'L',
 77: b'M',
 78: b'N',
 79: b'O',
 80: b'P',
 81: b'Q',
 82: b'R',
 83: b'

In [8]:
# Read the preprocessed corpus and convert it to its raw UTF-8 byte values.
with open("lord-of-the-rings-processed.txt", mode="r", encoding="utf-8") as f:
    text = f.read()
print(f"text length: {len(text)}")
# one integer (0..255) per encoded byte
tokens = list(bytes(text, "utf-8"))
tokens[:3]

text length: 3729059


[84, 104, 101]

In [10]:
# Count adjacent byte pairs over the whole corpus and show the five most frequent.
tokenizer = base_tokenizer()
pair_counts = tokenizer._get_pair_counts(tokens)
sorted_pair_counts = sorted(
    pair_counts.items(),
    key=lambda item: item[1],
    reverse=True,
)
sorted_pair_counts[:5]

[((101, 32), 122267),
 ((32, 116), 107301),
 ((116, 104), 97454),
 ((104, 101), 95711),
 ((100, 32), 85094)]