In [None]:
import os
import torch
from torch import nn
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
import pandas as pd
import torchaudio


# --- Training hyper-parameters -------------------------------------------
BATCH_SIZE = 128       # number of samples the DataLoader groups per batch
EPOCHS = 10            # number of passes over the dataset
LEARNING_RATE = 1e-3   # step size handed to the optimiser

# --- Dataset locations (relative to the working directory) ---------------
ANNOTATIONS_FILE = "UrbanSound8K/metadata/UrbanSound8K.csv"
AUDIO_DIR = "UrbanSound8K/audio/"

# --- Audio pre-processing --------------------------------------------------
SAMPLE_RATE = 22050    # rate (Hz) every clip is resampled to
NUM_SAMPLES = 22050    # clip length in samples, i.e. one second at SAMPLE_RATE

### Urban Sound Dataset

In [None]:
class UrbanSoundDataset(Dataset):
    """Map-style dataset that turns annotated audio clips into fixed-size features.

    For every row of the annotations file the clip is loaded with
    ``torchaudio.load``, moved to ``device``, resampled to
    ``target_sample_rate``, mixed down to a single channel, cut or zero-padded
    to exactly ``num_samples`` samples and finally passed through
    ``transformation``.
    """

    # Positional columns of the annotations CSV that this class reads.
    # NOTE(review): presumably slice_file_name / fold / classID of
    # UrbanSound8K.csv -- confirm against the metadata file.
    _FILENAME_COLUMN = 0
    _FOLD_COLUMN = 5
    _LABEL_COLUMN = 6

    def __init__(self, annotations_file, audio_dir, transformation, target_sample_rate, num_samples, device):
        """Read the annotations and store the pre-processing configuration.

        Args:
            annotations_file: Path of the CSV file, read with ``pandas.read_csv``.
            audio_dir: Root directory that contains the ``fold<N>`` sub-directories.
            transformation: Module applied to the prepared signal; it is moved
                to ``device`` here.
            target_sample_rate: Sample rate every signal is resampled to.
            num_samples: Exact number of samples every signal is cut/padded to.
            device: Device on which signals and transforms are placed.
        """
        self.annotations = pd.read_csv(annotations_file)
        self.audio_dir = audio_dir
        self.device = device
        self.transformation = transformation.to(self.device)
        self.target_sample_rate = target_sample_rate
        self.num_samples = num_samples
        # One resampler per source sample rate. Constructing a Resample
        # transform is the costly part (it precomputes its kernel), so it is
        # built once and reused instead of being rebuilt for every item.
        self._resamplers = {}

    def __len__(self):
        """Return the number of annotated clips."""
        return len(self.annotations)

    def __getitem__(self, index):
        """Load clip ``index`` and return ``(transformed_signal, label)``."""
        audio_sample_path = self._get_audio_sample_path(index)
        label = self._get_audio_sample_label(index)
        signal, sr = torchaudio.load(audio_sample_path)
        signal = signal.to(self.device)
        # Order matters: bring the clip to the target rate first so that
        # ``num_samples`` always refers to samples at ``target_sample_rate``.
        signal = self._resample_if_necessary(signal, sr)
        signal = self._mono_if_necessary(signal)
        signal = self._cut_if_necessary(signal)
        signal = self._add_padding_if_necessary(signal)
        signal = self.transformation(signal)
        return signal, label

    def _cut_if_necessary(self, signal):
        """Keep only the first ``num_samples`` samples of a longer signal."""
        if signal.shape[1] > self.num_samples:
            signal = signal[:, :self.num_samples]
        return signal

    def _add_padding_if_necessary(self, signal):
        """Right-pad a shorter signal with zeros up to ``num_samples`` samples."""
        length_signal = signal.shape[1]
        if length_signal < self.num_samples:
            num_missing_samples = self.num_samples - length_signal
            # (left, right) padding of the last dimension only.
            last_dim_padding = (0, num_missing_samples)
            signal = torch.nn.functional.pad(signal, last_dim_padding)
        return signal

    def _resample_if_necessary(self, signal, sr):
        """Resample ``signal`` from ``sr`` to ``target_sample_rate`` when they differ."""
        if sr == self.target_sample_rate:
            return signal
        resampler = self._resamplers.get(sr)
        if resampler is None:
            resampler = torchaudio.transforms.Resample(sr, self.target_sample_rate).to(self.device)
            self._resamplers[sr] = resampler
        return resampler(signal)

    def _mono_if_necessary(self, signal):
        """Average all channels into one when the signal has more than one."""
        if signal.shape[0] > 1:
            signal = torch.mean(signal, dim=0, keepdim=True)
        return signal

    def _get_audio_sample_path(self, index):
        """Build ``<audio_dir>/fold<N>/<file name>`` for row ``index``."""
        fold = f"fold{self.annotations.iloc[index, self._FOLD_COLUMN]}"
        file_name = self.annotations.iloc[index, self._FILENAME_COLUMN]
        return os.path.join(self.audio_dir, fold, file_name)

    def _get_audio_sample_label(self, index):
        """Return the label stored in the annotations for row ``index``."""
        return self.annotations.iloc[index, self._LABEL_COLUMN]


### Convolutional Neural Network:

In [None]:
class CNNNetwork(nn.Module):
    """Four-block convolutional classifier that outputs raw class scores (logits).

    Architecture: 4 x (Conv2d -> ReLU -> MaxPool2d) -> Flatten -> Linear.

    ``forward`` returns unnormalised logits because ``nn.CrossEntropyLoss``
    applies log-softmax internally; feeding it already-softmaxed values
    squashes the gradients. Use ``self.softmax(logits)`` when class
    probabilities are needed (e.g. at inference time).
    """

    def __init__(self):
        super().__init__()
        # Channel count doubles in every block: 1 -> 16 -> 32 -> 64 -> 128.
        self.conv1 = self._conv_block(1, 16)
        self.conv2 = self._conv_block(16, 32)
        self.conv3 = self._conv_block(32, 64)
        self.conv4 = self._conv_block(64, 128)
        self.flatten = nn.Flatten()
        # The linear layer expects the conv stack to emit 128 maps of 5x4,
        # which is what a 64x44 input produces
        # (64x44 -> 33x23 -> 17x12 -> 9x7 -> 5x4).
        self.linear = nn.Linear(128 * 5 * 4, 10)
        # Not applied in ``forward``; kept so callers can turn logits into
        # probabilities.
        self.softmax = nn.Softmax(dim=1)

    @staticmethod
    def _conv_block(in_channels, out_channels):
        """Build one Conv2d(3x3, stride 1, padding 2) -> ReLU -> MaxPool2d(2) block."""
        return nn.Sequential(
            nn.Conv2d(
                in_channels=in_channels,
                out_channels=out_channels,
                kernel_size=3,
                stride=1,
                padding=2
            ),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2)
        )

    def forward(self, input_data):
        """Return the logits of shape (batch, 10) for a batch of 1-channel inputs."""
        x = self.conv1(input_data)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        x = self.flatten(x)
        logits = self.linear(x)
        return logits

#### Train model:

In [None]:
def train_one_epoch(model, data_loader, loss_fn, optimiser, device):
    """Run one optimisation pass over every batch yielded by ``data_loader``.

    Args:
        model: Module whose parameters are updated.
        data_loader: Iterable yielding ``(inputs, targets)`` batches.
        loss_fn: Callable computing the loss from ``(predictions, targets)``.
        optimiser: Optimiser bound to the parameters of ``model``.
        device: Device the batches are moved to before the forward pass.

    Prints the loss of the last batch, or a notice when ``data_loader``
    yielded no batch at all.
    """
    # Stays None when the loader is empty, so the report below cannot hit an
    # unbound ``loss`` variable.
    loss = None
    for inputs, targets in data_loader:
        inputs, targets = inputs.to(device), targets.to(device)

        # Forward pass and loss for this batch
        predictions = model(inputs)
        loss = loss_fn(predictions, targets)

        # Clear stale gradients, backpropagate, then update the weights
        optimiser.zero_grad()
        loss.backward()
        optimiser.step()

    if loss is None:
        print("No batches in data_loader; nothing was trained this epoch.")
        return
    print(f"Loss: {loss.item()}")

In [None]:
def train(model, data_loader, loss_fn, optimiser, device, epochs):
    """Drive the whole training schedule.

    Calls ``train_one_epoch`` once per epoch, ``epochs`` times in total,
    printing a 1-based epoch banner before each pass, a separator line after
    it, and a closing message once every epoch has run.
    """
    separator = "---------------------------"
    for epoch_number in range(1, epochs + 1):
        print(f"Epoch {epoch_number}")
        train_one_epoch(model, data_loader, loss_fn, optimiser, device)
        print(separator)
    print("Finished Training")

### RUN

In [None]:
if __name__ == "__main__":
    # Use the GPU when CUDA is available, otherwise fall back to the CPU.
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"Using: {device}")

    # Mel spectrogram transform applied to every prepared clip.
    mel_spectrogram = torchaudio.transforms.MelSpectrogram(
        sample_rate=SAMPLE_RATE,
        n_fft=1024,
        hop_length=512,
        n_mels=64
    ).to(device)

    usd = UrbanSoundDataset(ANNOTATIONS_FILE,
                            AUDIO_DIR,
                            mel_spectrogram,
                            SAMPLE_RATE,
                            NUM_SAMPLES,
                            device)
    print(f"There are {len(usd)} samples in the dataset.")
    # Load one item up front so a missing or unreadable audio file is caught
    # before training starts.
    signal, label = usd[0]

    # Wrap the dataset in a DataLoader that yields batches of BATCH_SIZE items.
    # NOTE(review): batches are drawn in annotation-file order (no shuffle);
    # consider shuffle=True for training.
    train_data_loader = DataLoader(usd, batch_size=BATCH_SIZE)

    # Construct the model on the selected device
    c_n_n = CNNNetwork().to(device)
    print(c_n_n)

    # Instantiate the loss function and the optimiser
    loss_fn = nn.CrossEntropyLoss()
    optimiser = torch.optim.Adam(c_n_n.parameters(), lr=LEARNING_RATE)

    # Train the model
    train(c_n_n, train_data_loader, loss_fn, optimiser, device, EPOCHS)

    # Save the model. The target directory is created first so that a missing
    # "trained-models/" folder cannot discard a finished training run.
    model_path = "trained-models/feedforwardnet.pth"
    os.makedirs(os.path.dirname(model_path), exist_ok=True)
    torch.save(c_n_n.state_dict(), model_path)
    print("Model trained and stored in feedforwardnet.pth")