In [2]:
import nltk
import numpy as np
import pandas as pd
import random
from sklearn.model_selection import train_test_split
import pprint, time

In [3]:
nltk_data = list(nltk.corpus.treebank.tagged_sents(tagset='universal'))

In [4]:
# split data into training and validation set in the ratio 80:20

train_set,test_set = train_test_split(nltk_data,train_size=0.80,test_size=0.20,random_state = 101)

In [5]:
train_tagged_words = [tup for sent in train_set for tup in sent]
test_tagged_words = [tup[0] for sent in test_set for tup in sent]
print(len(train_tagged_words))
print(len(test_tagged_words))

80310
20366


In [6]:
#check how many unique tags are present in training data
tags = {tag for word,tag in train_tagged_words}
print(len(tags))
print(tags)

12
{'NUM', 'CONJ', 'ADV', 'ADJ', 'PRT', 'VERB', '.', 'X', 'PRON', 'DET', 'NOUN', 'ADP'}


In [7]:
def word_given_tag(word,tag,train_bag= train_tagged_words):
    taglist = [pair for pair in train_bag if pair[1] == tag]
    tag_count = len(taglist)    
    w_in_tag = [pair[0] for pair in taglist if pair[0]==word]    
    word_count_given_tag = len(w_in_tag)    
    
    return (word_count_given_tag,tag_count)

#emission probability

In [8]:
# transitionprobability
def t2_given_t1(t2,t1,train_bag=train_tagged_words):
    tags = [pair[1] for pair in train_bag]
    
    t1_tags = [tag for tag in tags if tag==t1]
    
    count_of_t1 = len(t1_tags)
    
    t2_given_t1 = [tags[index+1] for index in range(len(tags)-1) if tags[index] == t1 and tags[index+1] == t2]
    
    count_t2_given_t1 = len(t2_given_t1)
    
    return(count_t2_given_t1,count_of_t1)

In [9]:
t2_given_t1('NOUN','DET')

(4424, 6957)

In [10]:
# creating t x t transition matrix of tags
# each column is t2, each row is t1
# thus M(i, j) represents P(tj given ti)

tags_matrix = np.zeros((len(tags), len(tags)), dtype='float32')
for i, t1 in enumerate(list(tags)):
    for j, t2 in enumerate(list(tags)): 
        tags_matrix[i, j] = t2_given_t1(t2, t1)[0]/t2_given_t1(t2, t1)[1]

In [11]:
tags_df = pd.DataFrame(tags_matrix, columns = list(tags), index=list(tags))
tags_df

Unnamed: 0,NUM,CONJ,ADV,ADJ,PRT,VERB,.,X,PRON,DET,NOUN,ADP
NUM,0.18422,0.014281,0.00357,0.035345,0.026062,0.020707,0.119243,0.202428,0.001428,0.00357,0.35166,0.037487
CONJ,0.040615,0.000549,0.05708,0.113611,0.004391,0.150384,0.035126,0.00933,0.060373,0.123491,0.349067,0.055982
ADV,0.029868,0.006982,0.081458,0.130721,0.01474,0.339022,0.139255,0.022886,0.012025,0.071373,0.032196,0.119472
ADJ,0.021748,0.016893,0.005243,0.063301,0.011456,0.011456,0.066019,0.020971,0.000194,0.005243,0.696893,0.080583
PRT,0.056751,0.002348,0.009393,0.082975,0.001174,0.401174,0.04501,0.012133,0.017613,0.10137,0.250489,0.019569
VERB,0.022836,0.005433,0.083886,0.06639,0.030663,0.167956,0.034807,0.21593,0.035543,0.13361,0.110589,0.092357
.,0.07821,0.060079,0.052569,0.046132,0.002789,0.08969,0.092372,0.025641,0.068769,0.172192,0.218539,0.092908
X,0.003075,0.010379,0.025754,0.017682,0.185086,0.206419,0.160869,0.075726,0.0542,0.05689,0.061695,0.142226
PRON,0.006834,0.005011,0.036902,0.070615,0.014123,0.484738,0.041913,0.088383,0.006834,0.009567,0.212756,0.022323
DET,0.022855,0.000431,0.012074,0.206411,0.000287,0.040247,0.017393,0.045134,0.003306,0.006037,0.635906,0.009918


In [12]:
# Vanilla Viterbi Algorithm
def Viterbi(words, train_bag = train_tagged_words):
    state = []
    
    T = list(set([pair[1] for pair in train_bag]))
    
    for key, word in enumerate(words):
        #initialise list of probability column for a given observation
        p = [] 
        for tag in T:
            if key == 0:
                transition_p = tags_df.loc['.', tag]
            else:
                transition_p = tags_df.loc[state[-1], tag]
                
            # compute emission and state probabilities
            emission_p = word_given_tag(words[key], tag)[0]/word_given_tag(words[key], tag)[1]
            state_probability = emission_p * transition_p    
            p.append(state_probability)
            
        pmax = max(p)
        # getting state for which probability is maximum
        state_max = T[p.index(pmax)] 
        state.append(state_max)
    return list(zip(words, state))

In [13]:
random.seed(1234)

# choose random 5 sents
rndom = [random.randint(1,len(test_set)) for x in range(5)]

# list of sents
test_run = [test_set[i] for i in rndom]

# list of tagged words
test_run_base = [tup for sent in test_run for tup in sent]

# list of untagged words
test_tagged_words = [tup[0] for sent in test_run for tup in sent]

In [14]:
start = time.time()
tagged_seq = Viterbi(test_tagged_words)
end = time.time()
difference = end-start

print("Time taken in seconds: ", difference)

# accuracy
check = [i for i, j in zip(tagged_seq, test_run_base) if i == j] 

accuracy = len(check)/len(tagged_seq)
print('Vanilla Viterbi Algorithm Accuracy: ',accuracy*100)

Time taken in seconds:  39.07226204872131
Vanilla Viterbi Algorithm Accuracy:  92.90780141843972


In [15]:
tag_prob = []
total_tag = len([tag for word,tag in train_tagged_words])
for t in tags:
    each_tag = [tag for word,tag in train_tagged_words if tag==t]
    tag_prob.append((t,len(each_tag)/total_tag))

tag_prob

[('NUM', 0.03487735026771261),
 ('CONJ', 0.02268708753579878),
 ('ADV', 0.03210061013572407),
 ('ADJ', 0.06412650977462334),
 ('PRT', 0.03181421989789565),
 ('VERB', 0.13522599925289502),
 ('.', 0.11606275681733283),
 ('X', 0.06478645249657577),
 ('PRON', 0.02733159008840742),
 ('DET', 0.0866268210683601),
 ('NOUN', 0.28596687834640766),
 ('ADP', 0.09839372431826672)]

In [16]:
def Viterbi_1(words, train_bag = train_tagged_words):
    state = []
    T = list(set([pair[1] for pair in train_bag]))
    
    for key, word in enumerate(words):
        #initialise list of probability column for a given observation
        p = [] 
        p_transition =[] # list for storing transition probabilities
       
        for tag in T:
            if key == 0:
                transition_p = tags_df.loc['.', tag]
            else:
                transition_p = tags_df.loc[state[-1], tag]
                
            # compute emission and state probabilities
            emission_p = word_given_tag(words[key], tag)[0]/word_given_tag(words[key], tag)[1]
            state_probability = emission_p * transition_p    
            p.append(state_probability)
            
            # find POS tag occurance probability
            tag_p = [pair[1] for pair in tag_prob if pair[0]==tag ]
            
            # calculate the transition prob weighted by tag occurance probability.
            transition_p = tag_p[0]*transition_p             
            p_transition.append(transition_p)
            
        pmax = max(p)
        state_max = T[p.index(pmax)] 
        
      
        # if probability is zero (unknown word) then use weighted transition probability
        if(pmax==0):
            pmax = max(p_transition)
            state_max = T[p_transition.index(pmax)]                 
                           
        else:
            state_max = T[p.index(pmax)] 
        
        state.append(state_max)
    return list(zip(words, state))

In [18]:
start = time.time()
tagged_seq = Viterbi_1(test_tagged_words)
end = time.time()
difference = end-start

print("Time taken : ", difference)

# accuracy
check = [i for i, j in zip(tagged_seq, test_run_base) if i == j] 
accuracy = len(check)/len(tagged_seq)
print('Modified Viterbi_1 Accuracy: ',accuracy)

Time taken :  35.562872648239136
Modified Viterbi_1 Accuracy:  0.950354609929078
