# Speaker Recognition

In [2]:
import sys
import os
from pathlib import Path
from typing import Tuple

import torch
import torchaudio
from torch import Tensor
from torch.utils.data import Dataset
import torchvision # load datasets
import torchvision.transforms as transforms # transform data
import torch.nn as nn # basic building block for neural networks
import torch.nn.functional as F # import convolution functions like Relu
import torch.optim as optim # optimzer

import math
import random
import numpy as np
import librosa
import matplotlib.pyplot as plt

from google.colab import drive
# Mount Google Drive so the project directory is reachable from the Colab runtime.
drive.mount('/content/drive')
# Project root on the mounted drive; the dataset paths used below are built from it.
ROOT_DIR='/content/drive/MyDrive/College/Research/Linh_2023_Research'
# Make modules stored under the project root importable.
sys.path.append(ROOT_DIR)

Mounted at /content/drive


## Helper functions

In [None]:
def collate_fn(data):
    """
    Pad a batch of variable-length examples into one dense tensor.

    data: is a list of tuples with (example, label, length)
            where 'example' is a 2-D tensor of shape (n_rows, n_ftrs)
            and label/length are scalars

    Returns (features, labels, lengths): 'features' is a float tensor of
    shape (batch_size, max(lengths), n_ftrs) in which every example is
    followed by zero rows up to the longest length; 'labels' and 'lengths'
    are long tensors.
    """
    _, labels, lengths = zip(*data)
    longest = max(lengths)
    # The feature width is taken from the first example of the batch.
    width = data[0][0].size(1)
    features = torch.zeros((len(data), longest, width))
    labels = torch.tensor(labels)
    lengths = torch.tensor(lengths)

    for row, sample in enumerate(data):
        example = sample[0]
        n_rows, n_cols = example.size(0), example.size(1)
        # Zero rows appended below the example bring it up to `longest` rows.
        padding = torch.zeros((longest - n_rows, n_cols))
        features[row] = torch.cat([example, padding])

    return features.float(), labels.long(), lengths.long()

def plot_waveform(waveform, sample_rate, title="Waveform", xlim=None, ylim=None):
    """Plot every channel of a waveform against time, one subplot per channel.

    Args:
        waveform: tensor of shape (num_channels, num_frames).
        sample_rate: samples per second; frame indices are divided by it to
            obtain the time axis.
        title: accepted but currently not drawn on the figure.
        xlim: optional x-axis limits applied to every subplot.
        ylim: optional y-axis limits applied to every subplot.
    """
    samples = waveform.numpy()
    num_channels, num_frames = samples.shape
    seconds = torch.arange(0, num_frames) / sample_rate

    _, axes = plt.subplots(num_channels, 1)
    # With a single row, plt.subplots returns a bare Axes instead of an array.
    panels = [axes] if num_channels == 1 else axes
    multi_channel = num_channels > 1

    for index in range(num_channels):
        panel = panels[index]
        panel.plot(seconds, samples[index], linewidth=1)
        panel.grid(True)
        if multi_channel:
            panel.set_ylabel(f'Channel {index+1}')
        if xlim:
            panel.set_xlim(xlim)
        if ylim:
            panel.set_ylim(ylim)

    plt.show()

def plot_spectrogram(spec, title=None, ylabel='freq_bin', aspect='auto', xmax=None):
    """Show a power spectrogram on a decibel scale together with a colorbar.

    Args:
        spec: 2-D power spectrogram; converted with librosa.power_to_db.
        title: figure title; 'Spectrogram (db)' is used when it is falsy.
        ylabel: label of the y axis.
        aspect: aspect setting forwarded to imshow.
        xmax: optional upper limit of the x axis (the lower limit is 0).
    """
    fig, ax = plt.subplots(1, 1)

    heading = title if title else 'Spectrogram (db)'
    ax.set_title(heading)
    ax.set_ylabel(ylabel)
    ax.set_xlabel('frame')

    spec_db = librosa.power_to_db(spec)
    image = ax.imshow(spec_db, origin='lower', aspect=aspect)
    if xmax:
        ax.set_xlim((0, xmax))

    fig.colorbar(image, ax=ax)
    plt.show()

## Data processing

**Input Data**: The dataset is located in a specified directory (`root_path`). A list of classes (`includes`) and the number of classes (`num_classes`) are provided as arguments when creating an instance of the `SpeakerData` class.

**Initializing the Dataset**: The `__init__` method initializes the dataset by setting `root_path`, generating the samples, and loading the labels.

**Generating Samples** (`make_samples` method):
- Randomly selects additional classes from the dataset directory when the requested number of classes (`num_classes`) exceeds the number of classes listed in `includes`.
- Collects file paths and their corresponding labels.

**Loading Audio Data** (`_load_item` method): Loads and returns the audio waveform, sample rate, and label.

**Spectrogram Computation**: Computes the Short-Time Fourier Transform (STFT) of the audio data to generate a spectrogram (`spec`).

**Transformations**: Transposes the spectrogram tensor so that frames come before frequency bins. After the parts are extracted (next step), each part is resized to a fixed size of (32, 128).

**Extracting Parts**: Divides the spectrogram into multiple non-overlapping parts of size (`image_width`, `n_fft // 2 + 1`) if the number of frames exceeds `image_width`.

**Label Mapping**: Maps the labels to integer indices using `label2idx`.

**Returning Data**: Returns the parts and their corresponding label indices.

In [None]:
# Spectrogram parameters
# FFT size used to derive the frequency-bin count (n_fft / 2 + 1) of each spectrogram part.
n_fft = 512
# Number of STFT frames in every fixed-size part cut from a spectrogram.
image_width = 64
# Passed to torch.stft as its n_fft argument.
frame_length = 512
# Hop between STFT frames in seconds; multiplied by the sample rate to obtain hop_length.
step_length = 0.001

class SpeakerData(Dataset):
    """Dataset of speaker-labelled audio laid out as ``<root>/<speaker>/<file>.wav``.

    Every item is a stack of fixed-size magnitude-spectrogram parts of one
    audio file together with the integer id of its speaker.

    Args:
        root (str): Path to the directory that holds one sub-directory per speaker.
        num_classes (int): Number of speakers wanted. When it exceeds the number
            of speakers in ``includes``, the remainder is drawn at random from
            the other speaker directories under ``root``.
        includes (list, optional): Speaker ids that must be part of the dataset.
            The list is never modified.

    Raises:
        ValueError: if ``root`` does not contain enough other speaker
            directories to reach ``num_classes``.
    """
    def __init__(self, root: str, num_classes: int, includes=None) -> None:
        super().__init__()
        self.root_path = root
        # `None` replaces a `[]` default: a mutable default is one shared object,
        # so speakers added for one instance would leak into the next one.
        self.samples = self.make_samples(num_classes, [] if includes is None else includes)
        self.labels, self.label2idx = self.load_labels(self.samples)

    def __len__(self) -> int:
        return len(self.samples)

    def make_samples(self, num_classes: int, classes: list) -> list[Tuple[str, str]]:
        """Create samples for imported dataset.

        Args:
            num_classes (int): total number of speakers wanted.
            classes (list): speaker ids that must be included (left untouched).

        Returns:
            List[Tuple[str, str]]: samples of a form (path_to_sample, class),
            sorted by path so that samples of one speaker are contiguous.

        Raises:
            ValueError: if fewer speaker directories are available than needed.
        """
        selected = list(classes)  # work on a copy, never on the caller's list
        missing = num_classes - len(selected)
        if missing > 0:
            already = set(selected)
            # Only directories that are not selected yet are candidates, so the
            # draw cannot return duplicates or stray files. Sorting makes the
            # draw reproducible for a fixed random seed.
            candidates = sorted(
                name for name in os.listdir(self.root_path)
                if name not in already
                and os.path.isdir(os.path.join(self.root_path, name))
            )
            if missing > len(candidates):
                raise ValueError(
                    f"requested {num_classes} classes but only "
                    f"{len(selected) + len(candidates)} are available in {self.root_path}"
                )
            selected.extend(random.sample(candidates, missing))

        wanted = set(selected)
        # The speaker id is the name of the directory that contains the file.
        paths = [
            (str(p), p.parent.name)
            for p in Path(self.root_path).glob("*/*.wav")
            if p.parent.name in wanted
        ]
        paths.sort()
        return paths

    def _load_item(self, filepath: str, path: str) -> Tuple[Tensor, int, str]:
        """Load one audio file.

        Args:
            filepath (str): path of the audio file.
            path (str): dataset root; the first component of ``filepath``
                relative to it is the speaker label.

        Returns:
            (waveform, sample_rate, label)
        """
        label = Path(os.path.relpath(filepath, path)).parts[0]
        # Load audio
        waveform, sample_rate = torchaudio.load(filepath)
        return waveform, sample_rate, label

    def load_labels(self, samples: list):
        """Collect the distinct labels of ``samples`` in order of first appearance.

        Returns:
            (labels, label2idx): the list of labels and a mapping from each
            label to its position in that list.
        """
        labels = []
        label2idx = {}
        for sample in samples:
            label = sample[1]
            if label not in label2idx:
                label2idx[label] = len(labels)
                labels.append(label)
        return labels, label2idx

    def __getitem__(self, n: int) -> Tuple[Tensor, int]:
        """Load the n-th sample from the dataset.

        Args:
            n (int): The index of the sample to be loaded

        Returns:
            X (tensor): features tensor of shape (parts, 32, 128)
            Y (int): label id
        """
        filepath = self.samples[n][0]
        waveform, sample_rate, label = self._load_item(filepath, self.root_path)

        # Mix multi-channel audio down to one channel; mono input is unchanged.
        if waveform.size(0) > 1:
            waveform = waveform.mean(dim=0, keepdim=True)

        # Transformations
        # hop_length must be at least one sample.
        frame_step = max(1, int(sample_rate * step_length))
        # No window is passed, so torch.stft applies a rectangular window.
        spec = torch.stft(waveform,           # spec.shape = (1, freq_bin, audio_len)
                          n_fft=frame_length,
                          hop_length=frame_step,
                          return_complex=True)
        # Magnitude of the complex STFT. Taking the real part first would drop
        # the energy carried by the imaginary component.
        X = torch.abs(spec).transpose(1, 2)   # X.shape = (1, audio_len, freq_bin)

        # Get multiple non-overlapping parts of size (image_width, freq_bin)
        audio_len = X.shape[1]
        if audio_len > image_width:
            parts_count = audio_len // image_width
            usable = parts_count * image_width  # trailing frames are discarded
            # The bin count comes from the spectrogram itself, so it always
            # matches the FFT size that was actually used.
            parts = X[0, :usable].reshape(parts_count, image_width, X.shape[2])
        else:
            parts = X

        resize = torchvision.transforms.Resize((32, 128))
        parts = resize(parts)
        Y = self.label2idx[label]

        return parts, Y

In [None]:
# Build a small dataset from five explicitly named speakers and list them.
DATA_PATH = f"{ROOT_DIR}/test_data/vox"
dataset = SpeakerData(
    DATA_PATH,
    5,
    ['id10182', 'id10634', 'id10820', 'id11004', 'id11232'],
)
# One speaker id per line, each followed by a newline.
title = "".join(f"{speaker}\n" for speaker in dataset.labels)
print(title)

id10182
id10634
id10820
id11004
id11232



In [None]:
# Number of audio files in the dataset.
print(len(dataset))

693


## Model

In [None]:
class SpeakerCNN(nn.Module):
    '''Small convolutional network that classifies spectrogram parts by speaker.

    Input:  tensor of shape (batch, 1, 32, 128).
    Output: tensor of shape (batch, num_class) holding raw, unnormalised
            class scores (logits), the form nn.CrossEntropyLoss expects.
    '''
    def __init__(self, num_class):
        ''' initialize the network

        num_class: number of speakers, i.e. the size of the output layer
        '''
        super().__init__()
        # Three pairs of 3x3 convolutions; 'same' padding keeps height and width.
        self.conv1 = nn.Conv2d(1, 8, 3, padding='same')
        self.conv2 = nn.Conv2d(8, 16, 3, padding='same')

        self.conv3 = nn.Conv2d(16, 16, 3, padding='same')
        self.conv4 = nn.Conv2d(16, 32, 3, padding='same')

        self.conv5 = nn.Conv2d(32, 32, 3, padding='same')
        self.conv6 = nn.Conv2d(32, 64, 3, padding='same')

        # Max pooling over a (2, 2) window
        self.pool = nn.MaxPool2d(2, 2)
        # Average pooling over a (3, 3) window
        self.avgpool = nn.AvgPool2d(3, 3)

        # A 32x128 input is reduced to 16x64, then 8x32 by the two max pools
        # and to 2x10 by the average pool, with 64 channels.
        self.fc1 = nn.Linear(64 * 10 * 2, 16)
        self.fc2 = nn.Linear(16, num_class)

    def forward(self, x):
        ''' the forward propagation algorithm; returns logits '''
        x = self.pool(F.relu(self.conv2(F.relu(self.conv1(x)))))
        x = self.pool(F.relu(self.conv4(F.relu(self.conv3(x)))))
        x = F.relu(self.conv6(F.relu(self.conv5(x))))
        x = self.avgpool(x)
        # Flatten everything except the batch dimension. Unlike view(-1, ...),
        # a wrongly sized input fails in fc1 instead of silently changing the
        # batch size.
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        # No sigmoid here: nn.CrossEntropyLoss applies log-softmax itself, and
        # squashing the scores into (0, 1) first limits how confident the
        # model can become. The arg-max prediction is unaffected.
        return self.fc2(x)

# Holder for accuracy values; the only visible use is a commented-out
# `model_acc.append(acc)` at the end of the evaluation cell.
model_acc = []

## Training

In [None]:
# Dataset location. Alternatives kept for reference:
#   ROOT_DIR + "/test_data/leaderspeech"
#   ROOT_DIR + "/test_data/librispeech/dev-clean"
DATA_PATH = f"{ROOT_DIR}/test_data/vox"

# Hyper-parameters
train_test_ratio = 0.8
num_classes = 20   # 395 in vox, 40 in librispeech, 5 in leaderspeech
batch_size = 1
learning_rate = 0.00001

# Device configuration: use the GPU when one is available
device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
device = torch.device(device_name)

# Import data and split it into a training and a test subset
dataset = SpeakerData(DATA_PATH, num_classes)
train_size = int(train_test_ratio * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = torch.utils.data.random_split(
    dataset, [train_size, test_size]
)

# Data loaders: only the training data is shuffled
train_loader = torch.utils.data.DataLoader(
    dataset=train_dataset, batch_size=batch_size, shuffle=True
)
test_loader = torch.utils.data.DataLoader(
    dataset=test_dataset, batch_size=batch_size, shuffle=False
)

# Save title for training figure: one speaker id per line
title = "".join(f"{speaker}\n" for speaker in dataset.labels)

In [None]:
# Build the network and place it on the selected device
model = SpeakerCNN(num_classes).to(device)

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=learning_rate)
# Alternative: optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9)

Train

In [None]:
# Train the model one spectrogram part at a time and record, per epoch,
# the mean loss and the accuracy (in percent) over all classified parts.
num_epochs = 25
loss_plt = []   # mean training loss of every epoch
acc_plt = []    # training accuracy of every epoch, in percent
for epoch in range(num_epochs):
    loss_per_epoch = 0.0   # sum of the per-step losses of this epoch
    correct = 0            # correct predictions of this epoch
    seen = 0               # predictions made in this epoch
    running_loss = 0.0     # loss accumulated since the last progress print
    steps = 0              # optimizer steps since the last progress print
    total_steps = 0        # optimizer steps of this epoch
    model.train()
    for i, (images, labels) in enumerate(train_loader):
        # images: (batch, parts, height, width); every part is fed separately
        images = images.to(device)
        labels = labels.to(device)
        num_parts = images.shape[1]

        for j in range(num_parts):
            # Slicing with j:j + 1 keeps the channel dimension the model expects
            single_image = images[:, j:j + 1, :, :]

            # zero the parameter gradients
            optimizer.zero_grad()

            # Forward pass
            outputs = model(single_image)
            loss = criterion(outputs, labels)

            # Backward and optimize
            loss.backward()
            optimizer.step()

            # Statistics. Predictions are counted per sample, so the accuracy
            # stays within 0-100 % for any batch size (as in the evaluation cell).
            predicted = outputs.detach().argmax(dim=1)
            correct += (predicted == labels).sum().item()
            seen += labels.size(0)
            step_loss = loss.item()
            loss_per_epoch += step_loss
            running_loss += step_loss
        total_steps += num_parts
        steps += num_parts

        if i % 200 == 199:
            print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / steps:.5f}')
            steps = 0
            running_loss = 0.0

    # The loss is stored as a plain mean; it is not a percentage, so it is not
    # scaled by 100. max(..., 1) guards against an empty loader.
    loss_plt.append(round(loss_per_epoch / max(total_steps, 1), 5))
    acc_plt.append(round(100 * correct / max(seen, 1), 5))

torch.save(model.state_dict(), 'model.ckpt')

Plot loss and accuracy graph

In [None]:
# Plot the per-epoch training loss and accuracy and save each figure.
# plt.subplots() already creates a new figure, so no separate plt.figure(n)
# call is made; such a call would only leave an extra empty figure behind.

# Loss graph
fig, axs = plt.subplots()
axs.plot(loss_plt)
title = 'Training loss on {} speakers'.format(len(dataset.labels))
axs.set_title(title)
axs.set_ylabel('Loss')
axs.set_xlabel('Epoch')
fig.savefig('Loss: '+ title)
# Accuracy graph
fig, axs = plt.subplots()
axs.plot(acc_plt, color='orange')
title = 'Training accuracy on {} speakers'.format(len(dataset.labels))
axs.set_title(title)
axs.set_ylabel('Accuracy')
axs.set_xlabel('Epoch')
fig.savefig('Accuracy: '+ title)

Evaluate

In [None]:
# Evaluate the saved checkpoint on the test split, one spectrogram part at a time.
# Load model
model = SpeakerCNN(num_classes)
# map_location lets a checkpoint saved on a GPU be loaded on a CPU-only runtime.
model.load_state_dict(torch.load('model.ckpt', map_location=device))
model.to(device)
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for images, labels in test_loader:
        labels = labels.to(device)
        images = images.to(device)

        for j in range(images.shape[1]):
            single_image = images[:, j:j + 1, :, :]
            predicted = model(single_image).argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

# Computed once after the loop, and defined even when the test loader is empty.
acc = 100 * correct / total if total else 0.0
# `total` is the number of evaluated parts; len(test_loader) would be the
# number of batches.
print('Test Accuracy of the model on the {} test images: {} %'.format(total, acc))
# model_acc.append(acc)

Confusion matrix

In [None]:
# Build, plot and save a row-normalised confusion matrix for the test split.
from sklearn.metrics import confusion_matrix
import seaborn as sn
import pandas as pd

y_pred = []
y_true = []

model = SpeakerCNN(num_classes)
# map_location lets a checkpoint saved on a GPU be loaded on a CPU-only runtime.
model.load_state_dict(torch.load('model.ckpt', map_location=device))
model.to(device)
model.eval()
# iterate over test data; no gradients are needed for inference
with torch.no_grad():
    for images, labels in test_loader:
        images = images.to(device)
        truth = labels.tolist()

        for j in range(images.shape[1]):
            single_image = images[:, j:j + 1, :, :]
            output = model(single_image) # Feed Network

            # The arg-max of the scores is the predicted class id
            y_pred.extend(output.argmax(dim=1).tolist()) # Save Prediction
            y_true.extend(truth) # Save Truth

# Build confusion matrix
# Passing the full list of class ids keeps the matrix square with one row and
# column per speaker, even when a speaker is missing from the test split.
class_ids = list(range(len(dataset.labels)))
cf_matrix = confusion_matrix(y_true, y_pred, labels=class_ids)
# Normalise every row by its number of true samples; rows without samples stay
# zero instead of turning into NaN.
row_totals = cf_matrix.sum(axis=1, keepdims=True)
df_cm = pd.DataFrame(cf_matrix / np.maximum(row_totals, 1),
                     index=list(dataset.labels),
                     columns=list(dataset.labels))
plt.figure(figsize = (12,7))
sn.heatmap(df_cm, annot=True)
plt.savefig('confusion_matrix.png')