# NER Tagging con Viterbi

La documentazione si trova al seguente link: https://github.com/Davidesfed/TLN-21-22

## Setup

### Import

In [1]:
import math
import time

### Funzioni accessorie

In [2]:
def log(number):
    """Return the natural logarithm of ``number``, tolerating an exact zero.

    A zero input is replaced by 1e-15 before the logarithm is taken, so the
    result is a large negative finite value instead of a ``ValueError``.
    """
    safe_value = 1e-15 if number == 0 else number
    return math.log(safe_value)

### Retrieval del Corpus

In [3]:
def get_corpus(path):
    """Read the UTF-8 file at ``path`` and return its content parsed by ``parse_conllu``."""
    with open(path, mode='r', encoding='utf8') as corpus_file:
        raw_lines = corpus_file.readlines()
    # All lines are already in memory, so parsing can happen after the file is closed.
    return parse_conllu(raw_lines)

def parse_conllu(lines):
    """Group token lines into sentences.

    A line of at most one character (an empty string or a bare newline) is a
    sentence separator; every other line is turned into a token by
    ``parse_conllu_token``.

    Args:
        lines: iterable of raw text lines, typically from ``readlines()``.

    Returns:
        A list of sentences, each a non-empty list of tokens.
    """
    sentences = []
    current = []
    for line in lines:
        if len(line) <= 1:
            # Repeated separators must not create empty sentences: the models
            # index the first token of every sentence.
            if current:
                sentences.append(current)
                current = []
        else:
            current.append(parse_conllu_token(line))
    # Keep the last sentence even when the input lacks a trailing blank line.
    if current:
        sentences.append(current)
    return sentences

def parse_conllu_token(line):
    """Turn one tab-separated line into a ``(word, tag)`` pair.

    The word is read from the second column and the tag from the third.
    Every tag other than 'O' loses its first two characters (e.g. 'B-PER'
    becomes 'PER').
    """
    columns = line.strip('\n').split('\t')
    word, raw_tag = columns[1], columns[2]
    if raw_tag == 'O':
        return (word, raw_tag)
    return (word, raw_tag[2:])

## Implementazione

In [4]:
class Tagger():
    """Pair a model with a decoding function behind one ``tag_sentence`` call.

    Simple interface that is only used to evaluate the two models.

    Attributes:
        model: object passed as first argument to ``algorithm``; it must
            expose a ``tagset`` attribute.
        algorithm: callable invoked as ``algorithm(model, sentence)``.
        tagset: copied from ``model.tagset``.
    """

    def __init__(self, model, algorithm):
        self.model = model
        self.algorithm = algorithm
        self.tagset = model.tagset

    def tag_sentence(self, sentence):
        """Tag ``sentence`` and return whatever ``algorithm`` returns.

        A string (or instance of a ``str`` subclass) is lower-cased and split
        on whitespace first; any other value is handed to the algorithm
        unchanged.
        """
        # isinstance also accepts str subclasses, unlike an exact class comparison.
        if isinstance(sentence, str):
            sentence = sentence.lower().split()
        return self.algorithm(self.model, sentence)

### Hidden Markov Model

In [5]:
class HiddenMarkovModel:
    """Hidden Markov Model whose probabilities are relative frequencies in a tagged corpus.

    Args:
        corpus: list of sentences, each a non-empty list of ``(word, tag)``
            pairs whose tags all belong to ``tagset``.
        tagset: list of the tags the model knows.

    Words are lower-cased before being stored or looked up. Any probability
    whose denominator is zero (a tag that never occurs, or an empty corpus)
    is reported as 0.0 instead of raising ``ZeroDivisionError``.
    """

    # Tokens skipped when looking for the tag a sentence ends with.
    _PUNCTUATION = frozenset([',', '.', '-', ':', '\'', '(', ')', '"'])
    _SMOOTHING_STRATEGIES = ('always_o', 'misc_or_o', 'uniform')

    def __init__(self, corpus, tagset):
        self.tagset = tagset
        self.vocabulary = self.build_vocabulary(corpus)
        # Set view of the vocabulary: emission lookups happen inside the
        # decoder's inner loops, so membership tests must not scan a list.
        self._known_words = set(self.vocabulary)
        self.start_prob = self.compute_start_prob(corpus)
        self.end_prob = self.compute_end_prob(corpus)
        self.transition_prob = self.compute_transition_prob(corpus)
        self.emission_prob = self.compute_emission_prob(corpus)
        self.smoothing_strategy = 'always_o'

    @staticmethod
    def _ratio(count, total):
        # Relative frequency that degrades to 0.0 when nothing was observed.
        return count / total if total else 0.0

    # Building methods
    def build_vocabulary(self, corpus):
        """Return the distinct lower-cased words of ``corpus`` as a list (arbitrary order)."""
        return list({token[0].lower() for sent in corpus for token in sent})

    def compute_start_prob(self, corpus):
        """Return ``{tag: fraction of sentences whose first token has that tag}``."""
        count_tag = dict.fromkeys(self.tagset, 0)
        for sent in corpus:
            count_tag[sent[0][1]] += 1
        n = len(corpus)
        return {tag: self._ratio(count, n) for tag, count in count_tag.items()}

    def compute_transition_prob(self, corpus):
        """Return ``{(tag1, tag2): frequency of tag2 right after tag1}``.

        The denominator for ``tag1`` is the number of times it occurs in a
        non-final position of a sentence.
        """
        # Initialization
        count_tag = {(tag1, tag2): 0 for tag1 in self.tagset for tag2 in self.tagset}
        total = dict.fromkeys(self.tagset, 0)

        # Count every pair of adjacent tags and how often each tag comes first
        for sent in corpus:
            for current, following in zip(sent, sent[1:]):
                tag1, tag2 = current[1], following[1]
                total[tag1] += 1
                count_tag[(tag1, tag2)] += 1

        # Divide each pair count by the total of its first tag
        return {pair: self._ratio(count, total[pair[0]]) for pair, count in count_tag.items()}

    def compute_end_prob(self, corpus):
        """Return ``{tag: fraction of sentences ending with that tag}``.

        Punctuation is excluded from the count, so the tag used is the one of
        the last non-punctuation token (see ``_get_last_tag``).
        """
        count_tag = dict.fromkeys(self.tagset, 0)
        for sent in corpus:
            count_tag[self._get_last_tag(sent)] += 1
        n = len(corpus)
        return {tag: self._ratio(count, n) for tag, count in count_tag.items()}

    def _get_last_tag(self, sent):
        """Return the tag of the last non-punctuation token of ``sent``.

        When every token is punctuation the tag of the very last token is
        returned, so the caller always receives a real tag.
        """
        for token in reversed(sent):
            if token[0] not in self._PUNCTUATION:
                return token[1]
        return sent[-1][1]

    def compute_emission_prob(self, corpus):
        """Return ``{(word, tag): occurrences of word with tag / occurrences of tag}``.

        The dictionary holds an entry for every vocabulary word combined with
        every tag of the tagset.
        """
        # Initialization
        count_dict = {(word, tag): 0 for word in self.vocabulary for tag in self.tagset}
        total = dict.fromkeys(self.tagset, 0)

        # Count the number of occurrences and totals
        for sent in corpus:
            for word, tag in sent:
                count_dict[(word.lower(), tag)] += 1
                total[tag] += 1

        # Divide the number of occurrences by the total of the tag
        return {key: self._ratio(count, total[key[1]]) for key, count in count_dict.items()}

    # Getters and setters
    def get_start_prob(self, tag):
        """Return the probability that a sentence starts with ``tag``."""
        return self.start_prob[tag]

    def get_transition_prob(self, tag1, tag2):
        """Return the probability of ``tag2`` immediately following ``tag1``."""
        return self.transition_prob[(tag1, tag2)]

    def get_end_prob(self, tag):
        """Return the probability that a sentence ends with ``tag``."""
        return self.end_prob[tag]

    def get_emission_prob(self, word, tag):
        """Return the probability of ``tag`` emitting ``word``.

        Known words use the corpus frequencies. Unknown words depend on the
        selected smoothing strategy:

        * ``'always_o'``: 1.0 for 'O', 0.0 otherwise;
        * ``'misc_or_o'``: 0.5 for 'MISC' and 'O', 0.0 otherwise;
        * ``'uniform'``: ``1 / len(tagset)`` for every tag.

        Raises:
            ValueError: if ``smoothing_strategy`` holds an unsupported value.
        """
        key = word.lower()
        if key in self._known_words:
            return self.emission_prob[(key, tag)]
        if self.smoothing_strategy == 'always_o':
            return 1.0 if tag == 'O' else 0.0
        if self.smoothing_strategy == 'misc_or_o':
            return 0.5 if tag in ['MISC', 'O'] else 0.0
        if self.smoothing_strategy == 'uniform':
            return 1/len(self.tagset)
        # Fail loudly instead of returning None, which would only surface
        # later as an obscure error inside the decoder.
        raise ValueError(f"Incorrect smoothing strategy: {self.smoothing_strategy!r}")

    def set_smoothing_strategy(self, strategy):
        """Select the smoothing strategy (case-insensitive).

        Raises:
            ValueError: if ``strategy`` is not one of 'always_o',
                'misc_or_o' or 'uniform'.
        """
        normalized = strategy.lower()
        if normalized not in self._SMOOTHING_STRATEGIES:
            raise ValueError("Incorrect smoothing strategy")
        self.smoothing_strategy = normalized

### Viterbi

In [6]:
def viterbi(model, sentence):
    """Return the most likely tag sequence for ``sentence`` under ``model``.

    The computation uses log probabilities rather than probabilities, hence
    -inf in place of 0 and + in place of *.

    Args:
        model: object exposing ``tagset`` and the probability getters used by
            the helper steps.
        sentence: sequence of words.

    Returns:
        A list with one tag per word; an empty list for an empty sentence.
    """
    # Nothing to decode: the initialization step would index the first word.
    if not sentence:
        return []
    viterbi_matrix, backpointer = inizialitazion_step(model, sentence)
    for t in range(1, len(sentence)):
        for i in range(len(model.tagset)):
            viterbi_matrix[i][t], backpointer[i][t] = compute_max(model, viterbi_matrix, sentence[t], t, i)
    # The extra last row stores the termination score and the final tag in its last cell.
    viterbi_matrix[-1][-1], backpointer[-1][-1] = termination_step(model, viterbi_matrix)
    return backtrace(backpointer, model.tagset)

def inizialitazion_step(model, sentence):
    """Build the score and backpointer tables and fill their first column.

    Both tables have one row per tag plus one extra row, and one column per
    word. Scores start at -inf and backpointers at -1; for every tag the first
    column receives log(start probability) + log(emission probability of the
    first word), with backpointer 0.
    """
    tags = model.tagset
    n_rows = len(tags) + 1
    n_cols = len(sentence)
    scores = [[-math.inf] * n_cols for _ in range(n_rows)]
    pointers = [[-1] * n_cols for _ in range(n_rows)]
    for row, tag in enumerate(tags):
        log_start = log(model.get_start_prob(tag))
        log_emission = log(model.get_emission_prob(sentence[0], tag))
        scores[row][0] = log_start + log_emission
        pointers[row][0] = 0
    return scores, pointers

def compute_max(model, viterbi_matrix, word, t, i):
    """Return the best score and best previous tag for tag ``i`` at position ``t``.

    Every tag ``j`` is tried as predecessor with score
    ``viterbi_matrix[j][t-1] + log(transition(j -> i))``; the first tag reaching
    the highest score wins. The log emission probability of ``word`` for tag
    ``i`` is then added to that score.
    """
    tags = model.tagset
    current_tag = tags[i]
    best_score = None
    best_previous = None
    for j, previous_tag in enumerate(tags):
        candidate = viterbi_matrix[j][t-1] + log(model.get_transition_prob(previous_tag, current_tag))
        # Strict comparison keeps the earliest tag on ties.
        if best_score is None or candidate > best_score:
            best_score = candidate
            best_previous = previous_tag
    log_emission = log(model.get_emission_prob(word, current_tag))
    return best_score + log_emission, best_previous

def termination_step(model, viterbi_matrix):
    """Return the best final score and the tag that achieves it.

    For every tag the score is the last column of its row plus the log of its
    end probability; the first tag reaching the highest score wins.
    """
    tags = model.tagset
    best_tag = tags[0]
    best_score = viterbi_matrix[0][-1] + log(model.get_end_prob(best_tag))
    for row in range(1, len(tags)):
        candidate = viterbi_matrix[row][-1] + log(model.get_end_prob(tags[row]))
        if candidate > best_score:
            best_score, best_tag = candidate, tags[row]
    return best_score, best_tag

def backtrace(backpointer, tagset):
    """Follow the backpointers from the final tag back to the first word.

    The walk starts from the tag stored in the last cell of the last row, then
    for each column from the last down to column 1 looks up the row of the
    current tag and reads the tag that preceded it.

    Returns:
        The tags in sentence order, one per column.
    """
    path = [backpointer[-1][-1]]
    n_columns = len(backpointer[0])
    for column in reversed(range(1, n_columns)):
        row = tagset.index(path[-1])
        path.append(backpointer[row][column])
    # The path was collected end-to-start.
    path.reverse()
    return path

### Baseline

In [7]:
class BaselineModel():
    """Baseline that gives each known word the tag it carried most often in the corpus.

    Args:
        corpus: list of sentences, each a list of ``(word, tag)`` pairs whose
            tags all belong to ``tagset``.
        tagset: list of the tags the model knows.
    """

    def __init__(self, corpus, tagset):
        self.tagset = tagset
        self.vocabulary = self.build_vocabulary(corpus)
        self.frequencies = self.compute_frequencies(corpus)

    def build_vocabulary(self, corpus):
        """Return the distinct lower-cased words of ``corpus`` as a list (arbitrary order)."""
        return list({token[0].lower() for sent in corpus for token in sent})

    def compute_frequencies(self, corpus):
        """Return ``{word: {tag: occurrences}}`` for every vocabulary word and every tag.

        The values are technically occurrences, not frequencies; since only the
        maximum matters the difference is irrelevant.
        """
        freq_dict = {word: dict.fromkeys(self.tagset, 0) for word in self.vocabulary}
        for sent in corpus:
            for token in sent:
                freq_dict[token[0].lower()][token[1]] += 1
        return freq_dict

    def assign_tag(self, word):
        """Return the most frequent tag of ``word`` (case-insensitive), or 'MISC' if unknown.

        Ties go to the tag that comes first in the tagset order.
        """
        # ``frequencies`` is keyed by exactly the vocabulary words, so one dict
        # lookup replaces a linear scan of the vocabulary list.
        tag_freq = self.frequencies.get(word.lower())
        if tag_freq is None:
            return 'MISC'
        # max() returns the first maximal key, matching a stable descending sort.
        return max(tag_freq, key=tag_freq.get)

def baseline_tag_sentence(model, sentence):
    """Return the list of tags ``model.assign_tag`` gives to each word of ``sentence``."""
    return [model.assign_tag(word) for word in sentence]

## Valutazione

### Processing del test set

In [8]:
def process_test_set(tagger, test_set):
    """Tag every sentence of ``test_set`` and collect predictions next to the gold tags.

    Returns:
        ``(model_outputs, target_outputs)``: two lists, aligned by sentence,
        holding the tagger's output and the tags found in the test set.
    """
    predictions = []
    gold = []
    for tokens in test_set:
        words, expected = get_sent_and_tags(tokens)
        predicted = tagger.tag_sentence(words)
        gold.append(expected)
        predictions.append(predicted)
    return predictions, gold

def get_sent_and_tags(sentence):
    """Split a list of ``(word, tag)`` tokens into lower-cased words and their tags."""
    tokens = list(sentence)
    words = [token[0].lower() for token in tokens]
    labels = [token[1] for token in tokens]
    return words, labels

### Accuracy

In [9]:
def compute_accuracy(model_outputs, target_outputs):
    """Return the fraction of tokens whose predicted tag equals the target tag.

    Args:
        model_outputs: list of predicted tag lists, one per sentence.
        target_outputs: list of gold tag lists aligned with ``model_outputs``.

    Returns:
        The accuracy rounded to 4 decimals, or 0.0 when there are no tokens.
    """
    score = 0
    total = 0
    for i, predicted_sentence in enumerate(model_outputs):
        for j, predicted in enumerate(predicted_sentence):
            total += 1
            if predicted == target_outputs[i][j]:
                score += 1
    # No tokens at all: report 0.0 rather than dividing by zero.
    if total == 0:
        return 0.0
    return round(score / total, 4)

### Precision e recall

In [10]:
def compute_statistics(tagset, model_outputs, target_outputs):
    """Count true positives, false positives and false negatives for each tag.

    For every token and every tag of ``tagset``: a prediction equal to the tag
    is a true positive when the target is the same tag and a false positive
    otherwise; a target equal to the tag with a different prediction is a
    false negative.

    Returns:
        ``(true_positives, false_positives, false_negatives)``, three
        dictionaries keyed by tag.
    """
    true_positives = {tag: 0 for tag in tagset}
    false_positives = {tag: 0 for tag in tagset}
    false_negatives = {tag: 0 for tag in tagset}
    for i, predicted_sentence in enumerate(model_outputs):
        for j, predicted in enumerate(predicted_sentence):
            for tag in tagset:
                gold = target_outputs[i][j]
                if predicted == tag:
                    counter = true_positives if gold == tag else false_positives
                elif gold == tag:
                    counter = false_negatives
                else:
                    # This token does not involve the tag at all.
                    continue
                counter[tag] += 1
    return true_positives, false_positives, false_negatives

In [11]:
def compute_precision(true_positives, false_positives):
    """Return ``{tag: TP / (TP + FP)}`` rounded to 4 decimals; 0 for tags without true positives."""
    precisions = {}
    for tag, hits in true_positives.items():
        if hits == 0:
            precisions[tag] = 0
        else:
            precisions[tag] = round(hits / (hits + false_positives[tag]), 4)
    return precisions

In [12]:
def compute_recall(true_positives, false_negatives):
    """Return ``{tag: TP / (TP + FN)}`` rounded to 4 decimals; 0 for tags without true positives."""
    recalls = {}
    for tag, hits in true_positives.items():
        if hits == 0:
            recalls[tag] = 0
        else:
            recalls[tag] = round(hits / (hits + false_negatives[tag]), 4)
    return recalls

### Confronto tra i modelli

In [13]:
def evaluate_tagger(tagger, test_set):
    """Run ``tagger`` on ``test_set`` and print timing, accuracy, precision and recall.

    The elapsed time covers only the tagging of the test set and is printed in
    whole seconds; precision and recall are printed as per-tag dictionaries.
    """
    print(f'---- Evaluating model {str(tagger.algorithm.__name__)} ----')
    start_time = time.time()
    predictions, gold = process_test_set(tagger, test_set)
    end_time = time.time()
    print(f'Evaluation of {len(test_set)} sentences took {int(end_time-start_time)} seconds')
    accuracy = compute_accuracy(predictions, gold)
    tp, fp, fn = compute_statistics(tagger.tagset, predictions, gold)
    precision = compute_precision(tp, fp)
    recall = compute_recall(tp, fn)
    print(f'Model accuracy: {accuracy}')
    print(f'Model precision per tag: {precision}')
    print(f'Model recall per tag: {recall}\n')

## Esecuzione

In [14]:
# Languages to evaluate; each one is read from data/<lang>/train.conllu and data/<lang>/test.conllu.
languages = ['en', 'it']
# Tags known to both models.
tagset = ['O', 'ORG', 'LOC', 'PER', 'MISC']

for lang in languages:
    # Both models are built from the same training corpus.
    corpus = get_corpus(f'data/{lang}/train.conllu')

    hmm = HiddenMarkovModel(corpus, tagset)
    my_viterbi = Tagger(hmm, viterbi)

    baseline_model = BaselineModel(corpus, tagset)
    baseline = Tagger(baseline_model, baseline_tag_sentence)

    test_set = get_corpus(f'data/{lang}/test.conllu')
    # Number of test sentences to evaluate; the commented value below limits the run to a small sample.
    N = len(test_set)
    # N = 20

    print(f"Evaluating taggers for language: {lang}")
    for tagger in [my_viterbi, baseline]:
        evaluate_tagger(tagger, test_set[:N])

Evaluating taggers for language: en
---- Evaluating model viterbi ----


### Test

In [None]:
# Hand-written example sentences per language, already tokenized with spaces between tokens.
sentences = {
    'it': [
        'La vera casa di Harry Potter è il Castello di Hogwarts .',
        'Harry le raccontò del loro incontro a Diagon Alley .',
        'Mr Dursley era direttore di una ditta di nome Grunnings , che fabbricava trapani .'
    ], 'en': [
        'Harry Potter \' s true home is Hogwarts Castle .',
        'Harry told her about their meeting at Diagon Alley .',
        'Mr. Dursley was director of a company named Grunnings that manufactured drills .'
    ]
}

# Expected tags for the sentences above: same language keys, same order, one tag per token.
correct_tags = {
    'it': [
        ['O', 'O', 'O', 'O', 'PER', 'PER', 'O', 'O', 'LOC', 'LOC', 'LOC', 'O'],
        ['PER', 'O', 'O', 'O', 'O', 'O', 'O', 'LOC', 'LOC', 'O'],
        ['PER', 'PER', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'ORG', 'O', 'O', 'O', 'O', 'O'],
    ], 'en': [
        ['PER', 'PER', 'O', 'O', 'O', 'O', 'O', 'LOC', 'LOC', 'O'],
        ['PER', 'O', 'O', 'O', 'O', 'O', 'O', 'LOC', 'LOC', 'O'],
        ['PER', 'PER', 'O', 'O', 'O', 'O', 'O', 'O', 'ORG', 'O', 'O', 'O', 'O'],
    ]
}

# Print, for each example sentence, a table with the tokens and the tags from
# Viterbi, the baseline and the expected annotation.
# NOTE(review): ``lang``, ``my_viterbi`` and ``baseline`` are whatever an earlier
# cell left in scope, so the table refers to the language processed last.
names = ['', 'Viterbi', 'Baseline', 'Target']
for sentence_index, sentence in enumerate(sentences[lang]):
    print('-'*225)
    rows = [
        sentence.split(),
        my_viterbi.tag_sentence(sentence),
        baseline.tag_sentence(sentence),
        correct_tags[lang][sentence_index],
    ]
    # Build new labelled rows instead of inserting into the existing lists:
    # inserting would modify the lists stored in ``correct_tags``, so running
    # this cell again would prepend the label a second time.
    labelled_rows = [[name, *row] for name, row in zip(names, rows)]
    print('\n'.join([''.join(['{:16}'.format(x) for x in r]) for r in labelled_rows]))

---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
                Harry           Potter          '               s               true            home            is              Hogwarts        Castle          .               
Viterbi         PER             PER             O               O               O               O               O               MISC            MISC            O               
Baseline        PER             MISC            O               O               O               O               O               MISC            LOC             O               
Target          PER             PER             O               O               O               O               O               LOC             LOC             O               
------------------------------------------------------------------

### Utils

In [None]:
def prob_two_ne_near(path):
    """Return how often an entity is immediately followed by another one.

    The numerator counts lines containing 'I-' whose next line contains 'B-';
    the denominator counts all lines containing 'B-'.

    Args:
        path: UTF-8 text file with one token per line.

    Returns:
        The ratio as a float, or 0.0 when the file has no 'B-' line.
    """
    # NOTE(review): the markers are searched in the whole line, so a word that
    # itself contains 'B-' or 'I-' is counted too.
    with open(path, 'r', encoding='utf8') as f:
        lines = f.readlines()
    # Every line counts towards the total, including the last one, which has
    # no following line and so can only appear in the denominator.
    total = sum(1 for line in lines if 'B-' in line)
    number = sum(
        1
        for line, next_line in zip(lines, lines[1:])
        if 'I-' in line and 'B-' in next_line
    )
    if total == 0:
        return 0.0
    return number/total

# Ratio computed on the English training file; the notebook shows the value as the cell result.
prob_two_ne_near('data/en/train.conllu')

0.00031491204956330055

In [None]:
def trasposta(matrix):
    """Return the transpose of ``matrix`` (a list of rows) as a new list of lists.

    The number of columns is taken from the first row.
    """
    n_columns = len(matrix[0])
    return [[row[column] for row in matrix] for column in range(n_columns)]

def print_table(input_matrix):
    """Yield the text lines of a table rendering ``input_matrix``.

    The first line is a header of indexes, one per row of the transposed
    matrix. Each following line starts with a state index and lists one value
    per transposed row, formatted with ``%f`` and cut to 7 characters.
    """
    columns = trasposta(input_matrix)
    header_cells = [("%12d" % index) for index in range(len(columns))]
    yield " ".join(header_cells)
    for state in range(len(columns[0])):
        cells = ["%.7s" % ("%f" % column[state]) for column in columns]
        yield "%.7s: " % state + " ".join(cells)

# Ad-hoc check: decode one sentence directly with the Viterbi function.
# NOTE(review): ``hmm`` is whatever model an earlier cell left in scope.
sentence="on this occasion he failed to gain the support of the South Wales Miners ' Federation and had to stand down ."
# The result gets its own name so the module-level ``tagset`` list is not
# replaced by a predicted tag sequence.
predicted_tags = viterbi(hmm, sentence.split())
print(predicted_tags)
# for line in print_table(b):
#     print(line)

['O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'LOC', 'LOC', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O']
