# POS Tagging using HMM and Viterbi
A hidden Markov model (HMM) allows us to talk about both observed events (like words that we see in the input) and hidden events (like part-of-speech tags) in our probabilistic model. An HMM is specified by the following components:

$Q = q_1 q_2 \ldots q_N$

a set of $N$ states

$A = a_{11} \ldots a_{ij} \ldots a_{NN}$

a transition probability matrix $A$, each $a_{ij}$ representing the probability of moving from state $i$ to state $j$, s.t. $\sum_{j=1}^{N} a_{ij} = 1 \ \forall i$

$O = o_1 o_2 \ldots o_T$

a sequence of $T$ observations, each one drawn from a vocabulary $V = v_1, v_2, \ldots, v_V$

$B = b_i(o_t)$

a sequence of observation likelihoods, also called emission probabilities, each expressing the probability of an observation $o_t$ being generated from a state $q_i$

$π = π_1, π_2, \ldots, π_N$

an initial probability distribution over states. $π_i$ is the probability that the Markov chain will start in state $i$. Some states $j$ may have $π_j = 0$, meaning that they cannot be initial states. Also, $\sum_{i=1}^{N} π_i = 1$
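
To make the components concrete, here is a minimal sketch of a toy HMM written out as NumPy arrays. The two tags, the three words, and every probability below are made up for illustration and are not estimated from the WSJ data; the assertions only check the normalization constraints listed above.

```python
import numpy as np

# Hypothetical toy HMM: all names and numbers here are illustrative assumptions.
states = ["NN", "VB"]              # Q: the N hidden states (tags)
vocab = ["fish", "swim", "run"]    # V: the observation vocabulary

# A[i, j] = a_ij = P(next state j | current state i)
A = np.array([[0.3, 0.7],
              [0.8, 0.2]])

# B[i, k] = b_i(v_k) = P(word v_k | state i)
B = np.array([[0.6, 0.1, 0.3],
              [0.2, 0.5, 0.3]])

# pi[i] = P(the chain starts in state i)
pi = np.array([0.9, 0.1])

# Each row of A and B, and pi itself, must be a probability distribution.
assert np.allclose(A.sum(axis=1), 1.0)
assert np.allclose(B.sum(axis=1), 1.0)
assert np.isclose(pi.sum(), 1.0)
```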

In [1]:
pos_file = './data/wsj_sec0_pos'

In [3]:
from collections import Counter

def process_line(s):
    """Split a line of 'word/TAG' tokens into parallel word and tag lists."""
    words = ['bos']
    tags = ['BOS']
    for item in s.split():
        # Split on the last '/' only, since a word may itself contain '/'.
        w, _, t = item.rpartition('/')
        words.append(w)
        tags.append(t)
    return words + ['eos'], tags + ['EOS']

data = []

with open(pos_file, 'r') as f:
    for line in f:
        line = line.strip()
        if line:  # skip blank lines
            data.append(process_line(line))

In [5]:
print(data[0])

(['bos', 'Pierre', 'Vinken', ',', '61', 'years', 'old', ',', 'will', 'join', 'the', 'board', 'as', 'a', 'nonexecutive', 'director', 'Nov.', '29', '.', 'eos'], ['BOS', 'NNP', 'NNP', ',', 'CD', 'NNS', 'JJ', ',', 'MD', 'VB', 'DT', 'NN', 'IN', 'DT', 'JJ', 'NN', 'NNP', 'CD', '.', 'EOS'])


## HMM
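
The code in this section estimates the transition and emission probabilities by relative frequency (maximum likelihood) from the tagged corpus. Writing $C(\cdot)$ for a count over the training data (notation assumed here for illustration), the estimates it computes are:

$P(t_i \mid t_{i-1}) = \frac{C(t_{i-1}, t_i)}{C(t_{i-1})}$

$P(w_i \mid t_i) = \frac{C(t_i, w_i)}{C(t_i)}$

The artificial `BOS` tag stands in for the initial distribution: $π_j$ is read off as $P(t_j \mid \text{BOS})$.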

In [87]:
from collections import defaultdict

transition_prob = defaultdict(dict)  # transition_prob[prev_tag][tag] = P(tag | prev_tag)
emission_prob = defaultdict(dict)    # emission_prob[tag][word] = P(word | tag)
state_set = set()
transition_cnt = Counter()
tag_cnt = Counter()
emission_cnt = Counter()

# Count tags, tag bigrams, and (tag, word) pairs over the corpus.
for words, tags in data:
    tag_cnt.update(tags)
    transition_cnt.update(zip(tags, tags[1:]))
    emission_cnt.update(zip(tags, words))

# Turn counts into relative frequencies (maximum likelihood estimates).
for (prev_tag, tag), cnt in sorted(transition_cnt.items()):
    transition_prob[prev_tag][tag] = cnt / tag_cnt[prev_tag]
    state_set.update((prev_tag, tag))

for (tag, word), cnt in sorted(emission_cnt.items()):
    emission_prob[tag][word] = cnt / tag_cnt[tag]

In [47]:
import random
random.choice(list(transition_prob.items()))

('WDT',
 {'CD': 0.005025125628140704,
  'DT': 0.03015075376884422,
  'EX': 0.005025125628140704,
  'IN': 0.005025125628140704,
  'JJS': 0.005025125628140704,
  'MD': 0.11055276381909548,
  'NNP': 0.010050251256281407,
  'NNS': 0.010050251256281407,
  'PRP': 0.04020100502512563,
  'RB': 0.06532663316582915,
  'RBR': 0.005025125628140704,
  'TO': 0.005025125628140704,
  'VBD': 0.18090452261306533,
  'VBN': 0.005025125628140704,
  'VBP': 0.20603015075376885,
  'VBZ': 0.3065326633165829,
  '``': 0.005025125628140704})

In [50]:
random.choice(list(emission_prob.items()))

('PRP',
 {'He': 0.03982777179763186,
  'I': 0.059203444564047365,
  'It': 0.059203444564047365,
  'She': 0.0193756727664155,
  'They': 0.03552206673842842,
  'We': 0.03336921420882669,
  'You': 0.008611410118406888,
  'he': 0.12701829924650163,
  'her': 0.017222820236813777,
  'herself': 0.001076426264800861,
  'him': 0.00968783638320775,
  'himself': 0.0032292787944025836,
  'it': 0.26480086114101187,
  'itself': 0.0032292787944025836,
  'me': 0.004305705059203444,
  'one': 0.002152852529601722,
  'she': 0.07104413347685684,
  'them': 0.038751345532831,
  'themselves': 0.008611410118406888,
  'they': 0.12594187298170076,
  'us': 0.012917115177610334,
  'we': 0.02583423035522067,
  'you': 0.02906350914962325})

## Viterbi Decoding
Viterbi is an instance of dynamic programming. Given that we have already computed the probability of being in every state at time $t-1$, we compute the Viterbi probability $v_t(j)$ by taking the most probable of the extensions of the paths that lead to the current cell:

$v_t(j) = \max_{i=1}^{N} v_{t-1}(i) \, a_{ij} \, b_j(o_t)$
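
As a quick illustration of the recursion, the sketch below runs it on a hypothetical two-state HMM with made-up probabilities (none of these numbers come from the WSJ data, and `pi` is used directly instead of a `BOS` transition). Since $b_j(o_t)$ does not depend on $i$, it is factored out of the max.

```python
import numpy as np

# Hypothetical toy model: tags, words, and probabilities are illustrative only.
states = ["NN", "VB"]
A = np.array([[0.3, 0.7],          # A[i, j] = a_ij
              [0.8, 0.2]])
B = np.array([[0.6, 0.1, 0.3],     # B[j, k] = b_j(word k)
              [0.2, 0.5, 0.3]])
pi = np.array([0.9, 0.1])
obs = [0, 1]                       # word indices for the sequence "fish swim"

# Initialization: v_1(j) = pi_j * b_j(o_1)
v = pi * B[:, obs[0]]
backpointers = []

# Recursion: v_t(j) = max_i v_{t-1}(i) * a_ij * b_j(o_t)
for o in obs[1:]:
    scores = v[:, None] * A                     # scores[i, j] = v_{t-1}(i) * a_ij
    backpointers.append(scores.argmax(axis=0))  # best previous state i for each j
    v = scores.max(axis=0) * B[:, o]

# Termination: start from the best final state and follow the backpointers.
path = [int(v.argmax())]
for bp in reversed(backpointers):
    path.append(int(bp[path[-1]]))
path.reverse()

best_tags = [states[i] for i in path]
best_prob = float(v.max())
print(best_tags, best_prob)
```

Working the second step by hand: $v_1 = (0.54, 0.02)$, so $v_2(\text{VB}) = \max(0.54 \cdot 0.7,\ 0.02 \cdot 0.2) \cdot 0.5 = 0.189$, which beats $v_2(\text{NN}) = 0.0162$, and the backpointer for VB leads to NN.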

In [111]:
import numpy as np

def viterbi(state, observation, hmm):
    """
    state: list of all states
    observation: list of observations
    hmm: dict with 'transition' and 'emission' probability tables
    Returns the most probable state sequence and its probability.
    """
    N, T = len(state), len(observation)  # N: number of states, T: number of observations
    viterbi_matrix = np.zeros((N, T))  # [s][t]: probability of the best path ending in state s at time t
    backpointer = np.zeros((N, T), dtype=int)  # [s][t]: best previous state for that cell
    # initialization: transition out of BOS times emission of the first observation
    for s in range(N):
        a = hmm['transition']['BOS'].get(state[s], 0)
        b = hmm['emission'][state[s]].get(observation[0], 0)
        viterbi_matrix[s][0] = a * b
    # recursion
    for t in range(1, T):
        for s in range(N):
            a = np.array([hmm['transition'][s_].get(state[s], 0) for s_ in state])
            b = hmm['emission'][state[s]].get(observation[t], 0)
            scores = viterbi_matrix[:, t-1] * a * b
            viterbi_matrix[s][t] = np.max(scores)
            backpointer[s][t] = np.argmax(scores)
    # termination: pick the best final state, then follow the backpointers
    best_path_prob = np.max(viterbi_matrix[:, -1])
    best_path_pointer = int(np.argmax(viterbi_matrix[:, -1]))
    best_path = [best_path_pointer]
    for t in range(T - 1, 0, -1):
        best_path_pointer = int(backpointer[best_path_pointer][t])
        best_path.insert(0, best_path_pointer)
    best_path = [state[i] for i in best_path]
    return best_path, best_path_prob

def get_pos(s, state, hmm):
    """
    s: input sentence (observation), tokens separated by whitespace
    state: list of all states
    hmm: HMM model created
    """
    s_list = s.strip().split()
    # vocabulary: every word seen with at least one tag in training
    symbol_set = {w for emissions in hmm['emission'].values() for w in emissions}
    # Map unseen words to '<unk>'. Unless the training data contains '<unk>', no state
    # emits it, so any unseen word makes every path probability zero.
    s_list = [c if c in symbol_set else '<unk>' for c in s_list]
    return viterbi(state, s_list, hmm)

In [112]:
hmm = dict()
hmm['transition'] = transition_prob
hmm['emission'] = emission_prob

In [113]:
test_text = "Mr. Vinken is chairman of Elsevier N.V. , the Dutch publishing group ."
get_pos(test_text, list(state_set), hmm)

(['NNP',
  'NNP',
  'VBZ',
  'NN',
  'IN',
  'NNP',
  'NNP',
  ',',
  'DT',
  'NNP',
  'NN',
  'NN',
  '.'],
 2.3389568232423102e-37)
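
The path probability above is already about $10^{-37}$ for a 13-token sentence. Products like this shrink with sentence length and can eventually underflow to zero in double precision. A common remedy is to work with log probabilities, so that the recursion takes a max over sums instead of products. The snippet below only illustrates the numerical effect with made-up numbers (200 factors of 0.01); it is not part of the tagger above.

```python
import math

# Illustrative numbers only: 200 probabilities of 0.01 each.
probs = [0.01] * 200

# The true product is 1e-400, below the smallest positive double,
# so the floating-point result collapses to 0.0.
product = math.prod(probs)  # math.prod requires Python 3.8+

# The same quantity in log space stays finite and comparable.
log_sum = sum(math.log(p) for p in probs)

print(product)
print(log_sum)
```

In a log-space version of the recursion, zero probabilities would need to be represented as `-inf` (or smoothed away), since `math.log(0)` raises a `ValueError`.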