# Imports and Installs

In [None]:
import pandas as pd
import numpy as np
import re
import os
import difflib
import nltk
nltk.download('punkt_tab')
from collections import Counter, defaultdict
from sklearn.model_selection import train_test_split

from tqdm.notebook import tqdm
import warnings
warnings.filterwarnings('ignore')

# Downloading necessary NLTK data.
# Each entry pairs the path NLTK looks up locally with the package to fetch
# when that lookup fails. WordNet lives under "corpora/"; a misspelt path
# would never be found and would force a re-download on every run.
for resource_path, package_name in (
    ('tokenizers/punkt', 'punkt'),
    ('taggers/averaged_perceptron_tagger', 'averaged_perceptron_tagger'),
    ('taggers/averaged_perceptron_tagger_eng', 'averaged_perceptron_tagger_eng'),
    ('corpora/wordnet', 'wordnet'),
):
    try:
        nltk.data.find(resource_path)
    except LookupError:
        nltk.download(package_name)


!pip install pandas transformers datasets accelerate jiwer scikit-learn sentencepiece
!pip install --upgrade torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121

import torch
from sklearn.model_selection import train_test_split
from transformers import (
    T5Tokenizer,
    T5ForConditionalGeneration,
    Seq2SeqTrainer,
    Seq2SeqTrainingArguments,
    DataCollatorForSeq2Seq
)
from datasets import Dataset
from jiwer import wer

from google.colab import drive

from nltk.translate.bleu_score import sentence_bleu


print("All Libraries imported successfully.")

# 1. Load and Clean Dataset

In [None]:
class DataLoader:
    """Load, clean and split the sentence-pair spreadsheet.

    The spreadsheet is read with ``pd.read_excel`` and its two columns are
    renamed to ``correct_sentence`` and ``incorrect_sentence`` (in that order).
    """

    # Characters removed from both ends of a sentence: double quotes, the
    # sentence-boundary marks , ? . and bullet points. Occurrences inside the
    # sentence are kept because they can be part of a misspelling.
    _EDGE_CHARS = '",?.•'

    def __init__(self, file_path):
        """Remember the spreadsheet path; nothing is read until ``load_data``."""
        self.file_path = file_path
        self.df = None
        self.train_df = None
        self.val_df = None
        self.test_df = None

    def load_data(self):
        """Read the spreadsheet into ``self.df`` and return it.

        Any exception raised while reading or renaming the columns is
        reported on stdout and then re-raised unchanged.
        """
        print(f"Loading data from {self.file_path}...")
        try:
            self.df = pd.read_excel(self.file_path)
            self.df.columns = ['correct_sentence', 'incorrect_sentence']
            print(f"Data loaded. Shape: {self.df.shape}")
            return self.df
        except Exception as e:
            print(f"Error loading data: {e}")
            raise

    def clean_text(self, text):
        """Normalise one sentence.

        Non-string values are returned as ``str(text)`` without further
        cleaning. For strings, the curly apostrophe is replaced by a straight
        one, whitespace and the characters in ``_EDGE_CHARS`` are removed from
        both ends, and internal whitespace runs collapse to a single space.
        """
        if not isinstance(text, str):
            return str(text)

        # Normalizing distinct punctuation
        text = text.replace("’", "'")

        # Strip whitespace and edge characters repeatedly until nothing
        # changes. A single fixed-order pass leaves residue such as the quotes
        # in ' "text." ' (hidden behind the spaces) or the '?' in 'text?.'.
        previous = None
        while previous != text:
            previous = text
            text = text.strip().strip(self._EDGE_CHARS)

        # Collapsing multiple spaces into one
        return re.sub(r'\s+', ' ', text)

    def preprocess(self):
        """Clean both sentence columns and drop duplicate rows in place.

        Loads the data first when ``self.df`` is still ``None``. Returns the
        cleaned frame.
        """
        if self.df is None:
            self.load_data()

        print("Preprocessing data...")
        for column in ('correct_sentence', 'incorrect_sentence'):
            self.df[column] = self.df[column].apply(self.clean_text)

        initial_count = len(self.df)
        self.df.drop_duplicates(inplace=True)
        print(f"Removed {initial_count - len(self.df)} duplicates.")
        return self.df

    def split_data(self, test_size=0.15, val_size=0.15, random_state=42):
        """Split ``self.df`` into train, validation and test frames.

        ``test_size`` and ``val_size`` are fractions of the whole data set.
        ``preprocess`` is run only when no frame has been loaded yet; a frame
        that is already present is split as it is.

        Returns:
            ``(train_df, val_df, test_df)``, also stored on the instance.
        """
        if self.df is None:
            self.preprocess()

        print("Splitting data...")
        remaining_df, self.test_df = train_test_split(
            self.df, test_size=test_size, random_state=random_state
        )

        # The validation share is expressed relative to what is left after
        # the test rows were removed, so it still equals val_size overall.
        relative_val_size = val_size / (1 - test_size)
        self.train_df, self.val_df = train_test_split(
            remaining_df, test_size=relative_val_size, random_state=random_state
        )

        print(f"Train size: {len(self.train_df)}")
        print(f"Validation size: {len(self.val_df)}")
        print(f"Test size: {len(self.test_df)}")

        return self.train_df, self.val_df, self.test_df

In [None]:
# Make the Google Drive contents visible under /content/drive.
drive.mount('/content/drive')

# Read Dataset: the spreadsheet is expected at this Drive location.
xlsx_path = "/content/drive/MyDrive/NLP Assignment Submission/Spell_Correction_for_ASR_Noun_Enhancement_assignment_dataset.xlsx"
if not os.path.exists(xlsx_path):
    print("Dataset not found. Please ensure the .xlsx file is present.")
else:
    # split_data() loads and preprocesses the file on demand before splitting.
    loader = DataLoader(xlsx_path)
    train_df, val_df, test_df = loader.split_data()

## 1.1 Exploratory Data Analysis (EDA)

In [None]:

def run_eda(df, set_name="Training", top_n=10):
    """Print basic statistics for one split of the data.

    Args:
        df: frame with ``correct_sentence`` and ``incorrect_sentence`` columns.
        set_name: label used in the printed header.
        top_n: how many of the most frequent words to list (default 10, which
            matches the printed heading).

    Words are whitespace-separated tokens; vocabulary and frequencies are
    computed on the ``correct_sentence`` column only.
    """
    print(f"--- EDA for {set_name} Set ---")

    def count_words(sentence):
        return len(str(sentence).split())

    correct_lens = df['correct_sentence'].apply(count_words)
    incorrect_lens = df['incorrect_sentence'].apply(count_words)

    print(f"Average Correct Sentence Length: {correct_lens.mean():.2f} words")
    print(f"Average Incorrect Sentence Length: {incorrect_lens.mean():.2f} words")

    # Analysing Vocabulary: one pass gives both the frequencies and, through
    # the number of distinct keys, the vocabulary size.
    all_text = " ".join(df['correct_sentence'].astype(str))
    word_counts = Counter(all_text.split())
    print(f"Vocabulary Size: {len(word_counts)} unique words")

    # Finding Common words
    print(f"Top {top_n} most common words:")
    print(word_counts.most_common(top_n))
    print("\n")


if 'train_df' in locals():
    run_eda(train_df, "Training")


# 2. Error Analysis

## 2.1 Preprocessing (NER & POS Tagging)

In [None]:
def preprocess_pos_ner(df):
    """Add a ``correct_nouns`` column listing the nouns of each correct sentence.

    A token counts as a noun when its NLTK part-of-speech tag starts with
    ``NN``. The column is added to ``df`` itself, which is also returned.
    """
    print("Running POS Tagging and NER (Noun Extraction)...")

    def get_nouns(text):
        # Tokenise, tag, and keep only the tokens whose tag starts with "NN".
        tagged_tokens = nltk.pos_tag(nltk.word_tokenize(str(text)))
        return [token for token, tag in tagged_tokens if tag.startswith('NN')]

    df['correct_nouns'] = df['correct_sentence'].apply(get_nouns)
    print("Nouns extracted.")
    return df


# Tag every available split; nothing runs when the dataset was not loaded.
if 'train_df' in locals():
    train_df = preprocess_pos_ner(train_df)
    if 'val_df' in locals():
        val_df = preprocess_pos_ner(val_df)
    test_df = preprocess_pos_ner(test_df)


## 2.2 Error Analysis

In [None]:

class ErrorAnalyzer:
    """Collect and categorise noun substitutions between sentence pairs.

    ``df`` must provide ``correct_sentence`` and ``incorrect_sentence``; an
    optional ``correct_nouns`` column supplies the nouns of each correct
    sentence. After ``extract_errors`` runs, ``error_pairs`` holds the
    ``(correct, incorrect)`` segments and ``detailed_df`` one row per pair.
    """

    def __init__(self, df):
        self.df = df
        self.error_pairs = []
        self.detailed_df = pd.DataFrame()

    def get_diff_ops(self, correct_sent, incorrect_sent):
        """Word-level diff of two sentences.

        Words are whitespace-separated and compared case-insensitively.

        Returns:
            ``(opcodes, correct_words, incorrect_words)`` where the word lists
            keep their original case and the opcodes index into them.
        """
        c_words = correct_sent.split()
        i_words = incorrect_sent.split()
        matcher = difflib.SequenceMatcher(
            None,
            [word.lower() for word in c_words],
            [word.lower() for word in i_words],
        )
        return matcher.get_opcodes(), c_words, i_words

    # Identifying the type of mismatch
    def categorize_error(self, correct, incorrect):
        """Label a mismatch between a correct and an incorrect segment.

        The checks run in order: an empty side, character similarity above
        0.8, a digit in the incorrect segment, then equal length. Anything
        else is "Word-level/Other".
        """
        if not correct or not incorrect:
            return "Insertion/Deletion"

        ratio = difflib.SequenceMatcher(None, correct, incorrect).ratio()
        if ratio > 0.8:
            return "Character-level (Likely Typo)"
        if any(char.isdigit() for char in incorrect):
            return "Formatting/Number"
        # Equal length is used as a rough stand-in for phonetic similarity.
        if len(correct) == len(incorrect):
            return "Phonetic/Substitution"
        return "Word-level/Other"

    def extract_errors(self):
        """Find replaced segments whose correct words are all nouns.

        Only ``replace`` opcodes are considered. Results from an earlier call
        are discarded first, so calling this twice does not double-count.

        Returns:
            ``self.error_pairs``, the list of ``(correct, incorrect)`` tuples.
        """
        print("Extracting errors...")
        self.error_pairs = []
        detailed_errors = []

        for _, row in self.df.iterrows():
            correct = str(row['correct_sentence'])
            incorrect = str(row['incorrect_sentence'])
            nouns = set(row['correct_nouns']) if 'correct_nouns' in row else set()

            opcodes, c_words, i_words = self.get_diff_ops(correct, incorrect)
            for tag, i1, i2, j1, j2 in opcodes:
                if tag != 'replace':
                    continue

                replaced_words = c_words[i1:i2]
                # Keep the segment only when every replaced word is a noun.
                if not all(word in nouns for word in replaced_words):
                    continue

                c_segment = " ".join(replaced_words)
                i_segment = " ".join(i_words[j1:j2])

                # for noun word eg: effect, -- a difference that is only an
                # attached comma is not a spelling error.
                if c_segment.strip(",") == i_segment.strip(","):
                    continue

                self.error_pairs.append((c_segment, i_segment))
                detailed_errors.append({
                    'correct': c_segment,
                    'incorrect': i_segment,
                    'type': self.categorize_error(c_segment, i_segment),
                    'is_noun_error': True,
                })

        self.detailed_df = pd.DataFrame(detailed_errors)
        return self.error_pairs

    def get_common_errors(self, n=20):
        """Return the ``n`` most frequent error pairs with their counts."""
        return Counter(self.error_pairs).most_common(n)

    def get_all_errors(self):
        """Return every error pair with its count, most frequent first."""
        return Counter(self.error_pairs).most_common()


In [None]:
# Summarise the noun substitution errors found in the training split.
if 'train_df' in locals():
    analyzer = ErrorAnalyzer(train_df)
    analyzer.extract_errors()
    print("\nStats of Common Errors:")

    # Flatten ((correct, incorrect), count) into (correct, incorrect, count).
    diff_words_list = [
        (pair[0], pair[1], occurrences)
        for pair, occurrences in analyzer.get_all_errors()
    ]
    diff_words_list_map = {}

    for (correct, incorrect), count in analyzer.get_common_errors(10):
        print(f"'{correct}' -> '{incorrect}' ({count} times)")

    # The breakdowns are printed only when at least one error was recorded.
    if not analyzer.detailed_df.empty:
        print("\nError Category Distribution:")
        print(analyzer.detailed_df['type'].value_counts())
        print("\nNoun-related Errors:")
        print(analyzer.detailed_df['is_noun_error'].value_counts())
        print("Total Mis-spelled words: ", len(diff_words_list))

# 3. Model Development

## 3.1 Baseline Model: Levenshtein distance & N-gram language models
Uses edit-distance algorithms (Levenshtein distance) to find candidate corrections for each potentially misspelled word, then uses N-gram language models to pick the best match among the candidates.

In [None]:
class BaselineSpellCorrector:
    """Dictionary plus bigram spell corrector trained on correct sentences.

    Matching and scoring are done on lower-cased tokens. The output keeps
    case: a token the model does not change is emitted as it was typed, and a
    replacement is emitted in the case recorded during training.
    """

    # Alphanumeric runs optionally joined by single hyphens, so punctuation is
    # dropped and 'tablet.' and 'tablet' give the same token.
    _TOKEN_PATTERN = re.compile(r'[a-zA-Z0-9]+(?:-[a-zA-Z0-9]+)*')

    def __init__(self):
        # Lower-cased vocabulary used for lookups.
        self.vocab = set()
        # Lower-cased word -> a spelling seen in training. The last occurrence
        # wins when several casings share one lower-case form; this is meant
        # to help with medicine names.
        self.vocab_to_original = {}
        self.unigram_counts = Counter()
        self.bigram_counts = defaultdict(int)

    def tokenize(self, text, to_lower=True):
        """Split ``text`` into word tokens, dropping punctuation.

        Args:
            text: any value; it is converted with ``str``.
            to_lower: lower-case the text before tokenising.
        """
        text = str(text)
        if to_lower:
            text = text.lower()
        return self._TOKEN_PATTERN.findall(text)

    def train(self, sentences):
        """Build the vocabulary and unigram/bigram counts from correct text."""
        print("Training Baseline Model...")
        for sentence in sentences:
            # Tokenise once and lower-case per token, so the lower-cased and
            # original token lists are guaranteed to stay aligned.
            original_tokens = self.tokenize(sentence, to_lower=False)
            tokens = [token.lower() for token in original_tokens]

            # 1. Updating Dictionary & Frequency
            for lowered, original in zip(tokens, original_tokens):
                self.vocab.add(lowered)
                self.vocab_to_original[lowered] = original
            self.unigram_counts.update(tokens)

            # 2. Updating Context (Bigrams)
            for bigram in zip(tokens, tokens[1:]):
                self.bigram_counts[bigram] += 1

        print(f"Training & bi-gram Complete. Vocabulary Size: {len(self.vocab)}")

    def is_edit_score_close(self, score1, score2):
        """Tell whether ``score2`` is within 0.1 percent below ``score1``.

        ``score1`` is the best candidate's similarity. A ``score2`` that is
        equal or higher also counts as close.
        """
        if score2 == 0:
            return score1 == 0
        if score1 == 0:
            # score2 is non-zero here, so it is not below the best score;
            # answering directly also avoids dividing by zero.
            return True
        return ((score1 - score2) / score1) * 100 <= 0.1

    def get_candidates(self, word):
        """Return correction candidates for ``word``.

        A word whose lower-case form is in the vocabulary, or that has no
        close match, is returned unchanged as the only candidate. Otherwise
        the best match is returned (in its training-time case), followed by
        any of the next two matches whose similarity is close to the best.
        """
        lowered = word.lower()
        if lowered in self.vocab:
            return [word]

        # n=3: Top 3 matches; cutoff=0.6: at least 60% similar.
        matches = difflib.get_close_matches(lowered, self.vocab, n=3, cutoff=0.6)
        if not matches:
            return [word]

        # Score against the lower-cased word, the same form used for matching.
        scores = [
            difflib.SequenceMatcher(None, lowered, match).ratio()
            for match in matches
        ]

        candidates = [self.vocab_to_original[matches[0]]]
        for match, score in zip(matches[1:], scores[1:]):
            if self.is_edit_score_close(scores[0], score):
                candidates.append(self.vocab_to_original[match])
        return candidates

    def correct_sentence(self, sentence):
        """Correct ``sentence`` token by token and return the joined result.

        When a token has several candidates, the one with the highest
        ``10 * bigram count + unigram count`` wins, using the previously
        emitted token as context; ties keep the earlier candidate.
        """
        corrected_tokens = []

        for word in self.tokenize(sentence, to_lower=False):
            candidates = self.get_candidates(word)
            best_word = candidates[0]

            if len(candidates) > 1:
                # The count tables are keyed by lower-case tokens, so both the
                # context and each candidate are lower-cased for the lookup.
                if corrected_tokens:
                    prev_token = corrected_tokens[-1].lower()
                else:
                    prev_token = "START"

                best_score = -1
                for cand in candidates:
                    key = cand.lower()
                    # .get() keeps the defaultdict from growing on lookups.
                    bigram_score = self.bigram_counts.get((prev_token, key), 0)
                    unigram_score = self.unigram_counts[key]

                    # Weight Bigrams higher than raw frequency
                    score = (bigram_score * 10) + unigram_score
                    if score > best_score:
                        best_score = score
                        best_word = cand

            corrected_tokens.append(best_word)

        # Reconstructing sentence
        return " ".join(corrected_tokens)

# 1. Initialize and Train
# Guarded on train_df so that a missing dataset skips training instead of
# raising NameError in this cell.
if 'train_df' in locals():
    baseline_model = BaselineSpellCorrector()
    baseline_model.train(train_df['correct_sentence'])

## 3.1.1 Prediction on Test Dataset

#### Prediction on a Sample

In [None]:

# Show the model's output next to the input and the ground truth for the
# first five test rows. Skipped when the test split or the model is missing,
# instead of raising NameError.
if 'test_df' in locals() and 'baseline_model' in locals():
    print("\n--- Baseline Model Predictions (Sample) ---")
    for idx, row in test_df.head(5).iterrows():
        input_text = row['incorrect_sentence']
        ground_truth = row['correct_sentence']
        prediction = baseline_model.correct_sentence(input_text)

        print(f"Input:    {input_text}")
        print(f"Pred:     {prediction}")
        print(f"Actual:   {ground_truth}")
        print("-" * 30)

#### Prediction on the full test data

In [None]:
# Store the baseline's correction of every noisy test sentence in a new
# 'baseline_prediction' column.
if 'test_df' in locals():
    print("Predicting with Baseline...")
    test_df['baseline_prediction'] = test_df['incorrect_sentence'].map(
        baseline_model.correct_sentence
    )
    print("Baseline predictions complete.")

# 4. Evaluation

## 4.1 Baseline Model
#### Word-Level Accuracy, WER, CER, BLEU and Mean Noun Recall

In [None]:
def calculate_word_level_accuracy(df=None, model=None):
    """Print and return position-wise word accuracy of the model's output.

    Args:
        df: frame with ``incorrect_sentence`` and ``correct_sentence``
            columns; defaults to the global ``test_df``.
        model: object with ``correct_sentence`` and ``tokenize`` methods;
            defaults to the global ``baseline_model``.

    Prediction and target are both lower-cased and reduced to the model's
    word tokens, so neither case nor punctuation counts as an error. Words
    are compared position by position over the shorter of the two lists and
    the hits are divided by the total number of target words.

    Returns:
        The accuracy as a fraction, or 0 when there are no target words.
    """
    if df is None:
        df = test_df
    if model is None:
        model = baseline_model

    print("Calculating Accuracy on Test Set...")
    total_words = 0
    correct_words = 0
    for noisy, reference in zip(df['incorrect_sentence'], df['correct_sentence']):
        pred_words = [word.lower() for word in model.correct_sentence(noisy).split()]
        # Stripping punctuation for the metric calculation to be fair: the
        # prediction was built from the model's tokens, so the target is
        # tokenised the same way.
        target_words = model.tokenize(str(reference), to_lower=True)

        # Comparing word-for-word; zip stops at the shorter list.
        correct_words += sum(
            1 for predicted, expected in zip(pred_words, target_words)
            if predicted == expected
        )
        total_words += len(target_words)

    accuracy = correct_words / total_words if total_words > 0 else 0
    print(f"\n Word-Level Accuracy: {accuracy:.2%}")
    return accuracy

def calculate_wer(reference, hypothesis):
    """Word error rate of ``hypothesis`` against ``reference``.

    Both strings are split on whitespace. The result is the word-level edit
    distance (insertions, deletions and substitutions each cost 1) divided by
    the number of reference words, or 0 when the reference has no words.
    """
    ref_tokens = reference.split()
    hyp_tokens = hypothesis.split()
    n_ref = len(ref_tokens)
    n_hyp = len(hyp_tokens)
    if n_ref == 0:
        return 0

    # dist[r, h] is the edit distance between the first r reference words
    # and the first h hypothesis words.
    dist = np.zeros((n_ref + 1, n_hyp + 1))
    dist[:, 0] = np.arange(n_ref + 1)
    dist[0, :] = np.arange(n_hyp + 1)

    for r, ref_tok in enumerate(ref_tokens, start=1):
        for h, hyp_tok in enumerate(hyp_tokens, start=1):
            substitution = dist[r - 1, h - 1] + (0 if ref_tok == hyp_tok else 1)
            deletion = dist[r - 1, h] + 1
            insertion = dist[r, h - 1] + 1
            dist[r, h] = min(deletion, insertion, substitution)

    return dist[n_ref, n_hyp] / n_ref

def evaluate_detailed(df, pred_col, target_col='correct_sentence'):
    """Print mean WER, approximate CER, BLEU-1 and noun recall for a column.

    Args:
        df: frame holding the predictions and the targets; an optional
            ``correct_nouns`` column supplies the nouns of each target.
        pred_col: name of the column with the predicted sentences.
        target_col: name of the column with the reference sentences.

    Per-row metrics:
        * WER from ``calculate_wer``.
        * CER approximated as ``1 - SequenceMatcher.ratio()`` on characters.
        * BLEU-1 from ``sentence_bleu`` with unigram weights only; a row whose
          BLEU computation raises is scored 0.
        * Noun recall: the share of the row's ``correct_nouns`` found among
          the whitespace-separated prediction words (case-sensitive). Rows
          without nouns score 1.0.
    """
    wer_scores = []
    cer_scores = []
    bleu_scores = []
    noun_scores = []

    print(f"\n--- Detailed Evaluation for {pred_col} ---")

    for _, row in tqdm(df.iterrows(), total=len(df)):
        ref = str(row[target_col])
        hyp = str(row[pred_col])

        # WER
        wer_scores.append(calculate_wer(ref, hyp))

        # CER (Approximate using SequenceMatcher ratio)
        cer_scores.append(1 - difflib.SequenceMatcher(None, ref, hyp).ratio())

        # BLEU
        try:
            bleu = sentence_bleu([ref.split()], hyp.split(), weights=(1, 0, 0, 0))  # BLEU-1
        except Exception:
            # Best effort: a row BLEU cannot score counts as 0. Catching
            # Exception rather than everything lets Ctrl-C and interpreter
            # exit still interrupt the loop.
            bleu = 0
        bleu_scores.append(bleu)

        # Noun Recall
        if 'correct_nouns' in row and row['correct_nouns']:
            nouns = row['correct_nouns']
            hyp_words = set(hyp.split())
            matches = sum(1 for n in nouns if n in hyp_words)
            noun_scores.append(matches / len(nouns))
        else:
            noun_scores.append(1.0)  # No nouns to miss

    print(f"Mean WER: {np.mean(wer_scores):.4f} (Lower is better)")
    print(f"Mean CER: {np.mean(cer_scores):.4f} (Lower is better)")
    print(f"Mean BLEU-1: {np.mean(bleu_scores):.4f} (Higher is better)")
    print(f"Mean Noun Recall: {np.mean(noun_scores):.4f} (Higher is better)")

# Evaluate only when the test split exists and already carries predictions;
# checking for test_df first avoids a NameError when the dataset is missing.
if 'test_df' in locals() and 'baseline_prediction' in test_df.columns:
    calculate_word_level_accuracy()
    evaluate_detailed(test_df, 'baseline_prediction')

