In [1]:
import numpy as np
import os
import pandas as pd
import matplotlib.pyplot as plt
import librosa
import librosa.display
from scipy import fftpack
from tqdm.notebook import tqdm
from numpy.fft import fft
import random
from scipy.io import wavfile
from scipy import signal
from scipy.stats import multivariate_normal

In [2]:
# window specs
WINDOW = 400 # in samples 
SHIFT  = 160 # in samples

# Import Training data
## First part - Loading Speech files

In [None]:
# import speech data and apply FFt , window = hamming, hop_length = 160, Window_size = 400
x_speech = []
entries = os.listdir('train/speech/')
for entry in tqdm(range(len(entries))):
    sr,data = wavfile.read(os.path.join("train/speech/"+ entries[entry]))
    pad = np.array(np.zeros(80).T)  # padding applied
    data = np.append(data,pad)
    window = signal.hamming(400)  # window for applying FFT
    for i in range(0,len(data)-WINDOW+SHIFT,SHIFT):
        temp = np.array(data[i:i+400]).astype(float)
        #print(window.shape)
        temp = temp * window
        temp_fft = np.log(np.abs(fft(temp,64)[:32]))  # 64 component magnitude FFT
        if entry==0 and i==0:
            Speech = temp_fft.T.reshape(32,1)
            #print(speech_specto.shape)
            continue
        Speech = np.concatenate((Speech,temp_fft.reshape(32,1)),axis=1)



HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=40.0), HTML(value='')))

In [None]:
plt.figure(figsize=(10,10))
plt.pcolormesh(Speech)
print(Speech.shape)
#Speech = Speech[:,1:] # shape = (32,119960)

In [None]:
Speech.shape

## Second part - Loading Music files

In [None]:
# import Music data and apply FFt , window = hamming, hop_length = 160, Window_size = 400
x_music = []
entries = os.listdir('train/music/')
for entry in tqdm(range(len(entries))):
    sr,data = wavfile.read(os.path.join("train/music/"+ entries[entry]))
    pad = np.array(np.zeros(80).T)  # padding applied
    data = np.append(data,pad)
    window = signal.hamming(400)  # window for applying FFT
    for i in range(0,len(data)-WINDOW+SHIFT,SHIFT):
        temp = np.array(data[i:i+400]).astype(float)
        #print(window.shape)
        temp = temp * window
        temp_fft = np.log(np.abs(fft(temp,64)[:32]))  # 64 component magnitude FFT
        if entry==0 and i==0:
            Music = temp_fft.T.reshape(32,1)
            #print(speech_specto.shape)
            continue
        Music = np.concatenate((Music,temp_fft.reshape(32,1)),axis=1)

#Music = Music[:,1:]     # shape = (32,119960)
plt.figure(figsize=(10,10))
print(Music.shape)
plt.pcolormesh(Music)

In [None]:
Music.shape

# Import test data

In [None]:
# importing test data
x_test = []
entries = os.listdir('test/')
for entry in tqdm(range(len(entries))):
    sr,data = wavfile.read(os.path.join("test/"+ entries[entry]))
    pad = np.array(np.zeros(80).T)  # padding applied
    data = np.append(data,pad)
    window = signal.hamming(400)  # window for applying FFT
    for i in range(0,len(data)-WINDOW+SHIFT,SHIFT):
        temp = np.array(data[i:i+400]).astype(float)
        #print(window.shape)
        temp = temp * window
        temp_fft = np.log(np.abs(fft(temp,64)[:32]))  # 64 component magnitude FFT
        # concatenate the vectors now
        if entry==0 and i==0:
            Test = temp_fft.T.reshape(32,1)
            continue
        Test = np.concatenate((Test,temp_fft.reshape(32,1)),axis=1)

plt.figure(figsize=(10,10))
print(Test.shape)
plt.pcolormesh(Test)

## Storing the audio files as a binary sequence, Music = 1, Speech = 0

In [None]:
y_music = []
y_speech = []
for i,entry in enumerate(entries):
    if entry[0] == 'm':  # music
        y_music.append(i)
    else:                # speech
        y_speech.append(i)

# create a boolean 1-D array which stores 1 in the index of music file, and 0 corresponding to speech file. 
print("Printing the indices in which thew audio files are stored .")
print("Music")
print(y_music)
print("Speech")
print(y_speech)
y_test = [0 for i in range(48)] # initialising with 0
for i in range(len(y_music)):  # set 1 in the indices of music file
    y_test[y_music[i]] = 1
    
#y_test = np.array(y_test)

# K-Means clustering implementation

In [None]:
def K_means(n_classes,X_data):

    #X_data ==> data arranged in column wise fashion, X has the feature vectors in row-wise fashion
    X = X_data.T.copy() # X has data data vectors stacked in rows

    old_mew = np.zeros((n_classes,X.shape[1]))
    mew = []

    for i in range(n_classes):
        mew.append(X[random.randint(0,X.shape[0]-1),:]) # initialising means with a random point

    mew = np.array(mew)

    D = np.zeros((X.shape[0],n_classes))   # Distance matrix initialised with 0

    #print(D)
    k_iter = 0
    while True:
        print("Distance matrix calculation....")
        for i in tqdm(range(X.shape[0])):
            dist = 1e14
            index = 0
            for j in range(n_classes):
                D[i,j] = 0
                if dist > np.dot(X[i,:] - mew[j], X[i,:] - mew[j]):
                    dist = np.dot(X[i,:] - mew[j], X[i,:] - mew[j])
                    index = j
            D[i,index] = 1
            
        print(D)
        print("Update mean...")
        for i in tqdm(range(n_classes)):
            mew[i] = np.mean(X[D[:,i] == 1])#/np.sum(D[:,i])


        flag = False 
        for i in range(n_classes):
            if np.abs(np.dot(old_mew[i],old_mew[i]) - np.dot(mew[i],mew[i])) > 1e-1:
                flag = True
                break

        if flag == False:
            return mew,D

        old_mew = mew.copy()
        k_iter += 1
        if k_iter > 20:    # keeping bound in the number of iterations for large dimensions
            return mew,D
        



# Implementation of EM algorithm using full covariance matrix 

In [None]:
def EM_Algorithm_using_full_covariance(n_components,X_data):
    # initialisation before EM algo
    
    X = X_data.T.copy()

    N = X.shape[0]

    means,D = K_means(n_components,X_data) # data vectors are columns

    # data vectors are rows in X_data

    # covariances , sigma.shape = (n_components,32,32)
    sigma = []
    for i in range(n_components):
        sigma.append(np.matmul((X - means[i]).T,X - means[i]))
    sigma = np.array(sigma)

    # priors
    pi = []
    for i in range(n_components):
        pi.append(X[D[:,i]==1].shape[0]/N)
    pi = np.array(pi)
    print(pi)

    # class conditionals
    gamma = []
    for i in range(n_components):
        gamma.append(np.zeros((N,)))
    gamma = np.array(gamma)  # shape = (n_components,N)

    M_variate = np.zeros((N,n_components))
    for i in tqdm(range(n_components)):
        M_variate[:,i] = multivariate_normal.pdf(X, mean=means[i], cov=sigma[i],allow_singular = True)


    # EM algorithm for n_components
    n_iter = 50
    likelihood = []
    N = X.shape[0]
    for i in tqdm(range(n_iter)):

        print("E-step") 
        M_variate = pi.reshape(1,n_components) * M_variate
        gamma = M_variate * ((M_variate @ np.ones((n_components,1))) ** -1)


        print("M step")
        
        # Mean updation
        means = (X.T @ gamma) * (((gamma.T  @ np.ones((N,1))).T) ** -1 ) #  shape = (32,n_components)
        
        # Covariance matrix updation
        for i in tqdm(range(n_components)):
            x = gamma.T[i]
            sigma[i] = ((X.T - means[:,i].reshape(32,1)) * x.reshape(1,N)) @ (X - means[:,i].T)
            sigma[i] = sigma[i] / np.sum(x) 
        
        # prior probabilities updation
        pi = np.mean(gamma,axis=0)    
        print("Prior values :",pi)   
        print("Multivariate...")
        for i in tqdm(range(n_components)):
            M_variate[:,i] = multivariate_normal.pdf(X, mean=means[:,i], cov=sigma[i],allow_singular = True)


        print("Likelihood evaluation")    

        like = np.sum(np.log((pi.reshape(1,n_components) * M_variate) @ np.ones((n_components,1)))) # same as np.sum with axis=1
        print("Log Likelihood = ",like)                                                              
        likelihood.append(like)


    plt.plot(range(len(likelihood)),likelihood, color='b')
    plt.title("Log Likelihood")
    plt.xlabel("Iterations")
    plt.ylabel("Likelihood value")
    plt.show()
    
    return means,sigma,pi
    

# Implementation of EM algorithm using diagonal covariance matrix

In [None]:
def EM_Algorithm_using_diagonal_covariance(n_components,X_data):
    # initialisation before EM
    
    # K-means returns the means of the clusters
    means,D = K_means(n_components,X_data) # data vectors are columns in X_data
    
    X = X_data.T.copy() # data vectors are rows in X
    
    N = X.shape[0]

    sigma = []
    for i in range(n_components):
        sigma.append(np.matmul((X - means[i]).T,X - means[i]))
    sigma = np.array(sigma)

    # priors
    pi = []
    for i in range(n_components):
        pi.append(X[D[:,i]==1].shape[0]/N)
    pi = np.array(pi)
    print(pi)

    # posteriors
    gamma = []
    for i in range(n_components):
        gamma.append(np.zeros((N,)))
    gamma = np.array(gamma)  # shape = (n_components,N)

    M_variate = np.zeros((N,n_components))
    for i in tqdm(range(n_components)):
        M_variate[:,i] = multivariate_normal.pdf(X, mean=means[i], cov=sigma[i],allow_singular = True)

    # Em algorithm for n_components
    
    n_iter = 50
    likelihood = []
    
    for i in tqdm(range(n_iter)):

        print("E-step") 
        M_variate = pi.reshape(1,n_components) * M_variate
        gamma = M_variate * ((M_variate @ np.ones((n_components,1))) ** -1)


        print("M step")
        
        # update means
        means = (X.T @ gamma) * (((gamma.T  @ np.ones((N,1))).T) ** -1 ) # (32,n_components)
        
        # update covariance matrices
        for i in tqdm(range(n_components)):
            x = gamma.T[i]
            sigma[i] = ((X.T - means[:,i].reshape(32,1)) * x.reshape(1,N)) @ (X - means[:,i].T)
            sigma[i] = sigma[i] / np.sum(x)

        # update priors probabilities
        pi = np.mean(gamma,axis=0)    
        print("Prior values:",pi)   
        print("Multivariate...")
        for i in tqdm(range(n_components)):
            M_variate[:,i] = multivariate_normal.pdf(X, mean=means[:,i], cov=np.diag(np.diag(sigma[i])),allow_singular = True)


        print("Likelihood evaluation")    

        like = np.sum(np.log((pi.reshape(1,n_components) * M_variate) @ np.ones((n_components,1))))
        print("Log Likelihood = ",like)                                                              
        likelihood.append(like)


    plt.plot(range(len(likelihood)),likelihood, color='b')
    plt.title("Log Likelihood")
    plt.xlabel("Iterations")
    plt.ylabel("Likelihood value")
    plt.show()
    
    return means,sigma,pi


## 2 mixtures with full covariance for Speech data

In [None]:
means,sigma,pi = EM_Algorithm_using_full_covariance(2,Speech)

## Predict the posterior values for each vector in spectrogram of test data applying speech parameters (full covariance)

In [None]:
n_components = 2
test_speech_posterior = np.zeros((Test.shape[1],n_components)) #test_speech_posterior.shape = (143952,n_components = 2)

for i in tqdm(range(n_components)):
    test_speech_posterior[:,i] = multivariate_normal.pdf(Test.T, mean=means[:,i], cov=sigma[i],allow_singular = True)

test_speech_posterior = pi.reshape(1,n_components) * test_speech_posterior
print(test_speech_posterior.shape)

# 2 mixtures with full covariance for Music data

In [None]:
means,sigma,pi = EM_Algorithm_using_full_covariance(2,Music)

## Predict the posterior values for each vector in spectrogram of test data applying Music parameters (full covariance)

In [None]:
n_components = 2
test_music_posterior = np.zeros((Test.shape[1],n_components)) #test_music_posterior.shape = (143952,n_components = 2)

# calculate the likelihood 
for i in tqdm(range(n_components)):
    test_music_posterior[:,i] = multivariate_normal.pdf(Test.T, mean=means[:,i], cov=sigma[i],allow_singular = True)

# calculate the posterior by multiplying prior with likelihood
test_music_posterior = pi.reshape(1,n_components) * test_music_posterior
print(test_music_posterior.shape)

## Compare posteriors obtained form from test data calculated above for speech and music specifications and Accuracy calculation

In [None]:
y_pred = np.sum(test_speech_posterior,axis = 1) < np.sum(test_music_posterior,axis=1)

#print(y_pred.shape)
predict=[]
for i in range(0,y_pred.shape[0],2999):
    predict.append(np.mean(y_pred[i:i+2999])>0.5)
    

print("Accuracy= (Matches in prediction) / (Total number of audio files) = ",np.mean(predict==np.array(y_test)))

### Accuracy obtained = 81.25 %
### Mismatches = 7 audio files out of 48

# Procedure to predicting Audio files:
1) I have used the following technique for predicting the class of audio files.

2) Firstly, imported the test  dataset of size (32,143952) after applying FFT.

3) Each audio file gets transformed in to a matrix of size (32,2999)

4) After EM training, the class wise means, priors  and covariances are obtained.

5) For each audio file, each vector of size (32,1) is taken and applied the GMM parameters (obtained after training) of the classes to obtain the posterior values. Using the posterior value the class of that vector is assigned. Boolean value is assigned : Music = 1 and Speech = 0.

6) This is done for all the 2999 vectors of the audio file.

7) Now, I find the proportion of vectors assigned the class of MUSIC. This done by taking the mean value of the vector containing the class details. 

8) If the proportion value is more than 0.5, I assign the entire ausio file as Music, else the audio file is tagged as Speech.

9) Following this, I compare my prediction vector with the y_test vector and print the accuracy value.

10) This is done for all the models implemented in the Assignment.


#  PART (B): Sub-part (i):
# 2 mixtures with diagonal covariance for Speech data


In [None]:
means,sigma,pi = EM_Algorithm_using_diagonal_covariance(2,Speech)
# likelihood graph at the end

# Predict the posterior values for each vector in spectrogram of test data applying speech parameters (diagonal covariance)

In [None]:
n_components = 2
test_speech_posterior = np.zeros((Test.shape[1],n_components)) #test_speech_posterior.shape = (143952,n_components = 2)

for i in tqdm(range(n_components)):
    test_speech_posterior[:,i] = multivariate_normal.pdf(Test.T, mean=means[:,i], cov=np.diag(np.diag(sigma[i])),allow_singular = True)

test_speech_posterior = pi.reshape(1,n_components) * test_speech_posterior
print(test_speech_posterior.shape)

## 2 mixtures with diagonal covariance for Music data

In [None]:
means,sigma,pi = EM_Algorithm_using_diagonal_covariance(2,Music)


## Predict the posterior values for each vector in spectrogram of test data applying Music parameters (Diagonal covariance)

In [None]:
n_components = 2
test_music_posterior = np.zeros((Test.shape[1],n_components)) #test_music_posterior.shape = (143952,n_components = 2)

# calculate likelihoods
for i in tqdm(range(n_components)):
    test_music_posterior[:,i] = multivariate_normal.pdf(Test.T, mean=means[:,i], cov=np.diag(np.diag(sigma[i])),allow_singular = True)

# calculate posteriors by multiplying priors and likelihoods
test_music_posterior = pi.reshape(1,n_components) * test_music_posterior
print(test_music_posterior.shape)

## Compare posteriors obtained form from test data calculated above for speech and music specifications and Accuracy calculation

In [None]:
y_pred = np.sum(test_speech_posterior,axis = 1) < np.sum(test_music_posterior,axis=1)

predict=[]
for i in range(0,y_pred.shape[0],2999):
    predict.append(np.mean(y_pred[i:i+2999])>0.5)
    

print("Accuracy= (Matches in prediction) / (Total number of audio files) = ",np.mean(predict==np.array(y_test)))

### Accuracy = 70.83 % for using diagonal covariances
### Mismatches = 14 out of 48 audio files


## 5 mixture component with full covariance for Speech data 

In [None]:
Means,Sigma,Pi = EM_Algorithm_using_full_covariance(5,Speech)

## Predict the posterior values for each vector in spectrogram of test data applying Speech parameters (full covariance, 5 mixture component)

In [None]:
n_components = 5
test_speech_posterior = np.zeros((Test.shape[1],n_components)) #test_speech_posterior.shape = (143952,n_components = 2)

# calculate likelihood
for i in tqdm(range(n_components)):
    test_speech_posterior[:,i] = multivariate_normal.pdf(Test.T, mean=Means[:,i], cov=Sigma[i],allow_singular = True)

# calculate posterior= prior * likelihood
test_speech_posterior = Pi.reshape(1,n_components) * test_speech_posterior
print(test_speech_posterior.shape)

## 5 mixture component with full covariance for Music data

In [None]:
Means,Sigma,Pi = EM_Algorithm_using_full_covariance(5,Music)

## Predict the posterior values for each vector in spectrogram of test data applying Music parameters (full covariance, 5 mixture component)

In [None]:
n_components = 5
test_music_posterior = np.zeros((Test.shape[1],n_components)) #test_music_posterior.shape = (143952,n_components = 2)

# likelihood
for i in tqdm(range(n_components)):
    test_music_posterior[:,i] = multivariate_normal.pdf(Test.T, mean=Means[:,i], cov=np.diag(np.diag(Sigma[i])),allow_singular = True)

# posterior = prior * likelihood
test_music_posterior = Pi.reshape(1,n_components) * test_music_posterior
print(test_music_posterior.shape)

## Compare posteriors obtained form from test data calculated above for speech and music specifications and Accuracy calculation

In [None]:
y_pred = np.sum(test_speech_posterior,axis = 1) < np.sum(test_music_posterior,axis=1)

#print(y_pred.shape)
predict=[]
for i in range(0,y_pred.shape[0],2999):
    predict.append(np.mean(y_pred[i:i+2999])>0.5)
    

print("Accuracy= (Matches in prediction) / (Total number of audio files) = ",np.mean(predict==np.array(y_test)))

### Accuracy = 50%
### Misclassified audio files = 24 out of 48

# 5 mixture component with diagonal covariance for Speech data

In [None]:
Means,Sigma,Pi = EM_Algorithm_using_diagonal_covariance(5,Speech)

# Predict the posterior values for each vector in spectrogram of test data applying Speech parameters (diagonal covariance, 5 mixture component)

In [None]:
n_components = 5
test_speech_posterior = np.zeros((Test.shape[1],n_components)) #test_speech_posterior.shape = (143952,n_components = 2)

# likelihood
for i in tqdm(range(n_components)):
    test_speech_posterior[:,i] = multivariate_normal.pdf(Test.T, mean=Means[:,i], cov=Sigma[i],allow_singular = True)

# posterior = prior * likelihood
test_speech_posterior = Pi.reshape(1,n_components) * test_speech_posterior
print(test_speech_posterior.shape)

# 5 mixture component with diagonal covariance for Music data 

In [None]:
Means,Sigma,Pi = EM_Algorithm_using_diagonal_covariance(5,Music)

# Predict the posterior values for each vector in spectrogram of test data applying Music parameters (diagonal covariance, 5 mixture component)

In [None]:
n_components = 5
test_music_posterior = np.zeros((Test.shape[1],n_components)) #test_music_posterior.shape = (143952,n_components = 2)

# likelihood
for i in tqdm(range(n_components)):
    test_music_posterior[:,i] = multivariate_normal.pdf(Test.T, mean=Means[:,i], cov=np.diag(np.diag(Sigma[i])),allow_singular = True)

# posterior = prior * likelihood
test_music_posterior = Pi.reshape(1,n_components) * test_music_posterior
print(test_music_posterior.shape)

# Compare posteriors obtained form from test data calculated above for speech and music specifications and Accuracy calculation

In [None]:
y_pred = np.sum(test_speech_posterior,axis = 1) < np.sum(test_music_posterior,axis=1)

#print(y_pred.shape)
predict=[]
for i in range(0,y_pred.shape[0],2999):
    predict.append(np.mean(y_pred[i:i+2999])>0.5)
    

print("Accuracy= (Matches in prediction) / (Total number of audio files) = ",np.mean(predict==np.array(y_test)))

# Accuracy = 50 %
## Mismatches = 24 out of 48 audio files