# **LAB 10 : Hidden Markov Model**

## Terms
- <b>State Transition Probabilities</b> determine the probabilities of each future state, given the current state.
- <b>Observation Symbol (Emission) Probabilities</b> determine the probabilities of observations emitted by the current state of the system.


In [2]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

Please refer to the following [article](http://www.adeveloperdiary.com/data-science/machine-learning/introduction-to-hidden-markov-model/) to understand Hidden Markov Model

Here we will be dealing with 3 major problems : 
  
  1. Evaluation Problem
  2. Learning Problem
  3. Decoding Problem

### 1. Evaluation Problem : Implementation of Forward and Backward Algorithm

Goal: 
    $$P(V^{T}|\lambda)$$

The forward variable $\alpha_{t}(i)$ is defined as the probability of the partial observation sequence $v_{1}$,$v_{2}$,...,$v_{t}$, when it terminates at the state i. Mathematically, 
\begin{align*}
\alpha_{t}(i) & = p(v_{1},v_{2},...,v_{t}, q_{t} = i | \lambda) \\ 
\alpha_{t+1}(i) & = b_{j}(v_{t+1}) \sum_{i=1}^{N} \alpha_{t}(i)a_{ij} ~~~~~~~~~~~   ,1\leq j \leq N, 1 \leq t \leq T-1 \\
\alpha_{1}(j) & = \pi_{j}b_{j}(v_{1})   ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ,1\leq j \leq N
\end{align*}

In [3]:
data = pd.read_csv('data_python.csv') ## Read the data, change the path accordingly
print(data)
V = data['Visible'].values # get only observable data


# Transition Probabilities
a = np.array(((0.54, 0.46), (0.49, 0.51)))                  # a: N x N = 2 x 2

# Emission Probabilities
b = np.array(((0.16, 0.26, 0.58),
              (0.25, 0.28, 0.47)))                          # b: N x M = 2 x 3

# Equal Probabilities for the initial distribution
initial_distribution = np.array((0.5, 0.5))                 # pi: 1 x N = 1 x 2 ; this is pi


def forward(V, a, b, initial_distribution):
    T = V.shape[0]
    N = a.shape[0]
    alpha = np.zeros((T, N))                                # T = num of obs, N = num of states
    alpha[0, :] = initial_distribution * b[:, V[0]]         # base case
    for t in range(1, T):
        for j in range(N):
            alpha[t, j] = np.dot(alpha[t-1, :], a[:,j]) * b[j, V[t]] # recursively get alpha for time t until t=T-1
    # print(alpha)
    return alpha

alpha = forward(V, a, b, initial_distribution)


    Hidden  Visible
0        B        0
1        B        1
2        B        2
3        B        2
4        B        2
..     ...      ...
495      A        1
496      A        2
497      B        2
498      A        1
499      A        2

[500 rows x 2 columns]


In a similar way we can define the backward variable $\beta_{t}(i)$ as the probability of the partial observation sequence $v_{t+1}$,$v_{t+2}$,...,$v_{T}$ , given that the current state is i. Mathematically,

\begin{align*}
\beta_{t}(i) & = p(v_{t+1},v_{t+2},...,v_{T} | q_{t} = i, \lambda) \\ 
\beta_{t}(i) & = \sum_{j=1}^{N} \beta_{t+1}(j) a_{ij} b_{j}(v_{t+1})~~~~~~~~~~~~~~~,1\leq i \leq N, 1 \leq t \leq T-1 \\
\beta_{T}(i) & = 1~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~,1\leq i \leq N
\end{align*}



In [4]:
def backward(V, a, b):
    T = V.shape[0]
    N = a.shape[0]
    beta = np.zeros((T, N))
    beta[T-1, :] = np.ones((N))         # set last row to ones
    for t in range(T-2, -1, -1):        # T-1 to 0 (non-incl), steps of -1
        for i in range(a.shape[0]):
            beta[t, i] = np.dot(beta[t+1]*b[:, V[t+1]], a[i, :])
    return beta

beta = backward(V, a, b)

### 2. Learning Problem : Implementation of Baum Welch Algorithm

Goal: ${\lambda , B, \pi}$ in order to maximize $P(V^{T}|\lambda)$

Let us define a new auxiliary variable $\xi_{t}(i,j)$ as the probability of being in state i at t=t and in state j at t=t+1. Then,

\begin{align*}
\xi_{t}(i,j) & = p(q_{t} = i, q_{t+1} = j | V^{T}, \lambda) \\
 & = \frac{p(q_{t} = i, q_{t+1} = j, V^{T} | \lambda)}{p(V^{T}|\lambda)} \\
 & = \frac{\alpha_{t}(i)a_{ij}\beta_{t+1}(j)b_{j}(v_{t+1})}{\sum_{i=1}^{N} \sum_{j=1}^{N} \alpha_{t}(i)a_{ij}\beta_{t+1}(j)b_{j}(v_{t+1})}
\end{align*}

Also, let us define $\gamma_{t}(i)$ as the a poteriori probability, that is the probability of being in state i at t=t, given the observation sequence and the model.

\begin{align*}
\gamma_{t}(i) & = p(q_{t} | V^{T}, \lambda) \\
& = \frac{\alpha_{t}(i)\beta_{t}(i)}{\sum_{i=1}^{N} \alpha_{t}(i)\beta_{t}(i)}
\end{align*}

Also, 
$$ \gamma_{t}(i) = \sum_{j=1}^{N} \xi_{t}(i,j)  ~~~~~~~, 1\leq j \leq N $$

Our goal:
\begin{align*}
\overline{a}_{ij} &= \frac{\text{expected number of transitions from hidden state i to state j}}{\text{expected number of transition from hidden state i}} \\
\overline{b}_{j}(k) &= \frac{\text{expected number of times in hidden state j and observing v(k)}}{\text{expected number of times in hidden state j}}
\end{align*}

Re-estimations are:
\begin{align*}
\overline{a}_{ij} &= \frac{\sum_{t=1}^{T-1} \xi_{t}(i, j)}{\sum_{t=1}^{T-1} \gamma_{t}(i)} ~~~~~, 1\leq i \leq N, 1\leq j \leq N \\
\overline{b}_{j}(k) &= \frac{\sum_{t=1}^{T} \gamma_{t}(j) 1(v_{t} == k)}{\sum_{t=1}^{T} \gamma_{t}(j)} ~~~~~, 1\leq j \leq N, 1\leq k \leq M
\end{align*}

In [5]:
def baum_welch(V, a, b, initial_distribution, n_iter=100):
    T = V.shape[0]
    N = a.shape[0]
    M = b.shape[1]
    
    for it in range(n_iter):
        alpha = forward(V, a, b, initial_distribution)  # 500 x 2
        beta = backward(V, a, b)                        # 500 x 2
        
        xi = np.zeros((N, N, T-1))
        
        for t in range(T-1):
            d = np.dot(np.dot(alpha[t,:].T, a) * b[:,V[t + 1]].T, beta[t + 1, :]) # get the overall sum using dot
            for i in range(N):
                n = alpha[t, i] * a[i,:] * b[:, V[t + 1]].T * beta[t + 1, :].T    # just the curr term
                xi[i,:,t] = n / d

        gamma = np.sum(xi, axis=1)                                      # gamma is sum of xi along j's (axis=1)

        a = np.sum(xi, axis=2) / np.sum(gamma, axis=1).reshape((-1, 1)) # get re-estimated transition probabilities

        gamma = np.hstack((gamma, np.sum(xi[:, :, T - 2], axis=0).reshape((-1, 1)))) 
        
        d = np.sum(gamma, axis=1)
        for l in range(M):
            b[:, l] = np.sum(gamma[:, V == l], axis=1)
            
        b = (b/d.reshape((-1, 1)))


    return (a, b)

In [6]:

data = pd.read_csv('data_python.csv')

V = data['Visible'].values

# Transition Probabilities
a = np.ones((2, 2))
a = a / np.sum(a, axis=1)

# Emission Probabilities
b = np.array(((1, 3, 5), (2, 4, 6)))
b = b / np.sum(b, axis=1).reshape((-1, 1))

# Equal Probabilities for the initial distribution
initial_distribution = np.array((0.5, 0.5))

a,b = baum_welch(V, a, b, initial_distribution, n_iter=100)
print("a = ",a)
print("b = ",b)


a =  [[0.53816345 0.46183655]
 [0.48664443 0.51335557]]
b =  [[0.16277513 0.26258073 0.57464414]
 [0.2514996  0.27780971 0.47069069]]


### 3. Decoding Problem : Implementation of Viterbi Algorithm

Goal: Most likely state sequence $V^{T}$ for a given $\lambda$ 

Let us define an auxiliary variable $\delta_{t}(i)$, which is the highest probability that partial observation sequence and state sequence up to t=t can have, when the current state is i. Then,

\begin{align*}

\delta_{t}(i) & = \max_{q_{1}, q_{2}, ..., q_{t-1}} p(q_{1}, q_{2}, ..., q_{t-1}, q_{t} = i , v_{1}, v_{2}, ... , v_{t-1} | \lambda) \\
\delta_{t+1}(i) & = b_{j}(v_{t+1}) {{\Large[}\max_{1 \leq i \leq N} \delta_{t}(i)a_{ij}{\Large]}} \\
\delta_{1}(j) & = \pi_{j} b_{j}(v_{1})

\end{align*}

In [7]:
def viterbi(V, a, b, initial_distribution):
    T = V.shape[0]
    N = a.shape[0]
    delta = np.zeros((T, N))

    delta[0, :] = np.log(initial_distribution * b[:, V[0]])
    prev = np.zeros((T - 1, N))

    for t in range(1, T):
        for i in range(N):
            p = delta[t - 1] + np.log(a[:, i]) + np.log(b[i, V[t]]) # log of the forward probability to avoid underflow
            prev[t - 1, i] = np.argmax(p)                           # find the state that maximizes delta_i_t. get index,value
            delta[t, i] = np.max(p)

    seq = np.zeros(T)
    last = np.argmax(delta[T - 1, :])       # get argmax of probs at T-1
    seq[0] = last

    ind = 1
    for i in range(T - 2, -1, -1):
        seq[ind] = prev[i, int(last)]
        last = prev[i, int(last)]
        ind += 1

    seq = np.flip(seq, axis=0)              # flip since backtracking

    result = []
    for ele in seq:
        if ele == 0:
            result.append("A")
        else:
            result.append("B")
    return result

In [8]:
data = pd.read_csv('data_python.csv')

V = data['Visible'].values

# Transition Probabilities
a = np.ones((2, 2))
a = a / np.sum(a, axis=1)

# Emission Probabilities
b = np.array(((1, 3, 5), (2, 4, 6)))
b = b / np.sum(b, axis=1).reshape((-1, 1))

# Equal Probabilities for the initial distribution
initial_distribution = np.array((0.5, 0.5))

a, b = baum_welch(V, a, b, initial_distribution, n_iter=100)

result = viterbi(V, a, b, initial_distribution)

print(result)


['B', 'B', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'B', 'B', 'B', 'B', 'A', 'A', 'A', 'B', 'A', 'B', 'B', 'A', 'A', 'A', 'B', 'A', 'B', 'A', 'A', 'B', 'A', 'A', 'A', 'B', 'B', 'B', 'B', 'B', 'A', 'A', 'A', 'A', 'B', 'A', 'A', 'A', 'A', 'A', 'B', 'B', 'B', 'B', 'A', 'A', 'A', 'B', 'B', 'B', 'B', 'A', 'B', 'B', 'B', 'B', 'A', 'B', 'B', 'A', 'B', 'B', 'B', 'A', 'B', 'B', 'B', 'A', 'B', 'B', 'B', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'B', 'B', 'B', 'B', 'B', 'B', 'A', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'A', 'B', 'A', 'A', 'B', 'B', 'B', 'B', 'B', 'A', 'A', 'B', 'A', 'B', 'B', 'B', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'B', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'B', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'B', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A',

4. Use the built-in **hmmlearn** package to fit the data and generate the result using the decoder

In [9]:
%pip install hmmlearn==0.2.7

Note: you may need to restart the kernel to use updated packages.


You should consider upgrading via the 'c:\Users\bsidd\AppData\Local\Programs\Python\Python39\python.exe -m pip install --upgrade pip' command.


In [10]:
## Write your code here
from hmmlearn import hmm
import numpy as np
import math

data = pd.read_csv('data_python.csv')
V = data['Visible'].values

model = hmm.MultinomialHMM(n_components=2)
model.startprob_ = np.array([0.5, 0.5])
model.transmat_ = np.array([[0.5, 0.5],
                            [0.5, 0.5]])
model.emissionprob_ = np.array([[0.1, 0.3, 0.6],
                                [0.2, 0.3 ,0.5]])

_, sequence = model.decode((np.array(V).reshape(-1,1)).transpose())

In [11]:
out = []
for i in sequence:
    if i == 1:
        i = "B"
    else:
        i = "A"
    out.append(i)
print(out)

['B', 'B', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'B', 'A', 'A', 'A', 'A', 'B', 'B', 'B', 'B', 'A', 'B', 'A', 'B', 'A', 'B', 'B', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'A', 'B', 'A', 'A', 'A', 'B', 'B', 'B', 'B', 'B', 'A', 'A', 'A', 'A', 'B', 'A', 'A', 'A', 'B', 'A', 'B', 'B', 'B', 'B', 'A', 'B', 'A', 'B', 'B', 'B', 'B', 'A', 'B', 'B', 'B', 'B', 'A', 'B', 'B', 'A', 'B', 'B', 'B', 'A', 'B', 'B', 'B', 'A', 'B', 'B', 'B', 'A', 'B', 'A', 'B', 'B', 'A', 'B', 'A', 'A', 'A', 'B', 'A', 'B', 'A', 'B', 'B', 'B', 'A', 'A', 'B', 'A', 'A', 'B', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'B', 'B', 'B', 'B', 'B', 'B', 'A', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'A', 'B', 'A', 'A', 'B', 'B', 'B', 'B', 'B', 'A', 'A', 'B', 'A', 'B', 'B', 'B', 'A', 'A', 'A', 'A', 'A', 'B', 'A', 'A', 'A', 'A', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'B', 'A', 'B', 'A', 'B', 'B', 'B', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'B', 'A', 'A', 'A', 'A', 'A', 'B', 'A', 'B', 'A',