**Speaker Verification**

In [1]:
!pip install librosa # in colab, you'll need to install this
import librosa

import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data

import matplotlib.pyplot as plt
import pylab as pl
import pandas as pd
import numpy as np
import pickle

from IPython.display import display, clear_output
from __future__ import print_function
from ipywidgets import interact, interactive, fixed
import ipywidgets as widgets
from math import ceil
from IPython.display import Audio
from scipy.io import wavfile
import math
from sklearn.metrics import accuracy_score

%matplotlib inline



In [0]:
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

In [3]:
from google.colab import files
uploaded = files.upload()

Saving hw4_tes.pkl to hw4_tes.pkl
Saving hw4_trs.pkl to hw4_trs.pkl


In [0]:
#Data Loading and manipulation

with open('hw4_trs.pkl', 'rb') as f:
    train_data = pickle.load(f)

with open('hw4_tes.pkl', 'rb') as f:
    test_data = pickle.load(f)

In [5]:
train_data[0].shape
test_data[0].shape

(16180,)

(22631,)

In [0]:
#Getting magnitudes, transpose, padding and forming pairs
max_len = 45

def speech_file_loading(data, numfiles):
    stft_signals = []
    abs_signals = []
    lengths = []
    
    for i in range(numfiles):
        
        #Calculating STFT
        stft = librosa.stft(data[i], n_fft= 1024, hop_length= 512)
        stftlen = stft.shape[1]
        stft_signals.append(stft)
        
        #Calculating Magnitude values
        stftabs = np.abs(stft)
        stftabs = np.pad(stftabs, ((0,0),(0, max_len-stftlen)), 'constant')
        abs_signals.append(stftabs)
        
        lengths.append(stftlen)
        
    return stft_signals, abs_signals, lengths
  
#Training data
X_inp, X_abs, X_len = speech_file_loading(train_data, 500)

#Test data
X_test_inp, X_test_abs, X_test_len = speech_file_loading(test_data, 200)  

In [7]:
len(X_inp), len(X_abs)
len(X_test_inp), len(X_test_abs)
X_inp[0].shape
X_abs[0].shape
X_test_inp[0].shape
X_test_abs[0].shape
print(X_len)
print(X_test_len)

(500, 500)

(200, 200)

(513, 32)

(513, 45)

(513, 45)

(513, 45)

[32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32,

In [8]:
#Getting required batches for the speaker verification task

#Training dataset
#Get positive batches for each speaker with L=10 i.e. 10 positive pairs --> 20 samples
#Get negative batches for each speaker with L=10 i.e. 10 negative pairs --> 20 samples

positive_indices = []
negative_indices = []
for i in range(0,500,10):
  positive_indices.append(list(np.random.choice(np.arange(i,i+9), 20, replace=True)))
  pos = np.arange(i,i+9)
  neg = [j for j in range(500) if j not in pos] 
  negative_indices.append(list(np.random.choice(pos, 10, replace=True)) + list(np.random.choice(neg, 10, replace=True)))

print (positive_indices)
print ("\n")
print (len(positive_indices))
print ("\n")
print (negative_indices)
print ("\n")
print (len(negative_indices))

[[2, 4, 8, 8, 7, 8, 2, 7, 2, 5, 3, 1, 2, 6, 7, 1, 7, 2, 5, 5], [16, 15, 14, 11, 11, 12, 16, 10, 14, 15, 13, 13, 10, 15, 17, 14, 16, 10, 18, 18], [20, 28, 21, 27, 25, 25, 22, 26, 27, 28, 21, 20, 26, 21, 25, 20, 26, 22, 21, 27], [34, 30, 32, 31, 34, 31, 38, 30, 37, 36, 33, 32, 30, 31, 36, 35, 32, 36, 32, 36], [40, 48, 42, 43, 43, 44, 42, 48, 43, 42, 45, 40, 46, 46, 40, 43, 45, 42, 48, 45], [53, 51, 50, 55, 53, 52, 51, 51, 50, 54, 50, 55, 57, 58, 58, 55, 51, 58, 55, 55], [67, 67, 68, 64, 63, 67, 64, 68, 63, 60, 64, 68, 66, 68, 68, 62, 65, 66, 60, 65], [77, 73, 75, 78, 76, 73, 70, 72, 78, 71, 78, 76, 74, 71, 72, 71, 75, 73, 78, 72], [82, 83, 84, 85, 87, 84, 88, 86, 84, 80, 82, 81, 88, 86, 87, 86, 82, 83, 84, 88], [95, 98, 93, 96, 92, 91, 97, 94, 90, 94, 95, 93, 95, 91, 92, 90, 96, 95, 91, 92], [101, 105, 100, 100, 101, 107, 106, 102, 105, 100, 104, 104, 106, 107, 105, 107, 105, 104, 107, 104], [113, 113, 112, 115, 118, 111, 114, 117, 117, 116, 113, 117, 114, 118, 113, 113, 110, 111, 113, 1

In [9]:
#Test dataset
#Get positive batches for each speaker with L=10 i.e. 10 positive pairs --> 20 samples
#Get negative batches for each speaker with L=10 i.e. 10 negative pairs --> 20 samples

positive_indices_test = []
negative_indices_test = []
for i in range(0,200,10):
  positive_indices_test.append(list(np.random.choice(np.arange(i,i+9), 20, replace=True)))
  pos = np.arange(i,i+9)
  neg = [j for j in range(200) if j not in pos] 
  negative_indices_test.append(list(np.random.choice(pos, 10, replace=True)) + list(np.random.choice(neg, 10, replace=True)))

print (positive_indices_test)
print ("\n")
print (len(positive_indices_test))
print ("\n")
print (negative_indices_test)
print ("\n")
print (len(negative_indices_test))

[[8, 8, 4, 8, 1, 6, 3, 0, 4, 7, 2, 8, 7, 0, 0, 1, 6, 5, 7, 1], [11, 14, 12, 18, 14, 13, 14, 16, 17, 15, 16, 14, 14, 11, 13, 12, 13, 18, 10, 15], [28, 25, 23, 24, 28, 21, 20, 24, 27, 28, 21, 20, 25, 25, 25, 26, 20, 23, 20, 21], [32, 37, 37, 37, 32, 38, 36, 32, 35, 30, 33, 30, 35, 33, 37, 30, 30, 33, 37, 36], [42, 42, 41, 45, 46, 43, 48, 41, 45, 40, 44, 46, 47, 45, 47, 43, 41, 45, 44, 46], [52, 58, 55, 50, 54, 51, 52, 57, 53, 55, 56, 54, 57, 51, 58, 53, 54, 58, 54, 52], [63, 60, 60, 68, 60, 68, 60, 61, 63, 65, 62, 66, 60, 63, 62, 60, 62, 60, 64, 61], [75, 76, 74, 78, 72, 75, 73, 70, 76, 72, 76, 75, 70, 72, 74, 70, 71, 76, 73, 72], [83, 80, 82, 80, 84, 85, 80, 87, 81, 87, 88, 85, 85, 84, 88, 80, 82, 87, 87, 81], [93, 95, 94, 93, 93, 92, 96, 94, 97, 96, 96, 94, 93, 93, 91, 92, 97, 98, 92, 97], [103, 108, 106, 102, 108, 105, 108, 107, 103, 102, 105, 105, 106, 108, 101, 102, 102, 107, 105, 106], [110, 116, 111, 110, 116, 118, 115, 111, 114, 116, 118, 115, 118, 116, 113, 115, 118, 117, 113, 1

In [0]:
#Create positive and negative batches datasets

#Training dataset
train_positive_signal_1 = []
train_positive_signal_2 = []
train_negative_signal_1 = []
train_negative_signal_2 = []

for i in range(50):
  train_positive_signal_1.append([X_abs[j] for j in positive_indices[i][0:10]])
  train_positive_signal_2.append([X_abs[j] for j in positive_indices[i][10:20]])
  train_negative_signal_1.append([X_abs[j] for j in negative_indices[i][0:10]])
  train_negative_signal_2.append([X_abs[j] for j in negative_indices[i][10:20]])

In [11]:
len(train_positive_signal_1)
len(train_positive_signal_2)
len(train_negative_signal_1)
len(train_negative_signal_2)

len(train_positive_signal_1[0])
len(train_positive_signal_2[0])
len(train_negative_signal_1[0])
len(train_negative_signal_2[0])

train_positive_signal_1[0][0].shape
train_positive_signal_2[0][0].shape
train_negative_signal_1[0][0].shape
train_negative_signal_2[0][0].shape


50

50

50

50

10

10

10

10

(513, 45)

(513, 45)

(513, 45)

(513, 45)

In [0]:
#Test dataset
test_positive_signal_1 = []
test_positive_signal_2 = []
test_negative_signal_1 = []
test_negative_signal_2 = []

for i in range(20):
  test_positive_signal_1.append([X_test_abs[j] for j in positive_indices_test[i][0:10]])
  test_positive_signal_2.append([X_test_abs[j] for j in positive_indices_test[i][10:20]])
  test_negative_signal_1.append([X_test_abs[j] for j in negative_indices_test[i][0:10]])
  test_negative_signal_2.append([X_test_abs[j] for j in negative_indices_test[i][10:20]])

In [13]:
len(test_positive_signal_1)
len(test_positive_signal_2)
len(test_negative_signal_1)
len(test_negative_signal_2)

len(test_positive_signal_1[0])
len(test_positive_signal_2[0])
len(test_negative_signal_1[0])
len(test_negative_signal_2[0])

test_positive_signal_1[0][0].shape
test_positive_signal_2[0][0].shape
test_negative_signal_1[0][0].shape
test_negative_signal_2[0][0].shape

20

20

20

20

10

10

10

10

(513, 45)

(513, 45)

(513, 45)

(513, 45)

In [0]:
#Target lists for training and test datasets

y_train_positive = [1]*500
y_train_negative = [0]*500
y_test_positive = [1]*200
y_test_negative = [0]*200

In [15]:
#Stack datasets to create one for training and test

train_signal_1 = []
train_signal_2 = []
train_y = []

for i in range(0,50):
  train_signal_1.append(np.concatenate((np.array(train_positive_signal_1[i]), np.array(train_negative_signal_1[i]))))
  train_signal_2.append(np.concatenate((np.array(train_positive_signal_2[i]), np.array(train_negative_signal_2[i]))))
  train_y.append(np.concatenate((np.array(y_train_positive[i*10 : ((i*10)+10)]), np.array(y_train_negative[i*10 : ((i*10)+10)]))))

train_signal_1 = list(np.vstack(np.array(train_signal_1)))
train_signal_2 = list(np.vstack(np.array(train_signal_2)))
train_y = np.ndarray.flatten(np.array(train_y))

len(train_signal_1)
len(train_signal_2)
train_y.shape


1000

1000

(1000,)

In [16]:
test_signal_1 = []
test_signal_2 = []
test_y = []

for i in range(0,20):
  test_signal_1.append(np.concatenate((np.array(test_positive_signal_1[i]), np.array(test_negative_signal_1[i]))))
  test_signal_2.append(np.concatenate((np.array(test_positive_signal_2[i]), np.array(test_negative_signal_2[i]))))
  test_y.append(np.concatenate((np.array(y_test_positive[i*10 : ((i*10)+10)]), np.array(y_test_negative[i*10 : ((i*10)+10)]))))

test_signal_1 = list(np.vstack(np.array(test_signal_1)))
test_signal_2 = list(np.vstack(np.array(test_signal_2)))
test_y = np.ndarray.flatten(np.array(test_y))

len(test_signal_1)
len(test_signal_2)
test_y.shape

400

400

(400,)

In [0]:
#Designing the Siamese Network

seq = tf.placeholder(tf.int32, None)
dropout_var = tf.placeholder(tf.float32, ())

X1 = tf.placeholder(tf.float32, [None, max_len, 513])
X2 = tf.placeholder(tf.float32, [None, max_len, 513])
y = tf.placeholder(tf.float32, [None])

def rnn_function(X, dp):
  with tf.variable_scope("rnn1"):
    
    stacked_rnn = []
    for _ in range(5):
      r_cell = tf.nn.rnn_cell.BasicLSTMCell(1024, forget_bias=1.0, state_is_tuple=True)
      lstm_r_cell = tf.contrib.rnn.DropoutWrapper(r_cell,output_keep_prob=dp)
      stacked_rnn.append(lstm_r_cell)
    lstm_cell_m = tf.nn.rnn_cell.MultiRNNCell(cells=stacked_rnn, state_is_tuple=True)

    outputs, _ = tf.nn.dynamic_rnn(lstm_cell_m, X, dtype=tf.float32, sequence_length=seq)
    dense_output = tf.layers.dense(outputs, 513, kernel_initializer= tf.contrib.layers.xavier_initializer())
    d = seq[0]
    return [dense_output, d]

In [0]:
def loss_function(y,d,batch_size):
  tmp= y *tf.square(d)
  tmp2 = (1-y) *tf.square(tf.maximum((1 - d),0))
  return tf.reduce_sum(tmp +tmp2)/batch_size/2

In [19]:
#Output, loss and optimization

with tf.variable_scope("model"):
  out1 = rnn_function(X1, dropout_var)
  output1 = out1[0]
  d = out1[1]
with tf.variable_scope("model", reuse=tf.AUTO_REUSE):
  out2 = rnn_function(X2, dropout_var)
  output2 = out2[0]
  distance = tf.reduce_sum(tf.diag_part(tf.tensordot( output1[:, :d,:], output2[:, :d,:], axes=[[1],[1]])),1, keep_dims=True) 
  y_preds = tf.sigmoid(distance)
  loss = loss_function(y, y_preds, 20)
  optimizer = tf.train.AdamOptimizer(learning_rate= 0.0000001).minimize(loss)  

Instructions for updating:
This class is equivalent as tf.keras.layers.LSTMCell, and will be replaced by that in Tensorflow 2.0.
Instructions for updating:
This class is equivalent as tf.keras.layers.StackedRNNCells, and will be replaced by that in Tensorflow 2.0.
Instructions for updating:
Please use `keras.layers.RNN(cell)`, which is equivalent to this API
Instructions for updating:
Use tf.cast instead.
Instructions for updating:
Colocations handled automatically by placer.
Instructions for updating:
Please use `rate` instead of `keep_prob`. Rate should be set to `rate = 1 - keep_prob`.
Instructions for updating:
Use keras.layers.dense instead.
Instructions for updating:
keep_dims is deprecated, use keepdims instead


In [20]:
#Running model and testing on training and test datasets

init = tf.global_variables_initializer()

max_epochs = 40
batch_step = 20 #1000 for 50 user batches = 20 batch step i.e. 2L
step = 5
train_len = np.array([32]*1000)

sess = tf.Session()
sess.run(init)

for epoch in range(max_epochs):
    avg_cost = 0.
    avg_acc = 0.
    train_acc = []
    random = np.arange(0, 1000, 20)

    for i in range(len(random)):
        start = int(random[i])
        end = int(start + batch_step)
        b_x1, b_x2, b_y = np.array(train_signal_1[start:end]).swapaxes(1,2), np.array(train_signal_2[start:end]).swapaxes(1,2), train_y[start:end]
        seqlen = np.array(train_len[start:end])
        data = {X1: b_x1, X2: b_x2, y: b_y, seq : seqlen, dropout_var : 0.95}
        sess.run(optimizer, feed_dict=data)
        avg_cost += sess.run(loss, feed_dict=data)
        train_preds = sess.run(y_preds, feed_dict=data)
        y_pred_binary = np.array([1 if i >= 0.55 else 0 for i in train_preds])
        train_acc.append(accuracy_score(b_y, y_pred_binary))
     
    avg_cost = avg_cost / len(random)
    avg_acc = np.mean(train_acc)
        
    if (epoch+1) % step == 0:
        print ("Epoch: %03d/%03d accuracy: %.9f" % (epoch, max_epochs, avg_acc))
        print ("Loss", avg_cost)
        
print ("=========================Model Optimization Complete============================")

Epoch: 004/040 accuracy: 0.524000000
Loss 2.8732653522491454
Epoch: 009/040 accuracy: 0.555000000
Loss 2.6899362134933473
Epoch: 014/040 accuracy: 0.587000000
Loss 2.624114851951599
Epoch: 019/040 accuracy: 0.612000000
Loss 2.5922104835510256
Epoch: 024/040 accuracy: 0.645000000
Loss 2.574125738143921
Epoch: 029/040 accuracy: 0.644000000
Loss 2.5618166017532347
Epoch: 034/040 accuracy: 0.647000000
Loss 2.5531778573989867
Epoch: 039/040 accuracy: 0.652000000
Loss 2.5469803047180175


In [24]:
#Accuracy for test dataset

batch_step = 20 #400 for 20 user batches = 20 batch step i.e. 2L
step = 5
test_len = np.array([45]*400)

avg_cost_test = 0.
avg_acc_test = 0.
test_acc = []
random_test = np.arange(0, 400, 20)

for i in range(len(random_test)):
  start_test = int(random_test[i])
  end_test = int(start_test + batch_step)
  b_test_x1, b_test_x2, b_test_y = np.array(test_signal_1[start_test:end_test]).swapaxes(1,2), np.array(test_signal_2[start_test:end_test]).swapaxes(1,2), test_y[start_test:end_test]
  seqlen_test = np.array(test_len[start_test:end_test])
  data_test = {X1: b_test_x1, X2: b_test_x2, y: b_test_y, seq : seqlen_test, dropout_var : 1.0}
  avg_cost_test += sess.run(loss, feed_dict=data_test)
  test_preds = sess.run(y_preds, feed_dict=data_test)
  y_pred_test_binary = np.array([1 if i >= 0.55 else 0 for i in test_preds])
  test_acc.append(accuracy_score(b_test_y, y_pred_test_binary))

avg_cost_test = avg_cost_test / len(random_test)
avg_acc_test = np.mean(test_acc)
        
print ("Accuracy:" , avg_acc_test)

Accuracy: 0.6249999999999999
