# Hidden Markov Model

## Part 2 : Implementing HMM


For the set of training examples and test samples of isolated words (two word classes, 0
and 1) from part 1: Implement HMM
1. Find Mel-Frequency Cepstral Coefficients (MFCCs) from the raw speech samples. Pick 25 ms worth of speech samples with a 10 ms overlap to find MFCCs. Use the basic 13 element version of MFCC as the feature vector representing 25 ms of speech. You are free to use “Librosa” Python library to find this feature.
2. Write a code to implement the likelihood computation using the forward variable after assuming a uniform flat start based initialization with 5 states per HMM and GMM with 3 mixture components per state.
3. Write a code to implement the Viterbi algorithm to decode the best state sequence using the existing model
4. Use the Baum-Welch re estimation method to train HMMs with examples from class 0 and class 1
5. Implement a basic two-class classifier using the HMMs constructed in the previous step. In practise, test samples come from a continuous speech waveform. Herehowever, test your classifier using samples from the database. Your classifier should simply construct the likelihood of the test sample and choose the phone with higher likelihood. Classify the test examples and report the performance. How does the performance change for different number of states per HMM, different number of mixture components per GMM?.

In [1]:
import numpy as np
import os
import librosa
import matplotlib.pyplot as plt

ImportError: No module named librosa

In [2]:
def gaussian_pdf(x, mean, covar):
    d = len(x)
    px = 1/np.sqrt(np.linalg.det(covar) * ((2*np.pi)**d))
    px *= np.exp(-0.5*np.matmul(np.matmul(np.transpose(x-mean), np.linalg.inv(covar)), x-mean))
    px *= np.exp(-0.5 (x-mean).T @ np.linalg.in)
    return px

SyntaxError: invalid syntax (<ipython-input-2-776b8c198ef0>, line 5)

In [3]:
#@title
class dataset():
    def __init__(self, folder, sr=8000, window=25, overlap=10, n_mfcc=13):
        self.folder = folder
        self.files = sorted(os.listdir(folder))
        self.sr = sr # sampling rate of given audio files
        self.window = int(window*1e-3 * self.sr) # 25ms window
        self.overlap = int(overlap*1e-3 * self.sr) # 10ms overlap
        self.hop = self.window -  self.overlap
        self.n_mfcc = n_mfcc
        
    def get_features(self, wav_path):
        y, self.sr = librosa.core.load(path=wav_path, sr=None, mono=True)
        feats = librosa.feature.mfcc(y=y, sr=self.sr, n_mfcc=self.n_mfcc, n_fft=self.window, hop_length=self.hop, n_mels=64) 
        return feats.T

    def __call__(self):
        X = np.vstack((self.get_features(os.path.join(self.folder, wav)) for wav in self.files))
        return X

In [8]:
print("Aravind Ganesh")

Aravind Ganesh


In [0]:
class GMM():
    def __init__(self,data, d, K):
        self.d = d
        self.X = data
        self.K = K
        self.N = len(X)
        #initalization using kmeans
        kmeans = Kmeans(X=data, K=K)
        kmeans.cluster(n_iter=1)
        self.means = kmeans.means
        # covariances are initalized by finding covariances of each cluster
        mixtures = kmeans.get_clusters()
        self.covars = np.array([np.cov(mixture, rowvar=False) for mixture in mixtures])
        # inialize mixing coefficients by fractions of number of data points in each cluster
        self.w = np.array([np.mean(kmeans.assign_k == k) for k in np.arange(K)])
        self.ln_p = self.log_likelihood()
    
    def log_likelihood(self):
        Px = np.sum([np.log(
                np.sum(
                    [self.w[k] * self.normal_pdf(x, self.means[k], self.covars[k])
                     for k in range(self.K)]))
                for x in self.X])
        return Px
    
    def normal_pdf(self, x, mu, sigma): # multivariate Gaussian pdf function
        n = len(x)
        px = 1/np.sqrt(((2*np.pi)**n)*np.linalg.det(sigma))
        px *= np.exp(-0.5*np.matmul(np.matmul(np.transpose(x-mu), np.linalg.inv(sigma)),(x-mu)))
        return px
    
    def gamma(self, n, k): # E step: γ(z_nk)
        r = self.w[k]*self.normal_pdf(self.X[n], self.means[k], self.covars[k])
        r /= np.sum([self.w[i]*self.normal_pdf(self.X[n], self.means[i], self.covars[i]) for i in range(self.K)])
        return r

    def maximization(self): # M step
        Nk = np.array([np.sum([self.gamma(n,k) for n in range(self.N)]) for k in range(self.K)])
        # Update means  
        means_new = np.array([
            (1/Nk[k]) * np.sum([self.gamma(n,k)*self.X[n] for n in range(self.N)], axis=0) for k in range(self.K)
        ])
        # Update covariance matrices
        covars_new = np.array([
            (1/Nk[k]) * np.sum([self.gamma(n,k)*np.tensordot((X[n]-means_new[k]), (X[n]-means_new[k]), axes=0) 
                    for n in range(self.N)], axis=0) 
        for k in range(self.K)])
        # Update mixing coefficients
        w_new = Nk/self.N 
#         self.new_ln_p = self.log_likelihood()
        return w_new, means_new, covars_new
    
    def EM(self, threshold):
        count = 0
        self.w, self.means, self.covars = self.maximization()
        self.new_ln_p = self.log_likelihood()
        count +=1
        print('Iteration Count: ', count)
        print('Log likelihood error: ', (self.new_ln_p - self.ln_p))
        print('Log likelihood :', self.ln_p, self.new_ln_p)
        while (self.new_ln_p - self.ln_p) > threshold:
            self.ln_p = self.new_ln_p
            self.w, self.means, self.covars = self.maximization()
            self.new_ln_p = self.log_likelihood()
            count +=1
            print('Iteration Count:', count)
            print('Log likelihood error: ', (self.new_ln_p - self.ln_p))
            print('Log likelihood :', self.new_ln_p)
    
    def optimal_params(self):
        return self.w, self.means, self.covars

## HMM

In [0]:
class HMM(object):
    def __init__(self, num_states):
        self.K = K # states
        self.X = trainset
        self.M, self.N, self.d = self.X.shape # M:number of sound files, N=states, d=13 
        # initializing parameters
        self.pi = np.array([1/self.N]*self.N)
        self.A = np.triu(np.random.uniform(high=1, low=0, size=(self.K, self.K)))
        self.A = np.transpose(self.A.T/np.sum(self.A, axis=1))
        #initalization using kmeans
        kmeans = Kmeans(X=self.X[0], K=self.K)
        kmeans.cluster(n_iter=2)
        self.means = kmeans.means # initialize means
        # covariances are initalized by finding covariances of each cluster
        mixtures = kmeans.get_clusters()
        # print(mixtures[0].shape)
        self.covars = np.zeros((self.K, self.d, self.d))
        for k in range(self.K):
            self.covars[k] = np.cov(mixtures[k], rowvar=False)
        #intialize arrays
        self.alpha = self.beta = self.gamma = self.emission = np.zeros((self.N, self.K)) # shape: (N,k)
        self.Q = np.random.normal(0,1)
    
    def maximization(self, X):
        # emission probabilities
        for n,x in enumerate(X):
            for k in range(self.K):
                self.emission[n][k] = gaussian(np.asarray(x), self.means[k], np.asarray(self.covars[k]))
        # alpha - forward 
        self.alpha[0] = self.emission[0] * self.pi
        for n in range(1,self.N):
            self.alpha[n] = self.emission[n] * np.matmul(self.alpha[n-1], self.A)
        # beta - backward
        self.beta[self.N-1] = np.ones(self.K) # self.beta.shape: (N,k)
        for n in range(self.N-2, -1, -1):
            self.beta[n] = np.matmul(self.A, self.emission[n+1]*self.beta[n+1])
        self.pX = np.sum(self.alpha[self.N-1])
        # gamma 
        self.gamma = self.alpha * self.beta / self.pX
        self.Zeta = np.array([self.emission[n]*(self.alpha[n]*(self.A*self.beta[n]).T).T  for n in range(self.N)]) / self.pX
        # update parameters
        new_pi = self.gamma[0]/np.sum(self.gamma[0])
        new_A = np.sum(self.Zeta, axis=0) / np.sum(np.sum(self.Zeta, axis=0), axis=1)
        new_covars, new_means = np.zeros((self.K, self.d, self.d)), np.zeros((self.K, self.d))
        for k in range(self.K):
            # ck = np.matmul(X[n]-self.means[k], np.transpose(X[n]-self.means[k]))
            new_covars[k] = np.sum([self.gamma[n][k]*np.tensordot(X[n]-self.means[k], X[n]-self.means[k], axes=0) for n in range(self.N)], axis=0) / np.sum(self.gamma[:,k], axis=0)
            #print(np.linalg.det(self.covars[k]))
            new_means[k] = np.sum([self.gamma[n][k]*X[n] for n in range(self.N)], axis=0) / np.sum(self.gamma[:,k], axis=0)
            
        return [new_pi, new_A, new_means, new_covars]
        
    def likelihood(self, X):
        return np.sum(self.alpha[self.N-1]) # beta(ZN)=1
    
    def expectation(self):
        # expectation step
        Q = np.sum(self.gamma[0] * self.pi) + np.sum(self.Zeta*self.A) + np.sum(self.gamma * self.emission)
        return Q
    
    def train(self, threshold):
        # EM iterations to train the model
        [self.pi, self.A, self.means, self.covars] =  self.maximization(self.X[0])
        self.new_Q = self.expectation()
        print('log likelihood: ', self.Q)
        print('Error:', self.new_Q-self.Q)
#         while np.abs(self.new_Q - self.Q) > threshold:
        count=1
        while np.abs(self.Q - self.new_Q) > threshold:
            cnt=0
            print(self.Q, self.new_Q)
            for X in self.X:
                cnt+=1
                self.Q = self.new_Q
                [self.pi, self.A, self.means, self.covars] =  self.maximization(X)
                self.new_Q = self.expectation()
                print('batch_count', cnt)
                print('log likelihood: ', self.new_Q)
                print('Error: ', np.abs(self.new_Q-self.Q))
                break
            count +=1
            print('Epoch:', count)
            
    def get_params(self):
        return {'means':self.means, 'covars':self.covars, 'pi':self.pi, 'transitions':self.A}