In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [9]:
!pip install torchinfo

Collecting torchinfo
  Downloading torchinfo-1.8.0-py3-none-any.whl (23 kB)
Installing collected packages: torchinfo
Successfully installed torchinfo-1.8.0


In [2]:
import librosa
import soundfile
import os
import numpy as np

In [10]:
from torchinfo import summary

In [3]:
from torch.utils.data import Dataset, DataLoader

In [32]:
def extract_features(waveform, sample_rate):

    """Extract MFCC features from an audio file, shape=(TIME, MFCC)."""

    # if len(waveform.shape) == 2:
    #     waveform = librosa.to_mono(waveform.transpose())  No need since single channel

    if sample_rate != 16000:
        waveform = librosa.resample(waveform, sample_rate, 16000)

    features = librosa.feature.mfcc(y=waveform, sr=sample_rate, n_mfcc= 20 )

    return features.transpose()

In [5]:
waveform, sample_rate = soundfile.read('/content/drive/MyDrive/ROBOVOX_SP_CUP_2024/data/single-channel/denoised_enrollments/spk_11-11_22_1_0_d1_ch5.wav')

In [6]:
f = extract_features(waveform, sample_rate)

In [7]:
f.shape

(148, 40)

In [5]:
import torch
import torch.nn as nn
# import torch.nn.functional as F

In [6]:
class LstmSpeakerEncoder(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, embedding_size):

        super(LstmSpeakerEncoder, self).__init__()
        self.lstm = nn.LSTM(
            input_size,     # Number of MFCC coefficients
            hidden_size,     # Number of hidden units in each LSTM layer
            num_layers,    # Number of stacked LSTM layers (3)
            batch_first=True,
        )
        self.fc = nn.Linear(hidden_size, embedding_size)


    def forward(self, x):
        _, (h_n, _) = self.lstm(x)
        embedding = self.fc(h_n[-1])  # Take the last hidden state as the embedding
        return embedding

In [22]:
model = LstmSpeakerEncoder(20, hidden_size = 5, num_layers= 2, embedding_size =32)

In [23]:
summary(model)

Layer (type:depth-idx)                   Param #
LstmSpeakerEncoder                       --
├─LSTM: 1-1                              580
├─Linear: 1-2                            192
Total params: 772
Trainable params: 772
Non-trainable params: 0

In [11]:
anch_waveform, anch_sample_rate = soundfile.read('/content/drive/MyDrive/ROBOVOX_SP_CUP_2024/data/single-channel/denoised_enrollments/spk_2-2_1_1_0_d5_ch5.wav')
anch_f = extract_features(anch_waveform,anch_sample_rate)
anchor_input = torch.tensor(anch_f, dtype=torch.float32).unsqueeze(0)  # Add batch dimension

pos_waveform, pos_sample_rate = soundfile.read('/content/drive/MyDrive/ROBOVOX_SP_CUP_2024/data/single-channel/denoised_enrollments/spk_2-2_1_1_0_d6_ch5.wav')
pos_f = extract_features(pos_waveform, pos_sample_rate)
pos_input = torch.tensor(pos_f, dtype=torch.float32).unsqueeze(0)  # Add batch dimension

neg_waveform, neg_sample_rate = soundfile.read('/content/drive/MyDrive/ROBOVOX_SP_CUP_2024/data/single-channel/denoised_enrollments/spk_3-3_22_0_0_d2_ch5.wav')
neg_f = extract_features(neg_waveform, neg_sample_rate)
neg_input = torch.tensor(neg_f, dtype=torch.float32).unsqueeze(0)  # Add batch dimension


In [12]:
anchor_embedding = model(anchor_input)
positive_embedding = model(pos_input)
negative_embedding = model(neg_input)

In [104]:
distance_positive = torch.norm(anchor_embedding - positive_embedding, p=2, dim=1)
distance_positive.item()

0.6429540514945984

In [25]:
triplet_loss = nn.TripletMarginLoss(margin=1.0, p=2, eps=1e-7)


In [14]:
loss = triplet_loss(anchor_embedding, positive_embedding, negative_embedding)
loss.item()

0.8674868941307068

In [33]:
class TripletDataset(Dataset):
    def __init__(self, folder_path):
        self.folder_path = folder_path
        self.files = sorted(os.listdir(folder_path))
        self.data = self.load_data()

    def load_data(self):
        data = []
        for recordings in self.files:
            path = os.path.join(self.folder_path, recordings)

            waveform, sample_rate = soundfile.read(path)
            label = recordings.split('-')[0]
            data.append((extract_features(waveform, sample_rate), label))
        return data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        anchor = self.data[index]

        # Sample positive from the same class
        positive_class_samples = [i for i,(features,label) in enumerate(self.data) if label == anchor[1]]
        positive_index = np.random.choice(positive_class_samples)

        # Sample negative from a different class
        negative_class_samples = [i for i,(features,label) in enumerate(self.data) if label != anchor[1]]
        negative_index = np.random.choice(negative_class_samples)

        return index, positive_index, negative_index

In [34]:
dataset = TripletDataset('/content/drive/MyDrive/ROBOVOX_SP_CUP_2024/data/single-channel/denoised_enrollments')

In [35]:
data_store = dataset.data

In [17]:
len(dataset)

225

In [36]:
batch_size = 25  #Note that I am not using powers of 2
triplet_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

In [81]:
len(triplet_loader)

9

In [37]:
optimizer = torch.optim.Adam(model.parameters(), lr=0.005)

model = LstmSpeakerEncoder(20, hidden_size = 10, num_layers= 5, embedding_size =32)

epochs = 10

for epoch in range(epochs):
  print("Epoch %d running..."%(epoch))

  model.train()
  correct = 0
  train_loss = 0
  optimizer.zero_grad()
  total = 0

  for k in triplet_loader:
    loss = 0
    for b in range(k[0].shape[0]):  #iterate through each in batch - NOT EFFICIET AND I KNOW IT

      anc = data_store[k[0][b]][0]
      pos = data_store[k[1][b]][0]
      neg = data_store[k[2][b]][0]
      # print(anc.shape)
      # print(pos.shape)
      # print(neg.shape)

      anchor_input = torch.tensor(anc, dtype=torch.float32).unsqueeze(0)
      pos_input = torch.tensor(pos, dtype=torch.float32).unsqueeze(0)
      neg_input = torch.tensor(neg, dtype=torch.float32).unsqueeze(0)

      anchor_embedding = model(anchor_input)
      positive_embedding = model(pos_input)
      negative_embedding = model(neg_input)

      loss += triplet_loss(anchor_embedding, positive_embedding, negative_embedding)

      distance_positive =  torch.norm(anchor_embedding - positive_embedding, p=2, dim=1)
      distance_negative = torch.norm(anchor_embedding - negative_embedding, p=2, dim=1)

      # Check if distances satisfy triplet condition
      if distance_positive < distance_negative:
        correct += 1

      total += 1


    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    train_loss += loss.item()


  print('Accuracy: ',(correct/total)*100, '   train loss: ',train_loss)



Epoch 0 running...
Accuracy:  65.77777777777779    train loss:  224.92675018310547
Epoch 1 running...
Accuracy:  73.77777777777777    train loss:  224.91765785217285
Epoch 2 running...
Accuracy:  68.0    train loss:  224.8962745666504
Epoch 3 running...
Accuracy:  72.44444444444444    train loss:  224.93536186218262
Epoch 4 running...
Accuracy:  65.77777777777779    train loss:  224.92816925048828
Epoch 5 running...
Accuracy:  61.33333333333333    train loss:  224.94325256347656
Epoch 6 running...
Accuracy:  68.44444444444444    train loss:  224.90753364562988
Epoch 7 running...
Accuracy:  69.33333333333334    train loss:  224.8996067047119
Epoch 8 running...
Accuracy:  70.22222222222221    train loss:  224.90123176574707
Epoch 9 running...
Accuracy:  66.66666666666666    train loss:  224.8723087310791
