<a href="https://colab.research.google.com/github/asosawelford/Urban-Sound-Classification-with-PyTorch/blob/main/PyTorch_Urban_Sound_Classificator.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Creating a custom PyTorch Dataset

In [10]:
import os

from torch.utils.data import Dataset
import pandas as pd
import torchaudio
import torch


class UrbanSoundDataset(Dataset):
    """Dataset of fixed-length audio clips described by an annotations CSV.

    Each item is loaded from disk, moved to ``device``, resampled to
    ``target_sample_rate``, mixed down to a single channel, cut or
    zero-padded to exactly ``num_samples`` samples and finally passed
    through ``transformation`` (when one is given).

    The annotations table is addressed by position: column 0 holds the
    audio file name, column 5 the fold number and column 6 the label.
    """

    def __init__(self,
                 annotations_file,
                 audio_dir,
                 transformation,
                 target_sample_rate,
                 num_samples,
                 device):
        """Read the annotations and store the pre-processing settings.

        Args:
            annotations_file: Path (or file-like object) readable by
                ``pandas.read_csv``.
            audio_dir: Directory containing the ``fold<N>`` sub-directories.
            transformation: Module applied to the prepared signal, or
                ``None`` to return the prepared waveform unchanged.
            target_sample_rate: Sample rate every clip is resampled to.
            num_samples: Number of samples every clip is cut/padded to.
            device: Device the signal and the transformation are moved to.
        """
        self.annotations = pd.read_csv(annotations_file)
        self.audio_dir = audio_dir
        # The device has to be stored *before* it is used to place the
        # transformation; reading self.device first raised AttributeError.
        self.device = device
        self.transformation = (
            transformation.to(self.device)
            if transformation is not None else None
        )
        self.target_sample_rate = target_sample_rate
        self.num_samples = num_samples

    def __len__(self):
        """Return the number of rows in the annotations table."""
        return len(self.annotations)

    def __getitem__(self, index):
        """Return ``(signal, label)`` for the annotation row at ``index``."""
        audio_sample_path = self._get_audio_sample_path(index)
        label = self._get_audio_sample_label(index)
        signal, sr = torchaudio.load(audio_sample_path)
        signal = signal.to(self.device)
        # Resample and mix down so that sample rate and channel count are
        # consistent across the whole dataset.
        signal = self._resample_if_necessary(signal, sr)
        signal = self._mix_down_if_necessary(signal)
        # Cut, then pad, so every signal has exactly num_samples samples.
        signal = self._cut_if_necessary(signal)
        signal = self._right_pad_if_necessary(signal)
        if self.transformation is not None:
            signal = self.transformation(signal)
        return signal, label

    def _cut_if_necessary(self, signal):
        """Truncate ``signal`` along dim 1 to at most ``num_samples``."""
        if signal.shape[1] > self.num_samples:
            signal = signal[:, :self.num_samples]
        return signal

    def _right_pad_if_necessary(self, signal):
        """Zero-pad ``signal`` on the right of its last dim to ``num_samples``."""
        length_signal = signal.shape[1]
        if length_signal < self.num_samples:
            num_missing_samples = self.num_samples - length_signal
            # (left, right) padding of the last dimension only.
            last_dim_padding = (0, num_missing_samples)
            signal = torch.nn.functional.pad(signal, last_dim_padding)
        return signal

    def _resample_if_necessary(self, signal, sr):
        """Resample ``signal`` from ``sr`` to ``target_sample_rate`` if they differ."""
        if sr != self.target_sample_rate:
            resampler = torchaudio.transforms.Resample(
                sr, self.target_sample_rate)
            # Keep the resampler on the same device as the signal it is
            # applied to; the signal has already been moved in __getitem__.
            resampler = resampler.to(signal.device)
            signal = resampler(signal)
        return signal

    def _mix_down_if_necessary(self, signal):
        """Average over dim 0 when ``signal`` has more than one channel."""
        if signal.shape[0] > 1:
            signal = torch.mean(signal, dim=0, keepdim=True)
        return signal

    def _get_audio_sample_path(self, index):
        """Build ``<audio_dir>/fold<col 5>/<col 0>`` for row ``index``."""
        fold = f"fold{self.annotations.iloc[index, 5]}"
        path = os.path.join(self.audio_dir, fold, self.annotations.iloc[
            index, 0])
        return path

    def _get_audio_sample_label(self, index):
        """Return the label stored in column 6 of row ``index``."""
        return self.annotations.iloc[index, 6]

In [11]:
# Mount Google Drive at /content/gdrive so the paths used below resolve.
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [12]:
# Locations of the UrbanSound8K annotations CSV and audio directory on the
# mounted drive.
ANNOTATIONS_FILE = "/content/gdrive/MyDrive/Colab Notebooks/PyTorch for Audio/UrbanSound8K/metadata/UrbanSound8K.csv"
AUDIO_DIR = "/content/gdrive/MyDrive/Colab Notebooks/PyTorch for Audio/UrbanSound8K/audio"

# Extracting Mel Spectrograms with Pytorch and Torchaudio
We want to work with Mel spectrograms. We will extract them with torchaudio, using its [transforms](https://pytorch.org/audio/stable/transforms.html) module.

In [13]:
# Sample rate handed to the Mel-spectrogram transform.
SAMPLE_RATE = 22050
# Mel-spectrogram transform configured with a 1024-point FFT, a hop of
# 512 samples between frames and 64 Mel bands.
mel_spectrogram = torchaudio.transforms.MelSpectrogram(
    sample_rate = SAMPLE_RATE,
    n_fft = 1024,
    hop_length = 512,
    n_mels = 64
)

# Pre-processing Audio with Different Durations
We want fixed-shape training data. Depending on the desired NUM_SAMPLES value, we will either cut down the duration of some samples or zero-pad others so that every sample ends up with the same length.

In [14]:
NUM_SAMPLES = 22050  # one second of audio at a 22050 Hz sample rate

TypeError: ignored

# Pre-processing Audio on GPU
We re-define the UrbanSoundDataset and implement GPU (CUDA) compatibility.

In [15]:
import os

import torch
from torch.utils.data import Dataset
import pandas as pd
import torchaudio


class UrbanSoundDataset(Dataset):
    """Dataset of fixed-length audio clips pre-processed on a chosen device.

    Each item is loaded from disk, moved to ``device``, resampled to
    ``target_sample_rate``, mixed down to a single channel, cut or
    zero-padded to exactly ``num_samples`` samples and finally passed
    through ``transformation`` (when one is given).

    The annotations table is addressed by position: column 0 holds the
    audio file name, column 5 the fold number and column 6 the label.
    """

    def __init__(self,
                 annotations_file,
                 audio_dir,
                 transformation,
                 target_sample_rate,
                 num_samples,
                 device):
        """Read the annotations and store the pre-processing settings.

        Args:
            annotations_file: Path (or file-like object) readable by
                ``pandas.read_csv``.
            audio_dir: Directory containing the ``fold<N>`` sub-directories.
            transformation: Module applied to the prepared signal, or
                ``None`` to return the prepared waveform unchanged.
            target_sample_rate: Sample rate every clip is resampled to.
            num_samples: Number of samples every clip is cut/padded to.
            device: Device the signal and the transformation are moved to.
        """
        self.annotations = pd.read_csv(annotations_file)
        self.audio_dir = audio_dir
        self.device = device
        self.transformation = (
            transformation.to(self.device)
            if transformation is not None else None
        )
        self.target_sample_rate = target_sample_rate
        self.num_samples = num_samples

    def __len__(self):
        """Return the number of rows in the annotations table."""
        return len(self.annotations)

    def __getitem__(self, index):
        """Return ``(signal, label)`` for the annotation row at ``index``."""
        audio_sample_path = self._get_audio_sample_path(index)
        label = self._get_audio_sample_label(index)
        signal, sr = torchaudio.load(audio_sample_path)
        signal = signal.to(self.device)
        # Resample and mix down so that sample rate and channel count are
        # consistent across the whole dataset.
        signal = self._resample_if_necessary(signal, sr)
        signal = self._mix_down_if_necessary(signal)
        # Cut, then pad, so every signal has exactly num_samples samples.
        signal = self._cut_if_necessary(signal)
        signal = self._right_pad_if_necessary(signal)
        if self.transformation is not None:
            signal = self.transformation(signal)
        return signal, label

    def _cut_if_necessary(self, signal):
        """Truncate ``signal`` along dim 1 to at most ``num_samples``."""
        if signal.shape[1] > self.num_samples:
            signal = signal[:, :self.num_samples]
        return signal

    def _right_pad_if_necessary(self, signal):
        """Zero-pad ``signal`` on the right of its last dim to ``num_samples``."""
        length_signal = signal.shape[1]
        if length_signal < self.num_samples:
            num_missing_samples = self.num_samples - length_signal
            # (left, right) padding of the last dimension only.
            last_dim_padding = (0, num_missing_samples)
            signal = torch.nn.functional.pad(signal, last_dim_padding)
        return signal

    def _resample_if_necessary(self, signal, sr):
        """Resample ``signal`` from ``sr`` to ``target_sample_rate`` if they differ."""
        if sr != self.target_sample_rate:
            resampler = torchaudio.transforms.Resample(
                sr, self.target_sample_rate)
            # Follow the signal's device instead of hard-coding CUDA, so the
            # dataset also works when it was constructed with device="cpu".
            resampler = resampler.to(signal.device)
            signal = resampler(signal)
        return signal

    def _mix_down_if_necessary(self, signal):
        """Average over dim 0 when ``signal`` has more than one channel."""
        if signal.shape[0] > 1:
            signal = torch.mean(signal, dim=0, keepdim=True)
        return signal

    def _get_audio_sample_path(self, index):
        """Build ``<audio_dir>/fold<col 5>/<col 0>`` for row ``index``."""
        fold = f"fold{self.annotations.iloc[index, 5]}"
        path = os.path.join(self.audio_dir, fold, self.annotations.iloc[
            index, 0])
        return path

    def _get_audio_sample_label(self, index):
        """Return the label stored in column 6 of row ``index``."""
        return self.annotations.iloc[index, 6]


# Locations of the UrbanSound8K annotations CSV and audio directory on the
# mounted drive, plus the target sample rate and clip length in samples.
ANNOTATIONS_FILE = "/content/gdrive/MyDrive/Colab Notebooks/PyTorch for Audio/UrbanSound8K/metadata/UrbanSound8K.csv"
AUDIO_DIR = "/content/gdrive/MyDrive/Colab Notebooks/PyTorch for Audio/UrbanSound8K/audio"
SAMPLE_RATE = 22050
NUM_SAMPLES = 22050

# Prefer the GPU when one is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device {device}")

mel_spectrogram = torchaudio.transforms.MelSpectrogram(
    sample_rate=SAMPLE_RATE, n_fft=1024, hop_length=512, n_mels=64
)

usd = UrbanSoundDataset(
    annotations_file=ANNOTATIONS_FILE,
    audio_dir=AUDIO_DIR,
    transformation=mel_spectrogram,
    target_sample_rate=SAMPLE_RATE,
    num_samples=NUM_SAMPLES,
    device=device,
)
print(f"There are {len(usd)} samples in the dataset.")

# Fetch the first item to check the shape produced by the pipeline.
signal, label = usd[0]
print(f"Random signal dim example: {signal.size()}")

Using device cuda
There are 8732 samples in the dataset.
Random signal dim example: torch.Size([1, 64, 44])


# CNN implementation for Sound Classification
The next code block implements the CNNNetwork class, which comprises 4 convolutional blocks, a flatten layer, a linear layer, and a softmax activation for the output. There are 10 possible sound classes to predict.

In [16]:
from torch import nn
from torchsummary import summary


class CNNNetwork(nn.Module):
    """CNN classifier: four conv blocks, flatten, linear layer, softmax.

    The channel count grows 1 -> 16 -> 32 -> 64 -> 128 through the conv
    blocks, and the linear layer maps the flattened features to 10 outputs.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = self._conv_block(1, 16)
        self.conv2 = self._conv_block(16, 32)
        self.conv3 = self._conv_block(32, 64)
        self.conv4 = self._conv_block(64, 128)
        self.flatten = nn.Flatten()
        # In PyTorch a dense layer is called "linear". Its input size is
        # fixed at 128 * 5 * 4, so the spatial size left after the four
        # blocks must be 5 x 4.
        self.linear = nn.Linear(128 * 5 * 4, 10)
        self.softmax = nn.Softmax(dim=1)

    @staticmethod
    def _conv_block(in_channels, out_channels):
        """Return a 3x3 convolution -> ReLU -> 2x2 max-pool block."""
        return nn.Sequential(
            nn.Conv2d(
                in_channels=in_channels,
                out_channels=out_channels,
                kernel_size=3,
                stride=1,
                padding=2,
            ),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
        )

    def forward(self, input_data):
        """Return softmax probabilities over the 10 outputs for ``input_data``."""
        features = input_data
        for block in (self.conv1, self.conv2, self.conv3, self.conv4):
            features = block(features)
        logits = self.linear(self.flatten(features))
        # NOTE(review): the output is already softmax-normalised. If a loss
        # that applies its own log-softmax (such as nn.CrossEntropyLoss) is
        # used on it, normalisation happens twice -- confirm this is intended.
        return self.softmax(logits)


if __name__ == "__main__":
    # Print a layer-by-layer summary for a (1, 64, 44) input. The device is
    # picked at runtime so the summary also works without a GPU; torchsummary
    # needs the same device name to build its dummy input.
    summary_device = "cuda" if torch.cuda.is_available() else "cpu"
    cnn = CNNNetwork()
    summary(cnn.to(summary_device), (1, 64, 44), device=summary_device)

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 16, 66, 46]             160
              ReLU-2           [-1, 16, 66, 46]               0
         MaxPool2d-3           [-1, 16, 33, 23]               0
            Conv2d-4           [-1, 32, 35, 25]           4,640
              ReLU-5           [-1, 32, 35, 25]               0
         MaxPool2d-6           [-1, 32, 17, 12]               0
            Conv2d-7           [-1, 64, 19, 14]          18,496
              ReLU-8           [-1, 64, 19, 14]               0
         MaxPool2d-9             [-1, 64, 9, 7]               0
           Conv2d-10           [-1, 128, 11, 9]          73,856
             ReLU-11           [-1, 128, 11, 9]               0
        MaxPool2d-12            [-1, 128, 5, 4]               0
          Flatten-13                 [-1, 2560]               0
           Linear-14                   

# Training a Sound Classifier


In [17]:
import torch
import torchaudio
from torch import nn
from torch.utils.data import DataLoader


# Training hyper-parameters: samples per batch, number of passes over the
# data, and the optimiser's learning rate.
BATCH_SIZE = 128
EPOCHS = 10
LEARNING_RATE = 0.001

# Dataset locations on the mounted drive, plus the target sample rate and
# the clip length in samples passed to UrbanSoundDataset.
ANNOTATIONS_FILE = "/content/gdrive/MyDrive/Colab Notebooks/PyTorch for Audio/UrbanSound8K/metadata/UrbanSound8K.csv"
AUDIO_DIR = "/content/gdrive/MyDrive/Colab Notebooks/PyTorch for Audio/UrbanSound8K/audio"
SAMPLE_RATE = 22050
NUM_SAMPLES = 22050


def create_data_loader(train_data, batch_size, shuffle=False):
    """Wrap ``train_data`` in a ``DataLoader``.

    Args:
        train_data: Dataset to draw samples from.
        batch_size: Number of samples per batch.
        shuffle: Whether to reshuffle the data at every epoch. Defaults to
            ``False``, which keeps the dataset's own ordering.

    Returns:
        A ``DataLoader`` over ``train_data``.
    """
    return DataLoader(train_data, batch_size=batch_size, shuffle=shuffle)


def train_single_epoch(model, data_loader, loss_fn, optimiser, device):
    """Run one optimisation pass over ``data_loader`` and print the last loss.

    Args:
        model: Callable mapping a batch of inputs to predictions.
        data_loader: Iterable yielding ``(inputs, targets)`` batches.
        loss_fn: Callable ``loss_fn(prediction, targets)`` returning the loss.
        optimiser: Optimiser updating the model's parameters.
        device: Device each batch is moved to before the forward pass.
    """
    loss = None
    for inputs, targets in data_loader:
        inputs, targets = inputs.to(device), targets.to(device)

        # calculate loss
        prediction = model(inputs)
        loss = loss_fn(prediction, targets)

        # backpropagate error and update weights
        optimiser.zero_grad()
        loss.backward()
        optimiser.step()

    # An empty loader never assigns a loss; report that instead of failing
    # on an unbound variable.
    if loss is None:
        print("loss: n/a (data loader yielded no batches)")
        return
    # Only the loss of the final batch is reported.
    print(f"loss: {loss.item()}")


def train(model, data_loader, loss_fn, optimiser, device, epochs):
    """Train ``model`` for ``epochs`` passes, printing progress per epoch.

    Each pass is delegated to ``train_single_epoch`` with the same model,
    data loader, loss function, optimiser and device.
    """
    for epoch_number in range(1, epochs + 1):
        print(f"Epoch {epoch_number}")
        train_single_epoch(model, data_loader, loss_fn, optimiser, device)
        print("---------------------------")
    print("Finished training")


if __name__ == "__main__":
    # Prefer the GPU when one is available, otherwise fall back to the CPU.
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"Using {device}")

    # Build the Mel-spectrogram transform, the dataset and its data loader.
    mel_spectrogram = torchaudio.transforms.MelSpectrogram(
        sample_rate=SAMPLE_RATE, n_fft=1024, hop_length=512, n_mels=64
    )
    usd = UrbanSoundDataset(
        annotations_file=ANNOTATIONS_FILE,
        audio_dir=AUDIO_DIR,
        transformation=mel_spectrogram,
        target_sample_rate=SAMPLE_RATE,
        num_samples=NUM_SAMPLES,
        device=device,
    )
    train_dataloader = create_data_loader(usd, BATCH_SIZE)

    # Construct the model on the selected device and show its layers.
    cnn = CNNNetwork().to(device)
    print(cnn)

    # Loss function and optimiser.
    loss_fn = nn.CrossEntropyLoss()
    optimiser = torch.optim.Adam(cnn.parameters(), lr=LEARNING_RATE)

    # Run the training loop.
    train(cnn, train_dataloader, loss_fn, optimiser, device, EPOCHS)

    # Persist the learned weights.
    torch.save(cnn.state_dict(), "feedforwardnet.pth")
    print("Trained feed forward net saved at feedforwardnet.pth")

KeyboardInterrupt: ignored