In [20]:
import os 

import torch
import torch.nn as nn
import torchaudio

import pandas as pd
import torchmetrics as torchmetrics

str(torchaudio.get_audio_backend())

'soundfile'

## Download the dataset 
As part of HW2, you will work with the audio dataset. Download the dataset by the following [link](https://urbansounddataset.weebly.com/download-urbansound8k.html) and remember a path to it. You will have to train a sound classification model, explore different audio transformation methods and try different model architectures.

In [2]:
class UrbanSoundDataset(torch.utils.data.Dataset):
    def __init__(self,
                 annotations_file,
                 audio_dir,
                 transformation,
                 target_sample_rate,
                 num_samples,
                 device):
        self.annotations = pd.read_csv(annotations_file)
        self.audio_dir = audio_dir
        self.device = device
        self.transformation = transformation.to(self.device)
        self.target_sample_rate = target_sample_rate
        self.num_samples = num_samples

    def __len__(self):
        return len(self.annotations)

    def __getitem__(self, index):
        audio_sample_path = self._get_audio_sample_path(index)
        label = self._get_audio_sample_label(index)
        signal, sr = torchaudio.load(audio_sample_path)
        signal = signal.to(self.device)
        signal = self._resample_if_necessary(signal, sr)
        signal = self._mix_down_if_necessary(signal)
        signal = self._cut_if_necessary(signal)
        signal = self._right_pad_if_necessary(signal)
        signal = self.transformation(signal)
        return signal, label

    def _cut_if_necessary(self, signal):
        if signal.shape[1] > self.num_samples:
            signal = signal[:, :self.num_samples]
        return signal

    def _right_pad_if_necessary(self, signal):
        length_signal = signal.shape[1]
        if length_signal < self.num_samples:
            num_missing_samples = self.num_samples - length_signal
            last_dim_padding = (0, num_missing_samples)
            signal = torch.nn.functional.pad(signal, last_dim_padding)
        return signal

    def _resample_if_necessary(self, signal, sr):
        if sr != self.target_sample_rate:
            resampler = torchaudio.transforms.Resample(sr, self.target_sample_rate)
            signal = resampler(signal)
        return signal

    def _mix_down_if_necessary(self, signal):
        if signal.shape[0] > 1:
            signal = torch.mean(signal, dim=0, keepdim=True)
        return signal

    def _get_audio_sample_path(self, index):
        fold = f"fold{self.annotations.iloc[index, 5]}"
        path = os.path.join(self.audio_dir, fold, self.annotations.iloc[
            index, 0])
        return path

    def _get_audio_sample_label(self, index):
        return self.annotations.iloc[index, 6]

## Defining hyperparameters

In [3]:
# Path to the dataset 
ANNOTATIONS_FILE = "datasets/UrbanSound8K/metadata/UrbanSound8K.csv"
AUDIO_DIR = "datasets/UrbanSound8K/audio"

# audio signal sample rate 
SAMPLE_RATE = 22050
# max number of samples in audio
NUM_SAMPLES = 22050

# optimizer learning rate
LEARNING_RATE = 1e-5
# number of train epochs
EPOCHS = 10
# number of samples in each batch size
BATCH_SIZE = 16

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [4]:
# defining audio preprocessing function. Refer to torchaudio.transforms to 
# check Spectrogram(), MFCC() and LFCC() transfrormations. Retrain the model
# with each transformation and write your metrics resulst and conclusaions.
mel_spectrogram = torchaudio.transforms.MelSpectrogram(
        sample_rate=SAMPLE_RATE,
        n_fft=1024,
        hop_length=512,
        n_mels=64
)

## Define the dataset

In [5]:
# defining dataset class with 
dataset = UrbanSoundDataset(
    ANNOTATIONS_FILE,
    AUDIO_DIR,
    mel_spectrogram,
    SAMPLE_RATE,
    NUM_SAMPLES,
    device
)

# calculating validation split sizes
train_size = int(len(dataset) * 0.8)
val_test_size = len(dataset) - train_size

val_size = val_test_size // 2
test_size = val_test_size - val_size

print('Dataset len: ', len(dataset))
print(f'Train split: {train_size} | Validation split: {val_size} | Test split: {test_size}')

# splitting original dataset into train and val_test sets
train_dataset, val_dataset = torch.utils.data.random_split(
    dataset, [train_size, val_size + test_size]
)
# splitting val_test set into val dataset and test dataset
val_dataset, test_dataset = torch.utils.data.random_split(
    val_dataset, [val_size, test_size]
)

Dataset len:  8732
Train split: 6985 | Validation split: 873 | Test split: 874


In [6]:
signal, label = dataset[0]

## Model

Define your model architecture here. 

1) Build a baseline model that consists of consecutive convolution blocks. Flatten the convolution output and pass through the linear layer with softmax activation function, to obtain class distributions

2) Add the BatchNorm layer after each convolution block and compare the results with the baseline model

3) Add a Dropout layer after each BatchNorm block and compare the results

4) Try different parameters for the blocks:
 - Conv layer: out_channels, kernel_size, stride
 - Dropout: p
 
 and compare the results

In [None]:
# Class CLModel to define your model architecture and computational graph
class CLModel(nn.Module):
    def __init__(self):
        super().__init__()
        ################

        # YOUR CODE HERE
        # define you convolution based classification model
        # 4 conv blocks / flatten / linear / softmax

        ################

        #First Conv Block
        self.conv1 = nn.Conv2d(1, 8, kernel_size=(5,5), padding=(2,2))
        self.pool1 = nn.MaxPool2d(2,2)
        self.relu1 = nn.ReLU()
        #self.bn1 = nn.BatchNorm2d(8)
        #self.dropout1 = nn.Dropout(0.2)

        #Second Conv Block
        self.conv2 = nn.Conv2d(8, 16, kernel_size=(3,3))
        self.pool2 = nn.MaxPool2d(2,2)
        self.relu2 = nn.ReLU()
        #self.bn2 = nn.BatchNorm2d(8)
        #self.dropout2 = nn.Dropout(0.2)


        #Third Conv Block
        self.conv3 = nn.Conv2d(16, 32, kernel_size=(3,3))
        self.pool3 = nn.MaxPool2d(2,2)
        self.relu3 = nn.ReLU()
        #self.bn3 = nn.BatchNorm2d(8)
        #self.dropout3 = nn.Dropout(0.2)


        #Fourth Conv Block
        self.conv4 = nn.Conv2d(32, 16, kernel_size=(3,3))
        self.pool4 = nn.MaxPool2d(2,2)
        self.relu4 = nn.ReLU()
        #self.bn4 = nn.BatchNorm2d(8)
        #self.dropout4 = nn.Dropout(0.2)

        self.lin = nn.Linear(in_features=32, out_features=16)
        self.softmax = nn.Softmax()


        self.conv = nn.Sequential(
            self.conv1,
            self.pool1,
            self.relu1,
            #self.bn1,
            #self.dropout1,

            self.conv2,
            self.pool2,
            self.relu2,
            #self.bn2,
            #self.dropout2,

            self.conv3,
            self.pool3,
            self.relu3,
            #self.bn3,
            #self.dropout3,

            self.conv4,
            self.pool4,
            self.relu4,
            #self.bn4,
            #self.dropout4,

        )


    def forward(self, input_data: torch.Tensor) -> torch.Tensor:
        ################

        # YOUR CODE HERE
        # define the logic of your computational graph

        ################
        # Run the convolutional blocks
        data = self.conv(input_data)

        data = data.view(data.shape[0], -1)
        data = self.lin(data)
        data = self.softmax(data)

        return data


In [None]:
cnn = CLModel()
from torchsummary import summary
summary(cnn, (1, 64, 44))

## Train and evaluation functions

In [None]:
def create_data_loader(data, batch_size:int, shuffle:bool=False) -> torch.utils.data.DataLoader:
    ''' creating dataloader  
            Arguments:
                batch_size: int
                    number of samples to process for the model
                shuffle: bool
                    whether to shuffle dataset. 
    '''
    dataloader = torch.utils.data.DataLoader(data, batch_size=batch_size, shuffle=shuffle)
    return dataloader

def train_single_epoch(
    model:nn.Module, 
    data_loader:torch.utils.data.DataLoader, 
    loss_fn:torch.nn.modules.loss, 
    optimiser:torch.optim,
    metrics,
    device:torch.device
):
    ''' method to perform single epoch of training '''

    rolling_loss = 0. 
    rolling_metric = 0.
    for input, target in data_loader:
        input, target = input.to(device), target.to(device)
        ################

        # YOUR CODE HERE 
        optimiser.zero_grad()
        # step 1: pass input tensor through the model
        output = cnn(input)

        # step 2: compute loss
        loss = loss_fn(output, target)

        # step 3: update optimizer and do a backward for the loss function
        loss.backward()
        optimiser.step()

        _, prediction = torch.max(output,1)
        # step 5 (final): compute train epoch loss and metrics
        rolling_loss += loss.item()
        #rolling_metric += (prediction == target ).float().sum()
        rolling_metric += metrics(prediction, target)

    ################
    print(f'Train loss: {rolling_loss} | Train metric: {rolling_metric}')


def evaluate(
    model:nn.Module,
    val_dataloader:torch.utils.data.DataLoader, 
    loss_fn:torch.nn.modules.loss, 
    metrics, # define you type here  
    device:torch.device):
    ''' method to perform evaluation step '''

    rolling_loss = 0.
    rolling_metric = 0.
    for input, target in val_dataloader: 
        input, target = input.to(device), target.to(device)
        ################

        # YOUR CODE HERE 

        # step 1: pass input tensor through the model
        output = cnn(input)
        # step 2: compute loss and metrics
        loss = loss_fn(output, target)
        _, prediction = torch.max(output,1)

        rolling_loss += loss.item()
        kek = metrics(prediction, target)
        rolling_metric += metrics(prediction, target)

        a = 2
    ################
    print(f'Validation loss: {rolling_loss} | Validation metric: {rolling_metric}')
    

def train(model, train_dataloader, val_dataloader, loss_fn, metrics, optimiser, device, epochs):
    for i in range(epochs):
        print(f"Epoch {i+1}")
        train_single_epoch(model, train_dataloader, loss_fn, optimiser, metrics, device)
        evaluate(model, val_dataloader, loss_fn, metrics, device)
    print("Finished training")



loss_fn = nn.CrossEntropyLoss()
optimiser = torch.optim.Adam(cnn.parameters(),lr=0.01)
metrics = torchmetrics.functional.accuracy

train_dataloader = create_data_loader(train_dataset, BATCH_SIZE)
val_dataloader = create_data_loader(val_dataset, BATCH_SIZE)
test_dataloader = create_data_loader(test_dataset, BATCH_SIZE)


# train model
train(cnn, train_dataloader, val_dataloader, loss_fn, metrics, optimiser, device, EPOCHS)

# save model
torch.save(cnn.state_dict(), "feedforwardnet.pth")
print("Trained feed forward net saved at feedforwardnet.pth")

In [24]:
# YOUR CODE HERE
# Define your loss function, optimization algorithm, and a metric function(s)
loss_fn = nn.CrossEntropyLoss()
optimiser = torch.optim.Adam(cnn.parameters(),lr=0.001)
metrics = torchmetrics.functional.accuracy

train_dataloader = create_data_loader(train_dataset, BATCH_SIZE)
val_dataloader = create_data_loader(val_dataset, BATCH_SIZE)
test_dataloader = create_data_loader(test_dataset, BATCH_SIZE)


# train model
train(cnn, train_dataloader, val_dataloader, loss_fn, metrics, optimiser, device, EPOCHS)

# save model
torch.save(cnn.state_dict(), "feedforwardnet.pth")
print("Trained feed forward net saved at feedforwardnet.pth")

Epoch 1


  data = self.softmax(data)


RuntimeError: The size of tensor a (10) must match the size of tensor b (16) at non-singleton dimension 1

In [None]:
################

# YOUR CODE HERE
# evaluate your model on test split

################

## Results
1) Write a litter summary of the audio transformation methods (1-2 sentences), and explore them on your own

2) Create a table with the model performance comparison. The table should include short model summary (ex. 4xConvBlocks - Flatten - Linear - Softmax), comments about hyperparameters (ex. ConvBlocks: [64, 64, 64, 64] - Dropout: 0.2, etc.) and metrics values for validation and test datasets.

3) Train baseline model with 4x ConvBlocks - Flatten - Linear - Softmax

4) Train baseline model with BatchNorm

5) Train baseline model with BatchNorm and Dropout

6) Train 5 model models with different hyperparameters

7) Add all models to a table 

8) Write an explanation to the table and indicate the best model parameters and architecture