## Audio Classification

In this notebook we are going to train an audio classifier on the [Speech Commands dataset](https://drive.usercontent.google.com/download?id=1J0rGy64nRSNdAjgfDXzPHs_YHkvAiP8-). It is a multi-class classification problem. After running the notebook as it is, you are invited to use the same notebook to train a model on your custom dataset. 

The network in this notebook is a 1-dimensional CNN, similar to the networks we saw in week 5 for image classification. The specific architecture is modeled after the M5 network architecture described in [this paper](https://arxiv.org/pdf/1610.00087.pdf). You can read more about Speech Command Classification with torchaudio on the [PyTorch tutorial page](https://pytorch.org/tutorials/intermediate/speech_command_classification_with_torchaudio_tutorial.html), which is also the reference point for this notebook.

##### Imports

In [None]:
from torch.utils.data import DataLoader, Subset
import torch.nn.functional as F
import torch.optim as optim
import torch.nn as nn
import torchaudio
import torch

from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import IPython.display as ipd
from tqdm import tqdm
import numpy as np
import os

from src.audio_dataloader import AudioFolder

##### Download dataset

In [None]:
# make sure the folder that will hold the datasets exists;
# `exist_ok=True` turns the call into a no-op when the folder is already there,
# so the cell can be re-run safely without a separate existence check
PATH = './datasets'
os.makedirs(PATH, exist_ok=True)

In [None]:
# optional cell: skip it if you build the dataset folder by hand
# with your own custom classes
dataset = torchaudio.datasets.SPEECHCOMMANDS(
    PATH,
    url='speech_commands_v0.02',
    folder_in_archive='SpeechCommands',
    download=True,
)

In [None]:
# move the dataset to the datasets folder manually or:

# *** for MAC and Linux OS ***
# !mv SpeechCommands ./datasets # move the dataset in the designated directory

# *** for Windows OS ***
# !robocopy SpeechCommands ./datasets/ /E # copy and paste unzipped dataset to the designated directory 

##### Hyperparameters

In [None]:
# target sample rate (Hz) the audio is resampled to before it reaches the model
new_sample_rate = 8000
# number of clips per batch in the data loaders
batch_size = 256
# learning rate and weight decay handed to the optimiser
learn_rate = 1e-2
weight_decay = 1e-4
# number of passes over the training subset
num_epochs = 2
# fraction of the dataset held out for validation
val_size = 0.3
# root directory for dataset
dataroot = './datasets/SpeechCommands/speech_commands_v0.02'
# path to new model
save_path = 'models/model.pt'
# pick the best available device: CUDA GPU first, then Apple-silicon MPS,
# and fall back to the CPU so the notebook also runs on machines with neither
if torch.cuda.is_available():
    device = 'cuda'
elif torch.backends.mps.is_available():
    device = 'mps'
else:
    device = 'cpu'
device

##### Metadata of a sample audio file

In [None]:
# pick one clip from the dataset folder, decode it,
# then read and display the metadata torchaudio reports for the file
sample_wav = f'{dataroot}/bed/0a7c2a8d_nohash_0.wav'
waveform, sample_rate = torchaudio.load(sample_wav)
metadata = torchaudio.info(sample_wav)
print(metadata)

##### Listen to audio with its original sample rate

In [None]:
# play the sample clip at the sample rate returned by torchaudio.load
ipd.Audio(waveform.numpy(), rate=sample_rate)

##### Listen to audio after transformations are applied

Compare the sound of the file before and after downsampling it.

In [None]:
# resampler from the sample clip's rate to `new_sample_rate`;
# the same object is handed to AudioFolder when the dataset is built
# NOTE(review): orig_freq comes from this one clip, so this presumably assumes
# every file in the dataset shares that sample rate - confirm
transform = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=new_sample_rate)

# resample the sample clip and play it back at the new rate
transformed = transform(waveform)
ipd.Audio(transformed.numpy(), rate=new_sample_rate)

##### Our dataset

##### To be split into train and validation subsets

In [None]:
# sneak peek into the dataset: number of samples, number of classes
# and the class names, with the resampling transform attached
dataset = AudioFolder(dataroot, transform=transform)
print(len(dataset), len(dataset.classes), dataset.classes)

In [None]:
# code cell from PyTorch Tutorial
# https://pytorch.org/tutorials/intermediate/speech_command_classification_with_torchaudio_tutorial.html
# to make all tensors in a batch of the same length

def pad_sequence(batch):
    """Zero-pad a list of 2-D tensors along their last dimension.

    Each item is transposed so that its last dimension comes first, padded
    with zeros up to the longest item in the list, and transposed back.

    Args:
        batch: list of 2-D tensors (presumably (channels, time) waveforms)
            that may differ in the size of their last dimension.

    Returns:
        A single tensor of shape (len(batch), dim0, longest_last_dim).
    """
    # the padding utility pads along the first dimension, so flip each item first
    flipped = [clip.t() for clip in batch]
    padded = torch.nn.utils.rnn.pad_sequence(flipped, padding_value=0., batch_first=True)
    # undo the flip: (batch, padded_len, dim0) -> (batch, dim0, padded_len)
    return padded.transpose(1, 2)

def collate_fn(batch):
    """Collate a list of (waveform, label) pairs into batched tensors.

    Args:
        batch: list of (waveform, label) pairs as yielded by the dataset.

    Returns:
        tensors: the waveforms zero-padded to a common length by
            `pad_sequence` and stacked into one tensor.
        targets: 1-D tensor of dtype torch.long holding the labels.
    """
    # A data tuple has the form:
    # waveform, label
    tensors, targets = [], []

    # Gather waveforms and labels in separate lists
    for waveform, label in batch:
        tensors.append(torch.Tensor(waveform))
        targets.append(label)

    # Group the list of tensors into a batched tensor
    tensors = pad_sequence(tensors)
    # Build the integer targets directly as torch.long instead of
    # round-tripping the class indices through a float32 tensor
    targets = torch.as_tensor(np.stack(targets), dtype=torch.long)

    return tensors, targets

In [None]:
# one index per element of the full dataset
dataset_size = len(dataset)
idx = [i for i in range(dataset_size)]

# split the indices into train / validation parts;
# `random_state=42` fixes the shuffling, so the same 'random' split
# is produced every time the cell is run
train_indices, val_indices = train_test_split(idx, test_size=val_size, random_state=42)

# views of the dataset restricted to each split
train_dataset, val_dataset = (Subset(dataset, split) for split in (train_indices, val_indices))

# data loaders: both share batch size and collate function,
# only the training loader shuffles its samples
loader_options = dict(batch_size=batch_size, collate_fn=collate_fn)
train_loader = DataLoader(train_dataset, shuffle=True, **loader_options)
val_loader = DataLoader(val_dataset, shuffle=False, **loader_options)

##### Plot a sample

In [None]:
# draw one batch from the training loader and look at its first element
data_batch, label_batch = next(iter(train_loader))
sample_waveform = data_batch[0].squeeze()
sample_class = int(label_batch[0].item())

print(f'Data batch shape: {data_batch.shape}')
print(f"Shape of waveform: {sample_waveform.size()}")
print(f"Class of waveform: '{dataset.classes[sample_class]}'")

# plot the waveform values of that first element
plt.plot(sample_waveform.t().numpy())

##### The model architecture

1-Dimensional Convolutional Neural Network to process raw audio data.

In [None]:
class M5(nn.Module):
    """1-D convolutional classifier for raw waveforms.

    Four stages of convolution -> batch norm -> ReLU -> max pooling are
    followed by an average pool over the whole remaining length and a
    linear layer that produces per-class log-probabilities.

    Args:
        n_input: number of input channels of the first convolution.
        n_output: number of classes produced by the final linear layer.
        stride: stride of the first convolution.
        n_channel: number of channels in the first two stages; the last
            two stages use twice as many.
    """

    def __init__(self, n_input=1, n_output=35, stride=16, n_channel=32):
        super().__init__()
        wide = 2 * n_channel

        # stage 1: wide kernel with a large stride on the raw input
        self.conv1 = nn.Conv1d(n_input, n_channel, kernel_size=80, stride=stride)
        self.bn1 = nn.BatchNorm1d(n_channel)
        self.pool1 = nn.MaxPool1d(4)

        # stage 2: same channel count, small kernel
        self.conv2 = nn.Conv1d(n_channel, n_channel, kernel_size=3)
        self.bn2 = nn.BatchNorm1d(n_channel)
        self.pool2 = nn.MaxPool1d(4)

        # stage 3: doubles the channel count
        self.conv3 = nn.Conv1d(n_channel, wide, kernel_size=3)
        self.bn3 = nn.BatchNorm1d(wide)
        self.pool3 = nn.MaxPool1d(4)

        # stage 4: keeps the doubled channel count
        self.conv4 = nn.Conv1d(wide, wide, kernel_size=3)
        self.bn4 = nn.BatchNorm1d(wide)
        self.pool4 = nn.MaxPool1d(4)

        # classifier on top of the pooled features
        self.fc1 = nn.Linear(wide, n_output)

    def forward(self, x):
        """Return log-probabilities of shape (batch, 1, n_output) for input `x`."""
        stages = (
            (self.conv1, self.bn1, self.pool1),
            (self.conv2, self.bn2, self.pool2),
            (self.conv3, self.bn3, self.pool3),
            (self.conv4, self.bn4, self.pool4),
        )
        for conv, norm, pool in stages:
            x = pool(F.relu(norm(conv(x))))

        # average over whatever length is left, leaving a single position
        x = F.avg_pool1d(x, x.shape[-1])
        # move the channel axis last so the linear layer acts on it
        x = x.permute(0, 2, 1)
        return F.log_softmax(self.fc1(x), dim=2)

In [None]:
# build the network with one output per dataset class and move it to the device
model = M5(n_input=1, n_output=len(dataset.classes))
model.to(device)
print(model)

# the number of parameters to be learnt
def count_parameters(model):
    """Return the total number of elements across the trainable parameters of `model`."""
    return sum(p.numel() for p in model.parameters() if p.requires_grad)
n = count_parameters(model)
print("Number of parameters: %s" % n)

# optimiser
optimizer = optim.Adam(model.parameters(), lr=learn_rate, weight_decay=weight_decay)
# loss function: M5.forward already returns log-probabilities (log_softmax),
# so the negative log-likelihood loss is the matching criterion;
# CrossEntropyLoss would apply log_softmax a second time to the same values
criterion = nn.NLLLoss()

##### Train function

In [None]:
def train(model, epoch, log_interval, losses):
    """Train `model` for one epoch over the notebook-level `train_loader`.

    Uses the notebook globals `device`, `train_loader`, `criterion`,
    `optimizer`, `pbar` and `pbar_update`.

    Args:
        model: the network to train; it is moved to `device`.
        epoch: epoch number, only used in the progress printout.
        log_interval: print training stats every `log_interval` batches.
        losses: list to which the mean batch loss of this epoch is appended.
    """
    # put model in training mode
    model.train()
    model.to(device)
    train_loss = 0.0
    for batch_idx, (data, target) in enumerate(train_loader):

        # move data into the designated device
        data = data.to(device)
        target = target.to(device)

        # forward pass through the model
        output = model(data)

        # evaluate based on the difference between predicted output and original target;
        # only the singleton dimension 1 of the (batch, 1, classes) output is dropped:
        # a bare squeeze() would also drop the batch dimension when the last
        # batch holds a single sample and make the loss call fail
        loss = criterion(output.squeeze(1), target)
        # backpropagate loss
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # accumulate the batch losses of this epoch
        train_loss += loss.item()

        # update the graphic bar that shows the progress of training per batch
        pbar.update(pbar_update)

        # print training stats
        if batch_idx % log_interval == 0:
            print(f"Train Epoch: {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)} ({100. * batch_idx / len(train_loader):.0f}%)]\tLoss: {loss.item():.6f}")

    # mean loss per batch over the epoch
    train_loss = train_loss / len(train_loader)
    losses.append(train_loss)


##### Validation function

In [None]:
# evaluation functions
def number_of_correct(pred, target):
    """Return how many entries of `pred` equal `target`, as a Python number.

    Size-1 dimensions of `pred` are removed before the element-wise comparison.
    """
    matches = pred.squeeze() == target
    return matches.sum().item()

def get_likely_index(tensor):
    """Return the index of the largest value along the last dimension of `tensor`."""
    return torch.argmax(tensor, dim=-1)

# validation function
def val(model, epoch, losses):
    """Evaluate `model` on the notebook-level `val_loader` without gradient tracking.

    Uses the notebook globals `device`, `val_loader`, `criterion`, `pbar`
    and `pbar_update`, plus the helpers `get_likely_index` and
    `number_of_correct`. Prints the accuracy over the validation subset.

    Args:
        model: the network to evaluate; it is moved to `device`.
        epoch: epoch number, only used in the printout.
        losses: list to which the mean batch loss of this pass is appended.
    """
    # put model in evaluation mode
    model.eval()
    model.to(device)
    val_loss = 0.0
    correct = 0
    with torch.no_grad():
        for data, target in val_loader:

            # move data into the designated device
            data = data.to(device)
            target = target.to(device)

            # forward pass through the model
            output = model(data)

            # evaluate based on the difference between predicted output and original target;
            # squeeze(1) keeps the batch dimension even for a batch of one sample
            loss = criterion(output.squeeze(1), target)

            # accumulate the batch losses of this pass
            val_loss += loss.item()

            # use evaluation functions defined above
            pred = get_likely_index(output)
            correct += number_of_correct(pred, target)

            # update the graphic bar that shows the progress of validation per batch
            pbar.update(pbar_update)

    # mean loss per batch: divide by the number of *validation* batches,
    # so the value is on the same scale as the training loss
    val_loss = val_loss / len(val_loader)
    losses.append(val_loss)

    print(f"\nValidation Epoch: {epoch}\tAccuracy: {correct}/{len(val_loader.dataset)} ({100. * correct / len(val_loader.dataset):.0f}%)\n")

##### Training loop

In [None]:
log_interval = 20
# each processed batch (train or val) advances the bar by this fraction of one epoch
pbar_update = 1 / (len(train_loader) + len(val_loader))
train_losses = []
val_losses = []
# any real validation loss is lower than infinity, so the first epoch is always saved
best_loss = float('inf')

# make sure the folder for the checkpoint exists before the first save
save_dir = os.path.dirname(save_path)
if save_dir:
    os.makedirs(save_dir, exist_ok=True)

# while tracking the graphic bar of progress
with tqdm(total=num_epochs) as pbar:
    # call the train and val functions for each epoch of training
    for epoch in range(1, num_epochs+1):
        train(model, epoch, log_interval, train_losses)
        val(model, epoch, val_losses)
        # save the model if the latest val_loss is the best one seen so far
        if val_losses[-1] < best_loss:
            best_loss = val_losses[-1]
            # store CPU copies of the weights so the checkpoint loads on any machine,
            # without moving the live model off the training device
            cpu_state = {name: tensor.detach().cpu() for name, tensor in model.state_dict().items()}
            torch.save(cpu_state, save_path)

##### Plot losses

In [None]:
# one figure with the per-epoch training and validation losses
fig, ax = plt.subplots(figsize=(10, 5))
ax.set_title("Train n' Validation Loss")
ax.plot(train_losses, label="train")
ax.plot(val_losses, label="val")
ax.set_xlabel("epochs")
ax.set_ylabel("cumulative loss")
ax.legend()
plt.show()

##### Inference

Test the performance of the model with unseen data from the test subfolder of speech commands.

In [None]:
# load the saved weights from the same path the training loop wrote to
# (point `save_path` at your own checkpoint if it is stored elsewhere);
# map_location='cpu' makes the checkpoint loadable on a machine without
# the device it was trained on
state_dict = torch.load(save_path, map_location='cpu')

# instantiate the model and put it in evaluation mode
model = M5(n_input=1, n_output=len(dataset.classes))
model.load_state_dict(state_dict)
model.eval()

In [None]:
# load audio file from your custom path
test_wav = './PATH/TO/AUDIO-SAMPLE.wav'
# minimum duration in seconds
min_duration = 1.0

# stop here with a clear error instead of printing a message and leaving
# `input_data` undefined (or stale from an earlier run) for the next cell
if not os.path.exists(test_wav):
    raise FileNotFoundError(f"Audio file does not exist: {test_wav}")

# load the audio file
waveform, sample_rate = torchaudio.load(test_wav)

# check if the duration of the audio is too short
if waveform.size(1) / sample_rate < min_duration:
    raise ValueError("Audio duration is too short.")

# the model is built with n_input=1, so mix multi-channel audio down to one channel
if waveform.size(0) > 1:
    waveform = waveform.mean(dim=0, keepdim=True)

# resample with a dedicated transform for this file's sample rate;
# a separate name keeps the `transform` used by the dataset untouched
test_transform = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=new_sample_rate)
transformed = test_transform(waveform)
input_data = transformed.unsqueeze(0)  # add batch dimension

In [None]:
# run the model on the prepared clip without tracking gradients
with torch.no_grad():
    output = model(input_data)

# index of the highest-scoring class, mapped back to its class name
pred = torch.argmax(output, dim=-1)
pred_label = dataset.classes[pred.item()]

print("Predicted label:", pred_label)

In [None]:
# is it what the model predicted?
# play the resampled test clip, i.e. the audio that was fed to the model
ipd.Audio(transformed.numpy(), rate=new_sample_rate)