# Hidden Markov Model (HMM) Based Part Of Speech Tagging

### Introduction

In this notebook, we will be making a Part of Speech (POS) Tagger using an HMM via Supervised Matrix/Parameter Initialization and Viterbi Decoding.

A Hidden Markov Model (HMM) is a statistical model of a system that moves between a set of hidden states over time, emitting an observable outcome at each step.

In the context of Part-of-Speech (POS) tagging, the hidden states are the POS tags and the observable outcomes are the actual words in the sentence. The HMM models a sentence as a sequence of tags, each of which emits one word.
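
To make the roles of the hidden tags and the observed words concrete, here is a minimal sketch of how an HMM scores one tagged sentence. The two-tag model, all of its numbers, and the helper `joint_probability` are made up for illustration and are not part of this notebook's tagger.

```python
import numpy as np

# Hypothetical toy model: two tags and three words (all numbers are made up)
tags = ["NOUN", "VERB"]
words = ["dogs", "bark", "run"]

initial = np.array([0.8, 0.2])             # P(first tag)
transition = np.array([[0.3, 0.7],         # P(next tag | NOUN)
                       [0.6, 0.4]])        # P(next tag | VERB)
emission = np.array([[0.7, 0.2, 0.1],      # P(word | NOUN)
                     [0.1, 0.5, 0.4]])     # P(word | VERB)

def joint_probability(sentence, tag_sequence):
    """P(words, tags) = P(t1) P(w1|t1) * product over i of P(t_i|t_{i-1}) P(w_i|t_i)."""
    t = [tags.index(tag) for tag in tag_sequence]
    w = [words.index(word) for word in sentence]
    prob = initial[t[0]] * emission[t[0], w[0]]
    for i in range(1, len(w)):
        prob *= transition[t[i - 1], t[i]] * emission[t[i], w[i]]
    return prob

p_noun_verb = joint_probability(["dogs", "bark"], ["NOUN", "VERB"])
p_verb_noun = joint_probability(["dogs", "bark"], ["VERB", "NOUN"])
print(f"NOUN VERB: {p_noun_verb:.4f}   VERB NOUN: {p_verb_noun:.4f}")
```

Under these made-up numbers the tagging NOUN VERB scores far higher than VERB NOUN, which is the kind of comparison a tagger has to make across all possible tag sequences.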

## Terminology

__Supervised Matrix/Parameter Initialization__: sets the values of the model's parameters, specifically the emission probabilities matrix and the transition probabilities matrix, directly from annotated training data. For a supervised task such as Part-of-Speech tagging, each entry is estimated from how often the corresponding emission (a word carrying a given tag) or transition (one tag following another) is observed in the training dataset. These estimates can serve as the starting point for further training; in this notebook they are used directly for decoding.
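
As a rough illustration of this counting, the sketch below estimates emission and transition probabilities from a tiny made-up corpus. The names `emission_prob` and `transition_prob` are hypothetical, and unlike the class later in this notebook, which works on the concatenated sentences, this sketch counts transitions only within a sentence.

```python
from collections import Counter

# Hypothetical two-sentence tagged corpus (made up for illustration)
corpus = [
    [("the", "DET"), ("dog", "NOUN"), ("barks", "VERB")],
    [("the", "DET"), ("cat", "NOUN"), ("sleeps", "VERB")],
]

tag_counts = Counter()         # how often each tag occurs
emission_counts = Counter()    # how often each (tag, word) pair occurs
transition_counts = Counter()  # how often each (previous tag, tag) pair occurs
prev_counts = Counter()        # how often each tag is followed by another tag

for sentence in corpus:
    sent_tags = [tag for _, tag in sentence]
    for word, tag in sentence:
        tag_counts[tag] += 1
        emission_counts[(tag, word)] += 1
    for prev_tag, tag in zip(sent_tags, sent_tags[1:]):
        transition_counts[(prev_tag, tag)] += 1
        prev_counts[prev_tag] += 1

def emission_prob(word, tag):
    """Estimate P(word | tag); 0.0 if the tag was never seen."""
    return emission_counts[(tag, word)] / tag_counts[tag] if tag_counts[tag] else 0.0

def transition_prob(tag, prev_tag):
    """Estimate P(tag | prev_tag); 0.0 if prev_tag was never followed by a tag."""
    return transition_counts[(prev_tag, tag)] / prev_counts[prev_tag] if prev_counts[prev_tag] else 0.0

print(emission_prob("dog", "NOUN"), transition_prob("NOUN", "DET"))
```

No smoothing is applied in this sketch, so any word and tag combination that never occurs in the corpus gets probability zero.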

__Viterbi Decoding__: is a dynamic programming algorithm for finding the most likely sequence of hidden states (POS tags) given the observed sequence of words. Rather than scoring every possible tag sequence, it keeps, for each word and each tag, only the probability of the best path ending in that tag, combining emission and transition probabilities at each step. Following the stored back-pointers from the last word to the first then yields the most probable sequence of POS tags for the sentence under the trained HMM parameters.
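
The recurrence can be sketched on a toy model as follows. This is a minimal illustration under made-up probabilities, not the implementation used later in this notebook; the function name `viterbi` and the integer encoding of states and observations are assumptions of the sketch. It works with raw probabilities for readability, whereas implementations meant for long sequences commonly use log probabilities to avoid underflow.

```python
import numpy as np

def viterbi(obs, initial, transition, emission):
    """Return the most probable state sequence for the observation indices `obs`."""
    n_states = len(initial)
    T = len(obs)
    best = np.zeros((T, n_states))                # best[i, s]: probability of the best path ending in state s at step i
    backptr = np.zeros((T, n_states), dtype=int)  # backptr[i, s]: previous state on that best path

    best[0] = initial * emission[:, obs[0]]
    for i in range(1, T):
        # scores[p, s] = best[i-1, p] * P(s | p) * P(obs[i] | s)
        scores = best[i - 1][:, None] * transition * emission[:, obs[i]][None, :]
        backptr[i] = scores.argmax(axis=0)
        best[i] = scores.max(axis=0)

    # Start from the best final state and follow the back-pointers to the first step
    path = [int(best[-1].argmax())]
    for i in range(T - 1, 0, -1):
        path.append(int(backptr[i, path[-1]]))
    return path[::-1]

# Hypothetical toy model: states 0 = NOUN, 1 = VERB; observations 0 = "dogs", 1 = "bark", 2 = "run"
states = ["NOUN", "VERB"]
initial = np.array([0.8, 0.2])
transition = np.array([[0.3, 0.7],
                       [0.6, 0.4]])
emission = np.array([[0.7, 0.2, 0.1],
                     [0.1, 0.5, 0.4]])

path = viterbi([0, 1], initial, transition, emission)
print([states[s] for s in path])
```

For the two-word input "dogs bark", this toy model selects NOUN followed by VERB.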

![The 3 Main Questions of HMMs](hmm_questions.png)

## Resources

For more detail on how HMMs, matrix initialization and Viterbi decoding work, you can also consult [Chapter 8](https://web.stanford.edu/~jurafsky/slp3/8.pdf) of the SLP3 book.

For a more colorful tutorial, you can also refer to the guide [Hidden Markov Models - An interactive illustration](https://nipunbatra.github.io/hmm/).

Another hands-on approach to Viterbi Decoding can be found in the Medium article [Intro to the Viterbi Algorithm](https://medium.com/mlearning-ai/intro-to-the-viterbi-algorithm-8f41c3f43cf3), which can be supplemented by the slide set [HMM : Viterbi algorithm -a toy example](https://www.cis.upenn.edu/~cis2620/notes/Example-Viterbi-DNA.pdf) from the UPenn CIS 2620 course.


For this notebook, we will stick to a few common libraries, i.e. `numpy`, `nltk`, `scikit-learn`, `collections` and `tqdm`.

In [1]:
import nltk
#Download the dataset and tagset from below:
nltk.download('conll2000')
nltk.download('universal_tagset')
from nltk.corpus import conll2000

[nltk_data] Downloading package conll2000 to
[nltk_data]     C:\Users\khuze\AppData\Roaming\nltk_data...
[nltk_data]   Package conll2000 is already up-to-date!
[nltk_data] Downloading package universal_tagset to
[nltk_data]     C:\Users\khuze\AppData\Roaming\nltk_data...
[nltk_data]   Package universal_tagset is already up-to-date!


In [2]:
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import normalize
from collections import Counter
from tqdm import tqdm

### POS HMM Class
The __POS_HMM__ Class contains the following methods:  

1. `__init__(self, corpus)`: initializes the __POS_HMM__ and prepares it for the parameter initialization phase, contains:
    - a corpus, which is further 85-15 split into training and test sets.
    - a list of all the (word, tag) pairs in the training set (all the sentences are concatenated hence sequence is maintained).
    - a tuple of unique words, tags in the training set
    - a dictionary for mapping words and tags to its unique integer identifier.
    - some additional variables, such as precomputed lengths, to reduce code redundancy in later parts.
    - Transition, Emission and Initial State Probability Matrices which are initialized to Zeros.
    - Tag Occurrence Probability dictionary, which is initialized with tag probabilities, i.e. (count of a single tag / count of all the tags) in the training set, similar to a unigram model. This is used for assigning tags to new or __unknown words__: when a word is not in the vocabulary, these tag probabilities stand in for its emission probabilities.
2. `init_params(self)`: initializes the transition, emission and initial state probability matrices via supervised matrix/parameter learning.  
    - a __method__ `word_given_tag(word, tag)`, which takes as input a word and a tag, and __*returns the count of all instances where the word had the assigned tag as its label / count of the tag*__ (i.e. a probability estimate of its occurrence).
    - a __method__ `next_tag_given_prev_tag(tag2, tag1)`, which takes as input two tags, and __*returns the count of all instances where tag 2 was preceded by tag 1 / count of tag 1*__ (also a probability estimate).
    - code for populating the initial state probability matrix. This essentially contains the probabilities of all POS tags occurring at the start of a sentence.
    - code to normalize each row of the transition, emission and initial state probability (which has only one row) matrices.
3. `viterbi_decoding(self, sentence)`: returns the most likely POS tag for each word/numeral/punctuation mark in the sentence.
4. `evaluation(self, debug = False)`: evaluates the performance of the POS Tagger on the test set and prints the test accuracy (as a percentage).
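
Item 2 describes the initial state distribution as the probability of each tag starting a sentence. One way to estimate that, sketched here on a made-up corpus with hypothetical variable names, is to count only the first tag of every training sentence. Note that the class below takes a simpler route and fills this vector from the overall tag frequencies.

```python
from collections import Counter

# Hypothetical tagged sentences (made up); each sentence is a list of (word, tag) pairs
train_sentences = [
    [("the", "DET"), ("dog", "NOUN"), ("barks", "VERB")],
    [("dogs", "NOUN"), ("bark", "VERB")],
    [("a", "DET"), ("cat", "NOUN"), ("sleeps", "VERB")],
    [("the", "DET"), ("sun", "NOUN"), ("rises", "VERB")],
]
tags = ("DET", "NOUN", "VERB")

# Count the tag of the first word of every non-empty sentence
first_tag_counts = Counter(sentence[0][1] for sentence in train_sentences if sentence)
n_sentences = sum(first_tag_counts.values())

# Relative frequency of each tag in sentence-initial position
initial_state_prob = [first_tag_counts[tag] / n_sentences for tag in tags]
print(dict(zip(tags, initial_state_prob)))
```

In this toy corpus three of the four sentences start with a determiner, so `DET` receives 0.75, `NOUN` 0.25 and `VERB` 0.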

In [3]:
class POS_HMM:
    def __init__(self, corpus): #DO NOT MODIFY THIS FUNCTION

        #-----------------DO NOT MODIFY ANYTHING BELOW THIS LINE-----------------
        self.corpus = corpus
        self.train_set, self.test_set = train_test_split( self.corpus, train_size=0.85,random_state = 777)        
        
        # Extracting Vocabulary and Tags from our training set
        self.all_pairs = [(word, tag) for sentence in self.train_set for word, tag in sentence] #List of tuples (word, POS tag) or a flattened list of tuples by concatenating all sentences
        self.vocab = tuple(set(word for (word, _) in self.all_pairs))

        self.all_tags = [tag for (_, tag) in self.all_pairs] #List of all POS tags in the trainset
        self.tags = tuple(set(self.all_tags)) #List of unique POS tags

        # Mapping vocab and tags to integers for indexing
        self.vocab2index = {word: i for i, word in enumerate(self.vocab)}
        self.tag2index = {tag: i for i, tag in enumerate(self.tags)}

        self.vocab_len = len(self.vocab) #Total Number of Vocab (Unique Words)
        self.all_tag_lengths = len(self.all_tags) #Number of tags in the trainset
        self.tag_len = len(self.tags) #Number of unique tags

        # Initialize transition and emission matrices (Default: Zeros)
        self.transition_mat = np.zeros((self.tag_len, self.tag_len))
        self.emission_mat = np.zeros((self.tag_len, self.vocab_len))
        self.initial_state_prob = np.zeros(self.tag_len)

        # Initialize POS Tag occurance probabilities for getting most likely POS Tags for unknown words
        self.tag_occur_prob= {} #Dictionary of POS Tag occurance probabilities
        all_tag_counts = Counter(self.all_tags)
        for tag in self.tags:
            self.tag_occur_prob[tag] = all_tag_counts[tag]/self.all_tag_lengths            
        

    def init_params(self): # Initialize the transition and emission matrices via supervised learning (counting the emissions and transitions observed in the data).
        tags_array = np.array(self.all_tags)

        # Count each tag and each (word, tag) pair in a single pass over the training data.
        # No smoothing is applied, so a (word, tag) pair never seen in training gets probability 0.
        tag_counts = Counter(self.all_tags)
        pair_counts = Counter(self.all_pairs)

        def word_given_tag(word, tag):
            count_tag = tag_counts[tag] # Number of occurrences of the tag
            count_w_given_tag = pair_counts[(word, tag)] # Number of times the word carries this tag

            return count_w_given_tag/count_tag # Estimate of P(word | tag)
        
        def next_tag_given_prev_tag(tag2, tag1):
            count_t1 = 0 # Occurrences of tag1 (no smoothing)
            count_t2_t1 = 0 # Occurrences of tag1 immediately followed by tag2

            prev = None
            for cur_tag in tags_array:
                if prev == tag1:
                    count_t1 += 1
                    if cur_tag == tag2:
                        count_t2_t1 += 1
                prev = cur_tag
            
            # The loop above only counts tag1 when another tag follows it, so the final tag is checked separately
            if self.all_tags[-1] == tag1:
                count_t1 += 1

            return count_t2_t1/count_t1 # Estimate of P(tag2 | tag1): occurrences of tag1 followed by tag2 / occurrences of tag1
        
        # Compute Transition Matrix
        for i, t1 in enumerate(tqdm(list(self.tag2index.keys()), desc="Populating Transition Matrix", mininterval = 10)):
            for j, t2 in enumerate(list(self.tag2index.keys())):
                self.transition_mat[i, j] = next_tag_given_prev_tag(t2, t1)

        # Compute Emission Matrix
        for i, tag in enumerate(tqdm(list(self.tag2index.keys()), desc="Populating Emission Matrix", mininterval = 10)):
            for j, word in enumerate(list(self.vocab2index.keys())):
                self.emission_mat[i, j] = word_given_tag(word, tag)

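        # Compute Initial State Probability
        # Note: this uses each tag's overall frequency in the training set, not its frequency at the start of a sentence
        for i, tag in enumerate(tqdm(list(self.tag2index.keys()), desc="Populating Initial Probability Matrix", mininterval = 10)):
            self.initial_state_prob[i] = self.tag_occur_prob[tag]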
        
        # Normalize matrices i.e. each row sums to 1
        
        self.transition_mat = normalize(self.transition_mat, axis=1, norm='l1')
        self.emission_mat = normalize(self.emission_mat, axis=1, norm='l1')
        



    def viterbi_decoding(self, sentence): # Sentence is a list of words, e.g. ["Moon", "Landing", "was", "Faked"]
        
        pred_pos_sequence = []  # Implement the Viterbi Algorithm to predict the POS tags of the given sentence

        num_states = self.tag_len
        sent_len = len(sentence)

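        # Creating matrices to store memoized values
        tag_pointers = np.zeros([sent_len, self.tag_len], dtype=int)
        probabilities = np.zeros([sent_len, self.tag_len])
        # First word: use its emission probabilities if it is known, otherwise fall back to the overall tag probabilities
        if sentence[0] in self.vocab2index:
            current_word_ind = self.vocab2index[sentence[0]]
            probabilities[0, :] = self.emission_mat[:, current_word_ind] # the initial state probabilities are not applied to known words
        else:
            probabilities[0, :] = self.initial_state_prob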

        # Bottom-up pass: fill the table one word at a time (dynamic programming)
        for i in range(1,sent_len):
            # Emission scores for the current word: its emission column if the word is known,
            # otherwise the overall tag probabilities as a fallback for a new word
            if sentence[i] in self.vocab2index:
                emission = self.emission_mat[:, self.vocab2index[sentence[i]]]
            else:
                emission = np.array([self.tag_occur_prob[tag] for tag in self.tags])

            for s in range(num_states):
                max_prob = 0.0
                pointer = 0
                # Find the previous tag that gives the most probable path ending in tag s
                for s_prev in range(num_states):
                    prob = probabilities[i-1, s_prev] * self.transition_mat[s_prev, s] * emission[s]
                    if prob > max_prob:
                        max_prob = prob
                        pointer = s_prev
                probabilities[i, s] = max_prob
                tag_pointers[i, s] = pointer



        best_path = np.zeros(sent_len, dtype=int)
        # The best path ends at the index of the last word's most probable tag
        best_path[sent_len-1] = np.argmax(probabilities[sent_len-1, :])
        # Going backwards through the stored matrices by following the pointers
        for i in range(sent_len-2, -1, -1):
            best_path[i] = tag_pointers[i+1, best_path[i+1]]
        
        # Converting indices to tags
        for pos in best_path:
            pred_pos_sequence.append(self.tags[pos])
                    
        return pred_pos_sequence

    def evaluation(self, debug=False): #DO NOT MODIFY THIS FUNCTION
        # Evaluate the model on the test set
        correct, total = 0, 0
        pred_pos_sequences = []

        for test_sentence in self.test_set:
            test_sentence_words, test_sentence_tags = zip(*test_sentence)
            pred_pos_tags = self.viterbi_decoding(test_sentence_words)
            pred_pos_sequences.extend(pred_pos_tags)

            correct += sum(1 for true_tag, pred_tag in zip(test_sentence_tags, pred_pos_tags) if true_tag == pred_tag)
            total += len(test_sentence_words)
        
        accuracy = correct / total if total > 0 else 1

        if debug:
            test_words, test_tags = zip(*[(word, tag) for test_sentence in self.test_set for word, tag in test_sentence])
            print(f"Sentence (first 20 words): {test_words[:20]}")
            print(f"True POS Tags (first 20 words): {test_tags[:20]}")
            print(f"Predicted POS Tags (first 20 words): {pred_pos_sequences[:20]}")

        print(f"Test Accuracy: {accuracy * 100:.4f}%")

### Model Evaluation

In [4]:
pos_hmm = POS_HMM(corpus = conll2000.tagged_sents(tagset='universal'))
pos_hmm.init_params()
pos_hmm.evaluation(debug = False)


Populating Transition Matrix: 100%|██████████| 12/12 [00:15<00:00,  1.31s/it]
Populating Emission Matrix: 100%|██████████| 12/12 [00:00<00:00, 57.45it/s]
Populating Initial Probability Matrix: 100%|██████████| 12/12 [00:00<?, ?it/s]


Test Accuracy: 96.0369%
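
The progress output above shows the transition matrix taking about 15 seconds to populate, most likely because `next_tag_given_prev_tag` rescans the whole tag sequence for each of the 12 × 12 tag pairs. A possible alternative, sketched below on made-up data, counts all tag bigrams in one pass and then normalizes each row. The function name `transition_matrix` is hypothetical and this is not the code that produced the result above.

```python
from collections import Counter
import numpy as np

def transition_matrix(all_tags, tags):
    """Row-normalized matrix of P(next tag | previous tag) from one pass over `all_tags`."""
    index = {tag: i for i, tag in enumerate(tags)}
    bigram_counts = Counter(zip(all_tags, all_tags[1:]))  # (previous tag, next tag) counts
    mat = np.zeros((len(tags), len(tags)))
    for (prev_tag, next_tag), count in bigram_counts.items():
        mat[index[prev_tag], index[next_tag]] = count
    row_sums = mat.sum(axis=1, keepdims=True)
    # Divide each row by its total; rows with no outgoing transitions stay all-zero
    return np.divide(mat, row_sums, out=np.zeros_like(mat), where=row_sums > 0)

# Hypothetical flattened tag sequence (made up)
all_tags = ["DET", "NOUN", "VERB", "DET", "NOUN", "NOUN", "VERB"]
tags = ("DET", "NOUN", "VERB")
A = transition_matrix(all_tags, tags)
print(A)
```

Because the notebook normalizes each row of the transition matrix after filling it, dividing bigram counts by their row totals should give the same values as the per-pair counting approach.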
