In [1]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [2]:
!pip install librosa



In [0]:
import librosa
import pickle
import math

In [0]:
import numpy as np
import tensorflow as tf

# Read and transform data

Convert raw byte stream into useful feature space. STFT on the signals serves the initial feature extraction process

In [0]:
base_path = './drive/My Drive/Colab Notebooks/hw4/speaker-verification/'
train_path = base_path + 'hw4_trs.pkl'
test_path = base_path + 'hw4_tes.pkl'

base_path_model = base_path + 'model/'

In [0]:
with open(train_path, 'rb') as f:
  train_raw = pickle.load(f)

with open(test_path, 'rb') as f:
  test_raw = pickle.load(f)

trs.pkl contains an 500×16,180 matrix, whose row is a speech signal with 16,180 samples. They are
the returned vectors from the librosa.load function. Similarly, tes.pkl holds a 200×22,631 matrix

In [7]:
train_raw.shape, test_raw.shape

((500, 16180), (200, 22631))

## Convert into audio (.wav file) for checking manually

In [0]:
# for i in range(train_raw.shape[0]):
#   librosa.output.write_wav(base_path + 'trs/trs' + ('0000' + str(i))[-4:] + '.wav', train_raw[i], 16000)

In [0]:
# for i in range(test_raw.shape[0]):
#   librosa.output.write_wav(base_path + 'tes/tes' + ('0000' + str(i))[-4:] + '.wav', test_raw[i], 16000)

## Extract features using STFT

In [0]:
train_complex = np.array([librosa.stft(x, n_fft=1024, hop_length=512).T for x in train_raw])
test_complex = np.array([librosa.stft(x, n_fft=1024, hop_length=512).T for x in test_raw])

In [0]:
train = np.abs(train_complex)
test = np.abs(test_complex)

In [32]:
a = [[1, 2], [3, 4]]
np.pad()

[[1, 2], [3, 4]]

In [12]:
train.shape, test.shape

((500, 32, 513), (200, 45, 513))

# Generate mini-batches

## Structure of training data

The training matrix is ordered by speakers. Each speaker has 10 utterances, and there are 50 such
speakers (that’s why there are 500 rows). Similarly, the test set has 20 speakers, each of which is with
10 utterances.

In [0]:
samples_per_class = 10

## Procedure to generate mini-batches

1. Randomly sample L pairs of utterances from the ten utterance of the first speaker. In theory, there are $10 \choose 2$= 45 pairs you can sample from. You can use all 45 of them if you want. These are the positive examples in your first minibatch

2. Randomly sample L utterances from the 49 training speakers. Using them and the ten utterances of
the first speaker, form another set of L pairs. If L > 10, you’ll need to repeatedly use the first speaker’s
utterance (i.e. sampling with replacement). This set is your negative examples, each of whose pair
contains an utterance from the first speaker and a random utterance spoken by a different speaker.

3. In this first minibatch, you have 2L pairs of utterances.

4. Repeat this process for the other training speakers, so that each speaker is represented by L positive
pairs and L negative pairs. By doing so, you can form 50 minibatches with a balanced number of
positive and negative pair

In [0]:
max_length = 50
num_features = 513

In [0]:
def pad_zeros(stft):
  stft_val = np.zeros((max_length, num_features))
  stft_val[:stft.shape[0], :stft.shape[1]] = stft
  return stft_val

In [0]:
'''
data: train/test
batch_size: return batch_size number of positive and another batch_size number of negative pairs
stick: if true, one training class will be common across positive and negative pairs

returns:
x: list of pairs
y: 1 for positive pairs, 0 for negative
'''
def next_batch(data, batch_size, stick=False):
  x1, x2, l1, l2, y = [], [], [], [], []
  
  num_classes = len(data) // samples_per_class
  base = np.random.randint(num_classes)
  
  # generate positive pairs
  for _ in range(batch_size):
    # randomly select idx_0 only if stick is false
    if not stick:
      base = np.random.randint(num_classes)
      
    idx_0, idx_1 = base * samples_per_class + np.random.choice(np.arange(samples_per_class), size=2, replace=False)
    x1.append(pad_zeros(data[idx_0]))
    x2.append(pad_zeros(data[idx_1]))
    l1.append(data[idx_0].shape[0])
    l2.append(data[idx_1].shape[0])
    y.append([1])
#     print(idx_0, idx_1, 1)
    
  # generate negative pairs
  for _ in range(batch_size):
    # randomly select idx_0 only if stick is false
    if not stick:
      base = np.random.randint(num_classes)
    
    # make sure neg_base is not same as base
    while True:
      neg_base = np.random.randint(num_classes)
      if neg_base != base:
        break
      
    idx_0 = base * samples_per_class + np.random.randint(samples_per_class)
    idx_1 = neg_base * samples_per_class + np.random.randint(samples_per_class)
    
    x1.append(pad_zeros(train[idx_0]))
    x2.append(pad_zeros(train[idx_1]))
    l1.append(train[idx_0].shape[0])
    l2.append(train[idx_1].shape[0])
    y.append([0])
#     print(idx_0, idx_1, 0)
    
  return np.array(x1), np.array(x2), np.array(y)

# Create model

In [0]:
num_hidden = 256
learning_rate = 0.001

In [0]:
# Using this implementation as reference https://github.com/ardiya/siamesenetwork-tensorflow/blob/master/train.py

def model(inp, reuse):
  with tf.name_scope('model'):
    
    with tf.variable_scope('lstm_1') as scope:
      cell = tf.nn.rnn_cell.DropoutWrapper(tf.nn.rnn_cell.LSTMCell(num_hidden, reuse=reuse))
      output, state = tf.nn.dynamic_rnn(cell, inp, dtype=tf.float32, scope=scope)
      
#     with tf.variable_scope('conv_1') as scope:
#       conv_1 = tf.contrib.layers.conv2d(inp, 16, [1, 3], activation_fn=tf.nn.relu, padding='same', \
# 		        weights_initializer=tf.contrib.layers.xavier_initializer_conv2d(), scope=scope, reuse=reuse)
#       pool_1 = tf.contrib.layers.max_pool2d(conv_1, [1, 2], stride=[1, 2])
#       flattened_1 = tf.contrib.layers.flatten(pool_1)
#       dropout_1 = tf.contrib.layers.dropout(flattened_1, keep_prob=0.9)
      
    with tf.variable_scope('dense_1') as scope:
      flattened_1 = tf.contrib.layers.flatten(output)
      dropout_1 = tf.contrib.layers.dropout(flattened_1, keep_prob=0.9)
      dense_1 = tf.contrib.layers.fully_connected(dropout_1, 200, activation_fn=tf.nn.sigmoid, \
            weights_initializer=tf.contrib.layers.xavier_initializer(), scope=scope, reuse=reuse)
     
      connected = tf.layers.Dense(256)(dense_1)
      notConnected = tf.layers.Dense(256)
      
      print(connected)
      print(notConnected)
  return dense_1

## Loss function

In [0]:
def contrastive_loss(model1, model2, y, margin):
  # y can take values in {0, 1}
  
  with tf.name_scope('contrastive-loss'):
#     distance = tf.sqrt(tf.reduce_sum(tf.pow(model1 - model2, 2), 1, keepdims=True))
#     similarity = y * tf.square(distance)
    
#     # distance between different object should be at least 'margin'
#     dissimilarity = (1 - y) * tf.square(tf.maximum((margin - distance), 0))
    
#     return tf.reduce_mean(dissimilarity + similarity) / 2
    print(model1)
    print(model2)
    print(y)

    dotProduct = tf.reduce_sum(tf.multiply(model1, model2), axis = 1, keepdims=True)
    sigmDotProduct = tf.nn.sigmoid(dotProduct)
    notConnected = tf.contrib.layers.fully_connected(dropout_1, 200, activation_fn=tf.nn.sigmoid, \
            weights_initializer=tf.contrib.layers.xavier_initializer(), scope=scope, reuse=reuse)
    lossCalcu  = tf.reduce_sum(-y*tf.log(10e-8 + sigmDotProduct) - (1 - y) * tf.log(10e-8 + 1 - sigmDotProduct))
    print(dotProduct)
    print(sigmDotProduct)
    print(lossCalcu)
    return lossCalcu

In [0]:
def similar(model1, model2, y):
  distance = tf.sqrt(tf.reduce_sum(tf.pow(model1 - model2, 2), 1, keepdims=True))
  prob = 1 - tf.nn.sigmoid(distance)
  prediction = tf.cast(tf.greater_equal(prob, 0.5), tf.float32)
  accuracy = tf.reduce_mean(tf.cast(tf.equal(prediction, y), tf.float32))
  return prob, prediction, accuracy

## Inputs to the model

In [0]:
X1 = tf.placeholder(dtype='float', shape=[None, max_length, num_features])
X2 = tf.placeholder(dtype='float', shape=[None, max_length, num_features])
y = tf.placeholder(dtype='float', shape=[None, 1])
margin = 0.5

## Plug everything together

In [22]:
model1 = model(X1, None)

Instructions for updating:
This class is equivalent as tf.keras.layers.LSTMCell, and will be replaced by that in Tensorflow 2.0.
Instructions for updating:
Please use `keras.layers.RNN(cell)`, which is equivalent to this API
Instructions for updating:
Colocations handled automatically by placer.

For more information, please see:
  * https://github.com/tensorflow/community/blob/master/rfcs/20180907-contrib-sunset.md
  * https://github.com/tensorflow/addons
If you depend on functionality not listed there, please file an issue.

Instructions for updating:
Use keras.layers.flatten instead.
Instructions for updating:
Please use `rate` instead of `keep_prob`. Rate should be set to `rate = 1 - keep_prob`.


In [34]:
model2 = model(X2, True)

Tensor("model_2/dense_1/dense/BiasAdd:0", shape=(?, 256), dtype=float32)
<tensorflow.python.layers.core.Dense object at 0x7f6fa15ef7f0>


In [24]:
loss = contrastive_loss(model1, model2, y, margin)
train_op = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(loss=loss)

Tensor("model/dense_1/dense_1/Sigmoid:0", shape=(?, 200), dtype=float32)
Tensor("model_1/dense_1/dense_1/Sigmoid:0", shape=(?, 200), dtype=float32)
Tensor("Placeholder_2:0", shape=(?, 1), dtype=float32)
Tensor("contrastive-loss/Sum:0", shape=(?, 1), dtype=float32)
Tensor("contrastive-loss/Sigmoid:0", shape=(?, 1), dtype=float32)
Tensor("contrastive-loss/Sum_1:0", shape=(), dtype=float32)
Instructions for updating:
Use tf.cast instead.


In [25]:
print(model1)

Tensor("model/dense_1/dense_1/Sigmoid:0", shape=(?, 200), dtype=float32)


In [26]:
print(loss)

Tensor("contrastive-loss/Sum_1:0", shape=(), dtype=float32)


## Train model

In [0]:
batch_size = 10
num_epochs = 50
display_step = 10
save_step = 10

num_samples_tr = train.shape[0]
num_samples_te = test.shape[0]
num_batches_tr = int(math.ceil(num_samples_tr/batch_size))
num_batches_te = int(math.ceil(num_samples_te/batch_size))

train_loss = []
test_loss = []

In [0]:
sess = tf.Session()
saver = tf.train.Saver()

In [29]:
sess.run(tf.global_variables_initializer())

num_batches_tr = num_samples_tr // batch_size
for epoch in range(num_epochs):
  loss_val = 0
  for i in range(num_batches_tr):
    batch_x1, batch_x2, batch_y = next_batch(train, batch_size)
    
    _, lv = sess.run([train_op, loss], feed_dict={X1: batch_x1, X2: batch_x2, y: batch_y})
    loss_val += lv
    
  loss_val = loss_val / (num_batches_tr * batch_size)
    
  if epoch % display_step == 0:
    print(epoch, loss_val)
  if epoch % save_step == 0:
    saver.save(sess, base_path_model + 'model_' + str(epoch) + '.ckpt')

0 15.942385864257812
10 15.942385864257812


KeyboardInterrupt: ignored

## Test model

In [0]:
batch_x1, batch_x2, batch_y = next_batch(test, 10)

In [0]:
prob, prediction, accuracy = similar(model1, model2, y)

In [0]:
pb, pd, ac = sess.run([prob, prediction, accuracy], feed_dict={X1: batch_x1, X2: batch_x2, y: batch_y})

In [0]:
pb

In [0]:
pd, batch_y

In [0]:
ac