# Parts of speech Tagging - Hidden Markov Models

Done by : Bharathi A

## 1. Importing libraries

In [1]:
import math
import string 
import numpy as np
import pandas as pd
from collections import defaultdict

## 2. Importing data

The data I have used is from Wall Street Journal. 

The training data is "WSJ_train.pos" and each line of the file contains a particular word and its tag seperated by a tab.

The test data is "WSJ_test.pos" and it is formatted in the same way as the train data

In [2]:
f1_train = open('WSJ_train.pos','r')

f2_test  = open('WSJ_test.pos','r')

Let us see how the data looks like 

In [3]:
lines_train = f1_train.readlines()
'''
print("Sample Training data : ")
print(lines_train[0:10])
'''

lines_test = f2_test.readlines()
'''
print("Sample Test data : ")
print(lines_test[0:10])
'''

Sample Training data : 
['In\tIN\n', 'an\tDT\n', 'Oct.\tNNP\n', '19\tCD\n', 'review\tNN\n', 'of\tIN\n', '``\t``\n', 'The\tDT\n', 'Misanthrope\tNN\n', "''\t''\n"]
Sample Test data : 
['The\tDT\n', 'economy\tNN\n', "'s\tPOS\n", 'temperature\tNN\n', 'will\tMD\n', 'be\tVB\n', 'taken\tVBN\n', 'from\tIN\n', 'several\tJJ\n', 'vantage\tNN\n']


The above format of data doesn't look easily readable . But , we can understand that each word is separated from its corresponding part of speech tag by a tabspace

In [4]:
#A single entry in the dataset

#print(lines_train[3])

19	CD



## 3. Defining Utility Functions

In [5]:
punct = set(string.punctuation)

#Rules for tagging unknown terms
noun_suffix = ["action", "age", "ance", "cy", "dom", "ee", "ence", "er", "hood", "ion", "ism", "ist", "ity", "ling", "ment", "ness", "or", "ry", "scape", "ship", "ty"]
verb_suffix = ["ate", "ify", "ise", "ize"]
adj_suffix = ["able", "ese", "ful", "i", "ian", "ible", "ic", "ish", "ive", "less", "ly", "ous"]
adv_suffix = ["ward", "wards", "wise"]

The following cells define some utility functions that will be used throughout this project

The function 'assign_unk' assigns a 'unk' tag to unknown words and with some prior knowledge , it also assigns some possible additional information about the word such as the word being a verb , noun etc

In [6]:
def assign_unk(word):
    if any(char.isdigit() for char in word):
        return '--unk_digit--'
    elif any(char in punct for char in word):
        return '--unk_punct--'
    elif any(word.endswith(suffix) for suffix in noun_suffix):
        return '--unk_noun--'
    elif any(word.endswith(suffix) for suffix in verb_suffix):
        return '--unk_verb--'
    elif any(word.endswith(suffix) for suffix in adj_suffix):
        return '--unk_adj--'
    elif any(word.endswith(suffix) for suffix in adv_suffix):
        return '--unk_adv--'
    
    return '--unk--'
    

The function 'get_word_tag_tuple' parses a given line in the training corpus and returns a tuple of (word,tag). If the line was a new line , the word would get a '--n--' tag . 

If it was an unknown word , it would tag the word using the 'assign_unk' function .

In [7]:
def get_word_tag_tuple(line,vocab):
    if not line.split():
        #It is a new line
        word = '--n--'
        tag  = '--s--'
        return word,tag
    else:
        word,tag = line.split()
        if word not in vocab:
            #Tag the word
            word = assign_unk(word)
        return word,tag
    return None

The next function 'preprocess' is to preprocess the test set vocabulary(without tags) to mark unknown words(not found in train vocab) and mark end of sentence tokens . 

This function returns preprocessed_test that contains preprocessed words which will be fed to the testing function later

In [8]:
def preprocess(file,vocab):
    
    original_test = []
    preprocessed_test = []
    
    fp = open(file,'r')
    
    for i , word in enumerate(fp):
        if not word.strip():
            #Assign new line
            original_test.append(word.split())
            word = '--n--'
            preprocessed_test.append(word)
            continue
        elif word.strip() not in vocab:
            original_test.append(word.strip())
            #Tag the unknown word
            word = assign_unk(word)
            preprocessed_test.append(word)
            continue
        else:
            original_test.append(word.strip())
            preprocessed_test.append(word.strip())
    fp.close()
            
    assert(len(original_test) == len(open(file, "r").readlines()))
    assert(len(preprocessed_test) == len(open(file, "r").readlines()))
            
    return original_test , preprocessed_test

## 4. Build Training vocabulary from the data

In [9]:
#Extract only the word from each line that contains (word,tag)
words = [line.split('\t')[0] for line in lines_train]

#print(words[0:20])

['In', 'an', 'Oct.', '19', 'review', 'of', '``', 'The', 'Misanthrope', "''", 'at', 'Chicago', "'s", 'Goodman', 'Theatre', '(', '``', 'Revitalized', 'Classics', 'Take']


#### Store in the vocabulary only words that occur twice or more than twice

In [10]:
#Build a dict to store the freq of words

word_freq = defaultdict(int)

for word in words:
    word_freq[word]+=1
    

Store the list of words that occur more than once 

In [11]:
voc = [key for key,value in word_freq.items() if (key!='\n' and value > 1)]

#Sort the words
voc = sorted(voc)

#print(voc[10:30])


["'80s", "'86", "'90s", "'N", "'S", "'d", "'em", "'ll", "'m", "'n'", "'re", "'s", "'til", "'ve", '(', ')', ',', '-', '--', '.']


Before writing these words into the file , we also need to append to the vocabulary certain unknown word tags : 

Eg : --unk_verb-- , --unk_upper etc that will be used to denote Unknown words in the test set / practical scenario

In [12]:
unknown_tokens = ['--n--','--unk--', '--unk_adj--', '--unk_adv--', '--unk_digit--', '--unk_noun--', '--unk_punct--', '--unk_upper--', '--unk_verb--']

for token in unknown_tokens : 
    voc.append(token)
    

In [13]:
#print(voc[-10:])

['}', '--n--', '--unk--', '--unk_adj--', '--unk_adv--', '--unk_digit--', '--unk_noun--', '--unk_punct--', '--unk_upper--', '--unk_verb--']


### Store the list of words to a file(They form the vocabulary)

In [14]:
file_vocab = open('vocab.txt','w')

#Store each word in a newline of the file

file_vocab.writelines("%s\n" % item for item in voc)

file_vocab.close()
    

In [15]:
assert(len(voc) == len(open('vocab.txt','r').readlines()))

23776
23776


In [16]:
#Read words from vocab.txt

with open('vocab.txt','r') as f:
    voc_list = f.read().split('\n')
    
#print("Vocabulary : ")
#print(voc_list[0:30])

Vocabulary : 
['!', '#', '$', '%', '&', "'", "''", "'40s", "'60s", "'70s", "'80s", "'86", "'90s", "'N", "'S", "'d", "'em", "'ll", "'m", "'n'", "'re", "'s", "'til", "'ve", '(', ')', ',', '-', '--', '.']


Now that our Training set vocabulary is created , we have to create the vocabulary for Test set

In [17]:
test_words = [line.split('\t')[0] for line in lines_test]


#print(test_words[0:20])
    

['The', 'economy', "'s", 'temperature', 'will', 'be', 'taken', 'from', 'several', 'vantage', 'points', 'this', 'week', ',', 'with', 'readings', 'on', 'trade', ',', 'output']


#### Store Test vocabulary in a file

It is appropriate to store the test set vocabulary, without the tags ,  in a file so that it is easily accessible for further processing and testing

In [18]:
#test_words = sorted(test_words)

test_file = open('test_vocab.txt','w')

#print(len(test_words))


test_file.writelines("%s\n" % item.strip() for item in test_words)
test_file.close()


34199


## 5. Preprocessing Train and Test data for Modeling

### Build a Dictionary from Training Vocabulary

In [19]:
#Build a dictionary vocab that stores the index of each word ( This will be necessary in the Viterbi algorithm)
vocab = {}

for i , word in enumerate(sorted(voc_list)):
    vocab[word] = i
    
#for word in voc_list[0:15]:
    #print(word,"   :  ",vocab[word])


!    :   1
#    :   2
$    :   3
%    :   4
&    :   5
'    :   6
''    :   7
'40s    :   8
'60s    :   9
'70s    :   10
'80s    :   11
'86    :   12
'90s    :   13
'N    :   14
'S    :   15


### Build Test data 'y' and preprocess it 

In [20]:
# 'y' is simply each line from lines_test that contains (word,tag) tuple

y = lines_test

'''
print("Test data sample ")
print(y[10:20])
'''

Test data sample 
['points\tNNS\n', 'this\tDT\n', 'week\tNN\n', ',\t,\n', 'with\tIN\n', 'readings\tNNS\n', 'on\tIN\n', 'trade\tNN\n', ',\t,\n', 'output\tNN\n']


The preprocess function is called which returns the preprocessed words for test data

In [21]:
_,preprocessed_test = preprocess('test_vocab.txt',vocab)

'''
print("Sample of Preprocessed Test words : ")
print(preprocessed_test[0:20])
'''

Sample of Preprocessed Test words : 
['The', 'economy', "'s", 'temperature', 'will', 'be', 'taken', 'from', 'several', '--unk--', 'points', 'this', 'week', ',', 'with', 'readings', 'on', 'trade', ',', 'output']


## 6. Training

To train the Part of speech tagger using Hidden Markov Models , We need three dictionaries :

    1. Transition counts - computes the number of times each tag happened next to another tag
    2. Emission counts   - computes the probability of a word given its tag.
    3. Tag counts        - Computes the number of times each tag appeared in the training set
    
    These dictionaries will be helpful in computing matrices that are used in actual computation in the Hidden Markov Model

In [23]:
def create_dictionaries(training_data,vocab):
    '''
    Computes and returns the Transition , Emission and Tag counts dictionaries from the raw training data and the vocab dict
    
    '''
    
    #Initialize empty dictionaries 
    transition_counts = defaultdict(int)
    emission_counts   = defaultdict(int)
    tag_counts        = defaultdict(int)
    
    prev_tag = '--s--'
    
    for line in training_data:
        
        word,tag = get_word_tag_tuple(line,vocab)
        
        transition_counts[(prev_tag,tag)] += 1
        
        emission_counts[(tag,word)] += 1
        
        tag_counts[tag] += 1
        
        prev_tag = tag
    
    return transition_counts , emission_counts , tag_counts

In [24]:
transition_counts , emission_counts , tag_counts = create_dictionaries(lines_train,vocab)

In [25]:
#Check the values 

'''
print("Sample transition counts :")

for item in list(transition_counts.items())[0:4]:
    print(item)
    
print("\n Sample Emission counts :")

for item in list(emission_counts.items())[0:4]:
    print(item)
    
print("\n Sample Tag counts :")

for item in list(tag_counts.items())[0:4]:
    print(item)
'''

Sample transition counts :
(('--s--', 'IN'), 5050)
(('IN', 'DT'), 32364)
(('DT', 'NNP'), 9044)
(('NNP', 'CD'), 1752)

 Sample Emission counts :
(('IN', 'In'), 1735)
(('DT', 'an'), 3142)
(('NNP', 'Oct.'), 317)
(('CD', '19'), 100)

 Sample Tag counts :
('IN', 98554)
('DT', 81842)
('NNP', 91466)
('CD', 36568)


### Generate Probabilities Matrices for Hidden Markov Model computation

Transition Probability Matrix : Each cell returns the probability to go from one part of speech to another.Here , the POSs act as states of a Markov model

In [26]:
def create_transition_prob_matrix(transition_counts,tag_counts,smoothing_factor):
    
    tags = sorted(tag_counts.keys())
    
    n_tags = len(tags)
    
    #Initialize the matrix TP
    
    TP = np.zeros((n_tags,n_tags))
    
    transition_states = set(transition_counts.keys())
    
    for i in range(n_tags):
        
        for j in range(n_tags):
            
            count = 0
            
            key = (tags[i],tags[j])
            
            if key in transition_states:
                
                count = transition_counts[key]
            
            count_tag_prev = tag_counts[tags[i]]
            
            TP[i,j] = (count + smoothing_factor)/(count_tag_prev + n_tags*smoothing_factor)
    
    return TP
        
    

In [27]:
TP = create_transition_prob_matrix(transition_counts,tag_counts,0.001)

Emission Probabilities Matrix : The emission probabilities matrix of dimension (n_tags, len(vocab)) computes the emission probability of a (tag,word) in each cell 

In [28]:
def create_emission_prob_matrix(emission_counts,tag_counts,vocab,smoothing_factor):
    
    tags = sorted(tag_counts.keys())
    
    n_tags = len(tag_counts)
    
    n_words = len(vocab)
    
    #Initialize empty matrix
    E = np.zeros((n_tags,n_words))
    
    emission_states = set(emission_counts.keys())
    
    for i in range(n_tags):
        
        for j in range(n_words):
            
            count = 0 
            
            key = (tags[i],vocab[j])
            
            if key in emission_states:
                
                count = emission_counts[key]
                
            count_tag = tag_counts[tags[i]]
        
            E[i,j] = (count + smoothing_factor)/(count_tag + n_words*smoothing_factor)
    
    return E
    
    

In [32]:
E = create_emission_prob_matrix(emission_counts,tag_counts,list(vocab),0.001)

In [33]:
states = sorted(tag_counts.keys())

#print(states)

['#', '$', "''", '(', ')', ',', '--s--', '.', ':', 'CC', 'CD', 'DT', 'EX', 'FW', 'IN', 'JJ', 'JJR', 'JJS', 'LS', 'MD', 'NN', 'NNP', 'NNPS', 'NNS', 'PDT', 'POS', 'PRP', 'PRP$', 'RB', 'RBR', 'RBS', 'RP', 'SYM', 'TO', 'UH', 'VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ', 'WDT', 'WP', 'WP$', 'WRB', '``']


Store a mapping from Tag abbreviations to their meaningful full forms

In [62]:
states_mapping ={'CC' : 'conjunction, coordinating',
'CD':'cardinal number',
'DT' : 'determiner',
'EX' : 'existential there',
'FW' : 'foreign word',
'IN': 'conjunction, subordinating or preposition',
'JJ':'adjective',
'JJR':'adjective, comparative',
'JJS':'adjective, superlative',
'LS':'list item marker' ,
'MD':'verb, modal auxillary',
'NN':'noun, singular or mass',
'NNS':'noun, plural',
'NNP':'noun, proper singular',
'NNPS':'noun, proper plural',
'PDT':'predeterminer',
'POS':'possessive ending',
'PRP':'pronoun, personal',
'PRP$':'pronoun, possessive',
'RB':'adverb',
'RBR':'adverb, comparative',
'RBS':'adverb, superlative',
'RP':'adverb, particle',
'SYM':'symbol',
'TO':'infinitival to',
'UH':'interjection',
'VB':'verb, base form',
'VBZ':'verb, 3rd person singular present',
'VBP':'verb, non-3rd person singular present',
'VBD':'verb, past tense',
'VBN':'verb, past participle',
'VBG':'verb, gerund or present participle',
'WDT':'wh-determiner',
'WP':'wh-pronoun, personal',
'WP$':'wh-pronoun, possessive',
'WRB':'wh-adverb',
'.':'punctuation mark, sentence closer',
',':'punctuation mark, comma',
':':'punctuation mark, colon',
'(':'contextual separator, left paren',
')':'contextual separator, right paren',
'#' : 'unknown word/sentence',
'$' : 'unknown word/sentence',
"''" : "unknown word/sentence",
'--s--':'start of sentence ',
'``' : 'unknown word/sentence'}

## 7. Viterbi Algorithm for Testing 

The Viterbi algorithm makes use of Dynamic Programming to find the best sequence of POS tags for a given sequence of words/sentence . It is primarily makes use of two matrices : 

      1.'best_probs' : Stores the best probabiliy of going from a tag to a word (Hidden state to observable transition)
  
      2.'best_paths' : Stores the best previous states to reach a particular tag
  
The Algorithm consists of three main steps :
 1. Initialization : For a given corpus/sequence of words , this step initializes the first column of 'best_probs' matrix and initializes the 'best_paths' matrix with zeros . 
 2. Forward propagation : Populates the two matrices by computing the best probabilities and paths 
 3. Backward propagation : Traverses backwards through the 'best_paths' matrix to find the best sequence of tags for a given 
    sentence

In [34]:
def initialize(states,tag_counts,TP,E,test_corpus,vocab):
    
    n_tags = len(tag_counts)
    
    #Initialize the two matrices
    best_probs = np.zeros((n_tags,len(test_corpus)))
    
    best_paths = np.zeros((n_tags,len(test_corpus)))
    
    #Index of a tag denoting the start of the sentence (Before the first word)
    start_index = states.index('--s--')
    
    #Populate first column of 'best_probs' matrix . Go through each POS tag
    for i in range(n_tags):
        
        if TP[start_index,i] == 0:
            best_probs[i,0] = float('-inf')
            
        else:
            best_probs[i,0] = math.log(TP[start_index,i]) + math.log(E[i,vocab[test_corpus[0]]])
            
    return best_probs,best_paths
        
    

In [35]:
#best_probs , best_paths = initialize(states,tag_counts,TP,E,preprocessed_test,vocab)

The next step is to populate the remaining cells in the two matrices by forward propagation

In [36]:
def forward(TP,E,test_corpus,best_probs,best_paths,vocab):
    
    n_tags = best_probs.shape[0]
    
    #Loop through each word in the test sentence starting from word 1 , since word 0 was already initialized
    
    for i in range(1,len(test_corpus)):
        
        for j in range(n_tags):
            
            temp_best_probs = float('-inf')
            
            temp_best_paths = None
            
            for k in range(n_tags):
                
    
                temp_prob = best_probs[k,i-1] + math.log(TP[k,j]) + math.log(E[j,vocab[test_corpus[i]]])
                
                if temp_prob > temp_best_probs:
                    
                    temp_best_probs = temp_prob
                    
                    temp_best_paths = k 
            
            best_probs[j,i] = temp_best_probs
            best_paths[j,i] = temp_best_paths
    
    return best_probs,best_paths
    

In [37]:
#best_probs,best_paths = forward(TP,E,preprocessed_test,best_probs,best_paths,vocab)


The last function 'backward' retrieves the best sequence of tags through back propagating the 'best_paths' matrix.

In [38]:
def backward(best_probs,best_paths,test_corpus,states):
    
    #Store the number of words in the test sentence
    n_words = best_probs.shape[1]
    
    n_tags = best_probs.shape[0]
    
    #Array to store the indices of predicted states
    pred_states_index = [None] * n_words
    
    #Array to store the actual predicted states in string
    predictions = [None] * n_words
    
    best_prob_last_word = float('-inf')
    
    #Retreive the best probability for the last word and the index of tag for this word 
    for i in range(best_probs.shape[0]):
        
        if(best_probs[i,n_words-1]) > best_prob_last_word:
            
            best_prob_last_word = best_probs[i,n_words-1]
            
            pred_states_index[n_words - 1] = i
            
    predictions[n_words-1] = states[pred_states_index[n_words-1]]
    
    
    #Loop backwards through the best_paths matrix from the last word
    
    for i in range(n_words-1 , -1, -1):
        
        
        tag_word_i = pred_states_index[i]
        
        pred_states_index[i-1] = best_paths[int(tag_word_i),i]
        
        predictions[i-1] = states[int(pred_states_index[i-1])]
        

    
    return predictions 

In [39]:
#predictions = backward(best_probs,best_paths,preprocessed_test,states)

## 8. Compute Accuracy of the Tagger

In [40]:
def get_accuracy(predictions,y):
    
    correct = 0
    total = 0
    
    for prediction , y in zip(predictions,y):
        
        word_tag = y.split()
        
        if len(word_tag)!=2:
            continue
        
        word , tag = word_tag
        
        if tag == prediction :
            correct+=1
        
        total+=1
    
    return correct/total

In [41]:
#print(f"Accuracy of the POS Tagger using Viterbi algorithm is : {get_accuracy(predictions,y)} ")

Accuracy of the POS Tagger using Viterbi algorithm is : 0.9508720664779472 


The Viteri Algorithm acheived about 95% accuracy in the test set  for Parts of Speech Tagging . This can now be applied to test our own example . 

In order to deal with a real-world example , we need to define another function that will preprocess those sentences . The function 'preprocess' defined above only dealt with the particularities of this specific test set 

In [49]:
def preprocess_test_sample(words,vocab):
    
    preprocessed_words = []
    
    for i , word in enumerate(words):
        
        if not word.strip():
            word = '--n--'
            preprocessed_words.append(word)
            continue
        elif word.strip() not in vocab:
            word = assign_unk(word)
            preprocessed_words.append(word)
            continue
        else:
            preprocessed_words.append(word)
    
    assert(len(words)==len(preprocessed_words))
    
    return preprocessed_words

In [63]:
'''
sentence = 'sffdhew.'

words = sentence.split(' ')

preprocessed_words = preprocess_test_sample(words,vocab)

print(preprocessed_words)
'''

['--unk_punct--']


In [64]:
#test_best_probs , test_best_paths = initialize(states,tag_counts,TP,E,preprocessed_words,vocab)

In [65]:
#test_best_probs , test_best_paths = forward(TP,E,preprocessed_words,test_best_probs,test_best_paths,vocab)

In [66]:
#test_predictions = backward(test_best_probs,test_best_paths,preprocessed_words,states)

In [67]:
'''
for i , pred in enumerate(test_predictions):
    print(preprocessed_words[i],'      ',states_mapping[pred])'''

--unk_punct--        unknown word/sentence


## 9. Pickle the Vocabulary , Probability and Count matrices

The last part that remains is to pickle important data structures used in this notebook so that they can be referred to in the app 

In [56]:
'''
import pickle 

with open('Transition_probabilities.pickle','wb') as tfp:
    pickle.dump(TP,tfp)
    
with open('Emission probabilities.pickle','wb') as efp:
    pickle.dump(E,efp)
    
with open('states.pickle','wb') as sfp:
    pickle.dump(states,sfp)

with open('tag_counts.pickle','wb') as tagfp:
    pickle.dump(tag_counts,tagfp)
    
with open('vocab.pickle','wb') as vfp:
    pickle.dump(vocab,vfp)
    
with open('states_mapping.pickle','wb') as smfp:
    pickle.dump(states_mapping,smfp)
'''