In [None]:
# Mount Google Drive inside the Colab runtime at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Directory on the mounted drive that the corpus paths below are built from.
data_dir = '/content/drive/MyDrive/NLP-HW2'

In [None]:
# Importing libraries
import nltk
import numpy as np
import pandas as pd
import random
import pprint, time

In [None]:
# Load the training corpus: sentences are separated by a blank line and every
# line inside a sentence is "<word> <tag>" separated by a single space.
path = data_dir + '/Train.txt'
# The context manager closes the file handle as soon as the text is read.
with open(path, 'r', encoding="utf-8") as file:
    content = file.read()
content = content.split('\n\n')

train_data = []
for c in content:
    sentence = []
    for line in c.split('\n'):
        splitted = line.split(' ')
        # Blank or one-token lines (e.g. left by a trailing newline at the end
        # of the file) carry no tag; skip them instead of raising IndexError.
        # Lines with extra tokens keep using the first two, as before.
        if len(splitted) >= 2:
            sentence.append((splitted[0], splitted[1]))
    # A chunk made only of skipped lines is not a sentence; leave it out.
    if sentence:
        train_data.append(sentence)
train_data[:2]

In [None]:
# Load the test corpus: sentences are separated by a blank line and every
# line inside a sentence is "<word> <tag>" separated by a single space.
path = data_dir + '/Test.txt'
# The context manager closes the file handle as soon as the text is read.
with open(path, 'r', encoding="utf-8") as file:
    content = file.read()
content = content.split('\n\n')
test_data = []
for c in content:
    items = c.split('\n')
    new_items = []
    for it in items:
        splitted = it.split(' ')
        # Keep only well-formed "<word> <tag>" lines; anything else is dropped.
        if len(splitted) == 2:
            new_items.append((splitted[0],splitted[1]))
    test_data.append(new_items)
test_data[3]

In [None]:
# Flatten the per-sentence lists into one list of (word, tag) tuples per corpus.
train_tagged_words = []
for sent in train_data:
    train_tagged_words.extend(sent)

test_tagged_words = []
for sent in test_data:
    test_tagged_words.extend(sent)

# Total number of tagged tokens in each corpus.
print(len(train_tagged_words))
print(len(test_tagged_words))

In [None]:
# Peek at the first five (word, tag) pairs of the train and test corpora.
for tagged_words in (train_tagged_words, test_tagged_words):
    print(tagged_words[:5])

In [None]:
# Distinct tags occurring in the training data (a set removes duplicates).
tags = set(tag for _, tag in train_tagged_words)
print(len(tags))
print(tags)

# Distinct words occurring in the training data, i.e. the vocabulary.
vocab = set(word for word, _ in train_tagged_words)

In [None]:
# Display the vocabulary size together with the full set of distinct training words.
len(vocab), vocab

In [None]:
# Emission counts: how often `word` is labelled `tag` in the training pairs.
def word_given_tag(word, tag, train_bag = train_tagged_words):
    """Return the counts needed for the emission probability P(word | tag).

    Args:
        word: the word to look up.
        tag: the tag to condition on.
        train_bag: iterable of (word, tag) pairs. The default is whatever
            ``train_tagged_words`` refers to when this ``def`` is executed.

    Returns:
        A tuple ``(count_w_given_tag, count_tag)``: the number of pairs equal
        to (word, tag) and the number of pairs whose tag is ``tag``. The
        caller divides the first by the second to obtain the probability.
    """
    count_tag = 0
    count_w_given_tag = 0
    for pair in train_bag:
        if pair[1] == tag:
            count_tag += 1
            # Only pairs carrying the requested tag can count for the word.
            if pair[0] == word:
                count_w_given_tag += 1
    return (count_w_given_tag, count_tag)

In [None]:
# Transition counts: how often tag `t2` directly follows tag `t1`.
def t2_given_t1(t2, t1, train_bag = train_tagged_words):
    """Return the counts needed for the transition probability P(t2 | t1).

    Args:
        t2: the following tag.
        t1: the preceding tag.
        train_bag: iterable of (word, tag) pairs. The default is whatever
            ``train_tagged_words`` refers to when this ``def`` is executed.

    Returns:
        A tuple ``(count_t2_t1, count_t1)``: the number of adjacent positions
        in the flattened tag sequence where ``t1`` is immediately followed by
        ``t2``, and the total number of occurrences of ``t1``.
    """
    tag_sequence = [pair[1] for pair in train_bag]
    count_t1 = sum(1 for current in tag_sequence if current == t1)
    # Pair every tag with its successor; the last tag has none.
    count_t2_t1 = sum(
        1
        for previous, following in zip(tag_sequence, tag_sequence[1:])
        if previous == t1 and following == t2
    )
    return (count_t2_t1, count_t1)

In [None]:
# Build the t x t transition matrix of tags, where t is the number of tags.
# tags_matrix[i, j] is P(j-th tag follows the i-th tag), with rows and columns
# in the iteration order of list(tags).

# Materialise the tag order once instead of rebuilding the list per row.
tag_list = list(tags)
tags_matrix = np.zeros((len(tag_list), len(tag_list)), dtype='float32')
for i, t1 in enumerate(tag_list):
    for j, t2 in enumerate(tag_list):
        # t2_given_t1 scans the whole training corpus, so call it once per
        # cell and unpack both counts rather than calling it twice.
        count_t2_t1, count_t1 = t2_given_t1(t2, t1)
        tags_matrix[i, j] = count_t2_t1 / count_t1
print(tags_matrix)

In [None]:
# Wrap the transition matrix in a DataFrame labelled by tag on both axes so
# that entries can be read (and looked up) by tag name. The original notes
# describe it as matching the transition table in section 3 of the article.
tag_labels = list(tags)
tags_df = pd.DataFrame(data=tags_matrix, index=tag_labels, columns=tag_labels)
display(tags_df)

In [None]:
def Viterbi(words, train_bag = train_tagged_words):
    """Tag ``words`` left to right, picking the best-scoring tag per word.

    For every word the score of each candidate tag is
    ``emission(word | tag) * transition(tag | previous tag)``, and the tag with
    the highest score is committed immediately. Despite the name this is a
    greedy decoder: earlier choices are never revisited and no back-pointer
    table is kept.

    Args:
        words: sequence of words to tag.
        train_bag: sequence of (word, tag) pairs used for the candidate tag
            list and the emission counts. It is iterated many times, so it
            must be re-iterable (a list, not a generator). The default is
            whatever ``train_tagged_words`` refers to when this ``def`` is
            executed.

    Returns:
        A list of ``(word, tag)`` tuples, one per input word.

    Notes:
        Transition probabilities are read from the module-level ``tags_df``
        regardless of ``train_bag``. The first word uses the row for the
        ``'DELM'`` tag as its "previous tag".
        A word that never occurs in ``train_bag`` has emission 0 for every
        tag, so all scores tie at 0 and the first tag in the candidate list
        is chosen.
    """
    state = []
    T = list(set([pair[1] for pair in train_bag]))

    for word in words:
        # The first word has no predecessor; 'DELM' stands in for it.
        previous_tag = state[-1] if state else 'DELM'

        # Score of every candidate tag for the current word, aligned with T.
        p = []
        for tag in T:
            transition_p = tags_df.loc[previous_tag, tag]

            # One corpus scan per (word, tag) gives both counts. Forwarding
            # train_bag keeps the emission counts consistent with the tag
            # list T, which is built from the same bag.
            count_w_given_tag, count_tag = word_given_tag(word, tag, train_bag)
            emission_p = count_w_given_tag / count_tag

            p.append(emission_p * transition_p)

        # Commit the tag with the highest score (first one wins on ties).
        pmax = max(p)
        state.append(T[p.index(pmax)])
    return list(zip(words, state))

In [None]:
# Let's test our Viterbi algorithm on a few sample sentences of test dataset
random.seed(1234)      # fixed seed so the same sentences are drawn on every run

# Draw 10 random sentence indices. randrange(n) yields 0..n-1, so the first
# sentence can be selected and the index can never run past the end of the
# list (randint(1, n) is inclusive on both ends and could return n).
rndom = [random.randrange(len(test_data)) for x in range(10)]

# list of 10 sents on which we test the model
test_run = [test_data[i] for i in rndom]

# gold (word, tag) pairs of the sampled sentences
test_run_base = [tup for sent in test_run for tup in sent]

# words only, without tags.
# NOTE: this rebinds test_tagged_words from (word, tag) pairs to plain words;
# the evaluation cell below reads it under this name.
test_tagged_words = [tup[0] for sent in test_run for tup in sent]

In [None]:
# Only the 10 sampled test sentences are tagged here, because tagging the
# whole test set takes a huge amount of time.
# perf_counter is a monotonic clock intended for measuring elapsed time.
start = time.perf_counter()
tagged_seq = Viterbi(test_tagged_words)
end = time.perf_counter()
difference = end-start

print("Time taken in seconds: ", difference)

# accuracy: share of predicted (word, tag) pairs that equal the gold pairs
check = [i for i, j in zip(tagged_seq, test_run_base) if i == j]

# Guard against an empty sample (e.g. only empty sentences were drawn).
accuracy = len(check)/len(tagged_seq) if tagged_seq else 0.0
print('Viterbi Algorithm Accuracy On Test: ',accuracy*100)

In [None]:
# Let's run our Viterbi algorithm on a few sample sentences of train dataset
random.seed(1234)      # fixed seed so the same sentences are drawn on every run

# Draw 10 random sentence indices. randrange(n) yields 0..n-1, so the first
# sentence can be selected and the index can never run past the end of the
# list (randint(1, n) is inclusive on both ends and could return n).
rndom = [random.randrange(len(train_data)) for x in range(10)]

# list of 10 training sents on which we evaluate the model
train_run = [train_data[i] for i in rndom]

# gold (word, tag) pairs of the sampled sentences
train_run_base = [tup for sent in train_run for tup in sent]

# words only, without tags.
# NOTE: this rebinds train_tagged_words from (word, tag) pairs to plain words.
# Functions defined earlier keep the original list through their default
# argument, but re-running those definition cells after this one would bind
# the default to this word list instead.
train_tagged_words = [tup[0] for sent in train_run for tup in sent]

In [None]:
# Only the 10 sampled training sentences are tagged here, because tagging the
# whole training set takes a huge amount of time.
# perf_counter is a monotonic clock intended for measuring elapsed time.
start = time.perf_counter()
tagged_seq = Viterbi(train_tagged_words)
end = time.perf_counter()
difference = end-start

print("Time taken in seconds: ", difference)

# accuracy: share of predicted (word, tag) pairs that equal the gold pairs
check = [i for i, j in zip(tagged_seq, train_run_base) if i == j]

# Guard against an empty sample (e.g. only empty sentences were drawn).
accuracy = len(check)/len(tagged_seq) if tagged_seq else 0.0
print('Viterbi Algorithm Accuracy On Train: ',accuracy*100)