---

layout: post
title: Virtebi Algorithm and Hidden Markov Model - Part 2

---


In [13]:
import pandas as pd
import nltk.corpus as corp
import nltk
import pomegranate as pm

In [2]:
# download the corpus from nltk repository
if not corp.brown:
    nltk.download('brown')

In [3]:
print('Brown corpus has {:,} tagged words / unigram'.format(len(corp.brown.tagged_words())))

Brown corpus has 1,161,192 tagged words / unigram


In [4]:
corp.brown.tagged_words()[:10]

[('The', 'AT'),
 ('Fulton', 'NP-TL'),
 ('County', 'NN-TL'),
 ('Grand', 'JJ-TL'),
 ('Jury', 'NN-TL'),
 ('said', 'VBD'),
 ('Friday', 'NR'),
 ('an', 'AT'),
 ('investigation', 'NN'),
 ('of', 'IN')]

In [5]:
# sampling for the first 5000 tokens just for demo sake
sample = corp.brown.tagged_words()[:5000]
print('We have sample: {} tokens'.format(len(sample)))

We have sample: 5000 tokens


In [7]:
def construct_discrete_distributions_per_words(tagged_tokens):
    tag_probs = dict()
    for token, tag in tagged_tokens:
        if tag not in tag_probs:
            tag_probs[tag] = dict()
            tag_probs[tag]['count_tag'] = dict()
            tag_probs[tag]['occurence'] = 1
        else:
            tag_probs[tag]['occurence'] += 1
            
        if token not in tag_probs[tag]['count_tag']:
            tag_probs[tag]['count_tag'][token] = 1
        else:
            tag_probs[tag]['count_tag'][token] += 1
            
    for tag in tag_probs:
        tag_probs[tag]['probs'] = dict()
        for token in tag_probs[tag]['count_tag']:
            tag_probs[tag]['probs'][token] = float(tag_probs[tag]['count_tag'][token]) / float(tag_probs[tag]['occurence'])
    
    return tag_probs            

In [21]:
def construct_transition_probabilities_per_words(tagged_tokens):
    transition_probs = dict()
    for i in range(len(tagged_tokens)):
        current_tag = tagged_tokens[i][1]
        if current_tag not in transition_probs:
            transition_probs[current_tag] = dict()
            transition_probs[current_tag]['occurence'] = 0
            transition_probs[current_tag]['count_transition'] = dict()
        
        # evaluate previous tag
        if i > 0:
            previous_tag = tagged_tokens[i-1][1]
            pt = tagged_tokens[i-1][0]
            transition_probs[previous_tag]['occurence'] += 1
            
            # special case for <start> tag
            if pt == '.':
                if '<start>' not in transition_probs:
                    transition_probs['<start>'] = dict()
                    transition_probs['<start>']['occurence'] = 0
                    transition_probs['<start>']['count_transition'] = dict()
                if current_tag not in transition_probs['<start>']['count_transition']:
                    transition_probs['<start>']['count_transition'][current_tag] = 0
                    
                transition_probs['<start>']['count_transition'][current_tag] += 1
                transition_probs['<start>']['occurence'] += 1
                    
            
            #init
            if current_tag not in transition_probs[previous_tag]['count_transition']:
                transition_probs[previous_tag]['count_transition'][current_tag] = 0
                
            transition_probs[previous_tag]['count_transition'][current_tag] += 1
    for tag in transition_probs:
        transition_probs[tag]['probs'] = dict()
        for transit_tag in transition_probs[tag]['count_transition']:
            transition_probs[tag]['probs'][transit_tag] = \
                float(transition_probs[tag]['count_transition'][transit_tag]) / float(transition_probs[tag]['occurence'])
    
    return transition_probs

In [61]:
def build_hmm_model(token_dist, transition_dist, model_name='hmm-tagger'):
    state_dict = dict()
    for token in token_dist:
        state_dict[token] = \
            pm.State(
                pm.DiscreteDistribution(
                    token_dist[token]['probs']
                )
                , name=token
            )
            
    model = pm.HiddenMarkovModel(model_name)
    model.add_states(list(state_dict.values()))
    
    # initialization for starting tokens
    for token, prob in transition_dist['.']['probs'].items():
        model.add_transition(state_dict[token], model.end, prob)
        
    for token, prob in transition_dist['<start>']['probs'].items():
        model.add_transition(model.start, state_dict[token], prob)
    
    transition_dist_list = list(transition_dist.items())
    for i in range(1, len(transition_dist_list)):
        ptoken, pmeta = transition_dist_list[i]
        if ptoken != '.' and ptoken != '<start>':
            for ctoken, cprob in pmeta['probs'].items():
                
                model.add_transition(
                    state_dict[ptoken],
                    state_dict[ctoken],
                    cprob
                )

        
    return model, state_dict

In [62]:
from sklearn.base import BaseEstimator, ClassifierMixin 

In [92]:
class HmmTaggerModel(BaseEstimator, ClassifierMixin):
    """
    POS Tagger with Hmm Model
    """
    def __init__(self):
        self._inner_model = None
        self._tag_dist = None
        self._transition_dist = None
        self._state_dict = None
    
    def fit(self, X, y=None):
        """
        expecting X as list of tokens, while y is list of POS tag
        """
        combined = list(zip(X, y))
        self._tag_dist = construct_discrete_distributions_per_words(combined)
        self._transition_dist = construct_transition_probabilities_per_words(combined)
        
        self._inner_model, _ = build_hmm_model(self._tag_dist, self._transition_dist)
        self._inner_model.bake()
        
    
    def predict(self, X, y=None):
        """
        expecting X as list of tokens
        """
        return [state.name for i, state in self._inner_model.viterbi(X)[1]][1:-1]

In [64]:
def accuracy(ypred, ytrue):
    total = len(ytrue)
    correct = 0
    for pred, true in zip(ypred, ytrue):
        if ypred == ytrue:
            correct += 1
    
    return correct / total

In [93]:
model = HmmTaggerModel()
model.fit(map(lambda x: x[0], sample), map(lambda x: x[1], sample))

In [116]:
import numpy as np

In [117]:
st_idx = [0, 68]
end_idx = [10, 72]

mean_acc = np.mean([
    accuracy(
        list(map(lambda x: x[1], sample[st:end])),
        model.predict(
            list(map(lambda x: x[0], sample[st:end]))
        )
    )    
    for st, end in zip(st_idx, end_idx)
])

print('mean accuracy: {}'.format(mean_acc))

mean accuracy: 1.0


In [119]:
larger_sample = corp.brown.tagged_words()[:500000]
model = HmmTaggerModel()
model.fit(
    map(lambda x: x[0], larger_sample), 
    map(lambda x: x[1], larger_sample)
)

In [128]:
st_idx = [0, 68, 103]
end_idx = [10, 72, 118]

mean_acc = np.mean([
    accuracy(
        list(map(lambda x: x[1], sample[st:end])),
        model.predict(
            list(map(lambda x: x[0], sample[st:end]))
        )
    )    
    for st, end in zip(st_idx, end_idx)
])

print('mean accuracy: {}'.format(mean_acc))

mean accuracy: 1.0


# References:
- https://en.wikipedia.org/wiki/Brown_Corpus
- https://pomegranate.readthedocs.io/en/latest/HiddenMarkovModel.html#hiddenmarkovmodel
