# Intro to Tokenizers

Tokenization is the process of splitting text into smaller units (tokens), such as words, subwords, or characters.

<img src = https://raw.githubusercontent.com/lwtztea/ml_pic/9211f0d/week_5/tokenization_pipeline.png width = 1000 >

HuggingFace Tokenizers Summary: https://huggingface.co/docs/transformers/tokenizer_summary

## Basic Word Tokenization

In [1]:
import re
from collections import Counter, defaultdict

In [2]:
TEXT = "Hello world! This is a simple example."
tokenized_text = TEXT.split()
print(f"Word tokens: {tokenized_text}")

Word tokens: ['Hello', 'world!', 'This', 'is', 'a', 'simple', 'example.']


**Limitations:**

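* Splits only on whitespace, so punctuation stays attached to words (`'world!'`, `'example.'`)
* Can't handle out-of-vocabulary words: a word that was not seen when the vocabulary was built has no token of its own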
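
Both limitations can be seen in a few lines. The sketch below uses a regular expression to separate punctuation (an illustrative choice, not a method used elsewhere in this notebook) and a hypothetical `toy_vocab` with an assumed extra `unk_id` to show what happens to an unseen word.

```python
import re


def regex_tokenize(text):
    # \w+ matches a run of word characters; [^\w\s] matches one punctuation mark
    return re.findall(r"\w+|[^\w\s]", text)


tokens = regex_tokenize("Hello world! This is a simple example.")
print(tokens)

# Hypothetical word-level vocabulary built from the tokens above
toy_vocab = {token: idx for idx, token in enumerate(sorted(set(tokens)))}
unk_id = len(toy_vocab)  # assumed convention: one extra id for unknown words

# "complex" was never seen, so a word-level vocabulary can only map it to unk_id
ids = [toy_vocab.get(token, unk_id) for token in regex_tokenize("a complex example")]
print(ids)
```

Punctuation is now split off, but the unseen word still collapses to a single unknown id, which is the problem subword tokenization addresses.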

## Byte-Pair Encoding

Paper (Neural Machine Translation of Rare Words with Subword Units): https://arxiv.org/pdf/1508.07909

BPE is used in models like RoBERTa and GPT-2.

**Algorithm Steps:**

1. Split text into characters
2. Count adjacent pair frequencies
3. Merge most frequent pair
4. Repeat for desired vocabulary size

The final vocabulary consists of the special tokens, the initial alphabet, and all the tokens produced by the merges.
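
As a rough illustration of that composition, the sketch below assembles a token-to-id mapping from three parts. The helper `build_vocab`, the toy alphabet and merges, and the id ordering (special tokens, then alphabet, then merges) are all assumptions made for this example.

```python
def build_vocab(special_tokens, alphabet, merges):
    # Assumed ordering: special tokens, then the sorted alphabet, then merges in learned order
    tokens = list(special_tokens) + sorted(alphabet) + [left + right for left, right in merges]
    return {token: idx for idx, token in enumerate(tokens)}


special_tokens = ["<|endoftext|>"]                   # e.g. the end-of-text token used by GPT-2
alphabet = {"a", "i", "p", "r"}                      # toy initial alphabet
merges = [("p", "a"), ("pa", "i"), ("pai", "r")]     # toy merge list

vocab = build_vocab(special_tokens, alphabet, merges)
print(vocab)
print(f"Vocabulary size: {len(vocab)}")  # 1 special token + 4 characters + 3 merges
```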


Stanford detailed video about BPE: https://www.youtube.com/watch?v=tOMjTCO0htA

HuggingFace BPE explanation: https://www.youtube.com/watch?v=HEikzVL-lZU

<img src = https://raw.githubusercontent.com/lwtztea/ml_pic/611cde3/week_5/bpe_tokenization.png width = 1000 >

### Implementing BPE from Scratch

In [3]:
def preprocess(text):
    text = re.sub(r"\s+", " ", text)  # normalize whitespace
    return text.lower().split()


CORPUS = """
Split text into characters
Count adjacent pair frequencies
Merge most frequent pair
Repeat for desired vocabulary size
"""
tokenized_words = [list(word) for word in preprocess(CORPUS)]  # basic (char-level) tokenization
initial_vocab = {char for word in tokenized_words for char in word}

print(f"Initial vocabulary: {initial_vocab}")
print(f"Initial vocabulary size: {len(initial_vocab)}")

Initial vocabulary: {'d', 'i', 'g', 'x', 'y', 'm', 'f', 'u', 'z', 'e', 'q', 't', 'n', 'a', 'v', 'c', 's', 'h', 'l', 'o', 'p', 'b', 'j', 'r'}
Initial vocabulary size: 24


In [4]:
# tokenized_words
[" ".join(word) for word in tokenized_words]

['s p l i t',
 't e x t',
 'i n t o',
 'c h a r a c t e r s',
 'c o u n t',
 'a d j a c e n t',
 'p a i r',
 'f r e q u e n c i e s',
 'm e r g e',
 'm o s t',
 'f r e q u e n t',
 'p a i r',
 'r e p e a t',
 'f o r',
 'd e s i r e d',
 'v o c a b u l a r y',
 's i z e']

In [5]:
def merge_tokens(pair, corpus_in):
    corpus_out = []
    bigram = re.escape(" ".join(pair))
    p = re.compile(r"(?<!\S)" + bigram + r"(?!\S)")
    for word in corpus_in:
        new_word = p.sub("".join(pair), word)
        corpus_out.append(new_word)
    return corpus_out


def learn_bpe(words, num_merges):
    tokenized_corpus = [" ".join(word) for word in words]
    vocab = {char for word in words for char in word}
    merges = []

    for _ in range(num_merges):
        pairs = defaultdict(int)
        for word in tokenized_corpus:
            token = word.split()
            for j in range(len(token) - 1):
                pairs[(token[j], token[j + 1])] += 1

        if not pairs:
            break

        best_pair = max(pairs, key=pairs.get)  # most common pair
        merges.append(best_pair)
        vocab.add(best_pair[0] + best_pair[1])

        tokenized_corpus = merge_tokens(best_pair, tokenized_corpus)

    return merges, vocab

In [6]:
learned_merges, new_vocab = learn_bpe(tokenized_words, 50)
print("Learned merges:", *learned_merges, sep="\n")

Learned merges:
('n', 't')
('r', 'e')
('t', 'e')
('a', 'r')
('a', 'c')
('e', 'nt')
('p', 'a')
('pa', 'i')
('pai', 'r')
('f', 're')
('fre', 'q')
('freq', 'u')
('e', 's')
('s', 'p')
('sp', 'l')
('spl', 'i')
('spli', 't')
('te', 'x')
('tex', 't')
('i', 'nt')
('int', 'o')
('c', 'h')
('ch', 'ar')
('char', 'ac')
('charac', 'te')
('characte', 'r')
('character', 's')
('c', 'o')
('co', 'u')
('cou', 'nt')
('a', 'd')
('ad', 'j')
('adj', 'ac')
('adjac', 'ent')
('frequ', 'e')
('freque', 'n')
('frequen', 'c')
('frequenc', 'i')
('frequenci', 'es')
('m', 'e')
('me', 'r')
('mer', 'g')
('merg', 'e')
('m', 'o')
('mo', 's')
('mos', 't')
('frequ', 'ent')
('re', 'p')
('rep', 'e')
('repe', 'a')


In [7]:
print(f"New vocabulary: {new_vocab}")
print(f"Vocabulary size: {len(new_vocab)}")

New vocabulary: {'characters', 'frequent', 'd', 'i', 'text', 'g', 'pa', 'cou', 'tex', 'char', 'count', 'adj', 'repea', 'x', 'ar', 'y', 'repe', 'm', 'fre', 'f', 'most', 'es', 'u', 'z', 're', 'e', 'merge', 'te', 'frequ', 'q', 't', 'pair', 'int', 'n', 'a', 'adjac', 'v', 'c', 'frequen', 'mer', 's', 'merg', 'rep', 'split', 'charac', 'spl', 'characte', 'ad', 'sp', 'frequencies', 'mos', 'spli', 'h', 'frequenc', 'l', 'pai', 'nt', 'into', 'o', 'ent', 'me', 'p', 'ac', 'freque', 'frequenci', 'b', 'adjacent', 'mo', 'j', 'ch', 'co', 'r', 'freq', 'character'}
Vocabulary size: 74


In [8]:
def bpe_encode(word, merges):
    encoded = list(word.lower())
    for pair in merges:
        i = 0
        while i < len(encoded) - 1:
            if (encoded[i], encoded[i + 1]) == pair:
                encoded = encoded[:i] + [pair[0] + pair[1]] + encoded[i + 2:]
            else:
                i += 1
    return encoded


TEXT = "adjaces adjacent"
print(f"Encoded '{TEXT}': {bpe_encode(TEXT, learned_merges)}")

Encoded 'adjaces adjacent': ['adjac', 'es', ' ', 'adjacent']
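
The call above passes a two-word string, so the space survives as its own token. One possible variation, sketched here on the assumption that words should be encoded independently (as they were during training), splits on whitespace first. The names `apply_merges`, `encode_text`, and `toy_merges` are hypothetical, and the merge list is a small hand-picked one rather than the learned list above.

```python
def apply_merges(word, merges):
    symbols = list(word)
    for left, right in merges:  # replay merges in the order they were learned
        merged = []
        i = 0
        while i < len(symbols):
            if i < len(symbols) - 1 and symbols[i] == left and symbols[i + 1] == right:
                merged.append(left + right)
                i += 2
            else:
                merged.append(symbols[i])
                i += 1
        symbols = merged
    return symbols


def encode_text(text, merges):
    # Encode each whitespace-separated word on its own, so spaces never become tokens
    return [apply_merges(word, merges) for word in text.lower().split()]


toy_merges = [("n", "t"), ("e", "nt"), ("a", "c")]  # hand-picked for illustration
encoded_words = encode_text("adjacent accent", toy_merges)
print(encoded_words)
```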


## WordPiece

<img src = https://raw.githubusercontent.com/lwtztea/ml_pic/611cde3/week_5/wordpiece_tokenization.png width = 1000 >

WordPiece is a subword tokenization algorithm used in models like BERT. Key features:
- Uses a probabilistic approach to build vocabulary
- Balances between character-level and word-level tokenization
- Adds a special prefix (e.g., "##") to denote subword units that continue a word

**Algorithm Steps:**

1. Each word is split into characters, and the "##" prefix is added to every character except the first
2. The initial alphabet contains all the characters that appear at the beginning of a word and all the "##"-prefixed characters that appear inside a word
3. Iteratively merge the symbol pair with the highest score (see the formula below), which favors merges that most increase the language model likelihood

$$\text{score}(a, b) = \frac{\text{freq}(ab)}{\text{freq}(a) \times \text{freq}(b)}$$
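
To see how this score differs from plain frequency counting, the sketch below evaluates it on a toy set of word counts. The counts and all variable names are illustrative assumptions, not data from this notebook.

```python
from collections import Counter

# Toy word counts, chosen for illustration only
word_freqs = {"hug": 10, "pug": 5, "pun": 12, "bun": 4, "hugs": 5}


def split_word(word):
    # The first character stays bare; every following character gets the "##" prefix
    return [word[0]] + ["##" + char for char in word[1:]]


symbol_freqs = Counter()
pair_freqs = Counter()
for word, freq in word_freqs.items():
    symbols = split_word(word)
    for symbol in symbols:
        symbol_freqs[symbol] += freq
    for pair in zip(symbols, symbols[1:]):
        pair_freqs[pair] += freq

# Pair frequency divided by the product of the frequencies of its two parts
scores = {
    pair: freq / (symbol_freqs[pair[0]] * symbol_freqs[pair[1]])
    for pair, freq in pair_freqs.items()
}

most_frequent = max(pair_freqs, key=pair_freqs.get)
best_scored = max(scores, key=scores.get)
print("Most frequent pair:", most_frequent, pair_freqs[most_frequent])
print("Highest-scoring pair:", best_scored, scores[best_scored])
```

On this toy data the most frequent pair is `('##u', '##g')`, but the score prefers `('##g', '##s')`: `##s` is rare on its own, so seeing it next to `##g` is more informative than seeing the very common `##u` next to anything.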

In [9]:
class WordPieceTokenizer:
    def __init__(self, unk_token="[UNK]", max_input_chars_per_word=100):
        """
        Initialize the WordPiece tokenizer.

        Args:
            unk_token (str): The token to represent unknown words.
            max_input_chars_per_word (int): Maximum length of a word to tokenize.
        """
        self.unk_token = unk_token
        self.max_input_chars_per_word = max_input_chars_per_word
        self.vocab = None
        self.word_piece_regex = re.compile(r"[\w]+[']?[\w]*")

    def learn_vocab(self, text, vocab_size=1000):
        """
        Learn the vocabulary from the provided text.

        Args:
            text (str): The input text to learn the vocabulary from.
            vocab_size (int): The desired size of the vocabulary.
        """
        # Initialize the vocabulary with the special tokens (characters are added below)
        self.vocab = {"[PAD]", self.unk_token, "[CLS]", "[SEP]", "[MASK]"}
        words = self.word_piece_regex.findall(text.lower())  # Convert to lowercase for consistency
        word_counts = Counter(words)

        # Add individual characters to the vocabulary
        for word in word_counts:
            for char in word:
                self.vocab.add(char)

        # Perform WordPiece training
        while len(self.vocab) < vocab_size:
            # Count pairs of subwords
            pair_counts = defaultdict(int)
            for word, count in word_counts.items():
                subwords = self._split_to_current_subwords(word)
                if len(subwords) <= 1:
                    continue

                for i in range(len(subwords) - 1):
                    pair = (subwords[i], subwords[i + 1])
                    pair_counts[pair] += count

            if not pair_counts:
                break

            # Find the most frequent pair (a simplification: raw counts are used
            # here instead of the pair score defined above)
            best_pair = max(pair_counts.items(), key=lambda x: x[1], default=(None, 0))
            if best_pair[0] is None:
                break

            first, second = best_pair[0]
            # Create the new subword
            if second.startswith("##"):
                new_subword = first + second[2:]
            else:
                new_subword = first + second

            # Add the new subword to the vocabulary
            self.vocab.add(new_subword)

            # No need to update word_counts as we're only building the vocabulary

        print(f"Vocabulary size: {len(self.vocab)}")
        print(f"Sample vocabulary items: {list(self.vocab)[:30]}")

    def _split_to_current_subwords(self, word):
        """
        Split a word into its current subwords based on the vocabulary.
        This is a helper function for learning the vocabulary.
        """
        subwords = []
        start = 0
        while start < len(word):
            end = len(word)
            curr_subword = None

            while start < end:
                substring = word[start:end]
                if start > 0:
                    substring = "##" + substring

                if substring in self.vocab:
                    curr_subword = substring
                    break
                end -= 1

            if curr_subword is None:
                # If no subword is found, use the first character and add prefix if needed
                end = start + 1
                substring = word[start:end]
                if start > 0:
                    substring = "##" + substring
                curr_subword = substring

            subwords.append(curr_subword)
            start = end

        return subwords

    def _word_to_subwords(self, word):
        """
        Split a word into subwords using the current vocabulary.

        Args:
            word (str): The word to tokenize.

        Returns:
            list: A list of subwords.
        """
        if not word:
            return []

        subwords = []
        start = 0
        while start < len(word):
            end = len(word)
            found = False

            while start < end:
                substring = word[start:end]
                if start > 0:
                    substring = "##" + substring

                if substring in self.vocab:
                    subwords.append(substring)
                    start = end
                    found = True
                    break
                end -= 1

            if not found:
                # If no subword is found, use the unknown token and move on
                subwords.append(self.unk_token)
                break

        return subwords

    def tokenize(self, text):
        """
        Tokenize the input text into subwords.

        Args:
            text (str): The input text to tokenize.

        Returns:
            list: A list of tokens.
        """
        if self.vocab is None:
            raise ValueError(
                "Vocabulary not learned. Call `learn_vocab` first.")

        tokens = []
        for word in self.word_piece_regex.findall(text.lower()):
            if len(word) > self.max_input_chars_per_word:
                tokens.append(self.unk_token)
                continue
            subwords = self._word_to_subwords(word)
            tokens.extend(subwords)
        return tokens

In [10]:
TEXT = "hello world! This is a test. WordPiece is awesome."

tokenizer = WordPieceTokenizer()

# Learn the vocabulary
tokenizer.learn_vocab(TEXT, vocab_size=40)

Vocabulary size: 40
Sample vocabulary items: ['wordpiec', 'hello', 'wordpi', 'wo', 'm', 'wordp', 'worl', 'hell', '[MASK]', 'tes', 'wordpiece', 'c', '[PAD]', 'thi', 'word', 'he', 'r', 'this', 'is', 'd', 'i', '[CLS]', 'hel', '##es', 'e', 't', 'a', 'world', 's', 'w']


In [11]:
NEW_TEXT = "hello world! This is a new test."
tokens = tokenizer.tokenize(NEW_TEXT)
print("Tokens:", tokens)

Tokens: ['hello', 'world', 'this', 'is', 'a', '[UNK]', 'test']
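
In the output above, "new" becomes `[UNK]` because the letter "n" never occurs in the training text, so no prefix of the word is in the vocabulary. The standalone sketch below shows the same greedy longest-match-first lookup against a small hand-written vocabulary. The function `wordpiece_encode` and `toy_vocab` are hypothetical, and the sketch assumes that a word with any unmatched position becomes a single unknown token, which differs slightly from the class above (it keeps the pieces already matched).

```python
def wordpiece_encode(word, vocab, unk_token="[UNK]"):
    pieces = []
    start = 0
    while start < len(word):
        end = len(word)
        match = None
        # Try the longest candidate first, shrinking from the right
        while start < end:
            candidate = word[start:end]
            if start > 0:
                candidate = "##" + candidate  # continuation pieces carry the prefix
            if candidate in vocab:
                match = candidate
                break
            end -= 1
        if match is None:
            # Assumed policy: one unmatched position turns the whole word into unk_token
            return [unk_token]
        pieces.append(match)
        start = end
    return pieces


toy_vocab = {"un", "##aff", "##able", "t", "##e", "##s", "##t"}  # hand-written for illustration
for word in ["unaffable", "test", "new"]:
    print(word, "->", wordpiece_encode(word, toy_vocab))
```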
