In [1]:
from collections import defaultdict
import re

In [2]:
# Shared notebook state, filled in by the training functions and cells below
corpous_data = {}    # learned merges: (left, right) pair -> frequency when it was picked
vocab = []           # symbols known so far, in first-seen order
splitted_data = []   # corpus words, each stored as a list of symbols

In [3]:
# Tokenizes a word by repeatedly merging adjacent symbol pairs found in the learned corpus data
def bpe_encrypt(actual_tokens, corpus_data):
    """Tokenize one word by repeatedly applying learned BPE merges.

    Starting from the individual symbols of ``actual_tokens``, the adjacent
    pair with the highest frequency recorded in ``corpus_data`` is merged,
    and this repeats until no adjacent pair of the word is a known merge.

    Args:
        actual_tokens: The word to tokenize, as a string or a sequence of
            symbols.
        corpus_data: Mapping of ``(left, right)`` symbol pairs to the
            frequency stored for that pair.

    Returns:
        list: The symbols of the word after all applicable merges. A list is
        returned for every input, including empty and single-symbol words.
    """
    token = list(actual_tokens)
    # Fewer than two symbols, or no known merges, leaves nothing to merge.
    # This also avoids indexing into an empty word further down.
    if len(token) < 2 or not corpus_data:
        return token
    # Position of each pair in corpus_data. Used only to break frequency ties
    # so the result does not depend on the iteration order of a set.
    merge_rank = {pair: rank for rank, pair in enumerate(corpus_data)}
    while True:
        candidates = [
            pair for pair in get_possible_pairs(token) if pair in corpus_data
        ]
        if not candidates:
            break
        # Highest frequency wins; among equals, the pair stored first wins.
        pair_to_merge = max(
            candidates,
            key=lambda pair: (corpus_data[pair], -merge_rank[pair]),
        )
        token = generate_new_token(token, pair_to_merge)
    return token

In [4]:
# Returns the set of adjacent symbol pairs in a given word
def get_possible_pairs(word):
    """Return the set of adjacent symbol pairs of ``word``.

    Args:
        word: Sequence of symbols (a list of strings or a plain string).

    Returns:
        set: Every distinct ``(symbol, next_symbol)`` tuple. The set is empty
        when ``word`` has fewer than two symbols.
    """
    # zip stops at the shorter sequence, so empty and single-symbol words
    # produce no pairs instead of failing on word[0].
    return set(zip(word, word[1:]))

In [5]:
# Merges each occurrence of the given adjacent pair in the word into a single symbol
def generate_new_token(input_word, character_pair):
    """Return a copy of ``input_word`` with ``character_pair`` merged.

    The word is scanned left to right. Wherever the first element of the pair
    is immediately followed by the second, both are replaced by their
    concatenation; every other symbol is copied unchanged.

    Args:
        input_word: Sequence of symbols to rewrite.
        character_pair: Two-element ``(left, right)`` pair to merge.

    Returns:
        list: The rewritten symbols.
    """
    left, right = character_pair
    merged = []
    cursor = 0
    while cursor < len(input_word):
        try:
            hit = input_word.index(left, cursor)
        except ValueError:
            # No further occurrence of the left symbol: copy the tail as is.
            merged.extend(input_word[cursor:])
            break
        # Copy everything that precedes the next occurrence of the left symbol.
        merged.extend(input_word[cursor:hit])
        cursor = hit
        has_partner = cursor + 1 < len(input_word) and input_word[cursor + 1] == right
        if has_partner:
            merged.append(left + right)
            cursor += 2
        else:
            merged.append(left)
            cursor += 1
    return merged

In [6]:
# Function used to Split Words
def get_splitted_words(words):
    """Split every word into a list of its lowercase characters.

    The global ``splitted_data`` is rebuilt from scratch on each call, so
    calling the function again (for example when a notebook cell is re-run)
    no longer stacks the new words on top of earlier ones.

    Args:
        words: Iterable of strings.

    Returns:
        list: One list of single-character strings per input word. The same
        list object is stored in the global ``splitted_data``.
    """
    global splitted_data
    splitted_data = [list(word.lower()) for word in words]
    return splitted_data

# Function to get Initial Vocabs
def get_vocab(splitted_data):
    """Collect the distinct symbols of the corpus in first-seen order.

    The global ``vocab`` is rebuilt from scratch on each call, so symbols left
    over from an earlier call (including merged symbols appended during
    training) do not leak into the new vocabulary.

    Args:
        splitted_data: Iterable of words, each an iterable of hashable
            symbols.

    Returns:
        list: The unique symbols, ordered by first appearance. The same list
        object is stored in the global ``vocab``.
    """
    global vocab
    # dict.fromkeys de-duplicates while keeping insertion order, replacing
    # the repeated linear "not in list" scan.
    vocab = list(
        dict.fromkeys(character for word in splitted_data for character in word)
    )
    return vocab

In [7]:
# BPE Algorithm - Training Corpus Data
def bpe(iteration):
    """Run one BPE training step on the global corpus state.

    Counts every adjacent symbol pair across ``splitted_data``, stores the
    most frequent pair and its count in ``corpous_data``, appends the merged
    symbol to ``vocab`` and rewrites ``splitted_data`` with that pair merged.

    Args:
        iteration: Zero-based step number; only used in the progress banner.

    Returns:
        bool: True when a pair was merged, False when no word has two
        adjacent symbols left (in which case no state is changed).
    """
    print('-'*5, f"Iteration - {iteration+1}", '-'*5)
    # Only splitted_data is rebound; vocab and corpous_data are mutated in place.
    global splitted_data
    pairs = defaultdict(int)
    for word in splitted_data:
        for left, right in zip(word, word[1:]):
            pairs[left, right] += 1
    if not pairs:
        return False
    # Get pair which has maximum frequency (on ties the pair counted first wins)
    best = max(pairs, key=pairs.get)
    print("maxium frequency", best, pairs[best])
    corpous_data[best] = pairs[best]
    # Merge Pair and Add to Vocabs
    merged_symbol = ''.join(best)
    vocab.append(merged_symbol)
    # Update Merged Pair in sentence splitted_data
    merged_data = []
    for word in splitted_data:
        merged_word = []
        position = 0
        # Walk by position so overlapping occurrences are handled correctly:
        # ['a', 'a', 'a'] with ('a', 'a') becomes ['aa', 'a'], and the trailing
        # symbol is kept. Single-symbol words pass through unchanged.
        while position < len(word):
            if position + 1 < len(word) and (word[position], word[position + 1]) == best:
                merged_word.append(merged_symbol)
                position += 2
            else:
                merged_word.append(word[position])
                position += 1
        merged_data.append(merged_word)
    splitted_data = merged_data
    print("Vocabulary", vocab)
    return True

In [8]:
input_text = "Baker Betty Lou bought some butter. But, it made her batter bitter. So, Baker Betty Lou bought some better butter to make her bitter batter better."
iterations = 10
tokens = re.findall(r'\b\w+\b', input_text)
# Start from empty shared state so that re-running this cell retrains from
# scratch instead of building on words, symbols and merges of an earlier run.
splitted_data = []
vocab = []
corpous_data = {}
splitted_data = get_splitted_words(tokens)
vocabs = get_vocab(splitted_data)
print("*"*5, "Training Corpus Data", "*"*5)
print('-'*5, "Initial Vocabulary", '-'*5)
print("Vocabulary", vocabs)
for iteration in range(iterations):
    # bpe() returns False once no adjacent pair is left to merge.
    if not bpe(iteration):
        break

***** Training Corpus Data *****
----- Initial Vocabulary -----
Vocabulary ['b', 'a', 'k', 'e', 'r', 't', 'y', 'l', 'o', 'u', 'g', 'h', 's', 'm', 'i', 'd']
----- Iteration - 1 -----
maxium frequency ('e', 'r') 12
Vocabulary ['b', 'a', 'k', 'e', 'r', 't', 'y', 'l', 'o', 'u', 'g', 'h', 's', 'm', 'i', 'd', 'er']
----- Iteration - 2 -----
maxium frequency ('t', 't') 10
Vocabulary ['b', 'a', 'k', 'e', 'r', 't', 'y', 'l', 'o', 'u', 'g', 'h', 's', 'm', 'i', 'd', 'er', 'tt']
----- Iteration - 3 -----
maxium frequency ('tt', 'er') 8
Vocabulary ['b', 'a', 'k', 'e', 'r', 't', 'y', 'l', 'o', 'u', 'g', 'h', 's', 'm', 'i', 'd', 'er', 'tt', 'tter']
----- Iteration - 4 -----
maxium frequency ('b', 'a') 4
Vocabulary ['b', 'a', 'k', 'e', 'r', 't', 'y', 'l', 'o', 'u', 'g', 'h', 's', 'm', 'i', 'd', 'er', 'tt', 'tter', 'ba']
----- Iteration - 5 -----
maxium frequency ('b', 'e') 4
Vocabulary ['b', 'a', 'k', 'e', 'r', 't', 'y', 'l', 'o', 'u', 'g', 'h', 's', 'm', 'i', 'd', 'er', 'tt', 'tter', 'ba', 'be']
----

In [9]:
print("*"*5, "Choosen Corpus Data", "*"*5)
# One (pair, frequency) entry per line, in dictionary order.
print("\n".join(str(entry) for entry in corpous_data.items()))
print("*"*5, "Sample Test", "*"*5)
# Tokenize a single word with the merges collected in corpous_data.
testing_word = 'bitter'
result = bpe_encrypt(testing_word, corpous_data)
print(f"Input Text: {testing_word}")
print(f"BPE_Tokens: {result}")

***** Choosen Corpus Data *****
(('e', 'r'), 12)
(('t', 't'), 10)
(('tt', 'er'), 8)
(('b', 'a'), 4)
(('b', 'e'), 4)
(('o', 'u'), 4)
(('s', 'o'), 3)
(('b', 'u'), 3)
(('ba', 'k'), 2)
(('bak', 'er'), 2)
***** Sample Test *****
Input Text: bitter
BPE_Tokens: ['b', 'i', 'tter']
