# Neural Networks

In [80]:
import time
import os
import pprint
import torch
import torch.nn as nn
from sklearn.metrics import confusion_matrix
from torch.utils.data import Dataset, DataLoader
from torchsummary import summary
import IPython.display as ipd
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import librosa
import librosa.display
import tqdm.notebook as tq
import utils
from tkinter import Tcl # file sorting by name

# Creation of labels

### Dictionary creation for the classes

We want a dictionary indicating a numbeer for each genre:

{0: 'Hip-Hop', 1: 'Pop', 2: 'Folk', 3: 'Rock', 4: 'Experimental', 5: 'International', 6: 'Electronic', 7: 'Instrumental'}

### Creation of the labels vector

In [5]:
def create_single_dataset(folder_path, tracks_dataframe, genre_dictionary):    
    labels = []
   
    _, file_list = get_sorted_file_paths(folder_path)
    
    for i,file in enumerate(file_list):
        #print("considering file:",file, "({}/{})".format(i,len(file_list)))
        track_id_clip_id = file.split('.')[0]
        track_id = track_id_clip_id.split('_')[0]
        #print("track id with clip: {}, track id: {}".format(track_id_clip_id, track_id))
        genre = tracks_dataframe.loc[int(track_id)]
        #print("genre from dataframe: ", genre)
        label = genre_dictionary[genre]
        #print("label from dictionary:",label)
        labels.append(label)
    print("labels length: {}".format(len(labels)))
    return labels
    

#create the train,validation and test vectors using the files in the train/validation/test folders
def create_dataset_splitted(folder_path):
    train_folder = os.path.join(folder_path,'train') # concatenate train folder to path
    validation_folder = os.path.join(folder_path,'validation') # concatenate train folder to path
    test_folder = os.path.join(folder_path,'test') # concatenate train folder to path
    
    print("train_folder:",train_folder)
    print("validation_folder:",validation_folder)
    print("test_folder:",test_folder,"\n")
    
    AUDIO_DIR = os.environ.get('AUDIO_DIR')
    print("audio directory: ",AUDIO_DIR)
    print("Loading tracks.csv...")
    tracks = utils.load('data/fma_metadata/tracks.csv')
    
    #get only the small subset of the dataset
    small = tracks[tracks['set', 'subset'] <= 'small']
    print("small dataset shape:",small.shape)    

    small_training = small.loc[small[('set', 'split')] == 'training']['track']
    small_validation = small.loc[small[('set', 'split')] == 'validation']['track']
    small_test = small.loc[small[('set', 'split')] == 'test']['track']

    print("Track.csv: {} training samples, {} validation samples, {} test samples\n".format(len(small_training), len(small_validation), len(small_test)))

    small_training_top_genres = small_training['genre_top']
    small_validation_top_genres = small_validation['genre_top']
    small_test_top_genres = small_test['genre_top']
    
    #create dictionary of genre classes:
    unique_genres = small_training_top_genres.unique()
    unique_genres = np.array(unique_genres)
    print("there are {} unique genres".format(len(unique_genres)))
    genre_dictionary = {}
    for i,genre in enumerate(unique_genres):
        genre_dictionary[genre] = i
    print("Dictionary of genres created:",genre_dictionary)
    
    
    Y_train = create_single_dataset(train_folder, small_training_top_genres, genre_dictionary)
    Y_validation = create_single_dataset(validation_folder, small_validation_top_genres, genre_dictionary)
    Y_test = create_single_dataset(test_folder, small_test_top_genres, genre_dictionary)
    
    return Y_train, Y_validation, Y_test
 
def get_sorted_file_paths(folder_path):
    file_list = os.listdir(folder_path)
    #sort the dataset files in alphabetical order (important to associate correct labels created using track_id in track.csv)
    file_list = Tcl().call('lsort', '-dict', file_list) # sort file by name: 2_0,2_1, ... 2_9,3_0, ... 400_0,400_1, ...
    file_paths = [os.path.join(folder_path, file_name) for file_name in file_list] #join filename with folder path
    #print("There are {} in the folder: {}".format(len(file_list),file_list))
    return file_paths, file_list
    
    
folder_path="data/fma_small_stft_transposed"
Y_train, Y_validation, Y_test = create_dataset_splitted(folder_path)

train_folder: data/fma_small_stft_transposed/train
validation_folder: data/fma_small_stft_transposed/validation
test_folder: data/fma_small_stft_transposed/test 

audio directory:  ./data/fma_small/
Loading tracks.csv...
small dataset shape: (8000, 52)
Track.csv: 6400 training samples, 800 validation samples, 800 test samples

there are 8 unique genres
Dictionary of genres created: {'Hip-Hop': 0, 'Pop': 1, 'Folk': 2, 'Rock': 3, 'Experimental': 4, 'International': 5, 'Electronic': 6, 'Instrumental': 7}
labels length: 63970
labels length: 8000
labels length: 8000


# Dataset Class

In [6]:
# Define the custom class for accessing our dataset
class MyDataset(Dataset):
    def __init__(self, file_list, labels):
        self.file_list = file_list
        self.labels=labels

    def __len__(self):
        return len(self.file_list)

    def __getitem__(self, idx):
        # returns a training sample and its label
        file_path = self.file_list[idx]
        label = torch.tensor(self.labels[idx])
        stft_vector = torch.tensor(np.load(file_path)) #load from file
        return stft_vector, label

In [7]:
folder_path="data/fma_small_stft_transposed"

train_folder = os.path.join(folder_path,'train') # concatenate train folder to path
validation_folder = os.path.join(folder_path,'validation') # concatenate train folder to path
test_folder = os.path.join(folder_path,'test') # concatenate train folder to path

train_file_paths, _ = get_sorted_file_paths(train_folder)
train_dataset = MyDataset(train_file_paths, Y_train)
print("len of train dataset: ",len(train_dataset))

validation_file_paths, _ = get_sorted_file_paths(validation_folder)
validation_dataset = MyDataset(validation_file_paths, Y_validation)
print("len of validation dataset: ",len(validation_file_paths))

test_file_paths, _ = get_sorted_file_paths(test_folder)
test_dataset = MyDataset(test_file_paths, Y_test)
print("len of test dataset: ",len(test_dataset))

len of train dataset:  63970
len of validation dataset:  8000
len of test dataset:  8000


In [8]:
def check_for_dimension_errors(filepaths):
    error_indexes = []
    progress = 0
    for file in filepaths:
        progress+=1
        print("checked {}/{} files".format(progress,len(filepaths)))
        x = np.load(file)
        if(x.shape != (128,513)):
            error_indexes.append(x)
            print("error")
    print("{} errors found in files: {}".format(len(error_indexes),error_indexes))
    for idx,error in enumerate(error_indexes):
        print("index: {}, shape: {}".format(idx,error.shape))

# Network Architecture Definition (nnet1)

In [9]:
class NNet1(nn.Module):
    def __init__(self):
        super(NNet1, self).__init__()
        
        self.conv1 = nn.Conv2d(1, 128, kernel_size=(4, 513), stride=(1,513))
        self.relu = nn.ReLU()
        self.maxpool1 = nn.MaxPool2d(kernel_size=(2, 1))
        self.conv2 = nn.Conv2d(128, 128, kernel_size=(4, 1), stride=(1,513))
        self.maxpool2 = nn.MaxPool2d(kernel_size=(2, 1))
        self.conv3 = nn.Conv2d(128, 256, kernel_size=(4, 1), stride=(1,513))
        self.avgpool = nn.AvgPool2d(kernel_size=(26, 1))
        self.maxpool = nn.MaxPool2d(kernel_size=(26, 1))
        self.flatten = nn.Flatten()
        self.dropout=nn.Dropout(0.2)
        self.dense1 = nn.Linear(512, 300)
        self.dense2 = nn.Linear(300, 150)
        self.dense3 = nn.Linear(150, 8)
        self.softmax = nn.Softmax(dim=1)
    
    def forward(self, x):
        x = self.conv1(x)
        x = self.relu(x)
        x = self.maxpool1(x)
        x = self.conv2(x)
        x = self.relu(x)
        x = self.maxpool2(x)
        x = self.conv3(x)
        x_avg = self.avgpool(x)
        x_max = self.maxpool(x)
        x = torch.cat([x_avg, x_max], dim=1)
        x = self.flatten(x)
        x = self.dense1(x)
        x=self.dropout(x)
        x = self.relu(x)
        x = self.dense2(x)
        x = self.relu(x)
        x = self.dense3(x)
        x = self.softmax(x)
        return x

# Hyperparameters

In [51]:
BATCH_SIZE=512
EPOCHS=3
LEARNING_RATE=0.001

# Train function

In [85]:
def test(model, validation_dataset, Y_validation):
    #Stop parameters learning
    model.eval()
    
    validation_loader = torch.utils.data.DataLoader(validation_dataset)
    criterion = nn.CrossEntropyLoss()
    correct = 0
    total = 0
    total_loss = 0
    
    with torch.no_grad():
        for inputs, labels in validation_loader:
            
            inputs=inputs.unsqueeze(1)
            #predict label
            outputs = model(inputs)
            #compute loss
            loss = criterion(outputs, labels)
            total_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    

    accuracy = correct / total
    average_loss = total_loss / len(validation_loader)

    return accuracy, average_loss


In [86]:
def train(model, dataset, batch_size, num_epochs, learning_rate, verbose = False):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    loss_list=[]
    acc_list=[]
    

    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.CrossEntropyLoss()

    if not isinstance(dataset, Dataset):
        raise ValueError("The dataset parameter should be an instance of torch.utils.data.Dataset.")

    data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
    num_batches = len(data_loader)
    
    
    for epoch in range(num_epochs):
        running_loss = 0.0 
        running_accuracy = 0.0
        #initialize correctly predicted samples
        
        # Initialize the progress bar
        progress_bar = tq.tqdm(total=num_batches, unit="batch")
    
        # Initialize the progress bar description
        progress_bar.set_description(f"Epoch {epoch+1}/{num_epochs}")
        start_time = time.time()
        
        for batch_idx, batch in enumerate(data_loader):
            correct = 0 # reset train accuracy each batch
            
            inputs,labels = batch[0],batch[1]
            inputs = inputs.unsqueeze(1)
            
            # Extract the inputs and targets
            optimizer.zero_grad()
            outputs = model(inputs)
            
            if(verbose == True):
                print("\ninputs shape:",inputs.size(),", content: ",inputs)
                print("\nlabels shape:",labels.size(),", content: ",labels)
                print("\noutputs size:",outputs.size(),"content:",outputs)

            loss = criterion(outputs, labels) #labels need to be a vector of class indexes (0-7) of dim (batch_size)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            
            #calculate train accuracy
            for index, output in enumerate(outputs):
                max_index = torch.argmax(output).item() #the index with maximum probability
                if(labels[index].item() == max_index):
                    correct += 1
            
                if(verbose==True):
                    print("considering output at index {}:".format(index,output))
                    print("max output index = {}",max_index)
                    if(labels[index].item() == max_index):
                        print("correct! in fact labels[index] = {}, max_index = {}".format(labels[index].item(),max_index))
                    else:
                        print("NOT correct! in fact labels[index] = {}, max_index = {}".format(labels[index].item(),max_index))

            
            accuracy = 100 * correct / batch_size
            running_accuracy += accuracy #epoch running_accuracy
            
            # Update the progress bar description and calculate bps
            #progress_bar.set_postfix({"Loss": running_loss / (batch_idx + 1)})
            average_accuracy = running_accuracy / (batch_idx + 1)
            average_loss = running_loss / (batch_idx + 1)
            progress_bar.set_postfix({"avg_loss": average_loss, "acc": accuracy, "avg_acc": average_accuracy})

            # Update the progress bar
            progress_bar.update(1)
            # Evaluate the model on the validation dataset
            
        val_acc, val_loss = test(model, validation_dataset, Y_validation)
        loss_list.append(val_loss)
        acc_list.append(val_acc)
        average_loss = running_loss / len(data_loader)
        average_accuracy = running_accuracy / len(data_loader)
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {average_loss:.4f}. Accuracy: {average_accuracy}")
        progress_bar.close()
    return [acc_list,loss_list]


In [None]:
model = NNet1()
#summary(model, (1, 128, 513))
performance=train(model, train_dataset, batch_size=BATCH_SIZE, num_epochs=EPOCHS, learning_rate=LEARNING_RATE)

  0%|          | 0/125 [00:00<?, ?batch/s]

Epoch [1/3], Loss: 1.9132. Accuracy: 34.6453125


  0%|          | 0/125 [00:00<?, ?batch/s]

In [None]:
plt.plot(np.arange(1,EPOCHS+1), performance[1][:], label='Loss') 
plt.plot(np.arange(1,EPOCHS+1), performance[0][:], label='Accuracy')
plt.legend()  # Display the legend showing the labels
plt.show()
print(performance[0][:])

# Network Architecture Definition (nnet2)

In [None]:
# Define the custom model class
class NNet2(nn.Module):
    def __init__(self):
        super(NNet2, self).__init__()
        self.conv1 = nn.Conv2d(1, 256, kernel_size=(4, 513),padding="same")
        self.conv2 = nn.Conv2d(256, 256, kernel_size=(4, 1),padding="same")
        self.conv3 = nn.Conv2d(256, 256, kernel_size=(4, 1),padding="same")
        self.dense1 = nn.Linear(256*125*1, 300)
        self.dense2 = nn.Linear(300, 150)
        self.dense3 = nn.Linear(150, 8)
        
    def forward(self, x):
        x = nn.ReLU()(self.conv1(x))
        x1 = x  # Save the output of the first convolutional layer for later
        x = nn.ReLU()(self.conv2(x))
        x = nn.ReLU()(self.conv3(x))
        x += x1  # Sum the output of the first convolutional layer with the third convolutional layer
        
        x = torch.cat((nn.AvgPool2d(kernel_size=(125, 1))(x), nn.MaxPool2d(kernel_size=(125, 1))(x)), dim=1)
        x = x.view(x.size(0), -1)
        x = nn.ReLU()(self.dense1(x))
        x = nn.ReLU()(self.dense2(x))
        x = nn.Softmax(dim=1)(self.dense3(x))
        return x

In [None]:
model2 = NNet2()
#summary(model2, (1, 128, 513))
train(model2, train_dataset, batch_size=32, num_epochs=50, learning_rate=0.01)

# Recurrent Neural Network

In [None]:
class AudioGenreClassifier(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(AudioGenreClassifier, self).__init__()
        self.hidden_size = hidden_size
        
        # Time-distributed layer
        self.time_distributed = nn.Linear(input_size, hidden_size)
        
        # Bidirectional LSTM layers
        self.bilstm = nn.LSTM(256, hidden_size, num_layers = num_layers, bidirectional=True)
        
        # Attention mechanism
        self.attention = nn.Linear(hidden_size * 2, 1)
        
        # Pooling layer
        self.pooling = nn.AdaptiveAvgPool1d(1)
        
        # Output layer
        self.fc = nn.Linear(hidden_size * 2, num_classes)
    
    def forward(self, x):
        # Time-distributed layer
        x = self.time_distributed(x)

        # Bidirectional LSTM layers
        output, _ = self.bilstm(x)

        # Attention mechanism
        attention_weights = torch.softmax(self.attention(output), dim=1)
        attended_output = torch.sum(attention_weights * output, dim=1)

        # Pooling layer
        pooled_output = self.pooling(attended_output.permute(0, 1).unsqueeze(2))
        
        # Reshape and pass through the output layer
        pooled_output = pooled_output.squeeze(2)
        output = self.fc(pooled_output)

        return output

In [None]:
def trainRNN(model, dataset, batch_size, num_epochs, learning_rate):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print("Using device: ",device)
    model.to(device)

    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.CrossEntropyLoss()

    if not isinstance(dataset, Dataset):
        raise ValueError("The dataset parameter should be an instance of torch.utils.data.Dataset.")

    data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
    num_batches = len(data_loader)
    
    # Initialize the progress bar
    progress_bar = tq.tqdm(total=num_batches, unit="batch")
    
  
    
    for epoch in range(num_epochs):
        running_loss = 0.0 
        running_accuracy = 0.0
        #initialize correctly predicted samples
        
        # Initialize the progress bar
        progress_bar.set_description(f"Epoch {epoch+1}/{num_epochs}")
        start_time = time.time()
        
        for batch_idx, batch in enumerate(data_loader):
            correct = 0 # reset train accuracy each batch
            
            inputs,labels = batch[0],batch[1]
            #print("inputs shape:",inputs.shape,", content: ",inputs)
            #print("labels shape:",labels.shape,", content: ",labels)
            #inputs = inputs.unsqueeze(1)
            
            # Extract the inputs and targets
            optimizer.zero_grad()
            
            outputs = model(inputs)
            
            #print("\noutputs type:",type(outputs),"content:",outputs)
            #print("\nlabels type:",type(labels),"content:",labels)

            loss = criterion(outputs, labels.float()) #labels need to be a vector of float, not Long
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            
            #print("outputs shape:",outputs.size(),"content:",outputs)
            #print("labels shape:",labels.size(),"content:",labels)

            #calculate train accuracy
            for idx, predicted_label in enumerate(outputs):
                #print("predicted_label size:",predicted_label.size(),"content:",predicted_label)
                #print("labels[idx] size:",labels[idx].size(),"content:",labels[idx])
                max_idx = torch.argmax(predicted_label).item() #index with max argument in the one hot predicted label vector
                #print("max_idx content:",max_idx)

                if(labels[idx][max_idx].item() == 1):
                    correct += 1
            
            accuracy = 100 * correct / batch_size
            running_accuracy += accuracy #epoch running_accuracy
            #print("Accuracy = {}".format(accuracy))

            
            # Update the progress bar description and calculate bps
            #progress_bar.set_postfix({"Loss": running_loss / (batch_idx + 1)})
            average_accuracy = running_accuracy / (batch_idx + 1)
            progress_bar.set_postfix({"Batch accuracy": accuracy, "Average accuracy": average_accuracy})



            #bps = (batch_idx + 1) / (time.time() - start_time)

            # Update the progress bar
            progress_bar.update(1)
       


        average_loss = running_loss / len(data_loader)
        average_accuracy = running_accuracy / len(data_loader)
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {average_loss:.4f}. Accuracy: {average_accuracy}")
        progress_bar.close()

In [None]:
modelRNN = AudioGenreClassifier(input_size=513, hidden_size=256, num_layers=5, num_classes=8)
trainRNN(modelRNN, train_dataset, batch_size=32, num_epochs=8, learning_rate=0.1)