In [1]:
!pip install textdistance -q

In [2]:
import re
import textdistance
from tqdm import tqdm
from string import punctuation
from collections import Counter

from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics.pairwise import cosine_similarity, cosine_distances

In [3]:
punctuation += "«»—…“”"

# Домашнее задание № 3. Исправление опечаток

## 1. Доп. ранжирование по вероятности (3 балла)

Дополните get_closest_hybrid_match в семинаре так, чтобы из кандадатов с одинаковым расстоянием редактирования выбиралось наиболее вероятное.

In [8]:
corpus = open('wiki_data.txt', encoding='utf8').read()
true = open('correct_sents.txt', encoding='utf8').read().splitlines()
bad = open('sents_with_mistakes.txt', encoding='utf8').read().splitlines()

In [5]:
def align_words(sent_1, sent_2):
    tokens_1 = sent_1.lower().split()
    tokens_2 = sent_2.lower().split()

    tokens_1 = [token.strip(punctuation) for token in tokens_1]
    tokens_2 = [token.strip(punctuation) for token in tokens_2]

    tokens_1 = [token for token in tokens_1 if token]
    tokens_2 = [token for token in tokens_2 if token]

    assert len(tokens_1) == len(tokens_2)

    return list(zip(tokens_1, tokens_2))

In [9]:
vocab = Counter(re.findall(r'\w+', corpus.lower()))

word2id = list(vocab.keys())
id2word = {i:word for i, word in enumerate(vocab)}

vec = CountVectorizer(analyzer='char', max_features=10000, ngram_range=(1,3))
X = vec.fit_transform(vocab)

In [6]:
def get_closest_match_vec(text, X, vec, topn=20):
    v = vec.transform([text])

    similarities = cosine_distances(v, X)[0]
    topn = similarities.argsort()[:topn]

    return [(id2word[top], similarities[top]) for top in topn]

In [10]:
def get_closest_match_with_metric(text, lookup,topn=20, metric=textdistance.levenshtein):
    # Counter можно использовать и с не целыми числами
    similarities = Counter()

    for word in lookup:
        similarities[word] = metric.normalized_similarity(text, word)

    return similarities.most_common(topn)

def get_closest_hybrid_match(text, X, vec, topn=3, metric=textdistance.damerau_levenshtein):
    candidates = get_closest_match_vec(text, X, vec, topn*4)
    lookup = [cand[0] for cand in candidates]
    closest = get_closest_match_with_metric(text, lookup, topn, metric=metric)

    return closest

N = sum(vocab.values())

def P(word, N=N):
    return vocab[word] / N

def predict_mistaken(word, vocab):
    return 0 if word in vocab else 1

In [57]:
# новая функция
def get_closest_hybrid_match(text, X, vec, topn=3, metric=textdistance.damerau_levenshtein):
    if text in vocab:
        return text

    candidates = get_closest_match_vec(text, X, vec, topn*4)
    lookup = [cand[0] for cand in candidates]
    closest = get_closest_match_with_metric(text, lookup, topn, metric=metric)

    max_metr = max([word[1] for word in closest])

    new_closest = {}
    for word, metr in closest:
        if metr == max_metr:
            new_closest[word] = P(word)

    max_word = None
    max_value = float('-inf')
    for word, value in new_closest.items():
        if value > max_value:
            max_value = value
            max_word = word

    return max_word

In [58]:
mistakes = []
total_mistaken = 0
mistaken_fixed = 0

total_correct = 0
correct_broken = 0

total = 0
correct = 0

cashed = {}
for i in tqdm(range(len(true))):
    word_pairs = align_words(true[i], bad[i])
    for pair in word_pairs:
        if predict_mistaken(pair[1], vocab):
            pred = cashed.get(pair[1], get_closest_hybrid_match(pair[1], X, vec)[0][0])
            cashed[pair[1]] = pred
        else:
            pred = pair[1]

        if pred == pair[0]:
            correct += 1
        else:
            mistakes.append((pair[0], pair[1], pred))
        total += 1

        if pair[0] == pair[1]:
            total_correct += 1
            if pair[0] != pred:
                correct_broken += 1
        else:
            total_mistaken += 1
            if pair[0] == pred:
                mistaken_fixed += 1

100%|██████████| 915/915 [18:11<00:00,  1.19s/it]


In [59]:
print(correct/total)
print(mistaken_fixed/total_mistaken)
print(correct_broken/total_correct)

0.7928964482241121
0.0015527950310559005
0.09004249454461927


## 2.  Symspell (7 баллов)

Реализуйте алгоритм Symspell. Он похож на алгоритм Норвига, но проще и быстрее. Он основан только на одной операции - удалении символа. Описание алгоритма по шагам:

1) Составляется словарь правильных слов  
2) На основе словаря правильных слов составляется словарь удалений - для каждого правильного слова создаются все варианты удалений и создается словарь, где ключ - слово с удалением, а значение - правильное слово  (!)
3) Для выбора исправления для слова с опечаткой генерируются все варианты удаления, из них выбираются те, что есть в словаре удалений, построенного на шаге 2. Слово с опечаткой заменяется на правильное слово, соответствующее варианту удаления  
4) Если в словаре удалений есть несколько вариантов, то выбирается удаление, которому соответствует наиболее вероятное правильное слово  


Оцените качество полученного алгоритма теми же тремя метриками.

In [37]:
def get_deletion_set(word):
    deletion_set = set()
    deleted_words = []
    for i in range(len(word) + 1):
        deleted_words.append((word[:i], word[i:]))
        for left, right in deleted_words:
            if right:
                deletion_set.add(left + right[1:])
    return deletion_set

deletion_dict = {}

sorted_words = sorted(vocab.keys(), key=vocab.get)

for current_word in sorted_words:
    for deleted_word in get_deletion_set(current_word):
        deletion_dict[deleted_word] = deleted_word
    deletion_dict[current_word] = current_word

deletion_set = set(deletion_dict)

In [52]:
def correct_word(word):
    if word in vocab:
        return word

    del_set = get_deletion_set(word)
    deletion_union = del_set.union({word})

    deletion_intersection = tuple(deletion_union.intersection(deletion_set))

    intersection_len = len(deletion_intersection)
    if intersection_len == 0:
        return word
    elif intersection_len == 1:
        return deletion_dict[deletion_intersection[0]]
    else:
        max_value = None
        max_key = None

        for word in deletion_intersection:
            value = deletion_dict[word]
            if max_value is None or P(value) > P(max_value):
                max_value = value
                max_key = word

        return max_key

In [55]:
mistakes = []
total_mistaken = 0
mistaken_fixed = 0

total_correct = 0
correct_broken = 0

total = 0
correct = 0

cashed = {}
for i in tqdm(range(len(true))):
    word_pairs = align_words(true[i], bad[i])
    for pair in word_pairs:
        if predict_mistaken(pair[1], vocab):
            pred = cashed.get(pair[1], correct_word(pair[1]))
            cashed[pair[1]] = pred
        else:
            pred = pair[1]

        if pred == pair[0]:
            correct += 1
        else:
            mistakes.append((pair[0], pair[1], pred))
        total += 1

        if pair[0] == pair[1]:
            total_correct += 1
            if pair[0] != pred:
                correct_broken += 1
        else:
            total_mistaken += 1
            if pair[0] == pred:
                mistaken_fixed += 1

100%|██████████| 915/915 [00:00<00:00, 9754.49it/s]


In [56]:
print(correct/total)
print(mistaken_fixed/total_mistaken)
print(correct_broken/total_correct)

0.8456228114057028
0.13043478260869565
0.04858160101068106
