<a href="https://colab.research.google.com/github/MojTabaa4/NLP-Byte-Pair-Encoding-and-WordPiece-Tokenizer/blob/main/Byte-Pair-Encoding-and-WordPiece-Tokenizer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Byte Pair Encoding (BPE) for Text Compression

Byte Pair Encoding (BPE) is a data compression technique that works by replacing repeated sequences of characters with a single token. In this notebook, we will implement a BPE algorithm from scratch using Python.

In [None]:
!pip install tokenizers

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting tokenizers
  Downloading tokenizers-0.13.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (7.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.8/7.8 MB[0m [31m57.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: tokenizers
Successfully installed tokenizers-0.13.3


In [None]:
import collections
import json
import re
from collections import Counter
from operator import itemgetter
from pprint import pprint
from typing import Dict, List, Tuple

import requests
from tokenizers import Tokenizer
from tokenizers.models import BPE, WordPiece
from tokenizers.pre_tokenizers import Whitespace
from tokenizers.trainers import BpeTrainer, WordPieceTrainer


In [None]:
def read_corpus(file_name: str) -> List[str]:
    """
    Load a UTF-8 text file and split its contents on whitespace.

    Args:
        file_name: Path of the text file to read.

    Returns:
        The whitespace-separated words of the file, in file order.
    """
    with open(file_name, mode="r", encoding="utf-8") as corpus_file:
        raw_text = corpus_file.read()

    return raw_text.split()

In [None]:
# Read the training corpus from "text.txt" into a flat list of words and show it.
file_name: str = "text.txt"
all_words: List[str] = read_corpus(file_name)
print(all_words)

['low', 'lower', 'newest', 'low', 'lower', 'newest', 'low', 'widest', 'newest', 'low', 'widest', 'newest', 'low', 'widest', 'newest']


## Functions Used
The code contains the following functions:
- `read_corpus(file_name: str) -> List[str]`: Reads a text file and returns a list of words.
- `word_frequency(words: List[str]) -> Dict[str, int]`: Calculates the frequency of words in a list.
- `add_end_of_word_symbol(freqs: Dict[str, int]) -> Dict[str, int]`: Adds the end-of-word symbol "\</w>" to each word in the frequency dictionary and returns a new dictionary with the modified keys.
- `get_stats(word_frequencies: Dict[str, int]) -> Dict[Tuple[str, str], int]`: Returns a dictionary of symbol pairs and their frequencies in the given words.
- `merge_vocab(pair: Tuple[str, str], vocab: Dict[str, int]) -> Dict[str, int]`: Merges a pair of symbols into a single symbol in a vocabulary dictionary.
- `train(word_freq: Dict[str, int]) -> Dict[Tuple[str, str], int]`: Trains a byte pair encoding model on a word frequency dictionary.
- `encode(original_word: str, all_merge_steps: Dict[Tuple[str, str], int]) -> Dict[str, int]`: Encodes a word using a byte pair encoding model.

In [None]:
def word_frequency(words: List[str]) -> Dict[str, int]:
    """
    Count how many times each word occurs.

    Args:
        words: The words to count.

    Returns:
        A plain dictionary mapping each distinct word to its number of
        occurrences, keyed in order of first appearance.
    """
    tally: Counter = Counter()
    tally.update(words)
    return dict(tally)

In [None]:
# Count how often each corpus word occurs and display the counts.
word_freqs = word_frequency(all_words)

print(word_freqs)

{'low': 5, 'lower': 2, 'newest': 5, 'widest': 3}


In [None]:
def add_end_of_word_symbol(freqs: Dict[str, int]) -> Dict[str, int]:
    """
    Rewrite every word as space-separated characters followed by "</w>".

    Args:
        freqs: A dictionary mapping words to their frequencies.

    Returns:
        A new dictionary with the same frequencies, where each key is the
        word's characters joined by single spaces and terminated by the
        end-of-word symbol (for example "low" becomes "l o w </w>").
    """
    return {
        " ".join(word) + " </w>": count
        for word, count in freqs.items()
    }


In [None]:
# Split each word into characters and append the "</w>" end-of-word marker.
word_freq = add_end_of_word_symbol(word_freqs)

pprint(word_freq)

In [None]:
def get_stats(word_frequencies: Dict[str, int]) -> Dict[Tuple[str, str], int]:
    """Count adjacent symbol pairs, weighting each by its word's frequency.

    Args:
        word_frequencies: A dictionary whose keys are words written as
            whitespace-separated symbols and whose values are frequencies.

    Returns:
        A ``defaultdict(int)`` mapping each (left, right) pair of neighbouring
        symbols to the summed frequency of the words it occurs in.
    """
    pair_counts = collections.defaultdict(int)

    for spaced_word, count in word_frequencies.items():
        pieces = spaced_word.split()
        # Zipping the sequence with itself shifted by one yields every adjacent pair.
        for left, right in zip(pieces, pieces[1:]):
            pair_counts[left, right] += count

    return pair_counts

In [None]:
# Show the initial adjacent-pair counts before any merge has been applied.
pprint(get_stats(word_freq))

defaultdict(<class 'int'>,
            {('d', 'e'): 3,
             ('e', 'r'): 2,
             ('e', 's'): 8,
             ('e', 'w'): 5,
             ('i', 'd'): 3,
             ('l', 'o'): 7,
             ('n', 'e'): 5,
             ('o', 'w'): 7,
             ('r', '</w>'): 2,
             ('s', 't'): 8,
             ('t', '</w>'): 8,
             ('w', '</w>'): 5,
             ('w', 'e'): 7,
             ('w', 'i'): 3})


In [None]:
def merge_vocab(pair: Tuple[str, str], vocab: Dict[str, int]) -> Dict[str, int]:
    """
    Merge a pair of symbols into a single symbol in a vocabulary dictionary.

    Args:
        pair: A tuple of two symbols to be merged.
        vocab: A dictionary where the keys are words written as space-separated
            symbols and the values are their frequencies.

    Returns:
        A new dictionary in which every occurrence of the two symbols as whole,
        adjacent symbols has been replaced by their concatenation. If two keys
        become identical after merging, their frequencies are added together.
    """
    merged_symbol = ''.join(pair)
    # The lookarounds require whitespace (or the string edge) on both sides, so
    # the pair only matches complete symbols: ('a', 'b') must not touch "aa b".
    pattern = re.compile(r'(?<!\S)' + re.escape(' '.join(pair)) + r'(?!\S)')

    new_vocab: Dict[str, int] = {}
    for word, freq in vocab.items():
        # A callable replacement inserts the text literally; a plain string
        # would have backslashes in the symbols interpreted as escapes.
        merged_word = pattern.sub(lambda _match: merged_symbol, word)
        new_vocab[merged_word] = new_vocab.get(merged_word, 0) + freq

    return new_vocab

In [None]:
MAX_MERGES = 10


def train(word_freq: Dict[str, int], max_merges: int = MAX_MERGES) -> Dict[Tuple[str, str], int]:
    """
    Train a byte pair encoding model on a word frequency dictionary.

    On every iteration the most frequent adjacent symbol pair is merged into a
    single symbol, and the resulting vocabulary is printed as JSON.

    Args:
        word_freq: A dictionary where the keys are words written as
            space-separated symbols and the values are their frequencies.
        max_merges: Upper bound on the number of merges to perform. Defaults
            to the module-level MAX_MERGES.

    Returns:
        A dictionary where the keys are symbol pairs (tuples) and the values are
        the zero-based iteration numbers at which they were merged.
    """
    all_merge_steps: Dict[Tuple[str, str], int] = {}

    for i in range(max_merges):
        pair_stats = get_stats(word_freq)

        # An empty result means every word is already a single symbol, so
        # training stops before the merge budget is used up.
        if not pair_stats:
            print("No more symbol pairs to merge")
            break

        # On ties, max() keeps the pair that was counted first.
        best_pair = max(pair_stats, key=pair_stats.get)
        all_merge_steps[best_pair] = i
        print(f"Most frequent pair at iteration {i + 1}: {best_pair}")
        word_freq = merge_vocab(best_pair, word_freq)
        print(f"Vocabulary at iteration {i + 1}:")
        print(json.dumps(word_freq, indent=4) + '\n')

    return all_merge_steps

In [None]:
# Learn the merge rules; maps each merged pair to the iteration that merged it.
all_merge_steps = train(word_freq)

Most frequent pair at iteration 1: ('e', 's')
Vocabulary at iteration 1:
{
    "l o w </w>": 5,
    "l o w e r </w>": 2,
    "n e w es t </w>": 5,
    "w i d es t </w>": 3
}

Most frequent pair at iteration 2: ('es', 't')
Vocabulary at iteration 2:
{
    "l o w </w>": 5,
    "l o w e r </w>": 2,
    "n e w est </w>": 5,
    "w i d est </w>": 3
}

Most frequent pair at iteration 3: ('est', '</w>')
Vocabulary at iteration 3:
{
    "l o w </w>": 5,
    "l o w e r </w>": 2,
    "n e w est</w>": 5,
    "w i d est</w>": 3
}

Most frequent pair at iteration 4: ('l', 'o')
Vocabulary at iteration 4:
{
    "lo w </w>": 5,
    "lo w e r </w>": 2,
    "n e w est</w>": 5,
    "w i d est</w>": 3
}

Most frequent pair at iteration 5: ('lo', 'w')
Vocabulary at iteration 5:
{
    "low </w>": 5,
    "low e r </w>": 2,
    "n e w est</w>": 5,
    "w i d est</w>": 3
}

Most frequent pair at iteration 6: ('low', '</w>')
Vocabulary at iteration 6:
{
    "low</w>": 5,
    "low e r </w>": 2,
    "n e w est</w

In [None]:
def encode(original_word: str, all_merge_steps: Dict[Tuple[str, str], int]) -> Dict[str, int]:
    """
    Segment a word by replaying learned byte pair encoding merges.

    The word is split into characters plus the "</w>" marker. Then, repeatedly,
    the adjacent pair that was learned earliest (smallest iteration number) is
    merged, until no adjacent pair is a known merge. Intermediate state is
    printed at every step.

    Args:
        original_word: The word to be encoded.
        all_merge_steps: A dictionary where the keys are symbol pairs (tuples)
            and the values are the iteration numbers at which they were merged.

    Returns:
        A single-entry dictionary whose key is the encoded word written as
        space-separated symbols and whose value is 1. A one-character word is
        returned as ``{original_word: 1}`` without the "</w>" marker.
    """
    # One-character input is returned untouched; no merges are attempted.
    if len(original_word) == 1:
        return {original_word: 1}

    vocab: Dict[str, int] = add_end_of_word_symbol({original_word: 1})
    print(f'{vocab=}')
    print(f'{all_merge_steps=}\n')

    while True:
        symbol_pairs = get_stats(vocab)
        print(f'{symbol_pairs=}')

        # Keep only the adjacent pairs that training learned, with their rank.
        candidate_pairs = []
        for pair in symbol_pairs:
            if pair in all_merge_steps:
                candidate_pairs.append((pair, all_merge_steps[pair]))
        print(f'{candidate_pairs=}')

        if not candidate_pairs:
            return vocab

        # The lowest rank is the earliest-learned merge, so it is applied first.
        best_pair, _ = min(candidate_pairs, key=lambda ranked: ranked[1])
        print(f"pair to merge: {best_pair}\n")
        vocab = merge_vocab(best_pair, vocab)

In [None]:
# Encode a word that does not appear in the training corpus.
original_word = 'lowest'
encode(original_word, all_merge_steps)

vocab={'l o w e s t </w>': 1}
all_merge_steps={('e', 's'): 0, ('es', 't'): 1, ('est', '</w>'): 2, ('l', 'o'): 3, ('lo', 'w'): 4, ('low', '</w>'): 5, ('n', 'e'): 6, ('ne', 'w'): 7, ('new', 'est</w>'): 8, ('w', 'i'): 9}

symbol_pairs=defaultdict(<class 'int'>, {('l', 'o'): 1, ('o', 'w'): 1, ('w', 'e'): 1, ('e', 's'): 1, ('s', 't'): 1, ('t', '</w>'): 1})
candidate_pairs=[(('l', 'o'), 3), (('e', 's'), 0)]
pair to merge: ('e', 's')

symbol_pairs=defaultdict(<class 'int'>, {('l', 'o'): 1, ('o', 'w'): 1, ('w', 'es'): 1, ('es', 't'): 1, ('t', '</w>'): 1})
candidate_pairs=[(('l', 'o'), 3), (('es', 't'), 1)]
pair to merge: ('es', 't')

symbol_pairs=defaultdict(<class 'int'>, {('l', 'o'): 1, ('o', 'w'): 1, ('w', 'est'): 1, ('est', '</w>'): 1})
candidate_pairs=[(('l', 'o'), 3), (('est', '</w>'), 2)]
pair to merge: ('est', '</w>')

symbol_pairs=defaultdict(<class 'int'>, {('l', 'o'): 1, ('o', 'w'): 1, ('w', 'est</w>'): 1})
candidate_pairs=[(('l', 'o'), 3)]
pair to merge: ('l', 'o')

symbol_pairs=de

{'low est</w>': 1}

## Implementation of a BPE (Byte Pair Encoding) tokenizer using the HuggingFace Tokenizers library in Python.

### BPE Tokenizer Implementation

I implemented a Byte Pair Encoding (BPE) tokenizer using the Hugging Face tokenizers library. I initialized a BPE tokenizer and trainer, then trained the tokenizer on a sample text file. After training, I saved the trained tokenizer to a JSON file and printed the vocabulary size and the 20 tokens with the lowest IDs.

In [None]:
# Build a BPE tokenizer that pre-splits on whitespace, with a default trainer
files = ["./Sample.txt"]
bpe_text_box_trainer = BpeTrainer()
bpe_text_box_tokenizer = Tokenizer(BPE())
bpe_text_box_tokenizer.pre_tokenizer = Whitespace()

# Train on the sample text and persist the result as JSON
bpe_text_box_tokenizer.train(files, bpe_text_box_trainer)
bpe_text_box_tokenizer.save("./BPE-text-box.json")

# Order the vocabulary by token ID so the slice below shows the lowest IDs
vocab_size = bpe_text_box_tokenizer.get_vocab_size()
tokens = sorted(bpe_text_box_tokenizer.get_vocab().items(), key=itemgetter(1))

print("BPE - TextBox Tokenizer Results")
print("--------------------------------")
print(f"Vocabulary Size: {vocab_size}")
print("Tokens:")
pprint(tokens[:20])


BPE - TextBox Tokenizer Results
--------------------------------
Vocabulary Size: 113
Tokens:
[('!', 0),
 ('.', 1),
 ('?', 2),
 ('E', 3),
 ('L', 4),
 ('N', 5),
 ('P', 6),
 ('T', 7),
 ('W', 8),
 ('a', 9),
 ('b', 10),
 ('c', 11),
 ('d', 12),
 ('e', 13),
 ('f', 14),
 ('g', 15),
 ('h', 16),
 ('i', 17),
 ('k', 18),
 ('l', 19)]


### WordPiece Tokenizer Implementation

I implemented a WordPiece tokenizer using the same tokenizers library. I initialized a WordPiece tokenizer and trainer, then trained the tokenizer on a sample text file. After training, I saved the trained tokenizer to a JSON file and printed the vocabulary size and the 20 tokens with the lowest IDs.

In [None]:
# Build a WordPiece tokenizer that pre-splits on whitespace, with a default trainer
files = ["./Sample.txt"]
wp_text_box_trainer = WordPieceTrainer()
wp_text_box_tokenizer = Tokenizer(WordPiece())
wp_text_box_tokenizer.pre_tokenizer = Whitespace()

# Train on the sample text and persist the result as JSON
wp_text_box_tokenizer.train(files, wp_text_box_trainer)
wp_text_box_tokenizer.save("./WordPiece-text-box.json")

# Order the vocabulary by token ID so the slice below shows the lowest IDs
vocab_size = wp_text_box_tokenizer.get_vocab_size()
tokens = sorted(wp_text_box_tokenizer.get_vocab().items(), key=itemgetter(1))

print("WordPiece - TextBox Tokenizer Results")
print("--------------------------------")
print(f"Vocabulary Size: {vocab_size}")
print("Tokens:")
pprint(tokens[:20])

WordPiece - TextBox Tokenizer Results
--------------------------------
Vocabulary Size: 139
Tokens:
[('!', 0),
 ('.', 1),
 ('?', 2),
 ('E', 3),
 ('L', 4),
 ('N', 5),
 ('P', 6),
 ('T', 7),
 ('W', 8),
 ('a', 9),
 ('b', 10),
 ('c', 11),
 ('d', 12),
 ('e', 13),
 ('f', 14),
 ('g', 15),
 ('h', 16),
 ('i', 17),
 ('k', 18),
 ('l', 19)]


### BPE and WordPiece Tokenization of Romeo and Juliet Book

I tokenized sentences from the Romeo and Juliet book using both BPE and WordPiece tokenization. I initialized a BPE tokenizer and trainer, then trained the tokenizer on the Romeo and Juliet book. After training, I saved the trained tokenizer to a JSON file and printed the number of extracted tokens. Then, I initialized a WordPiece tokenizer and trainer, trained the tokenizer on the same book, saved the trained tokenizer to a JSON file, and printed the number of extracted tokens. Finally, I tokenized a sentence from the book using both the BPE and WordPiece tokenizers and printed the tokenization results.

In [None]:
# Download the Project Gutenberg plain-text edition of Romeo and Juliet.
url = "https://www.gutenberg.org/cache/epub/1513/pg1513.txt"
# The timeout keeps a stalled connection from hanging the notebook forever.
response = requests.get(url, allow_redirects=True, timeout=30)
# Fail loudly on an HTTP error instead of saving an error page as the book.
response.raise_for_status()
rj_file = response.content

# The context manager flushes and closes the file before training reads it.
with open('./Romeo_and_Juliet.txt', 'wb') as book_file:
    book_file.write(rj_file)

### BPE

In [None]:
# Initialize BPE tokenizer. The model needs unk_token set; otherwise characters
# missing from the vocabulary are silently dropped rather than mapped to [UNK].
bpe_rj_tokenizer = Tokenizer(BPE(unk_token="[UNK]"))
bpe_rj_tokenizer.pre_tokenizer = Whitespace()

# Initialize trainer with special token [UNK] so the token exists in the vocabulary
bpe_rj_trainer = BpeTrainer(special_tokens=["[UNK]"])

# Train tokenizer on input files
rj_file = ["./Romeo_and_Juliet.txt"]
bpe_rj_tokenizer.train(rj_file, bpe_rj_trainer)

# Save trained BPE tokenizer to file
bpe_rj_tokenizer.save("./BPE-rj.json")

num_tokens = bpe_rj_tokenizer.get_vocab_size()
print(f"The number of extracted tokens (BPE - Romeo and Juliet Book): {num_tokens}")

The number of extracted tokens (BPE - Romeo and Juliet Book): 7216


### WordPiece

In [None]:
# Corpus and trainer; [UNK] is registered as a special token
rj_file = ["./Romeo_and_Juliet.txt"]
wp_rj_trainer = WordPieceTrainer(special_tokens=["[UNK]"])

# WordPiece model that pre-splits its input on whitespace
wp_rj_tokenizer = Tokenizer(WordPiece())
wp_rj_tokenizer.pre_tokenizer = Whitespace()

# Fit the tokenizer on the book, then persist it as JSON
wp_rj_tokenizer.train(rj_file, wp_rj_trainer)
wp_rj_tokenizer.save("./WP-rj.json")

num_tokens = wp_rj_tokenizer.get_vocab_size()
print(f"The number of extracted tokens (WordPiece - Romeo and Juliet Book): {num_tokens}")

The number of extracted tokens (WordPiece - Romeo and Juliet Book): 7759


## Apply tokenizers on the sample text box

I loaded a short paragraph on tokenization from a text file called "Sample.txt" and stored it in a variable called "text_box". Then I applied the Byte Pair Encoding (BPE) and WordPiece tokenizers that were trained on the Romeo and Juliet book to this paragraph.

To implement the tokenization models, I initialized BPE and WordPiece tokenizers and encoded the text of the book using each tokenizer. I stored the encoded outputs in variables called "encoded_rj_text" and "encoded_wp_text", respectively. After encoding the text, I printed the resulting encodings to the console along with the number of extracted tokens.

In [None]:
# Read the sample paragraph; the context manager closes the file handle.
with open("Sample.txt", encoding="utf8") as sample_file:
    text_box = sample_file.read()
text_box

'This is a deep learning tokenization tutorial. Tokenization is the first step in a deep learning NLP pipeline. We will be comparing the tokens generated by each tokenization model. Excited much?! 😍'

### BPE on Romeo and Juliet book

In [None]:
# Encode the sample paragraph with the BPE tokenizer and list the resulting tokens.
bpe_rj_output = bpe_rj_tokenizer.encode(text_box)
print(bpe_rj_output)
print(f"Tokens: {bpe_rj_output.tokens}")

Encoding(num_tokens=60, attributes=[ids, type_ids, tokens, offsets, attention_mask, special_tokens_mask, overflowing])
Tokens: ['This', 'is', 'a', 'deep', 'learning', 'to', 'ken', 'iz', 'ation', 'tutor', 'ial', '.', 'To', 'ken', 'iz', 'ation', 'is', 'the', 'first', 'step', 'in', 'a', 'deep', 'learning', 'N', 'L', 'P', 'pi', 'p', 'el', 'ine', '.', 'We', 'will', 'be', 'comp', 'aring', 'the', 'to', 'k', 'ens', 'gen', 'er', 'ated', 'by', 'each', 'to', 'ken', 'iz', 'ation', 'mo', 'de', 'l', '.', 'Ex', 'c', 'ited', 'much', '?', '!']


### WordPiece on Romeo and Juliet book

In [None]:
# Encode the sample paragraph with the WordPiece tokenizer and list the resulting tokens.
wp_rj_output = wp_rj_tokenizer.encode(text_box)
print(wp_rj_output)
print(f"Tokens: {wp_rj_output.tokens}")

Encoding(num_tokens=59, attributes=[ids, type_ids, tokens, offsets, attention_mask, special_tokens_mask, overflowing])
Tokens: ['This', 'is', 'a', 'deep', 'learning', 'to', '##ken', '##iz', '##ation', 'tutor', '##ial', '.', 'To', '##ken', '##iz', '##ation', 'is', 'the', 'first', 'step', 'in', 'a', 'deep', 'learning', 'N', '##L', '##P', 'pip', '##el', '##ine', '.', 'We', 'will', 'be', 'comp', '##aring', 'the', 'to', '##ken', '##s', 'gen', '##er', '##ated', 'by', 'each', 'to', '##ken', '##iz', '##ation', 'mod', '##el', '.', 'Ex', '##ci', '##te', '##d', 'much', '[UNK]', '[UNK]']


## Apply models on Romeo and Juliet book

### BPE

In [None]:
# Read the whole book. The explicit encoding avoids relying on the platform
# default, and the context manager closes the file handle.
with open("./Romeo_and_Juliet.txt", encoding="utf-8") as book_file:
    rj_text = book_file.read()
encoded_rj_text = bpe_rj_tokenizer.encode(rj_text)
print(encoded_rj_text)

Encoding(num_tokens=38285, attributes=[ids, type_ids, tokens, offsets, attention_mask, special_tokens_mask, overflowing])


### WordPiece

In [None]:
# Encode the full book text with the WordPiece tokenizer.
encoded_wp_text = wp_rj_tokenizer.encode(rj_text)
print(encoded_wp_text)

Encoding(num_tokens=38285, attributes=[ids, type_ids, tokens, offsets, attention_mask, special_tokens_mask, overflowing])
