Hamid Nemati(9535023)

# 2. POS tagging with HMM (the hard way)

In previous jupyter notebook we solved the same problem with a ready to use HMM library. Now, in this jupyter notebook we try to solve it by using our viterby algorithm.

In [1]:
import numpy as np
import pandas as pd
from sklearn.utils import shuffle
import random
import pprint, time
import nltk

In [2]:
raw_data = open('hmm-training.txt').read()
raw_data[:200]

"i\tPRP\n'd\tMD\nlike\tVB\nto\tTO\ngo\tVB\nto\tIN\na\tDT\nfancy\tJJ\nrestaurant\tNN\n.\t.\n\ni\tPRP\n'd\tMD\nlike\tVB\nfrench\tJJ\nfood\tNN\n.\t.\n\nnext\tJJ\nthursday\tNN\n.\t.\n\nnext\tJJ\nthursday\tNN\n.\t.\n\ndinner\tNN\n.\t.\n\ni\tPRP\nwant\tVBP\nto\tTO\n"

In [3]:
parsed_data = raw_data.replace('\t','\n').split()
parsed_data[:10]

['i', 'PRP', "'d", 'MD', 'like', 'VB', 'to', 'TO', 'go', 'VB']

In [4]:
list_of_words = parsed_data[0::2]
list_of_tags  = parsed_data[1::2]
print(len(list_of_words))
print(len(list_of_tags))

150120
150120


In [5]:
tags = list(set(list_of_tags))
print(len(tags))
print(tags)

36
['UH', 'HYPH', 'PDT', 'JJ', 'VBD', ':', 'RBS', 'NNS', 'VB', 'FW', 'VBZ', 'IN', 'RP', 'RB', 'VBN', 'VBG', 'POS', 'NN', 'NNP', 'PRP$', 'CC', 'RBR', 'TO', 'JJS', 'WP', 'MD', 'LS', 'DT', '.', 'VBP', 'CD', 'EX', 'WDT', 'WRB', 'JJR', 'PRP']


In [6]:
words = list(set(list_of_words))
print(len(words))
#print(words)

1311


In [7]:
tuple_data = list(zip(list_of_words, list_of_tags))
tuple_data[:10]

# this format is for POS_1
# for POS_2 and POS3 we need tuple format

[('i', 'PRP'),
 ("'d", 'MD'),
 ('like', 'VB'),
 ('to', 'TO'),
 ('go', 'VB'),
 ('to', 'IN'),
 ('a', 'DT'),
 ('fancy', 'JJ'),
 ('restaurant', 'NN'),
 ('.', '.')]

# Read data as sentences

In [8]:
list_of_tupled_sentences = []
temp = []
for i in range(len(tuple_data)):
    temp.append(tuple_data[i])
    if tuple_data[i]==('.','.'):
        list_of_tupled_sentences.append(temp)
        temp = []
             
print(len(list_of_tupled_sentences))    
print(list_of_tupled_sentences[0])

15711
[('i', 'PRP'), ("'d", 'MD'), ('like', 'VB'), ('to', 'TO'), ('go', 'VB'), ('to', 'IN'), ('a', 'DT'), ('fancy', 'JJ'), ('restaurant', 'NN'), ('.', '.')]


In [9]:
sentences_shuffled = shuffle(list_of_tupled_sentences)

split = int(len(sentences_shuffled) * 0.98)

train_set = sentences_shuffled[:split] # [start : split]
test_set  = sentences_shuffled[split:] # [spilt : end]

In [10]:
train_set_tuples = [ tup for sent in train_set for tup in sent ]
test_set_tuples = [ tup for sent in test_set for tup in sent ]

# HMM

In [11]:
# compute Emission Probability
def word_given_tag(word, tag, train_bag = tuple_data):
    tag_list = [pair for pair in train_bag if pair[1]==tag]
    count_tag = len(tag_list)#total number of times the passed tag occurred in train_bag
    w_given_tag_list = [pair[0] for pair in tag_list if pair[0]==word]
    # calculate the total number of times the passed word occurred as the passed tag.
    count_w_given_tag = len(w_given_tag_list)
 
    return (count_w_given_tag, count_tag)

In [12]:
# compute  Transition Probability
def t2_given_t1(t2, t1, train_bag = tuple_data):
    tags = [pair[1] for pair in train_bag]
    count_t1 = len([t for t in tags if t==t1])
    count_t2_t1 = 0
    for index in range(len(tags)-1):
        if tags[index]==t1 and tags[index+1] == t2:
            count_t2_t1 += 1
    return (count_t2_t1, count_t1)

In [13]:
# creating t x t transition matrix of tags, t= no of tags
# Matrix(i, j) represents P(jth tag after the ith tag)
 
tags_matrix = np.zeros((len(tags), len(tags)), dtype='float32')
for i, t1 in enumerate(list(tags)):
    for j, t2 in enumerate(list(tags)):
        temp = t2_given_t1(t2, t1)
        tags_matrix[i, j] = temp[0]/temp[1]
    
print(tags_matrix)

[[0.09255294 0.         0.         ... 0.02188913 0.00975494 0.1684511 ]
 [0.         0.         0.         ... 0.         0.         0.        ]
 [0.         0.         0.         ... 0.         0.         0.        ]
 ...
 [0.         0.         0.         ... 0.00262697 0.         0.08756568]
 [0.00713012 0.         0.         ... 0.         0.00950683 0.        ]
 [0.00625579 0.00030893 0.00123571 ... 0.00084955 0.01019462 0.00648749]]


In [14]:
# convert the matrix to a df for better readability
#the table is same as the transition table shown in section 3 of article
tags_df = pd.DataFrame(tags_matrix, columns = list(tags), index=list(tags))
display(tags_df)

Unnamed: 0,UH,HYPH,PDT,JJ,VBD,:,RBS,NNS,VB,FW,...,LS,DT,.,VBP,CD,EX,WDT,WRB,JJR,PRP
UH,0.092553,0.0,0.0,0.066143,0.00119,0.0,0.0,0.006186,0.142755,0.001665,...,0.0,0.048061,0.136093,0.014989,0.033547,0.002617,0.004283,0.021889,0.009755,0.168451
HYPH,0.0,0.0,0.0,0.002522,0.0,0.0,0.0,0.025221,0.0,0.103405,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
PDT,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
JJ,0.006049,0.005293,0.0,0.054694,0.0,0.0,0.0,0.076875,0.0,0.001008,...,0.0,0.002773,0.094266,0.000378,0.003781,0.000378,0.0,0.0,0.00252,0.015123
VBD,0.012302,0.0,0.0,0.029877,0.0,0.0,0.02812,0.015817,0.024605,0.0,...,0.0,0.179262,0.019332,0.0,0.02109,0.0,0.0,0.003515,0.036907,0.123023
:,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
RBS,0.0,0.0,0.0,0.428571,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.166667,0.0,0.0,0.02381,0.0,0.0,0.0,0.0,0.0
NNS,0.025628,0.001867,0.0,0.012729,0.000849,0.0,0.0,0.00611,0.002206,0.0,...,0.0,0.014766,0.499321,0.0426,0.001358,0.001697,0.01205,0.001697,0.001867,0.014766
VB,0.014464,0.00062,0.001378,0.055376,0.000207,0.0,0.0,0.009229,0.002617,0.0,...,0.0,0.117157,0.022935,0.001515,0.023073,0.0,0.000482,0.005786,0.036228,0.173221
FW,0.0,0.039835,0.0,0.01511,0.0,0.0,0.0,0.0,0.002747,0.508242,...,0.0,0.0,0.168956,0.016484,0.0,0.0,0.0,0.0,0.0,0.0


In [15]:
def Viterbi(words, train_bag = tuple_data):
    state = []
    T = list(set([pair[1] for pair in train_bag]))
     
    for key, word in enumerate(words):
        #initialise list of probability column for a given observation
        p = [] 
        for tag in T:
            if key == 0:
                transition_p = tags_df.loc['.', tag]
            else:
                transition_p = tags_df.loc[state[-1], tag]
                 
            # compute emission and state probabilities
            emission_p = word_given_tag(words[key], tag)[0]/word_given_tag(words[key], tag)[1]
            state_probability = emission_p * transition_p    
            p.append(state_probability)
             
        pmax = max(p)
        # getting state for which probability is maximum
        state_max = T[p.index(pmax)] 
        state.append(state_max)
    return list(zip(words, state))

In [16]:
# Let's test our Viterbi algorithm on a few sample sentences of test dataset

#define a random seed to get same sentences when run multiple times
random.seed(1234)      
 
# choose random 10 numbers
rndom = [random.randint(1,len(test_set)) for x in range(100)]
 
# list of 10 sents on which we test the model
test_run = [test_set[i] for i in rndom]
 
# list of tagged words
test_run_base = [tup for sent in test_run for tup in sent]
 
# list of untagged words
test_run_untagged_words = [tup[0] for sent in test_run for tup in sent]

In [17]:
#Here We will only test 10 sentences to check the accuracy
#as testing the whole training set takes huge amount of time
start = time.time()
tagged_seq = Viterbi(test_run_untagged_words)
end = time.time()
difference = end-start
 
print("Time taken in seconds: ", difference)
 
# accuracy
check = [i for i, j in zip(tagged_seq, test_run_base) if i == j] 
 
accuracy = len(check)/len(tagged_seq)
print('Viterbi Algorithm Accuracy: ',accuracy*100)

Time taken in seconds:  593.4353129863739
Viterbi Algorithm Accuracy:  94.52679589509692


In [None]:
# now we test the whole test set
# this takes a lot of time (more than half an hour). 
# >>> so we won't run this. <<<
test_base = [tup for sent in test_set for tup in sent]
test_untagged_words = [tup[0] for sent in test_set for tup in sent]
 
start = time.time()
tagged_seq = Viterbi(test_untagged_words)
end = time.time()
difference = end-start
 
print("Time taken in seconds: ", difference)
 
# accuracy
check = [i for i, j in zip(tagged_seq, test_base) if i == j] 
 
accuracy = len(check)/len(tagged_seq)
print('Viterbi Algorithm Accuracy: ',accuracy*100)

In [21]:
# To improve the performance,we specify a rule base tagger for unknown words 
# specify patterns for tagging
# Although this is a very simple version and it might do noting.
patterns = [
    (r'.*ing$', 'VBG'),              
    (r'.*ed$', 'VBD'),                
    (r'.*es$', 'VBZ'),                 
    (r'.*\'s$', 'PRP'),              
    (r'.*s$', 'NNS'),                 
    (r'.*', 'NN')                   
]
 
# rule based tagger
rule_based_tagger = nltk.RegexpTagger(patterns)

In [22]:
#modified Viterbi to include rule based tagger in it
def Viterbi_rule_based(words, train_bag = train_set_tuples):
    state = []
    T = list(set([pair[1] for pair in train_bag]))
     
    for key, word in enumerate(words):
        #initialise list of probability column for a given observation
        p = [] 
        for tag in T:
            if key == 0:
                transition_p = tags_df.loc['.', tag]
            else:
                transition_p = tags_df.loc[state[-1], tag]
                 
            # compute emission and state probabilities
            emission_p = word_given_tag(words[key], tag)[0]/word_given_tag(words[key], tag)[1]
            state_probability = emission_p * transition_p    
            p.append(state_probability)
             
        pmax = max(p)
        state_max = rule_based_tagger.tag([word])[0][1]       
        
         
        if(pmax==0):
            state_max = rule_based_tagger.tag([word])[0][1] # assign based on rule based tagger
        else:
            if state_max != 'X':
                # getting state for which probability is maximum
                state_max = T[p.index(pmax)]                
             
         
        state.append(state_max)
    return list(zip(words, state))

In [23]:
#test accuracy on subset of test data 
start = time.time()
tagged_seq = Viterbi_rule_based(test_run_untagged_words)
end = time.time()
difference = end-start
 
print("Time taken in seconds: ", difference)
 
# accuracy
check = [i for i, j in zip(tagged_seq, test_run_base) if i == j] 
 
accuracy = len(check)/len(tagged_seq)
print('Viterbi Algorithm Accuracy: ',accuracy*100)

Time taken in seconds:  575.9059166908264
Viterbi Algorithm Accuracy:  94.4127708095781
