<a href="https://colab.research.google.com/github/ItaiKaplan/NLP/blob/main/HW_2_tagging.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Assignment 2

This assignment is about training and evaluating a POS tagger on real data. The dataset is available through the Universal Dependencies (UD) project (https://universaldependencies.org/). To get to know the project, please visit https://universaldependencies.org/introduction.html.

In [None]:
import numpy as np
import operator
import nltk

!pip install conllutils
import conllutils
from collections import defaultdict, Counter
import pandas as pd



**Part 1** (getting the data)

You can download the dataset files directly from the UD website, but it only lets you download all the languages in one compressed file. In this assignment you will be working with the GUM dataset, which you can download directly from:
https://github.com/UniversalDependencies/UD_English-GUM.
Please download it to your colab machine.



In [None]:
!git clone https://github.com/UniversalDependencies/UD_English-GUM

Cloning into 'UD_English-GUM'...
remote: Enumerating objects: 3705, done.
remote: Counting objects: 100% (7/7), done.
remote: Compressing objects: 100% (3/3), done.
remote: Total 3705 (delta 5), reused 4 (delta 4), pack-reused 3698
Receiving objects: 100% (3705/3705), 30.77 MiB | 23.57 MiB/s, done.
Resolving deltas: 100% (3355/3355), done.


We will use the (train/dev/test) files:

UD_English-GUM/en_gum-ud-train.conllu

UD_English-GUM/en_gum-ud-dev.conllu

UD_English-GUM/en_gum-ud-test.conllu

They are all in the CoNLL-U format. You may read about it [here](https://universaldependencies.org/format.html). There is a utility library, **conllutils**, which can help you read the data into memory. It has already been installed and imported above.

You should write code that reads the three datasets into memory. You may choose the data structure yourself. As you can see, every word is represented by a line, with columns representing specific features. We are only interested in the word (the FORM column) and its POS tag (the solution below uses the language-specific XPOS column).
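To make the file layout concrete, here is a minimal sketch that parses CoNLL-U text by hand, without conllutils. The ten tab-separated columns are ID, FORM, LEMMA, UPOS, XPOS, FEATS, HEAD, DEPREL, DEPS and MISC. The sample sentence and the helper name `read_conllu_text` are made up for illustration and are not part of the assignment.

```python
# Hypothetical two-token sample in CoNLL-U layout (not taken from GUM).
SAMPLE = (
    "# text = Dogs bark.\n"
    "1\tDogs\tdog\tNOUN\tNNS\t_\t2\tnsubj\t_\t_\n"
    "2\tbark\tbark\tVERB\tVBP\t_\t0\troot\t_\tSpaceAfter=No\n"
    "3\t.\t.\tPUNCT\t.\t_\t2\tpunct\t_\t_\n"
    "\n"
)

def read_conllu_text(text):
    """Return a list of sentences, each a list of (form, xpos) pairs."""
    sentences, current = [], []
    for line in text.splitlines():
        line = line.strip()
        if not line:                      # a blank line closes a sentence
            if current:
                sentences.append(current)
                current = []
            continue
        if line.startswith('#'):          # sentence-level comment line
            continue
        cols = line.split('\t')
        if not cols[0].isdigit():         # skip multiword ranges (1-2) and empty nodes (1.1)
            continue
        current.append((cols[1], cols[4]))  # FORM, XPOS
    if current:
        sentences.append(current)
    return sentences

toy_sentences = read_conllu_text(SAMPLE)
print(toy_sentences)
# [[('Dogs', 'NNS'), ('bark', 'VBP'), ('.', '.')]]
```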

In [None]:
def clean_data(data):
  # Keep only sentences in which every token carries an XPOS tag
  return [sentence for sentence in data if all('xpos' in word for word in sentence)]

In [None]:
# Your code goes here
train_set = clean_data(conllutils.pipe().read_conllu('UD_English-GUM/en_gum-ud-train.conllu').collect())
dev_set = clean_data(conllutils.pipe().read_conllu('UD_English-GUM/en_gum-ud-dev.conllu').collect())
test_set = clean_data(conllutils.pipe().read_conllu('UD_English-GUM/en_gum-ud-test.conllu').collect())

# Give indices to words and xpos
indices = conllutils.pipe().read_conllu(
    'UD_English-GUM/en_gum-ud-train.conllu').pipe().create_index(fields={'form', 'xpos'})
vocab = {word['form'] for sentence in train_set for word in sentence}
word_indices = {word: i for i, word in enumerate(sorted(vocab))}  # sorted, so ids are reproducible across runs
index_to_word = {v: k for k, v in word_indices.items()}
xpos_indices = indices['xpos']
xpos_indices = {k: v-1 for k, v in xpos_indices.items()} # Start with 0 instead of 1
index_to_xpos = {v: k for k, v in xpos_indices.items()}
num_xpos = len(xpos_indices)
vocab_length = len(word_indices)

In [None]:
print(train_set[0][0]['form'])
print(train_set[0][0]['xpos'])
print(num_xpos)
print(vocab_length)

Aesthetic
JJ
44
12163


**Part 2**

Write a class **simple_tagger**, with methods *train* and *evaluate*. The method *train* receives the data as a list of sentences and uses it to train the tagger. In this case, it should learn a simple dictionary that maps each word to its most frequent tag (if several tags are tied for most frequent, you may select one of them randomly). The dictionary should be stored as a class member for evaluation.

The method *evaluate* receives the data as a list of sentences and uses it to evaluate the tagger's performance. Specifically, you should calculate the word-level and sentence-level accuracy.
The evaluation simply goes word by word, querying the dictionary (created by the *train* method) for each word's tag and comparing it to the true tag of that word. The word-level accuracy is the number of correctly tagged words divided by the total number of words; the sentence-level accuracy is the fraction of sentences in which every word is tagged correctly. For OOV (out of vocabulary, or unknown) words, the tagger should assign the most frequent tag in the entire training set (i.e., the mode). The function should return the two numbers: word-level accuracy and sentence-level accuracy.
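As a small worked illustration of the two metrics and the OOV fallback, the sketch below runs the most-frequent-tag idea on a made-up toy corpus. The data and the names `toy_train`, `toy_eval`, `lookup` and `fallback` are hypothetical, and sentences are plain lists of `(word, tag)` pairs rather than the conllutils structures used in this notebook.

```python
from collections import Counter, defaultdict

toy_train = [[('the', 'DT'), ('dog', 'NN'), ('runs', 'VBZ')],
             [('the', 'DT'), ('dog', 'NN'), ('runs', 'VBZ')],
             [('dog', 'NN'), ('runs', 'NNS')]]
toy_eval = [[('the', 'DT'), ('dog', 'NN')],                    # every word correct
            [('the', 'DT'), ('cat', 'NN'), ('runs', 'NNS')]]   # 'cat' is OOV, 'runs' is mis-tagged

word_tag_counts = defaultdict(Counter)
tag_counts = Counter()
for sent in toy_train:
    for form, tag in sent:
        word_tag_counts[form][tag] += 1
        tag_counts[tag] += 1

# Most frequent tag per word, plus the overall mode for unknown words
lookup = {w: c.most_common(1)[0][0] for w, c in word_tag_counts.items()}
fallback = tag_counts.most_common(1)[0][0]   # 'NN' (3 occurrences)

correct_words = total_words = correct_sentences = 0
for sent in toy_eval:
    hits = [lookup.get(form, fallback) == tag for form, tag in sent]
    correct_words += sum(hits)
    total_words += len(hits)
    correct_sentences += all(hits)

word_acc = correct_words / total_words
sentence_acc = correct_sentences / len(toy_eval)
print(word_acc, sentence_acc)  # 0.8 0.5
```

Four of the five words are tagged correctly (the OOV word `cat` happens to match the fallback `NN`), but only one of the two sentences is entirely correct, which is why sentence-level accuracy is usually much lower than word-level accuracy.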


In [None]:
class simple_tagger:
  def train(self, data):
    # Count how often each tag occurs per word and over the whole training set
    word_tag_counts = defaultdict(Counter)
    tag_counts = Counter()
    for sentence in data:
      for word in sentence:
        word_tag_counts[word['form']][word['xpos']] += 1
        tag_counts[word['xpos']] += 1

    # Most frequent tag for every known word
    self._tagger = {word: counts.most_common(1)[0][0]
                    for word, counts in word_tag_counts.items()}
    # Fallback for OOV words: the most frequent tag in the training set (the mode)
    self._default_tag = tag_counts.most_common(1)[0][0]

  def evaluate(self, data):
    correct_words = 0
    correct_sentences = 0
    word_count = 0
    sentence_count = 0

    for sentence in data:
      accurate_sentence = True
      sentence_count += 1
      for word in sentence:
        word_count += 1
        predicted = self._tagger.get(word['form'], self._default_tag)
        if predicted == word['xpos']:
          correct_words += 1
        else:
          accurate_sentence = False

      if accurate_sentence:
        correct_sentences += 1

    return correct_words / word_count, correct_sentences / sentence_count


In [None]:
test_simple_tagger = simple_tagger()
test_simple_tagger.train(train_set)
simple_tagger_dev_word_acc, simple_tagger_dev_sentence_acc = test_simple_tagger.evaluate(dev_set)
simple_tagger_test_word_acc, simple_tagger_test_sentence_acc = test_simple_tagger.evaluate(test_set)
print(f"Word level: {simple_tagger_test_word_acc}, Sentence level: {simple_tagger_test_sentence_acc}")

Word level: 0.7921648835745752, Sentence level 0.14


**Part 3**

Similar to part 2, write the class hmm_tagger, which implements HMM tagging. The method *train* should build the matrices A, B and Pi from the data, as discussed in class. The method *evaluate* should find the best tag sequence for every input sentence using the Viterbi decoding algorithm, and then calculate the word-level and sentence-level accuracy using the gold-standard tags. You should implement the Viterbi algorithm in the next block and call it from your class.
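One common way to build the three matrices is by relative-frequency counting: Pi from the first tag of each sentence, A from consecutive tag pairs, and B from tag/word pairs. The following is only a sketch on a made-up three-sentence corpus; the tag set, vocabulary, and the helper `normalize_rows` are assumptions for illustration, and the estimator presented in class may differ (for example, by adding smoothing).

```python
import numpy as np

toy_tags = ['DT', 'NN', 'VBZ']
toy_words = ['the', 'dog', 'barks', 'cat']
tag_id = {t: i for i, t in enumerate(toy_tags)}
word_id = {w: i for i, w in enumerate(toy_words)}

toy_corpus = [[('the', 'DT'), ('dog', 'NN'), ('barks', 'VBZ')],
              [('the', 'DT'), ('cat', 'NN'), ('barks', 'VBZ')],
              [('dog', 'NN'), ('barks', 'VBZ')]]

pi_counts = np.zeros(len(toy_tags))
a_counts = np.zeros((len(toy_tags), len(toy_tags)))
b_counts = np.zeros((len(toy_tags), len(toy_words)))

for sent in toy_corpus:
    prev = None
    for form, tag in sent:
        t = tag_id[tag]
        if prev is None:
            pi_counts[t] += 1          # tag that starts a sentence
        else:
            a_counts[prev, t] += 1     # transition prev -> t
        b_counts[t, word_id[form]] += 1  # tag t emits this word
        prev = t

def normalize_rows(counts):
    """Divide by the row total; rows without any counts stay all-zero."""
    totals = counts.sum(axis=-1, keepdims=True)
    return np.divide(counts, totals, out=np.zeros_like(counts), where=totals > 0)

toy_Pi = normalize_rows(pi_counts)   # [2/3, 1/3, 0]
toy_A = normalize_rows(a_counts)     # DT -> NN and NN -> VBZ with probability 1
toy_B = normalize_rows(b_counts)     # e.g. P(dog | NN) = 2/3, P(cat | NN) = 1/3
print(toy_Pi, toy_A, toy_B, sep='\n')
```

In this toy corpus `VBZ` only ever ends a sentence, so its row in `toy_A` has no counts and is left as zeros rather than dividing by zero.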

Additional guidance:
1. The matrix B represents the emission probabilities. Since B is a matrix, you should build a dictionary that maps every unique word in the corpus to a serial numeric id (starting with 0). This way, the columns of B correspond to word ids.
2. During the evaluation, you should first convert each word into its index and then create the observation array to be given to Viterbi, as a list of ids. OOV words should be assigned a random tag. To make sure Viterbi works appropriately, you can simply break the sentence into multiple segments every time you see an OOV word, and decode every segment individually using Viterbi.
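The segmentation described in item 2 can be sketched as a small generator that walks the sentence once, emitting runs of known word ids (to be decoded with Viterbi) and individual OOV markers (to be given a random tag). The function name `split_on_oov` and the toy vocabulary are hypothetical.

```python
def split_on_oov(forms, known_ids):
    """Yield ('known', [ids...]) runs and ('oov', form) markers in sentence order."""
    run = []
    for form in forms:
        if form in known_ids:
            run.append(known_ids[form])
        else:
            if run:                  # close the run of known words before the OOV word
                yield ('known', run)
                run = []
            yield ('oov', form)
    if run:                          # trailing run at the end of the sentence
        yield ('known', run)

toy_ids = {'the': 0, 'dog': 1, 'barks': 2}
segments = list(split_on_oov(['the', 'zebra', 'dog', 'barks', 'loudly'], toy_ids))
print(segments)
# [('known', [0]), ('oov', 'zebra'), ('known', [1, 2]), ('oov', 'loudly')]
```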


In [None]:
# Viterbi
def viterbi(observations, A, B, Pi, num_xpos_override=None):
  # N hidden states (tags), T observations (word ids)
  N = num_xpos_override if num_xpos_override is not None else A.shape[0]
  T = len(observations)
  delta = np.zeros((N, T), dtype=np.float64)  # best path probability ending in state n at time t
  psi = np.zeros((N, T), dtype=int)           # backpointers to the best previous state

  # Initialization (psi[:, 0] stays zero)
  for n in range(N):
    delta[n, 0] = B[n, observations[0]] * Pi[n]

  # Recursion
  for t in range(1, T):
    for n in range(N):
      prev_values = np.array([delta[j, t-1] * A[j, n] for j in range(N)])
      delta[n, t] = B[n, observations[t]] * np.max(prev_values)
      psi[n, t] = np.argmax(prev_values)

  # Backtracking from the most probable final state
  best_sequence = np.zeros((T,), dtype=int)
  best_sequence[T - 1] = np.argmax(delta[:, T - 1])
  for t in reversed(range(0, T-1)):
    best_sequence[t] = psi[best_sequence[t + 1], t + 1]

  return best_sequence

# A simple example to run the Viterbi algorithm
# (same as in the presentation "NLP 3 - Tagging", slide 35):

# A = np.array([[0.3, 0.7], [0.2, 0.8]])
# B = np.array([[0.1, 0.1, 0.3, 0.5], [0.3, 0.3, 0.2, 0.2]])
# Pi = np.array([0.4, 0.6])
# print(viterbi([0, 3, 2, 0], A, B, Pi))
# Expected output: 1, 1, 1, 1

In [None]:
A = np.array([[0.3, 0.7], [0.2, 0.8]])
B = np.array([[0.1, 0.1, 0.3, 0.5], [0.3, 0.3, 0.2, 0.2]])
Pi = np.array([0.4, 0.6])
print(viterbi([0, 3, 2, 0], A, B, Pi, 2))
# Expected output: 1, 1, 1, 1

[1 1 1 1]
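Multiplying many probabilities can underflow to zero on long sentences, so Viterbi is often run on log-probabilities, where products become sums. The variant below is a sketch of that idea and is not required by the assignment; the name `viterbi_log` and the `ex_` variables are made up here, and the example reuses the toy matrices from the cell above.

```python
import numpy as np

def viterbi_log(observations, A, B, Pi):
    """Viterbi decoding on log-probabilities; returns the best state sequence."""
    with np.errstate(divide='ignore'):   # log(0) = -inf is intended for impossible events
        log_A, log_B, log_Pi = np.log(A), np.log(B), np.log(Pi)
    N, T = log_A.shape[0], len(observations)
    delta = np.full((N, T), -np.inf)
    psi = np.zeros((N, T), dtype=int)

    delta[:, 0] = log_Pi + log_B[:, observations[0]]
    for t in range(1, T):
        # scores[j, n]: best log-probability of being in j at t-1, then moving to n
        scores = delta[:, t - 1][:, None] + log_A
        psi[:, t] = np.argmax(scores, axis=0)
        delta[:, t] = np.max(scores, axis=0) + log_B[:, observations[t]]

    # Backtrack from the best final state
    path = np.zeros(T, dtype=int)
    path[-1] = np.argmax(delta[:, -1])
    for t in range(T - 2, -1, -1):
        path[t] = psi[path[t + 1], t + 1]
    return path

ex_A = np.array([[0.3, 0.7], [0.2, 0.8]])
ex_B = np.array([[0.1, 0.1, 0.3, 0.5], [0.3, 0.3, 0.2, 0.2]])
ex_Pi = np.array([0.4, 0.6])
log_path = viterbi_log([0, 3, 2, 0], ex_A, ex_B, ex_Pi)
print(log_path)  # [1 1 1 1]
```

Because the logarithm is monotonic, the best path is the same as in the multiplicative version whenever that version does not underflow.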


In [None]:
class hmm_tagger:
  def train(self, data):
    # Raw counts; normalized into probabilities below
    start_counts = np.zeros((num_xpos,), dtype=np.float64)
    self.A = np.zeros((num_xpos, num_xpos), dtype=np.float64)
    self.B = np.zeros((num_xpos, vocab_length), dtype=np.float64)

    for sentence in data:
      prev_tag_index = None
      for word in sentence:
        curr_tag_index = xpos_indices[word['xpos']]
        curr_word_index = word_indices[word['form']]
        if prev_tag_index is None:
          start_counts[curr_tag_index] += 1  # first tag of the sentence
        else:
          self.A[prev_tag_index, curr_tag_index] += 1

        self.B[curr_tag_index, curr_word_index] += 1
        prev_tag_index = curr_tag_index

    # Pi: distribution of the tag that starts a sentence
    self.PI = start_counts / np.sum(start_counts)
    # Normalize every row to sum to 1; rows without counts stay all-zero
    self.A = self.A / np.maximum(self.A.sum(axis=1, keepdims=True), 1)
    self.B = self.B / np.maximum(self.B.sum(axis=1, keepdims=True), 1)
    

  def evaluate(self, data):
    correct_words = 0
    correct_sentences = 0
    word_count = 0
    sentence_count = 0

    for sentence in data:
      accurate_sentence = True
      sentence_count += 1
      observation_buffer = list()  # ids of the current run of known words
      predicted_sequence = list()

      for word in sentence:
        if word['form'] in word_indices:
          observation_buffer.append(word_indices[word['form']])
          continue

        # OOV word: decode the segment collected so far, then assign a random tag
        if len(observation_buffer) > 0:
          predicted_sequence.extend(viterbi(observation_buffer, self.A, self.B, self.PI))
          observation_buffer = list()
        predicted_sequence.append(np.random.randint(num_xpos))

      # Decode the trailing segment, if any
      if len(observation_buffer) > 0:
        predicted_sequence.extend(viterbi(observation_buffer, self.A, self.B, self.PI))

      for i, word in enumerate(sentence):
        word_count += 1
        predicted = index_to_xpos[predicted_sequence[i]]
        actual = word['xpos']
        if predicted == actual:
          correct_words += 1
        else:
          accurate_sentence = False

      correct_sentences += int(accurate_sentence)

    return correct_words / word_count, correct_sentences / sentence_count

In [None]:
test_hmm_tagger = hmm_tagger()
test_hmm_tagger.train(train_set)
hmm_tagger_dev_word_acc, hmm_tagger_dev_sentence_acc = test_hmm_tagger.evaluate(dev_set)
hmm_tagger_test_word_acc, hmm_tagger_test_sentence_acc = test_hmm_tagger.evaluate(test_set)
print(f"Word level: {hmm_tagger_test_word_acc}, Sentence level: {hmm_tagger_test_sentence_acc}")

**Part 4**

Compare the results obtained from both taggers with those of a MEMM tagger implemented by NLTK (a well-known NLP library), on both the dev and test datasets. To train the NLTK MEMM tagger you should execute the following lines (it may take some time to train...):

In [None]:
# Convert to wanted form
train_data = list()
dev_data = list()
test_data = list()

for sentence in train_set:
  train_data.append([(word['form'], word['xpos']) for word in sentence])

for sentence in dev_set:
  dev_data.append([(word['form'], word['xpos']) for word in sentence])

for sentence in test_set:
  test_data.append([(word['form'], word['xpos']) for word in sentence])

In [None]:
from nltk.tag import tnt 

tnt_pos_tagger = tnt.TnT()
tnt_pos_tagger.train(train_data)
print(tnt_pos_tagger.evaluate(test_data))

0.8214285714285714


In [None]:
# Evaluate word and sentence level accuracy:
memm_tagger_dev_word_acc = tnt_pos_tagger.evaluate(dev_data)
memm_tagger_test_word_acc = tnt_pos_tagger.evaluate(test_data)

def sentence_accuracy(tagger, tagged_sentences):
  # A sentence counts as correct only if all of its words are tagged correctly
  exact_matches = sum(tagger.evaluate([sentence]) == 1 for sentence in tagged_sentences)
  return exact_matches / len(tagged_sentences)

memm_tagger_dev_sentence_acc = sentence_accuracy(tnt_pos_tagger, dev_data)
memm_tagger_test_sentence_acc = sentence_accuracy(tnt_pos_tagger, test_data)

Print both the word-level and sentence-level accuracy of all three taggers in a table.

In [None]:
# Your code goes here
pd.DataFrame([
                [simple_tagger_dev_word_acc, simple_tagger_dev_sentence_acc, simple_tagger_test_word_acc, simple_tagger_test_sentence_acc],
                [hmm_tagger_dev_word_acc, hmm_tagger_dev_sentence_acc, hmm_tagger_test_word_acc, hmm_tagger_test_sentence_acc], 
                [memm_tagger_dev_word_acc, memm_tagger_dev_sentence_acc, memm_tagger_test_word_acc, memm_tagger_test_sentence_acc]],
             columns=['Dev word acc', 'Dev sentence acc', 'Test word acc', 'Test sentence acc'],
             index=['Simple', 'HMM', 'MEMM'])


| | Dev word acc | Dev sentence acc | Test word acc | Test sentence acc |
|---|---|---|---|---|
| Simple | 0.814608 | 0.151652 | 0.792165 | 0.14 |
| HMM | 0.841526 | 0.171171 | 0.81545 | 0.162667 |
| MEMM | 0.845384 | 0.181682 | 0.821429 | 0.170667 |
