## Exercise 12.14

In [1]:
import numpy as np

In [7]:
class hmm(object):
    """
    Finite state space hidden markov model
    """
    def __init__(self, A, B, π):
        """
        Initialize model parameters.
        """
        self.A = A
        self.B = B
        self.π = π
        self.n = A.shape[0]
        self.m = B.shape[0]
        
        self.hidden_states = None
        self.observed_states = None
        
        if self.n != self.π.size:
            raise ValueError('Incorrect sized matrix A or distribution π')
        if self.B.shape[1] != self.n:
            raise ValueError('Incorrect sized Matrix B')
    
    def _set_sates(self, hidden_states= None, observed_states = None):
        
        if hidden_states is None:
            self.hidden_states = {i : i for i in range(self.n)}
        else:
            self.hidden_states = { i : state for i, state in enumerate(hidden_states)}
        if observed_states is None:
            self.observed_states = {i : i for i in range(self.m)}
        else:
            self.observed_states = {i : state for i, state in enumerate(observed_states)}
        
            
    def _forward(self, obs):
        """
        Compute the forward probability matrix. 
        
        Parameters
        ----------
        obs : ndarray of shape (T, )
            The observation sequence
                
        Returns 
        --------
        alpha : ndarray of shape (T, n)
            The forward probability matrix (alpha)
        """
        
        # initalize variables
        T = obs.size
        alpha = np.empty((T, self.n))
        
        # calculate for first observation
        alpha[0] = self.π*self.B[obs[0]]
        
        for t in range(1, T):
            # use clever array broadcasting to calculate the rest of the rows
            alpha[t] = np.sum(alpha[t-1]*self.A*self.B[obs[t]].reshape(-1, 1), axis=1)   

        return alpha
        
    def _backward(self, obs, inverted=True):
        """
        Compute the backward probability matrix.
        
        Parameters
        ----------
        obs : ndarray of shape (T,)
            The observation sequence
        
        Returns
        -------
        beta : ndarray of shape (T, n)
            The backward probability matrix
        """
        # initalize variables
        T = obs.size
        beta = np.zeros((T, self.n))
        
        if inverted:
            beta[T-1] = 1
            for t in range(T-2, -1, -1):
                # use clever array broadcasting to calculate the rest of the rows
                beta[t] = np.sum(self.A*beta[t+1].reshape(-1, 1)*self.B[obs[t+1]].reshape(-1, 1), axis=0)
        else:
            beta[0] = 1
        
            for t in range(1, T):
                # use clever array broadcasting to calculate the rest of the rows
                beta[t] = np.sum(self.A*beta[t-1].reshape(-1, 1)*self.B[obs[T-t]].reshape(-1, 1), axis=0)
     
            
        return beta
    
    def _gamma(self, alpha, beta):
        """
        Compute the gamma probabilities
        
        Parameters
        ----------
        alpha : ndarray of shape (T, n)
            The forward probability matrix from the forward pass
        beta : ndarray of shape (T, n)
            The backward probability matrix from the backward pass
            
        Returns
        -------
        gamma : ndarray of shape (T, n)
            The gamma probability array
        """
        
        gamma = alpha*beta /np.sum(alpha[-1])
        
        return gamma
    
    def viterbi(self, obs, c):
        """
        Compute the most likely state sequence
        
        Parameters
        ----------
        obs : ndarray of shape (T, )
            The observation sequence
            
        Returns
        -------
        y* = ndarray of shape (T, ) with entries in 0, 1,..., n-1
            The most likely state sequence
        """

        # intialize variables
        T = obs.size
        eta = np.zeros((T, self.n))
        y_tilde = np.zeros((T-1, self.n)).astype(int)
        
        # calculate eta using array broadcasting
        eta[0] = self.π*self.B[obs[0]]
        y_tilde[0] = np.argmax(eta[0]*self.A*self.B[obs[1]], axis=1)

        # populate eta and y_tilde
        for t in range(1, T):
            eta[t] = np.max(eta[t-1]*self.A*self.B[obs[t]].reshape(-1, 1), axis=1)
            if t < T-1:
                y_tilde[t] = np.argmax(eta[t]*self.A*self.B[obs[t+1]].reshape(-1, 1), axis=1)
                
        # back fill to get optimizers
        y_star = np.zeros(T).astype(int)
        y_star[-1] = np.argmax(eta[-1])
        for t in range(T-2, -1, -1):
            y_star[t] = y_tilde[t, y_star[t+1]]
            
       
        return y_star
                    

Here we write a bunch of test functions where we gets against the examples given in the text book. 
Namely, examples 12.4.6, 12.4.8, and 12.4.9.

In [8]:
def _test_forward(A, B, π, obs):
    
    HMM = hmm(A, B, π)
    calculated_alpha =HMM._forward(obs)
    given_alpha = np.array([[0.06, 0.28], 
                             [0.0616, 0.0372],
                             [0.0058, 0.02856  ],
                             [0.007742, 0.0018876]])
    assert np.allclose(calculated_alpha, given_alpha)

    print('test passed for hmm._forward() function')
    return

def _test_backward(A, B, π, obs):
    
    HMM = hmm(A, B, π)
    calculated_beta = HMM._backward(obs)
    given_beta = np.array([[1, 1], 
                           [0.38, 0.26],
                           [0.0812, 0.1244], 
                           [0.0302, 0.02792]])
    given_beta=given_beta[::-1]

    assert np.allclose(calculated_beta, given_beta)
    print('test passed for hmm._backward() function')
    return
    
def _test_gamma(A, B, π, obs):

    HMM = hmm(A, B, π)
    alpha = HMM._forward(obs)
    beta = HMM._backward(obs)
    calculated_gamma = HMM._gamma(alpha, beta)
    given_gamma = np.array([[0.18816981, 0.81183019],
                            [0.51943175, 0.48056825],
                            [0.22887763, 0.77112237],
                            [0.8039794, 0.1960206]])
    assert np.allclose(given_gamma, calculated_gamma)

    print('test passed for hmm._gamma() function')
    return
    
def _test_viterbi(A, B, π, obs):

    HMM = hmm(A, B, π)
    hidden_states = ['W', 'D']
    observed_states = ['S', 'M', 'L']
    HMM._set_sates(hidden_states=hidden_states, observed_states=observed_states)
    calculated_optimizer = HMM.viterbi(obs, 3)
    given_optimizer = [1, 1, 1, 0]
    
    assert np.allclose(given_optimizer, calculated_optimizer)
    print("test passed for hmm.viterbi() function")
    return
    

## Run tests

In [9]:
A = np.array([[0.7, 0.4], 
                  [0.3, 0.6]])
B = np.array([[0.1, 0.7], 
                  [0.4, 0.2], 
                  [0.5, 0.1]])
π = np.array([0.6, 0.4])
    
obs = np.array([0, 1, 0, 2])

_test_forward(A, B, π, obs)
_test_backward(A, B, π, obs)
_test_gamma(A, B, π, obs)
_test_viterbi(A, B, π, obs)

test passed for hmm._forward() function
test passed for hmm._backward() function
test passed for hmm._gamma() function
test passed for hmm.viterbi() function
