In [14]:
!pip install librosa



In [0]:
import torch
from torch import nn, optim
import torch.nn.functional as F
from torch.autograd.variable import Variable

from IPython import display
from matplotlib import pyplot as plt

from torch.utils.data.dataset import Dataset 
from torch.utils.data import DataLoader
import librosa

from IPython import display
import matplotlib.pyplot as plt

In [16]:
# Mount Google Drive into the Colab runtime so the data files below are reachable.
from google.colab import drive
drive.mount('/content/drive')

# Paths of the pickled training and test data on the mounted drive.
train_file_pickle = "/content/drive/My Drive/Deep Learning Systems/Assignment4Part1Data/hw4_trs.pkl"
test_file_pickle = "/content/drive/My Drive/Deep Learning Systems/Assignment4Part1Data/hw4_tes.pkl"

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [0]:
from itertools import combinations, product
from random import sample
import pickle
import numpy as np

def getstft(raw, n_fft=1024, hop_length=512):
    """Return the magnitude spectrogram of a waveform with time as the first axis.

    Args:
        raw: Audio signal in a form accepted by ``librosa.stft``.
        n_fft: FFT window size passed to ``librosa.stft``. The default of 1024
            gives ``1 + n_fft // 2 = 513`` frequency bins.
        hop_length: Number of samples between successive frames.

    Returns:
        A float32 ``torch.Tensor`` holding the absolute value of the STFT,
        transposed so that rows are frames and columns are frequency bins.
    """
    stft = librosa.stft(raw, n_fft=n_fft, hop_length=hop_length)
    # Discard the phase; only the magnitude is used as a feature.
    magnitude = np.abs(stft)
    # Transpose so the sequence (frame) axis comes first for the recurrent model.
    return torch.tensor(magnitude).float().t()

def getstftdatafrompickle(filepath):
    """Load a pickled sequence of waveforms and convert each one with ``getstft``.

    Args:
        filepath: Path of the pickle file to read.

    Returns:
        A list with one ``getstft`` result per item of the unpickled object,
        in the same order.
    """
    # NOTE: unpickling can execute arbitrary code; only open trusted files.
    with open(filepath, 'rb') as handle:
        waveforms = pickle.load(handle)
    return [getstft(waveform) for waveform in waveforms]

def generate_combinations(numspeak, sample_count, utterances_per_speaker=10):
    """Sample same-speaker and different-speaker utterance index pairs.

    Utterances are addressed by a flat index in which speaker ``s`` owns the
    contiguous range ``[s * utterances_per_speaker, (s + 1) * utterances_per_speaker)``.

    Args:
        numspeak: Number of speakers.
        sample_count: Number of positive pairs and of negative pairs to draw
            for every speaker (sampled without replacement).
        utterances_per_speaker: Number of consecutive utterances belonging to
            each speaker. Defaults to 10.

    Returns:
        ``(positive_pairs, negative_pairs)``: two float tensors of shape
        ``(numspeak, sample_count, 3)``. Every row is
        ``(first_index, second_index, label)`` with label 1 for a pair from
        the same speaker and 0 for a pair from different speakers. In a
        negative row the first index belongs to the speaker and the second
        to somebody else.

    Raises:
        ValueError: If ``sample_count`` is larger than the number of distinct
            pairs available for a speaker (raised by ``random.sample``).
    """
    total_utterances = numspeak * utterances_per_speaker
    positive_pairs = torch.zeros((numspeak, sample_count, 3))
    negative_pairs = torch.zeros((numspeak, sample_count, 3))

    # The label columns are identical for every speaker, so build them once.
    positive_labels = torch.ones(sample_count, 1, dtype=torch.long)
    negative_labels = torch.zeros(sample_count, 1, dtype=torch.long)

    for speaker in range(numspeak):
        start = speaker * utterances_per_speaker
        stop = start + utterances_per_speaker
        own = range(start, stop)
        others = [index for index in range(total_utterances)
                  if not start <= index < stop]

        pos_choice = sample(list(combinations(own, 2)), sample_count)
        neg_choice = sample(list(product(own, others)), sample_count)

        # reshape keeps the tensors 2-D even when sample_count is 0, so the
        # concatenation with the label column below stays valid.
        pos_comb = torch.tensor(pos_choice, dtype=torch.long).reshape(sample_count, 2)
        neg_comb = torch.tensor(neg_choice, dtype=torch.long).reshape(sample_count, 2)

        positive_pairs[speaker] = torch.cat((pos_comb, positive_labels), 1)
        negative_pairs[speaker] = torch.cat((neg_comb, negative_labels), 1)
    return positive_pairs, negative_pairs

class SiameseDataset(Dataset):
    """Dataset of utterance pairs for training the Siamese network.

    Every item is ``(first_stft, second_stft, label)`` where the two STFTs are
    looked up by index in the features loaded from ``datafile``.
    """

    def __init__(self, positive_comb, negative_comb, datafile):
        """Build the pair table and load the STFT features.

        Args:
            positive_comb: Tensor whose last dimension has size 3 and holds
                ``(first_index, second_index, label)`` for same-speaker pairs.
            negative_comb: Tensor with the same layout for different-speaker
                pairs.
            datafile: Path of the pickle file handed to
                ``getstftdatafrompickle``.
        """
        # Build the index table from the pairs the caller supplies, so the
        # dataset size follows the arguments rather than a fixed speaker count.
        # Positive rows come first, followed by the negative rows.
        all_pairs = torch.cat(
            (positive_comb.reshape(-1, 3), negative_comb.reshape(-1, 3)), 0
        )
        self.samples_indices = all_pairs.int()
        self.stft_arr = getstftdatafrompickle(datafile)

    def __len__(self):
        """Return the number of (positive + negative) pairs."""
        return len(self.samples_indices)

    def __getitem__(self, idx):
        """Return first utterance, second utterance and label for pair ``idx``."""
        first, second, label = self.samples_indices[idx]
        return self.stft_arr[int(first)], self.stft_arr[int(second)], label

class SiameseDatasetTest(Dataset):
    """Dataset of utterance pairs for evaluating the Siamese network.

    Every item is ``(first_stft, second_stft, label)`` where the two STFTs are
    looked up by index in the features loaded from ``datafile``.
    """

    def __init__(self, positive_comb, negative_comb, datafile):
        """Build the pair table and load the STFT features.

        Args:
            positive_comb: Tensor whose last dimension has size 3 and holds
                ``(first_index, second_index, label)`` for same-speaker pairs.
            negative_comb: Tensor with the same layout for different-speaker
                pairs.
            datafile: Path of the pickle file handed to
                ``getstftdatafrompickle``.
        """
        # Build the index table from the pairs the caller supplies, so the
        # dataset size follows the arguments rather than a fixed speaker count.
        # Positive rows come first, followed by the negative rows.
        all_pairs = torch.cat(
            (positive_comb.reshape(-1, 3), negative_comb.reshape(-1, 3)), 0
        )
        self.samples_indices = all_pairs.int()
        self.stft_arr = getstftdatafrompickle(datafile)

    def __len__(self):
        """Return the number of (positive + negative) pairs."""
        return len(self.samples_indices)

    def __getitem__(self, idx):
        """Return first utterance, second utterance and label for pair ``idx``."""
        first, second, label = self.samples_indices[idx]
        return self.stft_arr[int(first)], self.stft_arr[int(second)], label

def get_batch_dot_product(embedding_1_batch, embedding_2_batch):
    """Compute the dot product of matching rows of two embedding batches.

    Each row of the first batch is viewed as a ``1 x dim`` matrix and each row
    of the second as a ``dim x 1`` matrix; a batched matrix multiply then
    produces one scalar per row pair.

    Returns:
        A 1-D tensor with one dot product per batch entry.
    """
    rows_1, dim_1 = embedding_1_batch.shape[0], embedding_1_batch.shape[1]
    rows_2, dim_2 = embedding_2_batch.shape[0], embedding_2_batch.shape[1]

    as_row_vectors = embedding_1_batch.view(rows_1, 1, dim_1)
    as_column_vectors = embedding_2_batch.view(rows_2, dim_2, 1)

    products = torch.bmm(as_row_vectors, as_column_vectors)
    return products.reshape(-1)

In [18]:
# Sample pair indices for 50 speakers: 45 positive and 45 negative pairs per speaker.
positive_comb, negative_comb = generate_combinations(50, 45)
# Build the training dataset from the pickled training data.
ds = SiameseDataset(positive_comb, negative_comb, train_file_pickle) 
# Shuffled mini-batches of 45 pairs, loaded by two worker processes.
train_loader = DataLoader(ds, batch_size=45, shuffle=True, num_workers=2)

class Net(nn.Module):
    """Embedding network: a stacked LSTM followed by a linear projection.

    The LSTM reads a batch of frame sequences (batch-first layout) and the
    output at the last time step is projected to the embedding vector.

    Args:
        input_size: Number of features per frame. Defaults to 513.
        hidden_size: Hidden size of every LSTM layer. Defaults to 513.
        num_layers: Number of stacked LSTM layers. Defaults to 2.
        embedding_size: Size of the returned embedding. Defaults to 100.
    """

    def __init__(self, input_size=513, hidden_size=513, num_layers=2,
                 embedding_size=100):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.lin1 = nn.Linear(hidden_size, embedding_size)

    def forward(self, x):
        """Map ``x`` of shape (batch, frames, input_size) to (batch, embedding_size)."""
        outputs, _ = self.lstm(x)
        # Only the output of the final time step summarises the whole sequence.
        last_step = outputs[:, -1, :]
        return self.lin1(last_step)

# Instantiate the embedding network and show its layer structure.
net = Net()
print(net)

# Optimisation set-up: BCEWithLogitsLoss applies the sigmoid itself, so the
# training loop passes it the raw dot product of the two embeddings.
learning_rate = 5e-4
criterion = torch.nn.BCEWithLogitsLoss()
optimizer = optim.Adam(net.parameters(), lr=learning_rate)


Net(
  (lstm): LSTM(513, 513, num_layers=2, batch_first=True)
  (lin1): Linear(in_features=513, out_features=100, bias=True)
)


In [64]:
# This cell was first executed with range(40) and afterwards re-run with
# range(5) on the already-trained weights, giving 45 epochs in total.
for epoch in range(5):
    for index, batch in enumerate(train_loader):
        uttr1, uttr2, label = batch
        optimizer.zero_grad()
        # Both utterances pass through the same network (shared weights).
        embedding1 = net(uttr1)
        embedding2 = net(uttr2)
        # The dot product of the two embeddings is the logit for "same speaker".
        op = get_batch_dot_product(embedding1, embedding2)
        loss = criterion(op, label.float())

        loss.backward()
        optimizer.step()
        if index % 256 == 0:
            # .item() extracts the Python float; avoids the legacy .data attribute.
            print("Epoch: {}, Loss: {}".format(epoch, loss.item()))

Epoch: 0, Loss: 0.0004076989134773612
Epoch: 1, Loss: 3.168747207382694e-05
Epoch: 2, Loss: 0.0001128827134380117
Epoch: 3, Loss: 2.2173251636559144e-05
Epoch: 4, Loss: 0.00013234572543296963


Although the output above shows only 5 epochs, the network was actually trained for 45 epochs in total:

40 epochs in an earlier run of the training cell, followed by these 5 additional epochs to help the network converge a bit further.

In [0]:
accuracy_arr = []
for acc_test in range(10):
    # Generate combinations for the test data (20 speakers, 45 pairs of each kind).
    positive_comb, negative_comb = generate_combinations(20, 45)
    # Build the test dataset from the test pickle file and wrap it in a loader.
    dstest = SiameseDatasetTest(positive_comb, negative_comb, test_file_pickle)
    test_loader = DataLoader(dstest, batch_size=512, shuffle=False, num_workers=2)
    correct = 0
    total = 0
    with torch.no_grad():
        for batch in test_loader:
            uttr1, uttr2, label = batch
            embedding1 = net(uttr1)
            embedding2 = net(uttr2)
            op = get_batch_dot_product(embedding1, embedding2)
            predictions = torch.sigmoid(op)

            # Predict 1 where the sigmoid of the dot product is above 0.5 and
            # count the whole batch at once instead of element by element.
            predicted_labels = (predictions > 0.5).to(label.dtype)
            correct += int((predicted_labels == label).sum())
            total += len(predictions)
    # Accuracy of this run, in percent.
    accuracy_arr.append(correct*100/total)


In [70]:
# Summarise the accuracies collected over the repeated test runs.
accuracy_arr = np.asarray(accuracy_arr)
print(accuracy_arr)
average_accuracy = accuracy_arr.mean()
highest_accuracy = accuracy_arr.max()
lowest_accuracy = accuracy_arr.min()
print("Average Accuracy: {}".format(average_accuracy))
print("Max Accuracy: {}, Min Accuracy: {}".format(highest_accuracy, lowest_accuracy))

[68.38888889 67.83333333 68.88888889 67.88888889 66.72222222 69.33333333
 68.27777778 67.16666667 68.5        69.05555556]
Average Accuracy: 68.20555555555555
Max Accuracy: 69.33333333333333, Min Accuracy: 66.72222222222223
