# Gaussian HMM

In [12]:
import numpy as np

In [14]:
eps = 1e-6
class HMM:
    def __init__(self,num_states):
        self.num_states = num_states
        self.initial_probabilities = np.random.rand(self.num_states)
        self.initial_probabilities /= np.sum(self.initial_probabilities)
        self.transition_matrix = np.random.rand(self.num_states,self.num_states)
        for i in range(self.num_states):
            self.transition_matrix[i] /= np.sum(self.transition_matrix[i])
        self.mean = np.zeros(num_states)
        self.variance = np.ones(self.num_states)
    
    def fit(self,observations,iter = 100):
        self.observations = observations
        self.num_observations = len(observations)

        for i in range(iter):
            self.gaussian_emission_matrix()
            self.forward_backward()
            self.false_viterbi()
            self.baum_welch()
            if i == iter-1:
                print(self.alpha,"\n\n")

    def gaussian_emission_matrix(self):
        # emission_matrix[i,t]
        self.emission_matrix = np.zeros((self.num_states,self.num_observations))

        for i in range(0,self.num_states):
            self.emission_matrix[i] = np.exp(-((self.observations - self.mean[i]) ** 2 / (2 * (self.variance[i] if self.variance[i]!=0 else eps))))
            self.emission_matrix[i] /= np.sqrt(2 * np.pi * (self.variance[i] if self.variance[i]!=0 else eps))

    def forward_backward(self):
        # alpha[t, i]
        alpha = np.zeros((self.num_observations, self.num_states))
        alpha[0,:] = self.initial_probabilities * self.emission_matrix[:,0]
        
        for t in range(1,self.num_observations):
            for i in range(0,self.num_states):
                alpha[t,i] = np.sum(np.fromiter(
                    (alpha[t-1,j] * self.transition_matrix[j,i] * self.emission_matrix[i,t] for j in range(self.num_states)),
                    dtype = float
                ))
        self.alpha = alpha

        # beta[t, i]
        beta = np.zeros((self.num_observations, self.num_states))
        beta[-1,:] = np.ones(self.num_states)
        
        for t in range(self.num_observations-2,-1,-1):
            for i in range(0,self.num_states):
                beta[t,i] = np.sum(np.fromiter(
                    (beta[t+1,j] * self.transition_matrix[i,j] * self.emission_matrix[i,t+1] for j in range(self.num_states)),
                    dtype = float
                ))
        self.beta = beta

    def false_viterbi(self):
        # not really viterbi, gamma[t,i]
        gamma = self.alpha * self.beta
        gamma /= np.sum(gamma, axis=1, keepdims=True)
        self.gamma = gamma
    
    def update_initial_probabilities(self):
        self.initial_probabilities = self.gamma[0]

    def update_transition_matrix(self):
        for i in range(self.num_states):
            for j in range(self.num_states):
                anew = np.sum(self.zeta[:, i, j])
                anew /= np.sum(self.zeta[:, i, :])
                self.transition_matrix[i][j] = anew
        
    def update_emission_parameters(self):
        for i in range(self.num_states):
            self.mean[i] = np.sum((self.gamma[t, i] * self.observations[t]) for t in range(self.num_observations)) 
            self.mean[i] /= np.sum(self.gamma[:, i]) if np.sum(self.gamma[:, i]) != 0 else eps
            self.variance[i] = np.sum((self.gamma[t, i] * ((self.observations[t] - self.mean[i]) ** 2)) for t in range(self.num_observations))
            self.variance[i] /= np.sum(self.gamma[:, i]) if np.sum(self.gamma[:, i]) != 0 else eps 
        
    def baum_welch(self):
        # zeta[t, i, j]
        zeta = np.zeros((self.num_observations-1,self.num_states,self.num_states))
        for t in range(self.num_observations-1):
            for i in range(self.num_states):
                for j in range(self.num_states):
                    zeta[t,i,j] = self.alpha[t,i] * self.transition_matrix[i,j] * self.emission_matrix[j,t+1] * self.beta[t+1,j]          
        zeta /= np.sum(self.alpha[1,i]*self.beta[1,i] for i in range(self.num_states))
        self.zeta = zeta
        
        self.update_initial_probabilities()
        self.update_transition_matrix()
        self.update_emission_parameters()
    
    # def predict(self):
    #     observations = self.observations
    #     V = np.zeros((self.num_states, len(observations)))
    #     path = np.zeros((self.num_states, len(observations)), 'int')

    # # Initialize base cases (t == 0)
    #     V[:,0] = self.initial_probabilities * self.emission_matrix[:,observations[0]]
    #     path[:, 0] = -1

    # # Run Viterbi for t > 0
    #     for t in range(1, len(observations)-1):
    #         for y in range(self.num_states):
    #             prob = V[:, t-1] * self.transition_matrix[:, y] * self.emission_matrix[y, observations[t]]
    #             V[y, t] = np.max(prob)
    #             path[y, t] = np.argmax(prob)

    # # Build the output, optimal model sequence
    #     nrow, ncol = path.shape
    #     sequence = np.zeros(ncol, 'int')
    #     sequence[-1] = np.argmax(V[:, ncol-1])
    #     for i in range(ncol-2, -1, -1):
    #         sequence[i] = path[int(sequence[i+1]), i+1]

    #     return sequence

    

In [10]:
#model = HMM(10)
#model.fit([1,2,3,4,5])
# model.predict()