In [1]:
import numpy as np
import pandas as pd
from scipy.stats import multivariate_normal as mvn

In [2]:
FEA_WP = pd.read_pickle('./work_shared/fea_word_person.pkl') # speaker dependent features
FEA_PW = pd.read_pickle('./work_shared/fea_person_word.pkl') # speaker independent features

In [3]:
# Learning the parameters of HMM
class Gaussian_HMM:
    def __init__(self, train_data, init_pi, init_A, K = 5, epoch = 10):
        self.train_data = train_data
        self.L = len(train_data)
        self.state = K
        self.pi = init_pi
        self.A = init_A
        self.mean = np.array([np.mean(np.vstack(train_data), axis = 0)] * K)
        self.cov = np.array([np.cov(np.vstack(train_data), rowvar = False)] * K)
        self.epoch = epoch
    
    def find_emission(self, x):
        T = x.shape[0]
        b = np.zeros((x.shape[0], self.state))
        
        for t in range(T):
            for k in range(self.state):
                b[t,k] = mvn.pdf(x[t],self.mean[k],self.cov[k],allow_singular = True)
                
        return b
    
    def forward(self, x, b):
        T = x.shape[0]
        alpha = np.zeros((T, self.state))
        g = np.zeros(T) # scaling factors
        alpha[0,:] = self.pi * b[0,:]
        g[0] = np.sum(alpha[0,:])
        alpha[0,:] = alpha[0,:] / g[0]
        
        for t in range(1,T):
            alpha_prev = np.dot(alpha[t-1].reshape(1,-1), self.A)
            alpha[t,:] = alpha_prev * b[t]
            g[t] = np.sum(alpha[t])
            alpha[t] = alpha[t] / g[t]
            
        return alpha, g
    
    def backward(self, x, b, g):
        T = x.shape[0]
        beta = np.zeros((T, self.state))
        beta[T-1, :] = np.ones(self.state)
        
        for t in reversed(range(T-1)):
            beta_next = beta[t+1] * b[t+1]
            beta[t] = np.dot(beta_next.reshape(1,-1),self.A.T)
            beta[t] = beta[t,:] / g[t+1]

        return beta
    
    def find_xi(self, x, b, g, alpha, beta):
        T = x.shape[0]
        xi = np.zeros((T-1, self.state, self.state))    
        
        for t in range(T-1):
            beta_next = beta[t+1] *  b[t+1]
            xi[t] = np.dot(alpha[t,:].reshape(-1,1),beta_next.reshape(1,-1))
            xi[t] =  xi[t] * self.A
            xi[t] =  xi[t]/ g[t+1] 
            
        return xi
        
    def find_gamma(self, alpha, beta):
        gamma = alpha * beta
        return gamma
    
    def train_model(self):    
        for it in range(self.epoch):
            print("Current iteration: ",it)
            alpha = []
            beta = []
            g = []
            xi = []
            gamma = []

            # E step
            for l in range(self.L):
                x = self.train_data[l]
                b = self.find_emission(x)
                cur_alpha,cur_g = self.forward(x, b)
                cur_beta = self.backward(x, b, cur_g)
                cur_xi = self.find_xi(x, b, cur_g, cur_alpha, cur_beta)
                cur_gamma = self.find_gamma(cur_alpha, cur_beta)

                alpha.append(cur_alpha)
                beta.append(cur_beta)
                g.append(cur_g)
                xi.append(cur_xi)
                gamma.append(cur_gamma)

            # M step
            # re-estimates pi
            for l in range(self.L):
                cur_gamma = gamma[l]
                self.pi  += cur_gamma[0]
            self.pi = self.pi / np.sum(self.pi)

            # re-estimate transition matrix  
            for l in range(self.L):
                cur_xi = xi[l]
                self.A += np.sum(cur_xi, axis = 0)
            self.A = self.A / np.sum(self.A, axis = 1, keepdims= True)

            # re-estimates means
            denom = np.zeros((self.state,1))
            num = np.zeros(self.mean.shape)
            for l in range(self.L):
                cur_gamma = gamma[l]
                x = self.train_data[l]
                denom += np.sum(cur_gamma, axis = 0).reshape(self.state,1)
                num += np.matmul(cur_gamma.T, x)
            self.mean = num/denom

            # re-estimates covariances
#             for l in range(self.L):
#                 cur_gamma = gamma[l]
#                 x = self.train_data[l]
#                 for k in range(self.state):
#                     denom = 0
#                     num = 0
#                     for t in range(x.shape[0]):
#                         denom += cur_gamma[t,k]
#                         num += cur_gamma[t,k] * np.dot((x[t] - self.mean[k]).reshape(-1,1), (x[t] - self.mean[k]).reshape(1,-1))
#                 self.cov[k] = num/denom
            denom = np.zeros((self.state, 1))
            num = np.zeros(self.cov.shape)
            for l in range(self.L):
                cur_gamma = gamma[l]
                x = self.train_data[l]
                denom += np.sum(cur_gamma, axis = 0).reshape(self.state,1)
                for k in range(self.state):
                    for t in range(x.shape[0]):
                        num[k] += cur_gamma[t,k] * np.dot((x[t] - self.mean[k]).reshape(-1,1), (x[t] - self.mean[k]).reshape(1,-1))
                self.cov[k] = num[k]/denom[k]
        return self.pi, self.A, self.mean, self.cov

In [4]:
init_pi = np.array([1/5] * 5)

init_A = np.array([[.8, .2,   0,   0,   0],
              [0,  .8,  .2,   0,   0],
              [0,   0,  .8,  .2,   0],
              [0,   0,   0,  .8,  .2],
              [0,   0,   0,   0,   1]])

words = ['cnn', 'dnn', 'asr', 'tts', 'hmm']
speakers = ['mh', 'ls', 'dg', 'yx']
num_words = 5
num_speakers = 4
num_utterances = 5
train_data = []
test_data = []

for w in range(num_words):
    train_word_data = []
    test_word_data = []
    word = words[w]
    for s in range(num_speakers):
        speaker = speakers[s]
        for u in range(num_utterances):
            if u != (num_utterances - 1):
                train_word_data.append(FEA_WP[word][speaker][u])
            else:
                test_word_data.append(FEA_WP[word][speaker][u])
    train_data.append(np.array(train_word_data))
    test_data.append(np.array(test_word_data))
    
cnn_model = Gaussian_HMM(train_data[0], init_pi, init_A)
dnn_model = Gaussian_HMM(train_data[1], init_pi, init_A)
asr_model = Gaussian_HMM(train_data[2], init_pi, init_A)
tts_model = Gaussian_HMM(train_data[3], init_pi, init_A)
hmm_model = Gaussian_HMM(train_data[4], init_pi, init_A)

In [5]:
cnn_pi, cnn_A, cnn_mean, cnn_cov = cnn_model.train_model()

Current iteration:  0
Current iteration:  1
Current iteration:  2
Current iteration:  3
Current iteration:  4
Current iteration:  5
Current iteration:  6
Current iteration:  7
Current iteration:  8
Current iteration:  9


In [6]:
dnn_pi, dnn_A, dnn_mean, dnn_cov = dnn_model.train_model()

Current iteration:  0
Current iteration:  1
Current iteration:  2
Current iteration:  3
Current iteration:  4
Current iteration:  5
Current iteration:  6
Current iteration:  7
Current iteration:  8
Current iteration:  9


In [7]:
asr_pi, asr_A, asr_mean, asr_cov = asr_model.train_model()

Current iteration:  0
Current iteration:  1
Current iteration:  2
Current iteration:  3
Current iteration:  4
Current iteration:  5
Current iteration:  6
Current iteration:  7
Current iteration:  8
Current iteration:  9


In [8]:
tts_pi, tts_A, tts_mean, tts_cov = tts_model.train_model()

Current iteration:  0
Current iteration:  1
Current iteration:  2
Current iteration:  3
Current iteration:  4
Current iteration:  5
Current iteration:  6
Current iteration:  7
Current iteration:  8
Current iteration:  9


In [9]:
hmm_pi, hmm_A, hmm_mean, hmm_cov = hmm_model.train_model()

Current iteration:  0
Current iteration:  1
Current iteration:  2
Current iteration:  3
Current iteration:  4
Current iteration:  5
Current iteration:  6
Current iteration:  7
Current iteration:  8
Current iteration:  9


In [10]:
def forward_predict(x, K, pi, A, mean, cov):
    T = x.shape[0]
    b = np.zeros((x.shape[0],K))
    for t in range(x.shape[0]):
        for k in range(K):
            b[t,k] = mvn.pdf(x[t], mean[k], cov[k], allow_singular=True)

    alpha = np.zeros((T,K))
    c = np.zeros(T)
    for i in range(K):
        alpha[0,i] = pi[i] * mvn.pdf(x[0], mean[i], cov[i], allow_singular=True)
    c[0] = np.sum(alpha[0])
    alpha[0] = alpha[0] / c[0]
    for t in range(1,T):
        alpha[t] = np.dot(alpha[t-1].reshape(1,-1), A)
        alpha[t] = alpha[t] * b[t]
        c[t] = np.sum(alpha[t])
        alpha[t] = alpha[t] / c[t]
        
    return np.sum(np.log(c))

In [11]:
confusion_matrix = np.zeros((5,5))
for w in range(len(test_data)):
    word_data = test_data[w]
    for l in range(len(word_data)):
        x = word_data[l]
        cnn_prob = forward_predict(x, 5, cnn_pi, cnn_A, cnn_mean, cnn_cov)
        dnn_prob = forward_predict(x, 5, dnn_pi, dnn_A, dnn_mean, dnn_cov)
        asr_prob = forward_predict(x, 5, asr_pi, asr_A, asr_mean, asr_cov)
        tts_prob = forward_predict(x, 5, tts_pi, tts_A, tts_mean, tts_cov)
        hmm_prob = forward_predict(x, 5, hmm_pi, hmm_A, hmm_mean, hmm_cov)
        prob = np.array([cnn_prob, dnn_prob, asr_prob, tts_prob, hmm_prob])
        confusion_matrix[w, np.argmax(prob)]+=1
confusion_matrix /= 4

In [12]:
print(confusion_matrix)

[[1.   0.   0.   0.   0.  ]
 [0.25 0.75 0.   0.   0.  ]
 [0.   0.   1.   0.   0.  ]
 [0.   0.   0.   1.   0.  ]
 [0.   0.   0.   0.   1.  ]]
