**Problem 1: Speaker Verification**

Importing all the libraries

In [0]:
import tensorflow as tf
import numpy as np
import pickle
import librosa

Mounting the google drive files to load the input data

In [2]:
from google.colab import drive
drive.mount('/content/gdrive')
directory='/content/gdrive/My Drive/Masters/DeepLearning/Homework4/Data/'

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


Function to read the pickle objects

In [0]:
def read_pkl(name):
  with open(name+'.pkl', 'rb') as f:
      X = pickle.load(f)
  return X

Load the training & test dataset

In [0]:
train=read_pkl(directory+'hw4_trs')
test=read_pkl(directory+'hw4_tes')

Function to compute the STFT of the input data

In [0]:
def stft_data(X):
  for i in range(X.shape[0]):
    S=librosa.stft(X[i], n_fft=1024, hop_length=hop_length)
    spectrogram=S if not i else np.vstack((spectrogram,S))
  return spectrogram.reshape((X.shape[0],hop_length+1,-1))

Compute the magnitude of the stft of the training & test dataset

In [6]:
hop_length=512
train_spectrogram=np.abs(stft_data(train))
test_spectrogram=np.abs(stft_data(test))
print(train_spectrogram.shape)
print(test_spectrogram.shape)

(500, 513, 32)
(200, 513, 45)


In [0]:
Function to define the RNN architecture

In [0]:
def rnn_architecture(lstm_sizes,X,keep_prob,reuse,rnn_tuple_state):
  with tf.name_scope('lstm'):  # LSTM layer
    lstms=[tf.contrib.rnn.LSTMCell(size,initializer=tf.contrib.layers.xavier_initializer(),reuse=reuse,state_is_tuple=True) for size in lstm_sizes]
    drops=[tf.contrib.rnn.DropoutWrapper(lstm,output_keep_prob=keep_prob) for lstm in lstms]  # Add dropout to the LSTM cell
    cell=tf.contrib.rnn.MultiRNNCell(drops,state_is_tuple=True)  # Stack up multiple LSTM layers for deep learning
    lstm_output,final_state=tf.nn.dynamic_rnn(cell,X,initial_state=rnn_tuple_state)
  
  with tf.name_scope('fully_connected'):  # Fully connected layer to get the required latent embedding dimension
    embedding=tf.layers.dense(lstm_output[:,-1,:],128,kernel_initializer=tf.contrib.layers.xavier_initializer(),activation=tf.nn.tanh,reuse=reuse)
    
  return embedding,final_state,cell

Function to compute the similarity between 2 embeddings

In [0]:
def build_lstm_layers(lstm_sizes,X1,X2,keep_prob,rnn_tuple_state):  # Create the LSTM layers
# Parameters: lstm_sizes=number of hidden states in each lstm cell, X=input, keep_prob=1-Dropout probability
  embedding1,final_state1,cell=rnn_architecture(lstm_sizes,X1,keep_prob,False,rnn_tuple_state)  # Latent embedding of the 1st speaker
  embedding2,final_state2,cell=rnn_architecture(lstm_sizes,X2,keep_prob,True,rnn_tuple_state)  # Latent embedding of the 2nd speaker
  output=tf.nn.sigmoid(tf.reduce_sum(embedding1*embedding2, 1, keep_dims=True))  # Similarity between the 2 speakers
  return output,cell,final_state1

Function to calculate the loss of the network

In [0]:
def calculate_loss(y,yhat,learning_rate):
  loss=tf.reduce_mean(-tf.cast(y,tf.float32)*tf.log(yhat)-tf.cast(1-y,tf.float32)*tf.log(1-yhat))
  optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(loss)
  return loss,optimizer

Function to build the minibatch from the input data

In [0]:
def load_minibatch(spectrogram,index):
  size=[]
  for i in range(spectrogram.shape[0]):
    if not 10*index<i<10*index+9:
      size.append(i)
  size=np.array(size)
  x=np.zeros((batch_size,2,spectrogram.shape[2],spectrogram.shape[1]))
  y=np.zeros(batch_size)
  count=0
  for i in range(9):
    for j in range(i+1,10):
      x[count,0]=spectrogram[10*index+i].T
      x[count,1]=spectrogram[10*index+j].T
      y[count]=1
      count+=1
  rand1=np.random.randint(low=10*index,high=10*index+9,size=batch_size//2)
  rand2=np.random.choice(size,batch_size//2,replace=False)
  rand3=np.random.choice([0,1],batch_size//2)
  for i in range(batch_size//2):
    x[count+i,rand3[i]]=spectrogram[rand1[i]].T
    x[count+i,1-rand3[i]]=spectrogram[rand2[i]].T
  indices=np.arange(batch_size)
  np.random.shuffle(indices)
  return x[indices],y[indices]

Function to compute the test accuracy

In [0]:
def getTestAccuracy(n_batches):
  accuracy=np.zeros(n_batches)
  test_state=np.zeros((len(lstm_sizes),2,batch_size,lstm_sizes[0]))
  for i in range(n_batches):
    xte,yte=load_minibatch(test_spectrogram,i)
    feed_dict={input1:xte[:,0,...],input2:xte[:,1,...],keep_prob:1,initial_state:test_state}
    te_output,test_state=sess.run([lstm_output,final_state],feed_dict=feed_dict)
    te_output=np.round(te_output)
    accuracy[i]=np.sum(te_output[:,0]==yte)*100/yte.shape[0]
  return np.mean(accuracy)

Initializing the parameters of LSTM network

In [0]:
tf.reset_default_graph()  # To reset all the parameters of the graph for every execution
tf.random.set_random_seed(0)
batch_size=90
n_batches=50  # Number of minibatches
learning_rate=0.00001
lstm_sizes=[256,256]  # Hidden units in the LSTM architecture
input1=tf.placeholder(tf.float32,[batch_size,None,hop_length+1],name='input1')  # Placeholder for the 1st speaker
input2=tf.placeholder(tf.float32,[batch_size,None,hop_length+1],name='input2')  # Placeholder for the 2nd speaker
output=tf.placeholder(tf.int32,[batch_size],name='output')  
keep_prob=tf.placeholder(tf.float32,name='keep_prob')  # keep_prob=1-dropout_prob
initial_state=tf.placeholder(tf.float32,[len(lstm_sizes),2,batch_size,lstm_sizes[0]],name='initial_state')  # Initial state of the LSTM network
state_per_layer_list = tf.unstack(initial_state, axis=0)
rnn_tuple_state = tuple([tf.nn.rnn_cell.LSTMStateTuple(state_per_layer_list[idx][0], state_per_layer_list[idx][1]) for idx in range(len(lstm_sizes))])

Training the LSTM network

In [48]:
lstm_output,lstm_cell,final_state=build_lstm_layers(lstm_sizes,input1,input2,keep_prob,rnn_tuple_state)
loss,optimizer=calculate_loss(output,lstm_output,learning_rate)
sess=tf.InteractiveSession()
tf.global_variables_initializer().run()  # Initialize all the graph variables
maxEpoch=131
np.random.seed(0)
for e in range(maxEpoch):
  l_t=0
  state=np.zeros((len(lstm_sizes),2,batch_size,lstm_sizes[0]))
  for i in range(n_batches):
    x,y=load_minibatch(train_spectrogram,i)
    feed_dict={input1:x[:,0,...],input2:x[:,1,...],output:y,keep_prob:0.98,initial_state:state}
    train_op,state,l_train,_=sess.run([lstm_output,final_state,loss,optimizer],feed_dict=feed_dict)
    l_t+=l_train
  if not e%10:
    print('Epoch:{} Training loss:{}'.format(e,l_t))
print('Test Accuracy:{} %'.format(getTestAccuracy(20)))



Epoch:0 Training loss:38.93625956773758
Epoch:10 Training loss:34.823713064193726
Epoch:20 Training loss:34.729903876781464
Epoch:30 Training loss:34.69943916797638
Epoch:40 Training loss:34.68405079841614
Epoch:50 Training loss:34.6759672164917
Epoch:60 Training loss:34.67029792070389
Epoch:70 Training loss:34.666658997535706
Epoch:80 Training loss:34.664449870586395
Epoch:90 Training loss:34.66307157278061
Epoch:100 Training loss:34.66163271665573
Epoch:110 Training loss:34.66065865755081
Epoch:120 Training loss:34.660042226314545
Epoch:130 Training loss:34.65940374135971
Test Accuracy:60.222222222222214 %


In [0]:
sess.close()