# Context-sensitive Spelling Correction

The goal of the assignment is to implement context-sensitive spelling correction. The input of the code will be a set of text lines and the output will be the same lines with spelling mistakes fixed.

Submit the solution of the assignment to Moodle as a link to your GitHub repository containing this notebook.

Useful links:
- [Norvig's solution](https://norvig.com/spell-correct.html)
- [Norvig's dataset](https://norvig.com/big.txt)
- [Ngrams data](https://www.ngrams.info/download_coca.asp)

Grading:
- 60 points - Implement spelling correction
- 20 points - Justify your decisions
- 20 points - Evaluate on a test set


## Implement context-sensitive spelling correction

Your task is to implement context-sensitive spelling corrector using N-gram language model. The idea is to compute conditional probabilities of possible correction options. For example, the phrase "dking sport" should be fixed as "doing sport" not "dying sport", while "dking species" -- as "dying species".

The best way to start is to analyze [Norvig's solution](https://norvig.com/spell-correct.html) and [N-gram Language Models](https://web.stanford.edu/~jurafsky/slp3/3.pdf).

You may also want to implement:
- spell-checking for a concrete language - Russian, Tatar, etc. - any one you know, such that the solution accounts for language specifics,
- some recent (or not very recent) paper on this topic,
- solution which takes into account keyboard layout and associated misspellings,
- efficiency improvement to make the solution faster,
- any other idea of yours to improve the Norvig’s solution.

IMPORTANT:  
Your project should not be a mere code copy-paste from somewhere. You must provide:
- Your implementation
- Analysis of why the implemented approach is suggested
- Improvements of the original approach that you have chosen to implement

In [1]:
from collections import defaultdict
from functools import lru_cache


class NGramLanguageModel:
    """
    Base class for any n-gram-based language model
    """

    n = -1  # `n` in n-gram
    candidates = defaultdict(list)  # possible next words based on context
    vocab = defaultdict(int)  # vocab with frequencies
    total = 0  # total frequency count
    has_pad = False  # whether LM support <pad> token

    def get(self, context: tuple[str]) -> list[tuple[str, int]]:
        """
        Get all possible next words based on context

        :param context:  tuple of string representing words that came earlier
        :return:         all possible words that can come next with their frequencies
        """

        return self.candidates[context]

    def get_vocab_freq(self, word: str) -> int:
        """
        Get frequency in vocab of a given word

        :param word:  word to get frequency count to
        :return:      frequency count (would be 0 if not in vocab)
        """

        return self.vocab[word]

    def get_total_count(self) -> int:
        """
        Get total number of words (with frequencies) in vocab

        :return:  total number of words
        """
        
        return self.total

In [2]:
class NGramLanguageModelFromFile(NGramLanguageModel):
    """
    N-gram based LM that could be initialized from frequency count

    Examples of the file structure & input data:
        * https://www.ngrams.info/download_coca.asp
    """

    def __init__(self, file_path: str, do_pad: bool = True):
        """
        Initialize model

        :param file_path:  path to the n-gram based file
        :param do_pad:     whether to add <pad> token & support smaller n-grams
        """

        # initialize ngram `n` and `has_pad`
        self.n = -1
        self.has_pad = do_pad

        # init base class variables
        self.candidates = defaultdict(list)
        self.vocab = defaultdict(int)
        self.total = 0

        # calculate statistics
        self.__parse_file(file_path)
        self.__aggregate()

    def __parse_file(self, file_path: str):
        """
        Hidden method that parse given file and load data to
        `self.candidates` and `self.vocab`

        :param file_path:  path where the file is located (note: latin-1 encoding will be used)
        """

        with open(file_path, 'r', encoding='latin-1') as file:
            # each like has format `# token1 token2 ... tokenN`
            for line in file:
                # parse and transform
                cnt, *data = line.strip().split()
                cnt = int(cnt)

                # get ngram and add it to candidates
                for *context, pred in self.__sub_ngram(data, self.has_pad):
                    self.candidates[tuple(context)].append((pred, cnt))

                # update word count for each unique word in data
                for word in data:
                    self.vocab[word] += cnt
                    self.total += cnt

            # define `n` in ngram from candidates
            self.n = 1 + len(tuple(self.candidates.keys())[0])

    def __aggregate(self):
        """
        Hidden method that aggregates same values in candidates
        """

        for key in tuple(self.candidates.keys()):

            # if same key present several times - we will combine them into one
            df = defaultdict(int)
            for word, cnt in self.candidates[key]:
                df[word] += cnt

            # update candidates with aggregated frequency list
            self.candidates[key] = list(df.items())

    @staticmethod
    def __sub_ngram(data: tuple[str], pad: bool = True) -> tuple[str]:
        """
        Hidden method that generates ngram based on given text:
            * `n` is determined as len(data) - 1 (due to file format)
            * `pad` represents whether we want to add <pad> tokens to   \
                beginning or not, it is beneficial if mistake is in the \
                first word

        :param data:  token list to generate ngrams from
        :param pad:   whether to add <pad> or no
        :return:      yields possible ngrams
        """
        
        n = len(data)
        for i in range(1 if pad else n, n + 1):
            yield ('<pad>',) * (n - i) + tuple(data[:i])

In [3]:
from collections import defaultdict
from functools import lru_cache
from typing import Generator, Any


class NorvigSolution:
    """
    Norvig solution taken and slightly modified from [1].

    [1] - https://norvig.com/spell-correct.html
    """

    def __init__(self, file_path: str):
        """
        Init model based on frequency file

        :param file_path:  path to the n-gram based file (same as in NGramLanguageModelFromFile)
        """

        # init key components
        self.words = defaultdict(int)
        self.total = 0

        # load data
        self.__parse_file(file_path)

    def __parse_file(self, file_path: str):
        """Hidden method that parse and load data to `self.words`"""

        with open(file_path, 'r', encoding='latin-1') as file:
            # each like has format `# token1 token2 ... tokenN`
            for line in file:
                # parse and transform
                cnt, *data = line.strip().split()
                cnt = int(cnt)

                # update word count for each word in data
                for word in data:
                    self.words[word] += cnt
                    self.total += cnt

    def __word_prob(self, word: str) -> float:
        """Get word probability to be observed"""
        return self.words[word] / self.total

    def correction(self, word: str) -> str:
        """Most probable spelling correction for word"""
        return max(self.candidates(word), key=self.__word_prob)

    def candidates(self, word: str) -> list[str] or set[str]:
        """Generate possible spelling corrections for word"""
        return self.known([word]) or self.known(self.edits1(word)) or self.known(self.edits2(word)) or [word]

    def known(self, words) -> set[str]:
        """The subset of `words` that appear in the dictionary of `self.words`"""
        return set(w for w in words if w in self.words)

    @staticmethod
    def edits1(word: str) -> set[str]:
        """All edits that are one edit away from `word`"""

        letters = 'abcdefghijklmnopqrstuvwxyz'
        splits = [(word[:i], word[i:]) for i in range(len(word) + 1)]
        deletes = [L + R[1:] for L, R in splits if R]
        transposes = [L + R[1] + R[0] + R[2:] for L, R in splits if len(R) > 1]
        replaces = [L + c + R[1:] for L, R in splits if R for c in letters]
        inserts = [L + c + R for L, R in splits for c in letters]
        return set(deletes + transposes + replaces + inserts)

    @staticmethod
    def edits2(word: str) -> Generator[str, Any, None]:
        """All edits that are two edits away from `word`"""
        return (e2 for e1 in NorvigSolution.edits1(word) for e2 in NorvigSolution.edits1(e1))

In [4]:

class SpellCorrection:
    """
    Apply spelling correction algorithm based on n-gram model
    """

    def __init__(self, lm: NGramLanguageModel, edit_lim: int = 2, freq_threshold: float = 1e-5):
        """
        Initialize model hyperparameters

        :param lm:              fitted n-gram language model
        :param edit_lim:        maximum edit distance
        :param freq_threshold:  minimum threshold needed to be achieved for word not to be corrected
        """

        # safe all data
        self.lm = lm
        self.edit_lim = edit_lim
        self.freq_threshold = freq_threshold

    def check_correct(self, text: str) -> str:
        """
        Check spelling and correct if any mistakes found.
        Text should be formatted as sequence of tokens separated
        by space symbol (either space, tab)

        :param text:  text where to check spelling
        :return:      transformed text (ideally with corrected mistakes)
        """

        if self.lm.has_pad:
            # if lm support pad, add it to beginning
            correct = ['<pad>'] * (self.lm.n - 1)
            start_idx = 0
        else:
            # otherwise consider first n-tokens as correct
            correct = text.lower().split()[:self.lm.n - 1]
            start_idx = self.lm.n - 1

        for pred in text.lower().split()[start_idx:]:
            # determine context (last n-1)
            context = tuple(correct[-(self.lm.n - 1):])

            # check and correct based & context
            correct.append(self.__correct_based_on_context(context, pred))

        # return corrected text & remove <pad> if needed
        return ' '.join(correct[self.lm.n - 1:] if self.lm.has_pad else correct)

    def __pred_prob(self, context: tuple[str], pred: str) -> float:
        """
        Hidden method that gives probability of
        finding `pred` after given `context`

        :param context:  tuple of tokens representing context
        :param pred:     word to count probability for
        :return:         probability of encountering `pred` after `context`
        """

        # get all possible words & calculate total frequency
        possibilities = self.lm.get(context)
        total = sum(map(lambda p: p[1], possibilities))

        # find `pred` and return probability
        for word, freq in possibilities:
            if word == pred:
                return freq / total

        # if not found, return 0 (this could be improved!)
        return 0

    @lru_cache(maxsize=None)
    def __correct_based_on_context(self, context: tuple[str], pred: str) -> str:
        """
        Hidden method that corrects `pred` given the context:
            * if `pred` satisfy `self.freq_threshold` consider it correct
            * otherwise make edits & search them in ngram
                1. main idea is to consider `min edit, most frequent` candidate as correct
                2. if no ngram edit found, consider raw vocab
                3. if vocab is also empty, consider original `pred` as correct

        :param context:  tuple of tokens representing context
        :param pred:     word to analyze spelling for
        :return:         'corrected' best candidate
        """

        # if next word probability satisfy threashold, just consider it correct
        if self.__pred_prob(context, pred) > self.freq_threshold:
            return pred

        # otherwise get all possible next words
        possibilities = dict()
        for (word, cnt) in self.lm.get(context):
            possibilities[word] = cnt

        # get all corrections from `1` edit to `edit_lim`
        edits = [{pred}]
        for i in range(self.edit_lim):
            # get all 'possible' edits at distance i+1
            edits.append({edit for edit_word in edits[i] for edit in self.edits1(edit_word)})
            edits[-1].add(pred)  # give original word a chance (makes sense only when freq_threshold > 0)

            # determine 'correct' answer as most frequent word from 'edits'
            ans = (pred, -1)
            for edit in edits[i + 1]:
                if possibilities.get(edit, -1) > ans[1]:
                    ans = (edit, possibilities[edit])

            # if new answer found, consider it correct ("min edit, most frequent" policy)
            if ans[1] != -1:
                return ans[0]

        # if after editing nothing found, consider raw frequencies
        ans = (pred, 0)
        for edit_set in edits:
            for edit in edit_set:
                if self.lm.get_vocab_freq(edit) > ans[1]:
                    ans = (edit, self.lm.get_vocab_freq(edit))

            # if new answer found, consider it correct (same "min edit, most frequent" idea)
            if ans[1] > 0:
                return ans[0]

        # if still nothing found, consider original prediction as correct
        return pred

    @staticmethod
    def edits1(word: str) -> set[str]:
        """
        All edits that are one edit away from `word`.
        Code taken from https://norvig.com/spell-correct.html

        :param word:  word to modify
        :return:      set of possible modifications
        """

        letters = 'abcdefghijklmnopqrstuvwxyz'
        splits = [(word[:i], word[i:]) for i in range(len(word) + 1)]

        deletes = [L + R[1:] for L, R in splits if R]
        transposes = [L + R[1] + R[0] + R[2:] for L, R in splits if len(R) > 1]
        replaces = [L + c + R[1:] for L, R in splits if R for c in letters]
        inserts = [L + c + R for L, R in splits for c in letters]
        return set(deletes + transposes + replaces + inserts)

In [5]:
# init model and spell-checker
model = NGramLanguageModelFromFile('data/bigrams.txt', do_pad=True)
sp = SpellCorrection(model, edit_lim=1, freq_threshold=1e-6)

## Justify your decisions

Write down justificaitons for your implementation choices. For example, these choices could be:
- Which ngram dataset to use
- Which weights to assign for edit1, edit2 or absent words probabilities
- Beam search parameters
- etc.

### Dataset decision

First of all I want to make some comments regarding dataset. The main problems I faced are:
1. It takes a lot of time to count ngram frequencies on large corpora
2. There are no* (maybe? I struggled to find) free, large, general-purpose frequencies count online

Let's discuss details. The first point is that is takes unreasonable amount of time to calculate good general-purpose ngrams. Finding large texts is not a problem, as thouthands of data samples are available online, there even exists some benchmarks for spell-correction problem (for example, [Billion Word Benchmark](https://paperswithcode.com/dataset/billion-word-benchmark)). The main downside is that to calculate good general-purpose ngrams we need to analyze a lot of text, and ideally the more text we feed, the more robust model will be. But even for small datasets such as [Norvig data](https://norvig.com/big.txt) it took me 20 seconds to calculate all statistics (it resulted in bad dependencies where metric was worther than baseline). If we scale that data up to a billion samples, it becomes unreasonable thing to do. 

So, if we cannot make ngram dataset ourselves, it sounds reasonable to take already existing one. However here we face another problem, ideally we need free, general-purpose ngrams that are large enough to highlight common dependencies. This task was not as easy as it seems like, as calculating ngrams is a difficult task and most of datasets are private and have paid-access. There exist some services such as [Public n-gram data](https://www.ngrams.info/download_coca.asp), however I faced difficulties accessing anything from there.

Given all this problems above, I decided to still use [Publoc n-gram data](https://www.ngrams.info/download_coca.asp) that is given us with current assignment - bigrams. Why not 5-grams? Because they contain less information in general. For some reason 5-grams dataset contain in total of 16451820 different samples, when bigrams have 286758206 (17 times more info!). For that reason, even thought bigrams do not capture long dependencies, they still more preferable, and with their heigher unique word count, they seems to be applicable in more texts.


### N-gram LM specifics

Basically my implementation is slight improvement over Norvig solution. The key difference - we use ngram for context-specific suggesitons.

The algorithm principle is simple. If we already have "pretrained" language model, we can use it to calculate probabillity of seing some given word. If this probability is high enough, we can suppose that the word is correct. However if it is not, we can apply some edits (Novig's idea) and then find not just most frequent token, but most frequent in given context (there are also some edge cases when all edits are not present, then we find edit in raw vocab (idea of backoff), and if not found again, suppose initial word is correct). This solution should add context-dependency to Novig solution and make algorithm more robust overall.

However as it is still same idea. In addition used dataset do not capture long dependencies, so not much improvement would be seen.

### Hyperparameters

The only two hyperparameters we can tune are `freq_threshold` and `edit_lim` (minimum threshold needed to be achieved for word not to be corrected, maximum edit distance). 

Even though `edit_lim` is not bound, any value greater than 3 takes minutes to compute, so in some sort `3` is limit. My experiments showed that `2` is optimal, however for faster computation I am going to use `1` in evaluation step. 

And speaking about `freq_threshold`, value of `0` means we will accept any word if it is possible in context, and value of `1` will try to correct each word in a text. I found that value between `1e-6` and `5e-6` provides more optimal result (I tuned it on my own examples).

## Evaluate on a test set

Your task is to generate a test set and evaluate your work. You may vary the noise probability to generate different datasets with varying compexity. Compare your solution to the Norvig's corrector, and report the accuracies.

In [6]:
!wget "https://huggingface.co/datasets/vishnun/SpellGram/resolve/main/train.csv" -O "data/eval.csv"

--2024-03-22 02:59:20--  https://huggingface.co/datasets/vishnun/SpellGram/resolve/main/train.csv
Resolving huggingface.co (huggingface.co)... 108.138.189.74, 108.138.189.96, 108.138.189.57, ...
Connecting to huggingface.co (huggingface.co)|108.138.189.74|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4489429 (4,3M) [text/plain]
Saving to: ‘data/eval.csv’


2024-03-22 02:59:22 (3,10 MB/s) - ‘data/eval.csv’ saved [4489429/4489429]



In [7]:
import pandas as pd
from tqdm import tqdm

df = pd.read_csv('data/eval.csv').iloc[:2000]

df.head()

Unnamed: 0,source,target
0,rate the silent upeaker four out oe 6,rate the silent speaker four out of 6
1,please find me tqe gork tqe bfrning sorld,please find me the work the burning world
2,three friendl afe relaxing uround the tsble,three friends are relaxing around the table
3,what dm they want,what do they want
4,man in tan aat working with stones,man in tan hat working with stones


In [8]:
predictions = []
for _, row in tqdm(df.iterrows(),  total=df.shape[0], desc=f'Running evaluation'):
    predictions.append(sp.check_correct(row['source']))

Running evaluation: 100%|██████████████████| 2000/2000 [00:17<00:00, 113.35it/s]


In [9]:
norvig_predictions = []
norvig = NorvigSolution('data/bigrams.txt')
for _, row in tqdm(df.iterrows(),  total=df.shape[0], desc=f'Running evaluation'):
    norvig_predictions.append(' '.join(norvig.correction(word) for word in row['source'].split()))

Running evaluation: 100%|███████████████████| 2000/2000 [00:57<00:00, 34.95it/s]


In [10]:
def evaluate(predictions, targets, sources):
    total = 0
    confusion = {'TP': 0, 'TN': 0, 'FP': 0, 'FN': 0}
    
    for pred, target, source in zip(predictions, targets, sources):
        for p, t, s in zip(pred.split(), target.split(), source.split()):
            
            if p == t and t == s:
                confusion['TP'] += 1
            elif p == t:
                confusion['TN'] += 1
            elif t == s:
                confusion['FP'] += 1
            else:
                confusion['FN'] += 1
                
            total += 1
    
    return confusion, total


def print_confusion(confusion: dict, total: int, round_f: int = 4):
    print(f"== Confusion matrix ==")
    print(*[
        [confusion['TP'], confusion['FP']], 
        [confusion['FN'], confusion['TN']]
    ], sep='\n')
    print()
    
    accuracy = (confusion['TP'] + confusion['TN']) / total
    precision = confusion['TP'] / (confusion['TP'] + confusion['FP'])
    recall = confusion['TP'] / (confusion['TP'] + confusion['FN'])
    f1 = 2 * precision * recall / (precision + recall)
    
    print('Accuracy: ', round(accuracy, round_f), sep='\t')
    print('Precision:', round(precision, round_f), sep='\t')
    print('Recall:   ', round(recall, round_f), sep='\t')
    print('F1:       ', round(f1, round_f), sep='\t')
    

In [11]:
confusion, total = evaluate(predictions, df['target'], df['source'])

print_confusion(confusion, total)

== Confusion matrix ==
[14263, 659]
[1444, 3166]

Accuracy: 	0.8923
Precision:	0.9558
Recall:   	0.9081
F1:       	0.9313


In [12]:
confusion, total = evaluate(norvig_predictions, df['target'], df['source'])

print_confusion(confusion, total)

== Confusion matrix ==
[14511, 411]
[1911, 2699]

Accuracy: 	0.8811
Precision:	0.9725
Recall:   	0.8836
F1:       	0.9259


In [13]:
confusion, total = evaluate(df['source'], df['target'], df['source'])

print_confusion(confusion, total)

== Confusion matrix ==
[14922, 0]
[4610, 0]

Accuracy: 	0.764
Precision:	1.0
Recall:   	0.764
F1:       	0.8662


### Evaluation dataset choice

I decided to use [SpellGram](https://huggingface.co/datasets/vishnun/SpellGram/resolve/main/train.csv) dataset for evaluation. It provide several benefits, such as data is already tokenized in the same way as "training" data, and it was specifially designed for n-grams. In addition it is also general-purpose and has a lot of samples we can work with. Due to the fact that I already trained model, I decided to use [Spellgram training data](https://huggingface.co/datasets/vishnun/SpellGram/resolve/main/train.csv) for eval. And as models are slow, I just cut data for faster evaluation.

### Results

As we can see, both Norvig solution and my perform better than no replacement at all. However some interesting pattern occurs. Even thought accorging to accuracy and f1-score n-gram solution is better, it is not that significat as jump from no-edit. 

In addition we can see that precision seems to decrease the more complex model we build. This potentially mean that `freq_threshold` is still a bit high, so further tuning could be done.

Opposite to precision, recall is getting heigher. This indicates that when model corrects any word, it is more "sure" that the answer is indeed correct.

### Credits

Work done by Polina Zelenskaya (DS21-01)