In [2]:
import nltk
nltk.download('brown')
nltk.download('universal_tagset')
nltk.download('tagsets')

[nltk_data] Downloading package brown to /home/ankita/nltk_data...
[nltk_data]   Package brown is already up-to-date!
[nltk_data] Downloading package universal_tagset to
[nltk_data]     /home/ankita/nltk_data...
[nltk_data]   Package universal_tagset is already up-to-date!
[nltk_data] Downloading package tagsets to /home/ankita/nltk_data...
[nltk_data]   Package tagsets is already up-to-date!


True

In [3]:
import pandas as pd
import numpy as np
from nltk.corpus import brown #Brown Corpus 
from collections import defaultdict 
from nltk.metrics import ConfusionMatrix
from sklearn.model_selection import KFold
from itertools import chain 

In [4]:
'''
Constant declarations
'''

SMOOTHENING_FACTOR = 0.00001
UNIVERSAL_TAGSET =['NOUN', 'DET', 'ADJ', 'ADP', '.', 'VERB', 'CONJ', 'NUM', 'ADV', 'PRT', 'PRON', 'X']
TOTAL_TAGGED_WORDS = len(brown.words())
FOLDS = 5


In [5]:
'''
Calculate count of each tag for informational purposes
'''
br = brown.tagged_words(tagset='universal')
count_tag = defaultdict(int)
for i in range(len(br)):
    count_tag[br[i][1]] +=1

In [6]:
class HMM_POS():
    
    def __init__(self, train_data, test_data, token_tags_map):
        '''
        Initialising train and test data obtained after 5 fold cross validation
        The train/test data contains tagged sentences picked up from brown corpus.
        The token_tags_map contains words and their corresponding tag mappings which are possible
        The test_sent contains only the sentences of the test data.
        The test_actual_tags contains actual tags for the above setences.
        The test_predicted_tags contains predicted tags for the test data.
        The tag_metrics contains accuracy scores for all the tag values
        The counter_dict contains values for TN,TP,FN,FP tag wise
        '''
        self.train = train_data
        self.test  = test_data
        self.emission_matrix = {}
        self.transition_matrix = {}
        self.token_tags_map = token_tags_map
        self.test_sent = []
        self.test_actual_tags = []
        self.test_predicted_tags = []
        self.tag_metrics = defaultdict(lambda: defaultdict(lambda:0))
        self.counter_dict = defaultdict(lambda: defaultdict(lambda:0))
        
    def calc_emmision_matrix(self):
        '''
        Creating emmision matrix from train data
        '''
        counter_dict =defaultdict(lambda: defaultdict(lambda:0)) # Storing total count of words used with the tag for calculating probabitities
        for data in self.train:
            for word,tag in data:
                counter_dict[word][tag] +=1
        
        for key in counter_dict.keys():
            self.emission_matrix[key] ={}
            for k in counter_dict[key].keys():
                self.emission_matrix[key][k] = counter_dict[key][k]/sum(counter_dict[key].values())
                
    def calc_transition_matrix(self):
        '''
        Creating transition matrix from train data using bigram probabilities
        '''
        counter_dict =defaultdict(lambda: defaultdict(lambda:0))
        for data in self.train:
            bigrams=list(nltk.bigrams(data)) # Generating bigrams from data (sentences)
            for a,b in bigrams:
                counter_dict[(a[1])][(b[1])] +=1
        
        for key in counter_dict.keys():
            self.transition_matrix[key] ={}
            for k in counter_dict[key].keys():
                self.transition_matrix[key][k] = counter_dict[key][k]/sum(counter_dict[key].values())
                
                    
    def viterbi_one_pass(self,sentence_split):
        '''
        Viterbi Algorithm applied to one sentence and corresponding states
        '''
        prob_state ={}
        initial_prob_added =  False
        # Storing initial probabilities for start of sentences first
        
        possible_tags = self.token_tags_map[sentence_split[1]] 
        prob_state[1] = {}
        for tag in possible_tags:
            #exists = tag in transition_matrix['^'] # Checking if tag exists in matrix
            try:
                prob_state[1][tag] = ['^',self.transition_matrix['^'][tag]*self.emission_matrix[tag][sentence_split[1]]]
            except KeyError: # If tag does not exist 
                #print(tag)
                prob_state[1][tag] = ['^',SMOOTHENING_FACTOR] # Assign default value
        
        # Iterating through the rest of the sentence
        
        for k in range(2,len(sentence_split)):
            prob_state[k] = {}
            prev_state  = list(prob_state[k-1].keys()) 
            curr_state  = self.token_tags_map[sentence_split[k]]     
            for tag in curr_state:                             
                all_states = []
                for pt in prev_state: 
                    try:
                        all_states.append(prob_state[k-1][pt][1]*self.transition_matrix[pt][tag]*self.emission_matrix[tag][sentence_split[k]])
                    except KeyError:
                        #print(tag,sentence_split[k])
                        all_states.append(prob_state[k-1][pt][1]*SMOOTHENING_FACTOR)
                max_in_current_level = all_states.index(max(all_states)) 
                prob_state[k][tag]=[prev_state[max_in_current_level],max(all_states)] #Stores the index as well as the tag seq with max probability
        
        # Back tracing to get the predicted tag sequence
        
        tags = ['<EOS>']
        for itr in reversed(range(len(sentence_split))):
            if(itr>0):
                tags.append(prob_state[itr][tags[len(tags)-1]][0])
        
        return list(reversed(tags))
                
    def viterbi(self):
        '''
        Viterbi Algorithm applied on test data
        '''
        tag_seq = []
        for data in self.test_sent:
            tag_seq = self.viterbi_one_pass(data)
            self.test_predicted_tags.append(tag_seq)
        
    def split_test(self):
        '''
        Split test data into sentences and corresponding tag lists.
        This tag list is the actual_tags list that we need to compare with predicted tags
        '''
        sents = []
        tags  = []
        for data in self.test :
            for word,tag in data:
                sents.append(word)
                tags.append(tag)
            self.test_sent.append(sents)
            self.test_actual_tags.append(tags)
            sents = []
            tags  = []
    
    def calculate(self):
        '''
        Perform all the computations required to get predictions
        '''
        self.calc_emmision_matrix()
        self.calc_transition_matrix()
        self.split_test()
        self.viterbi()
        self.calc_tag_metrics()
    
    def calc_tag_metrics(self):
        '''
        Calculate the per-POS accuracy for all the tags in the tag-set
        '''
        counter_dict = defaultdict(lambda: defaultdict(lambda:0))
        
        for i in range(len(self.test_actual_tags)):
            for j in range(len(self.test_actual_tags[i])):
                if(self.test_actual_tags[i][j] == self.test_predicted_tags[i][j]):
                    counter_dict[self.test_actual_tags[i][j]]['TP'] += 1
                else:
                    counter_dict[self.test_actual_tags[i][j]]['FN']    += 1
                    counter_dict[self.test_predicted_tags[i][j]]['FP'] += 1
        
        for tag in counter_dict.keys():
            counter_dict[tag]['TN'] = TOTAL_TAGGED_WORDS - counter_dict[tag]['TP']- counter_dict[tag]['FN'] - counter_dict[tag]['FP']
        
        for tag in counter_dict.keys():
            try:                
                self.tag_metrics[tag]['Precision'] = counter_dict[tag]['TP']/(counter_dict[tag]['TP']+counter_dict[tag]['FP'])
                self.tag_metrics[tag]['Recall'] = counter_dict[tag]['TP']/(counter_dict[tag]['TP']+counter_dict[tag]['FN'])
                self.tag_metrics[tag]['F1_score'] = 2*(self.tag_metrics[tag]['Precision']*self.tag_metrics[tag]['Recall'])/(self.tag_metrics[tag]['Precision']+self.tag_metrics[tag]['Recall'])
                self.tag_metrics[tag]['Accuracy'] = (counter_dict[tag]['TP']+ counter_dict[tag]['TN']) / TOTAL_TAGGED_WORDS
            
            except ZeroDivisionError:
                continue
                
        self.counter_dict =  counter_dict
        
    def generate_confusion_matrix(self):
        '''
        Generate confusion matrix for the particular fold
        '''
        CM = ConfusionMatrix(list(chain.from_iterable(self.test_actual_tags)) ,list(chain.from_iterable(self.test_predicted_tags)))
        print(CM)
        
    def accuracy(self):
        '''
        Calculate average accuracy score
        '''
        TP =0
        FP =0
        for tag in self.counter_dict.keys():
            TP += self.counter_dict[tag]['TP']
            FP += self.counter_dict[tag]['FP']
        
        return TP/(TP+FP)
        
        
    def print_sample(self):
        '''
        Prints a sample of n = 5 actual and predicted tagged sentences for reference
        '''
        for i in range(5):
            print("Actual :",self.test_actual_tags[i])
            print("Predicted :",self.test_predicted_tags[i])
        
    def get_tag_metrics(self):
        '''
        Prints the per POS precision,recall and F1 score of predicted tags
        '''
        
        print ("{:<10} {:<10} {:<10} {:<10} {:<10}".format('TAG', 'PRECISION', 'RECALL','F1_SCORE','ACCURACY'))
        
        for key in self.tag_metrics.keys():
            precision = str(round(self.tag_metrics[key]['Precision'], 2))
            recall    = str(round(self.tag_metrics[key]['Recall'], 2))
            F1_score  = str(round(self.tag_metrics[key]['F1_score'], 2))
            accuracy  = str(round(self.tag_metrics[key]['Accuracy'], 2))
            print ("{:<10} {:<10} {:<10} {:<10} {:<10}".format(key, precision,recall,F1_score,accuracy)) 

In [7]:
'''
Pre-processing 
'''
def add_begin_end_tags(sentences):
    
    app_sent=[]
    for sentence in sentences:
      sentence.insert(0,('^','^')) # To find initial probabilities 
      sentence.append(('<EOS>','<EOS>')) # To make sure every sentence ends with a '.' to ease processing
      app_sent.append(sentence)
    return app_sent

In [8]:
'''
Convert only non-noun words from the corpus to lower case
'''
def convert_to_lower(sentences):
    app_sent=[]
    for i in range(len(sentences)) :
        for j in range(len(sentences[i])):
            if(sentences[i][j][1] != 'NOUN'):
                sentences[i][j] = (sentences[i][j][0].lower(),sentences[i][j][1])
        app_sent.append(sentences[i])
    return app_sent

In [9]:
'''
Get all possible word tags for Viterbi Tagging
'''
def get_word_tag_mapping(sentences):
    
    word_tag_map ={}
    for sentence in sentences:
        for word,tag in sentence:
            if word not in word_tag_map:
                word_tag_map[word] = [tag]
            if tag not in word_tag_map[word]:
                word_tag_map[word].append(tag)
    return word_tag_map

In [10]:
'''
Calculate average of scores obtained using 5-fold cross validation
'''

def avg(dict_list,folds):
    avg_dict = defaultdict(lambda: defaultdict(lambda:0))
    for d in dict_list:
        for tag in d.keys():
            avg_dict[tag]['Precision'] += d[tag]['Precision']/folds
            avg_dict[tag]['Recall'] += d[tag]['Recall']/folds
            avg_dict[tag]['F1_score'] += d[tag]['F1_score']/folds
            avg_dict[tag]['Accuracy'] += d[tag]['Accuracy']/folds
            
    print ("{:<10} {:<10} {:<10} {:<10} ".format('TAG', 'PRECISION', 'RECALL','F1_SCORE'))
        
    for key in avg_dict.keys():
        precision = str(round(avg_dict[key]['Precision'], 2))
        recall    = str(round(avg_dict[key]['Recall'], 2))
        F1_score  = str(round(avg_dict[key]['F1_score'], 2))
        accuracy  = str(round(avg_dict[key]['Accuracy'], 2))
        print ("{:<10} {:<10} {:<10} {:<10} ".format(key, precision,recall,F1_score)) 
    

In [11]:
'''
Taking sentence/tag pairs from brown corpus and doing cross validation
''' 
sentences = brown.tagged_sents(tagset='universal')
processed_sentences = add_begin_end_tags(sentences)
processed_sentences = convert_to_lower(processed_sentences)
tag_map = get_word_tag_mapping(processed_sentences)

In [12]:
'''
Finding accuracies via 5 fold cross validation
'''
sent = np.array(processed_sentences,dtype=object)

kfold = KFold(n_splits=FOLDS,shuffle=True)
tag_metric_list = []
predicted_tags_list = []
actual_tags_list = []
overall_accuracy = 0

for train, test in kfold.split(sent):
    train_data = list(sent[train])
    test_data = list(sent[test])                 
    hmm = HMM_POS(train_data,test_data,token_tags_map=tag_map)
    hmm.calculate()
    tag_metric_list.append(hmm.tag_metrics)
    predicted_tags_list.extend(list(chain.from_iterable(hmm.test_predicted_tags)))
    actual_tags_list.extend(list(chain.from_iterable(hmm.test_actual_tags)))
    overall_accuracy += hmm.accuracy()/FOLDS
    
# Reporting precision,recall,F1 score  

avg(tag_metric_list,FOLDS)

# Printing final confusion matrix
'''
NOTE : This is the confusion matrix plotted over total of all predictions obtained from the FOLDS=5 folds
Divide by FOLDS in case average is required
'''

print(ConfusionMatrix(predicted_tags_list,actual_tags_list))

# Printing overall accuracy 

print("OVERALL ACCURACY :",round(overall_accuracy,4)*100,"%")

TAG        PRECISION  RECALL     F1_SCORE   
^          1.0        1.0        1.0        
ADV        0.9        0.82       0.86       
.          1.0        1.0        1.0        
PRON       1.0        0.93       0.97       
VERB       0.94       0.95       0.94       
ADP        0.93       0.9        0.91       
DET        0.99       0.98       0.99       
NOUN       0.95       0.95       0.95       
ADJ        0.92       0.91       0.91       
<EOS>      1.0        1.0        1.0        
CONJ       0.99       0.99       0.99       
NUM        0.97       0.99       0.98       
PRT        0.67       0.93       0.78       
X          0.44       0.83       0.57       
      |             <                                                                                     |
      |             E                           C             N             P             V               |
      |             O      A      A      A      O      D      O      N      R      P      E               |
 