https://github.com/kpu/kenlm

* языковая модель
* лексикон
* модель ошибок

In [88]:
import math
import os
import re
from collections import defaultdict

import fuzzyset
import kenlm
import nltk
from nltk.tokenize import WordPunctTokenizer

from spell_checker import find_spell_errors

# KenLM language model read from an ARPA file (the file name suggests a
# 3-gram Latin model).
latin_model = kenlm.Model('../../kenlm/lm/latin_3gram.arpa')
# Punkt sentence splitter loaded from NLTK's *English* pickle; the notebook
# applies it to Latin text as well.
sent_tokenize = nltk.data.load('tokenizers/punkt/english.pickle')
# Shared word tokenizer (NLTK WordPunctTokenizer) used throughout the notebook.
word_tokenize = WordPunctTokenizer()

# Noisy Channel

In [87]:
def words_list(path_ref, path_error, even):
    """
    Collect (reference word, OCR word) pairs from a reference text and OCR pages.

    Every sentence of every file in ``path_ref`` is added to a fuzzy set. The
    OCR pages in ``path_error`` with the requested page-number parity are
    cleaned (hyphenated line breaks joined, newlines turned into spaces),
    concatenated and split into sentences. Each OCR sentence is paired with
    its best fuzzy match among the reference sentences when the match score
    is strictly between 0.7 and 1. Inside each pair, every OCR token that is
    not punctuation is fuzzily matched against the reference tokens, and the
    pair is kept when the score exceeds 0.7 and the two words differ.

    path_ref: directory with the reference text files.
    path_error: directory with OCR pages; a file name minus its last four
        characters must parse as an integer page number.
    even: if true, only even-numbered pages are used, otherwise only odd ones.

    return: list of (reference word, OCR word) tuples.
    """
    lat_sent = fuzzyset.FuzzySet()
    for book in os.listdir(path_ref):
        with open(os.path.join(path_ref, book), "r") as f:
            for s in sent_tokenize.tokenize(f.read().lower()):
                lat_sent.add(s)

    want_even = bool(even)
    chunks = []
    for page in sorted(os.listdir(path_error), key=lambda x: int(x[:-4])):
        if (int(page[:-4]) % 2 == 0) != want_even:
            continue
        with open(os.path.join(path_error, page), 'r') as f:
            chunks.append(re.sub("\n", " ", re.sub("-\n", "", f.read().lower())))
    # Pages are concatenated without a separator, exactly as before.
    latin_OCR = sent_tokenize.tokenize("".join(chunks))
    print("text error done", len(latin_OCR))

    align_sent = []
    for lat_ocr in latin_OCR:
        # One fuzzy lookup per sentence; a falsy result means "no match".
        matches = lat_sent.get(lat_ocr)
        if not matches:
            continue
        best = matches[0]
        # A score of exactly 1 is an identical sentence and carries no errors.
        if 0.7 < best[0] < 1:
            align_sent.append((lat_ocr, best[1]))
    print("align sent done", len(align_sent))

    words = []
    for s_ocr, s_ref in align_sent:
        f_ref = fuzzyset.FuzzySet(word_tokenize.tokenize(s_ref))
        for w in word_tokenize.tokenize(s_ocr):
            # Substring test: skips punctuation tokens found in this string.
            if w in ".';/,-?!":
                continue
            matches = f_ref.get(w)
            if not matches:
                continue
            best = matches[0]
            if best[0] > 0.7 and best[1] != w:
                words.append((best[1], w))

    print("words done", len(words))
    return words

In [75]:
words_1 = words_list("./spell_checker_data/edit_text/GaiusJuliusCaesar/", "./data_text_new/L072/", True)

text error done 2867
align sent done 1333
words done


In [78]:
words_2 = words_list("./spell_checker_data/edit_text/lucan/", "./data_text_new/L220/", False)

text error done 3989
align sent done 1876
words done


In [None]:
print("Words ", len(words_1) + len(words_2))

## Lexicon

In [78]:
def make_lexicon(text, path="./spell_checker_data/noisy_channel_lexicon.txt"):
    """
    Write the unique tokens of ``text`` to a lexicon file, one token per line.

    text: iterable of string tokens; duplicates are dropped.
    path: output file; defaults to the previously hard-coded lexicon location.

    Tokens are written in sorted order so that repeated runs produce the same
    file (plain set iteration order is not stable between runs).
    """
    lexicon = sorted(set(text))
    with open(path, "w") as f:
        f.write("\n".join(lexicon))
    print("lexicon done")

# Read the whole corpus file and lower-case it.
with open("./corpora_lat.txt", "r") as f:
    corpora_lat = f.read().lower()

# Tokenize the corpus and write its unique tokens out as the lexicon file.
corpora_lat = word_tokenize.tokenize(corpora_lat)
make_lexicon(corpora_lat)

lexicon done


In [79]:
print("latin words in corpora", len(set(corpora_lat)))

latin words in corpora 525249


In [85]:
with open("./spell_checker_data/noisy_channel_lexicon.txt", "r") as f:
     lexicon = f.read().split()

525249

## Error model

In [None]:
def error_model(words):
    """
    Accumulate edit counts over word pairs and write them to three files.

    words: iterable of (correct word, erroneous word) pairs.

    For each pair, ``find_spell_errors`` is expected to return three mappings
    (substitutions, insertions, deletions) from an edit key to a count; the
    counts are summed per key over all pairs. Each table is written to its
    own file in ./spell_checker_data/ with one "<key[0]> <key[1]> <count>"
    line per edit, so every key must have at least two string elements.
    """
    substition = {}
    insertion = {}
    deletion = {}
    for w_cor, w_error in words:
        sub, ins, dl = find_spell_errors(w_cor, w_error)
        for totals, found in ((substition, sub), (insertion, ins), (deletion, dl)):
            for edit, count in found.items():
                totals[edit] = totals.get(edit, 0) + count

    # The tables are plain locals: this is a function, not a method, so the
    # previous ``self.<table>`` references raised NameError before any file
    # content was produced.
    outputs = (
        ("./spell_checker_data/substitions.txt", substition),
        ("./spell_checker_data/insertion.txt", insertion),
        ("./spell_checker_data/deletion.txt", deletion),
    )
    for path, table in outputs:
        with open(path, "w") as f:
            f.write("\n".join(
                edit[0] + " " + edit[1] + " " + str(count)
                for edit, count in table.items()
            ))

    print("confusion matrix done")

* заменить на additive smoothing (???)

In [104]:
import pybktree
import editdistance
import operator
from math import log

class noisy_channel:
    """
    Noisy-channel spelling corrector.

    A BK-tree over the lexicon proposes candidates within edit distance 2,
    confusion-matrix costs weight the edits between candidate and observed
    word, and a language model scores each candidate in its left context.
    """

    def __init__(self, lexicon):
        """
        Build the lexicon BK-tree and load the confusion matrices.

        lexicon: iterable of known words.

        The three count files in ./spell_checker_data/ are read and every
        count is converted to a cost ``-log10(count / total)``, where
        ``total`` is the number of edits over all three tables. Edits missing
        from a table later receive that table's largest cost.
        """
        self.lexicon_tree = pybktree.BKTree(editdistance.eval, lexicon)
        print("lexicon_tree done")

        sub_counts = self._read_confusion("./spell_checker_data/substitions.txt")
        ins_counts = self._read_confusion("./spell_checker_data/insertion.txt")
        del_counts = self._read_confusion("./spell_checker_data/deletion.txt")

        total = (
            sum(sub_counts.values())
            + sum(ins_counts.values())
            + sum(del_counts.values())
        )

        self.substition = self._to_costs(sub_counts, total)
        self.insertion = self._to_costs(ins_counts, total)
        self.deletion = self._to_costs(del_counts, total)

        # Penalty for an edit never seen in training: the worst cost in the
        # table. An empty table falls back to the cost of an edit seen exactly
        # once (log10(total)), or 0.0 when there is no data at all.
        seen_once = log(total, 10) if total else 0.0
        self.sub_none = max(self.substition.values(), default=seen_once)
        self.ins_none = max(self.insertion.values(), default=seen_once)
        self.del_none = max(self.deletion.values(), default=seen_once)
        print("confusion matrix done")

    @staticmethod
    def _parse_confusion(text):
        """Parse "<a> <b> <count>" lines into {(a, b): count}; blank lines are skipped."""
        table = {}
        for line in text.split("\n"):
            fields = line.split()
            if not fields:
                continue
            table[(fields[0], fields[1])] = int(fields[2])
        return table

    @classmethod
    def _read_confusion(cls, path):
        """Read one confusion-matrix file and return its parsed counts."""
        with open(path, "r") as f:
            return cls._parse_confusion(f.read())

    @staticmethod
    def _to_costs(counts, total):
        """Convert raw counts to costs -log10(count / total)."""
        return {edit: -log(count / total, 10) for edit, count in counts.items()}

    def predict_score(self, word, contex, lang_model):
        """
        Pick the best lexicon candidate for ``word`` given its left context.

        word: observed (possibly misspelled) token.
        contex: space-joined preceding tokens ("" when there are none).
        lang_model: object with a ``score(text)`` method returning a
            log-probability (a KenLM model in this notebook).

        return: ``((distance, candidate), score)`` for the candidate with the
            lowest score, or None when no lexicon word is within distance 2.
        """
        candidats = self.lexicon_tree.find(word, 2)
        if not candidats:
            return None

        # The context score does not depend on the candidate: compute it once.
        context_score = lang_model.score(contex)

        best = None
        for candidat in candidats:
            distance = candidat[0]
            sub, ins, dl = find_spell_errors(candidat[1], word)
            sub_sum = sum(self.substition.get(s, self.sub_none) * distance for s in sub)
            ins_sum = sum(self.insertion.get(s, self.ins_none) * distance for s in ins)
            del_sum = sum(self.deletion.get(s, self.del_none) * distance for s in dl)
            levenshtein_weighted = sub_sum + ins_sum + del_sum
            # logP(c|a,b) = logP(a,b,c) - logP(a,b)
            # NOTE(review): KenLM's score() adds sentence-boundary tokens by
            # default, so this difference may not be an exact conditional
            # probability - confirm whether eos=False is wanted here.
            cond_prob = lang_model.score(contex + " " + candidat[1]) - context_score
            total_cost = levenshtein_weighted - cond_prob
            # Strict comparison keeps the first candidate on ties.
            if best is None or total_cost < best[1]:
                best = (candidat, total_cost)
        return best

    def fix_fragment(self, sentence, lang_model):
        """
        Correct a sentence token by token and return the space-joined result.

        Each token is scored with up to two preceding *original* tokens as
        context; a token with no candidate is kept unchanged. Sentences with
        zero or one token are handled (an empty sentence yields "").
        """
        tokenize = word_tokenize.tokenize(sentence)
        correct_sentence = []
        for position, token in enumerate(tokenize):
            context = " ".join(tokenize[max(0, position - 2):position])
            best = self.predict_score(token, context, lang_model)
            correct_sentence.append(best[0][1] if best else token)
        return ' '.join(correct_sentence)

In [105]:
# noisy_channel.__init__ takes only the lexicon; passing load=True raised TypeError.
model = noisy_channel(lexicon)

lexicon_tree done
confusion matrix done


## Jamspell

In [29]:
import jamspell
from cer_and_wer import CER, WER

In [32]:
latin_corrector = jamspell.TSpellCorrector()
latin_corrector.LoadLangModel("./model_lat.bin")

True

In [115]:
import re
def test_spellchecker(path_ref, path_error, even=True):
    """
    Load the reference text and the OCR text used to evaluate the correctors.

    path_ref: directory with reference files; they are read in sorted name
        order, lower-cased and concatenated.
    path_error: directory with OCR pages; a file name minus its last four
        characters must parse as an integer page number.
    even: if true (default, the previous fixed behaviour) only even-numbered
        pages are read, otherwise only odd-numbered ones.

    return: (reference text, OCR text) as two strings.
    """
    ref_parts = []
    for book in sorted(os.listdir(path_ref)):
        with open(os.path.join(path_ref, book), "r") as f:
            # NOTE(review): newlines are removed without inserting a space
            # here, unlike the OCR side below - confirm this is intended.
            ref_parts.append(re.sub("\n", "", f.read().lower()))

    want_even = bool(even)
    ocr_parts = []
    for page in sorted(os.listdir(path_error), key=lambda x: int(x[:-4])):
        if (int(page[:-4]) % 2 == 0) != want_even:
            continue
        with open(os.path.join(path_error, page), 'r') as f:
            # Join words hyphenated across lines, then flatten line breaks.
            ocr_parts.append(re.sub("\n", " ", re.sub("-\n", "", f.read().lower())))

    return "".join(ref_parts), "".join(ocr_parts)

In [116]:
text_ref, text_OCR = test_spellchecker("./spell_checker_data/edit_text/GaiusJuliusCaesar/", "./data_text_new/L072/")

In [5]:
import lang_detector
detector = lang_detector.language_detector()

latin done
english done


In [48]:
def fix_one_book(path, source, name_book, latin_corrector, detector, probe_index=69):
    """
    Split one book's OCR pages into "lat" and "eng" folders and correct the
    pages routed to "lat" with the spell corrector (JamSpell).

    path: directory in which the ``name_book`` output folder is created.
    source: directory with OCR pages; a file name minus its last four
        characters must parse as an integer page number.
    name_book: name of the output folder; "eng/" and "lat/" are created in it.
    latin_corrector: object with ``FixFragment(sentence)``.
    detector: object with ``predict(text)``. If the result for the probe page
        is truthy, even-numbered pages are routed to "lat" and odd ones to
        "eng"; otherwise the other way round. (Presumably a truthy result
        means the probe page is Latin - verify against lang_detector.)
    probe_index: position, in page-number order, of the page passed to the
        detector. Defaults to 69, the previously hard-coded value.

    Raises IndexError when the book has no page at ``probe_index``.
    Existing output folders are reused instead of raising FileExistsError.
    """
    def clean(raw):
        # Join words hyphenated across lines, then flatten line breaks.
        return re.sub("\n", " ", re.sub("-\n", "", raw.lower()))

    book_dir = os.path.join(path, name_book)
    eng_dir = os.path.join(book_dir, "eng")
    lat_dir = os.path.join(book_dir, "lat")
    os.makedirs(eng_dir, exist_ok=True)
    os.makedirs(lat_dir, exist_ok=True)

    all_pages = sorted(os.listdir(source), key=lambda s1: int(s1[:-4]))

    with open(os.path.join(source, all_pages[probe_index]), "r") as f:
        d = detector.predict(clean(f.read()))
    latin_on_even = bool(d)

    for page in all_pages:
        with open(os.path.join(source, page), "r") as r_file:
            text = clean(r_file.read())

        is_even = int(page[:-4]) % 2 == 0
        if is_even == latin_on_even:
            out_path = os.path.join(lat_dir, page)
            content = ' '.join(
                latin_corrector.FixFragment(s) for s in sent_tokenize.tokenize(text)
            )
        else:
            # Pages on the other parity are copied without correction.
            out_path = os.path.join(eng_dir, page)
            content = text

        with open(out_path, "w") as w_file:
            w_file.write(content)

In [124]:
%%time
edit_test_2 = ' '.join(word_tokenize.tokenize(text_OCR)[:5000])
edit_test_2 = [model.fix_fragment(s, latin_model) for s in sent_tokenize.tokenize(edit_test_2)]

CPU times: user 22min 22s, sys: 6.13 s, total: 22min 28s
Wall time: 24min 59s


In [119]:
# Correct the whole OCR text sentence by sentence with the JamSpell corrector.
edit_test_1 = [latin_corrector.FixFragment(s) for s in sent_tokenize.tokenize(text_OCR)]
# Re-tokenize the corrected text so it can be compared token by token.
edit_test_1 = word_tokenize.tokenize(" ".join(edit_test_1))
# WER is computed on the first 5000 tokens of the corrected text and of the reference.
print("WER OCR (Jamspell) vs ref", WER(edit_test_1[:5000], word_tokenize.tokenize(text_ref)[:5000]))

WER OCR (Jamspell) vs ref ([195, 192, 193], 0.1162)


In [120]:
# Only the first 5000 OCR tokens are corrected here; they are re-joined into
# text so that the sentence splitter can be applied.
edit_test_2 = ' '.join(word_tokenize.tokenize(text_OCR)[:5000])
# Correct each sentence with the noisy-channel model and the KenLM language model.
edit_test_2 = [model.fix_fragment(s, latin_model) for s in sent_tokenize.tokenize(edit_test_2)]
# Re-tokenize the corrected text so it can be compared token by token.
edit_test_2 = word_tokenize.tokenize(" ".join(edit_test_2))
# The corrected tokens are compared with the first 5000 reference tokens.
print("WER OCR (noisy_model) vs ref", WER(edit_test_2, word_tokenize.tokenize(text_ref)[:5000]))

WER OCR (noisy_model) vs ref ([170, 192, 193], 0.1112)


In [134]:
class model():
    """
    Count-based trigram language model with back-off to bigram and unigram
    estimates.

    Attributes:
        tri_model: {(w1, w2): {w3: P(w3 | w1, w2)}}
        bi_model:  {w1: {w2: P(w2 | w1)}}
        un_model:  {w: P(w)}; unseen words default to ``oov_approximation``.
    """

    def __init__(self, corpora):
        """
        Count n-grams over ``corpora`` and normalise them into probabilities.

        corpora: iterable of sentences; each one is split on whitespace.
        """
        OOV_approximation = 0
        # Probability assigned to words never seen in the corpus.
        self.oov_approximation = OOV_approximation

        self.tri_model = defaultdict(lambda: defaultdict(lambda: 0))
        self.bi_model = defaultdict(lambda: defaultdict(lambda: 0))
        self.un_model = defaultdict(lambda: OOV_approximation)

        total_tokens = 0
        for s in corpora:
            tokens = s.split()
            total_tokens += len(tokens)
            # Sliding windows over the sentence, without padding.
            for w1, w2, w3 in zip(tokens, tokens[1:], tokens[2:]):
                self.tri_model[(w1, w2)][w3] += 1
            for w1, w2 in zip(tokens, tokens[1:]):
                self.bi_model[w1][w2] += 1
            for w1 in tokens:
                self.un_model[w1] += 1

        for followers in self.tri_model.values():  # p(w_i | w_i-2, w_i-1)
            total_count = float(sum(followers.values()))
            for w3 in followers:
                followers[w3] /= total_count

        alpha_1 = 1

        for followers in self.bi_model.values():  # p(w_i | w_i-1)
            total_count = float(sum(followers.values())) * alpha_1
            for w2 in followers:
                followers[w2] /= total_count

        alpha_2 = 1
        # Unigram probability is count / number of tokens. Dividing by the
        # vocabulary size, as before, could yield values above 1.
        for w1 in self.un_model:
            self.un_model[w1] = float(self.un_model[w1] * alpha_1 * alpha_2) / total_tokens

    def predict_score(self, sentence):
        """
        Return the average log2-probability per token of ``sentence``.

        Each token uses the trigram estimate when it is non-zero, otherwise
        the bigram estimate, otherwise the unigram estimate; the first two
        tokens use the longest history available. A token whose probability
        is zero contributes ``-inf`` instead of raising a math domain error.
        An empty sentence scores 0.0. Scoring never adds keys to the model.
        """
        s = sentence.split()
        if not s:
            return 0.0

        final_sum = 0.0
        for i, word in enumerate(s):
            prob = 0
            if i >= 2:
                prob = self.tri_model.get((s[i - 2], s[i - 1]), {}).get(word, 0)
            if prob == 0 and i >= 1:
                prob = self.bi_model.get(s[i - 1], {}).get(word, 0)
            if prob == 0:
                prob = self.un_model.get(word, self.oov_approximation)
            final_sum += math.log(prob, 2) if prob > 0 else float("-inf")

        return final_sum / len(s)