# Machine Problem #2

## Imports Here

In [1]:
import matplotlib.pyplot as plt
import numpy as np
import scipy
from numpy import linalg
import wave
from scipy.io import wavfile
from scipy import signal
from collections import defaultdict
from functools import partial
import scipy.stats as stats

## Data Extraction

In [2]:
data= {}
fs={}
peeps={'dg', 'ls', 'mh', 'yx'}
words={'asr', 'cnn', 'dnn', 'hmm', 'tts'}

for p in peeps:
    for w in words:
        for i in range(1, 6):
            wav_r=wave.open('data/'+p+'/'+p+'_'+w+str(i)+'.wav', 'rb')
            fs[p, w, i], x = wavfile.read('data/'+p+'/'+p+'_'+w+str(i)+'.wav')
            if wav_r.getnchannels()==2:
                data[p, w, i]=x[:, 0]
            else:
                data[p, w, i]=x
        



## Feature Extraction

In [3]:
features= defaultdict(lambda: defaultdict(lambda: defaultdict(partial(np.ndarray, 0))))
peeps={'dg', 'ls', 'mh', 'yx'}
words={'asr', 'cnn', 'dnn', 'hmm', 'tts'}

for p in peeps:
    for w in words:
        for i in range(1, 6):
            file_mat=[]
            file_data=open(('feature/'+p+'/'+p+'_'+w+str(i)+'.fea'), encoding = "ISO-8859-1").read().strip().split('\n')
            file_mat = np.array([np.float_(line.split(',')) for line in file_data])
            features[p][w][i]=file_mat

In [4]:
features['dg']['cnn'][1].shape

(112, 14)

## Splitting Testing-Training Data (Speaker Independent)

In [5]:
test_sp_independent=defaultdict(lambda: defaultdict(lambda: defaultdict(partial(np.ndarray, 0))))
train_sp_independent=defaultdict(lambda: defaultdict(lambda: defaultdict(partial(np.ndarray, 0))))
data_per_word_id=[]

for w in words:
    per_word=[]
    c=0
    for p in {'dg', 'ls', 'yx'}:
        for i in range(1, 6):
            train_sp_independent[p][w][i]=features[p][w][i]
            per_word.append(features[p][w][i])
    data_per_word_id.append(per_word)
           
for w in words:
    per_word=[]
    for p in {'mh'}:
        for i in range(0, 5):
            test_sp_independent[p][w][i]=features[p][w][i] 

In [6]:
len(data_per_word_id[4])

15

## Splitting Testing-Training Data (Speaker Dependent)

In [7]:
train_sp_dependent=defaultdict(lambda: defaultdict(lambda: defaultdict(partial(np.ndarray, 0))))
test_sp_dependent=defaultdict(lambda: defaultdict(lambda: defaultdict(partial(np.ndarray, 0))))
data_per_word_d=[]

for w in words:
    per_word=[]
    c=0
    for p in {'dg', 'ls', 'mh', 'yx'}:
        for i in range(1, 5):
            train_sp_dependent[p][w][i]=features[p][w][i]
            per_word.append(np.array(features[p, w, i]))
    data_per_word_d.append(np.array(per_word))
            
for w in words:
    per_word=[]
    for p in {'dg', 'ls', 'mh', 'yx'}:
        for i in range(5, 6):
            test_sp_dependent[p][w][i]=features[p][w][i]

## Self Recorded Samples: Test Feature Extraction 

In [8]:
words={'asr', 'cnn', 'dnn', 'hmm', 'tts'}
self_features=defaultdict(lambda: defaultdict(partial(np.ndarray, 0)))
test_self=[]=defaultdict(lambda: defaultdict(partial(np.ndarray, 0)))

for w in words:
    for i in range(1, 6):
        self_features[w][i] = open(('Self-Recorded/Self Features/ak_'+w+str(i)+'.txt'), encoding = "ISO-8859-1").read().split('\n')


# for x, y in self_features.items():
#     for z in y:
#         l=z.replace('\t', ',')[:-1].split()
#         test_self.append(l)

## HMM Parameters: $\pi$, A, B

In [12]:
states = [1,2,3,4,5]
N = len(states)-1
pi_true= [1/5, 1/5, 1/5, 1/5, 1/5] #Initial Probabilty Vector

A = [[0.8, 0.2, 0, 0, 0],
     [0, 0.8, 0.2, 0, 0],
     [0, 0, 0.8, 0.2, 0],   #Transition Probabilty Matrix
     [0, 0, 0, 0.8, 0.2],
     [0, 0, 0, 0, 1]]

#Create B below:
means_per_word_id=[]
for w in words:
    for p in {'dg', 'ls', 'yx'}:
        for i in range(1, 6):
            if len(train_sp_independent[p][w][i]) != 0:
                temp_mean=np.mean(train_sp_independent[p][w][i], axis=0)
    means_per_word_id.append(temp_mean)
    
cov_per_word_id=[]
for w in words:
    for p in {'dg', 'ls', 'yx'}:
        for i in range(1, 6):
            if len(train_sp_independent[p][w][i]) != 0:
                temp_cov=np.cov(train_sp_independent[p][w][i].T)
    cov_per_word_id.append(temp_cov)
    

# # #Initializing b_j(x):
# for i in range(0, 5):
#     for j in range (0, 15):
#         gauss[i][j] = stats.multivariate_normal(mean=means_per_word_id[i], cov=cov_per_word_id[i]).pdf(data_per_word_id[i][j])

In [13]:
def E_Step_1(self, N, T, mu, Sigma, X): 
    T=len(X)
    B=np.zeros((N,T))
    for t in range(0,T):
        for i in range(0,N):
            B[i,t]=stats.multivariate_normal(mu[i],Sigma[i]).pdf(X[t])
# states=[0, 1, 2, 3, 4]
# Q = list(np.random.choice(states,1,p=pi_true))
# q = Q[-1]
# X = []
# while q < 5:
#     print(q)
#     xt = stats.multivariate_normal.rvs(mean=mu_true[q],cov=np.diag(sigsq_true[q]))
#     X.append(xt)
#     Q.extend(np.random.choice(states,1,p=A_true[q]))
#     q = Q[-1]

In [None]:
def E_step_2(self, N, T, B):
    alpha = np.zeros((N,T))
    beta = np.zeros((N,T))
    gamma = np.zeros((N,T))
    xi = np.zeros((2*N,T))
    Amat = np.array(A)  # Convert to an np matrix so we can compute inner products
    for i in range(0,N):
        alpha[i,0]=pi[i]*B[i,0]
    for t in range(1,T):
        for i in range(0,N):
            alpha[i,t]=B[i,t]*np.inner(alpha[:,t-1],Amat[:,i])
    for i in range(0,N):
        beta[i,T-1]=1
    for t in range(T-2,-1,-1):
        for i in range(0,N):
            beta[i,t]=np.inner(Amat[i,0:N],beta[:,t+1]*B[:,t+1])
    for t in range(0,T):
        gamma[:,t]=alpha[:,t]*beta[:,t]
        gamma[:,t]=gamma[:,t]/np.sum(gamma[:,t])
    for t in range(0,T):
        for i in range(0,N):
            for j in range(i,i+2):
                xi[i+j,t]=alpha[i,t]*Amat[i,j]
                if (t<T-1):
                    if j==N:
                        xi[i+j,t]=0
                    else:
                        xi[i+j,t] = xi[i+j,t]*B[j,t+1]*beta[j,t+1]
        xi[:,t]=xi[:,t]/np.sum(xi[:,t])