# Load the data

In [8]:
import os

# The corpus is expected at ./data/corpus.txt relative to the current working directory.
input_dir = os.path.join(os.getcwd(), 'data')
input_file = os.path.join(input_dir, 'corpus.txt')

# Read one entry per line. Only the line terminator is stripped: unconditionally
# slicing off the last character would drop a real character from a final line
# that has no trailing newline.
with open(input_file, 'r') as f:
    corpus = [line.rstrip('\n') for line in f]

In [9]:
# Show the first two corpus lines, one per output line, as a quick sanity check.
print(corpus[0], corpus[1], sep='\n')

i stand here i feel empty a class post count link href http mooshilu
i literally just text tychelle to see if she wants to hang out because reading what i just wrote about my nonexistent social life made me feel so pathetic


In [10]:
# Report how many lines were loaded into the corpus.
line_count = len(corpus)
print('Total number of lines in corpus:', line_count)

Total number of lines in corpus: 2400


## Normalization

In [11]:
# The corpus needs cleaning if any line contains an uppercase letter, a digit,
# or one of the listed punctuation marks.
flagged_punctuation = ['!', '?', '.', ',', ':']
need = False
for line in corpus:
    has_dirty_char = any(
        ch.isupper() or ch.isdigit() or ch in flagged_punctuation
        for ch in line
    )
    if has_dirty_char:
        need = True
print('Need to clean corpus:', need)

Need to clean corpus: False


# Tokenizer class

In [12]:
from typing import List, Dict, Tuple


class Tokenizer:
    '''
    Byte-pair-encoding (BPE) tokenizer.

    Words are obtained by whitespace-splitting each sentence and are suffixed
    with the end-of-word marker '$'. Training starts from single characters and
    repeatedly merges the most frequent adjacent pair of symbols.

    Attributes:
        all_merges: maps each merged pair (left, right) to a dict with keys
            'merge' (the joined token) and 'frequency' (the pair count at the
            time it was merged). Insertion order is the order the merges were
            learned, and `tokenize` replays them in that order.
        vocabulary: sorted base characters followed by one token per merge.
        word_frequencies: maps each '$'-terminated word to its corpus count.
        word_splits: maps each '$'-terminated word to its current symbol list.
    '''

    def __init__(self):
        self.all_merges = dict()
        self.vocabulary = []
        self.word_frequencies = dict()
        self.word_splits = dict()

    def learn_vocablury(self, corpus:List[str], num_merges:int):
        '''
        Learn the vocabulary from the corpus using the BPE algorithm

        corpus: sentences; each is split on whitespace into words.
        num_merges: maximum number of merges to learn. Training stops earlier
            when every word has collapsed to a single symbol and no adjacent
            pair is left to merge.

        The learned state is stored on the instance; nothing is returned.
        '''
        word_frequencies = self.__count_words(corpus)
        vocabulary = self.__create_base_vocabulary(word_frequencies)
        word_splits = self.__split_words(word_frequencies)
        all_merges = dict()

        for _ in range(num_merges):
            pair_frequencies = self.__compute_pair_frequencies(word_frequencies, word_splits)
            if not pair_frequencies:
                # Nothing left to merge. Break (not return) so that the merges
                # learned so far are still stored on the instance below.
                break

            # On ties, max() keeps the first pair encountered.
            max_freq_pair = max(pair_frequencies, key=pair_frequencies.get)
            max_freq = pair_frequencies[max_freq_pair]
            merged_token = ''.join(max_freq_pair)

            word_splits = self.__merge_pair(max_freq_pair, word_splits, word_frequencies)
            vocabulary.append(merged_token)
            all_merges[max_freq_pair] = {'merge': merged_token, 'frequency': max_freq}

        self.vocabulary = vocabulary
        self.word_frequencies = word_frequencies
        self.all_merges = all_merges
        self.word_splits = word_splits

    # Correctly spelled alias; the original name is kept for existing callers.
    learn_vocabulary = learn_vocablury

    def tokenize(self, sentence:str) -> List[str]:
        '''
        Tokenize a sentence using the vocabulary learned from the corpus

        The sentence is split on whitespace, each word gets the '$' suffix and
        is broken into characters, then every learned merge is applied in the
        order it was learned. Returns the tokens of all words as one flat list.
        '''
        symbol_lists = [list(word + '$') for word in sentence.split()]

        for merge_pair, merge_info in self.all_merges.items():
            merged_token = merge_info['merge']
            symbol_lists = [
                self._apply_merge(symbols, merge_pair, merged_token)
                for symbols in symbol_lists
            ]

        return [token for symbols in symbol_lists for token in symbols]

    @staticmethod
    def _apply_merge(symbols:List[str], pair:Tuple[str, str], merged_token:str) -> List[str]:
        '''
        Return a copy of symbols with each left-to-right, non-overlapping
        occurrence of pair replaced by merged_token (single linear pass).
        '''
        left, right = pair
        result = []
        i = 0
        last = len(symbols) - 1
        while i <= last:
            if i < last and symbols[i] == left and symbols[i+1] == right:
                result.append(merged_token)
                i += 2
            else:
                result.append(symbols[i])
                i += 1
        return result

    def __count_words(self, corpus:List[str]) -> Dict[str, int]:
        '''
        Count the frequency of each word in the corpus

        Each word is stored with the end-of-word marker '$' appended.
        '''
        word_frequencies = dict()
        for sentence in corpus:
            for word in sentence.split():
                word = word + '$'
                word_frequencies[word] = word_frequencies.get(word, 0) + 1
        return word_frequencies

    def __create_base_vocabulary(self, word_frequencies:Dict[str, int]) -> List[str]:
        '''
        Create a base vocabulary from the words in the corpus which contains all the characters in the corpus

        The characters are returned sorted so the result is the same on every run.
        '''
        return sorted({character for word in word_frequencies for character in word})

    def __split_words(self, word_frequencies:Dict[str, int]) -> Dict[str, List[str]]:
        '''
        Split each word in the corpus into a list of characters
        '''
        return {word: list(word) for word in word_frequencies}

    def __compute_pair_frequencies(self, word_frequencies:Dict[str, int], word_splits:Dict[str, List[str]]) -> Dict[Tuple[str, str], int]:
        '''
        Compute the frequency of each pair of adjacent symbols in the corpus

        Every occurrence of a pair inside a word contributes that word's frequency.
        '''
        pair_frequencies = dict()
        for word, frequency in word_frequencies.items():
            split = word_splits[word]
            for pair in zip(split, split[1:]):
                pair_frequencies[pair] = pair_frequencies.get(pair, 0) + frequency
        return pair_frequencies

    def __merge_pair(self, pair:Tuple[str, str], word_splits:Dict[str, List[str]], word_frequencies:Dict[str, int]) -> Dict[str, List[str]]:
        '''
        Given the most frequent pair of token, merge them into a single token in all the words in the corpus

        word_splits is updated in place and also returned.
        '''
        new_word = ''.join(pair)
        for word in word_frequencies:
            word_splits[word] = self._apply_merge(word_splits[word], pair, new_word)
        return word_splits

# Initialize the Tokenizer with the corpus and the number of merges

In [13]:
# Build the tokenizer and learn up to 4000 merges from the loaded corpus.
num_merges = 4000
byte_pair_tokenizer = Tokenizer()
byte_pair_tokenizer.learn_vocablury(corpus, num_merges)

In [15]:
# Tokenize one sample sentence with the trained tokenizer and display the tokens.
sample_sentence = 'that is my life anytime after 5pm right so that i need to'
ans = byte_pair_tokenizer.tokenize(sample_sentence)
print(ans)

['that$', 'is$', 'my$', 'life$', 'any', 'time$', 'after$', '5', 'p', 'm$', 'right$', 'so$', 'that$', 'i$', 'need$', 'to$']


In [17]:
# byte_pair_tokenizer.all_merges
# byte_pair_tokenizer.vocabulary

# Save the results 

In [18]:
import os

# All results go under ./output. makedirs(exist_ok=True) creates the directory
# if needed without the check-then-create race of exists() followed by mkdir().
out_dir = os.path.join(os.path.curdir, "output")
os.makedirs(out_dir, exist_ok=True)

# NOTE: despite the *_dir names, these three are file paths inside out_dir.
tokens_dir = os.path.join(out_dir, "tokens.txt")
merges_dir = os.path.join(out_dir, "merge_rule.txt")
tokenized_samples_dir = os.path.join(out_dir, "tokenized_samples.txt")

In [19]:
# Write the learned vocabulary, one token per line.
with open(tokens_dir, 'w') as f:
    f.writelines(token + '\n' for token in byte_pair_tokenizer.vocabulary)

In [20]:
# Write the merge rules in the order they were learned, as "left,right" per line.
with open(merges_dir, 'w') as f:
    for left, right in byte_pair_tokenizer.all_merges:
        f.write(','.join((left, right)) + '\n')

In [22]:
# Sample sentences to tokenize with the trained tokenizer.
test_sentences = [
    'this is an nlp course',
    'i love nlp',
    'how are you doing today',
    'my name is khushdev',
]

# Write each sentence's tokens as one comma-separated line.
with open(tokenized_samples_dir, 'w') as f:
    for sentence in test_sentences:
        tokens = byte_pair_tokenizer.tokenize(sentence)
        line = ','.join(tokens)
        f.write(line + '\n')