In [1]:
from pathlib import Path
import csv, re, math, unicodedata
from collections import Counter, defaultdict
from nltk.tokenize import RegexpTokenizer
import numpy as np

DATA_DIR = Path("csv-datasets")
LOWERCASE = True
LAMBDAS = (0.6, 0.3, 0.1)
MIN_FREQ = 3

In [2]:
# combines base characters and combining marks into single characters
def nfc(s): 
    return unicodedata.normalize("NFC", s)

# replaces multiple whitespaces char with a single space and removes leading and trailing whitespaces
def collapse_ws(s): 
    return re.sub(r"\s+", " ", s).strip()

SPLIT_EN = re.compile(r"(?<=[.!?])\s+")  # after . ! ?
SPLIT_HI = re.compile(r"(?:।|\?|!)\s+")  # Hindi danda ? !

# split into sentences (EN: . ? !, HI: । ? !)
def sent_split(text, lang):
    rx = SPLIT_EN if lang == "en" else SPLIT_HI
    return [s for s in rx.split(text) if s.strip()]

tok_en = RegexpTokenizer(r"[A-Za-z0-9]+'[A-Za-z0-9]+|[\w]+")
tok_hi = RegexpTokenizer(r"[\w]+") 

# tokenize each sentence
def word_tokens(text, lang):
    return (tok_en.tokenize(text) if lang == "en" else tok_hi.tokenize(text))

def normalize_text(text, lowercase):
    t = collapse_ws(nfc(text))
    return t.lower() if lowercase else t

In [3]:
def read_texts(csv_path) :
    texts = []
    with csv_path.open("r", encoding="utf-8", newline="") as f:
        rdr = csv.DictReader(f)
        for r in rdr:
            texts.append(r["Text"])
    return texts

def make_lm_sentences(texts, lang, lowercase):
    out = []
    for txt in texts:
        t = normalize_text(txt, lowercase)
        for s in sent_split(t, lang):
            toks = word_tokens(s, lang)
            if toks:
                out.append(["<s>", *toks, "</s>"])
    return out

In [4]:
def build_vocab(sentences, min_freq):
    cnt = Counter()
    for s in sentences:
        cnt.update(s)
    tok2Id = {"<unk>": 0, "<s>": 1, "</s>": 2}
    for tok, c in cnt.most_common():
        if tok in tok2Id:
            continue
        if c >= min_freq:
            tok2Id[tok] = len(tok2Id)
    return tok2Id

def map_to_ids(sentences, tok2Id):
    uid = tok2Id["<unk>"]
    return [[tok2Id.get(t, uid) for t in s] for s in sentences]

def count_ngrams(sent_ids):
    C1, C2, C3 = Counter(), Counter(), Counter()
    T = 0
    for seq in sent_ids:
        T += len(seq)
        for w in seq: # unigrams
            C1[w] += 1
        for i in range(1, len(seq)): # bigrams
            C2[(seq[i-1], seq[i])] += 1
        for i in range(2, len(seq)): # trigrams
            C3[(seq[i-2], seq[i-1], seq[i])] += 1
    return C1, C2, C3, T

In [5]:
def pml_uni(C1, T, w):
    return C1.get(w, 0) / T if T > 0 else 0.0

def pml_bi(C1, C2, v, w):
    den = C1.get(v, 0)
    return 0.0 if den == 0 else C2.get((v, w), 0) / den

def pml_tri(C2, C3, u, v, w):
    den = C2.get((u, v), 0)
    return 0.0 if den == 0 else C3.get((u, v, w), 0) / den

def logp(C1, C2, C3, T, w, u, v, lambdas):
    l3, l2, l1 = lambdas
    p3 = pml_tri(C2, C3, u, v, w) if (u is not None and v is not None) else 0.0
    p2 = pml_bi(C1, C2, v, w) if (v is not None) else 0.0
    p1 = pml_uni(C1, T, w)
    p = l3*p3 + l2*p2 + l1*p1
    return math.log(p if p > 0.0 else 1e-12)

def perplexity(C1, C2, C3, T, test_ids, lambdas):
    total_logp = 0.0
    total_tok = 0
    for seq in test_ids:
        for i, w in enumerate(seq):
            u = seq[i-2] if i >= 2 else None
            v = seq[i-1] if i >= 1 else None
            total_logp += logp(C1, C2, C3, T, w, u, v, lambdas)
            total_tok  += 1
    ppl = math.exp(- total_logp / max(1, total_tok))
    return {"tokens": total_tok, "ppl": ppl}

In [6]:
def csv_path(lang, size):
    sub = "english" if lang == "en" else "hindi"
    return DATA_DIR / sub / f"{sub}_{size}.csv"

def run_size(lang, size, lowercase = LOWERCASE, min_freq = MIN_FREQ, lambdas = LAMBDAS):
    assert abs(sum(lambdas) - 1.0) < 1e-9, "λ must sum to 1"
    train_csv = csv_path(lang, size)
    test_csv  = csv_path(lang, "test")

    # 1) Read CSVs
    train_texts = read_texts(train_csv)
    test_texts  = read_texts(test_csv)

    # 2) Build LM sentences
    train_sents = make_lm_sentences(train_texts, lang, lowercase)
    test_sents  = make_lm_sentences(test_texts,  lang, lowercase)

    # 3) Vocab from train
    tok2id = build_vocab(train_sents, min_freq=min_freq)
    for sp in ["<unk>", "<s>", "</s>"]:
        if sp not in tok2id: tok2id[sp] = len(tok2id)

    # 4) Map to ids
    train_ids = map_to_ids(train_sents, tok2id)
    test_ids  = map_to_ids(test_sents,  tok2id)

    # 5) Count n-grams
    C1, C2, C3, T = count_ngrams(train_ids)

    # 6) Perplexity on test with simple interpolation
    ppl_info = perplexity(C1, C2, C3, T, test_ids, lambdas)

    print(f"[{"English" if lang == 'en' else "Hindi"}-{size}] "
          f"V={len(tok2id)} "
          f"TrainTok={T} "
          f"TestTok={ppl_info['tokens']} "
          f"Perplexity={ppl_info['ppl']:.3f}"
        )

    return {
        "lang": lang, "size": size,
        "vocab_size": len(tok2id),
        "train_tokens": T,
        "test_tokens": ppl_info["tokens"],
        "ppl": ppl_info["ppl"],
        "lambdas": lambdas,
        "min_freq": min_freq,
        "lowercase": lowercase
    }

In [7]:
def run_language_all_sizes(lang: str):
    results = []
    for size in ["2500", "15000", "30000"]:
        results.append(run_size(lang, size))
    return results

print("Perplexity Evaluation:\n")
en_results = run_language_all_sizes("en")
print()
hi_results = run_language_all_sizes("hi")

Perplexity Evaluation:

[English-2500] V=13920 TrainTok=730301 TestTok=3315101 Perplexity=460.558
[English-15000] V=46472 TrainTok=6681508 TestTok=3315101 Perplexity=26.823
[English-30000] V=68715 TrainTok=12469632 TestTok=3315101 Perplexity=31.740

[Hindi-2500] V=1533 TrainTok=298135 TestTok=3568123 Perplexity=26.138
[Hindi-15000] V=2840 TrainTok=1792596 TestTok=3568123 Perplexity=23.818
[Hindi-30000] V=3216 TrainTok=3584631 TestTok=3568123 Perplexity=20.155


In [8]:
# Naive Bayes classification

def read_rows_single_label(csv_file: Path):
    rows = []
    with Path(csv_file).open("r", encoding="utf-8", newline="") as f:
        rdr = csv.DictReader(f)
        assert rdr.fieldnames, f"No headers in {csv_file}"
        k_cat = rdr.fieldnames[0]
        k_txt = rdr.fieldnames[1]

        for r in rdr:
            lab = (r.get(k_cat) or "").strip().lower()
            txt = r.get(k_txt) or ""
            if lab:
                rows.append((lab, txt))
    return rows

def doc_tokens(text: str, lang: str, lowercase: bool=True):
    t = normalize_text(text, lowercase)
    return word_tokens(t, lang)

# Unigram vocab (by document frequency) & featurizer
def build_unigram_vocab(train_docs, min_df=1):
    df = Counter()
    for toks in train_docs:
        for t in set(toks):  
            df[t] += 1
    feat2id = {}
    for t, c in sorted(df.items(), key=lambda x: (-x[1], x[0])):  # stable ids
        if c >= min_df:
            feat2id[t] = len(feat2id)
    return feat2id

def featurize_unigrams(docs, feat2id):
    X = []
    for toks in docs:
        dd = defaultdict(int)
        for t in toks:
            i = feat2id.get(t)
            if i is not None: 
                dd[i] += 1
        X.append(dd)
    return X

In [9]:
def fit_nb(X_train, y_train, C, F):
    N = len(X_train)

    class_docs = np.bincount(np.array(y_train, dtype=np.int64), minlength=C)
    logpi = np.log((class_docs) / (N))

    counts = np.zeros((C, F), dtype=np.int64)
    totals = np.zeros(C, dtype=np.int64)
    for x, c in zip(X_train, y_train):
        s = sum(x.values())
        totals[c] += s
        for f, v in x.items():
            counts[c, f] += v
    deno = totals[:, None] + F
    logtheta = np.log((counts + 1) / np.maximum(deno, 1))
    return logpi, logtheta

def predict_nb(X, logpi, logtheta):
    preds = []
    for x in X:
        s = logpi.copy()
        if x:
            idx = np.fromiter(x.keys(), dtype=np.int64, count=len(x))
            val = np.fromiter(x.values(), dtype=np.float64, count=len(x))
            s += (logtheta[:, idx] @ val)
        preds.append(int(np.argmax(s)))
    return preds

def metrics(y_true, y_pred, C):
    N = len(y_true)
    acc = sum(int(t==p) for t,p in zip(y_true,y_pred)) / max(1, N)

    tp = [0]*C; fp = [0]*C; fn = [0]*C
    for t,p in zip(y_true,y_pred):
        if t == p: tp[t] += 1
        else:      fp[p] += 1; fn[t] += 1

    precs, recs, f1s = [], [], []
    for c in range(C):
        P = tp[c] / (tp[c] + fp[c]) if (tp[c] + fp[c]) else 0.0
        R = tp[c] / (tp[c] + fn[c]) if (tp[c] + fn[c]) else 0.0
        F = 0.0 if (P+R)==0 else 2*P*R/(P+R)
        precs.append(P); recs.append(R); f1s.append(F)

    return {
        "accuracy": acc,
        "precision": float(np.mean(precs)),
        "recall":    float(np.mean(recs)),
        "f1":        float(np.mean(f1s)),
    }

In [None]:
def run_nb(lang, size, lowercase=True, min_df=1):
    # 1) Load rows → (label, text)
    train_pairs = read_rows_single_label(csv_path(lang, size))
    test_pairs  = read_rows_single_label(csv_path(lang, "test"))

    # 2) Label space from TRAIN only
    labels = sorted({lab for lab,_ in train_pairs})
    lab2idx = {l:i for i,l in enumerate(labels)}
    y_train = [lab2idx[lab] for lab,_ in train_pairs]

    # keep only TEST rows whose label is known from train
    test_pairs = [(lab, txt) for lab, txt in test_pairs if lab in lab2idx]
    y_test = [lab2idx[lab] for lab,_ in test_pairs]

    # 3) Tokenize docs (doc-level, unigrams only)
    train_docs = [doc_tokens(txt, lang, lowercase) for _, txt in train_pairs]
    test_docs  = [doc_tokens(txt,  lang, lowercase) for _, txt in test_pairs]

    # 4) Unigram vocab (train-only) and featurize
    feat2id = build_unigram_vocab(train_docs, min_df=min_df)
    X_train = featurize_unigrams(train_docs, feat2id)
    X_test  = featurize_unigrams(test_docs,  feat2id)

    # 5) Train NB 
    C, F = len(labels), len(feat2id)
    logpi, logtheta = fit_nb(X_train, y_train, C=C, F=F)
    y_pred = predict_nb(X_test, logpi, logtheta)

    mets = metrics(y_test, y_pred, C=C)
    print(f"[{"English" if lang == "en" else "Hindi"}-{size}] "
          f"Acc={mets['accuracy']:.3f}  Prec={mets['precision']:.3f}  "
          f"Rec={mets['recall']:.3f}  F1={mets['f1']:.3f}")
    return mets

print("Classification using Naive Bayes:\n")
for sz in ["2500", "15000", "30000"]:
    run_nb("en", sz, lowercase=True, min_df=3)

print()

for sz in ["2500", "15000", "30000"]:
    run_nb("hi", sz, lowercase=True, min_df=3)


Classification using Naive Bayes:

[English-2500] Acc=0.314  Prec=0.010  Rec=0.012  F1=0.009
[English-15000] Acc=0.710  Prec=0.054  Rec=0.036  F1=0.037
[English-30000] Acc=0.679  Prec=0.010  Rec=0.007  F1=0.007

[Hindi-2500] Acc=0.587  Prec=0.249  Rec=0.144  F1=0.144
[Hindi-15000] Acc=0.694  Prec=0.558  Rec=0.350  F1=0.392
[Hindi-30000] Acc=0.744  Prec=0.632  Rec=0.485  F1=0.515
