# Assignment 2

This assignment is about training and evaluating a POS tagger on real data. The dataset is available through the [Universal Dependencies](https://universaldependencies.org/) (UD) project. To get to know the project, please read its introduction at https://universaldependencies.org/introduction.html.

In [None]:
import math
import operator
import os
import random
from collections import Counter, defaultdict
from typing import Dict, List

import nltk
import numpy as np
import pandas as pd

!pip install conllutils
import conllutils



**Part 1** (getting the data)

You can download the dataset files directly from the UD website, but only as a single compressed file containing all languages. In this assignment you will work with the GUM dataset, which you can download directly from:
https://github.com/UniversalDependencies/UD_English-GUM.
Please download it to your Colab machine.



In [None]:
!git clone https://github.com/UniversalDependencies/UD_English-GUM
%cd /content/UD_English-GUM/
!git checkout 2c8b062269f2d2d3d62405c82d8c25cf24f705dd
%cd /content/

fatal: destination path 'UD_English-GUM' already exists and is not an empty directory.
/content/UD_English-GUM
HEAD is now at 2c8b062 Updated statistics.
/content


In [None]:
!ls UD_English-GUM

CONTRIBUTING.md       en_gum-ud-test.conllu   LICENSE.txt     README.md
en_gum-ud-dev.conllu  en_gum-ud-train.conllu  not-to-release  stats.xml


We will use the (train/dev/test) files:

UD_English-GUM/en_gum-ud-train.conllu

UD_English-GUM/en_gum-ud-dev.conllu

UD_English-GUM/en_gum-ud-test.conllu

All three files are in the CoNLL-U format. You may read about it [here](https://universaldependencies.org/format.html). The utility library **conllutils** can help you read the data into memory. It has already been installed and imported above.

Write code that reads the three datasets into memory. You may choose the data structure yourself. Every word is represented by a line whose tab-separated columns hold specific features. We are only interested in the second and fourth columns (FORM and UPOS), which hold the word and its POS tag.
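
To make the column layout concrete, here is a minimal sketch that parses a small hand-written CoNLL-U fragment without any library. The sample text and the helper name `read_word_tag_pairs` are illustrative assumptions, not part of the assignment. It keeps FORM (second column) and UPOS (fourth column) and skips comment lines, multiword-token ranges, and empty nodes.

```python
# A tiny hand-written CoNLL-U fragment (illustrative, not taken from GUM).
SAMPLE = """\
# text = Dogs bark.
1\tDogs\tdog\tNOUN\tNNS\t_\t2\tnsubj\t_\t_
2\tbark\tbark\tVERB\tVBP\t_\t0\troot\t_\t_
3\t.\t.\tPUNCT\t.\t_\t2\tpunct\t_\t_

# text = Go!
1\tGo\tgo\tVERB\tVB\t_\t0\troot\t_\t_
2\t!\t!\tPUNCT\t.\t_\t1\tpunct\t_\t_
"""


def read_word_tag_pairs(text):
    """Return a list of sentences, each a list of (FORM, UPOS) pairs."""
    sentences, current = [], []
    for line in text.splitlines():
        if not line.strip():            # a blank line ends the current sentence
            if current:
                sentences.append(current)
                current = []
            continue
        if line.startswith("#"):        # sentence-level comment line
            continue
        cols = line.split("\t")
        if not cols[0].isdigit():       # skip ranges like "1-2" and empty nodes like "1.1"
            continue
        current.append((cols[1], cols[3]))  # FORM, UPOS
    if current:                         # the last sentence may lack a trailing blank line
        sentences.append(current)
    return sentences


parsed = read_word_tag_pairs(SAMPLE)
print(parsed[0])  # [('Dogs', 'NOUN'), ('bark', 'VERB'), ('.', 'PUNCT')]
```

Storing each sentence as a list of pairs keeps repeated words and their order, which a word-to-tag dictionary per sentence would not.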

In [None]:
import conllutils

ROOT_PATH = 'UD_English-GUM'

train_file = os.path.join(ROOT_PATH, 'en_gum-ud-train.conllu')
dev_file = os.path.join(ROOT_PATH, 'en_gum-ud-dev.conllu')
test_file = os.path.join(ROOT_PATH, 'en_gum-ud-test.conllu')

In [None]:
# Your code goes here
paths = ['UD_English-GUM/en_gum-ud-train.conllu', 'UD_English-GUM/en_gum-ud-dev.conllu', 'UD_English-GUM/en_gum-ud-test.conllu']

def load_file_from_git(path):
  return list(conllutils.read_conllu(path))

In [None]:
train_list = load_file_from_git(paths[0])

In [None]:
dev_list = load_file_from_git(paths[1])

In [None]:
test_list = load_file_from_git(paths[2])

In [None]:
#print(train_list[0])

In [None]:
#print(train_list[0][4])

In [None]:
#type(train_list[0][4])

In [None]:
#print(train_list[0][4]['lemma'])

In [None]:
#print(train_list[0][4]['xpos'])

In [None]:
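# Takes the parsed data (as read by conllutils) and returns a list of sentences,
# where each sentence is a dict mapping each word form to its XPOS tag
# (the language-specific tag in the fifth CoNLL-U column).
# Caveat: dict keys are unique, so a word form that occurs more than once in a
# sentence is kept only once, with the tag of its last occurrence.
def preprocess_data(data: list) -> List[Dict[str, str]]:
    result = []
    for sentence in data:
        words_to_tags = {word['form']: word.get('xpos', '') for word in sentence}
        result.append(words_to_tags)
    return result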

In [None]:
train_list = preprocess_data(conllutils.read_conllu(train_file))
print(f"Train data has {len(train_list)} sentences")

Train data has 4287 sentences


In [None]:
dev_list = preprocess_data(conllutils.read_conllu(dev_file))
print(f"Dev data has {len(dev_list)} sentences")

Dev data has 784 sentences


In [None]:
test_list = preprocess_data(conllutils.read_conllu(test_file))
print(f"Test data has {len(test_list)} sentences")

Test data has 890 sentences


**Part 2**

Write a class **simple_tagger** with methods *train* and *evaluate*. The method *train* receives the data as a list of sentences and uses it to train the tagger. In this case, it should learn a simple dictionary that maps each word to its most frequent tag (if several tags are tied for most frequent, you may select one of them randomly). The dictionary should be stored as a class member for evaluation.

The method *evaluate* receives the data as a list of sentences and uses it to evaluate the tagger's performance. Specifically, you should calculate the word-level and sentence-level accuracy.
The evaluation goes word by word, querying the dictionary (created by the *train* method) for each word's tag and comparing it to the true tag of that word. The word-level accuracy is the number of correctly tagged words divided by the total number of words; the sentence-level accuracy is the fraction of sentences in which every word is tagged correctly. For OOV (out-of-vocabulary, or unknown) words, the tagger should assign the most frequent tag in the entire training set (i.e., the mode). The function should return the two numbers: word-level accuracy and sentence-level accuracy.
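
The following is a minimal sketch of this baseline on toy data, assuming each sentence is stored as a list of `(word, tag)` pairs. The function names `train_baseline` and `evaluate_baseline` and the toy sentences are hypothetical and only serve the illustration. Ties are resolved by `Counter.most_common`, which keeps the tag seen first.

```python
from collections import Counter, defaultdict


def train_baseline(sentences):
    """Learn word -> most frequent tag, plus the overall most frequent tag."""
    per_word = defaultdict(Counter)
    all_tags = Counter()
    for sentence in sentences:
        for word, tag in sentence:
            per_word[word][tag] += 1
            all_tags[tag] += 1
    lexicon = {w: counts.most_common(1)[0][0] for w, counts in per_word.items()}
    fallback = all_tags.most_common(1)[0][0]   # used for OOV words
    return lexicon, fallback


def evaluate_baseline(lexicon, fallback, sentences):
    """Return (word-level accuracy, sentence-level accuracy)."""
    words = correct = perfect = 0
    for sentence in sentences:
        hits = sum(lexicon.get(w, fallback) == t for w, t in sentence)
        words += len(sentence)
        correct += hits
        perfect += hits == len(sentence)       # every word in the sentence is right
    return correct / words, perfect / len(sentences)


toy_train = [
    [("the", "DT"), ("dog", "NN"), ("runs", "VBZ")],
    [("the", "DT"), ("runs", "NNS"), ("ended", "VBD")],
    [("the", "DT"), ("dog", "NN"), ("runs", "VBZ")],
]
toy_test = [
    [("the", "DT"), ("cat", "NN"), ("runs", "VBZ")],  # "cat" is OOV
    [("the", "DT"), ("dog", "NN")],
]

lexicon, fallback = train_baseline(toy_train)
word_acc, sent_acc = evaluate_baseline(lexicon, fallback, toy_test)
print(fallback, word_acc, sent_acc)  # DT 0.8 0.5
```

Here the OOV word "cat" receives the fallback tag `DT`, which is wrong, so the first test sentence is not counted as fully correct.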


In [None]:
word = str
tag = str

In [None]:
from collections import Counter
from operator import itemgetter

class SimpleTagger:
  def __init__(self):
    self.words_to_tags: Dict[word, tag] = {}
    self.most_common_tag = None

  def train(self, data: List[Dict[word, tag]]):
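    # Count how often each (word, tag) pair occurs in the training data.
    words_tags_counter = Counter()
    tags_counter = Counter()
    for sentence in data:
      words_tags_counter.update(sentence.items())
    # Group the tag counts by word and accumulate the overall tag frequencies.
    d = defaultdict(list)
    for (word, tag), num_of_occ in words_tags_counter.items():
      d[word].append((tag, num_of_occ))
      tags_counter[tag] += num_of_occ
    # Map every word to its most frequent tag (ties go to the tag seen first).
    for word, tags in d.items():
      self.words_to_tags[word] = max(tags, key=itemgetter(1))[0]
    # Fallback tag for OOV words: the most frequent tag in the training set.
    self.most_common_tag = tags_counter.most_common(1)[0][0]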
  
  def predict_tag(self, word: word) -> tag:
    return self.words_to_tags.get(word, self.most_common_tag)

  def evaluate(self, data: List[Dict[word, tag]]):
    total_words = sum(len(sentence) for sentence in data)
    total_sentences = len(data)
    total_words_correct = 0
    total_sentences_correct = 0
    for sentence in data:
      correct_words_predictions = sum(self.predict_tag(word) == tag for word,tag in sentence.items())
      total_words_correct += correct_words_predictions
      if correct_words_predictions == len(sentence):
        total_sentences_correct += 1
    word_acc = total_words_correct / total_words
    sent_acc = total_sentences_correct / total_sentences
    return word_acc, sent_acc

In [None]:
tagger = SimpleTagger()
tagger.train(train_list)
tagger.most_common_tag
tagger.words_to_tags['walk']

'VB'

In [None]:
simple_words_accuracy_train, simple_sentences_accuracy_train = tagger.evaluate(train_list)
print("words_accuracy - train file = {:.5f}".format(simple_words_accuracy_train))
print("sentences_accuracy - train file = {:.5f}".format(simple_sentences_accuracy_train))

words_accuracy - train file = 0.93645
sentences_accuracy - train file = 0.39701


In [None]:
simple_words_accuracy_test, simple_sentences_accuracy_test = tagger.evaluate(test_list)
print("words_accuracy - test file = {:.5f}".format(simple_words_accuracy_test))
print("sentences_accuracy - test file = {:.5f}".format(simple_sentences_accuracy_test))

words_accuracy - test file = 0.80626
sentences_accuracy - test file = 0.13933


In [None]:
simple_words_accuracy_dev, simple_sentences_accuracy_dev = tagger.evaluate(dev_list)
print("words_accuracy - dev file = {:.5f}".format(simple_words_accuracy_dev))
print("sentences_accuracy - dev file = {:.5f}".format(simple_sentences_accuracy_dev))

words_accuracy - dev file = 0.81871
sentences_accuracy - dev file = 0.12117


**Part 3**

Similar to Part 2, write the class hmm_tagger, which implements HMM tagging. The method *train* should build the matrices A, B and Pi from the data, as discussed in class. The method *evaluate* should find the best tag sequence for every input sentence using the Viterbi decoding algorithm, and then calculate the word-level and sentence-level accuracy against the gold-standard tags. You should implement the Viterbi algorithm in the next block and call it from your class.
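
As a rough illustration of how A, B and Pi can be estimated, the sketch below counts events in a few toy tagged sentences and normalizes each row so that it sums to 1. It assumes sentences are lists of `(word, tag)` pairs and uses plain lists instead of NumPy arrays; the name `estimate_hmm` and the toy data are hypothetical. Pi is normalized by the number of sentence starts, A by the number of transitions out of each tag, and B by the number of words emitted by each tag.

```python
def estimate_hmm(sentences):
    """Maximum-likelihood estimates of Pi, A and B from tagged sentences."""
    tags = sorted({t for s in sentences for _, t in s})
    words = sorted({w for s in sentences for w, _ in s})
    tag_id = {t: i for i, t in enumerate(tags)}
    word_id = {w: i for i, w in enumerate(words)}
    n_tags, n_words = len(tags), len(words)

    pi = [0.0] * n_tags                               # counts of sentence-initial tags
    A = [[0.0] * n_tags for _ in range(n_tags)]       # A[i][j]: tag i followed by tag j
    B = [[0.0] * n_words for _ in range(n_tags)]      # B[i][k]: tag i emits word k
    for s in sentences:
        if not s:
            continue
        pi[tag_id[s[0][1]]] += 1
        for w, t in s:
            B[tag_id[t]][word_id[w]] += 1
        for (_, prev), (_, cur) in zip(s, s[1:]):
            A[tag_id[prev]][tag_id[cur]] += 1

    def normalize(row):
        total = sum(row)
        return [x / total for x in row] if total else row  # keep all-zero rows as they are

    return normalize(pi), [normalize(r) for r in A], [normalize(r) for r in B], tag_id, word_id


toy = [
    [("the", "DT"), ("dog", "NN"), ("barks", "VBZ")],
    [("the", "DT"), ("cat", "NN")],
    [("dogs", "NNS"), ("bark", "VBP")],
]
pi, A, B, tag_id, word_id = estimate_hmm(toy)
print(pi[tag_id["DT"]])                 # share of sentences that start with DT
print(A[tag_id["DT"]][tag_id["NN"]])    # P(NN | DT)
```

Tags that never appear before another tag (here `VBZ` and `VBP`) end up with an all-zero row in A; a real tagger would typically smooth these counts.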

Additional guidance:
1. The matrix B represents the emission probabilities. Since B is a matrix, you should build a dictionary that maps every unique word in the corpus to a serial numeric id (starting with 0). This way, the columns of B correspond to word ids.
2. During evaluation, you should first convert each word into its index and then create the observation array to be given to Viterbi as a list of ids. OOV words should be assigned a random tag. To make sure Viterbi works appropriately, you can simply break the sentence into multiple segments every time you see an OOV word, and decode every segment individually using Viterbi.
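
The segmentation step in item 2 can be sketched as follows, assuming OOV words are represented by `None` after the word-to-id lookup. The helper name `split_on_oov` and the toy vocabulary are hypothetical. Each segment is returned with its start position so that the decoded tags can be written back to the right place in the sentence.

```python
def split_on_oov(word_ids):
    """Split a list of word ids into maximal runs of known ids.

    Unknown words are given as None. Returns a list of (start, ids) pairs.
    """
    segments, current, start = [], [], 0
    for position, word_id in enumerate(word_ids):
        if word_id is None:              # an OOV word closes the current segment
            if current:
                segments.append((start, current))
                current = []
        else:
            if not current:              # first known word of a new segment
                start = position
            current.append(word_id)
    if current:                          # flush the trailing segment
        segments.append((start, current))
    return segments


vocab = {"the": 0, "dog": 1, "barks": 2}
sentence = ["the", "quick", "dog", "barks", "loudly"]
ids = [vocab.get(w) for w in sentence]   # [0, None, 1, 2, None]
segments = split_on_oov(ids)
print(segments)  # [(0, [0]), (2, [1, 2])]
```

Each `ids` list can then be decoded separately with Viterbi, while the positions not covered by any segment receive a random tag.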


In [None]:
def convert_word_to_index(data: List[Dict[word, tag]]):
    words_counter = Counter()
    tags_counter = Counter()
    for sentence in data:
        # print(type(sentence))
        words_counter.update(sentence.keys())
        tags_counter.update(sentence.values())
    def counter_to_index_dict(counter): 
        return dict(zip(counter.keys(), range(len(counter))))
    return counter_to_index_dict(words_counter), counter_to_index_dict(tags_counter)

In [None]:
train_words_index, train_tags_index = convert_word_to_index(train_list)

In [None]:
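# Viterbi decoding: returns the most probable state (tag id) sequence for the observations.
# A: (N, N) transition matrix, B: (N, V) emission matrix, Pi: (N,) initial distribution.
def viterbi(observations, A, B, Pi):
  N = A.shape[0]
  T = len(observations)
  delta = np.zeros((N, T))           # delta[n, t]: probability of the best path ending in state n at time t
  phi = np.zeros((N, T), dtype=int)  # phi[n, t]: best predecessor of state n at time t
  delta[:, 0] = B[:, observations[0]] * Pi
  for t in range(1, T):
    for n in range(N):
      path_prob = delta[:, t-1]*A[:, n]
      delta[n, t] = B[n, observations[t]] * np.max(path_prob)
      phi[n,t] = np.argmax(path_prob)
  # Backtrack from the best final state. A plain int dtype avoids the overflow
  # that np.int8 would cause with more than 128 states.
  best_sequence = np.zeros(T, dtype=int)
  best_sequence[-1] = np.argmax(delta[:, T-1])
  for i in reversed(range(1, T)):
    best_sequence[i-1] = phi[best_sequence[i], i]
  return best_sequence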

In [None]:
# A simple example to run the Viterbi algorithm:
#( Same as in presentation "NLP 3 - Tagging" on slide 35)

A = np.array([[0.3, 0.7], [0.2, 0.8]])
B = np.array([[0.1, 0.1, 0.3, 0.5], [0.3, 0.3, 0.2, 0.2]])
Pi = np.array([0.4, 0.6])
print(viterbi([0, 3, 2, 0], A, B, Pi))
# Expected output: 1, 1, 1, 1

[1 1 1 1]
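
Multiplying many probabilities can underflow to zero on long sentences. One common remedy, sketched here as an assumption rather than as part of the assignment, is to run the same recursion on log-probabilities, replacing products with sums. The function name `log_viterbi` is hypothetical, and the sketch uses plain Python lists so it runs without NumPy.

```python
import math


def log_viterbi(observations, A, B, Pi):
    """Viterbi decoding in log space; A, B and Pi are (nested) lists of probabilities."""
    def log(p):
        return math.log(p) if p > 0 else float("-inf")  # log(0) is treated as -infinity

    N, T = len(A), len(observations)
    if T == 0:
        return []
    delta = [[float("-inf")] * N for _ in range(T)]  # delta[t][n]: best log-score ending in state n
    back = [[0] * N for _ in range(T)]               # back[t][n]: best predecessor of state n
    for n in range(N):
        delta[0][n] = log(Pi[n]) + log(B[n][observations[0]])
    for t in range(1, T):
        for n in range(N):
            scores = [delta[t - 1][m] + log(A[m][n]) for m in range(N)]
            best = max(range(N), key=scores.__getitem__)
            back[t][n] = best
            delta[t][n] = scores[best] + log(B[n][observations[t]])
    # Backtrack from the best final state.
    path = [max(range(N), key=delta[T - 1].__getitem__)]
    for t in range(T - 1, 0, -1):
        path.append(back[t][path[-1]])
    path.reverse()
    return path


A = [[0.3, 0.7], [0.2, 0.8]]
B = [[0.1, 0.1, 0.3, 0.5], [0.3, 0.3, 0.2, 0.2]]
Pi = [0.4, 0.6]
print(log_viterbi([0, 3, 2, 0], A, B, Pi))  # [1, 1, 1, 1]
```

On the small example above it returns the same path as the probability-space version, because the logarithm preserves the ordering of the scores.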


In [None]:
from sklearn.preprocessing import normalize
word_id = int

In [None]:
class hmm_tagger:
  def __init__(self):
    pass

  def train(self, data: List[Dict[word, tag]]):
    words_index, tags_index = convert_word_to_index(data)
    total_unique_words, total_unique_tags = len(words_index), len(tags_index)
    total_words = sum(len(sentence) for sentence in data)
    self.transitions = np.zeros((total_unique_tags, total_unique_tags))  # A matrix
    self.emissions = np.zeros((total_unique_tags, total_unique_words))  # B matrix
    self.starting_probs = np.zeros(total_unique_tags)  # pi vector
    for sentence in data:
      for i, (word, tag) in enumerate(sentence.items()):
        if i == 0:
          self.starting_probs[tags_index[tag]] += 1
        self.emissions[tags_index[tag], words_index[word]] += 1
      for (prev_tag, tag) in zip(sentence.values(), list(sentence.values())[1:]):
        transition = (tags_index[prev_tag], tags_index[tag])
        self.transitions[transition] += 1
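    # Normalize all matrices so that Pi and every row of A and B sum to 1.
    # Pi is divided by its own total (the number of sentence starts), not by the number of words.
    self.starting_probs = self.starting_probs / self.starting_probs.sum()
    self.transitions = normalize(self.transitions, axis=1, norm='l1')
    self.emissions = normalize(self.emissions, axis=1, norm='l1')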

    self.train_words_index = words_index
    self.train_tags_index = tags_index

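  # Predicts POS tag ids for a sentence given as a list of word ids (None marks an OOV word).
  # Known-word segments are decoded with Viterbi; each OOV word gets a random tag id.
  def predict_tags(self, sentence: List[word_id]) -> np.ndarray: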
    predicted_tags = np.array([], dtype=np.int8)
    current_segment = []
    for word in sentence:
      if word is None:
        if current_segment:
          tags = viterbi(current_segment, self.transitions, self.emissions, self.starting_probs)
          predicted_tags = np.concatenate((predicted_tags, tags))
        predicted_tags = np.append(predicted_tags, np.random.choice(list(self.train_tags_index.values())))
        current_segment = []
      else:
        current_segment.append(word)
    if current_segment:
      tags = viterbi(current_segment, self.transitions, self.emissions, self.starting_probs)
      predicted_tags = np.concatenate((predicted_tags, tags))
    return predicted_tags

  def evaluate(self, data: List[Dict[word, tag]]):
    total_words = sum(len(sentence) for sentence in data)
    total_sentences = len(data)
    total_words_correct = 0
    total_sentences_correct = 0
    for sentence in data:
      indexed_sentence = [self.train_words_index.get(word) for word in sentence]
      predicted_tags = self.predict_tags(indexed_sentence)
      true_tags = np.array([self.train_tags_index[tag] for tag in sentence.values()])
      correct_words_predictions = np.sum(predicted_tags == true_tags)
      total_words_correct += correct_words_predictions
      if correct_words_predictions == len(sentence):
        total_sentences_correct += 1
    word_acc = total_words_correct / total_words
    sent_acc = total_sentences_correct / total_sentences
    return word_acc, sent_acc

In [None]:
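# Note: this rebinds the name hmm_tagger from the class to an instance, so
# re-running this cell without re-running the class definition raises a TypeError.
hmm_tagger = hmm_tagger()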
hmm_tagger.train(train_list)

In [None]:
hmm_words_accuracy_train, hmm_sentences_accuracy_train = hmm_tagger.evaluate(train_list)
print("words_accuracy - train file = {:.5f}".format(hmm_words_accuracy_train))
print("sentences_accuracy - train file = {:.5f}".format(hmm_sentences_accuracy_train))

words_accuracy - train file = 0.96622
sentences_accuracy - train file = 0.61325


In [None]:
hmm_words_accuracy_test, hmm_sentences_accuracy_test = hmm_tagger.evaluate(test_list)
print("words_accuracy - test file = {:.5f}".format(hmm_words_accuracy_test))
print("sentences_accuracy - test file = {:.5f}".format(hmm_sentences_accuracy_test))

words_accuracy - test file = 0.73609
sentences_accuracy - test file = 0.11461


In [None]:
hmm_words_accuracy_dev, hmm_sentences_accuracy_dev = hmm_tagger.evaluate(dev_list)
print("words_accuracy - dev file = {:.5f}".format(hmm_words_accuracy_dev))
print("sentences_accuracy - dev file = {:.5f}".format(hmm_sentences_accuracy_dev))

words_accuracy - dev file = 0.74591
sentences_accuracy - dev file = 0.12117


**Part 4**

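Compare the results obtained from both taggers with those of an MEMM tagger implemented by NLTK (a well-known NLP library), on both the dev and test datasets. To train the NLTK tagger, execute the following lines (training may take some time). Note that the cells below train NLTK's `tnt.TnT`, a trigram Markov-model tagger, and its results are what the "MEMM" rows in the tables report: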

In [None]:
from nltk.tag import tnt 

In [None]:
paths = ['UD_English-GUM/en_gum-ud-train.conllu', 'UD_English-GUM/en_gum-ud-dev.conllu', 'UD_English-GUM/en_gum-ud-test.conllu']

def load_file_from_git(path):
  return list(conllutils.read_conllu(path))

train_list = load_file_from_git(paths[0])
dev_list = load_file_from_git(paths[1])
test_list = load_file_from_git(paths[2])

In [None]:
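# Converts parsed sentences into the format NLTK expects: a list of sentences,
# each a list of (token, tag) tuples.
# Note: this pairs each tag with the word's lemma, whereas the Simple and HMM
# taggers above are trained on the surface form ('form').
def getDataAsArrayOfTupple(data):
  return [[(word['lemma'], word['xpos']) for word in sentence] for sentence in data]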

In [None]:
train_array = getDataAsArrayOfTupple(train_list)
test_array = getDataAsArrayOfTupple(test_list)
dev_array = getDataAsArrayOfTupple(dev_list)
tnt_pos_tagger = tnt.TnT()
tnt_pos_tagger.train(train_array)

In [None]:
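# Sentence-level accuracy: a sentence counts as correct only if every token is tagged
# correctly, i.e. the tagger's word-level accuracy on that single sentence equals 1.
def get_sentence_accuracy(data):
  total_sentences = 0
  correct_sentences = 0
  for sentence in data:
    total_sentences += 1
    sentence_word_accuracy = tnt_pos_tagger.evaluate([sentence])
    if sentence_word_accuracy == 1:
      correct_sentences += 1
  return correct_sentences / total_sentences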

Print both the word-level and the sentence-level accuracy of all three taggers in a table.
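
If pandas is not available, a plain-text table is enough for this comparison. The sketch below is one possible way to lay it out; the helper name `format_accuracy_table` is hypothetical, and the numbers are the test-set accuracies reported by the cells in this notebook, rounded to five decimals.

```python
def format_accuracy_table(rows):
    """rows: list of (name, word_accuracy, sentence_accuracy) tuples."""
    header = f"{'Tagger':<8}{'Word acc.':>12}{'Sentence acc.':>16}"
    lines = [header, "-" * len(header)]
    for name, word_acc, sent_acc in rows:
        lines.append(f"{name:<8}{word_acc:>12.5f}{sent_acc:>16.5f}")
    return "\n".join(lines)


# Test-set results as printed earlier in this notebook.
test_rows = [
    ("Simple", 0.80626, 0.13933),
    ("HMM", 0.73609, 0.11461),
    ("NLTK", 0.74601, 0.09663),
]
table = format_accuracy_table(test_rows)
print(table)
```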

In [None]:
nltk_words_accuracy_train = tnt_pos_tagger.evaluate(train_array)
nltk_sentences_accuracy_train = get_sentence_accuracy(train_array)
print("words_accuracy - train file = {:.5f}".format(nltk_words_accuracy_train))
print("sentences_accuracy - train file = {:.5f}".format(nltk_sentences_accuracy_train))

words_accuracy - train file = 0.92155
sentences_accuracy - train file = 0.32004


In [None]:
nltk_words_accuracy_test = tnt_pos_tagger.evaluate(test_array)
nltk_sentences_accuracy_test = get_sentence_accuracy(test_array)
print("words_accuracy - test file = {:.5f}".format(nltk_words_accuracy_test))
print("sentences_accuracy - test file = {:.5f}".format(nltk_sentences_accuracy_test))

words_accuracy - test file = 0.74601
sentences_accuracy - test file = 0.09663


In [None]:
nltk_words_accuracy_dev = tnt_pos_tagger.evaluate(dev_array)
nltk_sentences_accuracy_dev = get_sentence_accuracy(dev_array)
print("words_accuracy - dev file = {:.5f}".format(nltk_words_accuracy_dev))
print("sentences_accuracy - dev file = {:.5f}".format(nltk_sentences_accuracy_dev))

words_accuracy - dev file = 0.77157
sentences_accuracy - dev file = 0.07143


In [None]:
df_train = pd.DataFrame({'Sentence level accuracy': [simple_sentences_accuracy_train, hmm_sentences_accuracy_train, nltk_sentences_accuracy_train],'Word level accuracy': [simple_words_accuracy_train, hmm_words_accuracy_train, nltk_words_accuracy_train]}, index=['Simple', 'HMM', 'MEMM'])

In [None]:
display(df_train)

| | Sentence level accuracy | Word level accuracy |
|---|---|---|
| Simple | 0.397014 | 0.936454 |
| HMM | 0.613249 | 0.966218 |
| MEMM | 0.320037 | 0.92155 |


In [None]:
df_test = pd.DataFrame({'Sentence level accuracy': [simple_sentences_accuracy_test, hmm_sentences_accuracy_test, nltk_sentences_accuracy_test],'Word level accuracy': [simple_words_accuracy_test, hmm_words_accuracy_test, nltk_words_accuracy_test]}, index=['Simple', 'HMM', 'MEMM'])

In [None]:
display(df_test)

| | Sentence level accuracy | Word level accuracy |
|---|---|---|
| Simple | 0.139326 | 0.806259 |
| HMM | 0.114607 | 0.736089 |
| MEMM | 0.096629 | 0.746013 |


In [None]:
df_dev = pd.DataFrame({'Sentence level accuracy': [simple_sentences_accuracy_dev, hmm_sentences_accuracy_dev, nltk_sentences_accuracy_dev],'Word level accuracy': [simple_words_accuracy_dev, hmm_words_accuracy_dev, nltk_words_accuracy_dev]}, index=['Simple', 'HMM', 'MEMM'])

In [None]:
display(df_dev)

| | Sentence level accuracy | Word level accuracy |
|---|---|---|
| Simple | 0.121173 | 0.818713 |
| HMM | 0.121173 | 0.745906 |
| MEMM | 0.071429 | 0.771573 |
