In [29]:
import numpy as np


class HMM():
    def __init__(self, obs_num, state_num, initial_state = None):
        self.obs_num = obs_num
        self.state_num = state_num
        if initial_state is None:
            self.initial_state = np.ones((state_num))
        else: 
            self.initial_state = initial_state
        self.transition_matrix = np.ones((state_num, state_num))
        self.emission_matrix = np.ones((state_num,obs_num))
        self.buffer = None
    def set(self, transition_matrix, emission_matrix, initial_state):
        self.transition_matrix = np.array(transition_matrix)
        self.emission_matrix = np.array(emission_matrix)
        self.initial_state = np.array(initial_state)
    def forward(self, obs):
        '''we caculate the probability of the observation sequence given the model'''
        T = len(obs)
        alpha = np.zeros((self.state_num))
        for i in range(T):
            if i == 0:
                print("obs i: ", obs[i])
                print("emission_matrix: ", self.emission_matrix.shape)
                alpha = self.initial_state * self.emission_matrix[:, obs[i]]
                self.buffer = alpha
            else:
                alpha = np.dot(alpha, self.transition_matrix) * self.emission_matrix[:, obs[i]]
                self.buffer = np.vstack((self.buffer, alpha))
            print("alpha: ", alpha)
        prob = np.sum(alpha[-1])
        return prob
    def backward(self, obs):
        '''given the final observation, we caculate the most probable state sequence'''
        T = len(obs)
        if isinstance(obs, list):
            obs = np.array(obs)
        states_seq = np.zeros((T))
        # we can use the buffer to store the alpha values
        beta = np.zeros((T,self.state_num))
        idx = np.zeros((T,self.state_num))
        # forward process
        for i in range(T):
            if i == 0:
                beta[i] = self.initial_state * self.emission_matrix[:, obs[i]]
                idx[i] = np.zeros((self.state_num))
            else:
                tmp=self.transition_matrix.T* beta[i-1]

                beta[i] = np.max((tmp),axis=1) * self.emission_matrix[:, obs[i]]
                idx[i] = np.argmax((tmp),axis=1)
        print("beta: ", beta)
        # backward process

        for i in range(T-1, -1, -1):
            if i == T-1:
                states_seq[i] = np.argmax(beta[-1])
            else:
                states_seq[i] = idx[i+1, int(states_seq[i+1])]
            
        return states_seq
    
    
    def train(self, obs,states_seq):
        '''Vietebi algorithm'''
        T = len(obs)
        '''we should count the number of transitions and emissions example'''
        print("T: ", T)
        for i in range(T):
            #we count the initial state first
            self.initial_state[states_seq[i]] += 1
            if i == T-1:
                break
            #we count the transitions and emissions
            self.transition_matrix[states_seq[i], states_seq[i+1]] += 1
            self.emission_matrix[states_seq[i], obs[i]] += 1
        self.initial_state /= np.sum(self.initial_state)
        self.transition_matrix /= np.sum(self.transition_matrix, axis = 1, keepdims=True)
        self.emission_matrix /= np.sum(self.emission_matrix, axis = 1, keepdims=True)
        
        return self.transition_matrix, self.emission_matrix, self.initial_state
            
        
        

In [30]:

# test = HMM(obs_num=2, state_num=3)
# transitions = [[0.5, 0.2, 0.3], [0.3, 0.5, 0.2], [0.2, 0.3, 0.5]]
# emmisions = [[0.5, 0.5], [0.4, 0.6], [0.7, 0.3]]
# initial = [0.2, 0.4, 0.4]
test = HMM(obs_num=3, state_num=2)
transitions = [[0.7, 0.3], [0.5, 0.5]]
emmisions = [[0.6, 0.1, 0.3], [0.1, 0.7, 0.2]]
initial = [0.4, 0.6]
test.set(transitions, emmisions, initial)

In [31]:
test.forward([0, 1, 2])

obs i:  0
emission_matrix:  (2, 3)
alpha:  [0.24 0.06]
alpha:  [0.0198 0.0714]
alpha:  [0.014868 0.008328]


0.008328

In [32]:
test.backward([0, 1, 2])

beta:  [[0.24    0.06   ]
 [0.0168  0.0504 ]
 [0.00756 0.00504]]


array([0., 1., 0.])