## Speech recognition

### 1 - Language detection

In [40]:
import pandas as pd
import numpy as np
import os

### GET DATA ###
path = '/Users/anosharahim/cs156-pcw-anosharahim/23/'
trainA = os.listdir(path + 'trainA')
trainB = os.listdir(path + 'trainB')
trainC = os.listdir(path + 'trainC')
ts = os.listdir(path + 'test')

#unique alphabet encodings
alphabet = ['A', 'o', 'e', 't', 'p', 'g', 'k']
alphabet_dict = {'o': 0, 'p': 1, 't': 2, 'e': 3, 'k': 4, 'g': 5, 'A': 6} #

def getfiles(folder,folder_name, pth): 
    items = []
    c=0
    for item in folder: 
        myfile = open(pth+folder_name+'/'+item)
        c+=1
        
        for line in myfile:
            items.append(line)   
    return items


A_tr = getfiles(trainA, 'trainA', path)
trainB.pop(18) #remove .DStore file
B_tr = getfiles(trainB, 'trainB', path)
C_tr = getfiles(trainC, 'trainC', path)
print('training data set size = ', len(A_tr))

test = getfiles(ts, 'test', path)
print('test data size =', len(test))
print('language alphabet ==',alphabet)


training data set size =  30
test data size = 10
language alphabet == ['A', 'o', 'e', 't', 'p', 'g', 'k']


In [18]:
### Functions for transition matrix, log probability and language detection ### 

def T_matrix(sequences, dict_):
    '''Returns transition matrix of a list of sequences. 
    
    Input
    sequences: list of str
    dict_: dictionary of letters as keys and their encoding as values.
    
    Output 
    M: list of lists
    where M[i][j] is the probability of transitioning from i to j
    '''
    
    #initialize transition matrix with cols = alphabet size
    T = [[0]*7 for _ in range(7)] 
    
    for seq in sequences: #for each sequence
        seq = list(seq) #create list of chars from string
        #get encoding for letters in sequence 
        e = [dict_[letter] for letter in seq] 
        
        for (i,j) in zip(e,e[1:]): #count the transitions 
            T[i][j] += 1 #append when state transitions from i to j
            
    #once counted, convert to probabilities
    for row in T:
        n = sum(row) #row should sum to 1
        if n > 0:
            #normalize rows
            row[:] = [f/sum(row) for f in row]
            
    return T #transition matrix


def log_probability(seq, t_matrix, letters = alphabet):
    '''Returns log probability that a sequence is from a given language.
    '''
    log_prob = 0 #initialize log probability to zero  
    
    for i in range(len(seq)-1): 
        #for each sequence, get current and next letter
        curr_ = letters.index(seq[i])
        next_ = letters.index(seq[i+1])
        #print(type(curr_),type(next_))
        #find probability of going from curr to next in the transition matrix
        prob = t_matrix[curr_, next_]
        
        #fix if probability is greater than 0
        if prob > 0:
            log_prob += np.log(prob)
    return log_prob 


def language_detector(seq, t_matrices, letters = alphabet) -> str:
    '''Returns the most probable language a sequence (str) is from.
    Input 
    seq: str for which language is to be detected 
    t_matrices: list of lists, transition matrix of each language
    letters: list of strs of alphabet in language 
    
    Output 
    lang_p: most probable language of sequence
    '''
    lang_p = [] #store language probabilities here
    for name, t_matrix in t_matrices: #for each transition matrix
        lang_p.append(log_probability(seq, t_matrix, letters)) #get log probability 
    
    lang_detected = np.argmax(np.asarray(lang_p))
    prediction = T_matrices[lang_detected][0]
    return lang_p    

In [25]:
#Use training data to get transition matrices for each language
T_A = T_matrix(A_tr, alphabet_dict)
T_B = T_matrix(B_tr, alphabet_dict)
T_C = T_matrix(C_tr, alphabet_dict)
T_matrices = [
    ('Language A', transition_matrix_A),
    ('Language B', transition_matrix_B),
    ('Language C', transition_matrix_C)
]

#Detect language for each sequence in the test set. 
predictions = pd.DataFrame()
for i in range(len(test)):
    log_probabilities = language_detector(test[i], T_matrices, alphabet)
    predictions = predictions.append(
        {
            "string": f"test_string_{i}",
            "language_A": log_probabilities[0],
            "language_B": log_probabilities[1],
            "language_C": log_probabilities[2]
        },
        ignore_index=True
    )
    #get highest log probability 
    highest_prob = np.argmax(np.asarray(log_probabilities)) 
    #get the associated langugages and display 
    predicted = T_matrices[highest_prob][0]
    print(f"Sequence {i} is from {predicted}")

Sequence 0 is from Language B
Sequence 1 is from Language A
Sequence 2 is from Language A
Sequence 3 is from Language A
Sequence 4 is from Language A
Sequence 5 is from Language C
Sequence 6 is from Language A
Sequence 7 is from Language C
Sequence 8 is from Language C
Sequence 9 is from Language B


### 2 - Speaker identification

In [44]:
## Data Preprocessing ##
text = ''
with open('speaker.txt') as f:
    text = f.read()

o, letters = pd.factorize(list(text)) #returns observed states and unique letters 
print('unique letters ==',letters)

#transition matrix -- these are row stochastic
#initialize the diagonal to .9 because on average a speaker says 10 phonemes
#spread the probability mass equally elsewhere. 
A = np.array([
    [.9,.05,.05],
    [.05,.9,.05],
    [.05,.05,.9]
]) 

#emission matrix
B = np.random.dirichlet((1, 1, 1), 7).transpose() #uniformly distrtibuted? 

#initial conditions of len 3 because there are three speakers
#set initial conditions to equal randomly
pi = np.array([0.32, 0.34, 0.33])

pi.shape

unique letters == ['e' 'o' 'g' 'A' 'p' 't' 'k']


(3,)

In [85]:
## Calculate Parameters Alpha and Beta before EM Step##

def get_alphas(A,B,os,pi):
    '''
    Computes alphas using forward pass.
    
    in:
    A - transition matrix
    B - emission matrix
    os - observations
    pi - initial condition
    
    out:
    alphas -array 
    '''
    #use formula: \alpha_t=[\alpha_{t-1}*A]*(b)^T(o_t)
    
    #initiate array to store alphas 
    #of size no. of states x observations 
    alphas = np.zeros((len(pi),len(os))) 
    
    #initialize the first row of alpha using initial conditions
    #so that it has the previous time step value in the loop 
    A[:,0] = pi * B[:,os[0]]
        
    for t in range(1,len(os)): #for each observations, comput alpha
        alphas[:,t] = (A.T @ alphas[:,t-1]) * B[:,os[t]] 
    
    return alphas

def get_betas(A,B,os,pi):
    ''' Computes betas using backward pass
    in:
    A - transition matrix
    B - emission matrix
    os - observations
    pi - initial condition
    
    out: 
    betas - array
    '''
    
    #use formula: \beta_t-1=[A*b(o_{t+1}]*\beta_{t+1}
    
    betas = np.zeros((len(pi),len(os))) 
    
    for t in range(len(os)-1,0,-1): #count backwards!!
        #calculate previous beta from the next time step
        betas[:,t-1] = A @ (betas[:,t] * B[:,os[t]])

    betas = np.where(betas == 0, 0.00000000001, betas)
    
    return betas

In [86]:
# Baum Welch Algorithm with E-Step and M-Step function ##

def E_step(A,B,os,pi):
    '''Computes gammas and next state using alphas, betas, and initial state.
    '''
    
    #get alphas and betas in E step 
    alpha = get_alphas(A,B,os,pi)
    beta = get_betas(A,B,os,pi)
    
    #- We can find P(o|gamma) by marginalizing all possible hmm chains of hidden variables x as such:   
    #$$p(o|\gamma)=\Sigma_{x}p(o|x,\gamma)p(x|\gamma)$$ by summing over alphas
    marginal = sum(alpha[:,-1]) #p(y)
    
    #use formula: \gamma_t(i)=\frac{\alpha_t(i)*\beta_t(i)}{p(o|\gamma)}
    gamma = (alpha*beta)/marginal
    
    #get new pis now
    new_pis = alpha[:,np.newaxis,0:-1] * A[:,:,np.newaxis] * B[np.newaxis,0:,os[1:]] * beta[np.newaxis,0:,1:] / marginal

    return gamma, new_pis

def M_step(gamma,new_pis,os):
    '''Update transition, and emmision probabilities, and sets new initial state. 
    '''
    letters = np.unique(os)
    
    #get new transition and emission matrices
    new_A = np.sum(new_pis, axis = 2)/np.sum(new_pis, axis = (1,2))[:,np.newaxis]
    new_B = np.zeros(B.shape)
    
    #get new_pi using gamma
    pi_ = gamma[:,0]
    
    for j in range(0,len(letters)):
        new_B[:,j] = np.sum(gamma[:,os == letters[j]], axis = 1) / np.sum(gamma, axis = 1)
        
    return new_A, new_B, pi_


In [88]:
# Run the EM algorithm
for steps in range(0,10):
    [gamma, pi_i] = E_step(A, B, o, pi)
    print([gamma, pi_i] )
    [A,B,pi_0] = M_step(gamma, pi_i, o)
    
print(A)
print(B)

[array([[nan, nan, nan, ..., nan, nan, nan],
       [nan, nan, nan, ..., nan, nan, nan],
       [nan, nan, nan, ..., nan, nan, nan]]), array([[[nan, nan, nan, ..., nan, nan, nan],
        [nan, nan, nan, ..., nan, nan, nan],
        [nan, nan, nan, ..., nan, nan, nan]],

       [[nan, nan, nan, ..., nan, nan, nan],
        [nan, nan, nan, ..., nan, nan, nan],
        [nan, nan, nan, ..., nan, nan, nan]],

       [[nan, nan, nan, ..., nan, nan, nan],
        [nan, nan, nan, ..., nan, nan, nan],
        [nan, nan, nan, ..., nan, nan, nan]]])]
[array([[nan, nan, nan, ..., nan, nan, nan],
       [nan, nan, nan, ..., nan, nan, nan],
       [nan, nan, nan, ..., nan, nan, nan]]), array([[[nan, nan, nan, ..., nan, nan, nan],
        [nan, nan, nan, ..., nan, nan, nan],
        [nan, nan, nan, ..., nan, nan, nan]],

       [[nan, nan, nan, ..., nan, nan, nan],
        [nan, nan, nan, ..., nan, nan, nan],
        [nan, nan, nan, ..., nan, nan, nan]],

       [[nan, nan, nan, ..., nan, nan, nan],