# **LAB 10 : Hidden Markov Model**

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

Please refer to the following [article](http://www.adeveloperdiary.com/data-science/machine-learning/introduction-to-hidden-markov-model/) to understand Hidden Markov Model

Here we will be dealing with 3 major problems : 
  
  1. Evaluation Problem
  2. Learning Problem
  3. Decoding Problem

1. Evaluation Problem : Implementation of Forward and Backward Algorithm

# HMM

- Mathematically we can say, the probability of the state at time t will only depend on time step t-1

- The probability of one state changing to another state is defined as Transition Probability.

-  When the system is fully observable and autonomous it’s called as Markov Chain. 

## Markov chain

- A set of M states.
- A transition probability matrix A.
- An initial probability distribution π.

## Transitions
<p align=left>
<a>
<img src="imgs/Transitions.png" width=400 height=400>
</a>
</p>


## Emissions
<p align=left>
<a>
<img src="imgs/emission.jpg" width=400 height=400>
</a>
</p>


In [2]:
data = pd.read_csv('data_python.csv') ## Read the data, change the path accordingly

V = data['Visible'].values

# Transition Probabilities
a = np.array(((0.54, 0.46), (0.49, 0.51)))

# Emission Probabilities
b = np.array(((0.16, 0.26, 0.58), (0.25, 0.28, 0.47)))

# Equal Probabilities for the initial distribution
initial_distribution = np.array((0.5, 0.5))


def forward(V, a, b, initial_distribution):
    alpha = np.zeros((V.shape[0], a.shape[0]))
    
    ## Write your code here
    alpha[0, :] = initial_distribution * b[:, V[0]]
 
    for t in range(1, V.shape[0]):
        for j in range(a.shape[0]):
            alpha[t, j] = alpha[t - 1].dot(a[:, j]) * b[j, V[t]]
 
    return alpha


alpha = forward(V, a, b, initial_distribution)


def backward(V, a, b):
    beta = np.zeros((V.shape[0], a.shape[0]))

    ## Write your code here
    beta[V.shape[0] - 1] = np.ones((a.shape[0]))
 
    
    for t in range(V.shape[0] - 2, -1, -1):
        for j in range(a.shape[0]):
            beta[t, j] = (beta[t + 1] * b[:, V[t + 1]]).dot(a[j, :])
 
    return beta


beta = backward(V, a, b)

# Model and Training in HMM

- The model is $\theta \rightarrow s, v, a_{ij},b_{jk}$

- There could be many $\{ \theta_1, \theta_2 … \theta_n \}$ we need to find the best based on $p(\theta | V^T  ) = \frac{p(V^T | \theta) p(\theta)}{p(V^T)}$

## Training

- Once the high-level structure (Number of Hidden & Visible States) of the model is defined, we need to estimate the Transition ($a_{ij}$) & Emission ($b_{jk}$) Probabilities using the training sequences.

- Expectation Maximization (EM) algorithm will be used to estimate the Transition ($a_{ij}$) & Emission ($b_{jk}$) Probabilities

## Forward Alg.

- In Forward Algorithm, we will use the computed probability on current time step to derive the probability of the next time step.

- $\alpha_j(t) = p(v(1)…v(t),s(t)= j)$

## Backward Alg.

- Backward Algorithm is the time-reversed version of the Forward Algorithm. In Backward Algorithm we need to find the probability that the machine will be in hidden state ($s_{i}$)
 at time step t and will generate the remaining part of the sequence of the visible symbol ($V^{T}$).


2. Learning Problem : Implementation of Baum Welch Algorithm

# EM Alg. / baum_welch

- initialize A and B

- iterate until convergence
    - E-Step
        - $\xi_{ij} (t) = \frac{\alpha_i(t) a_{ij} b_{jk \text{ } v(t+1) }\beta_j(t+1)}{\sum_{i=1}^{M} \sum_{j=1}^{M} \alpha_i(t) a_{ij} b_{jk \text{ } v(t+1) }\beta_j(t+1)}$

        - $\gamma_i(t) = \sum_{j=1}^M \xi_{ij}(t)$

    
    - M-step
        - $\hat{a_{ij}} = \frac{\sum_{t=1}^{T-1} \xi_{ij} (t)}{\sum_{t=1}^{T-1} \sum_{j=1}^{M} \xi_{ij} (t)}$

        - $\hat{b_{jk}} = \frac{\sum_{t=1}^T \gamma_j(t) 1(v(t)=k)}{\sum_{t=1}^T \gamma_j(t) }$

- return A, B

Here:

- $\xi$ is expectation of how often each transition is used.

- $\gamma$ is expectation of how often each hidden state is emmited.

- $\xi_{ij} (t)$ = $p(s(t) = i,s(t+1)=j | V^T, \theta )$

- $\gamma_i(t) = \sum_{j=1}^M \xi_{ij}(t)$

In [3]:
def baum_welch(V, a, b, initial_distribution, n_iter=100):
    M = a.shape[0]
    T = len(V)
 
    for n in range(n_iter):
        alpha = forward(V, a, b, initial_distribution)
        beta = backward(V, a, b)
 
        xi = np.zeros((M, M, T - 1))
        for t in range(T - 1):
            denominator = np.dot(np.dot(alpha[t, :].T, a) * b[:, V[t + 1]].T, beta[t + 1, :])
            for i in range(M):
                numerator = alpha[t, i] * a[i, :] * b[:, V[t + 1]].T * beta[t + 1, :].T
                xi[i, :, t] = numerator / denominator
 
        gamma = np.sum(xi, axis=1)
        a = np.sum(xi, 2) / np.sum(gamma, axis=1).reshape((-1, 1))
 
        
        gamma = np.hstack((gamma, np.sum(xi[:, :, T - 2], axis=0).reshape((-1, 1))))
 
        K = b.shape[1]
        denominator = np.sum(gamma, axis=1)
        for l in range(K):
            b[:, l] = np.sum(gamma[:, V == l], axis=1)
 
        b = np.divide(b, denominator.reshape((-1, 1)))

    return (a,b)

data = pd.read_csv('data_python.csv')

V = data['Visible'].values

# Transition Probabilities
a = np.ones((2, 2))
a = a / np.sum(a, axis=1)

# Emission Probabilities
b = np.array(((1, 3, 5), (2, 4, 6)))
b = b / np.sum(b, axis=1).reshape((-1, 1))

# Equal Probabilities for the initial distribution
initial_distribution = np.array((0.5, 0.5))

a,b = baum_welch(V, a, b, initial_distribution, n_iter=100)


# Decoding the problem

Finally, once we have the estimates for Transition ($a_{ij}$) & Emission ($b_{jk}$) Probabilities, we can then use the model ($\theta$) to predict the Hidden States ($W^{T}$) which generated the Visible Sequence ($V^{T}$). The Decoding Problem is also known as Viterbi Algorithm.

3. Decoding Problem : Implementation of Viterbi Algorithm

# Decoding Problem

- Given a sequence of visible symbol $V^{T}$ and model $\theta \rightarrow \{ A, B \}$ find most probable hidden states.

- Brute force $O(N^{T} . T)$

- Viterbi Algorithm 

# Viterbi Algorithm

-  We need to find the most probable hidden state in every iteration of t

- Once we complete the above steps for all the observations, we will first find the last hidden state by maximum likelihood, then using backpointer to backtrack the most likely hidden path.


In [4]:
def viterbi(V, a, b, initial_distribution):
    T = V.shape[0]
    M = a.shape[0]
 
    omega = np.zeros((T, M))
    omega[0, :] = np.log(initial_distribution * b[:, V[0]])
 
    prev = np.zeros((T - 1, M))
 
    for t in range(1, T):
        for j in range(M):
            # Same as Forward Probability
            probability = omega[t - 1] + np.log(a[:, j]) + np.log(b[j, V[t]])
 
            # This is our most probable state given previous state at time t (1)
            prev[t - 1, j] = np.argmax(probability)
 
            # This is the probability of the most probable state (2)
            omega[t, j] = np.max(probability)
 
    # Path Array
    S = np.zeros(T)
 
    # Find the most probable last hidden state
    last_state = np.argmax(omega[T - 1, :])
 
    S[0] = last_state
 
    backtrack_index = 1
    for i in range(T - 2, -1, -1):
        S[backtrack_index] = prev[i, int(last_state)]
        last_state = prev[i, int(last_state)]
        backtrack_index += 1
 
    # Flip the path array since we were backtracking
    S = np.flip(S, axis=0)
 
    # Convert numeric values to actual hidden states
    result = []
    for s in S:
        if s == 0:
            result.append("A")
        else:
            result.append("B")
  ## Write your code here

    return result


data = pd.read_csv('data_python.csv')

V = data['Visible'].values

# Transition Probabilities
a = np.ones((2, 2))
a = a / np.sum(a, axis=1)

# Emission Probabilities
b = np.array(((1, 3, 5), (2, 4, 6)))
b = b / np.sum(b, axis=1).reshape((-1, 1))

# Equal Probabilities for the initial distribution
initial_distribution = np.array((0.5, 0.5))

a, b = baum_welch(V, a, b, initial_distribution, n_iter=100)

result = viterbi(V, a, b, initial_distribution)

print("Result is :")
print(result[:10])


Result is :
['B', 'B', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A']


4. Use the built-in **hmmlearn** package to fit the data and generate the result using the decoder

In [5]:
%pip install hmmlearn

Defaulting to user installation because normal site-packages is not writeable
distutils: /home/abhishekj/.local/lib/python3.9/site-packages
sysconfig: /home/abhishekj/.local/lib64/python3.9/site-packages[0m
user = True
home = None
root = None
prefix = None[0m
You should consider upgrading via the '/bin/python -m pip install --upgrade pip' command.[0m
Note: you may need to restart the kernel to use updated packages.


In [6]:
## Write your code here
from hmmlearn import hmm
import numpy as np
import math

data = pd.read_csv('data_python.csv')

V = data['Visible'].values

In [7]:
print(V.shape)
V_reshaped = np.array(V.reshape(-1,1)).T
print(V_reshaped.shape)

(500,)
(1, 500)


In [12]:
model = hmm.MultinomialHMM(n_components=2)

model.startprob_ = np.array([0.5, 0.5])
model.transmat_ = np.array([[0.5, 0.5],
                            [0.5, 0.5]])
model.emissionprob_ = np.array([[0.11111111, 0.33333333, 0.55555556],
                                [0.16666667, 0.33333333 ,0.5]])

logprob, sequence = model.decode(V_reshaped)
out = []
for i in sequence:
  if i == 1:
    i = "A"
  else:
    i = "B"
  out.append(i)
print(out)

['A', 'A', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'A', 'B', 'B', 'B', 'B', 'A', 'A', 'A', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'B', 'B', 'A', 'B', 'B', 'B', 'A', 'A', 'A', 'A', 'A', 'B', 'B', 'B', 'B', 'A', 'B', 'B', 'B', 'A', 'B', 'A', 'A', 'A', 'A', 'B', 'A', 'B', 'A', 'A', 'A', 'A', 'B', 'A', 'A', 'A', 'A', 'B', 'A', 'A', 'B', 'A', 'A', 'A', 'B', 'A', 'A', 'A', 'B', 'A', 'A', 'A', 'B', 'A', 'B', 'A', 'A', 'B', 'A', 'B', 'B', 'B', 'A', 'B', 'A', 'B', 'A', 'A', 'A', 'B', 'B', 'A', 'B', 'B', 'A', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'A', 'A', 'A', 'A', 'A', 'A', 'B', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'B', 'A', 'B', 'B', 'A', 'A', 'A', 'A', 'A', 'B', 'B', 'A', 'B', 'A', 'A', 'A', 'B', 'B', 'B', 'B', 'B', 'A', 'B', 'B', 'B', 'B', 'B', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'A', 'B', 'A', 'B', 'A', 'A', 'A', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'A', 'B', 'B', 'B', 'B', 'B', 'A', 'B', 'A', 'B',

In [13]:
Real = data["Hidden"].values

count = 0
for i in range(Real.shape[0]):
    if Real[i] == out[i]:
        count+=1

print(count)

339
