In [1]:
from tqdm import tqdm
import re
import numpy as np
from collections import Counter
from itertools import product
import pickle
import gzip
from nltk.tokenize import sent_tokenize

In [2]:
class Lychee():
    def __init__(self, 
                 mode = 'cor',
                 lm_order = 3, 
                 lm_counts = 'default', 
                 c_dict = 'default',
                 check_all=False, 
                 lm_lambdas = [0.2, 0.3, 0.5], 
                 threshold=4):
        self.mode = mode
        self.threshold = threshold
        self.lm = self.LanguageModel(order = lm_order, lambdas = lm_lambdas, counts = lm_counts)
        self.c = self.Candidator(self.lm.counts['n1'] if c_dict == 'default' else c_dict, 
                                 check_all = check_all)
        
    def check_pattern(self, pattern):
        '''генерирует все варианты для полученной строки на основе комбинаций кандидатов
        и проверяет их с помощью модели языка, выбирает наилучший вариант'''
        cand_list = self.c.candidates(pattern)
        cand_probs = sorted([(cand, self.lm.prob(cand, log=True)) for cand in cand_list], 
                key=lambda x:x[1], reverse=True)
        if self.mode == 'cor':
            return cand_probs[0][0]
        elif self.mode == 'prob':
            return cand_probs[:10]   

        
    def check(self, text):   
        corrected_text = []
        for sent in sent_tokenize(text):
            corrected_sent = []
            tmp = []
            words = Lychee.cleanse(sent).split()
            for i, word in enumerate(words):
                if len(tmp) < self.threshold-1 and i != len(words)-1:
                    tmp.append(word)
                else: 
                    tmp.append(word)
                    corrected_sent.append(self.check_pattern(' '.join(tmp)))
                    tmp = []
            corrected_text.append(' '.join(corrected_sent))
        return corrected_text
                    
    
    class LanguageModel():
        def __init__(self, order, lambdas, counts=None):
            self.order = order
            self.lambdas = lambdas
            if counts == 'default':
                print('... please wait ... loading model ...')
                self.counts = pickle.load(gzip.open('default.pkl.gz'))
            else: self.counts = counts

        def product(self, nums):
            "возвращает произведение чисел"
            result = 1
            for x in nums: result *= x
            return result

        def get_ngrams(self, tokens, n):
            '''группирует на н-граммы список токенов (слов)'''
            return [' '.join(tokens[i:i+n]) for i, token in enumerate(tokens)]

        def get_counts(self, corpus, order):  
            '''создает набор частотных словарей порядка от 0 до заданного 
            на основе списка токенов'''
            counts = {'n' + str(i) : Counter(self.get_ngrams(corpus, n=i)) \
                      for i in range(1, order+1)}
            counts['n0'] = {'':len(corpus)}
            return counts

        def get_prob(self, counts, word, context=''):
            '''возвращает вероятность токена (слова или нграммы)'''
            order = len(context.split())+1
            separator = ' ' if order > 1 else ''
            return (counts['n'+str(order)][separator.join([context, word])] + 1) / \
                   (counts['n'+str(order-1)][context] + len(counts['n'+str(order)]))

        def get_logprob(self, counts, word, context=''):
            '''возвращает логарифмическую вероятность токена'''
            return np.log(self.get_prob(counts, word, context))

        def get_following(self, counts, context):
            '''возвращает список слов, следующих за контекстом, с частотами и вероятностями'''
            order = len(context.split())+1
            return sorted(
                [(k.split()[-1], v, self.get_prob(counts, k.split()[-1], context)) \
                for k, v in counts['n'+str(order)].items()                         \
                if re.match(context+' '+'\w+', k)],                                \
                key=lambda x:x[1], reverse=True)   

        def get_string_probs(self, counts, string, order, log=True):
            '''возвращает вероятность набора слов любой длины'''
            prob_fun = self.get_logprob if log else self.get_prob
            tokens = string.split()
            probs = []
            for i in range(len(tokens)):
                context = ' '.join(tokens[i-order+1:i]) if i>=order else ' '.join(tokens[:i])
                prob = prob_fun(counts, word = tokens[i], context = context)
                probs.append(prob)
            return probs

        def interpolate(self, counts, string, order, log=True, lambdas='default'):
            '''возвращает интерполированную вероятность набора слов любой длины,
            комбинируя вероятности, расчитыванные на нграммах разной длины,
            с весами из аргумента lambdas'''
            lmbd = self.lambdas if lambdas == 'default' else lambdas
            aggregate = sum if log else self.product
            probs = [self.get_string_probs(counts, string, order=i, log=log) \
                     for i in range(1, order+1)]
            probs_interpolated = []
            for tup in zip(*probs):
                prob_token = 0
                for i in range(len(tup)):
                    prob_token += tup[i] * lmbd[i]
                probs_interpolated.append(prob_token)
            return aggregate(probs_interpolated)

        def fit(self, corpus):
            '''расчет и сохранение модели'''
            self.counts = self.get_counts(corpus, self.order)

        def prob(self, string, log=False):
            '''возвращает конечную (интерполированную) вероятность 
            для произвольного работа слов любой длины, "публичный" метод'''
            return self.interpolate(self.counts, string, self.order, log=log)

        def context_prob(self, word, context='', log=False):
            '''возвращает вероятность того, что данное слово следует за данным контекстом'''
            prob_fun = self.get_logprob if log else self.get_prob
            c = context.split()
            history = ' '.join(c) if len(c) < self.order else ' '.join(c[-self.order+1:])
            return prob_fun(self.counts, word, history)  

        def following(self, context):
            '''возвращает список слов, следующих за контекстом, 
            с частотами и вероятностями, "публичный" метод'''
            c = context.split()
            history = ' '.join(c) if len(c) < self.order else ' '.join(c[-self.order+1:])
            return self.get_following(self.counts, history)

    class Candidator():
        def __init__(self, dictionary, abc='йцукенгшщзхъфывапролджэячсмитьбю-', check_all=False):
            self.abc = abc
            self.dictionary = set(dictionary)
            self.check_all = check_all

        def edits(self, word): # mostly from norvig, modified for faster dict search
            '''возвращает множество кандидатов (редакций) для отдельного слова,
            сохраняются только те редакции, которые есть в словаре'''
            letters = self.abc
            d = self.dictionary
            splits = [(word[:i], word[i:]) for i in range(len(word) + 1)]
            deletes = [ed for ed in [L + R[1:] for L, R in splits if R] if ed in d]
            transposes = [ed for ed in [L + R[1] + R[0] + R[2:] for L, R in splits if len(R)>1] \
                          if ed in d]
            replaces = [ed for ed in [L + c + R[1:] for L, R in splits if R for c in letters] \
                        if ed in d]
            inserts = [ed for ed in [L + c + R for L, R in splits for c in letters] if ed in d]
            return set(deletes + transposes + replaces + inserts + [word])

        def get_sentences(self, text):
            '''делит текст на предложения'''
            return text.lower().split('. ')

        def word_candidates(self, sent):
            '''для всех или только для неизвестных слов в предложении 
            возвращает список кандидатов'''
            if self.check_all: return [self.edits(word) for word in sent.lower().split()]
            else: return [self.edits(word) if (word not in self.dictionary) else {word}\
                        for word in sent.lower().split()]

        def sent_candidates(self, sent):
            '''генерирует множество кандидатов для всего предложения'''
            return {candidate for candidate in product(*self.word_candidates(sent))}

        def candidates(self, sent):
            '''возвращает множество кандидатов-предложений в виде текста'''
            return {' '.join(candidate) for candidate in self.sent_candidates(sent)}

    class Ranker():
        def __init__(self, lang_model, candidator):
            self.lm = lang_model
            self.c = candidator
            
    def cleanse(s):
        rgxp = '[\`\)\(\|©~^<>/\'\"\«\»№#$&\*.,;=+?!\—_@:\]\[%\{\}\\n]'
        return re.sub(' +', ' ', re.sub(rgxp, ' ', s.lower()))
    

In [3]:
%%time
lychee = Lychee()

... please wait ... loading model ...
Wall time: 10.9 s


In [5]:
%%time
text = '. '.join(['вкучный свуп c мэсом', 
          'гжегож бженчишчикевич', 
          'влодя дуурак', 
          'оппечатка в злове упечатка не опечатко', 
          'ммоя програма работаит паросто дур-дом'])
lychee.check(text)

Wall time: 3.67 s


['вкусный суп с мясом',
 'гжегож бженчишчикевич',
 'володя дурак',
 'опечатка в слове опечатка не опечатка',
 'моя программа работает просто дурдом']

In [6]:
with open('training.txt', encoding='utf-8') as f:
    pairs = [[s for s in pair.split('\n')] for pair in f.read().split('\n\n')]

wrong = [pair[0] for pair in pairs]
correct = [pair[1].lower() for pair in pairs]

In [7]:
%%time
lycheed = [lychee.check(sent) for sent in tqdm(wrong)]
total = 0
hooray = 0
for pair in zip(lycheed, correct):
    total += 1
    if pair[0] == [pair[1]]:
        hooray += 1     
print('{} из {}({:.1%}) предложений полностью исправлены'.format(hooray, total, hooray/total))

100%|█████████████████████████████████████████████████████████████████████████████| 3000/3000 [00:07<00:00, 382.34it/s]


1251 из 3000(41.7%) предложений полностью исправлены
Wall time: 7.88 s
