# PA2.2 Part A - Hidden Markov Model (HMM) Based Part Of Speech Tagging

### Introduction

In this notebook, you will be making a Part of Speech (POS) Tagger using an HMM via Supervised Matrix/Parameter Initialization and Viterbi Decoding.

Hidden Markov Model (HMM) is a statistical model used to describe a system that transitions between a set of hidden states over time, generating observable outcomes.

In the context of Part-of-Speech (POS) tagging, HMMs are used to model the sequence of words in a sentence as a sequence of hidden states representing the POS tags. The observable outcomes are the actual words in the sentence.

## Terminology

__Supervised Matrix/Parameter Initialization__: involves setting the initial values of the parameters in the model, specifically the emission probabilities matrix and the transition probabilities matrix. In the case of supervised learning tasks, where the HMM is trained on annotated data (e.g., for Part-of-Speech tagging), the initialization involves estimating the initial values of these matrices based on the observed training data. This initialization is crucial as it provides the starting point for the model to learn and adjust its parameters during the training process. The matrices are typically initialized using statistical information derived from the frequency of transitions and emissions observed in the training dataset.

__Viterbi Decoding__: is a dynamic programming algorithm used for finding the most likely sequence of hidden states (POS tags) given the observed sequence of words. It efficiently calculates the probability of a sequence of states by considering both emission and transition probabilities. Viterbi decoding helps identify the most probable sequence of POS tags for a given sentence based on the trained HMM parameters.

![The 3 Main Questions of HMMs](hmm_questions.png)

## Resources

For additional details of the working of HMMs, Matrix Initializations and Viterbi Decoding, you can also consult [Chapter 8](https://web.stanford.edu/~jurafsky/slp3/8.pdf) of the SLP3 book as reference.

For a more colorful tutorial, you can also refer to this guide [Hidden Markov Models - An interactive illustration](https://nipunbatra.github.io/hmm/)

Another hands-on approach to Viterbi Decoding, can be found in this medium article [Intro to the Viterbi Algorithm](https://medium.com/mlearning-ai/intro-to-the-viterbi-algorithm-8f41c3f43cf3) and can be supplemented by the following slide-set  
[HMM : Viterbi algorithm -a toy example](https://www.cis.upenn.edu/~cis2620/notes/Example-Viterbi-DNA.pdf) from the UPenn CIS 2620 course.

### Instructions

- Follow along with the notebook, filling out the necessary code where instructed.

- <span style="color: red;">Read the Submission Instructions, Plagiarism Policy, and Late Days Policy in the attached PDF.</span>

- <span style="color: red;">Make sure to run all cells for credit.</span>

- <span style="color: red;">Do not remove any pre-written code.</span>

- <span style="color: red;">You must attempt all parts.</span>

For this notebook, in addition to standard libraries i.e. `numpy`, `nltk`, `collections` and `tqdm`, you are permitted to incorporate supplementary libraries, but it is strongly advised to restrict their inclusion to a minimum. However, other HMM toolkits or libraries are strictly prohibited.

In [9]:
import nltk
#Download the dataset and tagset from below:
# nltk.download('conll2000')
# nltk.download('universal_tagset')
from nltk.corpus import conll2000

In [10]:
import numpy as np
from sklearn.model_selection import train_test_split
from collections import Counter
from tqdm import tqdm

### POS HMM Class
The __POS_HMM__ Class contains the following methods:  

1. `__init__(self, corpus)`: initializes the __POS_HMM__ and prepares it for the parameter initialization phase, contains:
    - a corpus, which is further 85-15 split into training and test sets.
    - a list of all the (word, tag) pairs in the training set (all the sentences are concatenated hence sequence is maintained).
    - a tuple of unique words, tags in the training set
    - a dictionary for mapping words and tags to its unique integer identifier.
    - some additional variables to reduce code redundancy in latter parts such as len()
    - Transition, Emission and Initial State Probability Matrices which are initialized to Zeros.
    - Tag Occurance Probability matrix, which is initialized with tag probabilities i.e. 
        (count of a single tag / count of all the tags) in the training set similar to a unigram. This is used for assigning tags to new or __unknown words__ by randomly sampling from all the tag probabilities.
2. `init_params(self)`: (__To Be Implemented__) initializes the transition, emission and initial state probability matrices via supervised matrix/parameter learning.  \
    __Parts to Complete__:
    - a __method__ `word_given_tag(word, tag)`, which takes as input a word and a tag, and __*returns the count of all instances where the word had the assigned tag as its label / count of the tag*__ (i.e. a probability estimate of its occurence).
    - a __method__ `next_tag_given_prev_tag(tag2, tag1)`which takes as input two tags, and __*returns the count of all instances where the tag 2 and was preceeded by tag 1 / count of the first tag*__ (also a probability estimate).
    - __add code__ for populating the initial state probability matrix. This essentially contains the probabilities of all POS tags occuring at the start of the sentence.
    - __add code__ to normalize each rows of the transition, emission and initial state probability (only has one row) matrices.
3. `viterbi_decoding(self, sentence)`: (__To Be Implemented__) returns the mostly likely POS Tags for each word/numeral/punctuation in the sentence.
4. `evaluation(self, debug = False)`: evalutes the performance of the POS Tagger on the test set and returns the testing accuracy (as a %age).

In [11]:
class POS_HMM:
    def __init__(self, corpus): #DO NOT MODIFY THIS FUNCTION

        #-----------------DO NOT MODIFY ANYTHING BELOW THIS LINE-----------------
        self.corpus = corpus
        self.train_set, self.test_set = train_test_split( self.corpus, train_size=0.85,random_state = 777)        
        
        # Extracting Vocabulary and Tags from our training set
        self.all_pairs = [(word, tag) for sentence in self.train_set for word, tag in sentence] #List of tuples (word, POS tag) or a flattened list of tuples by concatenating all sentences
        self.vocab = tuple(set(word for (word, _) in self.all_pairs))

        self.all_tags = [tag for (_, tag) in self.all_pairs] #List of all POS tags in the trainset
        self.tags = tuple(set(self.all_tags)) #List of unique POS tags
        
        # Mapping vocab and tags to integers for indexing
        self.vocab2index = {word: i for i, word in enumerate(self.vocab)}
        self.tag2index = {tag: i for i, tag in enumerate(self.tags)}

        self.vocab_len = len(self.vocab) #Total Number of Vocab (Unique Words)
        self.all_tag_lengths = len(self.all_tags) #Number of tags in the trainset
        self.tag_len = len(self.tags) #Number of unique tags

        # Initialize transition and emission matrices (Default: Zeros)
        self.transition_mat = np.zeros((self.tag_len, self.tag_len))
        self.emission_mat = np.zeros((self.tag_len, self.vocab_len))
        self.initial_state_prob = np.zeros(self.tag_len)

        # Initialize POS Tag occurance probabilities for getting most likely POS Tags for unknown words
        self.tag_occur_prob= {} #Dictionary of POS Tag occurance probabilities
        all_tag_counts = Counter(self.all_tags)
        for tag in self.tags:
            self.tag_occur_prob[tag] = all_tag_counts[tag]/self.all_tag_lengths

        #-----------------Add additional variables here-----------------

    def init_params(self): #Initialize transition and emission matrices via Supervised Learning (Counting Occurences of emissions and transitions observed in the data).
        all_pairs_array = np.array(self.all_pairs)
        tags_array = np.array(self.all_tags)
        tag_counts = Counter(tags_array)
        word_tag_counter = Counter(self.all_pairs)
        #------------------- Space provided for any additional data structures that you may need or any process that you may need to perform-------------------

        

        #-----------------DO NOT ADD ANYTHING BELOW THIS LINE-----------------

        def word_given_tag(word, tag): #Complete this function, skeleton and dummy variables have been provided
            count_tag = tag_counts[tag]
            count_w_given_tag = word_tag_counter[(word, tag)]
            return count_w_given_tag/count_tag
        
        def next_tag_given_prev_tag(tag2, tag1): #Complete this function, skeleton and dummy variables have been provided
            all_tag_counts = Counter(self.all_tags)
            count_t1 = all_tag_counts[tag1]
            count_t2_t1 = sum(1 for i in range(len(self.all_tags) - 1) if self.all_tags[i] == tag1 and self.all_tags[i+1] == tag2)
            return count_t2_t1/count_t1 #Number of Occurences of a tag (2nd tag) given prev tag (1st tag) / Number of Occurences of tag 1
        
        # Compute Transition Matrix
        for i, t1 in enumerate(tqdm(list(self.tag2index.keys()), desc="Populating Transition Matrix", mininterval = 10)):
            for j, t2 in enumerate(list(self.tag2index.keys())):
                self.transition_mat[i, j] = next_tag_given_prev_tag(t2, t1)

        # Compute Emission Matrix
        for i, tag in enumerate(tqdm(list(self.tag2index.keys()), desc="Populating Emission Matrix", mininterval = 10)):
            for j, word in enumerate(list(self.vocab2index.keys())):
                self.emission_mat[i, j] = word_given_tag(word, tag)
        
        #-------------------Add your code here-------------------
        
        # Compute Initial State Probability
                
        #The below code may help. You can modify it as per your requirement.
        for i, tag in enumerate(tqdm(list(self.tag2index.keys()), desc="Populating Initial Probability Matrix", mininterval = 10)):
           count = 0
           for sentence in self.train_set:
               if (sentence[0][1] == tag):
                   count += 1
            
           if (count == 0):
               self.initial_state_prob[i] = self.tag_occur_prob[tag]
           else:
               self.initial_state_prob[i] = count

        
        # Normalize matrices i.e. each row sums to 1
        def check_row_sum_to_one(arr):
            row_sums = np.sum(arr, axis=1)  # Calculate the sum of each row along axis=1
            all_row_sums_one = np.allclose(row_sums, 1.0)  # Check if all row sums are close to 1 (within a small tolerance)
            return all_row_sums_one
        
        self.initial_state_prob /= np.sum(self.initial_state_prob)
        print("sum of initial state prob", sum(self.initial_state_prob))

        self.transition_mat /= np.sum(self.transition_mat, axis=1, keepdims=True)
        all_good = check_row_sum_to_one(self.transition_mat)
        if (all_good):
            print("transition matrix ok")

        self.emission_mat /= np.sum(self.emission_mat, axis=1, keepdims=True)
        all_good = check_row_sum_to_one(self.emission_mat)
        if (all_good):
            print("emission mat ok")


        
        #-----------------DO NOT MODIFY ANYTHING BELOW THIS LINE-----------------



    def viterbi_decoding(self, sentence): #Sentence is a list words i.e. ["Moon", "Landing", "was", "Faked"]
        
        pred_pos_sequence = []  # Implement the Viterbi Algorithm to predict the POS tags of the given sentence

        #-------------------Add your code here-------------------
        # Initializations
        T = len(sentence)  # Number of words in the sentence
        N = len(self.tags) # Number of POS tags
        viterbi = np.zeros((N, T)) # Viterbi matrix
        backpointer = np.zeros((N, T), dtype=int) # Backpointer matrix
        
        # Initialization step
        for s in range(N):
            viterbi[s, 0] = self.initial_state_prob[s] * self.emission_mat[s, self.vocab2index.get(sentence[0], 0)]
            backpointer[s, 0] = 0
        
        # Recursion step
        for t in range(1, T):
            for s in range(N):
                max_val = -np.inf
                max_state = None
                for s_prev in range(N):
                    val = viterbi[s_prev, t-1] * self.transition_mat[s, s_prev] * self.emission_mat[s, self.vocab2index.get(sentence[t], 0)]
                    if val > max_val:
                        max_val = val
                        max_state = s_prev
                viterbi[s, t] = max_val
                backpointer[s, t] = max_state

        
        # Termination step
        max_val = -np.inf
        max_state = None
        for s in range(N):
            if viterbi[s, T-1] > max_val:
                max_val = viterbi[s, T-1]
                max_state = s
        
        # Backtracking
        pred_pos_sequence.append(self.tags[max_state])
        for t in range(T-1, 0, -1):
            max_state = backpointer[max_state, t]
            pred_pos_sequence.insert(0, self.tags[max_state])




    
        
        #-----------------DO NOT MODIFY ANYTHING BELOW THIS LINE-----------------

        return pred_pos_sequence
    
   
    def evaluation(self, debug=False): #DO NOT MODIFY THIS FUNCTION
        # Evaluate the model on the test set
        correct, total = 0, 0
        pred_pos_sequences = []

        for test_sentence in self.test_set:
            test_sentence_words, test_sentence_tags = zip(*test_sentence)
            pred_pos_tags = self.viterbi_decoding(test_sentence_words)
            pred_pos_sequences.extend(pred_pos_tags)

            correct += sum(1 for true_tag, pred_tag in zip(test_sentence_tags, pred_pos_tags) if true_tag == pred_tag)
            total += len(test_sentence_words)
        
        accuracy = correct / total if total > 0 else 1

        if debug:
            test_words, test_tags = zip(*[(word, tag) for test_sentence in self.test_set for word, tag in test_sentence])
            print(f"Sentence (first 20 words): {test_words[:20]}")
            print(f"True POS Tags (first 20 words): {test_tags[:20]}")
            print(f"Predicted POS Tags (first 20 words): {pred_pos_sequences[:20]}")

        print(f"Test Accuracy: {accuracy * 100:.4f}%")

### Model Evaluation

In [12]:
pos_hmm = POS_HMM(corpus = conll2000.tagged_sents(tagset='universal'))
pos_hmm.init_params()
pos_hmm.evaluation(debug = False)

Populating Transition Matrix: 100%|██████████| 12/12 [00:04<00:00,  2.48it/s]
Populating Emission Matrix: 100%|██████████| 12/12 [00:00<00:00, 50.40it/s]
Populating Initial Probability Matrix: 100%|██████████| 12/12 [00:00<00:00, 362.73it/s]


sum of initial state prob 1.0
transition matrix ok
emission mat ok
Test Accuracy: 93.6569%
