In [73]:
import pandas as pd
import conllu as cl
from os.path import join
import math

In [74]:
class Quad:
    def __init__(self, entity, indexSentence, indexStart, indexEnd):
        self.entity = entity
        self.indexSentence = indexSentence
        self.indexStart = indexStart
        self.indexEnd = indexEnd
    
    def __eq__(self, other):
        return (self.entity == other.entity and
                self.indexSentence == other.indexSentence and
                self.indexStart == other.indexStart and
                self.indexEnd == other.indexEnd)
    
    def __hash__(self):
        return hash((self.entity, self.indexSentence, self.indexStart, self.indexEnd))

    def toString(self):
        return f"Quad(Entity={self.entity}, indexSentence={self.indexSentence}, indexStart={self.indexStart}, indexEnd={self.indexEnd})"

In [75]:
def reconstructSentences(sentences):
    modified_sentences = []
    for sentence in sentences:
        modified_sentence = []
        # create a start token
        new_token_start = {
            "id": len(sentence),
            "form": "<s>",
            "lemma": "START",
        }
        # create a end token
        new_token_end = {
            "id": len(sentence) + 1,
            "form": "</s>",
            "lemma": "END",
        }
        # append the start token
        modified_sentence.append(new_token_start)
        # append the rest of the sentence
        for word in sentence:
            modified_sentence.append(word)
        # append the end token
        modified_sentence.append(new_token_end)

        # append the new sentence to the list of sentences
        modified_sentences.append(modified_sentence)

    return modified_sentences

In [76]:
def load(language):
    filePath = ".\Datasets\\" + language + "\\"
    
    with open(join(filePath, "train.conllu"), 'r', encoding='utf-8') as file:
        data = file.read()
    train_sentences = reconstructSentences(cl.parse(data))

    with open(join(filePath, "test.conllu"), 'r', encoding='utf-8') as file:
        data = file.read()
    test_sentences = cl.parse(data)

    with open(join(filePath, "val.conllu"), 'r', encoding='utf-8') as file:
        data = file.read()
    val_sentences = cl.parse(data)

    return train_sentences, test_sentences, val_sentences

In [77]:
# Fit the HMM model with the train sentences, it generates 2 outputps containing the probabilities that will be used 
# in the decoding phase.
def HMMFit(train_sentences):
    # dictionary of dictionaries
    countTag = {}   # Count(tag(i))
    transitionCount = {}    # Count(tag(i-1), tag(i))
    wordTagCount = {}   # Count(tag(i), word(i))
    tagDistribution = {}    # Count(tag(i-1), tag(i)) / Count(tag(i-1))
    wordDistribution = {}   # Count(tag(i), word(i))
    uniqueTagDistribution = {}

    # complete creation of counTag dictorary
    # partial initialization of transitionCount dictionary
    # partial creation of wordTagCount dictionary
    for sentence in train_sentences:
        for word in sentence:
            if word['lemma'] in countTag:
                countTag[word['lemma']] += 1
            else:
                countTag[word['lemma']] = 1
                uniqueTagDistribution[word['lemma']] = 0
                transitionCount[word['lemma']] = {}
            
            if word['form'] not in wordTagCount:
                wordTagCount[word['form']] = {}
            if word['lemma'] in wordTagCount[word['form']]:
                wordTagCount[word['form']][word['lemma']] += 1
            else:
                wordTagCount[word['form']][word['lemma']] = 1
    
    # complete inizialization of transitionCount dictionary
    for key in transitionCount.keys():
        for tag in countTag.keys():
            transitionCount[key][tag] = 0

    # complete creation of wordTagCount dictionary
    for word in wordTagCount.keys():
        for tag in countTag.keys():
            if tag not in wordTagCount[word]:
                wordTagCount[word][tag] = 0

    # complete creation of transitionCount dictionary
    # previousTag --> Markov's hypothesis of first grade
    for tag in transitionCount.keys():
        for sentence in train_sentences:
            previousTag = None
            for word in sentence:
                if previousTag == None:
                    previousTag = word['lemma']
                else:
                    if word['lemma'] == tag:
                        if previousTag in transitionCount[tag]:
                            transitionCount[tag][previousTag] += 1
                    previousTag = word['lemma']
    
    # complete creation of tagDistribution dictionary
    # we use the log-probability for better accuracy (since the probability are very small)
    # if the probability is 0, then the log-probability will be log(10**-10) 
    epsilon = 10**-10
    for eventTag in countTag.keys():
        tagDistribution[eventTag] = {}
        for condTag in countTag.keys():
            if transitionCount[eventTag][condTag]/countTag[condTag] == 0:
                tagDistribution[eventTag][condTag] = math.log(epsilon)
            else:
                tagDistribution[eventTag][condTag] = math.log(transitionCount[eventTag][condTag]/countTag[condTag])
            
    # complete creation of wordDistribution dictionary
    # we use the log-probability for better accuracy (since the probability are very small)
    # if the probability is 0, then the log-probability will be log(10**-10)
    for word in wordTagCount.keys():
        wordDistribution[word] = {}

        uniqueTagDistribution.pop("START", None)
        uniqueTagDistribution.pop("END", None)
        if sum(wordTagCount[word][tagLoop] for tagLoop in countTag.keys()) == 1:
            tag = [tagLoop for tagLoop in countTag.keys() if wordTagCount[word][tagLoop] == 1]
            uniqueTagDistribution[tag[0]] += 1

        for tag in countTag.keys():
            if wordTagCount[word][tag]/countTag[tag] == 0:
                wordDistribution[word][tag] = math.log(epsilon)
            else:
                wordDistribution[word][tag] = math.log(wordTagCount[word][tag]/countTag[tag])
        
    return tagDistribution, wordDistribution, wordTagCount, uniqueTagDistribution

In [78]:
# strategy can be: 
# 0: always "O": P(unk|O) = 1 
# 1: always "O" and "MISC": P(unk|O) = P(unk|B-MISC) = 0.5
# 2: uniform: P(unk|tag(i)) = 1 / #(NER_TAGs)
# 3: 1 / # of word's tags appearing only once
def smoothing(strategy, currentTag, totNERtags, uniqueTagDistribution):
    epsilon = 10**-10
    match(strategy):
        case 0:
            if currentTag == "O":
                return math.log(1)
            else:
                return math.log(epsilon)
        case 1: 
            if currentTag == "O" or currentTag == "B-MISC":
                return math.log(0.5)
            else:
                return math.log(epsilon)
        case 2:
            return math.log(1 / totNERtags)
        case 3:
            if uniqueTagDistribution[currentTag] != 0:
                return math.log(1 / uniqueTagDistribution[currentTag])
            else:
                math.log(epsilon)
        case _:
            return math.log(epsilon)

In [79]:
def viterbiAlgorithm(sentence, tagDistribution, wordDistribution, b_value, uniqueTagDistribution, strategySmoothing):
    epsilon = 10**-10
    columns = len(sentence) # words
    rows = len(tagDistribution.keys())-2 #tags
    viterbi = [[math.log(epsilon) for _ in range(columns)] for _ in range(rows)]
    backpointer = [["" for _ in range(columns)] for _ in range(rows)]

    # initialization step
    i = 0
    for tag in tagDistribution.keys():
        if tag != "START" and tag != "END": 
            viterbi[i][0] = tagDistribution[tag]["START"] + b_value
            backpointer[i][0] = "0"
            i += 1

    tagList = list(tagDistribution)
    tagList.remove("START")
    tagList.remove("END")

    # recursion step
    for i in range(1, len(sentence)):
        j = 0
        for tag in tagDistribution.keys():
            if tag != "START" and tag != "END":
                if (sentence[i])['form'] not in wordDistribution:
                    obsLikelihood = smoothing(strategySmoothing, tag, len(tagList), uniqueTagDistribution)
                else:
                    obsLikelihood = wordDistribution[(sentence[i])['form']][tag]
                
                viterbi[j][i] = max(viterbi[tagLoop][i-1] + obsLikelihood + tagDistribution[tag][tagList[tagLoop]] for tagLoop in range(len(tagList)))
                _, maxTag = max((viterbi[tagLoop][i-1] + tagDistribution[tag][tagList[tagLoop]], tagLoop) for tagLoop in range(len(tagList)))
                backpointer[j][i] = maxTag
                j += 1

    # termitation step
    viterbi_end = max(viterbi[tagLoop][len(sentence)-1] + tagDistribution["END"][tagList[tagLoop]] for tagLoop in range(len(tagList)))
    _, maxTag = max((viterbi[tagLoop][len(sentence)-1] + tagDistribution["END"][tagList[tagLoop]], tagLoop) for tagLoop in range(len(tagList)))
    backpointer_end = maxTag
    
    # reconstructing the solution
    path = [None] * len(sentence)
    solution = [None] * len(sentence)
    path[len(sentence)-1] = backpointer_end
    solution[len(sentence)-1] = tagList[backpointer_end]
    for i in range(len(sentence)-2, -1, -1): # decrementing loop, 0 inclusive
        path[i] = backpointer[path[i+1]][i+1]
        solution[i] = tagList[path[i]]

    return solution

In [80]:
def convertToListEntites(predictions, test_sentences):
    listTestSentences = []
    currentTag = ""
    indexSentence = 0
    indexStart = 0
    indexEnd = 0
    for sentence in test_sentences:
        for word in sentence:
            if word['lemma'] != "O": 
                lemma = word['lemma'].split("-")[1]
                if currentTag != lemma:
                    if currentTag == "O":
                        currentTag = lemma
                        indexStart = word['id']
                    elif currentTag != "":
                        indexEnd = word['id']-1
                        listTestSentences.append(Quad(currentTag, indexSentence, indexStart, indexEnd))
                    currentTag = lemma
                    indexStart = word['id']
            elif word['lemma'] == "O" and currentTag != "" and currentTag != "O":
                indexEnd = word['id']-1
                listTestSentences.append(Quad(currentTag, indexSentence, indexStart, indexEnd))
                currentTag = "O"

        indexSentence += 1

    listPredict = []
    currentTag = ""
    indexSentence = 0
    indexStart = 0
    indexEnd = 0
    for i in range(len(predictions)):
        for j in range(len(predictions[i])):
            if predictions[i][j] != "O":
                lemma = predictions[i][j].split("-")[1]
                if currentTag != lemma:
                    if currentTag == "O":
                        currentTag = lemma
                        indexStart = j
                    elif currentTag != "":
                        indexEnd = j-1
                        listPredict.append(Quad(currentTag, i, indexStart, indexEnd))
                    currentTag = lemma
                    indexStart = j
            elif predictions[i][j] == "O" and currentTag != "" and currentTag != "O":
                indexEnd = j-1
                listPredict.append(Quad(currentTag, i, indexStart, indexEnd))
                currentTag = "O"
    
    return listPredict, listTestSentences


In [81]:
def convertListToDict(list):
    dict = {}
    for quad in list:
        key = (quad.entity, quad.indexSentence, quad.indexStart, quad.indexEnd)
        dict[key] = quad
    
    return dict

In [82]:
def precisionAndRecall(predictions, test_sentences, trueClass):

    dict_predictions = convertListToDict(predictions)
    dict_testSentences = convertListToDict(test_sentences)    

    truePositive = [pred for pred in predictions if pred.entity == trueClass and (pred.entity, pred.indexSentence, pred.indexStart, pred.indexEnd) in dict_testSentences]
    falsePositive = [pred for pred in predictions if pred.entity == trueClass and (pred.entity, pred.indexSentence, pred.indexStart, pred.indexEnd) not in dict_testSentences]
    falseNegative = [test for test in test_sentences if test.entity == trueClass and (test.entity, test.indexSentence, test.indexStart, test.indexEnd) not in dict_predictions]

    precision = len(truePositive) / (len(truePositive) + len(falsePositive)) if (len(truePositive) + len(falsePositive)) > 0 else 0
    recall = len(truePositive) / (len(truePositive) + len(falseNegative)) if (len(truePositive) + len(falseNegative)) > 0 else 0

    return precision, recall        

In [83]:
def evaluation(predictions, test_sentences, tagDistribution):
    totPredictions = 0
    correctPredictions = 0
    
    tagList = list(tagDistribution)
    tagList.remove("START")
    tagList.remove("END")
    tagList.remove("O")
    for i in range(len(tagList)):
        tagList[i] = tagList[i].split("-")[1]
    entity = list(set(tagList))

    # accuracy
    i = 0
    for sentence in test_sentences:
        j = 0
        for word in sentence:
            if word['lemma'] == predictions[i][j]:
                correctPredictions += 1
            totPredictions += 1
            j += 1
        i += 1
    accuracy = correctPredictions / totPredictions

    listPredictions, listTestSentences = convertToListEntites(predictions, test_sentences)
    
    dictPrecision = {}
    dictRecall = {}
    for i in range(len(entity)):
        precision, recall = precisionAndRecall(listPredictions, listTestSentences, entity[i])
        dictPrecision[entity[i]] = precision
        dictRecall[entity[i]] = recall

    return accuracy, dictPrecision, dictRecall

In [84]:
def HMMPredict(test_senteces, tagDistribution, wordDistribution, uniqueTagDistribution, strategySmoothing):
    b_value = 0 # in log probabilities 0 is the neutral element
    solution = []

    for sentence in test_senteces:
        sentenceSolution = viterbiAlgorithm(sentence, tagDistribution, wordDistribution, b_value, uniqueTagDistribution, strategySmoothing)
        # recontruct and add to the solution...
        solution.append(sentenceSolution)

    return solution

In [85]:
def printEvaluation(accuracy, precision, recall):
    precisionTot = 0
    recallTot = 0
    print("The model's accuracy is: " +  str(accuracy))
    print()
    print("Entities' precision and recall:")
    for key in precision.keys():
        print("Precision " + key + ": " + str(precision[key]))
        print("Recall " + key + ": " + str(recall[key]))
        precisionTot = precisionTot + precision[key]
        recallTot = recallTot + recall[key]
        print()

    precisionTot = precisionTot / len(precision.keys())
    recallTot = recallTot / len(recall.keys())
    print("Average precision: " + str(precisionTot))
    print("Average recall: " + str(precisionTot))

In [86]:
def baselineFrequentTag(wordTagCount, it_test_sentences):
    predictions = []
    
    for sentence in it_test_sentences:
        sentencePrediction = []
        for word in sentence:
            if word['form'] not in wordTagCount:
                maxTag = "B-MISC"
            else:
                _, maxTag = max((wordTagCount[word['form']][tagLoop] , tagLoop) for tagLoop in wordTagCount[word['form']].keys())
            sentencePrediction.append(maxTag)
        predictions.append(sentencePrediction)

    return predictions

In [96]:
# it: italian
# es: español 
# en: english
language = "en"
train_sentences, test_sentences, val_sentences =  load(language)

In [97]:
tagDistribution, wordDistribution, wordTagCount, uniqueTagDistribution = HMMFit(train_sentences)

In [102]:
# strategy can be: 
# 0: always "O": P(unk|O) = 1 
# 1: always "O" and "MISC": P(unk|O) = P(unk|B-MISC) = 0.5
# 2: uniform: P(unk|tag(i)) = 1 / #(NER_TAGs)
# 3: 1 / # of word's tags appearing only once
strategySmoothing = 0
predictions = HMMPredict(test_sentences, tagDistribution, wordDistribution, uniqueTagDistribution, strategySmoothing)

In [103]:
accuracy, precision, recall = evaluation(predictions, test_sentences, tagDistribution)
printEvaluation(accuracy, precision, recall)

The model's accuracy is: 0.9566880649810036

Entities' precision and recall:
Precision PER: 0.6768205616373155
Recall PER: 0.545873320537428

Precision ORG: 0.7599584343609282
Recall ORG: 0.6379761558592614

Precision MISC: 0.7057971014492753
Recall MISC: 0.5888754534461911

Precision LOC: 0.8006094077318606
Recall LOC: 0.7085791336591943

Average precision: 0.735796376294845
Average recall: 0.735796376294845


In [100]:
predictionsBaseline = baselineFrequentTag(wordTagCount, test_sentences)
accuracy, precision, recall = evaluation(predictionsBaseline, test_sentences, tagDistribution)
printEvaluation(accuracy, precision, recall)

The model's accuracy is: 0.931242162789392

Entities' precision and recall:
Precision PER: 0.5594050084729806
Recall PER: 0.5702495201535509

Precision ORG: 0.43610755441741356
Recall ORG: 0.49520209363186973

Precision MISC: 0.23050379572118704
Recall MISC: 0.6058041112454655

Precision LOC: 0.625724404378622
Recall LOC: 0.6551491656834654

Average precision: 0.46293519074755074
Average recall: 0.46293519074755074


In [101]:
"""import memm_tagger as memm

data = memm.initialize()
predictions = memm.train(".\Datasets\\it\\train.conllu", data)
memm.test(".\Datasets\\it\\test.conllu", predictions, data)"""

# aggiunto file = open(filename, 'r', encoding='utf-8') alla funzione load_data (riga 175)
# aggiunto un padding ai token (righe 200-202), altrimenti andava in eccezione (riga 208-210)
# MemoryError: Unable to allocate 32.5 GiB for an array with shape (88400, 266) and data type <U371

'import memm_tagger as memm\n\ndata = memm.initialize()\npredictions = memm.train(".\\Datasets\\it\\train.conllu", data)\nmemm.test(".\\Datasets\\it\\test.conllu", predictions, data)'