In [1]:
import numpy as np
from scipy import linalg

In [2]:
class Hidden_Markov_Model:
    
    def __init__(self,n_states,A,B,labels,initial_distribution=None):
        self.n = n_states
        self.A = A
        self.B = B
        self.labels = labels
        self.classifiers = np.arange(len(self.labels),dtype=int)
        self.dictionary = {}
        if initial_distribution is None:
            self.initial_distribution = np.linalg.matrix_power(self.A,2000)[0]
        else:
            self.intial_distribution = initial_distribution
        
        for c,l in zip(self.classifiers,self.labels):
            self.dictionary[c] = l
            
        print("Please follow the labels according to the matrix A")
    
    def forward_backward(self,observations):
        
        alpha = np.zeros((observations.shape[0], self.A.shape[0]))
        beta = np.zeros((observations.shape[0], self.A.shape[0]))
        
        alpha[0, :] = self.initial_distribution * self.B[:, observations[0]]
        beta[observations.shape[0] - 1] = np.ones((self.A.shape[0]))

        for t in range(1, observations.shape[0]):
            for j in range(self.A.shape[0]):
                alpha[t, j] = alpha[t - 1].dot(self.A[:, j]) * self.B[j, observations[t]]
                
        for t in range(observations.shape[0] - 2, -1, -1):
            for j in range(self.A.shape[0]):
                beta[t, j] = np.dot((beta[t + 1] * self.B[:, observations[t + 1]]),self.A[j, :])


        return alpha,beta,np.sum(alpha[-1])
    
    @staticmethod
    def forward_algorithm(observations,A,B,initial_distribution):
        alpha = np.zeros((observations.shape[0], A.shape[0]))
        alpha[0, :] = initial_distribution * B[:, observations[0]]

        for t in range(1, observations.shape[0]):
            for j in range(A.shape[0]):
                alpha[t, j] = alpha[t - 1].dot(A[:, j]) * B[j, observations[t]]

        return alpha
    
    @staticmethod
    def backward_algorithm(observations,A,B):
        beta = np.zeros((observations.shape[0], A.shape[0]))
        beta[observations.shape[0] - 1] = np.ones((A.shape[0]))

        for t in range(observations.shape[0] - 2, -1, -1):
            for j in range(A.shape[0]):
                beta[t, j] = (beta[t + 1] * B[:, observations[t + 1]]).dot(A[j, :])

        return beta

        
    
    def viterbi(self,observations):
        
        T = int(observations.shape[0])
        M = int(self.A.shape[0])

        gamma = np.zeros((T,M))
        psi = np.zeros((1,T))
        gamma[0,:] = self.initial_distribution * self.B[:, observations[0]]
        psi[0] = np.argmax(gamma[0,:])
        Most_likely_sequence = []

        for i in range(1,T):
            gamma[i,:] = np.max(gamma[i-1,:])*self.A[np.argmax(gamma[i-1,:]),:]*self.B[:,observations[i]]
            psi[:,i] = np.argmax(gamma[i,:])
        
        psi = psi.astype(int)

        for i in range(len(psi.T)):
            Most_likely_sequence.append(self.dictionary[psi[0][i]])
            
            
        return gamma, Most_likely_sequence
    
    def baum_welch(self,observations,n_iter=100):
        
        A = self.A
        B = self.B
        M = A.shape[0]
        T = len(observations)
        initial_distribution = self.initial_distribution

        for n in range(n_iter):
            
            alpha = Hidden_Markov_Model.forward_algorithm(observations,A,B,initial_distribution)
            beta = Hidden_Markov_Model.backward_algorithm(observations,A,B)
           
            Eta = np.zeros((M, M, T - 1))
            
            
            for t in range(T - 1):
                den = np.dot(np.dot(alpha[t, :].T, A) * B[:, observations[t + 1]].T, beta[t + 1, :])
                for i in range(M):
                    num = alpha[t, i] * A[i, :] * B[:, observations[t + 1]].T * beta[t + 1, :].T
                    Eta[i, :, t] = num / den

            gamma = np.sum(Eta, axis=1)
            A = np.sum(Eta, 2) / np.sum(gamma, axis=1).reshape((-1, 1))

            gamma = np.hstack((gamma, np.sum(Eta[:, :, T - 2], axis=0).reshape((-1, 1))))

            K = B.shape[1]
            den = np.sum(gamma, axis=1)
            for l in range(K):
                B[:, l] = np.sum(gamma[:, observations == l], axis=1)

            B = np.divide(B, den.reshape((-1, 1)))

        return A,B


In [3]:
A = np.array([[0.5,0.5],[0.3,0.7]])
B = np.array([[0.8,0.2],[0.4,0.6]])
stationary_distribution = np.array([0.375,0.625])
Hmm = Hidden_Markov_Model(2,A,B,["Rainy","Sunny"])
Observed_sequence = np.array([0,0,1])

Please follow the labels according to the matrix A


In [4]:
Hmm.forward_backward(Observed_sequence)

(array([[0.3   , 0.25  ],
        [0.18  , 0.13  ],
        [0.0258, 0.1086]]),
 array([[0.256 , 0.2304],
        [0.4   , 0.48  ],
        [1.    , 1.    ]]),
 0.13439999999998964)

Sad state - 0
Happy - 1

In [5]:
Hmm.viterbi(Observed_sequence)

(array([[0.3  , 0.25 ],
        [0.12 , 0.06 ],
        [0.012, 0.036]]),
 ['Rainy', 'Rainy', 'Sunny'])

In [6]:
Hmm.baum_welch(Observed_sequence)

(array([[2.1542712e-01, 7.8457288e-01],
        [2.0952416e-31, 1.0000000e+00]]),
 array([[1.0000000e+00, 7.7032125e-83],
        [5.6914576e-01, 4.3085424e-01]]))