<a href="https://colab.research.google.com/github/edwardb1203/COMP590_NLP/blob/main/hw3_q2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Importing libraries
import math
import pprint, time
import random

import nltk
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
# download the treebank corpus from nltk
nltk.download ('treebank')
# download the universal tagset from nltk
nltk.download ('universal_tagset')
# reading the Treebank tagged sentences (each sentence is a list of (word, tag) tuples)
nltk_data = list(nltk.corpus.treebank.tagged_sents(tagset = 'universal'))
# split the sentences into training and test sets in the ratio 95:5
# (random_state is fixed so the split is reproducible)
train_set , test_set = train_test_split ( nltk_data , train_size =0.95 , test_size =0.05 ,
random_state =123)
# create list of train and test tagged words (sentences flattened into one stream)
train_tagged_words = [ tup for sent in train_set for tup in sent ]
# This is for you to test the correctness of your implementation when you test on the test set
test_tagged_words = [ tup for sent in test_set for tup in sent ]
# create the list of words for testing without the tags . This is the list you should be testing on!
test_words_without_tags = [ tup [0] for sent in test_set for tup in sent ]
# number of tagged words in training
print ( len( train_tagged_words ) )
# number of words in testing
print ( len( test_words_without_tags ) )

[nltk_data] Downloading package treebank to /root/nltk_data...
[nltk_data]   Unzipping corpora/treebank.zip.
[nltk_data] Downloading package universal_tagset to /root/nltk_data...
[nltk_data]   Unzipping taggers/universal_tagset.zip.


95656
5020


In [2]:
# b, calculating emission probabilities, pair is (word, tag)
# p(w|t) = C(t,w) / C(t)
def emission(pair):
  """Return the maximum-likelihood emission probability p(word | tag).

  Args:
    pair: a (word, tag) tuple, in the same layout as the entries of
      train_tagged_words.

  Returns:
    C(tag, word) / C(tag) counted over train_tagged_words, or 0.0 when the
    tag never occurs in the training data (instead of dividing by zero).
  """
  # tag is stored in 2nd position
  tag = pair[1]
  tag_count = 0
  pair_count = 0
  # single pass: a matching pair necessarily carries the matching tag
  for elt in train_tagged_words:
    if elt[1] == tag:
      tag_count += 1
      if elt == pair:
        pair_count += 1
  if tag_count == 0:
    # unseen tag: no evidence at all, so report zero probability
    return 0.0
  return pair_count / tag_count

In [3]:
# Tag bigrams (previous tag, next tag) for part c, built by pairing every
# training token with its successor in the flattened training stream.
pos_pairs = [
    (left[1], right[1])
    for left, right in zip(train_tagged_words, train_tagged_words[1:])
]

In [4]:
# c, calculating transition probabilities given pair (tag1, tag2)
# p (tag2 | tag1) = C(tag1, tag2) / C(tag1)
def transition(pair):
  """Return the maximum-likelihood transition probability p(tag2 | tag1).

  Args:
    pair: a (tag1, tag2) tuple where tag1 is the previous tag.

  Returns:
    C(tag1, tag2) / C(tag1) counted over pos_pairs, where C(tag1) is the
    number of bigrams that start with tag1. Returns 0.0 when tag1 never
    starts a bigram (instead of dividing by zero).
  """
  # tag1, the previous tag
  tag1 = pair[0]
  prev_count = 0
  pair_count = 0
  # single pass: a matching bigram necessarily starts with tag1
  for elt in pos_pairs:
    if elt[0] == tag1:
      prev_count += 1
      if elt == pair:
        pair_count += 1
  if prev_count == 0:
    # previous tag never observed as a bigram start
    return 0.0
  return pair_count / prev_count

In [5]:
# Unique POS tags appearing in the training bigrams (gives 12 unique tags).
# A set comprehension visits each pair once; the previous sum(pos_pairs, ())
# rebuilt an ever-growing tuple on every addition, which is quadratic.
tags = {tag for pair in pos_pairs for tag in pair}

In [6]:
# d, TxT matrix
# construct the empty matrix; rows are the previous tag, columns the next tag.
# The labels are sorted because a set has no defined order (and newer pandas
# versions reject a set as axis labels).
transition_matrix = pd.DataFrame(columns=sorted(tags), index=sorted(tags))
# fill in the probabilities by calling transition on each (row,col).
# Iterating the axis labels avoids DataFrame.iteritems / Series.iteritems,
# which were removed in pandas 2.0.
for col in transition_matrix.columns:
    for index in transition_matrix.index:
      transition_matrix.at[index,col] = transition((index,col))


In [None]:
# Preview the first rows of the transition matrix (row = previous tag, column = next tag).
transition_matrix.head()

In [7]:
# get all transition probs
def get_transitions():
  """Build the nested transition table for every ordered pair of tags.

  Returns:
    A dict mapping each previous tag to a dict of {next tag: p(next | previous)},
    e.g. NOUN: {"X": .0004, "DET": .05}. Every value comes from transition().

  The table is built straight from `tags`, so it does not rely on the
  DataFrame.iteritems / Series.iteritems API that pandas 2.0 removed.
  """
  return {
      prev_tag: {next_tag: transition((prev_tag, next_tag)) for next_tag in tags}
      for prev_tag in tags
  }

In [8]:
 # get the emission probabilities for the given observed words
def get_emissions(observed):
  """Build the emission table restricted to the observed words.

  Args:
    observed: sequence of word strings (repeats allowed).

  Returns:
    A dict mapping each tag in `tags` to a dict of {word: p(word | tag)} with
    one entry per distinct observed word. For empty input every tag maps to
    an empty dict (rather than None), so lookups fail with a plain KeyError.
  """
  # emission() rescans the training data on every call, so query each
  # distinct word only once per tag; dict.fromkeys keeps first-seen order.
  unique_words = list(dict.fromkeys(observed))
  return {
      tag: {word: emission((word, tag)) for word in unique_words}
      for tag in tags
  }


In [None]:
# Testing func
# get_emissions(test_words_without_tags[0:5])
# get_transitions()

In [9]:
def smoothing(observed, emissions, fallback_tag="NOUN"):
  """Give words with no emission mass under any tag a fallback tag.

  Args:
    observed: sequence of words to check.
    emissions: dict of {tag: {word: probability}} as built by get_emissions;
      it is modified in place.
    fallback_tag: tag that receives emission 1 for a word whose emissions sum
      to zero across all tags. Defaults to "NOUN".

  Returns:
    The same `emissions` dict that was passed in, after modification.
  """
  # handle unseen emissions as the fallback tag (NOUN by default)
  for word in observed:
    # named `total` so the builtin sum() is not shadowed
    total = sum(emissions[pos][word] for pos in emissions)
    if total == 0:
      emissions[fallback_tag][word] = 1
  return emissions

In [10]:
# The Viterbi Algorithm
# Goal: Maximize the joint probability P(hidden states, observed states)

# Input: A sentence without POS
# Output: probability of the best tag sequence and the POS tags themselves
# Required: Emission & Transition probabilities

def viterbi(observed):
  """Decode the most probable POS tag sequence for `observed`.

  Args:
    observed: list of word strings.

  Returns:
    A tuple (prob, path): `path` is the list of tags, one per word, and
    `prob` is the joint probability of that path and the words. Scores are
    accumulated as log-probabilities, so `prob` may underflow to 0.0 on long
    inputs while `path` stays meaningful. Empty input returns (1.0, []).
  """
  if len(observed) == 0:
    # nothing to tag; the empty product is 1
    return (1.0, [])

  # emission probs for the observed words, with the unseen-word fallback applied
  emissions = smoothing(observed, get_emissions(observed))
  # transition probs between every pair of POS tags
  transitions = get_transitions()

  def log_prob(p):
    # math.log(0) raises, so map zero probability to -inf explicitly
    return math.log(p) if p > 0 else float("-inf")

  states = list(tags)

  # scores[t][POS]: best log-probability of any tag path ending in POS at step t
  # backpointers[t][POS]: the previous tag on that best path
  # Base case (t == 0): assume each x_1 follows a "." tag
  scores = [{
      POS: log_prob(transitions["."][POS]) + log_prob(emissions[POS][observed[0]])
      for POS in states
  }]
  backpointers = [{}]

  # Run Viterbi algorithm for t > 0
  for t in range(1, len(observed)):
    word = observed[t]
    scores.append({})
    backpointers.append({})
    for POS in states:
      # The emission term is the same for every predecessor, so it is added
      # after choosing the best previous tag.
      best_score, best_prev = max(
          (scores[t - 1][prev_POS] + log_prob(transitions[prev_POS][POS]), prev_POS)
          for prev_POS in states)
      scores[t][POS] = best_score + log_prob(emissions[POS][word])
      backpointers[t][POS] = best_prev

  # Pick the best final tag, then follow the backpointers to recover the path
  last = len(observed) - 1
  best_log, current = max((scores[last][POS], POS) for POS in states)
  best_path = [current]
  for t in range(last, 0, -1):
    current = backpointers[t][current]
    best_path.append(current)
  best_path.reverse()

  return (math.exp(best_log), best_path)

In [11]:
# Cut three consecutive spans out of the untagged test stream and tag each one.
sent1, sent2, sent3 = (
    test_words_without_tags[start:stop]
    for start, stop in ((0, 28), (28, 63), (63, 81))
)

for sentence in (sent1, sent2, sent3):
    print("Sentence: " + " ".join(sentence) + " Viterbi result: " + str(viterbi(sentence)))

Sentence: Worksheets in a test-practice kit called * Learning Materials , sold * to schools across the country by Macmillan\/McGraw-Hill School Publishing Co. , contain the same questions . Viterbi result: (7.671191658704445e-70, ['NOUN', 'ADP', 'DET', 'NOUN', 'NOUN', 'VERB', 'X', 'NOUN', 'NOUN', '.', 'VERB', 'X', 'PRT', 'NOUN', 'ADP', 'DET', 'NOUN', 'ADP', 'NOUN', 'NOUN', 'NOUN', 'NOUN', '.', 'VERB', 'DET', 'ADJ', 'NOUN', '.'])
Sentence: Government officials , especially in Japan , probably would resist any onslaught of program trading by players trying * to shrug off the U.S. furor over their activities and marching abroad with their business . Viterbi result: (2.802551631041295e-94, ['NOUN', 'NOUN', '.', 'ADV', 'ADP', 'NOUN', '.', 'ADV', 'VERB', 'VERB', 'DET', 'NOUN', 'ADP', 'NOUN', 'NOUN', 'ADP', 'NOUN', 'VERB', 'X', 'PRT', 'NOUN', 'ADP', 'DET', 'NOUN', 'NOUN', 'ADP', 'PRON', 'NOUN', 'CONJ', 'VERB', 'ADV', 'ADP', 'PRON', 'NOUN', '.'])
Sentence: Ms. Kirkpatrick , the Journal 's depu

In [None]:
# Tag the whole untagged test stream in one call; element [1] of the result is the tag path.
decoded = viterbi(test_words_without_tags)
pred = decoded[1]
# Gold tags, taken from position 1 of every (word, tag) test pair.
target = list(map(lambda tagged: tagged[1], test_tagged_words))

def accuracy(pred, target):
  """Return the fraction of positions where pred and target agree.

  Args:
    pred: sequence of predicted tags.
    target: sequence of gold tags, at least as long as pred.

  Returns:
    (number of matching positions) / len(pred), or 0.0 when pred is empty.
  """
  if len(pred) == 0:
    # avoid dividing by zero when there is nothing to score
    return 0.0
  # Count over every position; the previous version reset its error counter
  # inside the loop, so only the final token ever affected the result.
  correct = sum(1 for i in range(len(pred)) if pred[i] == target[i])
  return correct / len(pred)

# Print the value accuracy() returns for the Viterbi predictions against the gold test tags.
print(accuracy(pred, target))