# Task 1
Реализуйте алгоритм Symspell - https://medium.com/@wolfgarbe/1000x-faster-spelling-correction-algorithm-2012-8701fcd87a5f. Он похож на алгоритм Норвига, но проще и быстрее. Там к словам в словаре применяется только одна операция - удаление символа. Чтобы найти исправление из слова тоже удаляются символы и сравниваются с теми, что хранятся в словаре. Оцените качество полученного алгоритма теми же тремя метриками.


In [1]:
from nltk.tokenize import sent_tokenize
from razdel import sentenize
from razdel import tokenize as razdel_tokenize
from string import punctuation

punctuation += "«»—…“”"
punct = set(punctuation)

class Preprocessor():
    
    """ a class to preprocess a txt file availabe through the path """
    def __init__(self, path):
        self.path = path # a path to a txt file
        self.sentences = []
        self.tokens_by_sent = []
        self.ngrams = []
        
    def extractContent(self):
        """ loads the content of the file """
        with open(self.path, 'r', encoding="utf-8") as f: # open in readonly mode
            self.sentences.extend(f.read().splitlines())
            
    def normalize(self, text) -> list:
        """ normalizes a string into a list of tokens """
        normalized_text = [word.text.strip(punctuation) for word \
                                                                in razdel_tokenize(text)]
        normalized_text = [word.lower() for word in normalized_text if word and len(word) < 20]
        return normalized_text
        
    def get_tokens_by_sent(self, n=0):
        """
            tokenizes a text into a list of tokens by sentence
            inserts n times <start> and n times <end> of tags
            at the beginning and the end of a sentence
        """
        tmp = [self.sp_tags(self.normalize(sent), n) for sent in self.sentences]
        self.tokens_by_sent = tmp
        
    def sp_tags(self, tokens, n=0) -> list:
        """ recieves a list of tokens and returns them with special tags """
        if n >= 0:
            return(['<start>'] * n + tokens + ['<end>'] *n)
        else:
            return(ValueError)

In [2]:
bad = Preprocessor('data/sents_with_mistakes.txt')
true = Preprocessor('data/correct_sents.txt')
bad.extractContent()
true.extractContent()
bad.get_tokens_by_sent()
true.get_tokens_by_sent()

In [3]:
print(bad.sentences[0])
print(true.sentences[0])
print(bad.tokens_by_sent[0])
print(true.tokens_by_sent[0])

Симпатичнейшое шпионское устройство, такой себе гламурный фотоаппарат девушки Бонда - миниатюрная модель камеры Superheadz Clap Camera.
Симпатичнейшее шпионское устройство такой себе гламурный фотоаппарат девушки Бонда миниатюрная модель камеры Superheadz Clap Camera
['симпатичнейшое', 'шпионское', 'устройство', 'такой', 'себе', 'гламурный', 'фотоаппарат', 'девушки', 'бонда', 'миниатюрная', 'модель', 'камеры', 'superheadz', 'clap', 'camera']
['симпатичнейшее', 'шпионское', 'устройство', 'такой', 'себе', 'гламурный', 'фотоаппарат', 'девушки', 'бонда', 'миниатюрная', 'модель', 'камеры', 'superheadz', 'clap', 'camera']


In [4]:
class TokenAligner():
    
    """ a class to align tokens of two parrallel texts """
    def __init__(self, correct_tokens_by_sent, incorrect_tokens_by_sent):
        self.aligned_tokens = self.align_sents(correct_tokens_by_sent,
                                         incorrect_tokens_by_sent)
        self.mistakes = []
        self.n_mistakes = 0
        
    def align_sents(self, tokens_1, tokens_2) -> list:
        """ returns aligned tokens and sentences"""
        tmp = []
        [tmp.extend(self.align_tokens(sent_pair)) for sent_pair in zip(tokens_1, tokens_2)]
        return tmp

    def align_tokens(self, sent_pair) -> list:
        """ aligns tokens from a pair of sentences """
        return list(zip(sent_pair[0], sent_pair[1]))
        
    def extract_mistakes(self):
        """ extracts only tokens that do not match """
        for pair in self.aligned_tokens:
            if pair[0] != pair[1]:
                self.mistakes.append(pair)
                
    def calculate_proportion(self):
        """ calculates the proportion of mistakes in a text """
        return ('Доля ошибок - ', 
                len(self.mistakes) / len(self.aligned_tokens))

In [5]:
aligner = TokenAligner(true.tokens_by_sent, bad.tokens_by_sent)
aligner.aligned_tokens[0:3]

[('симпатичнейшее', 'симпатичнейшое'),
 ('шпионское', 'шпионское'),
 ('устройство', 'устройство')]

In [6]:
import re
from collections import Counter


class SpellChecker():
    
    """ a class to spellcheck using a given corpus of correct words """
    def __init__(self, path):
        self.path = path
        self.dictionary = {}
        self.deletes = {}
        self.total_count = 1
        
    def create_dictionary(self):
        """ 
            creates a dictionary of correct words
            and their deletes 
        """
        self.dictionary = Counter(re.findall('\w+', self.get_content()))
        
        self.total_count = sum(self.dictionary.values())
        
        for word in list(self.dictionary.keys()):
            for d in self.generate_deletes(word):
                if d in self.deletes:
                    self.deletes[d].append(word)
                else:
                    self.deletes[d] = [word]
        
    def get_content(self) -> str:
        """ loads the content of the file """
        with open(self.path, 'r', encoding="utf-8") as f: # open in readonly mode
            return f.read().lower()
        
    def generate_deletes(self, word) -> list:
        """ generate deletes for a word """

        deletes = []
        tmp = [word]
        for d in range(1): # maximum 1 characters to delete
            tmp_2 = []
            for word in tmp:
                if len(word) > 2:
                    for char in range(len(word)):  # character index
                        word_minus_char = word[:char] + word[char + 1:]
                        if word_minus_char not in deletes:
                            deletes.append(word_minus_char)
                        if word_minus_char not in tmp_2:
                            tmp_2.append(word_minus_char)
            tmp = tmp_2

        return deletes
    
    def generate_candidates(self, word) -> list:
        """ get a list of candidates """
        
        return (self.known([word]) or 
                self.known_deletes(word) or 
                [word])
    
    def known(self, words) -> list:
        """ check if the word is in the dictionary """
        return set(w for w in words if w in self.dictionary)
    
    def known_deletes(self, word) -> list:
        """ checks if any there is a potential candidate in deletes """
        deletes = self.generate_deletes(word)
        candidates = []
        [candidates.extend(self.deletes[w]) for w in deletes if w in self.deletes]
        return set(candidates)

    def calc_prob(self, word) -> float:
        """ calculates probability for a given word """
        return self.dictionary[word] / self.total_count
    
    def get_corrected(self, word):
        """ find the best match """
        return max(self.generate_candidates(word), key=self.calc_prob)


In [7]:
spellcheck = SpellChecker('data/wiki_data.txt')
spellcheck.create_dictionary()

In [8]:
spellcheck.get_corrected('сонлце')

'солнце'

In [9]:
class SpellcheckEvaluator():
    
    """ a class to check how accurate is a given spell checker """
    def __init__(self):
        self.correct = 0
        self.total = 0
        self.total_mistaken = 0
        self.mistaken_fixed = 0
        self.total_correct = 0
        self.correct_broken = 0
        self.cashe = {}
        
    def evaluate(self, pairs, spellchecker):
        """ updates metrics to evaluate a spellchecker on given word pairs """
        
        for pair in pairs:
            # cashe results
            predicted = self.cashe.get(pair[1], spellchecker.get_corrected(pair[1]))
            self.cashe[pair[1]] = predicted

            if predicted == pair[0]:
                self.correct += 1
            self.total += 1

            if pair[0] == pair[1]:
                self.total_correct += 1
                if pair[0] !=  predicted:
                    self.correct_broken += 1
            else:
                self.total_mistaken += 1
                if pair[0] == predicted:
                    self.mistaken_fixed += 1

In [10]:
evaluator = SpellcheckEvaluator()
evaluator.evaluate(aligner.aligned_tokens, spellcheck)

Для оценки используем будем использовать три метрики:  
1) процент правильных слов;  
2) процент исправленных ошибок  
3) процент ошибочно исправленных правильных слов

In [11]:
print(evaluator.correct/evaluator.total)
print(evaluator.mistaken_fixed/evaluator.total_mistaken)
print(evaluator.correct_broken/evaluator.total_correct)

0.8496105452366687
0.18827838827838828
0.04601688056422708


# Task 2
2. Добавьте к полученному алгоритму исправления (Symspell) триграммную модель и проверьте, улучшает ли она качество. Триграммную модель нужно вставить туда, где у вас выбирается один из нескольких кандидатов на исправление.

In [12]:
from collections import Counter
import numpy as np

class NgramModel():
    """
        a class to generate text based on a list tokens split by sentences
        input: [['tokens', 'of', 'a', 'single', 'sentence']]
        
        !!! modify to work on any ngram >= 2. Now it does not suit any to the full extent !!!
    """
    def __init__(self, tokens, ngram_length=2):
        if ngram_length < 2: # for bigrams and larger series only
            return(ValueError)
        self.tokens = tokens
        self.ngram_length = ngram_length
        self.ngrams = [] # [0] (n-1)grams [1] ngrams
        self.id2unigram = []
        self.unigram2id = {}
        self.id2ngram = []
        self.ngram2id = {}
        self.matrix = np.empty(0)
        
     
    def generate_ngrams(self):
        """ generate ngrams from sentences """
        self.generate_counters() # generate enough counters for ngrams
        
        for sentence in self.tokens:
            self.ngrams[0].update(sentence)
            self.ngrams[1].update(
                self.ngrammer(sentence, (self.ngram_length-1)) # gerenate ngrams of n-1 lenght
                ) 
            self.ngrams[2].update(
                self.ngrammer(sentence, self.ngram_length) # generate ngrams
                )

    def generate_counters(self):
        """
            generates a number of ngrams Counter() to store in self.ngrams
        """
        [self.ngrams.append(Counter()) for i in range(3)]
        
    def ngrammer(self, tokens, n=2) -> list:
        """ return ngrams """
        ngrams = []
        for i in range(0,len(tokens)-n+1):
            ngrams.append(' '.join(tokens[i:i+n]))
        return(ngrams)
    
    def build_matrix(self):
        self.matrix = np.zeros(
            (len(self.ngrams[1]), # rows with bigrams
             len(self.ngrams[0]) # columns with unigrams
            ))
        
        self.id2unigram = list(self.ngrams[0]) # index unigrams
        self.unigram2id = {unigram:i for i, unigram in enumerate(self.id2unigram)}
        self.id2ngram = list(self.ngrams[1]) # index (n-1)grams
        self.ngram2id = {ngram:i for i, ngram in enumerate(self.id2ngram)}
        
        
        for ngram in self.ngrams[-1]:
            tmp = ngram.split()
            prev = " ".join(tmp[:-1]) # save ngram without last token
            self.matrix[self.ngram2id[prev]][self.unigram2id[tmp[-1]]] = (self.ngrams[-1][ngram]/
                                                                            self.ngrams[-2][prev])
    

In [13]:
corpus = Preprocessor('data/wiki_data.txt')
corpus.extractContent()
corpus.get_tokens_by_sent(2)

In [14]:
model = NgramModel(corpus.tokens_by_sent[0:650], 3)
model.generate_ngrams()
model.build_matrix()

In [15]:
class SpellcheckEvaluator_2():
    
    """ a class to check how accurate is a given spell checker """
    def __init__(self):
        self.correct = 0
        self.total = 0
        self.total_mistaken = 0
        self.mistaken_fixed = 0
        self.total_correct = 0
        self.correct_broken = 0
        self.cashe = {}
        self.context = '<start> <start>'
        
    def evaluate(self, pairs, spellchecker, model): # add a model to predict words
        """ updates metrics to evaluate a spellchecker on given word pairs """
        
        for i, pair in enumerate(pairs):
            predicted = self.cashe.get(pair[1], self.get_corrected(pair[1],
                                                                   model, spellchecker))
            self.cashe[pair[1]] = predicted
            
            if predicted == pair[0]:
                self.correct += 1
            self.total += 1

            if pair[0] == pair[1]:
                self.total_correct += 1
                if pair[0] !=  predicted:
                    self.correct_broken += 1
            else:
                self.total_mistaken += 1
                if pair[0] == predicted:
                    self.mistaken_fixed += 1
            
        self.context = self.context.split()[-1] + " " + predicted
                                       
    def get_corrected(self, word, model, spellchecker) -> str:
        """ chooses the best option out of generated candidates """
        candidates = spellchecker.generate_candidates(word)
        weights = {c:spellchecker.calc_prob(c) for c in candidates}
        
        for c in candidates:
            if c not in model.ngrams[0]:
                continue
            
            weight = model.matrix[model.ngram2id[self.context]][model.unigram2id[c]]
            if weight > 0:
                weights[c] *= weight
            
            else:
                continue
                
        return sorted(candidates, key=weights.__getitem__)[0]
                                       

In [16]:
evaluator = SpellcheckEvaluator_2()
evaluator.evaluate(aligner.aligned_tokens, spellcheck, model)

In [17]:
print(evaluator.correct/evaluator.total)
print(evaluator.mistaken_fixed/evaluator.total_mistaken)
print(evaluator.correct_broken/evaluator.total_correct)

0.841521869382864
0.12893772893772895
0.04601688056422708
