In [5]:
import nltk
#Download the dataset and tagset from below:
nltk.download('conll2000')
nltk.download('universal_tagset')
from nltk.corpus import conll2000

[nltk_data] Downloading package conll2000 to
[nltk_data]     /Users/hassanadnan/nltk_data...
[nltk_data]   Unzipping corpora/conll2000.zip.
[nltk_data] Downloading package universal_tagset to
[nltk_data]     /Users/hassanadnan/nltk_data...
[nltk_data]   Unzipping taggers/universal_tagset.zip.


In [6]:
import numpy as np
from sklearn.model_selection import train_test_split
from collections import Counter
from tqdm import tqdm

In [15]:
class POS_HMM:
    def __init__(self, corpus): #DO NOT MODIFY THIS FUNCTION

        #-----------------DO NOT MODIFY ANYTHING BELOW THIS LINE-----------------
        self.corpus = corpus
        self.train_set, self.test_set = train_test_split( self.corpus, train_size=0.85,random_state = 777)        
        
        # Extracting Vocabulary and Tags from our training set
        self.all_pairs = [(word, tag) for sentence in self.train_set for word, tag in sentence] #List of tuples (word, POS tag) or a flattened list of tuples by concatenating all sentences
        self.vocab = tuple(set(word for (word, _) in self.all_pairs))

        self.all_tags = [tag for (_, tag) in self.all_pairs] #List of all POS tags in the trainset
        self.tags = tuple(set(self.all_tags)) #List of unique POS tags
        self.word_tag_counts = Counter(self.all_pairs)

        # Mapping vocab and tags to integers for indexing
        self.vocab2index = {word: i for i, word in enumerate(self.vocab)}
        self.tag2index = {tag: i for i, tag in enumerate(self.tags)}

        self.vocab_len = len(self.vocab) #Total Number of Vocab (Unique Words)
        self.all_tag_lengths = len(self.all_tags) #Number of tags in the trainset
        self.tag_len = len(self.tags) #Number of unique tags

        # Initialize transition and emission matrices (Default: Zeros)
        self.transition_mat = np.zeros((self.tag_len, self.tag_len))
        self.emission_mat = np.zeros((self.tag_len, self.vocab_len))
        self.initial_state_prob = np.zeros(self.tag_len)

        # Initialize POS Tag occurance probabilities for getting most likely POS Tags for unknown words
        self.tag_occur_prob= {} #Dictionary of POS Tag occurance probabilities
        all_tag_counts = Counter(self.all_tags)
        for tag in self.tags:
            self.tag_occur_prob[tag] = all_tag_counts[tag]/self.all_tag_lengths
        self.tag_counts = Counter(tag for _, tag in self.all_pairs)
        #-----------------Add additional variables here-----------------
        def word_given_tag(word, tag):
            count_tag = self.all_tags.count(tag)
            count_w_given_tag = self.all_pairs.count((word, tag))
            return count_w_given_tag / count_tag if count_tag != 0 else 0

        def next_tag_given_prev_tag(tag2, tag1):
            count_t1 = self.all_tags.count(tag1)
            count_t2_t1 = sum(1 for i in range(len(self.all_tags) - 1) 
                              if self.all_tags[i] == tag1 and self.all_tags[i + 1] == tag2)
            return count_t2_t1 / count_t1 if count_t1 != 0 else 0


    def init_params(self): #Initialize transition and emission matrices via Supervised Learning (Counting Occurences of emissions and transitions observed in the data).
        all_pairs_array = np.array(self.all_pairs)
        tags_array = np.array(self.all_tags)

        #------------------- Space provided for any additional data structures that you may need or any process that you may need to perform-------------------
        for tag in self.tags:
            tag_index = self.tag2index[tag]
            self.initial_state_prob[tag_index] = self.tag_occur_prob[tag]

            for next_tag in self.tags:
                next_tag_index = self.tag2index[next_tag]
                count_t1_t2 = sum(1 for i in range(len(self.all_pairs) - 1) 
                                  if self.all_pairs[i][1] == tag and self.all_pairs[i + 1][1] == next_tag)
                self.transition_mat[tag_index, next_tag_index] = count_t1_t2 / self.tag_counts[tag]

            for word in self.vocab:
                word_index = self.vocab2index[word]
                self.emission_mat[tag_index, word_index] = self.word_tag_counts[(word, tag)] / self.tag_counts[tag]

        # Normalize matrices
        self.normalize_matrices()

    def normalize_matrices(self):
        self.transition_mat /= self.transition_mat.sum(axis=1, keepdims=True)
        self.emission_mat /= self.emission_mat.sum(axis=1, keepdims=True)
        self.initial_state_prob /= self.initial_state_prob.sum()



        def word_given_tag(word, tag):
            count_tag = self.all_tags.count(tag)
            count_w_given_tag = self.all_pairs.count((word, tag))
            return count_w_given_tag / count_tag if count_tag != 0 else 0
    
        def next_tag_given_prev_tag(tag2, tag1):
            count_t1 = self.all_tags.count(tag1)
            count_t2_t1 = sum(1 for i in range(len(self.all_tags) - 1) 
                            if self.all_tags[i] == tag1 and self.all_tags[i + 1] == tag2)
            return count_t2_t1 / count_t1 if count_t1 != 0 else 0
        
        
        # Compute Transition Matrix
        for i, t1 in enumerate(tqdm(list(self.tag2index.keys()), desc="Populating Transition Matrix", mininterval=10)):
            for j, t2 in enumerate(list(self.tag2index.keys())):
                self.transition_mat[i, j] = next_tag_given_prev_tag(t2, t1)

    # Compute Emission Matrix
        for i, tag in enumerate(tqdm(list(self.tag2index.keys()), desc="Populating Emission Matrix", mininterval=10)):
            for j, word in enumerate(list(self.vocab2index.keys())):
                self.emission_mat[i, j] = word_given_tag(word, tag)
        
        for i, tag in enumerate(tqdm(list(self.tag2index.keys()), desc="Populating Initial Probability Matrix", mininterval=10)):
            self.initial_state_prob[i] = self.all_tags.count(tag) / len(self.all_tags) if len(self.all_tags) != 0 else 0

        # Normalize matrices
        self.transition_mat = self.transition_mat / self.transition_mat.sum(axis=1, keepdims=True)
        self.emission_mat = self.emission_mat / self.emission_mat.sum(axis=1, keepdims=True)
        self.initial_state_prob = self.initial_state_prob / self.initial_state_prob.sum() if self.initial_state_prob.sum() != 0 else self.initial_state_prob
        # Compute Initial State Probability
                
        #The below code may help. You can modify it as per your requirement.
        #for i, tag in enumerate(tqdm(list(self.tag2index.keys()), desc="Populating Initial Probability Matrix", mininterval = 10)):
        #    self.initial_state_prob[i] = None

        
        # Normalize matrices i.e. each row sums to 1
                

        



    def viterbi_decoding(self, sentence): #Sentence is a list words i.e. ["Moon", "Landing", "was", "Faked"]
        
        pred_pos_sequence = []  # Implement the Viterbi Algorithm to predict the POS tags of the given sentence

        # Number of words in the sentence and number of tags
        n_words = len(sentence)
        n_tags = len(self.tags)

        # Probability matrix, initialized with zeros
        # Each cell prob_matrix[t][i] holds the probability of the most probable tag sequence ending in tag t for the first i words in the sentence.
        prob_matrix = np.zeros((n_tags, n_words))

        # Back pointers matrix to reconstruct the path of tags
        back_pointers = np.zeros((n_tags, n_words), dtype=int)

        # Initialization step (i=0 for the first word in the sentence)
        for t in range(n_tags):
            # Probability of tag 't' being the first tag and the first word being emitted by tag 't'
            prob_matrix[t][0] = self.initial_state_prob[t] * self.emission_mat[t][self.vocab2index.get(sentence[0], -1)]
            back_pointers[t][0] = 0

        # Dynamic programming forward pass
        for i in range(1, n_words):
            for t in range(n_tags):
                # Calculate the maximum probability for each tag 't' at position 'i' in the sentence
                # And also find the tag from the previous position that contributes to this maximum probability
                max_prob, max_state = max((prob_matrix[t_prev][i - 1] * self.transition_mat[t_prev][t] * self.emission_mat[t][self.vocab2index.get(sentence[i], -1)], t_prev) for t_prev in range(n_tags))
                prob_matrix[t][i] = max_prob
                back_pointers[t][i] = max_state

        # Find the final best path through backtracking
        best_path = []
        # Start with the most probable last tag
        max_prob_last_tag = np.argmax(prob_matrix[:, n_words - 1])
        best_path.append(max_prob_last_tag)

        # Follow the back pointers to retrieve the best path
        for i in range(n_words - 1, 0, -1):
            best_tag_prev = back_pointers[best_path[-1]][i]
            best_path.append(best_tag_prev)

        # Reverse the path to get it in the correct order
        best_path.reverse()

        # Convert tag indices back to tag names
        pred_pos_sequence = [list(self.tag2index.keys())[tag_index] for tag_index in best_path]

        return pred_pos_sequence






        return pred_pos_sequence
    
   
    def evaluation(self, debug=False): #DO NOT MODIFY THIS FUNCTION
        # Evaluate the model on the test set
        correct, total = 0, 0
        pred_pos_sequences = []

        for test_sentence in self.test_set:
            test_sentence_words, test_sentence_tags = zip(*test_sentence)
            pred_pos_tags = self.viterbi_decoding(test_sentence_words)
            pred_pos_sequences.extend(pred_pos_tags)

            correct += sum(1 for true_tag, pred_tag in zip(test_sentence_tags, pred_pos_tags) if true_tag == pred_tag)
            total += len(test_sentence_words)
        
        accuracy = correct / total if total > 0 else 1

        if debug:
            test_words, test_tags = zip(*[(word, tag) for test_sentence in self.test_set for word, tag in test_sentence])
            print(f"Sentence (first 20 words): {test_words[:20]}")
            print(f"True POS Tags (first 20 words): {test_tags[:20]}")
            print(f"Predicted POS Tags (first 20 words): {pred_pos_sequences[:20]}")

        print(f"Test Accuracy: {accuracy * 100:.4f}%")

### Model Evaluation

In [16]:
pos_hmm = POS_HMM(corpus = conll2000.tagged_sents(tagset='universal'))
pos_hmm.init_params()
pos_hmm.evaluation(debug = False)

Populating Transition Matrix: 100%|██████████| 12/12 [00:01<00:00, 10.32it/s]
Populating Emission Matrix: 100%|██████████| 12/12 [18:28<00:00, 92.39s/it]
Populating Initial Probability Matrix: 100%|██████████| 12/12 [00:00<00:00, 493.70it/s]


Test Accuracy: 94.1660%
