In [137]:
import regex as re
from collections import defaultdict

PAT = r"""'(?:[sdmt]|ll|ve|re)| ?\p{L}+| ?\p{N}+| ?[^\s\p{L}\p{N}]+|\s+(?!\S)|\s+"""

def pretokenContainsBytePair(pretokenTuple, bytePairTuple):
    for idx, byte in enumerate(pretokenTuple[:-1]):
        if byte == bytePairTuple[0] and pretokenTuple[idx + 1] == bytePairTuple[1]:
            return True, idx
    return False, 0

def makeMerge(pretokenizedCounts):
    bytePairCounts = defaultdict(int)
    for pretoken, count in pretokenizedCounts.items():
        for idx, byte in enumerate(pretoken[:-1]):
            bytePairCounts[(byte, pretoken[idx + 1])] += count
    
    bytePairCountsSorted = sorted(bytePairCounts.items(), key = lambda x : (x[1], x[0]), reverse=True)
    
    merge = bytePairCountsSorted[0][0]
    pairCount = bytePairCountsSorted[0][1]
    bytePairCounts[merge] = 0
    return bytePairCounts, merge, pairCount


def mergePretokenizedCounts(pretokenizedCounts, mergedPair, bytePairCounts):
    newPretokenizedCounts = defaultdict(int)
    pretokensContainingMerge = []
    for pretoken, count in pretokenizedCounts.items():
        modifiedPretoken = list(pretoken)
        pretokenContainsMerge = False

        while True:
            mergeLocationTuple = pretokenContainsBytePair(modifiedPretoken, mergedPair)
            if not mergeLocationTuple[0]:
                break
            pretokenContainsMerge = True
            mergeLocation = mergeLocationTuple[1]
            if mergeLocation != 0:
                leftOverlap = (modifiedPretoken[mergeLocation - 1], modifiedPretoken[mergeLocation])
                if leftOverlap[0] != mergedPair[0] + mergedPair[1]:
                    bytePairCounts[leftOverlap] -= count
                    
            if mergeLocation + 1 != len(modifiedPretoken) - 1:
                rightOverlap = (modifiedPretoken[mergeLocation + 1], modifiedPretoken[mergeLocation + 2])
                bytePairCounts[rightOverlap] -= count
            
            modifiedPretoken[mergeLocation + 1] = mergedPair[0] + mergedPair[1]
            modifiedPretoken = modifiedPretoken[:mergeLocation] + modifiedPretoken[mergeLocation + 1:]

        
        
        modifiedPretoken = tuple(modifiedPretoken)
        newPretokenizedCounts[modifiedPretoken] = count
        if pretokenContainsMerge:
            for idx, byte in enumerate(modifiedPretoken[:-1]):
                if byte == (mergedPair[0] + mergedPair[1]) or modifiedPretoken[idx + 1] == (mergedPair[0] + mergedPair[1]):
                    bytePairCounts[(byte, modifiedPretoken[idx + 1])] += count
    return pretokensContainingMerge, newPretokenizedCounts

def getMerges(path, vocabSize, specialTokens):
    with open(path, 'r') as f:
        data = f.read()
    
    specialTokenPAT = '|'.join(re.escape(s) for s in specialTokens)
    docs = re.split(specialTokenPAT, data)

    pretokenizedCounts = defaultdict(int)
    for doc in docs:
        splitText = re.finditer(PAT, doc)
        for x in splitText:
            bytesRep = tuple(k.to_bytes() for k in x[0].encode('utf-8'))
            pretokenizedCounts[bytesRep] += 1
    
        
    vocabulary = [x.to_bytes() for x in range(256)] + [x.encode('utf-8') for x in specialTokens]
    
    merges = []
    bytePairCounts, mergedPair, pairCount = makeMerge(pretokenizedCounts)
    while len(vocabulary) < vocabSize:
        
        merges.append(mergedPair)
        vocabulary.append(mergedPair[0] + mergedPair[1])
        pretokensContainingMerge, pretokenizedCounts = mergePretokenizedCounts(pretokenizedCounts, mergedPair, bytePairCounts)
        bytePairCountsSorted = sorted(bytePairCounts.items(), key = lambda x : (x[1], x[0]), reverse=True)
    
        mergedPair = bytePairCountsSorted[0][0]
        pairCount = bytePairCountsSorted[0][1]
        bytePairCounts[mergedPair] = 0
    vocabDict = {}
    for idx, word in enumerate(vocabulary):
        vocabDict[idx] = word
    return vocabDict, merges

vocab, merges = getMerges("../tests/fixtures/corpus.en", 500, ["<|endoftext|>"])
print(merges[:10])

[(b' ', b't'), (b' ', b'a'), (b'h', b'e'), (b'i', b'n'), (b' t', b'he'), (b'r', b'e'), (b' ', b'o'), (b' ', b','), (b'e', b'r'), (b' ', b's')]


In [171]:
from typing import Iterator, Iterable
from functools import cache

def pretokenContainsBytePair(pretokenTuple, bytePairTuple):
    for idx, byte in enumerate(pretokenTuple[:-1]):
        if byte == bytePairTuple[0] and pretokenTuple[idx + 1] == bytePairTuple[1]:
            return True, idx
    return False, 0

def tryMerge(pretoken, mergedPair):
    modifiedPretoken = list(pretoken)
    pretokenContainsMerge = False
    while True:
        mergeLocationTuple = pretokenContainsBytePair(modifiedPretoken, mergedPair)
        if not mergeLocationTuple[0]:
            break
        pretokenContainsMerge = True
        mergeLocation = mergeLocationTuple[1]
        
        modifiedPretoken[mergeLocation + 1] = mergedPair[0] + mergedPair[1]
        modifiedPretoken = modifiedPretoken[:mergeLocation] + modifiedPretoken[mergeLocation + 1:]
    modifiedPretoken = tuple(modifiedPretoken)
    return modifiedPretoken

def tokenizeWord(word, merges):
    modifiedPretoken = word
    for merge in merges:
        modifiedPretoken = tryMerge(modifiedPretoken, merge)
    return modifiedPretoken
    
class Tokenizer():
    def __init__(self, vocab, merges, special_tokens=None):
        self.vocab = vocab
        self.merges = tuple(merges)
        self.special_tokens = special_tokens
        self.vocabToIdx = {v : k for k, v in self.vocab.items()}
    def from_files(cls, vocab_filepath, merges_filepath, special_tokens=None):
        pass
    def encode(self, text: str) -> list[int]:
        if self.special_tokens is not None:
            tokens_sorted = sorted(self.special_tokens, key=len, reverse=True)

            specialTokenPAT = "(" + "|".join(re.escape(s) for s in tokens_sorted) + ")"
            docs = re.split(specialTokenPAT, text)
        else:
            docs = [text]
        pretokenizedWords = []
        
        for doc in docs:
            if self.special_tokens is not None and doc in self.special_tokens:
                pretokenizedWords.append((doc.encode('utf-8'),))
            else:
                splitText = re.finditer(PAT, doc)
                for idx, x in enumerate(splitText):
                    bytesRep = tuple(k.to_bytes() for k in x[0].encode('utf-8'))
                    pretokenizedWords.append(bytesRep)
        idSequence = []
        for pretoken in pretokenizedWords:
            if self.special_tokens is not None and pretoken in self.special_tokens:
                idSequence.append(self.vocabToIdx[pretoken])
                continue
            modifiedPretoken = tokenizeWord(pretoken, self.merges)
            for tok in modifiedPretoken:
                idSequence.append(self.vocabToIdx[tok])
            
        
        return idSequence
        
    def encode_iterable(self, iterable: Iterable[str]) -> Iterator[int]:
        for string in iterable:
            splitText = re.finditer(PAT, string)
            for txt in splitText:
                for tok in self.encode(txt):
                    yield tok
                    
    def decode(self, ids: list[int]) -> str:
        txt = b''
        for tokId in ids:
            byte = self.vocab[tokId]
            txt += byte
        return txt.decode('utf-8', errors='replace')
tokenizer = Tokenizer(vocab, merges, ["<|endoftext|>"])
tokenizer.encode("luated <|endoftext|> aww man")

[482, 480, 32, 256, 258, 119, 119, 484]

In [172]:
tokenizer.decode([482, 480])

'luated'

In [134]:
specialTokens = ["<s>"]
data = "gather gathering <s> gatherer"

specialTokenPAT = "(" + '|'.join(re.escape(s) for s in specialTokens) + ")"
docs = re.split(specialTokenPAT, data)
print(docs)
pretokenizedWords = []
for doc in docs:
    if doc in specialTokens:
        pretokenizedWords.append(doc.encode('utf-8'))
    else:
        splitText = re.finditer(PAT, doc)
        for x in splitText:
            
            bytesRep = tuple(k.to_bytes() for k in x[0].encode('utf-8'))
            pretokenizedWords.append(bytesRep)
pretokenizedWords

['gather gathering ', '<s>', ' gatherer']


[(b'g', b'a', b't', b'h', b'e', b'r'),
 (b' ', b'g', b'a', b't', b'h', b'e', b'r', b'i', b'n', b'g'),
 (b' ',),
 b'<s>',
 (b' ', b'g', b'a', b't', b'h', b'e', b'r', b'e', b'r')]