Import necessary libraries

In [153]:
import heapq
import collections

Loading text data and creating out corpus

In [154]:
with open("the-verdict.txt", "r", encoding="ascii") as f:
    raw_text = f.read()

print("Total number of characters:", len(raw_text))

# converting string to list
corpus = list(raw_text)

print(corpus[:20])

Total number of characters: 20479
['I', ' ', 'H', 'A', 'D', ' ', 'a', 'l', 'w', 'a', 'y', 's', ' ', 't', 'h', 'o', 'u', 'g', 'h', 't']


Our initial vocabulary will be all ascii characters for huge amount of texts consisting for different languages we can use some suitable initial vocab

In [155]:
vocab = [chr(i) for i in range(256)]

print(vocab[97:97+26])

['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z']


<h2>The Byte Pair Encoding Algo<h2>
<p>We will use max heap to fetch pair with maximum frequency in the corpus<p>

In [156]:
num_iters = 200

# Initialize frequency counts for all adjacent pairs
pair_freqs = collections.defaultdict(int)
for i in range(len(corpus) - 1):
    pair = (corpus[i], corpus[i+1])
    pair_freqs[pair] += 1

# Step 2: Use a max-heap (priority queue) to efficiently find the most frequent pair
pq = []
for pair, freq in pair_freqs.items():
    # Use negative frequencies to simulate a max-heap with a min-heap
    heapq.heappush(pq, (-freq, pair))

for iter in range(num_iters):
    if not pq:
        print(f"BPE finished after {iter} merges as no more pairs were found.")
        break
    
    # Get the most frequent pair from the heap
    neg_freq, most_freq_pair = heapq.heappop(pq)
    
    # Merge the pair and add the new token to the vocabulary
    most_freq_seq = "".join(most_freq_pair)
    vocab.append(most_freq_seq)
    
    # Rebuild the corpus and update pair frequencies
    new_corpus = []
    i = 0
    while i < len(corpus) - 1:
        seq = corpus[i] + corpus[i+1]
        
        if seq == most_freq_seq:
            new_corpus.append(most_freq_seq)
            i += 2
        else:
            new_corpus.append(corpus[i])
            i += 1
            
    # Handle the last token if it was not part of a pair
    if i < len(corpus):
        new_corpus.append(corpus[i])

    # Re-initialize for simplicity in this example
    corpus = new_corpus
    pair_freqs = collections.defaultdict(int)
    for i in range(len(corpus) - 1):
        pair = (corpus[i], corpus[i+1])
        pair_freqs[pair] += 1
    
    pq = []
    for pair, freq in pair_freqs.items():
        heapq.heappush(pq, (-freq, pair))

print("Updated vocab length:", len(vocab))
print(vocab[-20:])

Updated vocab length: 456
['don', 'ed to ', 'ent ', 'fac', 'pr', 't the ', ' b', 'anc', 'ca', 'e of ', 'id ', 'im', 'ind', 'nd ', 'pp', 'ust ', 'as', 'him ', 'k ', 'not']


Tokenizer class on new vocab generated by Byte Pair Encoding

In [157]:
class Tokenizer:
    def __init__(self, vocab):
        self.vocab = vocab
        self.vocab_to_token_id = {token: num for num, token in enumerate(vocab)}
        self.token_id_to_vocab = {value: key for key, value in self.vocab_to_token_id.items()}

        self.max_token_length = max([len(token) for token in vocab])

    # In the given text find the best matching sub strings in the vocab and convert them into tokens
    def encode(self, text):
        tokens = list()

        start_ind = 0

        while start_ind < len(text):
            best_sub_str = None

            for end_ind in range(start_ind + self.max_token_length, i, -1):
                sub_str = text[start_ind:end_ind]

                if sub_str in vocab:
                    best_sub_str = sub_str
                    break

            if best_sub_str:
                tokens.append(self.vocab_to_token_id[best_sub_str])
                start_ind += len(best_sub_str)
            else:
                tokens.append(self.vocab_to_token_id[text[start_ind]])
                start_ind += 1

        return tokens

    def decode(self, tokens_list):
        return "".join([self.token_id_to_vocab[token_id] for token_id in tokens_list])
    
tokenizer = Tokenizer(vocab)

text = "She raised her eyebrows with a smile"
tokens = tokenizer.encode(text)
print(text)
print("Tokens:", tokens)
print("Decoded Output:", tokenizer.decode(tokens))

She raised her eyebrows with a smile
Tokens: [83, 104, 101, 32, 114, 97, 105, 115, 101, 100, 32, 104, 101, 114, 32, 101, 121, 101, 98, 114, 111, 119, 115, 32, 119, 105, 116, 104, 32, 97, 32, 115, 109, 105, 108, 101]
Decoded Output: She raised her eyebrows with a smile
