In [None]:
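# When running in Google Colab, uncomment the lines below to install the required libraries.
# Note: `os` is part of the Python standard library and cannot be installed with pip.

# !pip install torch
# !pip install torchaudio
# !pip install transformers
# !pip install numpy
# !pip install pandas
# !pip install matplotlib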

In [None]:
import os
import torch
from torch import nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pack_padded_sequence, pad_sequence
from torchaudio.transforms import MelSpectrogram, MFCC
import torchaudio
import numpy as np
import pandas as pd
import random
import pickle
import base64
import matplotlib.pyplot as plt
import re

In [None]:
# Root folder that every dataset path and output path in this notebook is joined onto.
DATASET_FILEPATH = './drive/MyDrive/Thesis/'
# Used as a folder name when the dataset CSV paths are built.
DATASET_SEED = 2

# Seed every random number generator this notebook imports, not only torch,
# so that a fresh "Restart & Run All" is reproducible.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Output locations for the trained weights and the loss curve.
SAVE_WEIGHTS_PATH = os.path.join(DATASET_FILEPATH, 'weights-and-graphs/baseline-vad/model.pth')
SAVE_PLOTS_PATH = os.path.join(DATASET_FILEPATH, 'weights-and-graphs/baseline-vad/loss.png')

<torch._C.Generator at 0x7fc185ef4f90>

In [None]:
# CSV manifests of the train / validation splits for the selected DATASET_SEED.
train_csv_file = os.path.join(DATASET_FILEPATH, f'base/{DATASET_SEED}/processed/train_dataset.csv')
validation_csv_file = os.path.join(DATASET_FILEPATH, f'base/{DATASET_SEED}/processed/validation_dataset.csv')
# The original f-string interpolated `BASE`, a name that is never defined, so this
# line raised NameError. NOTE(review): the augmented set is assumed to live under the
# same `base/` folder as the other splits -- confirm the folder name on Drive.
aug_train_csv_file = os.path.join(DATASET_FILEPATH, 'base/aug-dataset/processed/train_dataset.csv')

In [None]:
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Device: ', device)

Device:  cuda


In [None]:
# Only the file name and the label are read from the CSV manifests.
selected_columns = ['audio_file_name', 'classification']

# Load both splits with the same column selection.
train_df, validation_df = (
    pd.read_csv(csv_path, usecols=selected_columns)
    for csv_path in (train_csv_file, validation_csv_file)
)

In [None]:
def process_training_set(train_df, oversample_minority=False, undersample_majority=False, drop_segments=5):
    """
    Re-sample the training dataset, with options to duplicate class 0 and to drop long audio segments.

    The input DataFrame is never modified; a new DataFrame is always returned.

    :param train_df: DataFrame containing the training data with columns ['classification', 'audio_file_name'] among others.
    :param oversample_minority: Boolean, if True, every row with classification == 0 is appended a second time.
    :param undersample_majority: Boolean, if True, only rows whose audio length group (the last integer found in
        'audio_file_name') is <= drop_segments are kept. The filter is applied to rows of every class.
    :param drop_segments: Largest audio length group that is kept when undersample_majority is True.
    :returns: New DataFrame with the processed training data and a fresh 0..n-1 index.
    :raises IndexError: If undersample_majority is True and a file name contains no digits.
    """
    if oversample_minority:
        minority_rows = train_df[train_df['classification'] == 0]
        train_df = pd.concat([train_df, minority_rows])

    if undersample_majority:
        def get_audio_length_group(file_name):
            # The length group is the last run of digits in the file name.
            return int(re.findall(r'\d+', file_name)[-1])

        # Keep the groups in a separate Series instead of a temporary column, so the
        # caller's DataFrame is not mutated.
        length_groups = train_df['audio_file_name'].apply(get_audio_length_group)
        # A positional (NumPy) mask is used because the index may contain duplicates
        # after oversampling.
        keep_mask = (length_groups <= drop_segments).to_numpy()
        train_df = train_df[keep_mask]

    # Some indices are duplicated / removed, so return a re-indexed copy.
    return train_df.reset_index(drop=True)

def print_dataset_balance(df):
    """
    Print, for each classification value, how many rows carry it and what share of the dataset that is.

    :param df: DataFrame containing the data with a 'classification' column.
    """
    # One row per class: its value and the number of rows carrying it.
    balance = (
        df['classification']
        .value_counts()
        .rename_axis('classification')
        .reset_index(name='count')
    )
    # Share of each class in percent, rounded to one decimal place.
    row_total = balance['count'].sum()
    balance['percentage'] = ((balance['count'] / row_total) * 100).round(1)
    print(balance)

def augment_train_dataset(df, augmented_df_filepath):
    """
    Introduce additional 'non-interruption' samples to the dataset, which have been extracted from the GAP dataset with an LLM.

    Only the 'audio_file_name' and 'classification' columns are read from the augmented CSV.
    Any other column in that file (e.g. pre-computed embeddings) is skipped.

    :param df: Original DataFrame containing the training data.
    :param augmented_df_filepath: Filepath to the CSV containing the augmented data.
    :returns: A new DataFrame with the rows of df followed by the augmented rows, re-indexed from 0.
    """
    selected_columns = ['audio_file_name', 'classification']
    # Read from the path that was passed in rather than from a notebook-level global.
    aug_train_df = pd.read_csv(augmented_df_filepath, usecols=selected_columns)
    return pd.concat([df, aug_train_df], ignore_index=True)

In [None]:
# True: append the augmented samples to the training set.
# False: re-sample the original training set instead.
AUGMENT = True

if AUGMENT:
    print("Length of the DataFrame before:", len(train_df))
    train_df = augment_train_dataset(train_df, aug_train_csv_file)
    print("Length of the DataFrame after:", len(train_df))
else:
    # process_training_set has no `prune` parameter; passing it raised TypeError.
    train_df = process_training_set(train_df, oversample_minority=True, undersample_majority=True)
print_dataset_balance(train_df)

In [None]:
class AudioDataset(Dataset):
    """
    Dataset that pre-computes the MFCC features of every audio file and serves them with their labels.

    :param audio_file_name: Iterable of audio file names, resolved relative to audio_dir.
    :param labels: Iterable of labels, in the same order as audio_file_name.
    :param audio_dir: Directory that contains the audio files.
    """

    def __init__(self, audio_file_name, labels, audio_dir='./drive/MyDrive/Thesis/audio'):
        # Store the labels as a plain list so that __getitem__ indexes by position.
        # Indexing a pandas Series with an integer looks the value up by index label,
        # which raises KeyError when the index is not exactly 0..n-1.
        self.labels = list(labels)

        # One MFCC transform per sample rate, built on first use and reused afterwards.
        self._mfcc_transforms = {}

        # Precompute and store all MFCC features
        self.audio_features = [self.extract_mfcc(os.path.join(audio_dir, fname)) for fname in audio_file_name]

    def __len__(self):
        """Return the number of samples."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return the (features, label) pair stored at position idx."""
        return self.audio_features[idx], self.labels[idx]

    def extract_mfcc(self, audio_path):
        """
        Load an audio file and compute its 13 MFCC coefficients.

        :param audio_path: Path of the audio file to load.
        :returns: MFCC tensor. NOTE(review): assuming torchaudio.load returns (channels, samples) and MFCC returns
            (channels, n_mfcc, frames), a single-channel file yields (frames, n_mfcc) and a multi-channel file
            yields (n_mfcc, channels, frames) -- confirm against the torchaudio documentation.
        """
        waveform, sample_rate = torchaudio.load(audio_path)
        mfcc_transform = self._mfcc_transforms.get(sample_rate)
        if mfcc_transform is None:
            mfcc_transform = MFCC(
                sample_rate=sample_rate,
                n_mfcc=13,
                melkwargs={"n_fft": 400, "hop_length": 160, "n_mels": 23, "center": False},
            )
            self._mfcc_transforms[sample_rate] = mfcc_transform
        # squeeze(0) removes only a leading axis of size 1. A bare squeeze() would
        # also collapse the time axis of a clip that produces a single frame.
        mfcc = mfcc_transform(waveform).squeeze(0).transpose(0, 1)
        return mfcc

# File names and labels of the training split.
audio_train_data = train_df['audio_file_name']
train_labels = train_df['classification']

# File names and labels of the validation split.
audio_valid_data = validation_df['audio_file_name']
valid_labels = validation_df['classification']

# Building a dataset extracts the MFCC features of every file in the split.
train_dataset = AudioDataset(audio_file_name=audio_train_data, labels=train_labels)
valid_dataset = AudioDataset(audio_file_name=audio_valid_data, labels=valid_labels)

In [None]:
BATCH_SIZE = 16

def collate_fn(batch):
    """
    Function to be passed to the DataLoader class which processes a batch of data points before being passed to the model in training. The LSTM must have all batch samples of equal length.

    Two feature layouts are accepted:
      * 3-D (n_mfcc, channels, frames): the channels are averaged and the result is transposed to (frames, n_mfcc).
      * 2-D: assumed to already be (frames, n_mfcc) and used unchanged.
        NOTE(review): this is the layout a single-channel file is expected to have -- confirm against the dataset class.

    :param batch: array of data points in the dataset, each a (features, label) pair.
    :returns: Tuple (features, labels, lengths): features zero-padded to (batch, max_frames, n_mfcc),
        labels as a float32 tensor, and the unpadded number of frames of every sample.
    """
    features, labels = zip(*batch)
    labels = torch.tensor(labels, dtype=torch.float32)

    time_major = []
    for feature in features:
        if feature.dim() == 3:
            # Convert multi-channel to mono by averaging across the channel dimension,
            # then transpose such that the time dimension is first.
            feature = feature.mean(1).transpose(0, 1)
        # 2-D features skip the averaging: mean(1) would average away the
        # coefficient axis and leave a 1-D tensor without a time/feature split.
        time_major.append(feature)

    # Unpadded lengths, needed to pack the padded batch for the LSTM.
    lengths = [feature.shape[0] for feature in time_major]
    features = pad_sequence(time_major, batch_first=True)

    return features, labels, lengths

# Settings shared by both loaders. Only the training loader shuffles its samples.
loader_kwargs = dict(batch_size=BATCH_SIZE, collate_fn=collate_fn)
train_loader = DataLoader(train_dataset, shuffle=True, **loader_kwargs)
valid_loader = DataLoader(valid_dataset, **loader_kwargs)

In [None]:
# LSTM Classifier
class Classifier(nn.Module):
    """
    LSTM sequence classifier: the final hidden state of the last LSTM layer is passed through dropout and a linear layer.

    :param input_dim: Number of features per time step.
    :param hidden_dim: Number of hidden units per LSTM direction.
    :param output_dim: Number of output values per sample.
    :param n_layers: Number of stacked LSTM layers.
    :param bidirectional: If True, the LSTM runs in both directions and both final states are concatenated.
    :param dropout_rate: Dropout probability applied to the final hidden state (and between LSTM layers when n_layers > 1).
    """

    def __init__(self, input_dim, hidden_dim, output_dim, n_layers, bidirectional, dropout_rate):
        super().__init__()
        self.bidirectional = bidirectional
        # nn.LSTM warns when dropout is non-zero for a single layer, so it is only enabled for stacked layers.
        self.rnn = nn.LSTM(input_dim, hidden_dim, num_layers=n_layers, bidirectional=bidirectional, dropout=dropout_rate if n_layers > 1 else 0)
        # The linear layer receives one final hidden state per direction.
        num_directions = 2 if bidirectional else 1
        self.fc = nn.Linear(hidden_dim * num_directions, output_dim)
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, embedding, lengths):
        """
        :param embedding: Padded batch of shape (batch, max_len, input_dim).
        :param lengths: Unpadded length of every sequence in the batch.
        :returns: Tensor of shape (batch, output_dim) holding the raw (un-activated) outputs.
        """
        packed = pack_padded_sequence(embedding, lengths, batch_first=True, enforce_sorted=False)
        _, (hidden, _) = self.rnn(packed)
        if self.bidirectional:
            # The last two entries of `hidden` are the forward and backward final states of the top layer.
            final_state = torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1)
        else:
            # With a single direction only the last entry belongs to the top layer.
            # Indexing hidden[-2] would fail for one layer and mix layers otherwise.
            final_state = hidden[-1, :, :]
        return self.fc(self.dropout(final_state))

In [None]:
# Model hyperparameters.
INPUT_DIMENSION = 13      # features per time step fed to the LSTM
NUM_HIDDEN_UNITS = 64     # hidden units per LSTM direction
OUTPUT_DIMENSION = 1      # one output value per sample
NUM_LSTM_LAYERS = 1
BI_DIRECTIONAL = True
DROPOUT_RATE = 0.1

model = Classifier(
    input_dim=INPUT_DIMENSION,
    hidden_dim=NUM_HIDDEN_UNITS,
    output_dim=OUTPUT_DIMENSION,
    n_layers=NUM_LSTM_LAYERS,
    bidirectional=BI_DIRECTIONAL,
    dropout_rate=DROPOUT_RATE,
)
model = model.to(device)

# Adam with its default settings; the loss applies the sigmoid itself, so the model outputs raw logits.
optimizer = optim.Adam(model.parameters())
criterion = nn.BCEWithLogitsLoss()

In [None]:
def binary_accuracy(preds, y):
    """
    Compute the fraction of predictions that match the true labels.

    :param preds: Tensor of predicted values (raw logits; the sigmoid is applied here).
    :param y: Tensor of true labels.
    :returns: Accuracy as a scalar tensor.
    """
    # Turn the logits into probabilities, then round them to hard 0/1 predictions.
    probabilities = torch.sigmoid(preds)
    hard_predictions = torch.round(probabilities)

    # 1.0 where the prediction equals the label, 0.0 elsewhere.
    matches = (hard_predictions == y).float()
    return matches.sum() / len(matches)

def evaluate(model, iterator, criterion):
    """
    Measure the model's loss and accuracy on a dataset without updating its weights. Used for validation.

    :param model: PyTorch model to be evaluated.
    :param iterator: Iterator that provides batches of (features, labels, lengths) for evaluation.
    :param criterion: Loss function used to compute the loss during evaluation.
    :returns: Tuple containing the loss and the accuracy, each averaged over the batches.
    """
    model.eval()
    running_loss, running_acc = 0, 0

    # No gradients are tracked while evaluating.
    with torch.no_grad():
        for batch_features, batch_labels, batch_lengths in iterator:
            batch_features = batch_features.to(device)
            batch_labels = batch_labels.to(device)

            logits = model(batch_features, batch_lengths).squeeze(1)
            batch_loss = criterion(logits, batch_labels.float())
            batch_acc = binary_accuracy(logits, batch_labels)

            running_loss += batch_loss.item()
            running_acc += batch_acc.item()

    n_batches = len(iterator)
    return running_loss / n_batches, running_acc / n_batches

def train(model, iterator, optimizer, criterion):
    """
    Run one training epoch: one optimiser step per batch provided by the iterator.

    :param model: The PyTorch model to be trained.
    :param iterator: Iterator that provides batches of (features, labels, lengths) for training.
    :param optimizer: Optimizer used to update the model's parameters.
    :param criterion: Loss function used to compute the loss during training.
    :returns: Tuple containing the loss and the accuracy, each averaged over the batches.
    """
    model.train()
    running_loss, running_acc = 0, 0

    for batch_features, batch_labels, batch_lengths in iterator:
        batch_features = batch_features.to(device)
        batch_labels = batch_labels.to(device)

        # Clear the gradients left over from the previous batch before the forward pass.
        optimizer.zero_grad()
        logits = model(batch_features, batch_lengths).squeeze(1)
        batch_loss = criterion(logits, batch_labels.float())
        batch_acc = binary_accuracy(logits, batch_labels)

        # Back-propagate and update the parameters.
        batch_loss.backward()
        optimizer.step()

        running_loss += batch_loss.item()
        running_acc += batch_acc.item()

    n_batches = len(iterator)
    return running_loss / n_batches, running_acc / n_batches

In [None]:
N_EPOCHS = 20

# Create the output folder before training starts. Otherwise a missing directory
# is only discovered when saving fails, after every epoch has already run.
weights_dir = os.path.dirname(SAVE_WEIGHTS_PATH)
if weights_dir:
    os.makedirs(weights_dir, exist_ok=True)

# Per-epoch average losses, collected for the loss plot.
train_losses = []
valid_losses = []
for epoch in range(N_EPOCHS):
    train_loss, train_acc = train(model, train_loader, optimizer, criterion)
    valid_loss, valid_acc = evaluate(model, valid_loader, criterion)

    train_losses.append(train_loss)
    valid_losses.append(valid_loss)

    print(f'Epoch: {epoch+1:02}')
    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
    print(f'\t Val. Loss: {valid_loss:.3f} |  Val. Acc: {valid_acc*100:.2f}%')

# Persist the weights reached after the final epoch.
torch.save(model.state_dict(), SAVE_WEIGHTS_PATH)
print('Model weights saved')

# One x position per epoch, numbered from 1.
epochs = range(1, N_EPOCHS + 1)

plt.figure(figsize=(10, 6))
plt.plot(epochs, train_losses, color='blue', label='Training Loss')
plt.plot(epochs, valid_losses, color='red', label='Validation Loss')

# A tick for every epoch on the x axis, and a tick every 0.05 on the y axis
# from 0 up to the largest recorded loss.
plt.xticks(epochs)
highest_loss = max(train_losses + valid_losses)
plt.yticks([step / 20 for step in range(int(highest_loss * 20) + 1)])

plt.title('Average')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend(loc='upper left')

# Save the plot to SAVE_PLOTS_PATH before showing it
plt.savefig(SAVE_PLOTS_PATH)
print('Plot of loss saved')
plt.show()