## Question 3, HW1 :
## Submitted by: Sayan Chakraborty, EE18MTECH11030

In [1]:
import numpy as np
import python_speech_features as speech
import scipy.io.wavfile as wav
import os
import glob
from scipy.stats import multivariate_normal as mult_gauss
from sklearn.cluster import KMeans
from numpy import linalg as LA

## The Hidden Markov Model is implemented as follows: 

In [2]:
class HMM_EM(object):
    """Hidden Markov model with Gaussian emissions, trained by EM (Baum-Welch).

    Conventions used throughout the class:

    * ``X`` has one observation per row: shape ``(N, d)``.
    * ``pi`` has shape ``(K,)``; ``mu`` has shape ``(d, K)`` (one mean per
      column); ``sigma`` has shape ``(K, d, d)``.
    * ``A`` is row-stochastic: ``A[j, k]`` is the probability of moving from
      state ``j`` to state ``k``.

    The forward and backward messages are re-normalised at every time step
    and emission densities are evaluated in the log domain, so the recursion
    does not underflow on long sequences.  ``alpha[n]`` is therefore the
    filtered state distribution at time ``n`` and each row of ``beta`` is
    only defined up to a positive constant; ``gamma`` and ``zeta`` are
    normalised posteriors.

    Parameters
    ----------
    X : array-like, shape (N, d)
        Observation sequence.
    K : int
        Number of hidden states.
    tol : float
        EM stops once the summed norm of the parameter change is <= ``tol``.
    data_name : str
        Label used only in the progress messages printed by ``hmm_em``.
    max_iter : int, optional
        Upper bound on the number of EM iterations.
    reg_covar : float, optional
        Value added to the diagonal of every estimated covariance so that it
        stays positive definite.
    """

    def __init__(self, X, K, tol, data_name, max_iter=200, reg_covar=1e-6):
        self.X = np.asarray(X, dtype=float)             # observations in the rows, dimensions in the columns
        self.K = K                                      # model size (number of hidden states)
        self.N, self.d = self.X.shape                   # sequence length and feature dimension
        self.alpha = np.zeros((self.N, self.K))         # scaled forward messages
        self.beta = np.zeros((self.N, self.K))          # scaled backward messages
        self.gamma = np.zeros((self.N, self.K))         # state posteriors
        self.zeta = np.zeros((self.N, self.K, self.K))  # pairwise (transition) posteriors
        self.data_name = data_name
        self.tol = tol
        self.max_iter = max_iter
        self.reg_covar = reg_covar
        self.log_likelihood = -np.inf                   # set by every forward pass

        # K-means provides the initial state occupancies and means.
        kmeans = KMeans(n_clusters=self.K, random_state=1).fit(self.X)
        # bincount with minlength keeps pi at length K even if a label is absent.
        counts = np.bincount(kmeans.labels_, minlength=self.K)

        self.pi = counts / self.N                               # initialize the pi
        self.mu = np.array(kmeans.cluster_centers_).T           # initialize the means, shape (d, K)
        self.sigma = np.array([np.eye(self.d)] * self.K)        # initialize the covariances
        self.A = self.normalize(np.random.rand(self.K, self.K)) # random row-stochastic transitions

    ## E-step helpers
    def _emission(self):
        """Return ``(B, offset)`` with ``B[n, k] * exp(offset[n]) = p(x_n | state k)``.

        Each row of ``B`` is divided by its largest entry (in the log domain)
        so that at least one state keeps a non-zero density at every step.
        """
        log_b = np.empty((self.N, self.K))
        for k in range(self.K):
            log_b[:, k] = mult_gauss.logpdf(self.X, mean=self.mu[:, k], cov=self.sigma[k])
        offset = log_b.max(axis=1)
        return np.exp(log_b - offset[:, np.newaxis]), offset

    def _forward(self, B, offset):
        """Scaled forward recursion; returns ``(alpha, log_likelihood)``."""
        alpha = np.zeros((self.N, self.K))
        log_likelihood = 0.0
        predicted = self.pi
        for n in range(self.N):
            if n > 0:
                # sum_j alpha[n-1, j] * A[j, k]  -> uses the COLUMNS of A
                predicted = np.dot(alpha[n - 1], self.A)
            unnormalised = predicted * B[n]
            scale_n = unnormalised.sum()
            alpha[n] = unnormalised / scale_n
            log_likelihood += np.log(scale_n) + offset[n]
        return alpha, log_likelihood

    def _backward(self, B):
        """Backward recursion with every row rescaled to sum to one."""
        beta = np.ones((self.N, self.K))            # terminal condition beta[N-1] = 1
        for n in range(self.N - 2, -1, -1):
            # sum_i A[k, i] * p(x_{n+1} | i) * beta[n+1, i]
            unnormalised = np.dot(self.A, B[n + 1] * beta[n + 1])
            beta[n] = unnormalised / unnormalised.sum()
        return beta

    def _run_messages(self):
        """Run one forward and one backward pass; return the emission matrix."""
        B, offset = self._emission()
        self.alpha, self.log_likelihood = self._forward(B, offset)
        self.beta = self._backward(B)
        return B

    def _gamma_from_messages(self):
        """State posteriors from the current ``alpha`` and ``beta``."""
        joint = self.alpha * self.beta
        # Per-row normalisation removes the arbitrary scale of each message.
        return joint / joint.sum(axis=1, keepdims=True)

    def _zeta_from_messages(self, B):
        """Pairwise posteriors; ``zeta[n, j, k] = p(z_{n-1}=j, z_n=k | X)``, ``zeta[0] = 0``."""
        zeta = np.zeros((self.N, self.K, self.K))
        for n in range(1, self.N):
            joint = self.alpha[n - 1][:, np.newaxis] * self.A * (B[n] * self.beta[n])
            zeta[n] = joint / joint.sum()
        return zeta

    ## E-step
    def get_alpha(self):
        """Recompute and return the scaled forward messages, shape ``(N, K)``."""
        B, offset = self._emission()
        self.alpha, self.log_likelihood = self._forward(B, offset)
        return self.alpha

    def get_beta(self):
        """Recompute and return the scaled backward messages, shape ``(N, K)``."""
        B, _ = self._emission()
        self.beta = self._backward(B)
        return self.beta

    def get_gamma(self):
        """Recompute the messages and return the state posteriors, shape ``(N, K)``."""
        self._run_messages()
        self.gamma = self._gamma_from_messages()
        return self.gamma

    def get_zeta(self):
        """Recompute the messages and return the pairwise posteriors, shape ``(N, K, K)``."""
        B = self._run_messages()
        self.zeta = self._zeta_from_messages(B)
        return self.zeta

    ## M-step
    def M_step(self):
        """Run one full EM iteration and return ``(pi, mu, sigma, A)``.

        The E-step is executed once and shared by ``gamma`` and ``zeta``.
        New arrays are assigned to ``self.sigma`` and ``self.A`` instead of
        writing into the old ones, so references held by the caller to the
        previous parameters are not modified.
        """
        B = self._run_messages()
        self.gamma = self._gamma_from_messages()
        self.zeta = self._zeta_from_messages(B)

        # Guard against a state that received no responsibility at all.
        N_k = np.maximum(self.gamma.sum(axis=0), np.finfo(float).tiny)
        self.pi = self.gamma[0, :] / np.sum(self.gamma[0, :])
        self.mu = np.dot(self.gamma.T, self.X).T / N_k

        # Responsibility-weighted covariance of every state.
        ridge = self.reg_covar * np.eye(self.d)
        sigma = np.empty((self.K, self.d, self.d))
        for k in range(self.K):
            centered = self.X - self.mu[:, k]
            weighted = self.gamma[:, k][:, np.newaxis] * centered
            sigma[k] = np.dot(weighted.T, centered) / N_k[k] + ridge
        self.sigma = sigma

        # Expected transition counts, normalised per source state.
        counts = self.zeta.sum(axis=0)
        totals = counts.sum(axis=1, keepdims=True)
        A = self.A.copy()
        visited = totals[:, 0] > 0          # keep the old row for never-visited states
        A[visited] = counts[visited] / totals[visited]
        self.A = A

        return self.pi, self.mu, self.sigma, self.A

    def normalize(self, x):
        """Return a copy of the 2-D array ``x`` with every row divided by its sum."""
        x = np.asarray(x, dtype=float)
        return x / x.sum(axis=1, keepdims=True)

    def hmm_em(self):
        """Iterate EM until the parameters stop changing; return ``(pi, mu, sigma, A)``.

        The stopping criterion is the sum of the norms of the change in
        ``pi``, ``mu``, ``sigma`` and ``A`` between consecutive iterations.
        The loop also stops after ``max_iter`` iterations.
        """
        print('Training HMM for the data: '+self.data_name, '\n')
        # Copies, not references: the comparison must see the OLD values.
        previous = [np.copy(p) for p in (self.pi, self.mu, self.sigma, self.A)]
        error = np.inf
        iteration = 0
        while error > self.tol and iteration < self.max_iter:
            self.M_step()
            current = (self.pi, self.mu, self.sigma, self.A)
            error = sum(LA.norm(new - old) for new, old in zip(current, previous))
            previous = [np.copy(p) for p in current]
            iteration += 1
        print('Training completed for the data: '+self.data_name, '\n')
        print('--------------------------------- \n')
        return self.pi, self.mu, self.sigma, self.A


def scale(X, x_min, x_max):
    """Linearly rescale every column of ``X`` onto the interval ``[x_min, x_max]``.

    The minimum of a column is mapped to ``x_min`` and its maximum to
    ``x_max``.  A column whose values are all equal has zero range; its
    divisor is replaced by 1, so every entry of that column becomes ``x_min``.

    Parameters
    ----------
    X : numpy.ndarray
        Data whose columns (axis 0 reductions) are scaled independently.
    x_min, x_max : scalar
        Lower and upper end of the target interval.
    """
    col_low = X.min(axis=0)
    col_range = X.max(axis=0) - col_low
    col_range[col_range == 0] = 1       # avoid dividing by zero for constant columns
    stretched = (X - col_low) * (x_max - x_min)
    return x_min + stretched / col_range

## Training:
### Two phones are chosen for this implementation: 1) bha 2) bhe
### The HMM implemented above is trained on these data.

In [3]:
# Settings shared by every model trained in this notebook.
K = 3           # number of hidden states
tol = 0.0001    # convergence tolerance handed to HMM_EM

# Train one HMM per phone.  Both recordings go through the same pipeline:
# read the wav file, extract MFCC features, rescale each feature to [-1, 1],
# then fit the model.
trained = {}
for data_name in ("training_bha.wav", "training_bhe.wav"):
    rate, sig = wav.read(data_name)
    X_Data = scale(speech.mfcc(sig, rate), -1, 1)
    obj = HMM_EM(X_Data, K, tol, data_name)
    trained[data_name] = obj.hmm_em()

# Expose the fitted parameters under the names used by the classifier cell.
pi_bha, mu_bha, sigma_bha, A_bha = trained["training_bha.wav"]
pi_bhe, mu_bhe, sigma_bhe, A_bhe = trained["training_bhe.wav"]

Training HMM for the data: training_bha.wav 

Training completed for the data: training_bha.wav 

--------------------------------- 

Training HMM for the data: training_bhe.wav 

Training completed for the data: training_bhe.wav 

--------------------------------- 



## Classifier:
### The classifier is implemented below.
### First, the HMM parameters for a test recording (which does not overlap with the training data) are estimated using the HMM implemented above.
### After obtaining the parameters for the test recording, we compare them with the previously trained parameters for the datasets bha and bhe.
### The test recording is assigned to the dataset that yields the least error.
### As with the training sets, two test sets are prepared for the two chosen phones, bha and bhe, with no overlap with the training sets.

In [4]:
# Recording to classify; use "testing_bhe.wav" to test the other phone.
data_test = "testing_bha.wav"

# Same pipeline as for training: wav -> MFCC -> per-feature rescale -> fit HMM.
rate, sig = wav.read(data_test)
X_Data = scale(speech.mfcc(sig, rate), -1, 1)
obj = HMM_EM(X_Data, K, tol, data_test)
pi_test, mu_test, sigma_test, A_test = obj.hmm_em()


def _param_distance(params, reference):
    """Sum of the norms of the differences in pi, mu, sigma and A (in that order)."""
    d_pi, d_mu, d_sigma, d_A = (LA.norm(p - r) for p, r in zip(params, reference))
    return d_pi + d_mu + d_sigma + d_A


test_params = (pi_test, mu_test, sigma_test, A_test)
error_bha = _param_distance(test_params, (pi_bha, mu_bha, sigma_bha, A_bha))
error_bhe = _param_distance(test_params, (pi_bhe, mu_bhe, sigma_bhe, A_bhe))

# The recording is assigned to the phone whose trained parameters are closest;
# a tie (or a non-comparable value) falls through to bhe.
# NOTE(review): states are matched by index only, so this comparison assumes
# the fitted states come out in the same order for every model - confirm.
print(
    "training data belongs to dataset bha\n"
    if error_bha < error_bhe
    else "training data belongs to dataset bhe\n"
)
        

Training HMM for the data: testing_bha.wav 

Training completed for the data: testing_bha.wav 

--------------------------------- 

training data belongs to dataset bha

