# Assignment 2 

This assignment is about training and evaluating a POS tagger on real data. The dataset is available through the [Universal Dependencies](https://universaldependencies.org/) (UD) project. To get to know the project, please visit https://universaldependencies.org/introduction.html.

In [None]:
import numpy as np
import operator
import nltk
import pandas as pd
import math
import random 

!pip install conllutils
import conllutils



## **Part 1** (getting the data)

You can download the dataset files directly from the UD website, but it only lets you download all the languages in one compressed file. In this assignment you will be working with the GUM dataset, which you can download directly from:
https://github.com/UniversalDependencies/UD_English-GUM.
Please download it to your Colab machine.



In [None]:
!git clone https://github.com/UniversalDependencies/UD_English-GUM
%cd /content/UD_English-GUM/
!git checkout 2c8b062269f2d2d3d62405c82d8c25cf24f705dd
%cd /content/

fatal: destination path 'UD_English-GUM' already exists and is not an empty directory.
/content/UD_English-GUM
HEAD is now at 2c8b062 Updated statistics.
/content


We will use the (train/dev/test) files:

UD_English-GUM/en_gum-ud-train.conllu

UD_English-GUM/en_gum-ud-dev.conllu

UD_English-GUM/en_gum-ud-test.conllu

They are all in the CoNLL-U format. You may read about it [here](https://universaldependencies.org/format.html). There is a utility library, **conllutils**, which can help you read the data into memory. It has already been installed and imported above.

You should write code that reads the three datasets into memory. You may choose the data structure yourself. As you can see, every word is represented by a line, with columns representing specific features. We are only interested in the second and fourth columns (FORM and UPOS), corresponding to the word and its POS tag.
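
To make the file layout concrete, here is a minimal sketch that extracts the FORM and UPOS fields by hand, without conllutils. The helper name `read_word_tag_pairs` and the three-line sample are made up for illustration; the cell below relies on `conllutils.read_conllu` instead.

```python
# Hypothetical helper (not part of conllutils): parse CoNLL-U text by hand.
def read_word_tag_pairs(conllu_text):
    """Return a list of sentences, each a list of (form, upos) tuples."""
    sentences, current = [], []
    for line in conllu_text.splitlines():
        line = line.strip()
        if not line:                       # a blank line ends a sentence
            if current:
                sentences.append(current)
                current = []
            continue
        if line.startswith("#"):           # sentence-level comment / metadata
            continue
        cols = line.split("\t")
        # Skip multiword-token ranges ("1-2") and empty nodes ("1.1").
        if "-" in cols[0] or "." in cols[0]:
            continue
        current.append((cols[1], cols[3]))  # FORM and UPOS fields
    if current:                            # file may not end with a blank line
        sentences.append(current)
    return sentences

# Made-up sample in CoNLL-U layout (ten tab-separated fields per token).
sample = (
    "# text = Dogs bark.\n"
    "1\tDogs\tdog\tNOUN\tNNS\t_\t2\tnsubj\t_\t_\n"
    "2\tbark\tbark\tVERB\tVBP\t_\t0\troot\t_\t_\n"
    "3\t.\t.\tPUNCT\t.\t_\t2\tpunct\t_\t_\n"
    "\n"
)
pairs = read_word_tag_pairs(sample)
print(pairs)
```

A list of sentences, each a list of `(word, tag)` tuples, is one possible in-memory structure; any structure that keeps sentence boundaries works for the later parts.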

In [None]:
paths = ['UD_English-GUM/en_gum-ud-train.conllu', 'UD_English-GUM/en_gum-ud-dev.conllu', 'UD_English-GUM/en_gum-ud-test.conllu']

def load_file_from_git(path):
  return list(conllutils.read_conllu(path))

train_list = load_file_from_git(paths[0])
dev_list = load_file_from_git(paths[1])
test_list = load_file_from_git(paths[2])

In [None]:
print(train_list)



In [None]:
print(dev_list)



In [None]:
print(test_list)



## **Part 2**

Write a class **simple_tagger**, with methods *train* and *evaluate*. The method *train* receives the data as a list of sentences and uses it to train the tagger. In this case, it should learn a simple dictionary that maps words to tags, defined as the most frequent tag for every word (if more than one tag is most frequent, you may select one of them randomly). The dictionary should be stored as a class member for evaluation.

The method *evaluate* receives the data as a list of sentences and uses it to evaluate the tagger's performance. Specifically, you should calculate the word-level and sentence-level accuracy.
The evaluation process simply goes word by word, querying the dictionary (created by the train method) for each word's tag and comparing it to the true tag of that word. The word-level accuracy is the number of successes divided by the number of words. The sentence-level accuracy is the number of sentences in which every word is tagged correctly, divided by the number of sentences. For OOV (out of vocabulary, or unknown) words, the tagger should assign the most frequent tag in the entire training set (i.e., the mode). The function should return the two numbers: word-level accuracy and sentence-level accuracy.
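
As a small illustration of the dictionary described above, the following sketch trains a most-frequent-tag lookup on made-up data. The names `toy_train`, `word_to_tag`, `fallback_tag` and `predict` are hypothetical, and ties are broken by first occurrence (the behaviour of `Counter.most_common`) rather than randomly.

```python
from collections import Counter, defaultdict

# Toy training data (made up): sentences of (word, tag) pairs.
toy_train = [
    [("the", "DET"), ("run", "NOUN"), ("ended", "VERB")],
    [("they", "PRON"), ("run", "VERB")],
    [("we", "PRON"), ("run", "VERB")],
]

tag_counts = Counter()             # how often each tag occurs overall
per_word = defaultdict(Counter)    # per-word tag counts
for sentence in toy_train:
    for word, tag in sentence:
        tag_counts[tag] += 1
        per_word[word][tag] += 1

# Most frequent tag for every word, plus the overall mode for OOV words.
word_to_tag = {w: c.most_common(1)[0][0] for w, c in per_word.items()}
fallback_tag = tag_counts.most_common(1)[0][0]

def predict(word):
    """Look the word up; fall back to the most frequent training tag."""
    return word_to_tag.get(word, fallback_tag)

print(predict("run"))     # "run" is VERB in 2 of its 3 occurrences
print(predict("jumped"))  # OOV word, gets the overall mode
```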


In [None]:
class simple_tagger:
  def __init__(self):
    self.model = {}
    self.most_common = ''

  def train(self, data):
    model, most_common = self.get_sentences_and_tags(data)
    self.model = model
    self.most_common = most_common

  def evaluate(self, data):
    words_accuracy, sentences_accuracy = self.get_accuracy(data, self.model, self.most_common)
    return words_accuracy, sentences_accuracy

  def get_sentences_and_tags(self, data):
    words_tags = {}
    tags = {}
    sentences = []
    most_common = ''

    for sentence in data:
      for word in sentence:
        w, t = word['lemma'], word['upos']

        if t in tags:
          tags[t] += 1
        else:
          tags[t] = 1

        if w in words_tags:
          if t in words_tags[w]:
            words_tags[w][t] += 1
          else:
            words_tags[w][t] = 1 
        else:
            words_tags[w] = {t : 1}
    
    for word, tag in words_tags.items():
      words_tags[word] = max(words_tags[word].items(), key=operator.itemgetter(1))[0]

    most_common = max(tags.items(), key=operator.itemgetter(1))[0]

    return words_tags, most_common

  def get_accuracy(self, data, words_tags_map, most_common):
    total_sentences = 0
    total_words = 0

    correct_sentences = 0
    correct_words = 0

    for sentence in data:
      total_sentences += 1
      correct = True
      for word in sentence:
        total_words += 1
        w, t = word['lemma'], word['upos']
        if w in words_tags_map:
          predicted_tag_word = words_tags_map[w]
        else:
          predicted_tag_word = most_common

        if t == predicted_tag_word:
          correct_words += 1
        else:
          correct = False 
      
      if correct == True:
        correct_sentences += 1
    
    sentences_accuracy = correct_sentences/total_sentences
    words_accuracy = correct_words/total_words

    return words_accuracy, sentences_accuracy

In [None]:
tagger = simple_tagger()
tagger.train(train_list)

In [None]:
simple_words_accuracy_train, simple_sentences_accuracy_train = tagger.evaluate(train_list)
print("words_accuracy - train file = {:.5f}".format(simple_words_accuracy_train))
print("sentences_accuracy - train file = {:.5f}".format(simple_sentences_accuracy_train))

words_accuracy - train file = 0.93242
sentences_accuracy - train file = 0.37299


In [None]:
simple_words_accuracy_test, simple_sentences_accuracy_test = tagger.evaluate(test_list)
print("words_accuracy - test file = {:.5f}".format(simple_words_accuracy_test))
print("sentences_accuracy - test file = {:.5f}".format(simple_sentences_accuracy_test))

words_accuracy - test file = 0.84968
sentences_accuracy - test file = 0.18876


In [None]:
simple_words_accuracy_dev, simple_sentences_accuracy_dev = tagger.evaluate(dev_list)
print("words_accuracy - dev file = {:.5f}".format(simple_words_accuracy_dev))
print("sentences_accuracy - dev file = {:.5f}".format(simple_sentences_accuracy_dev))

words_accuracy - dev file = 0.86062
sentences_accuracy - dev file = 0.16327


## **Part 3**

Similar to Part 2, write the class hmm_tagger, which implements HMM tagging. The method *train* should build the matrices A, B and Pi from the data, as discussed in class. The method *evaluate* should find the best tag sequence for every input sentence using the Viterbi decoding algorithm, and then calculate the word-level and sentence-level accuracy using the gold-standard tags. You should implement the Viterbi algorithm in the next block and call it from your class.

Additional guidance:
1. The matrix B represents the emission probabilities. Since B is a matrix, you should build a dictionary that maps every unique word in the corpus to a serial numeric id (starting with 0). This way, the columns of B represent word ids.
2. During the evaluation, you should first convert each word into its index and then create the observation array to be given to Viterbi, as a list of ids. OOV words should be assigned a random tag. To make sure Viterbi works appropriately, you can simply break the sentence into multiple segments every time you see an OOV word, and decode every segment individually using Viterbi.
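
The two guidance points can be sketched as follows. This is only an illustration under assumed names (`build_word_ids` and `split_on_oov` are hypothetical helpers, not part of the assignment): known words become ids, and every OOV word closes the current segment so that each run of known ids can be decoded separately.

```python
def build_word_ids(sentences):
    """Map every distinct word to a serial id, starting at 0."""
    word_to_id = {}
    for sentence in sentences:
        for word in sentence:
            # len(word_to_id) is the next unused id
            word_to_id.setdefault(word, len(word_to_id))
    return word_to_id

def split_on_oov(sentence, word_to_id):
    """Yield ('known', [ids...]) runs and ('oov', word) items, in order."""
    segment = []
    for word in sentence:
        if word in word_to_id:
            segment.append(word_to_id[word])
        else:
            if segment:                 # close the run before the OOV word
                yield ("known", segment)
                segment = []
            yield ("oov", word)
    if segment:                         # trailing run of known words
        yield ("known", segment)

word_to_id = build_word_ids([["the", "dog", "barks"], ["the", "cat"]])
parts = list(split_on_oov(["the", "zebra", "barks", "cat"], word_to_id))
print(parts)
```

Each `'known'` run would be passed to Viterbi as its observation list, while each `'oov'` item receives a tag chosen without the model.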


### Helpers

In [None]:
def update_signle_dict_counters(d, key):
  if key in d:
    d[key] += 1
  else:
    d[key] = 1
  return d

def update_dictionary_counters(d, key, val):
  if key in d:
    update_signle_dict_counters(d[key], val)
  else:
    d[key] = {val : 1}
  return d

def normalize_dictionary(d, factor):
  for key in d:
    d[key] = d[key]/factor
  return d

def normalize_dictionary_of_dictionaries(dod, factor):
  for key in dod:
      dod[key] = normalize_dictionary(dod[key], factor)
  return dod

def convert_to_log_scale(d):
  d1 = {}
  for k in d.keys():
    d1[k] = {x: ln(d[k][x]) for x in d[k]}
  return d1

def ln(x):
  if x < 1e-200:
    return -np.inf
  return np.log(x)

def create_tags_mapping(transitions):
  tag_to_id, id_to_tag = {}, {}
  cur = 0
  for tag in transitions:
    tag_to_id[tag] = cur
    id_to_tag[cur] = tag
    cur+=1
  return tag_to_id, id_to_tag
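
For comparison, here is a minimal sketch of the usual count-based (maximum-likelihood) estimates for Pi, A and B, in which each row is divided by its own total so that it sums to 1. This differs from `normalize_dictionary_of_dictionaries` above, which divides every entry by one shared factor. The function name `estimate_hmm`, the `A[prev][cur]` layout and the toy data are assumptions made for this illustration.

```python
from collections import Counter, defaultdict

def estimate_hmm(tagged_sentences):
    """Return (pi, A, B) with pi[tag], A[prev_tag][tag] and B[tag][word]."""
    pi = Counter()
    A = defaultdict(Counter)
    B = defaultdict(Counter)
    for sentence in tagged_sentences:
        prev = None
        for word, tag in sentence:
            if prev is None:
                pi[tag] += 1          # tag of the first word in the sentence
            else:
                A[prev][tag] += 1     # transition prev -> tag
            B[tag][word] += 1         # emission tag -> word
            prev = tag

    def normalize(counter):
        # Divide by the row's own total, so the row sums to 1.
        total = sum(counter.values())
        return {k: v / total for k, v in counter.items()}

    return (normalize(pi),
            {t: normalize(c) for t, c in A.items()},
            {t: normalize(c) for t, c in B.items()})

# Made-up training data.
toy = [
    [("the", "DET"), ("dog", "NOUN"), ("barks", "VERB")],
    [("the", "DET"), ("cat", "NOUN")],
]
pi, A, B = estimate_hmm(toy)
print(pi)
print(A)
print(B["NOUN"])
```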

### Viterbi

In [None]:
def get_prob(d, k1, k2):
  if k2 in d[k1]:
    return d[k1][k2]
  return ln(0)

def get_emissions_prob(emissions, state, word):
  return get_prob(emissions, state, word)

def get_transitions_prob(transition, state1 , state2):
  return get_prob(transition, state1, state2)

In [None]:
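# Viterbi (all scores are log-probabilities, so they are added rather than multiplied)
def viterbi(observations, A, B, Pi, tag_to_id = None): # signature extended with an optional tag -> id mapping
  if tag_to_id is None:
    tag_to_id, _ = create_tags_mapping(A)

  path = {}  # best tag sequence found so far that ends in each state
  mat = np.zeros((len(observations), len(A)))  # mat[i][s]: best score of a path ending in state s at position i

  # initialization: start score plus emission score of the first observation
  for st in A:
    # a tag that never starts a training sentence has no entry in Pi; treat it as log(0)
    mat[0][tag_to_id[st]] = Pi.get(st, ln(0)) + get_emissions_prob(B, st, observations[0])
    path[st] = [st]

  # recursion: extend the best-scoring predecessor of every state
  for i in range(1, len(observations)):
    cur_path = {}

    for j in tag_to_id:
      candidates = [(mat[i - 1][tag_to_id[state]] + get_transitions_prob(A, state, j), state) for state in tag_to_id]
      prob, state = max(candidates)
      mat[i][tag_to_id[j]] = prob + get_emissions_prob(B, j, observations[i])
      cur_path[j] = path[state] + [j]
    path = cur_path

  # termination: pick the state with the best final score
  (prob, state) = max((mat[len(observations) - 1][tag_to_id[state]], state) for state in tag_to_id)

  # return the winning path as a list of tag ids
  return [tag_to_id[p] for p in path[state]]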

In [None]:
# A simple example to run the Viterbi algorithm
# (same as in the presentation "NLP 3 - Tagging", slide 35):

# A = np.array([[0.3, 0.7], [0.2, 0.8]])
# B = np.array([[0.1, 0.1, 0.3, 0.5], [0.3, 0.3, 0.2, 0.2]])
# Pi = np.array([0.4, 0.6])
# print(viterbi([0, 3, 1, 0], A, B, Pi))
# Expected output: 1, 1, 1, 1

def run_test():
  # Dictionary version of the example above. Word ids start at 0 to match the
  # observations, and viterbi() adds scores, so everything is converted to log scale.
  A = convert_to_log_scale({"N": {"N": 0.3, "V": 0.7}, "V": {"N": 0.2, "V": 0.8}})
  B = convert_to_log_scale({"N": {0: 0.1, 1: 0.1, 2: 0.3, 3: 0.5}, "V": {0: 0.3, 1: 0.3, 2: 0.2, 3: 0.2}})
  Pi = {"N": ln(0.4), "V": ln(0.6)}
  print(viterbi([0, 3, 1, 0], A, B, Pi))
run_test()

[1, 1, 1, 1]
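
One way to sanity-check a Viterbi result on an example this small is to score every possible state sequence directly. The sketch below does that for the slide's matrices (state 0 and state 1, four observation symbols); the function name `sequence_log_prob` is made up for this illustration, and the exhaustive search is only practical for tiny inputs.

```python
import itertools
import math

# Matrices from the slide example, as plain nested lists.
A = [[0.3, 0.7], [0.2, 0.8]]                       # A[i][j] = P(state j | state i)
B = [[0.1, 0.1, 0.3, 0.5], [0.3, 0.3, 0.2, 0.2]]   # B[i][o] = P(observation o | state i)
Pi = [0.4, 0.6]                                    # Pi[i]   = P(first state is i)
observations = [0, 3, 1, 0]

def sequence_log_prob(states):
    """Joint log-probability of a state sequence and the observations."""
    lp = math.log(Pi[states[0]]) + math.log(B[states[0]][observations[0]])
    for t in range(1, len(states)):
        lp += math.log(A[states[t - 1]][states[t]])
        lp += math.log(B[states[t]][observations[t]])
    return lp

# Enumerate all 2^4 state sequences and keep the most probable one.
best = max(itertools.product(range(2), repeat=len(observations)),
           key=sequence_log_prob)
print(best)  # (1, 1, 1, 1)
```

The exhaustive search agrees with the expected output of the slide example.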


### HMM Tagger

In [None]:
class hmm_tagger:
  def __init__(self):
    self.pi = {}
    self.transitions = {}
    self.emissions = {}
    self.word_to_id = {}
    self.id_to_word = {}
    self.tag_to_id = {}
    self.id_to_tag = {}
    self.states = {}
    self.B = {}
    self.most_common = ''

  def train(self, data):
    self.build_matrices(data)
    self.to_log()
    self.create_B()
    self.create_tags_mapping()
    self.find_common()
    self.tag_to_id, self.id_to_tag = create_tags_mapping(self.transitions)

  def evaluate(self, data):
    total_sentences, correct_sentences, total_words, correct_words = 0, 0, 0, 0
    
    for sentence in data:
      total_sentences += 1
      words = [word['lemma'] for word in sentence]
      tags = [word['upos'] for word in sentence]
      correct = True
      predict = self.get_prediction(words)

      for i in range(len(words)):
        total_words += 1
        if self.tag_to_id[tags[i]] == predict[i]:
          correct_words += 1
        else:
          correct = False

      if correct == True:
        correct_sentences += 1

    sentences_accuracy = correct_sentences/total_sentences
    words_accuracy = correct_words/total_words
    
    return words_accuracy, sentences_accuracy
    
###********************************************************************####
###********************************************************************####
###********************************************************************####

  def build_matrices(self, data):
    pi = {}  # (tag -> count) dictionary, counting only the tag of the first word of each sentence.
    transitions = {}
    emissions = {}
    number_of_sentences = 0
    number_of_words = 0

    for sentence in data:
      number_of_sentences += 1
      is_first_word = True
      prev_tag = ""

      for word in sentence:
        number_of_words += 1
        w, t = word['lemma'], word['upos']  # t is the state, w is the emission

        if is_first_word:
          # add to pi
          pi = update_signle_dict_counters(pi, t)
          is_first_word = False
        else:
          # add to transitions (stored as transitions[current_tag][previous_tag])
          transitions = update_dictionary_counters(transitions, t, prev_tag)

        # add to emissions
        emissions = update_dictionary_counters(emissions, t, w)
        prev_tag = t

    # normalize the counts (pi by the number of sentences, transitions and emissions by the number of words):
    self.pi = normalize_dictionary(pi, number_of_sentences)
    self.transitions = normalize_dictionary_of_dictionaries(transitions, number_of_words)
    self.emissions = normalize_dictionary_of_dictionaries(emissions, number_of_words)

  def create_tags_mapping(self):
    cur = 0
    for tag in self.transitions:
      self.states[tag] = cur
      cur += 1

  def find_common(self):
    tags = {}
    for t in self.transitions:
      if t in tags:
        tags[t] += 1
      else:
        tags[t] = 1

    self.most_common = max(tags.items(), key=operator.itemgetter(1))[0]

  def to_log(self):
    self.transitions = convert_to_log_scale(self.transitions)
    self.emissions = convert_to_log_scale(self.emissions)
    for k in self.pi:
      self.pi[k] = ln(self.pi[k])

  def create_B(self):
    self.B = {}
    cur_id = 0
    for tag in self.emissions:
      self.B[tag] = {}
      for word in self.emissions[tag]:
        # create the id-word mapping
        if word not in self.word_to_id:
          self.word_to_id[word] = cur_id
          self.id_to_word[cur_id] = word
          cur_id += 1

        self.B[tag][self.word_to_id[word]] = self.emissions[tag][word]

  # Not called by viterbi(), which receives B directly; kept as a convenience lookup.
  def get_emissions_prob(self, state, word):
    if word in self.B[state]:
      return self.B[state][word]
    return ln(0)

  def get_prediction(self, sentence):
    predict = []
    current = []

    for word in sentence:
      if word not in self.word_to_id:
        if len(current) > 0:
          predict += viterbi(current, self.transitions, self.B, self.pi, self.tag_to_id)
        random_tag = self.get_random_tag()
        predict.append(random_tag)
        current = []
      else:
        current.append(self.word_to_id[word])
    
    if (len(current) > 0):
      predict += viterbi(current, self.transitions, self.B, self.pi, self.tag_to_id)

    return predict

  def get_random_tag(self):
    # Despite the name, this returns the fixed tag chosen by find_common(), not a random one.
    return self.most_common


In [None]:
hmm_tagger = hmm_tagger()
hmm_tagger.train(train_list)

In [None]:
hmm_words_accuracy_train, hmm_sentences_accuracy_train = hmm_tagger.evaluate(train_list)
print("words_accuracy - train file = {:.5f}".format(hmm_words_accuracy_train))
print("sentences_accuracy - train file = {:.5f}".format(hmm_sentences_accuracy_train))

words_accuracy - train file = 0.82528
sentences_accuracy - train file = 0.14976


In [None]:
hmm_words_accuracy_test, hmm_sentences_accuracy_test = hmm_tagger.evaluate(test_list)
print("words_accuracy - test file = {:.5f}".format(hmm_words_accuracy_test))
print("sentences_accuracy - test file = {:.5f}".format(hmm_sentences_accuracy_test))

words_accuracy - test file = 0.70809
sentences_accuracy - test file = 0.08427


In [None]:
hmm_words_accuracy_dev, hmm_sentences_accuracy_dev = hmm_tagger.evaluate(dev_list)
print("words_accuracy - dev file = {:.5f}".format(hmm_words_accuracy_dev))
print("sentences_accuracy - dev file = {:.5f}".format(hmm_sentences_accuracy_dev))

words_accuracy - dev file = 0.72099
sentences_accuracy - dev file = 0.06122


## **Part 4**

Compare the results obtained from both taggers with those of a MEMM tagger implemented by NLTK (a well-known NLP library), over both the dev and test datasets. To train the NLTK MEMM tagger you should execute the following lines (it may take some time to train...):

In [None]:
from nltk.tag import tnt

In [None]:
def getDataAsArrayOfTupple(data):
  return [[(word['lemma'], word['upos']) for word in sentence] for sentence in data]

In [None]:
train_array = getDataAsArrayOfTupple(train_list)
test_array = getDataAsArrayOfTupple(test_list)
dev_array = getDataAsArrayOfTupple(dev_list)

tnt_pos_tagger = tnt.TnT()
tnt_pos_tagger.train(train_array)

In [None]:
def get_sentence_accuracy(data):
  total_sentences = 0
  correct_sentences = 0
  for sentence in data:
    total_sentences += 1
    predicted = tnt_pos_tagger.evaluate([sentence])
    if predicted == 1:
      correct_sentences += 1
  
  sentence_accuracy = correct_sentences / total_sentences
  return sentence_accuracy

In [None]:
nltk_words_accuracy_train = tnt_pos_tagger.evaluate(train_array)
nltk_sentences_accuracy_train = get_sentence_accuracy(train_array)
print("words_accuracy - train file = {:.5f}".format(nltk_words_accuracy_train))
print("sentences_accuracy - train file = {:.5f}".format(nltk_sentences_accuracy_train))

words_accuracy - train file = 0.96393
sentences_accuracy - train file = 0.57896


In [None]:
nltk_words_accuracy_test = tnt_pos_tagger.evaluate(test_array)
nltk_sentences_accuracy_test = get_sentence_accuracy(test_array)
print("words_accuracy - test file = {:.5f}".format(nltk_words_accuracy_test))
print("sentences_accuracy - test file = {:.5f}".format(nltk_sentences_accuracy_test))

words_accuracy - test file = 0.82745
sentences_accuracy - test file = 0.15393


In [None]:
nltk_words_accuracy_dev = tnt_pos_tagger.evaluate(dev_array)
nltk_sentences_accuracy_dev = get_sentence_accuracy(dev_array)
print("words_accuracy - dev file = {:.5f}".format(nltk_words_accuracy_dev))
print("sentences_accuracy - dev file = {:.5f}".format(nltk_sentences_accuracy_dev))

words_accuracy - dev file = 0.85184
sentences_accuracy - dev file = 0.16454


Print both the word-level and sentence-level accuracy for all three taggers in a table.
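
Both numbers in these tables can be computed for any tagger from its gold and predicted tag sequences. The following is a minimal sketch with a hypothetical helper `accuracies` and made-up tags, independent of the three taggers used in this notebook.

```python
def accuracies(gold_sentences, predicted_sentences):
    """Return (word-level accuracy, sentence-level accuracy)."""
    total_words = correct_words = correct_sentences = 0
    for gold, pred in zip(gold_sentences, predicted_sentences):
        matches = sum(g == p for g, p in zip(gold, pred))
        total_words += len(gold)
        correct_words += matches
        # A sentence counts as correct only if every tag matches.
        correct_sentences += (matches == len(gold))
    return correct_words / total_words, correct_sentences / len(gold_sentences)

# Made-up example: one wrong tag in the first sentence.
gold = [["DET", "NOUN", "VERB"], ["PRON", "VERB"]]
pred = [["DET", "NOUN", "NOUN"], ["PRON", "VERB"]]
word_acc, sent_acc = accuracies(gold, pred)
print(f"{word_acc:.2f} {sent_acc:.2f}")  # 0.80 0.50
```

A single wrong tag costs one word but the whole sentence, which is why sentence-level accuracy is always the lower of the two numbers.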

**Train**

In [None]:
df_train = pd.DataFrame({'Sentence level accuracy': [simple_sentences_accuracy_train, hmm_sentences_accuracy_train, nltk_sentences_accuracy_train],'Word level accuracy': [simple_words_accuracy_train, hmm_words_accuracy_train, nltk_words_accuracy_train]}, index=['Simple', 'HMM', 'MEMM'])

In [None]:
display(df_train)

| | Sentence level accuracy | Word level accuracy |
|---|---|---|
| Simple | 0.372988 | 0.932422 |
| HMM | 0.149755 | 0.825277 |
| MEMM | 0.57896 | 0.963927 |


**Test**

In [None]:
df_test = pd.DataFrame({'Sentence level accuracy': [simple_sentences_accuracy_test, hmm_sentences_accuracy_test, nltk_sentences_accuracy_test],'Word level accuracy': [simple_words_accuracy_test, hmm_words_accuracy_test, nltk_words_accuracy_test]}, index=['Simple', 'HMM', 'MEMM'])

In [None]:
display(df_test)

| | Sentence level accuracy | Word level accuracy |
|---|---|---|
| Simple | 0.188764 | 0.84968 |
| HMM | 0.08427 | 0.708087 |
| MEMM | 0.153933 | 0.827452 |


**Dev**

In [None]:
df_dev = pd.DataFrame({'Sentence level accuracy': [simple_sentences_accuracy_dev, hmm_sentences_accuracy_dev, nltk_sentences_accuracy_dev],'Word level accuracy': [simple_words_accuracy_dev, hmm_words_accuracy_dev, nltk_words_accuracy_dev]}, index=['Simple', 'HMM', 'MEMM'])

In [None]:
display(df_dev)

| | Sentence level accuracy | Word level accuracy |
|---|---|---|
| Simple | 0.163265 | 0.860623 |
| HMM | 0.061224 | 0.72099 |
| MEMM | 0.164541 | 0.85184 |
