Submitted by Tarang Ranpara (202011057)

In [13]:
import nltk
from nltk.corpus import brown
from sklearn.model_selection import train_test_split
nltk.download('brown')
nltk.download('averaged_perceptron_tagger')
nltk.download('universal_tagset')

[nltk_data] Downloading package brown to /root/nltk_data...
[nltk_data]   Package brown is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /root/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!
[nltk_data] Downloading package universal_tagset to /root/nltk_data...
[nltk_data]   Package universal_tagset is already up-to-date!


True

In [11]:
def gen_dataset(data):
    train_data, test_data =  train_test_split(data, test_size = 0.2, random_state = 50)

    x_train = [[word.lower() for word,_ in sentence] for sentence in train_data]
    x_test = [[word.lower() for word,_ in sentence] for sentence in test_data]

    y_train = [[tag for _, tag in sentence] for sentence in train_data]
    y_test = [[tag for _ , tag in sentence] for sentence in test_data]

    return x_train, y_train, x_test, y_test

In [15]:
class NLTK_POS_TAGGER:
    def __init__(self, x_test, y_test):
        self.x_test = x_test
        self.y_test = y_test
        self.y_pred = []

    def predict(self):
        for sentence in self.x_test:
            pred = nltk.pos_tag(sentence, tagset='universal')
            pred = [tag for _,tag in pred]
            self.y_pred.append(pred)

    def evaluate(self):
        if self.y_pred == []:
            self.predict()

        total, match = 0, 0
        for i in range(len(self.y_test)):
            for pred, ground_truth in zip(self.y_pred[i], self.y_test[i]):
                if pred == ground_truth:
                    match += 1

                total += 1

        return match/total * 100

In [33]:
class HiddenMarkovModel:
    def __init__(self, x_train, y_train, x_test, y_test):
        self.transition_matrix = dict()

        self.x_train = x_train
        self.y_train = y_train
        self.x_test = x_test
        self.y_test = y_test

        self.tags, self.words = set(), set() 
        self.tags, self.words = self.list_unique(self.y_train), self.list_unique(self.x_train)
        self.y_pred = []

    def fit(self):
        print('Fitting the HMM model.')
        self.start_probability = self.compute_start_probability()
        self.transition_matrix = self.compute_transition_matrix()
        self.emission_matrix = self.compute_emission_matrix()

    def predict(self):
        self.fit()
        print('generating predictions.')
        for sentence in self.x_test:
            pred = self.viterbi_algo(sentence, self.tags, self.start_probability, self.transition_matrix, self.emission_matrix, self.words)
            self.y_pred.append(pred)

    def evaluate(self):
        self.predict()
        print('evaulating the HMM model.')
        total_words = 0
        true_prediction = 0
        total_sent = len(self.y_test)
        for i in range(total_sent):
            no_of_tokens = len(self.y_test[i])
            for j in range(no_of_tokens):
                if self.y_pred[i][j] == self.y_test[i][j]:
                    true_prediction += 1
                total_words += 1
        acc = (true_prediction/total_words)*100
        return acc

    def list_unique(self, data):
        unique_elements = set()
        for sentence in data:
            for token in sentence:
                unique_elements.add(token)
        return unique_elements

    def compute_start_probability(self):
        start_probability = dict()
        
        for tag in self.tags:
            start_probability[tag] = 0
        
        for sentence in self.y_train:
            start_probability[sentence[0]] += 1

        sum_ = sum(start_probability.values())

        for tag in self.tags:
            start_probability[tag] /= sum_
    
        return start_probability

    def compute_transition_matrix(self):
        transition_matrix = dict()

        for tag1 in self.tags:
            transition_matrix[tag1] = dict() 
            for tag2 in self.tags:
                transition_matrix[tag1][tag2] = 0
        
        for sentence in self.y_train:
            m = len(sentence)
            for i in range(1,m):
                transition_matrix[sentence[i-1]][sentence[i]] += 1

        for tag1 in self.tags:
            sum_ = sum(transition_matrix[tag1].values())
            for tag2 in self.tags:
                transition_matrix[tag1][tag2] /= sum_
        return transition_matrix

    def compute_emission_matrix(self):
        emission_matrix = dict()

        for tag in self.tags:
            emission_matrix[tag] = dict()
            for word in self.words:
                emission_matrix[tag][word] = 0

        for sentence, tags in zip(self.x_train, self.y_train):
            for tag, word in zip(tags, sentence):
                emission_matrix[tag][word] +=  1 
        
        for tag in self.tags:
            sum_ = sum(emission_matrix[tag].values())
            for word in self.words:
                emission_matrix[tag][word] /= sum_

        return emission_matrix

    def viterbi_algo(self, obs, states, s_pro, t_pro, e_pro, words):
        path = { s:[s] for s in states}
        curr_pro = {}
        for s in states:
            word_pro = 1 if obs[0] not in words else e_pro[s][obs[0]]
            curr_pro[s] = s_pro[s]*word_pro

        for i in range(1, len(obs)):
            new_path = { s:[] for s in states}
            last_pro = curr_pro.copy()
            curr_pro = {}
            for curr_state in states:
                word_pro = 1 if obs[i] not in words else e_pro[curr_state][obs[i]]
                max_pro, last_sta = max(((last_pro[last_state]*t_pro[last_state][curr_state]*word_pro, last_state) for last_state in states))
                curr_pro[curr_state] = max_pro
                new_path[curr_state] =path[last_sta] + [curr_state]
            path = new_path
        max_pro = -1
        max_path = None
        for s in states:
            if curr_pro[s] > max_pro:
                max_path = path[s]
                max_pro = curr_pro[s]
        return max_path

In [10]:
tagged_corpus = nltk.corpus.brown.tagged_sents(tagset='universal')

In [14]:
x_train, y_train, x_test, y_test = gen_dataset(tagged_corpus)

In [16]:
nltk_pos_tagger = NLTK_POS_TAGGER(x_test, y_test)

In [29]:
print(f'NLTK pos tagger accuracy: {nltk_pos_tagger.evaluate()}%')

NLTK pos tagger accuracy: 90.71294639785317%


In [34]:
hmm = HiddenMarkovModel(x_train, y_train, x_test, y_test)

In [35]:
print(f'HMM model accuracy: {hmm.evaluate()}%')

Fitting the HMM model.
generating predictions.
evaulating the HMM model.
HMM model accuracy: 95.89468795155852%
