In [None]:
# Import the libraries
import os
import torch
from torch import nn, optim
import numpy as np
import matplotlib.pyplot as plt
from scipy.fftpack import fft, fftfreq
import sounddevice as sd
import librosa
from torch.utils.data import Dataset
import numpy as np
import torchaudio
from torchsummary import summary
from torch.utils.data import DataLoader


### Create a matrix for audio

### Extract features

### Plot spectrograms of some files and compare them

### Plot MFCCs of some files and compare them

In [None]:
class SoundDataSet(Dataset):
    """In-memory dataset of normalised log-mel spectrograms and their labels.

    Every file in ``file_paths`` is loaded once in the constructor, cut or
    reflect-padded to ``duration`` seconds, resampled to ``sr`` and turned
    into a mel spectrogram in dB that is min-max scaled to [0, 1].

    :param file_paths: sequence of audio file paths passed to ``librosa.load``
    :param labels: sequence of labels aligned index-by-index with ``file_paths``
    :param device: stored on the instance; the dataset itself does not use it
    :param sr: target sample rate in Hz
    :param n_fft: FFT size used by the MFCC and spectrogram plots
    :param hop_length: stored hop length; the MFCC plot uses half of it
    :param n_mfcc: number of MFCC coefficients drawn by ``__plot_mfcc__``
    :param n_mels: number of mel bands of the cached spectrograms
    :param duration: clip length in seconds
    """

    def __init__(self, file_paths, labels, device, sr=44100,n_fft=1024,hop_length=512, n_mfcc=13,n_mels=128, duration=5):
        self.file_paths = file_paths
        self.labels = labels

        self.sampled_sr = sr
        self.n_fft = n_fft
        self.duration = duration
        self.hop_length = hop_length
        self.n_mfcc = n_mfcc
        self.device = device
        self.n_mels = n_mels

        # Pre-compute every spectrogram once so __getitem__ is a plain lookup.
        self.melspectrogram_dbs = []
        for index in range(len(file_paths)):
            mel_spec_db = self.__get_melspectrogram_db__(index)
            # unsqueeze(0) adds the leading channel axis expected by Conv2d.
            mel_spec_db = torch.tensor(mel_spec_db, dtype=torch.float32).unsqueeze(0)
            self.melspectrogram_dbs.append(mel_spec_db)

    def __len__(self):
        """Return the number of audio files in the dataset."""
        return len(self.file_paths)

    def __getitem__(self, index):
        """Return the cached ``(spectrogram_tensor, label)`` pair at ``index``."""
        return self.melspectrogram_dbs[index], self.labels[index]

    def __load_audio__(self, index):
        """Load file ``index`` as a clip of exactly ``duration`` seconds.

        :return: ``(signal, sr)`` where ``sr`` equals ``self.sampled_sr``
        """
        audio_sample_path = self.file_paths[index]
        # sr=None keeps the file's native rate; resampling happens below.
        signal, sr = librosa.load(audio_sample_path, sr=None)
        # Cut/pad before resampling so only `duration` seconds are resampled.
        signal = self.__get_audio_duration__(signal, sr)
        signal, sr = self.__resample_audio__(signal, sr)
        # Resampling may round the length; enforce the exact length again so
        # every clip produces a spectrogram of the same width.
        signal = self.__get_audio_duration__(signal, sr)
        return signal, sr

    def __get_audio_duration__(self, signal,sr):
        """Return ``signal`` cut or reflect-padded to ``int(duration * sr)`` samples.

        Longer signals keep their first ``duration`` seconds. Shorter signals
        are reflect-padded on both sides; an odd shortfall puts the extra
        sample on the right so the result never overshoots the target.
        """
        target_length = int(self.duration * sr)
        missing = target_length - signal.shape[0]
        if missing > 0:
            left = missing // 2
            signal = np.pad(signal, (left, missing - left), mode='reflect')
        else:
            signal = signal[:target_length]

        return signal

    def __resample_audio__(self, signal, sr):
        """Resample ``signal`` to ``self.sampled_sr`` when its rate differs.

        :return: ``(signal, sr)`` at the rate the returned signal is in
        """
        if sr != self.sampled_sr:
            signal_resampled = librosa.resample(signal, orig_sr=sr, target_sr=self.sampled_sr)
            return signal_resampled, self.sampled_sr
        return signal, sr

    def __get_melspectrogram_db__(self, index):
        """Return the [0, 1]-normalised dB mel spectrogram of file ``index``."""
        signal, sr = self.__load_audio__(index)
        # NOTE(review): n_fft / hop_length are not passed here, so librosa's own
        # defaults apply rather than self.n_fft / self.hop_length - confirm
        # whether that is intended before changing it (it alters the features).
        ms = librosa.feature.melspectrogram(y=signal, sr=sr,fmax=sr// 2,n_mels=self.n_mels)
        mel_spec_db = librosa.power_to_db(ms, ref=np.max)
        return self.__melspec_normalization__(mel_spec_db)

    def __melspec_normalization__(self,mel_spec_db):
        """Min-max scale ``mel_spec_db`` to [0, 1].

        A constant input (for example a silent clip) has zero range; it maps
        to all zeros instead of dividing by zero and producing NaNs.
        """
        lowest = mel_spec_db.min()
        value_range = mel_spec_db.max() - lowest
        if value_range == 0:
            return np.zeros_like(mel_spec_db)
        return (mel_spec_db - lowest) / value_range

    def __plot_mfcc__(self, index):
        """Plot the MFCCs of file ``index`` as an image."""
        signal, sr = self.__load_audio__(index)
        hop_size = self.hop_length//2
        # n_mfcc is forwarded so the constructor argument actually takes effect.
        mfccs_librosa = librosa.feature.mfcc(
            y=signal, sr=sr, n_mfcc=self.n_mfcc, n_fft=self.n_fft, hop_length=hop_size
        )
        plt.figure()
        plt.imshow(mfccs_librosa, aspect='auto', origin='lower', cmap='viridis')
        plt.colorbar(format='%+2.0f')
        plt.title("MFCCs")
        plt.xlabel("Time Frames")
        plt.ylabel("MFCC Coefficients")
        plt.tight_layout()
        plt.show()

    def __plot_spectrogram__(self, index):
        """Plot the dB-scaled STFT magnitude spectrogram of file ``index``."""
        # Explicit import: `import librosa` alone does not expose
        # librosa.display on every librosa version.
        import librosa.display

        signal, sr = self.__load_audio__(index)
        nfft = self.n_fft
        win_size = nfft
        hop_size = nfft//2
        librosa_spectrogram = librosa.stft(signal,n_fft=nfft, hop_length=hop_size, win_length=win_size)
        # The STFT is complex; take the magnitude before converting to dB.
        librosa_power_spectrogram = librosa.amplitude_to_db(np.abs(librosa_spectrogram), ref=np.max)

        plt.figure()
        librosa.display.specshow(librosa_power_spectrogram, sr=sr, x_axis='time', y_axis='hz', cmap='viridis')
        plt.colorbar(format='%+2.0f dB')
        plt.title('Spectrogram')
        plt.xlabel('Time (s)')
        plt.ylabel('Frequency (Hz)')
        plt.tight_layout()
        plt.show()

    def __play_audio__(self, index):
        """Plot the waveform of file ``index`` and play it, blocking until done."""
        signal, sr = self.__load_audio__(index)
        plt.figure()
        plt.plot(signal)
        plt.show()
        # Pass the clip's own rate; otherwise playback uses sounddevice's default.
        sd.play(signal, samplerate=sr)
        sd.wait()
        




### Implement the classifier

In [None]:
def torch_loss_and_optimizer(model:nn.Module, learning_rate:float=0.001):
    """
    Build the training criterion and optimizer for ``model``.

    :param model: network whose parameters the optimizer will update
    :param learning_rate: step size handed to Adam (default 0.001)
    :return: two-element list ``[loss_function, optimizer]`` holding an
        ``nn.CrossEntropyLoss`` instance and an ``optim.Adam`` instance
    """
    adam = optim.Adam(params=model.parameters(), lr=learning_rate)
    criterion = nn.CrossEntropyLoss()
    return [criterion, adam]

def get_total_false(label):
    """
    Count the negative samples in ``label``.

    :param label: indexable sequence of class labels
    :return: number of entries equal to 0
    """
    return sum(1 for position in range(len(label)) if label[position] == 0)

def get_total_true(label):
    """
    Count the positive samples in ``label``.

    The previous implementation compared against 0 and therefore returned
    the same value as ``get_total_false``.

    :param label: indexable sequence of class labels
    :return: number of entries equal to 1
    """
    return sum(1 for index in range(len(label)) if label[index] == 1)

def custom_loss(data_label, pred_label):
    """
    Compute precision and recall of binary predictions.

    Label 1 is the positive class; any other value counts as negative.
    Elements are converted with ``int()`` so plain numbers and single-element
    tensors are both accepted.

    :param data_label: indexable sequence of ground-truth labels
    :param pred_label: indexable sequence of predicted labels; must provide
        at least ``len(data_label)`` entries
    :return: ``(precision, recall)`` as floats; a ratio whose denominator is
        zero (no predicted positives / no actual positives) is reported as 0.0
    """
    true_positive = 0
    true_negative = 0
    false_positive = 0
    false_negative = 0

    for index in range(len(data_label)):
        actual_positive = int(data_label[index]) == 1
        predicted_positive = int(pred_label[index]) == 1

        # Explicit boolean logic; the former `a == 1 & b == 0` was parsed as a
        # chained comparison because `&` binds tighter than `==`.
        if predicted_positive and actual_positive:
            true_positive += 1
        elif predicted_positive:
            false_positive += 1
        elif actual_positive:
            false_negative += 1
        else:
            true_negative += 1

    predicted_positives = true_positive + false_positive
    actual_positives = true_positive + false_negative
    precision = true_positive / predicted_positives if predicted_positives else 0.0
    recall = true_positive / actual_positives if actual_positives else 0.0

    return precision, recall
            


### Train the model

In [None]:
class CNNNetwork(nn.Module):
    """Four-stage 2-D CNN that maps single-channel spectrograms to two class scores.

    Each stage is Conv2d(3x3, padding 1) -> BatchNorm2d -> ReLU -> MaxPool2d(2),
    so every stage halves both spatial dimensions (rounding down). The
    flattened feature map feeds one linear layer with two outputs.

    :param n_mels: height of the input spectrogram (number of mel bands)
    :param time_frames: width of the input spectrogram (number of frames);
        the defaults reproduce the previously hard-coded ``128 * 208`` input
        features of the linear layer
    :raises ValueError: if the input is too small to survive four poolings
    """

    NUM_POOLING_LAYERS = 4

    def __init__(self, n_mels=128, time_frames=431):
        super().__init__()
        self.n_mels = n_mels
        self.time_frames = time_frames

        # Convolutional layers (module layout kept so state_dict keys are unchanged).
        self.conv1 = self._conv_block(1, 16)
        self.conv2 = self._conv_block(16, 32)
        self.conv3 = self._conv_block(32, 64)
        self.conv4 = self._conv_block(64, 128)

        self.flatten = nn.Flatten()

        # Size of the feature map after the four pooling stages.
        final_height = self._calc_final_dim(self.n_mels, self.NUM_POOLING_LAYERS)
        final_width = self._calc_final_dim(self.time_frames, self.NUM_POOLING_LAYERS)
        if final_height < 1 or final_width < 1:
            raise ValueError(
                f"input of size {n_mels}x{time_frames} is too small for "
                f"{self.NUM_POOLING_LAYERS} pooling layers"
            )

        self.linear2 = nn.Linear(in_features=128 * final_height * final_width, out_features=2)

        # Softmax is kept for predict_proba only; forward() returns raw logits.
        self.output = nn.Softmax(dim=1)

    @staticmethod
    def _conv_block(in_channels, out_channels):
        """Build one Conv2d -> BatchNorm2d -> ReLU -> MaxPool2d(2) stage."""
        return nn.Sequential(
            nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2)  # Reduces dimensions by half
        )

    def _calc_final_dim(self, input_dim, num_pooling_layers):
        """
        Calculate the final dimension after convolution and pooling layers.
        Each pooling layer halves the input dimension.
        """
        for _ in range(num_pooling_layers):
            input_dim = input_dim // 2
        return input_dim

    def forward(self, input_data):
        """Return the raw class logits for a batch of spectrograms.

        No softmax is applied here: ``nn.CrossEntropyLoss`` expects
        unnormalised logits and applies log-softmax itself, so feeding it
        probabilities would normalise twice. Use ``predict_proba`` when
        probabilities are needed.

        :param input_data: tensor of shape (batch, 1, n_mels, time_frames)
        :return: tensor of shape (batch, 2) with one logit per class
        """
        x = self.conv1(input_data)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        x = self.flatten(x)  # Flatten feature maps into a 1D vector
        logits = self.linear2(x)  # Final linear layer
        return logits

    def predict_proba(self, input_data):
        """Return softmax class probabilities without tracking gradients.

        The train/eval mode of the module is left as the caller set it.
        """
        with torch.no_grad():
            return self.output(self.forward(input_data))

#model=CNNNetwork().cuda()
#summary(model,(1,128,430))

### Calculate results and analyze them

In [None]:
# Training hyper-parameters consumed by the script at the end of this cell.
BATCH_SIZE = 128  # batch size handed to create_data_loader
EPOCHS = 10  # number of epochs handed to train
LEARNING_RATE = 0.001  # learning rate handed to torch_loss_and_optimizer

def getFilePaths(path):
    """
    List the regular files directly inside ``path``.

    Entries are sorted by name because ``os.listdir`` returns them in
    arbitrary order, which would make dataset indices differ between runs.
    Sub-directories are skipped since they cannot be loaded as audio.

    :param path: directory to scan
    :return: list of ``path``-prefixed file paths in sorted order
    """
    candidates = (os.path.join(path, file) for file in sorted(os.listdir(path)))
    return [candidate for candidate in candidates if os.path.isfile(candidate)]

def create_data_loader(train_data, batch_size, shuffle=True):
    """
    Wrap ``train_data`` in a ``DataLoader`` for training.

    Shuffling is on by default: the script in this file builds the dataset
    category by category, so unshuffled batches would be dominated by a
    single class.

    :param train_data: dataset to iterate over
    :param batch_size: number of samples per batch
    :param shuffle: reshuffle the samples every epoch (default True);
        pass False to keep the dataset order
    :return: the configured ``DataLoader``
    """
    train_dataloader = DataLoader(train_data, batch_size=batch_size, shuffle=shuffle)
    return train_dataloader

def train_single_epoch(model, data_loader, loss_fn, optimiser, device):
    """
    Run one optimisation pass over ``data_loader``.

    :param model: network to train; it is switched to training mode
    :param data_loader: iterable yielding ``(input, target)`` tensor batches
    :param loss_fn: callable ``loss_fn(prediction, target)`` returning a scalar tensor
    :param optimiser: optimizer updating the parameters of ``model``
    :param device: device each batch is moved to before the forward pass
    :return: loss of the last batch as a float, or None when the loader
        produced no batches
    """
    # BatchNorm/Dropout layers behave differently in train and eval mode.
    model.train()

    loss = None
    for inputs, target in data_loader:
        inputs, target = inputs.to(device), target.to(device)

        # calculate loss
        prediction = model(inputs)
        loss = loss_fn(prediction, target)

        # backpropagate error and update weights
        optimiser.zero_grad()
        loss.backward()
        optimiser.step()

    if loss is None:
        # Empty loader: there is no loss to report (previously None.item() crashed).
        print("loss: n/a (data loader yielded no batches)")
        return None

    last_loss = loss.item()
    print(f"loss: {last_loss}")
    return last_loss


def train(model, data_loader, loss_fn, optimiser, device, epochs):
    """
    Train ``model`` for ``epochs`` passes over ``data_loader``.

    Each pass is delegated to ``train_single_epoch``; the epoch number is
    printed before the pass and a separator line after it.

    :param model: network to train
    :param data_loader: iterable of ``(input, target)`` batches
    :param loss_fn: loss callable forwarded to ``train_single_epoch``
    :param optimiser: optimizer forwarded to ``train_single_epoch``
    :param device: device forwarded to ``train_single_epoch``
    :param epochs: number of passes to run
    """
    separator = "---------------------------"
    for epoch_number in range(1, epochs + 1):
        print(f"Epoch {epoch_number}")
        train_single_epoch(model, data_loader, loss_fn, optimiser, device)
        print(separator)
    print("Finished training")



# Collect every file of each category together with its integer class label.
audio_paths = []
audio_labels = []
categories = {
    0: "bus",
    1: "tram"
}
for key, value in categories.items():
    paths = getFilePaths('dataset/' + value)
    audio_paths += paths
    audio_labels += [key]*len(paths)

device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device {device}")

audio_data = SoundDataSet(file_paths=audio_paths, labels=audio_labels, device=device)
# Optional manual inspection of a single sample:
#audio_data.__plot_spectrogram__(26)
#audio_data.__plot_mfcc__(26)
#audio_data.__play_audio__(26)

# The model must live on the same device as the batches that
# train_single_epoch moves with .to(device); it is moved before the
# optimizer is created.
model = CNNNetwork().to(device)
loss_func, optimizer = torch_loss_and_optimizer(model=model, learning_rate= LEARNING_RATE)
train_data_loader = create_data_loader(audio_data, BATCH_SIZE)
train(model, train_data_loader,loss_func, optimizer, device, EPOCHS)

# Set to True to persist the trained weights next to the notebook.
SAVE_MODEL = False
if SAVE_MODEL:
    torch.save(model.state_dict(), "vehicle_audio_processing_model.pth")
    print("Trained feed forward net saved at vehicle_audio_processing_model.pth")
