# Part of Speech Tagging

### By: Akshay Punwatkar

## Generating Commponents of Part-of-Speech hidden markov model

In [4]:
# !pip install nltk

In [1]:
import nltk
import numpy as np
from collections import defaultdict 
from pprint import pprint
import tqdm.notebook as tq
# nltk.download('brown')
# nltk.download('universal_tagset')

In [2]:
corpus = nltk.corpus.brown.tagged_sents(tagset='universal')[:10000]

In [170]:
class hidden_markov():
    
    def __init__(self):
        self.observation_matrix = []
        self.transition_matrix = []
        self.init_state = []
        self.observation_dict = defaultdict(lambda : defaultdict(int)) 
        self.transition_dict = defaultdict(lambda : defaultdict(int))
        self.init_state_dict = defaultdict(int)
    
    def normalize(self, dictionary, level=1, smooth_k= 0):
        """
        Function to convert frequency into probabilites with an option
        of Laplace smoothing and log transformation
        Input:
        dictionary : dictionary containing key value mapping, where the values for each 
                     key will be normalised. Can be zero or one level nested dict
        level : 1, level at with the values to be normalize exists in the dictionary;
                Will be "1" for unnested  and "2" for one level nested dict
        smooth_k : 0, laplace smoothing variable        
        
        Output:
        norm_dict : dictionary with normalized values
        """
        
        if level == 1:
            tot = sum(dictionary.values())
            norm_dict = {key:(value+smooth_k)/(tot+value*smooth_k) for key,value in dictionary.items()}
            return norm_dict
        
        elif level == 2:
            norm_dict = defaultdict(lambda : defaultdict(int))
            for first_key,second_key in dictionary.items():
                tot = sum(second_key.values()) 
                norm_dict[first_key] = {key:(value+smooth_k)/(tot+value*smooth_k) for key,value in second_key.items()}
                
            return norm_dict    
                
            
    def generate_matrix(self,x,y,transition=True, show_status=True):
        """
        Function to generate transition and observation matrix for the Hidden Markov Model
        Input: 
        x: No of rows in the matrix
        y: No of columns in the matrix
        transition : True, False if Observation matrix is generated
        show_status : True, Flag to show status of process
        
        Output:
        Generated matrix
        """
        
        local_dict = self.transition_dict if transition else self.observation_dict

        #generating matrix with zero values 
        gen_matrix = np.zeros([x,y]) 
        
        if transition and show_status:
            print("Generating Transition matrix")
        elif show_status:
            print("Generating Observation/Emission matrix")
        
        for first_key, second_key in tq.tqdm(local_dict.items(), disable= not show_status):
            #x-coordinate corresponding first component
            idx_x = self.distinct_pos.index(first_key) if transition else self.distinct_tokens.index(first_key)

            for key, value in second_key.items():
                #y-coordinate corresponding the the following component of the first component
                idx_y = self.distinct_pos.index(key)
                gen_matrix[idx_x][idx_y] = format(value, '.5f')
            
        return gen_matrix    
            
            
    def generate_components(self, corpus,  k=0, show_status=True):
        """
        Function to generate components of Hidden Markow model
        Input:
        corpus : Corpus (list) of sentences containing (word,part-of-speech) mapping
        k : 0, Laplace smoothing coefficient
        show_status : True, Flag to show status of process
        
        Output:
        None
        """
        self.distinct_tokens = set()
        self.distinct_pos = set()
        self.corpus = corpus
        
        for sentence in self.corpus:   
            
            #initial state distribution
            self.init_state_dict[sentence[0][1]] += 1
            
            for i in range(len(sentence)):
                
                curr_word = sentence[i][0].lower()
                curr_pos = sentence[i][1]
                
                #observation dictionary for word : pos : frequency of occurance
                self.observation_dict[curr_word][curr_pos] +=1
                
                if i < len(sentence)-1:
                    next_pos = sentence[i+1][1]
                    #transition dictionary for pos : next-post : frequency of occurance
                    self.transition_dict[curr_pos][next_pos] +=1
                
                self.distinct_tokens.add(curr_word)
        

        self.distinct_pos = sorted(self.transition_dict.keys())
        m = len(self.distinct_pos)
        self.distinct_tokens = sorted(self.distinct_tokens)
        n = len(self.distinct_tokens)
        
        self.transition_dict = self.normalize(self.transition_dict, level=2, smooth_k= k)
        self.transition_matrix = self.generate_matrix(m,m,show_status=show_status)
    
        self.observation_dict = self.normalize(self.observation_dict, level=2, smooth_k = k)
        self.observation_matrix = self.generate_matrix(n,m,transition=False,show_status=show_status)
        
        self.init_state_dict = self.normalize(self.init_state_dict, smooth_k=k)
        self.init_state = [self.init_state_dict[k] for k in self.distinct_pos]
        
        if show_status:
            return "Success !! Components generations completed"
        else:
            return None
        

In [175]:
hmm = hidden_markov()
hmm.generate_components(corpus,k=1,show_status=True)

Generating Transition matrix


HBox(children=(FloatProgress(value=0.0, max=12.0), HTML(value='')))


Generating Observation/Emission matrix


HBox(children=(FloatProgress(value=0.0, max=21248.0), HTML(value='')))




'Success !! Components generations completed'

In [176]:
hmm.init_state

[0.06112676056338028,
 0.04058721934369603,
 0.10778977424823771,
 0.0720185614849188,
 0.05213764337851929,
 0.19382407482060793,
 0.16431257835353114,
 0.017104099085815394,
 0.10194000359259925,
 0.029882604055496264,
 0.039481268011527376,
 0.0005997001499250374]

In [177]:
hmm.transition_matrix

array([[1.2413e-01, 5.0240e-02, 1.0028e-01, 5.9370e-02, 8.5850e-02,
        1.0731e-01, 1.5187e-01, 2.0950e-02, 6.0100e-02, 1.9810e-02,
        1.1570e-01, 2.0100e-03],
       [8.2140e-02, 5.5990e-02, 7.3900e-02, 7.6200e-03, 3.2270e-02,
        6.0600e-03, 4.0297e-01, 1.2700e-02, 2.4300e-03, 1.8250e-02,
        1.5310e-02, 6.7000e-04],
       [1.1290e-02, 7.8020e-02, 1.8610e-02, 1.3480e-02, 1.3200e-03,
        3.0710e-01, 2.2320e-01, 3.7600e-02, 4.7360e-02, 1.2080e-02,
        3.7120e-02, 4.0000e-04],
       [1.2284e-01, 1.3084e-01, 1.2161e-01, 8.7980e-02, 1.3590e-02,
        7.4740e-02, 3.8720e-02, 1.5340e-02, 3.5090e-02, 2.8630e-02,
        2.0591e-01, 3.2000e-04],
       [2.2600e-02, 1.0533e-01, 6.3540e-02, 8.1100e-02, 6.0000e-04,
        1.3594e-01, 2.2481e-01, 1.9270e-02, 5.0100e-02, 2.3180e-02,
        1.3830e-01, 4.5000e-04],
       [1.2770e-02, 2.0027e-01, 8.9600e-03, 1.8550e-02, 5.5000e-04,
        6.4300e-03, 3.8107e-01, 1.3230e-02, 8.3500e-03, 1.6000e-03,
        5.7940e-02,

In [178]:
hmm.observation_matrix

array([[0.50417, 0.     , 0.     , ..., 0.     , 0.     , 0.     ],
       [0.     , 0.     , 0.     , ..., 0.     , 0.     , 0.     ],
       [0.     , 0.     , 0.     , ..., 0.     , 0.     , 0.     ],
       ...,
       [0.     , 0.     , 0.     , ..., 0.     , 0.     , 0.     ],
       [0.     , 0.     , 0.     , ..., 0.     , 0.     , 0.     ],
       [0.     , 0.     , 0.     , ..., 0.     , 0.     , 0.     ]])

In [155]:
hmm.distinct_pos

['.',
 'ADJ',
 'ADP',
 'ADV',
 'CONJ',
 'DET',
 'NOUN',
 'NUM',
 'PRON',
 'PRT',
 'VERB',
 'X']

In [156]:
(hmm.observation_matrix[hmm.distinct_tokens.index('any')])

array([0.     , 0.     , 0.     , 0.01288, 0.     , 0.5    , 0.     ,
       0.     , 0.     , 0.     , 0.     , 0.     ])

In [157]:
hmm.observation_matrix[hmm.distinct_tokens.index('of')]

array([0.     , 0.     , 0.50007, 0.     , 0.     , 0.     , 0.     ,
       0.     , 0.     , 0.     , 0.     , 0.     ])

In [158]:
hmm.observation_matrix[hmm.distinct_tokens.index('0')]

array([0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0.])

In [159]:
hmm.observation_matrix[hmm.distinct_tokens.index('one')]

array([0.     , 0.     , 0.     , 0.     , 0.     , 0.     , 0.11076,
       0.46831, 0.     , 0.     , 0.     , 0.     ])

In [160]:
hmm.observation_matrix[hmm.distinct_tokens.index('those')]

array([0.     , 0.     , 0.     , 0.     , 0.     , 0.50286, 0.     ,
       0.     , 0.     , 0.     , 0.     , 0.     ])

In [161]:
hmm.observation_matrix[hmm.distinct_tokens.index('coming')]

array([0.     , 0.     , 0.     , 0.     , 0.     , 0.     , 0.11429,
       0.     , 0.     , 0.     , 0.4918 , 0.     ])

In [179]:
hmm.transition_matrix[hmm.distinct_pos.index('DET')][hmm.distinct_pos.index('VERB')] * \
hmm.observation_matrix[hmm.distinct_tokens.index('coming')][hmm.distinct_pos.index('VERB')]

0.028494892

In [180]:
hmm.transition_matrix[hmm.distinct_pos.index('DET')][hmm.distinct_pos.index('NOUN')] * \
hmm.observation_matrix[hmm.distinct_tokens.index('coming')][hmm.distinct_pos.index('NOUN')]

0.0435524903

****
## Implementation of Viterbi algorithm

#### $\hspace{250px} v_{t}(j)=\max _{i=1}^{N} v_{t-1}(i) a_{i j} b_{j}\left(o_{t}\right)$

In [181]:
class viterbie():
    def __init__(self):
        None
        
    def handle_missing_token(self,token):
        return None
        
    def viterbie(self, obs,pi,A,B,state_list,token_list):
        self.states = []
        """
        Function to implement viterbie algorithm for part-of-speech tagging
        
        obs - the observations [list of ints]
        pi  - the initial state probabilities [list of floats]
        A   - the state transition probability matrix [2D numpy array]
        B   - the observation probability matrix [2D numpy array]
        state_list - ordered list of names of the states
        
        """
        
        n = len(obs)
        num_states = len(pi)
        self.state_prob_matrix = np.zeros([n,num_states])
        self.text = []
        self.real_state = []
        
        #iterating over EACH word in the sentence 
        for i in range(n):
            #for the first word, transition probability will be used as prior
            #else the probability of last word against each state (POS) will be used as prior
            if i == 0:
                prior_prob = pi
            else:
                prior_prob = self.state_prob_matrix[i-1]
                
            token = obs[i][0].lower()    
            self.real_state.append(obs[i][1])
            self.text.append(obs[i][0])
            
            # Observation probability of the given word being in different states (POS). 
            # If the word doesn't exists in the observation matrix,
            # equal probabilites will be assigned against each state for that word.
            obs_probab = B[token_list.index(token)] if token in token_list else np.full(num_states,fill_value=1/num_states) 
            
            #iterating over all possible states (POS) for a word
            for j in range(num_states):
                # for the first word, total probability is the product of the observation probability of the 
                # word coming a particular state (POS) and initial_state probability of that particular state (POS)
                if i == 0:
                    self.state_prob_matrix[i][j] = prior_prob[j]*obs_probab[j]
                else:
                    temp_states = []
                    # interating over all possible previous states for a given state (denoted by j)
                    for k in range(num_states):
                        # for rest of the words, total probability is the product of the observation probability of the 
                        # word coming a particular state (POS), transition state probability of coming a particular previous (k) 
                        # state to the current state (j), and prior state probability of previous word in that particular state (k)
                        # maximum probability of all the states (k's) is assigned as the current state probability for the word
                        state_change_probab = A[k][j]
                        temp_states.append(state_change_probab*obs_probab[j]*prior_prob[k])
                    self.state_prob_matrix[i][j] = max(temp_states)  
                  
        self.state_index = np.argmax(self.state_prob_matrix,axis=1)
        self.states = [state_list[i] for i in self.state_index]
    
        return self.states
    
    def mismatch(self):
        """
        Function to calculate number of mismatch in part-of-speech
        between the actual and predicted using viterbie
        """
        mismatch = 0
        total = len(self.real_state)
        position = []
        for i in range(total):
            if self.real_state[i] != self.states[i]:
                mismatch +=1
                position.append(i)
                
        return "\nTotal mismatch : %s/%s,"%(mismatch,total) + " at positions %s "%position + "Accuracy : %s percent"%round((total-mismatch)*100/total,2)

#### Testing of Corpus data 

In [182]:
gen=vit.viterbie(corpus[0],hmm.init_state, hmm.transition_matrix, hmm.observation_matrix, hmm.distinct_pos, hmm.distinct_tokens)
print("\nText :",end=" ")
print(*vit.text, sep= " ")
print("Generated :", end=" ")
print(*gen, sep=', ')
print("Original  :", end= " ") 
print(*vit.real_state, sep= ", ")
print(vit.mismatch())


Text : The Fulton County Grand Jury said Friday an investigation of Atlanta's recent primary election produced `` no evidence '' that any irregularities took place .
Generated : DET, NOUN, NOUN, ADJ, NOUN, VERB, NOUN, DET, NOUN, ADP, NOUN, ADJ, NOUN, NOUN, VERB, ., DET, NOUN, ., ADP, DET, NOUN, VERB, NOUN, .
Original  : DET, NOUN, NOUN, ADJ, NOUN, VERB, NOUN, DET, NOUN, ADP, NOUN, ADJ, NOUN, NOUN, VERB, ., DET, NOUN, ., ADP, DET, NOUN, VERB, NOUN, .

Total mismatch : 0/25, at positions [] Accuracy : 100.0 percent


***
## Infrence of Sequence of States (Testing on NEW data)

In [165]:
test_data = nltk.corpus.brown.tagged_sents(tagset='universal')[10150:10160]

In [183]:
hmm = hidden_markov()
hmm.generate_components(corpus,k=1, show_status=True)

Generating Transition matrix


HBox(children=(FloatProgress(value=0.0, max=12.0), HTML(value='')))


Generating Observation/Emission matrix


HBox(children=(FloatProgress(value=0.0, max=21248.0), HTML(value='')))




'Success !! Components generations completed'

In [184]:
for i in range(len(test_data)):
    vit = viterbie()
    gen=vit.viterbie(test_data[i],hmm.init_state, hmm.transition_matrix, hmm.observation_matrix, hmm.distinct_pos, hmm.distinct_tokens)
    print("\nText :",end=" ")
    print(*vit.text, sep= " ")
    print("Generated :", end=" ")
    print(*gen, sep=', ')
    print("Original  :", end= " ") 
    print(*vit.real_state, sep= ", ")
    print(vit.mismatch())
    print("-------------------------------------")


Text : Those coming from other denominations will welcome the opportunity to become informed .
Generated : DET, NOUN, ADP, ADJ, NOUN, VERB, VERB, DET, NOUN, ADP, VERB, VERB, .
Original  : DET, VERB, ADP, ADJ, NOUN, VERB, VERB, DET, NOUN, PRT, VERB, VERB, .

Total mismatch : 2/13, at positions [1, 9] Accuracy : 84.62 percent
-------------------------------------

Text : The preparatory class is an introductory face-to-face group in which new members become acquainted with one another .
Generated : DET, NOUN, NOUN, VERB, DET, NOUN, NOUN, NOUN, ADP, DET, ADJ, NOUN, VERB, VERB, ADP, NOUN, DET, .
Original  : DET, ADJ, NOUN, VERB, DET, ADJ, ADJ, NOUN, ADP, DET, ADJ, NOUN, VERB, VERB, ADP, NUM, DET, .

Total mismatch : 4/18, at positions [1, 5, 6, 15] Accuracy : 77.78 percent
-------------------------------------

Text : It provides a natural transition into the life of the local church and its organizations .
Generated : PRON, VERB, DET, ADJ, NOUN, ADP, DET, NOUN, ADP, DET, ADJ, NOUN, CONJ,

****