# Basic HMM Implementation
This is the python version of the pseudo-code from Section 7 of [A Revealing Introduction to Hidden Markov Models](https://www.cs.sjsu.edu/~stamp/RUA/HMM.pdf) by Mark Stamp. It consists of lots of experimentation, with a fully functional HMM class at the end. Woot!

In [2]:
#activate venv
!source ../.venv/bin/activate

In [3]:
import numpy as np
import pandas as pd
import math
from matplotlib import pyplot as plt

In [4]:
A = np.array([[0.7, 0.3], [0.4, 0.6]])
B = np.array([[0.1,0.4,0.5], [0.7,0.2,0.1]])
pi = np.array([[0.0,1.0]])
seq = np.array([1,0,2])

N = np.shape(B)[0]
M = np.shape(B)[1]
T = np.shape(seq)[0]

alpha = np.zeros([T,N])

In [5]:
A

array([[0.7, 0.3],
       [0.4, 0.6]])

In [6]:
def alpha_pass(A,B,pi,seq):
    """Alpha pass is a standalone forward pass function. It takes in A,B,pi,seq and outputs the alphas and 
    by recursively summing the probabilities of the partial observation sequences up to times t, until the final observation.
    The values of alpha are scaled along the way to prevent underflow."""

    #Gets shapes N (number of states), M (number of possible observations), and T (number of observations in the observation sequence seq)
    N = np.shape(A)[0]
    M = np.shape(B)[1]
    T = np.shape(seq)[0]
    
    #initialize an empty alpha matrix scaling vector
    alpha = np.zeros([T,N])
    scale = np.zeros([1,T])
    
    #computes alpha[0] (the first row of alpha) and c0 (initial scaler)
    c0 = 0
    for i in range(N):
        alpha[0][i] = pi[0][i] * B[i][seq[0]]
        c0 += alpha[0][i]
    scale[0][0] = c0
    
    
    #scale alpha[0]
    c0 = 1 / c0
    for i in range(N):
        alpha[0][i] = c0 * alpha[0][i]

    #computes and scales alpha[t][i], as entries in an alpha matrix
    for t in range(1,T):
        c_t = 0
        for i in range(N):
            ati = 0 #place holder for summing
            for j in range(N):
                ati += alpha[t-1][j] * A[j,i]
            alpha[t][i] = ati * B[i][seq[t]]
            c_t += alpha[t][i]
            scale[0][t] = c_t
        #scale alpha[t][i]
        c_t = 1 / c_t
        for i in range(N):
            alpha[t][i] = c_t * alpha[t][i]
    
    return alpha, scale.reshape([T,1])

In [7]:
alpha,c = alpha_pass(A,B,pi,seq)
print(alpha)
print(c)

[[0.         1.        ]
 [0.08695652 0.91304348]
 [0.78778135 0.21221865]]
[[0.2       ]
 [0.46      ]
 [0.27043478]]


In [8]:
print(alpha[-1].sum())

1.0


In [9]:
L = list(range(-T,-1))[::-1]
print(L)

[-2, -3]


In [10]:
def beta_pass(A,B,seq,c):
    """Beta pass is a standalone backward pass function. It takes in A, B, observation sequence, and scalar and outputs the betas
    by recursively summing the probabilities of the partial observation sequences backward from the final observation to a time t, until the initial observation.
    The values of beta are scaled (by c, determined by the alpha pass) along the way to prevent underflow."""

    #Gets shapes N (number of states), M (number of possible observations), and T (number of observations in the observation sequence seq)
    N = np.shape(A)[0]
    M = np.shape(B)[1]
    T = np.shape(seq)[0]
    
    #initialize an empty beta matrix
    beta = np.zeros([T,N])

    #let beta[T-1][i] = 1, scaled by c[T-1]
    for i in range(N):
        beta[T-1][i] = c[T-1][0]
    
    #computes and scales beta[t][i] as entries in the beta matrix. Goes from bottom to top.
    for t in list(range(-T,-1))[::-1]: #this indexes beta bottom-to-top, skipping the bottom row (T-1)
        for i in range(N):
            beta[t][i] = 0
            for j in range(N):
                beta[t][i] += A[i,j] * B[j][seq[t+1]] * beta[t+1][j]
            #scale beta[t][i]
            beta[t][i] = c[t][0] * beta[t][i]
    
    return beta

In [11]:
beta = beta_pass(A,B,seq,c)
beta

array([[0.00202026, 0.00309507],
       [0.047272  , 0.032344  ],
       [0.27043478, 0.27043478]])

In [12]:
def gammas(A,B,seq,alpha,beta):
    """Gammas calculates the gamma and digamma. It takes in A, B, observation sequence, and scalars and outputs the gamma[i]'s (probability of being in state q_i at time t)
    and digamma[i][j]'s (probability of being in state q_i at time t and state q_j at time t+1).
    The values of alpha and beta are scaled (by c, determined by the alpha pass) along the way to prevent underflow, so the gamma and digamma are not scaled."""
    
    #Gets shapes N (number of states), M (number of possible observations), and T (number of observations in the observation sequence seq)
    N = np.shape(A)[0]
    M = np.shape(B)[1]
    T = np.shape(seq)[0]

    #initialize empty gamma and digamma
    gamma = np.zeros([T,N])
    digamma = np.zeros([T,N,N])

    for t in range(T-1):
        for i in range (N):
            gamma[t][i] = 0
            for j in range(N):
                digamma[t][i][j] = alpha[t][i] * A[i][j] * B[j][seq[t+1]] * beta[t+1][j]
                gamma[t][i] += digamma[t][i][j]
        
    #special case for gamma[i][T-1]
    for i in range(N):
        gamma[T-1][i] = alpha[T-1][i]

    return gamma, digamma

In [13]:
gamma, digamma = gammas(A,B,seq,alpha,beta)
print(gamma)
print(digamma) #digamma is zero for obs T-1 because it doesn't know anything about the next state/observation

[[0.         0.01547536]
 [0.00893611 0.06419887]
 [0.78778135 0.21221865]]
[[[0.         0.        ]
  [0.00189088 0.01358448]]

 [[0.00823062 0.00070548]
  [0.04938374 0.01481512]]

 [[0.         0.        ]
  [0.         0.        ]]]


In [14]:
class HMM:
    def __init__(self,N,M,seq):
        #init observation sequence and dimensions
        self.seq = seq
        self.N = N
        self.M = M
        self.T = len(seq)

        #Init transition, emission, and intitializaiton matrices
        self.A = np.random.dirichlet(np.ones(N), size=N)
        self.B = np.random.dirichlet(np.ones(M), size=N)
        self.pi = np.random.dirichlet(np.ones(N), size=1)

        #iteration variables
        self.old_logprob = -np.inf
        self.alpha = np.zeros([self.T,self.N])
        self.scale = np.zeros([self.T,1])
        self.beta = np.zeros([self.T,self.N])
        self.gamma = np.zeros([self.T,self.N])
        self.digamma = np.zeros([self.T,self.N,self.N])

        #for plotting
        self.logprobs = []

    def alpha_pass(self):
        """Alpha pass is a standalone forward pass function. It takes in A,B,pi,seq and outputs the alphas and scalar 
        by recursively summing the probabilities of the partial observation sequences up to times t, until the final observation.
        The values of alpha are scaled along the way to prevent underflow."""
        
        #computes alpha[0] (the first row of alpha) and c0 (initial scaler)
        c0 = 0
        for i in range(self.N):
            self.alpha[0][i] = self.pi[0][i] * self.B[i][self.seq[0]]
            c0 += self.alpha[0][i]
        self.scale[0][0] = 1/c0
        
        #scale alpha[0]
        for i in range(self.N):
            self.alpha[0][i] = self.scale[0][0] * self.alpha[0][i]

        #computes and scales alpha[t][i], as entries in an alpha matrix
        for t in range(1,self.T):
            c_t = 0
            for i in range(self.N):
                self.alpha[t][i] = 0 
                for j in range(self.N):
                    self.alpha[t][i] += self.alpha[t-1][j] * self.A[j,i]
                self.alpha[t][i] = self.alpha[t][i] * self.B[i][self.seq[t]]
                c_t += self.alpha[t][i]
            self.scale[t][0] = 1/c_t
            #scale alpha[t][i]
            for i in range(self.N):
                self.alpha[t][i] = self.scale[t][0] * self.alpha[t][i]

    def beta_pass(self):
        """Beta pass is a standalone backward pass function. It takes in A, B, observation sequence, and scalar and outputs the betas
        by recursively summing the probabilities of the partial observation sequences backward from the final observation to a time t, until the initial observation.
        The values of beta are scaled (by c, determined by the alpha pass) along the way to prevent underflow."""

        #let beta[T-1][i] = 1, scaled by c[T-1]
        for i in range(self.N):
            self.beta[self.T-1][i] = self.scale[self.T-1][0]
        
        #computes and scales beta[t][i] as entries in the beta matrix. Goes from bottom to top.
        for t in list(range(-1*self.T,-1))[::-1]: #this indexes beta bottom-to-top, skipping the bottom row (T-1)
            for i in range(self.N):
                self.beta[t][i] = 0
                for j in range(self.N):
                    self.beta[t][i] += self.A[i,j] * self.B[j][self.seq[t+1]] * self.beta[t+1][j]
                #scale beta[t][i]
                self.beta[t][i] = self.scale[t][0] * self.beta[t][i]

    def gamma_pass(self):
        """Gammas calculates the gamma and digamma. It takes in A, B, observation sequence, and scalars and outputs the gamma[i]'s (probability of being in state q_i at time t)
        and digamma[i][j]'s (probability of being in state q_i at time t and state q_j at time t+1).
        The values of alpha and beta are scaled (by c, determined by the alpha pass) along the way to prevent underflow, so the gamma and digamma are not scaled."""

        for t in range(self.T-1):
            for i in range (self.N):
                self.gamma[t][i] = 0
                for j in range(self.N):
                    self.digamma[t][i][j] = self.alpha[t][i] * self.A[i][j] * self.B[j][self.seq[t+1]] * self.beta[t+1][j]
                    self.gamma[t][i] += self.digamma[t][i][j]
            
        #special case for gamma[i][T-1]
        for i in range(self.N):
            self.gamma[self.T-1][i] = self.alpha[self.T-1][i]
    
    def get_logprob(self):
        logprob = 0
        for t in range(self.T):
            logprob += math.log(self.scale[t][0])
        return -logprob
    
    def restimate(self):
        
        #restimate pi
        for i in range(self.N):
            self.pi[0][i] = self.gamma[0][i]

        #restimate A
        for i in range(self.N):
            denom = 0
            for t in range(self.T-1):
                denom += self.gamma[t][i]
            for j in range(self.N):
                numer = 0
                for t in range(self.T-1):
                    numer += self.digamma[t][i][j]
                self.A[i][j] = numer / denom
        
        #restimate B
        for i in range(self.N):
            denom = 0
            for t in range(self.T):
                denom += self.gamma[t][i]
            for j in range(self.M):
                numer = 0
                for t in range(self.T):
                    if self.seq[t] == j:
                        numer += self.gamma[t][i]
                self.B[i][j] = numer / denom

    def train(self, max_iters, min_climb):
        for i in range(max_iters+1):
            if i == 0:
                self.alpha_pass()
                self.beta_pass()
                self.gamma_pass()
                self.restimate()
            elif (i < max_iters+1) and (self.get_logprob() - self.old_logprob > min_climb):
                    self.logprobs.append(self.get_logprob())
                    self.alpha_pass()
                    self.beta_pass()
                    self.gamma_pass()
                    self.restimate()
            else:
                print(f"Train loop complete on iteration {i}. Log Probability: {self.get_logprob()}")
                break

In [15]:
O = np.array([1,0,2])
hmm = HMM(N=2,M=3,seq=O)
hmm.train(1000,10)

In [16]:
hmm.logprobs

[-3.268410303773077,
 -2.509016364810399,
 -1.7775883983423137,
 -1.4264546689896558,
 -1.3866972512996814,
 -1.386294401686933,
 -1.3862943611198915,
 -1.3862943611198906,
 -1.3862943611198906,
 -1.3862943611198906,
 -1.3862943611198906,
 -1.3862943611198906,
 -1.3862943611198906,
 -1.3862943611198906,
 -1.3862943611198906,
 -1.3862943611198906,
 -1.3862943611198906,
 -1.3862943611198906,
 -1.3862943611198906,
 -1.3862943611198906,
 -1.3862943611198906,
 -1.3862943611198906,
 -1.3862943611198906,
 -1.3862943611198906,
 -1.3862943611198906,
 -1.3862943611198906,
 -1.3862943611198906,
 -1.3862943611198906,
 -1.3862943611198906,
 -1.3862943611198906,
 -1.3862943611198906,
 -1.3862943611198906,
 -1.3862943611198906,
 -1.3862943611198906,
 -1.3862943611198906,
 -1.3862943611198906,
 -1.3862943611198906,
 -1.3862943611198906,
 -1.3862943611198906,
 -1.3862943611198906,
 -1.3862943611198906,
 -1.3862943611198906,
 -1.3862943611198906,
 -1.3862943611198906,
 -1.3862943611198906,
 -1.386294361