##### Assignment 5- HMMs
In this assignment, you will be implementing the dynamic programs for forward and backward algorithms. Feel free to modify the skeleton code as you deem necessary.

In [2]:
import numpy as np
import random

In [15]:
class HMM:
    def __init__(self, states, symbols, trans_probs, emission_probs, start_distribution = None):
        self.states = states 
        self.n_states = len(self.states) 
        self.transition_matrix = trans_probs
        self.emission_matrix = emission_probs
        self.symbols = symbols
        self.n_symbols = len(self.symbols)
        if start_distribution is None:
            self.start_distribution = np.ones(self.n_states)/self.n_states


class log_probs_2D:
    """
    This class allows us to compute the probability of each observation under each state.
    """
    def __init__(self, n_states, n_times):
        # n_times is the length of the sequence you have.
        self.logprobs = np.zeros(shape=[n_times + 1, n_states])
    
    def get_vec(self, n):
        # Retrieves a row of probabilities from logprobs
        assert n <= np.shape(self.logprobs)[0]
        return self.logprobs[n,:]

    def set_vec(self, n, new_log_probs):
        # Sets a row of probabilities in logprobs
        assert n <= np.shape(self.logprobs)[0]
        self.logprobs[n,:] = new_log_probs
        return self.logprobs[n,:]


class iter_algorithm:
    """
    The parent class which defines general approach for running forward backward
    Later, we extend this class by defining methods init_DP and iter_DP specific to forward/backward methods
    and uses your implementation of init_DP and iter_DP.
    """
    def __init__(self, hmm, observations, init_index, iteration_range, is_forward):
        self.lprobs = log_probs_2D(n_states = hmm.n_states, n_times = len(observations))
        self.lprobs.set_vec(init_index, self.init_DP(hmm))
        for n in iteration_range:
            if (is_forward):
                current = n+1
                prev = n
            else:
                current = n
                prev = n+1
            self.lprobs.set_vec(current, self.iter_DP(self.lprobs.get_vec(prev), hmm, observations[n]))

    
class forward(iter_algorithm):
    """
    forward inherits from iter_algorithm. 
    """
    def __init__(self, hmm, observations):
        iter_algorithm.__init__(self, hmm, observations, 0, range(len(observations)), True)

    def init_DP(self, hmm):
        return np.full((hmm.n_states), -np.log(hmm.n_states))
    
    def iter_DP(self, prev_log_probs, hmm, observation):
        ob_idx = hmm.symbols.index(observation)
        log_probs = np.full((hmm.n_states), np.NINF)
        for s in range(len(log_probs)):
            for p in range(len(prev_log_probs)):
                state_prob = prev_log_probs[p] + np.log(hmm.transition_matrix[p, s]) + np.log(hmm.emission_matrix[s, ob_idx])
                log_probs[s] = np.logaddexp(log_probs[s], state_prob)
        return log_probs

    def overall_loglikelihood(self):
        last_probs = self.lprobs.logprobs[-1,:]
        overall_prob = np.NINF
        for p in last_probs:
            overall_prob = np.logaddexp(overall_prob, p)
        return overall_prob

class backward(iter_algorithm):
    def __init__(self, hmm, observations):
        iter_algorithm.__init__(self, hmm, observations, len(observations), reversed(range(len(observations))), False)                     

    def init_DP(self, hmm):
        return np.zeros((hmm.n_states))
                               
    def iter_DP(self, prev_log_probs, hmm, observation):
        ob_idx = hmm.symbols.index(observation)
        log_probs = np.full((hmm.n_states), np.NINF)
        for s in range(len(log_probs)):
            for p in range(len(prev_log_probs)):
                state_prob = prev_log_probs[p] + np.log(hmm.transition_matrix[s, p]) + np.log(hmm.emission_matrix[p, ob_idx])
                log_probs[s] = np.logaddexp(log_probs[s], state_prob)
        return log_probs
    
    def overall_loglikelihood(self):
        last_probs = self.lprobs.logprobs[0,:]
        overall_prob = np.NINF
        for p in last_probs:
            overall_prob = np.logaddexp(overall_prob, p)
        return overall_prob
        
class HMM_plus_data:
    def __init__(self, hmm, observations):
        self.observations = observations
        self.HMM = hmm
        self.fwd = forward(hmm, observations)
        self.bwd = backward(hmm, observations)
                                     
    def compute_logprob_joint(self, n):
        ## returns a matrix (np.ndarray) of log probalities whose k, kprime entry lists the log probability
        ## of the HMM being at a state k at time n,and at state kprime at time n+1 
        
        '''
        P(p(n) = k, p(n + 1) = k', Y) = P(f_k(n) * T_{k k'} * E_{k' y(n+1)} * b_k'(n+1))
        '''
        logprob_joint = np.zeros((self.HMM.n_states, self.HMM.n_states))
        ob_idx = hmm.symbols.index(self.observations[n])
        for k in range(self.HMM.n_states):
            for kprime in range(self.HMM.n_states):
                logprob_joint[k, kprime] = \
                    self.fwd.lprobs.get_vec(n)[k] + \
                    np.log(self.HMM.transition_matrix[k, kprime]) + \
                    np.log(self.HMM.emission_matrix[kprime, ob_idx]) + \
                    self.bwd.lprobs.get_vec(n + 1)[kprime]
                    
        return logprob_joint
                      
    def compute_prob_conditional(self, n):
        ## returns a matrix (np.ndarray) of log probabilities whose k, kprime entry lists the log probability 
        ## of the HMM being at state kprime at time n+1 given at a state k at time n
        '''
        P(p(n+1) = k' | p(n) = k, Y) = P(p(n+1) = k', p(n) = k, Y) / P(p(n) = k, Y)
        P(p(n) = k, Y) = f_k(n) * b_k(n)
        '''
        logprob_cond = self.compute_logprob_joint(n)
        for k in range(self.HMM.n_states):
            for kprime in range(self.HMM.n_states):
                logprob_cond[k, kprime] = \
                    logprob_cond[k, kprime] - \
                    self.fwd.lprobs.get_vec(n)[k] - \
                    self.bwd.lprobs.get_vec(n)[k]
        return logprob_cond

In [5]:
# Based on https://en.wikipedia.org/wiki/Forward%E2%80%93backward_algorithm#Example

#State 0 is rain
#State 1 is no rain 
states = [0,1]

# 3 is umbrella, 4 is no umbrella
symbols = ['3','4']
trans_probs = np.array([[0.7, 0.3],[0.3, 0.7]])
emission_probs = np.array([[0.9, 0.1],[0.2,0.8]])
hmm = HMM(states, symbols, trans_probs, emission_probs)

In [6]:
observations = '33433'
fwd = forward(hmm, observations)
lp = fwd.lprobs.logprobs
print(lp)
probs = np.exp(lp)

# Normalize the probabilities in case anyone implemented their code with scaling
observed = probs/probs.sum(axis=1, keepdims=True)
expected = np.array([[ 0.5       ,  0.5       ],
                     [ 0.81818182,  0.18181818],
                     [ 0.88335704,  0.11664296],
                     [ 0.19066794,  0.80933206],
                     [ 0.730794  ,  0.269206  ],
                     [ 0.86733889,  0.13266111]])

assert (np.linalg.norm(observed - expected, 2) < 1e-8)               


[[-0.69314718 -0.69314718]
 [-0.7985077  -2.30258509]
 [-1.16957138 -3.19418321]
 [-3.77378396 -2.32810805]
 [-3.19937839 -4.19803314]
 [-3.51482755 -5.39245949]]


In [7]:
bwd = backward(hmm, observations)
lp =  bwd.lprobs.logprobs
probs = np.exp(lp)
print(lp)

# Normalize the probabilities in case anyone implemented their code with scaling
observed = np.round(np.flip(probs/probs.sum(axis=1, keepdims=True), 0), 4)
expected = np.array([[ 0.5   ,  0.5   ],
                     [ 0.6273,  0.3727],
                     [ 0.6533,  0.3467],
                     [ 0.3763,  0.6237],
                     [ 0.5923,  0.4077],
                     [ 0.6469,  0.3531]])

assert (np.linalg.norm(observed - expected, 2) < 1e-8)       


[[-3.11486346 -3.72045954]
 [-2.71631985 -3.0898744 ]
 [-2.40087069 -1.89544805]
 [-0.77805169 -1.41181732]
 [-0.37106368 -0.89159812]
 [ 0.          0.        ]]


In [18]:
hmm_plus_data = HMM_plus_data(hmm, observations)
print(np.exp(hmm_plus_data.compute_logprob_joint(1)))
print(np.sum(np.exp(hmm_plus_data.compute_prob_conditional(1)), axis=1))

[[0.02569616 0.00405678]
 [0.00244725 0.00210351]]
[1. 1.]
