## IMPORTS

In [118]:
import numpy as np

## DATASET

In [4]:
"""
Download POS dataset from NLTK library
"""
import nltk
nltk.download('conll2000')

"""
Load the dataset
"""
from nltk.corpus import conll2000
tagged_sentences = conll2000.tagged_sents()

[nltk_data] Downloading package conll2000 to /Users/vinay/nltk_data...
[nltk_data]   Package conll2000 is already up-to-date!


In [199]:
"""
View the data
"""
print(tagged_sentences[7])
print(len(tagged_sentences))

[('The', 'DT'), ('August', 'NNP'), ('deficit', 'NN'), ('and', 'CC'), ('the', 'DT'), ('#', '#'), ('2.2', 'CD'), ('billion', 'CD'), ('gap', 'NN'), ('registered', 'VBN'), ('in', 'IN'), ('July', 'NNP'), ('are', 'VBP'), ('topped', 'VBN'), ('only', 'RB'), ('by', 'IN'), ('the', 'DT'), ('#', '#'), ('2.3', 'CD'), ('billion', 'CD'), ('deficit', 'NN'), ('of', 'IN'), ('October', 'NNP'), ('1988', 'CD'), ('.', '.')]
10948


In [200]:
"""
Convert the data into a suitable training format
"""
x_train = []
y_train = []
for sent in tagged_sentences:
    x,y = zip(*sent)
    x_train.append(list(x))
    y_train.append(list(y))

In [248]:
"""
Utility function for creating a map from a set
"""
def convert_set_to_map(s):
    m = {}
    for item in s:
        m[item] = len(m)
    return m

## MODEL

In [353]:
"""
Hidden Markov Model Sequence Tagger
Training - uses Supervised training
Estimation - Uses forward algorithm to compute likelihood of a sequence
Inference - Viterbi Decoding
Applications - Sequence Tagging problems such as POS Tagger, NER
"""
class HMMTagger():
    def __init__(self):
        pass
    
    def train(self, x, y):
        # build word and tag index maps
        vocab = set(word for sent in x for word in sent)
        self.word_to_idx = convert_set_to_map(vocab)

        pos_tags = set(pos_tag for sent in y for pos_tag in sent)
        self.tags_to_idx = convert_set_to_map(pos_tags)
        
        self.vocab_length = len(vocab)
        self.tag_length = len(pos_tags)
        
        # initialize matrices
        self.start_prob = np.zeros(self.tag_length)
        self.transition_prob = np.zeros((self.tag_length, self.tag_length))
        self.emission_prob = np.zeros((self.vocab_length, self.tag_length))
        
        # train transition probabilities
        for label_seq in y:
            # add count for starting tag
            first_tag_id = tags_to_idx[label_seq[0]]
            self.start_prob[first_tag_id] += 1
            
            # compute for consecutive pairs
            for i in range(len(label_seq)-1):
                first_tag_id = tags_to_idx[label_seq[i]]
                second_tag_id = tags_to_idx[label_seq[i+1]]
                self.transition_prob[first_tag_id][second_tag_id] += 1
            
        # train emission probabilities
        for i, sent in enumerate(x):
            for j, word in enumerate(sent):
                word_id = word_to_idx[word]
                tag_id = tag_to_idx[y[i][j]]
                self.emission_prob[word_id][tag_id] += 1
                
        # normalize the probabilites
        self.transition_prob = self.transition_prob/np.sum(self.transition_prob, axis=1, keepdims=True)
        self.emission_prob = self.emission_prob/np.sum(self.emission_prob, axis=1, keepdims=True)
        self.start_prob = self.start_prob/self.start_prob.sum()
     
    
    def train_unsupervised(self, x, pos_tags):
        # build word map
        vocab = set(word for sent in x for word in sent)
        self.word_to_idx = convert_set_to_map(vocab)
        
        self.tags_to_idx = convert_set_to_map(pos_tags)
        
        self.vocab_length = len(vocab)
        self.tag_length = len(pos_tags)
        
        # randomly initialize transition and emission matrices
        self.start_prob = np.random.uniform(size=self.tag_length)
        self.transition_prob = np.random.uniform(size=(self.tag_length, self.tag_length))
        self.emission_prob = np.random.uniform(size=(self.vocab_length, self.tag_length))
        
        # normalize the probabilites
        self.transition_prob = self.transition_prob/np.sum(self.transition_prob, axis=1, keepdims=True)
        self.emission_prob = self.emission_prob/np.sum(self.emission_prob, axis=1, keepdims=True)
        self.start_prob = self.start_prob/self.start_prob.sum()
        
        # Baum-Welch algorithm (a special case of Expectation Maximization)
        # epsilon(i,j,t) -> expected number of transitions from state i->j at time t
        # gamma(j,t) -> expected number of occurrences of token t when at state j
        num_iter = 10
        while num_iter > 0:
            
            # E-step
            
            
            # M-step
            
            num_iter--
        
        
    # MLE Estimate of a sequence
    def forward(self, x):
        path = np.zeros((self.tag_length, len(x)))
        
        # initialization
        for i in range(self.tag_length):
            path[i][0] = self.start_prob[i]*self.emission_prob[x[0]][i]
        
        # recursion
        for t in range(1, len(x)):
            for s in range(self.tag_length):
                curr_sum = 0.0
                for prev_state in range(self.tag_length):
                    curr_sum += self.transition_prob[prev_state][s]*path[prev_state][t-1]*self.emission_prob[x[t]][s]
                path[s][t] = curr_sum
                
        # termination: compute likelihood
        fwd_prob = np.sum(path[:, -1])
        return fwd_prob, path
    
    
    def backward(self, x):
        path = np.zeros((self.tag_length, len(x)))
        
        # initialization
        for i in range(self.tag_length):
            path[i][len(x)-1] = 1
        
        # recursion
        for t in range(len(x)-2, -1, -1):
            for s in range(self.tag_length):
                curr_sum = 0.0
                for next_state in range(self.tag_length):
                    curr_sum += self.transition_prob[s][next_state]*path[next_state][t+1]*self.emission_prob[x[t+1]][next_state]
                path[s][t] = curr_sum
                
        # termination: compute backward likelihood
        back_prob = 0.0
        for i in range(self.tag_length):
            back_prob += self.start_prob[i]*self.emission_prob[x[0]][i]*path[i][0]
        return back_prob, path

    
    def get_seq_prob(self, x):
        x = self._vectorize(x)
        fwd_prob, fwd_path = self.forward(x)
        back_prob, back_path = self.backward(x)
        
        print(fwd_path.shape)
        
        print(fwd_prob, " : ", back_prob)
        
        for t in range(len(x)):
            curr_sum = 0.0
            for s in range(self.tag_length):
                curr_sum += fwd_path[s][t]*back_path[s][t]
            print("Time : ", t, " and prob : ", curr_sum)
            
    # Viterbi decoding
    def predict(self, x):
        x = self._vectorize(x)
        path = np.zeros((self.tag_length, len(x)))
        back_pointers = np.full((self.tag_length, len(x)), -1)
        
        for i in range(self.tag_length):
            path[i][0] = self.start_prob[i]*self.emission_prob[x[0]][i]
        
        for t in range(1, len(x)):
            for s in range(self.tag_length):
                curr_max = 0.0
                best_prev_state = 0
                for prev_s in range(self.tag_length):
                    if self.transition_prob[prev_s][s]*path[prev_s][t-1] > curr_max:
                        curr_max = self.transition_prob[prev_s][s]*path[prev_s][t-1]
                        best_prev_state = prev_s
                path[s][t] = curr_max*self.emission_prob[x[t]][s]
                back_pointers[s][t] = best_prev_state
        
        # compute the backward trajectory
        max_prob_score = np.max(path[:, -1])
        argmax_index = np.argmax(path[:, -1])
                
        best_path = [argmax_index]
        t = len(x)-1
        while True:
            state = back_pointers[argmax_index][t]
            if state == -1:
                break
            best_path.append(state)
            t = t-1
        
        return self._decode_tags(reversed(best_path))
    
    def _vectorize(self, sent):
        vector = []
        for word in sent.split():
            vector.append(self.word_to_idx[word])
        return vector

    def _decode_tags(self, tag_ids):
        tags = []
        for tag_id in tag_ids:
            for tag, idx in self.tags_to_idx.items():
                if tag_id == idx:
                    tags.append(tag)
        return tags

In [354]:
"""
Create the model and train it
"""
hmm_tagger = HMMTagger()
hmm_tagger.train(x_train, y_train)

In [364]:
"""
Test the model on test data
"""
test_sentence = "It is pouring heavily in here"
tags = hmm_tagger.predict(test_sentence)
print(list(zip(test_sentence.split(), tags)))

print(hmm_tagger.get_seq_prob(test_sentence))

[('It', 'PRP'), ('is', 'VBZ'), ('pouring', 'VBG'), ('heavily', 'RB'), ('in', 'IN'), ('here', 'RB')]
(44, 6)
4.337771033860644e-08  :  4.337771033860644e-08
Time :  0  and prob :  4.337771033860644e-08
Time :  1  and prob :  4.337771033860645e-08
Time :  2  and prob :  4.337771033860645e-08
Time :  3  and prob :  4.337771033860644e-08
Time :  4  and prob :  4.337771033860644e-08
Time :  5  and prob :  4.337771033860644e-08
None


In [332]:
"""
Test the results using NLTK library
"""
nltk.download('brown')
# Initialize and tag with HiddenMarkovModelTagger
tagger = nltk.tag.HiddenMarkovModelTagger.train(nltk.corpus.brown.tagged_sents(categories='news'))

[nltk_data] Downloading package brown to /Users/vinay/nltk_data...
[nltk_data]   Package brown is already up-to-date!


In [365]:
# Tokenize the sentence into words
tokens = nltk.word_tokenize(test_sentence)

# Perform POS tagging
pos_tags = tagger.tag(tokens)

# Perform POS tagging
pos_tags = nltk.pos_tag(tokens)

print(pos_tags)

[('It', 'PRP'), ('is', 'VBZ'), ('pouring', 'VBG'), ('heavily', 'RB'), ('in', 'IN'), ('here', 'RB')]
