In [1]:
import nltk
import random
import numpy as np
import pandas as pd
import pprint, time
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split

'''
Reading the input file and storing the 
values of the 3 columns of each row in
a tuple: (<Word>, <POS_TAG>, <CHUNK_TAG>)
'''
f = open("train.txt", "r")
sentence_corpus = []
sentence = []

for line in f:
    line = line.strip()
    if line == "":
        sentence_corpus.append(sentence)
        sentence = []
    else:
        word, pos_tag, _ = line.split(" ")
        #ignoring the chunk tag for this task
        sentence.append((word, pos_tag))
f.close()

# Add the last sentence (if any)
if sentence:
    sentence_corpus.append(sentence)

In [2]:
#Splitting the corpus data into train_data and test_data (validadtion) (80/20 split)
train_set,test_set =train_test_split(sentence_corpus,train_size=0.80,test_size=0.20,random_state = 101)

# List of all the tags in the train and the test set (it may not be unique)
train_tag_corpus = [ t for sentence in train_set for t in sentence ]
test_tag_corpus = [ t for sentence in test_set for t in sentence ]
    
# Finding number of unique tags and words (Vocabulary)
train_tag_set = {tag for word, tag in train_tag_corpus}
vocab = {word for word, tag in train_tag_corpus}

In [3]:
'''
First implementation: Vertebi Algorithm from scratch
Note: Time consuming: Test data running for more than
      3 Hours.
'''
#Methods to compute transition and emission

'''
prev_tag -> current_tag 
Pr(current_tag | prev_tag) = (# of prev_tag -> current_tag)/(# of prev_tag)
'''
def computeTransition(prev_tag, current_tag):
    tags = [tag for _, tag in train_tag_corpus]
    
    #Count of prev_tag
    cnt_prev_tag = len([tag for tag in tags if tag == prev_tag])
    cnt_prev_curr_tag = 0
    
    for i in range(1, len(tags)):
        if tags[i-1] == prev_tag and tags[i] == current_tag:
            cnt_prev_curr_tag += 1
    
    return cnt_prev_curr_tag / cnt_prev_tag

# compute Emission Probability
def computeEmission(word, tag):
    data = train_tag_corpus
    tag_list = [tg for tg in data if tg[1]==tag]
    cnt_tag = len(tag_list)
    w_given_tag_list = [pair[0] for pair in tag_list if pair[0]==word]
    count_w_given_tag = len(w_given_tag_list)
 
    return count_w_given_tag / cnt_tag


#The crux of HMM is the emission and transition probabilities

#Transition
transition = np.zeros((len(train_tag_set), len(train_tag_set)), dtype='float32')
train_tag_list = list(train_tag_set)
for i in range(len(train_tag_list)):
    for j in range(len(train_tag_list)):
        transition[i,j] = computeTransition(train_tag_list[i], train_tag_list[j])

In [4]:
#Converting transition matrix into a dataframe for better access of the data.
tags_df = pd.DataFrame(transition, columns = list(train_tag_list), index=list(train_tag_list))

In [23]:
# This part of code helps in cases of unseen words. 
# Using Regex to match the unseen words with the most appropriate tags  

regex = [
    (r'.*ing$', 'VBG'),
    (r'.*ed$', 'VBD'), 
    (r'.*es$', 'VBZ'), 
    (r'.*end$', 'VB'),
    (r'^[A-Z].*$', 'NNP'),
    (r'\b\w+s\b', 'NNS'),
    (r'\b\w+NN\b', 'NN'),
    (r'.*', 'NN'),
    (r'^-?[0-9]+(.[0-9]+)?$', 'CD'),
]

unseen_word_tagger = nltk.RegexpTagger(regex)

In [27]:
#Implmenting HMM: Vertebi 
def Viterbi(words):
    data = train_tag_corpus
    pos = []
    dp = {}
    tags = list(set([pair[1] for pair in data]))
     
    for key, word in enumerate(words):
        prob_lst = []
        prob_max = -float("inf")
        prob_index = 0
        for i, tag in enumerate(tags):
            if key == 0:
                transition_p = tags_df.loc['.', tag]
            else:
                transition_p = tags_df.loc[pos[-1], tag]
            
            if (tag, word) in dp.keys():
                emission_p = dp[(tag, word)]
            else:
                emission_p = computeEmission(words[key], tag)
                dp[(tag, word)] = emission_p
                
            prob = emission_p * transition_p    
            prob_lst.append(prob)
            if prob > prob_max:
                prob_max = prob
                prob_index = i
             
        # getting state for which probability is maximum
        best_pos_tag = tags[prob_index] 
        pos.append(best_pos_tag)
    return list(zip(words, pos))

In [51]:
#Testing random 10 sentences of the dev data.
rndom = [random.randint(1,len(test_set)) for x in range(100)]
test_run = [test_set[i] for i in rndom]
test_run_base = [tup for sent in test_run for tup in sent]
test_tagged_words = [tup[0] for sent in test_run for tup in sent]

get_tag = Viterbi(test_tagged_words)
  
# accuracy
acc = [i for i, j in zip(get_tag, test_run_base) if i == j] 
accuracy = len(acc)/len(get_tag)
print('Viterbi Accuracy: ',accuracy*100)

Viterbi Accuracy:  87.20882713526767


In [31]:
print(get_tag[:5], test_run_base[:5])

[('It', 'PRP'), ('also', 'RB'), ('tracks', 'VBZ'), ('personnel', 'NNS'), ('and', 'CC')] [('It', 'PRP'), ('also', 'RB'), ('tracks', 'VBZ'), ('personnel', 'NNS'), ('and', 'CC')]


In [28]:
#Testing the dev set
test_tagged_words = [tup for sent in test_set for tup in sent]
test_untagged_words = [tup[0] for sent in test_set for tup in sent]
test_set_base = [tup for sent in test_set for tup in sent]

get_tag = Viterbi(test_untagged_words)
  
# accuracy
acc = [i for i, j in zip(get_tag, test_set_base) if i == j] 
accuracy = len(acc)/len(get_tag)
print('Viterbi Accuracy: ',accuracy*100)

KeyboardInterrupt: 

In [7]:
'''
Second Implementation: Using NLTK's hmm
Takes less time and easier to impement
'''

# Creating HMM object
HmmModel = nltk.HiddenMarkovModelTagger.train(train_set)

In [24]:
# Testing the dev data
true_pos_tags = [tag for sentences in test_set for word, tag in sentences]

predicted_pos_tags=[]
for sentences in test_set:
    predicted_pos_tags += [tag for _, tag in HmmModel.tag([word for word, _ in sentences])]
    
i = 0
for sentence in test_set:
    for word, _ in sentence:
        if predicted_pos_tags[i] == '``':
            predicted_pos_tags[i] = unseen_word_tagger.tag([word])[0][1]
        i += 1
    
#Accuracy
print (classification_report(true_pos_tags, predicted_pos_tags))

  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


              precision    recall  f1-score   support

           #       1.00      1.00      1.00         6
           $       0.81      1.00      0.90       311
          ''       0.77      1.00      0.87       297
           (       0.97      0.97      0.97        39
           )       0.64      0.92      0.76        39
           ,       0.98      1.00      0.99      2127
           .       0.95      1.00      0.97      1767
           :       0.99      0.99      0.99       203
          CC       0.97      1.00      0.99      1054
          CD       0.95      0.91      0.93      1599
          DT       0.95      0.99      0.97      3576
          EX       1.00      0.89      0.94        46
          FW       1.00      0.10      0.18        10
          IN       0.97      0.99      0.98      4399
          JJ       0.90      0.86      0.88      2549
         JJR       0.86      0.93      0.89       169
         JJS       0.98      0.87      0.92        75
          MD       0.92    

  _warn_prf(average, modifier, msg_start, len(result))


In [42]:
'''
Now predicting the Test Data
'''
f_test_data = open("test_data.txt", "r")
test_corpus = []

for line in f_test_data:
    line = line.strip()
    if line == "":
        continue
    else:
        test_corpus.append(line)
f_test_data.close()

In [43]:
predicted_pos_tags=[]
for word in test_corpus:
    predicted_pos_tags += [tag for _, tag in HmmModel.tag([word])]

In [44]:
predicted_pos_tags[:10]

['``', 'NNP', 'NNP', 'VBZ', '``', 'NN', 'VBD', 'PRP', 'VBN', 'DT']

In [45]:
k = 0
for word in test_corpus:
    if predicted_pos_tags[k] == '``':
        predicted_pos_tags[k] = unseen_word_tagger.tag([word])[0][1]
    k += 1
    

In [18]:
f_read = open("test_data_tagged.txt", "r")
data = []
for line in f_read:
    line = line.strip()
    data.append(line)
f_read.close()

In [19]:
f_write = open("test_data_tagged.txt", "w")
j = 0
for line in data:
    if line == '':
        f_write.writelines(line + '\n')
        continue
    else:
        f_write.writelines(line + " " + predicted_pos_tags[j] + '\n')
    j += 1
f_write.close()

In [36]:
ff = open("test.txt", "r")
sent_corpus = []
sent = []
true_tags = []

for line in ff:
    line = line.strip()
    if line == "":
        sent_corpus.append(sent)
        sent = []
    else:
        word, pos_tag, _ = line.split(" ")
        #ignoring the chunk tag for this task
        sent.append((word, pos_tag))
        true_tags.append(pos_tag)
ff.close()

# Add the last sentence (if any)
if sent:
    sent_corpus.append(sent)

In [50]:
print (classification_report(true_tags, predicted_pos_tags))

  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


              precision    recall  f1-score   support

           #       1.00      1.00      1.00        11
           $       1.00      0.99      1.00       384
          ''       0.91      1.00      0.95       316
           (       1.00      1.00      1.00        77
           )       0.00      0.00      0.00        77
           ,       1.00      1.00      1.00      2390
           .       1.00      0.99      0.99      1975
           :       1.00      1.00      1.00       238
          CC       1.00      1.00      1.00      1214
          CD       1.00      0.77      0.87      1918
          DT       1.00      0.99      0.99      4020
          EX       0.96      1.00      0.98        48
          FW       0.00      0.00      0.00         4
          IN       0.96      1.00      0.98      5071
          JJ       0.95      0.79      0.86      2964
         JJR       0.93      0.55      0.70       202
         JJS       1.00      0.92      0.96        77
          MD       1.00    

  _warn_prf(average, modifier, msg_start, len(result))


In [None]:
'''
Testing Vertebi
'''
get_tag = Viterbi(test_corpus)