# Hybrid Spell Checker Pipeline

This notebook implements and evaluates a multi-stage spelling-correction pipeline for Urdu:

1.  **KNN**: candidate generation (optimized for recall).
2.  **Logistic Regression**: candidate selection and re-ranking (optimized for precision).
3.  **Naive Bayes-style prior**: scoring each candidate by its relative frequency in the word list.

## 1. Setup & Data Loading

In [1]:
import sys
import os
import random
import time
import numpy as np
import pickle
from collections import Counter
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# Setup paths
sys.path.append(os.path.abspath('..'))
from src.knn_correction import KNNSpellChecker

DATA_PATH = '../data/urdu_words.txt'

# Load Word List (Language Model)
with open(DATA_PATH, 'r', encoding='utf-8') as f:
    words = f.read().split()
    
WORD_COUNTS = Counter(words)
TOTAL_WORDS = sum(WORD_COUNTS.values())
VOCAB = list(WORD_COUNTS.keys())

def get_prior(word):
    return WORD_COUNTS[word] / TOTAL_WORDS

print(f"Loaded {len(VOCAB)} unique words.")

Loaded 154781 unique words.
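
To make the prior concrete, the sketch below recomputes it on a tiny made-up corpus. The tokens are Latin-script placeholders, not entries from `urdu_words.txt`, and `toy_prior` is a hypothetical stand-in for `get_prior`. One caveat follows directly from the formula: if the word list contains each word exactly once, every count is 1 and the prior is uniform, so it cannot separate candidates.

```python
from collections import Counter

# Toy corpus (placeholder tokens, for illustration only)
toy_tokens = ["kitab", "kitab", "kitab", "qalam"]
toy_counts = Counter(toy_tokens)
toy_total = sum(toy_counts.values())

def toy_prior(word):
    """Relative frequency of `word`; Counter returns 0 for unseen words."""
    return toy_counts[word] / toy_total

print(toy_prior("kitab"))   # 0.75
print(toy_prior("qalam"))   # 0.25
print(toy_prior("unseen"))  # 0.0
```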


## 2. KNN Model (Candidate Generator)
We use the pre-implemented KNN model (`KNNSpellChecker`) to retrieve the nearest vocabulary words for each typo.

In [2]:
knn = KNNSpellChecker(literature_path=DATA_PATH, k=1)

def get_knn_candidates(typo, n=50):
    """Return up to n (candidate, cosine_dist) pairs for a typo; [] on failure."""
    if not knn.fitted:
        return []
    try:
        vec = knn.vectorizer.transform([typo])
        dists, idxs = knn.knn.kneighbors(vec, n_neighbors=n)
        # Pair each neighbour index with its word and cosine distance
        return [(knn.words_list[idx], dist) for idx, dist in zip(idxs[0], dists[0])]
    except Exception:  # avoid a bare except, which would also swallow KeyboardInterrupt
        return []

Loading KNN SpellChecker from data/urdu_words.txt.knn.pkl...
KNN Loaded. Vocabulary size: 154781
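
The internals of `KNNSpellChecker` are not shown in this notebook. The calls above suggest a vectorizer followed by a nearest-neighbour search under cosine distance. Here is a minimal pure-Python sketch of that idea, assuming character-bigram count vectors and brute-force search. Both are assumptions for illustration, and the real model may differ. All names (`char_bigrams`, `cosine_distance`, `toy_candidates`, `toy_vocab`) are hypothetical.

```python
import math
from collections import Counter

def char_bigrams(word):
    """Count character bigrams, padding with '^' and '$' to mark word boundaries."""
    padded = f"^{word}$"
    return Counter(padded[i:i + 2] for i in range(len(padded) - 1))

def cosine_distance(a, b):
    """1 - cosine similarity between two sparse count vectors."""
    dot = sum(count * b[gram] for gram, count in a.items())
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    if norm_a == 0 or norm_b == 0:
        return 1.0
    return 1.0 - dot / (norm_a * norm_b)

def toy_candidates(typo, vocab, n=3):
    """Return the n vocabulary words closest to `typo` as (word, distance) pairs."""
    typo_vec = char_bigrams(typo)
    scored = [(w, cosine_distance(typo_vec, char_bigrams(w))) for w in vocab]
    return sorted(scored, key=lambda pair: pair[1])[:n]

toy_vocab = ["spelling", "spilling", "selling", "checker"]  # placeholder vocabulary
print(toy_candidates("speling", toy_vocab, n=2))
```

Brute-force search is linear in the vocabulary size, so a vocabulary of this notebook's size would normally rely on an indexed or vectorized search instead.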


## 3. Training Logistic Regression (Candidate Selector)
We need to train a binary classifier to decide if a `(Typo, Candidate)` pair is a "Match" (1) or "Not Match" (0).

### Feature Engineering
Features for the classifier:
1. **Edit Distance**: Levenshtein distance between the typo and the candidate.
2. **KNN Distance**: cosine distance between the two words in the KNN vector space.
3. **Length Diff**: absolute difference in length.
4. **Start Match**: 1 if the first characters match, else 0.
5. **End Match**: 1 if the last characters match, else 0.
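
As a worked example of the five features, the sketch below builds the feature vector for one typo/candidate pair. It uses a compact standalone edit-distance helper so the snippet runs on its own. The Latin-script words and the KNN distance of `0.06` are made-up placeholders, and `edit_distance` and `toy_features` are hypothetical names.

```python
def edit_distance(a, b):
    """Levenshtein distance using a single rolling row."""
    row = list(range(len(b) + 1))
    for i, ca in enumerate(a, start=1):
        prev_diag, row[0] = row[0], i
        for j, cb in enumerate(b, start=1):
            prev_diag, row[j] = row[j], min(
                row[j] + 1,              # deletion
                row[j - 1] + 1,          # insertion
                prev_diag + (ca != cb),  # substitution (or match, cost 0)
            )
    return row[-1]

def toy_features(typo, candidate, knn_dist):
    """Feature order: edit distance, KNN distance, length diff, start match, end match."""
    return [
        edit_distance(typo, candidate),
        knn_dist,
        abs(len(typo) - len(candidate)),
        1 if typo and candidate and typo[0] == candidate[0] else 0,
        1 if typo and candidate and typo[-1] == candidate[-1] else 0,
    ]

print(toy_features("speling", "spelling", 0.06))  # [1, 0.06, 1, 1, 1]
```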

In [3]:
def levenshtein(s1, s2):
    if len(s1) < len(s2):
        return levenshtein(s2, s1)
    if len(s2) == 0:
        return len(s1)
    previous_row = range(len(s2) + 1)
    for i, c1 in enumerate(s1):
        current_row = [i + 1]
        for j, c2 in enumerate(s2):
            insertions = previous_row[j + 1] + 1
            deletions = current_row[j] + 1
            substitutions = previous_row[j] + (c1 != c2)
            current_row.append(min(insertions, deletions, substitutions))
        previous_row = current_row
    return previous_row[-1]

def extract_features(typo, candidate, knn_dist):
    features = []
    # 1. Edit Dist
    ed = levenshtein(typo, candidate)
    features.append(ed)
    # 2. KNN Dist
    features.append(knn_dist)
    # 3. Length Diff
    features.append(abs(len(typo) - len(candidate)))
    # 4. Start Match
    features.append(1 if typo and candidate and typo[0] == candidate[0] else 0)
    # 5. End Match
    features.append(1 if typo and candidate and typo[-1] == candidate[-1] else 0)
    return features

def generate_typo(word):
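    # Apply one random edit: insert, delete, replace, or transpose.
    # Note: 'transpose' at the last index is a no-op, so the result can equal the input.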
    if len(word) < 2: return word
    urdu_chars = 'ابپتٹثجچحخدڈذرڑزژسشصضطظعغفقکگلمنںوہیے'
    op = random.choice(['insert', 'delete', 'replace', 'transpose'])
    word = list(word)
    idx = random.randint(0, len(word) - 1)
    if op == 'insert': word.insert(idx, random.choice(urdu_chars))
    elif op == 'delete': word.pop(idx)
    elif op == 'replace': word[idx] = random.choice(urdu_chars)
    elif op == 'transpose' and idx < len(word)-1: word[idx], word[idx+1] = word[idx+1], word[idx]
    return "".join(word)

In [4]:
# Generate Training Data for Logistic Regression
print("Generating training data for LR model...")
random.seed(42)
TRAIN_SIZE = 1000
train_words = [w for w in VOCAB if len(w) > 3]
samples = random.sample(train_words, min(TRAIN_SIZE, len(train_words)))

X_data = []
y_data = []

for truth in samples:
    typo = generate_typo(truth)
    # Get KNN candidates (top 10 here; hybrid_correction retrieves the top 50)
    candidates = get_knn_candidates(typo, n=10)
    
    for cand_word, cand_dist in candidates:
        label = 1 if cand_word == truth else 0
        feats = extract_features(typo, cand_word, cand_dist)
        X_data.append(feats)
        y_data.append(label)
    
    # If the truth is not among the top-10 candidates, this typo contributes only
    # negatives. We do not inject the truth as an extra positive, so the training
    # distribution matches what the classifier sees inside the pipeline.

print(f"Training Data: {len(X_data)} samples. Positives: {sum(y_data)}")

Generating training data for LR model...


Training Data: 10000 samples. Positives: 880


In [5]:
# Train Logistic Regression
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X_data)

clf = LogisticRegression(class_weight='balanced')
clf.fit(X_scaled, y_data)

print("Logistic Regression Trained.")
print("Coefficients:", clf.coef_)

Logistic Regression Trained.
Coefficients: [[-1.85467689 -1.35281612 -3.36916467  1.49038142  1.45819938]]
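
The coefficient signs match intuition: a larger edit distance, KNN distance, or length difference lowers the match probability, while matching first and last characters raises it. As a rough illustration of how a fitted model turns a standardized feature vector into a probability, the sketch below applies the logistic function to the rounded coefficients printed above. The intercept is not printed in this notebook, so `ASSUMED_INTERCEPT = 0.0` is a placeholder, and the z-scored feature vectors are made up.

```python
import math

# Rounded coefficients from the output above, in feature order:
# edit distance, KNN distance, length diff, start match, end match
COEFS = [-1.855, -1.353, -3.369, 1.490, 1.458]
ASSUMED_INTERCEPT = 0.0  # placeholder: the notebook does not print clf.intercept_

def match_probability(z_features):
    """Logistic function applied to the linear score w . x + b."""
    linear = sum(w * x for w, x in zip(COEFS, z_features)) + ASSUMED_INTERCEPT
    return 1.0 / (1.0 + math.exp(-linear))

# Hypothetical standardized (z-scored) feature vectors
close_candidate = [-1.0, -1.0, -0.5, 0.5, 0.5]
far_candidate = [1.0, 1.0, 1.5, -0.5, -0.5]

print(f"close: {match_probability(close_candidate):.3f}")
print(f"far:   {match_probability(far_candidate):.3f}")
```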




## 4. Full Pipeline Execution
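We now combine all three stages. Each candidate $c$ is scored by the product of its logistic-regression match probability and its prior:
$$ \text{Score}(c) = P_{LR}(\text{Correct} \mid \text{Features}) \times P_{\text{Prior}}(c) $$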
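
A small worked example of this scoring rule, using made-up probabilities and priors: because the two factors are multiplied, a frequent word can outrank a candidate that the classifier prefers. Comparing log-scores gives the same ranking and avoids underflow when priors are tiny. The names `toy_scored`, `product_score`, and `log_score` are hypothetical.

```python
import math

# Hypothetical candidates: (word, LR match probability, prior)
toy_scored = [
    ("rare_word", 0.90, 1e-6),
    ("common_word", 0.60, 1e-4),
]

def product_score(p_lr, prior):
    """Score as defined above: LR probability times prior."""
    return p_lr * prior

def log_score(p_lr, prior):
    """Same ranking in log space; requires both inputs to be strictly positive."""
    return math.log(p_lr) + math.log(prior)

best_by_product = max(toy_scored, key=lambda c: product_score(c[1], c[2]))
best_by_log = max(toy_scored, key=lambda c: log_score(c[1], c[2]))
print(best_by_product[0], best_by_log[0])  # common_word common_word
```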

In [6]:
def hybrid_correction(typo):
    # 1. KNN Candidates
    candidates = get_knn_candidates(typo, n=50)
    if not candidates: return typo
    
    # 2. Extract Features for all candidates
    feats_batch = []
    cand_words = []
    for c_word, c_knn_dist in candidates:
        feats = extract_features(typo, c_word, c_knn_dist)
        feats_batch.append(feats)
        cand_words.append(c_word)
        
    # 3. Logistic Regression Scores (Probabilities)
    X_batch = scaler.transform(feats_batch)
    # predict_proba returns [prob_0, prob_1]
    lr_probs = clf.predict_proba(X_batch)[:, 1] 
    
    # 4. Combine with the prior (Naive Bayes-style product)
    best_score = -1
    best_word = typo
    
    for i, prob in enumerate(lr_probs):
        cand = cand_words[i]
        prior = get_prior(cand)
        if prior == 0: prior = 1e-10  # Floor for words missing from WORD_COUNTS
        
        # Final score: LR probability * prior (unweighted product)
        final_score = prob * prior
        
        if final_score > best_score:
            best_score = final_score
            best_word = cand
            
    return best_word

## 5. Evaluation (500 Words)

In [7]:
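# Generate TEST set (different seed). Words are drawn from the same pool as the
# training sample, so some may overlap; the typos themselves are regenerated.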
random.seed(101)
TEST_SIZE = 500
test_samples = random.sample(train_words, min(TEST_SIZE, len(train_words)))
test_set = [(generate_typo(w), w) for w in test_samples]

print(f"Evaluating on {len(test_set)} unseen test words...")

correct_count = 0
start_time = time.time()

for typo, truth in test_set:
    pred = hybrid_correction(typo)
    if pred == truth:
        correct_count += 1

duration = time.time() - start_time
acc = (correct_count / len(test_set)) * 100

print(f"\nHybrid Pipeline Accuracy: {acc:.2f}%")
print(f"Time taken: {duration:.2f}s")
print(f"Average latency: {(duration/len(test_set))*1000:.2f}ms")

Evaluating on 500 unseen test words...



Hybrid Pipeline Accuracy: 69.40%
Time taken: 50.06s
Average latency: 100.12ms
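
Because the selector can only pick among the KNN candidates, it can help to report candidate recall alongside end-to-end accuracy: recall is a ceiling on what re-ranking can achieve. The sketch below shows one way to compute both. The `evaluate` helper and the stub functions are hypothetical, and the toy lookup stands in for the real pipeline.

```python
def evaluate(test_pairs, candidate_fn, correct_fn):
    """Return (candidate_recall, accuracy) as fractions over (typo, truth) pairs."""
    recalled = 0
    correct = 0
    for typo, truth in test_pairs:
        if truth in candidate_fn(typo):
            recalled += 1
        if correct_fn(typo) == truth:
            correct += 1
    total = len(test_pairs)
    return recalled / total, correct / total

# Stub generator and corrector standing in for the real pipeline (hypothetical)
toy_lookup = {"speling": ["spelling", "spilling"], "chekcer": ["cheaper"]}

def toy_candidate_fn(typo):
    return toy_lookup.get(typo, [])

def toy_correct_fn(typo):
    candidates = toy_lookup.get(typo)
    return candidates[0] if candidates else typo

toy_pairs = [("speling", "spelling"), ("chekcer", "checker")]
recall, accuracy = evaluate(toy_pairs, toy_candidate_fn, toy_correct_fn)
print(recall, accuracy)  # 0.5 0.5
```

With the notebook's own functions, one could pass `lambda t: [w for w, _ in get_knn_candidates(t, n=50)]` as `candidate_fn` and `hybrid_correction` as `correct_fn`.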
