# BPE implementation

In [16]:
from collections import Counter

class BPETokenizer:
    """Minimal word-level Byte-Pair-Encoding (BPE) tokenizer.

    Text is split on whitespace and every word gets a trailing ``'_'``
    end-of-word marker. Training repeatedly merges the most frequent adjacent
    token pair until ``vocab_size`` tokens exist or the best pair occurs fewer
    than ``min_frequency`` times.

    Attributes:
        vocab_size: Target number of tokens (base characters + merged tokens).
        min_frequency: Minimum corpus count a pair needs in order to be merged.
        merges: Learned merge rules as ``(token_a, token_b)`` tuples, in the
            order they were learned.
        token2id: Mapping token string -> token ID.
        id2token: Mapping token ID -> token string.
    """

    def __init__(self, vocab_size=100, min_frequency=2):
        """Store the training hyper-parameters and create empty vocab tables."""
        self.vocab_size = vocab_size
        self.min_frequency = min_frequency
        self.merges = []       # merge rules as tuples (token_a, token_b)
        self.token2id = {}     # token string -> token ID
        self.id2token = {}     # token ID -> token string

    @staticmethod
    def _merge_pair(tokens, token_a, token_b):
        """Return a new list where each adjacent (token_a, token_b) is fused.

        The scan is left to right and non-overlapping: once a pair is merged,
        both of its members are consumed.
        """
        merged = []
        i = 0
        n = len(tokens)
        while i < n:
            if i + 1 < n and tokens[i] == token_a and tokens[i + 1] == token_b:
                merged.append(token_a + token_b)
                i += 2  # skip over the merged pair
            else:
                merged.append(tokens[i])
                i += 1
        return merged

    def train(self, corpus, verbose=True):
        """Learn BPE merge rules and the vocabulary from ``corpus``.

        Any state left over from a previous call (merges and vocabulary) is
        discarded, so the same instance can be retrained safely.

        Args:
            corpus: A single string, or an iterable of strings. Each string is
                split on whitespace into words.
            verbose: When True (the default, matching the historical
                behaviour) the intermediate word/pair statistics are printed.
                Pass False to train silently.
        """
        if isinstance(corpus, str):
            words = corpus.split()
        else:
            words = [word for text in corpus for word in text.split()]
        # Append '_' marker to each word to mark end-of-word
        word_freq = Counter(word + '_' for word in words)
        if verbose:
            print("Word freq: ", word_freq)

        # Start from a clean slate: without this, merges learned by an earlier
        # train() call would linger while the vocabulary below is rebuilt.
        self.merges = []

        # Base vocabulary: every distinct character (including '_'), sorted so
        # that ID assignment is deterministic.
        base_tokens = sorted({char for word in word_freq for char in word})
        self.token2id = {token: idx for idx, token in enumerate(base_tokens)}
        self.id2token = {idx: token for token, idx in self.token2id.items()}

        # Each distinct word starts as its list of characters.
        word_tokens = {word: list(word) for word in word_freq}
        if verbose:
            print("word tokens: ", word_tokens)

        while len(self.token2id) < self.vocab_size:
            # Count every adjacent token pair, weighted by word frequency.
            pair_counts = Counter()
            for word, freq in word_freq.items():
                tokens = word_tokens[word]
                for pair in zip(tokens, tokens[1:]):
                    pair_counts[pair] += freq
            if verbose:
                print("pair counts: ", pair_counts)
            if not pair_counts:
                break  # nothing left to merge (empty corpus or all words fully merged)

            (token_a, token_b), pair_freq = pair_counts.most_common(1)[0]
            if pair_freq < self.min_frequency:
                break  # stop if no pair is frequent enough

            new_token = token_a + token_b
            # Defensive: never hand out a second ID for a string that is
            # already in the vocabulary, so the two tables stay consistent.
            if new_token not in self.token2id:
                new_id = len(self.token2id)
                self.token2id[new_token] = new_id
                self.id2token[new_id] = new_token
            self.merges.append((token_a, token_b))

            # Apply the new rule to every word's current token sequence.
            word_tokens = {
                word: self._merge_pair(tokens, token_a, token_b)
                for word, tokens in word_tokens.items()
            }
        if verbose:
            print("new word tokens: ", word_tokens)

    def encode(self, text):
        """Convert ``text`` into a flat list of token IDs.

        Each whitespace-separated word is split into characters plus the
        ``'_'`` marker, then every learned merge rule is applied in training
        order.

        Returns:
            A list of token IDs. A token that is not in the vocabulary (for
            example a character never seen during training) is encoded as
            ``None``.
        """
        token_ids = []
        for word in text.split():
            tokens = list(word) + ['_']
            for token_a, token_b in self.merges:
                tokens = self._merge_pair(tokens, token_a, token_b)
            token_ids.extend(self.token2id.get(token) for token in tokens)
        return token_ids

    def decode(self, token_ids):
        """Convert a list of token IDs back into a text string.

        Tokens ending in ``'_'`` close a word: the marker is dropped and a
        space is emitted in its place. Leading/trailing whitespace of the
        result is stripped.

        Raises:
            KeyError: If an ID is not in the vocabulary (this includes the
                ``None`` placeholders that ``encode`` emits for unknown
                tokens).
        """
        tokens = [self.id2token[token_id] for token_id in token_ids]
        pieces = [
            (token[:-1] + " ") if token.endswith('_') else token
            for token in tokens
        ]
        return "".join(pieces).strip()

# Toy training corpus: a few distinct words, with "low" and "newest"
# repeated so that their character pairs reach the merge threshold
corpus = ["low", "lower", "newest", "widest"] + ["low"] * 4 + ["newest"] * 4

# A small target vocabulary keeps the demo output readable
tokenizer = BPETokenizer(vocab_size=50, min_frequency=2)
tokenizer.train(corpus)

# Inspect what training produced
print("Vocabulary:", tokenizer.token2id)
print("Merge rules:", tokenizer.merges)

# Round-trip a word that is not in the training corpus
encoded = tokenizer.encode("lowest")
print("Encoded 'lowest':", encoded)

decoded = tokenizer.decode(encoded)
print("Decoded text:", decoded)

Word freq:  Counter({'low_': 5, 'newest_': 5, 'lower_': 1, 'widest_': 1})
word tokens:  {'low_': ['l', 'o', 'w', '_'], 'lower_': ['l', 'o', 'w', 'e', 'r', '_'], 'newest_': ['n', 'e', 'w', 'e', 's', 't', '_'], 'widest_': ['w', 'i', 'd', 'e', 's', 't', '_']}
pair counts:  Counter({('l', 'o'): 6, ('o', 'w'): 6, ('w', 'e'): 6, ('e', 's'): 6, ('s', 't'): 6, ('t', '_'): 6, ('w', '_'): 5, ('n', 'e'): 5, ('e', 'w'): 5, ('e', 'r'): 1, ('r', '_'): 1, ('w', 'i'): 1, ('i', 'd'): 1, ('d', 'e'): 1})
pair counts:  Counter({('lo', 'w'): 6, ('w', 'e'): 6, ('e', 's'): 6, ('s', 't'): 6, ('t', '_'): 6, ('w', '_'): 5, ('n', 'e'): 5, ('e', 'w'): 5, ('e', 'r'): 1, ('r', '_'): 1, ('w', 'i'): 1, ('i', 'd'): 1, ('d', 'e'): 1})
pair counts:  Counter({('e', 's'): 6, ('s', 't'): 6, ('t', '_'): 6, ('low', '_'): 5, ('n', 'e'): 5, ('e', 'w'): 5, ('w', 'e'): 5, ('low', 'e'): 1, ('e', 'r'): 1, ('r', '_'): 1, ('w', 'i'): 1, ('i', 'd'): 1, ('d', 'e'): 1})
pair counts:  Counter({('es', 't'): 6, ('t', '_'): 6, ('low', '_')

In [18]:
# Install the Hugging Face `tokenizers` package into the running kernel's environment
%pip install tokenizers

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


Note: you may need to restart the kernel to use updated packages.


# Train and use Hugging Face's `ByteLevelBPETokenizer` from the `tokenizers` library
- Trains a Byte-Level BPE tokenizer on a small text file.
- Saves the tokenizer files.
- Loads the tokenizer and uses it to encode/decode a sample sentence.

In [20]:
from tokenizers import ByteLevelBPETokenizer

import os

# Directory that receives the trained tokenizer files (vocab.json, merges.txt)
MODEL_DIR = "tokenizer_model"

# 1. Train a Byte-Level BPE tokenizer
tokenizer = ByteLevelBPETokenizer()

# Train on a simple sample text file (sample.txt must exist in the working directory)
tokenizer.train(files="sample.txt", vocab_size=1000, min_frequency=2, special_tokens=[
    "<s>",
    "<pad>",
    "</s>",
    "<unk>",
    "<mask>",
])

# Save the tokenizer. save_model() writes into an existing directory, so
# create it first; otherwise a fresh "Restart & Run All" fails here.
os.makedirs(MODEL_DIR, exist_ok=True)
tokenizer.save_model(MODEL_DIR)

# 2. Load the trained tokenizer from the files written above
tokenizer = ByteLevelBPETokenizer(
    os.path.join(MODEL_DIR, "vocab.json"),
    os.path.join(MODEL_DIR, "merges.txt"),
)

# 3. Encode and decode a sentence
encoded = tokenizer.encode("Hello, ByteLevel BPE!")
print("Tokens:", encoded.tokens)
print("Token IDs:", encoded.ids)

decoded = tokenizer.decode(encoded.ids)
print("Decoded:", decoded)





Tokens: ['Hello', ',', 'ĠB', 'y', 'te', 'L', 'e', 'v', 'el', 'ĠB', 'P', 'E', '!']
Token IDs: [271, 16, 268, 93, 267, 48, 73, 90, 261, 268, 52, 41, 5]
Decoded: Hello, ByteLevel BPE!


In [21]:
# Install the `transformers` package into the running kernel's environment
%pip install transformers

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


Note: you may need to restart the kernel to use updated packages.


# Use a pretrained byte-level BPE tokenizer (GPT-2 from Hugging Face)

In [22]:
from transformers import GPT2TokenizerFast

# Sentence used for the round-trip demonstration
text = "Hello, ByteLevel BPE!"

# Fetch the pretrained GPT-2 tokenizer (ByteLevel BPE)
tokenizer = GPT2TokenizerFast.from_pretrained("gpt2")

# Text -> token IDs, IDs -> text, and the token strings for the same text
encoded = tokenizer.encode(text)
decoded = tokenizer.decode(encoded)
tokens = tokenizer.tokenize(text)

# Report the three views in the same order as before
for label, value in (
    ("Token IDs:", encoded),
    ("Decoded:", decoded),
    ("Tokens:", tokens),
):
    print(label, value)


Token IDs: [15496, 11, 30589, 4971, 347, 11401, 0]
Decoded: Hello, ByteLevel BPE!
Tokens: ['Hello', ',', 'ĠByte', 'Level', 'ĠB', 'PE', '!']
