# minbpe exercise

  At this point you have everything you need to build your own GPT-4 tokenizer. This is the exercise progression you may wish to follow. You'll note that it is part of the minbpe repo, which is the solution to that exercise, and is a cleaned up version of the code above.

In [1]:
def getStats(ids):
    counts = {}
    for pair in zip(ids, ids[1:]): #get consecutive ordr
        counts[pair] = counts.get(pair, 0) + 1
    return counts




In [15]:
def merge(ids, pair, ix):
    newId = [ ]
    i = 0
    while i < len(ids):
        if i < len(ids) - 1 and ids[i] == pair[0] and ids[i+1] == pair[1]:
            newId.append(ix)
            i += 2
        else:
            newId.append(ids[i])
            i += 1
    return newId

print(merge([5, 6, 6, 7, 9, 1], (6, 6), 99))

    

[5, 99, 7, 9, 1]


# Step 1 :
   + Write the BasicTokenizer class, with the following three core functions:

      * def train(self, text, vocab_size, verbose=False)


      * def encode(self, text)


      * def decode(self, ids)

### First Try

In [None]:

class BasicTokenizer:
    def __init__(self, token):
        self.merges = {}
        self.vocabSiz = 256
        self.numMerges = self.vocabSiz - 256
        self.token = token

    def train(self, txt, vocabSiz, verbose=False):
        vocabSiz = 256
        ids = list(self.token)
        for i in range(self.numMerges):
            stats = getStats(ids)
            pair = max(stats, key=stats.get)
            ix = 256 + i
            print(f'mergin para: {pair}, to --> {ix}')
            ids = merge(ids, pair, ix)
        return ids 

    def encode(self, txt):
        token = list(txt.encode('utf-8'))
        while len(token) >= 2:
            stats = getStats(token)
            ## if not pair that didnt emegi, assing it to inf
            pair = min(stats, key=lambda p: self.merges.get(p, float('inf')))
            if pair not in self.merges:
                break
            ix = self.merges[pair]
            token = merge(token, pair, ix)
        return token
    
    def decode(self, ids):
        # token id to byte Object For that token
        vocab = {ix:bytes([ix]) for ix in range(256)} 
        for (p0, p1), ix in self.merges.items():
            vocab[ix] = vocab[p0] + vocab[p1] 

        
        tokens = b"".join(vocab[ix] for ix in ids)
        txt = tokens.decode('utf-8', errors='replace')
        return txt 
    





### Second Try, Got Help from minbpe Repo

In [3]:
def getStats(ids, counts=None):
    counts = { } if counts is None else counts
    # pythonic way of iterating consecutive element
    for pair in zip(ids, ids[1:]):
        counts[pair] = counts.get(pair, 0) + 1

    return counts  

In [4]:
def merge(ids, pair, ix):
    newId = [ ]
    i = 0
    while i < len(ids):
        if i < len(ids) - 1 and ids[i] == pair[0] and ids[i+1] == pair[1]:
            newId.append(ix)
            i += 2
        else:
            newId.append(ids[i])
            i += 1
    return newId

In [5]:
import unicodedata 

# first two helper functions...// 

# #Code from minbpe Repo../
def replaceCtrlChar(s: str) -> str:
    chars = [ ]
    for ch in s:
        if unicodedata.category(ch)[0] != "C":
            chars.append(ch) # this character is ok
        
        else:
            chars.append(f"\\u{ord(ch):04x}") # escape
    
    return "".join(chars)

def renderToken(t: bytes) -> str:
    s = t.decode('utf-8', errors='replace')
    s = replaceCtrlChar(s)

    return s 



In [6]:
class Tokenizer:
    def __init__(self):
        self.merges = {}
        self.pattern = ""
        self.specialToken = {}
        self.vocab = self._buildVocab()
    
    def train(self, txt, vocabSiz, verbose=False):
        raise NotImplementedError
    
    def enc(self, txt):
        raise NotImplementedError

    def decod(self, ids):
        raise NotImplementedError
    
    def _buildVocab(self):
        vocab = {ix: bytes([ix]) for ix in range(256)}
        for (p0, p1), ix in self.merges.items():
            vocab[ix] = vocab[p0] + vocab[p1]
        
        for special, ix in self.specialToken.items():
            vocab[ix] = special.encode('utf-8')
        
        return vocab

    
    def save(self, filPrefx):
        modelFile = filPrefx + ".model"
        with open(modelFile, 'w') as file:
            file.write("minbpe v1\n")
            file.write(f"{self.pattern}\n")

            file.write(f"{len(self.specialToken)}\n")
            for special, ix in self.specialToken.items():
                file.write(f"{special}  {ix}\n")
            
            for ix1, ix2 in self.merges:
                file.write(f"{ix1} {ix2}\n")
        
        vocabSiz = filPrefx + '.vocab'
        invertedMerges = {ix:pair for pair, ix in self.merges.items()}

        with open(vocabSiz, 'w', encoding='utf-8') as file:
            for ix, token in self.vocab.items():

                s = renderToken(token)
                # find the children of this token, if any
                if ix in invertedMerges:
                    ix0, ix1 = invertedMerges[ix]
                    s0 = renderToken(self.vocab[ix0])
                    s1 = renderToken(self.vocab[ix1])
                    file.write(f"[{s0}] [{s1}] -> [{s}] [{ix}]\n")
                
                else:
                    file.write(f"[{s}] {ix}\n")
    

    def load(self, modelFile):
        assert modelFile.endswith(".model")

        merges = {}
        specialToken = {}
        ix = 256

        with open(modelFile, 'r', encoding='utf-8') as file:
            vesrion = file.readline().strip()
            assert vesrion == "minbpe v1"

            self.patrn = file.readline().strip()

            numSpecial = int(file.readline().strip().split())
            
            for i in range(numSpecial):
                special, specailIx = file.readline().strip().split()
                specialToken[special] = int(specailIx)
            
            for line in file:
                ix1, ix2 = map(int, line.split())
                merges[(ix1, ix2)] = ix 
                ix += 1
        
        self.merges = merges
        self.specialToken = specialToken
        self.vocab = self._buildVocab()




In [7]:
class BasicTokenizer(Tokenizer):
    def __init__(self):
        super().__init__()
    
    def train(self, txt, vocabSiz, verbose=False):
        assert vocabSiz >= 256
        numMerges = vocabSiz - 256

        txtByts = txt.encode('utf-8')
        ids = list(txtByts)

        mergs = {}
        vocab = {ix:bytes([ix]) for ix in range(256)}

        for i in range(numMerges):
            stats = getStats(ids)

            pair = max(stats, key=stats.get)

            ix = 256 + i

            ids = merge(ids, pair, ix)

            mergs[pair] = ix 
            vocab[ix] = vocab[pair[0]] + vocab[pair[1]]

            if verbose:
                print(f"merge {i+1}/{numMerges}: {pair} -> {ix} ({vocab[ix]}) has {stats[pair]} occurence")
        
        self.merges = mergs
        self.vocab = vocab
    
    def decod(self, ids):
        txtByts = b"".join(self.vocab[ix] for ix in ids)
        txt = txtByts.decode('utf-8', errors='replace')

        return txt 
    

    def enc(self, txt):
        txtByts = txt.encode('utf-8')
        ids = list(txtByts)
        while len(ids) >= 2:
            stats = getStats(ids)
            pair = min(stats, key=lambda p: self.merges.get(p, float('inf')))

            if pair not in self.merges:
                break

            ix = self.merges[pair]
            ids = merge(ids, pair, ix)
        
        return ids 

In [None]:
import os 
import time

txtByts = open('taylorswift.txt', "r", encoding='utf-8').read()
os.makedirs("models", exist_ok=True)

t0 = time.time()

tokenizr = BasicTokenizer()
tokenizr.train(txtByts, 512, verbose=True)

prefx = os.path.join('models', 'basic')
tokenizr.save(prefx)

t1 = time.time()
print(f"Training took {t1 - t0:.2f} seconds")


merge 1/256: (101, 32) -> 256 (b'e ') has 2981 occurence
merge 2/256: (44, 32) -> 257 (b', ') has 2961 occurence
merge 3/256: (100, 32) -> 258 (b'd ') has 2617 occurence
merge 4/256: (46, 32) -> 259 (b'. ') has 2560 occurence
merge 5/256: (114, 32) -> 260 (b'r ') has 2428 occurence
merge 6/256: (50, 48) -> 261 (b'20') has 2365 occurence
merge 7/256: (115, 32) -> 262 (b's ') has 2053 occurence
merge 8/256: (105, 110) -> 263 (b'in') has 2006 occurence
merge 9/256: (111, 110) -> 264 (b'on') has 1815 occurence
merge 10/256: (114, 105) -> 265 (b'ri') has 1805 occurence
merge 11/256: (116, 32) -> 266 (b't ') has 1802 occurence
merge 12/256: (116, 104) -> 267 (b'th') has 1737 occurence
merge 13/256: (101, 258) -> 268 (b'ed ') has 1736 occurence
merge 14/256: (257, 261) -> 269 (b', 20') has 1705 occurence
merge 15/256: (97, 110) -> 270 (b'an') has 1487 occurence
merge 16/256: (97, 114) -> 271 (b'ar') has 1360 occurence
merge 17/256: (101, 260) -> 272 (b'er ') has 1356 occurence
merge 18/256: (

In [8]:
import regex as re 


In [9]:
GPT2_SPLIT_PATTERN = r"""'(?:[sdmt]|ll|ve|re)| ?\p{L}+| ?\p{N}+| ?[^\s\p{L}\p{N}]+|\s+(?!\S)|\s+"""
GPT4_SPLIT_PATTERN =  r"""'(?i:[sdmt]|ll|ve|re)|[^\r\n\p{L}\p{N}]?+\p{L}+|\p{N}{1,3}| ?[^\s\p{L}\p{N}]++[\r\n]*|\s*[\r\n]|\s+(?!\S)|\s+"""


In [10]:
class RegExTokenizer(Tokenizer):
    def __init__(self, pattern=None):
        super.__init__()
        self.pattern = GPT4_SPLIT_PATTERN if pattern is None else pattern
        self.compilPattrn = re.compile(self.pattern)
        self.specialTokn = {}
        self.inverseSpecialTokn = { }

    def train(self, txt, vocabSiz, verbose=False):
        assert vocabSiz >= 256
        numMerges = vocabSiz - 256

        txtChunks = re.findall(self.compilPattrn, txt)
        ids = [list(ch.encode('utf-8')) for ch in txtChunks]


        mergs = {}
        vocab = {ix:bytes([ix]) for ix in range(256)}

        for i in range(numMerges):
            stats = { }
            
            for chunkIds in ids:
                getStats(chunkIds, stats)

            pair = max(stats, key=stats.get)

            ix = 256 + i

            ids = [merge(chnkIds, pair, ix) for chnkIds in ids]
            

            mergs[pair] = ix 
            vocab[ix] = vocab[pair[0]] + vocab[pair[1]]

            if verbose:
                print(f"merge {i+1}/{numMerges}: {pair} -> {ix} ({vocab[ix]}) has {stats[pair]} occurence")
        
        self.merges = mergs
        self.vocab = vocab

    
    def registrSpecialTokn(self, specialTokn):
        self.specialTokn = specialTokn
        self.inverseSpecialTokn = {v:k for k, v in specialTokn.items()}
    
    def decod(self, ids):
        partByts = []
        for ix in ids:
            if ix in self.vocab:
                partByts.append(self.vocab[ix])
            
            elif ix in self.inverseSpecialTokn:
                partByts.append(self.inverseSpecialTokn[ix].encode('utf-8'))
            
            else:
                raise ValueError(f'invalid token id {ix}')
            
        txtByts = b"".join(partByts)
        txt = txtByts.decode('utf-8', errors='replace')

        return txt 
    

    def _encodeChunk(self, txtByts):

        ids = list(txtByts)
        while len(ids) >= 2:
            stats = getStats(ids)
            pair = min(stats, key=lambda p: self.merges.get(p, float('inf')))

            if pair not in self.merges:
                break

            ix = self.merges[pair]
            ids = merge(ids, pair, ix)
        
        return ids 

    def encodeOrdinary(self, txt):
        """Encoding that ignores any special tokens."""
        
        txtChunks = re.findall(self.compilPattrn, txt)
       
        ids = []
        for chunk in txtChunks:
            chunkByts = chunk.encode("utf-8") 
            chunkIds = self._encodeChunk(chunkByts) 
            ids.extend(chunkIds)
        
        return ids
    

    def enc(self, txt, allowdSpecial="none_raise"):
        special = None

        if allowdSpecial == 'all':
           special = self.specialToken
        
        elif allowdSpecial == 'none':
            special = {}
        
        elif allowdSpecial == 'none_raise':
            special = {}
            assert all(tken not in txt for tken in self.specialToken)

        elif isinstance(allowdSpecial, set):
            special = {k:v for v, k in self.specialToken.items() if k in allowdSpecial} 
        
        else:
            raise ValueError(f'allowd special {allowdSpecial} not understood')

        if not special:
            return self.encodeOrdinary(txt)
        
        specialPattrn = "(" + "|".join(re.escape(k) for k in special) + ")"
        specialChunk = re.split(specialPattrn, txt)

        ids = []
        for part in specialChunk:
            if part in special:
                ids.append(special[part])
            
            else:
                ids.extend(self.encodeOrdinary(part))
        
        return ids


# Step 3
  + You're now ready to load the merges from the GPT-4 tokenizer and show that your tokenizer produces the identical results for both encode and decode, matching tiktoken.

In [11]:
# match this
import tiktoken
enc = tiktoken.get_encoding("cl100k_base") # this is the GPT-4 tokenizer
ids = enc.encode("hello world!!!? (안녕하세요!) lol123 😉")
text = enc.decode(ids) # get the same text back

# Unfortunately, you will run into two issues:

 1. It is not trivial to recover the raw merges from the GPT-4 tokenizer. You can easily recover what we call vocab here, and what they call and store under enc._mergeable_ranks. Feel free to copy paste the recover_merges function in minbpe/gpt4.py, which takes these ranks and returns the raw merges. If you wish to know how this function works, read this and this. Basically, under some conditions it is enough to only store the parent nodes (and their rank) and get rid of the precise details of which children merged up to any parent.

 
  2. Second, the GPT-4 tokenizer for some reason permutes its raw bytes. It stores this permutation in the first 256 elements of the mergeable ranks, so you can recover this byte shuffle relatively simply as byte_shuffle = {i: enc._mergeable_ranks[bytes([i])] for i in range(256)}. In both your encode and decode, you'll have to shuffle bytes around accordingly. If you're stuck, reference the minbpe/gpt4.py` file for hints.

In [12]:
import tiktoken
enc = tiktoken.get_encoding("cl100k_base") # this is the GPT-4 tokenizer
ids = enc.encode("<|endoftext|>hello world", allowed_special="all")