  A Hidden Markov Model (HMM) consists of a secret markov process that transitions from state to state. In each state, it outputs an observation, which is all we can see. The classical example is the weather. Suppose the weather is a Markov process with states Rainy and Sunny. However, instead of directly observing the weather, we observe the type of exercize someone does (say either Running, Jogging, or Swimming).<br/>
 Formally, a HMM consists of $\pi$, the initial state probabilities, A, the transition matrix for the hidden Markov process, and B, a matrix which describes the probability of observing outputs given the hidden state.<br/>
 In particular:<br/>
 $A_{ij}$ is the probability of transitioning from state j to i.<br/>
 $B_{ki}$ is the probability of observing symbol k given we are in state i.<br/>
 $\pi_i$ is the probability of starting in state i.<br/>
 Given an observation sequence, there are three fundamental problems.<br/>
 (i) Given an HMM, determine the probability of a particular observation sequence.<br/>
 (ii) Determine the most likely state sequence given a model and observation sequence.<br/>
 (iii) Given an observation sequence, recover A, B, and $\pi$.<br/>
 
 Below is a Python class designed to solve these three problems. It will provide the basis for the rest of our examples.
 


In [2]:
import numpy as np
from math import log
from itertools import product
from numpy.random import normal,random
import numpy.linalg as la
import sklearn.utils.linear_assignment_ as las


def perturbed_uniform_init(shape, sum_axis=0, var = 3*1e-1):
 mat = np.ones(shape) + var * (random(shape) -.5)
 #force the matrix to be column or row stochastic depending on sum_axis
 # sum_axis = 0 corresponds to column stochastic matrices, 1 to row
 mat /= np.sum(mat, axis = sum_axis)
 return mat 

class HMM():
 def __init__(self,hidden_alphabet, observed_alphabet, A=None,B=None, p=None):
  self.A = A
  self.B = B
  self.p = p
  self.hidden = hidden_alphabet
  self.observed = observed_alphabet
  self.nstates = len(hidden_alphabet)
  self.noutputs = len(observed_alphabet)
  self.state_list = [None] * self.nstates
  for state in self.hidden:
    self.state_list[self.hidden[state]] = state

 def is_not_init(self):
  return self.A is None or self.B is None or self.p is None

 def check_init(self):
  if self.is_not_init():
   raise ValueError("Model parameters have not been initialized.")

 def naive_prob(self,observed_sequence):
  self.check_init()
  prob = 0
  l = len(self.hidden)
  num_observations = len(observed_sequence)
  obs_inds = self.observation_inds(observed_sequence)
  for state_sequence in product(range(l), repeat = num_observations):
    increment = self.p[state_sequence[0]] * self.B[obs_inds[0], state_sequence[0]]
    for i in xrange(1,num_observations):
     last_state, curr_state = state_sequence[i-1], state_sequence[i]
     increment *= self.A[curr_state, last_state]
     increment *= self.B[obs_inds[i], curr_state]
    prob += increment 
  return prob

 def observation_inds(self, observations):
  if type(observations[0]) == int or type(observations[0]) == np.int64:
   return observations
  return [self.observed[o] for o in observations]
   
 def smart_prob(self, observations):
   alpha = self.alpha_pass(observations)
   return np.sum(alpha[-1,:])


 def convert_to_states(self, state_inds):
   return [self.state_list[i] for i in state_inds]

 def viturbi_seq(self, observations):
   """
   Returns the best state sequence in the DP sense
   """
   num_observations = len(observations)
   obs_inds = self.observation_inds(observations)
   d = np.log(self.p * self.B[0,:])
   paths = np.zeros((num_observations, self.nstates), dtype=int)
   state_seq = np.array(num_observations)
   for t in xrange(1, num_observations):
     d_new = np.zeros(self.nstates)
     for i in xrange(self.nstates):
       temp = np.log(self.A[i,:])  +  d + log(self.B[obs_inds[t], i])
       best_previous_state = np.argmax(temp)
       d_new[i] = temp[best_previous_state]
       paths[t-1, i] = best_previous_state
     d = d_new
   
   best_final_state = np.argmax(d)  
   res = np.zeros(num_observations, dtype=int)
   res[-1] = best_final_state
   last_best = best_final_state
   for i in xrange(num_observations - 2, -1, -1):
     last_best = paths[i, last_best]
     res[i] = last_best
   return self.convert_to_states(res) 

 def alpha_pass(self, observations, normalize = False):
  obs_inds = self.observation_inds(observations)
  nobs = len(obs_inds)
  alpha = np.zeros((nobs, self.nstates))
  alpha[0,:] = self.B[obs_inds[0],:] * self.p
  if normalize:
    c_arr = np.zeros(nobs)
    c_arr[0] = np.sum(alpha[0,:])
    self.logprob = log(c_arr[0])
    alpha[0,:] /= c_arr[0]
  A,B = self.A, self.B
  for i in xrange(1, nobs):
    alpha[i,:] = B[obs_inds[i],:] * np.dot(A, alpha[i-1,:])
    if normalize:
      c_arr[i] = np.sum(alpha[i,:])
      self.logprob += log(c_arr[i])
      alpha[i,:] /= c_arr[i]
  if normalize:
    return alpha, c_arr
  else:
    return alpha

 def beta_pass(self, obs_inds, normalize = False, c_arr = None):
  nobs = len(obs_inds)
  beta = np.ones((nobs, self.nstates))
  if normalize:
    beta[-1,:] /= c_arr[-1]
#    c = np.sum(beta[-1,:])
 #   self.logprob += log(c)
  #  beta[-1,:] /= c

  for i in xrange(1, nobs):
    beta[nobs-i-1,:] = self.B[obs_inds[nobs-i],:] * np.dot(self.A.T, beta[nobs-i,:])
    if normalize:
      beta[nobs-i-1,:] /= c_arr[nobs-i-1]
#       c = np.sum(beta[nobs-i-1,:])
 #      self.logprob += log(c)
  #     beta[nobs-i-1,:] /= c

  return beta
   
 def best_seq(self, observations, normalize = False):
   """
   Return the state sequence that maximizes the number of correct states.
   """
   #make sure is numeric
   obs = self.observation_inds(observations)
   if normalize:
     alpha, c_arr = self.alpha_pass(obs, normalize=True)
     beta = self.beta_pass(obs, c_arr=c_arr, normalize=True)
   else:
     alpha, beta = self.alpha_pass(obs), self.beta_pass(obs)
   return self.convert_to_states(np.argmax(alpha*beta, axis = 1))
 
 def init_random(self):
   if self.A is None:
     self.A = perturbed_uniform_init((self.nstates, self.nstates))
   if self.B is None:
     self.B = perturbed_uniform_init((self.noutputs, self.nstates))
   if self.p is None:
     self.p = perturbed_uniform_init(self.nstates, sum_axis = 0)

  
 def train(self, observations, iters = 50, update_A = True, update_B = True, update_p = True):
   O = self.observation_inds(observations)
   T = len(O)
   oldprob = -np.inf
   if self.is_not_init():
    self.init_random()
 #  print self.A
   for it in xrange(iters):
     alpha, c_arr = self.alpha_pass(O,normalize=True)
     beta = self.beta_pass(O, normalize = True, c_arr=c_arr)
     digamma = np.zeros((self.nstates, self.nstates, T-1))
     gamma = np.zeros((T, self.nstates))
     BB = np.zeros((T-1, self.nstates))
     for i in xrange(0,T-1):
       BB[i,:] = self.B[O[i+1], :] * beta[i+1, :]
       digamma[:,:,i] = self.A.T * np.outer(alpha[i,:], BB[i,:])
       digamma[:,:,i] /= np.sum(digamma[:,:,i])
       gamma[i,:] = np.sum(digamma[:,:,i], axis = 1)   
     
     gamma[-1, :] = alpha[-1, :]
     if update_p:  self.p = gamma[0,:]
     if update_A:
       for i in xrange(self.nstates):
         for j in xrange(self.nstates):
           numer = np.sum(digamma[j,i,:T-1])
           denom = np.sum(gamma[:T-1,j])          
           self.A[i,j] = numer / denom

     if update_B:
       self.B = np.zeros(self.B.shape)
       for i in xrange(T):
         self.B[O[i], :] += gamma[i,:]
     
       for j in xrange(self.nstates):
         self.B[:,j] = self.B[:,j] / np.sum(gamma[:,j])
     print "Log Probability: ",self.logprob
     if self.logprob < oldprob:
       break
     else:
       oldprob = self.logprob
     
   print "Final Probability: ", self.logprob  


 Consider the problem of determining the temperature of a year by observing the diameter of tree trunks. We model the temperatature as a two-state process with states Hot and Cold. Possible observations are Small, Medium, and Large (for the tree trunk size). Let A, B, and $\pi$ be as in Problem 1. We seek to compute the probability of observing the sequence M, S, L. <br/>
 One way is to simply sum over all possible state sequences. This naive approach is accomplished by the following code.


In [3]:
model = HMM({"H":0, "C":1}, {"S":0, "M":1, "L": 2})
model.A = np.array([[.7,.4],[.3,.6]])
model.B = np.array([[.1,.7],[.4,.2],[.5, .1]])
model.p = np.array([0., 1.])
oseq = ["M", "S", "L"]
print model.naive_prob(oseq)

0.02488


This method is exponential in the number of observations, unfortunately. A better way is to use an algorithm known as the $\alpha$-pass, or the Forward algorithm. This algorithm is only linear in the number of observations, a tremendous improvement!<br/>
The algorithm works by computing $\alpha_t(i)$, the probability of being in state i at time t, and of seeing the given observations $O_1, ..., O_t$ up to time t. It turns out the probability of the entire observation sequence is given by $$P(O) = \sum_{i=1}^{i=N}\alpha_{T-1}(i)$$<br/> which follows by marginalizing over the state at time T-1. 

In [4]:
print model.smart_prob(oseq)

0.02488


We turn our attention to the next problem: Computing the best state sequence given observations. It turns out that there are two types of 'best' in this case. The first type of sequence is the sequence that is overall most likely, or the hidden state sequence for which P(O) is maximized. <br/> Of course, we could compute this by simply computing P(O) for every possible state sequence. However, there  is a faster dynamic programming algorithm known as Viturbi's algorithm. This algorithm is implemented in our HMM class and is called below.
 <br/>
 The second type of 'best' is the sequence that maximizes the expected number of correct states. At each time step, we choose the hidden state that is most likely for that time step. To compute the probability of being in state i at time t, we need another definition.<br/> Let $$\beta_t(i)$$ denote the probability of being in state i given the observations after time t, $O_{t+1}, O_{t+2}, ..., O_{T-1}$
 <br/>This can result in a different answer than viturbi, and is typically more useful in applications.
 Below is code that solves both problems. 

In [5]:
print model.viturbi_seq(oseq) #2(a) Extra Credit! I implemented the viturbi algorithm! 
print model.best_seq(oseq) #2(b)


['C', 'C', 'H']
['C', 'C', 'H']




In this example, it turns out that these sequences are the same, but in general they are different problems. Also, don't worry about the runtime error in the above code. It couldn't be helped, and Python was smart enough to deal with it. .<br/>
We noe verify that the naive and smart probability computations actually are the same in all cases. We do this by summing the probability of observing a given sequence over all possible sequences, and note that both sums (for the naive and smart methods) are indeed 1.


In [6]:
model.p = np.array([.6,.4])
#Problem 3
naive_sum = 0
smart_sum = 0
for o in product(range(3), repeat=4):
  naive_sum += model.naive_prob(o)
  smart_sum += model.smart_prob(o)
print "Sum using naive method: ",naive_sum
print "Sum using smart method: ",smart_sum


Sum using naive method:  1.0
Sum using smart method:  1.0


Now for the difficult part: training a model given observations. This is accomplished via a form of expectation maximization. In this case, the update rules are:
$$\gamma_t(i,j) = \alpha_t(i)a_{ji}b_j(O_{t+1})\beta_{t+1}(j)$$
$$\gamma_t(i) = \sum_{j=0}^{j=N-1}\gamma_t(i,j)$$
$$\pi_i = \gamma_0(i)$$
$$a_{ji} = \frac{\sum_{t=0}^{T-2}\gamma_t(i,j)}{\sum_{t=0}^{T-2}\gamma_t(i)}$$
$$b_j(k) = \frac{\sum_{t | O_t = k}\gamma_t(j)}{\sum_{t=0}^{T-2}\gamma_t(j)}$$



These can be written in terms of $\alpha$ and $\beta$ as follows:
        $$\gamma_t(i) = \sum_{j=0}^{j=N-1}\gamma_t(i,j)$$
        $$\pi_i = \sum_{j=0}^{j=N-1}\alpha_0(i)a_{ji}b_j(O_{1})\beta_{1}(j)$$
        $$a_{ji} = \frac{\sum_{t=0}^{T-2}\alpha_t(i)a_{ji}b_j(O_{t+1})\beta_{t+1}(j)}{\sum_{t=0}^{T-2}\sum_{j=0}^{j=N-1}\alpha_t(i)a_{ji}b_j(O_{t+1})\beta_{t+1}(j)}$$
        $$b_j(k) = \frac{\sum_{t | O_t = k}\sum_{j=0}^{j=N-1}\sum_{i=0}^{i=N-1}\alpha_t(j)a_{ij}b_i(O_{t+1})\beta_{t+1}(i)}{\sum_{t=0}^{T-2}\sum_{i=0}^{i=N-1}\alpha_t(j)a_{ij}b_i(O_{t+1})\beta_{t+1}(i)}$$
       
These update rules are implemented in the train method of my HMM class. Consider the problem of training an HMM on a corpus of English words, where the observations are the letters. What patterns will it find? When running my HMM on the Brown Corpus with 2, 3, and 4 hidden states, I get the following results.

In each case, the observation letters correspond to one of the hidden states.
2 states:<br/>
['a', 'e', 'h', 'i', 'o', 'u', 'y', ' ']<br/>
['b', 'c', 'd', 'f', 'g', 'j', 'k', 'l', 'm', 'n', 'p', 'q', 'r', 's', 't', 'v',
 'w', 'x', 'z']<br/>

Note that the vowels correspond to one state, and the consonants to another. The one mistake is that 'h' is classified as a vowel, which isn't that bad.<br/>

3 states:<br/>
['a', 'i', 'u', 'x', ' ']<br/>
['e', 'g', 'h', 'k', 'o', 'q', 'y']<br/>
['b', 'c', 'd', 'f', 'j', 'l', 'm', 'n', 'p', 'r', 's', 't', 'v', 'w', 'z']<br/>

The reason 'x' snuck in as a vowel is probably because of the very low frequency of the letter. <br/>

4 states:<br/>
['d', 'e', 'h', 'n', 's', 'y']<br/>
['u', 'x', ' ']<br/>
['a', 'i', 'k', 'l', 'o', 'r', 'v']<br/>
['b', 'c', 'f', 'g', 'j', 'm', 'p', 'q', 't', 'w', 'z']<br/>

Note that ' ' is pretty much in its own class, while most consonants are in their own class.

When running the HMM on the cipher-text of problem 10, I get:
['a', 'b', 'd', 'e', 'f', 'g', 'i', 'j', 'l', 'm', 'n', 'p', 'q', 'r', 't', 'x', 'y', 'z', ' ']<br/>
['c', 'h', 'k', 'o', 's', 'u', 'v', 'w']<br/>

The second list of characters presumably corresponds to vowels and the ' ' character in the ciphertext. Indeed, there are 8 characters in the second state, and there are a total of 8 vowels, counting the space character.<br/>

Below is a list of helper functions used to generate results for problems 9 and 10.


In [7]:
def make_list(alphabet):
 nsymbols = len(alphabet)
 alist = [None] * nsymbols
 for k in alphabet:
   alist[alphabet[k]] = k
 return alist

def print_model_states(model, observation_alphabet):
 states = [[] for i in xrange(model.nstates)]
 olist = make_list(observation_alphabet)
 for i in xrange(len(observation_alphabet)):
   states[np.argmax(model.B[i,:])].append(olist[i])
 for state in states:
  print state

def alphabet_helper(observations, observation_alphabet, nstates = 2, nobs = 50000, iters = 100, A=None, p =None):
 hidden_alphabet = {i:i for i in xrange(nstates)}
 model = HMM(hidden_alphabet, observation_alphabet)
 model.A = A
 model.p = p
 model.train(observations[:nobs], iters = iters)
 print model.p
 print model.A
 print model.B
 print_model_states(model, observation_alphabet)

def parse_char_file(filename):
 f = open(filename, "r")
 l = f.readline().strip()
 nobs = len(l)
 observations = np.zeros(nobs, dtype = int)
 observation_alphabet = {' ' : 26}
 for off in xrange(26):
   observation_alphabet[chr(ord('a')+off)] = off
 for i in xrange(nobs):
   observations[i] = observation_alphabet[l[i]]
 return observations, observation_alphabet

def alphabet_probs():
 observations, observation_alphabet = parse_char_file("brown.txt")
 alphabet_helper(observations, observation_alphabet, 2,50000, iters=100,A = np.array([[.47468,.51656], [.52532,.48344]]), p = np.array([.51316,.48684]))
 alphabet_helper(observations, observation_alphabet, 3,50000, iters=200)
 alphabet_helper(observations, observation_alphabet, 4,50000, iters=200)
 observations, observation_alphabet = parse_char_file("ciphertext.txt")
 alphabet_helper(observations, observation_alphabet, 2,50000, iters=100,A = np.array([[.47468,.51656], [.52532,.48344]]), p = np.array([.51316,.48684]))
