All of the imports needed for this project are declared at the very beginning to keep them together.

These include the comet_ml package, which I use to track the performance of the models as I develop them further.

In [None]:
import torch
import os
import getpass
import torchaudio
from torch.utils.data import DataLoader, Dataset
import torch.nn as nn
import torch.optim as optim
import comet_ml
import numpy as np
from itertools import repeat
import random
import librosa

To record the performance of each model, I am once again using comet_ml to track each training experiment and its outcome. This is very useful for observing the difference in performance when tweaking the models and their hyperparameters.

The code below creates a new experiment in which to record this session.

In [None]:
# read the API key from the environment, prompting for it only if it is not set
comet_api_key = os.environ.get("COMET_API_KEY")
if not comet_api_key:
    comet_api_key = getpass.getpass("Enter key")

experiment = comet_ml.Experiment(api_key=comet_api_key, workspace="benhipwell", project_name="cw2", auto_metric_logging=True, auto_output_logging=True)
# experiment.set_name('benhipwellcomp6252cw2')

One method of training machine learning models using audio files is to generate fixed length segments of the time series to make up the dataset.

Below I have created a custom PyTorch dataset that takes the dataset of full-length audio files, breaks each file down into segments labelled with its genre, and stores them so that they can be used later on for training a model.

In this case, I am extracting MFCC features from the waveform of each segment instead of using the raw audio, as this provides improved training accuracy and is much more time and space efficient.

In [None]:
class AudioSegmentDataset(Dataset):
    def __init__(self, root_dir = None, segment_length = 11000):
        self.root_dir = root_dir
        self.segment_length = segment_length
        self.segments = []
        self.labels = []

        # Loads and preprocesses the audio dataset
        if root_dir:
            self.classes = sorted(os.listdir(root_dir))

            for idx, genre in enumerate(self.classes):
                genre_folder = os.path.join(self.root_dir, genre)
                files = os.listdir(genre_folder)
                for file in files:
                    filepath = os.path.join(genre_folder, file)
                    waveform, sample_rate = torchaudio.load(filepath)
                    num_samples = waveform.size(1)
                    num_segments = num_samples // segment_length
                    # breaks each track down into fixed sized segments
                    for i in range(num_segments):
                        start_idx = i * segment_length
                        end_idx = start_idx + segment_length
                        segment = waveform[:, start_idx:end_idx]
                        # extracts MFCC features
                        segment = torch.tensor(librosa.feature.mfcc(y=segment.squeeze(0).numpy(), sr=sample_rate)).to('cuda')
                        self.segments.append(segment)
                        self.labels.append(idx)
            
            # shuffles the dataset for later splitting into train, test and validation sets
            combined = list(zip(self.segments, self.labels))
            random.shuffle(combined)
            self.segments[:], self.labels[:] = zip(*combined)

    # returns the length of the segments list
    def __len__(self):
        return len(self.segments)

    # returns the respective segment and label given the index
    def __getitem__(self, idx):
        segment = self.segments[idx]
        label = self.labels[idx]
        return segment, torch.tensor(label, dtype=torch.long).to('cuda')

    # adds new segments and labels to the dataset
    def add_segment(self, new_segments, new_labels):
        self.segments.extend(new_segments)
        self.labels.extend(new_labels)

    # checks that all of the segments have the same shape
    def check_segment_shapes(self):
        if len(set(segment.shape for segment in self.segments)) != 1:
            raise ValueError("Segments have different shapes")
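
As a rough check on the feature shape this class produces, the sketch below estimates the MFCC dimensions for a given segment length. It assumes librosa's default settings (n_mfcc=20, hop_length=512, centred frames), which the call above relies on by not overriding them. The helper name expected_mfcc_shape is hypothetical and not part of the project code.

```python
def expected_mfcc_shape(segment_length, n_mfcc=20, hop_length=512):
    """Estimate the (coefficients, frames) shape of an MFCC matrix.

    Assumes librosa's defaults with centred framing, where the number of
    frames is 1 + segment_length // hop_length.
    """
    num_frames = 1 + segment_length // hop_length
    return n_mfcc, num_frames


# segment length used when the dataset is created later on
print(expected_mfcc_shape(110000))
# default segment length in the class signature
print(expected_mfcc_shape(11000))
```

Under these assumptions a 110000-sample segment gives a 20 x 215 matrix, which matches the input_size of 215 and the Generator reshape used later, whereas the default of 11000 would give a 20 x 22 matrix.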

This initialises the dataset, taking the root directory of the audio files and the length of the fixed-size segments to create.

To confirm that the data has been loaded correctly, it displays the classes identified and the dataset size.

In [None]:
root_dir = 'data/genres_original/'
segment_length = 110000

dataset = AudioSegmentDataset(root_dir, segment_length)

print(dataset.classes)

total_size = len(dataset)
print(f'Dataset size: {total_size}')

Instead of using 'random_split()', I have had to manually split the dataset into separate AudioSegmentDataset instances, because the Subset objects returned by 'random_split()' do not provide the custom functions. This matters for the later addition of augmented data, which requires the 'add_segment()' function.

As the dataset is shuffled on initialisation, splitting it by index in this way is viable.

In [None]:
train_size = int(0.7 * total_size)
val_size = int(0.2 * total_size)
test_size = total_size - train_size - val_size

print(f'Training set size: {train_size}')
print(f'Validation set size: {val_size}')
print(f'Testing set size: {test_size}')

train_set = AudioSegmentDataset()
val_set = AudioSegmentDataset()
test_set = AudioSegmentDataset()

train_set.segments = dataset.segments[:train_size]
train_set.labels = dataset.labels[:train_size]

val_set.segments = dataset.segments[train_size:train_size + val_size]
val_set.labels = dataset.labels[train_size:train_size + val_size]

test_set.segments = dataset.segments[train_size + val_size:]
test_set.labels = dataset.labels[train_size + val_size:]
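
The index arithmetic behind this split can be sketched on a plain list. This is a minimal illustration assuming the list has already been shuffled. The helper split_by_fraction and the stand-in list items are hypothetical names, not part of the project code.

```python
def split_by_fraction(items, train_frac=0.7, val_frac=0.2):
    """Split an already-shuffled list into train, validation and test parts."""
    total = len(items)
    train_end = int(train_frac * total)
    val_end = train_end + int(val_frac * total)
    # whatever is left after the train and validation slices becomes the test part
    return items[:train_end], items[train_end:val_end], items[val_end:]


# hypothetical stand-in for the shuffled segments list
items = list(range(100))
train_part, val_part, test_part = split_by_fraction(items)
print(len(train_part), len(val_part), len(test_part))
```

Because the test part takes the remainder, every item lands in exactly one of the three sets even when the fractions do not divide the total evenly.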

Before using data loaders to create batches of the dataset for model training, it is important to check that all of the data has been loaded correctly and that every segment has the expected shape.


In [None]:
train_set.check_segment_shapes()

batch_size = 32

train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_set, batch_size=batch_size)
test_loader = DataLoader(test_set, batch_size=batch_size)
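
To illustrate what these loaders yield, the sketch below batches random stand-in tensors shaped like the MFCC segments. The data and the size of 70 are assumptions chosen only so that the final batch is visibly smaller than the others.

```python
import torch
from torch.utils.data import DataLoader, TensorDataset

# random stand-in data: 70 segments shaped like the MFCC features (20 x 215)
features = torch.randn(70, 20, 215)
labels = torch.randint(0, 10, (70,))
loader = DataLoader(TensorDataset(features, labels), batch_size=32)

# collect the shape of the inputs in every batch
batch_shapes = [tuple(inputs.shape) for inputs, _ in loader]
print(len(loader), batch_shapes)
```

The last batch holds only the leftover samples, so code that needs the batch size is safer reading it from the tensor itself, as the forward method of the model defined next does with x.size(0).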

This is my implementation of an LSTM RNN model, which takes:

    input_size: number of expected features within the input
    hidden_size: number of features in the hidden state
    num_layers: number of recurrent layers in the LSTM
    output_size: size of the output layer = number of output features after the last hidden layer
    batch_size: size of each batch of data provided to the model

This model makes use of LSTM layers to learn long-term dependencies in the input data, which suits time series tasks such as this one.

In [None]:
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size, batch_size):
        super(LSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.batch_size = batch_size
        
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
        
    def forward(self, x):
        # initialize h0 with zeros
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        
        # initialize c0 with zeros
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        
        # forward propagate LSTM, with an output of shape (batch_size, seq_length, hidden_size)
        out, _ = self.lstm(x, (h0, c0))
        
        out = self.fc(out[:, -1, :])
        return out
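
The tensor shapes inside this forward pass can be traced with a small standalone sketch. It assumes segments are passed in as stored, with shape (20, 215), so with batch_first=True the LSTM reads 20 steps of 215 features each. The layer count of 2 and the random input are illustrative choices only.

```python
import torch
import torch.nn as nn

batch, seq_len, input_size, hidden_size, num_classes = 4, 20, 215, 128, 10

lstm = nn.LSTM(input_size, hidden_size, num_layers=2, batch_first=True)
fc = nn.Linear(hidden_size, num_classes)

x = torch.randn(batch, seq_len, input_size)  # random stand-in for MFCC segments
out, _ = lstm(x)                             # (batch, seq_len, hidden_size)
logits = fc(out[:, -1, :])                   # keep only the last sequence step

print(out.shape, logits.shape)
```

PyTorch uses zero initial hidden and cell states when none are passed, so omitting h0 and c0 here behaves the same as the explicit zero tensors in the class above.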

This is the function that carries out the training of the LSTM model and contains the training loop. It is a very standard implementation of the training loop, identical to that used in CW1, as it does not need to be adapted.

In [None]:
def train(model, train_loader, optimizer, loss_func, epochs, name):
    
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
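    model.to(device)
    # make sure the model is in training mode, in case it was previously set to eval mode
    model.train()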

    for epoch in range(epochs):
        running_loss = 0.0
        for inputs, labels in train_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = loss_func(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()


        print(f"epoch {epoch + 1}/{epochs}, loss: {running_loss / len(train_loader)}")
        experiment.log_metrics({f'loss_{name}': running_loss / len(train_loader)}, epoch=epoch)       

This carries out the evaluation of a given model against a given testing or validation set. 

Similar to the training loop, the evaluation function is also identical to that used in the previous coursework as it is unaffected by the differing data and models.

In [None]:
def evaluate(model, data_loader, loss_func, name):
    
    print(f"evaluating: {name}")

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    model.eval()

    total_correct = 0
    total_samples = 0
    total_loss = 0.0

    with torch.no_grad():
        for inputs, labels in data_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = model(inputs)
            loss = loss_func(outputs, labels)

            _, predicted = torch.max(outputs, 1)
            total_correct += (predicted == labels).sum().item()
            total_samples += labels.size(0)
            total_loss += loss.item()

    accuracy = total_correct / total_samples
    avg_loss = total_loss / len(data_loader)

    print(f"accuracy: {accuracy}, average loss: {avg_loss}")
    experiment.log_metrics({f"test_accuracy_{name}": accuracy})
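
The accuracy calculation in this function can be checked by hand on a tiny example. The logits and labels below are made-up values used purely for illustration.

```python
import torch

# hypothetical logits for 3 samples over 2 classes, with their true labels
outputs = torch.tensor([[0.1, 0.9], [0.8, 0.2], [0.3, 0.7]])
labels = torch.tensor([1, 0, 0])

_, predicted = torch.max(outputs, 1)          # index of the largest logit per row
total_correct = (predicted == labels).sum().item()
accuracy = total_correct / labels.size(0)

print(predicted.tolist(), accuracy)
```

Here the first two predictions match their labels and the third does not, giving an accuracy of two thirds.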

This cell first sets the parameters for training the first model, which uses only the provided dataset of audio tracks, split into fixed-size segments and processed into MFCC features. Where possible, it is very helpful to group parameters together so that they can be adjusted easily while testing and tweaking the models.

In [None]:
lr = 0.0007
loss_func = nn.CrossEntropyLoss()
input_size = 215
hidden_size = 128
num_layers = 4
num_classes = 10
epochs = 60

rnn = LSTMModel(input_size, hidden_size, num_layers, num_classes, batch_size=batch_size).cuda()

optimizer = optim.Adam(rnn.parameters(), lr=lr)
train(rnn, train_loader, optimizer, loss_func, epochs=epochs, name='rnn_train')
evaluate(rnn, test_loader, loss_func, name=f'rnn_without_GAN_{epochs}_accuracy')

The second part of this project is the implementation of a GAN. Instead of either training a separate GAN for each class in the dataset or assigning random class labels to generated data, I have made use of a Conditional GAN (CGAN).

The first component is the Generator, which is responsible for generating augmented data from noise and a provided label. The label determines which class of data the Generator should produce from the given noise.

The architecture of this model makes use of Linear layers of varying size and LeakyReLU layers, which proved to be more effective than regular ReLU layers in this instance. The output uses a tanh activation, which bounds the generated values to the range [-1, 1], symmetric around 0, with the aim of keeping the generated outputs in the format and scale of the real data.

In [None]:
class Generator(nn.Module):
    def __init__(self, noise_size, label_size, embed_size, output_size):
        super(Generator, self).__init__()
        self.label_emb = nn.Embedding(num_embeddings=label_size, embedding_dim=embed_size)
        self.fc = nn.Sequential(
            nn.Linear(noise_size + embed_size, 256),
            nn.BatchNorm1d(256),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(256, 512),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(512, 1024),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(1024, output_size),
            nn.Tanh(),
        )

    def forward(self, noise, labels):
        labels = labels.long()
        # generates dense embedding for the labels so that they can be used within the model
        label_embedding = self.label_emb(labels)
        # combines input noise and label embeddings to provide contextual label information
        gen_input = torch.cat((noise, label_embedding), dim=1)
        output = self.fc(gen_input)
        # reshape output to match MFCC segment shape for training
        return output.view(-1, 20, 215)
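
The conditioning step in forward can be sketched in isolation: each label is looked up in an embedding table and concatenated with the noise vector before the Linear layers. The label values below are arbitrary examples, and the final lines show that a flat output of 20 * 215 = 4300 values can be viewed as one MFCC-shaped segment.

```python
import torch
import torch.nn as nn

noise_size, label_size, embed_size = 200, 10, 50

label_emb = nn.Embedding(num_embeddings=label_size, embedding_dim=embed_size)
labels = torch.tensor([0, 3, 3, 9])           # one genre index per sample
noise = torch.randn(labels.size(0), noise_size)

# each row is a noise vector followed by the embedding of that row's label
gen_input = torch.cat((noise, label_emb(labels)), dim=1)
print(gen_input.shape)

# a flat output of 20 * 215 values reshaped into MFCC-shaped segments
flat = torch.zeros(labels.size(0), 20 * 215)
print(flat.view(-1, 20, 215).shape)
```

Samples that share a label receive the same embedding vector, so the only thing that differs between them is the noise.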

The Discriminator evaluates whether the data it is given is real or generated, essentially acting as the 'critic'. This is essential to the training process of a GAN, where both models train together so that the Generator produces the best possible augmented data.

Its architecture is nearly identical to the Generator's, to keep the two consistent whilst they train. The main change is the final Sigmoid activation, which provides the binary classification of whether the Discriminator believes its input is real or fake.

In [None]:
class Discriminator(nn.Module):
    def __init__(self, input_size, label_size, embed_size):
        super(Discriminator, self).__init__()
        self.label_emb = nn.Embedding(num_embeddings=label_size, embedding_dim=embed_size)
        self.fc = nn.Sequential(
            nn.Linear(input_size + embed_size, 256),
            nn.BatchNorm1d(256),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(256, 512),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(512, 1024),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(1024, 1),
            nn.Sigmoid()
        )

    def forward(self, input, labels):
        labels = labels.long()
        # generates dense embedding for the labels so that they can be used within the model
        label_embedding = self.label_emb(labels)
        input = input.view(input.size(0), -1)
        # combines input data and label embeddings to combine raw data features with contextual label information
        disc_input = torch.cat((input, label_embedding), dim=1)
        output = self.fc(disc_input)
        return output

This function contains the main training loop of the CGAN, which generates augmented data from the original dataset. The key aspect is the simultaneous training of the Generator and Discriminator defined above, which learn from each other so that the Generator can produce these new samples effectively.

To help stop the Discriminator from dominating the Generator, I have added a small amount of noise to the Discriminator inputs.

In [None]:
def train_gan(generator, discriminator, device, dataloader, optimizer_G, optimizer_D, criterion, num_epochs, label_size, noise_size):
    generator.to(device)
    discriminator.to(device)

    for epoch in range(num_epochs):
        for i, (audio, labels) in enumerate(dataloader):
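            real_data = audio.to(device)
            # add a small amount of noise to the Discriminator's real inputs, on the same device
            real_data = real_data + 0.05 * torch.randn_like(real_data)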
            real_labels = labels.to(device)
            batch_size = real_data.size(0)

            # train Discriminator
            optimizer_D.zero_grad()
            real_output = discriminator(real_data, real_labels)
            real_loss = criterion(real_output, torch.ones(batch_size, 1, device=device))

            fake_labels = torch.randint(0, label_size, (batch_size,), device=device)
            noise = torch.randn(batch_size, noise_size, device=device)
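            fake_data = generator(noise, fake_labels)
            # add the same level of noise to the generated inputs, on the same device
            fake_data = fake_data + 0.05 * torch.randn_like(fake_data)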
            fake_output = discriminator(fake_data.detach(), fake_labels)
            fake_loss = criterion(fake_output, torch.zeros(batch_size, 1, device=device))

            d_loss = real_loss + fake_loss
            d_loss.backward()
            optimizer_D.step()

            # train Generator
            optimizer_G.zero_grad()
            output = discriminator(fake_data, fake_labels)
            g_loss = criterion(output, torch.ones(batch_size, 1, device=device))
            g_loss.backward()
            optimizer_G.step()

        print(f"Epoch {epoch+1}/{num_epochs}, Loss D: {d_loss.item()}, Loss G: {g_loss.item()}")
        experiment.log_metrics({'loss_Generator': g_loss.item(), 'loss_Discriminator': d_loss.item()}, epoch=epoch)
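
The noise added to the Discriminator inputs, sometimes called instance noise, can be sketched as a small helper. The name add_instance_noise is hypothetical, and the all-zero batch is a stand-in chosen so that the effect of the noise is easy to see.

```python
import torch


def add_instance_noise(batch, std=0.05):
    """Return the batch plus zero-mean Gaussian noise created on the same device."""
    return batch + std * torch.randn_like(batch)


torch.manual_seed(0)
clean = torch.zeros(8, 20, 215)               # stand-in batch of MFCC-shaped data
noisy = add_instance_noise(clean)
print(noisy.shape, noisy.std().item())
```

Generating the noise with randn_like on the tensor that has already been moved to the target device avoids mixing CPU and GPU tensors.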

This sets the parameters required for training the GAN, before initialising all of the required components and running the training loop. Note that the Generator learning rate is higher than the Discriminator's, to stop the Generator from being dominated by the Discriminator and to help avoid mode collapse.

BCE loss is used to measure the distance between the probabilities predicted by the Discriminator and the actual binary labels.

In [None]:
noise_size = 200
label_size = 10
embed_size = 50
output_size = 4300
g_lr = 0.001
d_lr = 0.0006

generator = Generator(noise_size, label_size, embed_size, output_size)
discriminator = Discriminator(output_size, label_size, embed_size)
optimizer_G = optim.Adam(generator.parameters(), lr=g_lr)
optimizer_D = optim.Adam(discriminator.parameters(), lr=d_lr)
criterion = nn.BCELoss()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

train_gan(generator, discriminator, device, train_loader, optimizer_G, optimizer_D, criterion, num_epochs=50, label_size=label_size, noise_size=noise_size)
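
To make the BCE criterion concrete, the sketch below computes the loss for a single prediction in plain Python. The probabilities are made-up values, and nn.BCELoss averages this same quantity over the batch by default.

```python
import math


def bce(prediction, target):
    """Binary cross-entropy for one predicted probability and a 0/1 target."""
    return -(target * math.log(prediction) + (1 - target) * math.log(1 - prediction))


# Discriminator fairly sure that a real sample is real: small loss
print(bce(0.9, 1))
# Discriminator undecided about a real sample: loss equals log(2)
print(bce(0.5, 1))
# Discriminator fooled into scoring a fake sample as real: large loss
print(bce(0.9, 0))
```

The loss grows quickly as the predicted probability moves away from the target, which is what pushes the Discriminator towards confident, correct outputs.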

This function uses the trained CGAN to generate new augmented data for each of the genre classes, with an equal number of samples generated for each one, roughly doubling the size of the original training set.

In [None]:
def generate_audio_samples(generator, num_samples, noise_size, num_classes, device='cuda'):

    generator.to(device)
    generator.eval()
    num_samples_per_class = num_samples // num_classes

    with torch.no_grad():
        for class_id in range(num_classes):
            print(f'Augmenting for class: {class_id}')
            # generate input noise vectors
            noise = torch.randn(num_samples_per_class, noise_size, device=device)
            # generate augmented data conditioned on the class label
            labels = torch.full((num_samples_per_class,), class_id, dtype=torch.long, device=device)
            synthetic_data = generator(noise, labels)
            # add the synthetic data to the train set only, storing labels as plain ints like the real data
            train_set.add_segment(list(synthetic_data), [class_id] * num_samples_per_class)

# generate the same number of samples already in the train set, roughly doubling its size
num_samples_to_generate = train_size
num_classes = 10
generate_audio_samples(generator, num_samples_to_generate, noise_size, num_classes, device='cuda')
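
Because the per-class count uses integer division, the number of samples actually generated can fall slightly short of the number requested. The sketch below shows the arithmetic; the helper per_class_counts and the size 6993 are hypothetical, chosen only because 6993 is not a multiple of 10.

```python
def per_class_counts(num_samples, num_classes):
    """Return (samples per class, total actually generated) using floor division."""
    per_class = num_samples // num_classes
    return per_class, per_class * num_classes


# hypothetical training set size that is not a multiple of the class count
print(per_class_counts(6993, 10))
```

In this example three of the requested samples are dropped, so the augmented training set ends up slightly less than twice its original size.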

After populating the train set with the synthetic data, the dataset is once again checked to ensure all of the data is of the same shape and therefore suitable for model training.

This uses the same parameters and conditions as the previously trained model, so the only changing factor is the train set, which now contains extra synthetic data. This consistency provides a fairer basis for measuring the impact of the synthetic data on the training of a model.

In [None]:
train_set.check_segment_shapes()

train_loader_GAN = DataLoader(train_set, batch_size=batch_size, shuffle=True)

rnn_gan = LSTMModel(input_size, hidden_size, num_layers, num_classes, batch_size=batch_size).cuda()

optimizer_gan = optim.Adam(rnn_gan.parameters(), lr=lr)
train(rnn_gan, train_loader_GAN, optimizer_gan, loss_func, epochs=epochs, name='rnn_gan_train')
evaluate(rnn_gan, test_loader, loss_func, name=f'rnn_with_GAN_{epochs}_accuracy')

Finally, this makes use of comet_ml to track the hyperparameters used when training the two models so that they can be compared when running with changing parameters. 

In [None]:
params = {
    "batch_size": batch_size,
    "learning_rate": lr,
    "generator_learning_rate": g_lr,
    "discriminator_learning_rate": d_lr,
    "noise_size": noise_size,
    "hidden_size": hidden_size,
    "num_layers": num_layers,
    "segment_length": segment_length,
    "epochs": epochs,
}

experiment.log_parameters(params)


experiment.end()