In [7]:
from collections import defaultdict, Counter
from typing import Dict, List, Tuple, Set
import re
import json
import os
import math

class BPETokenizer:
    def __init__(self, vocab_size: int = None):
        self.vocab_size = vocab_size
        self.word_freqs = defaultdict(int)
        self.vocab = {}
        self.merges = {}
        self.inverse_vocab = {}
        self.pattern = None
        self.original_text = ""
    
    def train_from_file(self, file_path: str, encoding='utf-8', auto_vocab_size=True):
        """Train the tokenizer from a text file and calculate compression ratio."""
        print(f"Loading text file: {file_path}")
        
        # Store the original text for compression analysis
        with open(file_path, 'r', encoding=encoding) as f:
            self.original_text = f.read()
            
        if auto_vocab_size or self.vocab_size is None:
            self.vocab_size = self._suggest_vocab_size(self.original_text)
            print(f"\nUsing automatically determined vocabulary size: {self.vocab_size}")
            
        print("\nTraining tokenizer...")
        self.train(self.original_text.split('\n'))
        
        # Calculate and print compression metrics for training data
        self.print_training_compression_stats()
    
    def train(self, texts: List[str]):
        """Train the BPE tokenizer on a list of texts."""
        # Count word frequencies
        print("Counting word frequencies...")
        for text in texts:
            words = text.split()
            for word in words:
                self.word_freqs[' '.join(list(word)) + ' </w>'] += 1
        
        # Initialize characters vocabulary
        chars = set()
        for word in self.word_freqs.keys():
            chars.update(word.split())
        self.vocab = {char: idx for idx, char in enumerate(chars)}
        self.inverse_vocab = {idx: char for char, idx in self.vocab.items()}
        
        # Main BPE training loop
        print(f"Starting BPE training with target vocabulary size: {self.vocab_size}")
        num_merges = min(self.vocab_size - len(self.vocab), 1000)
        for i in range(num_merges):
            pairs = self.get_stats()
            if not pairs:
                break
                
            if (i + 1) % 100 == 0:
                print(f"Completed {i + 1} merges...")
                
            best_pair = max(pairs.items(), key=lambda x: x[1])[0]
            self.merge_vocab(best_pair)
            
            # Add the merged pair to vocabulary
            merged_token = best_pair[0] + best_pair[1]
            self.vocab[merged_token] = len(self.vocab)
            self.inverse_vocab[len(self.vocab) - 1] = merged_token
            self.merges[best_pair] = merged_token
        
        # Create regex pattern for tokenization
        self.pattern = re.compile("|".join(map(re.escape, sorted(self.vocab.keys(), key=len, reverse=True))))
        print(f"Final vocabulary size: {len(self.vocab)}")
    
    def get_stats(self) -> Dict[Tuple[str, str], int]:
        """Count frequency of adjacent pairs in current vocabulary."""
        pairs = defaultdict(int)
        for word, freq in self.word_freqs.items():
            symbols = word.split()
            for i in range(len(symbols) - 1):
                pairs[symbols[i], symbols[i + 1]] += freq
        return pairs
    
    def merge_vocab(self, pair: Tuple[str, str]):
        """Merge a pair of symbols in the vocabulary."""
        bigram = " ".join(pair)
        replacement = "".join(pair)
        
        # Create a new dictionary to store updated frequencies
        new_word_freqs = defaultdict(int)
        
        # Iterate over a copy of the keys
        for word in list(self.word_freqs.keys()):
            if bigram in word:
                new_word = word.replace(bigram, replacement)
                new_word_freqs[new_word] += self.word_freqs[word]
            else:
                new_word_freqs[word] += self.word_freqs[word]
                
        self.word_freqs = new_word_freqs
    
    def encode(self, text: str) -> List[int]:
        """Encode text into token ids."""
        tokens = []
        for word in text.split():
            word = " ".join(list(word)) + " </w>"
            
            # Find all non-overlapping matches
            current_word = word
            encoded_word = []
            
            while current_word:
                match = self.pattern.search(current_word)
                if not match:
                    if current_word.strip():
                        print(f"Warning: Could not encode '{current_word.strip()}'")
                    break
                    
                token = match.group(0)
                encoded_word.append(self.vocab[token])
                start, end = match.span()
                current_word = current_word[end:].strip()
            
            tokens.extend(encoded_word)
        
        self.total_tokens = len(tokens)  # Update token count
        return tokens
    
    def decode(self, token_ids: List[int]) -> str:
        """Decode token ids back to text."""
        text = []
        current_word = []
        
        for token_id in token_ids:
            if token_id not in self.inverse_vocab:
                print(f"Warning: Unknown token ID {token_id}")
                continue
                
            token = self.inverse_vocab[token_id]
            if token == "</w>":
                text.append("".join(current_word))
                current_word = []
            else:
                current_word.append(token)
                
        if current_word:  # Handle case where last token wasn't </w>
            text.append("".join(current_word))
            
        return " ".join(text)
    
    def print_statistics(self):
        """Print tokenizer statistics including compression ratio."""
        print("\nTokenizer Statistics:")
        print(f"Vocabulary size: {len(self.vocab)}")
        print(f"Number of merges: {len(self.merges)}")
        if self.total_tokens > 0 and self.original_chars > 0:
            compression_ratio = self.original_chars / (self.total_tokens * 2)  # Assuming 2 bytes per token on average
            print(f"Original characters: {self.original_chars:,}")
            print(f"Total tokens: {self.total_tokens:,}")
            print(f"Compression ratio: {compression_ratio:.2f}x")
    
    def save(self, path: str):
        """Save tokenizer configuration to file."""
        config = {
            'vocab': self.vocab,
            'merges': self.merges,
            'vocab_size': self.vocab_size
        }
        with open(path, 'w', encoding='utf-8') as f:
            json.dump(config, f, ensure_ascii=False, indent=2)
    
    @classmethod
    def load(cls, path: str) -> 'BPETokenizer':
        """Load tokenizer configuration from file."""
        with open(path, 'r', encoding='utf-8') as f:
            config = json.load(f)
            
        tokenizer = cls(vocab_size=config['vocab_size'])
        tokenizer.vocab = config['vocab']
        tokenizer.merges = config['merges']
        tokenizer.inverse_vocab = {idx: char for char, idx in tokenizer.vocab.items()}
        tokenizer.pattern = re.compile("|".join(map(re.escape, sorted(tokenizer.vocab.keys(), key=len, reverse=True))))
        return tokenizer
    
    
    def analyze_text(self, file_path: str, encoding='utf-8') -> int:
        """Analyze text file and suggest vocabulary size."""
        print(f"Analyzing text file: {file_path}")
        
        # Read file
        with open(file_path, 'r', encoding=encoding) as f:
            text = f.read()
            
        # Basic text statistics
        total_chars = len(text)
        unique_chars = len(set(text))
        total_words = len(text.split())
        unique_words = len(set(text.split()))
        
        # Calculate suggested vocab size
        # Rule of thumb: sqrt(unique_words) * log(total_words)
        suggested_size = int(math.sqrt(unique_words) * math.log(total_words + 1))
        # Round to nearest hundred and ensure minimum size
        suggested_size = max(100, round(suggested_size, -2))
        
        print("\nText Analysis:")
        print(f"Total characters: {total_chars:,}")
        print(f"Unique characters: {unique_chars}")
        print(f"Total words: {total_words:,}")
        print(f"Unique words: {unique_words:,}")
        print(f"Suggested vocabulary size: {suggested_size:,}")
        
        return suggested_size
    
    def calculate_compression_metrics(self, text: str) -> dict:
        """Calculate detailed compression metrics for a given text."""
        # Original text metrics
        original_bytes = len(text.encode('utf-8'))
        original_chars = len(text)
        
        # Encode the text
        token_ids = self.encode(text)
        
        # Calculate encoded size
        # Each token ID needs enough bits to represent the vocab size
        bits_per_token = max(math.ceil(math.log2(len(self.vocab))), 8)
        encoded_bits = len(token_ids) * bits_per_token
        encoded_bytes = math.ceil(encoded_bits / 8)
        
        metrics = {
            'original_size': {
                'bytes': original_bytes,
                'chars': original_chars
            },
            'encoded_size': {
                'tokens': len(token_ids),
                'bits_per_token': bits_per_token,
                'total_bits': encoded_bits,
                'bytes': encoded_bytes
            },
            'compression': {
                'ratio': original_bytes / encoded_bytes,
                'space_saving_percent': (1 - (encoded_bytes / original_bytes)) * 100
            }
        }
        
        return metrics

    def print_training_compression_stats(self):
        """Calculate and print compression statistics for training data."""
        if not self.original_text:
            print("No training data available for compression analysis")
            return
            
        # Original text metrics
        original_bytes = len(self.original_text.encode('utf-8'))
        original_chars = len(self.original_text)
        
        # Encode the entire training text
        encoded_tokens = self.encode(self.original_text)
        
        # Calculate bits needed per token based on vocab size
        bits_per_token = max(math.ceil(math.log2(len(self.vocab))), 8)
        encoded_bits = len(encoded_tokens) * bits_per_token
        encoded_bytes = math.ceil(encoded_bits / 8)
        
        # Calculate compression ratio
        compression_ratio = original_bytes / encoded_bytes
        space_saving = (1 - (encoded_bytes / original_bytes)) * 100
        
        print("\nTraining Data Compression Analysis:")
        print(f"Original text size:")
        print(f"  - Characters: {original_chars:,}")
        print(f"  - Bytes: {original_bytes:,}")
        
        print(f"\nTokenized text size:")
        print(f"  - Vocabulary size: {len(self.vocab):,}")
        print(f"  - Total tokens: {len(encoded_tokens):,}")
        print(f"  - Bits per token: {bits_per_token}")
        print(f"  - Total bits: {encoded_bits:,}")
        print(f"  - Bytes: {encoded_bytes:,}")
        
        print(f"\nCompression metrics:")
        print(f"  - Compression ratio: {compression_ratio:.2f}x")
        print(f"  - Space saving: {space_saving:.1f}%")
        print(f"  - Average tokens per character: {len(encoded_tokens)/original_chars:.2f}")
    
    def _suggest_vocab_size(self, text: str) -> int:
        """Suggest vocabulary size based on text statistics."""
        words = text.split()
        unique_words = len(set(words))
        total_words = len(words)
        
        suggested_size = int(math.sqrt(unique_words) * math.log(total_words + 1))
        return max(100, round(suggested_size, -2))


In [3]:
import re
from collections import defaultdict, Counter
from typing import List, Dict, Tuple, Set
import numpy as np
import json
import pickle
from pathlib import Path
from tqdm import tqdm

class HindiBPE:
    def __init__(self, vocab_size: int = 5000):
        self.vocab_size = vocab_size
        self.vocab = set()
        self.merge_rules = {}
        self.token_counts = Counter()
        
    def get_stats(self, pairs: List[Tuple[str, str]]) -> Dict[Tuple[str, str], int]:
        """Count frequency of pairs in the current vocabulary"""
        stats = defaultdict(int)
        for pair in pairs:
            stats[pair] += 1
        return stats
    
    def get_pairs(self, word: List[str]) -> List[Tuple[str, str]]:
        """Get all adjacent pairs in a word"""
        return [(word[i], word[i+1]) for i in range(len(word)-1)]
    
    def merge_vocab(self, pair: Tuple[str, str], v_in: List[str]) -> List[str]:
        """Merge a pair of tokens in the vocabulary"""
        v_out = []
        i = 0
        while i < len(v_in):
            if i < len(v_in) - 1 and v_in[i] == pair[0] and v_in[i+1] == pair[1]:
                v_out.append(pair[0] + pair[1])
                i += 2
            else:
                v_out.append(v_in[i])
                i += 1
        return v_out

    def train(self, texts: List[str]):
        
        # Count word frequencies in parallel
        word_freqs = Counter()
        for text in tqdm(texts, desc="Counting words"):
            words = text.split()
            for word in words:
                word = '▁' + word  # Add space marker
                word_freqs[word] += 1
        
        print(f"Found {len(word_freqs)} unique words")        
        # Initialize vocabulary with characters
        for word, freq in word_freqs.items():
            # if freq < min_freq:
            #     continue
            for char in word:
                self.vocab.add(char)
        
        # Convert words to list of characters
        splits = {word: list(word) for word in word_freqs.keys()}
        
        # Main training loop with progress bar
        pbar = tqdm(total=min(self.vocab_size - len(self.vocab), len(word_freqs)), desc="Training BPE")
        while len(self.vocab) < self.vocab_size:
            pairs = defaultdict(int)
            for word, freq in word_freqs.items():
                # if freq < min_freq:
                #     continue
                word_pairs = self.get_pairs(splits[word])
                for pair in word_pairs:
                    pairs[pair] += freq
            
            if not pairs:
                break
                
            best_pair = max(pairs.items(), key=lambda x: x[1])[0]
            self.vocab.add(''.join(best_pair))
            self.merge_rules[best_pair] = ''.join(best_pair)
            
            new_splits = {}
            for word in splits:
                new_splits[word] = self.merge_vocab(best_pair, splits[word])
            splits = new_splits
            pbar.update(1)
        
        pbar.close()
        
        # Update token counts after training
        self._update_token_counts(texts)
            
    def tokenize(self, text: str) -> List[str]:
        """Tokenize text using learned BPE rules"""
        words = text.split()
        tokens = []
        
        for word in words:
            word = '▁' + word  # Add space marker
            current_tokens = list(word)
            
            while True:
                pairs = self.get_pairs(current_tokens)
                if not pairs:
                    break
                    
                # Find mergeable pair
                mergeable = False
                for pair in pairs:
                    if pair in self.merge_rules:
                        current_tokens = self.merge_vocab(pair, current_tokens)
                        mergeable = True
                        break
                
                if not mergeable:
                    break
                    
            tokens.extend(current_tokens)
            
        return tokens
    
    def save(self, path: str):
        """Save the tokenizer to a file"""
        save_dict = {
            'vocab_size': self.vocab_size,
            'vocab': list(self.vocab),
            'merge_rules': {str(k): v for k, v in self.merge_rules.items()},
            'token_counts': dict(self.token_counts)
        }
        
        path = Path(path)
        # Save main model
        with open(path, 'wb') as f:
            pickle.dump(save_dict, f)
            
        # Save token statistics separately as JSON for easy viewing
        stats_path = path.with_suffix('.stats.json')
        stats = {
            'vocab_size': len(self.vocab),
            'token_counts': dict(self.token_counts.most_common(100)),  # Save top 100 tokens
            'total_tokens': sum(self.token_counts.values())
        }
        with open(stats_path, 'w', encoding='utf-8') as f:
            json.dump(stats, f, ensure_ascii=False, indent=2)


    @classmethod
    def load(cls, path: str):
        """Load a saved tokenizer"""
        with open(path, 'rb') as f:
            save_dict = pickle.load(f)
            
        tokenizer = cls(vocab_size=save_dict['vocab_size'])
        tokenizer.vocab = set(save_dict['vocab'])
        tokenizer.merge_rules = {tuple(eval(k)): v for k, v in save_dict['merge_rules'].items()}
        tokenizer.token_counts = Counter(save_dict['token_counts'])
        return tokenizer

    def _update_token_counts(self, texts: List[str]):
        """Update token count statistics"""
        self.token_counts.clear()
        for text in texts:
            tokens = self.tokenize(text)
            self.token_counts.update(tokens)

    def get_token_stats(self) -> Dict:
        """Get token statistics"""
        total_tokens = sum(self.token_counts.values())
        unique_tokens = len(self.token_counts)
        most_common = self.token_counts.most_common(20)
        
        return {
            'total_tokens': total_tokens,
            'unique_tokens': unique_tokens,
            'vocab_size': len(self.vocab),
            'most_common_tokens': [
                {'token': token, 'count': count, 'percentage': count/total_tokens*100}
                for token, count in most_common
            ]
        }


def load_text_file(file_path: str, encoding: str = 'utf-8') -> List[str]:
    """Load text from a file and return a list of lines."""
    try:
        with open(file_path, 'r', encoding=encoding) as file:
            lines = [line.strip() for line in file.readlines()]
            lines = [line for line in lines if line]
            return lines
    except UnicodeDecodeError:
        print(f"Error: Could not decode file with {encoding} encoding.")
        print("Try using a different encoding (e.g., 'utf-8-sig' for files with BOM)")
        return []
    except FileNotFoundError:
        print(f"Error: File not found at {file_path}")
        return []
    except Exception as e:
        print(f"Error loading file: {str(e)}")
        return []

def calculate_compression_ratio(original_texts: List[str], tokenized_texts: List[List[str]]) -> float:
    """Calculate compression ratio (original characters / tokens)"""
    total_chars = sum(len(text) for text in original_texts)
    total_tokens = sum(len(tokens) for tokens in tokenized_texts)
    return total_chars / total_tokens

In [None]:
# # Load text from file
# file_path = r"C:\Users\vasal\Study\TSAI\TSAI-11\hin_news_2010_30K\hin_news_2010_30K-sentences.txt"  # Replace with your file path
# texts = load_text_file(file_path)

# # Initialize and train BPE
# bpe = HindiBPE(vocab_size=4500)
# bpe.train(texts)

# # Test tokenization and compression ratio
# tokenized_texts = [bpe.tokenize(text) for text in texts]
# compression_ratio = calculate_compression_ratio(texts, tokenized_texts)

# # Print results
# print(f"Loaded {len(texts)} lines of text")
# print(f"Vocabulary size: {len(bpe.vocab)}")
# print(f"Compression ratio: {compression_ratio:.2f}")

In [4]:
file_path = r"C:\Users\vasal\Study\TSAI\TSAI-11\hin_news_2010_30K\hin_news_2010_30K-sentences.txt"
texts = load_text_file(file_path)

# Train tokenizer
bpe = HindiBPE(vocab_size=4500)
bpe.train(texts)  # Use 4 workers for parallel processing

# Get and print token statistics
stats = bpe.get_token_stats()
print("\nToken Statistics:")
print(f"Total tokens processed: {stats['total_tokens']:,}")
print(f"Unique tokens: {stats['unique_tokens']:,}")
print(f"Vocabulary size: {stats['vocab_size']:,}")

print("\nMost common tokens:")
for token_info in stats['most_common_tokens'][:10]:  # Show top 10
    print(f"{token_info['token']}: {token_info['count']:,} ({token_info['percentage']:.2f}%)")

# Save the tokenizer
bpe.save("hindi_bpe_model.pkl")
print("\nTokenizer saved to hindi_bpe_model.pkl")
print("Token statistics saved to hindi_bpe_model.stats.json")

# Example of loading the saved tokenizer
loaded_bpe = HindiBPE.load("hindi_bpe_model.pkl")
print("\nSuccessfully loaded the saved tokenizer")

Counting words: 100%|██████████| 30000/30000 [00:00<00:00, 172174.08it/s]


Found 74835 unique words


Training BPE: 100%|██████████| 4385/4385 [15:28<00:00,  4.72it/s]



Token Statistics:
Total tokens processed: 1,278,670
Unique tokens: 2,743
Vocabulary size: 4,500

Most common tokens:
ं: 32,111 (2.51%)
▁।: 29,497 (2.31%)
▁के: 26,057 (2.04%)
र: 23,741 (1.86%)
ा: 22,920 (1.79%)
▁मे: 20,373 (1.59%)
े: 20,052 (1.57%)
▁की: 16,136 (1.26%)
ी: 15,424 (1.21%)
▁है: 14,084 (1.10%)

Tokenizer saved to hindi_bpe_model.pkl
Token statistics saved to hindi_bpe_model.stats.json

Successfully loaded the saved tokenizer


In [5]:
tokenized_texts = [bpe.tokenize(text) for text in texts]
compression_ratio = calculate_compression_ratio(texts, tokenized_texts)

In [6]:
compression_ratio

2.514001266941431