In [None]:
# Mount Google Drive into the Colab runtime at /content/drive so the CSV
# files under /content/drive/MyDrive can be read by later cells.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Dataset locations on the mounted Drive (loaded in the execution cell):
#   /content/drive/MyDrive/NLP/train.csv
#   /content/drive/MyDrive/NLP/valid.csv

In [None]:
import pandas as pd
import unicodedata
import regex
import json
import os

In [None]:
class TwoStageUrduTokenizer:
    """Byte-level BPE tokenizer trained in two stages.

    Stage 1 ("subword") learns merges inside regex pre-tokenization chunks,
    so no stage-1 token spans a chunk boundary. Stage 2 ("superword")
    flattens the chunks into a single sequence and keeps learning merges,
    which may now span chunk boundaries.

    Attributes:
        vocab: dict mapping token id -> the bytes that token stands for.
            Ids 0-255 are the raw byte values; learned tokens start at 256.
        merges: dict mapping (left_id, right_id) -> merged token id. A lower
            merged id means the merge was learned earlier.
        regex_pattern: pre-tokenization pattern. It matches, in order, a run
            of Arabic-script characters, a run of numeric characters, a run
            of other non-space characters (each with an optional single
            leading space), or a run of whitespace.
        compiled_regex: compiled form of ``regex_pattern``.

    Note:
        ``encode`` works on the raw UTF-8 byte stream and does not
        pre-tokenize, so a stage-1 merge is applied wherever its pair
        appears, including across what would have been a chunk boundary
        during training.
    """

    def __init__(self):
        self.vocab = {}
        self.merges = {}

        self.regex_pattern = r""" ?\p{Arabic}+| ?\p{N}+| ?[^\s\p{Arabic}\p{N}]+|\s+"""

        self.compiled_regex = regex.compile(self.regex_pattern)

    def _get_stats(self, ids, counts=None):
        """Count adjacent id pairs in ``ids``.

        Args:
            ids: sequence of token ids.
            counts: optional dict to accumulate into (updated in place).
                A fresh dict is used when omitted.

        Returns:
            Dict mapping (left_id, right_id) -> number of occurrences.
        """
        counts = {} if counts is None else counts
        for pair in zip(ids, ids[1:]):
            counts[pair] = counts.get(pair, 0) + 1
        return counts

    def _merge_vocab(self, pair, idx, ids):
        """Return a copy of ``ids`` with each occurrence of ``pair`` replaced by ``idx``.

        The scan is left to right and non-overlapping, so ``[a, a, a]`` with
        pair ``(a, a)`` becomes ``[idx, a]``.
        """
        new_ids = []
        i = 0
        last = len(ids) - 1
        while i < len(ids):
            if i < last and ids[i] == pair[0] and ids[i + 1] == pair[1]:
                new_ids.append(idx)
                i += 2
            else:
                new_ids.append(ids[i])
                i += 1
        return new_ids

    def _learn_merges(self, sequences, limit):
        """Learn up to ``limit`` new merges over ``sequences``.

        Pairs are counted inside each sequence only, never across two
        sequences. ``self.merges`` and ``self.vocab`` are updated in place.

        Args:
            sequences: list of id lists.
            limit: maximum number of new merges to learn.

        Returns:
            The sequences with every selected merge applied.
        """
        learned = 0
        while learned < limit:
            stats = {}
            for seq in sequences:
                self._get_stats(seq, stats)
            if not stats:
                break
            # Ties are resolved in favour of the pair seen first.
            pair = max(stats, key=stats.get)
            idx = self.merges.get(pair)
            if idx is None:
                # Allocate from the vocab size so ids stay unique.
                idx = len(self.vocab)
                self.merges[pair] = idx
                self.vocab[idx] = self.vocab[pair[0]] + self.vocab[pair[1]]
                learned += 1
            # else: the pair was merged in an earlier stage and has shown up
            # again (e.g. across a former chunk boundary). Re-apply its
            # existing id instead of registering a second id for it; the
            # sequence still shrinks, so the loop terminates.
            sequences = [self._merge_vocab(pair, idx, seq) for seq in sequences]
        return sequences

    def train(self, text, vocab_size=5000, transition_pct=0.9):
        """Train the tokenizer on ``text``, discarding any previous state.

        Args:
            text: training corpus as one string.
            vocab_size: target vocabulary size including the 256 byte
                tokens. Values below 256 result in no merges.
            transition_pct: fraction of the merge budget spent in stage 1
                (within-chunk merges); the rest goes to stage 2
                (cross-chunk merges). Must lie in [0, 1].

        Raises:
            ValueError: if ``transition_pct`` is outside [0, 1].
        """
        if not 0.0 <= transition_pct <= 1.0:
            raise ValueError(
                f"transition_pct must be between 0 and 1, got {transition_pct!r}"
            )

        print(f"--- Training (Vocab: {vocab_size} | Split: {transition_pct}) ---")
        self.vocab = {idx: bytes([idx]) for idx in range(256)}
        self.merges = {}

        # Subword: split into chunks so stage-1 merges stay inside a chunk.
        text_chunks = self.compiled_regex.findall(text)
        ids_list = [list(chunk.encode("utf-8")) for chunk in text_chunks]

        num_merges = max(vocab_size - 256, 0)
        stage_1_limit = int(num_merges * transition_pct)
        stage_2_limit = num_merges - stage_1_limit

        print(f"Stage 1: Learning {stage_1_limit} subword merges...")
        ids_list = self._learn_merges(ids_list, stage_1_limit)

        # Superword: flatten the chunks so merges may span chunk boundaries.
        print(f"Stage 2: Learning {stage_2_limit} superword merges...")
        flat_ids = [token for chunk in ids_list for token in chunk]
        self._learn_merges([flat_ids], stage_2_limit)

        print("Training Complete.")

    def encode(self, text):
        """Encode ``text`` into a list of token ids.

        Starts from the UTF-8 bytes of ``text`` and repeatedly applies the
        earliest-learned merge whose pair is present, until none applies.
        """
        ids = list(text.encode("utf-8"))
        while True:
            best_pair = min(
                (p for p in zip(ids, ids[1:]) if p in self.merges),
                key=self.merges.__getitem__,
                default=None,
            )
            if best_pair is None:
                break
            ids = self._merge_vocab(best_pair, self.merges[best_pair], ids)
        return ids

    def decode(self, ids):
        """Decode token ids back into text.

        Byte sequences that are not valid UTF-8 (for example a token list
        cut in the middle of a character) are decoded with the replacement
        character rather than raising.
        """
        return b"".join(self.vocab[i] for i in ids).decode("utf-8", errors="replace")


In [None]:
# DATA CLEANING
def get_clean_corpus(csv_path):
    """Load a CSV and return its Urdu text rows as one newline-joined string.

    The 'headline' column is used when present, otherwise the first column.
    Each value is cast to str, NFKC-normalized and stripped; only values
    containing at least one character in U+0600-U+06FF are kept.

    Any exception raised along the way is printed and an empty string is
    returned instead.
    """
    print(f"Loading and normalizing {csv_path}...")
    try:
        frame = pd.read_csv(csv_path)
        if 'headline' in frame.columns:
            column = 'headline'
        else:
            column = frame.columns[0]
        entries = frame[column].astype(str).tolist()

        has_urdu = regex.compile(r'[\u0600-\u06ff]').search
        # NFKC normalization, then trim surrounding whitespace.
        normalized = (
            unicodedata.normalize('NFKC', entry).strip() for entry in entries
        )
        return "\n".join(row for row in normalized if has_urdu(row))
    except Exception as err:
        print(f"Error: {err}")
        return ""

In [None]:
# execution
# Load the normalized training and validation corpora from the mounted Drive.
# get_clean_corpus returns "" if loading fails.
train_text = get_clean_corpus('/content/drive/MyDrive/NLP/train.csv')
valid_text = get_clean_corpus('/content/drive/MyDrive/NLP/valid.csv')

# Disabled fallback: a tiny in-memory sample corpus to use when the loaded
# training corpus is empty.
# if not train_text:
#     train_text = "سچن تیندولکر کا وراٹ کوہلی کو مشورہ، یہ غلطی کبھی مت کرنا۔ " * 500
#     valid_text = "سچن تیندولکر کا وراٹ کوہلی"

# Baseline: transition_pct=1.0 spends the whole merge budget in stage 1
# (subword merges only, no superword merges).
print("\n>>> TRAINING BASELINE MODEL <<<")
baseline_tok = TwoStageUrduTokenizer()
baseline_tok.train(train_text, vocab_size=2000, transition_pct=1.0)

# IST: same vocabulary size, but transition_pct=0.8 leaves the remaining
# part of the merge budget for stage 2 (superword merges).
print("\n>>> TRAINING IST MODEL <<<")
ist_tok = TwoStageUrduTokenizer()
ist_tok.train(train_text, vocab_size=2000, transition_pct=0.8)

# fertility score
def get_fertility(model, text):
    """Return the average number of tokens per whitespace-separated word.

    Computed as len(model.encode(text)) divided by the number of words from
    text.split(). Returns 0 when the text contains no words.
    """
    word_count = len(text.split())
    if word_count == 0:
        return 0
    token_count = len(model.encode(text))
    return token_count / word_count

# Compare tokens-per-word of both tokenizers on the validation corpus.
print("\n================ RESULTS ================")
base_score = get_fertility(baseline_tok, valid_text)
ist_score = get_fertility(ist_tok, valid_text)

print(f"Baseline Fertility: {base_score:.4f}")
print(f"IST Fertility:      {ist_score:.4f}")

if base_score > 0:
    # Relative reduction in tokens per word; positive means IST is more compact.
    imp = ((base_score - ist_score) / base_score) * 100
    print(f"Improvement:        {imp:.2f}%")
    # Only claim an improvement when the numbers actually show one.
    if imp > 0:
        print("Interpretation: IST represents the text using fewer tokens.")
    elif imp < 0:
        print("Interpretation: IST represents the text using more tokens.")
    else:
        print("Interpretation: IST and the baseline use the same number of tokens.")
else:
    # get_fertility returns 0 when the validation text has no words.
    print("Improvement:        n/a (baseline fertility is 0; is the validation text empty?)")
print("=========================================")

Loading and normalizing /content/drive/MyDrive/NLP/train.csv...
Loading and normalizing /content/drive/MyDrive/NLP/valid.csv...

>>> TRAINING BASELINE MODEL <<<
--- Training (Vocab: 2000 | Split: 1.0) ---
Stage 1: Learning 1744 subword merges...
Stage 2: Learning 0 superword merges...
Training Complete.

>>> TRAINING IST MODEL <<<
--- Training (Vocab: 2000 | Split: 0.8) ---
Stage 1: Learning 1395 subword merges...
Stage 2: Learning 349 superword merges...
Training Complete.

Baseline Fertility: 1.5082
IST Fertility:      1.4014
Improvement:        7.08%
Interpretation: IST represents the text using fewer tokens.
