In [37]:
from collections import defaultdict

In [38]:
def initialize_vocabulary(corpus):
    vocabulary = defaultdict(int)
    charset = set()
    for word in corpus:
        word_with_marker = '_' + word
        characters = list(word_with_marker)
        charset.update(characters)
        tokenized_word = " ".join(characters)  # Use space to separate tokens
        vocabulary[tokenized_word] += 1
    return vocabulary, charset
corpus = ["I", "like", "to", "watch", "good", "movies"]  # ["I", "like", "to", "watch", "bad", "movies"], ["I", "like", "to", "watch", "good", "movies", "with", "my", "friends"]
vocabulary, charset = initialize_vocabulary(corpus)
print(vocabulary)
print(charset)

defaultdict(<class 'int'>, {'_ I': 1, '_ l i k e': 1, '_ t o': 1, '_ w a t c h': 1, '_ g o o d': 1, '_ m o v i e s': 1})
{'m', 'k', 'h', 'I', 's', 'l', 'e', 'v', 'a', 'i', 'd', 'c', 'o', 'w', 'g', '_', 't'}


In [39]:
def get_pair_counts(vocabulary):
    pair_counts = defaultdict(int)
    for tokenized_word, count in vocabulary.items():
        tokens = tokenized_word.split()
        for i in range(len(tokens) - 1):
            pair = (tokens[i], tokens[i+1])
            pair_counts[pair] += count
    return pair_counts
print(f"vocabulary: {vocabulary}")
print(f"pair_counts: {get_pair_counts(vocabulary)}")


vocabulary: defaultdict(<class 'int'>, {'_ I': 1, '_ l i k e': 1, '_ t o': 1, '_ w a t c h': 1, '_ g o o d': 1, '_ m o v i e s': 1})
pair_counts: defaultdict(<class 'int'>, {('_', 'I'): 1, ('_', 'l'): 1, ('l', 'i'): 1, ('i', 'k'): 1, ('k', 'e'): 1, ('_', 't'): 1, ('t', 'o'): 1, ('_', 'w'): 1, ('w', 'a'): 1, ('a', 't'): 1, ('t', 'c'): 1, ('c', 'h'): 1, ('_', 'g'): 1, ('g', 'o'): 1, ('o', 'o'): 1, ('o', 'd'): 1, ('_', 'm'): 1, ('m', 'o'): 1, ('o', 'v'): 1, ('v', 'i'): 1, ('i', 'e'): 1, ('e', 's'): 1})


In [40]:
import re
def merge_pair(vocabulary, pair):
    new_vocabulary = defaultdict(int)
    bigram = re.escape(' '.join(pair)) # escape function automatically adds backslashes to special characters in string (like .,*?) so they are interpreted as literal characters
    pattern = re.compile(r"(?<!\S)" + bigram + r"(?!\S)") # matches only whole token pairs; bigram is not part of a word (e.g. good morning in thisisgood morning doesn't match)
    for tokenized_word, count in vocabulary.items():
        new_tokenized_word = pattern.sub("".join(pair), tokenized_word) # replace all occurrences of matched pattern with the joined pair
        new_vocabulary[new_tokenized_word] += count
    return new_vocabulary

print(f"vocabulary: {vocabulary}")
print(f"pair_counts: {get_pair_counts(vocabulary)}")
print(f"new_vocabulary: {merge_pair(vocabulary, ('k', 'e'))}")
# Note
# _ l i k e: 1 -> _ l i ke: 1

vocabulary: defaultdict(<class 'int'>, {'_ I': 1, '_ l i k e': 1, '_ t o': 1, '_ w a t c h': 1, '_ g o o d': 1, '_ m o v i e s': 1})
pair_counts: defaultdict(<class 'int'>, {('_', 'I'): 1, ('_', 'l'): 1, ('l', 'i'): 1, ('i', 'k'): 1, ('k', 'e'): 1, ('_', 't'): 1, ('t', 'o'): 1, ('_', 'w'): 1, ('w', 'a'): 1, ('a', 't'): 1, ('t', 'c'): 1, ('c', 'h'): 1, ('_', 'g'): 1, ('g', 'o'): 1, ('o', 'o'): 1, ('o', 'd'): 1, ('_', 'm'): 1, ('m', 'o'): 1, ('o', 'v'): 1, ('v', 'i'): 1, ('i', 'e'): 1, ('e', 's'): 1})
new_vocabulary: defaultdict(<class 'int'>, {'_ I': 1, '_ l i ke': 1, '_ t o': 1, '_ w a t c h': 1, '_ g o o d': 1, '_ m o v i e s': 1})


In [43]:
# BPE algorithm = iteratively merges most frequent pair of tokens until vocab_size is reached
def byte_pair_encoding(corpus, vocab_size):
    vocabulary, charset = initialize_vocabulary(corpus)
    merges = []
    tokens = set(charset)
    print(len(tokens))
    while len(tokens) < vocab_size:
        pair_counts = get_pair_counts(vocabulary)
        if not pair_counts:
            break
        most_frequent_pair = max(pair_counts, key=pair_counts.get)
        merges.append(most_frequent_pair)
        vocabulary = merge_pair(vocabulary, most_frequent_pair)
        new_token = ''.join(most_frequent_pair) 
        tokens.add(new_token)
    return vocabulary, merges, charset, tokens

# vocab_size must be larger than initial character count (17)
# BPE grows vocabulary by merging pairs, so we need a target larger than starting size
vocabulary, merges, charset, tokens = byte_pair_encoding(corpus, 25)
print(f"vocabulary: {vocabulary}")
print(f"merges: {merges}")
print(f"charset: {charset}")
print(f"tokens: {tokens}")


17
vocabulary: defaultdict(<class 'int'>, {'_I': 1, '_like': 1, '_to': 1, '_w a t c h': 1, '_ g o o d': 1, '_ m o v i e s': 1})
merges: [('_', 'I'), ('_', 'l'), ('_l', 'i'), ('_li', 'k'), ('_lik', 'e'), ('_', 't'), ('_t', 'o'), ('_', 'w')]
charset: {'m', 'k', 'h', 'I', 's', 'l', 'e', 'v', 'a', 'i', 'd', 'c', 'o', 'w', 'g', '_', 't'}
tokens: {'m', '_lik', 'h', '_I', '_li', '_t', 'i', 'o', '_w', 'g', 't', 'k', 'I', 's', 'l', 'e', '_like', 'v', 'a', '_l', 'd', 'c', '_to', 'w', '_'}


In [42]:
def tokenize_word(word, merges, vocabulary, charset, unk_token='<unk>'):
    word = '_' + word
    if word in vocabulary:
        return vocabulary[word]
    tokens = [char if char in charset else unk_token for char in word]
    
    for left, right in merges:
        i = 0
        while i < len(tokens) - 1:
            if tokens[i: i+2] == [left, right]:
                tokens[i: i+2] = [left + right]
            else:
                i += 1
    return tokens

tokenized_word = tokenize_word('Lets proceed to the language modeling chapter', merges, vocabulary, charset)
print(tokenized_word)

['_', '<unk>', 'e', 't', 's', '<unk>', '<unk>', '<unk>', 'o', 'c', 'e', 'e', 'd', '<unk>', 't', 'o', '<unk>', 't', 'h', 'e', '<unk>', 'l', 'a', '<unk>', 'g', '<unk>', 'a', 'g', 'e', '<unk>', 'm', 'o', 'd', 'e', 'l', 'i', '<unk>', 'g', '<unk>', 'c', 'h', 'a', '<unk>', 't', 'e', '<unk>']
