In [None]:
import numpy as np

class HMM():
    def __init__(self, transition, mu,sigma, obs,T,pi):
        self.transition = transition #shape 4x4
        self.state=[0,1,2,3] #a modifier 
        self.pi=pi
        self.mu = mu
        self.sigma = sigma
        self.obs=obs
        self.T=T
        self.q=np.zeros(T)

    
    def  multivariate_gaussian(self, xi):
        """ Computes the vector of the probabilities p(u_t | q_t=i )"""
        probs = np.zeros(4)
        mu = self.mu
        sigmas = self.sigma

        inv_sigmas = [np.linalg.inv(sigmas[j]) for j in range(4)]
        det_sigmas = [np.linalg.det(sigmas[j]) for j in range(4)]
        for j in range(4):
            x_m=(xi-mu[j]).reshape(2,1)
            probs[j] = math.exp(-(xi - mu[j]).T.dot(inv_sigmas[j]).dot(xi - mu[j]) / 2)
            probs[j] /= (2 * math.pi * math.sqrt(det_sigmas[j]))

        #probs = probs / np.sum(probs)
        return probs
       
    def alpha_(self,index,probs): #backward
        result=np.zeros(4)
        if index==0:
            return self.pi
        else:
            alpha=self.alpha_(index-1,probs)
            for q in self.state:
                result[q]= probs[q]*np.sum(self.transition[q,:].dot(alpha))
            return result
        
    def beta_(self,index,probs): #forwoard
        result=np.zeros(4)
        if index == T:
            return 1
        else: 
            beta = self.beta_(index+1,probs)
            for q in self.state:
                result[q]=np.sum(probs[q]*self.transition[:,q].dot(beta))  
            return result 
        
    def estimate_transition(self,alphas,betas,probs):
        for k in range(4):
            for l in range(4):
                res=0
                denominateur=0
                for index in range(self.T):
                    res+=self.transition[k,l]*alphas[index][l]*betas[index][k]*probs[k]
                    denominateur+=np.sum(alphas[index].dot(betas[index]))
                self.transition[k,l]=res/denominateur
        print('transision',self.transition)
        return self.transition
    
    def estimate_mu(self,alphas,betas):
        for k in range(4):
            res=0
            for index in range(self.T):
                res = res + alphas[index][k]*betas[index][k]*(self.obs[index])/np.sum([alphas[index][k]*betas[index][k] for index in range(T)])
            self.mu[k]=res
        print('selfmu',self.mu)
        return self.mu
    def estimate_sigma(self,alphas,betas):                         
        for k in range(4):
            res=0
            for index in range(self.T):
                x_u=self.obs[index]-self.mu[k]
                x_u=x_u.reshape(2,1)
                res = res+ alphas[index][k]*betas[index][k]*(x_u).dot(x_u.T)/np.sum([alphas[index][k]*betas[index][k] for index in range(T)])
            self.sigma[k]=res
        print('selfsigmas',self.sigma)
        return self.sigma
    
    def estimate_multinomial(self,alphas,betas):
        for k in range(4):
            self.pi[k]=alphas[0][k]*betas[0][k]/np.sum(alphas[0].dot(betas[0]))
        return self.pi
    
        
    def fit(self): 
        for j in range(10):
            #forward,backward (E-step)
            alphas=[[] for i in range(self.T)]
            betas=[[] for i in range(self.T)]
            for index in range(self.T):
                probs=self.multivariate_gaussian(self.obs[index])
                alphas[index]=self.alpha_(index,probs)
                betas[index]=self.beta_(index,probs)
                self.q[index]=np.argmax(alphas[index] * betas[index] /np.sum(alphas[index].dot(betas[index])))
            alphas=np.array(alphas)
            print('alpha',alphas)
            betas=np.array(betas)
            print('beta',betas)
            #maximization(M-step)
            self.transition=self.estimate_transition(alphas,betas,probs)
            self.mu=self.estimate_mu(alphas,betas)
            self.sigma=self.estimate_sigma(alphas,betas)
            self.multinomial=self.estimate_multinomial(alphas,betas)
            
            print('fin')
          
        return self.q
            
            
        
    

In [11]:
import pandas as pd

def read_file(filename):
    """ Load the data
    return:: x array, y array """
    print("Loading data " + filename)
    df = pd.read_csv(filename, delimiter=' ',  header=None)
    return df.values

filename = "EMGaussian.data"
filename_test = "EMGaussian.test"
x = read_file(filename)
x_test = read_file(filename_test)


Loading data EMGaussian.data
Loading data EMGaussian.test


In [None]:
from emgaussian import EMGaussian
from kmean import KMeans
import matplotlib.pyplot as plt
import math
#Initialize parameters with EMGaussian
emg = EMGaussian(x, isotropic=False)
emg.fit(plot_likelihood=False)

pi=emg.pi
mu=emg.mu
sigma=emg.sigmas
transition = np.array([ [0.25, 0.25, 0.25, 0.25], [0.25, 0.25, 0.25, 0.25], [0.25, 0.25, 0.25, 0.25], [0.25, 0.25, 0.25, 0.25]])
obs = x
T=x.shape[0]

#HMM
model = HMM(transition, mu,sigma,obs,T,pi)
model.fit()

Kmeans converged after 9 iterations, loss: 3238.691087
Fit Gaussian converged after 10 iterations




In [None]:
T