In [2]:
sampel_kannada_txt = """ಡಿಕೋಡರ್ ಒಂದು ಎಂಬೆಡಿಂಗ್ ಲೇಯರ್ ಅನ್ನು ಒಳಗೊಂಡಿರುತ್ತದೆ, ನಂತರ ಬಹು ಡಿಕೋಡರ್ ಲೇಯರ್‌ಗಳು, ನಂತರ ಅನ್-ಎಂಬೆಡಿಂಗ್ ಲೇಯರ್.
ಪ್ರತಿ ಡಿಕೋಡರ್ ಮೂರು ಪ್ರಮುಖ ಘಟಕಗಳನ್ನು ಒಳಗೊಂಡಿದೆ: ಒಂದು ಸಾಂದರ್ಭಿಕವಾಗಿ ಮುಖವಾಡದ ಸ್ವಯಂ-ಗಮನ ಯಾಂತ್ರಿಕತೆ, ಅಡ್ಡ-ಗಮನ ಕಾರ್ಯವಿಧಾನ ಮತ್ತು ಫೀಡ್-ಫಾರ್ವರ್ಡ್ ನರಮಂಡಲ. ಡಿಕೋಡರ್ ಎನ್‌ಕೋಡರ್‌ನಂತೆಯೇ ಕಾರ್ಯನಿರ್ವಹಿಸುತ್ತದೆ, ಆದರೆ ಹೆಚ್ಚುವರಿ ಗಮನ ಕಾರ್ಯವಿಧಾನವನ್ನು ಸೇರಿಸಲಾಗುತ್ತದೆ, ಅದು ಎನ್‌ಕೋಡರ್‌ಗಳಿಂದ ಉತ್ಪತ್ತಿಯಾಗುವ ಎನ್‌ಕೋಡಿಂಗ್‌ಗಳಿಂದ ಸಂಬಂಧಿತ ಮಾಹಿತಿಯನ್ನು ಸೆಳೆಯುತ್ತದೆ. ಈ ಕಾರ್ಯವಿಧಾನವನ್ನು ಎನ್‌ಕೋಡರ್-ಡಿಕೋಡರ್ ಗಮನ ಎಂದೂ ಕರೆಯಬಹುದು
"""

In [28]:
from tqdm import tqdm
from collections import Counter

class BytePairEncoding:
  def __init__(self, txt: str):

    #getting all characters in list in sorted order
    self.chars = sorted(list(set(txt)))

    #dictionary for strings to integers
    self.stoi = {ch: i for i, ch in enumerate(self.chars)}

    #dictionary for integers to strings
    self.itos = {i: ch for i, ch in enumerate(self.chars)}

    # inital form of text to integers in terms of list
    self.data = [self.stoi[c] for c in txt]

    # Statistics tracking
    self.stats = {
        "vocab_sizes": [len(self.chars)],
        "data_sizes": [len(self.data)],
        "compression_ratios": [1.0],
        "merge_counts": [],
        "tokens_created": [],
        "max_token_lengths": [1],
    }

    #length of characters of input txt
    self.max_token_length = 0
    self.orginal_txt_length = len(self.data)

  def get_stats(self):
    "counting pair of characters"
    counts = Counter()
    for pair in zip(self.data, self.data[1:]):
      pair = (int(pair[0]), int(pair[1]))
      counts[pair] += 1
    return counts

  def merge_pair_encoding(self, pair, idx):
    "replace tokens into newly defined token"
    newids = []
    i = 0
    while i < len(self.data):
      if i < len(self.data) - 1 and self.data[i] == pair[0] and self.data[i+1] == pair[1]:
        newids.append(idx)
        i += 2
      else:
        newids.append(self.data[i])
        i += 1
    return newids

  def add_new_encode_pair(self, pair: tuple[int, int]) -> int:
      """Add a new token to vocabulary dictionary"""
      pair_str = self.itos[pair[0]] + self.itos[pair[1]]
      next_idx = len(self.itos)
      self.stoi[pair_str] = next_idx
      self.itos[next_idx] = pair_str

      # Update max token length
      self.max_token_length = max(self.max_token_length, len(pair_str))
      return next_idx

  def update_stats(self, merge_count: int, new_token: str):
    """Record statistics after each merge operation"""
    self.stats["vocab_sizes"].append(len(self.itos))
    self.stats["data_sizes"].append(len(self.data))
    self.stats["compression_ratios"].append(self.orginal_txt_length / len(self.data))
    self.stats["merge_counts"].append(merge_count)
    self.stats["tokens_created"].append(new_token)


  def merge_byte_pair(self) -> tuple[int, str, int]:
      """
      Merge the top byte pair

      Returns:
          tuple: (new_token_id, new_token_str, merge_count) or None if no more pairs to merge
      """
      # Get pair frequencies
      stats = self.get_stats()
      if not stats:  # No more pairs to merge
          return None

      # Find most frequent pair
      (top_pair, count) = max(stats.items(), key=lambda x: x[1])

      # Add new token to vocabulary
      new_idx = self.add_new_encode_pair(top_pair)

      # Replace pairs in data
      self.data = self.merge_pair_encoding(top_pair, new_idx)

      # Update statistics
      self.update_stats(count, self.itos[new_idx])

      return new_idx, self.itos[new_idx], count

  def print_progress(self, iteration: int, new_token: str, merge_count: int):
    """
    Print training progress in text format

    Args:
        iteration: Current iteration number
        new_token: Newly created token
        merge_count: Number of merges for this token
    """
    print(f"\nIteration {iteration:,}")
    print(f"Current vocabulary size: {len(self.itos):,}")
    print(f"Current data size: {len(self.data):,}")
    print(f"Current compression ratio: {self.stats['compression_ratios'][-1]:.2f}")
    print("-" * 80)

  def encode_to_vocab_size(
        self,
        target_vocab_size: int,
        print_interval: int = 100,
    ) -> None:
        """
        Perform BPE encoding until reaching target vocabulary size

        Args:
            target_vocab_size: Maximum vocabulary size to reach
            print_interval: How often to print progress (None for no printing)
        """
        pbar = tqdm(
            total=target_vocab_size,
            desc=f"Encoding byte pairs",
            initial=len(self.chars),
            position=0,
            leave=True,
        )

        iteration = 0
        while len(self.itos) < target_vocab_size:
            # Train one iteration
            result = self.merge_byte_pair()
            if result is None:  # No more pairs to merge
                break

            new_idx, new_token, merge_count = result
            iteration += 1

            # Update progress bar
            pbar.update(1)

            # Print progress at intervals if requested
            if print_interval and iteration % print_interval == 0:
                self.print_progress(iteration, new_token, merge_count)

        pbar.close()

        # Final statistics
        print(f"\nTraining completed after {iteration:,} iterations")
        print(f"Final vocabulary size: {len(self.itos):,}")

  def encode(self, text: str) -> list[int]:
    """Convert text to token indices"""
    return [self.stoi[c] for c in text]

  def decode(self, token_ids: list[int]) -> str:
    """Convert token indices back to text"""
    return "".join(self.itos[idx] for idx in token_ids)

In [29]:
sample_enc = BytePairEncoding(sampel_kannada_txt)
sample_enc.encode_to_vocab_size(target_vocab_size = 300, print_interval=10)

Encoding byte pairs: 100%|██████████| 300/300 [00:00<00:00, 2176.54it/s]


Iteration 10
Current vocabulary size: 57
Current data size: 392
Current compression ratio: 1.24
--------------------------------------------------------------------------------

Iteration 20
Current vocabulary size: 67
Current data size: 344
Current compression ratio: 1.42
--------------------------------------------------------------------------------

Iteration 30
Current vocabulary size: 77
Current data size: 307
Current compression ratio: 1.59
--------------------------------------------------------------------------------

Iteration 40
Current vocabulary size: 87
Current data size: 277
Current compression ratio: 1.76
--------------------------------------------------------------------------------

Iteration 50
Current vocabulary size: 97
Current data size: 250
Current compression ratio: 1.95
--------------------------------------------------------------------------------

Iteration 60
Current vocabulary size: 107
Current data size: 230
Current compression ratio: 2.12
------------




In [20]:
len(list(set(sampel_kannada_txt)))

47

In [None]:
chars = sorted(list(set(sampel_kannada_txt)))
chars

In [None]:
stoi = {ch: i for i, ch in enumerate(chars)}
stoi

In [None]:
data = [stoi[c] for c in sampel_kannada_txt]
data

In [4]:
for ele in sampel_kannada_txt[:10]:
  print(f"Kannada letter is {ele} and its ordinal is {ord(ele)}")

Kannada letter is ಡ and its ordinal is 3233
Kannada letter is ಿ and its ordinal is 3263
Kannada letter is ಕ and its ordinal is 3221
Kannada letter is ೋ and its ordinal is 3275
Kannada letter is ಡ and its ordinal is 3233
Kannada letter is ರ and its ordinal is 3248
Kannada letter is ್ and its ordinal is 3277
Kannada letter is   and its ordinal is 32
Kannada letter is ಒ and its ordinal is 3218
Kannada letter is ಂ and its ordinal is 3202


In [7]:
tokens = [ele for ele in sampel_kannada_txt]

In [16]:
def get_stats(ids):
    counts = {}
    for pair in zip(ids, ids[1:]):
        counts[pair] = counts.get(pair, 0) + 1
    return counts

def merge(ids, pair, idx):
  newids = []
  i = 0
  while i < len(ids):
    if i < len(ids) - 1 and ids[i] == pair[0] and ids[i+1] == pair[1]:
      newids.append(idx)
      i += 2
    else:
      newids.append(ids[i])
      i += 1
  return newids

# ---
vocab_size = 500 # the desired final vocabulary size
num_merges = vocab_size - 256
ids = list(tokens) # copy so we don't destroy the original list

merges = {} # (int, int) -> int
for i in range(num_merges):
  stats = get_stats(ids)
  pair = max(stats, key=stats.get)
  idx = 256 + i
  print(f"merging {pair} into a new token {idx}")
  ids = merge(ids, pair, idx)
  merges[pair] = idx

merging ('ರ', '್') into a new token 256
merging ('ನ', '್') into a new token 257
merging ('ು', ' ') into a new token 258
merging ('ಡ', 'ಿ') into a new token 259
merging ('ಕ', 'ೋ') into a new token 260
merging ('ತ', '್') into a new token 261
merging (' ', 'ಮ') into a new token 262
merging (260, 'ಡ') into a new token 263
merging (263, 256) into a new token 264
merging (' ', 'ಅ') into a new token 265
merging (257, 'ನ') into a new token 266
merging ('ಾ', 'ಗ') into a new token 267
merging (266, 258) into a new token 268
merging ('ಟ', '್') into a new token 269
merging (261, 'ತ') into a new token 270
merging ('ದ', 'ೆ') into a new token 271
merging ('ಂ', 'ದ') into a new token 272
merging (',', ' ') into a new token 273
merging ('ರ', 'ಿ') into a new token 274
merging (257, '\u200c') into a new token 275
merging (259, 264) into a new token 276
merging ('ು', 270) into a new token 277
merging (277, 271) into a new token 278
merging ('ಗ', 'ಳ') into a new token 279
merging ('್', 'ರ') into a new token

In [17]:
print("tokens length:", len(tokens))
print("ids length:", len(ids))
print(f"compression ratio: {len(tokens) / len(ids):.2f}X")

tokens length: 1252
ids length: 363
compression ratio: 3.45X


## Custome Byte pair encoding

In [2]:
def get_vocab(text):
    """Generate vocabulary from input text."""
    vocab = Counter()
    for line in text:
        words = line.strip().split()
        for word in words:
            # Add space between characters and append a special end-of-word token
            word = " ".join(list(word)) + " <endoftext>"
            vocab[word] += 1
    return vocab

txt = "hello I am testing this function"
get_vocab()

Counter({'h <endoftext>': 2,
         'e <endoftext>': 2,
         'l <endoftext>': 2,
         'o <endoftext>': 2,
         'I <endoftext>': 1,
         'a <endoftext>': 1,
         'm <endoftext>': 1,
         't <endoftext>': 4,
         's <endoftext>': 2,
         'i <endoftext>': 3,
         'n <endoftext>': 3,
         'g <endoftext>': 1,
         'f <endoftext>': 1,
         'u <endoftext>': 1,
         'c <endoftext>': 1})

In [1]:
from collections import Counter, defaultdict
import re

def get_vocab(text):
    """Generate vocabulary from input text."""
    vocab = Counter()
    for line in text:
        words = line.strip().split()
        for word in words:
            # Add space between characters and append a special end-of-word token
            word = " ".join(list(word)) + " <endoftext>"
            vocab[word] += 1
    return vocab

def get_stats(vocab):
    """Calculate pair frequencies in the vocabulary."""
    pairs = defaultdict(int)
    for word, freq in vocab.items():
        symbols = word.split()
        for i in range(len(symbols) - 1):
            pairs[(symbols[i], symbols[i + 1])] += freq
    return pairs

def merge_vocab(pair, vocab):
    """Merge the most frequent pair in the vocabulary."""
    new_vocab = {}
    bigram = re.escape(" ".join(pair))  # Escape special characters
    pattern = re.compile(r"(?<!\S)" + bigram + r"(?!\S)")  # Match whole bigram

    for word in vocab:
        # Replace the bigram with the merged token
        new_word = pattern.sub("".join(pair), word)
        new_vocab[new_word] = vocab[word]
    return new_vocab

def train_bpe(text, num_merges):
    """Train a Byte Pair Encoding model."""
    vocab = get_vocab(text)
    for i in range(num_merges):
        pairs = get_stats(vocab)
        if not pairs:
            break
        best_pair = max(pairs, key=pairs.get)  # Find the most frequent pair
        vocab = merge_vocab(best_pair, vocab)
        print(f"Step {i + 1}: Merged pair {best_pair}")
    return vocab

# Example usage
if __name__ == "__main__":
    training_data = [
        "low low lower lowest",
        "newer newer newer new",
        "widest wider wide"
    ]
    num_merges = 10
    final_vocab = train_bpe(training_data, num_merges)

    print("\nFinal Vocabulary:")
    for word, freq in final_vocab.items():
        print(f"{word}: {freq}")


Step 1: Merged pair ('w', 'e')
Step 2: Merged pair ('r', '</w>')
Step 3: Merged pair ('l', 'o')
Step 4: Merged pair ('we', 'r</w>')
Step 5: Merged pair ('n', 'e')
Step 6: Merged pair ('w', '</w>')
Step 7: Merged pair ('ne', 'wer</w>')
Step 8: Merged pair ('w', 'i')
Step 9: Merged pair ('wi', 'd')
Step 10: Merged pair ('wid', 'e')

Final Vocabulary:
lo w</w>: 2
lo wer</w>: 1
lo we s t </w>: 1
newer</w>: 3
ne w</w>: 1
wide s t </w>: 1
wide r</w>: 1
wide </w>: 1
