### ANKUR GURIA : 17039 : DSE 308

In [1]:
# Importing libraries and Packages
import nltk
import numpy as np
import pandas as pd
import random
from sklearn.model_selection import train_test_split
import pprint, time

In [2]:
# Downloading the treebank corpus from NLTK
nltk.download('treebank')
 
# Downloading the universal tagset from NLTK
nltk.download('universal_tagset')
 
# Reading the Treebank tagged sentences
nltk_tagged_sentences = list(nltk.corpus.treebank.tagged_sents(tagset = 'universal'))

# Printing the first two sentences along with tags
print(nltk_tagged_sentences[:2])

[nltk_data] Downloading package treebank to
[nltk_data]     C:\Users\ANKUR\AppData\Roaming\nltk_data...
[nltk_data]   Package treebank is already up-to-date!
[nltk_data] Downloading package universal_tagset to
[nltk_data]     C:\Users\ANKUR\AppData\Roaming\nltk_data...
[nltk_data]   Package universal_tagset is already up-to-date!
[[('Pierre', 'NOUN'), ('Vinken', 'NOUN'), (',', '.'), ('61', 'NUM'), ('years', 'NOUN'), ('old', 'ADJ'), (',', '.'), ('will', 'VERB'), ('join', 'VERB'), ('the', 'DET'), ('board', 'NOUN'), ('as', 'ADP'), ('a', 'DET'), ('nonexecutive', 'ADJ'), ('director', 'NOUN'), ('Nov.', 'NOUN'), ('29', 'NUM'), ('.', '.')], [('Mr.', 'NOUN'), ('Vinken', 'NOUN'), ('is', 'VERB'), ('chairman', 'NOUN'), ('of', 'ADP'), ('Elsevier', 'NOUN'), ('N.V.', 'NOUN'), (',', '.'), ('the', 'DET'), ('Dutch', 'NOUN'), ('publishing', 'VERB'), ('group', 'NOUN'), ('.', '.')]]


In [3]:
# Printing each word with its respective tag for the first sentence
for sentence in nltk_tagged_sentences[:1]: 
    for tuple in sentence: print(tuple)

('Pierre', 'NOUN')
('Vinken', 'NOUN')
(',', '.')
('61', 'NUM')
('years', 'NOUN')
('old', 'ADJ')
(',', '.')
('will', 'VERB')
('join', 'VERB')
('the', 'DET')
('board', 'NOUN')
('as', 'ADP')
('a', 'DET')
('nonexecutive', 'ADJ')
('director', 'NOUN')
('Nov.', 'NOUN')
('29', 'NUM')
('.', '.')


In [4]:
# Spliting data into training and validation set in the ratio 85:15
train_data, test_data = train_test_split(nltk_tagged_sentences, train_size = 0.85, test_size = 0.15, random_state = 101)

In [5]:
# List of train and test tagged words
train_words, test_words = [], []

for sentence in train_data:
    for tuple in sentence:
        train_words.append(tuple)        
for sentence in test_data:
    for tuple in sentence:
        test_words.append(tuple)

In [6]:
# Some of the tagged words
train_words[:5]

[('Next', 'ADJ'),
 ('week', 'NOUN'),
 (',', '.'),
 ('the', 'DET'),
 ('Philippine', 'NOUN')]

In [7]:
# Unique tags in training data
all_tags = []
for word, tag in train_words: all_tags.append(tag)
unique_tags = set(all_tags)
print(len(unique_tags))
print(unique_tags)
 
# Unique words in vocabulary of training data
all_words = []
for word, tag in train_words: all_words.append(word)
unique_words = set(all_words)
print(len(unique_words))

12
{'.', 'VERB', 'CONJ', 'ADJ', 'PRON', 'DET', 'ADP', 'ADV', 'NUM', 'PRT', 'NOUN', 'X'}
11394


##### EMISSION PROBABILITY

In [8]:
def emission_prob(word, tag, data = train_words):
    list_tag = [tuple for tuple in data if tuple[1] == tag]
    tag_count = len(list_tag) # count of the number of times the current tag occurred in the data
    word_given_list_tag = [tuple[0] for tuple in list_tag if tuple[0] == word]
    count_word_given_tag = len(word_given_list_tag) # count of the number of times the current word occurred as the passed tag.
 
    return count_word_given_tag / tag_count

##### TRANSITION PROBABILITY

In [9]:
def transition_prob(tag2, tag1, data = train_words):
    tags = [tuple[1] for tuple in data]
    count_tag1 = len([t for t in tags if t == tag1])
    count_tag2_tag1 = 0
    for ind in range(len(tags)-1):
        if tags[ind] == tag1 and tags[ind + 1] == tag2: count_tag2_tag1 += 1
    return count_tag2_tag1 / count_tag1

In [10]:
# Constructing a t x t transition matrix of tags wher t is equal to the nunmber of unique tags
# Tag_Matrix(i, j) represents Probability of the jth index tag occurring after the ith index tag
 
tag_matrix = np.zeros((len(unique_tags), len(unique_tags)), dtype = 'float32')
for i, tag1 in enumerate(list(unique_tags)):
    for j, tag2 in enumerate(list(unique_tags)): tag_matrix[i, j] = transition_prob(tag2, tag1)
 
print(tag_matrix)

[[9.08354446e-02 9.04303789e-02 5.91392405e-02 4.52658236e-02
  6.72405064e-02 1.72658235e-01 9.21518952e-02 5.18481024e-02
  7.89873451e-02 2.63291132e-03 2.21974686e-01 2.67341770e-02]
 [3.44767682e-02 1.68562740e-01 5.47112478e-03 6.61745518e-02
  3.55188884e-02 1.33738607e-01 9.24880579e-02 8.34563598e-02
  2.31871475e-02 3.02214511e-02 1.09943554e-01 2.16760740e-01]
 [3.52514274e-02 1.50336966e-01 5.18403307e-04 1.15085535e-01
  5.75427674e-02 1.22861587e-01 5.65059632e-02 5.75427674e-02
  3.83618437e-02 4.66562994e-03 3.51995856e-01 9.33125988e-03]
 [6.56008795e-02 1.15766264e-02 1.67217944e-02 6.48658574e-02
  1.83755968e-04 5.14516700e-03 7.99338445e-02 4.96141147e-03
  2.16832049e-02 1.12091145e-02 6.97353899e-01 2.07644254e-02]
 [4.20095287e-02 4.85058457e-01 5.19705517e-03 7.23256841e-02
  6.49631862e-03 9.09484643e-03 2.25205719e-02 3.55132110e-02
  7.36249471e-03 1.34257255e-02 2.14378521e-01 8.66175815e-02]
 [1.74962711e-02 3.96039598e-02 5.42520022e-04 2.05615088e-01
  3

In [11]:
# Tag_Matrix as a Dataframe
tag_df = pd.DataFrame(tag_matrix, columns = list(unique_tags), index = list(unique_tags))
display(tag_df)

Unnamed: 0,.,VERB,CONJ,ADJ,PRON,DET,ADP,ADV,NUM,PRT,NOUN,X
.,0.090835,0.09043,0.059139,0.045266,0.067241,0.172658,0.092152,0.051848,0.078987,0.002633,0.221975,0.026734
VERB,0.034477,0.168563,0.005471,0.066175,0.035519,0.133739,0.092488,0.083456,0.023187,0.030221,0.109944,0.216761
CONJ,0.035251,0.150337,0.000518,0.115086,0.057543,0.122862,0.056506,0.057543,0.038362,0.004666,0.351996,0.009331
ADJ,0.065601,0.011577,0.016722,0.064866,0.000184,0.005145,0.079934,0.004961,0.021683,0.011209,0.697354,0.020764
PRON,0.04201,0.485058,0.005197,0.072326,0.006496,0.009095,0.022521,0.035513,0.007362,0.013426,0.214379,0.086618
DET,0.017496,0.039604,0.000543,0.205615,0.003255,0.005696,0.00963,0.0118,0.022786,0.000271,0.638004,0.0453
ADP,0.038466,0.008482,0.000956,0.107275,0.069287,0.322184,0.016724,0.014216,0.063553,0.001195,0.323139,0.034524
ADV,0.13963,0.34,0.007037,0.13037,0.012963,0.071111,0.118889,0.08037,0.030741,0.014815,0.031111,0.022963
NUM,0.118712,0.020121,0.013749,0.035211,0.001677,0.003689,0.036217,0.003353,0.181757,0.026157,0.355131,0.204225
PRT,0.044248,0.405236,0.002212,0.08149,0.01733,0.099558,0.019912,0.009956,0.057153,0.001106,0.249263,0.012537


##### VITERBI ALGORITHM

In [12]:
def Viterbi(words, data = train_words):
    hidden_state = []
    T = list(set([tuple[1] for tuple in data]))
     
    for key, word in enumerate(words):
        p = [] #initialising list of probability column for a given observation
        for tag in T:
            if key == 0: transition_probability = tag_df.loc['.', tag]
            else: transition_probability = tag_df.loc[hidden_state[-1], tag]
                 
            # computing emission and hidden_state probabilities
            emission_probability = emission_prob(words[key], tag)
            hidden_state_probability = emission_probability * transition_probability    
            p.append(hidden_state_probability)
             
        prob_max = max(p)
        
        hidden_state_max = T[p.index(prob_max)] # getting hidden_state for which probability is maximum
        hidden_state.append(hidden_state_max)
        
    return list(zip(words, hidden_state))

In [13]:
# Testing Viterbi algorithm on few sentences of test dataset
random.seed(1234)      #define a random seed to get same sentences when run multiple times
 
rndom = [random.randint(1, len(test_data)) for x in range(10)] # choose random 10 numbers
test_run = [test_data[i] for i in rndom] # list of 10 sents on which we test the model

test_run_base = [tup for sent in test_run for tup in sent] # list of tagged words
test_words = [tup[0] for sent in test_run for tup in sent] # list of untagged words

##### VITERBI ALGORIHTM ACCURACY

In [14]:
# We are testing only on 10 sentences to check the accuracy as testing on the whole dataset will take a lot of time
tagged_sequences = Viterbi(test_words)

# accuracy
counter = [res1 for res1, res2 in zip(tagged_sequences, test_run_base) if res1 == res2] 
 
model_accuracy = len(counter) / len(tagged_sequences)
print('Viterbi Algorithm Accuracy: ', model_accuracy * 100)

Viterbi Algorithm Accuracy:  91.32420091324201


##### Code to test accuracy on all the test sentences
##### CAUTION: Running the below cell takes a lot of time (You may want to avoid it)

In [None]:
# tagging the test sentences()
test_words = [tup for sent in test_data for tup in sent]
test_untagged_words = [tup[0] for sent in test_data for tup in sent]

tagged_seq = Viterbi(test_untagged_words)

# accuracy
check = [i for i, j in zip(test_words, test_untagged_words) if i == j] 
 
accuracy = len(check)/len(tagged_seq)
print('Viterbi Algorithm Accuracy: ',accuracy*100)

In [15]:
# We try to optimize the performance by specifying a rule base tagger for unknown words 
# Specifying rules for tagging unknown words
rules = [
    (r'.*s$', 'NOUN'),                # plural nouns
    (r'\*T?\*?-[0-9]+$', 'X'),        # X
    (r'.*ed$', 'VERB'),               # past tense 
    (r'.*es$', 'VERB'),               # verb    
    (r'.*ing$', 'VERB'),              # gerund
    (r'.*\'s$', 'NOUN'),              # possessive nouns
    (r'^-?[0-9]+(.[0-9]+)?$', 'NUM'), # cardinal numbers
    (r'.*', 'NOUN')                   # nouns
]
 
# Building rule based tagger
rule_tagger = nltk.RegexpTagger(rules)

In [16]:
# Optimized Viterbi to include rule based tagger
def viterbi_rule_tagger(words, data = train_words):
    hidden_state = []
    T = list(set([tuple[1] for tuple in data]))
     
    for key, word in enumerate(words):
        p = [] #initialising list of probability column for a given observation
        for tag in T:
            if key == 0: transition_probability = tag_df.loc['.', tag]
            else: transition_probability = tag_df.loc[hidden_state[-1], tag]
                 
            # computing emission and hidden_state probabilities
            emission_probability = emission_prob(words[key], tag)
            hidden_state_probability = emission_probability * transition_probability   
            p.append(hidden_state_probability)
             
        prob_max = max(p)
        hidden_state_max = rule_tagger.tag([word])[0][1]       
        
        if(prob_max == 0): hidden_state_max = rule_tagger.tag([word])[0][1] # assign based on rule based tagger
        else:
            if hidden_state_max != 'X':hidden_state_max = T[p.index(prob_max)] # getting hidden_state for which probability is maximum
             
        hidden_state.append(hidden_state_max)
        
    return list(zip(words, hidden_state))

##### VITERBI ALGORIHTM WITH RULE BASED TAGGING ACCURACY

In [17]:
#test accuracy on subset of test data 
tagged_sequences = viterbi_rule_tagger(test_words)
 
# accuracy
counter = [res1 for res1, res2 in zip(tagged_sequences, test_run_base) if res1 == res2] 
 
model_accuracy = len(counter) / len(tagged_sequences)
print('Viterbi Algorithm with rule based tagger Accuracy: ', model_accuracy * 100)

Viterbi Algorithm with rule based tagger Accuracy:  96.80365296803653


##### COMPARING THE 2 POST TAGGERS

In [18]:
test_sentence = "Will can see Marry"
predicted_tag_rule = viterbi_rule_tagger(test_sentence.split())
predicted_tag_no_rule = Viterbi(test_sentence.split())
print(predicted_tag_rule)
print(predicted_tag_no_rule)
#Will and Marry are tagged as DET as they are unknown words for Viterbi Algorithm 

[('Will', 'NOUN'), ('can', 'VERB'), ('see', 'VERB'), ('Marry', 'NOUN')]
[('Will', '.'), ('can', 'VERB'), ('see', 'VERB'), ('Marry', '.')]
