# Hidden Markov Models in python

*Notebook for COMP90042, Web search and Text Analysis*

*Copyright The University of Melbourne, 2017*

Here we'll show how the Viterbi algorithm works for HMMs, assuming we have a trained model to start with. Further down we look at the forward and backward algorithms and Baum-Welch.

In [1]:
import numpy as np

Initialise the model parameters based on the example from the lecture slides (values taken from figure). We have three observation types (up, down, unchanged) and also three hidden states. 

In [2]:
A = np.array([[0.6, 0.2, 0.2], [0.5, 0.3, 0.2], [0.4, 0.1, 0.5]])
pi = np.array([0.5, 0.2, 0.3])
O = np.array([[0.7, 0.1, 0.2], [0.1, 0.6, 0.3], [0.3, 0.3, 0.4]])

Now let's consider the example observation sequence UP, UP, DOWN, for which we'll try to discover the hidden state sequence.

In [13]:
observations

[0, 0, 1]

In [6]:
states = UP, DOWN, UNCHANGED = 0, 1, 2
observations = [UP, UP, DOWN]

Now we'll code the Viterbi algorithm. It keeps a store of two components, the best scores to reach a state at a give time, and the last step of the path to get there. Scores alpha are initialised to -inf to denote that we haven't set them yet. 

In [7]:
alpha = np.zeros((len(observations), len(states))) # time steps x states
alpha[:,:] = float('-inf')
backpointers = np.zeros((len(observations), len(states)), 'int')

The base case for the recursion sets the starting state probs based on pi and generating the observation.

In [9]:
backpointers

array([[0, 0, 0],
       [0, 0, 0],
       [0, 0, 0]])

In [10]:
# base case, time step 0
alpha[0, :] = pi * O[:,UP]
alpha

array([[ 0.35,  0.02,  0.09],
       [ -inf,  -inf,  -inf],
       [ -inf,  -inf,  -inf]])

Now for the recursive step, where we maximise over incoming transitions reusing the best incoming score, computed above.

In [11]:
# time step 1
for t1 in states:
    for t0 in states:
        score = alpha[0, t0] * A[t0, t1] * O[t1,UP]
        if score > alpha[1, t1]:
            alpha[1, t1] = score
            backpointers[1, t1] = t0
alpha

array([[ 0.35 ,  0.02 ,  0.09 ],
       [ 0.147,  0.007,  0.021],
       [  -inf,   -inf,   -inf]])

Note that the running maximum for any incoming state (t0) is maintained in alpha[1,t1], and the winning state is stored in addition, as a backpointer. 

Repeat with the next observation. (We'd do this as a loop over positions in practice.)

In [None]:
# time step 2
for t2 in states:
    for t1 in states:
        score = alpha[1, t1] * A[t1, t2] * O[t2,DOWN]
        if score > alpha[2, t2]:
            alpha[2, t2] = score
            backpointers[2, t2] = t1
alpha

Now read of the best final state:

In [None]:
t2 = np.argmax(alpha[2,:])
t2

We need to work out the rest of the path which is the best way to reach the final state, t2. We can work this out by taking a step backwards looking at the best incoming edge, i.e., as stored in the backpointers.

In [None]:
t1 = backpointers[2,t2]
t1

Repeat this until we reach the start of the sequence.

In [None]:
t0 = backpointers[1,t1]
t0

Phew. The best state sequence is t=[0, 0, 1].

## Formalising things

Now we can put this all into a function to handle arbitrary length inputs 

In [None]:
def viterbi(params, observations):
    pi, A, O = params
    M = len(observations)
    S = pi.shape[0]
    
    alpha = np.zeros((M, S))
    alpha[:,:] = float('-inf')
    backpointers = np.zeros((M, S), 'int')
    
    # base case
    alpha[0, :] = pi * O[:,observations[0]]
    
    # recursive case
    for t in range(1, M):
        for s2 in range(S):
            for s1 in range(S):
                score = alpha[t-1, s1] * A[s1, s2] * O[s2, observations[t]]
                if score > alpha[t, s2]:
                    alpha[t, s2] = score
                    backpointers[t, s2] = s1
    
    # now follow backpointers to resolve the state sequence
    ss = []
    ss.append(np.argmax(alpha[M-1,:]))
    for i in range(M-1, 0, -1):
        ss.append(backpointers[i, ss[-1]])
        
    return list(reversed(ss)), np.max(alpha[M-1,:])

Let's test the method on the same input, and a longer input observation sequence.

In [None]:
viterbi((pi, A, O), [UP, UP, DOWN])

In [None]:
viterbi((pi, A, O), [UP, UP, DOWN, UNCHANGED, UNCHANGED, DOWN, UP, UP])

## Exhaustive method

Let's verify that we've done the above algorithm correctly by implementing exhaustive search, which forms the cross-product of states^M.

In [None]:
from itertools import product

def exhaustive(params, observations):
    pi, A, O = params
    M = len(observations)
    S = pi.shape[0]
    
    # track the running best sequence and its score
    best = (None, float('-inf'))
    # loop over the cartesian product of |states|^M
    for ss in product(range(S), repeat=M):
        # score the state sequence
        score = pi[ss[0]] * O[ss[0],observations[0]]
        for i in range(1, M):
            score *= A[ss[i-1], ss[i]] * O[ss[i], observations[i]]
        # update the running best
        if score > best[1]:
            best = (ss, score)
            
    return best

In [None]:
exhaustive((pi, A, O), [UP, UP, DOWN])

In [None]:
exhaustive((pi, A, O), [UP, UP, DOWN, UNCHANGED, UNCHANGED, DOWN, UP, UP])

Yay, it got the same results as before. Note that the exhaustive method is practical on anything beyond toy data due to the nasty cartesian product. But it is worth doing to verify the Viterbi code above is getting the right results. 

## Supervised training, aka "visible" Markov model

Let's train the HMM parameters on the Penn Treebank, using the sample from NLTK. Note that this is a small fraction of the treebank, so we shouldn't expect great performance of our method trained only on this data.

In [None]:
from nltk.corpus import treebank

In [None]:
corpus = treebank.tagged_sents()
print(corpus)

We have to first map words and tags to numbers for compatibility with the above methods.

In [None]:
word_numbers = {}
tag_numbers = {}

num_corpus = []
for sent in corpus:
    num_sent = []
    for word, tag in sent:
        wi = word_numbers.setdefault(word.lower(), len(word_numbers))
        ti = tag_numbers.setdefault(tag, len(tag_numbers))
        num_sent.append((wi, ti))
    num_corpus.append(num_sent)
    
word_names = [None] * len(word_numbers)
for word, index in word_numbers.items():
    word_names[index] = word
tag_names = [None] * len(tag_numbers)
for tag, index in tag_numbers.items():
    tag_names[index] = tag

Now let's hold out the last few sentences for testing, so that they are unseen during training and give a more reasonable estimate of accuracy on fresh text.

In [None]:
training = num_corpus[:-10] # reserve the last 10 sentences for testing
testing = num_corpus[-10:]

Next we compute relative frequency estimates based on the observed tag and word counts in the training set. Note that smoothing is important, here we add a small constant to all counts. 

In [None]:
S = len(tag_numbers)
V = len(word_numbers)

# initalise
eps = 0.1
pi = eps * np.ones(S)
A = eps * np.ones((S, S))
O = eps * np.ones((S, V))

# count
for sent in training:
    last_tag = None
    for word, tag in sent:
        O[tag, word] += 1
        # bug fixed here 27/3/17; test was incorrect 
        if last_tag == None:
            pi[tag] += 1
        else:
            A[last_tag, tag] += 1
        last_tag = tag
        
# normalise
pi /= np.sum(pi)
for s in range(S):
    O[s,:] /= np.sum(O[s,:])
    A[s,:] /= np.sum(A[s,:])

Now we're ready to use our Viterbi method defined above

In [None]:
predicted, score = viterbi((pi, A, O), list(map(lambda w_t: w_t[0], testing[0])))

In [None]:
print('%20s\t%5s\t%5s' % ('TOKEN', 'TRUE', 'PRED'))
for (wi, ti), pi in zip(testing[0], predicted):
    print('%20s\t%5s\t%5s' % (word_names[wi], tag_names[ti], tag_names[pi]))

Hey, not bad, only three errors. Can you explain why these might have occurred?

# Advanced material follows

The remainder of the notebook goes well beyond the material taught in the WSTA subject, but may interest the intrepid.

## Marginalisation in Hidden Markov Models

A related problem is marginalisation, when we wish to find the probability of an observation sequence *under any hidden state sequence*. This allows hidden Markov models to function as language models, but also is key to unsupervised training and the central algorithm for training.

As with the Viterbi algorithm, we'll need to start with the mathematical definition and attempt to factorise it (to follow a recursion, thus allowing for dynamic programming). The quantity we wish to compute is $$p(\vec{w}) = \sum_{\vec{t}} p(\vec{t}, \vec{w})$$
where $w$ are the observations (words) and $t$ are the states (tags).

Let's start by expanding the summation 
$$
p(\vec{w})  = \sum_{t_1} \sum_{t_2} \cdots \sum_{t_{N-1}} \sum_{t_N} p(\vec{t}, \vec{w})
$$
and expand the HMM probability 
$$
p(\vec{w})  = \sum_{t_1} \sum_{t_2} \cdots \sum_{t_{N-1}} \sum_{t_N} p(t_1) p(w_1 | t_1) p(t_2 | t_1) p(w_2| t_2) \cdots p(t_{N-1} | t_{N-2}) p(w_{N-1}| t_{N-1}) p(t_{N} | t_{N-1}) p(w_{N}| t_{N})
$$

Let's compare the full marginal probability $p(\vec{w})$ and the probability up to position $N-1$, finishing with tag $t_{N-1}$
$$p(w_1, w_2, \ldots, w_{N-1}, t_{N-1}) = \sum_{t_1} \sum_{t_2} \cdots \sum_{t_{N-1}} p(t_1) p(w_1 | t_1) p(t_2 | t_1) p(w_2| t_2) \cdots p(t_{N-1} | t_{N-2}) p(w_{N-1}| t_{N-1})
$$

They look rather similar, and in fact we can express $p(\vec{w})$ more simply as
$$
p(\vec{w})  = \sum_{t_N} p(w_1, w_2, \ldots, w_{N-1}, t_{N-1}) p(t_{N} | t_{N-1}) p(w_{N}| t_{N})
$$

We can continue further by defining $p(w_1, w_2, \ldots, w_{N-1}, t_{N-1})$ in terms of $p(w_1, w_2, \ldots, w_{N-2}, t_{N-2})$ and so forth. (This is the same process used in the Viterbi algorithm, albeit swapping a max for a sum.)

Formally we store a matrix of partial marginals, $\alpha$ defined as follows
$$\alpha[i, t_i] = p(w_1, w_2, \ldots, w_i, t_i)$$
computed using the recursion
$$  
\alpha[i, t_i] = \sum_{t_{i-1}} \alpha[i-1, t_i] p(t_i | t_{i-1}) p(w_i| t_i) 
$$
and the base case for $i=1$,
$$
\alpha[1, t_1] = p(t_1) p(w_1 | t_1)
$$

Now we have computed the formulation, we can put this into an iterative algorithm: compute the vector of alpha[1] values, then alpha[2] etc until we reach the end of our input

In [None]:
def forward(params, observations):
    pi, A, O = params
    N = len(observations)
    S = pi.shape[0]
    
    alpha = np.zeros((N, S))
    
    # base case
    alpha[0, :] = pi * O[:,observations[0]]
    
    # recursive case
    for i in range(1, N):
        for s2 in range(S):
            for s1 in range(S):
                alpha[i, s2] += alpha[i-1, s1] * A[s1, s2] * O[s2, observations[i]]
    
    return (alpha, np.sum(alpha[N-1,:]))

In [None]:
A = np.array([[0.6, 0.2, 0.2], [0.5, 0.3, 0.2], [0.4, 0.1, 0.5]])
pi = np.array([0.5, 0.2, 0.3])
O = np.array([[0.7, 0.1, 0.2], [0.1, 0.6, 0.3], [0.3, 0.3, 0.4]])

forward((pi, A, O), [UP, UP, DOWN])

In [None]:
forward((pi, A, O), [UP, UP, DOWN, UNCHANGED, UNCHANGED, DOWN, UP, UP])

Let's confirm we did this correctly by implementing an exhaustive equivalent

In [None]:
def exhaustive_forward(params, observations):
    pi, A, O = params
    N = len(observations)
    S = pi.shape[0]

    total = 0.0
    # loop over the cartesian product of |states|^N
    for ss in product(range(S), repeat=N):
        # score the state sequence
        score = pi[ss[0]] * O[ss[0],observations[0]]
        for i in range(1, N):
            score *= A[ss[i-1], ss[i]] * O[ss[i], observations[i]]
        total += score
            
    return total

In [None]:
exhaustive_forward((pi, A, O), [UP, UP, DOWN])

In [None]:
exhaustive_forward((pi, A, O), [UP, UP, DOWN, UNCHANGED, UNCHANGED, DOWN, UP, UP])

## Backward algorithm

The same process but working from left to right rather than right to left give us the backward algorithm.

In [None]:
def backward(params, observations):
    pi, A, O = params
    N = len(observations)
    S = pi.shape[0]
    
    beta = np.zeros((N, S))
    
    # base case
    beta[N-1, :] = 1
    
    # recursive case
    for i in range(N-2, -1, -1):
        for s1 in range(S):
            for s2 in range(S):
                beta[i, s1] += beta[i+1, s2] * A[s1, s2] * O[s2, observations[i+1]]
    
    return (beta, np.sum(pi * O[:, observations[0]] * beta[0,:]))

Let's confirm the it gets the same marginal probability as the forward algorithm 

In [None]:
backward((pi, A, O), [UP, UP, DOWN])

## Unsupervised training

Unsupervised training of a HMM involves running forward and backward to estimate the *expected* probability of taking various state sequences, then updates the model to match these *expectations*. This repeats many times until things stabilise (covergence). Note that it's non-convex, so the starting point often affects the converged solution.

In [None]:
def baum_welch(training, pi, A, O, iterations):
    pi, A, O = np.copy(pi), np.copy(A), np.copy(O) # take copies, as we modify them
    S = pi.shape[0]
    
    # do several steps of EM hill climbing
    for it in range(iterations):
        pi1 = np.zeros_like(pi)
        A1 = np.zeros_like(A)
        O1 = np.zeros_like(O)
        
        for observations in training:
            # compute forward-backward matrices
            alpha, za = forward((pi, A, O), observations)
            beta, zb = backward((pi, A, O), observations)
            assert abs(za - zb) < 1e-6, "it's badness 10000 if the marginals don't agree"
            
            # M-step here, calculating the frequency of starting state, transitions and (state, obs) pairs
            pi1 += alpha[0,:] * beta[0,:] / za
            for i in range(0, len(observations)):
                O1[:, observations[i]] += alpha[i,:] * beta[i,:] / za
            for i in range(1, len(observations)):
                for s1 in range(S):
                    for s2 in range(S):
                        A1[s1, s2] += alpha[i-1,s1] * A[s1, s2] * O[s2, observations[i]] * beta[i,s2] / za
                                                                    
        # normalise pi1, A1, O1
        pi = pi1 / np.sum(pi1)
        for s in range(S):
            A[s, :] = A1[s, :] / np.sum(A1[s, :])
            O[s, :] = O1[s, :] / np.sum(O1[s, :])
    
    return pi, A, O

Let's test it out by training on our example from above

In [None]:
pi2, A2, O2 = baum_welch([[UP, UP, DOWN]], pi, A, O, 10)

In [None]:
forward((pi2, A2, O2), [UP, UP, DOWN])

Looks like it memorised the sequence, and has assigned it a very high probability. The downside is that it won't be very accepting of other sequences

In [None]:
forward((pi2, A2, O2), [UP, UP, DOWN, UNCHANGED, UNCHANGED, DOWN, UP, UP])

This looks strangely reminiscent of many other learning problems.... Can you think how we might deal with this?

Incidentally training on both sequences leads to perhaps a better model

In [None]:
pi3, A3, O3 = baum_welch([[UP, UP, DOWN], [UP, UP, DOWN, UNCHANGED, UNCHANGED, DOWN, UP, UP]], pi, A, O, 10)

In [None]:
forward((pi3, A3, O3), [UP, UP, DOWN])

In [None]:
forward((pi3, A3, O3), [UP, UP, DOWN, UNCHANGED, UNCHANGED, DOWN, UP, UP])