In [2]:
import numpy as np
from tqdm import tqdm
from scipy import stats

In [3]:
# Reading and arranging data
# All the mfcc features for each utterance is compiled in a folder called MFCC
train_speakers = ['ac',  'bh',  'cg',  'dg',  'eg',  'hg',  'il',  'jn',  'kh',  'la',
'ag',  'bi',  'cl',  'ea',  'ei',  'hp',  'jc',  'jp',  'kk',  'ld',
'ai',  'br', 'cm',  'ec',  'ek',  'ig',  'ji',  'kc',  'kn',  'ls',
'an',  'ca',  'dc',  'ee',  'es',  'ih',  'jj',  'kf',  'kt'
]
test_speakers =['mk',  'mm',  'ms',  'mw',  'nc',  'ng',  'nh',  'pe',  'pk',  'pm',  'pp',  'ra'] 
digits = ['1','4','6','9','o']
train_mfcc = {} #dictionary to be addressed with labels
test_mfcc = {}

for speaker in train_speakers:
    for i in digits:
        s = speaker+'_'+i+'.wav.mfcc' #generating file name
        
        with open('MFCC/'+s) as f:
            lines = f.readlines() #reading the lines of the mfcc files
            
        size = np.asarray(list(map(int,lines[0].split()))) #converting the space separated info to an array
        dim = size[0]  #dimension of feature vectors = 38
        length = size[1] #number of feature vectors in the utterance
        mfcc_coeff = np.zeros((length,dim)) #coefficients info row-> feature vector number,column->coefficients
        
        for i in range(length):
            a = np.asarray(list(map(float,lines[i+1].split())))
            mfcc_coeff[i] = a
            
        train_mfcc[s] = mfcc_coeff
        
for speaker in test_speakers:
    for i in digits:
        s = speaker+'_'+i+'.wav.mfcc' #generating file name
        
        with open('MFCC/'+s) as f:
            lines = f.readlines() #reading the lines of the mfcc files
            
        size = np.asarray(list(map(int,lines[0].split()))) #converting the space separated info to an array
        dim = size[0]  #dimension of feature vectors = 38
        length = size[1] #number of feature vectors in the utterance
        mfcc_coeff = np.zeros((length,dim)) #coefficients info row-> feature vector number,column->coefficients
        
        for i in range(length):
            a = np.asarray(list(map(float,lines[i+1].split())))
            mfcc_coeff[i] = a
            
        test_mfcc[s] = mfcc_coeff
        

In [5]:
#Function for dynamic time warping

def DTW(sample,test):  # function returns DTW cost matrix and the warped path, vertical movement allowed since test could be smaller than sample
    n = len(sample)
    m = len(test)
    d = len(sample[0])
    if(d != len(test[0])):
        return np.inf,np.inf
    else:
        phi = np.zeros((n+1,m+1))  #dtw cost matrix
        epsilon = np.zeros((n+1,m+1))  #dtw path matrix

        #initialisation
        for i in range(n+1):
            for j in range(m+1):
                if(i*j == 0):
                    if((i == 0)and(j == 0)):
                        phi[i,j] = 0
                    else:
                        phi[i,j] = np.inf
                #recursion
                else:
                    prev_min = np.min([phi[i-1,j],phi[i,j-1],phi[i-1,j-1]])
                    phi[i,j] = np.linalg.norm(sample[i-1,:] - test[j-1,:]) + prev_min #euclidean distance between the feature vectors + previous min val


        #backtracking to find optimal warped path
        for i in range(n+1):
            for j in range(m+1):
                i = n-i
                j = m-j

                if((phi[i-1,j] <= phi[i,j-1])and(phi[i-1,j] <= phi[i-1,j-1])):
                    epsilon[i-1,j] = 1
                elif((phi[i,j-1] <= phi[i-1,j] )and(phi[i,j-1] <= phi[i-1,j-1] )):
                    epsilon[i,j-1] = 1
                else:
                    epsilon[i-1,j-1] = 1


        epsilon[0,0] = 1
        epsilon[1,1] = 1 #start together
        epsilon[n,m] = 1 #end together

        return phi,epsilon
            
def predictor(score,k=len(train_speakers)): # K-NN based prediction given score matrix 
    n_digits,n_speaker = score.shape
    score_array = [] # score array with all score values
    pred_array = np.zeros(k)
    for i in range(n_digits):
        for j in range(n_speaker):
            score_array.append(score[i,j])
    score_array = np.asarray(score_array) # converting to numpy array
    indices = [b[0] for b in sorted(enumerate(score_array),key=lambda i:i[1])] #indices when sorted in ascending order
    
    for i in range(k):
        index = indices[i]
        pred_array[i] = int(index/n_speaker)
        
    prediction = stats.mode(pred_array) #majority prediction
    
    return prediction
        
    
        

    
    

In [2]:
# K-Means Algorithm

#K-Means helper functions
def dist(x,centroid):   #computes distance between two vectors
    distance = np.square(x-centroid).sum()
    return distance


def closest_centroid(x,centroids):  #computes the index of the closest centroid
    distance = []
    for i in range(len(centroids)):
        distance.append(dist(x,centroids[i]))
        
    closest_centroid_index = distance.index(min(distance))
    return closest_centroid_index 

def tot_error(data,centroids,assigned_centroids): # returns total error incurred
    error = 0
    
    for i,x in enumerate(data): #i is index, x is value at that index
        centroid = centroids[int(assigned_centroids[i])]
        error += dist(x,centroid)
        
    error /= len(data)
    return error
        
    
def KMeans(data,n_clusters,niter=50,tolerance = 0.0001):  #niter taken to be 50 as it converges before that
    cluster_centroids = np.zeros((n_clusters,data.shape[1]))
    assigned_centroids= np.zeros(data.shape[0])
    r                 = np.zeros((data.shape[0],n_clusters))
    
    #initialisation
    # assigning the cluster_centroids to random data points
    indices = np.random.randint(data.shape[0],size = n_clusters)
    
    for i,index in enumerate(indices):
        cluster_centroids[i] = data[index]
        
    error = np.zeros(niter)
    #Assignment and Update 
    for n in range(niter):
        
        #Assignment 
        for i,x in enumerate(data):
            ind = closest_centroid(x,cluster_centroids)
            assigned_centroids[i] = ind #storing the assigned centroid
            r[i,ind] = 1 #responsibility r[n,k] = 1

        #Update
        for i in range(n_clusters):
            R = 0  #total responsibility R
            for j,x in enumerate(data):
                cluster_centroids[i] += r[j,i]*x  # Sigma(r[n,k]x[n])
                R                    += r[j,i]
            cluster_centroids[i] /= R

        error[n] = tot_error(data,cluster_centroids,assigned_centroids)
        if((error[n]-error[n-1])<tolerance):
            break
    return cluster_centroids,assigned_centroids,error   
        

In [120]:
# DTW based digit recognition
pred = {}
error = 0
for speaker in tqdm(test_speakers):
    for i in digits:
        s_test = speaker+'_'+i+'.wav.mfcc'
        score = np.zeros((len(digits),len(train_speakers)))#score matrix with dynamic time warping scores 
        
        for k in range(len(train_speakers)):
            speaker2 = train_speakers[k]
            for j in range(len(digits)):
                    digit = digits[j]
                    s_train = speaker2+'_'+digit+'.wav.mfcc'
                    #print(train_mfcc[s_train].shape)
                    phi = DTW(train_mfcc[s_train],test_mfcc[s_test])[0]
                    score[j,k] = phi[-1,-1]
        
        #making prediction 
        predicted_index = predictor(score,20)[0]
        prediction = digits[int(predicted_index)]
        pred[s_test] = prediction
        if(prediction != i):
            error += 1

print(pred)
print(error)
        
                
            

100%|██████████████████████████████████████████████████████████████████████████████████| 12/12 [05:49<00:00, 29.12s/it]

{'mk_1.wav.mfcc': '1', 'mk_4.wav.mfcc': '4', 'mk_6.wav.mfcc': '6', 'mk_9.wav.mfcc': '9', 'mk_o.wav.mfcc': 'o', 'mm_1.wav.mfcc': 'o', 'mm_4.wav.mfcc': 'o', 'mm_6.wav.mfcc': '6', 'mm_9.wav.mfcc': '9', 'mm_o.wav.mfcc': 'o', 'ms_1.wav.mfcc': 'o', 'ms_4.wav.mfcc': '4', 'ms_6.wav.mfcc': '6', 'ms_9.wav.mfcc': '9', 'ms_o.wav.mfcc': 'o', 'mw_1.wav.mfcc': '1', 'mw_4.wav.mfcc': '4', 'mw_6.wav.mfcc': '6', 'mw_9.wav.mfcc': '9', 'mw_o.wav.mfcc': 'o', 'nc_1.wav.mfcc': '1', 'nc_4.wav.mfcc': '4', 'nc_6.wav.mfcc': '6', 'nc_9.wav.mfcc': '9', 'nc_o.wav.mfcc': 'o', 'ng_1.wav.mfcc': '1', 'ng_4.wav.mfcc': 'o', 'ng_6.wav.mfcc': '4', 'ng_9.wav.mfcc': '9', 'ng_o.wav.mfcc': 'o', 'nh_1.wav.mfcc': '1', 'nh_4.wav.mfcc': '4', 'nh_6.wav.mfcc': '6', 'nh_9.wav.mfcc': '9', 'nh_o.wav.mfcc': 'o', 'pe_1.wav.mfcc': '1', 'pe_4.wav.mfcc': '4', 'pe_6.wav.mfcc': '6', 'pe_9.wav.mfcc': '9', 'pe_o.wav.mfcc': 'o', 'pk_1.wav.mfcc': '1', 'pk_4.wav.mfcc': '4', 'pk_6.wav.mfcc': '6', 'pk_9.wav.mfcc': '9', 'pk_o.wav.mfcc': 'o', 'pm_1.wav




In [None]:
# Generating the K-Means codebook and the observation sequences



In [None]:
def dtw(s, t):
    n, m = len(s), len(t)
    dtw_matrix = np.zeros((n+1, m+1))
    for i in range(n+1):
        for j in range(m+1):
            dtw_matrix[i, j] = np.inf
    dtw_matrix[0, 0] = 0
    
    for i in range(1, n+1):
        for j in range(1, m+1):
            cost = abs(s[i-1] - t[j-1])
            # take last min from a square box
            last_min = np.min([dtw_matrix[i-1, j], dtw_matrix[i, j-1], dtw_matrix[i-1, j-1]])
            dtw_matrix[i, j] = cost + last_min
    return dtw_matrix

In [51]:
a = ['1','2']
b = np.asarray(a)
print(b)

['1' '2']


In [57]:
c = 'a'+'_'+'1'
print(c)

a_1


In [82]:
a = np.zeros((5,3))
for i in range(5):
    for j in range(3):
        a[i,j] = i+j
        
S_indices = [b[0] for b in sorted(enumerate(a.all()),key=lambda i:i[1], reverse = True)]
print(S_indices)

TypeError: 'numpy.bool_' object is not iterable

In [96]:
print(np.min([2,3,4]))

2


In [1]:
for i in range(10):
    if(i==5):
        break
    else:
        print(i)

0
1
2
3
4


In [6]:
print(len(test_mfcc_male.keys()))

NameError: name 'test_mfcc_male' is not defined