# <center>UrbanSound8K</center>

<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Libraries-Import" data-toc-modified-id="Libraries-Import-1">Libraries Import</a></span></li><li><span><a href="#Creation-of-the-Dataset-Class" data-toc-modified-id="Creation-of-the-Dataset-Class-2">Creation of the Dataset Class</a></span></li><li><span><a href="#Instantiation-of-the-Dataset" data-toc-modified-id="Instantiation-of-the-Dataset-3">Instantiation of the Dataset</a></span></li><li><span><a href="#Dataset-Exploration" data-toc-modified-id="Dataset-Exploration-4">Dataset Exploration</a></span><ul class="toc-item"><li><span><a href="#Classes-Counts" data-toc-modified-id="Classes-Counts-4.1">Classes Counts</a></span></li><li><span><a href="#Duration-of-Events" data-toc-modified-id="Duration-of-Events-4.2">Duration of Events</a></span></li><li><span><a href="#Salience" data-toc-modified-id="Salience-4.3">Salience</a></span><ul class="toc-item"><li><span><a href="#Global" data-toc-modified-id="Global-4.3.1">Global</a></span></li><li><span><a href="#Per-Class" data-toc-modified-id="Per-Class-4.3.2">Per Class</a></span></li></ul></li><li><span><a href="#Folds-Distribution" data-toc-modified-id="Folds-Distribution-4.4">Folds Distribution</a></span></li></ul></li><li><span><a href="#Creation-of-the-Model" data-toc-modified-id="Creation-of-the-Model-5">Creation of the Model</a></span></li><li><span><a href="#Creation-of-the-Train-Function" data-toc-modified-id="Creation-of-the-Train-Function-6">Creation of the Train Function</a></span></li><li><span><a href="#Creation-of-the-Validation-Function" data-toc-modified-id="Creation-of-the-Validation-Function-7">Creation of the Validation Function</a></span></li><li><span><a href="#Creation-of-the-Test-Function" data-toc-modified-id="Creation-of-the-Test-Function-8">Creation of the Test Function</a></span></li><li><span><a href="#Model-Training-and-Validation" data-toc-modified-id="Model-Training-and-Validation-9">Model Training and Validation</a></span></li><li><span><a href="#Visualisation-of-the-Confusion-Matrix" data-toc-modified-id="Visualisation-of-the-Confusion-Matrix-10">Visualisation of the Confusion Matrix</a></span></li><li><span><a href="#Model-Test" data-toc-modified-id="Model-Test-11">Model Test</a></span></li></ul></div>

## Libraries Import

In [68]:
import itertools
import math
import os
from random import randint

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
import torch.nn as nn
import torch.optim as optim
import torchaudio
import torchaudio.transforms as transforms
from torch.utils.data import Dataset
from torch.utils.data import DataLoader, SubsetRandomSampler
sns.set_style("darkgrid")

## Creation of the Dataset Class

In [3]:
class UrbanSound8K(Dataset):
    """Audio-event dataset that yields dB-scaled mel spectrograms.

    Each item is one annotated event: it is sliced out of its audio file,
    mixed down to mono, resampled to ``target_sample_rate``, cut or
    right-padded with zeros to a fixed number of samples, and finally turned
    into a power mel spectrogram expressed in decibels.

    Args:
        annotations: Table read positionally (``.iloc``) that provides at least
            the columns ``"audio"``, ``"start"``, ``"end"`` and ``"labelID"``.
            ``"start"``/``"end"`` are multiplied by the file sample rate to
            obtain sample indices.
        dataset_path: Directory the ``"audio"`` paths are relative to.
        transforms_params: Mapping with the keys ``"target_sample_rate"``,
            ``"target_event_length"``, ``"n_fft"``, ``"hop_length"``,
            ``"f_max"`` and ``"n_mels"``.
        device: Device on which signals are processed and returned.
    """

    def __init__(self, annotations, dataset_path, transforms_params, device):
        self.device = device
        self.annotations = annotations
        self.dataset_path = dataset_path
        self.target_sample_rate = transforms_params["target_sample_rate"]
        self.target_event_length = transforms_params["target_event_length"]
        # Fixed length (in samples) of every returned signal. Cast to int so a
        # fractional event length still yields a valid slice/padding size.
        self.num_samples = int(round(self.target_event_length * self.target_sample_rate))
        self.n_fft = transforms_params["n_fft"]
        self.hop_length = transforms_params["hop_length"]
        self.f_max = transforms_params["f_max"]
        self.n_mels = transforms_params["n_mels"]
        # Transforms are created on first use and then reused for every item
        self._resamplers = {}
        self._mel_transform = None
        self._amplitude_to_db = None

    def __len__(self):
        """Return the number of annotated events."""
        return len(self.annotations)

    def __getitem__(self, index):
        """Return ``(spectrogram, label)`` for the event at position ``index``."""
        label = torch.tensor(self._get_event_label(index), dtype=torch.long)
        signal, sr = self._get_event_signal(index)
        signal = signal.to(self.device)
        signal = self._mix_down_if_necessary(signal)
        signal = self._resample_if_necessary(signal, sr)
        signal = self._cut_if_necessary(signal)
        signal = self._right_pad_if_necessary(signal)
        signal = self._spectrogram_transform(signal)
        signal = self._db_transform(signal)
        return signal, label

    def _get_event_label(self, index):
        """Return the ``"labelID"`` value of the event at position ``index``."""
        return self.annotations.iloc[index]["labelID"]

    def _get_event_signal(self, index):
        """Load the audio file of the event and return ``(event_signal, sr)``."""
        event = self.annotations.iloc[index]
        audio_path = os.path.join(self.dataset_path, event["audio"])
        audio_signal, sr = torchaudio.load(audio_path)
        # Convert the event boundaries to sample indices, clamped so that a
        # boundary at (or near) zero can never become a negative slice bound.
        start_index = max(0, math.floor(event["start"] * sr) - 1)
        end_index = max(start_index, math.ceil(event["end"] * sr) - 1)
        event_signal = audio_signal[:, start_index:end_index]
        return event_signal, sr

    def _mix_down_if_necessary(self, signal):
        """Average the channels into a single one when there are several."""
        if signal.shape[0] > 1:
            signal = torch.mean(signal, dim=0, keepdim=True)
        return signal

    def _resample_if_necessary(self, signal, sr):
        """Resample ``signal`` from ``sr`` to the target sample rate if they differ."""
        if sr != self.target_sample_rate:
            resample_transform = self._resamplers.get(sr)
            if resample_transform is None:
                # One resampler per source rate, built once and kept
                resample_transform = torchaudio.transforms.Resample(sr, self.target_sample_rate)
                resample_transform = resample_transform.to(self.device)
                self._resamplers[sr] = resample_transform
            signal = resample_transform(signal)
        return signal

    def _cut_if_necessary(self, signal):
        """Keep only the first ``num_samples`` samples of a longer signal."""
        if signal.shape[1] > self.num_samples:
            signal = signal[:, :self.num_samples]
        return signal

    def _right_pad_if_necessary(self, signal):
        """Zero-pad a shorter signal on the right up to ``num_samples`` samples."""
        length_signal = signal.shape[1]
        if length_signal < self.num_samples:
            num_missing_samples = self.num_samples - length_signal
            # (left, right) padding of the last dimension
            last_dim_padding = (0, num_missing_samples)
            signal = nn.functional.pad(signal, last_dim_padding)
        return signal

    def _spectrogram_transform(self, signal):
        """Convert the waveform into a power mel spectrogram."""
        if self._mel_transform is None:
            mel_spectrogram_transform = torchaudio.transforms.MelSpectrogram(
                sample_rate=self.target_sample_rate,
                n_fft=self.n_fft,
                hop_length=self.hop_length,
                f_max=self.f_max,
                n_mels=self.n_mels,
                power=2
            )
            self._mel_transform = mel_spectrogram_transform.to(self.device)
        return self._mel_transform(signal)

    def _db_transform(self, signal):
        """Convert a power spectrogram to the decibel scale."""
        if self._amplitude_to_db is None:
            db_transform = torchaudio.transforms.AmplitudeToDB(stype='power')
            self._amplitude_to_db = db_transform.to(self.device)
        return self._amplitude_to_db(signal)
        

## Instantiation of the Dataset

In [None]:
# Build the dataset that serves the pre-processed events.
# NOTE(review): `annotations`, `dataset_path`, `transforms_params` and `device`
# are not defined in the visible cells - confirm a configuration cell sets them.
dataset = UrbanSound8K(annotations, dataset_path, transforms_params, device)

## Dataset Exploration

### Classes Counts

In [None]:
# Number of annotated events for each value of the "class" column
class_vc = dataset.annotations["class"].value_counts()

fig, ax = plt.subplots(figsize=(18, 8))
sns.barplot(x=class_vc.index, y=class_vc.values, ax=ax)
ax.set_title("Classes Counts", fontsize=20)
ax.set_xlabel("Classes", fontsize=14)
ax.set_ylabel("Counts", fontsize=14)
plt.show()

### Duration of Events

In [None]:
# Length of every event, in the unit of the "start"/"end" annotation columns
duration = dataset.annotations["end"] - dataset.annotations["start"]
plt.figure(figsize=(18,8))
# The values are passed directly as `x`; no separate `data` argument is needed
sns.histplot(x=duration.values, bins=20)
plt.title("Duration of Events", fontsize=20)
plt.xlabel("Duration", fontsize=14)
plt.ylabel("Counts", fontsize=14)
# Render the figure explicitly, like the other exploration cells, instead of
# leaving the repr of the last `plt.ylabel` call as the cell output
plt.show()

### Salience

#### Global

In [None]:
# Number of annotated events for each value of the "salience" column
salience_vc = dataset.annotations["salience"].value_counts()

fig, ax = plt.subplots(figsize=(14, 6))
sns.barplot(x=salience_vc.index, y=salience_vc.values, ax=ax)
ax.set_title("Salience Counts", fontsize=20)
ax.set_xlabel("Salience", fontsize=14)
ax.set_ylabel("Counts", fontsize=14)
plt.show()

#### Per Class

### Folds Distribution

## Creation of the Model

In [None]:
class ConvNet(nn.Module):
    """Four-block convolutional classifier for single-channel 2-D inputs.

    Every block is ``Conv2d(kernel 3, stride 1, padding 2) -> ReLU ->
    MaxPool2d(2)``, so each spatial dimension ``s`` becomes ``(s + 2) // 2``
    per block. The output of the fourth block (128 channels) is flattened and
    fed to one linear layer that produces the class logits.

    Args:
        num_classes: Number of output logits.
        flattened_size: Number of features entering the linear layer, i.e.
            ``128 * H * W`` where ``H x W`` is the spatial size after the
            fourth block. The default ``128 * 5 * 12`` matches, for example,
            an input of 64 x 173; pass another value for other input sizes.
    """

    def __init__(self, num_classes, flattened_size=128 * 5 * 12):
        super().__init__()

        # Number of classes
        self.num_classes = num_classes

        # Attribute names and layer order are kept so that saved
        # state dicts ("conv1.0.weight", ..., "linear.bias") stay loadable
        self.conv1 = self._conv_block(1, 16)
        self.conv2 = self._conv_block(16, 32)
        self.conv3 = self._conv_block(32, 64)
        self.conv4 = self._conv_block(64, 128)

        self.flatten = nn.Flatten()
        self.linear = nn.Linear(flattened_size, self.num_classes)

    @staticmethod
    def _conv_block(in_channels, out_channels):
        """Build one ``Conv2d -> ReLU -> MaxPool2d`` block."""
        return nn.Sequential(
            nn.Conv2d(
                in_channels=in_channels,
                out_channels=out_channels,
                kernel_size=3,
                stride=1,
                padding=2
            ),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2)
        )

    def forward(self, input_data):
        """Return the class logits, one row per sample of ``input_data``."""
        x = self.conv1(input_data)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        x = self.flatten(x)
        logits = self.linear(x)
        return logits

## Creation of the Train Function

In [None]:
def train(model, data_loader, loss_fn, optimiser, num_epochs, step_size, device):
    """Train ``model`` in place and print progress every ``step_size`` batches.

    Args:
        model: Network to optimise; it is switched to training mode.
        data_loader: Sized iterable of ``(inputs, targets)`` batches.
        loss_fn: Callable computing the loss from ``(outputs, targets)``.
        optimiser: Optimiser bound to the parameters of ``model``.
        num_epochs: Number of passes over ``data_loader``.
        step_size: Number of batches between two progress reports; a value
            of 0 or less disables the reports.
        device: Device the batches are moved to before the forward pass.

    Returns:
        None. The reported loss and accuracy are averages over the batches
        seen since the previous report of the same epoch.
    """
    # Get the number of batches
    n_batches = len(data_loader)

    # Make sure mode-dependent layers (dropout, batch-norm, ...) are in training mode
    model.train()

    # For each epoch
    for epoch in range(num_epochs):

        # Running statistics restart with every epoch so that a partial
        # window at the end of one epoch does not leak into the next one
        running_loss = 0.0
        running_correct = 0
        running_samples = 0
        running_batches = 0

        # For each batch
        for batch_idx, (inputs, targets) in enumerate(data_loader):

            # Send the inputs and targets to the device
            inputs = inputs.to(device)
            targets = targets.to(device)

            # Forward pass
            outputs = model(inputs)
            loss = loss_fn(outputs, targets)

            # Backward and optimize
            optimiser.zero_grad()
            loss.backward()
            optimiser.step()

            # Accumulate the statistics of the current reporting window
            running_loss += loss.item()
            predicted = outputs.detach().argmax(dim=1)
            running_correct += (predicted == targets).sum().item()
            running_samples += targets.size(0)
            running_batches += 1

            if step_size > 0 and (batch_idx + 1) % step_size == 0:
                mean_loss = running_loss / running_batches
                accuracy = 100.0 * running_correct / max(running_samples, 1)
                print(
                    f"Epoch {epoch+1}/{num_epochs}, Batch {batch_idx+1}/{n_batches}, "
                    f"Loss: {mean_loss:.4f}, Accuracy: {accuracy:.2f}%"
                )
                # Reset the running statistics for the next window
                running_loss = 0.0
                running_correct = 0
                running_samples = 0
                running_batches = 0

    print("Traning is done.")

## Creation of the Validation Function

In [None]:
def validate(model, data_loader, classes_map, device=None):
    """Evaluate ``model`` and return its accuracy and confusion matrix.

    Args:
        model: Network to evaluate. It is put in evaluation mode for the
            duration of the call and its previous mode is restored afterwards.
        data_loader: Iterable of ``(inputs, targets)`` batches.
        classes_map: Mapping from class ID (``0 .. num_classes - 1``) to class name.
        device: Device the batches are moved to. Defaults to the device of the
            model parameters (CPU if the model has none).

    Returns:
        Tuple ``(acc, cm)``: ``acc`` is the global accuracy in percent rounded
        to two decimals (``0.0`` when there are no samples) and ``cm`` is an
        int32 tensor of shape ``(num_classes, num_classes)`` where
        ``cm[t, p]`` counts the samples of true class ``t`` predicted as ``p``.
    """
    # Get the number of classes
    num_classes = len(classes_map)

    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    # Collect the targets and predictions of every batch
    batch_targets = []
    batch_predictions = []

    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            # For each batch in the validation dataloader
            for inputs, targets in data_loader:
                # Send the inputs and targets to the device
                inputs = inputs.to(device)
                targets = targets.to(device)

                # Pass the inputs through the model and keep the best class
                predictions = model(inputs).argmax(dim=1)

                batch_targets.append(targets)
                batch_predictions.append(predictions)
    finally:
        # Leave the model in the mode the caller had put it in
        model.train(was_training)

    if batch_targets:
        all_targets = torch.cat(batch_targets, dim=0).long()
        all_predictions = torch.cat(batch_predictions, dim=0).long()
    else:
        all_targets = torch.empty(0, dtype=torch.long, device=device)
        all_predictions = torch.empty(0, dtype=torch.long, device=device)

    # Compute the confusion matrix in one pass: every (target, prediction)
    # pair is encoded as a single bin index and the bins are counted
    pair_index = all_targets * num_classes + all_predictions
    cm = torch.bincount(pair_index, minlength=num_classes * num_classes)
    cm = cm.reshape(num_classes, num_classes).to(torch.int32)

    # Calculate the global prediction accuracy
    n_samples = all_targets.numel()
    n_correct = int(cm.diagonal().sum().item())
    acc = round(100.0 * n_correct / n_samples, 2) if n_samples else 0.0
    print(f"Accuracy of the model: {acc}%")

    # Calculate and report the prediction accuracy of each class
    n_correct_per_class = cm.diagonal().tolist()
    n_samples_per_class = cm.sum(dim=1).tolist()
    for class_id in range(num_classes):
        if n_samples_per_class[class_id] == 0:
            class_acc = 0.0
        else:
            class_acc = 100.0 * n_correct_per_class[class_id] / n_samples_per_class[class_id]
        print(f"Accuracy of class '{classes_map[class_id]}': {class_acc:.2f}%")

    return acc, cm

## Creation of the Test Function

In [None]:
def predict(model, input_data):
    """Return the predicted class index for the first sample of ``input_data``.

    The model is switched to evaluation mode and run without gradient
    tracking. Only row 0 of the model output is considered, so any further
    samples in the batch are ignored.
    """
    model.eval()
    with torch.no_grad():
        first_sample_scores = model(input_data)[0]
        best_class = torch.argmax(first_sample_scores, dim=0)
    return best_class

## Model Training and Validation

In [None]:
# Create a dictionary that matches each label ID with its label
classes_map = dict(enumerate(dataset.annotations["label"].cat.categories))
num_classes = len(classes_map)

# Print the model summary (input shapes and parameters for each layer)
# NOTE(review): `summary`, `batch_size`, `n_mels`, `n_frames`, `learning_rate`,
# `n_folds`, `num_epochs` and `step_size` are not defined in the visible cells -
# confirm that an earlier cell imports/defines them.
print(summary(ConvNet(num_classes=num_classes).to(device), torch.zeros((batch_size, 1, n_mels, n_frames)).to(device), show_input=True))

# Initialise the loss function
loss_fn = nn.CrossEntropyLoss()

# Initialise a dictionary to store the validation metrics of each fold
metrics = {f"Validation on fold {x+1}":{"acc":None, "cm":None} for x in range(n_folds)}

# Make sure the output directory exists before the first checkpoint is saved
os.makedirs("Models", exist_ok=True)

for i in range(1,n_folds+1):
    # Split by position rather than by index label: the dataset reads its
    # annotations with `.iloc`, so the samplers must yield row positions
    is_validation = (dataset.annotations["fold"] == i).to_numpy()
    train_indices = (~is_validation).nonzero()[0].tolist()
    validation_indices = is_validation.nonzero()[0].tolist()
    train_sampler = SubsetRandomSampler(train_indices)
    validation_sampler = SubsetRandomSampler(validation_indices)

    # Create the train and validation dataloaders
    train_dataloader = DataLoader(
                            dataset,
                            batch_size=batch_size,
                            sampler=train_sampler
                            )
    validation_dataloader = DataLoader(
                            dataset,
                            batch_size=batch_size,
                            sampler=validation_sampler
                            )

    # Start every fold from a fresh model and optimiser. Reusing the model of
    # the previous fold would validate on data it has already been trained on
    # and inflate the reported accuracy.
    model = ConvNet(num_classes=num_classes).to(device)
    optimiser = torch.optim.Adam(model.parameters(), lr=learning_rate)

    # Train the model
    train(model, train_dataloader, loss_fn, optimiser, num_epochs, step_size, device)

    # Validate the model
    acc, cm = validate(model, validation_dataloader, classes_map)
    metrics[f"Validation on fold {i}"]["acc"] = acc
    metrics[f"Validation on fold {i}"]["cm"] = cm

    # Save the model
    filename = f"SonoLog-Classifier_validation-Fold{i}_acc-{acc}.pth"
    torch.save(model.state_dict(), os.path.join("Models",filename))
    print(f"Model stored as file : {filename}")

## Visualisation of the Confusion Matrix 

In [None]:
def plot_confusion_matrix(cm, classes_map, normalize=False):
    """Draw a confusion matrix as an annotated heat map on the current figure.

    Args:
        cm: Square confusion matrix (torch tensor on any device, or anything
            convertible to a NumPy array); rows are true classes and columns
            are predicted classes.
        classes_map: Mapping whose values are the class names used as tick
            labels, in row/column order.
        normalize: If True, every row is divided by its sum so that cells show
            fractions; rows without any sample are shown as zeros.
    """
    # Work on a CPU NumPy array so arithmetic and formatting behave uniformly
    if torch.is_tensor(cm):
        cm = cm.detach().cpu().numpy()
    else:
        cm = np.asarray(cm)

    if normalize:
        row_sums = cm.sum(axis=1, keepdims=True)
        # Rows with no sample stay at zero instead of producing NaN
        cm = np.divide(cm, row_sums, out=np.zeros(cm.shape, dtype=float), where=row_sums != 0)
        print("Normalized confusion matrix")
    else:
        print("Confusion matrix without normalization")

    plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
    plt.title("Confusion Matrix", fontsize=20)
    # Call the function: assigning to `plt.colorbar` would replace it with a
    # tuple and break every later use of it in the session
    plt.colorbar()
    tick_marks = np.arange(len(classes_map))
    plt.xticks(tick_marks, classes_map.values(), rotation=90, fontsize=12)
    plt.yticks(tick_marks, classes_map.values(), fontsize=12)

    # Write each cell value, in white on the dark half of the colour scale
    fmt = '.2f' if normalize else 'd'
    thresh = cm.max() / 2.
    for i, j in np.ndindex(cm.shape[0], cm.shape[1]):
        plt.text(j, i, format(cm[i, j], fmt),
                horizontalalignment="center",
                color="white" if cm[i, j] > thresh else "black")
    plt.tight_layout()
    plt.xlabel("Predicted Class", fontsize=16)
    plt.ylabel("True Class", fontsize=16)
    plt.show()

In [None]:
# Draw the raw (non-normalised) counts of the current `cm` on a square canvas
plt.figure(figsize=(12, 12))
plot_confusion_matrix(cm, classes_map)

## Model Test

In [None]:
# Load the model back (if needed)
# NOTE(review): the checkpoint name embeds a fold number and an accuracy -
# point it at a file that actually exists in "Models".
model_path = "Models/SonoLog-Classifier_validation-Fold5_acc-100.0.pth"
model = ConvNet(num_classes=len(classes_map))
# `map_location` lets a checkpoint saved on one device (e.g. a GPU) be
# restored on the device used in this session
state_dict = torch.load(model_path, map_location=device)
model.load_state_dict(state_dict)
model = model.to(device)

In [None]:
# Get a sample from the dataset for inference
# NOTE(review): index 11 is not checked against any validation fold - confirm
# that it is a sample the loaded model was not trained on.
sample_index = 11
# Index the dataset once: every access loads and transforms the audio again
input_data, target = dataset[sample_index]
# Add the batch dimension expected by the model
input_data = torch.unsqueeze(input_data, dim=0)
input_data, target = input_data.to(device), target.to(device)

# Make an inference
predicted = predict(model, input_data)

predicted_class = classes_map[predicted.item()]
expected_class = classes_map[target.item()]

print(f"Predicted: '{predicted_class}', Expected: '{expected_class}'")