In [77]:
#############################################################################
### Търсене и извличане на информация. Приложение на дълбоко машинно обучение
### Стоян Михов
### Зимен семестър 2021/2022
#############################################################################

### Домашно задание 1
###
### Преди да се стартира програмата е необходимо да се активира съответното обкръжение с командата:
### conda activate tii
###
### Ако все още нямате създадено обкръжение прочетете файла README.txt за инструкции


########################################
import numpy as np
import random

alphabet = ['а', 'б', 'в', 'г', 'д', 'е', 'ж', 'з', 'и', 'й', 'к', 'л', 'м', 'н', 'о', 'п', 'р', 'с', 'т', 'у', 'ф', 'х', 'ц', 'ч', 'ш', 'щ', 'ъ', 'ь', 'ю', 'я']

def extractDictionary(corpus):
    dictionary = set()
    for doc in corpus:
        for w in doc:
            if w not in dictionary: dictionary.add(w)
    return dictionary

def editDistance(s1, s2):
    #### функцията намира разстоянието на Левенщайн-Дамерау между два низа
    #### вход: низовете s1 и s2
    #### изход: минималният брой на елементарните операции ( вмъкване, изтриване, субституция и транспоциция на символи) необходими, за да се получи от единия низ другия

    #############################################################################
    #### Начало на Вашия код. На мястото на pass се очакват 10-25 реда

    distMatrix = np.zeros((len(s1)+1,len(s2)+1), dtype=np.uint32)
    distMatrix[0] = np.arange(len(s2)+1)
    distMatrix[:,0] = np.arange(len(s1)+1)

    for i in range(1,len(s1)+1):
        for j in range(1,len(s2)+1):
            cost = 0 if s1[i-1]==s2[j-1] else 1
            distMatrix[i, j] = min(distMatrix[i-1, j-1] + cost,
                                distMatrix[i, j-1] + 1,
                                distMatrix[i-1, j] + 1)
            if i>1 and j>1 and s1[i-1]==s2[j-2] and s1[i-2]==s2[j-1] and s1[i-1]!=s2[j-1]:
                distMatrix[i, j] = min(distMatrix[i, j],
                                distMatrix[i-2, j-2] + 1)
    return distMatrix[-1,-1]

    #### Край на Вашия код
    #############################################################################
    
def editOperations(s1, s2):
    #### функцията намира елементарни редакции, неободими за получаването на един низ от друг
    #### вход: низовете s1 и s2
    #### изход: списък с елементарните редакции ( идентитет, вмъкване, изтриване, субституция и транспоциция на символи) необходими, за да се получи втория низ от първия
    
    #### Например: editOperations('котка', 'октава') би следвало да връща списъка:
    ####    [('ко', 'ок'), ('т','т'), ('', 'а'), ('к', 'в'), ('а','а')]
    ####        |ко   |т |   |к  |а |
    ####        |ок   |т |а  |в  |а |
    ####        |Trans|Id|Ins|Sub|Id|
    ####
    #### Можете да преизползвате и модифицирате кода на функцията editDistance
    #############################################################################
    #### Начало на Вашия код.

    cellType = [('0', np.uint32, 3),('1','U2',2)]
    distMatrix = np.empty((len(s1)+1,len(s2)+1), dtype=cellType)
    distMatrix[0] = np.array([([x,0,max(x-1,0)],['',s2[x-1] if x>0 else '']) for x in range(len(s2)+1)], dtype=cellType)
    distMatrix[:,0] = np.array([([x,max(x-1,0),0],[s1[x-1] if x>0 else '','']) for x in range(len(s1)+1)], dtype=cellType)

    for i in range(1,len(s1)+1):
        for j in range(1,len(s2)+1):
            cost = 0 if s1[i-1]==s2[j-1] else 1
            distMatrix[i, j] = min(([distMatrix[i-1, j-1][0][0] + cost, i-1, j-1], [s1[i-1], s2[j-1]]),
                                ([distMatrix[i, j-1][0][0] + 1, i, j-1], ['', s2[j-1]]),
                                ([distMatrix[i-1, j][0][0] + 1, i-1, j], [s1[i-1], '']),
                                key = lambda x: x[0][0])
            if i>1 and j>1 and s1[i-1]==s2[j-2] and s1[i-2]==s2[j-1] and s1[i-1]!=s2[j-1]:
                distMatrix[i, j] = min(distMatrix[i, j],
                                ([distMatrix[i-2, j-2][0][0] + 1, i-2, j-2], [s1[i-2:i], s2[j-2:j]]),
                                key = lambda x: x[0][0])

    result = np.empty(0, dtype=[('0','U2'),('1','U2')])
    cell = [np.size(distMatrix,0)-1, np.size(distMatrix,1)-1]
    while any(cell):
        result = np.insert(result, 0, tuple(distMatrix[cell[0], cell[1]][1]))
        cell = distMatrix[cell[0],cell[1]][0][1:]
    return result.tolist()

    #### Край на Вашия код
    #############################################################################

def computeOperationProbs(corrected_corpus,uncorrected_corpus,smoothing = 0.2):
    #### Функцията computeOperationProbs изчислява теглата на дадени елементарни операции (редакции)
    #### Теглото зависи от конкретните символи. Използвайки корпусите, извлечете статистика. Използвайте принципа за максимално правдоподобие. Използвайте изглаждане. 
    #### Вход: Корпус без грешки, Корпус с грешки, параметър за изглаждане. С цел простота може да се счете, че j-тата дума в i-тото изречение на корпуса с грешки е на разстояние не повече от 2 (по Левенщайн-Дамерау) от  j-тата дума в i-тото изречение на корпуса без грешки.
    #### Следва да се използват функциите generateCandidates, editOperations, 
    #### Помислете как ще изберете кандидат за поправка измежду всички възможни.
    #### Важно! При изтриване и вмъкване се предполага, че празния низ е представен с ''
    #### Изход: Речник, който по зададена наредена двойка от низове връща теглото за операцията.
    
    #### Първоначално ще трябва да преброите и запишете в речника operations броя на редакциите от всеки вид нужни, за да се поправи корпуса с грешки. След това изчислете съответните вероятности.
    
    operations = {} # Брой срещания за всяка елементарна операция + изглаждане
    operationsProb = {} # Емпирична вероятност за всяка елементарна операция
    for c in alphabet:
        operations[(c,'')] = smoothing    # deletions
        operations[('',c)] = smoothing    # insertions
        for s in alphabet:
            operations[(c,s)] = smoothing    # substitution and identity
            if c == s:    
                continue
            operations[(c+s,s+c)] = smoothing    # transposition

    #############################################################################
    #### Начало на Вашия код.

    for i in range(len(corrected_corpus)):
        for j in range(len(corrected_corpus[i])):
            for op in editOperations(corrected_corpus[i][j], uncorrected_corpus[i][j]):
                if op in operations:
                    operations[op] += 1

    operationsCorpus = sum(int(o) for o in operations.values())
    for op,val in operations.items():
        operationsProb[op] = val/(smoothing*len(operations)+operationsCorpus)
    
    #### Край на Вашия код.
    #############################################################################
    return operationsProb

def operationWeight(a,b,operationProbs):
    #### Функцията operationWeight връща теглото на дадена елементарна операция
    #### Вход: Двата низа a,b, определящи операцията.
    ####       Речник с вероятностите на елементарните операции.
    #### Важно! При изтриване и вмъкване се предполага, че празния низ е представен с ''
    #### изход: Теглото за операцията
    
    if (a,b) in operationProbs.keys():
        return -np.log(operationProbs[(a,b)])
    else:
        print("Wrong parameters ({},{}) of operationWeight call encountered!".format(a,b))

def editWeight(s1, s2, operationProbs):
    #### функцията editWeight намира теглото между два низа
    #### За намиране на елеметарните тегла следва да се извиква функцията operationWeight
    #### вход: низовете s1 и s2 и речник с вероятностите на елементарните операции.
    #### изход: минималното тегло за подравняване, за да се получи втория низ от първия низ
    
    #############################################################################
    #### Начало на Вашия код. На мястото на pass се очакват 10-25 реда

    distMatrix = np.zeros((len(s1)+1,len(s2)+1))
    if np.size(distMatrix, 1) > 1:
        distMatrix[0, 1:] = np.array([operationWeight('', x, operationProbs) for x in s2])
        for i in range(2, np.size(distMatrix,1)):
            distMatrix[0, i] += distMatrix[0, i-1]
    if np.size(distMatrix, 0) > 1:
        distMatrix[1:, 0] = np.array([operationWeight(x, '', operationProbs) for x in s1])
        for i in range(2, np.size(distMatrix,0)):
            distMatrix[i, 0] += distMatrix[i-1, 0]

    for i in range(1,len(s1)+1):
        for j in range(1,len(s2)+1):
            distMatrix[i, j] = min(distMatrix[i-1, j-1] + operationWeight(s1[i-1], s2[j-1], operationProbs),
                                distMatrix[i, j-1] + operationWeight('', s2[j-1], operationProbs),
                                distMatrix[i-1, j] + operationWeight(s1[i-1], '', operationProbs))
            if i>1 and j>1 and s1[i-1]==s2[j-2] and s1[i-2]==s2[j-1] and s1[i-1]!=s2[j-1]:
                distMatrix[i, j] = min(distMatrix[i, j],
                                distMatrix[i-2, j-2] + operationWeight(s1[i-2:i], s2[j-2:j], operationProbs))
    return distMatrix[-1,-1]

    #### Край на Вашия код. 
    #############################################################################

def generateEdits(q):
    ### помощната функция, generateEdits по зададена заявка генерира всички възможни редакции на разстояние едно от тази заявка.
    ### Вход: заявка като низ q
    ### Изход: Списък от низове на разстояние 1 по Левенщайн-Дамерау от заявката
    ###
    ### В тази функция вероятно ще трябва да използвате азбука, която е дефинирана с alphabet
    ###
    #############################################################################
    #### Начало на Вашия код. На мястото на pass се очакват 10-15 реда

#     result = np.empty(0, dtype='U{}'.format(len(q)+1))
#     for i in range(len(q)+1):
#         result = np.append(result, q[:i] + q[i+1:])
#         result = np.append(result, q[:i] + q[i:i+2][::-1] + q[i+2:])
#         result = np.append(result, [[q[:i] + c + q[i:], q[:i] + c + q[i+1:]] for c in alphabet])
#     return np.unique(np.delete(result, np.where(result == q))).tolist()

    result = []
    for i in range(len(q)+1):
        result.append(q[:i] + q[i+1:]) # deletions
        result.append(q[:i] + q[i:i+2][::-1] + q[i+2:]) # transpositions
        for c in alphabet:
            result.append(q[:i] + c + q[i:]) # insertions
            result.append(q[:i] + c + q[i+1:]) # substitutions
    result = set(result)
    result.remove(q)
    return list(result)
        
    #### Край на Вашия код
    #############################################################################


def generateCandidates(query,dictionary,operationProbs):
    ### Започва от заявката query и генерира всички низове НА РАЗСТОЯНИЕ <= 2, за да се получат кандидатите за корекция. Връщат се единствено кандидати, които са в речника dictionary.
        
    ### Вход:
    ###     Входен низ query
    ###     Речник с допустими (правилни) думи: dictionary
    ###     речник с вероятностите на елементарните операции.

    ### Изход:
    ###     Списък от двойки (candidate, candidate_edit_log_probability), където candidate е низ на кандидат, а candidate_edit_log_probability е логаритъм от вероятността за редакция -- минус теглото.
    
    #############################################################################
    #### Начало на Вашия код. На мястото на pass се очакват 10-15 реда

    allCandidates = np.array([query], dtype='U{}'.format(len(query)+2))
    maxDist = 2
    for i in range(maxDist):
        temp = np.empty(0, dtype='U{}'.format(len(query)+2))
        for q in allCandidates:
            temp = np.append(temp, generateEdits(q))
        allCandidates = np.append(np.delete(allCandidates, allCandidates==query), temp)
        
    result = np.empty(0, dtype=[('0', 'U{}'.format(len(query)+2)),('1','f4')])
    for candidate in allCandidates:
        if candidate in dictionary:
            candidate_edit_log_prob = editWeight(candidate, query, operationProbs) #abs(sum(np.log(operationProbs[op]) for op in editOperations(candidate, query)) + editWeight(candidate, query, operationProbs))
            result = np.append(result, np.array([(candidate, candidate_edit_log_prob)], dtype = [('0', 'U{}'.format(len(query)+2)),('1','f4')]))
            
    return np.unique(result).tolist()

    #### Край на Вашия код
    #############################################################################

def correctSpelling(r, dictionary, operationProbs):
    ### Функцията поправя корпус съдържащ евентуално сгрешени думи
    ### Генераторът на кандидати връща и вероятността за редактиране.
    ###
    ### Вход:
    ###    заявка: r - корпус от думи
    ###    речник с правилни думи: dictionary,
    ###    речник с вероятностите на елементарните операции: operationProbs
    ###    Изход: поправен корпус

    #############################################################################
    #### Начало на Вашия код. На мястото на pass се очакват 5-15 реда

    corrected = []
    for i,s in enumerate(r):
        corrected.append([])
        for j,w in enumerate(s):
            if any(map(lambda x: x not in alphabet, w)) or w in dictionary:
                corrected[i].append(w)
                continue
            candidates = generateCandidates(w, dictionary, operationProbs)
            corrected[i].append(min(candidates, key=lambda x: x[1])[0] if candidates else r[i][j])
    return corrected

    #### Край на Вашия код
    #############################################################################


In [2]:
import a1 as a1
import nltk
nltk.download('punkt')
from nltk.corpus import PlaintextCorpusReader
import pickle

[nltk_data] Downloading package punkt to /home/psarlov/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [3]:
L1 = ['заявката','заявката','заявката','заявката','заявката','заявката']
L2 = ['заявката','заявьата','завякатва','заявкатаа','вя','язвката']
C = [0,1,2,1,7,3]
O = [[('з', 'з'),  ('а', 'а'),  ('я', 'я'),  ('в', 'в'),  ('к', 'к'),  ('а', 'а'),  ('т', 'т'),  ('а', 'а')],
     [('з', 'з'),  ('а', 'а'),  ('я', 'я'),  ('в', 'в'),  ('к', 'ь'),  ('а', 'а'),  ('т', 'т'),  ('а', 'а')],
     [('з', 'з'),  ('а', 'а'),  ('яв', 'вя'),  ('к', 'к'),  ('а', 'а'),  ('т', 'т'),  ('', 'в'),  ('а', 'а')],
     [('з', 'з'),  ('а', 'а'),  ('я', 'я'),  ('в', 'в'),  ('к', 'к'),  ('а', 'а'),  ('т', 'т'),  ('', 'а'),  ('а', 'а')],
     [('з', ''), ('а', ''), ('я', ''), ('в', 'в'), ('к', ''), ('а', ''), ('т', ''), ('а', 'я')],
     [('з', ''), ('а', 'я'), ('я', 'з'), ('в', 'в'),  ('к', 'к'),  ('а', 'а'),  ('т', 'т'),  ('а', 'а')]]
D = [22.75, 32.06, 35.93, 32.02, 62.03, 43.71]

In [4]:
for s1,s2,d in zip(L1,L2,C):
    assert editDistance(s1,s2) == d, "Разстоянието между '{}' и '{}' следва да е '{}'".format(s1,s2,d)
print("Функцията editDistance премина теста.")

Функцията editDistance премина теста.


In [5]:
for s1,s2,o in zip(L1,L2,O):
    assert editOperations(s1,s2) == o, "Операциите редактиращи '{}' до '{}' следва да са '{}'".format(s1,s2,o)
print("Функцията editOperations премина теста.")

Функцията editOperations премина теста.


In [6]:
print('Прочитане на корпуса от текстове...')
corpus_root = '.'
original = PlaintextCorpusReader(corpus_root, 'corpus_original.txt')
fullSentCorpusOriginal = [[w.lower() for w in sent] for sent in original.sents()]
typos = PlaintextCorpusReader(corpus_root, 'corpus_typos.txt')
fullSentCorpusTypos = [[w.lower() for w in sent] for sent in typos.sents()]
print('Готово.')

Прочитане на корпуса от текстове...
Готово.


In [7]:
import time
start = time.time()
print('Пресмятане вероятностите на елементарните операции...')
operationProbs = computeOperationProbs(fullSentCorpusOriginal,fullSentCorpusTypos)
print('Готово.')
ps = [operationProbs[k] for k in operationProbs.keys()]
assert max(ps) < 0.2, "Не би следвало  да има елементарна операция с толкова голяма вероятност."
assert min(ps) > 0, "Използвайте изглаждане."
id_prob = 0
for k in operationProbs.keys():
    if k[0]==k[1]:
        id_prob += operationProbs[k]
assert id_prob > 0.95, "Би следвало операцията идентитет да има най-голяма вероятност."
print("Функцията computeOperationProbs премина теста.")
end = time.time()
print(end-start)

Пресмятане вероятностите на елементарните операции...
Готово.
Функцията computeOperationProbs премина теста.
72.54115843772888


In [8]:
id_prob

0.9855009376341495

In [9]:
print("Запис на вероятностите във файл probabilities.pkl ...")
opfile = open("probabilities.pkl", "wb")
pickle.dump(operationProbs, opfile)
opfile.close()
print('Готово.')

Запис на вероятностите във файл probabilities.pkl ...
Готово.


In [10]:
for s1,s2,d in zip(L1,L2,D):
    print(d, editWeight(s1,s2,operationProbs))
    assert abs(editWeight(s1,s2,operationProbs) - d) < 1 , "Теглото между '{}' и '{}' следва да е приблизително '{}'".format(s1,s2,d)
print("Функцията editWeight премина теста.")

22.75 22.74648836787877
32.06 32.15199173375289
35.93 35.95728125951019
32.02 32.022027714067235
62.03 62.06029433333477
43.71 43.719280741425045
Функцията editWeight премина теста.


In [78]:
assert len(set(generateEdits("тест"))) == 269, "Броят на елементарните редакции \"тест\"  следва да е 269"
print("Функцията generateEdits премина теста.")

Функцията generateEdits премина теста.


In [73]:
import time
start = time.time()
len(generateEdits("товаемногодългадумаааааааааааааааааааааааааааааааааа"))
end = time.time()
print(end-start)

0.010230064392089844


In [17]:
dictionary = extractDictionary(fullSentCorpusOriginal)
#### Тест на generate_candidates
assert len(set(generateCandidates("такяива",dictionary,operationProbs))) == 4, "Броят на генерираните кандидати следва да е 4"
print("Функцията generateCandidates премина теста.")

Функцията generateCandidates премина теста.


In [79]:
sorted(generateCandidates("нва",dictionary,operationProbs), key=lambda x: x[1])

[('на', 14.067445755004883),
 ('ва', 14.416622161865234),
 ('нова', 16.40955352783203),
 ('нива', 16.45205307006836),
 ('два', 17.35114288330078),
 ('сва', 17.72583770751953),
 ('ав', 18.995677947998047),
 ('а', 20.50920867919922),
 ('нрав', 21.009632110595703),
 ('нави', 21.031108856201172),
 ('н', 21.370433807373047),
 ('в', 21.7196102142334),
 ('ана', 21.931293487548828),
 ('ра', 22.28449249267578),
 ('та', 22.712587356567383),
 ('ла', 22.782331466674805),
 ('вар', 22.872339248657227),
 ('ван', 23.03335952758789),
 ('явна', 23.241352081298828),
 ('са', 23.251583099365234),
 ('да', 23.443729400634766),
 ('над', 23.49928092956543),
 ('нов', 23.712541580200195),
 ('нас', 23.730920791625977),
 ('за', 23.818422317504883),
 ('ов', 23.84320068359375),
 ('не', 23.852130889892578),
 ('ма', 23.898210525512695),
 ('лв', 23.992733001708984),
 ('вас', 24.080095291137695),
 ('нам', 24.099170684814453),
 ('нрава', 24.294424057006836),
 ('ба', 24.424558639526367),
 ('га', 24.424558639526367),
 ('па

In [80]:
#### Тест на correct_spelling
import time
start = time.time()
corr = correctSpelling(fullSentCorpusTypos[3668:3669],dictionary,operationProbs)
assert ' '.join(corr[0]) == 'третата група ( нареченската ) бе ударила на камък : поради курортния характер на селото пръчовете от наречен били още миналата година премахнати , защото замърсявали околната среда със силната си миризма и създавали у чужденците впечатление за първобитност .', "Коригираната заявка следва да е 'третата група ( нареченската ) бе ударила на камък : поради курортния характер на селото пръчовете от наречен били още миналата година премахнати , защото замърсявали околната среда със силната си миризма и създавали у чужденците впечатление за първобитност .'."
print("Функцията correctSpelling премина теста.")
end = time.time()
print(end-start)

Функцията correctSpelling премина теста.
10.651591300964355


In [33]:
' '.join(fullSentCorpusTypos[3668:3669][0])

'третадта група ( наруеченската ) бе ударила на камък : поради курортния хаарктер на селото пръчовете от наречен били още имналата гоаина премахнати , защоот замърсявали околната среда със силнащта си миризма и съзадвали у чужденците впечатление за първобтиност .'

In [38]:
' '.join(corr[0])

'третата група ( нареченската ) бе ударила на камък : поради курортния характер на селото пръчовете от наречен били още миналата година премахнати , защото замърсявали околната среда със силната си миризма и създавали у чужденците впечатление за първобитност .'

In [39]:
import time
start = time.time()
correct = PlaintextCorpusReader(corpus_root, 'corpus_correct.txt')
fullSentCorpusCorrect = [[w.lower() for w in sent] for sent in correct.sents()]
corpus_corrected = correctSpelling(fullSentCorpusCorrect,dictionary,operationProbs)
corpus_corrected = '\n'.join(' '.join(s) for s in corpus_corrected)
with open('corrected.txt', 'w') as f:
    f.write(corpus_corrected)
end = time.time()
print(end-start)

343.6697359085083
