# Hidden Markov Model (HMM)

## Notation
* $T$ - length of the observation sequence
* $N$ - number of (hidden) states
* $M$ - number of observables
* $Q = \{q_0, q_1, ..., q_{N-1}\}$ - hidden states ($|Q| = N$).
* $V = \{0, 1, ..., M-1\}$ - set of possible observations ($|V| = M$).
* $A$ - state transition probabilities
* $B$ - observation probability matrix
* $\pi$ - initial state probability distribution
* $\mathcal{O} = (\mathcal{O}_0, \mathcal{O}_1, ..., \mathcal{O}_{T-1})$ - observation sequence
* $X = (x_0, x_1, ..., x_{T-1})$ where $x_t \in Q$ - hidden state sequence

Having that set defined, we can calculate the probability of any state and observation using the matrices:
* $A = \{a_{i,j}\}$ - being an $N \times N$ transition matrix.
* $B = \{b_{k,j}\}$ - being an $M \times N$ 'emission' matrix.

The probabilities associated with transition and observation (emission) are:
* $a_{i,j} = p(q_j^{(t + 1)} | q_i^{(t)})$,
* $b_j(k) = p(\mathcal{O}_k^{(t)} | q_j^{(t)})$.

The model is therefore defined as a collection: $\lambda = (A, B, \pi)$.

In [2]:
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import pandas as pd
import copy

Here the input would be transition probability and emission probability

In [3]:
class HMM():
    
    def __init__(self, transmission_probability, emission_probability, obs=None):
        
        self.transmission_probability = transmission_probability
        self.emission_probability = emission_probability
        
        self.n = emission_probability.shape[1]
        self.m = emission_probability.shape[0]
        
        self.observations = None
        
        self.forward =[]    #alpha
        self.backward = []  #beta
        
        self.psi = []
        
        self.forward_final = [0, 0]
        self.backward_final = [0, 0]
        
        self.state_probability = []
        
        self.obs = obs
        
        self.emiss_ref = {}
    
    def assume_obs(self):
        
        #If observation labels are not given, will assume that the emission probabilities are in given order order.
    
        obs = list(set(list(self.observations)))
        obs.sort()
        for i in range(len(obs)):
            self.emiss_ref[obs[i]] = i
        return obs
    
    def train(self, observations, iterations = 10):
        
        # T no of observations
        self.observations = observations
        self.obs = self.assume_obs()
        self.psi = [[[0.0] * (len(self.observations)-1) for i in range(self.n)] for i in range(self.n)]
        self.state_initial = [[0.0] * (len(self.observations)) for i in range(self.n)]
        for i in range (iterations):
            old_transmission = self.transmission_probability.copy()
            old_emission = self.emission_probability.copy()
            
            self.expectation()
            self.maximization()
            
    def expectation(self):
               
        self.forward = self.forward_recurse(len(self.observations))
        self.backward = self.backward_recurse(0)
        self.get_state_initial()
        self.get_psi()
    
    def get_psi(self):
        
        for t in range(1, len(self.observations)):
            for j in range(self.n):
                for i in range(self.n):
                    self.psi[i][j][t-1] = self.calculate_psi(t, i, j)

    def calculate_psi(self, t, i, j):
        
        # this function gives updated a_i_j
        # product of (alpha_tminus1_i)(a_i_j)(b_j)(beta_t_j) in numerator 
        # Denomenator consists of product of (alpha)(beta) over all values
        
        alpha_tminus1_i = self.forward[i][t-1]
        a_i_j = self.transmission_probability[j+1][i+1]
        beta_t_j = self.backward[j][t]
        observation = self.observations[t]
        b_j = self.emission_probability[self.emiss_ref[observation]][j]
        denom = float(self.forward[0][i] * self.backward[0][i] + self.forward[1][i] * self.backward[1][i])
        return (alpha_tminus1_i * a_i_j * beta_t_j * b_j) / denom
        
       
    def get_state_initial(self):
        
        # This gives us the update pi_i (initial state probability)
        
        self.state_initial = [[0, 0] for i in range(len(self.observations))]
        for i in range(len(self.observations)):
            self.state_initial[i][0] = (float(self.forward[0][i] * self.backward[0][i]) /
                                float(self.forward[0][i] * self.backward[0][i] +
                                self.forward[1][i] * self.backward[1][i]))
            self.state_initial[i][1] = (float(self.forward[1][i] * self.backward[1][i]) /
                                float(self.forward[0][i] * self.backward[0][i] +
                                self.forward[1][i] * self.backward[1][i]))
    
    def forward_recurse(self, index):
        
        #the below functions calculates (alpha_t) and the product of (a_i_j)(b_j)
        # here they all are multiplied so as to obtain alpha_tplus1
        # the obtained is summed over all values to obtain alpha_tplus1 
        
        # Initialization
        if index == 0:
            forward = [[0.0] * (len(self.observations)) for i in range(self.n)]
            for state in range(self.n):
                forward[state][index] = self.forward_initial(self.observations[index], state)
            return forward
        # Recursion
        else:
            forward = self.forward_recurse(index-1)
            for state in range(self.n):
                if index != len(self.observations):
                    forward[state][index] = self.forward_probability(index, forward, state)
                else:
                    # Termination
                    self.forward_final[state] = self.forward_probability(index, forward, state, final=True)
            return forward
    
    
    def forward_probability(self, index, forward, state, final=False):
        
        # Calculates the value of alpha_t.
    
        p = [0] * self.n
        for prev_state in range(self.n):
            if not final:
                # Recursion
                obs_index = self.emiss_ref[self.observations[index]]
                p[prev_state] = forward[prev_state][index-1] * self.transmission_probability[state + 1][prev_state + 1] * self.emission_probability[obs_index][state]
            else:
                # Termination
                p[prev_state] = forward[prev_state][index-1] * self.transmission_probability[self.n][prev_state + 1]
        return sum(p)
    
    def forward_initial(self, observation, state):
        
        # This computes the product of (a_i_j)(b_j) and returns the value 
        
        self.transmission_probability[state + 1][0]
        self.emission_probability[self.emiss_ref[observation]][state]
        return self.transmission_probability[state + 1][0] * self.emission_probability[self.emiss_ref[observation]][state]

    def backward_recurse(self, index):
        
        # Initialization at T
        if index == (len(self.observations) - 1):
            backward = [[0.0] * (len(self.observations)) for i in range(self.n)]
            for state in range(self.n):
                backward[state][index] = self.backward_initial(state)
            return backward
        # Recursion for T --> 0
        else:
            backward = self.backward_recurse(index+1)
            for state in range(self.n):
                if index >= 0:
                    backward[state][index] = self.backward_probability(index, backward, state)
                if index == 0:
                    self.backward_final[state] = self.backward_probability(index, backward, 0, final=True)
            return backward

    def backward_initial(self, state):
        
        #Initialization of backward probabilities.
        
        return self.transmission_probability[self.n + 1][state + 1]

    def backward_probability(self, index, backward, state, final=False):
        
        #Calculates the backward probability i.e,((a_i_j)(b_j)(Beta))
        
        p = [0] * self.n
        for j in range(self.n):
            observation = self.observations[index + 1]
            if not final:
                a = self.transmission_probability[j + 1][state + 1]
            else:
                a = self.transmission_probability[j + 1][0]
            b = self.emission_probability[self.emiss_ref[observation]][j]
            beta = backward[j][index + 1]
            p[j] = a * b * beta
        return sum(p)
    
    def get_state_probs(self):
        '''
        Calculates total probability of a given state.
        '''
        self.state_probs = [0] * self.n
        for state in range(self.n):
            summ = 0
            for row in self.state_initial:
                summ += row[state]
            self.state_probs[state] = summ
    
    def maximization(self):
        
        # this is used for computation for updated b_j
        
        self.get_state_probs()
        for i in range(self.n):
            self.transmission_probability[i+1][0] = self.state_initial[0][i]
            self.transmission_probability[-1][i+1] = self.state_initial[-1][i] / self.state_probs[i]
            for j in range(self.n):
                self.transmission_probability[j+1][i+1] = self.estimate_transmission(i, j)
            for obs in range(self.m):
                self.emission_probability[obs][i] = self.estimate_emission(i, obs)
    
    
    def estimate_transmission(self, i, j):
        return sum(self.psi[i][j]) / self.state_probs[i]

    
    def estimate_emission(self, j, observation):
       
        observation = self.obs[observation]
        ts = [i for i in range(len(self.observations)) if self.observations[i] == observation]
        for i in range(len(ts)):
            ts[i] = self.state_initial[ts[i]][j]
        return sum(ts) / self.state_probs[j]

    def likelihood(self, new_observations):
    
        #Returns the probability of a observation sequence based on current modelparameters.
        new_hmm = HMM(self.transmission_probability, self.emission_probability)
        new_hmm.observations = new_observations
        new_hmm.obs = new_hmm.assume_obs()
        forward = new_hmm.forward_recurse(len(new_observations))
        return sum(new_hmm.forward_final)
            
        
    

In [6]:

if __name__ == '__main__':

    emission = np.array([[0.7, 0], [0.2, 0.3], [0.1, 0.7]])
    transmission = np.array([ [0, 0, 0, 0], [0.5, 0.8, 0.2, 0], [0.5, 0.1, 0.7, 0], [0, 0.1, 0.1, 0]])
    observations = ['2','3','3','2','3','2','3','2','2','3','1','3','3','1','1',
                    '1','2','1','1','1','3','1','2','1','1','1','2','3','3','2',
                    '3','2','2']
    model = HMM(transmission, emission)
    model.train(observations)
    print("Model transmission probabilities is a N*N matix:\n{}".format(model.transmission_probability))
    print("---------------------------------------------------------------------------------")
    print("Model emission probabilities is a M*N matrix:\n{}".format(model.emission_probability))
    
    print("---------------------------------------------------------------------------------")
    # Probability of a new sequence
    new_seq = ['1', '2', '3']
    print("Finding likelihood for {}".format(new_seq))
    likelihood = model.likelihood(new_seq)
    print("Likelihood: {}".format(likelihood))

Model transmission probabilities is a N*N matix:
[[0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00]
 [1.44069481e-13 9.33776929e-01 7.18678407e-02 0.00000000e+00]
 [1.00000000e+00 6.62230707e-02 8.64943107e-01 0.00000000e+00]
 [0.00000000e+00 3.10801009e-14 6.31890522e-02 0.00000000e+00]]
---------------------------------------------------------------------------------
Model emission probabilities is a M*N matrix:
[[0.64048542 0.        ]
 [0.14806851 0.5343899 ]
 [0.21144608 0.4656101 ]]
---------------------------------------------------------------------------------
Finding likelihood for ['1', '2', '3']
Likelihood: 3.2956914388507033e-15
