In [18]:
import re
import collections
from typing import Dict, List, Tuple, Set
import json

## 1. BPE (Byte Pair Encoding) Tokenizer

BPE is a data-driven subword tokenization algorithm that iteratively merges the most frequent pair of consecutive symbols in a corpus. It starts with a vocabulary of individual characters and gradually builds up larger subword units.

**Function-based implementation** using global variables to maintain state.

In [19]:
bpe_vocab = {}
bpe_merges = []
bpe_word_freqs = {}
bpe_vocab_size = 1000

def get_pair_stats(splits, word_freqs):
    """Get frequency of consecutive symbol pairs"""
    pairs = collections.defaultdict(int)
    for word, freq in word_freqs.items():
        symbols = splits[word]
        for i in range(len(symbols) - 1):
            pairs[(symbols[i], symbols[i + 1])] += freq
    return pairs

def merge_vocab_pair(pair, splits):
    """Merge the most frequent pair in the vocabulary"""
    new_splits = {}
    bigram = re.escape(' '.join(pair))
    p = re.compile(r'(?<!\S)' + bigram + r'(?!\S)')
    
    for word in splits:
        new_word = p.sub(''.join(pair), ' '.join(splits[word]))
        new_splits[word] = new_word.split()
    return new_splits

def train_bpe(corpus, vocab_size=1000):
    """
    Train the BPE tokenizer on a given corpus
    """
    global bpe_vocab, bpe_merges, bpe_word_freqs, bpe_vocab_size
    
    bpe_vocab_size = vocab_size
    bpe_word_freqs = collections.defaultdict(int)
    bpe_merges = []
    
    # Count word frequencies
    for text in corpus:
        words = text.lower().split()
        for word in words:
            bpe_word_freqs[word] += 1
    
    # Initialize splits, each word split into characters with </w> at the end
    splits = {}
    for word in bpe_word_freqs:
        splits[word] = list(word) + ['</w>']
    
    # Get initial vocabulary (all characters)
    vocab = set()
    for word in bpe_word_freqs:
        vocab.update(splits[word])
    
    # Convert to numbered vocabulary
    bpe_vocab = {symbol: i for i, symbol in enumerate(sorted(vocab))}
    
    # Perform BPE merges
    num_merges = bpe_vocab_size - len(bpe_vocab)
    
    for i in range(num_merges):
        pairs = get_pair_stats(splits, bpe_word_freqs)
        if not pairs:
            break
            
        # Find most frequent pair
        best_pair = max(pairs, key=pairs.get)
        
        # Merge the pair
        splits = merge_vocab_pair(best_pair, splits)
        
        # Add merged token to vocabulary
        new_token = ''.join(best_pair)
        bpe_vocab[new_token] = len(bpe_vocab)
        bpe_merges.append(best_pair)
        
        if i % 100 == 0:
            print(f"Merge {i}: {best_pair} -> {new_token}")

def apply_bpe_merges(word):
    """Apply learned merges to a word"""
    splits = list(word) + ['</w>']
    
    for merge in bpe_merges:
        i = 0
        while i < len(splits) - 1:
            if splits[i] == merge[0] and splits[i + 1] == merge[1]:
                splits = splits[:i] + [''.join(merge)] + splits[i + 2:]
            else:
                i += 1
    return splits

def tokenize_bpe(text):
    """
    Tokenize text using learned BPE
    """
    words = text.lower().split()
    tokens = []
    
    for word in words:
        word_tokens = apply_bpe_merges(word)
        tokens.extend(word_tokens)
    
    return tokens

def encode_bpe(text):
    """
    Encode text to token IDs
    """
    tokens = tokenize_bpe(text)
    return [bpe_vocab.get(token, bpe_vocab.get('<unk>', 0)) for token in tokens]

def decode_bpe(token_ids):
    """
    Decode token IDs back to text
    """
    # Create reverse vocabulary
    id_to_token = {v: k for k, v in bpe_vocab.items()}
    
    tokens = [id_to_token.get(token_id, '<unk>') for token_id in token_ids]
    
    # Join tokens and handle word boundaries
    text = ''.join(tokens).replace('</w>', ' ').strip()
    return text

def save_bpe_model(filepath):
    """Save the trained BPE model"""
    model_data = {
        'vocab': bpe_vocab,
        'merges': bpe_merges,
        'vocab_size': bpe_vocab_size
    }
    with open(filepath, 'w', encoding='utf-8') as f:
        json.dump(model_data, f, ensure_ascii=False, indent=2)

def load_bpe_model(filepath):
    """Load a trained BPE model"""
    global bpe_vocab, bpe_merges, bpe_vocab_size
    
    with open(filepath, 'r', encoding='utf-8') as f:
        model_data = json.load(f)
    
    bpe_vocab = model_data['vocab']
    bpe_merges = [tuple(merge) for merge in model_data['merges']]
    bpe_vocab_size = model_data['vocab_size']

def get_bpe_vocab_size():
    """Get current BPE vocabulary size"""
    return len(bpe_vocab)

def get_bpe_merges_count():
    """Get number of BPE merges learned"""
    return len(bpe_merges)

## Rule-based Tokenizer

A rule-based tokenizer uses predefined rules and patterns to split text into tokens.

In [None]:
# Global variables for rule-based tokenizer configuration
rule_preserve_case = False
rule_handle_contractions = True
rule_handle_punctuation = True
rule_handle_numbers = True
rule_handle_urls = True
rule_handle_emails = True
rule_pattern = None

# Common contractions mapping
contractions_map = {
    "n't": "not",
    "'re": "are", 
    "'ve": "have",
    "'ll": "will",
    "'d": "would",
    "'m": "am",
    "'s": "is"  # Note: 's can also be possessive
}

def configure_rule_tokenizer(preserve_case=False,
                           handle_contractions=True,
                           handle_punctuation=True,
                           handle_numbers=True,
                           handle_urls=True,
                           handle_emails=True):
    """
    Configure rule-based tokenizer with various options
    
	preserve_case: Whether to preserve original casing
	handle_contractions: Whether to split contractions (e.g., "don't" -> "do", "n't")
	handle_punctuation: Whether to separate punctuation
	handle_numbers: Whether to treat numbers as separate tokens
	handle_urls: Whether to keep URLs as single tokens
	handle_emails: Whether to keep emails as single tokens
    """
    global rule_preserve_case, rule_handle_contractions, rule_handle_punctuation
    global rule_handle_numbers, rule_handle_urls, rule_handle_emails, rule_pattern
    
    rule_preserve_case = preserve_case
    rule_handle_contractions = handle_contractions
    rule_handle_punctuation = handle_punctuation
    rule_handle_numbers = handle_numbers
    rule_handle_urls = handle_urls
    rule_handle_emails = handle_emails
    
    # Compile patterns
    rule_pattern = compile_rule_patterns()

def compile_rule_patterns():
    """Compile regex patterns for tokenization"""
    patterns = []
    
    # URLs
    if rule_handle_urls:
        url_pattern = r'https?://(?:[-\w.])+(?:[:\d]+)?(?:/(?:[\w/_.])*(?:\?(?:[\w&=%.])*)?(?:#(?:\w)*)?)?'
        patterns.append(url_pattern)
    
    # Email addresses
    if rule_handle_emails:
        email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
        patterns.append(email_pattern)
    
    # Numbers
    if rule_handle_numbers:
        number_pattern = r'\d+(?:\.\d+)?%?'
        patterns.append(number_pattern)
    
    # Contractions
    if rule_handle_contractions:
        contraction_pattern = r"\w+(?:'[a-z]+)?"
        patterns.append(contraction_pattern)
    
    # Words (letters, including apostrophes within words)
    patterns.append(r"\w+(?:'\w+)*")
    
    # Punctuation
    if rule_handle_punctuation:
        patterns.append(r'[^\w\s]')
    
    # Combine all patterns
    return re.compile('|'.join(f'({p})' for p in patterns), re.IGNORECASE)

def handle_contraction(token):
    """Handle contraction splitting"""
    if not rule_handle_contractions:
        return [token]
    
    # Check for common contractions
    for contraction, expansion in contractions_map.items():
        if token.lower().endswith(contraction):
            base = token[:-len(contraction)]
            if base:
                return [base, expansion]
    
    return [token]

def normalize_case(token):
    """Normalize case based on settings"""
    if rule_preserve_case:
        return token
    return token.lower()

def tokenize_rule_based(text):
    """
    Tokenize text using rule-based approach
    """
    if not text:
        return []
    
    if rule_pattern is None:
        raise ValueError("Rule tokenizer not configured. Call configure_rule_tokenizer() first.")
    
    tokens = []
    
    # Find all matches using compiled pattern
    for match in rule_pattern.finditer(text):
        token = match.group().strip()
        if not token:
            continue
        
        # Handle contractions
        if rule_handle_contractions and "'" in token:
            contraction_tokens = handle_contraction(token)
            for ct in contraction_tokens:
                tokens.append(normalize_case(ct))
        else:
            tokens.append(normalize_case(token))
    
    return tokens

def sentence_tokenize(text):
    """
    Split text into sentences using rules
    """
    # Pattern for sentence boundaries
    sentence_pattern = r'(?<=[.!?])\s+(?=[A-Z])'
    
    # Handle abbreviations (simple approach)
    # Replace common abbreviations temporarily
    abbrevs = ['Mr.', 'Mrs.', 'Dr.', 'Prof.', 'Sr.', 'Jr.', 'vs.', 'etc.', 'i.e.', 'e.g.']
    temp_text = text
    for i, abbrev in enumerate(abbrevs):
        temp_text = temp_text.replace(abbrev, f'__ABBREV_{i}__')
    
    # Split sentences
    sentences = re.split(sentence_pattern, temp_text)
    
    # Restore abbreviations
    for i, abbrev in enumerate(abbrevs):
        sentences = [s.replace(f'__ABBREV_{i}__', abbrev) for s in sentences]
    
    return [s.strip() for s in sentences if s.strip()]

def get_token_types(tokens):
    """
    Classify tokens by type
    """
    types = []
    
    for token in tokens:
        if re.match(r'https?://', token):
            types.append('URL')
        elif re.match(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', token):
            types.append('EMAIL')
        elif re.match(r'\d+(?:\.\d+)?%?', token):
            types.append('NUMBER')
        elif re.match(r'[^\w\s]', token):
            types.append('PUNCTUATION')
        elif token.lower() in contractions_map.values():
            types.append('CONTRACTION_WORD')
        elif re.match(r'\w+', token):
            types.append('WORD')
        else:
            types.append('OTHER')
    
    return types

def get_tokenization_statistics(text):
    """
    Get tokenization statistics
    """
    tokens = tokenize_rule_based(text)
    token_types = get_token_types(tokens)
    
    stats = {
        'total_tokens': len(tokens),
        'unique_tokens': len(set(tokens)),
        'sentences': len(sentence_tokenize(text)),
        'characters': len(text),
        'characters_no_spaces': len(text.replace(' ', ''))
    }
    
    # Count by token type
    type_counts = collections.Counter(token_types)
    stats.update(type_counts)
    
    return stats

## 3. Example Usage and Comparison

Let's test both tokenizers with sample text to see how they perform.

In [21]:
# Sample training corpus for BPE
training_corpus = [
    "the quick brown fox jumps over the lazy dog",
    "natural language processing is fascinating",
    "machine learning algorithms are powerful tools",
    "tokenization is an important preprocessing step",
    "byte pair encoding learns subword units automatically",
    "rule based approaches use predefined patterns",
    "both methods have their advantages and disadvantages",
    "preprocessing text data requires careful consideration",
    "the effectiveness of tokenization depends on the task",
    "subword tokenization helps with out of vocabulary words"
]

# Test text
test_text = """
Hello world! There can't be cats in my house otherwise it's gonna explode or something. 
Stuff stuff, https://www.google.com, giacomo.antonelli3@gmail.com at $0.99 if interested, possible 99% discount!
"""

In [22]:
# Training and use BPE
train_bpe(training_corpus, vocab_size=200)

print(f"\nBPE Vocabulary size: {get_bpe_vocab_size()}")
print(f"Number of merges learned: {get_bpe_merges_count()}")

bpe_tokens = tokenize_bpe(test_text)
print(f"\nBPE Tokens ({len(bpe_tokens)}): {bpe_tokens}")

bpe_encoded = encode_bpe(test_text)
bpe_decoded = decode_bpe(bpe_encoded)
print(f"BPE Decoded: {bpe_decoded}")

Merge 0: ('s', '</w>') -> s</w>
Merge 100: ('langu', 'ag') -> languag

BPE Vocabulary size: 200
Number of merges learned: 173

BPE Tokens (164): ['h', 'e', 'l', 'l', 'o', '</w>', 'wor', 'l', 'd', '!', '</w>', 'th', 'er', 'e</w>', 'c', 'an', "'", 't</w>', 'b', 'e</w>', 'c', 'at', 's</w>', 'in', '</w>', 'm', 'y</w>', 'h', 'o', 'use</w>', 'o', 'th', 'er', 'w', 'i', 's', 'e</w>', 'i', 't', "'", 's</w>', 'g', 'o', 'n', 'n', 'a', '</w>', 'e', 'x', 'p', 'l', 'o', 'd', 'e</w>', 'or', '</w>', 's', 'o', 'm', 'e', 'th', 'in', 'g', '.', '</w>', 's', 't', 'u', 'f', 'f', '</w>', 's', 't', 'u', 'f', 'f', ',', '</w>', 'h', 't', 't', 'p', 's', ':', '/', '/', 'w', 'w', 'w', '.', 'g', 'o', 'o', 'g', 'le', '.', 'co', 'm', ',', '</w>', 'g', 'i', 'ac', 'o', 'm', 'o', '.', 'an', 'to', 'n', 'e', 'l', 'l', 'i', '3', '@', 'g', 'm', 'a', 'i', 'l', '.', 'co', 'm', '</w>', 'at', '</w>', '$', '0', '.', '9', '9', '</w>', 'i', 'f', '</w>', 'in', 't', 'er', 'es', 'te', 'd', ',', '</w>', 'p', 'o', 's', 's', 'i', 'b', '

In [23]:
# Configuration and use of rule-based tokenizer
configure_rule_tokenizer(
    preserve_case=True,
    handle_contractions=True,
    handle_punctuation=True,
    handle_numbers=True,
    handle_urls=True,
    handle_emails=True
)

rule_tokens = tokenize_rule_based(test_text)
print(f"Rule-based Tokens ({len(rule_tokens)}): {rule_tokens}")

token_types = get_token_types(rule_tokens)
print(f"\nToken types: {token_types}")

stats = get_tokenization_statistics(test_text)
print(f"\nTokenization statistics:")
for key, value in stats.items():
    print(f"  {key}: {value}")

sentences = sentence_tokenize(test_text)
print(f"\nSentences ({len(sentences)}):")
for i, sentence in enumerate(sentences, 1):
    print(f"  {i}. {sentence}")

Rule-based Tokens (35): ['Hello', 'world', '!', 'There', 'ca', 'not', 'be', 'cats', 'in', 'my', 'house', 'otherwise', 'it', 'is', 'gonna', 'explode', 'or', 'something', '.', 'Stuff', 'stuff', ',', 'https://www.google.com', ',', 'giacomo.antonelli3@gmail.com', 'at', '$', '0.99', 'if', 'interested', ',', 'possible', '99%', 'discount', '!']

Token types: ['WORD', 'WORD', 'PUNCTUATION', 'WORD', 'WORD', 'CONTRACTION_WORD', 'WORD', 'WORD', 'WORD', 'WORD', 'WORD', 'WORD', 'WORD', 'CONTRACTION_WORD', 'WORD', 'WORD', 'WORD', 'WORD', 'PUNCTUATION', 'WORD', 'WORD', 'PUNCTUATION', 'URL', 'PUNCTUATION', 'EMAIL', 'WORD', 'PUNCTUATION', 'NUMBER', 'WORD', 'WORD', 'PUNCTUATION', 'WORD', 'NUMBER', 'WORD', 'PUNCTUATION']

Tokenization statistics:
  total_tokens: 35
  unique_tokens: 32
  sentences: 3
  characters: 203
  characters_no_spaces: 178
  WORD: 22
  PUNCTUATION: 7
  CONTRACTION_WORD: 2
  URL: 1
  EMAIL: 1
  NUMBER: 2

Sentences (3):
  1. Hello world!
  2. There can't be cats in my house otherwise