In [None]:
import matplotlib.pyplot as plt # plotting library
import numpy as np # this module is useful to work with numerical arrays
import pandas as pd # this module is useful to work with tabular data
import random # this module will be used to select random samples from a collection
import os # this module will be used just to create directories in the local filesystem
from tqdm import tqdm # this module is useful to plot progress bars

import torch
import torch.nn.functional as F
import torchvision
import torchvision.transforms.v2 as v2
from torch.utils.data import DataLoader, Dataset
from torch import nn
import h5py
import librosa

# Define dataset

In [None]:
def _load_h5_array(path, key):
    """Read the dataset ``key`` from the HDF5 file at ``path`` into memory.

    The file is opened read-only inside a ``with`` block, so the handle is
    closed as soon as the data has been copied out by the ``[:]`` slice
    (the previous code left all six file handles open).

    Args:
        path: location of the ``.h5`` file.
        key: name of the dataset stored inside the file.

    Returns:
        The full content of the dataset, as returned by slicing it with ``[:]``.
    """
    with h5py.File(path, 'r') as h5_file:
        return h5_file[key][:]


# Training split
x_train = _load_h5_array('h5Dataset/train_x.h5', "train")
y_train = _load_h5_array('h5Dataset/train_y.h5', "train")

# Validation split
x_valid = _load_h5_array('h5Dataset/valid_x.h5', "valid")
y_valid = _load_h5_array('h5Dataset/valid_y.h5', "valid")

# Test split
x_test = _load_h5_array('h5Dataset/test_x.h5', "test")
y_test = _load_h5_array('h5Dataset/test_y.h5', "test")

In [None]:
class DataAudioH5_colab(Dataset):
    """Dataset pairing in-memory audio clips with their labels.

    Each item is built on the fly from the clip stored at the requested
    index: a log-mel spectrogram when ``input_type`` is ``"2D"``, the raw
    clip otherwise. In both cases a leading channel axis is added.
    """

    def __init__(self,data_x,data_y,transform=None,input_type="2D"):
        """Keep references to the data and the item-building options.

        Args:
            data_x: indexable collection of audio clips.
            data_y: indexable collection of labels, aligned with ``data_x``.
            transform: optional callable applied to every built input.
            input_type: ``"2D"`` selects the spectrogram input; any other
                value selects the raw (1D) input.
        """
        self.x, self.y = data_x, data_y
        self.transform = transform
        # Kind of input produced by create_input: 2D or 1D
        self.type = input_type

    def __len__(self):
        """Number of clips in the dataset."""
        return len(self.x)

    def create_input(self, audio,sr=22050):
        """Turn one audio clip into the array fed to the model.

        Args:
            audio: the audio clip (a NumPy array).
            sr: sampling rate passed to the mel filter bank.

        Returns:
            For ``"2D"`` input: the dB-scaled mel spectrogram (513 mel bands,
            at most the first 128 frames), transposed to (frames, mels) and
            prefixed with a channel axis. Otherwise: ``audio`` itself with a
            channel axis prepended.
        """
        # Raw-waveform mode: nothing to compute, just add the channel axis
        if self.type != "2D":
            return audio[np.newaxis,:]

        # Magnitude STFT -> power -> mel filter bank
        magnitude = np.abs(librosa.stft(audio, n_fft=4096, hop_length=2048))
        mel_power = librosa.feature.melspectrogram(sr=sr, S=magnitude**2, n_mels=513)

        # Keep the first 128 frames, convert to dB and put time on the first axis
        mel_db = librosa.power_to_db(mel_power[:,:128]).T
        return mel_db[np.newaxis,:]

    def __getitem__(self, idx):
        """Return the ``(input, label)`` pair for position ``idx``."""
        features = self.create_input(self.x[idx])
        label = self.y[idx]

        # Optional user-supplied transform, applied to the input only
        if self.transform:
            features = self.transform(features)

        return features,label


In [None]:
def MinMaxScaler(x):
    """Linearly rescale a tensor to the [0, 1] range.

    The minimum and maximum are taken over the whole tensor (not per channel
    or per row), so the smallest element maps to 0 and the largest to 1.

    Args:
        x: tensor to rescale.

    Returns:
        A tensor of the same shape as ``x``. When every element of ``x`` is
        equal the range is zero; the result is then all zeros instead of the
        NaNs a plain 0/0 division would produce.
    """
    Xmin = torch.min(x)
    Xmax = torch.max(x)
    value_range = Xmax - Xmin
    # Constant input: divide by 1 instead of 0 (the numerator is already 0)
    safe_range = torch.where(value_range == 0, torch.ones_like(value_range), value_range)
    return (x-Xmin)/safe_range

# Standard transformations for images (torchvision transforms v2)
# The mean and std in the commented-out Normalize step were computed on one file of the training set
# NOTE(review): ToImage presumably reads NumPy inputs as (H, W, C) — confirm the
# array layout before plugging this pipeline into a dataset.
transforms = v2.Compose([
    # v2.ToTensor() is deprecated; ToImage does the conversion and the
    # ToDtype step below takes care of the float32 cast/scaling.
    v2.ToImage(),
    v2.RandomResizedCrop(size=(128,513), antialias=True),
    v2.RandomHorizontalFlip(p=0.5),
    v2.ToDtype(torch.float32, scale=True),
    # v2.Normalize(mean=[1.0784853], std=[4.0071154]),
    # Final rescaling of each sample to [0, 1]
    v2.Lambda(lambda x: MinMaxScaler(x))
    ])

In [None]:
# One spectrogram ("2D") dataset per split, built from the arrays loaded above
train_dataset, valid_dataset, test_dataset = (
    DataAudioH5_colab(clips, labels, input_type="2D")
    for clips, labels in ((x_train, y_train), (x_valid, y_valid), (x_test, y_test))
)

In [None]:
# os.cpu_count() returns None when the number of CPUs cannot be determined;
# fall back to 0 (load batches in the main process) so that DataLoader
# always receives an integer.
NUM_WORKERS = os.cpu_count() or 0

# Only the training split is shuffled; validation and test keep their order
train_dataloader  = DataLoader(train_dataset, batch_size=64, shuffle=True, num_workers=NUM_WORKERS)
valid_dataloader  = DataLoader(valid_dataset, batch_size=64, shuffle=False, num_workers=NUM_WORKERS)
test_dataloader   = DataLoader(test_dataset, batch_size=64, shuffle=False, num_workers=NUM_WORKERS)

# NNET

In [None]:
class NNET2(nn.Module):
    """Convolutional classifier for single-channel 2D inputs (8 classes).

    Architecture:
      * ``c1``: Conv2d with a (4, 513) kernel, so an input whose last axis has
        513 bins is collapsed to width 1, followed by BatchNorm, ReLU and
        Dropout2d.
      * ``c2`` / ``c3``: two (4, 1) convolutions whose paddings (2 then 1)
        bring the height back to that of ``c1``, allowing the residual sum
        ``c1 + c3``.
      * global max pooling and global average pooling, concatenated along the
        channel axis (256 + 256 = 512 features).
      * ``fc``: 512 -> 300 -> 150 -> 8 fully connected head.

    ``forward`` returns raw, unnormalised class scores (logits). Losses such
    as ``nn.CrossEntropyLoss`` apply log-softmax themselves, so the network
    must not end with a Softmax layer; apply ``torch.softmax(out, dim=1)`` on
    the output when probabilities are needed. ``argmax`` predictions are the
    same on logits and on probabilities.

    Args:
        initialisation: kept for interface compatibility; it is currently
            not read, Xavier-uniform initialisation is always applied.
    """

    def __init__(self,initialisation="xavier"):
        super().__init__()

        self.c1 = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=256,kernel_size=(4,513)),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.Dropout2d(.2)
        )

        self.c2 = nn.Sequential(
            nn.Conv2d(in_channels=256, out_channels=256, kernel_size=(4, 1),padding=(2,0)),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.Dropout2d(.2)
        )

        self.c3 = nn.Sequential(
            nn.Conv2d(in_channels=256, out_channels=256, kernel_size=(4, 1),padding=(1,0)),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.Dropout2d(.2)
        )

        # No Softmax at the end: the head outputs logits (see class docstring).
        # The Linear layers keep their positions (0, 3, 6), so state_dict keys
        # of previously saved checkpoints still match.
        self.fc = nn.Sequential(
            nn.Linear(512, 300),
            nn.ReLU(),
            nn.Dropout(p=0.2),
            nn.Linear(300, 150),
            nn.ReLU(),
            nn.Dropout(p=0.2),
            nn.Linear(150, 8)
        )

        # Open question from the author: what happens when pre-trained
        # parameters are loaded afterwards? Weights loaded later with
        # load_state_dict overwrite this initialisation.
        self.apply(self._init_weights)

    def _init_weights(self, module):
        """Xavier-uniform weights and zero biases for Linear and Conv2d layers."""
        if isinstance(module, (nn.Linear, nn.Conv2d)):
            nn.init.xavier_uniform_(module.weight)
            if module.bias is not None:
                nn.init.zeros_(module.bias)

    def forward(self,x):
        """Compute class logits for a batch.

        Args:
            x: tensor of shape (batch, 1, height, width); the first
                convolution needs height >= 4 and width >= 513.

        Returns:
            Tensor of shape (batch, 8) holding unnormalised class scores.
        """
        c1 = self.c1(x)
        c2 = self.c2(c1)
        c3 = self.c3(c2)
        # Residual connection between the first and third conv blocks
        x = c1 + c3
        # Global pooling over the whole feature map: identical to the former
        # fixed (125, 1) kernel on 128x513 inputs, but not tied to that size.
        max_pool = F.adaptive_max_pool2d(x, output_size=1)
        avg_pool = F.adaptive_avg_pool2d(x, output_size=1)
        x = torch.cat([max_pool,avg_pool],dim=1)
        return self.fc(torch.flatten(x, start_dim=1))


In [None]:
### Set the random seeds for reproducible results
# Seed every random number generator imported by this notebook (Python,
# NumPy and PyTorch), not only PyTorch, before the model weights are drawn.
SEED = 0
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
model = NNET2()

# Training

In [None]:
from torch.nn import CrossEntropyLoss

### Define the loss function
# CrossEntropyLoss expects unnormalised scores (logits) as its input
loss_fn = CrossEntropyLoss()

# Check if the GPU is available
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f'Selected device: {device}')

# Move the model to the selected device BEFORE building the optimizer, as
# recommended by the torch.optim documentation
model.to(device)

### Define the optimizer over the model parameters
lr = 1e-3
optim = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=1e-5)

# Show the architecture as the cell output
model


## Training function

In [None]:
from tqdm import tqdm
def train_epoch(model, device, dataloader, loss_fn, optimizer):
    """Train ``model`` for one pass over ``dataloader``.

    Args:
        model: network to optimise; it is switched to train mode.
        device: device the batches are moved to before the forward pass.
        dataloader: iterable yielding ``(inputs, labels)`` batches.
        loss_fn: callable ``loss_fn(outputs, labels)`` returning a scalar loss.
        optimizer: optimizer updating the parameters of ``model``.

    Returns:
        The mean of the per-batch losses over the epoch (NumPy scalar).
    """
    model.train()
    batch_losses = []

    for inputs, targets in dataloader:
        inputs, targets = inputs.to(device), targets.to(device)

        # Forward pass and supervised loss against the labels
        batch_loss = loss_fn(model(inputs), targets)

        # Backward pass and parameter update
        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

        batch_losses.append(batch_loss.detach().cpu().numpy())

    return np.mean(batch_losses)

In [None]:
def test_epoch(model, device, dataloader, loss_fn):
    # Set eval mode for both the encoder and the decoder
    model.eval()
    with torch.no_grad():
        conc_out = []
        conc_label = []
    # Iterate the dataloader (we do not need the label values, this is unsupervised learning)
    for x_batch, label_batch in (dataloader): 
       
        x_batch = x_batch.to(device)
        label_batch = label_batch.to(device)

        out = model(x_batch)

        conc_out.append(out)
        conc_label.append(label_batch)
    # Create a single tensor with all the values in the lists
    conc_out = torch.cat(conc_out)
    conc_label = torch.cat(conc_label) 
    # Evaluate global loss
    val_loss = loss_fn(conc_out, conc_label)
    # Evaluate accuracy
    val_acc = np.sum(np.argmax(conc_label.detach().cpu().numpy(), axis=1) == np.argmax(conc_out.detach().cpu().numpy(), axis=1)) / len(conc_out)
    
    return val_loss.detach().cpu().numpy(), val_acc
    

# Training loop


In [None]:
### Training cycle
num_epochs = 1
train_losses, val_losses, val_accs = [], [], []

for epoch in range(num_epochs):

    # One optimisation pass over the training set
    train_loss = train_epoch(model, device, train_dataloader, loss_fn, optim)
    print(f'TRAIN - EPOCH {epoch+1}/{num_epochs} - loss: {train_loss}')
    train_losses.append(train_loss)

    # Evaluation on the validation set; test_epoch returns (loss, accuracy)
    val_loss = test_epoch(model, device, valid_dataloader, loss_fn)
    val_losses.append(val_loss[0])
    val_accs.append(val_loss[1])
    print(f"VALIDATION - EPOCH {epoch+1}/{num_epochs} - loss:", val_loss[0].item(), "\n")

    # Checkpoint the network parameters after every epoch (same file each time)
    torch.save(model.state_dict(), 'nnet2D.pth')
    # NOTE: Remember to save also the parameters of the optimizer if you want to restore and continue the training
    