In [1]:
import numpy as np
import numpy.linalg as la
import matplotlib.pyplot as plt
import scipy
import scipy.stats as stats
import csv

The following part is to extract features from input .fea files

In [2]:
# In this part, we read the feature from files and store the feature into arrays
# In this example, we extract the word 'tts' spoken by speaker dg and store the feature into list named dg_tts_feature

# Note of list dg_tts_feature:
# Since we have each word spoken 5 times per person, the list contains 5 elements
# Each element represents a matrix of size T * D
# T is the total number of frames (this may vary in different utterence), D is the dimension of the mfcc (14)
people = ['dg', 'ls', 'yx', 'mh']
words = ['asr', 'cnn', 'dnn', 'hmm', 'tts']
count = 0
feature_dict = dict()
for person in people:
    temp_dict = dict()
    for word in words:
        # find the name and word of the file
        name_word = person + '_' + word
        temp_feature = []
        for i in range(1, 6):
            index = str(i)
            filename = 'LAB2/feature/' + person + '/' + name_word + index + '.fea'
            # file = glob.glob(filename)
            with open(filename, newline='') as csvfile:
                data = np.array(list(csv.reader(csvfile)))
            data = data.astype(float)
            temp_feature.append(data)
            count += 1
        temp_dict[word] = temp_feature
    feature_dict[person] = temp_dict
print(count)

100


After we get all 100 feature files, we need to separate them into training set and testing set.

In [3]:
#######################
# people dependent
#######################
pd_dict = {}
pd_dict['train'] = {}
pd_dict['test'] = {}
# initialize dict
for word in words:
    pd_dict['train'][word] = []
    pd_dict['test'][word] = []

# for each word in training set, we have a list contains 15 files
# for test set, we have 5 files per word
for word in words:
    for person in people:
        # if the persion is mh, we need to put the feature into the test set
        if person == 'mh':
            pd_dict['test'][word] += feature_dict[person][word]
        # if the person is not mh, we need to put the feature into the train set
        else:
            pd_dict['train'][word] += feature_dict[person][word]

########################
# people independent
########################
pid_dict = {}
pid_dict['train'] = {}
pid_dict['test'] = {}
# initialize dict
for word in words:
    pid_dict['train'][word] = []
    pid_dict['test'][word] = []

# for each word in training set, we have a list contains 16 files
# for test set, we have 4 files per word
for word in words:
    for person in people:
        # only the first four utterances for each word go to train set
        pid_dict['train'][word] += feature_dict[person][word][0:4]
        pid_dict['test'][word] += feature_dict[person][word][4:5]

Next, we will find the observation likelihood based for Gaussian variable

In [4]:
def get_obs_likelihood(x, mu, i, Sigma):
    # x: 1D array, mu: 2D array, i: int (state number), Sigma: 3D array
    # this function will return the probability that given state = i, the obs is x
    # P(x | q = i)
    temp_mu = mu[i:i+1, :]
    temp_Sigma = Sigma[i, :, :] * 2 * np.pi
    exponent = -0.5 * (x-temp_mu) @ la.inv(Sigma[i,:,:])@(x-temp_mu).T
    prob = np.exp(exponent) / la.det(temp_Sigma)**0.5
    # print(exponent)
    return prob[0, 0]

# from scipy.stats import multivariate_normal as mvn
# def emision(X, means, cov):
#     e_p = np.zeros((X.shape[0], 5))
#     for t in range(X.shape[0]):
#         for k in range(5):
#             e_p[t, k] = mvn.pdf(X[t, :], means[k], cov[k], allow_singular=True)
#     return e_p

# model_cnn = My_HMM(cnn_train_data)
# mean = model_cnn.get_mu()
# sigma = model_cnn.get_Sigma()
# means = mean[0, :]
# cov = sigma[0, :, :]
# print('mean is', means)
# print('cov is', cov)
# p_1 = get_obs_likelihood(cnn_train_data[0][0, :], mean, 0, sigma)
# p_2 = emision(cnn_train_data[0], means, cov)
# print(p_1)

In [8]:
# define a class
# input_data must be a 
import numpy as np
class My_HMM:
    def __init__(self, train_data, N=5):
        self.train_data = train_data
        # self.test_data = test_data
        self.train_data_matrix = np.vstack(train_data)
        # first, we need to initlaize the class
        # in the mp, we have 4 parameters needed to be trained
        # pi: an array of size N contains the probability that the first state is i
        # A: transition matrix size N * N
        # mu: a matrix of size N * D, mean of the Gaussian model for each state
        # Sigma: a matrix of size N * D * D, covariance of the Gaussian model for each state
        self.N = N-1
        _, self.D = train_data[0].shape
        self.pi = np.ones((N, )) / N
        self.A = np.eye(N) * 0.8 + np.eye(N, k=1) * 0.2
        self.A[N-1, N-1] = 1.0
        
        # find mu and Sigma
        # temporarily, we use the mean and covariance of the first file in input data
        # first_feature = input_data[0]
        m, n = self.train_data_matrix.shape
        temp_mu = self.train_data_matrix.mean(0)
        self.mu = np.tile(temp_mu, (N,1))
        temp_Sigma = np.cov(self.train_data_matrix, rowvar=False)
        self.Sigma = np.tile(temp_Sigma, (N, 1, 1))
        # this is for Gaussian Mixture Model
        self.c = np.ones((N, 3))/3
        
    def get_pi(self):
        return self.pi

    def get_A(self):
        return self.A

    def get_mu(self):
        return self.mu
    
    def get_Sigma(self):
        return self.Sigma
    
    def likelihood(self, X, mu, Sigma):
        T, _ = X.shape
        B = np.zeros((T, self.N))
        for t in range(T):
            for i in range(self.N):
                B[t, i]=stats.multivariate_normal(mu[i], Sigma[i]).pdf(X[t])
        return B
    
    def mixture_gaussian(self, X, mu, Sigma, c)
        T,_ = X.shape
        B = np.zeros((3,self.N, T))
        for t in range (T):
            for i in range (self.N):
                B[0][t, i]=stats.multivariate_normal(mu[0][i], Sigma[0][i]).pdf(X[t])
                B[1][t, i]=stats.multivariate_normal(mu[1][i], Sigma[1][i]).pdf(X[t])
                B[2][t, i]=stats.multivariate_normal(mu[2][i], Sigma[2][i]).pdf(X[t])
        
        prob = c[:,0].reshape(-1, 1)*B[0] + c[:,1].reshape(-1, 1)*B[1] + c[:,2].reshape(-1, 1)*B[2]
        return prob, B

    def forward(self, X, pi, A, mu, Sigma, B):
        # in this method, we will construct the forward algorithm
        # we will use the dynamic programming to store the alpha array while computing the value
        T, _ = X.shape
        alpha = np.zeros((T, self.N))
        g = np.zeros((T, ))
        # first, we need to initialize alpha at time 0
        for i in range(self.N):
            alpha[0, i] = pi[i] * B[0, i]
             #alpha[0, i] = pi[i] * get_obs_likelihood(X[0:1, :], mu, i, Sigma)
        # normalize
        g[0] = np.sum(alpha[0, :])
        alpha[0:1, :] = alpha[0:1, :] / g[0]
        # iteration
        for t in range(1, T):
            # for each frame
            for j in range(self.N):
                # for each state
                for i in range(self.N):
                    alpha[t, j] += alpha[t-1, i]*A[i, j]*B[t, j]
                    # alpha[t, j] += alpha[t-1, i]*A[i, j]*get_obs_likelihood(X[t:t+1, :], mu, j, Sigma)
            # normalize
            g[t] = np.sum(alpha[t, :])
            alpha[t:t+1, :] = alpha[t:t+1, :] / g[t]
        # return alpha and g
        return alpha, g
    
    def backward(self, X, pi, A, mu, Sigma, B, g):
        T, _ = X.shape
#         beta = np.zeros((T, self.N))
#         # we use the same normalize factor as we used in forward algorithm
#         for i in range(self.N):
#             beta[T-1, i] = 1.0
#         # iteration
#         for t in range(T-2, -1, -1):
#             print('method 1, t =', t)
#             # for each frame
#             for i in range(self.N):
#                 # for each state
#                 for j in range(self.N):
#                     beta[t, i] += beta[t+1, j]*A[i, j]*B[t+1, j]
#                     # beta[t, i] += beta[t+1, j]*A[i, j]*get_obs_likelihood(X[t+1:t+2, :], mu, j, Sigma)
#             # normalize
#             beta[t:t+1, :] = beta[t:t+1, :] / g[t+1]
#             print('beta =', beta[t, :])
#         return beta

        beta = np.zeros((T, self.N))
        for i in range(self.N):
            beta[T-1, i] = 1.0
        for t in range(T-2, -1, -1):
            obs = B[t+1, :]
            # obs = [get_obs_likelihood(X[t+1:t+2, :], mu, j, Sigma) for j in range(self.N)]
            temp_obs = np.diag(obs)
            beta[t:t+1, :] = beta[t+1:t+2, :] @ temp_obs @ A.T / g[t+1]
        return beta
    
    def for_back(self, X, pi, A, mu, Sigma, B):
        T, _ = X.shape
        alpha, g = self.forward(X, pi, A, mu, Sigma, B)
        beta = self.backward(X, pi, A, mu, Sigma, B, g)
        gamma = np.zeros((T, self.N))
        for t in range(T):
            gamma[t:t+1, :] = alpha[t:t+1, :]*beta[t:t+1, :]
            gamma[t:t+1, :] = gamma[t:t+1, :] / np.sum(gamma[t:t+1, :])
        return alpha, beta, gamma 
    
    
    def find_zai(self, X, pi, A, mu, Sigma, alpha, beta, B):
        T, _ = X.shape
        zai = np.zeros((T, 2*self.N))
        for t in range(T):
            for i in range(self.N):
                for j in range(i, i+2):
                    zai[t,i+j] = 
                    if j == self.N:
                        zai[t, i+j] = 0.0
                    elif(t == T-1):
                        zai[t, i+j] = 0.0
                    # special case
                    else:
                        # b = get_obs_likelihood(X[t+1:t+2, :], mu, j, Sigma)
                        zai[t, i+j] = alpha[t, i]*A[i, j]*B[t+1, j]*beta[t+1, j]
            # now we normalize zai_t(i, j)
            temp_sum = np.sum(zai[t, :])
            zai[t:t+1, :] /= temp_sum
        return zai
    
    def training(self):
        # in this file, we will go through all files and get all alpha beta value
        alpha_list = []
        beta_list = []
        gamma_list = []
        zai_list = []
        for data in self.train_data:
            B = self.likelihood(data, self.mu, self.Sigma)
            temp_alpha, temp_beta, temp_gamma = self.for_back(data, self.pi, self.A, self.mu, self.Sigma, B)
            temp_zai = self.find_zai(data, self.pi, self.A, self.mu, self.Sigma, temp_alpha, temp_beta, B)
            alpha_list.append(temp_alpha)
            beta_list.append(temp_beta)
            gamma_list.append(temp_gamma)
            zai_list.append(temp_zai)
        return alpha_list, beta_list, gamma_list, zai_list
    
    def update(self):
        # in this function, we will use the F/B algorithm to find alpha and beta first
        # and use alpha and beta to update self.mu, self.Sigma and self.A
        alpha_list, beta_list, gamma_list, zai_list = self.training()
        new_A = np.zeros_like(self.A)
        new_mu = np.zeros_like(self.mu)
        new_Sigma = np.zeros_like(self.Sigma)
        # first, we can sum the gamma value
        sum_gamma = np.zeros((self.N, ))
        for i in range(self.N):
            temp_sum = 0
            for l in range(len(gamma_list)):
                for t in range(gamma_list[l].shape[0]):
                    temp_sum += gamma_list[l][t, i]
            sum_gamma[i] = temp_sum
                    
        # now we update A
        for i in range(self.N):
            for j in range(i, i+2):
                if j < self.N:
                    if (j == i+1) or (j == i and j == self.N):
                        new_A[i, j] = 1 - new_A[i, j-1]
                    else:
                        for l in range(len(self.train_data)):
                            for t in range(self.train_data[l].shape[0]):
                                new_A[i, j] += np.sum(zai_list[l][t, i+j])
            new_A[i, :] /= sum_gamma[i]
            
        # now we update mu
        for i in range(self.N):
            temp_mu = np.zeros((1, self.D))
            for l in range(len(self.train_data)):
                for t in range(self.train_data[l].shape[0]):
                    #temp_mu += self.train_data[l][t:t+1, :]* gamma_list[l][t, i]
                    temp_mu = temp_mu + np.inner(gamma_list[l][t, i],self.train_data[l][t:t+1, :])
            temp_mu /= sum_gamma[i]
            new_mu[i:i+1, :] = temp_mu
        
        # new we update Sigma
        for i in range(self.N):
            temp_Sigma = np.zeros((self.D, self.D))
            for l in range(len(self.train_data)):
                for t in range(self.train_data[l].shape[0]):
                    #temp = self.train_data[l][t:t+1, :] - new_mu[i:i+1, :]
                    #temp_Sigma += (temp.T @ temp) * gamma_list[l][t, i]
                    temp = self.train_data[l][t:t+1, :] - new_mu[i:i+1, :]
                    temp_Sigma += gamma_list[l][t, i]*np.outer(temp, temp);
            temp_Sigma /= sum_gamma[i]
            new_Sigma[i, :, :] = temp_Sigma
        
        # update here
        self.A = new_A
        self.mu = new_mu
        self.Sigma = new_Sigma

    def new_update(self)
        alpha_list, beta_list, gamma_list, zai_list = self.training()
        new_A = np.zeros_like(self.A)
        new_mu = np.zeros_like(self.mu)
        new_Sigma = np.zeros_like(self.Sigma)
        sum_gamma = np.zeros((self.N, ))
        m_sum_gamma = np.zeros((self.N,3))
        # sum_gamma for A
        for i in range(self.N):
            temp_sum = 0
            for l in range(len(gamma_list)):
                for t in range(gamma_list[l].shape[0]):
                    temp_sum += gamma_list[l][t, i]
            sum_gamma[i] = temp_sum
        # update new A
        for i in range(self.N):
            for j in range(i, i+2):
                if j < self.N:
                    if (j == i+1) or (j == i and j == self.N):
                        new_A[i, j] = 1 - new_A[i, j-1]
                    else:
                        for l in range(len(self.train_data)):
                            for t in range(self.train_data[l].shape[0]):
                                new_A[i, j] += np.sum(zai_list[l][t, i+j])
            new_A[i, :] /= sum_gamma[i]
            
        #update new_sum_gama
        for l in range(len(self.train_data)):
            for m in range(3):
                new_sum_gamma[l][m] = np.zeros(gamma[l].shape)
                for i in range(self.N):
                    new_sum_gamma[l][i,:] = gamma[l][i, :]*c[i][m]*B[l][m][i, :]/(B[l][0][i,:]+B[l][1][i,:]+B[l][2][i,:])
        #update the m_sum_gamma
        for i in range(self.N):
            temp_sum = 0
            for m in range(3):
                for l in range(len(self.train_data)):
                    m_sum_gamma[i,m] += np.sum(new_sum_gamma[l][m][i,:])
        
        #updata c
        for i in range(self.N):
            for m in range(3):
                new_c[i][m] = 0
                for l in range(len(self.train_data)):
                    new_c[i][m]+=np.sum(new_sum_gamma[l][m][i,:])
                new_c[i][m] /= m_sum_gamma[i]
        # update mu
        for m in range(3):
            for i in range(self.N):
                mu[m][i] = 0
                for l in range(len(self.train_data)):
                    mu[m][i] += np.inner(self.train_data[l][t:t+1, :],new_sum_gamma[l][m][i,:])
            mu[m][i] /= m_sum_gamma[i,m]
        # update sigma
        for i in range(self.N)
            for m in range(3)
                new_Sigma[m][i] = 0.0*np.identity(14)
                for l in range(len(self.train_data)):
                    for t in range(self.train_data[l].shape[0]):
                        temp = self.train_data[l][t:t+1, :] - mu[m][i]
                        new_Sigma[m][i] += new_sum_gamma[l][m][i,t]*np.outer(temp, temp)/m_sum_gamma[i,m]
                    
        
        self.A = new_A
        self.mu = new_mu
        self.Sigma = new_Sigma
        self.c = new_c
        
#     def training(self, data):
#         B = self.likelihood(data, self.mu, self.Sigma)
#         alpha, beta, gamma = self.for_back(data, self.pi, self.A, self.mu, self.Sigma, B)
#         zai = self.find_zai(data, self.pi, self.A, self.mu, self.Sigma, alpha, beta, B)
#         return alpha, beta, gamma, zai
    
#     def update(self, data):
#         alpha, beta, gamma, zai = self.training(data)
#         new_A = np.zeros_like(self.A)
#         new_mu = np.zeros_like(self.mu)
#         new_Sigma = np.zeros_like(self.Sigma)
#         # first, we can sum the gamma value
#         sum_gamma = np.sum(gamma, axis=0)

#         # now we update A
#         sum_zai = np.sum(zai, axis=0)
#         for i in range(self.N):
#             for j in range(i, i+2):
#                 if j < self.N:
#                     new_A[i, j] = sum_zai[i+j]
#             new_A[i, :] /= sum_gamma[i]
            
#         # now we update mu
#         for i in range(self.N):
#             temp_mu = np.zeros((1, self.D))
#             for t in range(data.shape[0]):
#                 temp_mu += data[t:t+1, :] * gamma[t, i]
#             temp_mu /= sum_gamma[i]
#             new_mu[i:i+1, :] = temp_mu        
        
#         # new we update Sigma
#         for i in range(self.N):
#             temp_Sigma = np.zeros((self.D, self.D))
#             for t in range(data.shape[0]):
#                 temp = data[t:t+1, :] - new_mu[i:i+1, :]
#                 temp_Sigma += np.outer(temp, temp) * gamma[t, i]
#             temp_Sigma /= sum_gamma[i]
#             # deal with singular matrix
#             if la.det(temp_Sigma) < 1e-10:
#                 temp_Sigma += 0.5*np.eye(self.D)
#                 # print(la.det(temp_Sigma))
#             new_Sigma[i] = temp_Sigma
        
#         # update here
#         self.A = new_A
#         self.mu = new_mu
#         self.Sigma = new_Sigma
        
    def iterate(self):
        # in this function, we will iterate through all file
        for i in range(10):
            self.update()
          
    def test(self, test_data):
        # in this method, we will test on the test input data
        retval = np.zeros((len(test_data, )))
        for i, data in enumerate(test_data):
            T, _ = data.shape
            B = self.likelihood(data, self.mu, self.Sigma)
            alpha, g = self.forward(data, self.pi, self.A, self.mu, self.Sigma, B)
            retval[i] = np.sum(np.log(g[T-1]))
        return retval



SyntaxError: invalid syntax (<ipython-input-8-daa459a94f72>, line 51)

In [12]:
# words = ['asr', 'cnn', 'dnn', 'hmm', 'tts']
def final_test(words, test_dict, model_asr, model_cnn, model_dnn, model_hmm, model_tts):
    all_decisions = []
    for word in words:
        single_decision = []
        test_data = test_dict[word]
        temp_decision = np.zeros((5, len(test_data)))
        temp_decision[0, :] = model_asr.test(test_data)
        temp_decision[1, :] = model_cnn.test(test_data)
        temp_decision[2, :] = model_dnn.test(test_data)
        temp_decision[3, :] = model_hmm.test(test_data)
        temp_decision[4, :] = model_tts.test(test_data)
        for i in range(len(test_data)):
            single_decision.append(np.argmax(temp_decision[:,i]))
        all_decisions.append(single_decision)
    return all_decisions

def accuracy(decisions):
    acc = np.zeros((5, 5))
    for i in range(len(decision)):
        for j in range(len(decision[i])):
            acc[i, decision[i][j]] += 1
        acc[i:i+1, :] /= np.sum(acc[i:i+1, :])
    return acc

In [13]:
cnn_train_data = pd_dict['train']['cnn']
dnn_train_data = pd_dict['train']['dnn']
asr_train_data = pd_dict['train']['asr']
hmm_train_data = pd_dict['train']['hmm']
tts_train_data = pd_dict['train']['tts']
# train data
model_asr = My_HMM(asr_train_data)
model_cnn = My_HMM(cnn_train_data)
model_dnn = My_HMM(dnn_train_data)
model_hmm = My_HMM(hmm_train_data)
model_tts = My_HMM(tts_train_data)

model_asr.iterate()
model_cnn.iterate()
model_dnn.iterate()
model_hmm.iterate()
model_tts.iterate()

test_dict = pd_dict['test']
decision = final_test(words, test_dict, model_asr, model_cnn, model_dnn, model_hmm, model_tts)
print(decision)
acc = accuracy(decision)
print(acc)
# model_dnn = My_HMM(dnn_train_data)
# model_dnn.training()
# model_asr = My_HMM(asr_train_data)
# model_asr.training()
# model_hmm = My_HMM(hmm_train_data)
# model_hmm.training()
# model_tts = My_HMM(tts_train_data)
# model_tts.training()
# test data
# asr_test_data = pd_dict['test']['asr']
# cnn_result = model_cnn.test(cnn_test_data)
# dnn_result = model_dnn.test(cnn_test_data)
# asr_result = model_asr.test(asr_test_data)
# hmm_result = model_hmm.test(cnn_test_data)
# tts_result = model_tts.test(cnn_test_data)

# alpha, g = model_asr.forward(asr_train_data[0], pi, A, mu, Sigma)
# beta = model_asr.backward(asr_train_data[0], pi, A, mu, Sigma, g)
# print(beta)


0




1
2


ValueError: array must not contain infs or NaNs

In [None]:
a = np

In [8]:
print(decision)
acc = accuracy(decision)
print(acc)

[[3, 3, 3, 3, 3], [3, 0, 3, 3, 3], [0, 0, 3, 3, 3], [3, 3, 3, 3, 3], [0, 0, 3, 3, 3]]
[[0.  0.  0.  1.  0. ]
 [0.2 0.  0.  0.8 0. ]
 [0.4 0.  0.  0.6 0. ]
 [0.  0.  0.  1.  0. ]
 [0.4 0.  0.  0.6 0. ]]


[ 1  6 11]
[[ 1]
 [ 6]
 [11]]
